diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts index 126876c..6470f42 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts @@ -14,6 +14,11 @@ interface CrawlIntradayInput { interval: '1m' | '5m' | '1h'; } +interface ScheduleIntradayConfig { + exchanges?: string[]; // Optional: limit to specific exchanges + symbolTypes?: string[]; // Optional: limit to specific symbol types (e.g., ['Common Stock']) + limit?: number; // Optional: limit total number of symbols to process +} // Max days per interval based on EOD limits const MAX_DAYS_PER_INTERVAL = { @@ -22,13 +27,34 @@ const MAX_DAYS_PER_INTERVAL = { '1h': 7200 }; +// Default exchanges to process for intraday data +const DEFAULT_INTRADAY_EXCHANGES = ['US', 'TO', 'V', 'CN', 'NEO']; + export async function scheduleIntradayCrawl( - this: EodHandler + this: EodHandler, + config?: ScheduleIntradayConfig ): Promise<{ success: boolean; jobsScheduled: number }> { const logger = this.logger; try { - logger.info('Scheduling intraday crawl jobs'); + logger.info('Scheduling intraday crawl jobs', { + config: { + exchanges: config?.exchanges || DEFAULT_INTRADAY_EXCHANGES, + symbolTypes: config?.symbolTypes || 'all', + limit: config?.limit || 'unlimited' + } + }); + + // Check if operation registry is initialized + if (!this.operationRegistry) { + logger.error('Operation registry not initialized!'); + return { success: false, jobsScheduled: 0 }; + } + + logger.debug('Operation registry status:', { + hasRegistry: !!this.operationRegistry, + hasProvider: this.operationRegistry.hasProvider ? 
this.operationRegistry.hasProvider('eod') : 'no hasProvider method' + }); // Use OperationTracker to find symbols needing intraday crawl const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h']; @@ -41,31 +67,55 @@ export async function scheduleIntradayCrawl( const interval = intervals[i]; const operationName = operationNames[i]!; // Non-null assertion since we know the array has 3 elements - // For intraday, we want to check even finished crawls for new data - // So we'll query the symbols directly - const symbolsForInterval = await this.mongodb.collection('eodSymbols').find({ - eodSearchCode: 'AAPL.US', - delisted: false - }).toArray(); + // Use getStaleSymbols to find symbols needing intraday updates + logger.debug(`Getting stale symbols for ${operationName}...`); - // Add interval info to each symbol - symbolsForInterval.forEach((symbol: any) => { - // Check if this interval needs processing (not finished or needs new data) - const operationStatus = symbol.operations?.[operationName]; - const shouldProcess = !operationStatus || !operationStatus.finished || - (operationStatus.newestDateReached && - new Date(operationStatus.newestDateReached) < new Date(Date.now() - 24 * 60 * 60 * 1000)); // Check if newest date is more than 1 day old + // Get symbols with all filters applied at the database level + const targetExchanges = config?.exchanges || DEFAULT_INTRADAY_EXCHANGES; + const desiredLimit = config?.limit || 5000; + + const staleSymbols = await this.operationRegistry.getStaleSymbols('eod', operationName, { + limit: desiredLimit, + exchanges: targetExchanges, + delisted: false + }); + + logger.debug(`getStaleSymbols returned ${staleSymbols.length} symbols for ${operationName}`); + + // Process the returned symbols + for (const staleSymbol of staleSymbols) { + const { symbol, operations } = staleSymbol; + const operationStatus = operations?.[operationName]; - if (shouldProcess) { + // Apply symbol type filter if specified (this is the only filter not in 
the DB query) + if (config?.symbolTypes && !config.symbolTypes.includes(symbol.Type)) { + logger.debug(`Skipping ${symbol.Code} - type ${symbol.Type} not in filter`); + continue; + } + + // Include symbols that: + // 1. Have never been processed (!operationStatus) + // 2. Are not finished (!operationStatus.finished) + // 3. Are finished but need new data (newest date > 1 day old) + const needsNewData = operationStatus?.finished && operationStatus?.newestDateReached && + new Date(operationStatus.newestDateReached) < new Date(Date.now() - 24 * 60 * 60 * 1000); + + if (!operationStatus || !operationStatus.finished || needsNewData) { allSymbolsForCrawl.push({ symbol: symbol, interval: interval, operationName: operationName, - lastRun: operationStatus?.lastRunAt, - lastSuccess: operationStatus?.lastSuccessAt + lastRun: staleSymbol.lastRun, + lastSuccess: staleSymbol.lastSuccess + }); + + logger.debug(`Added ${symbol.Code}.${symbol.Exchange} for ${interval} crawl`, { + hasOperation: !!operationStatus, + finished: operationStatus?.finished, + needsNewData }); } - }); + } } if (!allSymbolsForCrawl || allSymbolsForCrawl.length === 0) { @@ -80,10 +130,11 @@ export async function scheduleIntradayCrawl( '5m': allSymbolsForCrawl.filter(s => s.interval === '5m').length, '1h': allSymbolsForCrawl.filter(s => s.interval === '1h').length }, - samples: allSymbolsForCrawl.slice(0, 5).map(s => ({ + symbols: allSymbolsForCrawl.map(s => ({ symbol: s.symbol.Code, - exchange: s.symbol.eodExchange || s.symbol.Exchange, + exchange: s.symbol.Exchange, name: s.symbol.Name, + eodSearchCode: s.symbol.eodSearchCode, interval: s.interval, lastRun: s.lastRun ? new Date(s.lastRun).toISOString() : 'never', lastSuccess: s.lastSuccess ? 
new Date(s.lastSuccess).toISOString() : 'never' @@ -96,19 +147,29 @@ export async function scheduleIntradayCrawl( for (const item of allSymbolsForCrawl) { const { symbol, interval } = item; - await this.scheduleOperation('crawl-intraday', { - eodSearchCode: symbol.eodSearchCode, - interval - }, { - priority: 5, // Initial crawl jobs get priority 5 (lower priority) - attempts: 3, - backoff: { - type: 'exponential', - delay: 10000 - }, - delay: jobsScheduled * 500 // Stagger jobs by 500ms - }); - jobsScheduled++; + const jobId = `crawl-intraday-${symbol.eodSearchCode}-${interval}`; + try { + await this.scheduleOperation('crawl-intraday', { + eodSearchCode: symbol.eodSearchCode, + interval + }, { + jobId, + priority: 5, // Initial crawl jobs get priority 5 (lower priority) + attempts: 3, + backoff: { + type: 'exponential', + delay: 10000 + }, + delay: jobsScheduled * 500 // Stagger jobs by 500ms + }); + jobsScheduled++; + } catch (error: any) { + if (error?.message?.includes('Job already exists')) { + logger.debug(`Job already exists: ${jobId}`); + } else { + throw error; + } + } } logger.info(`Successfully scheduled ${jobsScheduled} intraday crawl jobs`); @@ -277,6 +338,7 @@ export async function crawlIntraday( eodSearchCode, interval }, { + jobId: `crawl-intraday-${eodSearchCode}-${interval}`, priority: 3, // Continuation jobs get higher priority (3) than initial jobs (5) attempts: 3, backoff: { diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts index d66cb2c..648acee 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts @@ -42,23 +42,34 @@ export async function scheduleFetchPrices( continue; } const { symbol } = staleSymbol; + const jobId = `fetch-prices-${symbol.eodSearchCode}`; logger.debug(`Scheduling price fetch for ${symbol.Code}.${symbol.Exchange}`, { name: symbol.Name, lastUpdate: 
staleSymbol.lastRun, - delay: i * 100 + delay: i * 100, + jobId }); - await this.scheduleOperation('fetch-prices', { - eodSearchCode: symbol.eodSearchCode - }, { - attempts: 3, - backoff: { - type: 'exponential', - delay: 5000 - }, - delay: i * 100 // Stagger jobs by 100ms per symbol to avoid rate limit spikes - }); - jobsScheduled++; + try { + await this.scheduleOperation('fetch-prices', { + eodSearchCode: symbol.eodSearchCode + }, { + jobId, + attempts: 3, + backoff: { + type: 'exponential', + delay: 5000 + }, + delay: i * 100 // Stagger jobs by 100ms per symbol to avoid rate limit spikes + }); + jobsScheduled++; + } catch (error: any) { + if (error?.message?.includes('Job already exists')) { + logger.debug(`Job already exists: ${jobId}`); + } else { + throw error; + } + } } logger.info(`Successfully scheduled ${jobsScheduled} price fetch jobs`); diff --git a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts index becf0da..eebaef6 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts @@ -34,7 +34,7 @@ import { createEODOperationRegistry } from './shared'; @Handler('eod') @RateLimit({ limits: [ - { points: 500, duration: 60 }, // 1000 points per minute + { points: 900, duration: 60 }, // 900 points per minute { points: 100500, duration: 86400 }, // 100,500 points per day ], }) @@ -110,7 +110,7 @@ export class EodHandler extends BaseHandler { */ @Operation('schedule-intraday-crawl') @ScheduledOperation('schedule-intraday-crawl', '0 3 * * *', { - // immediately: true, + immediately: true, }) @RateLimit(1) // 1 point for scheduling scheduleIntradayCrawl = scheduleIntradayCrawl; diff --git a/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts b/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts index b9f076c..a848049 100644 --- 
a/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts +++ b/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts @@ -356,7 +356,9 @@ export class OperationTracker { limit = 1000, excludeSymbols = [], activeOnly = true, - symbolFilter + symbolFilter, + exchanges, + delisted } = options; this.provider.validateOperation(operationName); @@ -389,6 +391,16 @@ export class OperationTracker { filter[symbolField] = symbolFilter.symbol; } + // Add exchange filter if provided + if (exchanges && exchanges.length > 0) { + filter.Exchange = { $in: exchanges }; + } + + // Add delisted filter if provided + if (delisted !== undefined) { + filter.delisted = delisted; + } + const symbols = await this.mongodb.find(collectionName, filter, { limit, projection: { }, // Return all fields diff --git a/apps/stock/data-ingestion/src/shared/operation-manager/types.ts b/apps/stock/data-ingestion/src/shared/operation-manager/types.ts index 13ab9d7..f205511 100644 --- a/apps/stock/data-ingestion/src/shared/operation-manager/types.ts +++ b/apps/stock/data-ingestion/src/shared/operation-manager/types.ts @@ -124,6 +124,10 @@ export interface StaleSymbolOptions { activeOnly?: boolean; /** Filter for specific symbols */ symbolFilter?: { symbol?: string }; + /** Filter by exchanges */ + exchanges?: string[]; + /** Filter by delisted status */ + delisted?: boolean; } /**