This commit is contained in:
Boki 2026-03-26 16:32:37 -04:00
parent deeb934526
commit eeed957fe1
5 changed files with 490 additions and 196 deletions

View file

@ -1,5 +1,6 @@
import { import {
BaseHandler, BaseHandler,
Disabled,
Handler, Handler,
Operation, Operation,
RateLimit, RateLimit,
@ -32,6 +33,7 @@ import { createEODOperationRegistry } from './shared';
* Operations can specify just a cost to use handler limits, or override with custom limits * Operations can specify just a cost to use handler limits, or override with custom limits
*/ */
@Handler('eod') @Handler('eod')
@Disabled()
@RateLimit({ @RateLimit({
limits: [ limits: [
{ points: 900, duration: 60 }, // 1000 points per minute { points: 900, duration: 60 }, // 1000 points per minute

View file

@ -1,5 +1,6 @@
import { import {
BaseHandler, BaseHandler,
Disabled,
Handler, Handler,
Operation, Operation,
ScheduledOperation, ScheduledOperation,
@ -7,6 +8,7 @@ import {
import { fetchExchanges, fetchExchangesAndSymbols, fetchSession, fetchSymbols } from './actions'; import { fetchExchanges, fetchExchangesAndSymbols, fetchSession, fetchSymbols } from './actions';
@Handler('ib') @Handler('ib')
@Disabled()
export class IbHandler extends BaseHandler { export class IbHandler extends BaseHandler {
constructor(services: any) { constructor(services: any) {
super(services); super(services);

View file

@ -103,6 +103,21 @@ export async function initializeAllHandlers(serviceContainer: IServiceContainer)
handlersWithSchedule: handlerRegistry.getAllHandlersWithSchedule().size, handlersWithSchedule: handlerRegistry.getAllHandlersWithSchedule().size,
}); });
// Initialize handlers that have onInit method
// We need to instantiate handlers and call their onInit
for (const HandlerClass of handlers) {
try {
// Create handler instance with service container
const handlerInstance = new HandlerClass(serviceContainer);
if (handlerInstance && typeof handlerInstance.onInit === 'function') {
const handlerName = (HandlerClass as any).__handlerName || HandlerClass.name;
logger.info(`Calling onInit for handler: ${handlerName}`);
await handlerInstance.onInit();
}
} catch (error) {
logger.error(`Failed to initialize handler ${HandlerClass.name}:`, error);
}
}
} }
} else { } else {
logger.error('Could not access DI container from service container'); logger.error('Could not access DI container from service container');

View file

@ -4,164 +4,352 @@ import { TE_CONFIG } from '../shared/config';
import type { TeHandler } from '../te.handler'; import type { TeHandler } from '../te.handler';
/**
 * Crawl one Trading Economics page, store what was found, and queue any newly
 * discovered pages for crawling.
 *
 * Steps:
 *  1. Skip the page if its `teUrls` record says it was crawled in the last 30 days.
 *  2. Fetch the HTML via `fetchPage`; on failure the URL is still stamped with
 *     `lastCrawled` (and `error: true`) so it is not retried immediately.
 *  3. Extract the `TE*` script variables embedded in the HTML.
 *  4. Collect same-site links accepted by `shouldCrawl` (plus a fixed seed list
 *     when crawling the root page).
 *  5. Upsert the crawl result for this URL and insert URLs not seen before.
 *  6. Schedule a `te-spider` job for every URL that was newly inserted.
 *
 * @param payload - `url` is the site-relative path to crawl. The payload is
 *   treated as optional at runtime: a missing payload or empty `url` crawls `/`.
 * @returns The paths found on the page, or `null` when the page was skipped or
 *   could not be fetched.
 */
export async function spiderUrl(this: TeHandler, payload: { url: string }): Promise<string[] | null> {
  const { logger, mongodb } = this;
  // `||` rather than `??` on purpose: an empty url string also means "root page".
  const urlPath = payload?.url || '/';
  const fullUrl = TE_CONFIG.MAIN_URL + urlPath;

  // 1. Check if already crawled recently.
  const msPerDay = 24 * 60 * 60 * 1000;
  const recrawlAfterDays = 30;
  const record = await mongodb?.findOne('teUrls', { url: urlPath });
  if (record?.lastCrawled) {
    // Elapsed time is expressed in days so it can be compared directly with the
    // 30-day window. An unparsable date yields NaN, which fails the comparison
    // and lets the crawl proceed.
    const daysSinceCrawl = (Date.now() - new Date(record.lastCrawled).getTime()) / msPerDay;
    if (daysSinceCrawl < recrawlAfterDays) {
      logger.info(`Skipping ${urlPath} - crawled ${daysSinceCrawl.toFixed(1)} days ago`);
      return null;
    }
  }

  // 2. Fetch the page.
  logger.info(`Crawling ${fullUrl}`);
  const html = await fetchPage.call(this, fullUrl);
  if (!html) {
    logger.error(`Failed to fetch ${fullUrl}`);
    // Mark as crawled anyway to avoid retrying immediately.
    await mongodb?.updateOne(
      'teUrls',
      { url: urlPath },
      { $set: { lastCrawled: new Date(), foundLinks: 0, error: true } },
      { upsert: true }
    );
    return null;
  }

  // 3. Extract TE data from the HTML.
  const teData = extractTeData(html, logger);
  if (Object.keys(teData).length > 0) {
    const forecast = teData['teForecast'];
    const chartsMeta = teData['teChartsMeta'];
    logger.debug('Found TE data:', {
      ...teData,
      // Arrays are summarised by length to keep the log line short.
      teForecast: Array.isArray(forecast) ? `[${forecast.length} values]` : undefined,
      teChartsMeta: Array.isArray(chartsMeta) ? `[${chartsMeta.length} items]` : undefined,
    });
  }

  // 4. Extract all links.
  const $ = cheerio.load(html);
  const foundUrls = new Set<string>();

  // Seed some well-known sections when crawling the root page.
  if (urlPath === '/') {
    const seedUrls = [
      '/united-states', '/china', '/eurozone', '/japan', '/united-kingdom',
      '/indicators', '/calendar', '/forecasts', '/countries',
      '/gdp', '/inflation', '/unemployment', '/interest-rate',
      '/stocks', '/bonds', '/commodity', '/currency',
    ];
    for (const seed of seedUrls) {
      foundUrls.add(seed);
    }
  }

  $('a[href]').each((_, element) => {
    const href = $(element).attr('href');
    if (!href) {
      return;
    }
    const path = toSitePath(href);
    if (path !== null && shouldCrawl(path)) {
      foundUrls.add(path);
    }
  });

  const urls = Array.from(foundUrls);
  logger.info(`Found ${urls.length} URLs on ${urlPath}`);

  // 5. Update database: mark the current URL as crawled, with TE data if found.
  await mongodb?.updateOne(
    'teUrls',
    { url: urlPath },
    { $set: { lastCrawled: new Date(), foundLinks: urls.length, error: false, ...teData } },
    { upsert: true }
  );

  // Insert new URLs without `lastCrawled` so they are picked up for crawling.
  // `$setOnInsert` leaves already-known URLs untouched.
  const newUrls: string[] = [];
  for (const url of urls) {
    const result = await mongodb?.updateOne(
      'teUrls',
      { url },
      { $setOnInsert: { url, createdAt: new Date(), source: urlPath } },
      { upsert: true }
    );
    // Only schedule URLs that were actually inserted (not already existing).
    if ((result?.upsertedCount ?? 0) > 0) {
      newUrls.push(url);
    }
  }

  // 6. Schedule individual jobs for each new URL.
  logger.info(`Scheduling ${newUrls.length} new URLs for crawling`);
  for (const url of newUrls) {
    await this.scheduleOperation('te-spider', { url }, {
      jobId: `spider-${url.replace(/\//g, '-')}`,
      priority: 10,
      delay: Math.floor(Math.random() * 10000), // Spread requests over 10 seconds
    });
  }

  return urls;
}

// Single-quoted string variables read from the page's inline scripts, as
// [stored field name, pattern whose first capture group is the value].
const TE_PAGE_STRING_FIELDS: ReadonlyArray<readonly [string, RegExp]> = [
  ['teSymbol', /var TESymbol = '([^']+)'/],
  ['teLastUpdate', /TELastUpdate = '([^']+)'/],
  ['teChart', /TEChart = '([^']+)'/],
  ['teAlertsName', /var TEAlertsName\s*=\s*'([^']+)'/],
  ['teChartUrl', /TEChartUrl = '([^']+)'/], // Full chart URL with PNG
  ['teCountry', /TECountry = '([^']+)'/],
  ['teCategory', /TECategory = '([^']+)'/],
  ['teType', /TEType = '([^']+)'/],
  ['teFrequency', /TEFrequency = '([^']+)'/],
  ['symbol', /symbol = '([^']+)'/],
  ['symbolType', /symbolType = '([^']+)'/],
  ['teChartsDatasource', /; var TEChartsDatasource = '([^']+)'/], // CDN URL for chart data
  ['teChartToken', /; var TEChartsToken = '([^']+)'/],
];

