work on qm filings

This commit is contained in:
Boki 2025-07-01 15:35:56 -04:00
parent 710577eb3d
commit 960daf4cad
17 changed files with 2319 additions and 32 deletions

@ -0,0 +1,122 @@
# Intraday Crawl System
## Overview
The intraday crawl system is designed to handle large-scale historical data collection with proper resumption support. It tracks the oldest and newest dates reached, allowing it to resume from where it left off if interrupted.
## Key Features
1. **Bidirectional Crawling**: Can crawl both forward (for new data) and backward (for historical data)
2. **Resumption Support**: Tracks progress and can resume from where it left off
3. **Gap Detection**: Automatically detects gaps in data coverage
4. **Batch Processing**: Processes data in configurable batches (default: 7 days)
5. **Completion Tracking**: Knows when a symbol's full history has been fetched
## Crawl State Fields
The system tracks the following state for each symbol:
```typescript
interface CrawlState {
  finished: boolean;             // Whether crawl is complete
  oldestDateReached?: Date;      // Oldest date we've fetched
  newestDateReached?: Date;      // Newest date we've fetched
  lastProcessedDate?: Date;      // Last date processed (for resumption)
  totalDaysProcessed?: number;   // Total days processed so far
  lastCrawlDirection?: 'forward' | 'backward';
  targetOldestDate?: Date;       // Target date to reach
}
```
## How It Works
### Initial Crawl
1. Starts from today and fetches current data
2. Then begins crawling backward in weekly batches
3. Continues until it reaches the target oldest date (default: 2020-01-01)
4. Marks as finished when complete
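The batch arithmetic behind steps 2-3 can be sketched as follows; `splitBackwardBatches` is an illustrative helper, not part of the handler API:

```typescript
// Hypothetical sketch: split a backward crawl into batchSize-day windows,
// clamping the final window so it stops exactly at the target oldest date.
interface Batch { start: Date; end: Date }

function splitBackwardBatches(from: Date, targetOldest: Date, batchSize = 7): Batch[] {
  const batches: Batch[] = [];
  let cursor = new Date(from);
  while (cursor > targetOldest) {
    const end = new Date(cursor);
    end.setDate(end.getDate() - batchSize);
    if (end < targetOldest) end.setTime(targetOldest.getTime()); // don't overshoot the target
    batches.push({ start: new Date(cursor), end });
    cursor = new Date(end);
    cursor.setDate(cursor.getDate() - 1); // next batch resumes one day earlier
  }
  return batches;
}
```

With `from = 2020-01-15`, `targetOldest = 2020-01-01`, and a 7-day batch, this yields two windows, the second clamped to the target.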
### Resumption After Interruption
1. Checks for a forward gap: if `newestDateReached < yesterday`, fetches new data first
2. Checks for a backward gap: if the crawl is not finished and `oldestDateReached > targetOldestDate`, continues the backward crawl
3. Resumes from `lastProcessedDate` to avoid re-fetching data
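The two gap checks above amount to a small predicate over the `CrawlState` fields; `detectGaps` is a hypothetical helper shown only to make the logic concrete:

```typescript
// Sketch of the resumption gap checks, using the CrawlState fields
// documented above (trimmed to the fields the checks need).
interface CrawlState {
  finished: boolean;
  oldestDateReached?: Date;
  newestDateReached?: Date;
}

function detectGaps(state: CrawlState, today: Date, targetOldest: Date) {
  const yesterday = new Date(today);
  yesterday.setDate(yesterday.getDate() - 1);
  return {
    // Forward gap: we have crawled before, but not up to yesterday
    forward: !!state.newestDateReached && state.newestDateReached < yesterday,
    // Backward gap: history crawl unfinished and target not yet reached
    backward: !state.finished &&
      (!state.oldestDateReached || state.oldestDateReached > targetOldest),
  };
}
```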
### Daily Updates
Once a symbol is fully crawled, it:
- Only needs to fetch new data (forward crawl)
- Runs much faster, since this is typically just 1-2 days of data
## Usage
### Manual Crawl for Single Symbol
```typescript
await handler.crawlIntradayData({
  symbol: 'AAPL',
  symbolId: 12345,
  qmSearchCode: 'AAPL',
  targetOldestDate: '2020-01-01',
  batchSize: 7 // Days per batch
});
```
### Schedule Crawls for Multiple Symbols
```typescript
await handler.scheduleIntradayCrawls({
  limit: 50,
  targetOldestDate: '2020-01-01',
  priorityMode: 'incomplete' // or 'never_run', 'stale', 'all'
});
```
### Check Crawl Status
```typescript
const tracker = handler.operationRegistry.getTracker('qm');
const isComplete = await tracker.isIntradayCrawlComplete('AAPL', 'intraday_bars', new Date('2020-01-01'));
```
### Get Symbols Needing Crawl
```typescript
const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
  limit: 100,
  targetOldestDate: new Date('2020-01-01'),
  includeNewDataGaps: true // Include symbols needing updates
});
```
## Priority Modes
- **never_run**: Symbols that have never been crawled (highest priority)
- **incomplete**: Symbols with unfinished crawls
- **stale**: Symbols with complete crawls but new data available
- **all**: All symbols needing any processing
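As an illustration of how a symbol's state maps onto these modes, the `classify` helper below (and its simplified input shape) is hypothetical, not part of the tracker:

```typescript
// Hypothetical classifier: maps a symbol's crawl state to a priority mode.
type Mode = 'never_run' | 'incomplete' | 'stale' | 'none';

function classify(state?: { finished: boolean; hasNewData: boolean }): Mode {
  if (!state) return 'never_run';           // no crawl state recorded yet
  if (!state.finished) return 'incomplete'; // backward crawl still in progress
  if (state.hasNewData) return 'stale';     // history complete, forward gap open
  return 'none';                            // fully up to date
}
```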
## Scheduled Operations
The system includes a scheduled operation:
- `schedule-intraday-crawls-batch`: Runs every 4 hours, processes incomplete crawls
## Monitoring
Use the provided scripts to monitor crawl progress:
```bash
# Check overall status
bun run scripts/check-intraday-status.ts

# Test crawl for specific symbol
bun run test/intraday-crawl.test.ts
```
## Performance Considerations
1. **Rate Limiting**: Delays between API calls to avoid rate limits
2. **Weekend Skipping**: Automatically skips weekends to save API calls
3. **Batch Size**: Configurable batch size (default 7 days) balances progress vs memory
4. **Priority Scheduling**: Higher priority for current data updates
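Points 1-2 can be sketched together as a trading-day loop; `forEachTradingDay`, `fetchDay`, and the 500 ms default delay are illustrative assumptions, not the actual implementation:

```typescript
// Sketch: iterate calendar days, skipping weekends, with a fixed
// delay between calls as a crude rate limit.
async function forEachTradingDay(
  start: Date,
  end: Date,
  fetchDay: (d: Date) => Promise<void>, // illustrative per-day fetch callback
  delayMs = 500
): Promise<void> {
  const d = new Date(start);
  while (d <= end) {
    const day = d.getDay();
    if (day !== 0 && day !== 6) { // skip Sunday (0) and Saturday (6)
      await fetchDay(new Date(d));
      await new Promise(r => setTimeout(r, delayMs)); // pause between API calls
    }
    d.setDate(d.getDate() + 1);
  }
}
```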
## Error Handling
- Failed batches don't stop the entire crawl
- Errors are logged and stored in the operation status
- Partial success is tracked separately from complete failure
- Session failures trigger automatic session rotation

@ -0,0 +1,107 @@
# Operation Tracker Enhancements for Intraday Crawling
## Summary of Changes
This document summarizes the enhancements made to the operation tracker to support sophisticated intraday data crawling with resumption capabilities.
## Changes Made
### 1. Enhanced CrawlState Interface (`types.ts`)
Added new fields to track crawl progress:
- `newestDateReached`: Track the most recent date processed
- `lastProcessedDate`: For resumption after interruption
- `totalDaysProcessed`: Progress tracking
- `targetOldestDate`: The goal date to reach
### 2. Updated OperationTracker (`OperationTracker.ts`)
- Modified `updateSymbolOperation` to handle new crawl state fields
- Updated `bulkUpdateSymbolOperations` for proper Date handling
- Enhanced `markCrawlFinished` to track both oldest and newest dates
- Added `getSymbolsForIntradayCrawl`: Specialized method for intraday crawls with gap detection
- Added `isIntradayCrawlComplete`: Check if a crawl has reached its target
- Added new indexes for efficient querying on crawl state fields
### 3. New Intraday Crawl Action (`intraday-crawl.action.ts`)
Created a sophisticated crawl system with:
- **Bidirectional crawling**: Handles both forward (new data) and backward (historical) gaps
- **Batch processing**: Processes data in weekly batches by default
- **Resumption logic**: Can resume from where it left off if interrupted
- **Gap detection**: Automatically identifies missing date ranges
- **Completion tracking**: Knows when the full history has been fetched
### 4. Integration with QM Handler
- Added new operations: `crawl-intraday-data` and `schedule-intraday-crawls`
- Added scheduled operation to automatically process incomplete crawls every 4 hours
- Integrated with the existing operation registry system
### 5. Testing and Monitoring Tools
- Created test script to verify crawl functionality
- Created status checking script to monitor crawl progress
- Added comprehensive documentation
## How It Works
### Initial Crawl Flow
1. Symbol starts with no crawl state
2. First crawl fetches today's data and sets `newestDateReached`
3. Subsequent batches crawl backward in time
4. Each batch updates `oldestDateReached` and `lastProcessedDate`
5. When `oldestDateReached <= targetOldestDate`, the crawl is marked finished
### Resumption Flow
1. Check if `newestDateReached < yesterday` (forward gap)
2. If yes, fetch new data first to stay current
3. Check if `finished = false` (backward gap)
4. If yes, continue backward crawl from `lastProcessedDate`
5. Process in batches until complete
### Daily Update Flow
1. For finished crawls, only check for forward gaps
2. Fetch data from `newestDateReached + 1` to today
3. Update `newestDateReached` to maintain currency
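The forward-gap range in steps 1-2 reduces to a small date computation; `forwardRange` is a hypothetical helper, shown only to make the range explicit:

```typescript
// Sketch: compute the forward-fill range for a finished crawl, from the
// day after newestDateReached up to today; null means already current.
function forwardRange(
  newestDateReached: Date,
  today: Date
): { start: Date; end: Date } | null {
  const start = new Date(newestDateReached);
  start.setDate(start.getDate() + 1); // resume the day after the last fetched day
  return start <= today ? { start, end: new Date(today) } : null;
}
```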
## Benefits
1. **Resilient**: Can handle interruptions gracefully
2. **Efficient**: Avoids re-fetching data
3. **Trackable**: Clear progress visibility
4. **Scalable**: Can handle thousands of symbols
5. **Flexible**: Configurable batch sizes and target dates
## Usage Examples
### Check if symbol X needs crawling:
```typescript
const tracker = operationRegistry.getTracker('qm');
const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
  limit: 1,
  targetOldestDate: new Date('2020-01-01')
});
const symbolX = symbols.find(s => s.symbol === 'X');
```
### Start crawl for symbol X only:
```typescript
await handler.crawlIntradayData({
  symbol: 'X',
  symbolId: symbolData.symbolId,
  qmSearchCode: symbolData.qmSearchCode,
  targetOldestDate: '2020-01-01'
});
```
### Schedule crawls for never-run symbols:
```typescript
await handler.scheduleIntradayCrawls({
  limit: 50,
  priorityMode: 'never_run',
  targetOldestDate: '2020-01-01'
});
```
## Next Steps
1. Monitor the crawl progress using the provided scripts
2. Adjust batch sizes based on API rate limits
3. Consider adding more sophisticated retry logic for failed batches
4. Implement data validation to ensure quality

@ -0,0 +1,66 @@
#!/usr/bin/env bun
/**
 * Script to check intraday crawl status
 */
import { createTestContext } from '../src/test-utils';
import { QMHandler } from '../src/handlers/qm/qm.handler';

async function checkIntradayStatus() {
  const context = await createTestContext();
  const handler = new QMHandler(context.services);

  // Wait for operation registry to initialize
  await new Promise(resolve => setTimeout(resolve, 1000));

  try {
    const tracker = handler.operationRegistry.getTracker('qm');

    // Get operation stats
    const stats = await tracker.getOperationStats('intraday_bars');
    console.log('Intraday Bars Operation Stats:');
    console.log('------------------------------');
    console.log(`Total symbols: ${stats.totalSymbols}`);
    console.log(`Processed symbols: ${stats.processedSymbols}`);
    console.log(`Successful symbols: ${stats.successfulSymbols}`);
    console.log(`Failed symbols: ${stats.failedSymbols}`);
    console.log(`Stale symbols: ${stats.staleSymbols}`);
    console.log(`Finished crawls: ${stats.finishedCrawls || 0}`);
    console.log('');

    // Get symbols needing crawl
    console.log('Symbols needing crawl (top 10):');
    console.log('--------------------------------');
    const needsCrawl = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
      limit: 10,
      targetOldestDate: new Date('2020-01-01')
    });

    for (const symbol of needsCrawl) {
      const crawlState = symbol.operationStatus?.crawlState;
      console.log(`\n${symbol.symbol}:`);
      console.log(`  Status: ${symbol.operationStatus?.status || 'never run'}`);
      console.log(`  Last run: ${symbol.operationStatus?.lastRunAt || 'never'}`);
      if (crawlState) {
        console.log(`  Finished: ${crawlState.finished}`);
        console.log(`  Oldest date: ${crawlState.oldestDateReached || 'none'}`);
        console.log(`  Newest date: ${crawlState.newestDateReached || 'none'}`);
        console.log(`  Days processed: ${crawlState.totalDaysProcessed || 0}`);
      }
      if (symbol.gaps) {
        console.log(`  Gaps: ${symbol.gaps.forward ? 'new data' : ''} ${symbol.gaps.backward ? 'historical' : ''}`);
      }
    }
  } catch (error) {
    console.error('Failed to check status:', error);
  } finally {
    await context.cleanup();
  }
}

// Run the check
checkIntradayStatus().catch(console.error);

@ -14,8 +14,11 @@ export async function updateFilings(
this: QMHandler,
input: {
symbol: string;
symbolId: number;
exchange: string;
lastRecordDate?: Date | null;
qmSearchCode: string;
page: number;
totalPages?: number;
},
_context?: ExecutionContext
): Promise<{
@ -24,9 +27,9 @@ export async function updateFilings(
message: string;
data?: any;
}> {
const { symbol, symbolId, qmSearchCode } = input;
const { qmSearchCode, page, symbol, exchange, lastRecordDate, totalPages } = input;
this.logger.info('Fetching filings', { symbol, symbolId });
this.logger.info(`Fetching filings ${qmSearchCode} - ${page}/${totalPages}`, { qmSearchCode, page });
const sessionManager = QMSessionManager.getInstance();
await sessionManager.initialize(this.cache, this.logger);
@ -42,15 +45,17 @@ export async function updateFilings(
try {
// Build API request for filings
const searchParams = new URLSearchParams({
symbol: symbol,
symbolId: symbolId.toString(),
qmodTool: 'Filings',
webmasterId: '500',
limit: '50' // Get recent filings
symbol: qmSearchCode,
webmasterId: "500",
page: "1",
xbrlSubDoc: "true",
inclIxbrl: "true",
inclXbrl: "true",
resultsPerPage: "25",
});
// TODO: Update with correct filings endpoint
const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/filings.json?${searchParams.toString()}`;
const apiUrl = `${QM_CONFIG.FILING_URL}?${searchParams.toString()}`;
const response = await fetch(apiUrl, {
method: 'GET',
@ -75,10 +80,9 @@ export async function updateFilings(
filingsData.map((filing: any) => ({
...filing,
symbol,
symbolId,
updated_at: new Date()
exchange,
})),
['symbol', 'filingDate', 'formType', 'accessionNumber'] // Unique keys
['qmSearchCode', 'filingId'] // Unique keys
);
// Update symbol to track last filings update
@ -88,6 +92,8 @@ export async function updateFilings(
recordCount: filingsData.length
});
this.logger.info('Filings updated successfully', {
symbol,
filingsCount: filingsData.length
@ -155,16 +161,18 @@ export async function scheduleFilingsUpdates(
symbolsQueued: number;
errors: number;
}> {
const { limit = 100, forceUpdate = false } = input;
const { limit = 1, forceUpdate = false } = input;
this.logger.info('Scheduling filings updates', { limit, forceUpdate });
try {
// Get symbols that need updating
const staleSymbols = await this.operationRegistry.getStaleSymbols('qm', 'filings_update', {
minHoursSinceRun: forceUpdate ? 0 : 24, // Daily for filings
limit
});
// const staleSymbols = await this.operationRegistry.getStaleSymbols('qm', 'filings_update', {
// minHoursSinceRun: forceUpdate ? 0 : 24, // Daily for filings
// limit
// });
const staleSymbols = ['X:CA']
if (staleSymbols.length === 0) {
this.logger.info('No symbols need filings updates');
@ -181,7 +189,7 @@ export async function scheduleFilingsUpdates(
const symbolDocs = await this.mongodb.find('qmSymbols', {
qmSearchCode: { $in: staleSymbols }
}, {
projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
projection: { qmSearchCode: 1, operations: 1, symbol: 1, exchange: 1 }
});
let queued = 0;
@ -197,11 +205,11 @@ export async function scheduleFilingsUpdates(
await this.scheduleOperation('update-filings', {
symbol: doc.symbol,
symbolId: doc.symbolId,
qmSearchCode: doc.qmSearchCode
exchange: doc.exchange,
qmSearchCode: doc.qmSearchCode,
lastRecordDate: doc.operations?.filings_update?.lastRecordDate || null,
}, {
priority: 5, // Lower priority than financial data
delay: queued * 2000 // 2 seconds between jobs
});
queued++;

@ -6,9 +6,12 @@ export { scheduleEventsUpdates, updateEvents } from './events.action';
export { scheduleFilingsUpdates, updateFilings } from './filings.action';
export { scheduleFinancialsUpdates, updateFinancials } from './financials.action';
export { scheduleIntradayUpdates, updateIntradayBars } from './intraday.action';
export { crawlIntradayData, scheduleIntradayCrawls } from './intraday-crawl.action';
export { schedulePriceUpdates, updatePrices } from './prices.action';
export { checkSessions, createSession } from './session.action';
export { scheduleSymbolInfoUpdates, updateSymbolInfo } from './symbol-info.action';
export { searchSymbols, spiderSymbol } from './symbol.action';
export { deduplicateSymbols, updateExchangeStats, updateExchangeStatsAndDeduplicate } from './symbol-dedup.action';
export { scheduleInsidersUpdates, updateInsiders } from './insiders.action';
export { scheduleSymbolNewsUpdates, updateSymbolNews, updateGeneralNews } from './news.action';

@ -0,0 +1,286 @@
/**
 * QM Insiders Actions - Fetch and update insider trading data
 */
import type { ExecutionContext } from '@stock-bot/handlers';
import type { QMHandler } from '../qm.handler';
import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
import { QMSessionManager } from '../shared/session-manager';
/**
 * Update insider transactions for a single symbol
 */
export async function updateInsiders(
  this: QMHandler,
  input: {
    symbol: string;
    symbolId: number;
    qmSearchCode: string;
    lookbackDays?: number;
  },
  _context?: ExecutionContext
): Promise<{
  success: boolean;
  symbol: string;
  message: string;
  data?: any;
}> {
  const { symbol, symbolId, qmSearchCode, lookbackDays = 365 } = input;

  this.logger.info('Fetching insider transactions', { symbol, symbolId, lookbackDays });

  const sessionManager = QMSessionManager.getInstance();
  await sessionManager.initialize(this.cache, this.logger);

  const sessionId = QM_SESSION_IDS.LOOKUP;
  const session = await sessionManager.getSession(sessionId);
  if (!session || !session.uuid) {
    throw new Error(`No active session found for QM insiders`);
  }

  try {
    // Calculate date range
    const endDate = new Date();
    const startDate = new Date();
    startDate.setDate(startDate.getDate() - lookbackDays);

    // Build API request for insider transactions
    const searchParams = new URLSearchParams({
      symbol: symbol,
      symbolId: symbolId.toString(),
      qmodTool: 'InsiderActivity',
      webmasterId: '500',
      startDate: startDate.toISOString().split('T')[0],
      endDate: endDate.toISOString().split('T')[0],
      includeOptions: 'true',
      pageSize: '100'
    } as Record<string, string>);

    const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/insiders.json?${searchParams.toString()}`;

    const response = await fetch(apiUrl, {
      method: 'GET',
      headers: session.headers,
      proxy: session.proxy,
    });

    if (!response.ok) {
      throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
    }

    const insiderData = await response.json();

    // Update session success stats
    await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);

    // Process and store insider data
    if (insiderData && insiderData.transactions && insiderData.transactions.length > 0) {
      const processedTransactions = insiderData.transactions.map((transaction: any) => ({
        symbol,
        symbolId,
        transactionDate: new Date(transaction.transactionDate),
        filingDate: new Date(transaction.filingDate),
        insiderName: transaction.insiderName,
        insiderTitle: transaction.insiderTitle || 'Unknown',
        transactionType: transaction.transactionType,
        shares: parseFloat(transaction.shares) || 0,
        pricePerShare: parseFloat(transaction.pricePerShare) || 0,
        totalValue: parseFloat(transaction.totalValue) || 0,
        sharesOwned: parseFloat(transaction.sharesOwned) || 0,
        ownershipType: transaction.ownershipType || 'Direct',
        formType: transaction.formType || 'Form 4',
        transactionCode: transaction.transactionCode,
        updated_at: new Date()
      }));

      // Store in MongoDB
      await this.mongodb.batchUpsert(
        'qmInsiders',
        processedTransactions,
        ['symbol', 'transactionDate', 'insiderName', 'transactionType'] // Unique keys
      );

      // Calculate summary statistics
      const totalBuys = processedTransactions.filter((t: any) =>
        t.transactionType === 'Buy' || t.transactionType === 'Purchase'
      ).length;
      const totalSells = processedTransactions.filter((t: any) =>
        t.transactionType === 'Sell' || t.transactionType === 'Sale'
      ).length;
      const totalBuyValue = processedTransactions
        .filter((t: any) => t.transactionType === 'Buy' || t.transactionType === 'Purchase')
        .reduce((sum: number, t: any) => sum + t.totalValue, 0);
      const totalSellValue = processedTransactions
        .filter((t: any) => t.transactionType === 'Sell' || t.transactionType === 'Sale')
        .reduce((sum: number, t: any) => sum + t.totalValue, 0);

      // Update operation tracking
      await this.operationRegistry.updateOperation('qm', qmSearchCode, 'insiders_update', {
        status: 'success',
        lastRecordDate: endDate,
        recordCount: processedTransactions.length,
        metadata: {
          totalBuys,
          totalSells,
          totalBuyValue,
          totalSellValue,
          netValue: totalBuyValue - totalSellValue,
          uniqueInsiders: new Set(processedTransactions.map((t: any) => t.insiderName)).size
        }
      });

      this.logger.info('Insider transactions updated successfully', {
        symbol,
        transactionCount: processedTransactions.length,
        totalBuys,
        totalSells,
        netValue: totalBuyValue - totalSellValue
      });

      return {
        success: true,
        symbol,
        message: `Updated ${processedTransactions.length} insider transactions`,
        data: {
          count: processedTransactions.length,
          totalBuys,
          totalSells,
          totalBuyValue,
          totalSellValue,
          netValue: totalBuyValue - totalSellValue
        }
      };
    } else {
      // No insider data
      this.logger.info('No insider transactions found', { symbol });

      await this.operationRegistry.updateOperation('qm', qmSearchCode, 'insiders_update', {
        status: 'success',
        lastRecordDate: endDate,
        recordCount: 0
      });

      return {
        success: true,
        symbol,
        message: 'No insider transactions found',
        data: { count: 0 }
      };
    }
  } catch (error) {
    // Update session failure stats
    if (session.uuid) {
      await sessionManager.incrementFailedCalls(sessionId, session.uuid);
    }

    this.logger.error('Error fetching insider transactions', {
      symbol,
      error: error instanceof Error ? error.message : 'Unknown error'
    });

    // Update operation tracking for failure
    await this.operationRegistry.updateOperation('qm', qmSearchCode, 'insiders_update', {
      status: 'failure',
      error: error instanceof Error ? error.message : 'Unknown error'
    });

    return {
      success: false,
      symbol,
      message: `Failed to fetch insider transactions: ${error instanceof Error ? error.message : 'Unknown error'}`
    };
  }
}
/**
 * Schedule insider updates for symbols
 */
export async function scheduleInsidersUpdates(
  this: QMHandler,
  input: {
    limit?: number;
    minHoursSinceRun?: number;
    forceUpdate?: boolean;
  } = {},
  _context?: ExecutionContext
): Promise<{
  message: string;
  symbolsQueued: number;
  errors: number;
}> {
  const { limit = 100, minHoursSinceRun = 24 * 7, forceUpdate = false } = input;

  this.logger.info('Scheduling insider updates', { limit, minHoursSinceRun, forceUpdate });

  try {
    // Get symbols that need insider updates
    const staleSymbols = await this.operationRegistry.getStaleSymbols('qm', 'insiders_update', {
      minHoursSinceRun: forceUpdate ? 0 : minHoursSinceRun,
      limit
    });

    if (staleSymbols.length === 0) {
      this.logger.info('No symbols need insider updates');
      return {
        message: 'No symbols need insider updates',
        symbolsQueued: 0,
        errors: 0
      };
    }

    // Get full symbol data
    const symbolsToProcess = await this.mongodb.find('qmSymbols', {
      qmSearchCode: { $in: staleSymbols }
    }, {
      projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
    });

    this.logger.info(`Found ${symbolsToProcess.length} symbols for insider updates`);

    let symbolsQueued = 0;
    let errors = 0;

    // Schedule update jobs
    for (const doc of symbolsToProcess) {
      try {
        if (!doc.symbolId) {
          this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`);
          continue;
        }

        await this.scheduleOperation('update-insiders', {
          symbol: doc.symbol,
          symbolId: doc.symbolId,
          qmSearchCode: doc.qmSearchCode
        }, {
          priority: 5, // Medium priority
          delay: symbolsQueued * 1000 // 1 second between jobs
        });
        symbolsQueued++;
      } catch (error) {
        this.logger.error(`Failed to schedule insider update for ${doc.symbol}`, { error });
        errors++;
      }
    }

    this.logger.info('Insider update scheduling completed', {
      symbolsQueued,
      errors
    });

    return {
      message: `Scheduled insider updates for ${symbolsQueued} symbols`,
      symbolsQueued,
      errors
    };
  } catch (error) {
    this.logger.error('Insider scheduling failed', { error });
    throw error;
  }
}

@ -0,0 +1,500 @@
/**
 * QM Intraday Crawl Actions - Sophisticated crawling with resumption support
 */
import type { ExecutionContext } from '@stock-bot/handlers';
import type { QMHandler } from '../qm.handler';
import type { CrawlState } from '../../../shared/operation-manager/types';
import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
import { QMSessionManager } from '../shared/session-manager';
interface IntradayCrawlInput {
  symbol: string;
  symbolId: number;
  qmSearchCode: string;
  targetOldestDate?: string; // ISO date string for how far back to crawl
  batchSize?: number; // Days per batch
}

interface DateRange {
  start: Date;
  end: Date;
  direction: 'forward' | 'backward';
}
/**
 * Process a batch of intraday data for a date range
 */
export async function processIntradayBatch(
  this: QMHandler,
  input: {
    symbol: string;
    symbolId: number;
    qmSearchCode: string;
    dateRange: DateRange;
  },
  _context?: ExecutionContext
): Promise<{
  success: boolean;
  recordsProcessed: number;
  datesProcessed: number;
  errors: string[];
}> {
  const { symbol, symbolId, qmSearchCode, dateRange } = input;
  const errors: string[] = [];
  let recordsProcessed = 0;
  let datesProcessed = 0;

  const sessionManager = QMSessionManager.getInstance();
  await sessionManager.initialize(this.cache, this.logger);

  // Get a session
  const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
  const session = await sessionManager.getSession(sessionId);
  if (!session || !session.uuid) {
    throw new Error(`No active session found for QM intraday`);
  }

  // Process each date in the range
  const currentDate = new Date(dateRange.start);
  const endDate = new Date(dateRange.end);

  while (
    (dateRange.direction === 'backward' && currentDate >= endDate) ||
    (dateRange.direction === 'forward' && currentDate <= endDate)
  ) {
    try {
      // Skip weekends
      if (currentDate.getDay() === 0 || currentDate.getDay() === 6) {
        if (dateRange.direction === 'backward') {
          currentDate.setDate(currentDate.getDate() - 1);
        } else {
          currentDate.setDate(currentDate.getDate() + 1);
        }
        continue;
      }

      // Build API request
      const searchParams = new URLSearchParams({
        symbol: symbol,
        symbolId: symbolId.toString(),
        qmodTool: 'IntradayBars',
        webmasterId: '500',
        date: currentDate.toISOString().split('T')[0],
        interval: '1' // 1-minute bars
      } as Record<string, string>);

      const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/intraday.json?${searchParams.toString()}`;

      const response = await fetch(apiUrl, {
        method: 'GET',
        headers: session.headers,
        proxy: session.proxy,
      });

      if (!response.ok) {
        throw new Error(`API request failed: ${response.status}`);
      }

      const barsData = await response.json();

      // Update session success stats
      await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);

      // Process and store data if we got any
      if (barsData && barsData.length > 0) {
        const processedBars = barsData.map((bar: any) => ({
          ...bar,
          symbol,
          symbolId,
          timestamp: new Date(bar.timestamp),
          date: new Date(currentDate),
          updated_at: new Date()
        }));

        await this.mongodb.batchUpsert(
          'qmIntradayBars',
          processedBars,
          ['symbol', 'timestamp']
        );

        recordsProcessed += barsData.length;
      }

      datesProcessed++;
    } catch (error) {
      const errorMsg = `Failed to fetch ${symbol} for ${currentDate.toISOString().split('T')[0]}: ${error}`;
      errors.push(errorMsg);
      this.logger.error(errorMsg);

      // Update session failure stats
      if (session.uuid) {
        await sessionManager.incrementFailedCalls(sessionId, session.uuid);
      }
    }

    // Move to next date
    if (dateRange.direction === 'backward') {
      currentDate.setDate(currentDate.getDate() - 1);
    } else {
      currentDate.setDate(currentDate.getDate() + 1);
    }
  }

  return {
    success: errors.length === 0,
    recordsProcessed,
    datesProcessed,
    errors
  };
}
/**
 * Main intraday crawl handler with sophisticated resumption logic
 */
export async function crawlIntradayData(
  this: QMHandler,
  input: IntradayCrawlInput,
  _context?: ExecutionContext
): Promise<{
  success: boolean;
  symbol: string;
  message: string;
  data?: any;
}> {
  const {
    symbol,
    symbolId,
    qmSearchCode,
    targetOldestDate = '2020-01-01', // Default to ~5 years of data
    batchSize = 7 // Process a week at a time
  } = input;

  this.logger.info('Starting intraday crawl', {
    symbol,
    symbolId,
    targetOldestDate,
    batchSize
  });

  try {
    // Get current crawl state
    const symbolData = await this.mongodb.findOne('qmSymbols', {
      qmSearchCode
    });

    const currentCrawlState: CrawlState = symbolData?.operations?.intraday_bars?.crawlState || {
      finished: false
    };

    // Determine what needs to be processed
    const today = new Date();
    today.setHours(0, 0, 0, 0);
    const targetOldest = new Date(targetOldestDate);
    targetOldest.setHours(0, 0, 0, 0);

    const ranges: DateRange[] = [];

    // 1. Check for forward gap (new data since last crawl)
    if (currentCrawlState.newestDateReached) {
      const newestDate = new Date(currentCrawlState.newestDateReached);
      const daysSinceNewest = Math.floor((today.getTime() - newestDate.getTime()) / (1000 * 60 * 60 * 24));

      if (daysSinceNewest > 1) {
        // We have new data to fetch
        const forwardStart = new Date(newestDate);
        forwardStart.setDate(forwardStart.getDate() + 1);
        ranges.push({
          start: forwardStart,
          end: today,
          direction: 'forward'
        });
      }
    } else if (!currentCrawlState.oldestDateReached) {
      // Never crawled, start from today
      ranges.push({
        start: today,
        end: today,
        direction: 'forward'
      });
    }

    // 2. Check for backward gap (historical data)
    if (!currentCrawlState.finished) {
      const startDate = currentCrawlState.lastProcessedDate
        ? new Date(currentCrawlState.lastProcessedDate)
        : currentCrawlState.oldestDateReached
          ? new Date(currentCrawlState.oldestDateReached)
          : today;

      if (startDate > targetOldest) {
        // Calculate batch end date
        const batchEnd = new Date(startDate);
        batchEnd.setDate(batchEnd.getDate() - batchSize);

        // Don't go past target
        if (batchEnd < targetOldest) {
          batchEnd.setTime(targetOldest.getTime());
        }

        ranges.push({
          start: startDate,
          end: batchEnd,
          direction: 'backward'
        });
      }
    }

    if (ranges.length === 0) {
      // Nothing to do
      this.logger.info('Intraday crawl already complete', { symbol });
      return {
        success: true,
        symbol,
        message: 'Intraday crawl already complete'
      };
    }

    // Process the ranges
    let totalRecords = 0;
    let totalDates = 0;
    const allErrors: string[] = [];

    for (const range of ranges) {
      this.logger.info('Processing date range', {
        symbol,
        start: range.start.toISOString().split('T')[0],
        end: range.end.toISOString().split('T')[0],
        direction: range.direction
      });

      const result = await processIntradayBatch.call(this, {
        symbol,
        symbolId,
        qmSearchCode,
        dateRange: range
      });

      totalRecords += result.recordsProcessed;
      totalDates += result.datesProcessed;
      allErrors.push(...result.errors);

      // Update crawl state after each batch
      const updatedCrawlState: Partial<CrawlState> = {
        lastProcessedDate: range.end,
        lastCrawlDirection: range.direction,
        totalDaysProcessed: (currentCrawlState.totalDaysProcessed || 0) + result.datesProcessed
      };

      if (range.direction === 'forward') {
        updatedCrawlState.newestDateReached = range.end;
        if (!currentCrawlState.oldestDateReached) {
          updatedCrawlState.oldestDateReached = range.start;
        }
      } else {
        updatedCrawlState.oldestDateReached = range.end;
        if (!currentCrawlState.newestDateReached) {
          updatedCrawlState.newestDateReached = range.start;
        }
      }

      // Check if we've completed the crawl
      if (range.direction === 'backward' && range.end <= targetOldest) {
        updatedCrawlState.finished = true;
        updatedCrawlState.targetOldestDate = targetOldest;
      }

      await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', {
        status: allErrors.length > 0 ? 'partial' : 'success',
        lastRecordDate: today,
        recordCount: totalRecords,
        crawlState: updatedCrawlState,
        error: allErrors.length > 0 ? allErrors.join('; ') : undefined
      });
    }

    const message = `Processed ${totalDates} days, ${totalRecords} records for ${symbol}`;
    this.logger.info('Intraday crawl batch completed', {
      symbol,
      totalDates,
      totalRecords,
      errors: allErrors.length,
      finished: ranges.some(r => r.direction === 'backward' && r.end <= targetOldest)
    });

    return {
      success: allErrors.length === 0,
      symbol,
      message,
      data: {
        datesProcessed: totalDates,
        recordsProcessed: totalRecords,
        errors: allErrors,
        crawlComplete: ranges.some(r => r.direction === 'backward' && r.end <= targetOldest)
      }
    };
  } catch (error) {
    this.logger.error('Intraday crawl failed', {
      symbol,
      error: error instanceof Error ? error.message : 'Unknown error'
    });

    await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', {
      status: 'failure',
      error: error instanceof Error ? error.message : 'Unknown error'
    });

    return {
      success: false,
      symbol,
      message: `Intraday crawl failed: ${error instanceof Error ? error.message : 'Unknown error'}`
    };
  }
}
/**
 * Schedule intraday crawls for multiple symbols
 */
export async function scheduleIntradayCrawls(
  this: QMHandler,
  input: {
    limit?: number;
    targetOldestDate?: string;
    priorityMode?: 'never_run' | 'incomplete' | 'stale' | 'all';
  } = {},
  _context?: ExecutionContext
): Promise<{
  message: string;
  symbolsQueued: number;
  errors: number;
}> {
  const {
    limit = 50,
    targetOldestDate = '2020-01-01',
    priorityMode = 'all'
  } = input;

  this.logger.info('Scheduling intraday crawls', {
    limit,
    targetOldestDate,
    priorityMode
  });

  try {
    // Get symbols based on priority mode
    let symbolsToProcess: any[] = [];
    const tracker = this.operationRegistry.getTracker('qm');

    switch (priorityMode) {
      case 'never_run':
        // Get symbols that have never been crawled
        symbolsToProcess = await this.mongodb.find('qmSymbols', {
          'operations.intraday_bars': { $exists: false },
          active: { $ne: false }
        }, {
          limit,
          projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
        });
        break;

      case 'incomplete':
        // Get symbols with incomplete crawls
        symbolsToProcess = await tracker.getSymbolsForIntradayCrawl(
          'intraday_bars',
          { limit, targetOldestDate: new Date(targetOldestDate), includeNewDataGaps: false }
        );
        break;

      case 'stale':
        // Get symbols that need updates (new data)
        symbolsToProcess = await tracker.getSymbolsForIntradayCrawl(
          'intraday_bars',
          { limit, targetOldestDate: new Date(targetOldestDate), includeNewDataGaps: true }
        );
        symbolsToProcess = symbolsToProcess.filter(s => s.gaps?.forward);
        break;

      case 'all':
      default:
        // Get all symbols that need any processing
        symbolsToProcess = await tracker.getSymbolsForIntradayCrawl(
          'intraday_bars',
          { limit, targetOldestDate: new Date(targetOldestDate) }
        );
        break;
    }

    if (symbolsToProcess.length === 0) {
      return {
        message: `No symbols found for priority mode: ${priorityMode}`,
        symbolsQueued: 0,
        errors: 0
      };
    }

    // Get full symbol data if needed
    if (priorityMode !== 'never_run') {
      const qmSearchCodes = symbolsToProcess.map(s => s.symbol);
      const fullSymbols = await this.mongodb.find('qmSymbols', {
        qmSearchCode: { $in: qmSearchCodes }
      }, {
        projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
      });

      // Map back the full data
      symbolsToProcess = symbolsToProcess.map(sp => {
        const full = fullSymbols.find(f => f.qmSearchCode === sp.symbol);
        return full || sp;
      });
    }

    let symbolsQueued = 0;
    let errors = 0;

    // Schedule crawl jobs
    for (const doc of symbolsToProcess) {
      try {
        if (!doc.symbolId) {
          this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`);
          continue;
        }

        await this.scheduleOperation('crawl-intraday-data', {
          symbol: doc.symbol,
          symbolId: doc.symbolId,
          qmSearchCode: doc.qmSearchCode,
          targetOldestDate
        }, {
          priority: priorityMode === 'stale' ? 9 : 5, // Higher priority for updates
          delay: symbolsQueued * 2000 // 2 seconds between jobs
        });
        symbolsQueued++;
      } catch (error) {
        this.logger.error(`Failed to schedule intraday crawl for ${doc.symbol}`, { error });
        errors++;
      }
    }

    this.logger.info('Intraday crawl scheduling completed', {
      priorityMode,
      symbolsQueued,
      errors
    });

    return {
      message: `Scheduled ${symbolsQueued} symbols for intraday crawl (${priorityMode} mode)`,
      symbolsQueued,
      errors
    };
  } catch (error) {
    this.logger.error('Intraday crawl scheduling failed', { error });
throw error;
}
}
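The `ranges` that `crawlIntradayData` reports on follow the weekly backward batching described in the overview. A minimal standalone sketch of that batching (illustrative only; the real crawl derives its ranges from the stored crawl state):

```typescript
// Minimal sketch of weekly backward batching (illustrative only; the real
// crawl derives its ranges from the stored crawl state).
interface DateRange {
  start: Date;
  end: Date;
  direction: 'forward' | 'backward';
}

// Split (targetOldest, from] into batches of `batchDays`, newest first,
// so an interrupted crawl can resume from its last processed date.
function buildBackwardBatches(from: Date, targetOldest: Date, batchDays = 7): DateRange[] {
  const ranges: DateRange[] = [];
  let end = new Date(from);
  while (end > targetOldest) {
    // Plain millisecond arithmetic keeps batch width exact regardless of timezone
    const start = new Date(end.getTime() - batchDays * 86_400_000);
    ranges.push({
      start: start > targetOldest ? start : new Date(targetOldest),
      end: new Date(end),
      direction: 'backward'
    });
    end = start;
  }
  return ranges;
}
```

Batches are emitted newest-first, so the oldest (final) batch is clamped to the target date rather than overshooting it.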


@ -0,0 +1,448 @@
/**
* QM News Actions - Fetch symbol-specific and general market news
*/
import type { ExecutionContext } from '@stock-bot/handlers';
import type { QMHandler } from '../qm.handler';
import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
import { QMSessionManager } from '../shared/session-manager';
interface NewsArticle {
id: string;
publishedDate: Date;
title: string;
summary: string;
source: string;
url: string;
symbols?: string[];
categories?: string[];
sentiment?: {
score: number;
label: string; // positive, negative, neutral
};
imageUrl?: string;
}
/**
* Update news for a single symbol
*/
export async function updateSymbolNews(
this: QMHandler,
input: {
symbol: string;
symbolId: number;
qmSearchCode: string;
lookbackDays?: number;
},
_context?: ExecutionContext
): Promise<{
success: boolean;
symbol: string;
message: string;
data?: any;
}> {
const { symbol, symbolId, qmSearchCode, lookbackDays = 30 } = input;
this.logger.info('Fetching symbol news', { symbol, symbolId, lookbackDays });
const sessionManager = QMSessionManager.getInstance();
await sessionManager.initialize(this.cache, this.logger);
const sessionId = QM_SESSION_IDS.LOOKUP;
const session = await sessionManager.getSession(sessionId);
if (!session || !session.uuid) {
throw new Error(`No active session found for QM news`);
}
try {
// Calculate date range
const endDate = new Date();
const startDate = new Date();
startDate.setDate(startDate.getDate() - lookbackDays);
// Build API request for symbol news
const searchParams = new URLSearchParams({
symbol: symbol,
symbolId: symbolId.toString(),
qmodTool: 'News',
webmasterId: '500',
startDate: startDate.toISOString().split('T')[0],
endDate: endDate.toISOString().split('T')[0],
includeContent: 'true',
pageSize: '50'
} as Record<string, string>);
const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/news.json?${searchParams.toString()}`;
const response = await fetch(apiUrl, {
method: 'GET',
headers: session.headers,
proxy: session.proxy,
});
if (!response.ok) {
throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
}
const newsData = await response.json();
// Update session success stats
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
// Process and store news data
if (newsData && newsData.articles && newsData.articles.length > 0) {
const processedArticles = newsData.articles.map((article: any) => ({
articleId: article.id || `${symbol}_${article.publishedDate}_${article.title.substring(0, 20)}`,
symbol,
symbolId,
publishedDate: new Date(article.publishedDate),
title: article.title,
summary: article.summary || article.content?.substring(0, 500),
source: article.source || 'Unknown',
url: article.url,
symbols: article.symbols || [symbol],
categories: article.categories || [],
sentiment: article.sentiment ? {
score: parseFloat(article.sentiment.score) || 0,
label: article.sentiment.label || 'neutral'
} : null,
imageUrl: article.imageUrl,
isSymbolSpecific: true,
updated_at: new Date()
}));
// Store in MongoDB
await this.mongodb.batchUpsert(
'qmNews',
processedArticles,
['articleId'] // Unique key
);
// Calculate sentiment summary
const sentimentCounts = processedArticles.reduce((acc: any, article: any) => {
if (article.sentiment) {
acc[article.sentiment.label] = (acc[article.sentiment.label] || 0) + 1;
}
return acc;
}, {});
// Update operation tracking
await this.operationRegistry.updateOperation('qm', qmSearchCode, 'news_update', {
status: 'success',
lastRecordDate: endDate,
recordCount: processedArticles.length,
metadata: {
sentimentCounts,
uniqueSources: new Set(processedArticles.map((a: any) => a.source)).size,
avgSentimentScore: (() => {
const scored = processedArticles.filter((a: any) => typeof a.sentiment?.score === 'number');
return scored.length > 0
? scored.reduce((sum: number, a: any) => sum + a.sentiment.score, 0) / scored.length
: 0;
})()
}
});
this.logger.info('Symbol news updated successfully', {
symbol,
articleCount: processedArticles.length,
sentimentCounts
});
return {
success: true,
symbol,
message: `Updated ${processedArticles.length} news articles`,
data: {
count: processedArticles.length,
sentimentCounts,
sources: new Set(processedArticles.map((a: any) => a.source)).size
}
};
} else {
// No news found
this.logger.info('No news articles found', { symbol });
await this.operationRegistry.updateOperation('qm', qmSearchCode, 'news_update', {
status: 'success',
lastRecordDate: endDate,
recordCount: 0
});
return {
success: true,
symbol,
message: 'No news articles found',
data: { count: 0 }
};
}
} catch (error) {
// Update session failure stats
if (session.uuid) {
await sessionManager.incrementFailedCalls(sessionId, session.uuid);
}
this.logger.error('Error fetching symbol news', {
symbol,
error: error instanceof Error ? error.message : 'Unknown error'
});
// Update operation tracking for failure
await this.operationRegistry.updateOperation('qm', qmSearchCode, 'news_update', {
status: 'failure',
error: error instanceof Error ? error.message : 'Unknown error'
});
return {
success: false,
symbol,
message: `Failed to fetch news: ${error instanceof Error ? error.message : 'Unknown error'}`
};
}
}
/**
* Update general market news
*/
export async function updateGeneralNews(
this: QMHandler,
input: {
categories?: string[];
lookbackMinutes?: number;
} = {},
_context?: ExecutionContext
): Promise<{
success: boolean;
message: string;
data?: any;
}> {
const { categories = ['market', 'economy', 'politics'], lookbackMinutes = 60 } = input;
this.logger.info('Fetching general news', { categories, lookbackMinutes });
const sessionManager = QMSessionManager.getInstance();
await sessionManager.initialize(this.cache, this.logger);
const sessionId = QM_SESSION_IDS.LOOKUP;
const session = await sessionManager.getSession(sessionId);
if (!session || !session.uuid) {
throw new Error(`No active session found for QM general news`);
}
try {
// Calculate time range
const endDate = new Date();
const startDate = new Date();
startDate.setMinutes(startDate.getMinutes() - lookbackMinutes);
// Build API request for general news
const searchParams = new URLSearchParams({
qmodTool: 'MarketNews',
webmasterId: '500',
categories: categories.join(','),
startDateTime: startDate.toISOString(),
endDateTime: endDate.toISOString(),
includeContent: 'true',
pageSize: '100'
} as Record<string, string>);
const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/marketnews.json?${searchParams.toString()}`;
const response = await fetch(apiUrl, {
method: 'GET',
headers: session.headers,
proxy: session.proxy,
});
if (!response.ok) {
throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
}
const newsData = await response.json();
// Update session success stats
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
// Process and store general news
if (newsData && newsData.articles && newsData.articles.length > 0) {
const processedArticles = newsData.articles.map((article: any) => ({
articleId: article.id || `general_${article.publishedDate}_${article.title.substring(0, 20)}`,
publishedDate: new Date(article.publishedDate),
title: article.title,
summary: article.summary || article.content?.substring(0, 500),
source: article.source || 'Unknown',
url: article.url,
symbols: article.symbols || [],
categories: article.categories || categories,
sentiment: article.sentiment ? {
score: parseFloat(article.sentiment.score) || 0,
label: article.sentiment.label || 'neutral'
} : null,
imageUrl: article.imageUrl,
isSymbolSpecific: false,
isMarketMoving: article.isMarketMoving || false,
importance: article.importance || 'medium',
updated_at: new Date()
}));
// Store in MongoDB
await this.mongodb.batchUpsert(
'qmNews',
processedArticles,
['articleId'] // Unique key
);
// Find high-importance articles
const highImportanceCount = processedArticles.filter((a: any) =>
a.importance === 'high' || a.isMarketMoving
).length;
// Update a general tracking document
await this.mongodb.updateOne(
'qmOperationStats',
{ operation: 'general_news_update' },
{
$set: {
lastRunAt: new Date(),
lastRecordCount: processedArticles.length,
highImportanceCount,
categories,
updated_at: new Date()
}
},
{ upsert: true }
);
this.logger.info('General news updated successfully', {
articleCount: processedArticles.length,
highImportanceCount,
categories
});
return {
success: true,
message: `Updated ${processedArticles.length} general news articles`,
data: {
count: processedArticles.length,
highImportanceCount,
categories,
sources: new Set(processedArticles.map((a: any) => a.source)).size
}
};
} else {
// No news found
this.logger.info('No general news articles found');
return {
success: true,
message: 'No general news articles found',
data: { count: 0 }
};
}
} catch (error) {
// Update session failure stats
if (session.uuid) {
await sessionManager.incrementFailedCalls(sessionId, session.uuid);
}
this.logger.error('Error fetching general news', {
error: error instanceof Error ? error.message : 'Unknown error'
});
return {
success: false,
message: `Failed to fetch general news: ${error instanceof Error ? error.message : 'Unknown error'}`
};
}
}
/**
* Schedule symbol news updates
*/
export async function scheduleSymbolNewsUpdates(
this: QMHandler,
input: {
limit?: number;
minHoursSinceRun?: number;
forceUpdate?: boolean;
} = {},
_context?: ExecutionContext
): Promise<{
message: string;
symbolsQueued: number;
errors: number;
}> {
const { limit = 200, minHoursSinceRun = 24 * 7, forceUpdate = false } = input;
this.logger.info('Scheduling symbol news updates', { limit, minHoursSinceRun, forceUpdate });
try {
// Get symbols that need news updates
const staleSymbols = await this.operationRegistry.getStaleSymbols('qm', 'news_update', {
minHoursSinceRun: forceUpdate ? 0 : minHoursSinceRun,
limit
});
if (staleSymbols.length === 0) {
this.logger.info('No symbols need news updates');
return {
message: 'No symbols need news updates',
symbolsQueued: 0,
errors: 0
};
}
// Get full symbol data
const symbolsToProcess = await this.mongodb.find('qmSymbols', {
qmSearchCode: { $in: staleSymbols }
}, {
projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
});
this.logger.info(`Found ${symbolsToProcess.length} symbols for news updates`);
let symbolsQueued = 0;
let errors = 0;
// Schedule update jobs
for (const doc of symbolsToProcess) {
try {
if (!doc.symbolId) {
this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`);
continue;
}
await this.scheduleOperation('update-symbol-news', {
symbol: doc.symbol,
symbolId: doc.symbolId,
qmSearchCode: doc.qmSearchCode
}, {
priority: 4, // Lower priority than price data
delay: symbolsQueued * 500 // 0.5 seconds between jobs
});
symbolsQueued++;
} catch (error) {
this.logger.error(`Failed to schedule news update for ${doc.symbol}`, { error });
errors++;
}
}
this.logger.info('Symbol news update scheduling completed', {
symbolsQueued,
errors
});
return {
message: `Scheduled news updates for ${symbolsQueued} symbols`,
symbolsQueued,
errors
};
} catch (error) {
this.logger.error('Symbol news scheduling failed', { error });
throw error;
}
}
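Both news actions compute the same sentiment roll-up inline before writing their tracking documents. Isolated as a standalone sketch (interface and function names are illustrative, not part of the module):

```typescript
// Sketch of the sentiment roll-up stored alongside each news update
// (the actions above compute the same counts and average inline).
interface ScoredArticle {
  sentiment?: { score: number; label: string } | null;
}

function summarizeSentiment(
  articles: ScoredArticle[]
): { counts: Record<string, number>; avgScore: number } {
  // Keep only articles that actually carry a sentiment object
  const scored = articles.filter(
    (a): a is ScoredArticle & { sentiment: { score: number; label: string } } =>
      a.sentiment != null
  );
  const counts: Record<string, number> = {};
  for (const a of scored) {
    counts[a.sentiment.label] = (counts[a.sentiment.label] || 0) + 1;
  }
  const avgScore = scored.length
    ? scored.reduce((sum, a) => sum + a.sentiment.score, 0) / scored.length
    : 0; // avoid dividing by zero when nothing is scored
  return { counts, avgScore };
}
```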


@ -14,9 +14,11 @@ import {
scheduleEventsUpdates,
scheduleFilingsUpdates,
scheduleFinancialsUpdates,
scheduleInsidersUpdates,
scheduleIntradayUpdates,
schedulePriceUpdates,
scheduleSymbolInfoUpdates,
scheduleSymbolNewsUpdates,
searchSymbols,
spiderSymbol,
updateEvents,
@ -24,10 +26,14 @@ import {
updateExchangeStatsAndDeduplicate,
updateFilings,
updateFinancials,
updateGeneralNews,
updateInsiders,
updateIntradayBars,
updatePrices,
updateSymbolInfo,
updateSymbolNews
} from './actions';
import { crawlIntradayData, scheduleIntradayCrawls } from './actions/intraday-crawl.action';
import { createQMOperationRegistry } from './shared/operation-provider';
@Handler('qm')
@ -169,6 +175,12 @@ export class QMHandler extends BaseHandler<DataIngestionServices> {
@Operation('update-intraday-bars')
updateIntradayBars = updateIntradayBars;
@Operation('crawl-intraday-data')
crawlIntradayData = crawlIntradayData;
@Operation('schedule-intraday-crawls')
scheduleIntradayCrawls = scheduleIntradayCrawls;
@Disabled()
@ScheduledOperation('schedule-intraday-updates', '*/30 * * * *', {
priority: 9,
@ -176,4 +188,60 @@ export class QMHandler extends BaseHandler<DataIngestionServices> {
description: 'Check for symbols needing intraday updates every 30 minutes'
})
scheduleIntradayUpdates = scheduleIntradayUpdates;
@ScheduledOperation('schedule-intraday-crawls-batch', '0 */4 * * *', {
priority: 5,
immediately: false,
description: 'Schedule intraday crawls for incomplete symbols every 4 hours'
})
scheduleIntradayCrawlsBatch = async () => {
return scheduleIntradayCrawls.call(this, {
limit: 25,
priorityMode: 'incomplete'
});
};
/**
* INSIDER TRADING
*/
@Operation('update-insiders')
updateInsiders = updateInsiders;
@Disabled()
@ScheduledOperation('schedule-insiders-updates', '0 4 * * 1', {
priority: 5,
immediately: false,
description: 'Check for symbols needing insider updates weekly on Monday at 4 AM'
})
scheduleInsidersUpdates = scheduleInsidersUpdates;
/**
* NEWS
*/
@Operation('update-symbol-news')
updateSymbolNews = updateSymbolNews;
@Operation('update-general-news')
updateGeneralNews = updateGeneralNews;
@Disabled()
@ScheduledOperation('schedule-symbol-news-updates', '0 5 * * 1', {
priority: 4,
immediately: false,
description: 'Check for symbols needing news updates weekly on Monday at 5 AM'
})
scheduleSymbolNewsUpdates = scheduleSymbolNewsUpdates;
@Disabled()
@ScheduledOperation('update-general-news-frequent', '*/1 * * * *', {
priority: 9,
immediately: true,
description: 'Update general market news every minute'
})
updateGeneralNewsFrequent = async () => {
return updateGeneralNews.call(this, {
categories: ['market', 'economy', 'politics', 'breaking'],
lookbackMinutes: 5 // Only look back 5 minutes to avoid duplicates
});
};
}


@ -10,7 +10,7 @@ export const QM_SESSION_IDS = {
SYMBOL: '1e1d7cb1de1fd2fe52684abdea41a446919a5fe12776dfab88615ac1ce1ec2f6', // getProfiles
PRICES: '5ad521e05faf5778d567f6d0012ec34d6cdbaeb2462f41568f66558bc7b4ced9', // getEnhancedChartData
FINANCIALS: '4e4f1565fb7c9f2a8b4b32b9aa3137af684f3da8a2ce97799d3a7117b14f07be', // getFinancialsEnhancedBySymbol
FILINGS: 'a863d519e38f80e45d10e280fb1afc729816e23f0218db2f3e8b23005a9ad8dd', // getCompanyFilings
// INTRADAY: '', //
@ -31,6 +31,7 @@ export const QM_SESSION_IDS = {
// QM API Configuration
export const QM_CONFIG = {
PROXY_URL: 'http://5.79.66.2:13010',
BASE_URL: 'https://app.quotemedia.com',
SESSION_PATH: '/auth/g/authenticate/dataTool/v0/500',
LOOKUP_URL: 'https://app.quotemedia.com/datatool/lookup.json',
@ -38,6 +39,7 @@ export const QM_CONFIG = {
PRICES_URL: 'https://app.quotemedia.com/datatool/getEnhancedChartData.json',
EVENTS_URL: 'https://app.quotemedia.com/datatool/getIndicatorsBySymbol.json',
FINANCIALS_URL: 'https://app.quotemedia.com/datatool/getFinancialsEnhancedBySymbol.json',
FILING_URL: 'https://app.quotemedia.com/datatool/getCompanyFilings.json',
} as const;
// Session management settings


@ -59,6 +59,22 @@ export const QM_OPERATIONS: OperationConfig[] = [
type: 'standard',
description: 'Update SEC filings',
defaultStaleHours: 24 // Daily
},
// Insider trading
{
name: 'insiders_update',
type: 'standard',
description: 'Update insider transactions',
defaultStaleHours: 24 * 7 // Weekly
},
// News
{
name: 'news_update',
type: 'standard',
description: 'Update symbol-specific news',
defaultStaleHours: 24 * 7 // Weekly
}
];
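`defaultStaleHours` drives when a symbol is picked up again by the scheduler. The assumed semantics can be sketched as a pure check (illustrative; the actual logic lives in `OperationTracker.getStaleSymbols`):

```typescript
// Assumed semantics of defaultStaleHours (illustrative sketch; the actual
// check lives in OperationTracker.getStaleSymbols).
function isStale(lastRunAt: Date | undefined, staleHours: number, now: Date = new Date()): boolean {
  if (!lastRunAt) return true; // never run: always due
  return now.getTime() - lastRunAt.getTime() > staleHours * 3_600_000;
}
```

With `defaultStaleHours: 24 * 7`, a symbol last updated eight days ago qualifies, while one updated twelve hours ago does not.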


@ -82,12 +82,22 @@ export class OperationTracker {
}
];
// Add crawl state indexes for crawl operations
if (operation.type === 'crawl' || operation.type === 'intraday_crawl') {
indexes.push(
{
[`operations.${operation.name}.crawlState.finished`]: 1,
[symbolField]: 1
},
{
[`operations.${operation.name}.crawlState.newestDateReached`]: 1,
[symbolField]: 1
},
{
[`operations.${operation.name}.crawlState.oldestDateReached`]: 1,
[symbolField]: 1
}
);
}
const collection = this.mongodb.collection(collectionName);
@ -168,9 +178,21 @@ export class OperationTracker {
if (data.crawlState.oldestDateReached) {
update.$set[`${existingPath}.oldestDateReached`] = data.crawlState.oldestDateReached;
}
if (data.crawlState.newestDateReached) {
update.$set[`${existingPath}.newestDateReached`] = data.crawlState.newestDateReached;
}
if (data.crawlState.lastProcessedDate) {
update.$set[`${existingPath}.lastProcessedDate`] = data.crawlState.lastProcessedDate;
}
if (data.crawlState.totalDaysProcessed !== undefined) {
update.$set[`${existingPath}.totalDaysProcessed`] = data.crawlState.totalDaysProcessed;
}
if (data.crawlState.lastCrawlDirection) {
update.$set[`${existingPath}.lastCrawlDirection`] = data.crawlState.lastCrawlDirection;
}
if (data.crawlState.targetOldestDate) {
update.$set[`${existingPath}.targetOldestDate`] = data.crawlState.targetOldestDate;
}
if (data.crawlState.metadata) {
update.$set[`${existingPath}.metadata`] = data.crawlState.metadata;
}
@ -250,7 +272,12 @@ export class OperationTracker {
const basePath = `operations.${operation}.crawlState`;
Object.entries(data.crawlState).forEach(([key, value]) => {
if (value !== undefined) {
// Handle Date objects properly
if (value instanceof Date || (typeof value === 'string' && key.includes('Date'))) {
update.$set[`${basePath}.${key}`] = new Date(value);
} else {
update.$set[`${basePath}.${key}`] = value;
}
}
});
}
@ -374,13 +401,15 @@ export class OperationTracker {
async markCrawlFinished(
symbol: string,
operationName: string,
oldestDateReached: Date,
newestDateReached?: Date
): Promise<void> {
await this.updateSymbolOperation(symbol, operationName, {
status: 'success',
crawlState: {
finished: true,
oldestDateReached,
newestDateReached: newestDateReached || new Date()
}
});
@ -388,10 +417,137 @@ export class OperationTracker {
provider: this.provider.getProviderConfig().name,
symbol,
operation: operationName,
oldestDateReached,
newestDateReached
});
}
/**
* Get symbols for intraday crawl with gap detection
*/
async getSymbolsForIntradayCrawl(
operationName: string,
options: {
limit?: number;
targetOldestDate?: Date;
includeNewDataGaps?: boolean;
} = {}
): Promise<Array<SymbolWithOperations & { gaps?: { forward?: boolean; backward?: boolean } }>> {
const { collectionName, symbolField } = this.provider.getProviderConfig();
const { limit = 100, targetOldestDate, includeNewDataGaps = true } = options;
this.provider.validateOperation(operationName);
// Build filter
const filter: any = {
active: { $ne: false }
};
// Get all symbols that either:
// 1. Have never been crawled
// 2. Are not finished
// 3. Have gaps (new data since last crawl)
const orConditions = [
{ [`operations.${operationName}`]: { $exists: false } },
{ [`operations.${operationName}.crawlState.finished`]: { $ne: true } }
];
if (includeNewDataGaps) {
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);
yesterday.setHours(0, 0, 0, 0);
orConditions.push({
[`operations.${operationName}.crawlState.newestDateReached`]: { $lt: yesterday }
});
}
filter.$or = orConditions;
const symbols = await this.mongodb.find(collectionName, filter, {
limit,
projection: {
[symbolField]: 1,
[`operations.${operationName}`]: 1
},
sort: {
[`operations.${operationName}.lastRunAt`]: 1
}
});
return symbols.map(doc => {
const opStatus = doc.operations?.[operationName];
const crawlState = opStatus?.crawlState;
// Determine gaps
const gaps: { forward?: boolean; backward?: boolean } = {};
if (crawlState) {
// Check for forward gap (new data)
if (crawlState.newestDateReached) {
const daysSinceNewest = Math.floor(
(Date.now() - new Date(crawlState.newestDateReached).getTime()) / (1000 * 60 * 60 * 24)
);
gaps.forward = daysSinceNewest > 1;
}
// Check for backward gap (historical data)
if (!crawlState.finished) {
gaps.backward = true;
if (targetOldestDate && crawlState.oldestDateReached) {
gaps.backward = new Date(crawlState.oldestDateReached) > targetOldestDate;
}
}
} else {
// Never crawled, has both gaps
gaps.forward = true;
gaps.backward = true;
}
return {
symbol: doc[symbolField],
lastRecordDate: opStatus?.lastRecordDate,
operationStatus: opStatus,
gaps
};
});
}
/**
* Check if intraday crawl is complete
*/
async isIntradayCrawlComplete(
symbol: string,
operationName: string,
targetOldestDate: Date
): Promise<boolean> {
const { collectionName, symbolField } = this.provider.getProviderConfig();
const doc = await this.mongodb.findOne(collectionName, {
[symbolField]: symbol
}, {
projection: { [`operations.${operationName}.crawlState`]: 1 }
});
if (!doc?.operations?.[operationName]?.crawlState) {
return false;
}
const crawlState = doc.operations[operationName].crawlState;
// Check if explicitly marked as finished
if (crawlState.finished) {
return true;
}
// Check if we've reached the target oldest date
if (crawlState.oldestDateReached && targetOldestDate) {
return new Date(crawlState.oldestDateReached) <= targetOldestDate;
}
return false;
}
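The forward/backward gap logic inside `getSymbolsForIntradayCrawl` can be isolated into a pure helper for testing. A sketch (field names mirror `CrawlState`; the helper itself is illustrative, not part of the tracker):

```typescript
// Pure-function sketch of the gap detection in getSymbolsForIntradayCrawl.
interface CrawlStateLite {
  finished: boolean;
  oldestDateReached?: Date;
  newestDateReached?: Date;
}

function detectGaps(
  state: CrawlStateLite | undefined,
  targetOldest: Date,
  now: Date = new Date()
): { forward?: boolean; backward?: boolean } {
  if (!state) return { forward: true, backward: true }; // never crawled
  const gaps: { forward?: boolean; backward?: boolean } = {};
  if (state.newestDateReached) {
    const daysSinceNewest = Math.floor(
      (now.getTime() - state.newestDateReached.getTime()) / 86_400_000
    );
    gaps.forward = daysSinceNewest > 1; // new data has accumulated
  }
  if (!state.finished) {
    // Backward gap remains until the crawl reaches the target oldest date
    gaps.backward = !state.oldestDateReached || state.oldestDateReached > targetOldest;
  }
  return gaps;
}
```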
/**
* Get symbols that need data updates based on last record date
*/


@ -54,8 +54,16 @@ export interface CrawlState {
finished: boolean;
/** Oldest date reached during crawl */
oldestDateReached?: Date;
/** Newest date reached during crawl */
newestDateReached?: Date;
/** Last date that was processed (for resumption) */
lastProcessedDate?: Date;
/** Total days processed so far */
totalDaysProcessed?: number;
/** Direction of last crawl */
lastCrawlDirection?: 'forward' | 'backward';
/** Target oldest date to reach */
targetOldestDate?: Date;
/** Custom crawl metadata */
metadata?: Record<string, any>;
}
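For reference, a symbol part-way through its backward crawl might carry a state like the following (all values invented for illustration; the shape follows the `CrawlState` interface above):

```typescript
// Invented example of a symbol mid-way through its backward crawl
// (shape follows the CrawlState interface above).
const midCrawl = {
  finished: false,
  newestDateReached: new Date('2024-06-07'), // forward edge is current
  oldestDateReached: new Date('2022-03-14'), // still short of the target
  lastProcessedDate: new Date('2022-03-14'), // resume point after interruption
  totalDaysProcessed: 816,
  lastCrawlDirection: 'backward' as const,
  targetOldestDate: new Date('2020-01-01'),
};
```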

View file

@ -0,0 +1,77 @@
#!/usr/bin/env bun
/**
* Test script for intraday crawl functionality
*/
import { createTestContext } from '../src/test-utils';
import { QMHandler } from '../src/handlers/qm/qm.handler';
async function testIntradayCrawl() {
console.log('Testing intraday crawl functionality...\n');
const context = await createTestContext();
const handler = new QMHandler(context.services);
// Wait for operation registry to initialize
await new Promise(resolve => setTimeout(resolve, 1000));
try {
// Test 1: Schedule crawls for never-run symbols
console.log('Test 1: Scheduling crawls for never-run symbols...');
const result1 = await handler.scheduleIntradayCrawls({
limit: 5,
priorityMode: 'never_run',
targetOldestDate: '2023-01-01' // Just 1 year for testing
});
console.log('Result:', result1);
console.log('');
// Test 2: Check crawl state for a specific symbol
console.log('Test 2: Checking crawl state for symbol X...');
const tracker = handler.operationRegistry.getTracker('qm');
const isComplete = await tracker.isIntradayCrawlComplete('X', 'intraday_bars', new Date('2023-01-01'));
console.log('Is crawl complete for X?', isComplete);
// Get detailed state
const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
limit: 1,
targetOldestDate: new Date('2023-01-01')
});
const symbolX = symbols.find(s => s.symbol === 'X');
if (symbolX) {
console.log('Symbol X state:', JSON.stringify(symbolX, null, 2));
}
console.log('');
// Test 3: Manually crawl a single symbol
console.log('Test 3: Manually crawling intraday data for X...');
// First get symbol data
const symbolData = await context.services.mongodb.findOne('qmSymbols', {
symbol: 'X'
});
if (symbolData && symbolData.symbolId) {
const crawlResult = await handler.crawlIntradayData({
symbol: 'X',
symbolId: symbolData.symbolId,
qmSearchCode: symbolData.qmSearchCode,
targetOldestDate: '2024-01-01', // Just current year for quick test
batchSize: 7
});
console.log('Crawl result:', crawlResult);
} else {
console.log('Symbol X not found or missing symbolId');
}
} catch (error) {
console.error('Test failed:', error);
} finally {
await context.cleanup();
}
}
// Run the test
testIntradayCrawl().catch(console.error);


@ -0,0 +1,139 @@
/**
* Examples of modifying existing functions to only work with symbol 'X'
*/
import { OperationTracker } from '../src/shared/operation-manager';
// Example 1: Modified getStaleSymbols to only return symbol X
async function getStaleSymbolsOnlyX(
operationTracker: OperationTracker,
providerName: string,
operationName: string,
options: any = {}
) {
// Method 1: Add symbolFilter to options
const modifiedOptions = {
...options,
symbolFilter: { symbol: 'X' }
};
return operationTracker.getStaleSymbols(providerName, operationName, modifiedOptions);
}
// Example 2: Modified sophisticated backtest function for symbol X only
async function runSophisticatedBacktestOnlyX(orchestrator: any) {
const symbols = ['X']; // Only test symbol X
const backtestConfig = {
symbols,
startDate: new Date('2023-01-01'),
endDate: new Date('2024-01-01'),
strategies: ['momentum', 'mean_reversion'],
// ... rest of config
};
return orchestrator.runBacktest(backtestConfig);
}
// Example 3: Modified schedule function to only process symbol X
async function scheduleOperationsOnlyX(handler: any) {
// Get all symbols that need updates, then filter for X
const staleSymbols = await handler.operationRegistry.getStaleSymbols('qm', 'price_update', {
minHoursSinceRun: 24,
limit: 1000
});
// Filter to only include symbol X (startsWith avoids matching codes like 'NFLX:NASDAQ')
const symbolXOnly = staleSymbols.filter((s: string) => s.startsWith('X:'));
if (symbolXOnly.length > 0) {
const symbolData = await handler.mongodb.find('qmSymbols', {
qmSearchCode: symbolXOnly[0]
});
if (symbolData.length > 0) {
await handler.scheduleOperation('update-prices', {
symbol: symbolData[0].symbol,
symbolId: symbolData[0].symbolId,
qmSearchCode: symbolData[0].qmSearchCode
});
}
}
}
// Example 4: Modified intraday crawl for symbol X only
async function crawlIntradayOnlyX(handler: any) {
// Direct approach - just process symbol X
const symbolX = await handler.mongodb.findOne('qmSymbols', { symbol: 'X' });
if (symbolX) {
return handler.crawlIntradayData({
symbol: symbolX.symbol,
symbolId: symbolX.symbolId,
qmSearchCode: symbolX.qmSearchCode,
mode: 'full',
targetOldestDate: new Date('2020-01-01')
});
}
}
// Example 5: Test wrapper that ensures only symbol X is processed
function createSymbolXTestWrapper(originalFunction: Function) {
return async function(...args: any[]) {
// Check if first argument has symbol property
if (args[0] && typeof args[0] === 'object') {
// If it's a symbol-specific call, only proceed if symbol is X
if (args[0].symbol && args[0].symbol !== 'X') {
console.log(`Skipping symbol ${args[0].symbol} - only testing X`);
return { success: false, message: 'Test mode - only symbol X allowed' };
}
// If it's a batch operation, filter to only X
if (args[0].symbols && Array.isArray(args[0].symbols)) {
args[0].symbols = args[0].symbols.filter((s: string) => s === 'X');
}
}
// Call original function with potentially modified args
return originalFunction.apply(this, args);
};
}
// Example usage
async function demonstrateUsage() {
console.log('=== Demonstration of Symbol X Only Modifications ===\n');
// Mock tracker for demonstration
const mockTracker = {
getStaleSymbols: async (provider: string, operation: string, options: any) => {
console.log(`Getting stale symbols with options:`, options);
if (options.symbolFilter?.symbol === 'X') {
return ['X:NYSE'];
}
return ['AAPL:NASDAQ', 'GOOGL:NASDAQ', 'X:NYSE', 'MSFT:NASDAQ'];
}
} as any;
// Test the modified function
console.log('1. Testing getStaleSymbols with X filter:');
const xOnlySymbols = await getStaleSymbolsOnlyX(mockTracker, 'qm', 'price_update', {
minHoursSinceRun: 24
});
console.log('Results:', xOnlySymbols);
console.log('\n2. Example of wrapper usage:');
const originalUpdate = async (input: any) => {
console.log(`Processing symbol: ${input.symbol}`);
return { success: true, symbol: input.symbol };
};
const wrappedUpdate = createSymbolXTestWrapper(originalUpdate);
await wrappedUpdate({ symbol: 'X', symbolId: 123 });
await wrappedUpdate({ symbol: 'AAPL', symbolId: 456 }); // Will be skipped
console.log('\n=== Demonstration Complete ===');
}
// Run demonstration
demonstrateUsage().catch(console.error);


@ -0,0 +1,163 @@
/**
* Test script for QM operations
*/
import { QMHandler } from '../src/handlers/qm/qm.handler';
import type { DataIngestionServices } from '../src/types';
// Mock services for testing
const mockServices: Partial<DataIngestionServices> = {
mongodb: {
batchUpsert: async (collection: string, data: any[], uniqueKeys: string[]) => {
console.log(`Mock: Batch upsert to ${collection}`, {
recordCount: data.length,
uniqueKeys
});
return { insertedCount: data.length, modifiedCount: 0 };
},
find: async (collection: string, query: any, options?: any) => {
console.log(`Mock: Find from ${collection}`, { query, options });
// Return test symbol for testing
if (collection === 'qmSymbols' && query.symbol === 'X') {
return [{
symbol: 'X',
symbolId: 123456,
qmSearchCode: 'X:NYSE',
exchange: 'NYSE',
name: 'United States Steel Corporation'
}];
}
return [];
},
updateOne: async (collection: string, filter: any, update: any, options?: any) => {
console.log(`Mock: Update ${collection}`, { filter, update, options });
return { modifiedCount: 1 };
}
},
cache: {
get: async (key: string) => {
console.log(`Mock: Cache get ${key}`);
return null;
},
set: async (key: string, value: any, ttl?: number) => {
console.log(`Mock: Cache set ${key}`, { ttl });
return true;
}
},
logger: {
info: (message: string, data?: any) => {
console.log(`[INFO] ${message}`, data || '');
},
error: (message: string, data?: any) => {
console.error(`[ERROR] ${message}`, data || '');
},
warn: (message: string, data?: any) => {
console.warn(`[WARN] ${message}`, data || '');
},
debug: (message: string, data?: any) => {
console.debug(`[DEBUG] ${message}`, data || '');
}
},
// Mock operation registry
operationRegistry: {
updateOperation: async (provider: string, symbol: string, operation: string, data: any) => {
console.log(`Mock: Update operation ${provider}/${operation} for ${symbol}`, data);
return true;
},
getStaleSymbols: async (provider: string, operation: string, options: any) => {
console.log(`Mock: Get stale symbols for ${provider}/${operation}`, options);
// Return test symbol
if (options.symbolFilter?.symbol === 'X') {
return ['X:NYSE'];
}
return [];
}
}
} as DataIngestionServices;
async function testQMOperations() {
console.log('=== Testing QM Operations ===\n');
// Create handler instance
const handler = new QMHandler(mockServices);
// Wait a bit for initialization
await new Promise(resolve => setTimeout(resolve, 1000));
// Test 1: Update Insiders for symbol X
console.log('Test 1: Update Insiders for symbol X');
console.log('-------------------------------------');
try {
const insidersResult = await handler.updateInsiders({
symbol: 'X',
symbolId: 123456,
qmSearchCode: 'X:NYSE',
lookbackDays: 30
});
console.log('Result:', JSON.stringify(insidersResult, null, 2));
} catch (error) {
console.error('Failed:', error);
}
console.log('\n');
// Test 2: Update Symbol News for symbol X
console.log('Test 2: Update Symbol News for symbol X');
console.log('----------------------------------------');
try {
const newsResult = await handler.updateSymbolNews({
symbol: 'X',
symbolId: 123456,
qmSearchCode: 'X:NYSE',
lookbackDays: 7
});
console.log('Result:', JSON.stringify(newsResult, null, 2));
} catch (error) {
console.error('Failed:', error);
}
console.log('\n');
// Test 3: Update General News
console.log('Test 3: Update General News');
console.log('---------------------------');
try {
const generalNewsResult = await handler.updateGeneralNews({
categories: ['market', 'economy'],
lookbackMinutes: 60
});
console.log('Result:', JSON.stringify(generalNewsResult, null, 2));
} catch (error) {
console.error('Failed:', error);
}
console.log('\n');
// Test 4: Check available operations
console.log('Test 4: List Available Operations');
console.log('---------------------------------');
const operations = [
'create-session',
'search-symbols',
'update-symbol-info',
'update-financials',
'update-events',
'update-filings',
'update-prices',
'update-intraday-bars',
'crawl-intraday-data',
'update-insiders',
'update-symbol-news',
'update-general-news'
];
for (const op of operations) {
// Convert kebab-case operation names to the handler's camelCase method names
// (e.g. 'update-insiders' -> 'updateInsiders'); stripping hyphens alone would never match.
const methodName = op.replace(/-([a-z])/g, (_, c: string) => c.toUpperCase());
const hasOperation = typeof (handler as any)[methodName] === 'function';
console.log(`${op}: ${hasOperation ? '✓' : '✗'}`);
}
console.log('\n=== Tests Complete ===');
}
// Run tests
testQMOperations().catch(console.error);

View file

@ -0,0 +1,118 @@
/**
* Test script specifically for symbol X operations
*/
import { QMHandler } from '../src/handlers/qm/qm.handler';
import { OperationTracker } from '../src/shared/operation-manager';
import type { DataIngestionServices } from '../src/types';
// Simple test to check operations for symbol X
async function testSymbolXOperations() {
console.log('=== Testing Operations for Symbol X ===\n');
// Mock minimal services needed
// Partial mock; the cast at the bottom provides the full DataIngestionServices type.
const mockServices = {
mongodb: {
collection: (name: string) => ({
find: () => ({
toArray: async () => {
console.log(`Querying collection: ${name}`);
if (name === 'qmSymbols') {
return [{
symbol: 'X',
symbolId: 123456,
qmSearchCode: 'X:NYSE',
exchange: 'NYSE',
name: 'United States Steel Corporation'
}];
}
return [];
}
}),
findOne: async (query: any) => {
console.log(`Finding one in ${name}:`, query);
return null;
},
updateOne: async (filter: any, update: any, options: any) => {
console.log(`Updating ${name}:`, { filter, update });
return { modifiedCount: 1 };
}
}),
find: async (collection: string, query: any) => {
console.log(`Direct find on ${collection}:`, query);
if (collection === 'qmSymbols' && query.symbol === 'X') {
return [{
symbol: 'X',
symbolId: 123456,
qmSearchCode: 'X:NYSE'
}];
}
return [];
}
} as any,
logger: {
info: (msg: string, data?: any) => console.log(`[INFO] ${msg}`, data || ''),
error: (msg: string, data?: any) => console.error(`[ERROR] ${msg}`, data || ''),
warn: (msg: string, data?: any) => console.warn(`[WARN] ${msg}`, data || ''),
debug: (msg: string, data?: any) => console.debug(`[DEBUG] ${msg}`, data || '')
}
} as DataIngestionServices;
// Test 1: Check stale operations for symbol X
console.log('Test 1: Get stale operations for symbol X');
console.log('------------------------------------------');
const tracker = new OperationTracker(mockServices as any);
try {
// Check each operation type
const operations = [
'symbol_info',
'price_update',
'intraday_bars',
'financials_update_quarterly',
'financials_update_annual',
'events_update',
'filings_update',
'insiders_update',
'news_update'
];
for (const operation of operations) {
console.log(`\nChecking ${operation}:`);
const staleSymbols = await tracker.getStaleSymbols('qm', operation, {
minHoursSinceRun: 0, // Get all symbols regardless of last run
limit: 10,
symbolFilter: { symbol: 'X' } // Only get symbol X
});
console.log(`Found ${staleSymbols.length} stale symbols:`, staleSymbols);
}
} catch (error) {
console.error('Error checking stale symbols:', error);
}
// Test 2: Check intraday crawl status for symbol X
console.log('\n\nTest 2: Check intraday crawl status for symbol X');
console.log('------------------------------------------------');
try {
const symbolsForCrawl = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
limit: 10,
symbolFilter: { symbol: 'X' }
});
console.log(`Found ${symbolsForCrawl.length} symbols for intraday crawl`);
if (symbolsForCrawl.length > 0) {
console.log('Symbol details:', JSON.stringify(symbolsForCrawl[0], null, 2));
}
} catch (error) {
console.error('Error checking intraday crawl:', error);
}
console.log('\n=== Tests Complete ===');
}
// Run the test
testSymbolXOperations().catch(console.error);