diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts index a9b8f1c..c3465f6 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts @@ -32,15 +32,16 @@ export async function updateFinancials( exchange: number; qmSearchCode: string; reportType: 'Q' | 'A'; // Quarterly or Annual + lastRecordDate?: Date; // Optional, used for tracking }, _context?: ExecutionContext ): Promise<{ success: boolean; - symbol: string; + qmSearchCode: string; message: string; data?: any; }> { - const { symbol, exchange, qmSearchCode, reportType } = input; + const { symbol, exchange, qmSearchCode, reportType, lastRecordDate } = input; this.logger.info('Fetching financials', { symbol, exchange, qmSearchCode }); @@ -61,7 +62,7 @@ export async function updateFinancials( currency: 'true', lang: 'en', latestfiscaldate: 'true', - numberOfReports: '300', + numberOfReports: lastRecordDate ? '5' : '300', pathName: '/demo/portal/company-research.php', qmodTool: 'Financials', reportType: reportType, @@ -82,23 +83,25 @@ export async function updateFinancials( } const financialData = await response.json(); - - // Update session success stats - await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); + let reports = []; + if(Array.isArray(financialData.results.Company)){ //WEIRD BUG ON THEIR END FOR SOME REASON FOR ONE COMP ITS A ARRAY INSTEAD OF OBJECT + reports = financialData?.results?.Company[0].Report || []; + }else{ + reports = financialData?.results?.Company.Report || []; + } // Process and store financial data - if (financialData && financialData.length > 0) { + if (reports && reports.length > 0) { // Store financial statements in a separate collection await this.mongodb.batchUpsert( - 'qmFinancials', - financialData.map((statement: any) => ({ + 'qmFinancials-new', + reports.map((statement: any) => ({ ...statement, symbol, exchange, - qmSearchCode, - updated_at: new Date() + qmSearchCode })), - ['symbol', 'period', 'statementType'] // Unique keys + ['qmSearchCode', 'reportPeriod', 'reportDate'] // Unique keys ); // Update symbol to track last financials update @@ -106,26 +109,26 @@ export async function updateFinancials( await tracker.updateSymbolOperation(qmSearchCode, 'financials_update', { status: 'success', lastRecordDate: new Date(), - recordCount: financialData.length + recordCount: reports.length }); - this.logger.info('Financials updated successfully', { - symbol, - statementCount: financialData.length + this.logger.info(`Financials updated successfully ${reportType} - ${qmSearchCode} (${reports.length})`, { + qmSearchCode, + statementCount: reports.length }); return { success: true, - symbol, - message: `Financials updated for ${symbol}`, - data: { count: financialData.length } + qmSearchCode, + message: `Financials updated for ${qmSearchCode} - ${reportType} - ${reports.length}`, + data: { count: reports.length } }; } else { - this.logger.warn('No financial data returned from API', { symbol }); + this.logger.warn('No financial data returned from API', { qmSearchCode }); return { success: false, - symbol, - message: `No financial data found for symbol ${symbol}` + qmSearchCode, + message: `No financial data found for symbol ${qmSearchCode}` }; } @@ -148,7 +151,7 @@ export async function updateFinancials( return { success: false, - symbol, + qmSearchCode, message: `Failed to fetch financials: ${error instanceof Error ? error.message : 'Unknown error'}` }; } @@ -169,13 +172,13 @@ export async function scheduleFinancialsUpdates( symbolsQueued: number; errors: number; }> { - const { limit = 100, forceUpdate = false } = input; + const { limit = 100000, forceUpdate = false } = input; const tracker = await getOperationTracker(this); this.logger.info('Scheduling financials updates', { limit, forceUpdate }); try { - // Get symbols that need updating + // Get symbols that need updating for both quarterly and annual const staleSymbolsQ = await tracker.getStaleSymbols('financials_update_quarterly', { minHoursSinceRun: forceUpdate ? 0 : 24 * 7, // Weekly by default limit @@ -194,11 +197,14 @@ export async function scheduleFinancialsUpdates( }; } - this.logger.info(`Found ${staleSymbols.length} symbols needing financials updates`); + this.logger.info(`Found ${staleSymbolsQ.length} symbols needing quarterly updates and ${staleSymbolsA.length} symbols needing annual updates`); - // Get full symbol data to include symbolId + // Combine unique symbols from both lists + const allStaleSymbols = [...new Set([...staleSymbolsQ, ...staleSymbolsA])]; + + // Get full symbol data const symbolDocs = await this.mongodb.find('qmSymbols', { - qmSearchCode: { $in: staleSymbols } + qmSearchCode: { $in: allStaleSymbols } }, { projection: { symbol: 1, exchange: 1, qmSearchCode: 1 } }); @@ -206,33 +212,60 @@ export async function scheduleFinancialsUpdates( let queued = 0; let errors = 0; - // Schedule individual update jobs for each symbol + // Schedule individual update jobs for each symbol and report type for (const doc of symbolDocs) { - try { - await this.scheduleOperation('update-financials', { - symbol: doc.symbol, - exchange: doc.exchange, - qmSearchCode: doc.qmSearchCode, - }, { - priority: 4, - delay: queued * 1000 // 2 seconds between jobs - }); - - queued++; - } catch (error) { - this.logger.error(`Failed to schedule financials update for ${doc.qmSearchCode}`, { error }); - errors++; + // Check if this symbol needs quarterly updates + if (staleSymbolsQ.includes(doc.qmSearchCode)) { + try { + await this.scheduleOperation('update-financials', { + symbol: doc.symbol, + exchange: doc.exchange, + qmSearchCode: doc.qmSearchCode, + reportType: 'Q', + lastRecordDate: doc.operations?.price_update?.lastRecordDate, + }, { + priority: 4, + delay: queued * 200 // 1 second between jobs + }); + + queued++; + } catch (error) { + this.logger.error(`Failed to schedule quarterly financials update for ${doc.qmSearchCode}`, { error }); + errors++; + } + } + + // Check if this symbol needs annual updates + if (staleSymbolsA.includes(doc.qmSearchCode)) { + try { + await this.scheduleOperation('update-financials', { + symbol: doc.symbol, + exchange: doc.exchange, + qmSearchCode: doc.qmSearchCode, + reportType: 'A', + lastRecordDate: doc.operations?.price_update?.lastRecordDate, + }, { + priority: 4, + delay: queued * 200 // 1 second between jobs + }); + + queued++; + } catch (error) { + this.logger.error(`Failed to schedule annual financials update for ${doc.qmSearchCode}`, { error }); + errors++; + } } } this.logger.info('Financials update scheduling completed', { symbolsQueued: queued, errors, - total: staleSymbols.length + totalQuarterly: staleSymbolsQ.length, + totalAnnual: staleSymbolsA.length }); return { - message: `Scheduled financials updates for ${queued} symbols`, + message: `Scheduled ${queued} financials updates (${staleSymbolsQ.length} quarterly, ${staleSymbolsA.length} annual)`, symbolsQueued: queued, errors }; diff --git a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts index 70202df..a216bf0 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts @@ -105,7 +105,7 @@ export class QMHandler extends BaseHandler { @Operation('update-financials') updateFinancials = updateFinancials; - @Disabled() + // @Disabled() @ScheduledOperation('schedule-financials-updates', '0 2 * * *', { priority: 6, immediately: false, diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts index 4733e6a..62626c3 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts @@ -42,7 +42,6 @@ export const QM_CONFIG = { // Session management settings export const SESSION_CONFIG = { - MIN_SESSIONS: 20, MAX_SESSIONS: 100, MAX_FAILED_CALLS: 5, SESSION_TIMEOUT: 5000, // 10 seconds