renamed corporate-actions to events and finished intial work on it

This commit is contained in:
Boki 2025-06-29 18:50:35 -04:00
parent bb16a52bf7
commit d3850f9eaf
6 changed files with 200 additions and 200 deletions

View file

@ -1,5 +1,5 @@
/**
* QM Corporate Actions - Fetch and update dividends, splits, and earnings together
* QM Events Actions - Fetch and update dividends, splits, and earnings together
*/
import type { ExecutionContext } from '@stock-bot/handlers';
@ -23,14 +23,14 @@ async function getOperationTracker(handler: QMHandler): Promise<QMOperationTrack
}
/**
* Update corporate actions (dividends, splits, earnings) for a single symbol
* Update events (dividends, splits, earnings) for a single symbol
* Single API call returns all three data types
*/
export async function updateCorporateActions(
export async function updateEvents(
this: QMHandler,
input: {
symbol: string;
symbolId: number;
exchange: string;
qmSearchCode: string;
},
_context?: ExecutionContext
@ -44,108 +44,120 @@ export async function updateCorporateActions(
earnings: number;
};
}> {
const { symbol, symbolId, qmSearchCode } = input;
const { symbol, exchange, qmSearchCode } = input;
this.logger.info('Fetching corporate actions', { symbol, symbolId });
this.logger.info(`Fetching events ${qmSearchCode}`, { symbol, exchange, qmSearchCode });
const sessionManager = QMSessionManager.getInstance();
await sessionManager.initialize(this.cache, this.logger);
// Get a session - you'll need to add the appropriate session ID
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
const sessionId = QM_SESSION_IDS.PRICES; // TODO: Update with correct session ID
const session = await sessionManager.getSession(sessionId);
if (!session || !session.uuid) {
throw new Error(`No active session found for QM corporate actions`);
throw new Error(`No active session found for QM events ${qmSearchCode}`);
}
try {
// Build API request for corporate actions
// Build API request for events
const searchParams = new URLSearchParams({
symbol: symbol,
symbolId: symbolId.toString(),
qmodTool: 'CorporateActions', // Assuming this returns all three
webmasterId: '500'
earnings: 'true',
splits: 'true',
dividends: 'true',
symbol: qmSearchCode,
});
// TODO: Update with correct corporate actions endpoint
const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/corporate-actions.json?${searchParams.toString()}`;
// TODO: Update with correct events endpoint
const apiUrl = `${QM_CONFIG.EVENTS_URL}?${searchParams.toString()}`;
const response = await fetch(apiUrl, {
method: 'GET',
headers: session.headers,
proxy: session.proxy,
});
const tracker = await getOperationTracker(this);
if (!response.ok) {
throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
}
const corporateData = await response.json();
const results = corporateData.results;
if (typeof results.error === 'object') {
await tracker.updateSymbolOperation(qmSearchCode, 'events_update', {
status: 'success',
});
throw new Error(`Invalid response structure from QM API for ${qmSearchCode}`);
}
// Update session success stats
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
const tracker = await getOperationTracker(this);
let dividendCount = 0;
let splitCount = 0;
let earningsCount = 0;
// Process dividends
if (corporateData.dividends && corporateData.dividends.length > 0) {
if (results.dividends[0].dividend && results.dividends[0].dividend.length > 0 && Array.isArray(results.dividends[0].dividend)) {
await this.mongodb.batchUpsert(
'qmDividends',
corporateData.dividends.map((dividend: any) => ({
results.dividends[0].dividend.map((dividend: any) => ({
...dividend,
symbol,
symbolId,
updated_at: new Date()
exchange,
qmSearchCode,
reportDate: new Date(dividend.date),
})),
['symbol', 'exDate', 'recordDate']
['qmSearchCode','reportDate']
);
dividendCount = corporateData.dividends.length;
dividendCount = results.dividends[0].dividend.length;
}
// Process splits
if (corporateData.splits && corporateData.splits.length > 0) {
if (results.splits[0].split && results.splits[0].split.length > 0 && Array.isArray(results.splits[0].split)) {
await this.mongodb.batchUpsert(
'qmSplits',
corporateData.splits.map((split: any) => ({
results.splits[0].split.map((split: any) => ({
...split,
symbol,
symbolId,
updated_at: new Date()
exchange,
qmSearchCode,
reportDate: new Date(split.date),
})),
['symbol', 'splitDate', 'ratio']
['qmSearchCode','reportDate']
);
splitCount = corporateData.splits.length;
splitCount = results.splits[0].split.length;
}
// Process earnings
if (corporateData.earnings && corporateData.earnings.length > 0) {
if (results.earnings[0].earning && results.earnings[0].earning.length > 0 && Array.isArray(results.earnings[0].earning)) {
await this.mongodb.batchUpsert(
'qmEarnings',
corporateData.earnings.map((earning: any) => ({
results.earnings[0].earning.map((earning: any) => ({
...earning,
symbol,
symbolId,
updated_at: new Date()
exchange,
qmSearchCode,
reportDate: new Date(earning.reportDate[0]),
})),
['symbol', 'reportDate', 'fiscalQuarter']
['qmSearchCode','reportDate']
);
earningsCount = corporateData.earnings.length;
earningsCount = results.earnings[0].earning.length;
}
// Update tracking for corporate actions
// Update tracking for events
const updateTime = new Date();
await tracker.updateSymbolOperation(qmSearchCode, 'corporate_actions_update', {
await tracker.updateSymbolOperation(qmSearchCode, 'events_update', {
status: 'success',
lastRecordDate: updateTime,
recordCount: dividendCount + splitCount + earningsCount
});
this.logger.info('Corporate actions updated successfully', {
this.logger.info(`Events updated successfully ${qmSearchCode}`, {
symbol,
dividendCount,
splitCount,
@ -155,7 +167,7 @@ export async function updateCorporateActions(
return {
success: true,
symbol,
message: `Corporate actions updated for ${symbol}`,
message: `Events updated for ${qmSearchCode}`,
data: {
dividends: dividendCount,
splits: splitCount,
@ -169,30 +181,30 @@ export async function updateCorporateActions(
await sessionManager.incrementFailedCalls(sessionId, session.uuid);
}
this.logger.error('Error fetching corporate actions', {
this.logger.error('Error fetching events', {
symbol,
error: error instanceof Error ? error.message : 'Unknown error'
});
// Track failure for corporate actions
// Track failure for events
const tracker = await getOperationTracker(this);
await tracker.updateSymbolOperation(qmSearchCode, 'corporate_actions_update', {
await tracker.updateSymbolOperation(qmSearchCode, 'events_update', {
status: 'failure'
});
return {
success: false,
symbol,
message: `Failed to fetch corporate actions: ${error instanceof Error ? error.message : 'Unknown error'}`
message: `Failed to fetch events: ${error instanceof Error ? error.message : 'Unknown error'}`
};
}
}
/**
* Schedule corporate actions updates for symbols that need refreshing
* Schedule events updates for symbols that need refreshing
*/
export async function scheduleCorporateActionsUpdates(
export async function scheduleEventsUpdates(
this: QMHandler,
input: {
limit?: number;
@ -204,34 +216,34 @@ export async function scheduleCorporateActionsUpdates(
symbolsQueued: number;
errors: number;
}> {
const { limit = 100, forceUpdate = false } = input;
const { limit = 100000, forceUpdate = false } = input;
const tracker = await getOperationTracker(this);
this.logger.info('Scheduling corporate actions updates', { limit, forceUpdate });
this.logger.info('Scheduling events updates', { limit, forceUpdate });
try {
// Get symbols that need corporate actions updates
const staleSymbols = await tracker.getStaleSymbols('corporate_actions_update', {
// Get symbols that need events updates
const staleSymbols = await tracker.getStaleSymbols('events_update', {
minHoursSinceRun: forceUpdate ? 0 : 24 * 7, // Weekly
limit
});
if (staleSymbols.length === 0) {
this.logger.info('No symbols need corporate actions updates');
this.logger.info('No symbols need events updates');
return {
message: 'No symbols need corporate actions updates',
message: 'No symbols need events updates',
symbolsQueued: 0,
errors: 0
};
}
this.logger.info(`Found ${staleSymbols.length} symbols needing corporate actions updates`);
this.logger.info(`Found ${staleSymbols.length} symbols needing events updates`);
// Get full symbol data to include symbolId
const symbolDocs = await this.mongodb.find('qmSymbols', {
qmSearchCode: { $in: staleSymbols.slice(0, limit) } // Apply limit after deduplication
}, {
projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
projection: { symbol: 1, exchange: 1, qmSearchCode: 1 }
});
let queued = 0;
@ -240,40 +252,35 @@ export async function scheduleCorporateActionsUpdates(
// Schedule individual update jobs for each symbol
for (const doc of symbolDocs) {
try {
if (!doc.symbolId) {
this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`);
continue;
}
await this.scheduleOperation('update-corporate-actions', {
await this.scheduleOperation('update-events', {
symbol: doc.symbol,
symbolId: doc.symbolId,
exchange: doc.exchange,
qmSearchCode: doc.qmSearchCode
}, {
priority: 4,
delay: queued * 1500 // 1.5 seconds between jobs
delay: queued * 0.05 // 1.5 seconds between jobs
});
queued++;
} catch (error) {
this.logger.error(`Failed to schedule corporate actions update for ${doc.symbol}`, { error });
this.logger.error(`Failed to schedule events update for ${doc.symbol}`, { error });
errors++;
}
}
this.logger.info('Corporate actions update scheduling completed', {
this.logger.info('Events update scheduling completed', {
symbolsQueued: queued,
errors,
total: staleSymbols.length
});
return {
message: `Scheduled corporate actions updates for ${queued} symbols`,
message: `Scheduled events updates for ${queued} symbols`,
symbolsQueued: queued,
errors
};
} catch (error) {
this.logger.error('Corporate actions scheduling failed', { error });
this.logger.error('Events scheduling failed', { error });
throw error;
}
}

View file

@ -1,14 +1,14 @@
/**
* QM Action Exports
*/
export { scheduleCorporateActionsUpdates, updateCorporateActions } from './corporate-actions.action';
export { scheduleFilingsUpdates, updateFilings } from './filings.action';
export { scheduleFinancialsUpdates, updateFinancials } from './financials.action';
export { scheduleIntradayUpdates, updateIntradayBars } from './intraday.action';
export { schedulePriceUpdates, updatePrices } from './prices.action';
export { checkSessions, createSession } from './session.action';
export { scheduleSymbolInfoUpdates, updateSymbolInfo } from './symbol-info.action';
export { searchSymbols, spiderSymbol } from './symbol.action';
export { deduplicateSymbols, updateExchangeStats, updateExchangeStatsAndDeduplicate } from './symbol-dedup.action';
/**
* QM Action Exports
*/
export { scheduleEventsUpdates, updateEvents } from './events.action';
export { scheduleFilingsUpdates, updateFilings } from './filings.action';
export { scheduleFinancialsUpdates, updateFinancials } from './financials.action';
export { scheduleIntradayUpdates, updateIntradayBars } from './intraday.action';
export { schedulePriceUpdates, updatePrices } from './prices.action';
export { checkSessions, createSession } from './session.action';
export { scheduleSymbolInfoUpdates, updateSymbolInfo } from './symbol-info.action';
export { searchSymbols, spiderSymbol } from './symbol.action';
export { deduplicateSymbols, updateExchangeStats, updateExchangeStatsAndDeduplicate } from './symbol-dedup.action';

View file

@ -163,7 +163,7 @@ export async function scheduleSymbolInfoUpdates(
symbolsQueued: number;
errors: number;
}> {
const { limit = 1, forceUpdate = false } = input;
const { limit = 100000, forceUpdate = false } = input;
const tracker = await getOperationTracker(this);
this.logger.info('Scheduling symbol info updates', { limit, forceUpdate });

View file

@ -10,7 +10,7 @@ import {
checkSessions,
createSession,
deduplicateSymbols,
scheduleCorporateActionsUpdates,
scheduleEventsUpdates,
scheduleFilingsUpdates,
scheduleFinancialsUpdates,
scheduleIntradayUpdates,
@ -18,7 +18,7 @@ import {
scheduleSymbolInfoUpdates,
searchSymbols,
spiderSymbol,
updateCorporateActions,
updateEvents,
updateExchangeStats,
updateExchangeStatsAndDeduplicate,
updateFilings,
@ -114,18 +114,18 @@ export class QMHandler extends BaseHandler<DataIngestionServices> {
scheduleFinancialsUpdates = scheduleFinancialsUpdates;
/**
* CORPORATE ACTIONS (Dividends, Splits, Earnings)
* EVENTS (Dividends, Splits, Earnings)
*/
@Operation('update-corporate-actions')
updateCorporateActions = updateCorporateActions;
@Operation('update-events')
updateEvents = updateEvents;
@Disabled()
@ScheduledOperation('schedule-corporate-actions-updates', '0 3 * * *', {
// @Disabled()
@ScheduledOperation('schedule-events-updates', '0 3 * * *', {
priority: 6,
immediately: false,
description: 'Check for symbols needing corporate actions updates daily at 3 AM'
description: 'Check for symbols needing events updates daily at 3 AM'
})
scheduleCorporateActionsUpdates = scheduleCorporateActionsUpdates;
scheduleEventsUpdates = scheduleEventsUpdates;
/**
* FILINGS

View file

@ -38,6 +38,7 @@ export const QM_CONFIG = {
LOOKUP_URL: 'https://app.quotemedia.com/datatool/lookup.json',
SYMBOL_URL: 'https://app.quotemedia.com/datatool/getProfiles.json',
PRICES_URL: 'https://app.quotemedia.com/datatool/getEnhancedChartData.json',
EVENTS_URL: 'https://app.quotemedia.com/datatool/getIndicatorsBySymbol.json'
} as const;
// Session management settings

View file

@ -1,112 +1,104 @@
/**
* QM Operation Registry - Define and register all QM operations
*/
import type { MongoDBClient, Logger } from '@stock-bot/types';
import { QMOperationTracker } from './operation-tracker';
import type { QMOperationConfig } from './types';
// Define all QM operations
export const QM_OPERATIONS: QMOperationConfig[] = [
// Price data operations
{
name: 'symbol_info',
type: 'standard',
description: 'Update symbol metadata',
defaultStaleHours: 24 * 7 // Weekly
},
{
name: 'price_update',
type: 'standard',
description: 'Update daily price data',
defaultStaleHours: 24
},
{
name: 'intraday_bars',
type: 'intraday_crawl',
description: 'Crawl intraday price bars from today backwards',
requiresFinishedFlag: true,
defaultStaleHours: 1 // Check every hour for new data
},
// Fundamental data operations
{
name: 'financials_update',
type: 'standard',
description: 'Update financial statements',
defaultStaleHours: 24 * 7 // Weekly
},
// Corporate actions - fetched together in one API call
{
name: 'corporate_actions_update',
type: 'standard',
description: 'Update corporate actions (earnings, dividends, splits)',
defaultStaleHours: 24 * 7 // Weekly
},
// News and filings
{
name: 'filings_update',
type: 'standard',
description: 'Update SEC filings',
defaultStaleHours: 24 // Daily
},
// {
// name: 'news_update',
// type: 'standard',
// description: 'Update news articles',
// defaultStaleHours: 6 // Every 6 hours
// },
// // Technical indicators
// {
// name: 'indicators_update',
// type: 'standard',
// description: 'Calculate technical indicators',
// defaultStaleHours: 24 // Daily
// },
// // Options data
// {
// name: 'options_chain',
// type: 'standard',
// description: 'Update options chain data',
// defaultStaleHours: 1 // Hourly during market hours
// }
];
/**
* Initialize operation tracker with all registered operations
*/
export async function initializeQMOperations(
mongodb: MongoDBClient,
logger: Logger
): Promise<QMOperationTracker> {
logger.info('Initializing QM operations tracker');
const tracker = new QMOperationTracker(mongodb, logger);
// Register all operations
for (const operation of QM_OPERATIONS) {
try {
await tracker.registerOperation(operation);
logger.debug(`Registered operation: ${operation.name}`);
} catch (error) {
logger.error(`Failed to register operation: ${operation.name}`, { error });
throw error;
}
}
logger.info('QM operations tracker initialized', {
operationCount: QM_OPERATIONS.length
});
return tracker;
}
/**
* Get operation configuration by name
*/
export function getOperationConfig(name: string): QMOperationConfig | undefined {
return QM_OPERATIONS.find(op => op.name === name);
/**
* QM Operation Registry - Define and register all QM operations
*/
import type { Logger, MongoDBClient } from '@stock-bot/types';
import { QMOperationTracker } from './operation-tracker';
import type { QMOperationConfig } from './types';
// Define all QM operations
export const QM_OPERATIONS: QMOperationConfig[] = [
// Price data operations
{
name: 'symbol_info',
type: 'standard',
description: 'Update symbol metadata',
defaultStaleHours: 24 * 7 // Weekly
},
{
name: 'price_update',
type: 'standard',
description: 'Update daily price data',
defaultStaleHours: 24
},
{
name: 'intraday_bars',
type: 'intraday_crawl',
description: 'Crawl intraday price bars from today backwards',
requiresFinishedFlag: true,
defaultStaleHours: 1 // Check every hour for new data
},
// Fundamental data operations
{
name: 'financials_update',
type: 'standard',
description: 'Update financial statements',
defaultStaleHours: 24 * 7 // Weekly
},
// Corporate actions - fetched together in one API call
{
name: 'events_update',
type: 'standard',
description: 'Update events (earnings, dividends, splits)',
defaultStaleHours: 24 * 7 // Weekly
},
// News and filings
{
name: 'filings_update',
type: 'standard',
description: 'Update SEC filings',
defaultStaleHours: 24 // Daily
},
// {
// name: 'news_update',
// type: 'standard',
// description: 'Update news articles',
// defaultStaleHours: 6 // Every 6 hours
// },
// // Options data
// {
// name: 'options_chain',
// type: 'standard',
// description: 'Update options chain data',
// defaultStaleHours: 1 // Hourly during market hours
// }
];
/**
* Initialize operation tracker with all registered operations
*/
export async function initializeQMOperations(
mongodb: MongoDBClient,
logger: Logger
): Promise<QMOperationTracker> {
logger.info('Initializing QM operations tracker');
const tracker = new QMOperationTracker(mongodb, logger);
// Register all operations
for (const operation of QM_OPERATIONS) {
try {
await tracker.registerOperation(operation);
logger.debug(`Registered operation: ${operation.name}`);
} catch (error) {
logger.error(`Failed to register operation: ${operation.name}`, { error });
throw error;
}
}
logger.info('QM operations tracker initialized', {
operationCount: QM_OPERATIONS.length
});
return tracker;
}
/**
* Get operation configuration by name
*/
export function getOperationConfig(name: string): QMOperationConfig | undefined {
return QM_OPERATIONS.find(op => op.name === name);
}