// Extracts the TE script variables from a page; only fields that were found are returned.
function extractTeData(html: string, logger: TeHandler['logger']): Record<string, unknown> {
  const data: Record<string, unknown> = {};

  for (const [field, pattern] of TE_PAGE_STRING_FIELDS) {
    const value = html.match(pattern)?.[1];
    if (value) {
      data[field] = value;
    }
  }

  // Array literal, e.g. `TEForecast = [1.2, 3.4]`. The page content is not
  // guaranteed to be valid JSON, so a parse failure is logged, not thrown.
  const forecastBody = html.match(/TEForecast\s*=\s*\[([^\]]+)\]/)?.[1];
  if (forecastBody) {
    try {
      data['teForecast'] = JSON.parse('[' + forecastBody + ']');
    } catch (e) {
      logger.warn('Failed to parse TEForecast:', e);
    }
  }

  // JSON metadata array terminated by `];`.
  const chartsMetaJson = html.match(/TEChartsMeta = (\[[\s\S]*?\]);/)?.[1];
  if (chartsMetaJson) {
    try {
      data['teChartsMeta'] = JSON.parse(chartsMetaJson);
    } catch (e) {
      logger.warn('Failed to parse TEChartsMeta:', e);
    }
  }

  return data;
}

// Converts an href into a site-relative path without query/fragment, or null for links that are not on the site.
function toSitePath(href: string): string | null {
  let path: string;
  if (href.startsWith('/') && !href.startsWith('//')) {
    path = href;
  } else if (href.includes('tradingeconomics.com')) {
    try {
      // Protocol-relative links (`//host/path`) need a scheme before parsing.
      const parsed = new URL(href.startsWith('//') ? `https:${href}` : href);
      const host = parsed.hostname.toLowerCase();
      // The domain must be the actual host, not just text somewhere in the URL.
      if (host !== 'tradingeconomics.com' && !host.endsWith('.tradingeconomics.com')) {
        return null;
      }
      path = parsed.pathname;
    } catch {
      return null; // Invalid URL
    }
  } else {
    return null; // Skip external links
  }
  return path.split(/[?#]/)[0] ?? '';
}
/**
 * Fetch a page's body text through the handler's `http` service, retrying on failure.
 *
 * Up to three attempts are made. Each attempt asks `proxy` for a proxy URL and
 * sends browser-like headers. Both thrown errors and non-OK responses count as
 * a failed attempt, and each failed attempt except the last is followed by a
 * backoff delay (1s, then 2s).
 *
 * @param url - Absolute URL to request.
 * @returns The response text, or `null` if every attempt failed.
 */
async function fetchPage(this: TeHandler, url: string): Promise<string | null> {
  const { http, proxy, logger } = this;
  const maxAttempts = 3;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      // Get a proxy URL for this request
      const proxyUrl = proxy?.getProxy();

      const response = await http.get(url, {
        proxy: proxyUrl,
        headers: {
          'User-Agent': getRandomUserAgent(),
          'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
          'Accept-Language': 'en-US,en;q=0.5',
          'Accept-Encoding': 'gzip, deflate, br',
          'Connection': 'keep-alive',
          'Upgrade-Insecure-Requests': '1',
          'Cache-Control': 'max-age=0',
        },
      });

      if (response.ok) {
        return await response.text();
      }

      // Not OK: log the status and fall through to the shared backoff below.
      logger.error(`HTTP ${response.status} for ${url}`);
    } catch (error) {
      logger.error(`Attempt ${attempt}/${maxAttempts} failed for ${url}:`, error);
    }

    // Exponential backoff before the next attempt; applied to HTTP errors as
    // well as exceptions so a failing URL is not requested back-to-back.
    if (attempt < maxAttempts) {
      const delayMs = 1000 * 2 ** (attempt - 1);
      await new Promise<void>(resolve => setTimeout(resolve, delayMs));
    }
  }

  return null;
}
// File extensions that identify static assets rather than crawlable pages.
const STATIC_FILE_EXTENSION = /\.(pdf|jpg|jpeg|png|gif|svg|css|js|ico|xml|rss|json|txt|csv|xlsx|xls|doc|docx|zip)$/i;

