From 5ca8fafe7e77872c41f7aab7b7d25037a0f0641a Mon Sep 17 00:00:00 2001 From: Boki Date: Sun, 6 Jul 2025 23:42:43 -0400 Subject: [PATCH] work on eod --- .../handlers/eod/actions/corporate-actions.ts | 24 +++- .../src/handlers/eod/actions/exchanges.ts | 18 ++- .../src/handlers/eod/actions/fundamentals.ts | 135 +++++++++++++++++- .../src/handlers/eod/actions/index.ts | 2 +- .../src/handlers/eod/actions/intraday.ts | 33 +++-- .../src/handlers/eod/actions/prices.ts | 37 +++-- .../src/handlers/eod/actions/symbol-change.ts | 18 ++- .../src/handlers/eod/actions/symbols.ts | 22 ++- .../src/handlers/eod/eod.handler.ts | 29 ++-- .../src/handlers/eod/shared/config.ts | 2 +- 10 files changed, 271 insertions(+), 49 deletions(-) diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts index 32b99c6..0390c27 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts @@ -38,7 +38,15 @@ export async function scheduleFetchCorporateActions( return { success: true, jobsScheduled: 0 }; } - logger.info(`Found ${symbols.length} symbols needing corporate actions update`); + logger.info(`Found ${symbols.length} symbols needing corporate actions update`, { + count: symbols.length, + samples: symbols.slice(0, 5).map(s => ({ + symbol: s.Code, + exchange: s.Exchange, + name: s.Name, + lastUpdate: s.lastCorporateActionsUpdate + })) + }); let jobsScheduled = 0; @@ -97,7 +105,7 @@ export async function fetchCorporateActions( const { symbol, exchange, actionType } = input; try { - logger.info('Fetching corporate actions', { symbol, exchange, actionType }); + logger.info(`Fetching ${actionType} for ${symbol}.${exchange}`); // Get API key const apiKey = EOD_CONFIG.API_TOKEN; @@ -127,6 +135,18 @@ export async function fetchCorporateActions( logger.info(`Fetched ${data.length} ${actionType} records for ${symbol}.${exchange}`); + // Log sample records if any + if (data.length > 0) { + logger.debug(`Sample ${actionType} for ${symbol}.${exchange}:`, { + count: data.length, + samples: data.slice(0, 3).map(record => ({ + date: record.date, + value: record.value || record.split || record.amount, + ...record + })) + }); + } + if (data.length === 0) { // Update symbol to indicate we checked but found no data await this.mongodb.collection('eodSymbols').updateOne( diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/exchanges.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/exchanges.ts index 3c31191..370b79c 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/exchanges.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/exchanges.ts @@ -34,10 +34,26 @@ export async function fetchExchanges( throw new Error('Invalid response format from EOD API - expected array'); } logger.info(`Fetched ${exchanges.length} exchanges from EOD`); + + // Log some example exchanges + if (exchanges.length > 0) { + logger.debug('Sample exchanges:', { + count: exchanges.length, + samples: exchanges.slice(0, 5).map(e => ({ + code: e.Code, + name: e.Name, + country: e.Country + })) + }); + } const result = await this.mongodb.batchUpsert('eodExchanges', exchanges, ['Code']); - logger.info(`Successfully saved ${result.insertedCount} exchanges to MongoDB`); + logger.info(`Successfully saved exchanges to MongoDB`, { + total: exchanges.length, + inserted: result.insertedCount, + updated: exchanges.length - result.insertedCount + }); return { success: true, diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts index 4dde9ea..e7c9c41 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts @@ -6,6 +6,11 @@ interface BulkFundamentalsInput { symbols: Array<{ symbol: string; exchange: string }>; } +interface FetchSingleFundamentalsInput { + symbol: string; + exchange: string; +} + export async function scheduleFetchFundamentals( this: BaseHandler ): Promise<{ success: boolean; jobsScheduled: number }> { @@ -38,12 +43,52 @@ export async function scheduleFetchFundamentals( logger.info(`Found ${symbols.length} symbols needing fundamentals update`); + // Separate ETFs from regular stocks + const etfs = symbols.filter(s => s.Type === 'ETF'); + const nonEtfs = symbols.filter(s => s.Type !== 'ETF'); + + logger.info(`Found ${etfs.length} ETFs and ${nonEtfs.length} non-ETFs`, { + etfSamples: etfs.slice(0, 5).map(e => ({ + symbol: e.Code, + exchange: e.Exchange, + name: e.Name + })), + nonEtfSamples: nonEtfs.slice(0, 5).map(s => ({ + symbol: s.Code, + exchange: s.Exchange, + name: s.Name, + type: s.Type + })) + }); + let jobsScheduled = 0; + + // Schedule individual jobs for ETFs + for (let i = 0; i < etfs.length; i++) { + const etf = etfs[i]; + await this.scheduleOperation('fetch-single-fundamentals', { + symbol: etf.Code, + exchange: etf.Exchange + }, { + attempts: 3, + backoff: { + type: 'exponential', + delay: 10000 + }, + delay: jobsScheduled * 1000 // Stagger by 1 second + }); + jobsScheduled++; + } + + if (etfs.length > 0) { + logger.info(`Scheduled ${etfs.length} individual ETF fundamentals jobs`); + } + + // Create batches of 500 symbols for non-ETFs const batchSize = 500; // EOD allows up to 500 symbols per bulk request - // Create batches of 500 symbols - for (let i = 0; i < symbols.length; i += batchSize) { - const batch = symbols.slice(i, i + batchSize); + for (let i = 0; i < nonEtfs.length; i += batchSize) { + const batch = nonEtfs.slice(i, i + batchSize); // Convert to array of {symbol, exchange} objects const symbolBatch = batch.map(s => ({ @@ -63,7 +108,7 @@ export async function scheduleFetchFundamentals( }); jobsScheduled++; - logger.info(`Scheduled fundamentals batch ${jobsScheduled} with ${symbolBatch.length} symbols`); + logger.info(`Scheduled fundamentals batch with ${symbolBatch.length} non-ETF symbols`); } logger.info(`Successfully scheduled ${jobsScheduled} fundamentals fetch jobs`); @@ -113,7 +158,11 @@ export async function fetchBulkFundamentals( url.searchParams.append('fmt', 'json'); url.searchParams.append('symbols', symbolList.join(',')); - logger.info(`Fetching fundamentals for ${symbolList.length} ${exchange} symbols`); + logger.info(`Fetching bulk fundamentals for ${symbolList.length} ${exchange} symbols`, { + exchange, + count: symbolList.length, + samples: symbolList.slice(0, 5) + }); // Fetch data const response = await fetch(url.toString()); @@ -194,4 +243,80 @@ export async function fetchBulkFundamentals( logger.error('Failed to fetch bulk fundamentals', { error }); throw error; } +} + +export async function fetchSingleFundamentals( + this: BaseHandler, + input: FetchSingleFundamentalsInput +): Promise<{ success: boolean; saved: boolean }> { + const logger = this.logger; + const { symbol, exchange } = input; + + try { + logger.info(`Fetching single fundamentals for ${symbol}.${exchange}`); + + // Get API key + const apiKey = EOD_CONFIG.API_TOKEN; + if (!apiKey) { + throw new Error('EOD API key not configured'); + } + + // Build URL for single fundamentals endpoint + const url = new URL(`https://eodhd.com/api/fundamentals/${symbol}.${exchange}`); + url.searchParams.append('api_token', apiKey); + url.searchParams.append('fmt', 'json'); + + // Fetch data + const response = await fetch(url.toString()); + + if (!response.ok) { + throw new Error(`EOD Single Fundamentals API returned ${response.status}: ${response.statusText}`); + } + + const fundamentals = await response.json(); + + // Check if we got valid data + if (!fundamentals || typeof fundamentals !== 'object') { + logger.warn(`No fundamentals data returned for ${symbol}.${exchange}`); + return { success: true, saved: false }; + } + + // Add metadata + const fundamentalsWithMetadata = { + symbol, + exchange, + symbolExchange: `${symbol}.${exchange}`, + ...fundamentals, + updatedAt: new Date(), + source: 'eod' + }; + + // Save to MongoDB + const result = await this.mongodb.batchUpsert( + 'eodFundamentals', + [fundamentalsWithMetadata], + ['symbolExchange'] + ); + + // Update symbol with last update timestamp + await this.mongodb.collection('eodSymbols').updateOne( + { Code: symbol, Exchange: exchange }, + { + $set: { + lastFundamentalsUpdate: new Date(), + hasFundamentals: true + } + } + ); + + logger.info(`Successfully saved fundamentals for ${symbol}.${exchange}`); + + return { + success: true, + saved: result.insertedCount > 0 + }; + } catch (error) { + logger.error('Failed to fetch single fundamentals', { error, symbol, exchange }); + throw error; + } } \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts index 36c8538..a9e7320 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts @@ -2,6 +2,6 @@ export { fetchExchanges } from './exchanges'; export { fetchSymbols, scheduleFetchSymbols } from './symbols'; export { fetchPrices, scheduleFetchPrices } from './prices'; export { fetchIntraday, crawlIntraday, scheduleIntradayCrawl } from './intraday'; -export { fetchBulkFundamentals, scheduleFetchFundamentals } from './fundamentals'; +export { fetchBulkFundamentals, fetchSingleFundamentals, scheduleFetchFundamentals } from './fundamentals'; export { fetchSymbolChangeHistory } from './symbol-change'; export { fetchCorporateActions, scheduleFetchCorporateActions } from './corporate-actions'; diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts index f056956..f14b683 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts @@ -61,7 +61,15 @@ export async function scheduleIntradayCrawl( return { success: true, jobsScheduled: 0 }; } - logger.info(`Found ${symbols.length} symbols needing intraday data`); + logger.info(`Found ${symbols.length} symbols needing intraday data`, { + count: symbols.length, + samples: symbols.slice(0, 5).map(s => ({ + symbol: s.Code, + exchange: s.Exchange, + name: s.Name, + intradayState: s.intradayState + })) + }); let jobsScheduled = 0; const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h']; @@ -111,7 +119,7 @@ export async function crawlIntraday( const { symbol, exchange, interval } = input; try { - logger.info('Starting intraday crawl', { symbol, exchange, interval }); + logger.info(`Starting intraday crawl for ${symbol}.${exchange} - ${interval}`); // Get current crawl state const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({ @@ -172,11 +180,11 @@ export async function crawlIntraday( // Check if we're finished (no data returned means we've reached the end) if (result.recordsSaved === 0) { newState.finished = true; - logger.info('Intraday crawl finished - no more data', { - symbol, - exchange, - interval, - totalRecords: newState.totalRecordsProcessed + logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval}`, { + totalRecords: newState.totalRecordsProcessed, + oldestDate: newState.oldestDateReached, + newestDate: newState.newestDateReached, + batches: newState.totalBatchesProcessed }); } @@ -233,12 +241,9 @@ export async function fetchIntraday( const { symbol, exchange, interval, fromDate, toDate } = input; try { - logger.info('Fetching intraday data', { - symbol, - exchange, - interval, - from: fromDate?.toISOString(), - to: toDate?.toISOString() + logger.info(`Fetching intraday data for ${symbol}.${exchange} - ${interval}`, { + from: fromDate?.toISOString().split('T')[0], + to: toDate?.toISOString().split('T')[0] }); // Get API key @@ -280,7 +285,7 @@ export async function fetchIntraday( return { success: true, recordsSaved: 0 }; } - logger.info(`Fetched ${data.length} intraday records`, { symbol, exchange, interval }); + logger.info(`Fetched ${data.length} intraday records for ${symbol}.${exchange} - ${interval}`); // Add metadata to each record const recordsWithMetadata = data.map(bar => ({ diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts index f4d43e7..1a1c10d 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts @@ -13,18 +13,14 @@ export async function scheduleFetchPrices( const logger = this.logger; try { - logger.info('Scheduling price fetch jobs for Canadian symbols'); + logger.info('Scheduling price fetch jobs for all symbols'); // Calculate date one week ago const oneWeekAgo = new Date(); oneWeekAgo.setDate(oneWeekAgo.getDate() - 7); - // Get Canadian exchanges (TSX, TSV, CNQ, NEO) - const canadianExchanges = ['TO', 'V', 'CN', 'NEO']; - - // Find symbols that haven't been updated in the last week + // Find ALL symbols that haven't been updated in the last week const symbols = await this.mongodb.collection('eodSymbols').find({ - Exchange: { $in: canadianExchanges }, delisted: false, $or: [ { lastPriceUpdate: { $lt: oneWeekAgo } }, @@ -33,17 +29,30 @@ export async function scheduleFetchPrices( }).toArray(); if (!symbols || symbols.length === 0) { - logger.info('No Canadian symbols need price updates'); + logger.info('No symbols need price updates'); return { success: true, jobsScheduled: 0 }; } - logger.info(`Found ${symbols.length} Canadian symbols needing price updates`); + logger.info(`Found ${symbols.length} symbols needing price updates`, { + symbols: symbols.map(s => ({ + symbol: s.Code, + exchange: s.Exchange, + name: s.Name, + lastUpdate: s.lastPriceUpdate + })) + }); let jobsScheduled = 0; // Schedule jobs with staggered delays for (let i = 0; i < symbols.length; i++) { const symbol = symbols[i]; + logger.debug(`Scheduling price fetch for ${symbol.Code}.${symbol.Exchange}`, { + name: symbol.Name, + lastUpdate: symbol.lastPriceUpdate, + delay: i * 100 + }); + await this.scheduleOperation('fetch-prices', { symbol: symbol.Code, exchange: symbol.Exchange @@ -78,7 +87,7 @@ export async function fetchPrices( const { symbol, exchange } = input; try { - logger.info('Fetching prices', { symbol, exchange }); + logger.info(`Fetching prices for ${symbol}.${exchange}`); // Get API key from config const apiKey = EOD_CONFIG.API_TOKEN; @@ -90,7 +99,6 @@ export async function fetchPrices( const url = new URL(`https://eodhd.com/api/eod/${symbol}.${exchange}`); url.searchParams.append('api_token', apiKey); url.searchParams.append('fmt', 'json'); - // Fetch price data from EOD API const response = await fetch(url.toString()); @@ -107,6 +115,15 @@ export async function fetchPrices( logger.info(`Fetched ${priceData.length} price records for ${symbol}.${exchange}`); + // Log date range of prices + if (priceData.length > 0) { + logger.debug(`Price data range for ${symbol}.${exchange}:`, { + oldest: priceData[0].date, + newest: priceData[priceData.length - 1].date, + count: priceData.length + }); + } + // Add metadata to each price record const pricesWithMetadata = priceData.map(price => ({ symbol, diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/symbol-change.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/symbol-change.ts index 07a15ed..95bba36 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/symbol-change.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/symbol-change.ts @@ -38,9 +38,22 @@ export async function fetchSymbolChangeHistory( logger.info(`Fetched ${data.length} symbol change records`); if (data.length === 0) { + logger.info('No symbol changes found'); return { success: true, changesCount: 0 }; } + // Log some sample changes + logger.debug('Sample symbol changes:', { + count: data.length, + samples: data.slice(0, 5).map(c => ({ + oldSymbol: c.oldSymbol, + newSymbol: c.newSymbol, + exchange: c.exchange, + date: c.date, + note: c.note + })) + }); + // Add metadata to each record const changesWithMetadata = data.map(change => ({ ...change, @@ -73,7 +86,10 @@ export async function fetchSymbolChangeHistory( if (updateResult.modifiedCount > 0) { updatedSymbols++; - logger.debug(`Updated symbol ${change.oldSymbol} to ${change.newSymbol} on ${change.exchange}`); + logger.info(`Updated symbol ${change.oldSymbol} to ${change.newSymbol} on ${change.exchange}`, { + date: change.date, + note: change.note + }); } } } diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/symbols.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/symbols.ts index 8080c3c..2d64d08 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/symbols.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/symbols.ts @@ -14,17 +14,19 @@ export async function scheduleFetchSymbols( const logger = this.logger; try { - logger.info('Scheduling symbol fetch jobs for all exchanges'); + logger.info('Starting symbol fetch job scheduling'); // Get all exchanges from MongoDB const exchanges = await this.mongodb.collection('eodExchanges').find({}).toArray(); if (!exchanges || exchanges.length === 0) { - logger.warn('No exchanges found in database'); + logger.warn('No exchanges found in database - run fetch-exchanges first'); return { success: true, jobsCreated: 0 }; } - logger.info(`Found ${exchanges.length} exchanges to process`); + logger.info(`Found ${exchanges.length} exchanges to process`, { + exchanges: exchanges.map(e => ({ code: e.Code, name: e.Name, country: e.Country })) + }); let jobsCreated = 0; @@ -79,7 +81,7 @@ export async function fetchSymbols( const { exchangeCode, delisted } = input; try { - logger.info('Fetching symbols for exchange', { exchangeCode, delisted }); + logger.info(`Fetching ${delisted ? 'delisted' : 'active'} symbols for ${exchangeCode}`); // Get API key from config const apiKey = EOD_CONFIG.API_TOKEN; @@ -111,6 +113,18 @@ export async function fetchSymbols( logger.info(`Fetched ${symbols.length} ${delisted ? 'delisted' : 'active'} symbols for ${exchangeCode}`); + // Log some sample symbols + if (symbols.length > 0) { + logger.debug(`Sample ${delisted ? 'delisted' : 'active'} symbols for ${exchangeCode}:`, { + count: symbols.length, + samples: symbols.slice(0, 5).map(s => ({ + code: s.Code, + name: s.Name, + type: s.Type + })) + }); + } + // Add metadata to each symbol const symbolsWithMetadata = symbols.map(symbol => ({ ...symbol, diff --git a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts index 774abb6..b481efd 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts @@ -16,6 +16,7 @@ import { crawlIntraday, scheduleIntradayCrawl, fetchBulkFundamentals, + fetchSingleFundamentals, scheduleFetchFundamentals, fetchSymbolChangeHistory, fetchCorporateActions, @@ -63,11 +64,11 @@ export class EodHandler extends BaseHandler { * Called by schedule-fetch-symbols for each exchange */ @Operation('fetch-symbols') - @RateLimit(10) // 10 points per exchange + @RateLimit(1) // 1 point per exchange fetchSymbols = fetchSymbols; /** - * Schedule price fetching for Canadian symbols not updated in last week + * Schedule price fetching for all symbols not updated in last week * Runs daily at 2 AM */ @Operation('schedule-fetch-prices') @@ -80,7 +81,7 @@ export class EodHandler extends BaseHandler { * Called by schedule-fetch-prices for each symbol */ @Operation('fetch-prices') - @RateLimit(10) // 10 points per price fetch + @RateLimit(1) // 1 point per price fetch fetchPrices = fetchPrices; /** @@ -97,7 +98,7 @@ export class EodHandler extends BaseHandler { * Handles resumption and batch processing */ @Operation('crawl-intraday') - @RateLimit(50) // 50 points per crawl batch + @RateLimit(5) // 5 points per crawl batch crawlIntraday = crawlIntraday; /** @@ -105,7 +106,7 @@ export class EodHandler extends BaseHandler { * Called by crawl-intraday */ @Operation('fetch-intraday') - @RateLimit(50) // 50 points per intraday fetch + @RateLimit(5) // 5 points per intraday fetch fetchIntraday = fetchIntraday; /** @@ -119,27 +120,35 @@ export class EodHandler extends BaseHandler { /** * Fetch fundamentals for up to 500 symbols in bulk - * Called by schedule-fetch-fundamentals + * Called by schedule-fetch-fundamentals for non-ETFs */ @Operation('fetch-bulk-fundamentals') - @RateLimit(100) // 100 points per bulk request + @RateLimit(600) // 100 base + up to 500 symbols = 600 points max fetchBulkFundamentals = fetchBulkFundamentals; + /** + * Fetch fundamentals for a single symbol + * Called by schedule-fetch-fundamentals for ETFs + */ + @Operation('fetch-single-fundamentals') + @RateLimit(10) // 10 points per single fundamentals fetch + fetchSingleFundamentals = fetchSingleFundamentals; + /** * Fetch symbol change history * Runs weekly on Mondays at 5 AM */ @Operation('fetch-symbol-change-history') @ScheduledOperation('fetch-symbol-change-history', '0 5 * * 1') - @RateLimit(10) // 10 points per request + @RateLimit(5) // 5 points per request fetchSymbolChangeHistory = fetchSymbolChangeHistory; /** * Schedule corporate actions (dividends and splits) fetch - * Runs monthly on the 1st at 6 AM + * DISABLED - Corporate actions likely come with fundamentals */ @Operation('schedule-fetch-corporate-actions') - @ScheduledOperation('schedule-fetch-corporate-actions', '0 6 1 * *') + // @ScheduledOperation('schedule-fetch-corporate-actions', '0 6 1 * *') // Disabled - would run monthly on the 1st at 6 AM @RateLimit(1) // 1 point for scheduling scheduleFetchCorporateActions = scheduleFetchCorporateActions; diff --git a/apps/stock/data-ingestion/src/handlers/eod/shared/config.ts b/apps/stock/data-ingestion/src/handlers/eod/shared/config.ts index 27fac75..a9644bf 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/shared/config.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/shared/config.ts @@ -1,5 +1,5 @@ export const EOD_CONFIG = { // API configuration API_BASE_URL: 'https://eodhd.com/api/', - API_TOKEN: '657fe003583a32.85708911"', + API_TOKEN: '657fe003583a32.85708911', }; \ No newline at end of file