From 8630852dba93023bc422d263616c26222a576886 Mon Sep 17 00:00:00 2001
From: Boki
Date: Sun, 6 Jul 2025 21:27:11 -0400
Subject: [PATCH] added untested intraday

---
 .../src/handlers/eod/actions/index.ts         |   1 +
 .../src/handlers/eod/actions/intraday.ts      | 325 ++++++++++++++++++
 .../src/handlers/eod/eod.handler.ts           |  30 +-
 3 files changed, 355 insertions(+), 1 deletion(-)
 create mode 100644 apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts

diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts
index 3aac827..c1e3098 100644
--- a/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts
+++ b/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts
@@ -1,3 +1,4 @@
 export { fetchExchanges } from './exchanges';
 export { fetchSymbols, scheduleFetchSymbols } from './symbols';
 export { fetchPrices, scheduleFetchPrices } from './prices';
+export { fetchIntraday, crawlIntraday, scheduleIntradayCrawl } from './intraday';
diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts
new file mode 100644
index 0000000..f056956
--- /dev/null
+++ b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts
@@ -0,0 +1,325 @@
+import type { BaseHandler } from '@stock-bot/handlers';
+import type { DataIngestionServices } from '../../../types'; // NOTE(review): imported but never used in this file — remove or use
+import { EOD_CONFIG } from '../shared';
+
+interface FetchIntradayInput {
+  symbol: string;
+  exchange: string;
+  interval: '1m' | '5m' | '1h';
+  fromDate?: Date;
+  toDate?: Date;
+}
+
+interface CrawlIntradayInput {
+  symbol: string;
+  exchange: string;
+  interval: '1m' | '5m' | '1h';
+}
+
+interface CrawlState {
+  finished: boolean;
+  oldestDateReached?: Date;
+  newestDateReached?: Date;
+  lastProcessedDate?: Date;
+  totalRecordsProcessed?: number;
+  totalBatchesProcessed?: number;
+}
+
+// Max days per interval based on EOD limits
+const MAX_DAYS_PER_INTERVAL = {
+  '1m': 120,
+  '5m': 600,
+  '1h': 7200
+};
+
+export async function scheduleIntradayCrawl(
+  this: BaseHandler
+): Promise<{ success: boolean; jobsScheduled: number }> {
+  const logger = this.logger;
+
+  try {
+    logger.info('Scheduling intraday crawl jobs');
+
+    // Get Canadian exchanges for now
+    const canadianExchanges = ['TO', 'V', 'CN', 'NEO'];
+
+    // Find active symbols that need intraday data
+    const symbols = await this.mongodb.collection('eodSymbols').find({
+      Exchange: { $in: canadianExchanges },
+      delisted: false,
+      // Only symbols without complete intraday data
+      $or: [
+        { 'intradayState.1m.finished': { $ne: true } },
+        { 'intradayState.5m.finished': { $ne: true } },
+        { 'intradayState.1h.finished': { $ne: true } },
+        { 'intradayState': { $exists: false } } // NOTE(review): redundant — the $ne: true clauses above already match docs missing the field
+      ]
+    }).limit(100).toArray(); // Limit to avoid too many jobs at once
+
+    if (!symbols || symbols.length === 0) {
+      logger.info('No symbols need intraday crawl');
+      return { success: true, jobsScheduled: 0 };
+    }
+
+    logger.info(`Found ${symbols.length} symbols needing intraday data`);
+
+    let jobsScheduled = 0;
+    const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h'];
+
+    // Schedule crawl jobs for each symbol and interval
+    for (const symbol of symbols) {
+      for (const interval of intervals) {
+        // Check if this interval is already finished
+        const isFinished = symbol.intradayState?.[interval]?.finished;
+        if (isFinished) {
+          continue;
+        }
+
+        await this.scheduleOperation('crawl-intraday', {
+          symbol: symbol.Code,
+          exchange: symbol.Exchange,
+          interval
+        }, {
+          attempts: 3,
+          backoff: {
+            type: 'exponential',
+            delay: 10000
+          },
+          delay: jobsScheduled * 500 // Stagger jobs by 500ms
+        });
+        jobsScheduled++;
+      }
+    }
+
+    logger.info(`Successfully scheduled ${jobsScheduled} intraday crawl jobs`);
+
+    return {
+      success: true,
+      jobsScheduled
+    };
+  } catch (error) {
+    logger.error('Failed to schedule intraday crawl jobs', { error });
+    throw error;
+  }
+}
+
+export async function crawlIntraday(
+  this: BaseHandler,
+  input: CrawlIntradayInput
+): Promise<{ success: boolean; recordsProcessed: number; finished: boolean }> {
+  const logger = this.logger;
+  const { symbol, exchange, interval } = input;
+
+  try {
+    logger.info('Starting intraday crawl', { symbol, exchange, interval });
+
+    // Get current crawl state
+    const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({
+      Code: symbol,
+      Exchange: exchange
+    });
+
+    if (!symbolDoc) {
+      throw new Error(`Symbol ${symbol}.${exchange} not found`);
+    }
+
+    const crawlState: CrawlState = symbolDoc.intradayState?.[interval] || {
+      finished: false
+    };
+
+    // Determine date range for this batch
+    const maxDays = MAX_DAYS_PER_INTERVAL[interval];
+    let toDate = new Date();
+    let fromDate = new Date();
+
+    if (crawlState.lastProcessedDate) {
+      // Continue from where we left off
+      toDate = new Date(crawlState.lastProcessedDate);
+      toDate.setSeconds(toDate.getSeconds() - 1); // FIX(review): was getDate() - 1, which skipped the full ~24h of bars between batches; from/to are inclusive Unix timestamps, so resume 1 second before the previous batch's fromDate
+    }
+
+    // Calculate from date (going backwards)
+    fromDate = new Date(toDate);
+    fromDate.setDate(fromDate.getDate() - maxDays + 1);
+
+    // Fetch data for this batch
+    const result = await fetchIntraday.call(this, {
+      symbol,
+      exchange,
+      interval,
+      fromDate,
+      toDate
+    });
+
+    // Update crawl state
+    const newState: CrawlState = {
+      ...crawlState,
+      lastProcessedDate: fromDate,
+      totalRecordsProcessed: (crawlState.totalRecordsProcessed || 0) + result.recordsSaved,
+      totalBatchesProcessed: (crawlState.totalBatchesProcessed || 0) + 1
+    };
+
+    // Set oldest date reached
+    if (!newState.oldestDateReached || fromDate < newState.oldestDateReached) {
+      newState.oldestDateReached = fromDate;
+    }
+
+    // Set newest date reached
+    if (!newState.newestDateReached || toDate > newState.newestDateReached) {
+      newState.newestDateReached = toDate;
+    }
+
+    // Check if we're finished (no data returned means we've reached the end)
+    if (result.recordsSaved === 0) { // NOTE(review): recordsSaved is insertedCount; re-crawling an already-stored range upserts 0 new docs and would finish prematurely — confirm batchUpsert counts upserts
+      newState.finished = true;
+      logger.info('Intraday crawl finished - no more data', {
+        symbol,
+        exchange,
+        interval,
+        totalRecords: newState.totalRecordsProcessed
+      });
+    }
+
+    // Update symbol with new crawl state
+    await this.mongodb.collection('eodSymbols').updateOne(
+      { Code: symbol, Exchange: exchange },
+      {
+        $set: {
+          [`intradayState.${interval}`]: newState,
+          lastIntradayUpdate: new Date()
+        }
+      }
+    );
+
+    // If not finished, schedule next batch
+    if (!newState.finished) {
+      await this.scheduleOperation('crawl-intraday', {
+        symbol,
+        exchange,
+        interval
+      }, {
+        attempts: 3,
+        backoff: {
+          type: 'exponential',
+          delay: 10000
+        },
+        delay: 5000 // Wait 5 seconds before next batch
+      });
+
+      logger.info('Scheduled next intraday batch', {
+        symbol,
+        exchange,
+        interval,
+        nextFromDate: fromDate.toISOString()
+      });
+    }
+
+    return {
+      success: true,
+      recordsProcessed: result.recordsSaved,
+      finished: newState.finished
+    };
+  } catch (error) {
+    logger.error('Failed to crawl intraday data', { error, symbol, exchange, interval });
+    throw error;
+  }
+}
+
+export async function fetchIntraday(
+  this: BaseHandler,
+  input: FetchIntradayInput
+): Promise<{ success: boolean; recordsSaved: number }> {
+  const logger = this.logger;
+  const { symbol, exchange, interval, fromDate, toDate } = input;
+
+  try {
+    logger.info('Fetching intraday data', {
+      symbol,
+      exchange,
+      interval,
+      from: fromDate?.toISOString(),
+      to: toDate?.toISOString()
+    });
+
+    // Get API key
+    const apiKey = EOD_CONFIG.API_TOKEN;
+    if (!apiKey) {
+      throw new Error('EOD API key not configured');
+    }
+
+    // Build URL
+    const url = new URL(`https://eodhd.com/api/intraday/${symbol}.${exchange}`);
+    url.searchParams.append('api_token', apiKey);
+    url.searchParams.append('fmt', 'json');
+    url.searchParams.append('interval', interval);
+
+    // Add date range if provided
+    if (fromDate) {
+      url.searchParams.append('from', Math.floor(fromDate.getTime() / 1000).toString());
+    }
+    if (toDate) {
+      url.searchParams.append('to', Math.floor(toDate.getTime() / 1000).toString());
+    }
+
+    // Fetch data
+    const response = await fetch(url.toString());
+
+    if (!response.ok) {
+      throw new Error(`EOD Intraday API returned ${response.status}: ${response.statusText}`);
+    }
+
+    const data = await response.json();
+
+    // EOD returns an array of intraday bars
+    if (!Array.isArray(data)) {
+      throw new Error('Invalid response format from EOD API - expected array');
+    }
+
+    if (data.length === 0) {
+      logger.info('No intraday data returned', { symbol, exchange, interval });
+      return { success: true, recordsSaved: 0 };
+    }
+
+    logger.info(`Fetched ${data.length} intraday records`, { symbol, exchange, interval });
+
+    // Add metadata to each record
+    const recordsWithMetadata = data.map(bar => ({
+      symbol,
+      exchange,
+      symbolExchange: `${symbol}.${exchange}`,
+      interval,
+      datetime: bar.datetime,
+      timestamp: bar.timestamp,
+      gmtoffset: bar.gmtoffset,
+      open: bar.open,
+      high: bar.high,
+      low: bar.low,
+      close: bar.close,
+      volume: bar.volume,
+      source: 'eod'
+    }));
+
+    // Save to MongoDB - use timestamp, symbol, and interval as unique identifier
+    const collectionName = `eodIntraday${interval.toUpperCase()}`;
+    const result = await this.mongodb.batchUpsert(
+      collectionName,
+      recordsWithMetadata,
+      ['timestamp', 'symbolExchange', 'interval']
+    );
+
+    logger.info(`Saved ${result.insertedCount} intraday records`, {
+      symbol,
+      exchange,
+      interval,
+      collection: collectionName
+    });
+
+    return {
+      success: true,
+      recordsSaved: result.insertedCount // NOTE(review): confirm this reflects upserted docs too, not only fresh inserts
+    };
+  } catch (error) {
+    logger.error('Failed to fetch intraday data', { error, symbol, exchange, interval });
+    throw error;
+  }
+}
\ No newline at end of file
diff --git a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts
index 65cad75..2fd5f72 100644
--- a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts
+++ b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts
@@ -11,7 +11,10 @@ import {
   fetchSymbols,
   scheduleFetchSymbols,
   fetchPrices,
-  scheduleFetchPrices
+  scheduleFetchPrices,
+  fetchIntraday,
+  crawlIntraday,
+  scheduleIntradayCrawl
 } from './actions';
 
 /**
@@ -74,4 +77,29 @@ export class EodHandler extends BaseHandler {
   @Operation('fetch-prices')
   @RateLimit(10) // 10 points per price fetch
   fetchPrices = fetchPrices;
+
+  /**
+   * Schedule intraday crawl for symbols needing intraday data
+   * Runs daily at 3 AM
+   */
+  @Operation('schedule-intraday-crawl')
+  @ScheduledOperation('schedule-intraday-crawl', '0 3 * * *') // NOTE(review): this patch does not add ScheduledOperation to the imports above — confirm it is already imported
+  @RateLimit(1) // 1 point for scheduling
+  scheduleIntradayCrawl = scheduleIntradayCrawl;
+
+  /**
+   * Crawl intraday data for a specific symbol and interval
+   * Handles resumption and batch processing
+   */
+  @Operation('crawl-intraday')
+  @RateLimit(50) // 50 points per crawl batch
+  crawlIntraday = crawlIntraday;
+
+  /**
+   * Fetch intraday data for a specific date range
+   * Called by crawl-intraday
+   */
+  @Operation('fetch-intraday')
+  @RateLimit(50) // 50 points per intraday fetch
+  fetchIntraday = fetchIntraday;
 }
\ No newline at end of file