From 5640444c47bcc303a83a4c13e5a06dd7e6e9be93 Mon Sep 17 00:00:00 2001 From: Boki Date: Sun, 29 Jun 2025 10:00:29 -0400 Subject: [PATCH] work on qm --- .../qm/actions/corporate-actions.action.ts | 14 +- .../src/handlers/qm/actions/filings.action.ts | 16 +- .../handlers/qm/actions/financials.action.ts | 14 +- .../handlers/qm/actions/intraday.action.ts | 13 +- .../src/handlers/qm/actions/prices.action.ts | 14 +- .../handlers/qm/actions/symbol-info.action.ts | 6 +- .../src/handlers/qm/shared/config.ts | 7 +- .../handlers/qm/shared/operation-tracker.ts | 882 +++++++++--------- .../src/handlers/qm/shared/types.ts | 2 +- 9 files changed, 492 insertions(+), 476 deletions(-) diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts index dfaf970..e7789fa 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts @@ -31,6 +31,7 @@ export async function updateCorporateActions( input: { symbol: string; symbolId: number; + qmSearchCode: string; }, _context?: ExecutionContext ): Promise<{ @@ -43,7 +44,7 @@ export async function updateCorporateActions( earnings: number; }; }> { - const { symbol, symbolId } = input; + const { symbol, symbolId, qmSearchCode } = input; this.logger.info('Fetching corporate actions', { symbol, symbolId }); @@ -138,7 +139,7 @@ export async function updateCorporateActions( // Update tracking for corporate actions const updateTime = new Date(); - await tracker.updateSymbolOperation(symbol, 'corporate_actions_update', { + await tracker.updateSymbolOperation(qmSearchCode, 'corporate_actions_update', { status: 'success', lastRecordDate: updateTime, recordCount: dividendCount + splitCount + earningsCount @@ -176,7 +177,7 @@ export async function updateCorporateActions( // Track failure for corporate actions const tracker = await getOperationTracker(this); - await tracker.updateSymbolOperation(symbol, 'corporate_actions_update', { + await tracker.updateSymbolOperation(qmSearchCode, 'corporate_actions_update', { status: 'failure' }); @@ -228,9 +229,9 @@ export async function scheduleCorporateActionsUpdates( // Get full symbol data to include symbolId const symbolDocs = await this.mongodb.find('qmSymbols', { - symbol: { $in: staleSymbols.slice(0, limit) } // Apply limit after deduplication + qmSearchCode: { $in: staleSymbols.slice(0, limit) } // Apply limit after deduplication }, { - projection: { symbol: 1, symbolId: 1 } + projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } }); let queued = 0; @@ -246,7 +247,8 @@ export async function scheduleCorporateActionsUpdates( await this.scheduleOperation('update-corporate-actions', { symbol: doc.symbol, - symbolId: doc.symbolId + symbolId: doc.symbolId, + qmSearchCode: doc.qmSearchCode }, { priority: 4, delay: queued * 1500 // 1.5 seconds between jobs diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts index 8b64d94..94b2064 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts @@ -30,6 +30,7 @@ export async function updateFilings( input: { symbol: string; symbolId: number; + qmSearchCode: string; }, _context?: ExecutionContext ): Promise<{ @@ -38,7 +39,7 @@ export async function updateFilings( message: string; data?: any; }> { - const { symbol, symbolId } = input; + const { symbol, symbolId, qmSearchCode } = input; this.logger.info('Fetching filings', { symbol, symbolId }); @@ -97,7 +98,7 @@ export async function updateFilings( // Update symbol to track last filings update const tracker = await getOperationTracker(this); - await tracker.updateSymbolOperation(symbol, 'filings_update', { + await tracker.updateSymbolOperation(qmSearchCode, 'filings_update', { status: 'success', lastRecordDate: new Date(), recordCount: filingsData.length @@ -117,7 +118,7 @@ export async function updateFilings( } else { // Some symbols may not have filings (non-US companies, etc) const tracker = await getOperationTracker(this); - await tracker.updateSymbolOperation(symbol, 'filings_update', { + await tracker.updateSymbolOperation(qmSearchCode, 'filings_update', { status: 'success', lastRecordDate: new Date(), recordCount: 0 @@ -145,7 +146,7 @@ export async function updateFilings( // Track failure const tracker = await getOperationTracker(this); - await tracker.updateSymbolOperation(symbol, 'filings_update', { + await tracker.updateSymbolOperation(qmSearchCode, 'filings_update', { status: 'failure' }); @@ -197,9 +198,9 @@ export async function scheduleFilingsUpdates( // Get full symbol data to include symbolId const symbolDocs = await this.mongodb.find('qmSymbols', { - symbol: { $in: staleSymbols } + qmSearchCode: { $in: staleSymbols } }, { - projection: { symbol: 1, symbolId: 1 } + projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } }); let queued = 0; @@ -215,7 +216,8 @@ export async function scheduleFilingsUpdates( await this.scheduleOperation('update-filings', { symbol: doc.symbol, - symbolId: doc.symbolId + symbolId: doc.symbolId, + qmSearchCode: doc.qmSearchCode }, { priority: 5, // Lower priority than financial data delay: queued * 2000 // 2 seconds between jobs diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts index 9ca4a57..a1eb31d 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts @@ -30,6 +30,7 @@ export async function updateFinancials( input: { symbol: string; symbolId: number; + qmSearchCode: string; }, _context?: ExecutionContext ): Promise<{ @@ -38,7 +39,7 @@ export async function updateFinancials( message: string; data?: any; }> { - const { symbol, symbolId } = input; + const { symbol, symbolId, qmSearchCode } = input; this.logger.info('Fetching financials', { symbol, symbolId }); @@ -96,7 +97,7 @@ export async function updateFinancials( // Update symbol to track last financials update const tracker = await getOperationTracker(this); - await tracker.updateSymbolOperation(symbol, 'financials_update', { + await tracker.updateSymbolOperation(qmSearchCode, 'financials_update', { status: 'success', lastRecordDate: new Date(), recordCount: financialData.length @@ -135,7 +136,7 @@ export async function updateFinancials( // Track failure const tracker = await getOperationTracker(this); - await tracker.updateSymbolOperation(symbol, 'financials_update', { + await tracker.updateSymbolOperation(qmSearchCode, 'financials_update', { status: 'failure', }); @@ -187,9 +188,9 @@ export async function scheduleFinancialsUpdates( // Get full symbol data to include symbolId const symbolDocs = await this.mongodb.find('qmSymbols', { - symbol: { $in: staleSymbols } + qmSearchCode: { $in: staleSymbols } }, { - projection: { symbol: 1, symbolId: 1 } + projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } }); let queued = 0; @@ -205,7 +206,8 @@ export async function scheduleFinancialsUpdates( await this.scheduleOperation('update-financials', { symbol: doc.symbol, - symbolId: doc.symbolId + symbolId: doc.symbolId, + qmSearchCode: doc.qmSearchCode }, { priority: 4, delay: queued * 2000 // 2 seconds between jobs diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts index 3965728..0f8dc58 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts @@ -31,6 +31,7 @@ export async function updateIntradayBars( input: { symbol: string; symbolId: number; + qmSearchCode: string; crawlDate?: string; // ISO date string for specific date crawl }, _context?: ExecutionContext @@ -40,7 +41,7 @@ export async function updateIntradayBars( message: string; data?: any; }> { - const { symbol, symbolId, crawlDate } = input; + const { symbol, symbolId, qmSearchCode, crawlDate } = input; this.logger.info('Fetching intraday bars', { symbol, symbolId, crawlDate }); @@ -203,9 +204,9 @@ export async function scheduleIntradayUpdates( // Get full symbol data symbolsToProcess = await this.mongodb.find('qmSymbols', { - symbol: { $in: staleSymbols } + qmSearchCode: { $in: staleSymbols } }, { - projection: { symbol: 1, symbolId: 1 } + projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } }); } @@ -245,6 +246,7 @@ export async function scheduleIntradayUpdates( await this.scheduleOperation('update-intraday-bars', { symbol: doc.symbol, symbolId: doc.symbolId, + qmSearchCode: doc.qmSearchCode, crawlDate: crawlDate.toISOString() }, { priority: 6, @@ -255,7 +257,7 @@ export async function scheduleIntradayUpdates( } // Update crawl state - await tracker.updateSymbolOperation(doc.symbol, 'intraday_bars', { + await tracker.updateSymbolOperation(doc.qmSearchCode, 'intraday_bars', { status: 'partial', crawlState: { finished: false, @@ -266,7 +268,8 @@ export async function scheduleIntradayUpdates( // For update mode, just fetch today's data await this.scheduleOperation('update-intraday-bars', { symbol: doc.symbol, - symbolId: doc.symbolId + symbolId: doc.symbolId, + qmSearchCode: doc.qmSearchCode }, { priority: 8, // High priority for current data delay: jobsQueued * 500 // 0.5 seconds between jobs diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts index 4195790..7b67788 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts @@ -30,6 +30,7 @@ export async function updatePrices( input: { symbol: string; symbolId: number; + qmSearchCode: string; }, _context?: ExecutionContext ): Promise<{ @@ -38,7 +39,7 @@ export async function updatePrices( message: string; data?: any; }> { - const { symbol, symbolId } = input; + const { symbol, symbolId, qmSearchCode } = input; this.logger.info('Fetching daily prices', { symbol, symbolId }); @@ -106,7 +107,7 @@ export async function updatePrices( // Update symbol to track last price update const tracker = await getOperationTracker(this); - await tracker.updateSymbolOperation(symbol, 'price_update', { + await tracker.updateSymbolOperation(qmSearchCode, 'price_update', { status: 'success', lastRecordDate: latestDate, recordCount: priceData.length @@ -149,7 +150,7 @@ export async function updatePrices( // Track failure const tracker = await getOperationTracker(this); - await tracker.updateSymbolOperation(symbol, 'price_update', { + await tracker.updateSymbolOperation(qmSearchCode, 'price_update', { status: 'failure' }); @@ -201,9 +202,9 @@ export async function schedulePriceUpdates( // Get full symbol data to include symbolId const symbolDocs = await this.mongodb.find('qmSymbols', { - symbol: { $in: staleSymbols } + qmSearchCode: { $in: staleSymbols } }, { - projection: { symbol: 1, symbolId: 1 } + projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } }); let queued = 0; @@ -219,7 +220,8 @@ export async function schedulePriceUpdates( await this.scheduleOperation('update-prices', { symbol: doc.symbol, - symbolId: doc.symbolId + symbolId: doc.symbolId, + qmSearchCode: doc.qmSearchCode }, { priority: 7, // High priority for price data delay: queued * 500 // 0.5 seconds between jobs diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts index b350979..d727005 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts @@ -196,13 +196,13 @@ export async function scheduleSymbolInfoUpdates( symbol: doc.symbol, qmSearchCode: doc.qmSearchCode || doc.symbol }, { - priority: 3, + // priority: 3, // Add some delay to avoid overwhelming the API - delay: queued * 1000 // 1 second between jobs + // delay: queued * 1000 // 1 second between jobs }); // Track that we've scheduled this symbol - await tracker.updateSymbolOperation(doc.symbol, 'symbol_info', { + await tracker.updateSymbolOperation(doc.qmSearchCode, 'symbol_info', { status: 'success' }); diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts index eb4f7f4..d2c872f 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts @@ -7,7 +7,12 @@ import { getRandomUserAgent } from "@stock-bot/utils"; // QM Session IDs for different endpoints export const QM_SESSION_IDS = { LOOKUP: 'dc8c9930437f65d30f6597768800957017bac203a0a50342932757c8dfa158d6', // lookup endpoint - PROFILES: '1e1d7cb1de1fd2fe52684abdea41a446919a5fe12776dfab88615ac1ce1ec2f6', // getProfiles + SYMBOL: '1e1d7cb1de1fd2fe52684abdea41a446919a5fe12776dfab88615ac1ce1ec2f6', // getProfiles + // EDS: '', // + // FILINGS: '', // + // PRICES: '', // + // FINANCIALS: '', // + // INTRADAY: '', // // '5ad521e05faf5778d567f6d0012ec34d6cdbaeb2462f41568f66558bc7b4ced9': [], //4488d072b // cc1cbdaf040f76db8f4c94f7d156b9b9b716e1a7509ec9c74a48a47f6b6b9f87: [], //97ff00cf3 // getQuotes // '74963ff42f1db2320d051762b5d3950ff9eab23f9d5c5b592551b4ca0441d086': [], //32ca24e394b // getSplitsBySymbol getBrokerRatingsBySymbol getDividendsBySymbol getEarningsSurprisesBySymbol getEarningsEventsBySymbol diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts index 18222a5..13d8c91 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts @@ -1,442 +1,442 @@ -/** - * QM Operation Tracker - Tracks operation execution times and states for symbols - * Supports dynamic operation registration with auto-indexing - */ - -import type { Logger, MongoDBClient } from '@stock-bot/types'; -import type { IntradayCrawlSymbol, QMOperationConfig } from './types'; - -export class QMOperationTracker { - private registeredOperations: Map = new Map(); - private indexesCreated: Set = new Set(); - private mongodb: MongoDBClient; - private logger: Logger; - private readonly collectionName = 'qmSymbols'; - - constructor(mongodb: MongoDBClient, logger: Logger) { - this.mongodb = mongodb; - this.logger = logger; - } - - /** - * Register a new operation type with auto-indexing - */ - async registerOperation(config: QMOperationConfig): Promise { - this.logger.info('Registering QM operation', { operation: config.name, type: config.type }); - - this.registeredOperations.set(config.name, config); - - // Auto-create indexes for this operation - await this.createOperationIndexes(config.name); - - this.logger.debug('Operation registered successfully', { operation: config.name }); - } - - /** - * Create indexes for efficient operation queries - */ - private async createOperationIndexes(operationName: string): Promise { - if (this.indexesCreated.has(operationName)) { - this.logger.debug('Indexes already created for operation', { operation: operationName }); - return; - } - - try { - const indexes = [ - // Index for finding stale symbols - { [`operations.${operationName}.lastRunAt`]: 1, symbol: 1 }, - // Index for finding by last record date - { [`operations.${operationName}.lastRecordDate`]: 1, symbol: 1 }, - ]; - - // Add crawl state index for intraday operations - const config = this.registeredOperations.get(operationName); - if (config?.type === 'intraday_crawl') { - indexes.push({ [`operations.${operationName}.crawlState.finished`]: 1, symbol: 1 }); - } - - for (const indexSpec of indexes) { - const collection = this.mongodb.collection(this.collectionName); - await collection.createIndex(indexSpec, { - background: true, - name: `op_${operationName}_${Object.keys(indexSpec).join('_')}` - }); - } - - this.indexesCreated.add(operationName); - this.logger.info('Created indexes for operation', { - operation: operationName, - indexCount: indexes.length - }); - } catch (error) { - this.logger.error('Failed to create indexes for operation', { - operation: operationName, - error - }); - throw error; - } - } - - /** - * Update symbol operation status - */ - async updateSymbolOperation( - symbol: string, - operationName: string, - data: { - status: 'success' | 'failure' | 'partial'; - lastRecordDate?: Date; - recordCount?: number; - crawlState?: { - finished?: boolean; - oldestDateReached?: Date; - }; - } - ): Promise { - const update: any = { - $set: { - [`operations.${operationName}.lastRunAt`]: new Date(), - [`operations.${operationName}.status`]: data.status, - updated_at: new Date() - } - }; - - if (data.lastRecordDate) { - update.$set[`operations.${operationName}.lastRecordDate`] = data.lastRecordDate; - } - - if (data.recordCount !== undefined) { - update.$set[`operations.${operationName}.recordCount`] = data.recordCount; - } - - if (data.crawlState) { - update.$set[`operations.${operationName}.crawlState`] = { - ...data.crawlState, - lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward' - }; - } - - await this.mongodb.updateOne(this.collectionName, { symbol }, update); - - this.logger.debug('Updated symbol operation', { - symbol, - operation: operationName, - status: data.status - }); - } - - /** - * Bulk update symbol operations for performance - */ - async bulkUpdateSymbolOperations( - updates: Array<{ - symbol: string; - operation: string; - data: { - status: 'success' | 'failure' | 'partial'; - lastRecordDate?: Date; - recordCount?: number; - crawlState?: any; - }; - }> - ): Promise { - if (updates.length === 0) {return;} - - const bulkOps = updates.map(({ symbol, operation, data }) => { - const update: any = { - $set: { - [`operations.${operation}.lastRunAt`]: new Date(), - [`operations.${operation}.status`]: data.status, - updated_at: new Date() - } - }; - - if (data.lastRecordDate) { - update.$set[`operations.${operation}.lastRecordDate`] = data.lastRecordDate; - } - - if (data.recordCount !== undefined) { - update.$set[`operations.${operation}.recordCount`] = data.recordCount; - } - - if (data.crawlState) { - update.$set[`operations.${operation}.crawlState`] = { - ...data.crawlState, - lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward' - }; - } - - return { - updateOne: { - filter: { symbol }, - update - } - }; - }); - - const collection = this.mongodb.collection(this.collectionName); - const result = await collection.bulkWrite(bulkOps as any, { ordered: false }); - - this.logger.debug('Bulk updated symbol operations', { - totalUpdates: updates.length, - modified: result.modifiedCount, - operations: Array.from(new Set(updates.map(u => u.operation))) - }); - } - - /** - * Get symbols that need processing for an operation - */ - async getStaleSymbols( - operationName: string, - options: { - notRunSince?: Date; - minHoursSinceRun?: number; - limit?: number; - excludeSymbols?: string[]; - } = {} - ): Promise { - const { limit = 1000, excludeSymbols = [] } = options; - - const cutoffDate = options.notRunSince || (() => { - const date = new Date(); - const hours = options.minHoursSinceRun || - this.registeredOperations.get(operationName)?.defaultStaleHours || 24; - date.setHours(date.getHours() - hours); - return date; - })(); - - const filter: any = { - $or: [ - { [`operations.${operationName}.lastRunAt`]: { $lt: cutoffDate } }, - { [`operations.${operationName}`]: { $exists: false } } - ] - }; - - if (excludeSymbols.length > 0) { - filter.symbol = { $nin: excludeSymbols }; - } - - const symbols = await this.mongodb.find(this.collectionName, filter, { - limit, - projection: { symbol: 1 }, - sort: { [`operations.${operationName}.lastRunAt`]: 1 } // Oldest first - }); - - return symbols.map(s => s.symbol); - } - - /** - * Get symbols for intraday crawling - */ - async getSymbolsForIntradayCrawl( - operationName: string, - options: { - limit?: number; - includeFinished?: boolean; - } = {} - ): Promise { - const { limit = 100, includeFinished = false } = options; - - const filter: any = {}; - if (!includeFinished) { - filter[`operations.${operationName}.crawlState.finished`] = { $ne: true }; - } - - const symbols = await this.mongodb.find(this.collectionName, filter, { - limit, - projection: { - symbol: 1, - [`operations.${operationName}`]: 1 - }, - sort: { - // Prioritize symbols that haven't been crawled yet - [`operations.${operationName}.lastRunAt`]: 1 - } - }); - - return symbols.map(s => ({ - symbol: s.symbol, - lastRecordDate: s.operations?.[operationName]?.lastRecordDate, - crawlState: s.operations?.[operationName]?.crawlState - })); - } - - /** - * Mark intraday crawl as finished - */ - async markCrawlFinished( - symbol: string, - operationName: string, - oldestDateReached: Date - ): Promise { - await this.updateSymbolOperation(symbol, operationName, { - status: 'success', - crawlState: { - finished: true, - oldestDateReached - } - }); - - this.logger.info('Marked crawl as finished', { - symbol, - operation: operationName, - oldestDateReached - }); - } - - /** - * Get symbols that need data updates based on last record date - */ - async getSymbolsNeedingUpdate( - operationName: string, - options: { - lastRecordBefore?: Date; - neverRun?: boolean; - limit?: number; - } = {} - ): Promise> { - const { limit = 500 } = options; - const filter: any = {}; - - if (options.neverRun) { - filter[`operations.${operationName}`] = { $exists: false }; - } else if (options.lastRecordBefore) { - filter.$or = [ - { [`operations.${operationName}.lastRecordDate`]: { $lt: options.lastRecordBefore } }, - { [`operations.${operationName}`]: { $exists: false } } - ]; - } - - const symbols = await this.mongodb.find(this.collectionName, filter, { - limit, - projection: { - symbol: 1, - [`operations.${operationName}.lastRecordDate`]: 1 - }, - sort: { [`operations.${operationName}.lastRecordDate`]: 1 } // Oldest data first - }); - - return symbols.map(s => ({ - symbol: s.symbol, - lastRecordDate: s.operations?.[operationName]?.lastRecordDate - })); - } - - /** - * Get operation statistics - */ - async getOperationStats(operationName: string): Promise<{ - totalSymbols: number; - processedSymbols: number; - staleSymbols: number; - successfulSymbols: number; - failedSymbols: number; - finishedCrawls?: number; - avgRecordsPerSymbol?: number; - }> { - const collection = this.mongodb.collection(this.collectionName); - const total = await collection.countDocuments({}); - - const processed = await collection.countDocuments({ - [`operations.${operationName}`]: { $exists: true } - }); - - const successful = await collection.countDocuments({ - [`operations.${operationName}.status`]: 'success' - }); - - const failed = await collection.countDocuments({ - [`operations.${operationName}.status`]: 'failure' - }); - - const staleDate = new Date(); - staleDate.setHours(staleDate.getHours() - ( - this.registeredOperations.get(operationName)?.defaultStaleHours || 24 - )); - - const stale = await collection.countDocuments({ - $or: [ - { [`operations.${operationName}.lastRunAt`]: { $lt: staleDate } }, - { [`operations.${operationName}`]: { $exists: false } } - ] - }); - - const result: any = { - totalSymbols: total, - processedSymbols: processed, - staleSymbols: stale, - successfulSymbols: successful, - failedSymbols: failed - }; - - // Additional stats for crawl operations - if (this.registeredOperations.get(operationName)?.type === 'intraday_crawl') { - result.finishedCrawls = await collection.countDocuments({ - [`operations.${operationName}.crawlState.finished`]: true - }); - } - - // Calculate average records per symbol - const aggregation = await collection.aggregate([ - { - $match: { - [`operations.${operationName}.recordCount`]: { $exists: true } - } - }, - { - $group: { - _id: null, - avgRecords: { $avg: `$operations.${operationName}.recordCount` } - } - } - ]).toArray(); - - if (aggregation.length > 0) { - result.avgRecordsPerSymbol = Math.round(aggregation[0].avgRecords); - } - - return result; - } - - /** - * Get all registered operations - */ - getRegisteredOperations(): QMOperationConfig[] { - return Array.from(this.registeredOperations.values()); - } - - /** - * Helper: Get symbols for price update - */ - async getSymbolsForPriceUpdate(limit = 1000): Promise { - return this.getStaleSymbols('price_update', { - minHoursSinceRun: 24, - limit - }); - } - - /** - * Helper: Get symbols with outdated financials - */ - async getSymbolsWithOldFinancials(limit = 100): Promise> { - const cutoffDate = new Date(); - cutoffDate.setDate(cutoffDate.getDate() - 90); // 90 days old - - return this.getSymbolsNeedingUpdate('financials_update', { - lastRecordBefore: cutoffDate, - limit - }); - } - - /** - * Helper: Get unprocessed symbols for an operation - */ - async getUnprocessedSymbols(operation: string, limit = 500): Promise { - const symbols = await this.getSymbolsNeedingUpdate(operation, { - neverRun: true, - limit - }); - return symbols.map(s => s.symbol); - } +/** + * QM Operation Tracker - Tracks operation execution times and states for symbols + * Supports dynamic operation registration with auto-indexing + */ + +import type { Logger, MongoDBClient } from '@stock-bot/types'; +import type { IntradayCrawlSymbol, QMOperationConfig } from './types'; + +export class QMOperationTracker { + private registeredOperations: Map = new Map(); + private indexesCreated: Set = new Set(); + private mongodb: MongoDBClient; + private logger: Logger; + private readonly collectionName = 'qmSymbols'; + + constructor(mongodb: MongoDBClient, logger: Logger) { + this.mongodb = mongodb; + this.logger = logger; + } + + /** + * Register a new operation type with auto-indexing + */ + async registerOperation(config: QMOperationConfig): Promise { + this.logger.info('Registering QM operation', { operation: config.name, type: config.type }); + + this.registeredOperations.set(config.name, config); + + // Auto-create indexes for this operation + await this.createOperationIndexes(config.name); + + this.logger.debug('Operation registered successfully', { operation: config.name }); + } + + /** + * Create indexes for efficient operation queries + */ + private async createOperationIndexes(operationName: string): Promise { + if (this.indexesCreated.has(operationName)) { + this.logger.debug('Indexes already created for operation', { operation: operationName }); + return; + } + + try { + const indexes = [ + // Index for finding stale symbols + { [`operations.${operationName}.lastRunAt`]: 1, qmSearchCode: 1 }, + // Index for finding by last record date + { [`operations.${operationName}.lastRecordDate`]: 1, qmSearchCode: 1 }, + ]; + + // Add crawl state index for intraday operations + const config = this.registeredOperations.get(operationName); + if (config?.type === 'intraday_crawl') { + indexes.push({ [`operations.${operationName}.crawlState.finished`]: 1, qmSearchCode: 1 }); + } + + for (const indexSpec of indexes) { + const collection = this.mongodb.collection(this.collectionName); + await collection.createIndex(indexSpec, { + background: true, + name: `op_${operationName}_${Object.keys(indexSpec).join('_')}` + }); + } + + this.indexesCreated.add(operationName); + this.logger.info('Created indexes for operation', { + operation: operationName, + indexCount: indexes.length + }); + } catch (error) { + this.logger.error('Failed to create indexes for operation', { + operation: operationName, + error + }); + throw error; + } + } + + /** + * Update symbol operation status + */ + async updateSymbolOperation( + qmSearchCode: string, + operationName: string, + data: { + status: 'success' | 'failure' | 'partial'; + lastRecordDate?: Date; + recordCount?: number; + crawlState?: { + finished?: boolean; + oldestDateReached?: Date; + }; + } + ): Promise { + const update: any = { + $set: { + [`operations.${operationName}.lastRunAt`]: new Date(), + [`operations.${operationName}.status`]: data.status, + updated_at: new Date() + } + }; + + if (data.lastRecordDate) { + update.$set[`operations.${operationName}.lastRecordDate`] = data.lastRecordDate; + } + + if (data.recordCount !== undefined) { + update.$set[`operations.${operationName}.recordCount`] = data.recordCount; + } + + if (data.crawlState) { + update.$set[`operations.${operationName}.crawlState`] = { + ...data.crawlState, + lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward' + }; + } + + await this.mongodb.updateOne(this.collectionName, { qmSearchCode }, update); + + this.logger.debug('Updated symbol operation', { + qmSearchCode, + operation: operationName, + status: data.status + }); + } + + /** + * Bulk update symbol operations for performance + */ + async bulkUpdateSymbolOperations( + updates: Array<{ + qmSearchCode: string; + operation: string; + data: { + status: 'success' | 'failure' | 'partial'; + lastRecordDate?: Date; + recordCount?: number; + crawlState?: any; + }; + }> + ): Promise { + if (updates.length === 0) {return;} + + const bulkOps = updates.map(({ qmSearchCode, operation, data }) => { + const update: any = { + $set: { + [`operations.${operation}.lastRunAt`]: new Date(), + [`operations.${operation}.status`]: data.status, + updated_at: new Date() + } + }; + + if (data.lastRecordDate) { + update.$set[`operations.${operation}.lastRecordDate`] = data.lastRecordDate; + } + + if (data.recordCount !== undefined) { + update.$set[`operations.${operation}.recordCount`] = data.recordCount; + } + + if (data.crawlState) { + update.$set[`operations.${operation}.crawlState`] = { + ...data.crawlState, + lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward' + }; + } + + return { + updateOne: { + filter: { qmSearchCode }, + update + } + }; + }); + + const collection = this.mongodb.collection(this.collectionName); + const result = await collection.bulkWrite(bulkOps as any, { ordered: false }); + + this.logger.debug('Bulk updated symbol operations', { + totalUpdates: updates.length, + modified: result.modifiedCount, + operations: Array.from(new Set(updates.map(u => u.operation))) + }); + } + + /** + * Get symbols that need processing for an operation + */ + async getStaleSymbols( + operationName: string, + options: { + notRunSince?: Date; + minHoursSinceRun?: number; + limit?: number; + excludeSymbols?: string[]; + } = {} + ): Promise { + const { limit = 1000, excludeSymbols = [] } = options; + + const cutoffDate = options.notRunSince || (() => { + const date = new Date(); + const hours = options.minHoursSinceRun || + this.registeredOperations.get(operationName)?.defaultStaleHours || 24; + date.setHours(date.getHours() - hours); + return date; + })(); + + const filter: any = { + $or: [ + { [`operations.${operationName}.lastRunAt`]: { $lt: cutoffDate } }, + { [`operations.${operationName}`]: { $exists: false } } + ] + }; + + if (excludeSymbols.length > 0) { + filter.qmSearchCode = { $nin: excludeSymbols }; + } + + const symbols = await this.mongodb.find(this.collectionName, filter, { + limit, + projection: { qmSearchCode: 1 }, + sort: { [`operations.${operationName}.lastRunAt`]: 1 } // Oldest first + }); + + return symbols.map(s => s.qmSearchCode); + } + + /** + * Get symbols for intraday crawling + */ + async getSymbolsForIntradayCrawl( + operationName: string, + options: { + limit?: number; + includeFinished?: boolean; + } = {} + ): Promise { + const { limit = 100, includeFinished = false } = options; + + const filter: any = {}; + if (!includeFinished) { + filter[`operations.${operationName}.crawlState.finished`] = { $ne: true }; + } + + const symbols = await this.mongodb.find(this.collectionName, filter, { + limit, + projection: { + qmSearchCode: 1, + [`operations.${operationName}`]: 1 + }, + sort: { + // Prioritize symbols that haven't been crawled yet + [`operations.${operationName}.lastRunAt`]: 1 + } + }); + + return symbols.map(s => ({ + qmSearchCode: s.qmSearchCode, + lastRecordDate: s.operations?.[operationName]?.lastRecordDate, + crawlState: s.operations?.[operationName]?.crawlState + })); + } + + /** + * Mark intraday crawl as finished + */ + async markCrawlFinished( + qmSearchCode: string, + operationName: string, + oldestDateReached: Date + ): Promise { + await this.updateSymbolOperation(qmSearchCode, operationName, { + status: 'success', + crawlState: { + finished: true, + oldestDateReached + } + }); + + this.logger.info('Marked crawl as finished', { + qmSearchCode, + operation: operationName, + oldestDateReached + }); + } + + /** + * Get symbols that need data updates based on last record date + */ + async getSymbolsNeedingUpdate( + operationName: string, + options: { + lastRecordBefore?: Date; + neverRun?: boolean; + limit?: number; + } = {} + ): Promise> { + const { limit = 500 } = options; + const filter: any = {}; + + if (options.neverRun) { + filter[`operations.${operationName}`] = { $exists: false }; + } else if (options.lastRecordBefore) { + filter.$or = [ + { [`operations.${operationName}.lastRecordDate`]: { $lt: options.lastRecordBefore } }, + { [`operations.${operationName}`]: { $exists: false } } + ]; + } + + const symbols = await this.mongodb.find(this.collectionName, filter, { + limit, + projection: { + qmSearchCode: 1, + [`operations.${operationName}.lastRecordDate`]: 1 + }, + sort: { [`operations.${operationName}.lastRecordDate`]: 1 } // Oldest data first + }); + + return symbols.map(s => ({ + qmSearchCode: s.qmSearchCode, + lastRecordDate: s.operations?.[operationName]?.lastRecordDate + })); + } + + /** + * Get operation statistics + */ + async getOperationStats(operationName: string): Promise<{ + totalSymbols: number; + processedSymbols: number; + staleSymbols: number; + successfulSymbols: number; + failedSymbols: number; + finishedCrawls?: number; + avgRecordsPerSymbol?: number; + }> { + const collection = this.mongodb.collection(this.collectionName); + const total = await collection.countDocuments({}); + + const processed = await collection.countDocuments({ + [`operations.${operationName}`]: { $exists: true } + }); + + const successful = await collection.countDocuments({ + [`operations.${operationName}.status`]: 'success' + }); + + const failed = await collection.countDocuments({ + [`operations.${operationName}.status`]: 'failure' + }); + + const staleDate = new Date(); + staleDate.setHours(staleDate.getHours() - ( + this.registeredOperations.get(operationName)?.defaultStaleHours || 24 + )); + + const stale = await collection.countDocuments({ + $or: [ + { [`operations.${operationName}.lastRunAt`]: { $lt: staleDate } }, + { [`operations.${operationName}`]: { $exists: false } } + ] + }); + + const result: any = { + totalSymbols: total, + processedSymbols: processed, + staleSymbols: stale, + successfulSymbols: successful, + failedSymbols: failed + }; + + // Additional stats for crawl operations + if (this.registeredOperations.get(operationName)?.type === 'intraday_crawl') { + result.finishedCrawls = await collection.countDocuments({ + [`operations.${operationName}.crawlState.finished`]: true + }); + } + + // Calculate average records per symbol + const aggregation = await collection.aggregate([ + { + $match: { + [`operations.${operationName}.recordCount`]: { $exists: true } + } + }, + { + $group: { + _id: null, + avgRecords: { $avg: `$operations.${operationName}.recordCount` } + } + } + ]).toArray(); + + if (aggregation.length > 0) { + result.avgRecordsPerSymbol = Math.round(aggregation[0].avgRecords); + } + + return result; + } + + /** + * Get all registered operations + */ + getRegisteredOperations(): QMOperationConfig[] { + return Array.from(this.registeredOperations.values()); + } + + /** + * Helper: Get symbols for price update + */ + async getSymbolsForPriceUpdate(limit = 1000): Promise { + return this.getStaleSymbols('price_update', { + minHoursSinceRun: 24, + limit + }); + } + + /** + * Helper: Get symbols with outdated financials + */ + async getSymbolsWithOldFinancials(limit = 100): Promise> { + const cutoffDate = new Date(); + cutoffDate.setDate(cutoffDate.getDate() - 90); // 90 days old + + return this.getSymbolsNeedingUpdate('financials_update', { + lastRecordBefore: cutoffDate, + limit + }); + } + + /** + * Helper: Get unprocessed symbols for an operation + */ + async getUnprocessedSymbols(operation: string, limit = 500): Promise { + const symbols = await this.getSymbolsNeedingUpdate(operation, { + neverRun: true, + limit + }); + return symbols.map(s => s.qmSearchCode); + } } \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts index c3794ed..de52fcc 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts @@ -84,7 +84,7 @@ export interface QMSymbolOperationStatus { } export interface IntradayCrawlSymbol { - symbol: string; + qmSearchCode: string; lastRecordDate?: Date; crawlState?: { finished: boolean;