import type { EodHandler } from '../eod.handler';
import { EOD_CONFIG } from '../shared';

/** Bar widths this module knows how to crawl. */
type IntradayInterval = '1m' | '5m' | '1h';

interface FetchIntradayInput {
  eodSearchCode: string;
  interval: IntradayInterval;
  fromDate?: Date;
  toDate?: Date;
}

interface CrawlIntradayInput {
  eodSearchCode: string;
  interval: IntradayInterval;
  fromDate?: Date;
  toDate?: Date;
  isInitial?: boolean; // To distinguish initial vs continuation jobs
}

interface ScheduleIntradayConfig {
  exchanges?: string[]; // Optional: limit to specific exchanges
  symbolTypes?: string[]; // Optional: limit to specific symbol types (e.g., ['Common Stock'])
  limit?: number; // Optional: limit total number of symbols to process
}

/** One symbol/interval pair selected by the scheduler, with the window of its first job. */
interface CrawlCandidate {
  // The symbol document and run timestamps come straight from the operation registry;
  // their shape is owned by the registry, so they are deliberately left loose here.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  symbol: any;
  interval: IntradayInterval;
  operationName: string;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  lastRun: any;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  lastSuccess: any;
  fromDate: Date;
  toDate: Date;
  isInitial: boolean;
}

// Max days per interval based on EOD limits
const MAX_DAYS_PER_INTERVAL: Record<IntradayInterval, number> = { '1m': 120, '5m': 600, '1h': 7200 };

// Intervals processed by the scheduler, in order. The tracker operation name for
// an interval is always `intraday_${interval}` (same convention as crawlIntraday).
const INTRADAY_INTERVALS: readonly IntradayInterval[] = ['1m', '5m', '1h'];

// A finished crawl is picked up again once its newest stored date is older than this (30 days).
const REFRESH_AFTER_MS = 30 * 24 * 60 * 60 * 1000;

// Limit passed to getStaleSymbols when the caller does not supply one.
const DEFAULT_STALE_SYMBOL_LIMIT = 1000000;

// There is no default exchange list: when config.exchanges is omitted, `undefined`
// is passed through to getStaleSymbols.
// (Previously considered default: ['US', 'TO', 'V', 'CN', 'NEO', 'CC'])

/** Returns the part of the date's ISO-8601 representation before the `T` (the calendar day, in UTC). */
function formatDay(date: Date): string {
  const iso = date.toISOString();
  return iso.slice(0, iso.indexOf('T'));
}

/**
 * Parses a stored date-like value (Date, string or number) into a valid Date.
 * Returns undefined for falsy, unsupported or unparseable values.
 */
function toValidDate(value: unknown): Date | undefined {
  if (!value) {
    return undefined;
  }
  let parsed: Date;
  if (value instanceof Date) {
    parsed = value;
  } else if (typeof value === 'string' || typeof value === 'number') {
    parsed = new Date(value);
  } else {
    return undefined;
  }
  return Number.isNaN(parsed.getTime()) ? undefined : parsed;
}

/** Extracts a string `message` from a thrown value, if it has one. */
function getErrorMessage(error: unknown): string | undefined {
  if (typeof error === 'object' && error !== null && 'message' in error) {
    const message = (error as { message?: unknown }).message;
    return typeof message === 'string' ? message : undefined;
  }
  return undefined;
}

/**
 * Finds symbols whose intraday operations need work and schedules one
 * `crawl-intraday` job per symbol/interval combination.
 *
 * For each interval the operation registry is asked for stale symbols; a symbol
 * is scheduled when it has no operation status, is not finished, or is finished
 * but its newest stored date is more than 30 days old.
 *
 * @param config Optional exchange / symbol-type / limit filters.
 * @returns `success: false` when the operation registry is missing; otherwise
 *   `success: true` with the number of jobs actually scheduled (jobs rejected
 *   with a "Job already exists" error are skipped and not counted).
 * @throws Re-throws any other error after logging it.
 */
