initial qm operation tracker

This commit is contained in:
Boki 2025-06-28 09:21:28 -04:00
parent 73399ef142
commit 4ad232c35e
8 changed files with 928 additions and 0 deletions

View file

@ -4,4 +4,5 @@
export { checkSessions, createSession } from './session.action';
export { searchSymbols, spiderSymbol } from './symbol.action';
export { updatePrices, updateIntradayBars, getOperationStats } from './price.action';

View file

@ -0,0 +1,297 @@
/**
* QM Price Actions - Price data updates with operation tracking
*/
import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers';
import { QMOperationTracker } from '../shared/operation-tracker';
// Cache tracker instance
let operationTracker: QMOperationTracker | null = null;
/**
* Get or initialize the operation tracker
*/
async function getOperationTracker(handler: BaseHandler): Promise<QMOperationTracker> {
if (!operationTracker) {
const { initializeQMOperations } = await import('../shared/operation-registry');
operationTracker = await initializeQMOperations(handler.mongodb, handler.logger);
}
return operationTracker;
}
/**
* Update daily price data for stale symbols
*/
export async function updatePrices(
this: BaseHandler,
input: {
limit?: number;
symbols?: string[];
} = {},
_context?: ExecutionContext
): Promise<{
message: string;
symbolsUpdated: number;
errors: number;
}> {
const { limit = 100, symbols } = input;
const tracker = await getOperationTracker(this);
this.logger.info('Starting price update operation', { limit, specificSymbols: symbols?.length });
try {
// Get symbols that need updating
let symbolsToUpdate: string[];
if (symbols && symbols.length > 0) {
// Update specific symbols
symbolsToUpdate = symbols;
} else {
// Get stale symbols
symbolsToUpdate = await tracker.getStaleSymbols('price_update', {
minHoursSinceRun: 24,
limit
});
}
if (symbolsToUpdate.length === 0) {
this.logger.info('No symbols need price updates');
return {
message: 'No symbols need price updates',
symbolsUpdated: 0,
errors: 0
};
}
this.logger.info(`Found ${symbolsToUpdate.length} symbols for price update`);
let updated = 0;
let errors = 0;
const updateResults = [];
// Process symbols (in real implementation, you'd fetch prices from QM API)
for (const symbol of symbolsToUpdate) {
try {
// TODO: Actual price fetching logic here
// const prices = await fetchPricesFromQM(symbol);
// For now, simulate the update
const mockPrices = {
symbol,
lastPrice: Math.random() * 1000,
volume: Math.floor(Math.random() * 1000000),
date: new Date()
};
// Track the operation
updateResults.push({
symbol,
operation: 'price_update',
data: {
status: 'success' as const,
lastRecordDate: mockPrices.date,
recordCount: 1
}
});
updated++;
} catch (error) {
this.logger.error(`Failed to update prices for ${symbol}`, { error });
updateResults.push({
symbol,
operation: 'price_update',
data: {
status: 'failure' as const
}
});
errors++;
}
}
// Bulk update operation tracking
if (updateResults.length > 0) {
await tracker.bulkUpdateSymbolOperations(updateResults);
}
this.logger.info('Price update operation completed', {
symbolsUpdated: updated,
errors,
total: symbolsToUpdate.length
});
return {
message: `Updated prices for ${updated} symbols`,
symbolsUpdated: updated,
errors
};
} catch (error) {
this.logger.error('Price update operation failed', { error });
throw error;
}
}
/**
* Update intraday price bars - crawls backwards until no more data
*/
export async function updateIntradayBars(
this: BaseHandler,
input: {
symbol?: string;
limit?: number;
} = {},
_context?: ExecutionContext
): Promise<{
message: string;
symbol: string;
barsCollected: number;
crawlFinished: boolean;
}> {
const { symbol, limit = 1 } = input;
const tracker = await getOperationTracker(this);
try {
// Get symbols for intraday crawl
let symbolData;
if (symbol) {
symbolData = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
limit: 1
}).then(symbols => symbols.find(s => s.symbol === symbol));
} else {
const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
limit
});
symbolData = symbols[0];
}
if (!symbolData) {
return {
message: 'No symbols available for intraday crawl',
symbol: '',
barsCollected: 0,
crawlFinished: false
};
}
this.logger.info('Processing intraday bars', {
symbol: symbolData.symbol,
crawlState: symbolData.crawlState
});
let barsCollected = 0;
let crawlFinished = false;
if (symbolData.crawlState?.finished) {
// Already finished initial crawl, just update from last record
this.logger.debug('Symbol already crawled, updating from last record', {
symbol: symbolData.symbol,
lastRecord: symbolData.lastRecordDate
});
// TODO: Fetch bars from lastRecordDate to now
const newBars = 10; // Mock data
await tracker.updateSymbolOperation(symbolData.symbol, 'intraday_bars', {
status: 'success',
lastRecordDate: new Date(),
recordCount: (symbolData.crawlState as any).recordCount + newBars
});
return {
message: `Updated ${newBars} new bars for ${symbolData.symbol}`,
symbol: symbolData.symbol,
barsCollected: newBars,
crawlFinished: true
};
}
// Initial crawl - go backwards until no data
let currentDate = new Date();
let oldestDate = currentDate;
let totalBars = 0;
let consecutiveEmptyDays = 0;
const maxEmptyDays = 5; // Stop after 5 consecutive days with no data
while (consecutiveEmptyDays < maxEmptyDays) {
// TODO: Actual bar fetching logic
// const bars = await fetchIntradayBars(symbolData.symbol, currentDate);
// Mock data - simulate decreasing data as we go back
const bars = currentDate > new Date('2020-01-01') ? Math.floor(Math.random() * 100) : 0;
if (bars === 0) {
consecutiveEmptyDays++;
} else {
consecutiveEmptyDays = 0;
totalBars += bars;
oldestDate = new Date(currentDate);
}
// Update progress
await tracker.updateSymbolOperation(symbolData.symbol, 'intraday_bars', {
status: 'partial',
lastRecordDate: new Date(),
recordCount: totalBars,
crawlState: {
finished: false,
oldestDateReached: oldestDate
}
});
// Move to previous day
currentDate.setDate(currentDate.getDate() - 1);
// Limit crawl for this execution
if (totalBars > 1000) {
this.logger.info('Reached bar limit for this execution', {
symbol: symbolData.symbol,
barsCollected: totalBars
});
break;
}
}
// Check if we finished the crawl
if (consecutiveEmptyDays >= maxEmptyDays) {
crawlFinished = true;
await tracker.markCrawlFinished(symbolData.symbol, 'intraday_bars', oldestDate);
this.logger.info('Completed initial crawl for symbol', {
symbol: symbolData.symbol,
totalBars,
oldestDate
});
}
return {
message: `Collected ${totalBars} bars for ${symbolData.symbol}`,
symbol: symbolData.symbol,
barsCollected: totalBars,
crawlFinished
};
} catch (error) {
this.logger.error('Intraday bars update failed', { error });
throw error;
}
}
/**
* Get operation statistics
*/
export async function getOperationStats(
this: BaseHandler,
input: {
operation: string;
},
_context?: ExecutionContext
): Promise<any> {
const tracker = await getOperationTracker(this);
const stats = await tracker.getOperationStats(input.operation);
return {
operation: input.operation,
...stats
};
}

