From 4ad232c35e1468d338ec4ddbd81c23017ead5b98 Mon Sep 17 00:00:00 2001 From: Boki Date: Sat, 28 Jun 2025 09:21:28 -0400 Subject: [PATCH] initial qm operation tracker --- .../src/handlers/qm/actions/index.ts | 1 + .../src/handlers/qm/actions/price.action.ts | 297 ++++++++++++ .../src/handlers/qm/actions/symbol.action.ts | 2 + .../src/handlers/qm/qm.handler.ts | 24 + .../src/handlers/qm/shared/index.ts | 4 + .../handlers/qm/shared/operation-registry.ts | 118 +++++ .../handlers/qm/shared/operation-tracker.ts | 441 ++++++++++++++++++ .../src/handlers/qm/shared/types.ts | 41 ++ 8 files changed, 928 insertions(+) create mode 100644 apps/stock/data-ingestion/src/handlers/qm/actions/price.action.ts create mode 100644 apps/stock/data-ingestion/src/handlers/qm/shared/index.ts create mode 100644 apps/stock/data-ingestion/src/handlers/qm/shared/operation-registry.ts create mode 100644 apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts index e3f1cdb..17a7490 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts @@ -4,4 +4,5 @@ export { checkSessions, createSession } from './session.action'; export { searchSymbols, spiderSymbol } from './symbol.action'; +export { updatePrices, updateIntradayBars, getOperationStats } from './price.action'; diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/price.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/price.action.ts new file mode 100644 index 0000000..546d927 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/price.action.ts @@ -0,0 +1,297 @@ +/** + * QM Price Actions - Price data updates with operation tracking + */ + +import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers'; +import { QMOperationTracker } from '../shared/operation-tracker'; + 
+// Shared in-flight initialization so concurrent operations reuse one tracker
+let operationTrackerPromise: Promise<QMOperationTracker> | null = null;
+
+/** Operation names, as registered in shared/operation-registry */
+const PRICE_OPERATION = 'price_update';
+const INTRADAY_OPERATION = 'intraday_bars';
+
+/** Collection holding per-symbol operation state (the one QMOperationTracker writes to) */
+const QM_SYMBOLS_COLLECTION = 'qmSymbols';
+
+/** The initial crawl stops after this many consecutive days without data */
+const MAX_EMPTY_DAYS = 5;
+
+/** One execution stops crawling once it has collected more than this many bars */
+const MAX_BARS_PER_RUN = 1000;
+
+type SymbolOperationUpdates = Parameters<QMOperationTracker['bulkUpdateSymbolOperations']>[0];
+
+type TrackerStats = ReturnType<QMOperationTracker['getOperationStats']> extends Promise<infer T>
+  ? T
+  : never;
+
+/** Stored intraday state of one symbol, including the running record count */
+interface IntradayTarget {
+  symbol: string;
+  lastRecordDate?: Date;
+  recordCount: number;
+  crawlState?: {
+    finished?: boolean;
+    oldestDateReached?: Date;
+  };
+}
+
+/**
+ * Get or initialize the operation tracker
+ *
+ * The pending promise is cached (not the resolved tracker) so that operations
+ * starting at the same time share one initialization. A failed initialization
+ * is not cached; the next call retries.
+ */
+function getOperationTracker(handler: BaseHandler): Promise<QMOperationTracker> {
+  if (!operationTrackerPromise) {
+    operationTrackerPromise = (async () => {
+      try {
+        const { initializeQMOperations } = await import('../shared/operation-registry');
+        return await initializeQMOperations(handler.mongodb, handler.logger);
+      } catch (error) {
+        operationTrackerPromise = null;
+        throw error;
+      }
+    })();
+  }
+  return operationTrackerPromise;
+}
+
+/** Return a copy of `date` shifted by `days` calendar days (negative moves back) */
+function shiftDays(date: Date, days: number): Date {
+  const shifted = new Date(date);
+  shifted.setDate(shifted.getDate() + days);
+  return shifted;
+}
+
+/**
+ * Load the stored intraday state for one symbol
+ *
+ * @returns The symbol's state, or undefined when the symbol is not in the collection
+ */
+async function loadIntradayTarget(
+  handler: BaseHandler,
+  symbol: string
+): Promise<IntradayTarget | undefined> {
+  const docs = await handler.mongodb.find(QM_SYMBOLS_COLLECTION, { symbol }, {
+    limit: 1,
+    projection: { symbol: 1, [`operations.${INTRADAY_OPERATION}`]: 1 }
+  });
+  const doc = docs[0];
+  if (!doc) {
+    return undefined;
+  }
+
+  const state = doc.operations?.[INTRADAY_OPERATION];
+  return {
+    symbol: doc.symbol,
+    lastRecordDate: state?.lastRecordDate,
+    // Missing or non-finite counts (e.g. a previously stored NaN) restart from zero
+    recordCount: Number.isFinite(state?.recordCount) ? state.recordCount : 0,
+    crawlState: state?.crawlState
+  };
+}
+
+/**
+ * Update daily price data for stale symbols
+ *
+ * @param input.symbols - Explicit symbols to update; when empty or omitted, symbols
+ *   whose last price update is at least 24 hours old are used instead
+ * @param input.limit - Maximum number of stale symbols to pick (default 100);
+ *   not applied to an explicit `symbols` list
+ * @returns Counts of symbols updated and of per-symbol failures
+ * @throws Rethrows any error from the tracker after logging it
+ */
+export async function updatePrices(
+  this: BaseHandler,
+  input: {
+    limit?: number;
+    symbols?: string[];
+  } = {},
+  _context?: ExecutionContext
+): Promise<{
+  message: string;
+  symbolsUpdated: number;
+  errors: number;
+}> {
+  const { limit = 100, symbols } = input;
+  const tracker = await getOperationTracker(this);
+
+  this.logger.info('Starting price update operation', { limit, specificSymbols: symbols?.length });
+
+  try {
+    // Explicit symbols win over staleness; de-duplicate so each is tracked once
+    const symbolsToUpdate: string[] =
+      symbols && symbols.length > 0
+        ? [...new Set(symbols)]
+        : await tracker.getStaleSymbols(PRICE_OPERATION, {
+            minHoursSinceRun: 24,
+            limit
+          });
+
+    if (symbolsToUpdate.length === 0) {
+      this.logger.info('No symbols need price updates');
+      return {
+        message: 'No symbols need price updates',
+        symbolsUpdated: 0,
+        errors: 0
+      };
+    }
+
+    this.logger.info(`Found ${symbolsToUpdate.length} symbols for price update`);
+
+    let updated = 0;
+    let errors = 0;
+    const updateResults: SymbolOperationUpdates = [];
+
+    // Process symbols (in real implementation, you'd fetch prices from QM API)
+    for (const symbol of symbolsToUpdate) {
+      try {
+        // TODO: Actual price fetching logic here
+        // const prices = await fetchPricesFromQM(symbol);
+
+        // For now, simulate the update
+        const mockPrices = {
+          symbol,
+          lastPrice: Math.random() * 1000,
+          volume: Math.floor(Math.random() * 1000000),
+          date: new Date()
+        };
+
+        // Track the operation
+        updateResults.push({
+          symbol,
+          operation: PRICE_OPERATION,
+          data: {
+            status: 'success' as const,
+            lastRecordDate: mockPrices.date,
+            recordCount: 1
+          }
+        });
+
+        updated++;
+      } catch (error) {
+        this.logger.error(`Failed to update prices for ${symbol}`, { error });
+
+        updateResults.push({
+          symbol,
+          operation: PRICE_OPERATION,
+          data: {
+            status: 'failure' as const
+          }
+        });
+
+        errors++;
+      }
+    }
+
+    // Bulk update operation tracking
+    if (updateResults.length > 0) {
+      await tracker.bulkUpdateSymbolOperations(updateResults);
+    }
+
+    this.logger.info('Price update operation completed', {
+      symbolsUpdated: updated,
+      errors,
+      total: symbolsToUpdate.length
+    });
+
+    return {
+      message: `Updated prices for ${updated} symbols`,
+      symbolsUpdated: updated,
+      errors
+    };
+  } catch (error) {
+    this.logger.error('Price update operation failed', { error });
+    throw error;
+  }
+}
+
+/**
+ * Update intraday price bars - crawls backwards until no more data
+ *
+ * A symbol whose crawl is already finished only gets a forward update. Otherwise
+ * the crawl walks back one day at a time until MAX_EMPTY_DAYS consecutive empty
+ * days are seen, or until this execution has collected more than MAX_BARS_PER_RUN
+ * bars. In the latter case the next execution resumes from the stored
+ * `oldestDateReached` instead of starting again from today.
+ *
+ * @param input.symbol - Process exactly this symbol; when omitted, the first
+ *   symbol returned by the tracker (ordered by last run, oldest first) is used
+ * @param input.limit - Number of candidates requested from the tracker (default 1);
+ *   only the first candidate is processed
+ * @returns `barsCollected` counts the bars gathered by this execution only
+ * @throws Rethrows any tracker/database error after logging it
+ */
+export async function updateIntradayBars(
+  this: BaseHandler,
+  input: {
+    symbol?: string;
+    limit?: number;
+  } = {},
+  _context?: ExecutionContext
+): Promise<{
+  message: string;
+  symbol: string;
+  barsCollected: number;
+  crawlFinished: boolean;
+}> {
+  const { symbol, limit = 1 } = input;
+  const tracker = await getOperationTracker(this);
+
+  try {
+    // Resolve the symbol to work on. An explicit symbol is looked up directly
+    // rather than searched for in a one-element candidate list. Finished symbols
+    // are included so they still receive forward updates.
+    let requestedSymbol = symbol;
+    if (!requestedSymbol) {
+      const candidates = await tracker.getSymbolsForIntradayCrawl(INTRADAY_OPERATION, {
+        limit,
+        includeFinished: true
+      });
+      requestedSymbol = candidates[0]?.symbol;
+    }
+
+    const target = requestedSymbol
+      ? await loadIntradayTarget(this, requestedSymbol)
+      : undefined;
+
+    if (!target) {
+      return {
+        message: 'No symbols available for intraday crawl',
+        symbol: '',
+        barsCollected: 0,
+        crawlFinished: false
+      };
+    }
+
+    this.logger.info('Processing intraday bars', {
+      symbol: target.symbol,
+      crawlState: target.crawlState
+    });
+
+    if (target.crawlState?.finished) {
+      // Already finished initial crawl, just update from last record
+      this.logger.debug('Symbol already crawled, updating from last record', {
+        symbol: target.symbol,
+        lastRecord: target.lastRecordDate
+      });
+
+      // TODO: Fetch bars from lastRecordDate to now
+      const newBars = 10; // Mock data
+
+      await tracker.updateSymbolOperation(target.symbol, INTRADAY_OPERATION, {
+        status: 'success',
+        lastRecordDate: new Date(),
+        // The running total is stored on the operation, not inside crawlState
+        recordCount: target.recordCount + newBars
+      });
+
+      return {
+        message: `Updated ${newBars} new bars for ${target.symbol}`,
+        symbol: target.symbol,
+        barsCollected: newBars,
+        crawlFinished: true
+      };
+    }
+
+    // Initial crawl - go backwards until no data. A previous execution that hit
+    // the per-run bar limit left `oldestDateReached` behind; continue from the
+    // day before it and keep adding to the stored record count.
+    const resumeFrom = target.crawlState?.oldestDateReached
+      ? new Date(target.crawlState.oldestDateReached)
+      : undefined;
+    const startDate = resumeFrom ? shiftDays(resumeFrom, -1) : new Date();
+    const previousBars = resumeFrom ? target.recordCount : 0;
+
+    let currentDate = startDate;
+    let oldestDate = resumeFrom;
+    let newestDate: Date | undefined;
+    let barsThisRun = 0;
+    let consecutiveEmptyDays = 0;
+    let crawlFinished = false;
+
+    while (consecutiveEmptyDays < MAX_EMPTY_DAYS) {
+      // TODO: Actual bar fetching logic
+      // const bars = await fetchIntradayBars(target.symbol, currentDate);
+
+      // Mock data - simulate decreasing data as we go back
+      const bars = currentDate > new Date('2020-01-01') ? Math.floor(Math.random() * 100) : 0;
+
+      if (bars === 0) {
+        consecutiveEmptyDays++;
+      } else {
+        consecutiveEmptyDays = 0;
+        barsThisRun += bars;
+        oldestDate = currentDate;
+        newestDate = newestDate ?? currentDate;
+      }
+
+      // Update progress
+      await tracker.updateSymbolOperation(target.symbol, INTRADAY_OPERATION, {
+        status: 'partial',
+        // Only a fresh crawl learns the newest record; a resumed crawl keeps the stored one
+        lastRecordDate: resumeFrom ? undefined : newestDate,
+        recordCount: previousBars + barsThisRun,
+        crawlState: oldestDate
+          ? { finished: false, oldestDateReached: oldestDate }
+          : { finished: false }
+      });
+
+      // Move to previous day (a new Date, so dates already recorded are never mutated)
+      currentDate = shiftDays(currentDate, -1);
+
+      // Limit crawl for this execution
+      if (barsThisRun > MAX_BARS_PER_RUN) {
+        this.logger.info('Reached bar limit for this execution', {
+          symbol: target.symbol,
+          barsCollected: barsThisRun
+        });
+        break;
+      }
+    }
+
+    // Check if we finished the crawl
+    if (consecutiveEmptyDays >= MAX_EMPTY_DAYS) {
+      crawlFinished = true;
+      // With no data at all, report the day the crawl started from
+      const oldestReached = oldestDate ?? startDate;
+      await tracker.markCrawlFinished(target.symbol, INTRADAY_OPERATION, oldestReached);
+
+      this.logger.info('Completed initial crawl for symbol', {
+        symbol: target.symbol,
+        totalBars: previousBars + barsThisRun,
+        oldestDate: oldestReached
+      });
+    }
+
+    return {
+      message: `Collected ${barsThisRun} bars for ${target.symbol}`,
+      symbol: target.symbol,
+      barsCollected: barsThisRun,
+      crawlFinished
+    };
+  } catch (error) {
+    this.logger.error('Intraday bars update failed', { error });
+    throw error;
+  }
+}
+
+/**
+ * Get operation statistics
+ *
+ * @param input.operation - Name of the tracked operation, e.g. 'price_update'
+ * @returns The tracker's statistics for the operation, plus the operation name
+ * @throws Error when no operation name is supplied
+ */
+export async function getOperationStats(
+  this: BaseHandler,
+  input: {
+    operation: string;
+  },
+  _context?: ExecutionContext
+): Promise<{ operation: string } & TrackerStats> {
+  if (!input?.operation) {
+    throw new Error('getOperationStats requires an operation name');
+  }
+
+  const tracker = await getOperationTracker(this);
+
+  const stats = await tracker.getOperationStats(input.operation);
+
+  return {
+    operation: input.operation,
+    ...stats
+  };
+}
\ No newline at end of file
diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol.action.ts
index 9f091c9..bdcf6e3 100644
--- a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol.action.ts
+++ 
b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol.action.ts @@ -5,6 +5,8 @@ import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers'; import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config'; import { QMSessionManager } from '../shared/session-manager'; +import { QMOperationTracker } from '../shared/operation-tracker'; +import { initializeQMOperations } from '../shared/operation-registry'; import type { Exchange, SymbolSpiderJob } from '../shared/types'; /** diff --git a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts index 7a7c1a7..47046da 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts @@ -5,6 +5,7 @@ import { ScheduledOperation, } from '@stock-bot/handlers'; import { checkSessions, createSession, searchSymbols, spiderSymbol } from './actions'; +import { updatePrices, updateIntradayBars, getOperationStats } from './actions/price.action'; @Handler('qm') export class QMHandler extends BaseHandler { @@ -37,4 +38,27 @@ export class QMHandler extends BaseHandler { @Operation('search-symbols') searchSymbols = searchSymbols; + + /** + * PRICE DATA + */ + @ScheduledOperation('update-prices', '0 */6 * * *', { + priority: 5, + immediately: false, + description: 'Update daily prices every 6 hours' + }) + updatePrices = updatePrices; + + @ScheduledOperation('update-intraday-bars', '*/30 * * * *', { + priority: 6, + immediately: false, + description: 'Update intraday bars every 30 minutes during market hours' + }) + updateIntradayBars = updateIntradayBars; + + /** + * MONITORING + */ + @Operation('get-operation-stats') + getOperationStats = getOperationStats; } diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/index.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/index.ts new file mode 100644 index 0000000..b391d81 --- /dev/null +++ 
b/apps/stock/data-ingestion/src/handlers/qm/shared/index.ts
@@ -0,0 +1,4 @@
+export * from './config';
+export * from './session-manager';
+export * from './types';
+export * from './operation-tracker';
\ No newline at end of file
diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-registry.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-registry.ts
new file mode 100644
index 0000000..d5dcad5
--- /dev/null
+++ b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-registry.ts
@@ -0,0 +1,128 @@
+/**
+ * QM Operation Registry - Define and register all QM operations
+ */
+
+import type { MongoDBClient } from '@stock-bot/mongodb';
+import type { Logger } from '@stock-bot/types';
+import { QMOperationTracker } from './operation-tracker';
+import type { QMOperationConfig } from './types';
+
+// Every operation the tracker knows about; defaultStaleHours is expressed in hours
+export const QM_OPERATIONS: QMOperationConfig[] = [
+  // Price data operations
+  {
+    name: 'price_update',
+    type: 'standard',
+    description: 'Update daily price data',
+    defaultStaleHours: 24
+  },
+  {
+    name: 'intraday_bars',
+    type: 'intraday_crawl',
+    description: 'Crawl intraday price bars from today backwards',
+    requiresFinishedFlag: true,
+    defaultStaleHours: 1 // Check every hour for new data
+  },
+
+  // Fundamental data operations
+  {
+    name: 'financials_update',
+    type: 'standard',
+    description: 'Update financial statements',
+    defaultStaleHours: 24 * 7 // Weekly
+  },
+  {
+    name: 'earnings_update',
+    type: 'standard',
+    description: 'Update earnings data',
+    defaultStaleHours: 24 * 7 // Weekly
+  },
+  {
+    name: 'dividends_update',
+    type: 'standard',
+    description: 'Update dividend history',
+    defaultStaleHours: 24 * 7 // Weekly
+  },
+  {
+    name: 'splits_update',
+    type: 'standard',
+    description: 'Update stock split history',
+    defaultStaleHours: 24 * 30 // Monthly
+  },
+
+  // News and filings
+  {
+    name: 'filings_update',
+    type: 'standard',
+    description: 'Update SEC filings',
+    defaultStaleHours: 24 // Daily
+  },
+  {
+    name: 'news_update',
+    type: 'standard',
+    description: 'Update news articles',
+    defaultStaleHours: 6 // Every 6 hours
+  },
+
+  // Technical indicators
+  {
+    name: 'indicators_update',
+    type: 'standard',
+    description: 'Calculate technical indicators',
+    defaultStaleHours: 24 // Daily
+  },
+
+  // Options data
+  {
+    name: 'options_chain',
+    type: 'standard',
+    description: 'Update options chain data',
+    defaultStaleHours: 1 // Hourly during market hours
+  }
+];
+
+/**
+ * Initialize operation tracker with all registered operations
+ *
+ * Operations are registered sequentially; the first failure is logged and
+ * rethrown, so callers never receive a partially initialized tracker.
+ *
+ * @param mongodb - MongoDB client passed to the tracker
+ * @param logger - Logger used here and by the returned tracker
+ * @returns Tracker with every entry of QM_OPERATIONS registered
+ * @throws The error raised by registerOperation for the failing operation
+ */
+export async function initializeQMOperations(
+  mongodb: MongoDBClient,
+  logger: Logger
+): Promise<QMOperationTracker> {
+  logger.info('Initializing QM operations tracker');
+
+  const tracker = new QMOperationTracker(mongodb, logger);
+
+  // Register all operations
+  for (const operation of QM_OPERATIONS) {
+    try {
+      await tracker.registerOperation(operation);
+      logger.debug(`Registered operation: ${operation.name}`);
+    } catch (error) {
+      logger.error(`Failed to register operation: ${operation.name}`, { error });
+      throw error;
+    }
+  }
+
+  logger.info('QM operations tracker initialized', {
+    operationCount: QM_OPERATIONS.length
+  });
+
+  return tracker;
+}
+
+/**
+ * Get operation configuration by name
+ *
+ * @returns The matching entry of QM_OPERATIONS, or undefined if none matches
+ */
+export function getOperationConfig(name: string): QMOperationConfig | undefined {
+  return QM_OPERATIONS.find(op => op.name === name);
+}
\ No newline at end of file
diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts
new file mode 100644
index 0000000..e4ebef4
--- /dev/null
+++ b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts
@@ -0,0 +1,441 @@
+/**
+ * QM Operation Tracker - Tracks operation execution times and states for symbols
+ * Supports dynamic operation registration with auto-indexing
+ */
+
+import type { Logger } from '@stock-bot/types';
+import type { MongoDBClient } from
'@stock-bot/mongodb';
+import type { IntradayCrawlSymbol, QMOperationConfig } from './types';
+
+export class QMOperationTracker {
+  private registeredOperations: Map<string, QMOperationConfig> = new Map();
+  private indexesCreated: Set<string> = new Set();
+  private mongodb: MongoDBClient;
+  private logger: Logger;
+  private readonly collectionName = 'qmSymbols';
+
+  constructor(mongodb: MongoDBClient, logger: Logger) {
+    this.mongodb = mongodb;
+    this.logger = logger;
+  }
+
+  /**
+   * Register an operation type and create its indexes; rejects if index creation fails
+   */
+  async registerOperation(config: QMOperationConfig): Promise<void> {
+    this.logger.info('Registering QM operation', { operation: config.name, type: config.type });
+
+    this.registeredOperations.set(config.name, config);
+
+    // Auto-create indexes for this operation
+    await this.createOperationIndexes(config.name);
+
+    this.logger.debug('Operation registered successfully', { operation: config.name });
+  }
+
+  /**
+   * Create the per-operation indexes once per tracker instance; logs and rethrows on failure
+   */
+  private async createOperationIndexes(operationName: string): Promise<void> {
+    if (this.indexesCreated.has(operationName)) {
+      this.logger.debug('Indexes already created for operation', { operation: operationName });
+      return;
+    }
+
+    try {
+      const indexes = [
+        // Index for finding stale symbols
+        { [`operations.${operationName}.lastRunAt`]: 1, symbol: 1 },
+        // Index for finding by last record date
+        { [`operations.${operationName}.lastRecordDate`]: 1, symbol: 1 },
+      ];
+
+      // Add crawl state index for intraday operations
+      const config = this.registeredOperations.get(operationName);
+      if (config?.type === 'intraday_crawl') {
+        indexes.push({ [`operations.${operationName}.crawlState.finished`]: 1, symbol: 1 });
+      }
+
+      for (const indexSpec of indexes) {
+        await this.mongodb.createIndex(this.collectionName, indexSpec, {
+          background: true,
+          name: `op_${operationName}_${Object.keys(indexSpec).join('_')}`
+        });
+      }
+
+      this.indexesCreated.add(operationName);
+      this.logger.info('Created indexes for operation', {
+        operation: operationName,
+        indexCount: indexes.length
+      });
+    } catch (error) {
+      this.logger.error('Failed to create indexes for operation', {
+        operation: operationName,
+        error
+      });
+      throw error;
+    }
+  }
+
+  /**
+   * Record the outcome of one operation run for a symbol; absent optional fields are left untouched
+   */
+  async updateSymbolOperation(
+    symbol: string,
+    operationName: string,
+    data: {
+      status: 'success' | 'failure' | 'partial';
+      lastRecordDate?: Date;
+      recordCount?: number;
+      crawlState?: {
+        finished?: boolean;
+        oldestDateReached?: Date;
+      };
+    }
+  ): Promise<void> {
+    const update: any = {
+      $set: {
+        [`operations.${operationName}.lastRunAt`]: new Date(),
+        [`operations.${operationName}.status`]: data.status,
+        updated_at: new Date()
+      }
+    };
+
+    if (data.lastRecordDate) {
+      update.$set[`operations.${operationName}.lastRecordDate`] = data.lastRecordDate;
+    }
+
+    if (Number.isFinite(data.recordCount)) { // skips undefined and NaN, so a bad count is never stored
+      update.$set[`operations.${operationName}.recordCount`] = data.recordCount;
+    }
+
+    if (data.crawlState) {
+      update.$set[`operations.${operationName}.crawlState`] = {
+        ...data.crawlState,
+        lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward'
+      };
+    }
+
+    await this.mongodb.updateOne(this.collectionName, { symbol }, update);
+
+    this.logger.trace('Updated symbol operation', {
+      symbol,
+      operation: operationName,
+      status: data.status
+    });
+  }
+
+  /**
+   * Same as updateSymbolOperation for many symbols, sent as one unordered bulkWrite
+   */
+  async bulkUpdateSymbolOperations(
+    updates: Array<{
+      symbol: string;
+      operation: string;
+      data: {
+        status: 'success' | 'failure' | 'partial';
+        lastRecordDate?: Date;
+        recordCount?: number;
+        crawlState?: any;
+      };
+    }>
+  ): Promise<void> {
+    if (updates.length === 0) return;
+
+    const bulkOps = updates.map(({ symbol, operation, data }) => {
+      const update: any = {
+        $set: {
+          [`operations.${operation}.lastRunAt`]: new Date(),
+          [`operations.${operation}.status`]: data.status,
+          updated_at: new Date()
+        }
+      };
+
+      if (data.lastRecordDate) {
+        update.$set[`operations.${operation}.lastRecordDate`] = data.lastRecordDate;
+      }
+
+      if (Number.isFinite(data.recordCount)) { // skips undefined and NaN, so a bad count is never stored
+        update.$set[`operations.${operation}.recordCount`] = data.recordCount;
+      }
+
+      if (data.crawlState) {
+        update.$set[`operations.${operation}.crawlState`] = {
+          ...data.crawlState,
+          lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward'
+        };
+      }
+
+      return {
+        updateOne: {
+          filter: { symbol },
+          update
+        }
+      };
+    });
+
+    const collection = this.mongodb.getCollection(this.collectionName);
+    const result = await collection.bulkWrite(bulkOps as any, { ordered: false });
+
+    this.logger.debug('Bulk updated symbol operations', {
+      totalUpdates: updates.length,
+      modified: result.modifiedCount,
+      operations: [...new Set(updates.map(u => u.operation))]
+    });
+  }
+
+  /**
+   * Get symbols never run for an operation or last run before the cutoff, oldest run first
+   */
+  async getStaleSymbols(
+    operationName: string,
+    options: {
+      notRunSince?: Date;
+      minHoursSinceRun?: number;
+      limit?: number;
+      excludeSymbols?: string[];
+    } = {}
+  ): Promise<string[]> {
+    const { limit = 1000, excludeSymbols = [] } = options;
+
+    const cutoffDate = options.notRunSince ?? (() => {
+      // `??` (not `||`) so an explicit minHoursSinceRun of 0 is honoured
+      const hours = options.minHoursSinceRun ??
+        this.registeredOperations.get(operationName)?.defaultStaleHours ?? 24;
+      // Epoch arithmetic: unaffected by DST shifts and keeps fractional hours
+      return new Date(Date.now() - hours * 60 * 60 * 1000);
+    })();
+
+    const filter: any = {
+      $or: [
+        { [`operations.${operationName}.lastRunAt`]: { $lt: cutoffDate } },
+        { [`operations.${operationName}`]: { $exists: false } }
+      ]
+    };
+
+    if (excludeSymbols.length > 0) {
+      filter.symbol = { $nin: excludeSymbols };
+    }
+
+    const symbols = await this.mongodb.find(this.collectionName, filter, {
+      limit,
+      projection: { symbol: 1 },
+      sort: { [`operations.${operationName}.lastRunAt`]: 1 } // Oldest first
+    });
+
+    return symbols.map(s => s.symbol);
+  }
+
+  /**
+   * Get symbols for intraday crawling, oldest run first; finished crawls are excluded by default
+   */
+  async getSymbolsForIntradayCrawl(
+    operationName: string,
+    options: {
+      limit?: number;
+      includeFinished?: boolean;
+    } = {}
+  ): Promise<IntradayCrawlSymbol[]> {
+    const { limit = 100, includeFinished = false } = options;
+
+    const filter: any = {};
+    if (!includeFinished) {
+      filter[`operations.${operationName}.crawlState.finished`] = { $ne: true };
+    }
+
+    const symbols = await this.mongodb.find(this.collectionName, filter, {
+      limit,
+      projection: {
+        symbol: 1,
+        [`operations.${operationName}`]: 1
+      },
+      sort: {
+        // Prioritize symbols that haven't been crawled yet
+        [`operations.${operationName}.lastRunAt`]: 1
+      }
+    });
+
+    return symbols.map(s => ({
+      symbol: s.symbol,
+      lastRecordDate: s.operations?.[operationName]?.lastRecordDate,
+      crawlState: s.operations?.[operationName]?.crawlState
+    }));
+  }
+
+  /**
+   * Mark intraday crawl as finished (status 'success') and store the oldest date reached
+   */
+  async markCrawlFinished(
+    symbol: string,
+    operationName: string,
+    oldestDateReached: Date
+  ): Promise<void> {
+    await this.updateSymbolOperation(symbol, operationName, {
+      status: 'success',
+      crawlState: {
+        finished: true,
+        oldestDateReached
+      }
+    });
+
+    this.logger.info('Marked crawl as finished', {
+      symbol,
+      operation: operationName,
+      oldestDateReached
+    });
+  }
+
+  /**
+   * Get symbols by last record date, oldest first; with no option set, no filter is applied
+   */
+  async getSymbolsNeedingUpdate(
+    operationName: string,
+    options: {
+      lastRecordBefore?: Date;
+      neverRun?: boolean;
+      limit?: number;
+    } = {}
+  ): Promise<Array<{ symbol: string; lastRecordDate?: Date }>> {
+    const { limit = 500 } = options;
+    const filter: any = {};
+
+    if (options.neverRun) {
+      filter[`operations.${operationName}`] = { $exists: false };
+    } else if (options.lastRecordBefore) {
+      filter.$or = [
+        { [`operations.${operationName}.lastRecordDate`]: { $lt: options.lastRecordBefore } },
+        { [`operations.${operationName}`]: { $exists: false } }
+      ];
+    }
+
+    const symbols = await this.mongodb.find(this.collectionName, filter, {
+      limit,
+      projection: {
+        symbol: 1,
+        [`operations.${operationName}.lastRecordDate`]: 1
+      },
+      sort: { [`operations.${operationName}.lastRecordDate`]: 1 } // Oldest data first
+    });
+
+    return symbols.map(s => ({
+      symbol: s.symbol,
+      lastRecordDate: s.operations?.[operationName]?.lastRecordDate
+    }));
+  }
+
+  /**
+   * Get per-operation counts over the whole collection; optional fields appear only when computed
+   */
+  async getOperationStats(operationName: string): Promise<{
+    totalSymbols: number;
+    processedSymbols: number;
+    staleSymbols: number;
+    successfulSymbols: number;
+    failedSymbols: number;
+    finishedCrawls?: number;
+    avgRecordsPerSymbol?: number;
+  }> {
+    const total = await this.mongodb.countDocuments(this.collectionName);
+
+    const processed = await this.mongodb.countDocuments(this.collectionName, {
+      [`operations.${operationName}`]: { $exists: true }
+    });
+
+    const successful = await this.mongodb.countDocuments(this.collectionName, {
+      [`operations.${operationName}.status`]: 'success'
+    });
+
+    const failed = await this.mongodb.countDocuments(this.collectionName, {
+      [`operations.${operationName}.status`]: 'failure'
+    });
+
+    // Same staleness rule as getStaleSymbols: epoch arithmetic, `??` keeps a 0-hour config
+    const staleHours =
+      this.registeredOperations.get(operationName)?.defaultStaleHours ?? 24;
+    const staleDate = new Date(Date.now() - staleHours * 60 * 60 * 1000);
+
+    const stale = await this.mongodb.countDocuments(this.collectionName, {
+      $or: [
+        { [`operations.${operationName}.lastRunAt`]: { $lt: staleDate } },
+        { [`operations.${operationName}`]: { $exists: false } }
+      ]
+    });
+
+    const result: any = {
+      totalSymbols: total,
+      processedSymbols: processed,
+      staleSymbols: stale,
+      successfulSymbols: successful,
+      failedSymbols: failed
+    };
+
+    // Additional stats for crawl operations
+    if (this.registeredOperations.get(operationName)?.type === 'intraday_crawl') {
+      result.finishedCrawls = await this.mongodb.countDocuments(this.collectionName, {
+        [`operations.${operationName}.crawlState.finished`]: true
+      });
+    }
+
+    // Calculate average records per symbol
+    const aggregation = await this.mongodb.aggregate(this.collectionName, [
+      {
+        $match: {
+          [`operations.${operationName}.recordCount`]: { $exists: true }
+        }
+      },
+      {
+        $group: {
+          _id: null,
+          avgRecords: { $avg: `$operations.${operationName}.recordCount` }
+        }
+      }
+    ]);
+
+    if (aggregation.length > 0) {
+      result.avgRecordsPerSymbol = Math.round(aggregation[0].avgRecords);
+    }
+
+    return result;
+  }
+
+  /**
+   * Get all registered operations
+   */
+  getRegisteredOperations(): QMOperationConfig[] {
+    return Array.from(this.registeredOperations.values());
+  }
+
+  /**
+   * Helper: Get symbols whose 'price_update' last ran 24 or more hours ago (or never)
+   */
+  async getSymbolsForPriceUpdate(limit = 1000): Promise<string[]> {
+    return this.getStaleSymbols('price_update', {
+      minHoursSinceRun: 24,
+      limit
+    });
+  }
+
+  /**
+   * Helper: Get symbols with outdated financials
+   */
+  async getSymbolsWithOldFinancials(limit = 100): Promise<Array<{ symbol: string; lastRecordDate?: Date }>> {
+    const cutoffDate = new Date();
+    cutoffDate.setDate(cutoffDate.getDate() - 90); // 90 days old
+
+    return this.getSymbolsNeedingUpdate('financials_update', {
+      lastRecordBefore: cutoffDate,
+      limit
+    });
+  }
+
+  /**
+   * Helper: Get unprocessed symbols for an operation
+   */
+  async getUnprocessedSymbols(operation: string, limit = 500): Promise<string[]> {
+    const symbols = await this.getSymbolsNeedingUpdate(operation, {
+      neverRun: true,
+      limit
+    });
+    return symbols.map(s => s.symbol);
+  }
+}
\ No newline at end of file
diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts
index fcbe3d1..c3794ed 100644
--- a/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts
+++ b/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts
@@ -51,3 +51,44 @@ export interface CachedSession extends QMSession {
   id: string;
   sessionType: string;
 }
+
+/**
+ * Operation tracking types
+ */
+export interface QMOperationConfig {
+  name: string;
+  type: 'standard' | 'intraday_crawl';
+  description?: string;
+  defaultStaleHours?: number;
+  requiresFinishedFlag?: boolean;
+}
+
+export interface QMSymbolOperationStatus {
+  symbol: string;
+  qmSearchCode: string;
+  operations: {
+    [operationName: string]: {
+      lastRunAt: Date;
+      lastRecordDate?: Date;
+      status: 'success' | 'failure' | 'partial';
+      recordCount?: number;
+      // For intraday crawling operations
+      crawlState?: {
+        finished: boolean;
+        oldestDateReached?: Date;
+        lastCrawlDirection?: 'forward' | 'backward';
+      };
+    };
+  };
+  updated_at: Date;
+}
+
+export interface IntradayCrawlSymbol {
+  symbol: string;
+  lastRecordDate?: Date;
+  crawlState?: {
+    finished: boolean;
+    oldestDateReached?: Date;
+    lastCrawlDirection?: 'forward' | 'backward';
+  };
+}