work on qm

This commit is contained in:
Boki 2025-06-29 10:00:29 -04:00
parent 6082a54d14
commit 5640444c47
9 changed files with 492 additions and 476 deletions

View file

@ -31,6 +31,7 @@ export async function updateCorporateActions(
input: { input: {
symbol: string; symbol: string;
symbolId: number; symbolId: number;
qmSearchCode: string;
}, },
_context?: ExecutionContext _context?: ExecutionContext
): Promise<{ ): Promise<{
@ -43,7 +44,7 @@ export async function updateCorporateActions(
earnings: number; earnings: number;
}; };
}> { }> {
const { symbol, symbolId } = input; const { symbol, symbolId, qmSearchCode } = input;
this.logger.info('Fetching corporate actions', { symbol, symbolId }); this.logger.info('Fetching corporate actions', { symbol, symbolId });
@ -138,7 +139,7 @@ export async function updateCorporateActions(
// Update tracking for corporate actions // Update tracking for corporate actions
const updateTime = new Date(); const updateTime = new Date();
await tracker.updateSymbolOperation(symbol, 'corporate_actions_update', { await tracker.updateSymbolOperation(qmSearchCode, 'corporate_actions_update', {
status: 'success', status: 'success',
lastRecordDate: updateTime, lastRecordDate: updateTime,
recordCount: dividendCount + splitCount + earningsCount recordCount: dividendCount + splitCount + earningsCount
@ -176,7 +177,7 @@ export async function updateCorporateActions(
// Track failure for corporate actions // Track failure for corporate actions
const tracker = await getOperationTracker(this); const tracker = await getOperationTracker(this);
await tracker.updateSymbolOperation(symbol, 'corporate_actions_update', { await tracker.updateSymbolOperation(qmSearchCode, 'corporate_actions_update', {
status: 'failure' status: 'failure'
}); });
@ -228,9 +229,9 @@ export async function scheduleCorporateActionsUpdates(
// Get full symbol data to include symbolId // Get full symbol data to include symbolId
const symbolDocs = await this.mongodb.find('qmSymbols', { const symbolDocs = await this.mongodb.find('qmSymbols', {
symbol: { $in: staleSymbols.slice(0, limit) } // Apply limit after deduplication qmSearchCode: { $in: staleSymbols.slice(0, limit) } // Apply limit after deduplication
}, { }, {
projection: { symbol: 1, symbolId: 1 } projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
}); });
let queued = 0; let queued = 0;
@ -246,7 +247,8 @@ export async function scheduleCorporateActionsUpdates(
await this.scheduleOperation('update-corporate-actions', { await this.scheduleOperation('update-corporate-actions', {
symbol: doc.symbol, symbol: doc.symbol,
symbolId: doc.symbolId symbolId: doc.symbolId,
qmSearchCode: doc.qmSearchCode
}, { }, {
priority: 4, priority: 4,
delay: queued * 1500 // 1.5 seconds between jobs delay: queued * 1500 // 1.5 seconds between jobs

View file

@ -30,6 +30,7 @@ export async function updateFilings(
input: { input: {
symbol: string; symbol: string;
symbolId: number; symbolId: number;
qmSearchCode: string;
}, },
_context?: ExecutionContext _context?: ExecutionContext
): Promise<{ ): Promise<{
@ -38,7 +39,7 @@ export async function updateFilings(
message: string; message: string;
data?: any; data?: any;
}> { }> {
const { symbol, symbolId } = input; const { symbol, symbolId, qmSearchCode } = input;
this.logger.info('Fetching filings', { symbol, symbolId }); this.logger.info('Fetching filings', { symbol, symbolId });
@ -97,7 +98,7 @@ export async function updateFilings(
// Update symbol to track last filings update // Update symbol to track last filings update
const tracker = await getOperationTracker(this); const tracker = await getOperationTracker(this);
await tracker.updateSymbolOperation(symbol, 'filings_update', { await tracker.updateSymbolOperation(qmSearchCode, 'filings_update', {
status: 'success', status: 'success',
lastRecordDate: new Date(), lastRecordDate: new Date(),
recordCount: filingsData.length recordCount: filingsData.length
@ -117,7 +118,7 @@ export async function updateFilings(
} else { } else {
// Some symbols may not have filings (non-US companies, etc) // Some symbols may not have filings (non-US companies, etc)
const tracker = await getOperationTracker(this); const tracker = await getOperationTracker(this);
await tracker.updateSymbolOperation(symbol, 'filings_update', { await tracker.updateSymbolOperation(qmSearchCode, 'filings_update', {
status: 'success', status: 'success',
lastRecordDate: new Date(), lastRecordDate: new Date(),
recordCount: 0 recordCount: 0
@ -145,7 +146,7 @@ export async function updateFilings(
// Track failure // Track failure
const tracker = await getOperationTracker(this); const tracker = await getOperationTracker(this);
await tracker.updateSymbolOperation(symbol, 'filings_update', { await tracker.updateSymbolOperation(qmSearchCode, 'filings_update', {
status: 'failure' status: 'failure'
}); });
@ -197,9 +198,9 @@ export async function scheduleFilingsUpdates(
// Get full symbol data to include symbolId // Get full symbol data to include symbolId
const symbolDocs = await this.mongodb.find('qmSymbols', { const symbolDocs = await this.mongodb.find('qmSymbols', {
symbol: { $in: staleSymbols } qmSearchCode: { $in: staleSymbols }
}, { }, {
projection: { symbol: 1, symbolId: 1 } projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
}); });
let queued = 0; let queued = 0;
@ -215,7 +216,8 @@ export async function scheduleFilingsUpdates(
await this.scheduleOperation('update-filings', { await this.scheduleOperation('update-filings', {
symbol: doc.symbol, symbol: doc.symbol,
symbolId: doc.symbolId symbolId: doc.symbolId,
qmSearchCode: doc.qmSearchCode
}, { }, {
priority: 5, // Lower priority than financial data priority: 5, // Lower priority than financial data
delay: queued * 2000 // 2 seconds between jobs delay: queued * 2000 // 2 seconds between jobs

View file

@ -30,6 +30,7 @@ export async function updateFinancials(
input: { input: {
symbol: string; symbol: string;
symbolId: number; symbolId: number;
qmSearchCode: string;
}, },
_context?: ExecutionContext _context?: ExecutionContext
): Promise<{ ): Promise<{
@ -38,7 +39,7 @@ export async function updateFinancials(
message: string; message: string;
data?: any; data?: any;
}> { }> {
const { symbol, symbolId } = input; const { symbol, symbolId, qmSearchCode } = input;
this.logger.info('Fetching financials', { symbol, symbolId }); this.logger.info('Fetching financials', { symbol, symbolId });
@ -96,7 +97,7 @@ export async function updateFinancials(
// Update symbol to track last financials update // Update symbol to track last financials update
const tracker = await getOperationTracker(this); const tracker = await getOperationTracker(this);
await tracker.updateSymbolOperation(symbol, 'financials_update', { await tracker.updateSymbolOperation(qmSearchCode, 'financials_update', {
status: 'success', status: 'success',
lastRecordDate: new Date(), lastRecordDate: new Date(),
recordCount: financialData.length recordCount: financialData.length
@ -135,7 +136,7 @@ export async function updateFinancials(
// Track failure // Track failure
const tracker = await getOperationTracker(this); const tracker = await getOperationTracker(this);
await tracker.updateSymbolOperation(symbol, 'financials_update', { await tracker.updateSymbolOperation(qmSearchCode, 'financials_update', {
status: 'failure', status: 'failure',
}); });
@ -187,9 +188,9 @@ export async function scheduleFinancialsUpdates(
// Get full symbol data to include symbolId // Get full symbol data to include symbolId
const symbolDocs = await this.mongodb.find('qmSymbols', { const symbolDocs = await this.mongodb.find('qmSymbols', {
symbol: { $in: staleSymbols } qmSearchCode: { $in: staleSymbols }
}, { }, {
projection: { symbol: 1, symbolId: 1 } projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
}); });
let queued = 0; let queued = 0;
@ -205,7 +206,8 @@ export async function scheduleFinancialsUpdates(
await this.scheduleOperation('update-financials', { await this.scheduleOperation('update-financials', {
symbol: doc.symbol, symbol: doc.symbol,
symbolId: doc.symbolId symbolId: doc.symbolId,
qmSearchCode: doc.qmSearchCode
}, { }, {
priority: 4, priority: 4,
delay: queued * 2000 // 2 seconds between jobs delay: queued * 2000 // 2 seconds between jobs

View file

@ -31,6 +31,7 @@ export async function updateIntradayBars(
input: { input: {
symbol: string; symbol: string;
symbolId: number; symbolId: number;
qmSearchCode: string;
crawlDate?: string; // ISO date string for specific date crawl crawlDate?: string; // ISO date string for specific date crawl
}, },
_context?: ExecutionContext _context?: ExecutionContext
@ -40,7 +41,7 @@ export async function updateIntradayBars(
message: string; message: string;
data?: any; data?: any;
}> { }> {
const { symbol, symbolId, crawlDate } = input; const { symbol, symbolId, qmSearchCode, crawlDate } = input;
this.logger.info('Fetching intraday bars', { symbol, symbolId, crawlDate }); this.logger.info('Fetching intraday bars', { symbol, symbolId, crawlDate });
@ -203,9 +204,9 @@ export async function scheduleIntradayUpdates(
// Get full symbol data // Get full symbol data
symbolsToProcess = await this.mongodb.find('qmSymbols', { symbolsToProcess = await this.mongodb.find('qmSymbols', {
symbol: { $in: staleSymbols } qmSearchCode: { $in: staleSymbols }
}, { }, {
projection: { symbol: 1, symbolId: 1 } projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
}); });
} }
@ -245,6 +246,7 @@ export async function scheduleIntradayUpdates(
await this.scheduleOperation('update-intraday-bars', { await this.scheduleOperation('update-intraday-bars', {
symbol: doc.symbol, symbol: doc.symbol,
symbolId: doc.symbolId, symbolId: doc.symbolId,
qmSearchCode: doc.qmSearchCode,
crawlDate: crawlDate.toISOString() crawlDate: crawlDate.toISOString()
}, { }, {
priority: 6, priority: 6,
@ -255,7 +257,7 @@ export async function scheduleIntradayUpdates(
} }
// Update crawl state // Update crawl state
await tracker.updateSymbolOperation(doc.symbol, 'intraday_bars', { await tracker.updateSymbolOperation(doc.qmSearchCode, 'intraday_bars', {
status: 'partial', status: 'partial',
crawlState: { crawlState: {
finished: false, finished: false,
@ -266,7 +268,8 @@ export async function scheduleIntradayUpdates(
// For update mode, just fetch today's data // For update mode, just fetch today's data
await this.scheduleOperation('update-intraday-bars', { await this.scheduleOperation('update-intraday-bars', {
symbol: doc.symbol, symbol: doc.symbol,
symbolId: doc.symbolId symbolId: doc.symbolId,
qmSearchCode: doc.qmSearchCode
}, { }, {
priority: 8, // High priority for current data priority: 8, // High priority for current data
delay: jobsQueued * 500 // 0.5 seconds between jobs delay: jobsQueued * 500 // 0.5 seconds between jobs

View file

@ -30,6 +30,7 @@ export async function updatePrices(
input: { input: {
symbol: string; symbol: string;
symbolId: number; symbolId: number;
qmSearchCode: string;
}, },
_context?: ExecutionContext _context?: ExecutionContext
): Promise<{ ): Promise<{
@ -38,7 +39,7 @@ export async function updatePrices(
message: string; message: string;
data?: any; data?: any;
}> { }> {
const { symbol, symbolId } = input; const { symbol, symbolId, qmSearchCode } = input;
this.logger.info('Fetching daily prices', { symbol, symbolId }); this.logger.info('Fetching daily prices', { symbol, symbolId });
@ -106,7 +107,7 @@ export async function updatePrices(
// Update symbol to track last price update // Update symbol to track last price update
const tracker = await getOperationTracker(this); const tracker = await getOperationTracker(this);
await tracker.updateSymbolOperation(symbol, 'price_update', { await tracker.updateSymbolOperation(qmSearchCode, 'price_update', {
status: 'success', status: 'success',
lastRecordDate: latestDate, lastRecordDate: latestDate,
recordCount: priceData.length recordCount: priceData.length
@ -149,7 +150,7 @@ export async function updatePrices(
// Track failure // Track failure
const tracker = await getOperationTracker(this); const tracker = await getOperationTracker(this);
await tracker.updateSymbolOperation(symbol, 'price_update', { await tracker.updateSymbolOperation(qmSearchCode, 'price_update', {
status: 'failure' status: 'failure'
}); });
@ -201,9 +202,9 @@ export async function schedulePriceUpdates(
// Get full symbol data to include symbolId // Get full symbol data to include symbolId
const symbolDocs = await this.mongodb.find('qmSymbols', { const symbolDocs = await this.mongodb.find('qmSymbols', {
symbol: { $in: staleSymbols } qmSearchCode: { $in: staleSymbols }
}, { }, {
projection: { symbol: 1, symbolId: 1 } projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
}); });
let queued = 0; let queued = 0;
@ -219,7 +220,8 @@ export async function schedulePriceUpdates(
await this.scheduleOperation('update-prices', { await this.scheduleOperation('update-prices', {
symbol: doc.symbol, symbol: doc.symbol,
symbolId: doc.symbolId symbolId: doc.symbolId,
qmSearchCode: doc.qmSearchCode
}, { }, {
priority: 7, // High priority for price data priority: 7, // High priority for price data
delay: queued * 500 // 0.5 seconds between jobs delay: queued * 500 // 0.5 seconds between jobs

View file

@ -196,13 +196,13 @@ export async function scheduleSymbolInfoUpdates(
symbol: doc.symbol, symbol: doc.symbol,
qmSearchCode: doc.qmSearchCode || doc.symbol qmSearchCode: doc.qmSearchCode || doc.symbol
}, { }, {
priority: 3, // priority: 3,
// Add some delay to avoid overwhelming the API // Add some delay to avoid overwhelming the API
delay: queued * 1000 // 1 second between jobs // delay: queued * 1000 // 1 second between jobs
}); });
// Track that we've scheduled this symbol // Track that we've scheduled this symbol
await tracker.updateSymbolOperation(doc.symbol, 'symbol_info', { await tracker.updateSymbolOperation(doc.qmSearchCode, 'symbol_info', {
status: 'success' status: 'success'
}); });

View file

@ -7,7 +7,12 @@ import { getRandomUserAgent } from "@stock-bot/utils";
// QM Session IDs for different endpoints // QM Session IDs for different endpoints
export const QM_SESSION_IDS = { export const QM_SESSION_IDS = {
LOOKUP: 'dc8c9930437f65d30f6597768800957017bac203a0a50342932757c8dfa158d6', // lookup endpoint LOOKUP: 'dc8c9930437f65d30f6597768800957017bac203a0a50342932757c8dfa158d6', // lookup endpoint
PROFILES: '1e1d7cb1de1fd2fe52684abdea41a446919a5fe12776dfab88615ac1ce1ec2f6', // getProfiles SYMBOL: '1e1d7cb1de1fd2fe52684abdea41a446919a5fe12776dfab88615ac1ce1ec2f6', // getProfiles
// EDS: '', //
// FILINGS: '', //
// PRICES: '', //
// FINANCIALS: '', //
// INTRADAY: '', //
// '5ad521e05faf5778d567f6d0012ec34d6cdbaeb2462f41568f66558bc7b4ced9': [], //4488d072b // '5ad521e05faf5778d567f6d0012ec34d6cdbaeb2462f41568f66558bc7b4ced9': [], //4488d072b
// cc1cbdaf040f76db8f4c94f7d156b9b9b716e1a7509ec9c74a48a47f6b6b9f87: [], //97ff00cf3 // getQuotes // cc1cbdaf040f76db8f4c94f7d156b9b9b716e1a7509ec9c74a48a47f6b6b9f87: [], //97ff00cf3 // getQuotes
// '74963ff42f1db2320d051762b5d3950ff9eab23f9d5c5b592551b4ca0441d086': [], //32ca24e394b // getSplitsBySymbol getBrokerRatingsBySymbol getDividendsBySymbol getEarningsSurprisesBySymbol getEarningsEventsBySymbol // '74963ff42f1db2320d051762b5d3950ff9eab23f9d5c5b592551b4ca0441d086': [], //32ca24e394b // getSplitsBySymbol getBrokerRatingsBySymbol getDividendsBySymbol getEarningsSurprisesBySymbol getEarningsEventsBySymbol

View file

@ -1,442 +1,442 @@
/** /**
* QM Operation Tracker - Tracks operation execution times and states for symbols * QM Operation Tracker - Tracks operation execution times and states for symbols
* Supports dynamic operation registration with auto-indexing * Supports dynamic operation registration with auto-indexing
*/ */
import type { Logger, MongoDBClient } from '@stock-bot/types'; import type { Logger, MongoDBClient } from '@stock-bot/types';
import type { IntradayCrawlSymbol, QMOperationConfig } from './types'; import type { IntradayCrawlSymbol, QMOperationConfig } from './types';
export class QMOperationTracker { export class QMOperationTracker {
private registeredOperations: Map<string, QMOperationConfig> = new Map(); private registeredOperations: Map<string, QMOperationConfig> = new Map();
private indexesCreated: Set<string> = new Set(); private indexesCreated: Set<string> = new Set();
private mongodb: MongoDBClient; private mongodb: MongoDBClient;
private logger: Logger; private logger: Logger;
private readonly collectionName = 'qmSymbols'; private readonly collectionName = 'qmSymbols';
constructor(mongodb: MongoDBClient, logger: Logger) { constructor(mongodb: MongoDBClient, logger: Logger) {
this.mongodb = mongodb; this.mongodb = mongodb;
this.logger = logger; this.logger = logger;
} }
/** /**
* Register a new operation type with auto-indexing * Register a new operation type with auto-indexing
*/ */
async registerOperation(config: QMOperationConfig): Promise<void> { async registerOperation(config: QMOperationConfig): Promise<void> {
this.logger.info('Registering QM operation', { operation: config.name, type: config.type }); this.logger.info('Registering QM operation', { operation: config.name, type: config.type });
this.registeredOperations.set(config.name, config); this.registeredOperations.set(config.name, config);
// Auto-create indexes for this operation // Auto-create indexes for this operation
await this.createOperationIndexes(config.name); await this.createOperationIndexes(config.name);
this.logger.debug('Operation registered successfully', { operation: config.name }); this.logger.debug('Operation registered successfully', { operation: config.name });
} }
/** /**
* Create indexes for efficient operation queries * Create indexes for efficient operation queries
*/ */
private async createOperationIndexes(operationName: string): Promise<void> { private async createOperationIndexes(operationName: string): Promise<void> {
if (this.indexesCreated.has(operationName)) { if (this.indexesCreated.has(operationName)) {
this.logger.debug('Indexes already created for operation', { operation: operationName }); this.logger.debug('Indexes already created for operation', { operation: operationName });
return; return;
} }
try { try {
const indexes = [ const indexes = [
// Index for finding stale symbols // Index for finding stale symbols
{ [`operations.${operationName}.lastRunAt`]: 1, symbol: 1 }, { [`operations.${operationName}.lastRunAt`]: 1, qmSearchCode: 1 },
// Index for finding by last record date // Index for finding by last record date
{ [`operations.${operationName}.lastRecordDate`]: 1, symbol: 1 }, { [`operations.${operationName}.lastRecordDate`]: 1, qmSearchCode: 1 },
]; ];
// Add crawl state index for intraday operations // Add crawl state index for intraday operations
const config = this.registeredOperations.get(operationName); const config = this.registeredOperations.get(operationName);
if (config?.type === 'intraday_crawl') { if (config?.type === 'intraday_crawl') {
indexes.push({ [`operations.${operationName}.crawlState.finished`]: 1, symbol: 1 }); indexes.push({ [`operations.${operationName}.crawlState.finished`]: 1, qmSearchCode: 1 });
} }
for (const indexSpec of indexes) { for (const indexSpec of indexes) {
const collection = this.mongodb.collection(this.collectionName); const collection = this.mongodb.collection(this.collectionName);
await collection.createIndex(indexSpec, { await collection.createIndex(indexSpec, {
background: true, background: true,
name: `op_${operationName}_${Object.keys(indexSpec).join('_')}` name: `op_${operationName}_${Object.keys(indexSpec).join('_')}`
}); });
} }
this.indexesCreated.add(operationName); this.indexesCreated.add(operationName);
this.logger.info('Created indexes for operation', { this.logger.info('Created indexes for operation', {
operation: operationName, operation: operationName,
indexCount: indexes.length indexCount: indexes.length
}); });
} catch (error) { } catch (error) {
this.logger.error('Failed to create indexes for operation', { this.logger.error('Failed to create indexes for operation', {
operation: operationName, operation: operationName,
error error
}); });
throw error; throw error;
} }
} }
/** /**
* Update symbol operation status * Update symbol operation status
*/ */
async updateSymbolOperation( async updateSymbolOperation(
symbol: string, qmSearchCode: string,
operationName: string, operationName: string,
data: { data: {
status: 'success' | 'failure' | 'partial'; status: 'success' | 'failure' | 'partial';
lastRecordDate?: Date; lastRecordDate?: Date;
recordCount?: number; recordCount?: number;
crawlState?: { crawlState?: {
finished?: boolean; finished?: boolean;
oldestDateReached?: Date; oldestDateReached?: Date;
}; };
} }
): Promise<void> { ): Promise<void> {
const update: any = { const update: any = {
$set: { $set: {
[`operations.${operationName}.lastRunAt`]: new Date(), [`operations.${operationName}.lastRunAt`]: new Date(),
[`operations.${operationName}.status`]: data.status, [`operations.${operationName}.status`]: data.status,
updated_at: new Date() updated_at: new Date()
} }
}; };
if (data.lastRecordDate) { if (data.lastRecordDate) {
update.$set[`operations.${operationName}.lastRecordDate`] = data.lastRecordDate; update.$set[`operations.${operationName}.lastRecordDate`] = data.lastRecordDate;
} }
if (data.recordCount !== undefined) { if (data.recordCount !== undefined) {
update.$set[`operations.${operationName}.recordCount`] = data.recordCount; update.$set[`operations.${operationName}.recordCount`] = data.recordCount;
} }
if (data.crawlState) { if (data.crawlState) {
update.$set[`operations.${operationName}.crawlState`] = { update.$set[`operations.${operationName}.crawlState`] = {
...data.crawlState, ...data.crawlState,
lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward' lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward'
}; };
} }
await this.mongodb.updateOne(this.collectionName, { symbol }, update); await this.mongodb.updateOne(this.collectionName, { qmSearchCode }, update);
this.logger.debug('Updated symbol operation', { this.logger.debug('Updated symbol operation', {
symbol, qmSearchCode,
operation: operationName, operation: operationName,
status: data.status status: data.status
}); });
} }
/** /**
* Bulk update symbol operations for performance * Bulk update symbol operations for performance
*/ */
async bulkUpdateSymbolOperations( async bulkUpdateSymbolOperations(
updates: Array<{ updates: Array<{
symbol: string; qmSearchCode: string;
operation: string; operation: string;
data: { data: {
status: 'success' | 'failure' | 'partial'; status: 'success' | 'failure' | 'partial';
lastRecordDate?: Date; lastRecordDate?: Date;
recordCount?: number; recordCount?: number;
crawlState?: any; crawlState?: any;
}; };
}> }>
): Promise<void> { ): Promise<void> {
if (updates.length === 0) {return;} if (updates.length === 0) {return;}
const bulkOps = updates.map(({ symbol, operation, data }) => { const bulkOps = updates.map(({ qmSearchCode, operation, data }) => {
const update: any = { const update: any = {
$set: { $set: {
[`operations.${operation}.lastRunAt`]: new Date(), [`operations.${operation}.lastRunAt`]: new Date(),
[`operations.${operation}.status`]: data.status, [`operations.${operation}.status`]: data.status,
updated_at: new Date() updated_at: new Date()
} }
}; };
if (data.lastRecordDate) { if (data.lastRecordDate) {
update.$set[`operations.${operation}.lastRecordDate`] = data.lastRecordDate; update.$set[`operations.${operation}.lastRecordDate`] = data.lastRecordDate;
} }
if (data.recordCount !== undefined) { if (data.recordCount !== undefined) {
update.$set[`operations.${operation}.recordCount`] = data.recordCount; update.$set[`operations.${operation}.recordCount`] = data.recordCount;
} }
if (data.crawlState) { if (data.crawlState) {
update.$set[`operations.${operation}.crawlState`] = { update.$set[`operations.${operation}.crawlState`] = {
...data.crawlState, ...data.crawlState,
lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward' lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward'
}; };
} }
return { return {
updateOne: { updateOne: {
filter: { symbol }, filter: { qmSearchCode },
update update
} }
}; };
}); });
const collection = this.mongodb.collection(this.collectionName); const collection = this.mongodb.collection(this.collectionName);
const result = await collection.bulkWrite(bulkOps as any, { ordered: false }); const result = await collection.bulkWrite(bulkOps as any, { ordered: false });
this.logger.debug('Bulk updated symbol operations', { this.logger.debug('Bulk updated symbol operations', {
totalUpdates: updates.length, totalUpdates: updates.length,
modified: result.modifiedCount, modified: result.modifiedCount,
operations: Array.from(new Set(updates.map(u => u.operation))) operations: Array.from(new Set(updates.map(u => u.operation)))
}); });
} }
/** /**
* Get symbols that need processing for an operation * Get symbols that need processing for an operation
*/ */
async getStaleSymbols( async getStaleSymbols(
operationName: string, operationName: string,
options: { options: {
notRunSince?: Date; notRunSince?: Date;
minHoursSinceRun?: number; minHoursSinceRun?: number;
limit?: number; limit?: number;
excludeSymbols?: string[]; excludeSymbols?: string[];
} = {} } = {}
): Promise<string[]> { ): Promise<string[]> {
const { limit = 1000, excludeSymbols = [] } = options; const { limit = 1000, excludeSymbols = [] } = options;
const cutoffDate = options.notRunSince || (() => { const cutoffDate = options.notRunSince || (() => {
const date = new Date(); const date = new Date();
const hours = options.minHoursSinceRun || const hours = options.minHoursSinceRun ||
this.registeredOperations.get(operationName)?.defaultStaleHours || 24; this.registeredOperations.get(operationName)?.defaultStaleHours || 24;
date.setHours(date.getHours() - hours); date.setHours(date.getHours() - hours);
return date; return date;
})(); })();
const filter: any = { const filter: any = {
$or: [ $or: [
{ [`operations.${operationName}.lastRunAt`]: { $lt: cutoffDate } }, { [`operations.${operationName}.lastRunAt`]: { $lt: cutoffDate } },
{ [`operations.${operationName}`]: { $exists: false } } { [`operations.${operationName}`]: { $exists: false } }
] ]
}; };
if (excludeSymbols.length > 0) { if (excludeSymbols.length > 0) {
filter.symbol = { $nin: excludeSymbols }; filter.qmSearchCode = { $nin: excludeSymbols };
} }
const symbols = await this.mongodb.find(this.collectionName, filter, { const symbols = await this.mongodb.find(this.collectionName, filter, {
limit, limit,
projection: { symbol: 1 }, projection: { qmSearchCode: 1 },
sort: { [`operations.${operationName}.lastRunAt`]: 1 } // Oldest first sort: { [`operations.${operationName}.lastRunAt`]: 1 } // Oldest first
}); });
return symbols.map(s => s.symbol); return symbols.map(s => s.qmSearchCode);
} }
/** /**
* Get symbols for intraday crawling * Get symbols for intraday crawling
*/ */
async getSymbolsForIntradayCrawl( async getSymbolsForIntradayCrawl(
operationName: string, operationName: string,
options: { options: {
limit?: number; limit?: number;
includeFinished?: boolean; includeFinished?: boolean;
} = {} } = {}
): Promise<IntradayCrawlSymbol[]> { ): Promise<IntradayCrawlSymbol[]> {
const { limit = 100, includeFinished = false } = options; const { limit = 100, includeFinished = false } = options;
const filter: any = {}; const filter: any = {};
if (!includeFinished) { if (!includeFinished) {
filter[`operations.${operationName}.crawlState.finished`] = { $ne: true }; filter[`operations.${operationName}.crawlState.finished`] = { $ne: true };
} }
const symbols = await this.mongodb.find(this.collectionName, filter, { const symbols = await this.mongodb.find(this.collectionName, filter, {
limit, limit,
projection: { projection: {
symbol: 1, qmSearchCode: 1,
[`operations.${operationName}`]: 1 [`operations.${operationName}`]: 1
}, },
sort: { sort: {
// Prioritize symbols that haven't been crawled yet // Prioritize symbols that haven't been crawled yet
[`operations.${operationName}.lastRunAt`]: 1 [`operations.${operationName}.lastRunAt`]: 1
} }
}); });
return symbols.map(s => ({ return symbols.map(s => ({
symbol: s.symbol, qmSearchCode: s.qmSearchCode,
lastRecordDate: s.operations?.[operationName]?.lastRecordDate, lastRecordDate: s.operations?.[operationName]?.lastRecordDate,
crawlState: s.operations?.[operationName]?.crawlState crawlState: s.operations?.[operationName]?.crawlState
})); }));
} }
/** /**
* Mark intraday crawl as finished * Mark intraday crawl as finished
*/ */
async markCrawlFinished( async markCrawlFinished(
symbol: string, qmSearchCode: string,
operationName: string, operationName: string,
oldestDateReached: Date oldestDateReached: Date
): Promise<void> { ): Promise<void> {
await this.updateSymbolOperation(symbol, operationName, { await this.updateSymbolOperation(qmSearchCode, operationName, {
status: 'success', status: 'success',
crawlState: { crawlState: {
finished: true, finished: true,
oldestDateReached oldestDateReached
} }
}); });
this.logger.info('Marked crawl as finished', { this.logger.info('Marked crawl as finished', {
symbol, qmSearchCode,
operation: operationName, operation: operationName,
oldestDateReached oldestDateReached
}); });
} }
/** /**
* Get symbols that need data updates based on last record date * Get symbols that need data updates based on last record date
*/ */
async getSymbolsNeedingUpdate( async getSymbolsNeedingUpdate(
operationName: string, operationName: string,
options: { options: {
lastRecordBefore?: Date; lastRecordBefore?: Date;
neverRun?: boolean; neverRun?: boolean;
limit?: number; limit?: number;
} = {} } = {}
): Promise<Array<{ symbol: string; lastRecordDate?: Date }>> { ): Promise<Array<{ qmSearchCode: string; lastRecordDate?: Date }>> {
const { limit = 500 } = options; const { limit = 500 } = options;
const filter: any = {}; const filter: any = {};
if (options.neverRun) { if (options.neverRun) {
filter[`operations.${operationName}`] = { $exists: false }; filter[`operations.${operationName}`] = { $exists: false };
} else if (options.lastRecordBefore) { } else if (options.lastRecordBefore) {
filter.$or = [ filter.$or = [
{ [`operations.${operationName}.lastRecordDate`]: { $lt: options.lastRecordBefore } }, { [`operations.${operationName}.lastRecordDate`]: { $lt: options.lastRecordBefore } },
{ [`operations.${operationName}`]: { $exists: false } } { [`operations.${operationName}`]: { $exists: false } }
]; ];
} }
const symbols = await this.mongodb.find(this.collectionName, filter, { const symbols = await this.mongodb.find(this.collectionName, filter, {
limit, limit,
projection: { projection: {
symbol: 1, qmSearchCode: 1,
[`operations.${operationName}.lastRecordDate`]: 1 [`operations.${operationName}.lastRecordDate`]: 1
}, },
sort: { [`operations.${operationName}.lastRecordDate`]: 1 } // Oldest data first sort: { [`operations.${operationName}.lastRecordDate`]: 1 } // Oldest data first
}); });
return symbols.map(s => ({ return symbols.map(s => ({
symbol: s.symbol, qmSearchCode: s.qmSearchCode,
lastRecordDate: s.operations?.[operationName]?.lastRecordDate lastRecordDate: s.operations?.[operationName]?.lastRecordDate
})); }));
} }
/** /**
* Get operation statistics * Get operation statistics
*/ */
async getOperationStats(operationName: string): Promise<{ async getOperationStats(operationName: string): Promise<{
totalSymbols: number; totalSymbols: number;
processedSymbols: number; processedSymbols: number;
staleSymbols: number; staleSymbols: number;
successfulSymbols: number; successfulSymbols: number;
failedSymbols: number; failedSymbols: number;
finishedCrawls?: number; finishedCrawls?: number;
avgRecordsPerSymbol?: number; avgRecordsPerSymbol?: number;
}> { }> {
const collection = this.mongodb.collection(this.collectionName); const collection = this.mongodb.collection(this.collectionName);
const total = await collection.countDocuments({}); const total = await collection.countDocuments({});
const processed = await collection.countDocuments({ const processed = await collection.countDocuments({
[`operations.${operationName}`]: { $exists: true } [`operations.${operationName}`]: { $exists: true }
}); });
const successful = await collection.countDocuments({ const successful = await collection.countDocuments({
[`operations.${operationName}.status`]: 'success' [`operations.${operationName}.status`]: 'success'
}); });
const failed = await collection.countDocuments({ const failed = await collection.countDocuments({
[`operations.${operationName}.status`]: 'failure' [`operations.${operationName}.status`]: 'failure'
}); });
const staleDate = new Date(); const staleDate = new Date();
staleDate.setHours(staleDate.getHours() - ( staleDate.setHours(staleDate.getHours() - (
this.registeredOperations.get(operationName)?.defaultStaleHours || 24 this.registeredOperations.get(operationName)?.defaultStaleHours || 24
)); ));
const stale = await collection.countDocuments({ const stale = await collection.countDocuments({
$or: [ $or: [
{ [`operations.${operationName}.lastRunAt`]: { $lt: staleDate } }, { [`operations.${operationName}.lastRunAt`]: { $lt: staleDate } },
{ [`operations.${operationName}`]: { $exists: false } } { [`operations.${operationName}`]: { $exists: false } }
] ]
}); });
const result: any = { const result: any = {
totalSymbols: total, totalSymbols: total,
processedSymbols: processed, processedSymbols: processed,
staleSymbols: stale, staleSymbols: stale,
successfulSymbols: successful, successfulSymbols: successful,
failedSymbols: failed failedSymbols: failed
}; };
// Additional stats for crawl operations // Additional stats for crawl operations
if (this.registeredOperations.get(operationName)?.type === 'intraday_crawl') { if (this.registeredOperations.get(operationName)?.type === 'intraday_crawl') {
result.finishedCrawls = await collection.countDocuments({ result.finishedCrawls = await collection.countDocuments({
[`operations.${operationName}.crawlState.finished`]: true [`operations.${operationName}.crawlState.finished`]: true
}); });
} }
// Calculate average records per symbol // Calculate average records per symbol
const aggregation = await collection.aggregate([ const aggregation = await collection.aggregate([
{ {
$match: { $match: {
[`operations.${operationName}.recordCount`]: { $exists: true } [`operations.${operationName}.recordCount`]: { $exists: true }
} }
}, },
{ {
$group: { $group: {
_id: null, _id: null,
avgRecords: { $avg: `$operations.${operationName}.recordCount` } avgRecords: { $avg: `$operations.${operationName}.recordCount` }
} }
} }
]).toArray(); ]).toArray();
if (aggregation.length > 0) { if (aggregation.length > 0) {
result.avgRecordsPerSymbol = Math.round(aggregation[0].avgRecords); result.avgRecordsPerSymbol = Math.round(aggregation[0].avgRecords);
} }
return result; return result;
} }
/** /**
* Get all registered operations * Get all registered operations
*/ */
getRegisteredOperations(): QMOperationConfig[] { getRegisteredOperations(): QMOperationConfig[] {
return Array.from(this.registeredOperations.values()); return Array.from(this.registeredOperations.values());
} }
/** /**
* Helper: Get symbols for price update * Helper: Get symbols for price update
*/ */
async getSymbolsForPriceUpdate(limit = 1000): Promise<string[]> { async getSymbolsForPriceUpdate(limit = 1000): Promise<string[]> {
return this.getStaleSymbols('price_update', { return this.getStaleSymbols('price_update', {
minHoursSinceRun: 24, minHoursSinceRun: 24,
limit limit
}); });
} }
/** /**
* Helper: Get symbols with outdated financials * Helper: Get symbols with outdated financials
*/ */
async getSymbolsWithOldFinancials(limit = 100): Promise<Array<{ symbol: string; lastRecordDate?: Date }>> { async getSymbolsWithOldFinancials(limit = 100): Promise<Array<{ qmSearchCode: string; lastRecordDate?: Date }>> {
const cutoffDate = new Date(); const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - 90); // 90 days old cutoffDate.setDate(cutoffDate.getDate() - 90); // 90 days old
return this.getSymbolsNeedingUpdate('financials_update', { return this.getSymbolsNeedingUpdate('financials_update', {
lastRecordBefore: cutoffDate, lastRecordBefore: cutoffDate,
limit limit
}); });
} }
/** /**
* Helper: Get unprocessed symbols for an operation * Helper: Get unprocessed symbols for an operation
*/ */
async getUnprocessedSymbols(operation: string, limit = 500): Promise<string[]> { async getUnprocessedSymbols(operation: string, limit = 500): Promise<string[]> {
const symbols = await this.getSymbolsNeedingUpdate(operation, { const symbols = await this.getSymbolsNeedingUpdate(operation, {
neverRun: true, neverRun: true,
limit limit
}); });
return symbols.map(s => s.symbol); return symbols.map(s => s.qmSearchCode);
} }
} }

View file

@ -84,7 +84,7 @@ export interface QMSymbolOperationStatus {
} }
export interface IntradayCrawlSymbol { export interface IntradayCrawlSymbol {
symbol: string; qmSearchCode: string;
lastRecordDate?: Date; lastRecordDate?: Date;
crawlState?: { crawlState?: {
finished: boolean; finished: boolean;