diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/events.action.ts similarity index 55% rename from apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts rename to apps/stock/data-ingestion/src/handlers/qm/actions/events.action.ts index d4473cc..a0584d4 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/events.action.ts @@ -1,5 +1,5 @@ /** - * QM Corporate Actions - Fetch and update dividends, splits, and earnings together + * QM Events Actions - Fetch and update dividends, splits, and earnings together */ import type { ExecutionContext } from '@stock-bot/handlers'; @@ -23,14 +23,14 @@ async function getOperationTracker(handler: QMHandler): Promise { - const { symbol, symbolId, qmSearchCode } = input; + const { symbol, exchange, qmSearchCode } = input; - this.logger.info('Fetching corporate actions', { symbol, symbolId }); + this.logger.info(`Fetching events ${qmSearchCode}`, { symbol, exchange, qmSearchCode }); const sessionManager = QMSessionManager.getInstance(); await sessionManager.initialize(this.cache, this.logger); // Get a session - you'll need to add the appropriate session ID - const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID + const sessionId = QM_SESSION_IDS.PRICES; // TODO: Update with correct session ID const session = await sessionManager.getSession(sessionId); if (!session || !session.uuid) { - throw new Error(`No active session found for QM corporate actions`); + throw new Error(`No active session found for QM events ${qmSearchCode}`); } try { - // Build API request for corporate actions + // Build API request for events const searchParams = new URLSearchParams({ - symbol: symbol, - symbolId: symbolId.toString(), - qmodTool: 'CorporateActions', // Assuming this returns all three - webmasterId: '500' + earnings: 'true', + splits: 'true', + dividends: 'true', + symbol: qmSearchCode, }); - - // TODO: Update with correct corporate actions endpoint - const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/corporate-actions.json?${searchParams.toString()}`; + + + // TODO: Update with correct events endpoint + const apiUrl = `${QM_CONFIG.EVENTS_URL}?${searchParams.toString()}`; const response = await fetch(apiUrl, { method: 'GET', headers: session.headers, proxy: session.proxy, }); + const tracker = await getOperationTracker(this); if (!response.ok) { throw new Error(`QM API request failed: ${response.status} ${response.statusText}`); } const corporateData = await response.json(); + const results = corporateData.results; + if (typeof results.error === 'object') { + await tracker.updateSymbolOperation(qmSearchCode, 'events_update', { + status: 'success', + }); + throw new Error(`Invalid response structure from QM API for ${qmSearchCode}`); + } // Update session success stats await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); - const tracker = await getOperationTracker(this); + let dividendCount = 0; let splitCount = 0; let earningsCount = 0; // Process dividends - if (corporateData.dividends && corporateData.dividends.length > 0) { + if (results.dividends[0].dividend && results.dividends[0].dividend.length > 0 && Array.isArray(results.dividends[0].dividend)) { await this.mongodb.batchUpsert( 'qmDividends', - corporateData.dividends.map((dividend: any) => ({ + results.dividends[0].dividend.map((dividend: any) => ({ ...dividend, symbol, - symbolId, - updated_at: new Date() + exchange, + qmSearchCode, + reportDate: new Date(dividend.date), })), - ['symbol', 'exDate', 'recordDate'] + ['qmSearchCode','reportDate'] ); - dividendCount = corporateData.dividends.length; + dividendCount = results.dividends[0].dividend.length; } // Process splits - if (corporateData.splits && corporateData.splits.length > 0) { + if (results.splits[0].split && results.splits[0].split.length > 0 && Array.isArray(results.splits[0].split)) { await this.mongodb.batchUpsert( 'qmSplits', - corporateData.splits.map((split: any) => ({ + results.splits[0].split.map((split: any) => ({ ...split, symbol, - symbolId, - updated_at: new Date() + exchange, + qmSearchCode, + reportDate: new Date(split.date), })), - ['symbol', 'splitDate', 'ratio'] + ['qmSearchCode','reportDate'] ); - splitCount = corporateData.splits.length; + splitCount = results.splits[0].split.length; } // Process earnings - if (corporateData.earnings && corporateData.earnings.length > 0) { + if (results.earnings[0].earning && results.earnings[0].earning.length > 0 && Array.isArray(results.earnings[0].earning)) { await this.mongodb.batchUpsert( 'qmEarnings', - corporateData.earnings.map((earning: any) => ({ + results.earnings[0].earning.map((earning: any) => ({ ...earning, symbol, - symbolId, - updated_at: new Date() + exchange, + qmSearchCode, + reportDate: new Date(earning.reportDate[0]), })), - ['symbol', 'reportDate', 'fiscalQuarter'] + ['qmSearchCode','reportDate'] ); - earningsCount = corporateData.earnings.length; + earningsCount = results.earnings[0].earning.length; } - // Update tracking for corporate actions + // Update tracking for events const updateTime = new Date(); - await tracker.updateSymbolOperation(qmSearchCode, 'corporate_actions_update', { + await tracker.updateSymbolOperation(qmSearchCode, 'events_update', { status: 'success', lastRecordDate: updateTime, recordCount: dividendCount + splitCount + earningsCount }); - this.logger.info('Corporate actions updated successfully', { + this.logger.info(`Events updated successfully ${qmSearchCode}`, { symbol, dividendCount, splitCount, @@ -155,7 +167,7 @@ export async function updateCorporateActions( return { success: true, symbol, - message: `Corporate actions updated for ${symbol}`, + message: `Events updated for ${qmSearchCode}`, data: { dividends: dividendCount, splits: splitCount, @@ -169,30 +181,30 @@ export async function updateCorporateActions( await sessionManager.incrementFailedCalls(sessionId, session.uuid); } - this.logger.error('Error fetching corporate actions', { + this.logger.error('Error fetching events', { symbol, error: error instanceof Error ? error.message : 'Unknown error' }); - // Track failure for corporate actions + // Track failure for events const tracker = await getOperationTracker(this); - await tracker.updateSymbolOperation(qmSearchCode, 'corporate_actions_update', { + await tracker.updateSymbolOperation(qmSearchCode, 'events_update', { status: 'failure' }); return { success: false, symbol, - message: `Failed to fetch corporate actions: ${error instanceof Error ? error.message : 'Unknown error'}` + message: `Failed to fetch events: ${error instanceof Error ? error.message : 'Unknown error'}` }; } } /** - * Schedule corporate actions updates for symbols that need refreshing + * Schedule events updates for symbols that need refreshing */ -export async function scheduleCorporateActionsUpdates( +export async function scheduleEventsUpdates( this: QMHandler, input: { limit?: number; @@ -204,34 +216,34 @@ export async function scheduleCorporateActionsUpdates( symbolsQueued: number; errors: number; }> { - const { limit = 100, forceUpdate = false } = input; + const { limit = 100000, forceUpdate = false } = input; const tracker = await getOperationTracker(this); - this.logger.info('Scheduling corporate actions updates', { limit, forceUpdate }); + this.logger.info('Scheduling events updates', { limit, forceUpdate }); try { - // Get symbols that need corporate actions updates - const staleSymbols = await tracker.getStaleSymbols('corporate_actions_update', { + // Get symbols that need events updates + const staleSymbols = await tracker.getStaleSymbols('events_update', { minHoursSinceRun: forceUpdate ? 0 : 24 * 7, // Weekly limit }); if (staleSymbols.length === 0) { - this.logger.info('No symbols need corporate actions updates'); + this.logger.info('No symbols need events updates'); return { - message: 'No symbols need corporate actions updates', + message: 'No symbols need events updates', symbolsQueued: 0, errors: 0 }; } - this.logger.info(`Found ${staleSymbols.length} symbols needing corporate actions updates`); + this.logger.info(`Found ${staleSymbols.length} symbols needing events updates`); // Get full symbol data to include symbolId const symbolDocs = await this.mongodb.find('qmSymbols', { qmSearchCode: { $in: staleSymbols.slice(0, limit) } // Apply limit after deduplication }, { - projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } + projection: { symbol: 1, exchange: 1, qmSearchCode: 1 } }); let queued = 0; @@ -240,40 +252,35 @@ export async function scheduleCorporateActionsUpdates( // Schedule individual update jobs for each symbol for (const doc of symbolDocs) { try { - if (!doc.symbolId) { - this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); - continue; - } - - await this.scheduleOperation('update-corporate-actions', { + await this.scheduleOperation('update-events', { symbol: doc.symbol, - symbolId: doc.symbolId, + exchange: doc.exchange, qmSearchCode: doc.qmSearchCode }, { priority: 4, - delay: queued * 1500 // 1.5 seconds between jobs + delay: queued * 0.05 // 1.5 seconds between jobs }); queued++; } catch (error) { - this.logger.error(`Failed to schedule corporate actions update for ${doc.symbol}`, { error }); + this.logger.error(`Failed to schedule events update for ${doc.symbol}`, { error }); errors++; } } - this.logger.info('Corporate actions update scheduling completed', { + this.logger.info('Events update scheduling completed', { symbolsQueued: queued, errors, total: staleSymbols.length }); return { - message: `Scheduled corporate actions updates for ${queued} symbols`, + message: `Scheduled events updates for ${queued} symbols`, symbolsQueued: queued, errors }; } catch (error) { - this.logger.error('Corporate actions scheduling failed', { error }); + this.logger.error('Events scheduling failed', { error }); throw error; } } \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts index c9eedd1..cc0f464 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts @@ -1,14 +1,14 @@ -/** - * QM Action Exports - */ - -export { scheduleCorporateActionsUpdates, updateCorporateActions } from './corporate-actions.action'; -export { scheduleFilingsUpdates, updateFilings } from './filings.action'; -export { scheduleFinancialsUpdates, updateFinancials } from './financials.action'; -export { scheduleIntradayUpdates, updateIntradayBars } from './intraday.action'; -export { schedulePriceUpdates, updatePrices } from './prices.action'; -export { checkSessions, createSession } from './session.action'; -export { scheduleSymbolInfoUpdates, updateSymbolInfo } from './symbol-info.action'; -export { searchSymbols, spiderSymbol } from './symbol.action'; -export { deduplicateSymbols, updateExchangeStats, updateExchangeStatsAndDeduplicate } from './symbol-dedup.action'; - +/** + * QM Action Exports + */ + +export { scheduleEventsUpdates, updateEvents } from './events.action'; +export { scheduleFilingsUpdates, updateFilings } from './filings.action'; +export { scheduleFinancialsUpdates, updateFinancials } from './financials.action'; +export { scheduleIntradayUpdates, updateIntradayBars } from './intraday.action'; +export { schedulePriceUpdates, updatePrices } from './prices.action'; +export { checkSessions, createSession } from './session.action'; +export { scheduleSymbolInfoUpdates, updateSymbolInfo } from './symbol-info.action'; +export { searchSymbols, spiderSymbol } from './symbol.action'; +export { deduplicateSymbols, updateExchangeStats, updateExchangeStatsAndDeduplicate } from './symbol-dedup.action'; + diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts index e25e61a..ecff569 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts @@ -163,7 +163,7 @@ export async function scheduleSymbolInfoUpdates( symbolsQueued: number; errors: number; }> { - const { limit = 1, forceUpdate = false } = input; + const { limit = 100000, forceUpdate = false } = input; const tracker = await getOperationTracker(this); this.logger.info('Scheduling symbol info updates', { limit, forceUpdate }); diff --git a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts index ef5a123..70202df 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts @@ -10,7 +10,7 @@ import { checkSessions, createSession, deduplicateSymbols, - scheduleCorporateActionsUpdates, + scheduleEventsUpdates, scheduleFilingsUpdates, scheduleFinancialsUpdates, scheduleIntradayUpdates, @@ -18,7 +18,7 @@ import { scheduleSymbolInfoUpdates, searchSymbols, spiderSymbol, - updateCorporateActions, + updateEvents, updateExchangeStats, updateExchangeStatsAndDeduplicate, updateFilings, @@ -114,18 +114,18 @@ export class QMHandler extends BaseHandler { scheduleFinancialsUpdates = scheduleFinancialsUpdates; /** - * CORPORATE ACTIONS (Dividends, Splits, Earnings) + * EVENTS (Dividends, Splits, Earnings) */ - @Operation('update-corporate-actions') - updateCorporateActions = updateCorporateActions; + @Operation('update-events') + updateEvents = updateEvents; - @Disabled() - @ScheduledOperation('schedule-corporate-actions-updates', '0 3 * * *', { + // @Disabled() + @ScheduledOperation('schedule-events-updates', '0 3 * * *', { priority: 6, immediately: false, - description: 'Check for symbols needing corporate actions updates daily at 3 AM' + description: 'Check for symbols needing events updates daily at 3 AM' }) - scheduleCorporateActionsUpdates = scheduleCorporateActionsUpdates; + scheduleEventsUpdates = scheduleEventsUpdates; /** * FILINGS diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts index 36b6151..73c78ca 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts @@ -38,6 +38,7 @@ export const QM_CONFIG = { LOOKUP_URL: 'https://app.quotemedia.com/datatool/lookup.json', SYMBOL_URL: 'https://app.quotemedia.com/datatool/getProfiles.json', PRICES_URL: 'https://app.quotemedia.com/datatool/getEnhancedChartData.json', + EVENTS_URL: 'https://app.quotemedia.com/datatool/getIndicatorsBySymbol.json' } as const; // Session management settings diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-registry.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-registry.ts index 9fcbdc7..5326092 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-registry.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-registry.ts @@ -1,112 +1,104 @@ -/** - * QM Operation Registry - Define and register all QM operations - */ - -import type { MongoDBClient, Logger } from '@stock-bot/types'; -import { QMOperationTracker } from './operation-tracker'; -import type { QMOperationConfig } from './types'; - -// Define all QM operations -export const QM_OPERATIONS: QMOperationConfig[] = [ - // Price data operations - { - name: 'symbol_info', - type: 'standard', - description: 'Update symbol metadata', - defaultStaleHours: 24 * 7 // Weekly - }, - { - name: 'price_update', - type: 'standard', - description: 'Update daily price data', - defaultStaleHours: 24 - }, - { - name: 'intraday_bars', - type: 'intraday_crawl', - description: 'Crawl intraday price bars from today backwards', - requiresFinishedFlag: true, - defaultStaleHours: 1 // Check every hour for new data - }, - - // Fundamental data operations - { - name: 'financials_update', - type: 'standard', - description: 'Update financial statements', - defaultStaleHours: 24 * 7 // Weekly - }, - // Corporate actions - fetched together in one API call - { - name: 'corporate_actions_update', - type: 'standard', - description: 'Update corporate actions (earnings, dividends, splits)', - defaultStaleHours: 24 * 7 // Weekly - }, - - // News and filings - { - name: 'filings_update', - type: 'standard', - description: 'Update SEC filings', - defaultStaleHours: 24 // Daily - }, - // { - // name: 'news_update', - // type: 'standard', - // description: 'Update news articles', - // defaultStaleHours: 6 // Every 6 hours - // }, - - // // Technical indicators - // { - // name: 'indicators_update', - // type: 'standard', - // description: 'Calculate technical indicators', - // defaultStaleHours: 24 // Daily - // }, - - // // Options data - // { - // name: 'options_chain', - // type: 'standard', - // description: 'Update options chain data', - // defaultStaleHours: 1 // Hourly during market hours - // } -]; - -/** - * Initialize operation tracker with all registered operations - */ -export async function initializeQMOperations( - mongodb: MongoDBClient, - logger: Logger -): Promise { - logger.info('Initializing QM operations tracker'); - - const tracker = new QMOperationTracker(mongodb, logger); - - // Register all operations - for (const operation of QM_OPERATIONS) { - try { - await tracker.registerOperation(operation); - logger.debug(`Registered operation: ${operation.name}`); - } catch (error) { - logger.error(`Failed to register operation: ${operation.name}`, { error }); - throw error; - } - } - - logger.info('QM operations tracker initialized', { - operationCount: QM_OPERATIONS.length - }); - - return tracker; -} - -/** - * Get operation configuration by name - */ -export function getOperationConfig(name: string): QMOperationConfig | undefined { - return QM_OPERATIONS.find(op => op.name === name); +/** + * QM Operation Registry - Define and register all QM operations + */ + +import type { Logger, MongoDBClient } from '@stock-bot/types'; +import { QMOperationTracker } from './operation-tracker'; +import type { QMOperationConfig } from './types'; + +// Define all QM operations +export const QM_OPERATIONS: QMOperationConfig[] = [ + // Price data operations + { + name: 'symbol_info', + type: 'standard', + description: 'Update symbol metadata', + defaultStaleHours: 24 * 7 // Weekly + }, + { + name: 'price_update', + type: 'standard', + description: 'Update daily price data', + defaultStaleHours: 24 + }, + { + name: 'intraday_bars', + type: 'intraday_crawl', + description: 'Crawl intraday price bars from today backwards', + requiresFinishedFlag: true, + defaultStaleHours: 1 // Check every hour for new data + }, + + // Fundamental data operations + { + name: 'financials_update', + type: 'standard', + description: 'Update financial statements', + defaultStaleHours: 24 * 7 // Weekly + }, + // Corporate actions - fetched together in one API call + { + name: 'events_update', + type: 'standard', + description: 'Update events (earnings, dividends, splits)', + defaultStaleHours: 24 * 7 // Weekly + }, + + // News and filings + { + name: 'filings_update', + type: 'standard', + description: 'Update SEC filings', + defaultStaleHours: 24 // Daily + }, + // { + // name: 'news_update', + // type: 'standard', + // description: 'Update news articles', + // defaultStaleHours: 6 // Every 6 hours + // }, + + // // Options data + // { + // name: 'options_chain', + // type: 'standard', + // description: 'Update options chain data', + // defaultStaleHours: 1 // Hourly during market hours + // } +]; + +/** + * Initialize operation tracker with all registered operations + */ +export async function initializeQMOperations( + mongodb: MongoDBClient, + logger: Logger +): Promise { + logger.info('Initializing QM operations tracker'); + + const tracker = new QMOperationTracker(mongodb, logger); + + // Register all operations + for (const operation of QM_OPERATIONS) { + try { + await tracker.registerOperation(operation); + logger.debug(`Registered operation: ${operation.name}`); + } catch (error) { + logger.error(`Failed to register operation: ${operation.name}`, { error }); + throw error; + } + } + + logger.info('QM operations tracker initialized', { + operationCount: QM_OPERATIONS.length + }); + + return tracker; +} + +/** + * Get operation configuration by name + */ +export function getOperationConfig(name: string): QMOperationConfig | undefined { + return QM_OPERATIONS.find(op => op.name === name); } \ No newline at end of file