View file

@ -5,6 +5,8 @@
import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers';
import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
import { QMSessionManager } from '../shared/session-manager';
import { QMOperationTracker } from '../shared/operation-tracker';
import { initializeQMOperations } from '../shared/operation-registry';
import type { Exchange, SymbolSpiderJob } from '../shared/types';
/**

View file

@ -5,6 +5,7 @@ import {
ScheduledOperation,
} from '@stock-bot/handlers';
import { checkSessions, createSession, searchSymbols, spiderSymbol } from './actions';
import { updatePrices, updateIntradayBars, getOperationStats } from './actions/price.action';
@Handler('qm')
export class QMHandler extends BaseHandler {
@ -37,4 +38,27 @@ export class QMHandler extends BaseHandler {
@Operation('search-symbols')
searchSymbols = searchSymbols;
/**
* PRICE DATA
*/
@ScheduledOperation('update-prices', '0 */6 * * *', {
priority: 5,
immediately: false,
description: 'Update daily prices every 6 hours'
})
updatePrices = updatePrices;
@ScheduledOperation('update-intraday-bars', '*/30 * * * *', {
priority: 6,
immediately: false,
description: 'Update intraday bars every 30 minutes during market hours'
})
updateIntradayBars = updateIntradayBars;
/**
* MONITORING
*/
@Operation('get-operation-stats')
getOperationStats = getOperationStats;
}

