added untested intraday
This commit is contained in:
parent 0b63690500
commit 8630852dba
3 changed files with 355 additions and 1 deletion

@@ -1,3 +1,4 @@
 export { fetchExchanges } from './exchanges';
 export { fetchSymbols, scheduleFetchSymbols } from './symbols';
 export { fetchPrices, scheduleFetchPrices } from './prices';
+export { fetchIntraday, crawlIntraday, scheduleIntradayCrawl } from './intraday';

apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts (new file, 325 lines)
@@ -0,0 +1,325 @@
import type { BaseHandler } from '@stock-bot/handlers';
import type { DataIngestionServices } from '../../../types';
import { EOD_CONFIG } from '../shared';

interface FetchIntradayInput {
  symbol: string;
  exchange: string;
  interval: '1m' | '5m' | '1h';
  fromDate?: Date;
  toDate?: Date;
}

interface CrawlIntradayInput {
  symbol: string;
  exchange: string;
  interval: '1m' | '5m' | '1h';
}

interface CrawlState {
  finished: boolean;
  oldestDateReached?: Date;
  newestDateReached?: Date;
  lastProcessedDate?: Date;
  totalRecordsProcessed?: number;
  totalBatchesProcessed?: number;
}

// Max days per interval based on EOD limits
const MAX_DAYS_PER_INTERVAL = {
  '1m': 120,
  '5m': 600,
  '1h': 7200
};

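As an aside, the batch-window arithmetic that crawlIntraday applies below can be sketched in isolation. computeWindow is a hypothetical helper name, not part of the commit; it only mirrors the date math used in the function:

// Hypothetical helper mirroring the windowing logic in crawlIntraday below:
// each batch covers at most MAX_DAYS_PER_INTERVAL[interval] days, and a
// resumed crawl ends the day before the previous batch's fromDate.
function computeWindow(interval: '1m' | '5m' | '1h', lastProcessedDate?: Date) {
  const maxDays = MAX_DAYS_PER_INTERVAL[interval];
  const toDate = lastProcessedDate ? new Date(lastProcessedDate) : new Date();
  if (lastProcessedDate) {
    toDate.setDate(toDate.getDate() - 1); // resume one day before the last processed date
  }
  const fromDate = new Date(toDate);
  fromDate.setDate(fromDate.getDate() - maxDays + 1);
  return { fromDate, toDate };
}
// e.g. computeWindow('1m') spans the most recent 120 days ending today.
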
export async function scheduleIntradayCrawl(
  this: BaseHandler<DataIngestionServices>
): Promise<{ success: boolean; jobsScheduled: number }> {
  const logger = this.logger;

  try {
    logger.info('Scheduling intraday crawl jobs');

    // Get Canadian exchanges for now
    const canadianExchanges = ['TO', 'V', 'CN', 'NEO'];

    // Find active symbols that need intraday data
    const symbols = await this.mongodb.collection('eodSymbols').find({
      Exchange: { $in: canadianExchanges },
      delisted: false,
      // Only symbols without complete intraday data
      $or: [
        { 'intradayState.1m.finished': { $ne: true } },
        { 'intradayState.5m.finished': { $ne: true } },
        { 'intradayState.1h.finished': { $ne: true } },
        { 'intradayState': { $exists: false } }
      ]
    }).limit(100).toArray(); // Limit to avoid scheduling too many jobs at once

    if (!symbols || symbols.length === 0) {
      logger.info('No symbols need intraday crawl');
      return { success: true, jobsScheduled: 0 };
    }

    logger.info(`Found ${symbols.length} symbols needing intraday data`);

    let jobsScheduled = 0;
    const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h'];

    // Schedule crawl jobs for each symbol and interval
    for (const symbol of symbols) {
      for (const interval of intervals) {
        // Skip intervals that are already finished
        const isFinished = symbol.intradayState?.[interval]?.finished;
        if (isFinished) {
          continue;
        }

        await this.scheduleOperation('crawl-intraday', {
          symbol: symbol.Code,
          exchange: symbol.Exchange,
          interval
        }, {
          attempts: 3,
          backoff: {
            type: 'exponential',
            delay: 10000
          },
          delay: jobsScheduled * 500 // Stagger jobs by 500ms
        });
        jobsScheduled++;
      }
    }

    logger.info(`Successfully scheduled ${jobsScheduled} intraday crawl jobs`);

    return {
      success: true,
      jobsScheduled
    };
  } catch (error) {
    logger.error('Failed to schedule intraday crawl jobs', { error });
    throw error;
  }
}

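For reference, a symbol document matching the selector above would have roughly this shape. The field set is inferred from what the query and scheduler actually touch; the ticker and values are illustrative:

// Assumed eodSymbols document shape, inferred from the fields used above
const exampleSymbol = {
  Code: 'SHOP',        // hypothetical ticker
  Exchange: 'TO',
  delisted: false,
  intradayState: {
    '1m': { finished: true },
    '5m': { finished: false, lastProcessedDate: new Date('2023-05-07') }
    // '1h' missing entirely => still matches the $or selector
  }
};
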
export async function crawlIntraday(
  this: BaseHandler<DataIngestionServices>,
  input: CrawlIntradayInput
): Promise<{ success: boolean; recordsProcessed: number; finished: boolean }> {
  const logger = this.logger;
  const { symbol, exchange, interval } = input;

  try {
    logger.info('Starting intraday crawl', { symbol, exchange, interval });

    // Get current crawl state
    const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({
      Code: symbol,
      Exchange: exchange
    });

    if (!symbolDoc) {
      throw new Error(`Symbol ${symbol}.${exchange} not found`);
    }

    const crawlState: CrawlState = symbolDoc.intradayState?.[interval] || {
      finished: false
    };

    // Determine the date range for this batch
    const maxDays = MAX_DAYS_PER_INTERVAL[interval];
    let toDate = new Date();

    if (crawlState.lastProcessedDate) {
      // Continue from where we left off
      toDate = new Date(crawlState.lastProcessedDate);
      toDate.setDate(toDate.getDate() - 1); // Start from the day before the last processed date
    }

    // Calculate the from date (crawling backwards in time)
    const fromDate = new Date(toDate);
    fromDate.setDate(fromDate.getDate() - maxDays + 1);

    // Fetch data for this batch
    const result = await fetchIntraday.call(this, {
      symbol,
      exchange,
      interval,
      fromDate,
      toDate
    });

    // Update crawl state
    const newState: CrawlState = {
      ...crawlState,
      lastProcessedDate: fromDate,
      totalRecordsProcessed: (crawlState.totalRecordsProcessed || 0) + result.recordsSaved,
      totalBatchesProcessed: (crawlState.totalBatchesProcessed || 0) + 1
    };

    // Track the oldest date reached
    if (!newState.oldestDateReached || fromDate < newState.oldestDateReached) {
      newState.oldestDateReached = fromDate;
    }

    // Track the newest date reached
    if (!newState.newestDateReached || toDate > newState.newestDateReached) {
      newState.newestDateReached = toDate;
    }

    // Check if we're finished (no data returned means we've reached the end)
    if (result.recordsSaved === 0) {
      newState.finished = true;
      logger.info('Intraday crawl finished - no more data', {
        symbol,
        exchange,
        interval,
        totalRecords: newState.totalRecordsProcessed
      });
    }

    // Update the symbol with the new crawl state
    await this.mongodb.collection('eodSymbols').updateOne(
      { Code: symbol, Exchange: exchange },
      {
        $set: {
          [`intradayState.${interval}`]: newState,
          lastIntradayUpdate: new Date()
        }
      }
    );

    // If not finished, schedule the next batch
    if (!newState.finished) {
      await this.scheduleOperation('crawl-intraday', {
        symbol,
        exchange,
        interval
      }, {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 10000
        },
        delay: 5000 // Wait 5 seconds before the next batch
      });

      logger.info('Scheduled next intraday batch', {
        symbol,
        exchange,
        interval,
        nextFromDate: fromDate.toISOString()
      });
    }

    return {
      success: true,
      recordsProcessed: result.recordsSaved,
      finished: newState.finished
    };
  } catch (error) {
    logger.error('Failed to crawl intraday data', { error, symbol, exchange, interval });
    throw error;
  }
}

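To make the resumption logic concrete: assuming a first 1m crawl (120-day batches) run on 2024-01-01, batch 1 covers roughly 2023-09-04 through 2024-01-01 and batch 2 roughly 2023-05-07 through 2023-09-03, so the stored state would look something like this (dates approximate, counts illustrative):

// Illustrative CrawlState after two 1m batches (example values, not real data)
const stateAfterTwoBatches: CrawlState = {
  finished: false,
  newestDateReached: new Date('2024-01-01'),  // toDate of the first batch
  oldestDateReached: new Date('2023-05-07'),  // fromDate of the second batch
  lastProcessedDate: new Date('2023-05-07'),  // batch 3 will end the day before this
  totalRecordsProcessed: 46800,               // illustrative running total
  totalBatchesProcessed: 2
};
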
export async function fetchIntraday(
  this: BaseHandler<DataIngestionServices>,
  input: FetchIntradayInput
): Promise<{ success: boolean; recordsSaved: number }> {
  const logger = this.logger;
  const { symbol, exchange, interval, fromDate, toDate } = input;

  try {
    logger.info('Fetching intraday data', {
      symbol,
      exchange,
      interval,
      from: fromDate?.toISOString(),
      to: toDate?.toISOString()
    });

    // Get API key
    const apiKey = EOD_CONFIG.API_TOKEN;
    if (!apiKey) {
      throw new Error('EOD API key not configured');
    }

    // Build the request URL
    const url = new URL(`https://eodhd.com/api/intraday/${symbol}.${exchange}`);
    url.searchParams.append('api_token', apiKey);
    url.searchParams.append('fmt', 'json');
    url.searchParams.append('interval', interval);

    // Add the date range if provided (Unix timestamps in seconds)
    if (fromDate) {
      url.searchParams.append('from', Math.floor(fromDate.getTime() / 1000).toString());
    }
    if (toDate) {
      url.searchParams.append('to', Math.floor(toDate.getTime() / 1000).toString());
    }

    // Fetch data
    const response = await fetch(url.toString());

    if (!response.ok) {
      throw new Error(`EOD Intraday API returned ${response.status}: ${response.statusText}`);
    }

    const data = await response.json();

    // EOD returns an array of intraday bars
    if (!Array.isArray(data)) {
      throw new Error('Invalid response format from EOD API - expected array');
    }

    if (data.length === 0) {
      logger.info('No intraday data returned', { symbol, exchange, interval });
      return { success: true, recordsSaved: 0 };
    }

    logger.info(`Fetched ${data.length} intraday records`, { symbol, exchange, interval });

    // Add metadata to each record
    const recordsWithMetadata = data.map(bar => ({
      symbol,
      exchange,
      symbolExchange: `${symbol}.${exchange}`,
      interval,
      datetime: bar.datetime,
      timestamp: bar.timestamp,
      gmtoffset: bar.gmtoffset,
      open: bar.open,
      high: bar.high,
      low: bar.low,
      close: bar.close,
      volume: bar.volume,
      source: 'eod'
    }));

    // Save to MongoDB - timestamp, symbolExchange, and interval form the unique identifier
    const collectionName = `eodIntraday${interval.toUpperCase()}`;
    const result = await this.mongodb.batchUpsert(
      collectionName,
      recordsWithMetadata,
      ['timestamp', 'symbolExchange', 'interval']
    );

    logger.info(`Saved ${result.insertedCount} intraday records`, {
      symbol,
      exchange,
      interval,
      collection: collectionName
    });

    return {
      success: true,
      recordsSaved: result.insertedCount
    };
  } catch (error) {
    logger.error('Failed to fetch intraday data', { error, symbol, exchange, interval });
    throw error;
  }
}

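Based on the fields mapped above, each element of the API response array is expected to look roughly like this. The field set is inferred from the mapping in the code, not from API documentation, and the values are illustrative:

// Assumed shape of one EOD intraday bar, inferred from the mapping above
const exampleBar = {
  timestamp: 1704124800,            // Unix seconds
  gmtoffset: 0,
  datetime: '2024-01-01 16:00:00',
  open: 99.5,
  high: 100.2,
  low: 99.1,
  close: 100.0,
  volume: 12500
};
// After metadata is attached, bars are upserted into e.g. 'eodIntraday1M',
// deduplicated on (timestamp, symbolExchange, interval).
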
@@ -11,7 +11,10 @@ import {
   fetchSymbols,
   scheduleFetchSymbols,
   fetchPrices,
-  scheduleFetchPrices
+  scheduleFetchPrices,
+  fetchIntraday,
+  crawlIntraday,
+  scheduleIntradayCrawl
 } from './actions';

 /**
@@ -74,4 +77,29 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
   @Operation('fetch-prices')
   @RateLimit(10) // 10 points per price fetch
   fetchPrices = fetchPrices;
+
+  /**
+   * Schedule intraday crawl for symbols needing intraday data
+   * Runs daily at 3 AM
+   */
+  @Operation('schedule-intraday-crawl')
+  @ScheduledOperation('schedule-intraday-crawl', '0 3 * * *')
+  @RateLimit(1) // 1 point for scheduling
+  scheduleIntradayCrawl = scheduleIntradayCrawl;
+
+  /**
+   * Crawl intraday data for a specific symbol and interval
+   * Handles resumption and batch processing
+   */
+  @Operation('crawl-intraday')
+  @RateLimit(50) // 50 points per crawl batch
+  crawlIntraday = crawlIntraday;
+
+  /**
+   * Fetch intraday data for a specific date range
+   * Called by crawl-intraday
+   */
+  @Operation('fetch-intraday')
+  @RateLimit(50) // 50 points per intraday fetch
+  fetchIntraday = fetchIntraday;
 }
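
The '0 3 * * *' expression in @ScheduledOperation is standard cron for 03:00 every day. For a one-off backfill, the same operation could presumably be enqueued by hand, as in this sketch; it assumes access to a constructed handler instance (here called handler), and 'SHOP'/'TO' are hypothetical:

// Minimal sketch: manually enqueue a crawl for one symbol/interval,
// reusing the same scheduleOperation call the code above makes.
await handler.scheduleOperation('crawl-intraday', {
  symbol: 'SHOP',
  exchange: 'TO',
  interval: '5m'
}, {
  attempts: 3,
  backoff: { type: 'exponential', delay: 10000 }
});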