From 8f65c19d46f11b4abadd60204ff14135aec26867 Mon Sep 17 00:00:00 2001 From: Boki Date: Sun, 6 Jul 2025 21:45:12 -0400 Subject: [PATCH] added some more api's to eod --- .../handlers/eod/actions/corporate-actions.ts | 188 +++++++++++++++++ .../src/handlers/eod/actions/fundamentals.ts | 197 ++++++++++++++++++ .../src/handlers/eod/actions/index.ts | 3 + .../src/handlers/eod/actions/symbol-change.ts | 93 +++++++++ .../src/handlers/eod/eod.handler.ts | 50 ++++- 5 files changed, 530 insertions(+), 1 deletion(-) create mode 100644 apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts create mode 100644 apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts create mode 100644 apps/stock/data-ingestion/src/handlers/eod/actions/symbol-change.ts diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts new file mode 100644 index 0000000..32b99c6 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts @@ -0,0 +1,188 @@ +import type { BaseHandler } from '@stock-bot/handlers'; +import type { DataIngestionServices } from '../../../types'; +import { EOD_CONFIG } from '../shared'; + +interface FetchCorporateActionsInput { + symbol: string; + exchange: string; + actionType: 'dividends' | 'splits'; +} + +export async function scheduleFetchCorporateActions( + this: BaseHandler +): Promise<{ success: boolean; jobsScheduled: number }> { + const logger = this.logger; + + try { + logger.info('Scheduling corporate actions fetch jobs'); + + // Calculate date one month ago for stale check + const oneMonthAgo = new Date(); + oneMonthAgo.setMonth(oneMonthAgo.getMonth() - 1); + + // Get Canadian exchanges for now + const canadianExchanges = ['TO', 'V', 'CN', 'NEO']; + + // Find symbols that need corporate actions update + const symbols = await this.mongodb.collection('eodSymbols').find({ + Exchange: { $in: canadianExchanges }, + delisted: false, + $or: [ + { lastCorporateActionsUpdate: { $lt: oneMonthAgo } }, + { lastCorporateActionsUpdate: { $exists: false } } + ] + }).limit(500).toArray(); // Limit to avoid too many jobs at once + + if (!symbols || symbols.length === 0) { + logger.info('No symbols need corporate actions update'); + return { success: true, jobsScheduled: 0 }; + } + + logger.info(`Found ${symbols.length} symbols needing corporate actions update`); + + let jobsScheduled = 0; + + // Schedule jobs for each symbol - both dividends and splits + for (let i = 0; i < symbols.length; i++) { + const symbol = symbols[i]; + + // Schedule dividends fetch + await this.scheduleOperation('fetch-corporate-actions', { + symbol: symbol.Code, + exchange: symbol.Exchange, + actionType: 'dividends' + }, { + attempts: 3, + backoff: { + type: 'exponential', + delay: 5000 + }, + delay: i * 200 // Stagger jobs by 200ms per symbol + }); + jobsScheduled++; + + // Schedule splits fetch + await this.scheduleOperation('fetch-corporate-actions', { + symbol: symbol.Code, + exchange: symbol.Exchange, + actionType: 'splits' + }, { + attempts: 3, + backoff: { + type: 'exponential', + delay: 5000 + }, + delay: (i * 200) + 100 // Offset splits by 100ms from dividends + }); + jobsScheduled++; + } + + logger.info(`Successfully scheduled ${jobsScheduled} corporate actions fetch jobs`); + + return { + success: true, + jobsScheduled + }; + } catch (error) { + logger.error('Failed to schedule corporate actions fetch jobs', { error }); + throw error; + } +} + +export async function fetchCorporateActions( + this: BaseHandler, + input: FetchCorporateActionsInput +): Promise<{ success: boolean; recordsCount: number }> { + const logger = this.logger; + const { symbol, exchange, actionType } = input; + + try { + logger.info('Fetching corporate actions', { symbol, exchange, actionType }); + + // Get API key + const apiKey = EOD_CONFIG.API_TOKEN; + if (!apiKey) { + throw new Error('EOD API key not configured'); + } + + // Build URL based on action type + const endpoint = actionType === 'dividends' ? 'div' : 'splits'; + const url = new URL(`https://eodhd.com/api/${endpoint}/${symbol}.${exchange}`); + url.searchParams.append('api_token', apiKey); + url.searchParams.append('fmt', 'json'); + + // Fetch data + const response = await fetch(url.toString()); + + if (!response.ok) { + throw new Error(`EOD ${actionType} API returned ${response.status}: ${response.statusText}`); + } + + const data = await response.json(); + + // EOD returns an array of corporate actions + if (!Array.isArray(data)) { + throw new Error('Invalid response format from EOD API - expected array'); + } + + logger.info(`Fetched ${data.length} ${actionType} records for ${symbol}.${exchange}`); + + if (data.length === 0) { + // Update symbol to indicate we checked but found no data + await this.mongodb.collection('eodSymbols').updateOne( + { Code: symbol, Exchange: exchange }, + { + $set: { + [`last${actionType.charAt(0).toUpperCase() + actionType.slice(1)}Update`]: new Date(), + [`has${actionType.charAt(0).toUpperCase() + actionType.slice(1)}`]: false + } + } + ); + + return { success: true, recordsCount: 0 }; + } + + // Add metadata to each record + const recordsWithMetadata = data.map(record => ({ + symbol, + exchange, + symbolExchange: `${symbol}.${exchange}`, + ...record, + actionType, + updatedAt: new Date(), + source: 'eod' + })); + + // Determine collection name based on action type + const collectionName = actionType === 'dividends' ? 'eodDividends' : 'eodSplits'; + + // Save to MongoDB - use date and symbol as unique identifier + const result = await this.mongodb.batchUpsert( + collectionName, + recordsWithMetadata, + ['date', 'symbolExchange'] + ); + + // Update symbol with last update timestamp + await this.mongodb.collection('eodSymbols').updateOne( + { Code: symbol, Exchange: exchange }, + { + $set: { + lastCorporateActionsUpdate: new Date(), + [`last${actionType.charAt(0).toUpperCase() + actionType.slice(1)}Update`]: new Date(), + [`has${actionType.charAt(0).toUpperCase() + actionType.slice(1)}`]: true + } + } + ); + + logger.info(`Successfully saved ${result.insertedCount} ${actionType} records for ${symbol}.${exchange}`); + + return { + success: true, + recordsCount: result.insertedCount + }; + } catch (error) { + logger.error('Failed to fetch corporate actions', { error, symbol, exchange, actionType }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts new file mode 100644 index 0000000..4dde9ea --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts @@ -0,0 +1,197 @@ +import type { BaseHandler } from '@stock-bot/handlers'; +import type { DataIngestionServices } from '../../../types'; +import { EOD_CONFIG } from '../shared'; + +interface BulkFundamentalsInput { + symbols: Array<{ symbol: string; exchange: string }>; +} + +export async function scheduleFetchFundamentals( + this: BaseHandler +): Promise<{ success: boolean; jobsScheduled: number }> { + const logger = this.logger; + + try { + logger.info('Scheduling fundamentals fetch jobs'); + + // Calculate date one week ago for stale check + const oneWeekAgo = new Date(); + oneWeekAgo.setDate(oneWeekAgo.getDate() - 7); + + // Get Canadian exchanges for now + const canadianExchanges = ['TO', 'V', 'CN', 'NEO']; + + // Find symbols that need fundamentals update + const symbols = await this.mongodb.collection('eodSymbols').find({ + Exchange: { $in: canadianExchanges }, + delisted: false, + $or: [ + { lastFundamentalsUpdate: { $lt: oneWeekAgo } }, + { lastFundamentalsUpdate: { $exists: false } } + ] + }).toArray(); + + if (!symbols || symbols.length === 0) { + logger.info('No symbols need fundamentals update'); + return { success: true, jobsScheduled: 0 }; + } + + logger.info(`Found ${symbols.length} symbols needing fundamentals update`); + + let jobsScheduled = 0; + const batchSize = 500; // EOD allows up to 500 symbols per bulk request + + // Create batches of 500 symbols + for (let i = 0; i < symbols.length; i += batchSize) { + const batch = symbols.slice(i, i + batchSize); + + // Convert to array of {symbol, exchange} objects + const symbolBatch = batch.map(s => ({ + symbol: s.Code, + exchange: s.Exchange + })); + + await this.scheduleOperation('fetch-bulk-fundamentals', { + symbols: symbolBatch + }, { + attempts: 3, + backoff: { + type: 'exponential', + delay: 10000 + }, + delay: jobsScheduled * 5000 // Stagger batches by 5 seconds + }); + + jobsScheduled++; + logger.info(`Scheduled fundamentals batch ${jobsScheduled} with ${symbolBatch.length} symbols`); + } + + logger.info(`Successfully scheduled ${jobsScheduled} fundamentals fetch jobs`); + + return { + success: true, + jobsScheduled + }; + } catch (error) { + logger.error('Failed to schedule fundamentals fetch jobs', { error }); + throw error; + } +} + +export async function fetchBulkFundamentals( + this: BaseHandler, + input: BulkFundamentalsInput +): Promise<{ success: boolean; symbolsProcessed: number }> { + const logger = this.logger; + const { symbols } = input; + + try { + logger.info('Fetching bulk fundamentals', { symbolCount: symbols.length }); + + // Get API key + const apiKey = EOD_CONFIG.API_TOKEN; + if (!apiKey) { + throw new Error('EOD API key not configured'); + } + + // Group symbols by exchange for the API call + const exchangeGroups = symbols.reduce((acc, { symbol, exchange }) => { + if (!acc[exchange]) { + acc[exchange] = []; + } + acc[exchange].push(`${symbol}.${exchange}`); + return acc; + }, {} as Record); + + let totalProcessed = 0; + + // Process each exchange group + for (const [exchange, symbolList] of Object.entries(exchangeGroups)) { + // Build URL - using the exchange as the endpoint + const url = new URL(`https://eodhd.com/api/bulk-fundamentals/${exchange}`); + url.searchParams.append('api_token', apiKey); + url.searchParams.append('fmt', 'json'); + url.searchParams.append('symbols', symbolList.join(',')); + + logger.info(`Fetching fundamentals for ${symbolList.length} ${exchange} symbols`); + + // Fetch data + const response = await fetch(url.toString()); + + if (!response.ok) { + throw new Error(`EOD Bulk Fundamentals API returned ${response.status}: ${response.statusText}`); + } + + const data = await response.json(); + + // EOD bulk fundamentals returns an object with symbol keys + if (typeof data !== 'object' || !data) { + throw new Error('Invalid response format from EOD API - expected object'); + } + + // Process each symbol's fundamentals + const fundamentalsToSave = []; + const symbolsToUpdate = []; + + for (const [symbolExchange, fundamentals] of Object.entries(data)) { + if (!fundamentals || typeof fundamentals !== 'object') { + logger.warn(`No fundamentals data for ${symbolExchange}`); + continue; + } + + // Extract symbol and exchange from the key + const [symbol, exc] = symbolExchange.split('.'); + + // Add metadata + const fundamentalsWithMetadata = { + symbol, + exchange: exc, + symbolExchange, + ...fundamentals, + updatedAt: new Date(), + source: 'eod' + }; + + fundamentalsToSave.push(fundamentalsWithMetadata); + symbolsToUpdate.push({ symbol, exchange: exc }); + } + + if (fundamentalsToSave.length > 0) { + // Save fundamentals to MongoDB + const result = await this.mongodb.batchUpsert( + 'eodFundamentals', + fundamentalsToSave, + ['symbolExchange'] + ); + + logger.info(`Saved ${result.insertedCount} fundamentals records for ${exchange}`); + + // Update symbols with last update timestamp + const updatePromises = symbolsToUpdate.map(({ symbol, exchange }) => + this.mongodb.collection('eodSymbols').updateOne( + { Code: symbol, Exchange: exchange }, + { + $set: { + lastFundamentalsUpdate: new Date(), + hasFundamentals: true + } + } + ) + ); + + await Promise.all(updatePromises); + totalProcessed += fundamentalsToSave.length; + } + } + + logger.info(`Successfully processed fundamentals for ${totalProcessed} symbols`); + + return { + success: true, + symbolsProcessed: totalProcessed + }; + } catch (error) { + logger.error('Failed to fetch bulk fundamentals', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts index c1e3098..36c8538 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts @@ -2,3 +2,6 @@ export { fetchExchanges } from './exchanges'; export { fetchSymbols, scheduleFetchSymbols } from './symbols'; export { fetchPrices, scheduleFetchPrices } from './prices'; export { fetchIntraday, crawlIntraday, scheduleIntradayCrawl } from './intraday'; +export { fetchBulkFundamentals, scheduleFetchFundamentals } from './fundamentals'; +export { fetchSymbolChangeHistory } from './symbol-change'; +export { fetchCorporateActions, scheduleFetchCorporateActions } from './corporate-actions'; diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/symbol-change.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/symbol-change.ts new file mode 100644 index 0000000..07a15ed --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/symbol-change.ts @@ -0,0 +1,93 @@ +import type { BaseHandler } from '@stock-bot/handlers'; +import type { DataIngestionServices } from '../../../types'; +import { EOD_CONFIG } from '../shared'; + +export async function fetchSymbolChangeHistory( + this: BaseHandler +): Promise<{ success: boolean; changesCount: number }> { + const logger = this.logger; + + try { + logger.info('Fetching symbol change history'); + + // Get API key + const apiKey = EOD_CONFIG.API_TOKEN; + if (!apiKey) { + throw new Error('EOD API key not configured'); + } + + // Build URL + const url = new URL('https://eodhd.com/api/symbol-change-history'); + url.searchParams.append('api_token', apiKey); + url.searchParams.append('fmt', 'json'); + + // Fetch data + const response = await fetch(url.toString()); + + if (!response.ok) { + throw new Error(`EOD Symbol Change API returned ${response.status}: ${response.statusText}`); + } + + const data = await response.json(); + + // EOD returns an array of symbol changes + if (!Array.isArray(data)) { + throw new Error('Invalid response format from EOD API - expected array'); + } + + logger.info(`Fetched ${data.length} symbol change records`); + + if (data.length === 0) { + return { success: true, changesCount: 0 }; + } + + // Add metadata to each record + const changesWithMetadata = data.map(change => ({ + ...change, + updatedAt: new Date(), + source: 'eod' + })); + + // Clear existing data and insert new (symbol changes are a complete dataset) + await this.mongodb.collection('eodSymbolChange').deleteMany({}); + const result = await this.mongodb.collection('eodSymbolChange').insertMany(changesWithMetadata); + + logger.info(`Successfully saved ${result.insertedCount} symbol change records`); + + // Process symbol changes to update affected symbols in eodSymbols collection + let updatedSymbols = 0; + for (const change of data) { + if (change.oldSymbol && change.newSymbol && change.exchange) { + // Update the symbol if it exists + const updateResult = await this.mongodb.collection('eodSymbols').updateOne( + { Code: change.oldSymbol, Exchange: change.exchange }, + { + $set: { + Code: change.newSymbol, + previousCode: change.oldSymbol, + symbolChangeDate: change.date, + symbolChangeNote: change.note || 'Symbol changed' + } + } + ); + + if (updateResult.modifiedCount > 0) { + updatedSymbols++; + logger.debug(`Updated symbol ${change.oldSymbol} to ${change.newSymbol} on ${change.exchange}`); + } + } + } + + if (updatedSymbols > 0) { + logger.info(`Updated ${updatedSymbols} symbols based on change history`); + } + + return { + success: true, + changesCount: result.insertedCount + }; + } catch (error) { + logger.error('Failed to fetch symbol change history', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts index 2fd5f72..774abb6 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts @@ -14,7 +14,12 @@ import { scheduleFetchPrices, fetchIntraday, crawlIntraday, - scheduleIntradayCrawl + scheduleIntradayCrawl, + fetchBulkFundamentals, + scheduleFetchFundamentals, + fetchSymbolChangeHistory, + fetchCorporateActions, + scheduleFetchCorporateActions } from './actions'; /** @@ -102,4 +107,47 @@ export class EodHandler extends BaseHandler { @Operation('fetch-intraday') @RateLimit(50) // 50 points per intraday fetch fetchIntraday = fetchIntraday; + + /** + * Schedule bulk fundamentals fetch for symbols + * DISABLED - Not scheduled automatically + */ + @Operation('schedule-fetch-fundamentals') + // @ScheduledOperation('schedule-fetch-fundamentals', '0 4 * * 0') // Disabled - would run weekly at 4 AM on Sundays + @RateLimit(1) // 1 point for scheduling + scheduleFetchFundamentals = scheduleFetchFundamentals; + + /** + * Fetch fundamentals for up to 500 symbols in bulk + * Called by schedule-fetch-fundamentals + */ + @Operation('fetch-bulk-fundamentals') + @RateLimit(100) // 100 points per bulk request + fetchBulkFundamentals = fetchBulkFundamentals; + + /** + * Fetch symbol change history + * Runs weekly on Mondays at 5 AM + */ + @Operation('fetch-symbol-change-history') + @ScheduledOperation('fetch-symbol-change-history', '0 5 * * 1') + @RateLimit(10) // 10 points per request + fetchSymbolChangeHistory = fetchSymbolChangeHistory; + + /** + * Schedule corporate actions (dividends and splits) fetch + * Runs monthly on the 1st at 6 AM + */ + @Operation('schedule-fetch-corporate-actions') + @ScheduledOperation('schedule-fetch-corporate-actions', '0 6 1 * *') + @RateLimit(1) // 1 point for scheduling + scheduleFetchCorporateActions = scheduleFetchCorporateActions; + + /** + * Fetch dividends or splits for a specific symbol + * Called by schedule-fetch-corporate-actions + */ + @Operation('fetch-corporate-actions') + @RateLimit(10) // 10 points per fetch + fetchCorporateActions = fetchCorporateActions; } \ No newline at end of file