import type { BaseHandler } from '@stock-bot/handlers'; import type { DataIngestionServices } from '../../../types'; import { EOD_CONFIG } from '../shared'; import { getEodExchangeSuffix } from '../shared/utils'; interface FetchIntradayInput { symbol: string; exchange: string; interval: '1m' | '5m' | '1h'; fromDate?: Date; toDate?: Date; country?: string; } interface CrawlIntradayInput { symbol: string; exchange: string; interval: '1m' | '5m' | '1h'; country?: string; } interface CrawlState { finished: boolean; oldestDateReached?: Date; newestDateReached?: Date; lastProcessedDate?: Date; totalRecordsProcessed?: number; totalBatchesProcessed?: number; } // Max days per interval based on EOD limits const MAX_DAYS_PER_INTERVAL = { '1m': 120, '5m': 600, '1h': 7200 }; export async function scheduleIntradayCrawl( this: BaseHandler ): Promise<{ success: boolean; jobsScheduled: number }> { const logger = this.logger; try { logger.info('Scheduling intraday crawl jobs'); // Get Canadian exchanges for now const canadianExchanges = ['TO', 'V', 'CN', 'NEO']; // Find active symbols that need intraday data const symbols = await this.mongodb.collection('eodSymbols').find({ Exchange: { $in: canadianExchanges }, delisted: false, // Only symbols without complete intraday data $or: [ { 'intradayState.1m.finished': { $ne: true } }, { 'intradayState.5m.finished': { $ne: true } }, { 'intradayState.1h.finished': { $ne: true } }, { 'intradayState': { $exists: false } } ] }).limit(100).toArray(); // Limit to avoid too many jobs at once if (!symbols || symbols.length === 0) { logger.info('No symbols need intraday crawl'); return { success: true, jobsScheduled: 0 }; } logger.info(`Found ${symbols.length} symbols needing intraday data`, { count: symbols.length, samples: symbols.slice(0, 5).map(s => ({ symbol: s.Code, exchange: s.Exchange, name: s.Name, intradayState: s.intradayState })) }); let jobsScheduled = 0; const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h']; // Schedule crawl jobs for each symbol and interval for (const symbol of symbols) { for (const interval of intervals) { // Check if this interval is already finished const isFinished = symbol.intradayState?.[interval]?.finished; if (isFinished) { continue; } await this.scheduleOperation('crawl-intraday', { symbol: symbol.Code, exchange: symbol.eodExchange || symbol.Exchange, // Use eodExchange if available interval, country: symbol.Country }, { attempts: 3, backoff: { type: 'exponential', delay: 10000 }, delay: jobsScheduled * 500 // Stagger jobs by 500ms }); jobsScheduled++; } } logger.info(`Successfully scheduled ${jobsScheduled} intraday crawl jobs`); return { success: true, jobsScheduled }; } catch (error) { logger.error('Failed to schedule intraday crawl jobs', { error }); throw error; } } export async function crawlIntraday( this: BaseHandler, input: CrawlIntradayInput ): Promise<{ success: boolean; recordsProcessed: number; finished: boolean }> { const logger = this.logger; const { symbol, exchange, interval, country } = input; try { logger.info(`Starting intraday crawl for ${symbol}.${exchange} - ${interval}`); // Get current crawl state const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({ Code: symbol, Exchange: exchange }); if (!symbolDoc) { throw new Error(`Symbol ${symbol}.${exchange} not found`); } const crawlState: CrawlState = symbolDoc.intradayState?.[interval] || { finished: false }; // Determine date range for this batch const maxDays = MAX_DAYS_PER_INTERVAL[interval]; let toDate = new Date(); let fromDate = new Date(); if (crawlState.lastProcessedDate) { // Continue from where we left off toDate = new Date(crawlState.lastProcessedDate); toDate.setDate(toDate.getDate() - 1); // Start from day before last processed } // Calculate from date (going backwards) fromDate = new Date(toDate); fromDate.setDate(fromDate.getDate() - maxDays + 1); // Fetch data for this batch const result = await fetchIntraday.call(this, { symbol, exchange, interval, fromDate, toDate, country }); // Update crawl state const newState: CrawlState = { ...crawlState, lastProcessedDate: fromDate, totalRecordsProcessed: (crawlState.totalRecordsProcessed || 0) + result.recordsSaved, totalBatchesProcessed: (crawlState.totalBatchesProcessed || 0) + 1 }; // Set oldest date reached if (!newState.oldestDateReached || fromDate < newState.oldestDateReached) { newState.oldestDateReached = fromDate; } // Set newest date reached if (!newState.newestDateReached || toDate > newState.newestDateReached) { newState.newestDateReached = toDate; } // Check if we're finished (no data returned means we've reached the end) if (result.recordsSaved === 0) { newState.finished = true; logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval}`, { totalRecords: newState.totalRecordsProcessed, oldestDate: newState.oldestDateReached, newestDate: newState.newestDateReached, batches: newState.totalBatchesProcessed }); } // Update symbol with new crawl state await this.mongodb.collection('eodSymbols').updateOne( { Code: symbol, Exchange: exchange }, { $set: { [`intradayState.${interval}`]: newState, lastIntradayUpdate: new Date() } } ); // If not finished, schedule next batch if (!newState.finished) { await this.scheduleOperation('crawl-intraday', { symbol, exchange, interval, country }, { attempts: 3, backoff: { type: 'exponential', delay: 10000 }, delay: 5000 // Wait 5 seconds before next batch }); logger.info('Scheduled next intraday batch', { symbol, exchange, interval, nextFromDate: fromDate.toISOString() }); } return { success: true, recordsProcessed: result.recordsSaved, finished: newState.finished }; } catch (error) { logger.error('Failed to crawl intraday data', { error, symbol, exchange, interval }); throw error; } } export async function fetchIntraday( this: BaseHandler, input: FetchIntradayInput ): Promise<{ success: boolean; recordsSaved: number }> { const logger = this.logger; const { symbol, exchange, interval, fromDate, toDate, country } = input; try { logger.info(`Fetching intraday data for ${symbol}.${exchange} - ${interval}`, { from: fromDate?.toISOString().split('T')[0], to: toDate?.toISOString().split('T')[0] }); // Get country if not provided let symbolCountry = country; if (!symbolCountry) { const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({ Code: symbol, Exchange: exchange }); if (!symbolDoc) { throw new Error(`Symbol ${symbol}.${exchange} not found in database`); } symbolCountry = symbolDoc.Country; } // Get API key const apiKey = EOD_CONFIG.API_TOKEN; if (!apiKey) { throw new Error('EOD API key not configured'); } // Build URL // Use utility function to handle US symbols and EUFUND special case const exchangeSuffix = getEodExchangeSuffix(exchange, symbolCountry); const url = new URL(`https://eodhd.com/api/intraday/${symbol}.${exchangeSuffix}`); url.searchParams.append('api_token', apiKey); url.searchParams.append('fmt', 'json'); url.searchParams.append('interval', interval); // Add date range if provided if (fromDate) { url.searchParams.append('from', Math.floor(fromDate.getTime() / 1000).toString()); } if (toDate) { url.searchParams.append('to', Math.floor(toDate.getTime() / 1000).toString()); } // Fetch data const response = await fetch(url.toString()); if (!response.ok) { throw new Error(`EOD Intraday API returned ${response.status}: ${response.statusText}`); } const data = await response.json(); // EOD returns an array of intraday bars if (!Array.isArray(data)) { throw new Error('Invalid response format from EOD API - expected array'); } if (data.length === 0) { logger.info('No intraday data returned', { symbol, exchange, interval }); return { success: true, recordsSaved: 0 }; } logger.info(`Fetched ${data.length} intraday records for ${symbol}.${exchange} - ${interval}`); // Add metadata to each record const recordsWithMetadata = data.map(bar => ({ symbol, exchange, symbolExchange: `${symbol}.${exchange}`, interval, datetime: bar.datetime, timestamp: bar.timestamp, gmtoffset: bar.gmtoffset, open: bar.open, high: bar.high, low: bar.low, close: bar.close, volume: bar.volume, source: 'eod' })); // Save to MongoDB - use timestamp, symbol, and interval as unique identifier const collectionName = `eodIntraday${interval.toUpperCase()}`; const result = await this.mongodb.batchUpsert( collectionName, recordsWithMetadata, ['timestamp', 'symbolExchange', 'interval'] ); logger.info(`Saved ${result.insertedCount} intraday records`, { symbol, exchange, interval, collection: collectionName }); return { success: true, recordsSaved: result.insertedCount }; } catch (error) { logger.error('Failed to fetch intraday data', { error, symbol, exchange, interval }); throw error; } }