diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts index 737f636..96cd87f 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts @@ -1,8 +1,7 @@ -import type { BaseHandler } from '@stock-bot/handlers'; -import type { DataIngestionServices } from '../../../types'; + +import type { CrawlState } from '../../../shared/operation-manager/types'; import type { EodHandler } from '../eod.handler'; import { EOD_CONFIG } from '../shared'; -import { getEodExchangeSuffix } from '../shared/utils'; interface FetchIntradayInput { symbol: string; @@ -20,14 +19,7 @@ interface CrawlIntradayInput { country?: string; } -interface CrawlState { - finished: boolean; - oldestDateReached?: Date; - newestDateReached?: Date; - lastProcessedDate?: Date; - totalRecordsProcessed?: number; - totalBatchesProcessed?: number; -} +// CrawlState is imported from operation-manager types // Max days per interval based on EOD limits const MAX_DAYS_PER_INTERVAL = { @@ -44,37 +36,34 @@ export async function scheduleIntradayCrawl( try { logger.info('Scheduling intraday crawl jobs'); - // Get Canadian exchanges for now - const canadianExchanges = ['TO', 'V', 'CN', 'NEO']; - // Use OperationTracker to find symbols needing intraday crawl const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h']; - const operationNames = ['intraday_1m', 'intraday_5m', 'intraday_1h']; + const operationNames: string[] = ['intraday_1m', 'intraday_5m', 'intraday_1h']; - let allSymbolsForCrawl: any[] = []; + const allSymbolsForCrawl: any[] = []; // Get symbols needing crawl for each interval for (let i = 0; i < intervals.length; i++) { const interval = intervals[i]; - const operationName = operationNames[i]; + const operationName = operationNames[i]!; // Non-null assertion since we know the array has 3 elements - const allSymbolsForInterval = await this.operationRegistry.getSymbolsForIntradayCrawl('eod', operationName, { - limit: 500 // Get more symbols to filter from + const symbolsForInterval = await this.operationRegistry.getStaleSymbols('eod', operationName, { + limit: 100 // Limit per interval }); - // Filter for Canadian exchanges and non-delisted symbols - const symbolsForInterval = allSymbolsForInterval.filter(item => - canadianExchanges.includes(item.symbol.Exchange) && + // Filter out delisted symbols + const activeSymbols = symbolsForInterval.filter(item => item.symbol.delisted === false - ).slice(0, 100); + ); // Add interval info to each symbol - symbolsForInterval.forEach(item => { + activeSymbols.forEach(item => { allSymbolsForCrawl.push({ symbol: item.symbol, interval: interval, operationName: operationName, - crawlState: item.crawlState + lastRun: item.lastRun, + lastSuccess: item.lastSuccess }); }); } @@ -88,10 +77,11 @@ export async function scheduleIntradayCrawl( count: allSymbolsForCrawl.length, samples: allSymbolsForCrawl.slice(0, 5).map(s => ({ symbol: s.symbol.Code, - exchange: s.symbol.Exchange, + exchange: s.symbol.eodExchange || s.symbol.Exchange, name: s.symbol.Name, interval: s.interval, - crawlState: s.crawlState + lastRun: s.lastRun, + lastSuccess: s.lastSuccess })) }); @@ -139,17 +129,20 @@ export async function crawlIntraday( try { logger.info(`Starting intraday crawl for ${symbol}.${exchange} - ${interval}`); - // Get current crawl state + // Get symbol to check if it exists const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({ Code: symbol, - Exchange: exchange + eodExchange: exchange }); if (!symbolDoc) { throw new Error(`Symbol ${symbol}.${exchange} not found`); } - const crawlState: CrawlState = symbolDoc.intradayState?.[interval] || { + // Get operation status from tracker + const operationName = `intraday_${interval}`; + const operationStatus = symbolDoc.operations?.[operationName]; + const crawlState: CrawlState = operationStatus?.crawlState || { finished: false }; @@ -181,9 +174,9 @@ export async function crawlIntraday( // Update crawl state const newState: CrawlState = { ...crawlState, + finished: false, lastProcessedDate: fromDate, - totalRecordsProcessed: (crawlState.totalRecordsProcessed || 0) + result.recordsSaved, - totalBatchesProcessed: (crawlState.totalBatchesProcessed || 0) + 1 + totalDaysProcessed: (crawlState.totalDaysProcessed || 0) + 1 }; // Set oldest date reached @@ -200,23 +193,20 @@ export async function crawlIntraday( if (result.recordsSaved === 0) { newState.finished = true; logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval}`, { - totalRecords: newState.totalRecordsProcessed, + symbol, + exchange, + interval, oldestDate: newState.oldestDateReached, newestDate: newState.newestDateReached, - batches: newState.totalBatchesProcessed }); } - // Update symbol with new crawl state - await this.mongodb.collection('eodSymbols').updateOne( - { Code: symbol, Exchange: exchange }, - { - $set: { - [`intradayState.${interval}`]: newState, - lastIntradayUpdate: new Date() - } - } - ); + // Update operation tracker with crawl state + await this.operationRegistry.updateOperation('eod', symbol, operationName, { + status: newState.finished ? 'success' : 'partial', + recordCount: result.recordsSaved, + crawlState: newState + }); // If not finished, schedule next batch if (!newState.finished) { @@ -271,7 +261,7 @@ export async function fetchIntraday( if (!symbolCountry) { const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({ Code: symbol, - Exchange: exchange + eodExchange: exchange }); if (!symbolDoc) { @@ -287,10 +277,8 @@ export async function fetchIntraday( } // Build URL - // Use utility function to handle US symbols and EUFUND special case - const exchangeSuffix = getEodExchangeSuffix(exchange, symbolCountry); - - const url = new URL(`https://eodhd.com/api/intraday/${symbol}.${exchangeSuffix}`); + // Note: 'exchange' parameter here is already the eodExchange from scheduling + const url = new URL(`https://eodhd.com/api/intraday/${symbol}.${exchange}`); url.searchParams.append('api_token', apiKey); url.searchParams.append('fmt', 'json'); url.searchParams.append('interval', interval); diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts index 590a1e4..bd59573 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts @@ -1,5 +1,3 @@ -import type { BaseHandler } from '@stock-bot/handlers'; -import type { DataIngestionServices } from '../../../types'; import type { EodHandler } from '../eod.handler'; import { EOD_CONFIG } from '../shared'; import { getEodExchangeSuffix } from '../shared/utils'; @@ -175,7 +173,7 @@ export async function fetchPrices( success: true, priceCount: result.insertedCount }; - } catch (error) { + } catch (error: unknown) { logger.error('Failed to fetch or save prices', { error, symbol, exchange }); // Update operation tracker with failure diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/events.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/events.action.ts index 01d27e3..455480b 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/events.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/events.action.ts @@ -221,23 +221,16 @@ export async function scheduleEventsUpdates( this.logger.info(`Found ${staleSymbols.length} symbols needing events updates`); - // Get full symbol data to include symbolId - const symbolDocs = await this.mongodb.find('qmSymbols', { - qmSearchCode: { $in: staleSymbols.slice(0, limit) } // Apply limit after deduplication - }, { - projection: { symbol: 1, exchange: 1, qmSearchCode: 1 } - }); - let queued = 0; let errors = 0; // Schedule individual update jobs for each symbol - for (const doc of symbolDocs) { + for (const item of staleSymbols) { try { await this.scheduleOperation('update-events', { - symbol: doc.symbol, - exchange: doc.exchange, - qmSearchCode: doc.qmSearchCode + symbol: item.symbol.symbol, + exchange: item.symbol.exchange, + qmSearchCode: item.symbol.qmSearchCode }, { priority: 4, delay: queued * 0.05 // 1.5 seconds between jobs @@ -245,7 +238,7 @@ export async function scheduleEventsUpdates( queued++; } catch (error) { - this.logger.error(`Failed to schedule events update for ${doc.symbol}`, { error }); + this.logger.error(`Failed to schedule events update for ${item.symbol.symbol}`, { error }); errors++; } } diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts index 1dfea57..77f17f2 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts @@ -181,30 +181,43 @@ export async function scheduleFinancialsUpdates( this.logger.info(`Found ${staleSymbolsQ.length} symbols needing quarterly updates and ${staleSymbolsA.length} symbols needing annual updates`); - // Combine unique symbols from both lists - const allStaleSymbols = [...new Set([...staleSymbolsQ, ...staleSymbolsA])]; - - // Get full symbol data - const symbolDocs = await this.mongodb.find('qmSymbols', { - qmSearchCode: { $in: allStaleSymbols } - }, { - projection: { symbol: 1, exchange: 1, qmSearchCode: 1 } - }); + // Create a map of symbols needing updates + const symbolsNeedingUpdates = new Map(); + + // Add quarterly symbols + for (const item of staleSymbolsQ) { + const key = item.symbol.qmSearchCode; + if (!symbolsNeedingUpdates.has(key)) { + symbolsNeedingUpdates.set(key, { quarterly: true, annual: false, item }); + } else { + symbolsNeedingUpdates.get(key)!.quarterly = true; + } + } + + // Add annual symbols + for (const item of staleSymbolsA) { + const key = item.symbol.qmSearchCode; + if (!symbolsNeedingUpdates.has(key)) { + symbolsNeedingUpdates.set(key, { quarterly: false, annual: true, item }); + } else { + symbolsNeedingUpdates.get(key)!.annual = true; + } + } let queued = 0; let errors = 0; // Schedule individual update jobs for each symbol and report type - for (const doc of symbolDocs) { - // Check if this symbol needs quarterly updates - if (staleSymbolsQ.includes(doc.qmSearchCode)) { + for (const [qmSearchCode, { quarterly, annual, item }] of symbolsNeedingUpdates) { + // Schedule quarterly updates if needed + if (quarterly) { try { await this.scheduleOperation('update-financials', { - symbol: doc.symbol, - exchange: doc.exchange, - qmSearchCode: doc.qmSearchCode, + symbol: item.symbol.symbol, + exchange: item.symbol.exchange, + qmSearchCode: qmSearchCode, reportType: 'Q', - lastRecordDate: doc.operations?.price_update?.lastRecordDate, + lastRecordDate: item.operations?.financials_update_quarterly?.lastRecordDate, }, { priority: 4, delay: queued // 1 second between jobs @@ -212,20 +225,20 @@ export async function scheduleFinancialsUpdates( queued++; } catch (error) { - this.logger.error(`Failed to schedule quarterly financials update for ${doc.qmSearchCode}`, { error }); + this.logger.error(`Failed to schedule quarterly financials update for ${qmSearchCode}`, { error }); errors++; } } - // Check if this symbol needs annual updates - if (staleSymbolsA.includes(doc.qmSearchCode)) { + // Schedule annual updates if needed + if (annual) { try { await this.scheduleOperation('update-financials', { - symbol: doc.symbol, - exchange: doc.exchange, - qmSearchCode: doc.qmSearchCode, + symbol: item.symbol.symbol, + exchange: item.symbol.exchange, + qmSearchCode: qmSearchCode, reportType: 'A', - lastRecordDate: doc.operations?.price_update?.lastRecordDate, + lastRecordDate: item.operations?.financials_update_annual?.lastRecordDate, }, { priority: 4, delay: queued // 1 second between jobs @@ -233,7 +246,7 @@ export async function scheduleFinancialsUpdates( queued++; } catch (error) { - this.logger.error(`Failed to schedule annual financials update for ${doc.qmSearchCode}`, { error }); + this.logger.error(`Failed to schedule annual financials update for ${qmSearchCode}`, { error }); errors++; } } diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/insiders.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/insiders.action.ts index c4d649c..ee2ae8f 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/insiders.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/insiders.action.ts @@ -233,30 +233,23 @@ export async function scheduleInsidersUpdates( }; } - // Get full symbol data - const symbolsToProcess = await this.mongodb.find('qmSymbols', { - qmSearchCode: { $in: staleSymbols } - }, { - projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } - }); - - this.logger.info(`Found ${symbolsToProcess.length} symbols for insider updates`); + this.logger.info(`Found ${staleSymbols.length} symbols for insider updates`); let symbolsQueued = 0; let errors = 0; // Schedule update jobs - for (const doc of symbolsToProcess) { + for (const item of staleSymbols) { try { - if (!doc.symbolId) { - this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); + if (!item.symbol.symbolId) { + this.logger.warn(`Symbol ${item.symbol.symbol} missing symbolId, skipping`); continue; } await this.scheduleOperation('update-insiders', { - symbol: doc.symbol, - symbolId: doc.symbolId, - qmSearchCode: doc.qmSearchCode + symbol: item.symbol.symbol, + symbolId: item.symbol.symbolId, + qmSearchCode: item.symbol.qmSearchCode }, { priority: 5, // Medium priority delay: symbolsQueued * 1000 // 1 second between jobs @@ -264,7 +257,7 @@ export async function scheduleInsidersUpdates( symbolsQueued++; } catch (error) { - this.logger.error(`Failed to schedule insider update for ${doc.symbol}`, { error }); + this.logger.error(`Failed to schedule insider update for ${item.symbol.symbol}`, { error }); errors++; } } diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/news.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/news.action.ts index bce518f..d707389 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/news.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/news.action.ts @@ -395,30 +395,23 @@ export async function scheduleSymbolNewsUpdates( }; } - // Get full symbol data - const symbolsToProcess = await this.mongodb.find('qmSymbols', { - qmSearchCode: { $in: staleSymbols } - }, { - projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } - }); - - this.logger.info(`Found ${symbolsToProcess.length} symbols for news updates`); + this.logger.info(`Found ${staleSymbols.length} symbols for news updates`); let symbolsQueued = 0; let errors = 0; // Schedule update jobs - for (const doc of symbolsToProcess) { + for (const item of staleSymbols) { try { - if (!doc.symbolId) { - this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); + if (!item.symbol.symbolId) { + this.logger.warn(`Symbol ${item.symbol.symbol} missing symbolId, skipping`); continue; } await this.scheduleOperation('update-symbol-news', { - symbol: doc.symbol, - symbolId: doc.symbolId, - qmSearchCode: doc.qmSearchCode + symbol: item.symbol.symbol, + symbolId: item.symbol.symbolId, + qmSearchCode: item.symbol.qmSearchCode }, { priority: 4, // Lower priority than price data delay: symbolsQueued * 500 // 0.5 seconds between jobs @@ -426,7 +419,7 @@ export async function scheduleSymbolNewsUpdates( symbolsQueued++; } catch (error) { - this.logger.error(`Failed to schedule news update for ${doc.symbol}`, { error }); + this.logger.error(`Failed to schedule news update for ${item.symbol.symbol}`, { error }); errors++; } } diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts index a3ba4fb..0406ba7 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts @@ -203,24 +203,17 @@ export async function schedulePriceUpdates( this.logger.info(`Found ${staleSymbols.length} symbols needing price updates`); - // Get full symbol data to include symbolId - const symbolDocs = await this.mongodb.find('qmSymbols', { - qmSearchCode: { $in: staleSymbols } - }, { - projection: { qmSearchCode: 1, operations: 1, symbol: 1, exchange: 1 }, - }); - let queued = 0; let errors = 0; // Schedule individual update jobs for each symbol - for (const doc of symbolDocs) { + for (const item of staleSymbols) { try { await this.scheduleOperation('update-prices', { - qmSearchCode: doc.qmSearchCode, - lastRecordDate: doc.operations?.price_update?.lastRecordDate, - symbol: doc.symbol, - exchange: doc.exchange + qmSearchCode: item.symbol.qmSearchCode, + lastRecordDate: item.operations?.price_update?.lastRecordDate, + symbol: item.symbol.symbol, + exchange: item.symbol.exchange }, { priority: 7, // High priority for price data delay: queued * 100 // 0.1 seconds between jobs @@ -228,7 +221,7 @@ export async function schedulePriceUpdates( queued++; } catch (error) { - this.logger.error(`Failed to schedule price update for ${doc.qmSearchCode}`, { error }); + this.logger.error(`Failed to schedule price update for ${item.symbol.qmSearchCode}`, { error }); errors++; } } diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts index 6571864..fa69a64 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts @@ -168,21 +168,14 @@ export async function scheduleSymbolInfoUpdates( this.logger.info(`Found ${staleSymbols.length} symbols needing info updates`); - // Get full symbol data to include qmSearchCode - const symbolDocs = await this.mongodb.find('qmSymbols', { - qmSearchCode: { $in: staleSymbols } - }, { - projection: { qmSearchCode: 1 } - }); - let queued = 0; let errors = 0; // Schedule individual update jobs for each symbol - for (const doc of symbolDocs) { + for (const item of staleSymbols) { try { await this.scheduleOperation('update-symbol-info', { - qmSearchCode: doc.qmSearchCode + qmSearchCode: item.symbol.qmSearchCode }, { // priority: 3, // Add some delay to avoid overwhelming the API @@ -191,7 +184,7 @@ export async function scheduleSymbolInfoUpdates( queued++; } catch (error) { - this.logger.error(`Failed to schedule update for ${doc.qmSearchCode}`, { error }); + this.logger.error(`Failed to schedule update for ${item.symbol.qmSearchCode}`, { error }); errors++; } } diff --git a/apps/stock/data-ingestion/src/shared/operation-manager/OperationRegistry.ts b/apps/stock/data-ingestion/src/shared/operation-manager/OperationRegistry.ts index b5abddc..221628b 100644 --- a/apps/stock/data-ingestion/src/shared/operation-manager/OperationRegistry.ts +++ b/apps/stock/data-ingestion/src/shared/operation-manager/OperationRegistry.ts @@ -147,7 +147,7 @@ export class OperationRegistry { providerName: string, operationName: string, options?: StaleSymbolOptions - ): Promise { + ): Promise> { const tracker = this.getTracker(providerName); return tracker.getStaleSymbols(operationName, options); } diff --git a/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts b/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts index af3cbbf..907e130 100644 --- a/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts +++ b/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts @@ -312,7 +312,7 @@ export class OperationTracker { async getStaleSymbols( operationName: string, options: StaleSymbolOptions = {} - ): Promise { + ): Promise> { const { collectionName, symbolField } = this.provider.getProviderConfig(); const { limit = 1000, @@ -353,11 +353,16 @@ export class OperationTracker { const symbols = await this.mongodb.find(collectionName, filter, { limit, - projection: { [symbolField]: 1 }, + projection: { }, // Return all fields sort: { [`operations.${operationName}.lastSuccessAt`]: 1 } }); - return symbols.map(doc => doc[symbolField]); + return symbols.map(doc => ({ + symbol: doc, + lastRun: doc.operations?.[operationName]?.lastRunAt, + lastSuccess: doc.operations?.[operationName]?.lastSuccessAt, + operations: doc.operations + })); } /** diff --git a/libs/core/config/src/index.ts b/libs/core/config/src/index.ts index 212be5e..8acb702 100644 --- a/libs/core/config/src/index.ts +++ b/libs/core/config/src/index.ts @@ -15,6 +15,13 @@ export { mongodbConfigSchema, postgresConfigSchema, questdbConfigSchema, + providerConfigSchema, + providerSchemas, + eodProviderConfigSchema, + ibProviderConfigSchema, + qmProviderConfigSchema, + yahooProviderConfigSchema, + type ProviderName, } from './schemas'; // createAppConfig function for apps/stock