View file

@ -0,0 +1,4 @@
export * from './config';
export * from './session-manager';
export * from './types';
export * from './operation-tracker';

View file

@ -0,0 +1,118 @@
/**
* QM Operation Registry - Define and register all QM operations
*/
import type { MongoDBClient } from '@stock-bot/mongodb';
import type { Logger } from '@stock-bot/types';
import { QMOperationTracker } from './operation-tracker';
import type { QMOperationConfig } from './types';
// Define all QM operations
export const QM_OPERATIONS: QMOperationConfig[] = [
// Price data operations
{
name: 'price_update',
type: 'standard',
description: 'Update daily price data',
defaultStaleHours: 24
},
{
name: 'intraday_bars',
type: 'intraday_crawl',
description: 'Crawl intraday price bars from today backwards',
requiresFinishedFlag: true,
defaultStaleHours: 1 // Check every hour for new data
},
// Fundamental data operations
{
name: 'financials_update',
type: 'standard',
description: 'Update financial statements',
defaultStaleHours: 24 * 7 // Weekly
},
{
name: 'earnings_update',
type: 'standard',
description: 'Update earnings data',
defaultStaleHours: 24 * 7 // Weekly
},
{
name: 'dividends_update',
type: 'standard',
description: 'Update dividend history',
defaultStaleHours: 24 * 7 // Weekly
},
{
name: 'splits_update',
type: 'standard',
description: 'Update stock split history',
defaultStaleHours: 24 * 30 // Monthly
},
// News and filings
{
name: 'filings_update',
type: 'standard',
description: 'Update SEC filings',
defaultStaleHours: 24 // Daily
},
{
name: 'news_update',
type: 'standard',
description: 'Update news articles',
defaultStaleHours: 6 // Every 6 hours
},
// Technical indicators
{
name: 'indicators_update',
type: 'standard',
description: 'Calculate technical indicators',
defaultStaleHours: 24 // Daily
},
// Options data
{
name: 'options_chain',
type: 'standard',
description: 'Update options chain data',
defaultStaleHours: 1 // Hourly during market hours
}
];
/**
* Initialize operation tracker with all registered operations
*/
export async function initializeQMOperations(
mongodb: MongoDBClient,
logger: Logger
): Promise<QMOperationTracker> {
logger.info('Initializing QM operations tracker');
const tracker = new QMOperationTracker(mongodb, logger);
// Register all operations
for (const operation of QM_OPERATIONS) {
try {
await tracker.registerOperation(operation);
logger.debug(`Registered operation: ${operation.name}`);
} catch (error) {
logger.error(`Failed to register operation: ${operation.name}`, { error });
throw error;
}
}
logger.info('QM operations tracker initialized', {
operationCount: QM_OPERATIONS.length
});
return tracker;
}
/**
* Get operation configuration by name
*/
export function getOperationConfig(name: string): QMOperationConfig | undefined {
return QM_OPERATIONS.find(op => op.name === name);
}

View file

