From c799962f0570eeff11d7621713a704409ee94905 Mon Sep 17 00:00:00 2001 From: Boki Date: Sat, 28 Jun 2025 20:48:17 -0400 Subject: [PATCH] qm scaffolding done --- .../qm/actions/corporate-actions.action.ts | 276 ++++++++++++++++ .../src/handlers/qm/actions/filings.action.ts | 245 ++++++++++++++ .../handlers/qm/actions/financials.action.ts | 236 ++++++++++++++ .../src/handlers/qm/actions/index.ts | 8 +- .../handlers/qm/actions/intraday.action.ts | 302 ++++++++++++++++++ .../src/handlers/qm/actions/price.action.ts | 259 +-------------- .../src/handlers/qm/actions/prices.action.ts | 249 +++++++++++++++ .../handlers/qm/actions/symbol-info.action.ts | 230 +++++++++++++ .../src/handlers/qm/qm.handler.ts | 117 +++++-- .../handlers/qm/shared/operation-registry.ts | 82 ++--- .../handlers/qm/shared/operation-tracker.ts | 25 +- 11 files changed, 1693 insertions(+), 336 deletions(-) create mode 100644 apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts create mode 100644 apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts create mode 100644 apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts create mode 100644 apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts create mode 100644 apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts create mode 100644 apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts new file mode 100644 index 0000000..7510945 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts @@ -0,0 +1,276 @@ +/** + * QM Corporate Actions - Fetch and update dividends, splits, and earnings together + */ + +import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers'; +import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config'; +import { QMSessionManager } from '../shared/session-manager'; +import { QMOperationTracker } from '../shared/operation-tracker'; + +// Cache tracker instance +let operationTracker: QMOperationTracker | null = null; + +/** + * Get or initialize the operation tracker + */ +async function getOperationTracker(handler: BaseHandler): Promise { + if (!operationTracker) { + const { initializeQMOperations } = await import('../shared/operation-registry'); + operationTracker = await initializeQMOperations(handler.mongodb, handler.logger); + } + return operationTracker; +} + +/** + * Update corporate actions (dividends, splits, earnings) for a single symbol + * Single API call returns all three data types + */ +export async function updateCorporateActions( + this: BaseHandler, + input: { + symbol: string; + symbolId: number; + }, + _context?: ExecutionContext +): Promise<{ + success: boolean; + symbol: string; + message: string; + data?: { + dividends: number; + splits: number; + earnings: number; + }; +}> { + const { symbol, symbolId } = input; + + this.logger.info('Fetching corporate actions', { symbol, symbolId }); + + const sessionManager = QMSessionManager.getInstance(); + sessionManager.initialize(this.cache, this.logger); + + // Get a session - you'll need to add the appropriate session ID + const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID + const session = await sessionManager.getSession(sessionId); + + if (!session || !session.uuid) { + throw new Error(`No active session found for QM corporate actions`); + } + + try { + // Build API request for corporate actions + const searchParams = new URLSearchParams({ + symbol: symbol, + symbolId: symbolId.toString(), + qmodTool: 'CorporateActions', // Assuming this returns all three + webmasterId: '500' + }); + + // TODO: Update with correct corporate actions endpoint + const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/corporate-actions.json?${searchParams.toString()}`; + + const response = await fetch(apiUrl, { + method: 'GET', + headers: session.headers, + proxy: session.proxy, + }); + + if (!response.ok) { + throw new Error(`QM API request failed: ${response.status} ${response.statusText}`); + } + + const corporateData = await response.json(); + + // Update session success stats + await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); + + const tracker = await getOperationTracker(this); + let dividendCount = 0; + let splitCount = 0; + let earningsCount = 0; + + // Process dividends + if (corporateData.dividends && corporateData.dividends.length > 0) { + await this.mongodb.batchUpsert( + 'qmDividends', + corporateData.dividends.map((dividend: any) => ({ + ...dividend, + symbol, + symbolId, + updated_at: new Date() + })), + ['symbol', 'exDate', 'recordDate'] + ); + dividendCount = corporateData.dividends.length; + } + + // Process splits + if (corporateData.splits && corporateData.splits.length > 0) { + await this.mongodb.batchUpsert( + 'qmSplits', + corporateData.splits.map((split: any) => ({ + ...split, + symbol, + symbolId, + updated_at: new Date() + })), + ['symbol', 'splitDate', 'ratio'] + ); + splitCount = corporateData.splits.length; + } + + // Process earnings + if (corporateData.earnings && corporateData.earnings.length > 0) { + await this.mongodb.batchUpsert( + 'qmEarnings', + corporateData.earnings.map((earning: any) => ({ + ...earning, + symbol, + symbolId, + updated_at: new Date() + })), + ['symbol', 'reportDate', 'fiscalQuarter'] + ); + earningsCount = corporateData.earnings.length; + } + + // Update tracking for corporate actions + const updateTime = new Date(); + + await tracker.updateSymbolOperation(symbol, 'corporate_actions_update', { + status: 'success', + lastRecordDate: updateTime, + recordCount: dividendCount + splitCount + earningsCount + }); + + this.logger.info('Corporate actions updated successfully', { + symbol, + dividendCount, + splitCount, + earningsCount + }); + + return { + success: true, + symbol, + message: `Corporate actions updated for ${symbol}`, + data: { + dividends: dividendCount, + splits: splitCount, + earnings: earningsCount + } + }; + + } catch (error) { + // Update session failure stats + if (session.uuid) { + await sessionManager.incrementFailedCalls(sessionId, session.uuid); + } + + this.logger.error('Error fetching corporate actions', { + symbol, + error: error instanceof Error ? error.message : 'Unknown error' + }); + + // Track failure for corporate actions + const tracker = await getOperationTracker(this); + + await tracker.updateSymbolOperation(symbol, 'corporate_actions_update', { + status: 'failure' + }); + + return { + success: false, + symbol, + message: `Failed to fetch corporate actions: ${error instanceof Error ? error.message : 'Unknown error'}` + }; + } +} + +/** + * Schedule corporate actions updates for symbols that need refreshing + */ +export async function scheduleCorporateActionsUpdates( + this: BaseHandler, + input: { + limit?: number; + forceUpdate?: boolean; + } = {}, + _context?: ExecutionContext +): Promise<{ + message: string; + symbolsQueued: number; + errors: number; +}> { + const { limit = 100, forceUpdate = false } = input; + const tracker = await getOperationTracker(this); + + this.logger.info('Scheduling corporate actions updates', { limit, forceUpdate }); + + try { + // Get symbols that need corporate actions updates + const staleSymbols = await tracker.getStaleSymbols('corporate_actions_update', { + minHoursSinceRun: forceUpdate ? 0 : 24 * 7, // Weekly + limit + }); + + if (staleSymbols.length === 0) { + this.logger.info('No symbols need corporate actions updates'); + return { + message: 'No symbols need corporate actions updates', + symbolsQueued: 0, + errors: 0 + }; + } + + this.logger.info(`Found ${staleSymbols.length} symbols needing corporate actions updates`); + + // Get full symbol data to include symbolId + const symbolDocs = await this.mongodb.find('qmSymbols', { + symbol: { $in: staleSymbols.slice(0, limit) } // Apply limit after deduplication + }, { + projection: { symbol: 1, symbolId: 1 } + }); + + let queued = 0; + let errors = 0; + + // Schedule individual update jobs for each symbol + for (const doc of symbolDocs) { + try { + if (!doc.symbolId) { + this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); + continue; + } + + await this.scheduleOperation('update-corporate-actions', { + symbol: doc.symbol, + symbolId: doc.symbolId + }, { + priority: 4, + delay: queued * 1500 // 1.5 seconds between jobs + }); + + queued++; + } catch (error) { + this.logger.error(`Failed to schedule corporate actions update for ${doc.symbol}`, { error }); + errors++; + } + } + + this.logger.info('Corporate actions update scheduling completed', { + symbolsQueued: queued, + errors, + total: staleSymbols.length + }); + + return { + message: `Scheduled corporate actions updates for ${queued} symbols`, + symbolsQueued: queued, + errors + }; + } catch (error) { + this.logger.error('Corporate actions scheduling failed', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts new file mode 100644 index 0000000..b4b98ff --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts @@ -0,0 +1,245 @@ +/** + * QM Filings Actions - Fetch and update SEC filings + */ + +import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers'; +import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config'; +import { QMSessionManager } from '../shared/session-manager'; +import { QMOperationTracker } from '../shared/operation-tracker'; + +// Cache tracker instance +let operationTracker: QMOperationTracker | null = null; + +/** + * Get or initialize the operation tracker + */ +async function getOperationTracker(handler: BaseHandler): Promise { + if (!operationTracker) { + const { initializeQMOperations } = await import('../shared/operation-registry'); + operationTracker = await initializeQMOperations(handler.mongodb, handler.logger); + } + return operationTracker; +} + +/** + * Update filings for a single symbol + */ +export async function updateFilings( + this: BaseHandler, + input: { + symbol: string; + symbolId: number; + }, + _context?: ExecutionContext +): Promise<{ + success: boolean; + symbol: string; + message: string; + data?: any; +}> { + const { symbol, symbolId } = input; + + this.logger.info('Fetching filings', { symbol, symbolId }); + + const sessionManager = QMSessionManager.getInstance(); + sessionManager.initialize(this.cache, this.logger); + + // Get a session - you'll need to add the appropriate session ID for filings + const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID + const session = await sessionManager.getSession(sessionId); + + if (!session || !session.uuid) { + throw new Error(`No active session found for QM filings`); + } + + try { + // Build API request for filings + const searchParams = new URLSearchParams({ + symbol: symbol, + symbolId: symbolId.toString(), + qmodTool: 'Filings', + webmasterId: '500', + limit: '50' // Get recent filings + }); + + // TODO: Update with correct filings endpoint + const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/filings.json?${searchParams.toString()}`; + + const response = await fetch(apiUrl, { + method: 'GET', + headers: session.headers, + proxy: session.proxy, + }); + + if (!response.ok) { + throw new Error(`QM API request failed: ${response.status} ${response.statusText}`); + } + + const filingsData = await response.json(); + + // Update session success stats + await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); + + // Process and store filings data + if (filingsData && filingsData.length > 0) { + // Store filings in a separate collection + await this.mongodb.batchUpsert( + 'qmFilings', + filingsData.map((filing: any) => ({ + ...filing, + symbol, + symbolId, + updated_at: new Date() + })), + ['symbol', 'filingDate', 'formType', 'accessionNumber'] // Unique keys + ); + + // Update symbol to track last filings update + const tracker = await getOperationTracker(this); + await tracker.updateSymbolOperation(symbol, 'filings_update', { + status: 'success', + lastRecordDate: new Date(), + recordCount: filingsData.length + }); + + this.logger.info('Filings updated successfully', { + symbol, + filingsCount: filingsData.length + }); + + return { + success: true, + symbol, + message: `Filings updated for ${symbol}`, + data: { count: filingsData.length } + }; + } else { + // Some symbols may not have filings (non-US companies, etc) + const tracker = await getOperationTracker(this); + await tracker.updateSymbolOperation(symbol, 'filings_update', { + status: 'success', + lastRecordDate: new Date(), + recordCount: 0 + }); + + this.logger.info('No filings found for symbol', { symbol }); + return { + success: true, + symbol, + message: `No filings found for ${symbol}`, + data: { count: 0 } + }; + } + + } catch (error) { + // Update session failure stats + if (session.uuid) { + await sessionManager.incrementFailedCalls(sessionId, session.uuid); + } + + this.logger.error('Error fetching filings', { + symbol, + error: error instanceof Error ? error.message : 'Unknown error' + }); + + // Track failure + const tracker = await getOperationTracker(this); + await tracker.updateSymbolOperation(symbol, 'filings_update', { + status: 'failure' + }); + + return { + success: false, + symbol, + message: `Failed to fetch filings: ${error instanceof Error ? error.message : 'Unknown error'}` + }; + } +} + +/** + * Schedule filings updates for symbols that need refreshing + */ +export async function scheduleFilingsUpdates( + this: BaseHandler, + input: { + limit?: number; + forceUpdate?: boolean; + } = {}, + _context?: ExecutionContext +): Promise<{ + message: string; + symbolsQueued: number; + errors: number; +}> { + const { limit = 100, forceUpdate = false } = input; + const tracker = await getOperationTracker(this); + + this.logger.info('Scheduling filings updates', { limit, forceUpdate }); + + try { + // Get symbols that need updating + const staleSymbols = await tracker.getStaleSymbols('filings_update', { + minHoursSinceRun: forceUpdate ? 0 : 24, // Daily for filings + limit + }); + + if (staleSymbols.length === 0) { + this.logger.info('No symbols need filings updates'); + return { + message: 'No symbols need filings updates', + symbolsQueued: 0, + errors: 0 + }; + } + + this.logger.info(`Found ${staleSymbols.length} symbols needing filings updates`); + + // Get full symbol data to include symbolId + const symbolDocs = await this.mongodb.find('qmSymbols', { + symbol: { $in: staleSymbols } + }, { + projection: { symbol: 1, symbolId: 1 } + }); + + let queued = 0; + let errors = 0; + + // Schedule individual update jobs for each symbol + for (const doc of symbolDocs) { + try { + if (!doc.symbolId) { + this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); + continue; + } + + await this.scheduleOperation('update-filings', { + symbol: doc.symbol, + symbolId: doc.symbolId + }, { + priority: 5, // Lower priority than financial data + delay: queued * 2000 // 2 seconds between jobs + }); + + queued++; + } catch (error) { + this.logger.error(`Failed to schedule filings update for ${doc.symbol}`, { error }); + errors++; + } + } + + this.logger.info('Filings update scheduling completed', { + symbolsQueued: queued, + errors, + total: staleSymbols.length + }); + + return { + message: `Scheduled filings updates for ${queued} symbols`, + symbolsQueued: queued, + errors + }; + } catch (error) { + this.logger.error('Filings scheduling failed', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts new file mode 100644 index 0000000..7c4d7fd --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts @@ -0,0 +1,236 @@ +/** + * QM Financials Actions - Fetch and update financial statements + */ + +import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers'; +import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config'; +import { QMSessionManager } from '../shared/session-manager'; +import { QMOperationTracker } from '../shared/operation-tracker'; + +// Cache tracker instance +let operationTracker: QMOperationTracker | null = null; + +/** + * Get or initialize the operation tracker + */ +async function getOperationTracker(handler: BaseHandler): Promise { + if (!operationTracker) { + const { initializeQMOperations } = await import('../shared/operation-registry'); + operationTracker = await initializeQMOperations(handler.mongodb, handler.logger); + } + return operationTracker; +} + +/** + * Update financials for a single symbol + */ +export async function updateFinancials( + this: BaseHandler, + input: { + symbol: string; + symbolId: number; + }, + _context?: ExecutionContext +): Promise<{ + success: boolean; + symbol: string; + message: string; + data?: any; +}> { + const { symbol, symbolId } = input; + + this.logger.info('Fetching financials', { symbol, symbolId }); + + const sessionManager = QMSessionManager.getInstance(); + sessionManager.initialize(this.cache, this.logger); + + // Get a session - you'll need to add the appropriate session ID for financials + const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID + const session = await sessionManager.getSession(sessionId); + + if (!session || !session.uuid) { + throw new Error(`No active session found for QM financials`); + } + + try { + // Build API request for financials + const searchParams = new URLSearchParams({ + symbol: symbol, + symbolId: symbolId.toString(), + qmodTool: 'Financials', + webmasterId: '500' + }); + + // TODO: Update with correct financials endpoint + const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/financials.json?${searchParams.toString()}`; + + const response = await fetch(apiUrl, { + method: 'GET', + headers: session.headers, + proxy: session.proxy, + }); + + if (!response.ok) { + throw new Error(`QM API request failed: ${response.status} ${response.statusText}`); + } + + const financialData = await response.json(); + + // Update session success stats + await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); + + // Process and store financial data + if (financialData && financialData.length > 0) { + // Store financial statements in a separate collection + await this.mongodb.batchUpsert( + 'qmFinancials', + financialData.map((statement: any) => ({ + ...statement, + symbol, + symbolId, + updated_at: new Date() + })), + ['symbol', 'period', 'statementType'] // Unique keys + ); + + // Update symbol to track last financials update + const tracker = await getOperationTracker(this); + await tracker.updateSymbolOperation(symbol, 'financials_update', { + status: 'success', + lastRecordDate: new Date(), + recordCount: financialData.length + }); + + this.logger.info('Financials updated successfully', { + symbol, + statementCount: financialData.length + }); + + return { + success: true, + symbol, + message: `Financials updated for ${symbol}`, + data: { count: financialData.length } + }; + } else { + this.logger.warn('No financial data returned from API', { symbol }); + return { + success: false, + symbol, + message: `No financial data found for symbol ${symbol}` + }; + } + + } catch (error) { + // Update session failure stats + if (session.uuid) { + await sessionManager.incrementFailedCalls(sessionId, session.uuid); + } + + this.logger.error('Error fetching financials', { + symbol, + error: error instanceof Error ? error.message : 'Unknown error' + }); + + // Track failure + const tracker = await getOperationTracker(this); + await tracker.updateSymbolOperation(symbol, 'financials_update', { + status: 'failure', + lastRunAt: new Date() + }); + + return { + success: false, + symbol, + message: `Failed to fetch financials: ${error instanceof Error ? error.message : 'Unknown error'}` + }; + } +} + +/** + * Schedule financial updates for symbols that need refreshing + */ +export async function scheduleFinancialsUpdates( + this: BaseHandler, + input: { + limit?: number; + forceUpdate?: boolean; + } = {}, + _context?: ExecutionContext +): Promise<{ + message: string; + symbolsQueued: number; + errors: number; +}> { + const { limit = 100, forceUpdate = false } = input; + const tracker = await getOperationTracker(this); + + this.logger.info('Scheduling financials updates', { limit, forceUpdate }); + + try { + // Get symbols that need updating + const staleSymbols = await tracker.getStaleSymbols('financials_update', { + minHoursSinceRun: forceUpdate ? 0 : 24 * 7, // Weekly by default + limit + }); + + if (staleSymbols.length === 0) { + this.logger.info('No symbols need financials updates'); + return { + message: 'No symbols need financials updates', + symbolsQueued: 0, + errors: 0 + }; + } + + this.logger.info(`Found ${staleSymbols.length} symbols needing financials updates`); + + // Get full symbol data to include symbolId + const symbolDocs = await this.mongodb.find('qmSymbols', { + symbol: { $in: staleSymbols } + }, { + projection: { symbol: 1, symbolId: 1 } + }); + + let queued = 0; + let errors = 0; + + // Schedule individual update jobs for each symbol + for (const doc of symbolDocs) { + try { + if (!doc.symbolId) { + this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); + continue; + } + + await this.scheduleOperation('update-financials', { + symbol: doc.symbol, + symbolId: doc.symbolId + }, { + priority: 4, + delay: queued * 2000 // 2 seconds between jobs + }); + + queued++; + } catch (error) { + this.logger.error(`Failed to schedule financials update for ${doc.symbol}`, { error }); + errors++; + } + } + + this.logger.info('Financials update scheduling completed', { + symbolsQueued: queued, + errors, + total: staleSymbols.length + }); + + return { + message: `Scheduled financials updates for ${queued} symbols`, + symbolsQueued: queued, + errors + }; + } catch (error) { + this.logger.error('Financials scheduling failed', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts index 17a7490..2a2e201 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts @@ -4,5 +4,11 @@ export { checkSessions, createSession } from './session.action'; export { searchSymbols, spiderSymbol } from './symbol.action'; -export { updatePrices, updateIntradayBars, getOperationStats } from './price.action'; +export { getOperationStats } from './price.action'; +export { updateSymbolInfo, scheduleSymbolInfoUpdates } from './symbol-info.action'; +export { updateFinancials, scheduleFinancialsUpdates } from './financials.action'; +export { updateCorporateActions, scheduleCorporateActionsUpdates } from './corporate-actions.action'; +export { updateFilings, scheduleFilingsUpdates } from './filings.action'; +export { updatePrices, schedulePriceUpdates } from './prices.action'; +export { updateIntradayBars, scheduleIntradayUpdates } from './intraday.action'; diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts new file mode 100644 index 0000000..96bb4f7 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts @@ -0,0 +1,302 @@ +/** + * QM Intraday Actions - Fetch and update intraday price bars + */ + +import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers'; +import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config'; +import { QMSessionManager } from '../shared/session-manager'; +import { QMOperationTracker } from '../shared/operation-tracker'; + +// Cache tracker instance +let operationTracker: QMOperationTracker | null = null; + +/** + * Get or initialize the operation tracker + */ +async function getOperationTracker(handler: BaseHandler): Promise { + if (!operationTracker) { + const { initializeQMOperations } = await import('../shared/operation-registry'); + operationTracker = await initializeQMOperations(handler.mongodb, handler.logger); + } + return operationTracker; +} + +/** + * Update intraday bars for a single symbol + * This handles both initial crawl and incremental updates + */ +export async function updateIntradayBars( + this: BaseHandler, + input: { + symbol: string; + symbolId: number; + crawlDate?: string; // ISO date string for specific date crawl + }, + _context?: ExecutionContext +): Promise<{ + success: boolean; + symbol: string; + message: string; + data?: any; +}> { + const { symbol, symbolId, crawlDate } = input; + + this.logger.info('Fetching intraday bars', { symbol, symbolId, crawlDate }); + + const sessionManager = QMSessionManager.getInstance(); + sessionManager.initialize(this.cache, this.logger); + + // Get a session - you'll need to add the appropriate session ID for intraday + const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID + const session = await sessionManager.getSession(sessionId); + + if (!session || !session.uuid) { + throw new Error(`No active session found for QM intraday`); + } + + try { + // Determine the date to fetch + const targetDate = crawlDate ? new Date(crawlDate) : new Date(); + + // Build API request for intraday bars + const searchParams = new URLSearchParams({ + symbol: symbol, + symbolId: symbolId.toString(), + qmodTool: 'IntradayBars', + webmasterId: '500', + date: targetDate.toISOString().split('T')[0], + interval: '1' // 1-minute bars + }); + + // TODO: Update with correct intraday endpoint + const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/intraday.json?${searchParams.toString()}`; + + const response = await fetch(apiUrl, { + method: 'GET', + headers: session.headers, + proxy: session.proxy, + }); + + if (!response.ok) { + throw new Error(`QM API request failed: ${response.status} ${response.statusText}`); + } + + const barsData = await response.json(); + + // Update session success stats + await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); + + // Process and store intraday data + if (barsData && barsData.length > 0) { + // Store bars in a separate collection + const processedBars = barsData.map((bar: any) => ({ + ...bar, + symbol, + symbolId, + timestamp: new Date(bar.timestamp), + date: targetDate, + updated_at: new Date() + })); + + await this.mongodb.batchUpsert( + 'qmIntradayBars', + processedBars, + ['symbol', 'timestamp'] // Unique keys + ); + + this.logger.info('Intraday bars updated successfully', { + symbol, + date: targetDate, + barCount: barsData.length + }); + + return { + success: true, + symbol, + message: `Intraday bars updated for ${symbol} on ${targetDate.toISOString().split('T')[0]}`, + data: { + count: barsData.length, + date: targetDate + } + }; + } else { + // No data for this date (weekend, holiday, or no trading) + this.logger.info('No intraday data for date', { symbol, date: targetDate }); + return { + success: true, + symbol, + message: `No intraday data for ${symbol} on ${targetDate.toISOString().split('T')[0]}`, + data: { + count: 0, + date: targetDate + } + }; + } + + } catch (error) { + // Update session failure stats + if (session.uuid) { + await sessionManager.incrementFailedCalls(sessionId, session.uuid); + } + + this.logger.error('Error fetching intraday bars', { + symbol, + error: error instanceof Error ? error.message : 'Unknown error' + }); + + return { + success: false, + symbol, + message: `Failed to fetch intraday bars: ${error instanceof Error ? error.message : 'Unknown error'}` + }; + } +} + +/** + * Schedule intraday updates for symbols + * This handles both initial crawls and regular updates + */ +export async function scheduleIntradayUpdates( + this: BaseHandler, + input: { + limit?: number; + mode?: 'crawl' | 'update'; // crawl for historical, update for recent + forceUpdate?: boolean; + } = {}, + _context?: ExecutionContext +): Promise<{ + message: string; + symbolsQueued: number; + jobsQueued: number; + errors: number; +}> { + const { limit = 50, mode = 'update', forceUpdate = false } = input; + const tracker = await getOperationTracker(this); + + this.logger.info('Scheduling intraday updates', { limit, mode, forceUpdate }); + + try { + let symbolsToProcess: any[] = []; + + if (mode === 'crawl') { + // Get symbols that need historical crawl + symbolsToProcess = await tracker.getSymbolsForIntradayCrawl('intraday_bars', { + limit + }); + } else { + // Get symbols that need regular updates + const staleSymbols = await tracker.getStaleSymbols('intraday_bars', { + minHoursSinceRun: forceUpdate ? 0 : 1, // Hourly updates + limit + }); + + if (staleSymbols.length === 0) { + this.logger.info('No symbols need intraday updates'); + return { + message: 'No symbols need intraday updates', + symbolsQueued: 0, + jobsQueued: 0, + errors: 0 + }; + } + + // Get full symbol data + symbolsToProcess = await this.mongodb.find('qmSymbols', { + symbol: { $in: staleSymbols } + }, { + projection: { symbol: 1, symbolId: 1 } + }); + } + + if (symbolsToProcess.length === 0) { + this.logger.info('No symbols to process for intraday'); + return { + message: 'No symbols to process', + symbolsQueued: 0, + jobsQueued: 0, + errors: 0 + }; + } + + this.logger.info(`Found ${symbolsToProcess.length} symbols for intraday ${mode}`); + + let symbolsQueued = 0; + let jobsQueued = 0; + let errors = 0; + + // Process each symbol + for (const doc of symbolsToProcess) { + try { + if (!doc.symbolId) { + this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); + continue; + } + + if (mode === 'crawl' && doc.crawlState) { + // For crawl mode, schedule multiple days going backwards + const startDate = doc.crawlState.oldestDateReached || new Date(); + const daysToFetch = 30; // Fetch 30 days at a time + + for (let i = 0; i < daysToFetch; i++) { + const crawlDate = new Date(startDate); + crawlDate.setDate(crawlDate.getDate() - i); + + await this.scheduleOperation('update-intraday-bars', { + symbol: doc.symbol, + symbolId: doc.symbolId, + crawlDate: crawlDate.toISOString() + }, { + priority: 6, + delay: jobsQueued * 1000 // 1 second between jobs + }); + + jobsQueued++; + } + + // Update crawl state + await tracker.updateSymbolOperation(doc.symbol, 'intraday_bars', { + status: 'partial', + crawlState: { + finished: false, + oldestDateReached: new Date(startDate.getTime() - daysToFetch * 24 * 60 * 60 * 1000), + lastCrawlDirection: 'backward' + } + }); + } else { + // For update mode, just fetch today's data + await this.scheduleOperation('update-intraday-bars', { + symbol: doc.symbol, + symbolId: doc.symbolId + }, { + priority: 8, // High priority for current data + delay: jobsQueued * 500 // 0.5 seconds between jobs + }); + + jobsQueued++; + } + + symbolsQueued++; + } catch (error) { + this.logger.error(`Failed to schedule intraday update for ${doc.symbol}`, { error }); + errors++; + } + } + + this.logger.info('Intraday update scheduling completed', { + symbolsQueued, + jobsQueued, + errors, + mode + }); + + return { + message: `Scheduled intraday ${mode} for ${symbolsQueued} symbols (${jobsQueued} jobs)`, + symbolsQueued, + jobsQueued, + errors + }; + } catch (error) { + this.logger.error('Intraday scheduling failed', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/price.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/price.action.ts index 546d927..7867d3c 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/price.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/price.action.ts @@ -1,5 +1,5 @@ /** - * QM Price Actions - Price data updates with operation tracking + * QM Price Actions - Operation statistics */ import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers'; @@ -19,263 +19,6 @@ async function getOperationTracker(handler: BaseHandler): Promise { - const { limit = 100, symbols } = input; - const tracker = await getOperationTracker(this); - - this.logger.info('Starting price update operation', { limit, specificSymbols: symbols?.length }); - - try { - // Get symbols that need updating - let symbolsToUpdate: string[]; - - if (symbols && symbols.length > 0) { - // Update specific symbols - symbolsToUpdate = symbols; - } else { - // Get stale symbols - symbolsToUpdate = await tracker.getStaleSymbols('price_update', { - minHoursSinceRun: 24, - limit - }); - } - - if (symbolsToUpdate.length === 0) { - this.logger.info('No symbols need price updates'); - return { - message: 'No symbols need price updates', - symbolsUpdated: 0, - errors: 0 - }; - } - - this.logger.info(`Found ${symbolsToUpdate.length} symbols for price update`); - - let updated = 0; - let errors = 0; - const updateResults = []; - - // Process symbols (in real implementation, you'd fetch prices from QM API) - for (const symbol of symbolsToUpdate) { - try { - // TODO: Actual price fetching logic here - // const prices = await fetchPricesFromQM(symbol); - - // For now, simulate the update - const mockPrices = { - symbol, - lastPrice: Math.random() * 1000, - volume: Math.floor(Math.random() * 1000000), - date: new Date() - }; - - // Track the operation - updateResults.push({ - symbol, - operation: 'price_update', - data: { - status: 'success' as const, - lastRecordDate: mockPrices.date, - recordCount: 1 - } - }); - - updated++; - } catch (error) { - this.logger.error(`Failed to update prices for ${symbol}`, { error }); - - updateResults.push({ - symbol, - operation: 'price_update', - data: { - status: 'failure' as const - } - }); - - errors++; - } - } - - // Bulk update operation tracking - if (updateResults.length > 0) { - await tracker.bulkUpdateSymbolOperations(updateResults); - } - - this.logger.info('Price update operation completed', { - symbolsUpdated: updated, - errors, - total: symbolsToUpdate.length - }); - - return { - message: `Updated prices for ${updated} symbols`, - symbolsUpdated: updated, - errors - }; - } catch (error) { - this.logger.error('Price update operation failed', { error }); - throw error; - } -} - -/** - * Update intraday price bars - crawls backwards until no more data - */ -export async function updateIntradayBars( - this: BaseHandler, - input: { - symbol?: string; - limit?: number; - } = {}, - _context?: ExecutionContext -): Promise<{ - message: string; - symbol: string; - barsCollected: number; - crawlFinished: boolean; -}> { - const { symbol, limit = 1 } = input; - const tracker = await getOperationTracker(this); - - try { - // Get symbols for intraday crawl - let symbolData; - if (symbol) { - symbolData = await tracker.getSymbolsForIntradayCrawl('intraday_bars', { - limit: 1 - }).then(symbols => symbols.find(s => s.symbol === symbol)); - } else { - const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', { - limit - }); - symbolData = symbols[0]; - } - - if (!symbolData) { - return { - message: 'No symbols available for intraday crawl', - symbol: '', - barsCollected: 0, - crawlFinished: false - }; - } - - this.logger.info('Processing intraday bars', { - symbol: symbolData.symbol, - crawlState: symbolData.crawlState - }); - - let barsCollected = 0; - let crawlFinished = false; - - if (symbolData.crawlState?.finished) { - // Already finished initial crawl, just update from last record - this.logger.debug('Symbol already crawled, updating from last record', { - symbol: symbolData.symbol, - lastRecord: symbolData.lastRecordDate - }); - - // TODO: Fetch bars from lastRecordDate to now - const newBars = 10; // Mock data - - await tracker.updateSymbolOperation(symbolData.symbol, 'intraday_bars', { - status: 'success', - lastRecordDate: new Date(), - recordCount: (symbolData.crawlState as any).recordCount + newBars - }); - - return { - message: `Updated ${newBars} new bars for ${symbolData.symbol}`, - symbol: symbolData.symbol, - barsCollected: newBars, - crawlFinished: true - }; - } - - // Initial crawl - go backwards until no data - let currentDate = new Date(); - let oldestDate = currentDate; - let totalBars = 0; - let consecutiveEmptyDays = 0; - const maxEmptyDays = 5; // Stop after 5 consecutive days with no data - - while (consecutiveEmptyDays < maxEmptyDays) { - // TODO: Actual bar fetching logic - // const bars = await fetchIntradayBars(symbolData.symbol, currentDate); - - // Mock data - simulate decreasing data as we go back - const bars = currentDate > new Date('2020-01-01') ? Math.floor(Math.random() * 100) : 0; - - if (bars === 0) { - consecutiveEmptyDays++; - } else { - consecutiveEmptyDays = 0; - totalBars += bars; - oldestDate = new Date(currentDate); - } - - // Update progress - await tracker.updateSymbolOperation(symbolData.symbol, 'intraday_bars', { - status: 'partial', - lastRecordDate: new Date(), - recordCount: totalBars, - crawlState: { - finished: false, - oldestDateReached: oldestDate - } - }); - - // Move to previous day - currentDate.setDate(currentDate.getDate() - 1); - - // Limit crawl for this execution - if (totalBars > 1000) { - this.logger.info('Reached bar limit for this execution', { - symbol: symbolData.symbol, - barsCollected: totalBars - }); - break; - } - } - - // Check if we finished the crawl - if (consecutiveEmptyDays >= maxEmptyDays) { - crawlFinished = true; - await tracker.markCrawlFinished(symbolData.symbol, 'intraday_bars', oldestDate); - - this.logger.info('Completed initial crawl for symbol', { - symbol: symbolData.symbol, - totalBars, - oldestDate - }); - } - - return { - message: `Collected ${totalBars} bars for ${symbolData.symbol}`, - symbol: symbolData.symbol, - barsCollected: totalBars, - crawlFinished - }; - } catch (error) { - this.logger.error('Intraday bars update failed', { error }); - throw error; - } -} - /** * Get operation statistics */ diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts new file mode 100644 index 0000000..8333895 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts @@ -0,0 +1,249 @@ +/** + * QM Prices Actions - Fetch and update daily price data + */ + +import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers'; +import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config'; +import { QMSessionManager } from '../shared/session-manager'; +import { QMOperationTracker } from '../shared/operation-tracker'; + +// Cache tracker instance +let operationTracker: QMOperationTracker | null = null; + +/** + * Get or initialize the operation tracker + */ +async function getOperationTracker(handler: BaseHandler): Promise { + if (!operationTracker) { + const { initializeQMOperations } = await import('../shared/operation-registry'); + operationTracker = await initializeQMOperations(handler.mongodb, handler.logger); + } + return operationTracker; +} + +/** + * Update daily prices for a single symbol + */ +export async function updatePrices( + this: BaseHandler, + input: { + symbol: string; + symbolId: number; + }, + _context?: ExecutionContext +): Promise<{ + success: boolean; + symbol: string; + message: string; + data?: any; +}> { + const { symbol, symbolId } = input; + + this.logger.info('Fetching daily prices', { symbol, symbolId }); + + const sessionManager = QMSessionManager.getInstance(); + sessionManager.initialize(this.cache, this.logger); + + // Get a session - you'll need to add the appropriate session ID for prices + const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID + const session = await sessionManager.getSession(sessionId); + + if (!session || !session.uuid) { + throw new Error(`No active session found for QM prices`); + } + + try { + // Build API request for daily prices + const searchParams = new URLSearchParams({ + symbol: symbol, + symbolId: symbolId.toString(), + qmodTool: 'DailyPrices', + webmasterId: '500', + days: '30' // Get last 30 days + }); + + // TODO: Update with correct prices endpoint + const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/prices.json?${searchParams.toString()}`; + + const response = await fetch(apiUrl, { + method: 'GET', + headers: session.headers, + proxy: session.proxy, + }); + + if (!response.ok) { + throw new Error(`QM API request failed: ${response.status} ${response.statusText}`); + } + + const priceData = await response.json(); + + // Update session success stats + await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); + + // Process and store price data + if (priceData && priceData.length > 0) { + // Store prices in a separate collection + const processedPrices = priceData.map((price: any) => ({ + ...price, + symbol, + symbolId, + date: new Date(price.date), + updated_at: new Date() + })); + + await this.mongodb.batchUpsert( + 'qmPrices', + processedPrices, + ['symbol', 'date'] // Unique keys + ); + + // Find the latest price date + const latestDate = processedPrices.reduce((latest: Date, price: any) => + price.date > latest ? price.date : latest, + new Date(0) + ); + + // Update symbol to track last price update + const tracker = await getOperationTracker(this); + await tracker.updateSymbolOperation(symbol, 'price_update', { + status: 'success', + lastRecordDate: latestDate, + recordCount: priceData.length + }); + + this.logger.info('Prices updated successfully', { + symbol, + priceCount: priceData.length, + latestDate + }); + + return { + success: true, + symbol, + message: `Prices updated for ${symbol}`, + data: { + count: priceData.length, + latestDate + } + }; + } else { + this.logger.warn('No price data returned from API', { symbol }); + return { + success: false, + symbol, + message: `No price data found for symbol ${symbol}` + }; + } + + } catch (error) { + // Update session failure stats + if (session.uuid) { + await sessionManager.incrementFailedCalls(sessionId, session.uuid); + } + + this.logger.error('Error fetching prices', { + symbol, + error: error instanceof Error ? error.message : 'Unknown error' + }); + + // Track failure + const tracker = await getOperationTracker(this); + await tracker.updateSymbolOperation(symbol, 'price_update', { + status: 'failure' + }); + + return { + success: false, + symbol, + message: `Failed to fetch prices: ${error instanceof Error ? error.message : 'Unknown error'}` + }; + } +} + +/** + * Schedule price updates for symbols that need refreshing + */ +export async function schedulePriceUpdates( + this: BaseHandler, + input: { + limit?: number; + forceUpdate?: boolean; + } = {}, + _context?: ExecutionContext +): Promise<{ + message: string; + symbolsQueued: number; + errors: number; +}> { + const { limit = 100, forceUpdate = false } = input; + const tracker = await getOperationTracker(this); + + this.logger.info('Scheduling price updates', { limit, forceUpdate }); + + try { + // Get symbols that need updating + const staleSymbols = await tracker.getStaleSymbols('price_update', { + minHoursSinceRun: forceUpdate ? 0 : 24, // Daily updates + limit + }); + + if (staleSymbols.length === 0) { + this.logger.info('No symbols need price updates'); + return { + message: 'No symbols need price updates', + symbolsQueued: 0, + errors: 0 + }; + } + + this.logger.info(`Found ${staleSymbols.length} symbols needing price updates`); + + // Get full symbol data to include symbolId + const symbolDocs = await this.mongodb.find('qmSymbols', { + symbol: { $in: staleSymbols } + }, { + projection: { symbol: 1, symbolId: 1 } + }); + + let queued = 0; + let errors = 0; + + // Schedule individual update jobs for each symbol + for (const doc of symbolDocs) { + try { + if (!doc.symbolId) { + this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); + continue; + } + + await this.scheduleOperation('update-prices', { + symbol: doc.symbol, + symbolId: doc.symbolId + }, { + priority: 7, // High priority for price data + delay: queued * 500 // 0.5 seconds between jobs + }); + + queued++; + } catch (error) { + this.logger.error(`Failed to schedule price update for ${doc.symbol}`, { error }); + errors++; + } + } + + this.logger.info('Price update scheduling completed', { + symbolsQueued: queued, + errors, + total: staleSymbols.length + }); + + return { + message: `Scheduled price updates for ${queued} symbols`, + symbolsQueued: queued, + errors + }; + } catch (error) { + this.logger.error('Price scheduling failed', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts new file mode 100644 index 0000000..288175c --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts @@ -0,0 +1,230 @@ +/** + * QM Symbol Info Actions - Fetch and update symbol metadata + */ + +import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers'; +import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config'; +import { QMSessionManager } from '../shared/session-manager'; +import { QMOperationTracker } from '../shared/operation-tracker'; + +// Cache tracker instance +let operationTracker: QMOperationTracker | null = null; + +/** + * Get or initialize the operation tracker + */ +async function getOperationTracker(handler: BaseHandler): Promise { + if (!operationTracker) { + const { initializeQMOperations } = await import('../shared/operation-registry'); + operationTracker = await initializeQMOperations(handler.mongodb, handler.logger); + } + return operationTracker; +} + +/** + * Update symbol info for a single symbol + * This is a simple API fetch operation - no tracking logic here + */ +export async function updateSymbolInfo( + this: BaseHandler, + input: { + symbol: string; + qmSearchCode: string; + }, + _context?: ExecutionContext +): Promise<{ + success: boolean; + symbol: string; + message: string; + data?: any; +}> { + const { symbol, qmSearchCode } = input; + + this.logger.info('Fetching symbol info', { symbol, qmSearchCode }); + + const sessionManager = QMSessionManager.getInstance(); + sessionManager.initialize(this.cache, this.logger); + + // Get a session + const sessionId = QM_SESSION_IDS.LOOKUP; + const session = await sessionManager.getSession(sessionId); + + if (!session || !session.uuid) { + throw new Error(`No active session found for QM LOOKUP`); + } + + try { + // Build API request for symbol info + const searchParams = new URLSearchParams({ + q: qmSearchCode || symbol, + qmodTool: 'SymbolInfo', + webmasterId: '500', + includeExtended: 'true' + }); + + const apiUrl = `${QM_CONFIG.LOOKUP_URL}?${searchParams.toString()}`; + + const response = await fetch(apiUrl, { + method: 'GET', + headers: session.headers, + proxy: session.proxy, + }); + + if (!response.ok) { + throw new Error(`QM API request failed: ${response.status} ${response.statusText}`); + } + + const symbolData = await response.json(); + + // Update session success stats + await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); + + // Process and store symbol info + if (symbolData && (Array.isArray(symbolData) ? symbolData.length > 0 : true)) { + const symbolInfo = Array.isArray(symbolData) ? symbolData[0] : symbolData; + + // Update symbol in database with new metadata + const updateData = { + ...symbolInfo, + symbol: symbol, + qmSearchCode: qmSearchCode || symbolInfo.symbol, + lastInfoUpdate: new Date(), + updated_at: new Date() + }; + + await this.mongodb.updateOne( + 'qmSymbols', + { symbol }, + { $set: updateData }, + { upsert: true } + ); + + this.logger.info('Symbol info updated successfully', { + symbol, + name: symbolInfo.name, + exchange: symbolInfo.exchange + }); + + return { + success: true, + symbol, + message: `Symbol info updated for ${symbol}`, + data: symbolInfo + }; + } else { + this.logger.warn('No symbol data returned from API', { symbol }); + return { + success: false, + symbol, + message: `No data found for symbol ${symbol}` + }; + } + + } catch (error) { + // Update session failure stats + if (session.uuid) { + await sessionManager.incrementFailedCalls(sessionId, session.uuid); + } + + this.logger.error('Error fetching symbol info', { + symbol, + error: error instanceof Error ? error.message : 'Unknown error' + }); + + return { + success: false, + symbol, + message: `Failed to fetch symbol info: ${error instanceof Error ? error.message : 'Unknown error'}` + }; + } +} + +/** + * Schedule symbol info updates for symbols that need refreshing + * This is the scheduled job that finds stale symbols and queues individual updates + */ +export async function scheduleSymbolInfoUpdates( + this: BaseHandler, + input: { + limit?: number; + forceUpdate?: boolean; + } = {}, + _context?: ExecutionContext +): Promise<{ + message: string; + symbolsQueued: number; + errors: number; +}> { + const { limit = 100, forceUpdate = false } = input; + const tracker = await getOperationTracker(this); + + this.logger.info('Scheduling symbol info updates', { limit, forceUpdate }); + + try { + // Get symbols that need updating + const staleSymbols = await tracker.getStaleSymbols('symbol_info', { + minHoursSinceRun: forceUpdate ? 0 : 24 * 7, // Weekly by default + limit + }); + + if (staleSymbols.length === 0) { + this.logger.info('No symbols need info updates'); + return { + message: 'No symbols need info updates', + symbolsQueued: 0, + errors: 0 + }; + } + + this.logger.info(`Found ${staleSymbols.length} symbols needing info updates`); + + // Get full symbol data to include qmSearchCode + const symbolDocs = await this.mongodb.find('qmSymbols', { + symbol: { $in: staleSymbols } + }, { + projection: { symbol: 1, qmSearchCode: 1 } + }); + + let queued = 0; + let errors = 0; + + // Schedule individual update jobs for each symbol + for (const doc of symbolDocs) { + try { + await this.scheduleOperation('update-symbol-info', { + symbol: doc.symbol, + qmSearchCode: doc.qmSearchCode || doc.symbol + }, { + priority: 3, + // Add some delay to avoid overwhelming the API + delay: queued * 1000 // 1 second between jobs + }); + + // Track that we've scheduled this symbol + await tracker.updateSymbolOperation(doc.symbol, 'symbol_info', { + status: 'success' + }); + + queued++; + } catch (error) { + this.logger.error(`Failed to schedule update for ${doc.symbol}`, { error }); + errors++; + } + } + + this.logger.info('Symbol info update scheduling completed', { + symbolsQueued: queued, + errors, + total: staleSymbols.length + }); + + return { + message: `Scheduled info updates for ${queued} symbols`, + symbolsQueued: queued, + errors + }; + } catch (error) { + this.logger.error('Symbol info scheduling failed', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts index c8a06ac..2bae5ee 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts @@ -4,15 +4,35 @@ import { Operation, ScheduledOperation, } from '@stock-bot/handlers'; -import { checkSessions, createSession, searchSymbols, spiderSymbol } from './actions'; -import { getOperationStats } from './actions/price.action'; +import { + checkSessions, + createSession, + searchSymbols, + spiderSymbol, + getOperationStats, + updateSymbolInfo, + scheduleSymbolInfoUpdates, + updateFinancials, + scheduleFinancialsUpdates, + updateCorporateActions, + scheduleCorporateActionsUpdates, + updateFilings, + scheduleFilingsUpdates, + updatePrices, + schedulePriceUpdates, + updateIntradayBars, + scheduleIntradayUpdates +} from './actions'; import { initializeQMOperations } from './shared/operation-registry'; @Handler('qm') export class QMHandler extends BaseHandler { constructor(services: any) { - initializeQMOperations(services.mongodb, services.logger); super(services); // Handler name read from @Handler decorator + // Initialize operations after super() so services are available + initializeQMOperations(this.mongodb, this.logger).catch(error => { + this.logger.error('Failed to initialize QM operations', { error }); + }); } /** @@ -41,22 +61,83 @@ export class QMHandler extends BaseHandler { @Operation('search-symbols') searchSymbols = searchSymbols; - // /** - // * PRICE DATA - // */ - // @ScheduledOperation('update-prices', '0 */6 * * *', { - // priority: 5, - // immediately: false, - // description: 'Update daily prices every 6 hours' - // }) - // updatePrices = updatePrices; + /** + * SYMBOL INFO + */ + @Operation('update-symbol-info') + updateSymbolInfo = updateSymbolInfo; - // @ScheduledOperation('update-intraday-bars', '*/30 * * * *', { - // priority: 6, - // immediately: false, - // description: 'Update intraday bars every 30 minutes during market hours' - // }) - // updateIntradayBars = updateIntradayBars; + @ScheduledOperation('schedule-symbol-info-updates', '0 */6 * * *', { + priority: 7, + immediately: false, + description: 'Check for symbols needing info updates every 6 hours' + }) + scheduleSymbolInfoUpdates = scheduleSymbolInfoUpdates; + + /** + * FINANCIALS + */ + @Operation('update-financials') + updateFinancials = updateFinancials; + + @ScheduledOperation('schedule-financials-updates', '0 2 * * *', { + priority: 6, + immediately: false, + description: 'Check for symbols needing financials updates daily at 2 AM' + }) + scheduleFinancialsUpdates = scheduleFinancialsUpdates; + + /** + * CORPORATE ACTIONS (Dividends, Splits, Earnings) + */ + @Operation('update-corporate-actions') + updateCorporateActions = updateCorporateActions; + + @ScheduledOperation('schedule-corporate-actions-updates', '0 3 * * *', { + priority: 6, + immediately: false, + description: 'Check for symbols needing corporate actions updates daily at 3 AM' + }) + scheduleCorporateActionsUpdates = scheduleCorporateActionsUpdates; + + /** + * FILINGS + */ + @Operation('update-filings') + updateFilings = updateFilings; + + @ScheduledOperation('schedule-filings-updates', '0 */8 * * *', { + priority: 5, + immediately: false, + description: 'Check for symbols needing filings updates every 8 hours' + }) + scheduleFilingsUpdates = scheduleFilingsUpdates; + + /** + * PRICE DATA + */ + @Operation('update-prices') + updatePrices = updatePrices; + + @ScheduledOperation('schedule-price-updates', '0 */6 * * *', { + priority: 8, + immediately: false, + description: 'Check for symbols needing price updates every 6 hours' + }) + schedulePriceUpdates = schedulePriceUpdates; + + /** + * INTRADAY DATA + */ + @Operation('update-intraday-bars') + updateIntradayBars = updateIntradayBars; + + @ScheduledOperation('schedule-intraday-updates', '*/30 * * * *', { + priority: 9, + immediately: false, + description: 'Check for symbols needing intraday updates every 30 minutes' + }) + scheduleIntradayUpdates = scheduleIntradayUpdates; /** * MONITORING diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-registry.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-registry.ts index 7853918..9fcbdc7 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-registry.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-registry.ts @@ -2,8 +2,7 @@ * QM Operation Registry - Define and register all QM operations */ -import type { MongoDBClient } from '@stock-bot/mongodb'; -import type { Logger } from '@stock-bot/types'; +import type { MongoDBClient, Logger } from '@stock-bot/types'; import { QMOperationTracker } from './operation-tracker'; import type { QMOperationConfig } from './types'; @@ -16,53 +15,42 @@ export const QM_OPERATIONS: QMOperationConfig[] = [ description: 'Update symbol metadata', defaultStaleHours: 24 * 7 // Weekly }, - // { - // name: 'price_update', - // type: 'standard', - // description: 'Update daily price data', - // defaultStaleHours: 24 - // }, - // { - // name: 'intraday_bars', - // type: 'intraday_crawl', - // description: 'Crawl intraday price bars from today backwards', - // requiresFinishedFlag: true, - // defaultStaleHours: 1 // Check every hour for new data - // }, + { + name: 'price_update', + type: 'standard', + description: 'Update daily price data', + defaultStaleHours: 24 + }, + { + name: 'intraday_bars', + type: 'intraday_crawl', + description: 'Crawl intraday price bars from today backwards', + requiresFinishedFlag: true, + defaultStaleHours: 1 // Check every hour for new data + }, - // // Fundamental data operations - // { - // name: 'financials_update', - // type: 'standard', - // description: 'Update financial statements', - // defaultStaleHours: 24 * 7 // Weekly - // }, - // { - // name: 'earnings_update',- - // type: 'standard', - // description: 'Update earnings data', - // defaultStaleHours: 24 * 7 // Weekly - // }, - // { - // name: 'dividends_update', - // type: 'standard', - // description: 'Update dividend history', - // defaultStaleHours: 24 * 7 // Weekly - // }, - // { - // name: 'splits_update', - // type: 'standard', - // description: 'Update stock split history', - // defaultStaleHours: 24 * 30 // Monthly - // }, + // Fundamental data operations + { + name: 'financials_update', + type: 'standard', + description: 'Update financial statements', + defaultStaleHours: 24 * 7 // Weekly + }, + // Corporate actions - fetched together in one API call + { + name: 'corporate_actions_update', + type: 'standard', + description: 'Update corporate actions (earnings, dividends, splits)', + defaultStaleHours: 24 * 7 // Weekly + }, - // // News and filings - // { - // name: 'filings_update', - // type: 'standard', - // description: 'Update SEC filings', - // defaultStaleHours: 24 // Daily - // }, + // News and filings + { + name: 'filings_update', + type: 'standard', + description: 'Update SEC filings', + defaultStaleHours: 24 // Daily + }, // { // name: 'news_update', // type: 'standard', diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts index fe2dcbf..fd0ee2e 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts @@ -3,8 +3,7 @@ * Supports dynamic operation registration with auto-indexing */ -import type { MongoDBClient } from '@stock-bot/mongodb'; -import type { Logger } from '@stock-bot/types'; +import type { MongoDBClient, Logger } from '@stock-bot/types'; import type { IntradayCrawlSymbol, QMOperationConfig } from './types'; export class QMOperationTracker { @@ -57,7 +56,8 @@ export class QMOperationTracker { } for (const indexSpec of indexes) { - await this.mongodb.createIndex(this.collectionName, indexSpec, { + const collection = this.mongodb.collection(this.collectionName); + await collection.createIndex(indexSpec, { background: true, name: `op_${operationName}_${Object.keys(indexSpec).join('_')}` }); @@ -174,7 +174,7 @@ export class QMOperationTracker { }; }); - const collection = this.mongodb.getCollection(this.collectionName); + const collection = this.mongodb.collection(this.collectionName); const result = await collection.bulkWrite(bulkOps as any, { ordered: false }); this.logger.debug('Bulk updated symbol operations', { @@ -335,17 +335,18 @@ export class QMOperationTracker { finishedCrawls?: number; avgRecordsPerSymbol?: number; }> { - const total = await this.mongodb.countDocuments(this.collectionName); + const collection = this.mongodb.collection(this.collectionName); + const total = await collection.countDocuments({}); - const processed = await this.mongodb.countDocuments(this.collectionName, { + const processed = await collection.countDocuments({ [`operations.${operationName}`]: { $exists: true } }); - const successful = await this.mongodb.countDocuments(this.collectionName, { + const successful = await collection.countDocuments({ [`operations.${operationName}.status`]: 'success' }); - const failed = await this.mongodb.countDocuments(this.collectionName, { + const failed = await collection.countDocuments({ [`operations.${operationName}.status`]: 'failure' }); @@ -354,7 +355,7 @@ export class QMOperationTracker { this.registeredOperations.get(operationName)?.defaultStaleHours || 24 )); - const stale = await this.mongodb.countDocuments(this.collectionName, { + const stale = await collection.countDocuments({ $or: [ { [`operations.${operationName}.lastRunAt`]: { $lt: staleDate } }, { [`operations.${operationName}`]: { $exists: false } } @@ -371,13 +372,13 @@ export class QMOperationTracker { // Additional stats for crawl operations if (this.registeredOperations.get(operationName)?.type === 'intraday_crawl') { - result.finishedCrawls = await this.mongodb.countDocuments(this.collectionName, { + result.finishedCrawls = await collection.countDocuments({ [`operations.${operationName}.crawlState.finished`]: true }); } // Calculate average records per symbol - const aggregation = await this.mongodb.aggregate(this.collectionName, [ + const aggregation = await collection.aggregate([ { $match: { [`operations.${operationName}.recordCount`]: { $exists: true } @@ -389,7 +390,7 @@ export class QMOperationTracker { avgRecords: { $avg: `$operations.${operationName}.recordCount` } } } - ]); + ]).toArray(); if (aggregation.length > 0) { result.avgRecordsPerSymbol = Math.round(aggregation[0].avgRecords);