/**
 * QM Operation Tracker - Tracks operation execution times and states for symbols
 * Supports dynamic operation registration with auto-indexing
 */

import type { Logger, MongoDBClient } from '@stock-bot/types';
import type { IntradayCrawlSymbol, QMOperationConfig } from './types';

/** Number of milliseconds in one hour; used to turn "hours ago" into a cutoff date. */
const MS_PER_HOUR = 60 * 60 * 1000;

/** Staleness window applied when neither the caller nor the operation config supplies one. */
const DEFAULT_STALE_HOURS = 24;

/**
 * Shape used for MongoDB filters and `$set` payloads whose keys are dotted paths
 * built at runtime (`operations.<name>.<field>`), so they cannot be typed statically.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- keys are computed at runtime
type DynamicDoc = Record<string, any>;

/** Outcome recorded for a single run of an operation on a symbol. */
type OperationStatus = 'success' | 'failure' | 'partial';

/** Per-run data accepted by both the single and the bulk update paths. */
interface OperationRunData {
  status: OperationStatus;
  lastRecordDate?: Date;
  recordCount?: number;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- bulk callers pass arbitrary crawl state
  crawlState?: any;
}

/**
 * Records, per symbol document in the `qmSymbols` collection, when each registered
 * operation last ran and with what outcome, and answers "which symbols need work"
 * queries against that bookkeeping.
 *
 * All bookkeeping lives under `operations.<operationName>` on the symbol document.
 */
export class QMOperationTracker {
  /** Operation configs keyed by operation name. */
  private readonly registeredOperations: Map<string, QMOperationConfig> = new Map();
  /** Names of operations whose indexes were already created by this instance. */
  private readonly indexesCreated: Set<string> = new Set();
  private readonly collectionName = 'qmSymbols';

  constructor(
    private readonly mongodb: MongoDBClient,
    private readonly logger: Logger
  ) {}

  /**
   * Register a new operation type with auto-indexing.
   *
   * The config is stored first so that index creation can inspect its `type`.
   *
   * @param config - Operation configuration; `name` is used as the registry key.
   * @throws Re-throws whatever index creation throws.
   */
  async registerOperation(config: QMOperationConfig): Promise<void> {
    this.logger.info('Registering QM operation', { operation: config.name, type: config.type });

    this.registeredOperations.set(config.name, config);

    // Auto-create indexes for this operation
    await this.createOperationIndexes(config.name);

    this.logger.debug('Operation registered successfully', { operation: config.name });
  }

  /**
   * Create indexes for efficient operation queries.
   *
   * Skips the work when this instance already created the indexes for `operationName`.
   * Index names are derived from the operation name and the indexed keys.
   *
   * @param operationName - Name of a (normally already registered) operation.
   * @throws Logs and re-throws any error raised while creating an index.
   */
  private async createOperationIndexes(operationName: string): Promise<void> {
    if (this.indexesCreated.has(operationName)) {
      this.logger.debug('Indexes already created for operation', { operation: operationName });
      return;
    }

    try {
      const indexes: Array<Record<string, number>> = [
        // Index for finding stale symbols
        { [`operations.${operationName}.lastRunAt`]: 1, qmSearchCode: 1 },
        // Index for finding by last record date
        { [`operations.${operationName}.lastRecordDate`]: 1, qmSearchCode: 1 },
      ];

      // Add crawl state index for intraday operations
      const config = this.registeredOperations.get(operationName);
      if (config?.type === 'intraday_crawl') {
        indexes.push({ [`operations.${operationName}.crawlState.finished`]: 1, qmSearchCode: 1 });
      }

      // The collection handle does not depend on the index, so fetch it once.
      const collection = this.mongodb.collection(this.collectionName);
      for (const indexSpec of indexes) {
        await collection.createIndex(indexSpec, {
          background: true,
          name: `op_${operationName}_${Object.keys(indexSpec).join('_')}`,
        });
      }

      this.indexesCreated.add(operationName);
      this.logger.info('Created indexes for operation', {
        operation: operationName,
        indexCount: indexes.length,
      });
    } catch (error) {
      this.logger.error('Failed to create indexes for operation', {
        operation: operationName,
        error,
      });
      throw error;
    }
  }