@ -0,0 +1,441 @@
/**
* QM Operation Tracker - Tracks operation execution times and states for symbols
* Supports dynamic operation registration with auto-indexing
*/
import type { Logger } from '@stock-bot/types';
import type { MongoDBClient } from '@stock-bot/mongodb';
import type { IntradayCrawlSymbol, QMOperationConfig } from './types';
export class QMOperationTracker {
private registeredOperations: Map<string, QMOperationConfig> = new Map();
private indexesCreated: Set<string> = new Set();
private mongodb: MongoDBClient;
private logger: Logger;
private readonly collectionName = 'qmSymbols';
constructor(mongodb: MongoDBClient, logger: Logger) {
this.mongodb = mongodb;
this.logger = logger;
}
/**
* Register a new operation type with auto-indexing
*/
async registerOperation(config: QMOperationConfig): Promise<void> {
this.logger.info('Registering QM operation', { operation: config.name, type: config.type });
this.registeredOperations.set(config.name, config);
// Auto-create indexes for this operation
await this.createOperationIndexes(config.name);
this.logger.debug('Operation registered successfully', { operation: config.name });
}
/**
* Create indexes for efficient operation queries
*/
private async createOperationIndexes(operationName: string): Promise<void> {
if (this.indexesCreated.has(operationName)) {
this.logger.debug('Indexes already created for operation', { operation: operationName });
return;
}
try {
const indexes = [
// Index for finding stale symbols
{ [`operations.${operationName}.lastRunAt`]: 1, symbol: 1 },
// Index for finding by last record date
{ [`operations.${operationName}.lastRecordDate`]: 1, symbol: 1 },
];
// Add crawl state index for intraday operations
const config = this.registeredOperations.get(operationName);
if (config?.type === 'intraday_crawl') {
indexes.push({ [`operations.${operationName}.crawlState.finished`]: 1, symbol: 1 });
}
for (const indexSpec of indexes) {
await this.mongodb.createIndex(this.collectionName, indexSpec, {
background: true,
name: `op_${operationName}_${Object.keys(indexSpec).join('_')}`
});
}
this.indexesCreated.add(operationName);
this.logger.info('Created indexes for operation', {
operation: operationName,
indexCount: indexes.length
});
} catch (error) {
this.logger.error('Failed to create indexes for operation', {
operation: operationName,
error
});
throw error;
}
}
/**
* Update symbol operation status
*/
async updateSymbolOperation(
symbol: string,
operationName: string,
data: {
status: 'success' | 'failure' | 'partial';
lastRecordDate?: Date;
recordCount?: number;
crawlState?: {
finished?: boolean;
oldestDateReached?: Date;
};
}
): Promise<void> {
const update: any = {
$set: {
[`operations.${operationName}.lastRunAt`]: new Date(),
[`operations.${operationName}.status`]: data.status,
updated_at: new Date()
}
};
if (data.lastRecordDate) {
update.$set[`operations.${operationName}.lastRecordDate`] = data.lastRecordDate;
}
if (data.recordCount !== undefined) {
update.$set[`operations.${operationName}.recordCount`] = data.recordCount;
}
if (data.crawlState) {
update.$set[`operations.${operationName}.crawlState`] = {
...data.crawlState,
lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward'
};
}
await this.mongodb.updateOne(this.collectionName, { symbol }, update);
this.logger.trace('Updated symbol operation', {
symbol,
operation: operationName,
status: data.status
});
}
/**
* Bulk update symbol operations for performance
*/
async bulkUpdateSymbolOperations(
updates: Array<{
symbol: string;
operation: string;
data: {
status: 'success' | 'failure' | 'partial';
lastRecordDate?: Date;
recordCount?: number;
crawlState?: any;
};
}>
): Promise<void> {
if (updates.length === 0) return;
const bulkOps = updates.map(({ symbol, operation, data }) => {
const update: any = {
$set: {
[`operations.${operation}.lastRunAt`]: new Date(),
[`operations.${operation}.status`]: data.status,
updated_at: new Date()
}
};
if (data.lastRecordDate) {
update.$set[`operations.${operation}.lastRecordDate`] = data.lastRecordDate;
}
if (data.recordCount !== undefined) {
update.$set[`operations.${operation}.recordCount`] = data.recordCount;
}
if (data.crawlState) {
update.$set[`operations.${operation}.crawlState`] = {
...data.crawlState,
lastCrawlDirection: data.crawlState.finished ? 'forward' : 'backward'
};
}
return {
updateOne: {
filter: { symbol },
update
}
};
});
const collection = this.mongodb.getCollection(this.collectionName);
const result = await collection.bulkWrite(bulkOps as any, { ordered: false });
this.logger.debug('Bulk updated symbol operations', {
totalUpdates: updates.length,
modified: result.modifiedCount,
operations: [...new Set(updates.map(u => u.operation))]
});
}
/**
* Get symbols that need processing for an operation
*/
async getStaleSymbols(
operationName: string,
options: {
notRunSince?: Date;
minHoursSinceRun?: number;
limit?: number;
excludeSymbols?: string[];
} = {}
): Promise<string[]> {
const { limit = 1000, excludeSymbols = [] } = options;
const cutoffDate = options.notRunSince || (() => {
const date = new Date();
const hours = options.minHoursSinceRun ||
this.registeredOperations.get(operationName)?.defaultStaleHours || 24;
date.setHours(date.getHours() - hours);
return date;
})();
const filter: any = {
$or: [
{ [`operations.${operationName}.lastRunAt`]: { $lt: cutoffDate } },
{ [`operations.${operationName}`]: { $exists: false } }
]
};
if (excludeSymbols.length > 0) {
filter.symbol = { $nin: excludeSymbols };
}
const symbols = await this.mongodb.find(this.collectionName, filter, {
limit,
projection: { symbol: 1 },
sort: { [`operations.${operationName}.lastRunAt`]: 1 } // Oldest first
});
return symbols.map(s => s.symbol);
}
/**
* Get symbols for intraday crawling
*/
async getSymbolsForIntradayCrawl(
operationName: string,
options: {
limit?: number;
includeFinished?: boolean;
} = {}
): Promise<IntradayCrawlSymbol[]> {
const { limit = 100, includeFinished = false } = options;
const filter: any = {};
if (!includeFinished) {
filter[`operations.${operationName}.crawlState.finished`] = { $ne: true };
}
const symbols = await this.mongodb.find(this.collectionName, filter, {
limit,
projection: {
symbol: 1,
[`operations.${operationName}`]: 1
},
sort: {
// Prioritize symbols that haven't been crawled yet
[`operations.${operationName}.lastRunAt`]: 1
}
});
return symbols.map(s => ({
symbol: s.symbol,
lastRecordDate: s.operations?.[operationName]?.lastRecordDate,
crawlState: s.operations?.[operationName]?.crawlState
}));
}
/**
* Mark intraday crawl as finished
*/
async markCrawlFinished(
symbol: string,
operationName: string,
oldestDateReached: Date
): Promise<void> {
await this.updateSymbolOperation(symbol, operationName, {
status: 'success',
crawlState: {
finished: true,
oldestDateReached
}
});
this.logger.info('Marked crawl as finished', {
symbol,
operation: operationName,
oldestDateReached
});
}
/**
* Get symbols that need data updates based on last record date
*/
async getSymbolsNeedingUpdate(
operationName: string,
options: {
lastRecordBefore?: Date;
neverRun?: boolean;
limit?: number;
} = {}
): Promise<Array<{ symbol: string; lastRecordDate?: Date }>> {
const { limit = 500 } = options;
const filter: any = {};
if (options.neverRun) {
filter[`operations.${operationName}`] = { $exists: false };
} else if (options.lastRecordBefore) {
filter.$or = [
{ [`operations.${operationName}.lastRecordDate`]: { $lt: options.lastRecordBefore } },
{ [`operations.${operationName}`]: { $exists: false } }
];
}
const symbols = await this.mongodb.find(this.collectionName, filter, {
limit,
projection: {
symbol: 1,
[`operations.${operationName}.lastRecordDate`]: 1
},
sort: { [`operations.${operationName}.lastRecordDate`]: 1 } // Oldest data first
});
return symbols.map(s => ({
symbol: s.symbol,
lastRecordDate: s.operations?.[operationName]?.lastRecordDate
}));
}
/**
* Get operation statistics
*/
async getOperationStats(operationName: string): Promise<{
totalSymbols: number;
processedSymbols: number;
staleSymbols: number;
successfulSymbols: number;
failedSymbols: number;
finishedCrawls?: number;
avgRecordsPerSymbol?: number;
}> {
const total = await this.mongodb.countDocuments(this.collectionName);
const processed = await this.mongodb.countDocuments(this.collectionName, {
[`operations.${operationName}`]: { $exists: true }
});
const successful = await this.mongodb.countDocuments(this.collectionName, {
[`operations.${operationName}.status`]: 'success'
});
const failed = await this.mongodb.countDocuments(this.collectionName, {
[`operations.${operationName}.status`]: 'failure'
});
const staleDate = new Date();
staleDate.setHours(staleDate.getHours() - (
this.registeredOperations.get(operationName)?.defaultStaleHours || 24
));
const stale = await this.mongodb.countDocuments(this.collectionName, {
$or: [
{ [`operations.${operationName}.lastRunAt`]: { $lt: staleDate } },
{ [`operations.${operationName}`]: { $exists: false } }
]
});
const result: any = {
totalSymbols: total,
processedSymbols: processed,
staleSymbols: stale,
successfulSymbols: successful,
failedSymbols: failed
};
// Additional stats for crawl operations
if (this.registeredOperations.get(operationName)?.type === 'intraday_crawl') {
result.finishedCrawls = await this.mongodb.countDocuments(this.collectionName, {
[`operations.${operationName}.crawlState.finished`]: true
});
}
// Calculate average records per symbol
const aggregation = await this.mongodb.aggregate(this.collectionName, [
{
$match: {
[`operations.${operationName}.recordCount`]: { $exists: true }
}
},
{
$group: {
_id: null,
avgRecords: { $avg: `$operations.${operationName}.recordCount` }
}
}
]);
if (aggregation.length > 0) {
result.avgRecordsPerSymbol = Math.round(aggregation[0].avgRecords);
}
return result;
}
/**
* Get all registered operations
*/
getRegisteredOperations(): QMOperationConfig[] {
return Array.from(this.registeredOperations.values());
}
/**
* Helper: Get symbols for price update
*/
async getSymbolsForPriceUpdate(limit = 1000): Promise<string[]> {
return this.getStaleSymbols('price_update', {
minHoursSinceRun: 24,
limit
});
}
/**
* Helper: Get symbols with outdated financials
*/
async getSymbolsWithOldFinancials(limit = 100): Promise<Array<{ symbol: string; lastRecordDate?: Date }>> {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - 90); // 90 days old
return this.getSymbolsNeedingUpdate('financials_update', {
lastRecordBefore: cutoffDate,
limit
});
}
/**
* Helper: Get unprocessed symbols for an operation
*/
async getUnprocessedSymbols(operation: string, limit = 500): Promise<string[]> {
const symbols = await this.getSymbolsNeedingUpdate(operation, {
neverRun: true,
limit
});
return symbols.map(s => s.symbol);
}
}

View file

@ -51,3 +51,44 @@ export interface CachedSession extends QMSession {
id: string;
sessionType: string;
}
/**
* Operation tracking types
*/
export interface QMOperationConfig {
name: string;
type: 'standard' | 'intraday_crawl';
description?: string;
defaultStaleHours?: number;
requiresFinishedFlag?: boolean;
}
export interface QMSymbolOperationStatus {
symbol: string;
qmSearchCode: string;
operations: {
[operationName: string]: {
lastRunAt: Date;
lastRecordDate?: Date;
status: 'success' | 'failure' | 'partial';
recordCount?: number;
// For intraday crawling operations
crawlState?: {
finished: boolean;
oldestDateReached?: Date;
lastCrawlDirection?: 'forward' | 'backward';
};
};
};
updated_at: Date;
}
export interface IntradayCrawlSymbol {
symbol: string;
lastRecordDate?: Date;
crawlState?: {
finished: boolean;
oldestDateReached?: Date;
lastCrawlDirection?: 'forward' | 'backward';
};
}