qm scaffolding done
This commit is contained in:
parent
736b86e66a
commit
c799962f05
11 changed files with 1693 additions and 336 deletions
|
|
@ -0,0 +1,276 @@
|
||||||
|
/**
|
||||||
|
* QM Corporate Actions - Fetch and update dividends, splits, and earnings together
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers';
|
||||||
|
import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
|
||||||
|
import { QMSessionManager } from '../shared/session-manager';
|
||||||
|
import { QMOperationTracker } from '../shared/operation-tracker';
|
||||||
|
|
||||||
|
// Cache tracker instance
|
||||||
|
let operationTracker: QMOperationTracker | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get or initialize the operation tracker
|
||||||
|
*/
|
||||||
|
async function getOperationTracker(handler: BaseHandler): Promise<QMOperationTracker> {
|
||||||
|
if (!operationTracker) {
|
||||||
|
const { initializeQMOperations } = await import('../shared/operation-registry');
|
||||||
|
operationTracker = await initializeQMOperations(handler.mongodb, handler.logger);
|
||||||
|
}
|
||||||
|
return operationTracker;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update corporate actions (dividends, splits, earnings) for a single symbol
|
||||||
|
* Single API call returns all three data types
|
||||||
|
*/
|
||||||
|
export async function updateCorporateActions(
|
||||||
|
this: BaseHandler,
|
||||||
|
input: {
|
||||||
|
symbol: string;
|
||||||
|
symbolId: number;
|
||||||
|
},
|
||||||
|
_context?: ExecutionContext
|
||||||
|
): Promise<{
|
||||||
|
success: boolean;
|
||||||
|
symbol: string;
|
||||||
|
message: string;
|
||||||
|
data?: {
|
||||||
|
dividends: number;
|
||||||
|
splits: number;
|
||||||
|
earnings: number;
|
||||||
|
};
|
||||||
|
}> {
|
||||||
|
const { symbol, symbolId } = input;
|
||||||
|
|
||||||
|
this.logger.info('Fetching corporate actions', { symbol, symbolId });
|
||||||
|
|
||||||
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
|
sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
|
// Get a session - you'll need to add the appropriate session ID
|
||||||
|
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
||||||
|
const session = await sessionManager.getSession(sessionId);
|
||||||
|
|
||||||
|
if (!session || !session.uuid) {
|
||||||
|
throw new Error(`No active session found for QM corporate actions`);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Build API request for corporate actions
|
||||||
|
const searchParams = new URLSearchParams({
|
||||||
|
symbol: symbol,
|
||||||
|
symbolId: symbolId.toString(),
|
||||||
|
qmodTool: 'CorporateActions', // Assuming this returns all three
|
||||||
|
webmasterId: '500'
|
||||||
|
});
|
||||||
|
|
||||||
|
// TODO: Update with correct corporate actions endpoint
|
||||||
|
const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/corporate-actions.json?${searchParams.toString()}`;
|
||||||
|
|
||||||
|
const response = await fetch(apiUrl, {
|
||||||
|
method: 'GET',
|
||||||
|
headers: session.headers,
|
||||||
|
proxy: session.proxy,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const corporateData = await response.json();
|
||||||
|
|
||||||
|
// Update session success stats
|
||||||
|
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
|
||||||
|
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
let dividendCount = 0;
|
||||||
|
let splitCount = 0;
|
||||||
|
let earningsCount = 0;
|
||||||
|
|
||||||
|
// Process dividends
|
||||||
|
if (corporateData.dividends && corporateData.dividends.length > 0) {
|
||||||
|
await this.mongodb.batchUpsert(
|
||||||
|
'qmDividends',
|
||||||
|
corporateData.dividends.map((dividend: any) => ({
|
||||||
|
...dividend,
|
||||||
|
symbol,
|
||||||
|
symbolId,
|
||||||
|
updated_at: new Date()
|
||||||
|
})),
|
||||||
|
['symbol', 'exDate', 'recordDate']
|
||||||
|
);
|
||||||
|
dividendCount = corporateData.dividends.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process splits
|
||||||
|
if (corporateData.splits && corporateData.splits.length > 0) {
|
||||||
|
await this.mongodb.batchUpsert(
|
||||||
|
'qmSplits',
|
||||||
|
corporateData.splits.map((split: any) => ({
|
||||||
|
...split,
|
||||||
|
symbol,
|
||||||
|
symbolId,
|
||||||
|
updated_at: new Date()
|
||||||
|
})),
|
||||||
|
['symbol', 'splitDate', 'ratio']
|
||||||
|
);
|
||||||
|
splitCount = corporateData.splits.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process earnings
|
||||||
|
if (corporateData.earnings && corporateData.earnings.length > 0) {
|
||||||
|
await this.mongodb.batchUpsert(
|
||||||
|
'qmEarnings',
|
||||||
|
corporateData.earnings.map((earning: any) => ({
|
||||||
|
...earning,
|
||||||
|
symbol,
|
||||||
|
symbolId,
|
||||||
|
updated_at: new Date()
|
||||||
|
})),
|
||||||
|
['symbol', 'reportDate', 'fiscalQuarter']
|
||||||
|
);
|
||||||
|
earningsCount = corporateData.earnings.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update tracking for corporate actions
|
||||||
|
const updateTime = new Date();
|
||||||
|
|
||||||
|
await tracker.updateSymbolOperation(symbol, 'corporate_actions_update', {
|
||||||
|
status: 'success',
|
||||||
|
lastRecordDate: updateTime,
|
||||||
|
recordCount: dividendCount + splitCount + earningsCount
|
||||||
|
});
|
||||||
|
|
||||||
|
this.logger.info('Corporate actions updated successfully', {
|
||||||
|
symbol,
|
||||||
|
dividendCount,
|
||||||
|
splitCount,
|
||||||
|
earningsCount
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
symbol,
|
||||||
|
message: `Corporate actions updated for ${symbol}`,
|
||||||
|
data: {
|
||||||
|
dividends: dividendCount,
|
||||||
|
splits: splitCount,
|
||||||
|
earnings: earningsCount
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
// Update session failure stats
|
||||||
|
if (session.uuid) {
|
||||||
|
await sessionManager.incrementFailedCalls(sessionId, session.uuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.error('Error fetching corporate actions', {
|
||||||
|
symbol,
|
||||||
|
error: error instanceof Error ? error.message : 'Unknown error'
|
||||||
|
});
|
||||||
|
|
||||||
|
// Track failure for corporate actions
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
|
||||||
|
await tracker.updateSymbolOperation(symbol, 'corporate_actions_update', {
|
||||||
|
status: 'failure'
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
symbol,
|
||||||
|
message: `Failed to fetch corporate actions: ${error instanceof Error ? error.message : 'Unknown error'}`
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule corporate actions updates for symbols that need refreshing
|
||||||
|
*/
|
||||||
|
export async function scheduleCorporateActionsUpdates(
|
||||||
|
this: BaseHandler,
|
||||||
|
input: {
|
||||||
|
limit?: number;
|
||||||
|
forceUpdate?: boolean;
|
||||||
|
} = {},
|
||||||
|
_context?: ExecutionContext
|
||||||
|
): Promise<{
|
||||||
|
message: string;
|
||||||
|
symbolsQueued: number;
|
||||||
|
errors: number;
|
||||||
|
}> {
|
||||||
|
const { limit = 100, forceUpdate = false } = input;
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
|
||||||
|
this.logger.info('Scheduling corporate actions updates', { limit, forceUpdate });
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Get symbols that need corporate actions updates
|
||||||
|
const staleSymbols = await tracker.getStaleSymbols('corporate_actions_update', {
|
||||||
|
minHoursSinceRun: forceUpdate ? 0 : 24 * 7, // Weekly
|
||||||
|
limit
|
||||||
|
});
|
||||||
|
|
||||||
|
if (staleSymbols.length === 0) {
|
||||||
|
this.logger.info('No symbols need corporate actions updates');
|
||||||
|
return {
|
||||||
|
message: 'No symbols need corporate actions updates',
|
||||||
|
symbolsQueued: 0,
|
||||||
|
errors: 0
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info(`Found ${staleSymbols.length} symbols needing corporate actions updates`);
|
||||||
|
|
||||||
|
// Get full symbol data to include symbolId
|
||||||
|
const symbolDocs = await this.mongodb.find('qmSymbols', {
|
||||||
|
symbol: { $in: staleSymbols.slice(0, limit) } // Apply limit after deduplication
|
||||||
|
}, {
|
||||||
|
projection: { symbol: 1, symbolId: 1 }
|
||||||
|
});
|
||||||
|
|
||||||
|
let queued = 0;
|
||||||
|
let errors = 0;
|
||||||
|
|
||||||
|
// Schedule individual update jobs for each symbol
|
||||||
|
for (const doc of symbolDocs) {
|
||||||
|
try {
|
||||||
|
if (!doc.symbolId) {
|
||||||
|
this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.scheduleOperation('update-corporate-actions', {
|
||||||
|
symbol: doc.symbol,
|
||||||
|
symbolId: doc.symbolId
|
||||||
|
}, {
|
||||||
|
priority: 4,
|
||||||
|
delay: queued * 1500 // 1.5 seconds between jobs
|
||||||
|
});
|
||||||
|
|
||||||
|
queued++;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(`Failed to schedule corporate actions update for ${doc.symbol}`, { error });
|
||||||
|
errors++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info('Corporate actions update scheduling completed', {
|
||||||
|
symbolsQueued: queued,
|
||||||
|
errors,
|
||||||
|
total: staleSymbols.length
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
message: `Scheduled corporate actions updates for ${queued} symbols`,
|
||||||
|
symbolsQueued: queued,
|
||||||
|
errors
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error('Corporate actions scheduling failed', { error });
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,245 @@
|
||||||
|
/**
|
||||||
|
* QM Filings Actions - Fetch and update SEC filings
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers';
|
||||||
|
import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
|
||||||
|
import { QMSessionManager } from '../shared/session-manager';
|
||||||
|
import { QMOperationTracker } from '../shared/operation-tracker';
|
||||||
|
|
||||||
|
// Cache tracker instance
|
||||||
|
let operationTracker: QMOperationTracker | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get or initialize the operation tracker
|
||||||
|
*/
|
||||||
|
async function getOperationTracker(handler: BaseHandler): Promise<QMOperationTracker> {
|
||||||
|
if (!operationTracker) {
|
||||||
|
const { initializeQMOperations } = await import('../shared/operation-registry');
|
||||||
|
operationTracker = await initializeQMOperations(handler.mongodb, handler.logger);
|
||||||
|
}
|
||||||
|
return operationTracker;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update filings for a single symbol
|
||||||
|
*/
|
||||||
|
export async function updateFilings(
|
||||||
|
this: BaseHandler,
|
||||||
|
input: {
|
||||||
|
symbol: string;
|
||||||
|
symbolId: number;
|
||||||
|
},
|
||||||
|
_context?: ExecutionContext
|
||||||
|
): Promise<{
|
||||||
|
success: boolean;
|
||||||
|
symbol: string;
|
||||||
|
message: string;
|
||||||
|
data?: any;
|
||||||
|
}> {
|
||||||
|
const { symbol, symbolId } = input;
|
||||||
|
|
||||||
|
this.logger.info('Fetching filings', { symbol, symbolId });
|
||||||
|
|
||||||
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
|
sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
|
// Get a session - you'll need to add the appropriate session ID for filings
|
||||||
|
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
||||||
|
const session = await sessionManager.getSession(sessionId);
|
||||||
|
|
||||||
|
if (!session || !session.uuid) {
|
||||||
|
throw new Error(`No active session found for QM filings`);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Build API request for filings
|
||||||
|
const searchParams = new URLSearchParams({
|
||||||
|
symbol: symbol,
|
||||||
|
symbolId: symbolId.toString(),
|
||||||
|
qmodTool: 'Filings',
|
||||||
|
webmasterId: '500',
|
||||||
|
limit: '50' // Get recent filings
|
||||||
|
});
|
||||||
|
|
||||||
|
// TODO: Update with correct filings endpoint
|
||||||
|
const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/filings.json?${searchParams.toString()}`;
|
||||||
|
|
||||||
|
const response = await fetch(apiUrl, {
|
||||||
|
method: 'GET',
|
||||||
|
headers: session.headers,
|
||||||
|
proxy: session.proxy,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const filingsData = await response.json();
|
||||||
|
|
||||||
|
// Update session success stats
|
||||||
|
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
|
||||||
|
|
||||||
|
// Process and store filings data
|
||||||
|
if (filingsData && filingsData.length > 0) {
|
||||||
|
// Store filings in a separate collection
|
||||||
|
await this.mongodb.batchUpsert(
|
||||||
|
'qmFilings',
|
||||||
|
filingsData.map((filing: any) => ({
|
||||||
|
...filing,
|
||||||
|
symbol,
|
||||||
|
symbolId,
|
||||||
|
updated_at: new Date()
|
||||||
|
})),
|
||||||
|
['symbol', 'filingDate', 'formType', 'accessionNumber'] // Unique keys
|
||||||
|
);
|
||||||
|
|
||||||
|
// Update symbol to track last filings update
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
await tracker.updateSymbolOperation(symbol, 'filings_update', {
|
||||||
|
status: 'success',
|
||||||
|
lastRecordDate: new Date(),
|
||||||
|
recordCount: filingsData.length
|
||||||
|
});
|
||||||
|
|
||||||
|
this.logger.info('Filings updated successfully', {
|
||||||
|
symbol,
|
||||||
|
filingsCount: filingsData.length
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
symbol,
|
||||||
|
message: `Filings updated for ${symbol}`,
|
||||||
|
data: { count: filingsData.length }
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
// Some symbols may not have filings (non-US companies, etc)
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
await tracker.updateSymbolOperation(symbol, 'filings_update', {
|
||||||
|
status: 'success',
|
||||||
|
lastRecordDate: new Date(),
|
||||||
|
recordCount: 0
|
||||||
|
});
|
||||||
|
|
||||||
|
this.logger.info('No filings found for symbol', { symbol });
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
symbol,
|
||||||
|
message: `No filings found for ${symbol}`,
|
||||||
|
data: { count: 0 }
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
// Update session failure stats
|
||||||
|
if (session.uuid) {
|
||||||
|
await sessionManager.incrementFailedCalls(sessionId, session.uuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.error('Error fetching filings', {
|
||||||
|
symbol,
|
||||||
|
error: error instanceof Error ? error.message : 'Unknown error'
|
||||||
|
});
|
||||||
|
|
||||||
|
// Track failure
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
await tracker.updateSymbolOperation(symbol, 'filings_update', {
|
||||||
|
status: 'failure'
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
symbol,
|
||||||
|
message: `Failed to fetch filings: ${error instanceof Error ? error.message : 'Unknown error'}`
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule filings updates for symbols that need refreshing
|
||||||
|
*/
|
||||||
|
export async function scheduleFilingsUpdates(
|
||||||
|
this: BaseHandler,
|
||||||
|
input: {
|
||||||
|
limit?: number;
|
||||||
|
forceUpdate?: boolean;
|
||||||
|
} = {},
|
||||||
|
_context?: ExecutionContext
|
||||||
|
): Promise<{
|
||||||
|
message: string;
|
||||||
|
symbolsQueued: number;
|
||||||
|
errors: number;
|
||||||
|
}> {
|
||||||
|
const { limit = 100, forceUpdate = false } = input;
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
|
||||||
|
this.logger.info('Scheduling filings updates', { limit, forceUpdate });
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Get symbols that need updating
|
||||||
|
const staleSymbols = await tracker.getStaleSymbols('filings_update', {
|
||||||
|
minHoursSinceRun: forceUpdate ? 0 : 24, // Daily for filings
|
||||||
|
limit
|
||||||
|
});
|
||||||
|
|
||||||
|
if (staleSymbols.length === 0) {
|
||||||
|
this.logger.info('No symbols need filings updates');
|
||||||
|
return {
|
||||||
|
message: 'No symbols need filings updates',
|
||||||
|
symbolsQueued: 0,
|
||||||
|
errors: 0
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info(`Found ${staleSymbols.length} symbols needing filings updates`);
|
||||||
|
|
||||||
|
// Get full symbol data to include symbolId
|
||||||
|
const symbolDocs = await this.mongodb.find('qmSymbols', {
|
||||||
|
symbol: { $in: staleSymbols }
|
||||||
|
}, {
|
||||||
|
projection: { symbol: 1, symbolId: 1 }
|
||||||
|
});
|
||||||
|
|
||||||
|
let queued = 0;
|
||||||
|
let errors = 0;
|
||||||
|
|
||||||
|
// Schedule individual update jobs for each symbol
|
||||||
|
for (const doc of symbolDocs) {
|
||||||
|
try {
|
||||||
|
if (!doc.symbolId) {
|
||||||
|
this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.scheduleOperation('update-filings', {
|
||||||
|
symbol: doc.symbol,
|
||||||
|
symbolId: doc.symbolId
|
||||||
|
}, {
|
||||||
|
priority: 5, // Lower priority than financial data
|
||||||
|
delay: queued * 2000 // 2 seconds between jobs
|
||||||
|
});
|
||||||
|
|
||||||
|
queued++;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(`Failed to schedule filings update for ${doc.symbol}`, { error });
|
||||||
|
errors++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info('Filings update scheduling completed', {
|
||||||
|
symbolsQueued: queued,
|
||||||
|
errors,
|
||||||
|
total: staleSymbols.length
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
message: `Scheduled filings updates for ${queued} symbols`,
|
||||||
|
symbolsQueued: queued,
|
||||||
|
errors
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error('Filings scheduling failed', { error });
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,236 @@
|
||||||
|
/**
|
||||||
|
* QM Financials Actions - Fetch and update financial statements
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers';
|
||||||
|
import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
|
||||||
|
import { QMSessionManager } from '../shared/session-manager';
|
||||||
|
import { QMOperationTracker } from '../shared/operation-tracker';
|
||||||
|
|
||||||
|
// Cache tracker instance
|
||||||
|
let operationTracker: QMOperationTracker | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get or initialize the operation tracker
|
||||||
|
*/
|
||||||
|
async function getOperationTracker(handler: BaseHandler): Promise<QMOperationTracker> {
|
||||||
|
if (!operationTracker) {
|
||||||
|
const { initializeQMOperations } = await import('../shared/operation-registry');
|
||||||
|
operationTracker = await initializeQMOperations(handler.mongodb, handler.logger);
|
||||||
|
}
|
||||||
|
return operationTracker;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update financials for a single symbol
|
||||||
|
*/
|
||||||
|
export async function updateFinancials(
|
||||||
|
this: BaseHandler,
|
||||||
|
input: {
|
||||||
|
symbol: string;
|
||||||
|
symbolId: number;
|
||||||
|
},
|
||||||
|
_context?: ExecutionContext
|
||||||
|
): Promise<{
|
||||||
|
success: boolean;
|
||||||
|
symbol: string;
|
||||||
|
message: string;
|
||||||
|
data?: any;
|
||||||
|
}> {
|
||||||
|
const { symbol, symbolId } = input;
|
||||||
|
|
||||||
|
this.logger.info('Fetching financials', { symbol, symbolId });
|
||||||
|
|
||||||
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
|
sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
|
// Get a session - you'll need to add the appropriate session ID for financials
|
||||||
|
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
||||||
|
const session = await sessionManager.getSession(sessionId);
|
||||||
|
|
||||||
|
if (!session || !session.uuid) {
|
||||||
|
throw new Error(`No active session found for QM financials`);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Build API request for financials
|
||||||
|
const searchParams = new URLSearchParams({
|
||||||
|
symbol: symbol,
|
||||||
|
symbolId: symbolId.toString(),
|
||||||
|
qmodTool: 'Financials',
|
||||||
|
webmasterId: '500'
|
||||||
|
});
|
||||||
|
|
||||||
|
// TODO: Update with correct financials endpoint
|
||||||
|
const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/financials.json?${searchParams.toString()}`;
|
||||||
|
|
||||||
|
const response = await fetch(apiUrl, {
|
||||||
|
method: 'GET',
|
||||||
|
headers: session.headers,
|
||||||
|
proxy: session.proxy,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const financialData = await response.json();
|
||||||
|
|
||||||
|
// Update session success stats
|
||||||
|
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
|
||||||
|
|
||||||
|
// Process and store financial data
|
||||||
|
if (financialData && financialData.length > 0) {
|
||||||
|
// Store financial statements in a separate collection
|
||||||
|
await this.mongodb.batchUpsert(
|
||||||
|
'qmFinancials',
|
||||||
|
financialData.map((statement: any) => ({
|
||||||
|
...statement,
|
||||||
|
symbol,
|
||||||
|
symbolId,
|
||||||
|
updated_at: new Date()
|
||||||
|
})),
|
||||||
|
['symbol', 'period', 'statementType'] // Unique keys
|
||||||
|
);
|
||||||
|
|
||||||
|
// Update symbol to track last financials update
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
await tracker.updateSymbolOperation(symbol, 'financials_update', {
|
||||||
|
status: 'success',
|
||||||
|
lastRecordDate: new Date(),
|
||||||
|
recordCount: financialData.length
|
||||||
|
});
|
||||||
|
|
||||||
|
this.logger.info('Financials updated successfully', {
|
||||||
|
symbol,
|
||||||
|
statementCount: financialData.length
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
symbol,
|
||||||
|
message: `Financials updated for ${symbol}`,
|
||||||
|
data: { count: financialData.length }
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
this.logger.warn('No financial data returned from API', { symbol });
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
symbol,
|
||||||
|
message: `No financial data found for symbol ${symbol}`
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
// Update session failure stats
|
||||||
|
if (session.uuid) {
|
||||||
|
await sessionManager.incrementFailedCalls(sessionId, session.uuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.error('Error fetching financials', {
|
||||||
|
symbol,
|
||||||
|
error: error instanceof Error ? error.message : 'Unknown error'
|
||||||
|
});
|
||||||
|
|
||||||
|
// Track failure
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
await tracker.updateSymbolOperation(symbol, 'financials_update', {
|
||||||
|
status: 'failure',
|
||||||
|
lastRunAt: new Date()
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
symbol,
|
||||||
|
message: `Failed to fetch financials: ${error instanceof Error ? error.message : 'Unknown error'}`
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule financial updates for symbols that need refreshing
|
||||||
|
*/
|
||||||
|
export async function scheduleFinancialsUpdates(
|
||||||
|
this: BaseHandler,
|
||||||
|
input: {
|
||||||
|
limit?: number;
|
||||||
|
forceUpdate?: boolean;
|
||||||
|
} = {},
|
||||||
|
_context?: ExecutionContext
|
||||||
|
): Promise<{
|
||||||
|
message: string;
|
||||||
|
symbolsQueued: number;
|
||||||
|
errors: number;
|
||||||
|
}> {
|
||||||
|
const { limit = 100, forceUpdate = false } = input;
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
|
||||||
|
this.logger.info('Scheduling financials updates', { limit, forceUpdate });
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Get symbols that need updating
|
||||||
|
const staleSymbols = await tracker.getStaleSymbols('financials_update', {
|
||||||
|
minHoursSinceRun: forceUpdate ? 0 : 24 * 7, // Weekly by default
|
||||||
|
limit
|
||||||
|
});
|
||||||
|
|
||||||
|
if (staleSymbols.length === 0) {
|
||||||
|
this.logger.info('No symbols need financials updates');
|
||||||
|
return {
|
||||||
|
message: 'No symbols need financials updates',
|
||||||
|
symbolsQueued: 0,
|
||||||
|
errors: 0
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info(`Found ${staleSymbols.length} symbols needing financials updates`);
|
||||||
|
|
||||||
|
// Get full symbol data to include symbolId
|
||||||
|
const symbolDocs = await this.mongodb.find('qmSymbols', {
|
||||||
|
symbol: { $in: staleSymbols }
|
||||||
|
}, {
|
||||||
|
projection: { symbol: 1, symbolId: 1 }
|
||||||
|
});
|
||||||
|
|
||||||
|
let queued = 0;
|
||||||
|
let errors = 0;
|
||||||
|
|
||||||
|
// Schedule individual update jobs for each symbol
|
||||||
|
for (const doc of symbolDocs) {
|
||||||
|
try {
|
||||||
|
if (!doc.symbolId) {
|
||||||
|
this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.scheduleOperation('update-financials', {
|
||||||
|
symbol: doc.symbol,
|
||||||
|
symbolId: doc.symbolId
|
||||||
|
}, {
|
||||||
|
priority: 4,
|
||||||
|
delay: queued * 2000 // 2 seconds between jobs
|
||||||
|
});
|
||||||
|
|
||||||
|
queued++;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(`Failed to schedule financials update for ${doc.symbol}`, { error });
|
||||||
|
errors++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info('Financials update scheduling completed', {
|
||||||
|
symbolsQueued: queued,
|
||||||
|
errors,
|
||||||
|
total: staleSymbols.length
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
message: `Scheduled financials updates for ${queued} symbols`,
|
||||||
|
symbolsQueued: queued,
|
||||||
|
errors
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error('Financials scheduling failed', { error });
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -4,5 +4,11 @@
|
||||||
|
|
||||||
export { checkSessions, createSession } from './session.action';
|
export { checkSessions, createSession } from './session.action';
|
||||||
export { searchSymbols, spiderSymbol } from './symbol.action';
|
export { searchSymbols, spiderSymbol } from './symbol.action';
|
||||||
export { updatePrices, updateIntradayBars, getOperationStats } from './price.action';
|
export { getOperationStats } from './price.action';
|
||||||
|
export { updateSymbolInfo, scheduleSymbolInfoUpdates } from './symbol-info.action';
|
||||||
|
export { updateFinancials, scheduleFinancialsUpdates } from './financials.action';
|
||||||
|
export { updateCorporateActions, scheduleCorporateActionsUpdates } from './corporate-actions.action';
|
||||||
|
export { updateFilings, scheduleFilingsUpdates } from './filings.action';
|
||||||
|
export { updatePrices, schedulePriceUpdates } from './prices.action';
|
||||||
|
export { updateIntradayBars, scheduleIntradayUpdates } from './intraday.action';
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,302 @@
|
||||||
|
/**
|
||||||
|
* QM Intraday Actions - Fetch and update intraday price bars
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers';
|
||||||
|
import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
|
||||||
|
import { QMSessionManager } from '../shared/session-manager';
|
||||||
|
import { QMOperationTracker } from '../shared/operation-tracker';
|
||||||
|
|
||||||
|
// Cache tracker instance
|
||||||
|
let operationTracker: QMOperationTracker | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get or initialize the operation tracker
|
||||||
|
*/
|
||||||
|
async function getOperationTracker(handler: BaseHandler): Promise<QMOperationTracker> {
|
||||||
|
if (!operationTracker) {
|
||||||
|
const { initializeQMOperations } = await import('../shared/operation-registry');
|
||||||
|
operationTracker = await initializeQMOperations(handler.mongodb, handler.logger);
|
||||||
|
}
|
||||||
|
return operationTracker;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update intraday bars for a single symbol
|
||||||
|
* This handles both initial crawl and incremental updates
|
||||||
|
*/
|
||||||
|
export async function updateIntradayBars(
|
||||||
|
this: BaseHandler,
|
||||||
|
input: {
|
||||||
|
symbol: string;
|
||||||
|
symbolId: number;
|
||||||
|
crawlDate?: string; // ISO date string for specific date crawl
|
||||||
|
},
|
||||||
|
_context?: ExecutionContext
|
||||||
|
): Promise<{
|
||||||
|
success: boolean;
|
||||||
|
symbol: string;
|
||||||
|
message: string;
|
||||||
|
data?: any;
|
||||||
|
}> {
|
||||||
|
const { symbol, symbolId, crawlDate } = input;
|
||||||
|
|
||||||
|
this.logger.info('Fetching intraday bars', { symbol, symbolId, crawlDate });
|
||||||
|
|
||||||
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
|
sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
|
// Get a session - you'll need to add the appropriate session ID for intraday
|
||||||
|
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
||||||
|
const session = await sessionManager.getSession(sessionId);
|
||||||
|
|
||||||
|
if (!session || !session.uuid) {
|
||||||
|
throw new Error(`No active session found for QM intraday`);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Determine the date to fetch
|
||||||
|
const targetDate = crawlDate ? new Date(crawlDate) : new Date();
|
||||||
|
|
||||||
|
// Build API request for intraday bars
|
||||||
|
const searchParams = new URLSearchParams({
|
||||||
|
symbol: symbol,
|
||||||
|
symbolId: symbolId.toString(),
|
||||||
|
qmodTool: 'IntradayBars',
|
||||||
|
webmasterId: '500',
|
||||||
|
date: targetDate.toISOString().split('T')[0],
|
||||||
|
interval: '1' // 1-minute bars
|
||||||
|
});
|
||||||
|
|
||||||
|
// TODO: Update with correct intraday endpoint
|
||||||
|
const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/intraday.json?${searchParams.toString()}`;
|
||||||
|
|
||||||
|
const response = await fetch(apiUrl, {
|
||||||
|
method: 'GET',
|
||||||
|
headers: session.headers,
|
||||||
|
proxy: session.proxy,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const barsData = await response.json();
|
||||||
|
|
||||||
|
// Update session success stats
|
||||||
|
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
|
||||||
|
|
||||||
|
// Process and store intraday data
|
||||||
|
if (barsData && barsData.length > 0) {
|
||||||
|
// Store bars in a separate collection
|
||||||
|
const processedBars = barsData.map((bar: any) => ({
|
||||||
|
...bar,
|
||||||
|
symbol,
|
||||||
|
symbolId,
|
||||||
|
timestamp: new Date(bar.timestamp),
|
||||||
|
date: targetDate,
|
||||||
|
updated_at: new Date()
|
||||||
|
}));
|
||||||
|
|
||||||
|
await this.mongodb.batchUpsert(
|
||||||
|
'qmIntradayBars',
|
||||||
|
processedBars,
|
||||||
|
['symbol', 'timestamp'] // Unique keys
|
||||||
|
);
|
||||||
|
|
||||||
|
this.logger.info('Intraday bars updated successfully', {
|
||||||
|
symbol,
|
||||||
|
date: targetDate,
|
||||||
|
barCount: barsData.length
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
symbol,
|
||||||
|
message: `Intraday bars updated for ${symbol} on ${targetDate.toISOString().split('T')[0]}`,
|
||||||
|
data: {
|
||||||
|
count: barsData.length,
|
||||||
|
date: targetDate
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
// No data for this date (weekend, holiday, or no trading)
|
||||||
|
this.logger.info('No intraday data for date', { symbol, date: targetDate });
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
symbol,
|
||||||
|
message: `No intraday data for ${symbol} on ${targetDate.toISOString().split('T')[0]}`,
|
||||||
|
data: {
|
||||||
|
count: 0,
|
||||||
|
date: targetDate
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
// Update session failure stats
|
||||||
|
if (session.uuid) {
|
||||||
|
await sessionManager.incrementFailedCalls(sessionId, session.uuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.error('Error fetching intraday bars', {
|
||||||
|
symbol,
|
||||||
|
error: error instanceof Error ? error.message : 'Unknown error'
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
symbol,
|
||||||
|
message: `Failed to fetch intraday bars: ${error instanceof Error ? error.message : 'Unknown error'}`
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule intraday updates for symbols
|
||||||
|
* This handles both initial crawls and regular updates
|
||||||
|
*/
|
||||||
|
export async function scheduleIntradayUpdates(
|
||||||
|
this: BaseHandler,
|
||||||
|
input: {
|
||||||
|
limit?: number;
|
||||||
|
mode?: 'crawl' | 'update'; // crawl for historical, update for recent
|
||||||
|
forceUpdate?: boolean;
|
||||||
|
} = {},
|
||||||
|
_context?: ExecutionContext
|
||||||
|
): Promise<{
|
||||||
|
message: string;
|
||||||
|
symbolsQueued: number;
|
||||||
|
jobsQueued: number;
|
||||||
|
errors: number;
|
||||||
|
}> {
|
||||||
|
const { limit = 50, mode = 'update', forceUpdate = false } = input;
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
|
||||||
|
this.logger.info('Scheduling intraday updates', { limit, mode, forceUpdate });
|
||||||
|
|
||||||
|
try {
|
||||||
|
let symbolsToProcess: any[] = [];
|
||||||
|
|
||||||
|
if (mode === 'crawl') {
|
||||||
|
// Get symbols that need historical crawl
|
||||||
|
symbolsToProcess = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
|
||||||
|
limit
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
// Get symbols that need regular updates
|
||||||
|
const staleSymbols = await tracker.getStaleSymbols('intraday_bars', {
|
||||||
|
minHoursSinceRun: forceUpdate ? 0 : 1, // Hourly updates
|
||||||
|
limit
|
||||||
|
});
|
||||||
|
|
||||||
|
if (staleSymbols.length === 0) {
|
||||||
|
this.logger.info('No symbols need intraday updates');
|
||||||
|
return {
|
||||||
|
message: 'No symbols need intraday updates',
|
||||||
|
symbolsQueued: 0,
|
||||||
|
jobsQueued: 0,
|
||||||
|
errors: 0
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get full symbol data
|
||||||
|
symbolsToProcess = await this.mongodb.find('qmSymbols', {
|
||||||
|
symbol: { $in: staleSymbols }
|
||||||
|
}, {
|
||||||
|
projection: { symbol: 1, symbolId: 1 }
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (symbolsToProcess.length === 0) {
|
||||||
|
this.logger.info('No symbols to process for intraday');
|
||||||
|
return {
|
||||||
|
message: 'No symbols to process',
|
||||||
|
symbolsQueued: 0,
|
||||||
|
jobsQueued: 0,
|
||||||
|
errors: 0
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info(`Found ${symbolsToProcess.length} symbols for intraday ${mode}`);
|
||||||
|
|
||||||
|
let symbolsQueued = 0;
|
||||||
|
let jobsQueued = 0;
|
||||||
|
let errors = 0;
|
||||||
|
|
||||||
|
// Process each symbol
|
||||||
|
for (const doc of symbolsToProcess) {
|
||||||
|
try {
|
||||||
|
if (!doc.symbolId) {
|
||||||
|
this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mode === 'crawl' && doc.crawlState) {
|
||||||
|
// For crawl mode, schedule multiple days going backwards
|
||||||
|
const startDate = doc.crawlState.oldestDateReached || new Date();
|
||||||
|
const daysToFetch = 30; // Fetch 30 days at a time
|
||||||
|
|
||||||
|
for (let i = 0; i < daysToFetch; i++) {
|
||||||
|
const crawlDate = new Date(startDate);
|
||||||
|
crawlDate.setDate(crawlDate.getDate() - i);
|
||||||
|
|
||||||
|
await this.scheduleOperation('update-intraday-bars', {
|
||||||
|
symbol: doc.symbol,
|
||||||
|
symbolId: doc.symbolId,
|
||||||
|
crawlDate: crawlDate.toISOString()
|
||||||
|
}, {
|
||||||
|
priority: 6,
|
||||||
|
delay: jobsQueued * 1000 // 1 second between jobs
|
||||||
|
});
|
||||||
|
|
||||||
|
jobsQueued++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update crawl state
|
||||||
|
await tracker.updateSymbolOperation(doc.symbol, 'intraday_bars', {
|
||||||
|
status: 'partial',
|
||||||
|
crawlState: {
|
||||||
|
finished: false,
|
||||||
|
oldestDateReached: new Date(startDate.getTime() - daysToFetch * 24 * 60 * 60 * 1000),
|
||||||
|
lastCrawlDirection: 'backward'
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
// For update mode, just fetch today's data
|
||||||
|
await this.scheduleOperation('update-intraday-bars', {
|
||||||
|
symbol: doc.symbol,
|
||||||
|
symbolId: doc.symbolId
|
||||||
|
}, {
|
||||||
|
priority: 8, // High priority for current data
|
||||||
|
delay: jobsQueued * 500 // 0.5 seconds between jobs
|
||||||
|
});
|
||||||
|
|
||||||
|
jobsQueued++;
|
||||||
|
}
|
||||||
|
|
||||||
|
symbolsQueued++;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(`Failed to schedule intraday update for ${doc.symbol}`, { error });
|
||||||
|
errors++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info('Intraday update scheduling completed', {
|
||||||
|
symbolsQueued,
|
||||||
|
jobsQueued,
|
||||||
|
errors,
|
||||||
|
mode
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
message: `Scheduled intraday ${mode} for ${symbolsQueued} symbols (${jobsQueued} jobs)`,
|
||||||
|
symbolsQueued,
|
||||||
|
jobsQueued,
|
||||||
|
errors
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error('Intraday scheduling failed', { error });
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/**
|
/**
|
||||||
* QM Price Actions - Price data updates with operation tracking
|
* QM Price Actions - Operation statistics
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers';
|
import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers';
|
||||||
|
|
@ -19,263 +19,6 @@ async function getOperationTracker(handler: BaseHandler): Promise<QMOperationTra
|
||||||
return operationTracker;
|
return operationTracker;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Update daily price data for stale symbols
|
|
||||||
*/
|
|
||||||
export async function updatePrices(
|
|
||||||
this: BaseHandler,
|
|
||||||
input: {
|
|
||||||
limit?: number;
|
|
||||||
symbols?: string[];
|
|
||||||
} = {},
|
|
||||||
_context?: ExecutionContext
|
|
||||||
): Promise<{
|
|
||||||
message: string;
|
|
||||||
symbolsUpdated: number;
|
|
||||||
errors: number;
|
|
||||||
}> {
|
|
||||||
const { limit = 100, symbols } = input;
|
|
||||||
const tracker = await getOperationTracker(this);
|
|
||||||
|
|
||||||
this.logger.info('Starting price update operation', { limit, specificSymbols: symbols?.length });
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Get symbols that need updating
|
|
||||||
let symbolsToUpdate: string[];
|
|
||||||
|
|
||||||
if (symbols && symbols.length > 0) {
|
|
||||||
// Update specific symbols
|
|
||||||
symbolsToUpdate = symbols;
|
|
||||||
} else {
|
|
||||||
// Get stale symbols
|
|
||||||
symbolsToUpdate = await tracker.getStaleSymbols('price_update', {
|
|
||||||
minHoursSinceRun: 24,
|
|
||||||
limit
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if (symbolsToUpdate.length === 0) {
|
|
||||||
this.logger.info('No symbols need price updates');
|
|
||||||
return {
|
|
||||||
message: 'No symbols need price updates',
|
|
||||||
symbolsUpdated: 0,
|
|
||||||
errors: 0
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.info(`Found ${symbolsToUpdate.length} symbols for price update`);
|
|
||||||
|
|
||||||
let updated = 0;
|
|
||||||
let errors = 0;
|
|
||||||
const updateResults = [];
|
|
||||||
|
|
||||||
// Process symbols (in real implementation, you'd fetch prices from QM API)
|
|
||||||
for (const symbol of symbolsToUpdate) {
|
|
||||||
try {
|
|
||||||
// TODO: Actual price fetching logic here
|
|
||||||
// const prices = await fetchPricesFromQM(symbol);
|
|
||||||
|
|
||||||
// For now, simulate the update
|
|
||||||
const mockPrices = {
|
|
||||||
symbol,
|
|
||||||
lastPrice: Math.random() * 1000,
|
|
||||||
volume: Math.floor(Math.random() * 1000000),
|
|
||||||
date: new Date()
|
|
||||||
};
|
|
||||||
|
|
||||||
// Track the operation
|
|
||||||
updateResults.push({
|
|
||||||
symbol,
|
|
||||||
operation: 'price_update',
|
|
||||||
data: {
|
|
||||||
status: 'success' as const,
|
|
||||||
lastRecordDate: mockPrices.date,
|
|
||||||
recordCount: 1
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
updated++;
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error(`Failed to update prices for ${symbol}`, { error });
|
|
||||||
|
|
||||||
updateResults.push({
|
|
||||||
symbol,
|
|
||||||
operation: 'price_update',
|
|
||||||
data: {
|
|
||||||
status: 'failure' as const
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
errors++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Bulk update operation tracking
|
|
||||||
if (updateResults.length > 0) {
|
|
||||||
await tracker.bulkUpdateSymbolOperations(updateResults);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.info('Price update operation completed', {
|
|
||||||
symbolsUpdated: updated,
|
|
||||||
errors,
|
|
||||||
total: symbolsToUpdate.length
|
|
||||||
});
|
|
||||||
|
|
||||||
return {
|
|
||||||
message: `Updated prices for ${updated} symbols`,
|
|
||||||
symbolsUpdated: updated,
|
|
||||||
errors
|
|
||||||
};
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error('Price update operation failed', { error });
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Update intraday price bars - crawls backwards until no more data
|
|
||||||
*/
|
|
||||||
export async function updateIntradayBars(
|
|
||||||
this: BaseHandler,
|
|
||||||
input: {
|
|
||||||
symbol?: string;
|
|
||||||
limit?: number;
|
|
||||||
} = {},
|
|
||||||
_context?: ExecutionContext
|
|
||||||
): Promise<{
|
|
||||||
message: string;
|
|
||||||
symbol: string;
|
|
||||||
barsCollected: number;
|
|
||||||
crawlFinished: boolean;
|
|
||||||
}> {
|
|
||||||
const { symbol, limit = 1 } = input;
|
|
||||||
const tracker = await getOperationTracker(this);
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Get symbols for intraday crawl
|
|
||||||
let symbolData;
|
|
||||||
if (symbol) {
|
|
||||||
symbolData = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
|
|
||||||
limit: 1
|
|
||||||
}).then(symbols => symbols.find(s => s.symbol === symbol));
|
|
||||||
} else {
|
|
||||||
const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
|
|
||||||
limit
|
|
||||||
});
|
|
||||||
symbolData = symbols[0];
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!symbolData) {
|
|
||||||
return {
|
|
||||||
message: 'No symbols available for intraday crawl',
|
|
||||||
symbol: '',
|
|
||||||
barsCollected: 0,
|
|
||||||
crawlFinished: false
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.info('Processing intraday bars', {
|
|
||||||
symbol: symbolData.symbol,
|
|
||||||
crawlState: symbolData.crawlState
|
|
||||||
});
|
|
||||||
|
|
||||||
let barsCollected = 0;
|
|
||||||
let crawlFinished = false;
|
|
||||||
|
|
||||||
if (symbolData.crawlState?.finished) {
|
|
||||||
// Already finished initial crawl, just update from last record
|
|
||||||
this.logger.debug('Symbol already crawled, updating from last record', {
|
|
||||||
symbol: symbolData.symbol,
|
|
||||||
lastRecord: symbolData.lastRecordDate
|
|
||||||
});
|
|
||||||
|
|
||||||
// TODO: Fetch bars from lastRecordDate to now
|
|
||||||
const newBars = 10; // Mock data
|
|
||||||
|
|
||||||
await tracker.updateSymbolOperation(symbolData.symbol, 'intraday_bars', {
|
|
||||||
status: 'success',
|
|
||||||
lastRecordDate: new Date(),
|
|
||||||
recordCount: (symbolData.crawlState as any).recordCount + newBars
|
|
||||||
});
|
|
||||||
|
|
||||||
return {
|
|
||||||
message: `Updated ${newBars} new bars for ${symbolData.symbol}`,
|
|
||||||
symbol: symbolData.symbol,
|
|
||||||
barsCollected: newBars,
|
|
||||||
crawlFinished: true
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initial crawl - go backwards until no data
|
|
||||||
let currentDate = new Date();
|
|
||||||
let oldestDate = currentDate;
|
|
||||||
let totalBars = 0;
|
|
||||||
let consecutiveEmptyDays = 0;
|
|
||||||
const maxEmptyDays = 5; // Stop after 5 consecutive days with no data
|
|
||||||
|
|
||||||
while (consecutiveEmptyDays < maxEmptyDays) {
|
|
||||||
// TODO: Actual bar fetching logic
|
|
||||||
// const bars = await fetchIntradayBars(symbolData.symbol, currentDate);
|
|
||||||
|
|
||||||
// Mock data - simulate decreasing data as we go back
|
|
||||||
const bars = currentDate > new Date('2020-01-01') ? Math.floor(Math.random() * 100) : 0;
|
|
||||||
|
|
||||||
if (bars === 0) {
|
|
||||||
consecutiveEmptyDays++;
|
|
||||||
} else {
|
|
||||||
consecutiveEmptyDays = 0;
|
|
||||||
totalBars += bars;
|
|
||||||
oldestDate = new Date(currentDate);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update progress
|
|
||||||
await tracker.updateSymbolOperation(symbolData.symbol, 'intraday_bars', {
|
|
||||||
status: 'partial',
|
|
||||||
lastRecordDate: new Date(),
|
|
||||||
recordCount: totalBars,
|
|
||||||
crawlState: {
|
|
||||||
finished: false,
|
|
||||||
oldestDateReached: oldestDate
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Move to previous day
|
|
||||||
currentDate.setDate(currentDate.getDate() - 1);
|
|
||||||
|
|
||||||
// Limit crawl for this execution
|
|
||||||
if (totalBars > 1000) {
|
|
||||||
this.logger.info('Reached bar limit for this execution', {
|
|
||||||
symbol: symbolData.symbol,
|
|
||||||
barsCollected: totalBars
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we finished the crawl
|
|
||||||
if (consecutiveEmptyDays >= maxEmptyDays) {
|
|
||||||
crawlFinished = true;
|
|
||||||
await tracker.markCrawlFinished(symbolData.symbol, 'intraday_bars', oldestDate);
|
|
||||||
|
|
||||||
this.logger.info('Completed initial crawl for symbol', {
|
|
||||||
symbol: symbolData.symbol,
|
|
||||||
totalBars,
|
|
||||||
oldestDate
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
message: `Collected ${totalBars} bars for ${symbolData.symbol}`,
|
|
||||||
symbol: symbolData.symbol,
|
|
||||||
barsCollected: totalBars,
|
|
||||||
crawlFinished
|
|
||||||
};
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error('Intraday bars update failed', { error });
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get operation statistics
|
* Get operation statistics
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,249 @@
|
||||||
|
/**
|
||||||
|
* QM Prices Actions - Fetch and update daily price data
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers';
|
||||||
|
import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
|
||||||
|
import { QMSessionManager } from '../shared/session-manager';
|
||||||
|
import { QMOperationTracker } from '../shared/operation-tracker';
|
||||||
|
|
||||||
|
// Cache tracker instance
|
||||||
|
let operationTracker: QMOperationTracker | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get or initialize the operation tracker
|
||||||
|
*/
|
||||||
|
async function getOperationTracker(handler: BaseHandler): Promise<QMOperationTracker> {
|
||||||
|
if (!operationTracker) {
|
||||||
|
const { initializeQMOperations } = await import('../shared/operation-registry');
|
||||||
|
operationTracker = await initializeQMOperations(handler.mongodb, handler.logger);
|
||||||
|
}
|
||||||
|
return operationTracker;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update daily prices for a single symbol
|
||||||
|
*/
|
||||||
|
export async function updatePrices(
|
||||||
|
this: BaseHandler,
|
||||||
|
input: {
|
||||||
|
symbol: string;
|
||||||
|
symbolId: number;
|
||||||
|
},
|
||||||
|
_context?: ExecutionContext
|
||||||
|
): Promise<{
|
||||||
|
success: boolean;
|
||||||
|
symbol: string;
|
||||||
|
message: string;
|
||||||
|
data?: any;
|
||||||
|
}> {
|
||||||
|
const { symbol, symbolId } = input;
|
||||||
|
|
||||||
|
this.logger.info('Fetching daily prices', { symbol, symbolId });
|
||||||
|
|
||||||
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
|
sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
|
// Get a session - you'll need to add the appropriate session ID for prices
|
||||||
|
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
||||||
|
const session = await sessionManager.getSession(sessionId);
|
||||||
|
|
||||||
|
if (!session || !session.uuid) {
|
||||||
|
throw new Error(`No active session found for QM prices`);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Build API request for daily prices
|
||||||
|
const searchParams = new URLSearchParams({
|
||||||
|
symbol: symbol,
|
||||||
|
symbolId: symbolId.toString(),
|
||||||
|
qmodTool: 'DailyPrices',
|
||||||
|
webmasterId: '500',
|
||||||
|
days: '30' // Get last 30 days
|
||||||
|
});
|
||||||
|
|
||||||
|
// TODO: Update with correct prices endpoint
|
||||||
|
const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/prices.json?${searchParams.toString()}`;
|
||||||
|
|
||||||
|
const response = await fetch(apiUrl, {
|
||||||
|
method: 'GET',
|
||||||
|
headers: session.headers,
|
||||||
|
proxy: session.proxy,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const priceData = await response.json();
|
||||||
|
|
||||||
|
// Update session success stats
|
||||||
|
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
|
||||||
|
|
||||||
|
// Process and store price data
|
||||||
|
if (priceData && priceData.length > 0) {
|
||||||
|
// Store prices in a separate collection
|
||||||
|
const processedPrices = priceData.map((price: any) => ({
|
||||||
|
...price,
|
||||||
|
symbol,
|
||||||
|
symbolId,
|
||||||
|
date: new Date(price.date),
|
||||||
|
updated_at: new Date()
|
||||||
|
}));
|
||||||
|
|
||||||
|
await this.mongodb.batchUpsert(
|
||||||
|
'qmPrices',
|
||||||
|
processedPrices,
|
||||||
|
['symbol', 'date'] // Unique keys
|
||||||
|
);
|
||||||
|
|
||||||
|
// Find the latest price date
|
||||||
|
const latestDate = processedPrices.reduce((latest: Date, price: any) =>
|
||||||
|
price.date > latest ? price.date : latest,
|
||||||
|
new Date(0)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Update symbol to track last price update
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
await tracker.updateSymbolOperation(symbol, 'price_update', {
|
||||||
|
status: 'success',
|
||||||
|
lastRecordDate: latestDate,
|
||||||
|
recordCount: priceData.length
|
||||||
|
});
|
||||||
|
|
||||||
|
this.logger.info('Prices updated successfully', {
|
||||||
|
symbol,
|
||||||
|
priceCount: priceData.length,
|
||||||
|
latestDate
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
symbol,
|
||||||
|
message: `Prices updated for ${symbol}`,
|
||||||
|
data: {
|
||||||
|
count: priceData.length,
|
||||||
|
latestDate
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
this.logger.warn('No price data returned from API', { symbol });
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
symbol,
|
||||||
|
message: `No price data found for symbol ${symbol}`
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
// Update session failure stats
|
||||||
|
if (session.uuid) {
|
||||||
|
await sessionManager.incrementFailedCalls(sessionId, session.uuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.error('Error fetching prices', {
|
||||||
|
symbol,
|
||||||
|
error: error instanceof Error ? error.message : 'Unknown error'
|
||||||
|
});
|
||||||
|
|
||||||
|
// Track failure
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
await tracker.updateSymbolOperation(symbol, 'price_update', {
|
||||||
|
status: 'failure'
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
symbol,
|
||||||
|
message: `Failed to fetch prices: ${error instanceof Error ? error.message : 'Unknown error'}`
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule price updates for symbols that need refreshing
|
||||||
|
*/
|
||||||
|
export async function schedulePriceUpdates(
|
||||||
|
this: BaseHandler,
|
||||||
|
input: {
|
||||||
|
limit?: number;
|
||||||
|
forceUpdate?: boolean;
|
||||||
|
} = {},
|
||||||
|
_context?: ExecutionContext
|
||||||
|
): Promise<{
|
||||||
|
message: string;
|
||||||
|
symbolsQueued: number;
|
||||||
|
errors: number;
|
||||||
|
}> {
|
||||||
|
const { limit = 100, forceUpdate = false } = input;
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
|
||||||
|
this.logger.info('Scheduling price updates', { limit, forceUpdate });
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Get symbols that need updating
|
||||||
|
const staleSymbols = await tracker.getStaleSymbols('price_update', {
|
||||||
|
minHoursSinceRun: forceUpdate ? 0 : 24, // Daily updates
|
||||||
|
limit
|
||||||
|
});
|
||||||
|
|
||||||
|
if (staleSymbols.length === 0) {
|
||||||
|
this.logger.info('No symbols need price updates');
|
||||||
|
return {
|
||||||
|
message: 'No symbols need price updates',
|
||||||
|
symbolsQueued: 0,
|
||||||
|
errors: 0
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info(`Found ${staleSymbols.length} symbols needing price updates`);
|
||||||
|
|
||||||
|
// Get full symbol data to include symbolId
|
||||||
|
const symbolDocs = await this.mongodb.find('qmSymbols', {
|
||||||
|
symbol: { $in: staleSymbols }
|
||||||
|
}, {
|
||||||
|
projection: { symbol: 1, symbolId: 1 }
|
||||||
|
});
|
||||||
|
|
||||||
|
let queued = 0;
|
||||||
|
let errors = 0;
|
||||||
|
|
||||||
|
// Schedule individual update jobs for each symbol
|
||||||
|
for (const doc of symbolDocs) {
|
||||||
|
try {
|
||||||
|
if (!doc.symbolId) {
|
||||||
|
this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.scheduleOperation('update-prices', {
|
||||||
|
symbol: doc.symbol,
|
||||||
|
symbolId: doc.symbolId
|
||||||
|
}, {
|
||||||
|
priority: 7, // High priority for price data
|
||||||
|
delay: queued * 500 // 0.5 seconds between jobs
|
||||||
|
});
|
||||||
|
|
||||||
|
queued++;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(`Failed to schedule price update for ${doc.symbol}`, { error });
|
||||||
|
errors++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info('Price update scheduling completed', {
|
||||||
|
symbolsQueued: queued,
|
||||||
|
errors,
|
||||||
|
total: staleSymbols.length
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
message: `Scheduled price updates for ${queued} symbols`,
|
||||||
|
symbolsQueued: queued,
|
||||||
|
errors
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error('Price scheduling failed', { error });
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,230 @@
|
||||||
|
/**
|
||||||
|
* QM Symbol Info Actions - Fetch and update symbol metadata
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers';
|
||||||
|
import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
|
||||||
|
import { QMSessionManager } from '../shared/session-manager';
|
||||||
|
import { QMOperationTracker } from '../shared/operation-tracker';
|
||||||
|
|
||||||
|
// Cache tracker instance
|
||||||
|
let operationTracker: QMOperationTracker | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get or initialize the operation tracker
|
||||||
|
*/
|
||||||
|
async function getOperationTracker(handler: BaseHandler): Promise<QMOperationTracker> {
|
||||||
|
if (!operationTracker) {
|
||||||
|
const { initializeQMOperations } = await import('../shared/operation-registry');
|
||||||
|
operationTracker = await initializeQMOperations(handler.mongodb, handler.logger);
|
||||||
|
}
|
||||||
|
return operationTracker;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update symbol info for a single symbol
|
||||||
|
* This is a simple API fetch operation - no tracking logic here
|
||||||
|
*/
|
||||||
|
export async function updateSymbolInfo(
|
||||||
|
this: BaseHandler,
|
||||||
|
input: {
|
||||||
|
symbol: string;
|
||||||
|
qmSearchCode: string;
|
||||||
|
},
|
||||||
|
_context?: ExecutionContext
|
||||||
|
): Promise<{
|
||||||
|
success: boolean;
|
||||||
|
symbol: string;
|
||||||
|
message: string;
|
||||||
|
data?: any;
|
||||||
|
}> {
|
||||||
|
const { symbol, qmSearchCode } = input;
|
||||||
|
|
||||||
|
this.logger.info('Fetching symbol info', { symbol, qmSearchCode });
|
||||||
|
|
||||||
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
|
sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
|
// Get a session
|
||||||
|
const sessionId = QM_SESSION_IDS.LOOKUP;
|
||||||
|
const session = await sessionManager.getSession(sessionId);
|
||||||
|
|
||||||
|
if (!session || !session.uuid) {
|
||||||
|
throw new Error(`No active session found for QM LOOKUP`);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Build API request for symbol info
|
||||||
|
const searchParams = new URLSearchParams({
|
||||||
|
q: qmSearchCode || symbol,
|
||||||
|
qmodTool: 'SymbolInfo',
|
||||||
|
webmasterId: '500',
|
||||||
|
includeExtended: 'true'
|
||||||
|
});
|
||||||
|
|
||||||
|
const apiUrl = `${QM_CONFIG.LOOKUP_URL}?${searchParams.toString()}`;
|
||||||
|
|
||||||
|
const response = await fetch(apiUrl, {
|
||||||
|
method: 'GET',
|
||||||
|
headers: session.headers,
|
||||||
|
proxy: session.proxy,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const symbolData = await response.json();
|
||||||
|
|
||||||
|
// Update session success stats
|
||||||
|
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
|
||||||
|
|
||||||
|
// Process and store symbol info
|
||||||
|
if (symbolData && (Array.isArray(symbolData) ? symbolData.length > 0 : true)) {
|
||||||
|
const symbolInfo = Array.isArray(symbolData) ? symbolData[0] : symbolData;
|
||||||
|
|
||||||
|
// Update symbol in database with new metadata
|
||||||
|
const updateData = {
|
||||||
|
...symbolInfo,
|
||||||
|
symbol: symbol,
|
||||||
|
qmSearchCode: qmSearchCode || symbolInfo.symbol,
|
||||||
|
lastInfoUpdate: new Date(),
|
||||||
|
updated_at: new Date()
|
||||||
|
};
|
||||||
|
|
||||||
|
await this.mongodb.updateOne(
|
||||||
|
'qmSymbols',
|
||||||
|
{ symbol },
|
||||||
|
{ $set: updateData },
|
||||||
|
{ upsert: true }
|
||||||
|
);
|
||||||
|
|
||||||
|
this.logger.info('Symbol info updated successfully', {
|
||||||
|
symbol,
|
||||||
|
name: symbolInfo.name,
|
||||||
|
exchange: symbolInfo.exchange
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
symbol,
|
||||||
|
message: `Symbol info updated for ${symbol}`,
|
||||||
|
data: symbolInfo
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
this.logger.warn('No symbol data returned from API', { symbol });
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
symbol,
|
||||||
|
message: `No data found for symbol ${symbol}`
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
// Update session failure stats
|
||||||
|
if (session.uuid) {
|
||||||
|
await sessionManager.incrementFailedCalls(sessionId, session.uuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.error('Error fetching symbol info', {
|
||||||
|
symbol,
|
||||||
|
error: error instanceof Error ? error.message : 'Unknown error'
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
symbol,
|
||||||
|
message: `Failed to fetch symbol info: ${error instanceof Error ? error.message : 'Unknown error'}`
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule symbol info updates for symbols that need refreshing
|
||||||
|
* This is the scheduled job that finds stale symbols and queues individual updates
|
||||||
|
*/
|
||||||
|
export async function scheduleSymbolInfoUpdates(
|
||||||
|
this: BaseHandler,
|
||||||
|
input: {
|
||||||
|
limit?: number;
|
||||||
|
forceUpdate?: boolean;
|
||||||
|
} = {},
|
||||||
|
_context?: ExecutionContext
|
||||||
|
): Promise<{
|
||||||
|
message: string;
|
||||||
|
symbolsQueued: number;
|
||||||
|
errors: number;
|
||||||
|
}> {
|
||||||
|
const { limit = 100, forceUpdate = false } = input;
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
|
||||||
|
this.logger.info('Scheduling symbol info updates', { limit, forceUpdate });
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Get symbols that need updating
|
||||||
|
const staleSymbols = await tracker.getStaleSymbols('symbol_info', {
|
||||||
|
minHoursSinceRun: forceUpdate ? 0 : 24 * 7, // Weekly by default
|
||||||
|
limit
|
||||||
|
});
|
||||||
|
|
||||||
|
if (staleSymbols.length === 0) {
|
||||||
|
this.logger.info('No symbols need info updates');
|
||||||
|
return {
|
||||||
|
message: 'No symbols need info updates',
|
||||||
|
symbolsQueued: 0,
|
||||||
|
errors: 0
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info(`Found ${staleSymbols.length} symbols needing info updates`);
|
||||||
|
|
||||||
|
// Get full symbol data to include qmSearchCode
|
||||||
|
const symbolDocs = await this.mongodb.find('qmSymbols', {
|
||||||
|
symbol: { $in: staleSymbols }
|
||||||
|
}, {
|
||||||
|
projection: { symbol: 1, qmSearchCode: 1 }
|
||||||
|
});
|
||||||
|
|
||||||
|
let queued = 0;
|
||||||
|
let errors = 0;
|
||||||
|
|
||||||
|
// Schedule individual update jobs for each symbol
|
||||||
|
for (const doc of symbolDocs) {
|
||||||
|
try {
|
||||||
|
await this.scheduleOperation('update-symbol-info', {
|
||||||
|
symbol: doc.symbol,
|
||||||
|
qmSearchCode: doc.qmSearchCode || doc.symbol
|
||||||
|
}, {
|
||||||
|
priority: 3,
|
||||||
|
// Add some delay to avoid overwhelming the API
|
||||||
|
delay: queued * 1000 // 1 second between jobs
|
||||||
|
});
|
||||||
|
|
||||||
|
// Track that we've scheduled this symbol
|
||||||
|
await tracker.updateSymbolOperation(doc.symbol, 'symbol_info', {
|
||||||
|
status: 'success'
|
||||||
|
});
|
||||||
|
|
||||||
|
queued++;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(`Failed to schedule update for ${doc.symbol}`, { error });
|
||||||
|
errors++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info('Symbol info update scheduling completed', {
|
||||||
|
symbolsQueued: queued,
|
||||||
|
errors,
|
||||||
|
total: staleSymbols.length
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
message: `Scheduled info updates for ${queued} symbols`,
|
||||||
|
symbolsQueued: queued,
|
||||||
|
errors
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error('Symbol info scheduling failed', { error });
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -4,15 +4,35 @@ import {
|
||||||
Operation,
|
Operation,
|
||||||
ScheduledOperation,
|
ScheduledOperation,
|
||||||
} from '@stock-bot/handlers';
|
} from '@stock-bot/handlers';
|
||||||
import { checkSessions, createSession, searchSymbols, spiderSymbol } from './actions';
|
import {
|
||||||
import { getOperationStats } from './actions/price.action';
|
checkSessions,
|
||||||
|
createSession,
|
||||||
|
searchSymbols,
|
||||||
|
spiderSymbol,
|
||||||
|
getOperationStats,
|
||||||
|
updateSymbolInfo,
|
||||||
|
scheduleSymbolInfoUpdates,
|
||||||
|
updateFinancials,
|
||||||
|
scheduleFinancialsUpdates,
|
||||||
|
updateCorporateActions,
|
||||||
|
scheduleCorporateActionsUpdates,
|
||||||
|
updateFilings,
|
||||||
|
scheduleFilingsUpdates,
|
||||||
|
updatePrices,
|
||||||
|
schedulePriceUpdates,
|
||||||
|
updateIntradayBars,
|
||||||
|
scheduleIntradayUpdates
|
||||||
|
} from './actions';
|
||||||
import { initializeQMOperations } from './shared/operation-registry';
|
import { initializeQMOperations } from './shared/operation-registry';
|
||||||
|
|
||||||
@Handler('qm')
|
@Handler('qm')
|
||||||
export class QMHandler extends BaseHandler {
|
export class QMHandler extends BaseHandler {
|
||||||
constructor(services: any) {
|
constructor(services: any) {
|
||||||
initializeQMOperations(services.mongodb, services.logger);
|
|
||||||
super(services); // Handler name read from @Handler decorator
|
super(services); // Handler name read from @Handler decorator
|
||||||
|
// Initialize operations after super() so services are available
|
||||||
|
initializeQMOperations(this.mongodb, this.logger).catch(error => {
|
||||||
|
this.logger.error('Failed to initialize QM operations', { error });
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -41,22 +61,83 @@ export class QMHandler extends BaseHandler {
|
||||||
@Operation('search-symbols')
|
@Operation('search-symbols')
|
||||||
searchSymbols = searchSymbols;
|
searchSymbols = searchSymbols;
|
||||||
|
|
||||||
// /**
|
/**
|
||||||
// * PRICE DATA
|
* SYMBOL INFO
|
||||||
// */
|
*/
|
||||||
// @ScheduledOperation('update-prices', '0 */6 * * *', {
|
@Operation('update-symbol-info')
|
||||||
// priority: 5,
|
updateSymbolInfo = updateSymbolInfo;
|
||||||
// immediately: false,
|
|
||||||
// description: 'Update daily prices every 6 hours'
|
|
||||||
// })
|
|
||||||
// updatePrices = updatePrices;
|
|
||||||
|
|
||||||
// @ScheduledOperation('update-intraday-bars', '*/30 * * * *', {
|
@ScheduledOperation('schedule-symbol-info-updates', '0 */6 * * *', {
|
||||||
// priority: 6,
|
priority: 7,
|
||||||
// immediately: false,
|
immediately: false,
|
||||||
// description: 'Update intraday bars every 30 minutes during market hours'
|
description: 'Check for symbols needing info updates every 6 hours'
|
||||||
// })
|
})
|
||||||
// updateIntradayBars = updateIntradayBars;
|
scheduleSymbolInfoUpdates = scheduleSymbolInfoUpdates;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* FINANCIALS
|
||||||
|
*/
|
||||||
|
@Operation('update-financials')
|
||||||
|
updateFinancials = updateFinancials;
|
||||||
|
|
||||||
|
@ScheduledOperation('schedule-financials-updates', '0 2 * * *', {
|
||||||
|
priority: 6,
|
||||||
|
immediately: false,
|
||||||
|
description: 'Check for symbols needing financials updates daily at 2 AM'
|
||||||
|
})
|
||||||
|
scheduleFinancialsUpdates = scheduleFinancialsUpdates;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* CORPORATE ACTIONS (Dividends, Splits, Earnings)
|
||||||
|
*/
|
||||||
|
@Operation('update-corporate-actions')
|
||||||
|
updateCorporateActions = updateCorporateActions;
|
||||||
|
|
||||||
|
@ScheduledOperation('schedule-corporate-actions-updates', '0 3 * * *', {
|
||||||
|
priority: 6,
|
||||||
|
immediately: false,
|
||||||
|
description: 'Check for symbols needing corporate actions updates daily at 3 AM'
|
||||||
|
})
|
||||||
|
scheduleCorporateActionsUpdates = scheduleCorporateActionsUpdates;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* FILINGS
|
||||||
|
*/
|
||||||
|
@Operation('update-filings')
|
||||||
|
updateFilings = updateFilings;
|
||||||
|
|
||||||
|
@ScheduledOperation('schedule-filings-updates', '0 */8 * * *', {
|
||||||
|
priority: 5,
|
||||||
|
immediately: false,
|
||||||
|
description: 'Check for symbols needing filings updates every 8 hours'
|
||||||
|
})
|
||||||
|
scheduleFilingsUpdates = scheduleFilingsUpdates;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* PRICE DATA
|
||||||
|
*/
|
||||||
|
@Operation('update-prices')
|
||||||
|
updatePrices = updatePrices;
|
||||||
|
|
||||||
|
@ScheduledOperation('schedule-price-updates', '0 */6 * * *', {
|
||||||
|
priority: 8,
|
||||||
|
immediately: false,
|
||||||
|
description: 'Check for symbols needing price updates every 6 hours'
|
||||||
|
})
|
||||||
|
schedulePriceUpdates = schedulePriceUpdates;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTRADAY DATA
|
||||||
|
*/
|
||||||
|
@Operation('update-intraday-bars')
|
||||||
|
updateIntradayBars = updateIntradayBars;
|
||||||
|
|
||||||
|
@ScheduledOperation('schedule-intraday-updates', '*/30 * * * *', {
|
||||||
|
priority: 9,
|
||||||
|
immediately: false,
|
||||||
|
description: 'Check for symbols needing intraday updates every 30 minutes'
|
||||||
|
})
|
||||||
|
scheduleIntradayUpdates = scheduleIntradayUpdates;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* MONITORING
|
* MONITORING
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,7 @@
|
||||||
* QM Operation Registry - Define and register all QM operations
|
* QM Operation Registry - Define and register all QM operations
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import type { MongoDBClient } from '@stock-bot/mongodb';
|
import type { MongoDBClient, Logger } from '@stock-bot/types';
|
||||||
import type { Logger } from '@stock-bot/types';
|
|
||||||
import { QMOperationTracker } from './operation-tracker';
|
import { QMOperationTracker } from './operation-tracker';
|
||||||
import type { QMOperationConfig } from './types';
|
import type { QMOperationConfig } from './types';
|
||||||
|
|
||||||
|
|
@ -16,53 +15,42 @@ export const QM_OPERATIONS: QMOperationConfig[] = [
|
||||||
description: 'Update symbol metadata',
|
description: 'Update symbol metadata',
|
||||||
defaultStaleHours: 24 * 7 // Weekly
|
defaultStaleHours: 24 * 7 // Weekly
|
||||||
},
|
},
|
||||||
// {
|
{
|
||||||
// name: 'price_update',
|
name: 'price_update',
|
||||||
// type: 'standard',
|
type: 'standard',
|
||||||
// description: 'Update daily price data',
|
description: 'Update daily price data',
|
||||||
// defaultStaleHours: 24
|
defaultStaleHours: 24
|
||||||
// },
|
},
|
||||||
// {
|
{
|
||||||
// name: 'intraday_bars',
|
name: 'intraday_bars',
|
||||||
// type: 'intraday_crawl',
|
type: 'intraday_crawl',
|
||||||
// description: 'Crawl intraday price bars from today backwards',
|
description: 'Crawl intraday price bars from today backwards',
|
||||||
// requiresFinishedFlag: true,
|
requiresFinishedFlag: true,
|
||||||
// defaultStaleHours: 1 // Check every hour for new data
|
defaultStaleHours: 1 // Check every hour for new data
|
||||||
// },
|
},
|
||||||
|
|
||||||
// // Fundamental data operations
|
// Fundamental data operations
|
||||||
// {
|
{
|
||||||
// name: 'financials_update',
|
name: 'financials_update',
|
||||||
// type: 'standard',
|
type: 'standard',
|
||||||
// description: 'Update financial statements',
|
description: 'Update financial statements',
|
||||||
// defaultStaleHours: 24 * 7 // Weekly
|
defaultStaleHours: 24 * 7 // Weekly
|
||||||
// },
|
},
|
||||||
// {
|
// Corporate actions - fetched together in one API call
|
||||||
// name: 'earnings_update',-
|
{
|
||||||
// type: 'standard',
|
name: 'corporate_actions_update',
|
||||||
// description: 'Update earnings data',
|
type: 'standard',
|
||||||
// defaultStaleHours: 24 * 7 // Weekly
|
description: 'Update corporate actions (earnings, dividends, splits)',
|
||||||
// },
|
defaultStaleHours: 24 * 7 // Weekly
|
||||||
// {
|
},
|
||||||
// name: 'dividends_update',
|
|
||||||
// type: 'standard',
|
|
||||||
// description: 'Update dividend history',
|
|
||||||
// defaultStaleHours: 24 * 7 // Weekly
|
|
||||||
// },
|
|
||||||
// {
|
|
||||||
// name: 'splits_update',
|
|
||||||
// type: 'standard',
|
|
||||||
// description: 'Update stock split history',
|
|
||||||
// defaultStaleHours: 24 * 30 // Monthly
|
|
||||||
// },
|
|
||||||
|
|
||||||
// // News and filings
|
// News and filings
|
||||||
// {
|
{
|
||||||
// name: 'filings_update',
|
name: 'filings_update',
|
||||||
// type: 'standard',
|
type: 'standard',
|
||||||
// description: 'Update SEC filings',
|
description: 'Update SEC filings',
|
||||||
// defaultStaleHours: 24 // Daily
|
defaultStaleHours: 24 // Daily
|
||||||
// },
|
},
|
||||||
// {
|
// {
|
||||||
// name: 'news_update',
|
// name: 'news_update',
|
||||||
// type: 'standard',
|
// type: 'standard',
|
||||||
|
|
|
||||||
|
|
@ -3,8 +3,7 @@
|
||||||
* Supports dynamic operation registration with auto-indexing
|
* Supports dynamic operation registration with auto-indexing
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import type { MongoDBClient } from '@stock-bot/mongodb';
|
import type { MongoDBClient, Logger } from '@stock-bot/types';
|
||||||
import type { Logger } from '@stock-bot/types';
|
|
||||||
import type { IntradayCrawlSymbol, QMOperationConfig } from './types';
|
import type { IntradayCrawlSymbol, QMOperationConfig } from './types';
|
||||||
|
|
||||||
export class QMOperationTracker {
|
export class QMOperationTracker {
|
||||||
|
|
@ -57,7 +56,8 @@ export class QMOperationTracker {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const indexSpec of indexes) {
|
for (const indexSpec of indexes) {
|
||||||
await this.mongodb.createIndex(this.collectionName, indexSpec, {
|
const collection = this.mongodb.collection(this.collectionName);
|
||||||
|
await collection.createIndex(indexSpec, {
|
||||||
background: true,
|
background: true,
|
||||||
name: `op_${operationName}_${Object.keys(indexSpec).join('_')}`
|
name: `op_${operationName}_${Object.keys(indexSpec).join('_')}`
|
||||||
});
|
});
|
||||||
|
|
@ -174,7 +174,7 @@ export class QMOperationTracker {
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
const collection = this.mongodb.getCollection(this.collectionName);
|
const collection = this.mongodb.collection(this.collectionName);
|
||||||
const result = await collection.bulkWrite(bulkOps as any, { ordered: false });
|
const result = await collection.bulkWrite(bulkOps as any, { ordered: false });
|
||||||
|
|
||||||
this.logger.debug('Bulk updated symbol operations', {
|
this.logger.debug('Bulk updated symbol operations', {
|
||||||
|
|
@ -335,17 +335,18 @@ export class QMOperationTracker {
|
||||||
finishedCrawls?: number;
|
finishedCrawls?: number;
|
||||||
avgRecordsPerSymbol?: number;
|
avgRecordsPerSymbol?: number;
|
||||||
}> {
|
}> {
|
||||||
const total = await this.mongodb.countDocuments(this.collectionName);
|
const collection = this.mongodb.collection(this.collectionName);
|
||||||
|
const total = await collection.countDocuments({});
|
||||||
|
|
||||||
const processed = await this.mongodb.countDocuments(this.collectionName, {
|
const processed = await collection.countDocuments({
|
||||||
[`operations.${operationName}`]: { $exists: true }
|
[`operations.${operationName}`]: { $exists: true }
|
||||||
});
|
});
|
||||||
|
|
||||||
const successful = await this.mongodb.countDocuments(this.collectionName, {
|
const successful = await collection.countDocuments({
|
||||||
[`operations.${operationName}.status`]: 'success'
|
[`operations.${operationName}.status`]: 'success'
|
||||||
});
|
});
|
||||||
|
|
||||||
const failed = await this.mongodb.countDocuments(this.collectionName, {
|
const failed = await collection.countDocuments({
|
||||||
[`operations.${operationName}.status`]: 'failure'
|
[`operations.${operationName}.status`]: 'failure'
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -354,7 +355,7 @@ export class QMOperationTracker {
|
||||||
this.registeredOperations.get(operationName)?.defaultStaleHours || 24
|
this.registeredOperations.get(operationName)?.defaultStaleHours || 24
|
||||||
));
|
));
|
||||||
|
|
||||||
const stale = await this.mongodb.countDocuments(this.collectionName, {
|
const stale = await collection.countDocuments({
|
||||||
$or: [
|
$or: [
|
||||||
{ [`operations.${operationName}.lastRunAt`]: { $lt: staleDate } },
|
{ [`operations.${operationName}.lastRunAt`]: { $lt: staleDate } },
|
||||||
{ [`operations.${operationName}`]: { $exists: false } }
|
{ [`operations.${operationName}`]: { $exists: false } }
|
||||||
|
|
@ -371,13 +372,13 @@ export class QMOperationTracker {
|
||||||
|
|
||||||
// Additional stats for crawl operations
|
// Additional stats for crawl operations
|
||||||
if (this.registeredOperations.get(operationName)?.type === 'intraday_crawl') {
|
if (this.registeredOperations.get(operationName)?.type === 'intraday_crawl') {
|
||||||
result.finishedCrawls = await this.mongodb.countDocuments(this.collectionName, {
|
result.finishedCrawls = await collection.countDocuments({
|
||||||
[`operations.${operationName}.crawlState.finished`]: true
|
[`operations.${operationName}.crawlState.finished`]: true
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate average records per symbol
|
// Calculate average records per symbol
|
||||||
const aggregation = await this.mongodb.aggregate(this.collectionName, [
|
const aggregation = await collection.aggregate([
|
||||||
{
|
{
|
||||||
$match: {
|
$match: {
|
||||||
[`operations.${operationName}.recordCount`]: { $exists: true }
|
[`operations.${operationName}.recordCount`]: { $exists: true }
|
||||||
|
|
@ -389,7 +390,7 @@ export class QMOperationTracker {
|
||||||
avgRecords: { $avg: `$operations.${operationName}.recordCount` }
|
avgRecords: { $avg: `$operations.${operationName}.recordCount` }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]);
|
]).toArray();
|
||||||
|
|
||||||
if (aggregation.length > 0) {
|
if (aggregation.length > 0) {
|
||||||
result.avgRecordsPerSymbol = Math.round(aggregation[0].avgRecords);
|
result.avgRecordsPerSymbol = Math.round(aggregation[0].avgRecords);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue