intraday test

This commit is contained in:
Boki 2025-07-10 08:02:40 -04:00
parent c24e551734
commit 18289f0a04
7 changed files with 122 additions and 34 deletions

View file

@ -24,23 +24,23 @@ export async function scheduleFetchCorporateActions(
// Use OperationTracker to find stale symbols for both dividends and splits // Use OperationTracker to find stale symbols for both dividends and splits
const allStaleDividends = await this.operationRegistry.getStaleSymbols('eod', 'dividends_update', { const allStaleDividends = await this.operationRegistry.getStaleSymbols('eod', 'dividends_update', {
limit: 2000 // Get more symbols to filter from limit: 50000 // Get all symbols needing update
}); });
const allStaleSplits = await this.operationRegistry.getStaleSymbols('eod', 'splits_update', { const allStaleSplits = await this.operationRegistry.getStaleSymbols('eod', 'splits_update', {
limit: 2000 // Get more symbols to filter from limit: 50000 // Get all symbols needing update
}); });
// Filter for Canadian exchanges and non-delisted symbols // Filter for Canadian exchanges and non-delisted symbols
const staleSymbolsDividends = allStaleDividends.filter(item => const staleSymbolsDividends = allStaleDividends.filter(item =>
canadianExchanges.includes(item.symbol.Exchange) && canadianExchanges.includes(item.symbol.Exchange) &&
item.symbol.delisted === false item.symbol.delisted === false
).slice(0, 500); ); // Remove the slice limit to process all symbols
const staleSymbolsSplits = allStaleSplits.filter(item => const staleSymbolsSplits = allStaleSplits.filter(item =>
canadianExchanges.includes(item.symbol.Exchange) && canadianExchanges.includes(item.symbol.Exchange) &&
item.symbol.delisted === false item.symbol.delisted === false
).slice(0, 500); ); // Remove the slice limit to process all symbols
if ((!staleSymbolsDividends || staleSymbolsDividends.length === 0) && if ((!staleSymbolsDividends || staleSymbolsDividends.length === 0) &&
(!staleSymbolsSplits || staleSymbolsSplits.length === 0)) { (!staleSymbolsSplits || staleSymbolsSplits.length === 0)) {

View file

@ -27,14 +27,14 @@ export async function scheduleFetchFundamentals(
// Use OperationTracker to find stale symbols // Use OperationTracker to find stale symbols
const allStaleSymbols = await this.operationRegistry.getStaleSymbols('eod', 'fundamentals_update', { const allStaleSymbols = await this.operationRegistry.getStaleSymbols('eod', 'fundamentals_update', {
limit: 5000 // Get more symbols to filter from limit: 50000 // Get all symbols needing update
}); });
// Filter for Canadian exchanges and non-delisted symbols // Filter for Canadian exchanges and non-delisted symbols
const staleSymbols = allStaleSymbols.filter(item => const staleSymbols = allStaleSymbols.filter(item =>
canadianExchanges.includes(item.symbol.Exchange) && canadianExchanges.includes(item.symbol.Exchange) &&
item.symbol.delisted === false item.symbol.delisted === false
).slice(0, 1000); // Limit to 1000 after filtering ); // Remove the slice limit to process all symbols
if (!staleSymbols || staleSymbols.length === 0) { if (!staleSymbols || staleSymbols.length === 0) {
logger.info('No symbols need fundamentals update'); logger.info('No symbols need fundamentals update');

View file

@ -48,12 +48,15 @@ export async function scheduleIntradayCrawl(
const operationName = operationNames[i]!; // Non-null assertion since we know the array has 3 elements const operationName = operationNames[i]!; // Non-null assertion since we know the array has 3 elements
const symbolsForInterval = await this.operationRegistry.getStaleSymbols('eod', operationName, { const symbolsForInterval = await this.operationRegistry.getStaleSymbols('eod', operationName, {
limit: 100 // Limit per interval limit: 1000, // Get more to filter
symbolFilter: { symbol: 'AAPL' } // Filter for AAPL only
}); });
// Filter out delisted symbols // Filter out delisted symbols and ensure we get AAPL with US exchange
const activeSymbols = symbolsForInterval.filter(item => const activeSymbols = symbolsForInterval.filter(item =>
item.symbol.delisted === false item.symbol.delisted === false &&
item.symbol.Code === 'AAPL' &&
(item.symbol.eodExchange === 'US' || item.symbol.Exchange === 'US')
); );
// Add interval info to each symbol // Add interval info to each symbol
@ -75,13 +78,18 @@ export async function scheduleIntradayCrawl(
logger.info(`Found ${allSymbolsForCrawl.length} symbol/interval combinations needing intraday data`, { logger.info(`Found ${allSymbolsForCrawl.length} symbol/interval combinations needing intraday data`, {
count: allSymbolsForCrawl.length, count: allSymbolsForCrawl.length,
byInterval: {
'1m': allSymbolsForCrawl.filter(s => s.interval === '1m').length,
'5m': allSymbolsForCrawl.filter(s => s.interval === '5m').length,
'1h': allSymbolsForCrawl.filter(s => s.interval === '1h').length
},
samples: allSymbolsForCrawl.slice(0, 5).map(s => ({ samples: allSymbolsForCrawl.slice(0, 5).map(s => ({
symbol: s.symbol.Code, symbol: s.symbol.Code,
exchange: s.symbol.eodExchange || s.symbol.Exchange, exchange: s.symbol.eodExchange || s.symbol.Exchange,
name: s.symbol.Name, name: s.symbol.Name,
interval: s.interval, interval: s.interval,
lastRun: s.lastRun, lastRun: s.lastRun ? new Date(s.lastRun).toISOString() : 'never',
lastSuccess: s.lastSuccess lastSuccess: s.lastSuccess ? new Date(s.lastSuccess).toISOString() : 'never'
})) }))
}); });
@ -97,6 +105,7 @@ export async function scheduleIntradayCrawl(
interval, interval,
country: symbol.Country country: symbol.Country
}, { }, {
priority: 5, // Initial crawl jobs get priority 5 (lower priority)
attempts: 3, attempts: 3,
backoff: { backoff: {
type: 'exponential', type: 'exponential',
@ -127,7 +136,12 @@ export async function crawlIntraday(
const { symbol, exchange, interval, country } = input; const { symbol, exchange, interval, country } = input;
try { try {
logger.info(`Starting intraday crawl for ${symbol}.${exchange} - ${interval}`); logger.info(`Starting intraday crawl for ${symbol}.${exchange} - ${interval}`, {
symbol,
exchange,
interval,
country
});
// Get symbol to check if it exists // Get symbol to check if it exists
const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({ const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({
@ -161,6 +175,19 @@ export async function crawlIntraday(
fromDate = new Date(toDate); fromDate = new Date(toDate);
fromDate.setDate(fromDate.getDate() - maxDays + 1); fromDate.setDate(fromDate.getDate() - maxDays + 1);
logger.info(`Fetching intraday batch for ${symbol}.${exchange} - ${interval} from ${fromDate.toISOString().split('T')[0]} to ${toDate.toISOString().split('T')[0]}`, {
symbol,
exchange,
interval,
fromDate: fromDate.toISOString(),
toDate: toDate.toISOString(),
maxDays,
crawlState: {
lastProcessedDate: crawlState.lastProcessedDate,
totalDaysProcessed: crawlState.totalDaysProcessed || 0
}
});
// Fetch data for this batch // Fetch data for this batch
const result = await fetchIntraday.call(this, { const result = await fetchIntraday.call(this, {
symbol, symbol,
@ -192,12 +219,14 @@ export async function crawlIntraday(
// Check if we're finished (no data returned means we've reached the end) // Check if we're finished (no data returned means we've reached the end)
if (result.recordsSaved === 0) { if (result.recordsSaved === 0) {
newState.finished = true; newState.finished = true;
logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval}`, { logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval} (${newState.oldestDateReached?.toISOString().split('T')[0]} to ${newState.newestDateReached?.toISOString().split('T')[0]})`, {
symbol, symbol,
exchange, exchange,
interval, interval,
oldestDate: newState.oldestDateReached, oldestDate: newState.oldestDateReached?.toISOString(),
newestDate: newState.newestDateReached, newestDate: newState.newestDateReached?.toISOString(),
totalDaysProcessed: newState.totalDaysProcessed,
noDataReturned: true
}); });
} }
@ -216,6 +245,7 @@ export async function crawlIntraday(
interval, interval,
country country
}, { }, {
priority: 3, // Continuation jobs get higher priority (3) than initial jobs (5)
attempts: 3, attempts: 3,
backoff: { backoff: {
type: 'exponential', type: 'exponential',
@ -224,11 +254,14 @@ export async function crawlIntraday(
delay: 5000 // Wait 5 seconds before next batch delay: 5000 // Wait 5 seconds before next batch
}); });
logger.info('Scheduled next intraday batch', { logger.info(`Scheduled next intraday batch for ${symbol}.${exchange} - ${interval}`, {
symbol, symbol,
exchange, exchange,
interval, interval,
nextFromDate: fromDate.toISOString() currentBatchFrom: fromDate.toISOString(),
currentBatchTo: toDate.toISOString(),
recordsSaved: result.recordsSaved,
totalDaysProcessed: newState.totalDaysProcessed
}); });
} }
@ -252,8 +285,13 @@ export async function fetchIntraday(
try { try {
logger.info(`Fetching intraday data for ${symbol}.${exchange} - ${interval}`, { logger.info(`Fetching intraday data for ${symbol}.${exchange} - ${interval}`, {
symbol,
exchange,
interval,
from: fromDate?.toISOString().split('T')[0], from: fromDate?.toISOString().split('T')[0],
to: toDate?.toISOString().split('T')[0] to: toDate?.toISOString().split('T')[0],
country,
url: `https://eodhd.com/api/intraday/${symbol}.${exchange}`
}); });
// Get country if not provided // Get country if not provided

View file

@ -18,7 +18,7 @@ export async function scheduleFetchPrices(
// Use OperationTracker to find stale symbols // Use OperationTracker to find stale symbols
const staleSymbols = await this.operationRegistry.getStaleSymbols('eod', 'price_update', { const staleSymbols = await this.operationRegistry.getStaleSymbols('eod', 'price_update', {
limit: 1000 // Process in batches to avoid overwhelming the system limit: 50000 // Higher limit to process all symbols
}); });
if (!staleSymbols || staleSymbols.length === 0) { if (!staleSymbols || staleSymbols.length === 0) {

View file

@ -39,16 +39,26 @@ import { createEODOperationRegistry } from './shared';
], ],
}) })
export class EodHandler extends BaseHandler<DataIngestionServices> { export class EodHandler extends BaseHandler<DataIngestionServices> {
public operationRegistry!: OperationRegistry; public operationRegistry: OperationRegistry;
constructor(services: any) { constructor(services: any) {
super(services); super(services);
}
// Use pre-initialized registry if available, otherwise create new one
async initialize(): Promise<void> { if ((EodHandler as any).operationRegistry) {
// Initialize operation registry this.operationRegistry = (EodHandler as any).operationRegistry;
this.operationRegistry = await createEODOperationRegistry(this.mongodb, this.logger); this.logger.info('Using pre-initialized EOD operation registry');
this.logger.info('EOD operation registry initialized'); } else {
// Fallback: Initialize operation registry with EOD provider
createEODOperationRegistry(this.mongodb, this.logger)
.then(registry => {
this.operationRegistry = registry;
this.logger.info('EOD operation registry initialized successfully');
})
.catch(error => {
this.logger.error('Failed to initialize EOD operations', { error });
});
}
} }
/** /**

View file

@ -14,11 +14,17 @@ import { WebShareHandler } from './webshare/webshare.handler';
import { TradingViewHandler } from './tradingview/tradingview.handler'; import { TradingViewHandler } from './tradingview/tradingview.handler';
import { TeHandler } from './te/te.handler'; import { TeHandler } from './te/te.handler';
import { EodHandler } from './eod/eod.handler'; import { EodHandler } from './eod/eod.handler';
import { createEODOperationRegistry } from './eod/shared';
import { createQMOperationRegistry } from './qm/shared/operation-provider';
// Add more handler imports as needed // Add more handler imports as needed
const logger = getLogger('handler-init'); const logger = getLogger('handler-init');
// Global operation registries that handlers will reference
let eodOperationRegistry: any;
let qmOperationRegistry: any;
/** /**
* Initialize and register all handlers * Initialize and register all handlers
* Note: The actual registration is now handled by the HandlerScanner in the DI container * Note: The actual registration is now handled by the HandlerScanner in the DI container
@ -26,6 +32,33 @@ const logger = getLogger('handler-init');
*/ */
export async function initializeAllHandlers(serviceContainer: IServiceContainer): Promise<void> { export async function initializeAllHandlers(serviceContainer: IServiceContainer): Promise<void> {
try { try {
// Initialize operation registries first
logger.info('Initializing operation registries');
const mongodb = serviceContainer.mongodb;
const operationLogger = serviceContainer.logger;
// Create operation registries
try {
eodOperationRegistry = await createEODOperationRegistry(mongodb, operationLogger);
logger.info('EOD operation registry created');
// Attach to handler class so instances can access it
(EodHandler as any).operationRegistry = eodOperationRegistry;
} catch (error) {
logger.error('Failed to create EOD operation registry', { error });
}
try {
qmOperationRegistry = await createQMOperationRegistry(mongodb, operationLogger);
logger.info('QM operation registry created');
// Attach to handler class so instances can access it
(QMHandler as any).operationRegistry = qmOperationRegistry;
} catch (error) {
logger.error('Failed to create QM operation registry', { error });
}
// The HandlerScanner in the DI container will handle the actual registration // The HandlerScanner in the DI container will handle the actual registration
// We just need to ensure handlers are imported so their decorators run // We just need to ensure handlers are imported so their decorators run
@ -69,6 +102,7 @@ export async function initializeAllHandlers(serviceContainer: IServiceContainer)
registeredHandlers: handlerRegistry.getHandlerNames(), registeredHandlers: handlerRegistry.getHandlerNames(),
handlersWithSchedule: handlerRegistry.getAllHandlersWithSchedule().size, handlersWithSchedule: handlerRegistry.getAllHandlersWithSchedule().size,
}); });
} }
} else { } else {
logger.error('Could not access DI container from service container'); logger.error('Could not access DI container from service container');

View file

@ -40,14 +40,20 @@ export class QMHandler extends BaseHandler<DataIngestionServices> {
constructor(services: any) { constructor(services: any) {
super(services); // Handler name read from @Handler decorator super(services); // Handler name read from @Handler decorator
// Initialize operation registry with QM provider // Use pre-initialized registry if available, otherwise create new one
createQMOperationRegistry(this.mongodb, this.logger) if ((QMHandler as any).operationRegistry) {
.then(registry => { this.operationRegistry = (QMHandler as any).operationRegistry;
this.operationRegistry = registry; this.logger.info('Using pre-initialized QM operation registry');
}) } else {
.catch(error => { // Fallback: Initialize operation registry with QM provider
this.logger.error('Failed to initialize QM operations', { error }); createQMOperationRegistry(this.mongodb, this.logger)
}); .then(registry => {
this.operationRegistry = registry;
})
.catch(error => {
this.logger.error('Failed to initialize QM operations', { error });
});
}
} }
/** /**