// Path fragments of common non-content pages.
const NON_CONTENT_PATH_PATTERNS: readonly RegExp[] = [
  /\/api\//,
  /\/login/,
  /\/register/,
  /\/logout/,
  /\/admin/,
  /\/search/,
  /\/print\//,
  /\/download\//,
  /\/embed\//,
  /\/widget\//,
  /\/stream\//,
  /\/rss\//,
  /#/,
];

/**
 * Simple URL filter deciding whether a site-relative path should be crawled.
 *
 * A path is rejected when it is empty or the root (`/`), ends in a static-file
 * extension, contains more than four `/` characters, or matches one of the
 * non-content patterns. Everything else is accepted.
 *
 * @param url - Site-relative path, e.g. `/united-states/gdp`.
 * @returns `true` if the path should be crawled.
 */
function shouldCrawl(url: string): boolean {
  // Skip empty or root
  if (!url || url === '/') {
    return false;
  }

  // Skip static files
  if (STATIC_FILE_EXTENSION.test(url)) {
    return false;
  }

  // Skip deep URLs (more than 4 slashes in the path)
  const depth = (url.match(/\//g) ?? []).length;
  if (depth > 4) {
    return false;
  }

  // Skip common non-content pages; accept everything else
  return !NON_CONTENT_PATH_PATTERNS.some(pattern => pattern.test(url));
}

View file

@ -8,12 +8,99 @@ import type { DataIngestionServices } from '../../types';
import { fetchCountries, spiderUrl } from './actions'; import { fetchCountries, spiderUrl } from './actions';
@Handler('te') @Handler('te')
@Disabled()
export class TeHandler extends BaseHandler<DataIngestionServices> { export class TeHandler extends BaseHandler<DataIngestionServices> {
constructor(services: any) { constructor(services: any) {
super(services); super(services);
} }
/**
 * Initialization hook: creates (or verifies) the indexes on the `teUrls`
 * collection and logs how many documents it holds.
 *
 * Does nothing beyond a warning when MongoDB is not available. Failures are
 * logged and never rethrown, so the handler keeps working without indexes.
 */
async onInit(): Promise<void> {
  this.logger.info('Initializing TeHandler and creating indexes');

  if (!this.mongodb) {
    this.logger.warn('MongoDB not available, skipping index creation');
    return;
  }

  try {
    // Index definitions for the teUrls collection.
    const indexDefinitions = [
      // Compound index for finding URLs to crawl
      {
        indexSpec: { lastCrawled: 1, url: 1 },
        options: { name: 'crawl_status_idx', background: true }
      },
      // Unique index on URL to prevent duplicates
      {
        indexSpec: { url: 1 },
        options: { name: 'url_unique_idx', unique: true, background: true }
      },
      // Index for finding URLs by symbol
      {
        indexSpec: { teSymbol: 1 },
        options: { name: 'symbol_idx', sparse: true, background: true }
      },
      // Index for skip reason filtering
      {
        indexSpec: { skipReason: 1 },
        options: { name: 'skip_reason_idx', sparse: true, background: true }
      },
      // Compound index for efficient batch queries
      {
        indexSpec: { lastCrawled: 1, skipReason: 1 },
        options: { name: 'batch_query_idx', background: true }
      },
      // Index for finding URLs with chart data
      {
        indexSpec: { teChartUrl: 1 },
        options: { name: 'chart_url_idx', sparse: true, background: true }
      }
    ];

    // Each index is attempted independently so one failure does not block the rest.
    for (const { indexSpec, options } of indexDefinitions) {
      try {
        await this.mongodb.createIndex('teUrls', indexSpec, options);
        this.logger.info(`Created/verified index: ${options.name}`);
      } catch (indexError) {
        // Index might already exist, that's OK
        this.logger.debug(`Index ${options.name} may already exist:`, indexError);
      }
    }

    // Report the collection size.
    const documentCount = await this.mongodb.countDocuments('teUrls', {});
    this.logger.info(`TeUrls collection has ${documentCount} documents`);
  } catch (initError) {
    // Deliberately not rethrown: the handler continues even if indexing fails.
    this.logger.error('Error creating indexes for TeHandler:', initError);
  }
}
@ScheduledOperation('te-countries', '0 0 * * 0', { @ScheduledOperation('te-countries', '0 0 * * 0', {
priority: 5, priority: 5,
description: 'Fetch and update Trading Economics countries data', description: 'Fetch and update Trading Economics countries data',
@ -22,10 +109,10 @@ export class TeHandler extends BaseHandler<DataIngestionServices> {
@Disabled() @Disabled()
fetchCountries = fetchCountries; fetchCountries = fetchCountries;
@ScheduledOperation('te-spider', '0 0 * * 0', { @ScheduledOperation('te-spider', '* * * * *', {
priority: 5, priority: 5,
description: 'Fetch and update Trading Economics countries data', description: 'Spider Trading Economics URLs for data extraction (every minute)',
immediately: false, immediately: true,
}) })
spiderUrlSchedule = spiderUrl; spiderUrlSchedule = spiderUrl;
} }