From 18289f0a041bee73fa2c56d3da3dcef746e03102 Mon Sep 17 00:00:00 2001 From: Boki Date: Thu, 10 Jul 2025 08:02:40 -0400 Subject: [PATCH] intraday test --- .../handlers/eod/actions/corporate-actions.ts | 8 +-- .../src/handlers/eod/actions/fundamentals.ts | 4 +- .../src/handlers/eod/actions/intraday.ts | 62 +++++++++++++++---- .../src/handlers/eod/actions/prices.ts | 2 +- .../src/handlers/eod/eod.handler.ts | 24 ++++--- .../data-ingestion/src/handlers/index.ts | 34 ++++++++++ .../src/handlers/qm/qm.handler.ts | 22 ++++--- 7 files changed, 122 insertions(+), 34 deletions(-) diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts index 7fb9924..fda77a7 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts @@ -24,23 +24,23 @@ export async function scheduleFetchCorporateActions( // Use OperationTracker to find stale symbols for both dividends and splits const allStaleDividends = await this.operationRegistry.getStaleSymbols('eod', 'dividends_update', { - limit: 2000 // Get more symbols to filter from + limit: 50000 // Get all symbols needing update }); const allStaleSplits = await this.operationRegistry.getStaleSymbols('eod', 'splits_update', { - limit: 2000 // Get more symbols to filter from + limit: 50000 // Get all symbols needing update }); // Filter for Canadian exchanges and non-delisted symbols const staleSymbolsDividends = allStaleDividends.filter(item => canadianExchanges.includes(item.symbol.Exchange) && item.symbol.delisted === false - ).slice(0, 500); + ); // Remove the slice limit to process all symbols const staleSymbolsSplits = allStaleSplits.filter(item => canadianExchanges.includes(item.symbol.Exchange) && item.symbol.delisted === false - ).slice(0, 500); + ); // Remove the slice limit to process all symbols if ((!staleSymbolsDividends || staleSymbolsDividends.length === 0) && (!staleSymbolsSplits || staleSymbolsSplits.length === 0)) { diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts index 878d00e..21c65b4 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts @@ -27,14 +27,14 @@ export async function scheduleFetchFundamentals( // Use OperationTracker to find stale symbols const allStaleSymbols = await this.operationRegistry.getStaleSymbols('eod', 'fundamentals_update', { - limit: 5000 // Get more symbols to filter from + limit: 50000 // Get all symbols needing update }); // Filter for Canadian exchanges and non-delisted symbols const staleSymbols = allStaleSymbols.filter(item => canadianExchanges.includes(item.symbol.Exchange) && item.symbol.delisted === false - ).slice(0, 1000); // Limit to 1000 after filtering + ); // Remove the slice limit to process all symbols if (!staleSymbols || staleSymbols.length === 0) { logger.info('No symbols need fundamentals update'); diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts index 96cd87f..3edeed3 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts @@ -48,12 +48,15 @@ export async function scheduleIntradayCrawl( const operationName = operationNames[i]!; // Non-null assertion since we know the array has 3 elements const symbolsForInterval = await this.operationRegistry.getStaleSymbols('eod', operationName, { - limit: 100 // Limit per interval + limit: 1000, // Get more to filter + symbolFilter: { symbol: 'AAPL' } // Filter for AAPL only }); - // Filter out delisted symbols + // Filter out delisted symbols and ensure we get AAPL with US exchange const activeSymbols = symbolsForInterval.filter(item => - item.symbol.delisted === false + item.symbol.delisted === false && + item.symbol.Code === 'AAPL' && + (item.symbol.eodExchange === 'US' || item.symbol.Exchange === 'US') ); // Add interval info to each symbol @@ -75,13 +78,18 @@ export async function scheduleIntradayCrawl( logger.info(`Found ${allSymbolsForCrawl.length} symbol/interval combinations needing intraday data`, { count: allSymbolsForCrawl.length, + byInterval: { + '1m': allSymbolsForCrawl.filter(s => s.interval === '1m').length, + '5m': allSymbolsForCrawl.filter(s => s.interval === '5m').length, + '1h': allSymbolsForCrawl.filter(s => s.interval === '1h').length + }, samples: allSymbolsForCrawl.slice(0, 5).map(s => ({ symbol: s.symbol.Code, exchange: s.symbol.eodExchange || s.symbol.Exchange, name: s.symbol.Name, interval: s.interval, - lastRun: s.lastRun, - lastSuccess: s.lastSuccess + lastRun: s.lastRun ? new Date(s.lastRun).toISOString() : 'never', + lastSuccess: s.lastSuccess ? new Date(s.lastSuccess).toISOString() : 'never' })) }); @@ -97,6 +105,7 @@ export async function scheduleIntradayCrawl( interval, country: symbol.Country }, { + priority: 5, // Initial crawl jobs get priority 5 (lower priority) attempts: 3, backoff: { type: 'exponential', @@ -127,7 +136,12 @@ export async function crawlIntraday( const { symbol, exchange, interval, country } = input; try { - logger.info(`Starting intraday crawl for ${symbol}.${exchange} - ${interval}`); + logger.info(`Starting intraday crawl for ${symbol}.${exchange} - ${interval}`, { + symbol, + exchange, + interval, + country + }); // Get symbol to check if it exists const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({ @@ -161,6 +175,19 @@ export async function crawlIntraday( fromDate = new Date(toDate); fromDate.setDate(fromDate.getDate() - maxDays + 1); + logger.info(`Fetching intraday batch for ${symbol}.${exchange} - ${interval} from ${fromDate.toISOString().split('T')[0]} to ${toDate.toISOString().split('T')[0]}`, { + symbol, + exchange, + interval, + fromDate: fromDate.toISOString(), + toDate: toDate.toISOString(), + maxDays, + crawlState: { + lastProcessedDate: crawlState.lastProcessedDate, + totalDaysProcessed: crawlState.totalDaysProcessed || 0 + } + }); + // Fetch data for this batch const result = await fetchIntraday.call(this, { symbol, @@ -192,12 +219,14 @@ export async function crawlIntraday( // Check if we're finished (no data returned means we've reached the end) if (result.recordsSaved === 0) { newState.finished = true; - logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval}`, { + logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval} (${newState.oldestDateReached?.toISOString().split('T')[0]} to ${newState.newestDateReached?.toISOString().split('T')[0]})`, { symbol, exchange, interval, - oldestDate: newState.oldestDateReached, - newestDate: newState.newestDateReached, + oldestDate: newState.oldestDateReached?.toISOString(), + newestDate: newState.newestDateReached?.toISOString(), + totalDaysProcessed: newState.totalDaysProcessed, + noDataReturned: true }); } @@ -216,6 +245,7 @@ export async function crawlIntraday( interval, country }, { + priority: 3, // Continuation jobs get higher priority (3) than initial jobs (5) attempts: 3, backoff: { type: 'exponential', @@ -224,11 +254,14 @@ export async function crawlIntraday( delay: 5000 // Wait 5 seconds before next batch }); - logger.info('Scheduled next intraday batch', { + logger.info(`Scheduled next intraday batch for ${symbol}.${exchange} - ${interval}`, { symbol, exchange, interval, - nextFromDate: fromDate.toISOString() + currentBatchFrom: fromDate.toISOString(), + currentBatchTo: toDate.toISOString(), + recordsSaved: result.recordsSaved, + totalDaysProcessed: newState.totalDaysProcessed }); } @@ -252,8 +285,13 @@ export async function fetchIntraday( try { logger.info(`Fetching intraday data for ${symbol}.${exchange} - ${interval}`, { + symbol, + exchange, + interval, from: fromDate?.toISOString().split('T')[0], - to: toDate?.toISOString().split('T')[0] + to: toDate?.toISOString().split('T')[0], + country, + url: `https://eodhd.com/api/intraday/${symbol}.${exchange}` }); // Get country if not provided diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts index bd59573..b78cd81 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts @@ -18,7 +18,7 @@ export async function scheduleFetchPrices( // Use OperationTracker to find stale symbols const staleSymbols = await this.operationRegistry.getStaleSymbols('eod', 'price_update', { - limit: 1000 // Process in batches to avoid overwhelming the system + limit: 50000 // Higher limit to process all symbols }); if (!staleSymbols || staleSymbols.length === 0) { diff --git a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts index 1cc4563..e181481 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts @@ -39,16 +39,26 @@ import { createEODOperationRegistry } from './shared'; ], }) export class EodHandler extends BaseHandler { - public operationRegistry!: OperationRegistry; + public operationRegistry: OperationRegistry; constructor(services: any) { super(services); - } - - async initialize(): Promise { - // Initialize operation registry - this.operationRegistry = await createEODOperationRegistry(this.mongodb, this.logger); - this.logger.info('EOD operation registry initialized'); + + // Use pre-initialized registry if available, otherwise create new one + if ((EodHandler as any).operationRegistry) { + this.operationRegistry = (EodHandler as any).operationRegistry; + this.logger.info('Using pre-initialized EOD operation registry'); + } else { + // Fallback: Initialize operation registry with EOD provider + createEODOperationRegistry(this.mongodb, this.logger) + .then(registry => { + this.operationRegistry = registry; + this.logger.info('EOD operation registry initialized successfully'); + }) + .catch(error => { + this.logger.error('Failed to initialize EOD operations', { error }); + }); + } } /** diff --git a/apps/stock/data-ingestion/src/handlers/index.ts b/apps/stock/data-ingestion/src/handlers/index.ts index a64d177..2b4cbd3 100644 --- a/apps/stock/data-ingestion/src/handlers/index.ts +++ b/apps/stock/data-ingestion/src/handlers/index.ts @@ -14,11 +14,17 @@ import { WebShareHandler } from './webshare/webshare.handler'; import { TradingViewHandler } from './tradingview/tradingview.handler'; import { TeHandler } from './te/te.handler'; import { EodHandler } from './eod/eod.handler'; +import { createEODOperationRegistry } from './eod/shared'; +import { createQMOperationRegistry } from './qm/shared/operation-provider'; // Add more handler imports as needed const logger = getLogger('handler-init'); +// Global operation registries that handlers will reference +let eodOperationRegistry: any; +let qmOperationRegistry: any; + /** * Initialize and register all handlers * Note: The actual registration is now handled by the HandlerScanner in the DI container @@ -26,6 +32,33 @@ const logger = getLogger('handler-init'); */ export async function initializeAllHandlers(serviceContainer: IServiceContainer): Promise { try { + // Initialize operation registries first + logger.info('Initializing operation registries'); + + const mongodb = serviceContainer.mongodb; + const operationLogger = serviceContainer.logger; + + // Create operation registries + try { + eodOperationRegistry = await createEODOperationRegistry(mongodb, operationLogger); + logger.info('EOD operation registry created'); + + // Attach to handler class so instances can access it + (EodHandler as any).operationRegistry = eodOperationRegistry; + } catch (error) { + logger.error('Failed to create EOD operation registry', { error }); + } + + try { + qmOperationRegistry = await createQMOperationRegistry(mongodb, operationLogger); + logger.info('QM operation registry created'); + + // Attach to handler class so instances can access it + (QMHandler as any).operationRegistry = qmOperationRegistry; + } catch (error) { + logger.error('Failed to create QM operation registry', { error }); + } + // The HandlerScanner in the DI container will handle the actual registration // We just need to ensure handlers are imported so their decorators run @@ -69,6 +102,7 @@ export async function initializeAllHandlers(serviceContainer: IServiceContainer) registeredHandlers: handlerRegistry.getHandlerNames(), handlersWithSchedule: handlerRegistry.getAllHandlersWithSchedule().size, }); + } } else { logger.error('Could not access DI container from service container'); diff --git a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts index 8314504..30e0956 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts @@ -40,14 +40,20 @@ export class QMHandler extends BaseHandler { constructor(services: any) { super(services); // Handler name read from @Handler decorator - // Initialize operation registry with QM provider - createQMOperationRegistry(this.mongodb, this.logger) - .then(registry => { - this.operationRegistry = registry; - }) - .catch(error => { - this.logger.error('Failed to initialize QM operations', { error }); - }); + // Use pre-initialized registry if available, otherwise create new one + if ((QMHandler as any).operationRegistry) { + this.operationRegistry = (QMHandler as any).operationRegistry; + this.logger.info('Using pre-initialized QM operation registry'); + } else { + // Fallback: Initialize operation registry with QM provider + createQMOperationRegistry(this.mongodb, this.logger) + .then(registry => { + this.operationRegistry = registry; + }) + .catch(error => { + this.logger.error('Failed to initialize QM operations', { error }); + }); + } } /**