export async function scheduleIntradayCrawl(
  this: EodHandler,
  config?: ScheduleIntradayConfig
): Promise<{ success: boolean; jobsScheduled: number }> {
  const logger = this.logger;

  try {
    logger.info('Scheduling intraday crawl jobs', {
      config: {
        exchanges: config?.exchanges,
        symbolTypes: config?.symbolTypes || 'all',
        limit: config?.limit || 'unlimited'
      }
    });

    // Check if operation registry is initialized
    if (!this.operationRegistry) {
      logger.error('Operation registry not initialized!');
      return { success: false, jobsScheduled: 0 };
    }

    logger.debug('Operation registry status:', {
      hasRegistry: !!this.operationRegistry,
      hasProvider: this.operationRegistry.hasProvider
        ? this.operationRegistry.hasProvider('eod')
        : 'no hasProvider method'
    });

    const candidates: CrawlCandidate[] = [];

    // Exchange and limit filters are handed to the registry query; they do not
    // change between intervals, so resolve them once.
    const targetExchanges = config?.exchanges;
    const desiredLimit = config?.limit || DEFAULT_STALE_SYMBOL_LIMIT;

    // Get symbols needing crawl for each interval
    for (const interval of INTRADAY_INTERVALS) {
      const operationName: string = `intraday_${interval}`;

      logger.debug(`Getting stale symbols for ${operationName}...`);

      const staleSymbols = await this.operationRegistry.getStaleSymbols('eod', operationName, {
        limit: desiredLimit,
        exchanges: targetExchanges,
      });

      logger.debug(`getStaleSymbols returned ${staleSymbols.length} symbols for ${operationName}`);

      for (const staleSymbol of staleSymbols) {
        const { symbol, operations } = staleSymbol;
        const operationStatus = operations?.[operationName];

        // Apply symbol type filter if specified (this is the only filter not in the DB query)
        if (config?.symbolTypes && !config.symbolTypes.includes(symbol.Type)) {
          logger.debug(`Skipping ${symbol.Code} - type ${symbol.Type} not in filter`);
          continue;
        }

        // Include symbols that:
        // 1. Have never been processed (!operationStatus)
        // 2. Are not finished (!operationStatus.finished)
        // 3. Are finished but need new data (newest date more than 30 days old)
        const needsNewData =
          operationStatus?.finished &&
          operationStatus?.newestDateReached &&
          new Date(operationStatus.newestDateReached) < new Date(Date.now() - REFRESH_AFTER_MS);

        if (operationStatus && operationStatus.finished && !needsNewData) {
          continue;
        }

        // Calculate initial date range for the job
        const maxDays = MAX_DAYS_PER_INTERVAL[interval];
        let toDate = new Date();
        let isInitial = true;

        if (operationStatus?.lastProcessedDate) {
          // Continue from where we left off
          // NOTE(review): this also applies to finished symbols selected via needsNewData.
          // crawlIntraday stores the *older* edge of each batch in lastProcessedDate, so the
          // refresh window built here lies before the oldest crawled point rather than
          // between newestDateReached and now — confirm whether that is intended.
          toDate = new Date(operationStatus.lastProcessedDate);
          isInitial = false;
        }

        // Calculate from date (going backwards)
        const fromDate = new Date(toDate);
        fromDate.setDate(fromDate.getDate() - maxDays);

        candidates.push({
          symbol: symbol,
          interval: interval,
          operationName: operationName,
          lastRun: staleSymbol.lastRun,
          lastSuccess: staleSymbol.lastSuccess,
          fromDate: fromDate,
          toDate: toDate,
          isInitial: isInitial
        });

        logger.debug(`Added ${symbol.Code}.${symbol.Exchange} for ${interval} crawl`, {
          hasOperation: !!operationStatus,
          finished: operationStatus?.finished,
          needsNewData,
          dateRange: `${formatDay(fromDate)} to ${formatDay(toDate)}`,
          isInitial
        });
      }
    }

    if (candidates.length === 0) {
      logger.info('No symbols need intraday crawl');
      return { success: true, jobsScheduled: 0 };
    }

    logger.info(`Found ${candidates.length} symbol/interval combinations needing intraday data`, {
      count: candidates.length,
      byInterval: {
        '1m': candidates.filter(s => s.interval === '1m').length,
        '5m': candidates.filter(s => s.interval === '5m').length,
        '1h': candidates.filter(s => s.interval === '1h').length
      },
      symbols: candidates.map(s => ({
        symbol: s.symbol.Code,
        exchange: s.symbol.Exchange,
        name: s.symbol.Name,
        eodSearchCode: s.symbol.eodSearchCode,
        interval: s.interval,
        dateRange: `${formatDay(s.fromDate)} to ${formatDay(s.toDate)}`,
        isInitial: s.isInitial,
        lastRun: s.lastRun ? new Date(s.lastRun).toISOString() : 'never',
        lastSuccess: s.lastSuccess ? new Date(s.lastSuccess).toISOString() : 'never'
      }))
    });

    let jobsScheduled = 0;

    // Schedule crawl jobs for each symbol/interval combination
    for (const item of candidates) {
      const { symbol, interval, fromDate, toDate, isInitial } = item;

      // Create jobId based on whether it's initial or continuation
      const dateStr = isInitial ? 'initial' : `${formatDay(fromDate)}-${formatDay(toDate)}`;
      const jobId = `crawl-intraday-${symbol.eodSearchCode}-${interval}-${dateStr}`;

      try {
        await this.scheduleOperation('crawl-intraday', {
          eodSearchCode: symbol.eodSearchCode,
          interval,
          fromDate,
          toDate,
          isInitial
        }, {
          jobId,
          priority: isInitial ? 5 : 3, // Initial crawl jobs get lower priority
          attempts: 3,
          backoff: {
            type: 'exponential',
            delay: 10000
          },
          delay: jobsScheduled * 500 // Stagger jobs by 500ms
        });
        jobsScheduled++;

        logger.debug(`Scheduled crawl job`, {
          jobId,
          symbol: symbol.Code,
          exchange: symbol.Exchange,
          interval,
          dateRange: `${formatDay(fromDate)} to ${formatDay(toDate)}`,
          isInitial
        });
      } catch (error: unknown) {
        // A duplicate jobId is expected when the same window was scheduled before; skip it.
        if (getErrorMessage(error)?.includes('Job already exists')) {
          logger.debug(`Job already exists: ${jobId}`);
        } else {
          throw error;
        }
      }
    }

    logger.info(`Successfully scheduled ${jobsScheduled} intraday crawl jobs`);

    return { success: true, jobsScheduled };
  } catch (error) {
    logger.error('Failed to schedule intraday crawl jobs', { error });
    throw error;
  }
}