  /**
   * Build the `$set` update that records one run of `operationName`.
   *
   * Always writes `lastRunAt`, `status` and the document-level `updated_at`.
   * `lastSuccessAt` is written only for status `'success'`; `lastRecordDate`,
   * `recordCount` and `crawlState` are written only when supplied (a
   * `recordCount` of 0 is written). When `crawlState` is supplied, it is stored
   * with an extra `lastCrawlDirection` of `'forward'` if `finished` is truthy,
   * otherwise `'backward'`.
   */
  private buildOperationUpdate(
    operationName: string,
    data: OperationRunData
  ): { $set: DynamicDoc } {
    // One timestamp per update so all date fields of a run agree exactly.
    const now = new Date();
    const prefix = `operations.${operationName}`;

    const $set: DynamicDoc = {
      [`${prefix}.lastRunAt`]: now,
      [`${prefix}.status`]: data.status,
      updated_at: now,
    };

    // Only update lastSuccessAt on successful operations
    if (data.status === 'success') {
      $set[`${prefix}.lastSuccessAt`] = now;
    }

    if (data.lastRecordDate) {
      $set[`${prefix}.lastRecordDate`] = data.lastRecordDate;
    }

    if (data.recordCount !== undefined) {
      $set[`${prefix}.recordCount`] = data.recordCount;
    }

    if (data.crawlState) {
      $set[`${prefix}.crawlState`] = {
        ...data.crawlState,
        lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward',
      };
    }

    return { $set };
  }

  /**
   * Compute the date before which a run counts as stale.
   *
   * Precedence: explicit `hoursOverride`, then the registered operation's
   * `defaultStaleHours`, then 24. Uses `??`, so an explicit 0 is honoured.
   * The result is exactly that many elapsed hours before now.
   */
  private resolveStaleCutoff(operationName: string, hoursOverride?: number): Date {
    const hours =
      hoursOverride ??
      this.registeredOperations.get(operationName)?.defaultStaleHours ??
      DEFAULT_STALE_HOURS;
    return new Date(Date.now() - hours * MS_PER_HOUR);
  }

  /**
   * Update symbol operation status.
   *
   * @param qmSearchCode - Identifies the symbol document to update.
   * @param operationName - Operation whose bookkeeping is updated.
   * @param data - Outcome of the run; see `buildOperationUpdate` for which fields are written.
   */
  async updateSymbolOperation(
    qmSearchCode: string,
    operationName: string,
    data: {
      status: 'success' | 'failure' | 'partial';
      lastRecordDate?: Date;
      recordCount?: number;
      crawlState?: {
        finished?: boolean;
        oldestDateReached?: Date;
      };
    }
  ): Promise<void> {
    const update = this.buildOperationUpdate(operationName, data);

    await this.mongodb.updateOne(this.collectionName, { qmSearchCode }, update);

    this.logger.debug('Updated symbol operation', {
      qmSearchCode,
      operation: operationName,
      status: data.status,
    });
  }

  /**
   * Bulk update symbol operations for performance.
   *
   * Issues a single unordered `bulkWrite` containing one `updateOne` per entry.
   * Does nothing when `updates` is empty.
   *
   * @param updates - One entry per (symbol, operation) run to record.
   */
  async bulkUpdateSymbolOperations(
    updates: Array<{
      qmSearchCode: string;
      operation: string;
      data: {
        status: 'success' | 'failure' | 'partial';
        lastRecordDate?: Date;
        recordCount?: number;
        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- kept for caller compatibility
        crawlState?: any;
      };
    }>
  ): Promise<void> {
    if (updates.length === 0) {
      return;
    }

    const bulkOps = updates.map(({ qmSearchCode, operation, data }) => ({
      updateOne: {
        filter: { qmSearchCode },
        update: this.buildOperationUpdate(operation, data),
      },
    }));

    const collection = this.mongodb.collection(this.collectionName);
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- driver op typing not visible here
    const result = await collection.bulkWrite(bulkOps as any, { ordered: false });

    this.logger.debug('Bulk updated symbol operations', {
      totalUpdates: updates.length,
      modified: result.modifiedCount,
      operations: Array.from(new Set(updates.map(u => u.operation))),
    });
  }

