finished intra-day crawl

Boki 2025-07-02 18:26:30 -04:00
parent c9a679d9a5
commit 11c24b2280
12 changed files with 437 additions and 896 deletions


@ -77,8 +77,8 @@
"port": 6379,
"db": 1
},
"workers": 1,
"concurrency": 1,
"workers": 5,
"concurrency": 5,
"enableScheduledJobs": true,
"defaultJobOptions": {
"attempts": 3,
@ -183,11 +183,11 @@
"services": {
"dataIngestion": {
"port": 2001,
"workers": 4,
"workers": 5,
"queues": {
"ceo": { "concurrency": 2 },
"webshare": { "concurrency": 1 },
"qm": { "concurrency": 2 },
"qm": { "concurrency": 5 },
"ib": { "concurrency": 1 },
"proxy": { "concurrency": 1 }
},


@ -1,122 +0,0 @@
# Intraday Crawl System
## Overview
The intraday crawl system is designed to handle large-scale historical data collection with proper resumption support. It tracks the oldest and newest dates reached, allowing it to resume from where it left off if interrupted.
## Key Features
1. **Bidirectional Crawling**: Can crawl both forward (for new data) and backward (for historical data)
2. **Resumption Support**: Tracks progress and can resume from where it left off
3. **Gap Detection**: Automatically detects gaps in data coverage
4. **Batch Processing**: Processes data in configurable batches (default: 7 days)
5. **Completion Tracking**: Knows when a symbol's full history has been fetched
## Crawl State Fields
The system tracks the following state for each symbol:
```typescript
interface CrawlState {
  finished: boolean;             // Whether crawl is complete
  oldestDateReached?: Date;      // Oldest date we've fetched
  newestDateReached?: Date;      // Newest date we've fetched
  lastProcessedDate?: Date;      // Last date processed (for resumption)
  totalDaysProcessed?: number;   // Total days processed so far
  lastCrawlDirection?: 'forward' | 'backward';
  targetOldestDate?: Date;       // Target date to reach
}
```
## How It Works
### Initial Crawl
1. Starts from today and fetches current data
2. Then begins crawling backward in weekly batches
3. Continues until it reaches the target oldest date (default: 2020-01-01)
4. Marks as finished when complete
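The backward batching in steps 2-3 can be sketched as follows. This is an illustrative helper, not the actual handler code; `planBackwardBatches` is a hypothetical name:

```typescript
interface BatchRange { start: Date; end: Date; }

// Walk backward from `from` toward `targetOldest` in `batchSize`-day steps,
// clamping the final batch so it never overshoots the target date.
function planBackwardBatches(from: Date, targetOldest: Date, batchSize = 7): BatchRange[] {
  const ranges: BatchRange[] = [];
  let cursor = new Date(from);
  while (cursor > targetOldest) {
    const end = new Date(cursor);
    end.setDate(end.getDate() - batchSize);
    if (end < targetOldest) end.setTime(targetOldest.getTime()); // clamp to target
    ranges.push({ start: new Date(cursor), end });
    cursor = new Date(end);
  }
  return ranges;
}
```

Each range would then be fetched newest-first, so an interruption leaves a contiguous, resumable history.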
### Resumption After Interruption
1. Checks for forward gap: If `newestDateReached < yesterday`, fetches new data first
2. Checks for backward gap: If not finished and `oldestDateReached > targetOldestDate`, continues backward crawl
3. Resumes from `lastProcessedDate` to avoid re-fetching data
### Daily Updates
Once a symbol is fully crawled:
- Only needs to fetch new data (forward crawl)
- Much faster as it's typically just 1-2 days of data
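The "needs a forward update" check described above reduces to a date comparison against `newestDateReached`. A minimal sketch (hypothetical helper, not the tracker's real method):

```typescript
// A symbol is current if its newest fetched date is no more than one day
// behind now (i.e. it already has yesterday's close).
function needsForwardUpdate(newestDateReached: Date, now = new Date()): boolean {
  const msPerDay = 24 * 60 * 60 * 1000;
  const daysBehind = Math.floor((now.getTime() - newestDateReached.getTime()) / msPerDay);
  return daysBehind > 1;
}
```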
## Usage
### Manual Crawl for Single Symbol
```typescript
await handler.crawlIntradayData({
  symbol: 'AAPL',
  symbolId: 12345,
  qmSearchCode: 'AAPL',
  targetOldestDate: '2020-01-01',
  batchSize: 7 // Days per batch
});
```
### Schedule Crawls for Multiple Symbols
```typescript
await handler.scheduleIntradayCrawls({
  limit: 50,
  targetOldestDate: '2020-01-01',
  priorityMode: 'incomplete' // or 'never_run', 'stale', 'all'
});
```
### Check Crawl Status
```typescript
const tracker = handler.operationRegistry.getTracker('qm');
const isComplete = await tracker.isIntradayCrawlComplete('AAPL', 'intraday_bars', new Date('2020-01-01'));
```
### Get Symbols Needing Crawl
```typescript
const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
  limit: 100,
  targetOldestDate: new Date('2020-01-01'),
  includeNewDataGaps: true // Include symbols needing updates
});
```
## Priority Modes
- **never_run**: Symbols that have never been crawled (highest priority)
- **incomplete**: Symbols with unfinished crawls
- **stale**: Symbols with complete crawls but new data available
- **all**: All symbols needing any processing
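The four modes above amount to different filters over each symbol's crawl state. A hedged sketch of how such a filter could look (field names mirror the `CrawlState` interface earlier in this doc; `filterByMode` itself is hypothetical):

```typescript
interface SymbolCrawlInfo {
  symbol: string;
  crawlState?: { finished: boolean; newestDateReached?: Date };
}

type PriorityMode = 'never_run' | 'incomplete' | 'stale' | 'all';

function filterByMode(symbols: SymbolCrawlInfo[], mode: PriorityMode, yesterday: Date): SymbolCrawlInfo[] {
  switch (mode) {
    case 'never_run':   // no crawl state at all
      return symbols.filter(s => s.crawlState === undefined);
    case 'incomplete':  // started but not finished
      return symbols.filter(s => s.crawlState !== undefined && !s.crawlState.finished);
    case 'stale':       // finished, but new data is available
      return symbols.filter(s =>
        s.crawlState !== undefined &&
        s.crawlState.finished &&
        (s.crawlState.newestDateReached === undefined || s.crawlState.newestDateReached < yesterday));
    case 'all':
    default:
      return symbols;
  }
}
```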
## Scheduled Operations
The system includes scheduled operations:
- `schedule-intraday-crawls-batch`: Runs every 4 hours, processes incomplete crawls
## Monitoring
Use the provided scripts to monitor crawl progress:
```bash
# Check overall status
bun run scripts/check-intraday-status.ts
# Test crawl for specific symbol
bun run test/intraday-crawl.test.ts
```
## Performance Considerations
1. **Rate Limiting**: Delays between API calls to avoid rate limits
2. **Weekend Skipping**: Automatically skips weekends to save API calls
3. **Batch Size**: Configurable batch size (default 7 days) balances progress vs memory
4. **Priority Scheduling**: Higher priority for current data updates
## Error Handling
- Failed batches don't stop the entire crawl
- Errors are logged and stored in the operation status
- Partial success is tracked separately from complete failure
- Session failures trigger automatic session rotation
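The first three bullets follow a collect-errors-and-continue pattern. A minimal illustration (the hypothetical `runBatches` stands in for the real action, which also manages sessions and Mongo writes):

```typescript
// Run each batch independently; a failure is recorded, not rethrown,
// so one bad week cannot abort the whole crawl.
async function runBatches(
  batches: string[],
  fetchBatch: (batch: string) => Promise<number>
): Promise<{ success: boolean; records: number; errors: string[] }> {
  const errors: string[] = [];
  let records = 0;
  for (const batch of batches) {
    try {
      records += await fetchBatch(batch);
    } catch (e) {
      errors.push(`${batch}: ${e instanceof Error ? e.message : String(e)}`);
    }
  }
  // Partial success stays visible: records > 0 with errors.length > 0.
  return { success: errors.length === 0, records, errors };
}
```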


@ -1,107 +0,0 @@
# Operation Tracker Enhancements for Intraday Crawling
## Summary of Changes
This document summarizes the enhancements made to the operation tracker to support sophisticated intraday data crawling with resumption capabilities.
## Changes Made
### 1. Enhanced CrawlState Interface (`types.ts`)
Added new fields to track crawl progress:
- `newestDateReached`: Track the most recent date processed
- `lastProcessedDate`: For resumption after interruption
- `totalDaysProcessed`: Progress tracking
- `targetOldestDate`: The goal date to reach
### 2. Updated OperationTracker (`OperationTracker.ts`)
- Modified `updateSymbolOperation` to handle new crawl state fields
- Updated `bulkUpdateSymbolOperations` for proper Date handling
- Enhanced `markCrawlFinished` to track both oldest and newest dates
- Added `getSymbolsForIntradayCrawl`: Specialized method for intraday crawls with gap detection
- Added `isIntradayCrawlComplete`: Check if a crawl has reached its target
- Added new indexes for efficient querying on crawl state fields
### 3. New Intraday Crawl Action (`intraday-crawl.action.ts`)
Created a sophisticated crawl system with:
- **Bidirectional crawling**: Handles both forward (new data) and backward (historical) gaps
- **Batch processing**: Processes data in weekly batches by default
- **Resumption logic**: Can resume from where it left off if interrupted
- **Gap detection**: Automatically identifies missing date ranges
- **Completion tracking**: Knows when the full history has been fetched
### 4. Integration with QM Handler
- Added new operations: `crawl-intraday-data` and `schedule-intraday-crawls`
- Added scheduled operation to automatically process incomplete crawls every 4 hours
- Integrated with the existing operation registry system
### 5. Testing and Monitoring Tools
- Created test script to verify crawl functionality
- Created status checking script to monitor crawl progress
- Added comprehensive documentation
## How It Works
### Initial Crawl Flow
1. Symbol starts with no crawl state
2. First crawl fetches today's data and sets `newestDateReached`
3. Subsequent batches crawl backward in time
4. Each batch updates `oldestDateReached` and `lastProcessedDate`
5. When `oldestDateReached <= targetOldestDate`, crawl is marked finished
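A minimal sketch (assumed, not the actual OperationTracker code) of the state transition in steps 3-5: each backward batch pushes `oldestDateReached` and `lastProcessedDate` down, and the crawl flips to finished once the target is reached.

```typescript
interface CrawlState {
  finished: boolean;
  oldestDateReached?: Date;
  lastProcessedDate?: Date;
  totalDaysProcessed?: number;
}

// Apply one completed backward batch ending at `batchEnd` to the state.
function applyBackwardBatch(
  state: CrawlState,
  batchEnd: Date,
  daysProcessed: number,
  targetOldest: Date
): CrawlState {
  return {
    ...state,
    oldestDateReached: batchEnd,
    lastProcessedDate: batchEnd,
    totalDaysProcessed: (state.totalDaysProcessed ?? 0) + daysProcessed,
    finished: batchEnd <= targetOldest, // step 5
  };
}
```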
### Resumption Flow
1. Check if `newestDateReached < yesterday` (forward gap)
2. If yes, fetch new data first to stay current
3. Check if `finished = false` (backward gap)
4. If yes, continue backward crawl from `lastProcessedDate`
5. Process in batches until complete
### Daily Update Flow
1. For finished crawls, only check for forward gaps
2. Fetch data from `newestDateReached + 1` to today
3. Update `newestDateReached` to maintain currency
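Step 2's range computation can be sketched as follows (illustrative helper; `forwardRange` is a hypothetical name):

```typescript
// Fetch window for a finished crawl: the day after newestDateReached
// through today, or null when the symbol is already current.
function forwardRange(newestDateReached: Date, today: Date): { start: Date; end: Date } | null {
  const start = new Date(newestDateReached);
  start.setDate(start.getDate() + 1);
  return start <= today ? { start, end: new Date(today) } : null;
}
```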
## Benefits
1. **Resilient**: Can handle interruptions gracefully
2. **Efficient**: Avoids re-fetching data
3. **Trackable**: Clear progress visibility
4. **Scalable**: Can handle thousands of symbols
5. **Flexible**: Configurable batch sizes and target dates
## Usage Examples
### Check if symbol X needs crawling:
```typescript
const tracker = operationRegistry.getTracker('qm');
const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
  limit: 1,
  targetOldestDate: new Date('2020-01-01')
});
const symbolX = symbols.find(s => s.symbol === 'X');
```
### Start crawl for symbol X only:
```typescript
await handler.crawlIntradayData({
  symbol: 'X',
  symbolId: symbolData.symbolId,
  qmSearchCode: symbolData.qmSearchCode,
  targetOldestDate: '2020-01-01'
});
```
### Schedule crawls for never-run symbols:
```typescript
await handler.scheduleIntradayCrawls({
  limit: 50,
  priorityMode: 'never_run',
  targetOldestDate: '2020-01-01'
});
```
## Next Steps
1. Monitor the crawl progress using the provided scripts
2. Adjust batch sizes based on API rate limits
3. Consider adding more sophisticated retry logic for failed batches
4. Implement data validation to ensure quality


@ -5,13 +5,12 @@
export { scheduleEventsUpdates, updateEvents } from './events.action';
export { scheduleFilingsUpdates, updateFilings } from './filings.action';
export { scheduleFinancialsUpdates, updateFinancials } from './financials.action';
export { scheduleIntradayUpdates, updateIntradayBars } from './intraday.action';
export { scheduleInsidersUpdates, updateInsiders } from './insiders.action';
export { crawlIntradayData, scheduleIntradayCrawls } from './intraday-crawl.action';
export { scheduleSymbolNewsUpdates, updateGeneralNews, updateSymbolNews } from './news.action';
export { schedulePriceUpdates, updatePrices } from './prices.action';
export { checkSessions, createSession } from './session.action';
export { deduplicateSymbols, updateExchangeStats, updateExchangeStatsAndDeduplicate } from './symbol-dedup.action';
export { scheduleSymbolInfoUpdates, updateSymbolInfo } from './symbol-info.action';
export { searchSymbols, spiderSymbol } from './symbol.action';
export { deduplicateSymbols, updateExchangeStats, updateExchangeStatsAndDeduplicate } from './symbol-dedup.action';
export { scheduleInsidersUpdates, updateInsiders } from './insiders.action';
export { scheduleSymbolNewsUpdates, updateSymbolNews, updateGeneralNews } from './news.action';


@ -5,7 +5,7 @@
import type { ExecutionContext } from '@stock-bot/handlers';
import type { CrawlState } from '../../../shared/operation-manager/types';
import type { QMHandler } from '../qm.handler';
import { getWeekStart, QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
import { getLastWeek, QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
import { QMSessionManager } from '../shared/session-manager';
interface IntradayCrawlInput {
@ -13,165 +13,13 @@ interface IntradayCrawlInput {
exchange: string;
qmSearchCode: string;
targetOldestDate?: string; // ISO date string for how far back to crawl
batchSize?: number; // Days per batch
existingCrawlState?: CrawlState; // Pass existing crawl state to avoid re-querying
gaps?: { forward?: boolean; backward?: boolean }; // Pass gap information
}
interface DateRange {
start: Date;
end: Date;
direction: 'forward' | 'backward';
}
/**
* Process a batch of intraday data for a date range
*/
export async function processIntradayBatch(
this: QMHandler,
input: {
symbol: string;
exchange: string;
qmSearchCode: string;
dateRange: DateRange;
},
_context?: ExecutionContext
): Promise<{
success: boolean;
recordsProcessed: number;
datesProcessed: number;
errors: string[];
}> {
const { symbol, exchange, qmSearchCode, dateRange } = input;
console.log('Processing intraday batch for:', { symbol, exchange, qmSearchCode, dateRange });
const errors: string[] = [];
let recordsProcessed = 0;
let datesProcessed = 0;
const sessionManager = QMSessionManager.getInstance();
await sessionManager.initialize(this.cache, this.logger);
// Get a session
const sessionId = QM_SESSION_IDS.PRICES; // TODO: Update with correct session ID
const session = await sessionManager.getSession(sessionId);
if (!session || !session.uuid) {
throw new Error(`No active session found for QM intraday`);
}
// Process each date in the range
const currentWeek = getWeekStart(new Date(dateRange.start));
const endDate = new Date(dateRange.end);
while (
(dateRange.direction === 'backward' && currentWeek >= endDate) ||
(dateRange.direction === 'forward' && currentWeek <= endDate)
) {
try {
// Skip weekends
if (currentWeek.getDay() === 0 || currentWeek.getDay() === 6) {
if (dateRange.direction === 'backward') {
currentWeek.setDate(currentWeek.getDate() - 1);
} else {
currentWeek.setDate(currentWeek.getDate() + 1);
}
continue;
}
getWeekStart(currentWeek); // Ensure we are at the start of the week
// Build API request
const searchParams = new URLSearchParams({
adjType: 'none',
adjusted: 'true',
freq: 'day',
interval: '1',
marketSession: 'mkt',
pathName: '/demo/portal/company-quotes.php',
qmodTool: 'InteractiveChart',
start: currentWeek.toISOString().split('T')[0],
symbol: qmSearchCode,
unadjusted: 'false',
webmasterId: '500',
zeroTradeDays: 'false',
} as Record<string, string>);
console.log('Fetching intraday data for:', searchParams.toString());
const apiUrl = `${QM_CONFIG.PRICES_URL}?${searchParams.toString()}`;
const response = await fetch(apiUrl, {
method: 'GET',
headers: session.headers,
proxy: session.proxy,
});
//https://app.quotemedia.com/datatool/getEnhancedChartData.json?zeroTradeDays=false&start=2025-06-24&interval=1&marketSession=mkt&freq=day&adjusted=true&adjustmentType=none&unadjusted=false&datatype=int&symbol=X:CA
if (!response.ok) {
throw new Error(`API request failed: ${response.status}`);
}
const barsResults = await response.json();
console.log('Bars results:', barsResults);
const barsData = barsResults.results.intraday[0].interval || [];
this.logger.info(`Fetched ${barsData.length} bars for ${qmSearchCode} on ${currentWeek.toISOString().split('T')[0]}`, {
qmSearchCode,
date: currentWeek.toISOString().split('T')[0],
records: barsData.length
});
// Update session success stats
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
// Process and store data if we got any
if (barsData && barsData.length > 0) {
const processedBars = barsData.map((bar: any) => ({
...bar,
qmSearchCode,
symbol,
exchange,
timestamp: new Date(bar.startdatetime),
}));
await this.mongodb.batchUpsert(
'qmIntraday',
processedBars,
['qmSearchCode', 'timestamp']
);
recordsProcessed += barsData.length;
}
datesProcessed++;
} catch (error) {
const errorMsg = `Failed to fetch ${qmSearchCode} for ${currentWeek.toISOString().split('T')[0]}: ${error}`;
errors.push(errorMsg);
this.logger.error(errorMsg);
// Update session failure stats
if (session.uuid) {
await sessionManager.incrementFailedCalls(sessionId, session.uuid);
}
}
// Move to next date
if (dateRange.direction === 'backward') {
currentWeek.setDate(currentWeek.getDate() - 1);
} else {
currentWeek.setDate(currentWeek.getDate() + 1);
}
}
return {
success: errors.length === 0,
recordsProcessed,
datesProcessed,
errors
};
}
/**
* Main intraday crawl handler with sophisticated resumption logic
* Main intraday crawl handler - crawls backwards week by week (Sunday to Sunday)
*/
export async function crawlIntradayData(
this: QMHandler,
@ -189,89 +37,69 @@ export async function crawlIntradayData(
symbol,
exchange,
qmSearchCode,
targetOldestDate = '2020-01-01', // Default to ~5 years of data
batchSize = 7 // Process a week at a time
targetOldestDate = '1960-01-01', // Default far enough back to cover full available history
existingCrawlState,
gaps
} = input;
this.logger.info('Starting intraday crawl', {
symbol,
exchange,
targetOldestDate,
batchSize
hasExistingState: !!existingCrawlState,
gaps
});
const sessionManager = QMSessionManager.getInstance();
await sessionManager.initialize(this.cache, this.logger);
const sessionId = QM_SESSION_IDS.PRICES;
const session = await sessionManager.getSession(sessionId);
if (!session || !session.uuid) {
throw new Error(`No active session found for QM intraday`);
}
try {
// Get current crawl state
// Use passed crawl state if available, otherwise query for it
let currentCrawlState: CrawlState;
if (existingCrawlState) {
currentCrawlState = existingCrawlState;
} else {
// Only query if not passed
const symbolData = await this.mongodb.findOne('qmSymbols', {
qmSearchCode
});
const currentCrawlState: CrawlState = symbolData?.operations?.intraday_bars?.crawlState || {
currentCrawlState = symbolData?.operations?.intraday_bars?.crawlState || {
finished: false
};
}
// Determine what needs to be processed
const today = new Date();
today.setHours(0, 0, 0, 0);
// Determine starting point for crawl
let currentWeek: Date;
if (currentCrawlState.oldestDateReached && !currentCrawlState.finished) {
// Start from oldest date reached
currentWeek = new Date(currentCrawlState.oldestDateReached);
} else if (currentCrawlState.lastProcessedDate) {
// Resume from last processed date
currentWeek = new Date(currentCrawlState.lastProcessedDate);
} else {
// New crawl, start from today
currentWeek = new Date();
}
// Get the Sunday of the current week
currentWeek = getLastWeek(currentWeek);
const targetOldest = new Date(targetOldestDate);
targetOldest.setHours(0, 0, 0, 0);
const ranges: DateRange[] = [];
// 1. Check for forward gap (new data since last crawl)
if (currentCrawlState.newestDateReached) {
const newestDate = new Date(currentCrawlState.newestDateReached);
const daysSinceNewest = Math.floor((today.getTime() - newestDate.getTime()) / (1000 * 60 * 60 * 24));
if (daysSinceNewest > 1) {
// We have new data to fetch
const forwardStart = new Date(newestDate);
forwardStart.setDate(forwardStart.getDate() + 1);
ranges.push({
start: forwardStart,
end: today,
direction: 'forward'
});
}
} else if (!currentCrawlState.oldestDateReached) {
// Never crawled, start from today
ranges.push({
start: today,
end: today,
direction: 'forward'
});
}
// 2. Check for backward gap (historical data)
if (!currentCrawlState.finished) {
const startDate = currentCrawlState.lastProcessedDate
? new Date(currentCrawlState.lastProcessedDate)
: currentCrawlState.oldestDateReached
? new Date(currentCrawlState.oldestDateReached)
: today;
if (startDate > targetOldest) {
// Calculate batch end date
const batchEnd = new Date(startDate);
batchEnd.setDate(batchEnd.getDate() - batchSize);
// Don't go past target
if (batchEnd < targetOldest) {
batchEnd.setTime(targetOldest.getTime());
}
ranges.push({
start: startDate,
end: batchEnd,
direction: 'backward'
});
}
}
if (ranges.length === 0) {
// Nothing to do
// Check if already finished
if (currentCrawlState.finished || currentWeek <= targetOldest) {
this.logger.info('Intraday crawl already complete', { symbol });
return {
success: true,
@ -282,86 +110,116 @@ export async function crawlIntradayData(
};
}
// Process the ranges
let totalRecords = 0;
let totalDates = 0;
const allErrors: string[] = [];
let weeksProcessed = 0;
const errors: string[] = [];
for (const range of ranges) {
this.logger.info('Processing date range', {
symbol,
start: range.start.toISOString().split('T')[0],
end: range.end.toISOString().split('T')[0],
direction: range.direction
const endOfWeek = new Date(currentWeek);
endOfWeek.setDate(endOfWeek.getDate() + 6); // Saturday of the same week
// Build API request for the week
const searchParams = new URLSearchParams({
adjType: 'none',
adjusted: 'true',
freq: 'day',
interval: '1',
marketSession: 'mkt',
pathName: '/demo/portal/company-quotes.php',
qmodTool: 'InteractiveChart',
start: currentWeek.toISOString().split('T')[0],
end: endOfWeek.toISOString().split('T')[0], // Saturday, end of the week
symbol: qmSearchCode,
unadjusted: 'false',
webmasterId: '500',
zeroTradeDays: 'false',
} as Record<string, string>);
try {
const response = await fetch(`${QM_CONFIG.PRICES_URL}?${searchParams.toString()}`, {
method: 'GET',
headers: session.headers,
proxy: session.proxy,
});
const result = await processIntradayBatch.call(this, {
if (!response.ok) {
throw new Error(`API request failed: ${response.status}`);
}
const barsResults = await response.json();
// Parse the results based on the API response structure
const barsData = barsResults.results?.intraday?.[0]?.interval || null;
const barsLength = barsData !== null ? barsData.length : 0;
this.logger.info(`Fetched ${barsLength} bars for ${qmSearchCode} for week of ${currentWeek}`, {
qmSearchCode,
currentWeek,
records: barsLength
});
// Update session success stats
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
// Process and store data if we got any
if (barsData !== null && barsLength > 0) {
const processedBars = barsData.map((bar: any) => ({
...bar,
qmSearchCode,
symbol,
exchange,
qmSearchCode,
dateRange: range
timestamp: new Date(bar.startdatetime || bar.date || bar.datetime),
}));
await this.mongodb.batchUpsert(
'qmIntraday',
processedBars,
['qmSearchCode', 'timestamp']
);
totalRecords += barsLength;
}
weeksProcessed = 1;
const nextWeek = getLastWeek(new Date(currentWeek));
const finished = (barsData === null) && nextWeek < new Date('2024-01-01');
await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', {
status: 'success',
crawlState: {
finished: finished,
oldestDateReached: currentCrawlState.oldestDateReached,
lastProcessedDate: currentWeek
}
});
totalRecords += result.recordsProcessed;
totalDates += result.datesProcessed;
allErrors.push(...result.errors);
// Calculate next week to process (previous Sunday)
// Update crawl state after each batch
const updatedCrawlState: Partial<CrawlState> = {
lastProcessedDate: range.end,
lastCrawlDirection: range.direction,
totalDaysProcessed: (currentCrawlState.totalDaysProcessed || 0) + result.datesProcessed
};
if (range.direction === 'forward') {
updatedCrawlState.newestDateReached = range.end;
if (!currentCrawlState.oldestDateReached) {
updatedCrawlState.oldestDateReached = range.start;
}
} else {
updatedCrawlState.oldestDateReached = range.end;
if (!currentCrawlState.newestDateReached) {
updatedCrawlState.newestDateReached = range.start;
}
}
// Check if we've completed the crawl
if (range.direction === 'backward' && range.end <= targetOldest) {
updatedCrawlState.finished = true;
updatedCrawlState.targetOldestDate = targetOldest;
}
await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', {
status: allErrors.length > 0 ? 'partial' : 'success',
lastRecordDate: today,
recordCount: totalRecords,
crawlState: updatedCrawlState,
error: allErrors.length > 0 ? allErrors.join('; ') : undefined
if (!finished) {
currentCrawlState.lastProcessedDate = nextWeek;
await this.scheduleOperation('crawl-intraday-data', {
symbol: symbol,
exchange: exchange,
qmSearchCode: qmSearchCode,
targetOldestDate,
// Pass existing crawl state and gaps to avoid re-querying
existingCrawlState: currentCrawlState,
gaps: gaps
}, {
priority: 4, // Standard priority for backward crawls
});
}
const message = `Processed ${totalDates} days, ${totalRecords} records for ${symbol}`;
this.logger.info('Intraday crawl batch completed', {
symbol,
totalDates,
totalRecords,
errors: allErrors.length,
finished: ranges.some(r => r.direction === 'backward' && r.end <= targetOldest)
});
return {
success: allErrors.length === 0,
success: true,
symbol,
exchange,
qmSearchCode,
message,
message: `Intraday crawl completed for ${symbol} - Processed ${weeksProcessed} week(s), ${totalRecords} records`,
data: {
datesProcessed: totalDates,
recordsProcessed: totalRecords,
errors: allErrors,
crawlComplete: ranges.some(r => r.direction === 'backward' && r.end <= targetOldest)
totalRecords,
weeksProcessed,
nextWeek: nextWeek.toISOString().split('T')[0],
errors: errors.length > 0 ? errors : undefined
}
}
};
} catch (error) {
this.logger.error('Intraday crawl failed', {
@ -374,6 +232,15 @@ export async function crawlIntradayData(
error: error instanceof Error ? error.message : 'Unknown error'
});
return {
success: false,
symbol,
exchange,
qmSearchCode,
message: `Intraday crawl failed: ${error instanceof Error ? error.message : 'Unknown error'}`
};
}
} catch (error) {
return {
success: false,
symbol,
@ -392,7 +259,7 @@ export async function scheduleIntradayCrawls(
input: {
limit?: number;
targetOldestDate?: string;
priorityMode?: 'never_run' | 'incomplete' | 'stale' | 'all';
priorityMode?: 'never_run' | 'incomplete' | 'all';
} = {},
_context?: ExecutionContext
): Promise<{
@ -401,7 +268,7 @@ export async function scheduleIntradayCrawls(
errors: number;
}> {
const {
limit = 1,
limit = 999999999,
targetOldestDate = '1960-01-01',
priorityMode = 'all'
} = input;
@ -433,25 +300,27 @@ export async function scheduleIntradayCrawls(
// Get symbols with incomplete crawls
symbolsToProcess = await tracker.getSymbolsForIntradayCrawl(
'intraday_bars',
{ limit, targetOldestDate: new Date(targetOldestDate), includeNewDataGaps: false }
{
limit,
targetOldestDate: new Date(targetOldestDate),
includeNewDataGaps: false, // Only backward gaps
// symbolFilter: { symbol: 'ETU' } // Filter for testing
}
);
break;
case 'stale':
// Get symbols that need updates (new data)
symbolsToProcess = await tracker.getSymbolsForIntradayCrawl(
'intraday_bars',
{ limit, targetOldestDate: new Date(targetOldestDate), includeNewDataGaps: true }
);
symbolsToProcess = symbolsToProcess.filter(s => s.gaps?.forward);
break;
case 'all':
default:
// Get all symbols that need any processing
symbolsToProcess = await tracker.getSymbolsForIntradayCrawl(
'intraday_bars',
{ limit, targetOldestDate: new Date(targetOldestDate) }
{
limit,
targetOldestDate: new Date(targetOldestDate),
includeNewDataGaps: false, // Only backward gaps since we removed forward crawling
// symbolFilter: { symbol: 'AAPL' } // Filter for testing
}
);
break;
}
@ -463,7 +332,16 @@ export async function scheduleIntradayCrawls(
errors: 0
};
}
// symbolsToProcess = [{ symbol: 'X:CA' }]; // Debug override: forces a single symbol for testing
this.logger.info('Symbols to process from getSymbolsForIntradayCrawl', {
count: symbolsToProcess.length,
firstSymbol: symbolsToProcess[0] ? {
symbol: symbolsToProcess[0].symbol,
gaps: symbolsToProcess[0].gaps,
hasOperationStatus: !!symbolsToProcess[0].operationStatus,
crawlState: symbolsToProcess[0].operationStatus?.crawlState
} : null
});
// Get full symbol data if needed
if (priorityMode !== 'never_run') {
@ -474,13 +352,32 @@ export async function scheduleIntradayCrawls(
projection: { symbol: 1, exchange: 1, qmSearchCode: 1, operations: 1 }
});
// Map back the full data
// Map back the full data while preserving gaps and operation status
symbolsToProcess = symbolsToProcess.map(sp => {
const full = fullSymbols.find(f => f.qmSearchCode === sp.symbol);
return full || sp;
if (full) {
return {
...full,
gaps: sp.gaps, // Preserve gap information
operationStatus: sp.operationStatus // Preserve original operation status
};
}
return sp;
});
}
this.logger.info('After mapping, symbols to process', {
count: symbolsToProcess.length,
firstSymbol: symbolsToProcess[0] ? {
symbol: symbolsToProcess[0].symbol,
exchange: symbolsToProcess[0].exchange,
qmSearchCode: symbolsToProcess[0].qmSearchCode,
gaps: symbolsToProcess[0].gaps,
hasOperationStatus: !!symbolsToProcess[0].operationStatus,
crawlState: symbolsToProcess[0].operationStatus?.crawlState
} : null
});
let symbolsQueued = 0;
let errors = 0;
@ -492,9 +389,12 @@ export async function scheduleIntradayCrawls(
symbol: doc.symbol,
exchange: doc.exchange,
qmSearchCode: doc.qmSearchCode,
targetOldestDate
targetOldestDate,
// Pass existing crawl state and gaps to avoid re-querying
existingCrawlState: doc.operationStatus?.crawlState,
gaps: doc.gaps
}, {
priority: priorityMode === 'stale' ? 9 : 5, // Higher priority for updates
priority: 6, // Standard priority for backward crawls
});
symbolsQueued++;


@ -1,302 +0,0 @@
/**
* QM Intraday Actions - Fetch and update intraday price bars
*/
import type { ExecutionContext } from '@stock-bot/handlers';
import type { QMHandler } from '../qm.handler';
import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
import { QMSessionManager } from '../shared/session-manager';
/**
* Update intraday bars for a single symbol
* This handles both initial crawl and incremental updates
*/
export async function updateIntradayBars(
this: QMHandler,
input: {
symbol: string;
symbolId: number;
qmSearchCode: string;
crawlDate?: string; // ISO date string for specific date crawl
},
_context?: ExecutionContext
): Promise<{
success: boolean;
symbol: string;
message: string;
data?: any;
}> {
const { symbol, symbolId, qmSearchCode, crawlDate } = input;
this.logger.info('Fetching intraday bars', { symbol, symbolId, crawlDate });
const sessionManager = QMSessionManager.getInstance();
await sessionManager.initialize(this.cache, this.logger);
// Get a session - you'll need to add the appropriate session ID for intraday
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
const session = await sessionManager.getSession(sessionId);
if (!session || !session.uuid) {
throw new Error(`No active session found for QM intraday`);
}
try {
// Determine the date to fetch
const targetDate = crawlDate ? new Date(crawlDate) : new Date();
// Build API request for intraday bars
const searchParams = new URLSearchParams({
symbol: symbol,
symbolId: symbolId.toString(),
qmodTool: 'IntradayBars',
webmasterId: '500',
date: targetDate.toISOString().split('T')[0],
interval: '1' // 1-minute bars
} as Record<string, string>);
// TODO: Update with correct intraday endpoint
const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/intraday.json?${searchParams.toString()}`;
const response = await fetch(apiUrl, {
method: 'GET',
headers: session.headers,
proxy: session.proxy,
});
if (!response.ok) {
throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
}
const barsData = await response.json();
// Update session success stats
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
// Process and store intraday data
if (barsData && barsData.length > 0) {
// Store bars in a separate collection
const processedBars = barsData.map((bar: any) => ({
...bar,
symbol,
symbolId,
timestamp: new Date(bar.timestamp),
date: targetDate,
updated_at: new Date()
}));
await this.mongodb.batchUpsert(
'qmIntradayBars',
processedBars,
['symbol', 'timestamp'] // Unique keys
);
// Update operation tracking
await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', {
status: 'success',
lastRecordDate: targetDate,
recordCount: barsData.length
});
this.logger.info('Intraday bars updated successfully', {
symbol,
date: targetDate,
barCount: barsData.length
});
return {
success: true,
symbol,
message: `Intraday bars updated for ${symbol} on ${targetDate.toISOString().split('T')[0]}`,
data: {
count: barsData.length,
date: targetDate
}
};
} else {
// No data for this date (weekend, holiday, or no trading)
this.logger.info('No intraday data for date', { symbol, date: targetDate });
// Still update operation tracking as successful (no data is a valid result)
await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', {
status: 'success',
lastRecordDate: targetDate,
recordCount: 0
});
return {
success: true,
symbol,
message: `No intraday data for ${symbol} on ${targetDate.toISOString().split('T')[0]}`,
data: {
count: 0,
date: targetDate
}
};
}
} catch (error) {
// Update session failure stats
if (session.uuid) {
await sessionManager.incrementFailedCalls(sessionId, session.uuid);
}
this.logger.error('Error fetching intraday bars', {
symbol,
error: error instanceof Error ? error.message : 'Unknown error'
});
// Update operation tracking for failure
await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', {
status: 'failure'
});
return {
success: false,
symbol,
message: `Failed to fetch intraday bars: ${error instanceof Error ? error.message : 'Unknown error'}`
};
}
}
/**
* Schedule intraday updates for symbols
* This handles both initial crawls and regular updates
*/
export async function scheduleIntradayUpdates(
this: QMHandler,
input: {
limit?: number;
mode?: 'crawl' | 'update'; // crawl for historical, update for recent
forceUpdate?: boolean;
} = {},
_context?: ExecutionContext
): Promise<{
message: string;
symbolsQueued: number;
jobsQueued: number;
errors: number;
}> {
const { limit = 50, mode = 'update', forceUpdate = false } = input;
this.logger.info('Scheduling intraday updates', { limit, mode, forceUpdate });
try {
let symbolsToProcess: any[] = [];
if (mode === 'crawl') {
// Get symbols that need historical crawl
symbolsToProcess = await this.operationRegistry.getSymbolsForCrawl('qm', 'intraday_bars', {
limit
});
} else {
// Get symbols that need regular updates
const staleSymbols = await this.operationRegistry.getStaleSymbols('qm', 'intraday_bars', {
minHoursSinceRun: forceUpdate ? 0 : 1, // Hourly updates
limit
});
if (staleSymbols.length === 0) {
this.logger.info('No symbols need intraday updates');
return {
message: 'No symbols need intraday updates',
symbolsQueued: 0,
jobsQueued: 0,
errors: 0
};
}
// Get full symbol data
symbolsToProcess = await this.mongodb.find('qmSymbols', {
qmSearchCode: { $in: staleSymbols }
}, {
projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 }
});
}
if (symbolsToProcess.length === 0) {
this.logger.info('No symbols to process for intraday');
return {
message: 'No symbols to process',
symbolsQueued: 0,
jobsQueued: 0,
errors: 0
};
}
this.logger.info(`Found ${symbolsToProcess.length} symbols for intraday ${mode}`);
let symbolsQueued = 0;
let jobsQueued = 0;
let errors = 0;
// Process each symbol
for (const doc of symbolsToProcess) {
try {
if (!doc.symbolId) {
this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`);
continue;
}
if (mode === 'crawl' && doc.crawlState) {
// For crawl mode, schedule multiple days going backwards
const startDate = doc.crawlState.oldestDateReached || new Date();
const daysToFetch = 30; // Fetch 30 days at a time
for (let i = 0; i < daysToFetch; i++) {
const crawlDate = new Date(startDate);
crawlDate.setDate(crawlDate.getDate() - i);
await this.scheduleOperation('update-intraday-bars', {
symbol: doc.symbol,
symbolId: doc.symbolId,
qmSearchCode: doc.qmSearchCode,
crawlDate: crawlDate.toISOString()
}, {
priority: 6,
delay: jobsQueued * 1000 // 1 second between jobs
});
jobsQueued++;
}
// Note: Crawl state will be updated when the actual jobs run
} else {
// For update mode, just fetch today's data
await this.scheduleOperation('update-intraday-bars', {
symbol: doc.symbol,
symbolId: doc.symbolId,
qmSearchCode: doc.qmSearchCode
}, {
priority: 8, // High priority for current data
delay: jobsQueued * 500 // 0.5 seconds between jobs
});
jobsQueued++;
}
symbolsQueued++;
} catch (error) {
this.logger.error(`Failed to schedule intraday update for ${doc.symbol}`, { error });
errors++;
}
}
this.logger.info('Intraday update scheduling completed', {
symbolsQueued,
jobsQueued,
errors,
mode
});
return {
message: `Scheduled intraday ${mode} for ${symbolsQueued} symbols (${jobsQueued} jobs)`,
symbolsQueued,
jobsQueued,
errors
};
} catch (error) {
this.logger.error('Intraday scheduling failed', { error });
throw error;
}
}
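The crawl branch above expands `oldestDateReached` into one job per calendar day, stepping backward. A minimal standalone sketch of that date expansion (`backwardCrawlDates` is an illustrative name, not an export of this codebase):

```typescript
// Illustrative sketch: expand a start date into N daily crawl dates, newest first.
// Mirrors the backward loop in scheduleIntradayUpdates.
function backwardCrawlDates(start: Date, days: number): string[] {
  const dates: string[] = [];
  for (let i = 0; i < days; i++) {
    const d = new Date(start);       // copy so the original start date is untouched
    d.setDate(d.getDate() - i);      // step back one day per iteration
    dates.push(d.toISOString());     // same string form passed as crawlDate above
  }
  return dates;
}

const dates = backwardCrawlDates(new Date(Date.UTC(2025, 6, 2)), 30);
```

Each returned ISO string maps to one `update-intraday-bars` job, with the per-job `delay` spreading them out one second apart.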

View file

@ -44,6 +44,7 @@ export async function checkSessions(
for (let i = 0; i < toQueue; i++) {
await this.scheduleOperation('create-session', { sessionId, sessionType }, {
priority: 0,
// delay: i * 2000, // Stagger creation by 2 seconds
});
queuedCount++;
@ -107,7 +108,7 @@ export async function createSession(
// Build request options
const sessionRequest = {
proxy: proxyUrl || undefined,
headers: getQmHeaders(sessionType),
headers: getQmHeaders(),
};
this.logger.debug('Authenticating with QM API', { sessionUrl, sessionRequest });

View file

@ -14,7 +14,6 @@ import {
scheduleEventsUpdates,
scheduleFinancialsUpdates,
scheduleInsidersUpdates,
scheduleIntradayUpdates,
schedulePriceUpdates,
scheduleSymbolInfoUpdates,
scheduleSymbolNewsUpdates,
@ -26,7 +25,6 @@ import {
updateFinancials,
updateGeneralNews,
updateInsiders,
updateIntradayBars,
updatePrices,
updateSymbolInfo,
updateSymbolNews
@ -156,32 +154,20 @@ export class QMHandler extends BaseHandler<DataIngestionServices> {
/**
* INTRADAY DATA
*/
@Operation('update-intraday-bars')
updateIntradayBars = updateIntradayBars;
@Operation('crawl-intraday-data')
crawlIntradayData = crawlIntradayData;
@Operation('schedule-intraday-crawls')
scheduleIntradayCrawls = scheduleIntradayCrawls;
@Disabled()
@ScheduledOperation('schedule-intraday-updates', '*/30 * * * *', {
priority: 9,
immediately: false,
description: 'Check for symbols needing intraday updates every 30 minutes'
})
scheduleIntradayUpdates = scheduleIntradayUpdates;
// @Disabled()
@ScheduledOperation('schedule-intraday-crawls-batch', '0 */12 * * *', {
@ScheduledOperation('schedule-intraday-crawls-batch', '0 0 * * 0', {
priority: 5,
immediately: false,
description: 'Schedule intraday crawls for incomplete symbols weekly (Sunday at midnight)'
})
scheduleIntradayCrawlsBatch = async () => {
return scheduleIntradayCrawls.call(this, {
limit: 25,
priorityMode: 'incomplete'
});
};

View file

@ -37,6 +37,7 @@ export const QM_CONFIG = {
LOOKUP_URL: 'https://app.quotemedia.com/datatool/lookup.json',
SYMBOL_URL: 'https://app.quotemedia.com/datatool/getProfiles.json',
PRICES_URL: 'https://app.quotemedia.com/datatool/getEnhancedChartData.json',
INTRADAY_URL: 'https://app.quotemedia.com/datatool/getEnhancedChartData.json',
EVENTS_URL: 'https://app.quotemedia.com/datatool/getIndicatorsBySymbol.json',
FINANCIALS_URL: 'https://app.quotemedia.com/datatool/getFinancialsEnhancedBySymbol.json',
FILING_URL: 'https://app.quotemedia.com/datatool/getCompanyFilings.json',
@ -44,13 +45,13 @@ export const QM_CONFIG = {
// Session management settings
export const SESSION_CONFIG = {
MAX_SESSIONS: 5,
MAX_SESSIONS: 100,
MAX_FAILED_CALLS: 5,
SESSION_TIMEOUT: 5000, // 5 seconds
API_TIMEOUT: 30000, // 30 seconds
} as const;
export function getQmHeaders(type?: string): Record<string, string> {
export function getQmHeaders(): Record<string, string> {
// if(type?.toUpperCase() === 'FILINGS') {
// return {
// 'User-Agent': getRandomUserAgent(),
@ -100,6 +101,23 @@ export function getWeekStart(dateInput: Date | string): Date {
return date;
}
export function getLastWeek(dateInput: Date | string): Date {
// Handle string input properly
let date: Date;
if (typeof dateInput === 'string') {
date = parseLocalDate(dateInput);
} else {
// Create new date with local time components
date = new Date(dateInput.getFullYear(), dateInput.getMonth(), dateInput.getDate());
}
// Subtract 7 days
date.setDate(date.getDate() - 7);
date.setHours(0, 0, 0, 0);
return date;
}
// Get end of week (Sunday)
export function getWeekEnd(dateInput: Date | string): Date {
let date: Date;

View file

@ -317,7 +317,8 @@ export class OperationTracker {
const {
limit = 1000,
excludeSymbols = [],
activeOnly = true
activeOnly = true,
symbolFilter
} = options;
this.provider.validateOperation(operationName);
@ -345,6 +346,11 @@ export class OperationTracker {
filter[symbolField] = { $nin: excludeSymbols };
}
// Add symbol filter if provided
if (symbolFilter?.symbol) {
filter.symbol = symbolFilter.symbol;
}
const symbols = await this.mongodb.find(collectionName, filter, {
limit,
projection: { [symbolField]: 1 },
@ -431,10 +437,11 @@ export class OperationTracker {
limit?: number;
targetOldestDate?: Date;
includeNewDataGaps?: boolean;
symbolFilter?: { symbol?: string };
} = {}
): Promise<Array<SymbolWithOperations & { gaps?: { forward?: boolean; backward?: boolean } }>> {
const { collectionName, symbolField } = this.provider.getProviderConfig();
const { limit = 100, targetOldestDate, includeNewDataGaps = true } = options;
const { limit = 100, targetOldestDate, includeNewDataGaps = true, symbolFilter } = options;
this.provider.validateOperation(operationName);
@ -443,6 +450,11 @@ export class OperationTracker {
active: { $ne: false }
};
// Add symbol filter if provided
if (symbolFilter?.symbol) {
filter.symbol = symbolFilter.symbol;
}
// Get all symbols that either:
// 1. Have never been crawled
// 2. Are not finished
@ -468,7 +480,11 @@ export class OperationTracker {
limit,
projection: {
[symbolField]: 1,
[`operations.${operationName}`]: 1
[`operations.${operationName}`]: 1,
// Include common fields that might be needed
symbol: 1,
exchange: 1,
qmSearchCode: 1
},
sort: {
[`operations.${operationName}.lastRunAt`]: 1
@ -479,19 +495,11 @@ export class OperationTracker {
const opStatus = doc.operations?.[operationName];
const crawlState = opStatus?.crawlState;
// Determine gaps
// Determine gaps (only backward since we removed forward crawling)
const gaps: { forward?: boolean; backward?: boolean } = {};
if (crawlState) {
// Check for forward gap (new data)
if (crawlState.newestDateReached) {
const daysSinceNewest = Math.floor(
(Date.now() - new Date(crawlState.newestDateReached).getTime()) / (1000 * 60 * 60 * 24)
);
gaps.forward = daysSinceNewest > 1;
}
// Check for backward gap (historical data)
// Only check for backward gap (historical data)
if (!crawlState.finished) {
gaps.backward = true;
if (targetOldestDate && crawlState.oldestDateReached) {
@ -499,8 +507,7 @@ export class OperationTracker {
}
}
} else {
// Never crawled, has both gaps
gaps.forward = true;
// Never crawled, needs backward crawl
gaps.backward = true;
}
@ -508,8 +515,11 @@ export class OperationTracker {
symbol: doc[symbolField],
lastRecordDate: opStatus?.lastRecordDate,
operationStatus: opStatus,
gaps
};
gaps,
// Include other potentially useful fields
exchange: doc.exchange,
qmSearchCode: doc.qmSearchCode || doc[symbolField]
} as SymbolWithOperations & { gaps?: { forward?: boolean; backward?: boolean } };
});
}
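With forward crawling removed, the gap logic above reduces to a small predicate on the crawl state. A hedged standalone sketch (type and function names are illustrative; the target-date refinement elided in the hunk is not reproduced):

```typescript
// Minimal shape of the crawl state consulted by the gap check above.
interface CrawlStateLite {
  finished: boolean;
  oldestDateReached?: Date;
}

// True when a symbol still needs backward (historical) crawling.
function needsBackwardCrawl(state: CrawlStateLite | undefined): boolean {
  if (!state) return true;  // never crawled → needs backward crawl
  return !state.finished;   // unfinished history → keep crawling backward
}
```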

View file

@ -114,6 +114,8 @@ export interface StaleSymbolOptions {
excludeSymbols?: string[];
/** Only include active symbols */
activeOnly?: boolean;
/** Filter for specific symbols */
symbolFilter?: { symbol?: string };
}
/**

View file

@ -0,0 +1,156 @@
/**
* Test script to verify intraday crawl data flow
*/
import { OperationTracker, OperationRegistry } from '../src/shared/operation-manager';
import type { DataIngestionServices } from '../src/types';
async function testIntradayDataFlow() {
console.log('=== Testing Intraday Data Flow ===\n');
// Mock services
const mockServices: DataIngestionServices = {
mongodb: {
collection: (name: string) => ({
find: () => ({
toArray: async () => {
console.log(`Mock: Query collection ${name}`);
if (name === 'qmSymbols') {
return [{
symbol: 'X',
symbolId: 123456,
qmSearchCode: 'X:NYSE',
exchange: 'NYSE',
operations: {
intraday_bars: {
lastRunAt: new Date('2024-01-01'),
lastSuccessAt: new Date('2024-01-01'),
status: 'success',
crawlState: {
finished: false,
oldestDateReached: new Date('2023-01-01'),
newestDateReached: new Date('2024-01-01'),
lastProcessedDate: new Date('2023-06-15'),
totalDaysProcessed: 180
}
}
}
}];
}
return [];
}
}),
createIndex: async () => ({ ok: 1 })
}),
find: async (collection: string, filter: any, options?: any) => {
console.log(`Mock: Direct find on ${collection}`, { filter, options });
return [{
symbol: 'X',
qmSearchCode: 'X:NYSE',
exchange: 'NYSE',
operations: {
intraday_bars: {
crawlState: {
finished: false,
oldestDateReached: new Date('2023-01-01'),
newestDateReached: new Date('2024-01-01')
}
}
}
}];
}
} as any,
logger: {
info: (msg: string, data?: any) => console.log(`[INFO] ${msg}`, JSON.stringify(data, null, 2)),
error: (msg: string, data?: any) => console.error(`[ERROR] ${msg}`, data || ''),
warn: (msg: string, data?: any) => console.warn(`[WARN] ${msg}`, data || ''),
debug: (msg: string, data?: any) => console.debug(`[DEBUG] ${msg}`, data || '')
}
} as DataIngestionServices;
// Create registry and provider
const registry = new OperationRegistry(mockServices as any);
// Mock provider
const mockProvider = {
getProviderConfig: () => ({
name: 'qm',
collectionName: 'qmSymbols',
symbolField: 'qmSearchCode'
}),
getOperations: () => [{
name: 'intraday_bars',
type: 'intraday_crawl' as const,
defaultStaleHours: 1
}],
validateOperation: () => true,
getOperation: () => ({ name: 'intraday_bars', type: 'intraday_crawl' }),
getDefaultStaleHours: () => 1,
initialize: async () => {},
beforeOperationUpdate: async () => {},
afterOperationUpdate: async () => {}
};
const tracker = new OperationTracker(mockServices as any, mockProvider as any);
await tracker.initialize();
// Test 1: Get symbols for intraday crawl
console.log('\nTest 1: Get symbols for intraday crawl with symbol filter');
const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
limit: 10,
targetOldestDate: new Date('2020-01-01'),
includeNewDataGaps: true,
symbolFilter: { symbol: 'X' }
});
console.log(`\nFound ${symbols.length} symbols:`);
symbols.forEach(sym => {
console.log({
symbol: sym.symbol,
qmSearchCode: sym.qmSearchCode,
exchange: sym.exchange,
gaps: sym.gaps,
crawlState: sym.operationStatus?.crawlState
});
});
// Test 2: Verify data preservation
console.log('\n\nTest 2: Verify data is preserved through mapping');
const testSymbol = symbols[0];
if (testSymbol) {
console.log('Original data:');
console.log({
symbol: testSymbol.symbol,
hasGaps: !!testSymbol.gaps,
gaps: testSymbol.gaps,
hasOperationStatus: !!testSymbol.operationStatus,
crawlState: testSymbol.operationStatus?.crawlState
});
// Simulate mapping
const mapped = {
...testSymbol,
symbol: 'X',
exchange: 'NYSE',
qmSearchCode: 'X:NYSE',
gaps: testSymbol.gaps,
operationStatus: testSymbol.operationStatus
};
console.log('\nAfter mapping:');
console.log({
symbol: mapped.symbol,
exchange: mapped.exchange,
qmSearchCode: mapped.qmSearchCode,
hasGaps: !!mapped.gaps,
gaps: mapped.gaps,
hasOperationStatus: !!mapped.operationStatus,
crawlState: mapped.operationStatus?.crawlState
});
}
console.log('\n=== Tests Complete ===');
}
// Run tests
testIntradayDataFlow().catch(console.error);