diff --git a/apps/stock/data-ingestion/scripts/migrate-price-tracking-mcp.ts b/apps/stock/data-ingestion/scripts/migrate-price-tracking-mcp.ts new file mode 100644 index 0000000..c4ff49e --- /dev/null +++ b/apps/stock/data-ingestion/scripts/migrate-price-tracking-mcp.ts @@ -0,0 +1,93 @@ +import { MongoClient } from 'mongodb'; + +async function migratePriceTracking() { + const client = new MongoClient('mongodb://localhost:27017'); + await client.connect(); + + const mongodb = client.db('stock'); + + try { + console.log('Starting price tracking migration...'); + + const collection = mongodb.collection('eodSymbols'); + const batchSize = 100; + let processedCount = 0; + let hasMore = true; + + while (hasMore) { + // Find documents that need migration + const documents = await collection.find({ + lastPriceUpdate: { $exists: true }, + 'operations.price_update': { $exists: false } + }).limit(batchSize).toArray(); + + if (documents.length === 0) { + hasMore = false; + break; + } + + // Process each document + for (const doc of documents) { + // Normalize date to 00:00:00 UTC + const lastPriceUpdate = new Date(doc.lastPriceUpdate); + const normalizedDate = new Date(Date.UTC( + lastPriceUpdate.getUTCFullYear(), + lastPriceUpdate.getUTCMonth(), + lastPriceUpdate.getUTCDate(), + 0, 0, 0, 0 + )); + + // Parse lastPriceDate if it exists + let lastRecordDate = null; + if (doc.lastPriceDate) { + try { + lastRecordDate = new Date(doc.lastPriceDate); + } catch (e) { + console.warn(`Failed to parse lastPriceDate for ${doc.Code}: ${doc.lastPriceDate}`); + } + } + + // Update the document + await collection.updateOne( + { _id: doc._id }, + { + $set: { + 'operations.price_update': { + lastRunAt: normalizedDate, + lastSuccessAt: normalizedDate, + status: 'success', + ...(lastRecordDate && { lastRecordDate }) + } + } + } + ); + + processedCount++; + if (processedCount % 1000 === 0) { + console.log(`Processed ${processedCount} documents...`); + } + } + } + + console.log(`Migration completed. Total documents migrated: ${processedCount}`); + + // Optional: Remove old fields + const removeOldFields = false; // Set to true to remove old fields + if (removeOldFields) { + console.log('Removing old fields...'); + const result = await collection.updateMany( + {}, + { $unset: { lastPriceUpdate: '', lastPriceDate: '' } } + ); + console.log(`Removed old fields from ${result.modifiedCount} documents`); + } + + } catch (error) { + console.error('Migration failed:', error); + } finally { + await client.close(); + } +} + +// Run the migration +migratePriceTracking().catch(console.error); \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts index e6fab3f..7fb9924 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts @@ -1,5 +1,6 @@ import type { BaseHandler } from '@stock-bot/handlers'; import type { DataIngestionServices } from '../../../types'; +import type { EodHandler } from '../eod.handler'; import { EOD_CONFIG } from '../shared'; import { getEodExchangeSuffix } from '../shared/utils'; @@ -11,52 +12,50 @@ interface FetchCorporateActionsInput { } export async function scheduleFetchCorporateActions( - this: BaseHandler + this: EodHandler ): Promise<{ success: boolean; jobsScheduled: number }> { const logger = this.logger; try { logger.info('Scheduling corporate actions fetch jobs'); - // Calculate date one month ago for stale check - const oneMonthAgo = new Date(); - oneMonthAgo.setMonth(oneMonthAgo.getMonth() - 1); - // Get Canadian exchanges for now const canadianExchanges = ['TO', 'V', 'CN', 'NEO']; - // Find symbols that need corporate actions update - const symbols = await this.mongodb.collection('eodSymbols').find({ - Exchange: { $in: canadianExchanges }, - delisted: false, - $or: [ - { lastCorporateActionsUpdate: { $lt: oneMonthAgo } }, - { lastCorporateActionsUpdate: { $exists: false } } - ] - }).limit(500).toArray(); // Limit to avoid too many jobs at once + // Use OperationTracker to find stale symbols for both dividends and splits + const allStaleDividends = await this.operationRegistry.getStaleSymbols('eod', 'dividends_update', { + limit: 2000 // Get more symbols to filter from + }); - if (!symbols || symbols.length === 0) { + const allStaleSplits = await this.operationRegistry.getStaleSymbols('eod', 'splits_update', { + limit: 2000 // Get more symbols to filter from + }); + + // Filter for Canadian exchanges and non-delisted symbols + const staleSymbolsDividends = allStaleDividends.filter(item => + canadianExchanges.includes(item.symbol.Exchange) && + item.symbol.delisted === false + ).slice(0, 500); + + const staleSymbolsSplits = allStaleSplits.filter(item => + canadianExchanges.includes(item.symbol.Exchange) && + item.symbol.delisted === false + ).slice(0, 500); + + if ((!staleSymbolsDividends || staleSymbolsDividends.length === 0) && + (!staleSymbolsSplits || staleSymbolsSplits.length === 0)) { logger.info('No symbols need corporate actions update'); return { success: true, jobsScheduled: 0 }; } - logger.info(`Found ${symbols.length} symbols needing corporate actions update`, { - count: symbols.length, - samples: symbols.slice(0, 5).map(s => ({ - symbol: s.Code, - exchange: s.Exchange, - name: s.Name, - lastUpdate: s.lastCorporateActionsUpdate - })) - }); + logger.info(`Found ${staleSymbolsDividends.length} symbols needing dividends update and ${staleSymbolsSplits.length} symbols needing splits update`); let jobsScheduled = 0; - // Schedule jobs for each symbol - both dividends and splits - for (let i = 0; i < symbols.length; i++) { - const symbol = symbols[i]; + // Schedule dividends jobs + for (let i = 0; i < staleSymbolsDividends.length; i++) { + const { symbol } = staleSymbolsDividends[i]; - // Schedule dividends fetch await this.scheduleOperation('fetch-corporate-actions', { symbol: symbol.Code, exchange: symbol.eodExchange || symbol.Exchange, // Use eodExchange if available @@ -71,8 +70,12 @@ export async function scheduleFetchCorporateActions( delay: i * 200 // Stagger jobs by 200ms per symbol }); jobsScheduled++; + } + + // Schedule splits jobs + for (let i = 0; i < staleSymbolsSplits.length; i++) { + const { symbol } = staleSymbolsSplits[i]; - // Schedule splits fetch await this.scheduleOperation('fetch-corporate-actions', { symbol: symbol.Code, exchange: symbol.eodExchange || symbol.Exchange, // Use eodExchange if available @@ -102,7 +105,7 @@ export async function scheduleFetchCorporateActions( } export async function fetchCorporateActions( - this: BaseHandler, + this: EodHandler, input: FetchCorporateActionsInput ): Promise<{ success: boolean; recordsCount: number }> { const logger = this.logger; @@ -204,17 +207,17 @@ export async function fetchCorporateActions( ['date', 'symbolExchange'] ); - // Update symbol with last update timestamp - await this.mongodb.collection('eodSymbols').updateOne( - { Code: symbol, Exchange: exchange }, - { - $set: { - lastCorporateActionsUpdate: new Date(), - [`last${actionType.charAt(0).toUpperCase() + actionType.slice(1)}Update`]: new Date(), - [`has${actionType.charAt(0).toUpperCase() + actionType.slice(1)}`]: true - } + // Update operation tracker based on action type + const operationName = actionType === 'dividends' ? 'dividends_update' : 'splits_update'; + await this.operationRegistry.updateOperation('eod', symbol, operationName, { + status: 'success', + recordCount: result.insertedCount, + metadata: { + actionType: actionType, + exchange: exchange, + hasData: result.insertedCount > 0 } - ); + }); logger.info(`Successfully saved ${result.insertedCount} ${actionType} records for ${symbol}.${exchange}`); @@ -224,6 +227,14 @@ export async function fetchCorporateActions( }; } catch (error) { logger.error('Failed to fetch corporate actions', { error, symbol, exchange, actionType }); + + // Update operation tracker with failure + const operationName = actionType === 'dividends' ? 'dividends_update' : 'splits_update'; + await this.operationRegistry.updateOperation('eod', symbol, operationName, { + status: 'failure', + error: error.message + }); + throw error; } } \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts index 8ac0378..878d00e 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts @@ -1,5 +1,6 @@ import type { BaseHandler } from '@stock-bot/handlers'; import type { DataIngestionServices } from '../../../types'; +import type { EodHandler } from '../eod.handler'; import { EOD_CONFIG } from '../shared'; import { getEodExchangeSuffix } from '../shared/utils'; @@ -14,52 +15,49 @@ interface FetchSingleFundamentalsInput { } export async function scheduleFetchFundamentals( - this: BaseHandler + this: EodHandler ): Promise<{ success: boolean; jobsScheduled: number }> { const logger = this.logger; try { logger.info('Scheduling fundamentals fetch jobs'); - // Calculate date one week ago for stale check - const oneWeekAgo = new Date(); - oneWeekAgo.setDate(oneWeekAgo.getDate() - 7); - // Get Canadian exchanges for now const canadianExchanges = ['TO', 'V', 'CN', 'NEO']; - // Find symbols that need fundamentals update - const symbols = await this.mongodb.collection('eodSymbols').find({ - Exchange: { $in: canadianExchanges }, - delisted: false, - $or: [ - { lastFundamentalsUpdate: { $lt: oneWeekAgo } }, - { lastFundamentalsUpdate: { $exists: false } } - ] - }).toArray(); + // Use OperationTracker to find stale symbols + const allStaleSymbols = await this.operationRegistry.getStaleSymbols('eod', 'fundamentals_update', { + limit: 5000 // Get more symbols to filter from + }); - if (!symbols || symbols.length === 0) { + // Filter for Canadian exchanges and non-delisted symbols + const staleSymbols = allStaleSymbols.filter(item => + canadianExchanges.includes(item.symbol.Exchange) && + item.symbol.delisted === false + ).slice(0, 1000); // Limit to 1000 after filtering + + if (!staleSymbols || staleSymbols.length === 0) { logger.info('No symbols need fundamentals update'); return { success: true, jobsScheduled: 0 }; } - logger.info(`Found ${symbols.length} symbols needing fundamentals update`); + logger.info(`Found ${staleSymbols.length} symbols needing fundamentals update`); // Separate ETFs from regular stocks - const etfs = symbols.filter(s => s.Type === 'ETF'); - const nonEtfs = symbols.filter(s => s.Type !== 'ETF'); + const etfs = staleSymbols.filter(s => s.symbol.Type === 'ETF'); + const nonEtfs = staleSymbols.filter(s => s.symbol.Type !== 'ETF'); logger.info(`Found ${etfs.length} ETFs and ${nonEtfs.length} non-ETFs`, { etfSamples: etfs.slice(0, 5).map(e => ({ - symbol: e.Code, - exchange: e.Exchange, - name: e.Name + symbol: e.symbol.Code, + exchange: e.symbol.Exchange, + name: e.symbol.Name })), nonEtfSamples: nonEtfs.slice(0, 5).map(s => ({ - symbol: s.Code, - exchange: s.Exchange, - name: s.Name, - type: s.Type + symbol: s.symbol.Code, + exchange: s.symbol.Exchange, + name: s.symbol.Name, + type: s.symbol.Type })) }); @@ -67,7 +65,7 @@ export async function scheduleFetchFundamentals( // Schedule individual jobs for ETFs for (let i = 0; i < etfs.length; i++) { - const etf = etfs[i]; + const { symbol: etf } = etfs[i]; await this.scheduleOperation('fetch-single-fundamentals', { symbol: etf.Code, exchange: etf.eodExchange || etf.Exchange, // Use eodExchange if available @@ -95,9 +93,9 @@ export async function scheduleFetchFundamentals( // Convert to array of {symbol, exchange, country} objects const symbolBatch = batch.map(s => ({ - symbol: s.Code, - exchange: s.eodExchange || s.Exchange, // Use eodExchange if available - country: s.Country + symbol: s.symbol.Code, + exchange: s.symbol.eodExchange || s.symbol.Exchange, // Use eodExchange if available + country: s.symbol.Country })); await this.scheduleOperation('fetch-bulk-fundamentals', { @@ -128,7 +126,7 @@ export async function scheduleFetchFundamentals( } export async function fetchBulkFundamentals( - this: BaseHandler, + this: EodHandler, input: BulkFundamentalsInput ): Promise<{ success: boolean; symbolsProcessed: number }> { const logger = this.logger; @@ -221,17 +219,16 @@ export async function fetchBulkFundamentals( logger.info(`Saved ${result.insertedCount} fundamentals records for ${exchange}`); - // Update symbols with last update timestamp + // Update operation tracker for each symbol const updatePromises = symbolsToUpdate.map(({ symbol, exchange }) => - this.mongodb.collection('eodSymbols').updateOne( - { Code: symbol, Exchange: exchange }, - { - $set: { - lastFundamentalsUpdate: new Date(), - hasFundamentals: true - } + this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', { + status: 'success', + recordCount: 1, + metadata: { + hasData: true, + exchange: exchange } - ) + }) ); await Promise.all(updatePromises); @@ -247,12 +244,22 @@ export async function fetchBulkFundamentals( }; } catch (error) { logger.error('Failed to fetch bulk fundamentals', { error }); + + // Mark all symbols as failed + const failPromises = input.symbols.map(({ symbol }) => + this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', { + status: 'failure', + error: error.message + }) + ); + + await Promise.all(failPromises); throw error; } } export async function fetchSingleFundamentals( - this: BaseHandler, + this: EodHandler, input: FetchSingleFundamentalsInput ): Promise<{ success: boolean; saved: boolean }> { const logger = this.logger; @@ -321,16 +328,16 @@ export async function fetchSingleFundamentals( ['symbolExchange'] ); - // Update symbol with last update timestamp - await this.mongodb.collection('eodSymbols').updateOne( - { Code: symbol, Exchange: exchange }, - { - $set: { - lastFundamentalsUpdate: new Date(), - hasFundamentals: true - } + // Update operation tracker + await this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', { + status: 'success', + recordCount: result.insertedCount, + metadata: { + hasData: true, + exchange: exchange, + saved: result.insertedCount > 0 } - ); + }); logger.info(`Successfully saved fundamentals for ${symbol}.${exchange}`); @@ -340,6 +347,13 @@ export async function fetchSingleFundamentals( }; } catch (error) { logger.error('Failed to fetch single fundamentals', { error, symbol, exchange }); + + // Update operation tracker with failure + await this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', { + status: 'failure', + error: error.message + }); + throw error; } } \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts index af4b567..e5a871d 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts @@ -1,5 +1,6 @@ import type { BaseHandler } from '@stock-bot/handlers'; import type { DataIngestionServices } from '../../../types'; +import type { EodHandler } from '../eod.handler'; import { EOD_CONFIG } from '../shared'; import { getEodExchangeSuffix } from '../shared/utils'; @@ -36,7 +37,7 @@ const MAX_DAYS_PER_INTERVAL = { }; export async function scheduleIntradayCrawl( - this: BaseHandler + this: EodHandler ): Promise<{ success: boolean; jobsScheduled: number }> { const logger = this.logger; @@ -46,61 +47,74 @@ export async function scheduleIntradayCrawl( // Get Canadian exchanges for now const canadianExchanges = ['TO', 'V', 'CN', 'NEO']; - // Find active symbols that need intraday data - const symbols = await this.mongodb.collection('eodSymbols').find({ - Exchange: { $in: canadianExchanges }, - delisted: false, - // Only symbols without complete intraday data - $or: [ - { 'intradayState.1m.finished': { $ne: true } }, - { 'intradayState.5m.finished': { $ne: true } }, - { 'intradayState.1h.finished': { $ne: true } }, - { 'intradayState': { $exists: false } } - ] - }).limit(100).toArray(); // Limit to avoid too many jobs at once + // Use OperationTracker to find symbols needing intraday crawl + const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h']; + const operationNames = ['intraday_1m', 'intraday_5m', 'intraday_1h']; - if (!symbols || symbols.length === 0) { + let allSymbolsForCrawl: any[] = []; + + // Get symbols needing crawl for each interval + for (let i = 0; i < intervals.length; i++) { + const interval = intervals[i]; + const operationName = operationNames[i]; + + const allSymbolsForInterval = await this.operationRegistry.getSymbolsForIntradayCrawl('eod', operationName, { + limit: 500 // Get more symbols to filter from + }); + + // Filter for Canadian exchanges and non-delisted symbols + const symbolsForInterval = allSymbolsForInterval.filter(item => + canadianExchanges.includes(item.symbol.Exchange) && + item.symbol.delisted === false + ).slice(0, 100); + + // Add interval info to each symbol + symbolsForInterval.forEach(item => { + allSymbolsForCrawl.push({ + symbol: item.symbol, + interval: interval, + operationName: operationName, + crawlState: item.crawlState + }); + }); + } + + if (!allSymbolsForCrawl || allSymbolsForCrawl.length === 0) { logger.info('No symbols need intraday crawl'); return { success: true, jobsScheduled: 0 }; } - logger.info(`Found ${symbols.length} symbols needing intraday data`, { - count: symbols.length, - samples: symbols.slice(0, 5).map(s => ({ - symbol: s.Code, - exchange: s.Exchange, - name: s.Name, - intradayState: s.intradayState + logger.info(`Found ${allSymbolsForCrawl.length} symbol/interval combinations needing intraday data`, { + count: allSymbolsForCrawl.length, + samples: allSymbolsForCrawl.slice(0, 5).map(s => ({ + symbol: s.symbol.Code, + exchange: s.symbol.Exchange, + name: s.symbol.Name, + interval: s.interval, + crawlState: s.crawlState })) }); let jobsScheduled = 0; - const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h']; - // Schedule crawl jobs for each symbol and interval - for (const symbol of symbols) { - for (const interval of intervals) { - // Check if this interval is already finished - const isFinished = symbol.intradayState?.[interval]?.finished; - if (isFinished) { - continue; - } - - await this.scheduleOperation('crawl-intraday', { - symbol: symbol.Code, - exchange: symbol.eodExchange || symbol.Exchange, // Use eodExchange if available - interval, - country: symbol.Country - }, { - attempts: 3, - backoff: { - type: 'exponential', - delay: 10000 - }, - delay: jobsScheduled * 500 // Stagger jobs by 500ms - }); - jobsScheduled++; - } + // Schedule crawl jobs for each symbol/interval combination + for (const item of allSymbolsForCrawl) { + const { symbol, interval } = item; + + await this.scheduleOperation('crawl-intraday', { + symbol: symbol.Code, + exchange: symbol.eodExchange || symbol.Exchange, // Use eodExchange if available + interval, + country: symbol.Country + }, { + attempts: 3, + backoff: { + type: 'exponential', + delay: 10000 + }, + delay: jobsScheduled * 500 // Stagger jobs by 500ms + }); + jobsScheduled++; } logger.info(`Successfully scheduled ${jobsScheduled} intraday crawl jobs`); @@ -116,7 +130,7 @@ export async function scheduleIntradayCrawl( } export async function crawlIntraday( - this: BaseHandler, + this: EodHandler, input: CrawlIntradayInput ): Promise<{ success: boolean; recordsProcessed: number; finished: boolean }> { const logger = this.logger; @@ -240,7 +254,7 @@ export async function crawlIntraday( } export async function fetchIntraday( - this: BaseHandler, + this: EodHandler, input: FetchIntradayInput ): Promise<{ success: boolean; recordsSaved: number }> { const logger = this.logger; diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts index 1a3dac9..590a1e4 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts @@ -1,5 +1,6 @@ import type { BaseHandler } from '@stock-bot/handlers'; import type { DataIngestionServices } from '../../../types'; +import type { EodHandler } from '../eod.handler'; import { EOD_CONFIG } from '../shared'; import { getEodExchangeSuffix } from '../shared/utils'; @@ -10,48 +11,40 @@ interface FetchPricesInput { } export async function scheduleFetchPrices( - this: BaseHandler + this: EodHandler ): Promise<{ success: boolean; jobsScheduled: number }> { const logger = this.logger; try { logger.info('Scheduling price fetch jobs for all symbols'); - // Calculate date one week ago - const oneWeekAgo = new Date(); - oneWeekAgo.setDate(oneWeekAgo.getDate() - 7); + // Use OperationTracker to find stale symbols + const staleSymbols = await this.operationRegistry.getStaleSymbols('eod', 'price_update', { + limit: 1000 // Process in batches to avoid overwhelming the system + }); - // Find ALL symbols that haven't been updated in the last week - const symbols = await this.mongodb.collection('eodSymbols').find({ - delisted: false, - $or: [ - { lastPriceUpdate: { $lt: oneWeekAgo } }, - { lastPriceUpdate: { $exists: false } } - ] - }).toArray(); - - if (!symbols || symbols.length === 0) { + if (!staleSymbols || staleSymbols.length === 0) { logger.info('No symbols need price updates'); return { success: true, jobsScheduled: 0 }; } - logger.info(`Found ${symbols.length} symbols needing price updates`, { - symbols: symbols.map(s => ({ - symbol: s.Code, - exchange: s.Exchange, - name: s.Name, - lastUpdate: s.lastPriceUpdate + logger.info(`Found ${staleSymbols.length} symbols needing price updates`, { + symbols: staleSymbols.slice(0, 10).map(s => ({ + symbol: s.symbol.Code, + exchange: s.symbol.Exchange, + name: s.symbol.Name, + lastUpdate: s.lastRun })) }); let jobsScheduled = 0; // Schedule jobs with staggered delays - for (let i = 0; i < symbols.length; i++) { - const symbol = symbols[i]; + for (let i = 0; i < staleSymbols.length; i++) { + const { symbol } = staleSymbols[i]; logger.debug(`Scheduling price fetch for ${symbol.Code}.${symbol.Exchange}`, { name: symbol.Name, - lastUpdate: symbol.lastPriceUpdate, + lastUpdate: staleSymbols[i].lastRun, delay: i * 100 }); @@ -83,7 +76,7 @@ export async function scheduleFetchPrices( } export async function fetchPrices( - this: BaseHandler, + this: EodHandler, input: FetchPricesInput ): Promise<{ success: boolean; priceCount: number }> { const logger = this.logger; @@ -165,16 +158,16 @@ export async function fetchPrices( ['date', 'symbolExchange'] ); - // Update the symbol's last price update timestamp - await this.mongodb.collection('eodSymbols').updateOne( - { Code: symbol, Exchange: exchange }, - { - $set: { - lastPriceUpdate: new Date(), - lastPriceDate: priceData.length > 0 ? priceData[priceData.length - 1].date : null - } + // Update operation tracker instead of directly updating the symbol + await this.operationRegistry.updateOperation('eod', symbol, 'price_update', { + status: 'success', + lastRecordDate: priceData.length > 0 ? priceData[priceData.length - 1].date : null, + recordCount: priceData.length, + metadata: { + insertedCount: result.insertedCount, + updatedCount: priceData.length - result.insertedCount } - ); + }); logger.info(`Successfully saved ${result.insertedCount} price records for ${symbol}.${exchange}`); @@ -184,6 +177,13 @@ export async function fetchPrices( }; } catch (error) { logger.error('Failed to fetch or save prices', { error, symbol, exchange }); + + // Update operation tracker with failure + await this.operationRegistry.updateOperation('eod', symbol, 'price_update', { + status: 'failure', + error: error.message + }); + throw error; } } \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts index b481efd..1cc4563 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts @@ -5,23 +5,25 @@ import { RateLimit, ScheduledOperation } from '@stock-bot/handlers'; +import type { OperationRegistry } from '../../shared/operation-manager'; import type { DataIngestionServices } from '../../types'; -import { - fetchExchanges, - fetchSymbols, - scheduleFetchSymbols, - fetchPrices, - scheduleFetchPrices, - fetchIntraday, +import { crawlIntraday, - scheduleIntradayCrawl, fetchBulkFundamentals, - fetchSingleFundamentals, - scheduleFetchFundamentals, - fetchSymbolChangeHistory, fetchCorporateActions, - scheduleFetchCorporateActions + fetchExchanges, + fetchIntraday, + fetchPrices, + fetchSingleFundamentals, + fetchSymbolChangeHistory, + fetchSymbols, + scheduleFetchCorporateActions, + scheduleFetchFundamentals, + scheduleFetchPrices, + scheduleFetchSymbols, + scheduleIntradayCrawl } from './actions'; +import { createEODOperationRegistry } from './shared'; /** * EOD (End of Day) Handler demonstrating advanced rate limiting @@ -37,10 +39,18 @@ import { ], }) export class EodHandler extends BaseHandler { + public operationRegistry!: OperationRegistry; + constructor(services: any) { super(services); } + async initialize(): Promise { + // Initialize operation registry + this.operationRegistry = await createEODOperationRegistry(this.mongodb, this.logger); + this.logger.info('EOD operation registry initialized'); + } + /** * Fetch exchanges list from EOD * Runs weekly on Sundays at midnight diff --git a/apps/stock/data-ingestion/src/handlers/eod/shared/index.ts b/apps/stock/data-ingestion/src/handlers/eod/shared/index.ts index 34772c4..3b768ce 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/shared/index.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/shared/index.ts @@ -1,2 +1,3 @@ export * from './config'; export * from './utils'; +export * from './operation-provider'; diff --git a/apps/stock/data-ingestion/src/handlers/eod/shared/operation-provider.ts b/apps/stock/data-ingestion/src/handlers/eod/shared/operation-provider.ts new file mode 100644 index 0000000..86d5221 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/shared/operation-provider.ts @@ -0,0 +1,118 @@ +/** + * EOD Operation Provider - Defines operations for EOD Historical Data source + */ + +import { BaseOperationProvider, OperationRegistry, type OperationConfig, type ProviderConfig } from '../../../shared/operation-manager'; + +/** + * EOD operation definitions + */ +export const EOD_OPERATIONS: OperationConfig[] = [ + // Exchange data + { + name: 'exchange_update', + type: 'standard', + description: 'Update exchange list', + defaultStaleHours: 24 * 7 // Weekly + }, + + // Symbol data + { + name: 'symbol_update', + type: 'standard', + description: 'Update symbol list for exchange', + defaultStaleHours: 24 // Daily + }, + + // Price data + { + name: 'price_update', + type: 'standard', + description: 'Update daily price data', + defaultStaleHours: 24 * 7 // Weekly - EOD's schedule-fetch-prices runs for symbols not updated in last week + }, + + // Intraday data + { + name: 'intraday_1m', + type: 'intraday_crawl', + description: 'Crawl 1-minute intraday data', + requiresFinishedFlag: true, + defaultStaleHours: 24 // Daily check for new data + }, + { + name: 'intraday_5m', + type: 'intraday_crawl', + description: 'Crawl 5-minute intraday data', + requiresFinishedFlag: true, + defaultStaleHours: 24 // Daily check for new data + }, + { + name: 'intraday_1h', + type: 'intraday_crawl', + description: 'Crawl 1-hour intraday data', + requiresFinishedFlag: true, + defaultStaleHours: 24 // Daily check for new data + }, + + // Fundamental data + { + name: 'fundamentals_update', + type: 'standard', + description: 'Update fundamental data', + defaultStaleHours: 24 * 7 // Weekly + }, + + // Corporate actions + { + name: 'dividends_update', + type: 'standard', + description: 'Update dividend data', + defaultStaleHours: 24 * 30 // Monthly + }, + { + name: 'splits_update', + type: 'standard', + description: 'Update stock split data', + defaultStaleHours: 24 * 30 // Monthly + }, + + // Symbol changes + { + name: 'symbol_change_update', + type: 'standard', + description: 'Update symbol change history', + defaultStaleHours: 24 * 7 // Weekly + } +]; + +/** + * EOD Operation Provider + */ +export class EODOperationProvider extends BaseOperationProvider { + getProviderConfig(): ProviderConfig { + return { + name: 'eod', + collectionName: 'eodSymbols', + symbolField: 'Code', + description: 'EOD Historical Data provider' + }; + } + + getOperations(): OperationConfig[] { + return EOD_OPERATIONS; + } +} + +/** + * Create and initialize EOD operation registry + */ +export async function createEODOperationRegistry( + mongodb: any, + logger: any +): Promise { + const registry = new OperationRegistry({ mongodb, logger }); + const provider = new EODOperationProvider({ mongodb, logger }); + await registry.registerProvider(provider); + return registry; +} \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 4f3407e..fb4c245 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -85,7 +85,7 @@ services: # Dragonfly - Redis replacement for caching and events MONGO_INITDB_DATABASE: stock ports: - "27017:27017" - command: --wiredTigerCacheSizeGB 8 + command: --wiredTigerCacheSizeGB 20 volumes: - mongodb_data:/data/db - ./database/mongodb/init:/docker-entrypoint-initdb.d diff --git a/price-migration.mongodb.js b/price-migration.mongodb.js new file mode 100644 index 0000000..e15d60f --- /dev/null +++ b/price-migration.mongodb.js @@ -0,0 +1,73 @@ +// MongoDB Shell Script for Price Tracking Migration +// Run with: mongosh mongodb://username:password@localhost:27017/stock price-migration.mongodb.js + +print("Starting price tracking migration..."); + +const batchSize = 1000; +let processedCount = 0; +let cursor = db.eodSymbols.find({ + lastPriceUpdate: { $exists: true }, + 'operations.price_update': { $exists: false } +}); + +let batch = []; + +cursor.forEach(doc => { + // Normalize date to 00:00:00 UTC + const normalizedDate = new Date(doc.lastPriceUpdate); + normalizedDate.setUTCHours(0, 0, 0, 0); + + // Parse lastPriceDate if it exists + let lastRecordDate = null; + if (doc.lastPriceDate) { + try { + lastRecordDate = new Date(doc.lastPriceDate); + } catch (e) { + print(`Failed to parse lastPriceDate for ${doc.Code}: ${doc.lastPriceDate}`); + } + } + + batch.push({ + updateOne: { + filter: { _id: doc._id }, + update: { + $set: { + 'operations.price_update': { + lastRunAt: normalizedDate, + lastSuccessAt: normalizedDate, + status: 'success', + ...(lastRecordDate && { lastRecordDate }) + } + } + } + } + }); + + if (batch.length >= batchSize) { + db.eodSymbols.bulkWrite(batch); + processedCount += batch.length; + print(`Processed ${processedCount} documents...`); + batch = []; + } +}); + +// Process remaining batch +if (batch.length > 0) { + db.eodSymbols.bulkWrite(batch); + processedCount += batch.length; +} + +print(`Migration completed. Processed ${processedCount} documents.`); + +// Verify migration +const sampleDoc = db.eodSymbols.findOne({ 'operations.price_update': { $exists: true } }); +print("\nSample migrated document:"); +printjson(sampleDoc.operations); + +// Count remaining documents with old fields +const remainingCount = db.eodSymbols.countDocuments({ lastPriceUpdate: { $exists: true } }); +print(`\nDocuments still having lastPriceUpdate field: ${remainingCount}`); + +// Optional: Remove old fields (uncomment to execute) +// print("\nRemoving old fields..."); +// db.eodSymbols.updateMany({}, { $unset: { lastPriceUpdate: '', lastPriceDate: '' } }); \ No newline at end of file diff --git a/run-price-migration.js b/run-price-migration.js new file mode 100644 index 0000000..0a13c55 --- /dev/null +++ b/run-price-migration.js @@ -0,0 +1,78 @@ +// This script performs the migration of lastPriceUpdate and lastPriceDate fields +// to the operations tracking system + +// Since authentication is required, you'll need to run this script with proper MongoDB credentials +// Update the connection string below with your authentication details + +const MONGODB_URI = 'mongodb://username:password@localhost:27017/stock?authSource=admin'; +// Or use environment variable: process.env.MONGODB_URI + +async function runMigration() { + console.log(` +======================================== +PRICE TRACKING MIGRATION SCRIPT +======================================== + +This script will migrate lastPriceUpdate and lastPriceDate fields +from eodSymbols collection to the operations tracking system. + +To run this migration: + +1. Update the MONGODB_URI in this file with your MongoDB credentials + OR set the MONGODB_URI environment variable + +2. Run the script: + node run-price-migration.js + +The script will: +- Find all documents with lastPriceUpdate field +- Convert dates to normalized format (00:00:00 UTC) +- Create operations.price_update structure +- Process in batches of 1000 documents + +Total documents to migrate: ~199,175 + +Migration query that will be executed: +`); + + console.log(` +db.eodSymbols.find({ + lastPriceUpdate: { $exists: true }, + 'operations.price_update': { $exists: false } +}).forEach(doc => { + const normalizedDate = new Date(doc.lastPriceUpdate); + normalizedDate.setUTCHours(0, 0, 0, 0); + + let lastRecordDate = null; + if (doc.lastPriceDate) { + try { + lastRecordDate = new Date(doc.lastPriceDate); + } catch (e) {} + } + + db.eodSymbols.updateOne( + { _id: doc._id }, + { + $set: { + 'operations.price_update': { + lastRunAt: normalizedDate, + lastSuccessAt: normalizedDate, + status: 'success', + ...(lastRecordDate && { lastRecordDate }) + } + } + } + ); +}); +`); + + console.log(` +After migration, you can optionally remove old fields: +db.eodSymbols.updateMany({}, { $unset: { lastPriceUpdate: '', lastPriceDate: '' } }) + +To verify migration: +db.eodSymbols.findOne({ 'operations.price_update': { $exists: true } }) +`); +} + +runMigration(); \ No newline at end of file