From eeed957fe191ab6382f12a3d3c0275c26a51214c Mon Sep 17 00:00:00 2001 From: Boki Date: Thu, 26 Mar 2026 16:32:37 -0400 Subject: [PATCH] test --- .../src/handlers/eod/eod.handler.ts | 2 + .../src/handlers/ib/ib.handler.ts | 2 + .../data-ingestion/src/handlers/index.ts | 15 + .../src/handlers/te/actions/spider.action.ts | 520 ++++++++++++------ .../src/handlers/te/te.handler.ts | 147 ++++- 5 files changed, 490 insertions(+), 196 deletions(-) diff --git a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts index eebaef6..a9332fe 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts @@ -1,5 +1,6 @@ import { BaseHandler, + Disabled, Handler, Operation, RateLimit, @@ -32,6 +33,7 @@ import { createEODOperationRegistry } from './shared'; * Operations can specify just a cost to use handler limits, or override with custom limits */ @Handler('eod') +@Disabled() @RateLimit({ limits: [ { points: 900, duration: 60 }, // 1000 points per minute diff --git a/apps/stock/data-ingestion/src/handlers/ib/ib.handler.ts b/apps/stock/data-ingestion/src/handlers/ib/ib.handler.ts index f70a91f..737c51b 100644 --- a/apps/stock/data-ingestion/src/handlers/ib/ib.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/ib/ib.handler.ts @@ -1,5 +1,6 @@ import { BaseHandler, + Disabled, Handler, Operation, ScheduledOperation, @@ -7,6 +8,7 @@ import { import { fetchExchanges, fetchExchangesAndSymbols, fetchSession, fetchSymbols } from './actions'; @Handler('ib') +@Disabled() export class IbHandler extends BaseHandler { constructor(services: any) { super(services); diff --git a/apps/stock/data-ingestion/src/handlers/index.ts b/apps/stock/data-ingestion/src/handlers/index.ts index 2b4cbd3..db9a896 100644 --- a/apps/stock/data-ingestion/src/handlers/index.ts +++ b/apps/stock/data-ingestion/src/handlers/index.ts @@ -103,6 +103,21 @@ export async function 
initializeAllHandlers(serviceContainer: IServiceContainer) handlersWithSchedule: handlerRegistry.getAllHandlersWithSchedule().size, }); + // Initialize handlers that have onInit method + // We need to instantiate handlers and call their onInit + for (const HandlerClass of handlers) { + try { + // Create handler instance with service container + const handlerInstance = new HandlerClass(serviceContainer); + if (handlerInstance && typeof handlerInstance.onInit === 'function') { + const handlerName = (HandlerClass as any).__handlerName || HandlerClass.name; + logger.info(`Calling onInit for handler: ${handlerName}`); + await handlerInstance.onInit(); + } + } catch (error) { + logger.error(`Failed to initialize handler ${HandlerClass.name}:`, error); + } + } } } else { logger.error('Could not access DI container from service container'); diff --git a/apps/stock/data-ingestion/src/handlers/te/actions/spider.action.ts b/apps/stock/data-ingestion/src/handlers/te/actions/spider.action.ts index d7cc71b..a8e3a0e 100644 --- a/apps/stock/data-ingestion/src/handlers/te/actions/spider.action.ts +++ b/apps/stock/data-ingestion/src/handlers/te/actions/spider.action.ts @@ -1,167 +1,355 @@ -import { getRandomUserAgent } from '@stock-bot/utils'; -import * as cheerio from 'cheerio'; -import { TE_CONFIG } from '../shared/config'; -import type { TeHandler } from '../te.handler'; - -export async function spiderUrl(this: TeHandler, payload: { url: string }): Promise { - const { logger, mongodb } = this; - const reqUrl = payload && payload.url ? 
TE_CONFIG.MAIN_URL + payload.url : TE_CONFIG.MAIN_URL; - this.logger.info(`Spiderring URL: ${reqUrl}`, {reqUrl}); - - // if( mongoRecord){ - // const url = mongoRecord.url; - // if (shouldSkipUrl(url)) { - // logger.info(`Skipping URL ${url} as its too deep`); - // return null; - // } - // } - const mongoRecord = await mongodb?.findOne('teUrls', { url: payload?.url || '/' }); - if(payload && payload.url && mongoRecord && mongoRecord.lastCrawled && mongoRecord.lastCrawled.getTime() > Date.now() - 30 * 24 * 60 * 60 * 1000) { - this.logger.info(`Skipping URL ${reqUrl} as it was already crawled in the last 24 hours`); - return null; // Skip if already crawled in the last 24 hours - } - - - if (!payload) { - const oneDayAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); - const records = await mongodb?.find('teUrls', { - $or: [ - { lastCrawled: { $lt: oneDayAgo } }, // Crawled more than 24 hours ago - { lastCrawled: { $exists: false } } // Never crawled - ] - }); - this.logger.info(`Found ${records?.length || 0} records to process`); - for (const record of records || []) { - const url = record.url; - if (shouldSkipUrl(url)) { - logger.info(`Skipping URL ${url} as its too deep`); - continue; - } - - await this.scheduleOperation('te-spider', { - url: record.url, - }, { - jobId: `te-spider-${record.url}`, - priority: 5, // Lower priority than financial data - }); - } - } - - try { - // 1. 
Fetch the HTML page - const reqInfo = { - proxy: 'http://5.79.66.2:13010',//this.proxy.getProxy(), - headers: { - 'User-Agent': getRandomUserAgent(), - 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', - 'Accept-Language': 'en-US,en;q=0.5', - 'Accept-Encoding': 'gzip, deflate, br', - }, - } - const response = await fetch(reqUrl, reqInfo); - - logger.debug('Response status:', { - status: response.status, - statusText: response.statusText, - url: response.url - }); - - if (!response.ok) { - throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`); - } - - const html = await response.text(); - - - let match = html.match(/TESymbol = '([^']+)'/); - const teSymbol = match ? match[1] : undefined; - match = html.match(/;TELastUpdate = '([^']+)'/); - const teLastUpdate = match ? match[1] : undefined; - match = html.match(/; var TEChartsDatasource = '([^']+)'/); - const teChartUrl = match ? match[1] : undefined; - match = html.match(/; var TEChartsToken = '([^']+)'/); - const teChartToken = match ? match[1] : undefined; - - console.log(teSymbol, teLastUpdate, teChartUrl, teChartToken); - - const $ = cheerio.load(html); - const urls: string[] = []; - - $('.list-group-item, a[href^="/"]').each((_, element) => { - const $el = $(element); - let url: string | undefined; - if ($el.is('a')) { - const href = $el.attr('href'); - if (href && href.startsWith('/') && !href.includes('.aspx')) { - url = href; - } - } - - if (url && urls.indexOf(url) === -1) { - urls.push(url); - } - }); - - if (urls.length === 0) { - throw new Error('No urls found in HTML'); - } - - // 3. Save to MongoDB - try { - if (urls.length > 0) { - const urlMap: {url: string, lastCrawled?: Date, teSymbol? : string, teLastUpdate? : string, teChartUrl? : string, teChartToken? 
: string}[] = urls.map(url => ({url})); - if( payload && payload.url) { - urlMap.push({ - url: payload.url, - lastCrawled: new Date(), - teSymbol, - teLastUpdate, - teChartUrl, - teChartToken,}) - }else { - urlMap.push({url: '/', lastCrawled: new Date()}) - } - - const result = await mongodb?.batchUpsert('teUrls', urlMap, ['url']); - logger.info('TE URLs saved to MongoDB', { - matched: result.matchedCount, - modified: result.modifiedCount, - upserted: result.upsertedCount, - }); - } - } catch (dbError) { - logger.error('Failed to save urls to MongoDB', { error: dbError }); - throw dbError; - } - - for (const url of urls) { - if (shouldSkipUrl(url)) { - logger.info(`Skipping URL ${url} as its too deep`); - continue; // Skip if it's a subpage or already crawled - } - this.scheduleOperation('te-spider', { - url: url, - }, { - jobId: `te-spider-${url}`, - priority: 5, // Lower priority than financial data - }) - } - - return urls; - } catch (error) { - logger.error(`Failed to fetch Trading Economics URLs ${reqUrl}`, { - error: error instanceof Error ? error.message : String(error), - stack: error instanceof Error ? error.stack : undefined, - }); - return null; - } -} - -function shouldSkipUrl(url?: string): boolean { - // Skip if it's a subpage or already crawled in the last 24 hours - if (!url) { - return false; - } - const matches = url.match(/\//g); - return matches !== null && matches.length >= 4; +import { getRandomUserAgent } from '@stock-bot/utils'; +import * as cheerio from 'cheerio'; +import { TE_CONFIG } from '../shared/config'; +import type { TeHandler } from '../te.handler'; + +export async function spiderUrl(this: TeHandler, payload: { url: string }): Promise { + const { logger, mongodb, proxy, http } = this; + const urlPath = payload?.url || '/'; + const fullUrl = TE_CONFIG.MAIN_URL + urlPath; + + // 1. 
const daysSinceCrawl = (Date.now() - record.lastCrawled.getTime()) / (1000 * 60 * 60 * 24); +    if (daysSinceCrawl < 30) { +      logger.info(`Skipping ${urlPath} - crawled ${daysSinceCrawl.toFixed(1)} days ago`);
JSON.parse('[' + match[1] + ']') : undefined; + + // Extract JSON metadata - use a more flexible regex + match = html.match(/TEChartsMeta = (\[[\s\S]*?\]);/); + const teChartsMeta = match ? (() => { + try { + return JSON.parse(match[1]); + } catch (e) { + logger.warn('Failed to parse TEChartsMeta:', e); + return undefined; + } + })() : undefined; + + // Extract symbol data + match = html.match(/symbol = '([^']+)'/); + const symbol = match ? match[1] : undefined; + + match = html.match(/symbolType = '([^']+)'/); + const symbolType = match ? match[1] : undefined; + + // TEChartsDatasource is the CDN URL for chart data + match = html.match(/; var TEChartsDatasource = '([^']+)'/); + const teChartsDatasource = match ? match[1] : undefined; + + match = html.match(/; var TEChartsToken = '([^']+)'/); + const teChartToken = match ? match[1] : undefined; + + // Log if we found any TE data + const foundData = teSymbol || teLastUpdate || teChartUrl || teChartsDatasource || + teChartToken || teChart || teAlertsName || teCountry || + teCategory || teType || teFrequency || teForecast || + teChartsMeta || symbol || symbolType; + + if (foundData) { + logger.debug('Found TE data:', { + teSymbol, + teLastUpdate, + teChart, + teAlertsName, + teChartUrl, + teCountry, + teCategory, + teType, + teFrequency, + teForecast: teForecast ? `[${teForecast.length} values]` : undefined, + teChartsMeta: teChartsMeta ? `[${teChartsMeta.length} items]` : undefined, + symbol, + symbolType, + teChartsDatasource, + teChartToken + }); + } + + // 4. 
Extract all links + const $ = cheerio.load(html); + const foundUrls = new Set(); + + // Add some seed URLs if this is the root page + if (urlPath === '/') { + const seedUrls = [ + '/united-states', '/china', '/eurozone', '/japan', '/united-kingdom', + '/indicators', '/calendar', '/forecasts', '/countries', + '/gdp', '/inflation', '/unemployment', '/interest-rate', + '/stocks', '/bonds', '/commodity', '/currency' + ]; + seedUrls.forEach(url => foundUrls.add(url)); + } + + // Extract links from the page + $('a[href]').each((_, element) => { + const href = $(element).attr('href'); + if (!href) { + return; + } + + // Convert to absolute path + let path: string; + if (href.startsWith('/')) { + path = href; + } else if (href.includes('tradingeconomics.com')) { + try { + path = new URL(href).pathname; + } catch { + return; // Invalid URL + } + } else { + return; // Skip external links + } + + // Clean the path + path = path.split('?')[0].split('#')[0]; + + // Basic filtering + if (shouldCrawl(path)) { + foundUrls.add(path); + } + }); + + const urls = Array.from(foundUrls); + logger.info(`Found ${urls.length} URLs on ${urlPath}`); + + // 5. 
Update database + // Mark current URL as crawled with TE data if found + const updateData: any = { + lastCrawled: new Date(), + foundLinks: urls.length, + error: false + }; + + // Add TE data if found + if (teSymbol) { + updateData.teSymbol = teSymbol; + } + if (teLastUpdate) { + updateData.teLastUpdate = teLastUpdate; + } + if (teChart) { + updateData.teChart = teChart; + } + if (teAlertsName) { + updateData.teAlertsName = teAlertsName; + } + if (teChartUrl) { + updateData.teChartUrl = teChartUrl; // Full chart URL with PNG + } + if (teCountry) { + updateData.teCountry = teCountry; + } + if (teCategory) { + updateData.teCategory = teCategory; + } + if (teType) { + updateData.teType = teType; + } + if (teFrequency) { + updateData.teFrequency = teFrequency; + } + if (teForecast) { + updateData.teForecast = teForecast; + } + if (teChartsMeta) { + updateData.teChartsMeta = teChartsMeta; + } + if (symbol) { + updateData.symbol = symbol; + } + if (symbolType) { + updateData.symbolType = symbolType; + } + if (teChartsDatasource) { + updateData.teChartsDatasource = teChartsDatasource; // CDN URL for chart data + } + if (teChartToken) { + updateData.teChartToken = teChartToken; + } + + await mongodb?.updateOne( + 'teUrls', + { url: urlPath }, + { $set: updateData }, + { upsert: true } + ); + + // Insert new URLs (without lastCrawled so they'll be picked up for crawling) + const newUrls: string[] = []; + for (const url of urls) { + const result = await mongodb?.updateOne( + 'teUrls', + { url }, + { + $setOnInsert: { + url, + createdAt: new Date(), + source: urlPath + } + }, + { upsert: true } + ); + // Only schedule if it was actually inserted (not already existing) + if (result?.upsertedCount > 0) { + newUrls.push(url); + } + } + + // 6. 
Schedule individual jobs for each new URL + logger.info(`Scheduling ${newUrls.length} new URLs for crawling`); + for (const url of newUrls) { + await this.scheduleOperation('te-spider', { url }, { + jobId: `spider-${url.replace(/\//g, '-')}`, + priority: 10, + delay: Math.floor(Math.random() * 10000) // Spread requests over 10 seconds + }); + } + + return urls; +} + +// Simple fetch with retry using http service with proxy +async function fetchPage(this: TeHandler, url: string): Promise { + const { http, proxy, logger } = this; + + for (let attempt = 1; attempt <= 3; attempt++) { + try { + // Get a proxy URL for this request + const proxyUrl = proxy?.getProxy(); + + const response = await http.get(url, { + proxy: proxyUrl, + headers: { + 'User-Agent': getRandomUserAgent(), + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', + 'Accept-Language': 'en-US,en;q=0.5', + 'Accept-Encoding': 'gzip, deflate, br', + 'Connection': 'keep-alive', + 'Upgrade-Insecure-Requests': '1', + 'Cache-Control': 'max-age=0' + } + }); + + if (response.ok) { + return await response.text(); + } + + // If not OK, log the status + logger.error(`HTTP ${response.status} for ${url}`); + + } catch (error) { + logger.error(`Attempt ${attempt}/3 failed for ${url}:`, error); + if (attempt === 3) { + return null; + } + + // Wait before retrying (exponential backoff) + await new Promise(resolve => setTimeout(resolve, 1000 * attempt)); + } + } + return null; +} + +// Simple URL filtering +function shouldCrawl(url: string): boolean { + // Skip empty or root + if (!url || url === '/') { + return false; + } + + // Skip static files + if (/\.(pdf|jpg|jpeg|png|gif|svg|css|js|ico|xml|rss|json|txt|csv|xlsx|xls|doc|docx|zip)$/i.test(url)) { + return false; + } + + // Skip deep URLs (more than 4 levels) + const depth = (url.match(/\//g) || []).length; + if (depth > 4) { + return false; + } + + // Skip common non-content pages + const skipPatterns = [ + /\/api\//, + /\/login/, + 
/\/register/, + /\/logout/, + /\/admin/, + /\/search/, + /\/print\//, + /\/download\//, + /\/embed\//, + /\/widget\//, + /\/stream\//, + /\/rss\//, + /#/, + ]; + + for (const pattern of skipPatterns) { + if (pattern.test(url)) { + return false; + } + } + + // Accept everything else + return true; } \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/te/te.handler.ts b/apps/stock/data-ingestion/src/handlers/te/te.handler.ts index cc29c5b..8b12509 100644 --- a/apps/stock/data-ingestion/src/handlers/te/te.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/te/te.handler.ts @@ -1,31 +1,118 @@ -import { - BaseHandler, - Disabled, - Handler, - ScheduledOperation -} from '@stock-bot/handlers'; -import type { DataIngestionServices } from '../../types'; -import { fetchCountries, spiderUrl } from './actions'; - -@Handler('te') -@Disabled() -export class TeHandler extends BaseHandler { - constructor(services: any) { - super(services); - } - - @ScheduledOperation('te-countries', '0 0 * * 0', { - priority: 5, - description: 'Fetch and update Trading Economics countries data', - immediately: false, - }) - @Disabled() - fetchCountries = fetchCountries; - - @ScheduledOperation('te-spider', '0 0 * * 0', { - priority: 5, - description: 'Fetch and update Trading Economics countries data', - immediately: false, - }) - spiderUrlSchedule = spiderUrl; +import { + BaseHandler, + Disabled, + Handler, + ScheduledOperation +} from '@stock-bot/handlers'; +import type { DataIngestionServices } from '../../types'; +import { fetchCountries, spiderUrl } from './actions'; + +@Handler('te') +export class TeHandler extends BaseHandler { + constructor(services: any) { + super(services); + } + + /** + * Initialize handler and create necessary indexes + */ + async onInit(): Promise { + this.logger.info('Initializing TeHandler and creating indexes'); + + if (!this.mongodb) { + this.logger.warn('MongoDB not available, skipping index creation'); + return; + } + + try { + // 
Create indexes for teUrls collection + const indexes = [ + // Compound index for finding URLs to crawl + { + indexSpec: { lastCrawled: 1, url: 1 }, + options: { + name: 'crawl_status_idx', + background: true + } + }, + // Unique index on URL to prevent duplicates + { + indexSpec: { url: 1 }, + options: { + name: 'url_unique_idx', + unique: true, + background: true + } + }, + // Index for finding URLs by symbol + { + indexSpec: { teSymbol: 1 }, + options: { + name: 'symbol_idx', + sparse: true, + background: true + } + }, + // Index for skip reason filtering + { + indexSpec: { skipReason: 1 }, + options: { + name: 'skip_reason_idx', + sparse: true, + background: true + } + }, + // Compound index for efficient batch queries + { + indexSpec: { lastCrawled: 1, skipReason: 1 }, + options: { + name: 'batch_query_idx', + background: true + } + }, + // Index for finding URLs with chart data + { + indexSpec: { teChartUrl: 1 }, + options: { + name: 'chart_url_idx', + sparse: true, + background: true + } + } + ]; + + for (const index of indexes) { + try { + await this.mongodb.createIndex('teUrls', index.indexSpec, index.options); + this.logger.info(`Created/verified index: ${index.options.name}`); + } catch (error) { + // Index might already exist, that's OK + this.logger.debug(`Index ${index.options.name} may already exist:`, error); + } + } + + // Check collection stats + const count = await this.mongodb.countDocuments('teUrls', {}); + this.logger.info(`TeUrls collection has ${count} documents`); + + } catch (error) { + this.logger.error('Error creating indexes for TeHandler:', error); + // Don't throw - allow handler to continue even if indexes fail + } + } + + @ScheduledOperation('te-countries', '0 0 * * 0', { + priority: 5, + description: 'Fetch and update Trading Economics countries data', + immediately: false, + }) + @Disabled() + fetchCountries = fetchCountries; + + @ScheduledOperation('te-spider', '* * * * *', { + priority: 5, + description: 'Spider Trading Economics 
URLs for data extraction (every minute)', + immediately: true, + }) + spiderUrlSchedule = spiderUrl; } \ No newline at end of file