work on refactoring operation tracker

This commit is contained in:
Boki 2025-07-10 00:19:18 -04:00
parent cbf002a31a
commit c24e551734
11 changed files with 121 additions and 145 deletions

View file

@ -1,8 +1,7 @@
import type { BaseHandler } from '@stock-bot/handlers';
import type { DataIngestionServices } from '../../../types'; import type { CrawlState } from '../../../shared/operation-manager/types';
import type { EodHandler } from '../eod.handler'; import type { EodHandler } from '../eod.handler';
import { EOD_CONFIG } from '../shared'; import { EOD_CONFIG } from '../shared';
import { getEodExchangeSuffix } from '../shared/utils';
interface FetchIntradayInput { interface FetchIntradayInput {
symbol: string; symbol: string;
@ -20,14 +19,7 @@ interface CrawlIntradayInput {
country?: string; country?: string;
} }
interface CrawlState { // CrawlState is imported from operation-manager types
finished: boolean;
oldestDateReached?: Date;
newestDateReached?: Date;
lastProcessedDate?: Date;
totalRecordsProcessed?: number;
totalBatchesProcessed?: number;
}
// Max days per interval based on EOD limits // Max days per interval based on EOD limits
const MAX_DAYS_PER_INTERVAL = { const MAX_DAYS_PER_INTERVAL = {
@ -44,37 +36,34 @@ export async function scheduleIntradayCrawl(
try { try {
logger.info('Scheduling intraday crawl jobs'); logger.info('Scheduling intraday crawl jobs');
// Get Canadian exchanges for now
const canadianExchanges = ['TO', 'V', 'CN', 'NEO'];
// Use OperationTracker to find symbols needing intraday crawl // Use OperationTracker to find symbols needing intraday crawl
const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h']; const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h'];
const operationNames = ['intraday_1m', 'intraday_5m', 'intraday_1h']; const operationNames: string[] = ['intraday_1m', 'intraday_5m', 'intraday_1h'];
let allSymbolsForCrawl: any[] = []; const allSymbolsForCrawl: any[] = [];
// Get symbols needing crawl for each interval // Get symbols needing crawl for each interval
for (let i = 0; i < intervals.length; i++) { for (let i = 0; i < intervals.length; i++) {
const interval = intervals[i]; const interval = intervals[i];
const operationName = operationNames[i]; const operationName = operationNames[i]!; // Non-null assertion since we know the array has 3 elements
const allSymbolsForInterval = await this.operationRegistry.getSymbolsForIntradayCrawl('eod', operationName, { const symbolsForInterval = await this.operationRegistry.getStaleSymbols('eod', operationName, {
limit: 500 // Get more symbols to filter from limit: 100 // Limit per interval
}); });
// Filter for Canadian exchanges and non-delisted symbols // Filter out delisted symbols
const symbolsForInterval = allSymbolsForInterval.filter(item => const activeSymbols = symbolsForInterval.filter(item =>
canadianExchanges.includes(item.symbol.Exchange) &&
item.symbol.delisted === false item.symbol.delisted === false
).slice(0, 100); );
// Add interval info to each symbol // Add interval info to each symbol
symbolsForInterval.forEach(item => { activeSymbols.forEach(item => {
allSymbolsForCrawl.push({ allSymbolsForCrawl.push({
symbol: item.symbol, symbol: item.symbol,
interval: interval, interval: interval,
operationName: operationName, operationName: operationName,
crawlState: item.crawlState lastRun: item.lastRun,
lastSuccess: item.lastSuccess
}); });
}); });
} }
@ -88,10 +77,11 @@ export async function scheduleIntradayCrawl(
count: allSymbolsForCrawl.length, count: allSymbolsForCrawl.length,
samples: allSymbolsForCrawl.slice(0, 5).map(s => ({ samples: allSymbolsForCrawl.slice(0, 5).map(s => ({
symbol: s.symbol.Code, symbol: s.symbol.Code,
exchange: s.symbol.Exchange, exchange: s.symbol.eodExchange || s.symbol.Exchange,
name: s.symbol.Name, name: s.symbol.Name,
interval: s.interval, interval: s.interval,
crawlState: s.crawlState lastRun: s.lastRun,
lastSuccess: s.lastSuccess
})) }))
}); });
@ -139,17 +129,20 @@ export async function crawlIntraday(
try { try {
logger.info(`Starting intraday crawl for ${symbol}.${exchange} - ${interval}`); logger.info(`Starting intraday crawl for ${symbol}.${exchange} - ${interval}`);
// Get current crawl state // Get symbol to check if it exists
const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({ const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({
Code: symbol, Code: symbol,
Exchange: exchange eodExchange: exchange
}); });
if (!symbolDoc) { if (!symbolDoc) {
throw new Error(`Symbol ${symbol}.${exchange} not found`); throw new Error(`Symbol ${symbol}.${exchange} not found`);
} }
const crawlState: CrawlState = symbolDoc.intradayState?.[interval] || { // Get operation status from tracker
const operationName = `intraday_${interval}`;
const operationStatus = symbolDoc.operations?.[operationName];
const crawlState: CrawlState = operationStatus?.crawlState || {
finished: false finished: false
}; };
@ -181,9 +174,9 @@ export async function crawlIntraday(
// Update crawl state // Update crawl state
const newState: CrawlState = { const newState: CrawlState = {
...crawlState, ...crawlState,
finished: false,
lastProcessedDate: fromDate, lastProcessedDate: fromDate,
totalRecordsProcessed: (crawlState.totalRecordsProcessed || 0) + result.recordsSaved, totalDaysProcessed: (crawlState.totalDaysProcessed || 0) + 1
totalBatchesProcessed: (crawlState.totalBatchesProcessed || 0) + 1
}; };
// Set oldest date reached // Set oldest date reached
@ -200,23 +193,20 @@ export async function crawlIntraday(
if (result.recordsSaved === 0) { if (result.recordsSaved === 0) {
newState.finished = true; newState.finished = true;
logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval}`, { logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval}`, {
totalRecords: newState.totalRecordsProcessed, symbol,
exchange,
interval,
oldestDate: newState.oldestDateReached, oldestDate: newState.oldestDateReached,
newestDate: newState.newestDateReached, newestDate: newState.newestDateReached,
batches: newState.totalBatchesProcessed
}); });
} }
// Update symbol with new crawl state // Update operation tracker with crawl state
await this.mongodb.collection('eodSymbols').updateOne( await this.operationRegistry.updateOperation('eod', symbol, operationName, {
{ Code: symbol, Exchange: exchange }, status: newState.finished ? 'success' : 'partial',
{ recordCount: result.recordsSaved,
$set: { crawlState: newState
[`intradayState.${interval}`]: newState, });
lastIntradayUpdate: new Date()
}
}
);
// If not finished, schedule next batch // If not finished, schedule next batch
if (!newState.finished) { if (!newState.finished) {
@ -271,7 +261,7 @@ export async function fetchIntraday(
if (!symbolCountry) { if (!symbolCountry) {
const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({ const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({
Code: symbol, Code: symbol,
Exchange: exchange eodExchange: exchange
}); });
if (!symbolDoc) { if (!symbolDoc) {
@ -287,10 +277,8 @@ export async function fetchIntraday(
} }
// Build URL // Build URL
// Use utility function to handle US symbols and EUFUND special case // Note: 'exchange' parameter here is already the eodExchange from scheduling
const exchangeSuffix = getEodExchangeSuffix(exchange, symbolCountry); const url = new URL(`https://eodhd.com/api/intraday/${symbol}.${exchange}`);
const url = new URL(`https://eodhd.com/api/intraday/${symbol}.${exchangeSuffix}`);
url.searchParams.append('api_token', apiKey); url.searchParams.append('api_token', apiKey);
url.searchParams.append('fmt', 'json'); url.searchParams.append('fmt', 'json');
url.searchParams.append('interval', interval); url.searchParams.append('interval', interval);

View file

@ -1,5 +1,3 @@
import type { BaseHandler } from '@stock-bot/handlers';
import type { DataIngestionServices } from '../../../types';
import type { EodHandler } from '../eod.handler'; import type { EodHandler } from '../eod.handler';
import { EOD_CONFIG } from '../shared'; import { EOD_CONFIG } from '../shared';
import { getEodExchangeSuffix } from '../shared/utils'; import { getEodExchangeSuffix } from '../shared/utils';
@ -175,7 +173,7 @@ export async function fetchPrices(
success: true, success: true,
priceCount: result.insertedCount priceCount: result.insertedCount
}; };
} catch (error) { } catch (error: unknown) {
logger.error('Failed to fetch or save prices', { error, symbol, exchange }); logger.error('Failed to fetch or save prices', { error, symbol, exchange });
// Update operation tracker with failure // Update operation tracker with failure

View file

@ -221,23 +221,16 @@ export async function scheduleEventsUpdates(
this.logger.info(`Found ${staleSymbols.length} symbols needing events updates`); this.logger.info(`Found ${staleSymbols.length} symbols needing events updates`);
// Get full symbol data to include symbolId
const symbolDocs = await this.mongodb.find('qmSymbols', {
qmSearchCode: { $in: staleSymbols.slice(0, limit) } // Apply limit after deduplication
}, {
projection: { symbol: 1, exchange: 1, qmSearchCode: 1 }
});
let queued = 0; let queued = 0;
let errors = 0; let errors = 0;
// Schedule individual update jobs for each symbol // Schedule individual update jobs for each symbol
for (const doc of symbolDocs) { for (const item of staleSymbols) {
try { try {
await this.scheduleOperation('update-events', { await this.scheduleOperation('update-events', {
symbol: doc.symbol, symbol: item.symbol.symbol,
exchange: doc.exchange, exchange: item.symbol.exchange,
qmSearchCode: doc.qmSearchCode qmSearchCode: item.symbol.qmSearchCode
}, { }, {
priority: 4, priority: 4,
delay: queued * 0.05 // 1.5 seconds between jobs delay: queued * 0.05 // 1.5 seconds between jobs
@ -245,7 +238,7 @@ export async function scheduleEventsUpdates(
queued++; queued++;
} catch (error) { } catch (error) {
this.logger.error(`Failed to schedule events update for ${doc.symbol}`, { error }); this.logger.error(`Failed to schedule events update for ${item.symbol.symbol}`, { error });
errors++; errors++;
} }
} }

View file

@ -181,30 +181,43 @@ export async function scheduleFinancialsUpdates(
this.logger.info(`Found ${staleSymbolsQ.length} symbols needing quarterly updates and ${staleSymbolsA.length} symbols needing annual updates`); this.logger.info(`Found ${staleSymbolsQ.length} symbols needing quarterly updates and ${staleSymbolsA.length} symbols needing annual updates`);
// Combine unique symbols from both lists // Create a map of symbols needing updates
const allStaleSymbols = [...new Set([...staleSymbolsQ, ...staleSymbolsA])]; const symbolsNeedingUpdates = new Map<string, { quarterly: boolean, annual: boolean, item: any }>();
// Get full symbol data // Add quarterly symbols
const symbolDocs = await this.mongodb.find('qmSymbols', { for (const item of staleSymbolsQ) {
qmSearchCode: { $in: allStaleSymbols } const key = item.symbol.qmSearchCode;
}, { if (!symbolsNeedingUpdates.has(key)) {
projection: { symbol: 1, exchange: 1, qmSearchCode: 1 } symbolsNeedingUpdates.set(key, { quarterly: true, annual: false, item });
}); } else {
symbolsNeedingUpdates.get(key)!.quarterly = true;
}
}
// Add annual symbols
for (const item of staleSymbolsA) {
const key = item.symbol.qmSearchCode;
if (!symbolsNeedingUpdates.has(key)) {
symbolsNeedingUpdates.set(key, { quarterly: false, annual: true, item });
} else {
symbolsNeedingUpdates.get(key)!.annual = true;
}
}
let queued = 0; let queued = 0;
let errors = 0; let errors = 0;
// Schedule individual update jobs for each symbol and report type // Schedule individual update jobs for each symbol and report type
for (const doc of symbolDocs) { for (const [qmSearchCode, { quarterly, annual, item }] of symbolsNeedingUpdates) {
// Check if this symbol needs quarterly updates // Schedule quarterly updates if needed
if (staleSymbolsQ.includes(doc.qmSearchCode)) { if (quarterly) {
try { try {
await this.scheduleOperation('update-financials', { await this.scheduleOperation('update-financials', {
symbol: doc.symbol, symbol: item.symbol.symbol,
exchange: doc.exchange, exchange: item.symbol.exchange,
qmSearchCode: doc.qmSearchCode, qmSearchCode: qmSearchCode,
reportType: 'Q', reportType: 'Q',
lastRecordDate: doc.operations?.price_update?.lastRecordDate, lastRecordDate: item.operations?.financials_update_quarterly?.lastRecordDate,
}, { }, {
priority: 4, priority: 4,
delay: queued // 1 second between jobs delay: queued // 1 second between jobs
@ -212,20 +225,20 @@ export async function scheduleFinancialsUpdates(
queued++; queued++;
} catch (error) { } catch (error) {
this.logger.error(`Failed to schedule quarterly financials update for ${doc.qmSearchCode}`, { error }); this.logger.error(`Failed to schedule quarterly financials update for ${qmSearchCode}`, { error });
errors++; errors++;
} }
} }
// Check if this symbol needs annual updates // Schedule annual updates if needed
if (staleSymbolsA.includes(doc.qmSearchCode)) { if (annual) {
try { try {
await this.scheduleOperation('update-financials', { await this.scheduleOperation('update-financials', {
symbol: doc.symbol, symbol: item.symbol.symbol,
exchange: doc.exchange, exchange: item.symbol.exchange,
qmSearchCode: doc.qmSearchCode, qmSearchCode: qmSearchCode,
reportType: 'A', reportType: 'A',
lastRecordDate: doc.operations?.price_update?.lastRecordDate, lastRecordDate: item.operations?.financials_update_annual?.lastRecordDate,
}, { }, {
priority: 4, priority: 4,
delay: queued // 1 second between jobs delay: queued // 1 second between jobs
@ -233,7 +246,7 @@ export async function scheduleFinancialsUpdates(
queued++; queued++;
} catch (error) { } catch (error) {
this.logger.error(`Failed to schedule annual financials update for ${doc.qmSearchCode}`, { error }); this.logger.error(`Failed to schedule annual financials update for ${qmSearchCode}`, { error });
errors++; errors++;
} }
} }

View file

@ -233,30 +233,23 @@ export async function scheduleInsidersUpdates(
}; };
} }
// Get full symbol data this.logger.info(`Found ${staleSymbols.length} symbols for insider updates`);
const symbolsToProcess = await this.mongodb.find('qmSymbols', {
qmSearchCode: { $in: staleSymbols }
}, {
projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
});
this.logger.info(`Found ${symbolsToProcess.length} symbols for insider updates`);
let symbolsQueued = 0; let symbolsQueued = 0;
let errors = 0; let errors = 0;
// Schedule update jobs // Schedule update jobs
for (const doc of symbolsToProcess) { for (const item of staleSymbols) {
try { try {
if (!doc.symbolId) { if (!item.symbol.symbolId) {
this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); this.logger.warn(`Symbol ${item.symbol.symbol} missing symbolId, skipping`);
continue; continue;
} }
await this.scheduleOperation('update-insiders', { await this.scheduleOperation('update-insiders', {
symbol: doc.symbol, symbol: item.symbol.symbol,
symbolId: doc.symbolId, symbolId: item.symbol.symbolId,
qmSearchCode: doc.qmSearchCode qmSearchCode: item.symbol.qmSearchCode
}, { }, {
priority: 5, // Medium priority priority: 5, // Medium priority
delay: symbolsQueued * 1000 // 1 second between jobs delay: symbolsQueued * 1000 // 1 second between jobs
@ -264,7 +257,7 @@ export async function scheduleInsidersUpdates(
symbolsQueued++; symbolsQueued++;
} catch (error) { } catch (error) {
this.logger.error(`Failed to schedule insider update for ${doc.symbol}`, { error }); this.logger.error(`Failed to schedule insider update for ${item.symbol.symbol}`, { error });
errors++; errors++;
} }
} }

View file

@ -395,30 +395,23 @@ export async function scheduleSymbolNewsUpdates(
}; };
} }
// Get full symbol data this.logger.info(`Found ${staleSymbols.length} symbols for news updates`);
const symbolsToProcess = await this.mongodb.find('qmSymbols', {
qmSearchCode: { $in: staleSymbols }
}, {
projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
});
this.logger.info(`Found ${symbolsToProcess.length} symbols for news updates`);
let symbolsQueued = 0; let symbolsQueued = 0;
let errors = 0; let errors = 0;
// Schedule update jobs // Schedule update jobs
for (const doc of symbolsToProcess) { for (const item of staleSymbols) {
try { try {
if (!doc.symbolId) { if (!item.symbol.symbolId) {
this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); this.logger.warn(`Symbol ${item.symbol.symbol} missing symbolId, skipping`);
continue; continue;
} }
await this.scheduleOperation('update-symbol-news', { await this.scheduleOperation('update-symbol-news', {
symbol: doc.symbol, symbol: item.symbol.symbol,
symbolId: doc.symbolId, symbolId: item.symbol.symbolId,
qmSearchCode: doc.qmSearchCode qmSearchCode: item.symbol.qmSearchCode
}, { }, {
priority: 4, // Lower priority than price data priority: 4, // Lower priority than price data
delay: symbolsQueued * 500 // 0.5 seconds between jobs delay: symbolsQueued * 500 // 0.5 seconds between jobs
@ -426,7 +419,7 @@ export async function scheduleSymbolNewsUpdates(
symbolsQueued++; symbolsQueued++;
} catch (error) { } catch (error) {
this.logger.error(`Failed to schedule news update for ${doc.symbol}`, { error }); this.logger.error(`Failed to schedule news update for ${item.symbol.symbol}`, { error });
errors++; errors++;
} }
} }

View file

@ -203,24 +203,17 @@ export async function schedulePriceUpdates(
this.logger.info(`Found ${staleSymbols.length} symbols needing price updates`); this.logger.info(`Found ${staleSymbols.length} symbols needing price updates`);
// Get full symbol data to include symbolId
const symbolDocs = await this.mongodb.find('qmSymbols', {
qmSearchCode: { $in: staleSymbols }
}, {
projection: { qmSearchCode: 1, operations: 1, symbol: 1, exchange: 1 },
});
let queued = 0; let queued = 0;
let errors = 0; let errors = 0;
// Schedule individual update jobs for each symbol // Schedule individual update jobs for each symbol
for (const doc of symbolDocs) { for (const item of staleSymbols) {
try { try {
await this.scheduleOperation('update-prices', { await this.scheduleOperation('update-prices', {
qmSearchCode: doc.qmSearchCode, qmSearchCode: item.symbol.qmSearchCode,
lastRecordDate: doc.operations?.price_update?.lastRecordDate, lastRecordDate: item.operations?.price_update?.lastRecordDate,
symbol: doc.symbol, symbol: item.symbol.symbol,
exchange: doc.exchange exchange: item.symbol.exchange
}, { }, {
priority: 7, // High priority for price data priority: 7, // High priority for price data
delay: queued * 100 // 0.1 seconds between jobs delay: queued * 100 // 0.1 seconds between jobs
@ -228,7 +221,7 @@ export async function schedulePriceUpdates(
queued++; queued++;
} catch (error) { } catch (error) {
this.logger.error(`Failed to schedule price update for ${doc.qmSearchCode}`, { error }); this.logger.error(`Failed to schedule price update for ${item.symbol.qmSearchCode}`, { error });
errors++; errors++;
} }
} }

View file

@ -168,21 +168,14 @@ export async function scheduleSymbolInfoUpdates(
this.logger.info(`Found ${staleSymbols.length} symbols needing info updates`); this.logger.info(`Found ${staleSymbols.length} symbols needing info updates`);
// Get full symbol data to include qmSearchCode
const symbolDocs = await this.mongodb.find('qmSymbols', {
qmSearchCode: { $in: staleSymbols }
}, {
projection: { qmSearchCode: 1 }
});
let queued = 0; let queued = 0;
let errors = 0; let errors = 0;
// Schedule individual update jobs for each symbol // Schedule individual update jobs for each symbol
for (const doc of symbolDocs) { for (const item of staleSymbols) {
try { try {
await this.scheduleOperation('update-symbol-info', { await this.scheduleOperation('update-symbol-info', {
qmSearchCode: doc.qmSearchCode qmSearchCode: item.symbol.qmSearchCode
}, { }, {
// priority: 3, // priority: 3,
// Add some delay to avoid overwhelming the API // Add some delay to avoid overwhelming the API
@ -191,7 +184,7 @@ export async function scheduleSymbolInfoUpdates(
queued++; queued++;
} catch (error) { } catch (error) {
this.logger.error(`Failed to schedule update for ${doc.qmSearchCode}`, { error }); this.logger.error(`Failed to schedule update for ${item.symbol.qmSearchCode}`, { error });
errors++; errors++;
} }
} }

View file

@ -147,7 +147,7 @@ export class OperationRegistry {
providerName: string, providerName: string,
operationName: string, operationName: string,
options?: StaleSymbolOptions options?: StaleSymbolOptions
): Promise<string[]> { ): Promise<Array<{ symbol: any; lastRun?: Date; lastSuccess?: Date; operations?: any }>> {
const tracker = this.getTracker(providerName); const tracker = this.getTracker(providerName);
return tracker.getStaleSymbols(operationName, options); return tracker.getStaleSymbols(operationName, options);
} }

View file

@ -312,7 +312,7 @@ export class OperationTracker {
async getStaleSymbols( async getStaleSymbols(
operationName: string, operationName: string,
options: StaleSymbolOptions = {} options: StaleSymbolOptions = {}
): Promise<string[]> { ): Promise<Array<{ symbol: any; lastRun?: Date; lastSuccess?: Date; operations?: any }>> {
const { collectionName, symbolField } = this.provider.getProviderConfig(); const { collectionName, symbolField } = this.provider.getProviderConfig();
const { const {
limit = 1000, limit = 1000,
@ -353,11 +353,16 @@ export class OperationTracker {
const symbols = await this.mongodb.find(collectionName, filter, { const symbols = await this.mongodb.find(collectionName, filter, {
limit, limit,
projection: { [symbolField]: 1 }, projection: { }, // Return all fields
sort: { [`operations.${operationName}.lastSuccessAt`]: 1 } sort: { [`operations.${operationName}.lastSuccessAt`]: 1 }
}); });
return symbols.map(doc => doc[symbolField]); return symbols.map(doc => ({
symbol: doc,
lastRun: doc.operations?.[operationName]?.lastRunAt,
lastSuccess: doc.operations?.[operationName]?.lastSuccessAt,
operations: doc.operations
}));
} }
/** /**

View file

@ -15,6 +15,13 @@ export {
mongodbConfigSchema, mongodbConfigSchema,
postgresConfigSchema, postgresConfigSchema,
questdbConfigSchema, questdbConfigSchema,
providerConfigSchema,
providerSchemas,
eodProviderConfigSchema,
ibProviderConfigSchema,
qmProviderConfigSchema,
yahooProviderConfigSchema,
type ProviderName,
} from './schemas'; } from './schemas';
// createAppConfig function for apps/stock // createAppConfig function for apps/stock