  /**
   * Get symbols that need processing for an operation.
   *
   * A symbol qualifies when it is not explicitly inactive and its last
   * successful run is older than the cutoff or missing. Results are sorted
   * by `lastSuccessAt` ascending.
   *
   * @param operationName - Operation to check.
   * @param options.notRunSince - Explicit cutoff date; takes precedence over hours.
   * @param options.minHoursSinceRun - Hours before now to use as cutoff (0 is honoured).
   * @param options.limit - Maximum number of symbols returned (default 1000).
   * @param options.excludeSymbols - Search codes to leave out.
   * @returns The `qmSearchCode` of each matching symbol.
   */
  async getStaleSymbols(
    operationName: string,
    options: {
      notRunSince?: Date;
      minHoursSinceRun?: number;
      limit?: number;
      excludeSymbols?: string[];
    } = {}
  ): Promise<string[]> {
    const { limit = 1000, excludeSymbols = [] } = options;

    const cutoffDate =
      options.notRunSince ?? this.resolveStaleCutoff(operationName, options.minHoursSinceRun);

    const filter: DynamicDoc = {
      active: { $ne: false }, // Only active symbols (active: true or active doesn't exist)
      $or: [
        { [`operations.${operationName}.lastSuccessAt`]: { $lt: cutoffDate } },
        { [`operations.${operationName}.lastSuccessAt`]: { $exists: false } },
        { [`operations.${operationName}`]: { $exists: false } },
      ],
    };

    if (excludeSymbols.length > 0) {
      filter.qmSearchCode = { $nin: excludeSymbols };
    }

    const symbols = await this.mongodb.find(this.collectionName, filter, {
      limit,
      projection: { qmSearchCode: 1 },
      sort: { [`operations.${operationName}.lastSuccessAt`]: 1 }, // Oldest successful run first
    });

    return symbols.map(s => s.qmSearchCode);
  }

  /**
   * Get symbols for intraday crawling.
   *
   * Returns non-inactive symbols sorted by `lastRunAt` ascending. Unless
   * `includeFinished` is set, symbols whose `crawlState.finished` is `true`
   * are excluded.
   *
   * @param operationName - Crawl operation to query.
   * @param options.limit - Maximum number of symbols returned (default 100).
   * @param options.includeFinished - Also return symbols whose crawl finished (default false).
   * @returns For each symbol its search code plus the stored `lastRecordDate` and `crawlState`.
   */
  async getSymbolsForIntradayCrawl(
    operationName: string,
    options: {
      limit?: number;
      includeFinished?: boolean;
    } = {}
  ): Promise<IntradayCrawlSymbol[]> {
    const { limit = 100, includeFinished = false } = options;

    const filter: DynamicDoc = {
      active: { $ne: false }, // Only active symbols
    };

    if (!includeFinished) {
      filter[`operations.${operationName}.crawlState.finished`] = { $ne: true };
    }

    const symbols = await this.mongodb.find(this.collectionName, filter, {
      limit,
      projection: {
        qmSearchCode: 1,
        [`operations.${operationName}`]: 1,
      },
      sort: {
        // Prioritize symbols that haven't been crawled yet
        [`operations.${operationName}.lastRunAt`]: 1,
      },
    });

    return symbols.map(s => ({
      qmSearchCode: s.qmSearchCode,
      lastRecordDate: s.operations?.[operationName]?.lastRecordDate,
      crawlState: s.operations?.[operationName]?.crawlState,
    }));
  }

  /**
   * Mark intraday crawl as finished.
   *
   * Records a `'success'` run whose crawl state has `finished: true` and the
   * given `oldestDateReached`.
   */
  async markCrawlFinished(
    qmSearchCode: string,
    operationName: string,
    oldestDateReached: Date
  ): Promise<void> {
    await this.updateSymbolOperation(qmSearchCode, operationName, {
      status: 'success',
      crawlState: {
        finished: true,
        oldestDateReached,
      },
    });

    this.logger.info('Marked crawl as finished', {
      qmSearchCode,
      operation: operationName,
      oldestDateReached,
    });
  }

  /**
   * Get symbols that need data updates based on last record date.
   *
   * - `neverRun`: only symbols with no bookkeeping for the operation.
   * - otherwise `lastRecordBefore`: symbols whose `lastRecordDate` is older than
   *   the given date, or that have no bookkeeping at all.
   * - neither: every non-inactive symbol (up to `limit`).
   *
   * Results are sorted by `lastRecordDate` ascending.
   *
   * @param operationName - Operation to check.
   * @param options.limit - Maximum number of symbols returned (default 500).
   */
  async getSymbolsNeedingUpdate(
    operationName: string,
    options: {
      lastRecordBefore?: Date;
      neverRun?: boolean;
      limit?: number;
    } = {}
  ): Promise<Array<{ qmSearchCode: string; lastRecordDate?: Date }>> {
    const { limit = 500 } = options;

    const filter: DynamicDoc = {
      active: { $ne: false }, // Only active symbols
    };

    if (options.neverRun) {
      filter[`operations.${operationName}`] = { $exists: false };
    } else if (options.lastRecordBefore) {
      filter.$or = [
        { [`operations.${operationName}.lastRecordDate`]: { $lt: options.lastRecordBefore } },
        { [`operations.${operationName}`]: { $exists: false } },
      ];
    }

    const symbols = await this.mongodb.find(this.collectionName, filter, {
      limit,
      projection: {
        qmSearchCode: 1,
        [`operations.${operationName}.lastRecordDate`]: 1,
      },
      sort: { [`operations.${operationName}.lastRecordDate`]: 1 }, // Oldest data first
    });

    return symbols.map(s => ({
      qmSearchCode: s.qmSearchCode,
      lastRecordDate: s.operations?.[operationName]?.lastRecordDate,
    }));
  }

