stock-bot/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts

429 lines
No EOL
14 KiB
TypeScript

import type { EodHandler } from '../eod.handler';
import { EOD_CONFIG } from '../shared';
/** Arguments for fetchIntraday: which symbol/interval to pull and an optional date window. */
interface FetchIntradayInput {
  // Canonical lookup key stored on eodSymbols documents (e.g. 'AAPL.US').
  eodSearchCode: string;
  // Bar size accepted by the EODHD intraday endpoint.
  interval: '1m' | '5m' | '1h';
  // Optional window start; sent to the API as a unix timestamp when present.
  fromDate?: Date;
  // Optional window end; sent to the API as a unix timestamp when present.
  toDate?: Date;
}
/** Arguments for crawlIntraday: the crawl computes its own date window from tracker state. */
interface CrawlIntradayInput {
  // Canonical lookup key stored on eodSymbols documents (e.g. 'AAPL.US').
  eodSearchCode: string;
  // Bar size accepted by the EODHD intraday endpoint.
  interval: '1m' | '5m' | '1h';
}
// Max days fetchable per request for each interval, based on EOD API limits.
// Typed against the interval union so adding/renaming an interval elsewhere
// without updating this table is a compile error.
const MAX_DAYS_PER_INTERVAL: Readonly<Record<'1m' | '5m' | '1h', number>> = {
  '1m': 120,
  '5m': 600,
  '1h': 7200
};
/**
 * Finds every non-delisted symbol whose intraday data (per interval) has never
 * been crawled, is unfinished, or is more than a day stale, and enqueues one
 * 'crawl-intraday' job per symbol/interval combination.
 *
 * @returns success flag plus the number of jobs enqueued.
 * @throws re-throws any query/scheduling failure after logging it.
 */
export async function scheduleIntradayCrawl(
  this: EodHandler
): Promise<{ success: boolean; jobsScheduled: number }> {
  const logger = this.logger;
  try {
    logger.info('Scheduling intraday crawl jobs');
    const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h'];
    // FIX: the previous version hard-coded eodSearchCode 'AAPL.US' (leftover
    // debug filter) and re-ran the identical query once per interval. The
    // candidate set does not depend on the interval, so fetch it once.
    const candidates = await this.mongodb.collection('eodSymbols').find({
      delisted: false
    }).toArray();
    const allSymbolsForCrawl: any[] = [];
    // Anything whose newest crawled bar is older than this is considered stale.
    const staleCutoff = new Date(Date.now() - 24 * 60 * 60 * 1000);
    for (const interval of intervals) {
      // Operation-tracker key for this interval, e.g. 'intraday_1m'.
      const operationName = `intraday_${interval}`;
      for (const symbol of candidates) {
        const operationStatus = symbol.operations?.[operationName];
        // Re-crawl when never run, not finished, or newest data is >1 day old.
        const shouldProcess = !operationStatus || !operationStatus.finished ||
          (operationStatus.newestDateReached &&
            new Date(operationStatus.newestDateReached) < staleCutoff);
        if (shouldProcess) {
          allSymbolsForCrawl.push({
            symbol: symbol,
            interval: interval,
            operationName: operationName,
            lastRun: operationStatus?.lastRunAt,
            lastSuccess: operationStatus?.lastSuccessAt
          });
        }
      }
    }
    if (allSymbolsForCrawl.length === 0) {
      logger.info('No symbols need intraday crawl');
      return { success: true, jobsScheduled: 0 };
    }
    logger.info(`Found ${allSymbolsForCrawl.length} symbol/interval combinations needing intraday data`, {
      count: allSymbolsForCrawl.length,
      byInterval: {
        '1m': allSymbolsForCrawl.filter(s => s.interval === '1m').length,
        '5m': allSymbolsForCrawl.filter(s => s.interval === '5m').length,
        '1h': allSymbolsForCrawl.filter(s => s.interval === '1h').length
      },
      samples: allSymbolsForCrawl.slice(0, 5).map(s => ({
        symbol: s.symbol.Code,
        exchange: s.symbol.eodExchange || s.symbol.Exchange,
        name: s.symbol.Name,
        interval: s.interval,
        lastRun: s.lastRun ? new Date(s.lastRun).toISOString() : 'never',
        lastSuccess: s.lastSuccess ? new Date(s.lastSuccess).toISOString() : 'never'
      }))
    });
    let jobsScheduled = 0;
    // Schedule one crawl job per symbol/interval combination.
    for (const item of allSymbolsForCrawl) {
      const { symbol, interval } = item;
      await this.scheduleOperation('crawl-intraday', {
        eodSearchCode: symbol.eodSearchCode,
        interval
      }, {
        priority: 5, // Initial crawl jobs get priority 5 (lower priority)
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 10000
        },
        delay: jobsScheduled * 500 // Stagger jobs by 500ms to spread API load
      });
      jobsScheduled++;
    }
    logger.info(`Successfully scheduled ${jobsScheduled} intraday crawl jobs`);
    return {
      success: true,
      jobsScheduled
    };
  } catch (error) {
    logger.error('Failed to schedule intraday crawl jobs', { error });
    throw error;
  }
}
/**
 * Runs one batch of the backwards intraday crawl for a symbol/interval pair.
 *
 * Each batch fetches up to MAX_DAYS_PER_INTERVAL[interval] days ending at the
 * previous batch's fromDate (or now on the first run), records progress in the
 * operation tracker, and re-schedules itself until the API returns no data.
 *
 * @param input - eodSearchCode identifying the symbol, plus the bar interval.
 * @returns records saved this batch and whether the crawl reached the end.
 * @throws when the symbol is unknown or any downstream call fails.
 */