/**
 * Crawls one batch of intraday data for a symbol/interval, records progress in
 * the operation tracker and, unless the crawl is finished, schedules the next
 * (older) batch.
 *
 * The batch window is the provided `fromDate`/`toDate` when both are present;
 * otherwise it ends at the stored `lastProcessedDate` (or now) and spans
 * `MAX_DAYS_PER_INTERVAL[interval]` days backwards. The crawl is marked
 * finished only when the fetch returned no records at all.
 *
 * @param input Symbol search code, interval and optional window.
 * @returns Records saved in this batch and whether the crawl is finished.
 * @throws When the symbol is not found, or re-throws any fetch/tracker/scheduling error after logging it.
 */
export async function crawlIntraday(
  this: EodHandler,
  input: CrawlIntradayInput
): Promise<{ success: boolean; recordsProcessed: number; finished: boolean }> {
  const logger = this.logger;
  const { eodSearchCode, interval, fromDate: providedFromDate, toDate: providedToDate, isInitial } = input;

  try {
    // Lookup symbol using eodSearchCode
    const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({
      eodSearchCode: eodSearchCode
    });

    if (!symbolDoc) {
      logger.error(`Symbol not found for eodSearchCode: ${eodSearchCode}`);
      throw new Error(`Symbol not found: ${eodSearchCode}`);
    }

    const symbol = symbolDoc.Code;
    const exchange = symbolDoc.eodExchange || symbolDoc.Exchange;
    const country = symbolDoc.Country;

    logger.info(`Starting intraday crawl for ${symbol}.${exchange} - ${interval}`, {
      symbol,
      exchange,
      interval,
      country,
      eodSearchCode
    });

    logger.debug('Found symbol document', {
      symbol,
      exchange,
      hasOperations: !!symbolDoc.operations,
      operationKeys: Object.keys(symbolDoc.operations || {})
    });

    // Get operation status from tracker
    const operationName = `intraday_${interval}`;
    const operationStatus = symbolDoc.operations?.[operationName] || {};

    logger.info(`Current crawl state for ${symbol}.${exchange} - ${interval}`, {
      hasOperationStatus: !!symbolDoc.operations?.[operationName],
      operationStatus: operationStatus,
      lastProcessedDate: operationStatus.lastProcessedDate,
      finished: operationStatus.finished
    });

    // Determine date range for this batch
    const maxDays = MAX_DAYS_PER_INTERVAL[interval];
    let toDate: Date;
    let fromDate: Date;

    if (providedFromDate && providedToDate) {
      // Use provided dates (from scheduled job). They are re-wrapped in Date because
      // they may no longer be Date instances after passing through the job payload.
      fromDate = new Date(providedFromDate);
      toDate = new Date(providedToDate);

      logger.info(`Using provided date range for ${symbol}.${exchange} - ${interval}`, {
        fromDate: fromDate.toISOString(),
        toDate: toDate.toISOString(),
        isInitial
      });
    } else {
      // Fallback to original logic (for backward compatibility):
      // continue from the last processed date if there is one, else start from now.
      toDate = operationStatus.lastProcessedDate
        ? new Date(operationStatus.lastProcessedDate)
        : new Date();

      // Calculate from date (going backwards)
      fromDate = new Date(toDate);
      fromDate.setDate(fromDate.getDate() - maxDays);

      logger.info(`Calculated date range for ${symbol}.${exchange} - ${interval}`, {
        fromDate: fromDate.toISOString(),
        toDate: toDate.toISOString(),
        basedOn: operationStatus.lastProcessedDate ? 'lastProcessedDate' : 'current date'
      });
    }

    logger.info(`Fetching intraday batch for ${symbol}.${exchange} - ${interval} from ${formatDay(fromDate)} to ${formatDay(toDate)}`, {
      symbol,
      exchange,
      interval,
      fromDate: fromDate.toISOString(),
      toDate: toDate.toISOString(),
      maxDays,
      operationStatus: {
        lastProcessedDate: operationStatus.lastProcessedDate,
        totalDaysProcessed: operationStatus.totalDaysProcessed || 0
      }
    });

    // Fetch data for this batch
    const result = await fetchIntraday.call(this, {
      eodSearchCode,
      interval,
      fromDate,
      toDate
    });

    // Normalise the stored bounds before comparing: a stored string would otherwise
    // compare as NaN, be carried forward as-is, and break the .toISOString() calls below.
    const previousOldest = toValidDate(operationStatus.oldestDateReached);
    const previousNewest = toValidDate(operationStatus.newestDateReached);

    // Prepare update data
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- payload shape is owned by the operation registry
    const updateData: any = {
      status: 'partial',
      recordCount: result.recordsSaved,
      finished: false,
      lastProcessedDate: fromDate, // Store the fromDate so next batch continues from here
      totalDaysProcessed: (operationStatus.totalDaysProcessed || 0) + maxDays,
      // Widen the covered range: keep the earlier of the two oldest dates...
      oldestDateReached:
        !previousOldest || fromDate.getTime() < previousOldest.getTime() ? fromDate : previousOldest,
      // ...and the later of the two newest dates.
      newestDateReached:
        !previousNewest || toDate.getTime() > previousNewest.getTime() ? toDate : previousNewest
    };

    // Check if we're finished.
    // Only an empty API response ends the crawl; a batch whose records all already
    // exist (fetched > 0, saved === 0) keeps crawling.
    if (result.recordsSaved === 0 && result.recordsFetched === 0) {
      // No data returned from API - we've reached the end
      updateData.finished = true;
      updateData.status = 'success';
      logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval} (no more data available)`, {
        symbol,
        exchange,
        interval,
        oldestDate: updateData.oldestDateReached?.toISOString(),
        newestDate: updateData.newestDateReached?.toISOString(),
        totalDaysProcessed: updateData.totalDaysProcessed,
        noDataReturned: true
      });
    } else if (result.recordsSaved === 0 && result.recordsFetched > 0) {
      // Data was fetched but all records already exist - continue crawling
      logger.info(`All ${result.recordsFetched} records already exist for ${symbol}.${exchange} - ${interval}, continuing crawl`, {
        symbol,
        exchange,
        interval,
        fromDate: fromDate.toISOString(),
        toDate: toDate.toISOString(),
        recordsFetched: result.recordsFetched
      });
    }

    // Update operation tracker
    logger.info(`Updating operation tracker for ${symbol}.${exchange} - ${interval}`, {
      status: updateData.status,
      recordCount: result.recordsSaved,
      lastProcessedDate: updateData.lastProcessedDate,
      finished: updateData.finished
    });

    await this.operationRegistry.updateOperation('eod', eodSearchCode, operationName, updateData);

    logger.info(`Operation tracker updated for ${symbol}.${exchange} - ${interval}`);

    // If not finished, schedule next batch
    if (!updateData.finished) {
      // Next batch's toDate is current batch's fromDate
      const nextToDate = fromDate;
      const nextFromDate = new Date(nextToDate);
      nextFromDate.setDate(nextFromDate.getDate() - maxDays);

      const dateStr = `${formatDay(nextFromDate)}-${formatDay(nextToDate)}`;
      const jobId = `crawl-intraday-${eodSearchCode}-${interval}-${dateStr}`;

      await this.scheduleOperation('crawl-intraday', {
        eodSearchCode,
        interval,
        fromDate: nextFromDate,
        toDate: nextToDate,
        isInitial: false
      }, {
        jobId,
        priority: 3, // Continuation jobs get higher priority (3) than initial jobs (5)
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 10000
        },
        delay: 5000 // Wait 5 seconds before next batch
      });

      logger.info(`Scheduled next intraday batch for ${symbol}.${exchange} - ${interval}`, {
        symbol,
        exchange,
        interval,
        jobId,
        currentBatch: {
          from: formatDay(fromDate),
          to: formatDay(toDate),
          recordsSaved: result.recordsSaved
        },
        nextBatch: {
          from: formatDay(nextFromDate),
          to: formatDay(nextToDate)
        },
        totalDaysProcessed: updateData.totalDaysProcessed
      });
    }

    return {
      success: true,
      recordsProcessed: result.recordsSaved,
      finished: updateData.finished
    };
  } catch (error) {
    logger.error('Failed to crawl intraday data', { error, eodSearchCode, interval });
    throw error;
  }
}

/**
 * Fetches intraday bars for one symbol/interval from the EOD intraday endpoint
 * and upserts them into the `eodIntraday<INTERVAL>` collection, keyed on
 * `timestamp` + `eodSearchCode`.
 *
 * @param input Symbol search code, interval and optional window; `fromDate` /
 *   `toDate` are sent as Unix timestamps in seconds when present.
 * @returns Number of records fetched from the API and the `insertedCount`
 *   reported by the upsert. An empty API response yields zero for both.
 * @throws When the symbol is not found, the API token is missing, the HTTP
 *   response is not OK, or the payload is not an array; errors are logged and re-thrown.
 */
export async function fetchIntraday(
  this: EodHandler,
  input: FetchIntradayInput
): Promise<{ success: boolean; recordsSaved: number; recordsFetched: number }> {
  const logger = this.logger;
  const { eodSearchCode, interval, fromDate, toDate } = input;

  // Declared outside the try so the catch block can log whatever was resolved
  let symbol: string = '';
  let exchange: string = '';

  try {
    // Lookup symbol using eodSearchCode
    const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({
      eodSearchCode: eodSearchCode
    });

    if (!symbolDoc) {
      logger.error(`Symbol not found for eodSearchCode: ${eodSearchCode}`);
      throw new Error(`Symbol not found: ${eodSearchCode}`);
    }

    symbol = symbolDoc.Code;
    exchange = symbolDoc.eodExchange || symbolDoc.Exchange;
    const country = symbolDoc.Country;

    logger.info(`Fetching intraday data for ${symbol}.${exchange} - ${interval}`, {
      symbol,
      exchange,
      interval,
      from: fromDate ? formatDay(fromDate) : undefined,
      to: toDate ? formatDay(toDate) : undefined,
      country,
      url: `https://eodhd.com/api/intraday/${symbol}.${exchange}`
    });

    // Get API key
    const apiKey = EOD_CONFIG.API_TOKEN;
    if (!apiKey) {
      throw new Error('EOD API key not configured');
    }

    // Build URL
    // Note: 'exchange' is the symbol document's eodExchange when set, else its Exchange
    const url = new URL(`https://eodhd.com/api/intraday/${symbol}.${exchange}`);
    url.searchParams.append('api_token', apiKey);
    url.searchParams.append('fmt', 'json');
    url.searchParams.append('interval', interval);

    // Add date range if provided (epoch seconds)
    if (fromDate) {
      url.searchParams.append('from', Math.floor(fromDate.getTime() / 1000).toString());
    }
    if (toDate) {
      url.searchParams.append('to', Math.floor(toDate.getTime() / 1000).toString());
    }

    // Fetch data
    const response = await fetch(url.toString());

    if (!response.ok) {
      throw new Error(`EOD Intraday API returned ${response.status}: ${response.statusText}`);
    }

    const data = await response.json();

    // EOD returns an array of intraday bars
    if (!Array.isArray(data)) {
      throw new Error('Invalid response format from EOD API - expected array');
    }

    if (data.length === 0) {
      logger.info('No intraday data returned', { symbol, exchange, interval });
      return { success: true, recordsSaved: 0, recordsFetched: 0 };
    }

    logger.info(`Fetched ${data.length} intraday records for ${symbol}.${exchange} - ${interval}`);

    // Add metadata to each record
    const recordsWithMetadata = data.map(bar => ({
      symbol,
      exchange,
      eodSearchCode,
      datetime: bar.datetime,
      timestamp: bar.timestamp,
      gmtoffset: bar.gmtoffset,
      open: bar.open,
      high: bar.high,
      low: bar.low,
      close: bar.close,
      volume: bar.volume,
      source: 'eod'
    }));

    // Save to MongoDB - use timestamp and eodSearchCode as unique identifier
    const collectionName = `eodIntraday${interval.toUpperCase()}`;
    const result = await this.mongodb.batchUpsert(
      collectionName,
      recordsWithMetadata,
      ['timestamp', 'eodSearchCode']
    );

    logger.info(`Saved ${result.insertedCount} intraday records`, {
      symbol,
      exchange,
      interval,
      collection: collectionName
    });

    return {
      success: true,
      recordsSaved: result.insertedCount,
      recordsFetched: data.length
    };
  } catch (error) {
    logger.error('Failed to fetch intraday data', { error, symbol, exchange, interval });
    throw error;
  }
}