From 960edbaa474fbcd58ab97fdbac901571788b5e89 Mon Sep 17 00:00:00 2001 From: Boki Date: Sun, 6 Jul 2025 20:24:05 -0400 Subject: [PATCH] working on eod puller added exchanges and symbols --- .env | 4 +- .../eod/actions/fetch-daily-prices.action.ts | 62 --------- .../handlers/eod/actions/fetch-exchanges.ts | 45 ++++++ .../eod/actions/fetch-fundamentals.action.ts | 60 -------- .../handlers/eod/actions/fetch-news.action.ts | 65 --------- .../src/handlers/eod/actions/fetch-symbols.ts | 131 ++++++++++++++++++ .../src/handlers/eod/actions/index.ts | 5 +- .../src/handlers/eod/eod.handler.ts | 109 +++------------ 8 files changed, 200 insertions(+), 281 deletions(-) delete mode 100644 apps/stock/data-ingestion/src/handlers/eod/actions/fetch-daily-prices.action.ts create mode 100644 apps/stock/data-ingestion/src/handlers/eod/actions/fetch-exchanges.ts delete mode 100644 apps/stock/data-ingestion/src/handlers/eod/actions/fetch-fundamentals.action.ts delete mode 100644 apps/stock/data-ingestion/src/handlers/eod/actions/fetch-news.action.ts create mode 100644 apps/stock/data-ingestion/src/handlers/eod/actions/fetch-symbols.ts diff --git a/.env b/.env index 8923e13..f7fe91e 100644 --- a/.env +++ b/.env @@ -4,8 +4,8 @@ # Core Application Settings NODE_ENV=development -LOG_LEVEL=trace -LOG_HIDE_OBJECT=false +LOG_LEVEL=info +LOG_HIDE_OBJECT=true # Data Service Configuration DATA_SERVICE_PORT=2001 diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-daily-prices.action.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-daily-prices.action.ts deleted file mode 100644 index 38a651a..0000000 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-daily-prices.action.ts +++ /dev/null @@ -1,62 +0,0 @@ -import type { EodHandler } from '../eod.handler'; -import { EOD_CONFIG } from '../shared/config'; - -/** - * Simulates fetching daily price data - * This is a high-volume operation that should be rate limited - */ -export async function fetchDailyPrices( - this: EodHandler -): Promise { - const { logger, mongodb } = this; - - // Generate fake data for testing - const symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']; - const date = new Date().toISOString().split('T')[0]; - - logger.info('Fetching daily prices', { - symbols: symbols.length, - date, - timestamp: new Date().toISOString() - }); - - try { - // Simulate API call delay - await new Promise(resolve => setTimeout(resolve, 100 + Math.random() * 200)); - - // Simulate processing each symbol - const results = []; - for (const symbol of symbols) { - const priceData = { - symbol, - date, - open: 100 + Math.random() * 50, - high: 120 + Math.random() * 50, - low: 90 + Math.random() * 50, - close: 110 + Math.random() * 50, - volume: Math.floor(1000000 + Math.random() * 5000000), - timestamp: new Date().toISOString() - }; - - results.push(priceData); - - logger.debug('Processed price data', { symbol, date }); - } - - // Simulate saving to database - if (mongodb) { - await mongodb.batchUpsert('testPrices',results, ['symbol', 'date']); - logger.info('Saved price data to MongoDB', { count: results.length }); - } - - return { - success: true, - processed: results.length, - date, - message: `Fetched daily prices for ${symbols.length} symbols` - }; - } catch (error) { - logger.error('Failed to fetch daily prices', { error, symbols }); - throw error; - } -} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-exchanges.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-exchanges.ts new file mode 100644 index 0000000..157b1de --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-exchanges.ts @@ -0,0 +1,45 @@ +import type { BaseHandler } from '@stock-bot/handlers'; +import type { DataIngestionServices } from '../../../types'; + +export async function fetchExchanges( + this: BaseHandler +): Promise<{ success: boolean; count: number }> {; + const logger = this.logger; + + try { + logger.info('Fetching EOD exchanges list'); + + // Build URL with query parameters + const url = new URL('https://eodhd.com/api/exchanges-list/'); + url.searchParams.append('api_token', '657fe003583a32.85708911'); + url.searchParams.append('fmt', 'json'); + + // Fetch exchanges from EOD API using Bun fetch + const response = await fetch(url.toString()); + + if (!response.ok) { + throw new Error(`EOD Exchanges API returned ${response.status}: ${response.statusText}`); + } + + const exchanges = await response.json(); + + if (!Array.isArray(exchanges)) { + throw new Error('Invalid response format from EOD API - expected array'); + } + logger.info(`Fetched ${exchanges.length} exchanges from EOD`); + + const result = await this.mongodb.batchUpsert('eodExchanges', exchanges, ['Code']); + + logger.info(`Successfully saved ${result.insertedCount} exchanges to MongoDB`); + + return { + success: true, + count: result.insertedCount + }; + } catch (error) { + logger.error('Failed to fetch or save EOD exchanges', { error }); + throw error; + } +} + + diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-fundamentals.action.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-fundamentals.action.ts deleted file mode 100644 index 4b5a408..0000000 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-fundamentals.action.ts +++ /dev/null @@ -1,60 +0,0 @@ -import type { EodHandler } from '../eod.handler'; - -/** - * Simulates fetching fundamental data - * This is a medium-volume operation with moderate rate limits - */ -export async function fetchFundamentals( - this: EodHandler -): Promise { - const { logger, mongodb } = this; - - // Generate fake data for testing - const symbols = ['AAPL', 'GOOGL', 'MSFT']; - const metrics = ['pe_ratio', 'market_cap', 'revenue', 'earnings']; - - logger.info('Fetching fundamentals', { - symbols: symbols.length, - metrics, - timestamp: new Date().toISOString() - }); - - try { - // Simulate API call delay (longer than prices) - await new Promise(resolve => setTimeout(resolve, 200 + Math.random() * 300)); - - // Simulate processing each symbol - const results = []; - for (const symbol of symbols) { - const fundamentalData = { - symbol, - pe_ratio: 15 + Math.random() * 20, - market_cap: Math.floor(1000000000 + Math.random() * 500000000000), - revenue: Math.floor(10000000 + Math.random() * 100000000000), - earnings: Math.floor(1000000 + Math.random() * 10000000000), - last_updated: new Date().toISOString() - }; - - results.push(fundamentalData); - - logger.debug('Processed fundamental data', { symbol }); - } - - // Simulate saving to database - if (mongodb) { - const result = await mongodb.batchUpsert('testFundamentals', results, ['symbol']); - logger.info('Saved fundamental data to MongoDB', { - count: results.length - }); - } - - return { - success: true, - processed: results.length, - message: `Fetched fundamentals for ${symbols.length} symbols` - }; - } catch (error) { - logger.error('Failed to fetch fundamentals', { error, symbols }); - throw error; - } -} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-news.action.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-news.action.ts deleted file mode 100644 index 07ade84..0000000 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-news.action.ts +++ /dev/null @@ -1,65 +0,0 @@ -import type { EodHandler } from '../eod.handler'; - -/** - * Simulates fetching news data - * This is a low-volume operation with strict rate limits - */ -export async function fetchNews( - this: EodHandler -): Promise { - const { logger, mongodb } = this; - - // Generate fake data for testing - const symbols = ['AAPL', 'TSLA']; - const keywords = ['earnings', 'market']; - const limit = 10; - - logger.info('Fetching news', { - symbols: symbols.length, - keywords: keywords.length, - limit, - timestamp: new Date().toISOString() - }); - - try { - // Simulate API call delay (longest delay for news) - await new Promise(resolve => setTimeout(resolve, 300 + Math.random() * 500)); - - // Simulate fetching news articles - const articles = []; - const articleCount = Math.min(limit, 5 + Math.floor(Math.random() * 10)); - - for (let i = 0; i < articleCount; i++) { - const article = { - id: `news_${Date.now()}_${i}`, - title: `Breaking: Market Update ${i + 1}`, - summary: `Important market news regarding ${symbols.join(', ') || 'general market'}`, - symbols: symbols, - keywords: keywords, - published_at: new Date(Date.now() - Math.random() * 86400000).toISOString(), - source: ['Reuters', 'Bloomberg', 'CNBC', 'WSJ'][Math.floor(Math.random() * 4)], - sentiment: ['positive', 'negative', 'neutral'][Math.floor(Math.random() * 3)], - fetched_at: new Date().toISOString() - }; - - articles.push(article); - } - - logger.debug('Fetched news articles', { count: articles.length }); - - // Simulate saving to database - if (mongodb && articles.length > 0) { - await mongodb.batchUpsert('testNews', articles, ['id']); - logger.info('Saved news articles to MongoDB', { count: articles.length }); - } - - return { - success: true, - articles: articles.length, - message: `Fetched ${articles.length} news articles` - }; - } catch (error) { - logger.error('Failed to fetch news', { error, symbols, keywords }); - throw error; - } -} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-symbols.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-symbols.ts new file mode 100644 index 0000000..75e05a9 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-symbols.ts @@ -0,0 +1,131 @@ +import type { BaseHandler } from '@stock-bot/handlers'; +import type { DataIngestionServices } from '../../../types'; + +interface FetchSymbolsInput { + exchangeCode: string; + delisted: boolean; +} + + +export async function scheduleFetchSymbols( + this: BaseHandler +): Promise<{ success: boolean; jobsCreated: number }> { + const logger = this.logger; + + try { + logger.info('Scheduling symbol fetch jobs for all exchanges'); + + // Get all exchanges from MongoDB + const exchanges = await this.mongodb.collection('eodExchanges').find({}).toArray(); + + if (!exchanges || exchanges.length === 0) { + logger.warn('No exchanges found in database'); + return { success: true, jobsCreated: 0 }; + } + + logger.info(`Found ${exchanges.length} exchanges to process`); + + let jobsCreated = 0; + + // Create jobs for each exchange - both active and delisted symbols + for (const exchange of exchanges) { + const exchangeCode = exchange.Code; + + // Job for active symbols + await this.scheduleOperation('fetch-symbols', { + exchangeCode, + delisted: false + }, { + attempts: 3, + backoff: { + type: 'exponential', + delay: 5000 + } + }); + jobsCreated++; + + // Job for delisted symbols + await this.scheduleOperation('fetch-symbols', { + exchangeCode, + delisted: true + }, { + attempts: 3, + backoff: { + type: 'exponential', + delay: 5000 + } + }); + jobsCreated++; + } + + logger.info(`Successfully created ${jobsCreated} symbol fetch jobs`); + + return { + success: true, + jobsCreated + }; + } catch (error) { + logger.error('Failed to schedule symbol fetch jobs', { error }); + throw error; + } +} + +export async function fetchSymbols( + this: BaseHandler, + input: FetchSymbolsInput +): Promise<{ success: boolean; count: number }> { + const logger = this.logger; + const { exchangeCode, delisted } = input; + + try { + logger.info('Fetching symbols for exchange', { exchangeCode, delisted }); + + // Build URL with query parameters + const url = new URL(`https://eodhd.com/api/exchange-symbol-list/${exchangeCode}`); + url.searchParams.append('api_token', '657fe003583a32.85708911'); + url.searchParams.append('fmt', 'json'); + + if (delisted) { + url.searchParams.append('delisted', '1'); + } + + // Fetch symbols from EOD API using Bun fetch + const response = await fetch(url.toString()); + + if (!response.ok) { + throw new Error(`EOD Symbols API returned ${response.status}: ${response.statusText}`); + } + + const symbols = await response.json(); + + if (!Array.isArray(symbols)) { + throw new Error('Invalid response format from EOD API - expected array'); + } + + logger.info(`Fetched ${symbols.length} ${delisted ? 'delisted' : 'active'} symbols for ${exchangeCode}`); + + // Add metadata to each symbol + const symbolsWithMetadata = symbols.map(symbol => ({ + ...symbol, + Exchange: symbol.Exchange || exchangeCode, // Ensure Exchange is set + delisted: delisted, + })); + + // Save to MongoDB - use Code as unique identifier along with exchange + const result = await this.mongodb.batchUpsert( + 'eodSymbols', + symbolsWithMetadata, + ['Code', 'Exchange'] + ); + + logger.info(`Successfully saved ${result.insertedCount} ${delisted ? 'delisted' : 'active'} symbols for ${exchangeCode}`); + + return { + success: true, + count: result.insertedCount + }; + } catch (error) { + logger.error('Failed to fetch or save symbols', { error, exchangeCode, delisted }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts index b1501dc..f019a42 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts @@ -1,3 +1,2 @@ -export * from './fetch-daily-prices.action'; -export * from './fetch-fundamentals.action'; -export * from './fetch-news.action'; +export { fetchExchanges } from './fetch-exchanges'; +export { fetchSymbols, scheduleFetchSymbols } from './fetch-symbols'; diff --git a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts index 9b96c4e..9b04e1b 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts @@ -6,11 +6,7 @@ import { ScheduledOperation } from '@stock-bot/handlers'; import type { DataIngestionServices } from '../../types'; -import { - fetchDailyPrices, - fetchFundamentals, - fetchNews, -} from './actions'; +import { fetchExchanges, fetchSymbols, scheduleFetchSymbols } from './actions'; /** * EOD (End of Day) Handler demonstrating advanced rate limiting @@ -21,11 +17,9 @@ import { @Handler('eod') @RateLimit({ limits: [ - { points: 10, duration: 1 }, // 100 points per second - { points: 10000, duration: 3600 }, // 10k points per hour - { points: 100000, duration: 86400 }, // 100k points per day + { points: 1000, duration: 60 }, // 1000 points per minute + { points: 100500, duration: 86400 }, // 100,500 points per day ], - cost: 1, // Default cost for operations using this handler }) export class EodHandler extends BaseHandler { constructor(services: any) { @@ -33,91 +27,28 @@ export class EodHandler extends BaseHandler { } /** - * Fetch daily price data - Low cost operation - * Uses handler rate limits but costs only 1 point + * Fetch exchanges list from EOD + * Runs weekly on Sundays at midnight */ - @Operation('fetch-daily-prices') - @RateLimit(1) // Costs 1 point per call - fetchDailyPrices = fetchDailyPrices; + @Operation('fetch-exchanges') + @ScheduledOperation('fetch-exchanges', '0 0 * * 0') + @RateLimit(1) // 1 point per call + fetchExchanges = fetchExchanges; /** - * Fetch fundamental data - Medium cost operation - * Uses handler rate limits but costs 10 points + * Schedule symbol fetching for all exchanges + * Runs daily at 1 AM */ - @Operation('fetch-fundamentals') - @RateLimit(1) // Costs 10 points per call - fetchFundamentals = fetchFundamentals; + @Operation('schedule-fetch-symbols') + @ScheduledOperation('schedule-fetch-symbols', '0 1 * * *') + @RateLimit(1) // 1 point for scheduling + scheduleFetchSymbols = scheduleFetchSymbols; /** - * Fetch news data - High cost operation - * Has custom limits AND high cost + * Fetch symbols for a specific exchange + * Called by schedule-fetch-symbols for each exchange */ - @Operation('fetch-news') - @RateLimit(1) - fetchNews = fetchNews; - - /** - * Test burst operations - For testing rate limit behavior - * Uses handler default cost (1 point) - */ - @Operation('test-burst') - @RateLimit(0) - async testBurstOperations(input: { operationsToTest: string[], burstSize: number }): Promise { - this.logger.info('Testing burst operations', input); - - const results = { - attempted: 0, - scheduled: 0, - failed: 0, - operations: {} as Record - }; - - try { - const promises = []; - for (let i = 0; i < input.burstSize; i++) { - const operation = input.operationsToTest[i % input.operationsToTest.length] || 'fetch-news'; - results.attempted++; - results.operations[operation] = (results.operations[operation] || 0) + 1; - - const promise = this.scheduleOperation(operation, { index: i }).then(() => { - results.scheduled++; - }).catch((error) => { - results.failed++; - this.logger.debug('Failed to schedule operation', { operation, error: error.message }); - }); - - promises.push(promise); - } - - await Promise.allSettled(promises); - - return { - success: true, - results, - message: `Scheduled ${results.scheduled}/${results.attempted} operations`, - breakdown: results.operations - }; - } catch (error) { - this.logger.error('Burst test failed', { error }); - throw error; - } - } - - /** - * Scheduled job to test rate limits - * Runs every 5 minutes for testing - */ - @ScheduledOperation('eod-rate-limit-test', '*/5 * * * *', { - priority: 5, - description: 'Test rate limit behavior', - immediately: true, - }) - @RateLimit(0) // No cost for this test operation - async scheduledRateLimitTest(): Promise { - this.logger.info('Starting rate limit test'); - return this.testBurstOperations({ - operationsToTest: ['fetch-daily-prices', 'fetch-fundamentals', 'fetch-news'], - burstSize: 200 - }); - } + @Operation('fetch-symbols') + @RateLimit(1) // 10 points per exchange + fetchSymbols = fetchSymbols; } \ No newline at end of file