  /**
   * Get operation statistics.
   *
   * `finishedCrawls` is only present for operations registered with type
   * `'intraday_crawl'`. `avgRecordsPerSymbol` is only present when the
   * aggregation yields a numeric average.
   *
   * NOTE(review): unlike `getStaleSymbols`, the stale count here is based on
   * `lastRunAt` (not `lastSuccessAt`) and does not filter on `active` —
   * confirm whether that difference is intended.
   */
  async getOperationStats(operationName: string): Promise<{
    totalSymbols: number;
    processedSymbols: number;
    staleSymbols: number;
    successfulSymbols: number;
    failedSymbols: number;
    finishedCrawls?: number;
    avgRecordsPerSymbol?: number;
  }> {
    const collection = this.mongodb.collection(this.collectionName);
    const staleDate = this.resolveStaleCutoff(operationName);

    // The counts are independent of each other, so run them concurrently.
    const [total, processed, successful, failed, stale] = await Promise.all([
      collection.countDocuments({}),
      collection.countDocuments({
        [`operations.${operationName}`]: { $exists: true },
      }),
      collection.countDocuments({
        [`operations.${operationName}.status`]: 'success',
      }),
      collection.countDocuments({
        [`operations.${operationName}.status`]: 'failure',
      }),
      collection.countDocuments({
        $or: [
          { [`operations.${operationName}.lastRunAt`]: { $lt: staleDate } },
          { [`operations.${operationName}`]: { $exists: false } },
        ],
      }),
    ]);

    const result: {
      totalSymbols: number;
      processedSymbols: number;
      staleSymbols: number;
      successfulSymbols: number;
      failedSymbols: number;
      finishedCrawls?: number;
      avgRecordsPerSymbol?: number;
    } = {
      totalSymbols: total,
      processedSymbols: processed,
      staleSymbols: stale,
      successfulSymbols: successful,
      failedSymbols: failed,
    };

    // Additional stats for crawl operations
    if (this.registeredOperations.get(operationName)?.type === 'intraday_crawl') {
      result.finishedCrawls = await collection.countDocuments({
        [`operations.${operationName}.crawlState.finished`]: true,
      });
    }

    // Calculate average records per symbol
    const aggregation = await collection
      .aggregate([
        {
          $match: {
            [`operations.${operationName}.recordCount`]: { $exists: true },
          },
        },
        {
          $group: {
            _id: null,
            avgRecords: { $avg: `$operations.${operationName}.recordCount` },
          },
        },
      ])
      .toArray();

    // Skip a null/non-numeric average rather than reporting it as 0.
    const avgRecords = aggregation[0]?.avgRecords;
    if (typeof avgRecords === 'number') {
      result.avgRecordsPerSymbol = Math.round(avgRecords);
    }

    return result;
  }

  /**
   * Get all registered operations.
   *
   * @returns A new array of the registered configs, in registration order.
   */
  getRegisteredOperations(): QMOperationConfig[] {
    return Array.from(this.registeredOperations.values());
  }

  /**
   * Helper: Get symbols for price update.
   *
   * Stale symbols for the `price_update` operation with a 24 hour window.
   */
  async getSymbolsForPriceUpdate(limit = 1000): Promise<string[]> {
    return this.getStaleSymbols('price_update', {
      minHoursSinceRun: 24,
      limit,
    });
  }

  /**
   * Helper: Get symbols with outdated financials.
   *
   * Symbols whose `financials_update` data is older than 90 calendar days,
   * or that have no bookkeeping for that operation.
   */
  async getSymbolsWithOldFinancials(
    limit = 100
  ): Promise<Array<{ qmSearchCode: string; lastRecordDate?: Date }>> {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - 90); // 90 days old

    return this.getSymbolsNeedingUpdate('financials_update', {
      lastRecordBefore: cutoffDate,
      limit,
    });
  }

  /**
   * Helper: Get unprocessed symbols for an operation.
   *
   * @returns Search codes of symbols with no bookkeeping for `operation`.
   */
  async getUnprocessedSymbols(operation: string, limit = 500): Promise<string[]> {
    const symbols = await this.getSymbolsNeedingUpdate(operation, {
      neverRun: true,
      limit,
    });

    return symbols.map(s => s.qmSearchCode);
  }
}