export async function crawlIntraday(
  this: EodHandler,
  input: CrawlIntradayInput
): Promise<{ success: boolean; recordsProcessed: number; finished: boolean }> {
  const logger = this.logger;
  const { eodSearchCode, interval } = input;
  // FIX: declared before the try so the catch block can reference them; they
  // were previously const-scoped inside the try, which does not compile.
  // Mirrors the pattern already used in fetchIntraday.
  let symbol = '';
  let exchange = '';
  try {
    // Lookup symbol using eodSearchCode
    const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({
      eodSearchCode: eodSearchCode
    });
    if (!symbolDoc) {
      logger.error(`Symbol not found for eodSearchCode: ${eodSearchCode}`);
      throw new Error(`Symbol not found: ${eodSearchCode}`);
    }
    symbol = symbolDoc.Code;
    exchange = symbolDoc.eodExchange || symbolDoc.Exchange;
    const country = symbolDoc.Country;
    logger.info(`Starting intraday crawl for ${symbol}.${exchange} - ${interval}`, {
      symbol,
      exchange,
      interval,
      country,
      eodSearchCode
    });
    logger.debug('Found symbol document', {
      symbol,
      exchange,
      hasOperations: !!symbolDoc.operations,
      operationKeys: Object.keys(symbolDoc.operations || {})
    });
    // Operation-tracker entry for this interval, e.g. 'intraday_5m'.
    const operationName = `intraday_${interval}`;
    const operationStatus = symbolDoc.operations?.[operationName] || {};
    logger.info(`Current crawl state for ${symbol}.${exchange} - ${interval}`, {
      hasOperationStatus: !!symbolDoc.operations?.[operationName],
      operationStatus: operationStatus,
      lastProcessedDate: operationStatus.lastProcessedDate,
      finished: operationStatus.finished
    });
    // Determine this batch's window: crawl backwards maxDays at a time,
    // starting from now on the first run.
    const maxDays = MAX_DAYS_PER_INTERVAL[interval];
    let toDate = new Date();
    if (operationStatus.lastProcessedDate) {
      // lastProcessedDate was the fromDate of the previous batch, so it is the
      // upper bound of the next (older) window — no day adjustment needed.
      toDate = new Date(operationStatus.lastProcessedDate);
    }
    const fromDate = new Date(toDate);
    fromDate.setDate(fromDate.getDate() - maxDays);
    logger.info(`Fetching intraday batch for ${symbol}.${exchange} - ${interval} from ${fromDate.toISOString().split('T')[0]} to ${toDate.toISOString().split('T')[0]}`, {
      symbol,
      exchange,
      interval,
      fromDate: fromDate.toISOString(),
      toDate: toDate.toISOString(),
      maxDays,
      operationStatus: {
        lastProcessedDate: operationStatus.lastProcessedDate,
        totalDaysProcessed: operationStatus.totalDaysProcessed || 0
      }
    });
    // Fetch and persist this batch.
    const result = await fetchIntraday.call(this, {
      eodSearchCode,
      interval,
      fromDate,
      toDate
    });
    // Progress written back to the operation tracker.
    const updateData: any = {
      status: 'partial',
      recordCount: result.recordsSaved,
      finished: false,
      lastProcessedDate: fromDate, // Next batch continues backwards from here
      totalDaysProcessed: (operationStatus.totalDaysProcessed || 0) + maxDays
    };
    // FIX: tracker dates may round-trip from storage as ISO strings. Comparing
    // a Date against a string is always false, and the .toISOString() calls in
    // the log below would throw on a string — normalize to Date first.
    const prevOldest = operationStatus.oldestDateReached
      ? new Date(operationStatus.oldestDateReached)
      : undefined;
    updateData.oldestDateReached = (!prevOldest || fromDate < prevOldest) ? fromDate : prevOldest;
    const prevNewest = operationStatus.newestDateReached
      ? new Date(operationStatus.newestDateReached)
      : undefined;
    updateData.newestDateReached = (!prevNewest || toDate > prevNewest) ? toDate : prevNewest;
    // Check if we're finished. Only mark finished when the API itself returned
    // no rows; "all rows already exist" just means we re-covered known ground.
    if (result.recordsSaved === 0 && result.recordsFetched === 0) {
      // No data returned from API - we've reached the end of history.
      updateData.finished = true;
      updateData.status = 'success';
      logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval} (no more data available)`, {
        symbol,
        exchange,
        interval,
        oldestDate: updateData.oldestDateReached?.toISOString(),
        newestDate: updateData.newestDateReached?.toISOString(),
        totalDaysProcessed: updateData.totalDaysProcessed,
        noDataReturned: true
      });
    } else if (result.recordsSaved === 0 && result.recordsFetched > 0) {
      // Data was fetched but all records already exist - continue crawling.
      logger.info(`All ${result.recordsFetched} records already exist for ${symbol}.${exchange} - ${interval}, continuing crawl`, {
        symbol,
        exchange,
        interval,
        fromDate: fromDate.toISOString(),
        toDate: toDate.toISOString(),
        recordsFetched: result.recordsFetched
      });
    }
    // Persist progress to the operation tracker.
    logger.info(`Updating operation tracker for ${symbol}.${exchange} - ${interval}`, {
      status: updateData.status,
      recordCount: result.recordsSaved,
      lastProcessedDate: updateData.lastProcessedDate,
      finished: updateData.finished
    });
    await this.operationRegistry.updateOperation('eod', eodSearchCode, operationName, updateData);
    logger.info(`Operation tracker updated for ${symbol}.${exchange} - ${interval}`);
    // If not finished, schedule the next (older) batch.
    if (!updateData.finished) {
      await this.scheduleOperation('crawl-intraday', {
        eodSearchCode,
        interval
      }, {
        priority: 3, // Continuation jobs get higher priority (3) than initial jobs (5)
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 10000
        },
        delay: 5000 // Wait 5 seconds before next batch
      });
      logger.info(`Scheduled next intraday batch for ${symbol}.${exchange} - ${interval}`, {
        symbol,
        exchange,
        interval,
        currentBatchFrom: fromDate.toISOString(),
        currentBatchTo: toDate.toISOString(),
        recordsSaved: result.recordsSaved,
        totalDaysProcessed: updateData.totalDaysProcessed
      });
    }
    return {
      success: true,
      recordsProcessed: result.recordsSaved,
      finished: updateData.finished
    };
  } catch (error) {
    logger.error('Failed to crawl intraday data', { error, symbol, exchange, interval });
    throw error;
  }
}
/**
 * Fetches one window of intraday bars from the EODHD API and upserts them into
 * the interval-specific MongoDB collection (e.g. eodIntraday1M).
 *
 * @param input - symbol key, interval, and optional unix-timestamp window.
 * @returns counts of records fetched from the API and newly saved.
 * @throws when the symbol is unknown, the API key is missing, the HTTP call
 *         fails, or the response is not an array.
 */
export async function fetchIntraday(
  this: EodHandler,
  input: FetchIntradayInput
): Promise<{ success: boolean; recordsSaved: number; recordsFetched: number }> {
  const logger = this.logger;
  const { eodSearchCode, interval, fromDate, toDate } = input;
  // Assigned after the lookup succeeds; declared here so the catch can log them.
  let symbol = '';
  let exchange = '';
  try {
    // Resolve the symbol document from its canonical search code.
    const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({ eodSearchCode });
    if (!symbolDoc) {
      logger.error(`Symbol not found for eodSearchCode: ${eodSearchCode}`);
      throw new Error(`Symbol not found: ${eodSearchCode}`);
    }
    symbol = symbolDoc.Code;
    exchange = symbolDoc.eodExchange || symbolDoc.Exchange;
    const country = symbolDoc.Country;
    logger.info(`Fetching intraday data for ${symbol}.${exchange} - ${interval}`, {
      symbol,
      exchange,
      interval,
      from: fromDate?.toISOString().split('T')[0],
      to: toDate?.toISOString().split('T')[0],
      country,
      url: `https://eodhd.com/api/intraday/${symbol}.${exchange}`
    });
    const apiKey = EOD_CONFIG.API_TOKEN;
    if (!apiKey) {
      throw new Error('EOD API key not configured');
    }
    // Assemble the request URL. 'exchange' is already the eodExchange value
    // resolved at scheduling time; the API takes the window as unix seconds.
    const toUnixSeconds = (d: Date) => Math.floor(d.getTime() / 1000).toString();
    const url = new URL(`https://eodhd.com/api/intraday/${symbol}.${exchange}`);
    url.searchParams.append('api_token', apiKey);
    url.searchParams.append('fmt', 'json');
    url.searchParams.append('interval', interval);
    if (fromDate) {
      url.searchParams.append('from', toUnixSeconds(fromDate));
    }
    if (toDate) {
      url.searchParams.append('to', toUnixSeconds(toDate));
    }
    const response = await fetch(url.toString());
    if (!response.ok) {
      throw new Error(`EOD Intraday API returned ${response.status}: ${response.statusText}`);
    }
    // EOD returns a JSON array of intraday bars; anything else is an error.
    const bars = await response.json();
    if (!Array.isArray(bars)) {
      throw new Error('Invalid response format from EOD API - expected array');
    }
    if (bars.length === 0) {
      logger.info('No intraday data returned', { symbol, exchange, interval });
      return { success: true, recordsSaved: 0, recordsFetched: 0 };
    }
    logger.info(`Fetched ${bars.length} intraday records for ${symbol}.${exchange} - ${interval}`);
    // Attach identifying metadata to each bar before persisting.
    const documents = bars.map((bar: any) => ({
      symbol,
      exchange,
      eodSearchCode,
      datetime: bar.datetime,
      timestamp: bar.timestamp,
      gmtoffset: bar.gmtoffset,
      open: bar.open,
      high: bar.high,
      low: bar.low,
      close: bar.close,
      volume: bar.volume,
      source: 'eod'
    }));
    // Upsert into the per-interval collection, keyed on (timestamp, eodSearchCode)
    // so re-fetched windows don't create duplicates.
    const collectionName = `eodIntraday${interval.toUpperCase()}`;
    const upsertResult = await this.mongodb.batchUpsert(collectionName, documents, ['timestamp', 'eodSearchCode']);
    logger.info(`Saved ${upsertResult.insertedCount} intraday records`, {
      symbol,
      exchange,
      interval,
      collection: collectionName
    });
    return {
      success: true,
      recordsSaved: upsertResult.insertedCount,
      recordsFetched: bars.length
    };
  } catch (error) {
    logger.error('Failed to fetch intraday data', { error, symbol, exchange, interval });
    throw error;
  }
}