intra day work

This commit is contained in:
Boki 2025-07-14 11:47:08 -04:00
parent e341cc0226
commit 4705550359
5 changed files with 138 additions and 49 deletions

View file

@ -14,6 +14,11 @@ interface CrawlIntradayInput {
interval: '1m' | '5m' | '1h'; interval: '1m' | '5m' | '1h';
} }
interface ScheduleIntradayConfig {
exchanges?: string[]; // Optional: limit to specific exchanges
symbolTypes?: string[]; // Optional: limit to specific symbol types (e.g., ['Common Stock'])
limit?: number; // Optional: limit total number of symbols to process
}
// Max days per interval based on EOD limits // Max days per interval based on EOD limits
const MAX_DAYS_PER_INTERVAL = { const MAX_DAYS_PER_INTERVAL = {
@ -22,13 +27,34 @@ const MAX_DAYS_PER_INTERVAL = {
'1h': 7200 '1h': 7200
}; };
// Default exchanges to process for intraday data
const DEFAULT_INTRADAY_EXCHANGES = ['US', 'TO', 'V', 'CN', 'NEO'];
export async function scheduleIntradayCrawl( export async function scheduleIntradayCrawl(
this: EodHandler this: EodHandler,
config?: ScheduleIntradayConfig
): Promise<{ success: boolean; jobsScheduled: number }> { ): Promise<{ success: boolean; jobsScheduled: number }> {
const logger = this.logger; const logger = this.logger;
try { try {
logger.info('Scheduling intraday crawl jobs'); logger.info('Scheduling intraday crawl jobs', {
config: {
exchanges: config?.exchanges || DEFAULT_INTRADAY_EXCHANGES,
symbolTypes: config?.symbolTypes || 'all',
limit: config?.limit || 'unlimited'
}
});
// Check if operation registry is initialized
if (!this.operationRegistry) {
logger.error('Operation registry not initialized!');
return { success: false, jobsScheduled: 0 };
}
logger.debug('Operation registry status:', {
hasRegistry: !!this.operationRegistry,
hasProvider: this.operationRegistry.hasProvider ? this.operationRegistry.hasProvider('eod') : 'no hasProvider method'
});
// Use OperationTracker to find symbols needing intraday crawl // Use OperationTracker to find symbols needing intraday crawl
const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h']; const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h'];
@ -41,31 +67,55 @@ export async function scheduleIntradayCrawl(
const interval = intervals[i]; const interval = intervals[i];
const operationName = operationNames[i]!; // Non-null assertion since we know the array has 3 elements const operationName = operationNames[i]!; // Non-null assertion since we know the array has 3 elements
// For intraday, we want to check even finished crawls for new data // Use getStaleSymbols to find symbols needing intraday updates
// So we'll query the symbols directly logger.debug(`Getting stale symbols for ${operationName}...`);
const symbolsForInterval = await this.mongodb.collection('eodSymbols').find({
eodSearchCode: 'AAPL.US',
delisted: false
}).toArray();
// Add interval info to each symbol // Get symbols with all filters applied at the database level
symbolsForInterval.forEach((symbol: any) => { const targetExchanges = config?.exchanges || DEFAULT_INTRADAY_EXCHANGES;
// Check if this interval needs processing (not finished or needs new data) const desiredLimit = config?.limit || 5000;
const operationStatus = symbol.operations?.[operationName];
const shouldProcess = !operationStatus || !operationStatus.finished || const staleSymbols = await this.operationRegistry.getStaleSymbols('eod', operationName, {
(operationStatus.newestDateReached && limit: desiredLimit,
new Date(operationStatus.newestDateReached) < new Date(Date.now() - 24 * 60 * 60 * 1000)); // Check if newest date is more than 1 day old exchanges: targetExchanges,
delisted: false
});
logger.debug(`getStaleSymbols returned ${staleSymbols.length} symbols for ${operationName}`);
// Process the returned symbols
for (const staleSymbol of staleSymbols) {
const { symbol, operations } = staleSymbol;
const operationStatus = operations?.[operationName];
if (shouldProcess) { // Apply symbol type filter if specified (this is the only filter not in the DB query)
if (config?.symbolTypes && !config.symbolTypes.includes(symbol.Type)) {
logger.debug(`Skipping ${symbol.Code} - type ${symbol.Type} not in filter`);
continue;
}
// Include symbols that:
// 1. Have never been processed (!operationStatus)
// 2. Are not finished (!operationStatus.finished)
// 3. Are finished but need new data (newest date > 1 day old)
const needsNewData = operationStatus?.finished && operationStatus?.newestDateReached &&
new Date(operationStatus.newestDateReached) < new Date(Date.now() - 24 * 60 * 60 * 1000);
if (!operationStatus || !operationStatus.finished || needsNewData) {
allSymbolsForCrawl.push({ allSymbolsForCrawl.push({
symbol: symbol, symbol: symbol,
interval: interval, interval: interval,
operationName: operationName, operationName: operationName,
lastRun: operationStatus?.lastRunAt, lastRun: staleSymbol.lastRun,
lastSuccess: operationStatus?.lastSuccessAt lastSuccess: staleSymbol.lastSuccess
});
logger.debug(`Added ${symbol.Code}.${symbol.Exchange} for ${interval} crawl`, {
hasOperation: !!operationStatus,
finished: operationStatus?.finished,
needsNewData
}); });
} }
}); }
} }
if (!allSymbolsForCrawl || allSymbolsForCrawl.length === 0) { if (!allSymbolsForCrawl || allSymbolsForCrawl.length === 0) {
@ -80,10 +130,11 @@ export async function scheduleIntradayCrawl(
'5m': allSymbolsForCrawl.filter(s => s.interval === '5m').length, '5m': allSymbolsForCrawl.filter(s => s.interval === '5m').length,
'1h': allSymbolsForCrawl.filter(s => s.interval === '1h').length '1h': allSymbolsForCrawl.filter(s => s.interval === '1h').length
}, },
samples: allSymbolsForCrawl.slice(0, 5).map(s => ({ symbols: allSymbolsForCrawl.map(s => ({
symbol: s.symbol.Code, symbol: s.symbol.Code,
exchange: s.symbol.eodExchange || s.symbol.Exchange, exchange: s.symbol.Exchange,
name: s.symbol.Name, name: s.symbol.Name,
eodSearchCode: s.symbol.eodSearchCode,
interval: s.interval, interval: s.interval,
lastRun: s.lastRun ? new Date(s.lastRun).toISOString() : 'never', lastRun: s.lastRun ? new Date(s.lastRun).toISOString() : 'never',
lastSuccess: s.lastSuccess ? new Date(s.lastSuccess).toISOString() : 'never' lastSuccess: s.lastSuccess ? new Date(s.lastSuccess).toISOString() : 'never'
@ -96,19 +147,29 @@ export async function scheduleIntradayCrawl(
for (const item of allSymbolsForCrawl) { for (const item of allSymbolsForCrawl) {
const { symbol, interval } = item; const { symbol, interval } = item;
await this.scheduleOperation('crawl-intraday', { const jobId = `crawl-intraday-${symbol.eodSearchCode}-${interval}`;
eodSearchCode: symbol.eodSearchCode, try {
interval await this.scheduleOperation('crawl-intraday', {
}, { eodSearchCode: symbol.eodSearchCode,
priority: 5, // Initial crawl jobs get priority 5 (lower priority) interval
attempts: 3, }, {
backoff: { jobId,
type: 'exponential', priority: 5, // Initial crawl jobs get priority 5 (lower priority)
delay: 10000 attempts: 3,
}, backoff: {
delay: jobsScheduled * 500 // Stagger jobs by 500ms type: 'exponential',
}); delay: 10000
jobsScheduled++; },
delay: jobsScheduled * 500 // Stagger jobs by 500ms
});
jobsScheduled++;
} catch (error: any) {
if (error?.message?.includes('Job already exists')) {
logger.debug(`Job already exists: ${jobId}`);
} else {
throw error;
}
}
} }
logger.info(`Successfully scheduled ${jobsScheduled} intraday crawl jobs`); logger.info(`Successfully scheduled ${jobsScheduled} intraday crawl jobs`);
@ -277,6 +338,7 @@ export async function crawlIntraday(
eodSearchCode, eodSearchCode,
interval interval
}, { }, {
jobId: `crawl-intraday-${eodSearchCode}-${interval}`,
priority: 3, // Continuation jobs get higher priority (3) than initial jobs (5) priority: 3, // Continuation jobs get higher priority (3) than initial jobs (5)
attempts: 3, attempts: 3,
backoff: { backoff: {

View file

@ -42,23 +42,34 @@ export async function scheduleFetchPrices(
continue; continue;
} }
const { symbol } = staleSymbol; const { symbol } = staleSymbol;
const jobId = `fetch-prices-${symbol.eodSearchCode}`;
logger.debug(`Scheduling price fetch for ${symbol.Code}.${symbol.Exchange}`, { logger.debug(`Scheduling price fetch for ${symbol.Code}.${symbol.Exchange}`, {
name: symbol.Name, name: symbol.Name,
lastUpdate: staleSymbol.lastRun, lastUpdate: staleSymbol.lastRun,
delay: i * 100 delay: i * 100,
jobId
}); });
await this.scheduleOperation('fetch-prices', { try {
eodSearchCode: symbol.eodSearchCode await this.scheduleOperation('fetch-prices', {
}, { eodSearchCode: symbol.eodSearchCode
attempts: 3, }, {
backoff: { jobId,
type: 'exponential', attempts: 3,
delay: 5000 backoff: {
}, type: 'exponential',
delay: i * 100 // Stagger jobs by 100ms per symbol to avoid rate limit spikes delay: 5000
}); },
jobsScheduled++; delay: i * 100 // Stagger jobs by 100ms per symbol to avoid rate limit spikes
});
jobsScheduled++;
} catch (error: any) {
if (error?.message?.includes('Job already exists')) {
logger.debug(`Job already exists: ${jobId}`);
} else {
throw error;
}
}
} }
logger.info(`Successfully scheduled ${jobsScheduled} price fetch jobs`); logger.info(`Successfully scheduled ${jobsScheduled} price fetch jobs`);

View file

@ -34,7 +34,7 @@ import { createEODOperationRegistry } from './shared';
@Handler('eod') @Handler('eod')
@RateLimit({ @RateLimit({
limits: [ limits: [
{ points: 500, duration: 60 }, // 1000 points per minute { points: 900, duration: 60 }, // 1000 points per minute
{ points: 100500, duration: 86400 }, // 100,500 points per day { points: 100500, duration: 86400 }, // 100,500 points per day
], ],
}) })
@ -110,7 +110,7 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
*/ */
@Operation('schedule-intraday-crawl') @Operation('schedule-intraday-crawl')
@ScheduledOperation('schedule-intraday-crawl', '0 3 * * *', { @ScheduledOperation('schedule-intraday-crawl', '0 3 * * *', {
// immediately: true, immediately: true,
}) })
@RateLimit(1) // 1 point for scheduling @RateLimit(1) // 1 point for scheduling
scheduleIntradayCrawl = scheduleIntradayCrawl; scheduleIntradayCrawl = scheduleIntradayCrawl;

View file

@ -356,7 +356,9 @@ export class OperationTracker {
limit = 1000, limit = 1000,
excludeSymbols = [], excludeSymbols = [],
activeOnly = true, activeOnly = true,
symbolFilter symbolFilter,
exchanges,
delisted
} = options; } = options;
this.provider.validateOperation(operationName); this.provider.validateOperation(operationName);
@ -389,6 +391,16 @@ export class OperationTracker {
filter[symbolField] = symbolFilter.symbol; filter[symbolField] = symbolFilter.symbol;
} }
// Add exchange filter if provided
if (exchanges && exchanges.length > 0) {
filter.Exchange = { $in: exchanges };
}
// Add delisted filter if provided
if (delisted !== undefined) {
filter.delisted = delisted;
}
const symbols = await this.mongodb.find(collectionName, filter, { const symbols = await this.mongodb.find(collectionName, filter, {
limit, limit,
projection: { }, // Return all fields projection: { }, // Return all fields

View file

@ -124,6 +124,10 @@ export interface StaleSymbolOptions {
activeOnly?: boolean; activeOnly?: boolean;
/** Filter for specific symbols */ /** Filter for specific symbols */
symbolFilter?: { symbol?: string }; symbolFilter?: { symbol?: string };
/** Filter by exchanges */
exchanges?: string[];
/** Filter by delisted status */
delisted?: boolean;
} }
/** /**