From 11c24b22801a2ccbb093c495963ab5dcbc49f221 Mon Sep 17 00:00:00 2001 From: Boki Date: Wed, 2 Jul 2025 18:26:30 -0400 Subject: [PATCH] finished intra-day crawl --- apps/stock/config/config/default.json | 8 +- .../data-ingestion/docs/INTRADAY_CRAWL.md | 122 ---- .../docs/OPERATION_TRACKER_ENHANCEMENTS.md | 107 ---- .../src/handlers/qm/actions/index.ts | 7 +- .../qm/actions/intraday-crawl.action.ts | 544 +++++++----------- .../handlers/qm/actions/intraday.action.ts | 302 ---------- .../src/handlers/qm/actions/session.action.ts | 3 +- .../src/handlers/qm/qm.handler.ts | 16 +- .../src/handlers/qm/shared/config.ts | 22 +- .../operation-manager/OperationTracker.ts | 44 +- .../src/shared/operation-manager/types.ts | 2 + .../data-ingestion/test/test-intraday-flow.ts | 156 +++++ 12 files changed, 437 insertions(+), 896 deletions(-) delete mode 100644 apps/stock/data-ingestion/docs/INTRADAY_CRAWL.md delete mode 100644 apps/stock/data-ingestion/docs/OPERATION_TRACKER_ENHANCEMENTS.md delete mode 100644 apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts create mode 100644 apps/stock/data-ingestion/test/test-intraday-flow.ts diff --git a/apps/stock/config/config/default.json b/apps/stock/config/config/default.json index dde869a..0ebc0f0 100644 --- a/apps/stock/config/config/default.json +++ b/apps/stock/config/config/default.json @@ -77,8 +77,8 @@ "port": 6379, "db": 1 }, - "workers": 1, - "concurrency": 1, + "workers": 5, + "concurrency": 5, "enableScheduledJobs": true, "defaultJobOptions": { "attempts": 3, @@ -183,11 +183,11 @@ "services": { "dataIngestion": { "port": 2001, - "workers": 4, + "workers": 5, "queues": { "ceo": { "concurrency": 2 }, "webshare": { "concurrency": 1 }, - "qm": { "concurrency": 2 }, + "qm": { "concurrency": 5 }, "ib": { "concurrency": 1 }, "proxy": { "concurrency": 1 } }, diff --git a/apps/stock/data-ingestion/docs/INTRADAY_CRAWL.md b/apps/stock/data-ingestion/docs/INTRADAY_CRAWL.md deleted file mode 100644 index 652b271..0000000 
--- a/apps/stock/data-ingestion/docs/INTRADAY_CRAWL.md +++ /dev/null @@ -1,122 +0,0 @@ -# Intraday Crawl System - -## Overview - -The intraday crawl system is designed to handle large-scale historical data collection with proper resumption support. It tracks the oldest and newest dates reached, allowing it to resume from where it left off if interrupted. - -## Key Features - -1. **Bidirectional Crawling**: Can crawl both forward (for new data) and backward (for historical data) -2. **Resumption Support**: Tracks progress and can resume from where it left off -3. **Gap Detection**: Automatically detects gaps in data coverage -4. **Batch Processing**: Processes data in configurable batches (default: 7 days) -5. **Completion Tracking**: Knows when a symbol's full history has been fetched - -## Crawl State Fields - -The system tracks the following state for each symbol: - -```typescript -interface CrawlState { - finished: boolean; // Whether crawl is complete - oldestDateReached?: Date; // Oldest date we've fetched - newestDateReached?: Date; // Newest date we've fetched - lastProcessedDate?: Date; // Last date processed (for resumption) - totalDaysProcessed?: number; // Total days processed so far - lastCrawlDirection?: 'forward' | 'backward'; - targetOldestDate?: Date; // Target date to reach -} -``` - -## How It Works - -### Initial Crawl -1. Starts from today and fetches current data -2. Then begins crawling backward in weekly batches -3. Continues until it reaches the target oldest date (default: 2020-01-01) -4. Marks as finished when complete - -### Resumption After Interruption -1. Checks for forward gap: If `newestDateReached < yesterday`, fetches new data first -2. Checks for backward gap: If not finished and `oldestDateReached > targetOldestDate`, continues backward crawl -3. 
Resumes from `lastProcessedDate` to avoid re-fetching data - -### Daily Updates -Once a symbol is fully crawled: -- Only needs to fetch new data (forward crawl) -- Much faster as it's typically just 1-2 days of data - -## Usage - -### Manual Crawl for Single Symbol -```typescript -await handler.crawlIntradayData({ - symbol: 'AAPL', - symbolId: 12345, - qmSearchCode: 'AAPL', - targetOldestDate: '2020-01-01', - batchSize: 7 // Days per batch -}); -``` - -### Schedule Crawls for Multiple Symbols -```typescript -await handler.scheduleIntradayCrawls({ - limit: 50, - targetOldestDate: '2020-01-01', - priorityMode: 'incomplete' // or 'never_run', 'stale', 'all' -}); -``` - -### Check Crawl Status -```typescript -const tracker = handler.operationRegistry.getTracker('qm'); -const isComplete = await tracker.isIntradayCrawlComplete('AAPL', 'intraday_bars', new Date('2020-01-01')); -``` - -### Get Symbols Needing Crawl -```typescript -const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', { - limit: 100, - targetOldestDate: new Date('2020-01-01'), - includeNewDataGaps: true // Include symbols needing updates -}); -``` - -## Priority Modes - -- **never_run**: Symbols that have never been crawled (highest priority) -- **incomplete**: Symbols with unfinished crawls -- **stale**: Symbols with complete crawls but new data available -- **all**: All symbols needing any processing - -## Scheduled Operations - -The system includes scheduled operations: -- `schedule-intraday-crawls-batch`: Runs every 4 hours, processes incomplete crawls - -## Monitoring - -Use the provided scripts to monitor crawl progress: - -```bash -# Check overall status -bun run scripts/check-intraday-status.ts - -# Test crawl for specific symbol -bun run test/intraday-crawl.test.ts -``` - -## Performance Considerations - -1. **Rate Limiting**: Delays between API calls to avoid rate limits -2. **Weekend Skipping**: Automatically skips weekends to save API calls -3. 
**Batch Size**: Configurable batch size (default 7 days) balances progress vs memory -4. **Priority Scheduling**: Higher priority for current data updates - -## Error Handling - -- Failed batches don't stop the entire crawl -- Errors are logged and stored in the operation status -- Partial success is tracked separately from complete failure -- Session failures trigger automatic session rotation \ No newline at end of file diff --git a/apps/stock/data-ingestion/docs/OPERATION_TRACKER_ENHANCEMENTS.md b/apps/stock/data-ingestion/docs/OPERATION_TRACKER_ENHANCEMENTS.md deleted file mode 100644 index 12f4b26..0000000 --- a/apps/stock/data-ingestion/docs/OPERATION_TRACKER_ENHANCEMENTS.md +++ /dev/null @@ -1,107 +0,0 @@ -# Operation Tracker Enhancements for Intraday Crawling - -## Summary of Changes - -This document summarizes the enhancements made to the operation tracker to support sophisticated intraday data crawling with resumption capabilities. - -## Changes Made - -### 1. Enhanced CrawlState Interface (`types.ts`) -Added new fields to track crawl progress: -- `newestDateReached`: Track the most recent date processed -- `lastProcessedDate`: For resumption after interruption -- `totalDaysProcessed`: Progress tracking -- `targetOldestDate`: The goal date to reach - -### 2. Updated OperationTracker (`OperationTracker.ts`) -- Modified `updateSymbolOperation` to handle new crawl state fields -- Updated `bulkUpdateSymbolOperations` for proper Date handling -- Enhanced `markCrawlFinished` to track both oldest and newest dates -- Added `getSymbolsForIntradayCrawl`: Specialized method for intraday crawls with gap detection -- Added `isIntradayCrawlComplete`: Check if a crawl has reached its target -- Added new indexes for efficient querying on crawl state fields - -### 3. 
New Intraday Crawl Action (`intraday-crawl.action.ts`) -Created a sophisticated crawl system with: -- **Bidirectional crawling**: Handles both forward (new data) and backward (historical) gaps -- **Batch processing**: Processes data in weekly batches by default -- **Resumption logic**: Can resume from where it left off if interrupted -- **Gap detection**: Automatically identifies missing date ranges -- **Completion tracking**: Knows when the full history has been fetched - -### 4. Integration with QM Handler -- Added new operations: `crawl-intraday-data` and `schedule-intraday-crawls` -- Added scheduled operation to automatically process incomplete crawls every 4 hours -- Integrated with the existing operation registry system - -### 5. Testing and Monitoring Tools -- Created test script to verify crawl functionality -- Created status checking script to monitor crawl progress -- Added comprehensive documentation - -## How It Works - -### Initial Crawl Flow -1. Symbol starts with no crawl state -2. First crawl fetches today's data and sets `newestDateReached` -3. Subsequent batches crawl backward in time -4. Each batch updates `oldestDateReached` and `lastProcessedDate` -5. When `oldestDateReached <= targetOldestDate`, crawl is marked finished - -### Resumption Flow -1. Check if `newestDateReached < yesterday` (forward gap) -2. If yes, fetch new data first to stay current -3. Check if `finished = false` (backward gap) -4. If yes, continue backward crawl from `lastProcessedDate` -5. Process in batches until complete - -### Daily Update Flow -1. For finished crawls, only check for forward gaps -2. Fetch data from `newestDateReached + 1` to today -3. Update `newestDateReached` to maintain currency - -## Benefits - -1. **Resilient**: Can handle interruptions gracefully -2. **Efficient**: Avoids re-fetching data -3. **Trackable**: Clear progress visibility -4. **Scalable**: Can handle thousands of symbols -5. 
**Flexible**: Configurable batch sizes and target dates - -## Usage Examples - -### Check if symbol X needs crawling: -```typescript -const tracker = operationRegistry.getTracker('qm'); -const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', { - limit: 1, - targetOldestDate: new Date('2020-01-01') -}); -const symbolX = symbols.find(s => s.symbol === 'X'); -``` - -### Start crawl for symbol X only: -```typescript -await handler.crawlIntradayData({ - symbol: 'X', - symbolId: symbolData.symbolId, - qmSearchCode: symbolData.qmSearchCode, - targetOldestDate: '2020-01-01' -}); -``` - -### Schedule crawls for never-run symbols: -```typescript -await handler.scheduleIntradayCrawls({ - limit: 50, - priorityMode: 'never_run', - targetOldestDate: '2020-01-01' -}); -``` - -## Next Steps - -1. Monitor the crawl progress using the provided scripts -2. Adjust batch sizes based on API rate limits -3. Consider adding more sophisticated retry logic for failed batches -4. Implement data validation to ensure quality \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts index 0105f30..454e848 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts @@ -5,13 +5,12 @@ export { scheduleEventsUpdates, updateEvents } from './events.action'; export { scheduleFilingsUpdates, updateFilings } from './filings.action'; export { scheduleFinancialsUpdates, updateFinancials } from './financials.action'; -export { scheduleIntradayUpdates, updateIntradayBars } from './intraday.action'; +export { scheduleInsidersUpdates, updateInsiders } from './insiders.action'; export { crawlIntradayData, scheduleIntradayCrawls } from './intraday-crawl.action'; +export { scheduleSymbolNewsUpdates, updateGeneralNews, updateSymbolNews } from './news.action'; export { schedulePriceUpdates, updatePrices } from 
'./prices.action'; export { checkSessions, createSession } from './session.action'; +export { deduplicateSymbols, updateExchangeStats, updateExchangeStatsAndDeduplicate } from './symbol-dedup.action'; export { scheduleSymbolInfoUpdates, updateSymbolInfo } from './symbol-info.action'; export { searchSymbols, spiderSymbol } from './symbol.action'; -export { deduplicateSymbols, updateExchangeStats, updateExchangeStatsAndDeduplicate } from './symbol-dedup.action'; -export { scheduleInsidersUpdates, updateInsiders } from './insiders.action'; -export { scheduleSymbolNewsUpdates, updateSymbolNews, updateGeneralNews } from './news.action'; diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/intraday-crawl.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/intraday-crawl.action.ts index 30ff2af..c2e82de 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/intraday-crawl.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/intraday-crawl.action.ts @@ -5,7 +5,7 @@ import type { ExecutionContext } from '@stock-bot/handlers'; import type { CrawlState } from '../../../shared/operation-manager/types'; import type { QMHandler } from '../qm.handler'; -import { getWeekStart, QM_CONFIG, QM_SESSION_IDS } from '../shared/config'; +import { getLastWeek, QM_CONFIG, QM_SESSION_IDS } from '../shared/config'; import { QMSessionManager } from '../shared/session-manager'; interface IntradayCrawlInput { @@ -13,165 +13,13 @@ interface IntradayCrawlInput { exchange: string; qmSearchCode: string; targetOldestDate?: string; // ISO date string for how far back to crawl - batchSize?: number; // Days per batch + existingCrawlState?: CrawlState; // Pass existing crawl state to avoid re-querying + gaps?: { forward?: boolean; backward?: boolean }; // Pass gap information } -interface DateRange { - start: Date; - end: Date; - direction: 'forward' | 'backward'; -} /** - * Process a batch of intraday data for a date range - */ -export async function 
processIntradayBatch( - this: QMHandler, - input: { - symbol: string; - exchange: string; - qmSearchCode: string; - dateRange: DateRange; - }, - _context?: ExecutionContext -): Promise<{ - success: boolean; - recordsProcessed: number; - datesProcessed: number; - errors: string[]; -}> { - const { symbol, exchange, qmSearchCode, dateRange } = input; - console.log('Processing intraday batch for:', { symbol, exchange, qmSearchCode, dateRange }); - const errors: string[] = []; - let recordsProcessed = 0; - let datesProcessed = 0; - - const sessionManager = QMSessionManager.getInstance(); - await sessionManager.initialize(this.cache, this.logger); - - // Get a session - const sessionId = QM_SESSION_IDS.PRICES; // TODO: Update with correct session ID - const session = await sessionManager.getSession(sessionId); - - if (!session || !session.uuid) { - throw new Error(`No active session found for QM intraday`); - } - - // Process each date in the range - const currentWeek = getWeekStart(new Date(dateRange.start)); - const endDate = new Date(dateRange.end); - - while ( - (dateRange.direction === 'backward' && currentWeek >= endDate) || - (dateRange.direction === 'forward' && currentWeek <= endDate) - ) { - try { - // Skip weekends - if (currentWeek.getDay() === 0 || currentWeek.getDay() === 6) { - if (dateRange.direction === 'backward') { - currentWeek.setDate(currentWeek.getDate() - 1); - } else { - currentWeek.setDate(currentWeek.getDate() + 1); - } - continue; - } - getWeekStart(currentWeek); // Ensure we are at the start of the week - - // Build API request - const searchParams = new URLSearchParams({ - adjType:'none', - adjusted:'true', - freq:'day', - interval:'1', - marketSession:'mkt', - pathName:'/demo/portal/company-quotes.php', - qmodTool:'InteractiveChart', - start: currentWeek.toISOString().split('T')[0], - symbol: qmSearchCode, - unadjusted:'false', - webmasterId:'500', - zeroTradeDays:'false', - } as Record); - - console.log('Fetching intraday data for:', 
searchParams.toString()); - console.log(test) - const apiUrl = `${QM_CONFIG.PRICES_URL}?${searchParams.toString()}`; - - const response = await fetch(apiUrl, { - method: 'GET', - headers: session.headers, - proxy: session.proxy, - }); - //https://app.quotemedia.com/datatool/getEnhancedChartData.json?zeroTradeDays=false&start=2025-06-24&interval=1&marketSession=mkt&freq=day&adjusted=true&adjustmentType=none&unadjusted=false&datatype=int&symbol=X:CA - - if (!response.ok) { - throw new Error(`API request failed: ${response.status}`); - } - - const barsResults = await response.json(); - console.log('Bars results:', barsResults); - - const barsData = barsResults.results.intraday[0].interval || []; - - this.logger.info(`Fetched ${barsData.length} bars for ${qmSearchCode} on ${currentWeek.toISOString().split('T')[0]}`, { - qmSearchCode, - date: currentWeek.toISOString().split('T')[0], - records: barsData.length - }); - - // Update session success stats - await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); - - // Process and store data if we got any - if (barsData && barsData.length > 0) { - const processedBars = barsData.map((bar: any) => ({ - ...bar, - qmSearchCode, - symbol, - exchange, - timestamp: new Date(bar.startdatetime), - })); - - await this.mongodb.batchUpsert( - 'qmIntraday', - processedBars, - ['qmSearchCode', 'timestamp'] - ); - - recordsProcessed += barsData.length; - } - - datesProcessed++; - - } catch (error) { - const errorMsg = `Failed to fetch ${qmSearchCode} for ${currentWeek.toISOString().split('T')[0]}: ${error}`; - errors.push(errorMsg); - this.logger.error(errorMsg); - - // Update session failure stats - if (session.uuid) { - await sessionManager.incrementFailedCalls(sessionId, session.uuid); - } - } - - // Move to next date - if (dateRange.direction === 'backward') { - currentWeek.setDate(currentWeek.getDate() - 1); - } else { - currentWeek.setDate(currentWeek.getDate() + 1); - } - } - - return { - success: errors.length === 
0, - recordsProcessed, - datesProcessed, - errors - }; -} - -/** - * Main intraday crawl handler with sophisticated resumption logic + * Main intraday crawl handler - crawls backwards week by week (Sunday to Sunday) */ export async function crawlIntradayData( this: QMHandler, @@ -189,89 +37,69 @@ export async function crawlIntradayData( symbol, exchange, qmSearchCode, - targetOldestDate = '2020-01-01', // Default to ~5 years of data - batchSize = 7 // Process a week at a time + targetOldestDate = '1960-01-01', // Default to ~5 years of data + existingCrawlState, + gaps } = input; this.logger.info('Starting intraday crawl', { symbol, exchange, targetOldestDate, - batchSize + hasExistingState: !!existingCrawlState, + gaps }); + const sessionManager = QMSessionManager.getInstance(); + await sessionManager.initialize(this.cache, this.logger); + + const sessionId = QM_SESSION_IDS.PRICES; + const session = await sessionManager.getSession(sessionId); + + if (!session || !session.uuid) { + throw new Error(`No active session found for QM intraday`); + } + try { - // Get current crawl state - const symbolData = await this.mongodb.findOne('qmSymbols', { - qmSearchCode - }); + // Use passed crawl state if available, otherwise query for it + let currentCrawlState: CrawlState; + + if (existingCrawlState) { + currentCrawlState = existingCrawlState; + } else { + // Only query if not passed + const symbolData = await this.mongodb.findOne('qmSymbols', { + qmSearchCode + }); + + currentCrawlState = symbolData?.operations?.intraday_bars?.crawlState || { + finished: false + }; + } - const currentCrawlState: CrawlState = symbolData?.operations?.intraday_bars?.crawlState || { - finished: false - }; - - // Determine what needs to be processed - const today = new Date(); - today.setHours(0, 0, 0, 0); + // Determine starting point for crawl + let currentWeek: Date; + if (currentCrawlState.oldestDateReached && !currentCrawlState.finished) { + // Start from oldest date reached + currentWeek = 
new Date(currentCrawlState.oldestDateReached); + } else if (currentCrawlState.lastProcessedDate) { + // Resume from last processed date + currentWeek = new Date(currentCrawlState.lastProcessedDate); + } else { + // New crawl, start from today + currentWeek = new Date(); + // console.log('Current week before adjustment:', currentWeek); + } + + + // Get the Sunday of the current week + currentWeek = getLastWeek(currentWeek); const targetOldest = new Date(targetOldestDate); targetOldest.setHours(0, 0, 0, 0); - const ranges: DateRange[] = []; - - // 1. Check for forward gap (new data since last crawl) - if (currentCrawlState.newestDateReached) { - const newestDate = new Date(currentCrawlState.newestDateReached); - const daysSinceNewest = Math.floor((today.getTime() - newestDate.getTime()) / (1000 * 60 * 60 * 24)); - - if (daysSinceNewest > 1) { - // We have new data to fetch - const forwardStart = new Date(newestDate); - forwardStart.setDate(forwardStart.getDate() + 1); - - ranges.push({ - start: forwardStart, - end: today, - direction: 'forward' - }); - } - } else if (!currentCrawlState.oldestDateReached) { - // Never crawled, start from today - ranges.push({ - start: today, - end: today, - direction: 'forward' - }); - } - - // 2. Check for backward gap (historical data) - if (!currentCrawlState.finished) { - const startDate = currentCrawlState.lastProcessedDate - ? new Date(currentCrawlState.lastProcessedDate) - : currentCrawlState.oldestDateReached - ? 
new Date(currentCrawlState.oldestDateReached) - : today; - - if (startDate > targetOldest) { - // Calculate batch end date - const batchEnd = new Date(startDate); - batchEnd.setDate(batchEnd.getDate() - batchSize); - - // Don't go past target - if (batchEnd < targetOldest) { - batchEnd.setTime(targetOldest.getTime()); - } - - ranges.push({ - start: startDate, - end: batchEnd, - direction: 'backward' - }); - } - } - - if (ranges.length === 0) { - // Nothing to do + // Check if already finished + if (currentCrawlState.finished || currentWeek <= targetOldest) { this.logger.info('Intraday crawl already complete', { symbol }); return { success: true, @@ -282,98 +110,137 @@ export async function crawlIntradayData( }; } - // Process the ranges let totalRecords = 0; - let totalDates = 0; - const allErrors: string[] = []; + let weeksProcessed = 0; + const errors: string[] = []; + + const endOfWeek = new Date(currentWeek) + endOfWeek.setDate(endOfWeek.getDate() + 6); // Set to next Saturday + // Build API request for the week + const searchParams = new URLSearchParams({ + adjType:'none', + adjusted:'true', + freq:'day', + interval:'1', + marketSession:'mkt', + pathName:'/demo/portal/company-quotes.php', + qmodTool:'InteractiveChart', + start: currentWeek.toISOString().split('T')[0], + end: endOfWeek.toISOString().split('T')[0], // Next Sunday + symbol: qmSearchCode, + unadjusted:'false', + webmasterId:'500', + zeroTradeDays:'false', + } as Record); - for (const range of ranges) { - this.logger.info('Processing date range', { - symbol, - start: range.start.toISOString().split('T')[0], - end: range.end.toISOString().split('T')[0], - direction: range.direction + try { + const response = await fetch(`${QM_CONFIG.PRICES_URL}?${searchParams.toString()}`, { + method: 'GET', + headers: session.headers, + proxy: session.proxy, }); - const result = await processIntradayBatch.call(this, { + if (!response.ok) { + throw new Error(`API request failed: ${response.status}`); + } + + const 
barsResults = await response.json(); + + // Parse the results based on the API response structure + const barsData = barsResults.results?.intraday?.[0]?.interval || null; + const barsLength = barsData !== null ? barsData.length : 0; + this.logger.info(`Fetched ${barsLength} bars for ${qmSearchCode} for week of ${currentWeek}`, { + qmSearchCode, + currentWeek, + records: barsLength + }); + + // Update session success stats + await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); + + // Process and store data if we got any + if (barsData !== null && barsLength > 0) { + const processedBars = barsData.map((bar: any) => ({ + ...bar, + qmSearchCode, + symbol, + exchange, + timestamp: new Date(bar.startdatetime || bar.date || bar.datetime), + })); + + await this.mongodb.batchUpsert( + 'qmIntraday', + processedBars, + ['qmSearchCode', 'timestamp'] + ); + + totalRecords += barsLength; + } + + weeksProcessed = 1; + const nextWeek = getLastWeek(new Date(currentWeek)); + const finished = (barsData === null) && nextWeek < new Date('2024-01-01') + this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', { + status: 'success', + crawlState: { + finished: finished, + oldestDateReached: currentCrawlState.oldestDateReached, + lastProcessedDate: currentWeek + } + }); + + // Calculate next week to process (previous Sunday) + + // console.log(barsData === null , existingCrawlState); + if(!finished) { + currentCrawlState.lastProcessedDate = nextWeek; + await this.scheduleOperation('crawl-intraday-data', { + symbol: symbol, + exchange: exchange, + qmSearchCode: qmSearchCode, + targetOldestDate, + // Pass existing crawl state and gaps to avoid re-querying + existingCrawlState: currentCrawlState, + gaps: gaps + }, { + priority: 4, // Standard priority for backward crawls + }); + } + + return { + success: true, symbol, exchange, qmSearchCode, - dateRange: range + message: `Intraday crawl completed for ${symbol} - Processed ${weeksProcessed} week(s), 
${totalRecords} records`, + data: { + totalRecords, + weeksProcessed, + nextWeek: nextWeek.toISOString().split('T')[0], + errors: errors.length > 0 ? errors : undefined + } + } + + } catch (error) { + this.logger.error('Intraday crawl failed', { + symbol, + error: error instanceof Error ? error.message : 'Unknown error' }); - totalRecords += result.recordsProcessed; - totalDates += result.datesProcessed; - allErrors.push(...result.errors); - - // Update crawl state after each batch - const updatedCrawlState: Partial = { - lastProcessedDate: range.end, - lastCrawlDirection: range.direction, - totalDaysProcessed: (currentCrawlState.totalDaysProcessed || 0) + result.datesProcessed - }; - - if (range.direction === 'forward') { - updatedCrawlState.newestDateReached = range.end; - if (!currentCrawlState.oldestDateReached) { - updatedCrawlState.oldestDateReached = range.start; - } - } else { - updatedCrawlState.oldestDateReached = range.end; - if (!currentCrawlState.newestDateReached) { - updatedCrawlState.newestDateReached = range.start; - } - } - - // Check if we've completed the crawl - if (range.direction === 'backward' && range.end <= targetOldest) { - updatedCrawlState.finished = true; - updatedCrawlState.targetOldestDate = targetOldest; - } - await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', { - status: allErrors.length > 0 ? 'partial' : 'success', - lastRecordDate: today, - recordCount: totalRecords, - crawlState: updatedCrawlState, - error: allErrors.length > 0 ? allErrors.join('; ') : undefined + status: 'failure', + error: error instanceof Error ? error.message : 'Unknown error' }); + + return { + success: false, + symbol, + exchange, + qmSearchCode, + message: `Intraday crawl failed: ${error instanceof Error ? 
error.message : 'Unknown error'}` + }; } - - const message = `Processed ${totalDates} days, ${totalRecords} records for ${symbol}`; - this.logger.info('Intraday crawl batch completed', { - symbol, - totalDates, - totalRecords, - errors: allErrors.length, - finished: ranges.some(r => r.direction === 'backward' && r.end <= targetOldest) - }); - - return { - success: allErrors.length === 0, - symbol, - exchange, - qmSearchCode, - message, - data: { - datesProcessed: totalDates, - recordsProcessed: totalRecords, - errors: allErrors, - crawlComplete: ranges.some(r => r.direction === 'backward' && r.end <= targetOldest) - } - }; - - } catch (error) { - this.logger.error('Intraday crawl failed', { - symbol, - error: error instanceof Error ? error.message : 'Unknown error' - }); - - await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', { - status: 'failure', - error: error instanceof Error ? error.message : 'Unknown error' - }); - + }catch (error) { return { success: false, symbol, @@ -392,7 +259,7 @@ export async function scheduleIntradayCrawls( input: { limit?: number; targetOldestDate?: string; - priorityMode?: 'never_run' | 'incomplete' | 'stale' | 'all'; + priorityMode?: 'never_run' | 'incomplete' | 'all'; } = {}, _context?: ExecutionContext ): Promise<{ @@ -401,7 +268,7 @@ export async function scheduleIntradayCrawls( errors: number; }> { const { - limit = 1, + limit = 999999999, targetOldestDate = '1960-01-01', priorityMode = 'all' } = input; @@ -433,25 +300,27 @@ export async function scheduleIntradayCrawls( // Get symbols with incomplete crawls symbolsToProcess = await tracker.getSymbolsForIntradayCrawl( 'intraday_bars', - { limit, targetOldestDate: new Date(targetOldestDate), includeNewDataGaps: false } + { + limit, + targetOldestDate: new Date(targetOldestDate), + includeNewDataGaps: false, // Only backward gaps + // symbolFilter: { symbol: 'ETU' } // Filter for testing + } ); break; - case 'stale': - // Get symbols that need updates 
(new data) - symbolsToProcess = await tracker.getSymbolsForIntradayCrawl( - 'intraday_bars', - { limit, targetOldestDate: new Date(targetOldestDate), includeNewDataGaps: true } - ); - symbolsToProcess = symbolsToProcess.filter(s => s.gaps?.forward); - break; case 'all': default: // Get all symbols that need any processing symbolsToProcess = await tracker.getSymbolsForIntradayCrawl( 'intraday_bars', - { limit, targetOldestDate: new Date(targetOldestDate) } + { + limit, + targetOldestDate: new Date(targetOldestDate), + includeNewDataGaps: false, // Only backward gaps since we removed forward crawling + // symbolFilter: { symbol: 'AAPL' } // Filter for testing + } ); break; } @@ -463,7 +332,16 @@ export async function scheduleIntradayCrawls( errors: 0 }; } - symbolsToProcess = [{symbol: 'X:CA'}] + + this.logger.info('Symbols to process from getSymbolsForIntradayCrawl', { + count: symbolsToProcess.length, + firstSymbol: symbolsToProcess[0] ? { + symbol: symbolsToProcess[0].symbol, + gaps: symbolsToProcess[0].gaps, + hasOperationStatus: !!symbolsToProcess[0].operationStatus, + crawlState: symbolsToProcess[0].operationStatus?.crawlState + } : null + }); // Get full symbol data if needed if (priorityMode !== 'never_run') { @@ -474,13 +352,32 @@ export async function scheduleIntradayCrawls( projection: { symbol: 1, exchange: 1, qmSearchCode: 1, operations: 1 } }); - // Map back the full data + // Map back the full data while preserving gaps and operation status symbolsToProcess = symbolsToProcess.map(sp => { const full = fullSymbols.find(f => f.qmSearchCode === sp.symbol); - return full || sp; + if (full) { + return { + ...full, + gaps: sp.gaps, // Preserve gap information + operationStatus: sp.operationStatus // Preserve original operation status + }; + } + return sp; }); } + this.logger.info('After mapping, symbols to process', { + count: symbolsToProcess.length, + firstSymbol: symbolsToProcess[0] ? 
{ + symbol: symbolsToProcess[0].symbol, + exchange: symbolsToProcess[0].exchange, + qmSearchCode: symbolsToProcess[0].qmSearchCode, + gaps: symbolsToProcess[0].gaps, + hasOperationStatus: !!symbolsToProcess[0].operationStatus, + crawlState: symbolsToProcess[0].operationStatus?.crawlState + } : null + }); + let symbolsQueued = 0; let errors = 0; @@ -492,9 +389,12 @@ export async function scheduleIntradayCrawls( symbol: doc.symbol, exchange: doc.exchange, qmSearchCode: doc.qmSearchCode, - targetOldestDate + targetOldestDate, + // Pass existing crawl state and gaps to avoid re-querying + existingCrawlState: doc.operationStatus?.crawlState, + gaps: doc.gaps }, { - priority: priorityMode === 'stale' ? 9 : 5, // Higher priority for updates + priority: 6, // Standard priority for backward crawls }); symbolsQueued++; diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts deleted file mode 100644 index a984866..0000000 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts +++ /dev/null @@ -1,302 +0,0 @@ -/** - * QM Intraday Actions - Fetch and update intraday price bars - */ - -import type { ExecutionContext } from '@stock-bot/handlers'; -import type { QMHandler } from '../qm.handler'; -import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config'; -import { QMSessionManager } from '../shared/session-manager'; - -/** - * Update intraday bars for a single symbol - * This handles both initial crawl and incremental updates - */ -export async function updateIntradayBars( - this: QMHandler, - input: { - symbol: string; - symbolId: number; - qmSearchCode: string; - crawlDate?: string; // ISO date string for specific date crawl - }, - _context?: ExecutionContext -): Promise<{ - success: boolean; - symbol: string; - message: string; - data?: any; -}> { - const { symbol, symbolId, qmSearchCode, crawlDate } = input; - - this.logger.info('Fetching intraday bars', { 
symbol, symbolId, crawlDate }); - - const sessionManager = QMSessionManager.getInstance(); - await sessionManager.initialize(this.cache, this.logger); - - // Get a session - you'll need to add the appropriate session ID for intraday - const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID - const session = await sessionManager.getSession(sessionId); - - if (!session || !session.uuid) { - throw new Error(`No active session found for QM intraday`); - } - - try { - // Determine the date to fetch - const targetDate = crawlDate ? new Date(crawlDate) : new Date(); - - // Build API request for intraday bars - const searchParams = new URLSearchParams({ - symbol: symbol, - symbolId: symbolId.toString(), - qmodTool: 'IntradayBars', - webmasterId: '500', - date: targetDate.toISOString().split('T')[0], - interval: '1' // 1-minute bars - } as Record); - - // TODO: Update with correct intraday endpoint - const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/intraday.json?${searchParams.toString()}`; - - const response = await fetch(apiUrl, { - method: 'GET', - headers: session.headers, - proxy: session.proxy, - }); - - if (!response.ok) { - throw new Error(`QM API request failed: ${response.status} ${response.statusText}`); - } - - const barsData = await response.json(); - - // Update session success stats - await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); - - // Process and store intraday data - if (barsData && barsData.length > 0) { - // Store bars in a separate collection - const processedBars = barsData.map((bar: any) => ({ - ...bar, - symbol, - symbolId, - timestamp: new Date(bar.timestamp), - date: targetDate, - updated_at: new Date() - })); - - await this.mongodb.batchUpsert( - 'qmIntradayBars', - processedBars, - ['symbol', 'timestamp'] // Unique keys - ); - - // Update operation tracking - await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', { - status: 'success', - lastRecordDate: targetDate, - 
recordCount: barsData.length - }); - - this.logger.info('Intraday bars updated successfully', { - symbol, - date: targetDate, - barCount: barsData.length - }); - - return { - success: true, - symbol, - message: `Intraday bars updated for ${symbol} on ${targetDate.toISOString().split('T')[0]}`, - data: { - count: barsData.length, - date: targetDate - } - }; - } else { - // No data for this date (weekend, holiday, or no trading) - this.logger.info('No intraday data for date', { symbol, date: targetDate }); - - // Still update operation tracking as successful (no data is a valid result) - await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', { - status: 'success', - lastRecordDate: targetDate, - recordCount: 0 - }); - - return { - success: true, - symbol, - message: `No intraday data for ${symbol} on ${targetDate.toISOString().split('T')[0]}`, - data: { - count: 0, - date: targetDate - } - }; - } - - } catch (error) { - // Update session failure stats - if (session.uuid) { - await sessionManager.incrementFailedCalls(sessionId, session.uuid); - } - - this.logger.error('Error fetching intraday bars', { - symbol, - error: error instanceof Error ? error.message : 'Unknown error' - }); - - // Update operation tracking for failure - await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', { - status: 'failure' - }); - - return { - success: false, - symbol, - message: `Failed to fetch intraday bars: ${error instanceof Error ? 
error.message : 'Unknown error'}` - }; - } -} - -/** - * Schedule intraday updates for symbols - * This handles both initial crawls and regular updates - */ -export async function scheduleIntradayUpdates( - this: QMHandler, - input: { - limit?: number; - mode?: 'crawl' | 'update'; // crawl for historical, update for recent - forceUpdate?: boolean; - } = {}, - _context?: ExecutionContext -): Promise<{ - message: string; - symbolsQueued: number; - jobsQueued: number; - errors: number; -}> { - const { limit = 50, mode = 'update', forceUpdate = false } = input; - - this.logger.info('Scheduling intraday updates', { limit, mode, forceUpdate }); - - try { - let symbolsToProcess: any[] = []; - - if (mode === 'crawl') { - // Get symbols that need historical crawl - symbolsToProcess = await this.operationRegistry.getSymbolsForCrawl('qm', 'intraday_bars', { - limit - }); - } else { - // Get symbols that need regular updates - const staleSymbols = await this.operationRegistry.getStaleSymbols('qm', 'intraday_bars', { - minHoursSinceRun: forceUpdate ? 
0 : 1, // Hourly updates - limit - }); - - if (staleSymbols.length === 0) { - this.logger.info('No symbols need intraday updates'); - return { - message: 'No symbols need intraday updates', - symbolsQueued: 0, - jobsQueued: 0, - errors: 0 - }; - } - - // Get full symbol data - symbolsToProcess = await this.mongodb.find('qmSymbols', { - qmSearchCode: { $in: staleSymbols } - }, { - projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } - }); - } - - if (symbolsToProcess.length === 0) { - this.logger.info('No symbols to process for intraday'); - return { - message: 'No symbols to process', - symbolsQueued: 0, - jobsQueued: 0, - errors: 0 - }; - } - - this.logger.info(`Found ${symbolsToProcess.length} symbols for intraday ${mode}`); - - let symbolsQueued = 0; - let jobsQueued = 0; - let errors = 0; - - // Process each symbol - for (const doc of symbolsToProcess) { - try { - if (!doc.symbolId) { - this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); - continue; - } - - if (mode === 'crawl' && doc.crawlState) { - // For crawl mode, schedule multiple days going backwards - const startDate = doc.crawlState.oldestDateReached || new Date(); - const daysToFetch = 30; // Fetch 30 days at a time - - for (let i = 0; i < daysToFetch; i++) { - const crawlDate = new Date(startDate); - crawlDate.setDate(crawlDate.getDate() - i); - - await this.scheduleOperation('update-intraday-bars', { - symbol: doc.symbol, - symbolId: doc.symbolId, - qmSearchCode: doc.qmSearchCode, - crawlDate: crawlDate.toISOString() - }, { - priority: 6, - delay: jobsQueued * 1000 // 1 second between jobs - }); - - jobsQueued++; - } - - // Note: Crawl state will be updated when the actual jobs run - } else { - // For update mode, just fetch today's data - await this.scheduleOperation('update-intraday-bars', { - symbol: doc.symbol, - symbolId: doc.symbolId, - qmSearchCode: doc.qmSearchCode - }, { - priority: 8, // High priority for current data - delay: jobsQueued * 500 // 0.5 seconds between 
jobs - }); - - jobsQueued++; - } - - symbolsQueued++; - } catch (error) { - this.logger.error(`Failed to schedule intraday update for ${doc.symbol}`, { error }); - errors++; - } - } - - this.logger.info('Intraday update scheduling completed', { - symbolsQueued, - jobsQueued, - errors, - mode - }); - - return { - message: `Scheduled intraday ${mode} for ${symbolsQueued} symbols (${jobsQueued} jobs)`, - symbolsQueued, - jobsQueued, - errors - }; - } catch (error) { - this.logger.error('Intraday scheduling failed', { error }); - throw error; - } -} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts index f53a1df..478e418 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts @@ -44,6 +44,7 @@ export async function checkSessions( for (let i = 0; i < toQueue; i++) { await this.scheduleOperation('create-session', { sessionId, sessionType }, { + priority: 0, // delay: i * 2000, // Stagger creation by 2 seconds }); queuedCount++; @@ -107,7 +108,7 @@ export async function createSession( // Build request options const sessionRequest = { proxy: proxyUrl || undefined, - headers: getQmHeaders(sessionType), + headers: getQmHeaders(), }; this.logger.debug('Authenticating with QM API', { sessionUrl, sessionRequest }); diff --git a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts index 6196490..213667c 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts @@ -14,7 +14,6 @@ import { scheduleEventsUpdates, scheduleFinancialsUpdates, scheduleInsidersUpdates, - scheduleIntradayUpdates, schedulePriceUpdates, scheduleSymbolInfoUpdates, scheduleSymbolNewsUpdates, @@ -26,7 +25,6 @@ import { updateFinancials, updateGeneralNews, 
updateInsiders, - updateIntradayBars, updatePrices, updateSymbolInfo, updateSymbolNews @@ -156,32 +154,20 @@ export class QMHandler extends BaseHandler { /** * INTRADAY DATA */ - @Operation('update-intraday-bars') - updateIntradayBars = updateIntradayBars; - @Operation('crawl-intraday-data') crawlIntradayData = crawlIntradayData; @Operation('schedule-intraday-crawls') scheduleIntradayCrawls = scheduleIntradayCrawls; - @Disabled() - @ScheduledOperation('schedule-intraday-updates', '*/30 * * * *', { - priority: 9, - immediately: false, - description: 'Check for symbols needing intraday updates every 30 minutes' - }) - scheduleIntradayUpdates = scheduleIntradayUpdates; - // @Disabled() - @ScheduledOperation('schedule-intraday-crawls-batch', '0 */12 * * *', { + @ScheduledOperation('schedule-intraday-crawls-batch', '0 0 * * 0', { priority: 5, immediately: false, description: 'Schedule intraday crawls for incomplete symbols weekly (Sunday midnight)' }) scheduleIntradayCrawlsBatch = async () => { return scheduleIntradayCrawls.call(this, { - limit: 25, priorityMode: 'incomplete' }); }; diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts index dc4cc8c..93c7b29 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts @@ -37,6 +37,7 @@ export const QM_CONFIG = { LOOKUP_URL: 'https://app.quotemedia.com/datatool/lookup.json', SYMBOL_URL: 'https://app.quotemedia.com/datatool/getProfiles.json', PRICES_URL: 'https://app.quotemedia.com/datatool/getEnhancedChartData.json', + INTRADAY_URL: 'https://app.quotemedia.com/datatool/getEnhancedChartData.json', EVENTS_URL: 'https://app.quotemedia.com/datatool/getIndicatorsBySymbol.json', FINANCIALS_URL: 'https://app.quotemedia.com/datatool/getFinancialsEnhancedBySymbol.json', FILING_URL: 'https://app.quotemedia.com/datatool/getCompanyFilings.json', @@ -44,13 +45,13 @@ export const QM_CONFIG = 
{ // Session management settings export const SESSION_CONFIG = { - MAX_SESSIONS: 5, + MAX_SESSIONS: 100, MAX_FAILED_CALLS: 5, SESSION_TIMEOUT: 5000, // 5 seconds API_TIMEOUT: 30000, // 30 seconds } as const; -export function getQmHeaders(type?: string): Record { +export function getQmHeaders(): Record { // if(type?.toUpperCase() === 'FILINGS') { // return { // 'User-Agent': getRandomUserAgent(), @@ -100,6 +101,23 @@ export function getWeekStart(dateInput: Date | string): Date { return date; } +export function getLastWeek(dateInput: Date | string): Date { + // Handle string input properly + let date: Date; + if (typeof dateInput === 'string') { + date = parseLocalDate(dateInput); + } else { + // Create new date with local time components + date = new Date(dateInput.getFullYear(), dateInput.getMonth(), dateInput.getDate()); + } + + // Subtract 7 days + date.setDate(date.getDate() - 7); + + date.setHours(0, 0, 0, 0); + return date; +} + // Get end of week (Sunday) export function getWeekEnd(dateInput: Date | string): Date { let date: Date; diff --git a/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts b/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts index 8b71596..af3cbbf 100644 --- a/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts +++ b/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts @@ -317,7 +317,8 @@ export class OperationTracker { const { limit = 1000, excludeSymbols = [], - activeOnly = true + activeOnly = true, + symbolFilter } = options; this.provider.validateOperation(operationName); @@ -345,6 +346,11 @@ export class OperationTracker { filter[symbolField] = { $nin: excludeSymbols }; } + // Add symbol filter if provided + if (symbolFilter?.symbol) { + filter.symbol = symbolFilter.symbol; + } + const symbols = await this.mongodb.find(collectionName, filter, { limit, projection: { [symbolField]: 1 }, @@ -431,10 +437,11 @@ 
limit?: number; targetOldestDate?: Date; includeNewDataGaps?: boolean; + symbolFilter?: { symbol?: string }; } = {} ): Promise> { const { collectionName, symbolField } = this.provider.getProviderConfig(); - const { limit = 100, targetOldestDate, includeNewDataGaps = true } = options; + const { limit = 100, targetOldestDate, includeNewDataGaps = true, symbolFilter } = options; this.provider.validateOperation(operationName); @@ -443,6 +450,11 @@ export class OperationTracker { active: { $ne: false } }; + // Add symbol filter if provided + if (symbolFilter?.symbol) { + filter.symbol = symbolFilter.symbol; + } + // Get all symbols that either: // 1. Have never been crawled // 2. Are not finished @@ -468,7 +480,11 @@ export class OperationTracker { limit, projection: { [symbolField]: 1, - [`operations.${operationName}`]: 1 + [`operations.${operationName}`]: 1, + // Include common fields that might be needed + symbol: 1, + exchange: 1, + qmSearchCode: 1 }, sort: { [`operations.${operationName}.lastRunAt`]: 1 @@ -479,19 +495,11 @@ export class OperationTracker { const opStatus = doc.operations?.[operationName]; const crawlState = opStatus?.crawlState; - // Determine gaps + // Determine gaps (only backward since we removed forward crawling) const gaps: { forward?: boolean; backward?: boolean } = {}; if (crawlState) { - // Check for forward gap (new data) - if (crawlState.newestDateReached) { - const daysSinceNewest = Math.floor( - (Date.now() - new Date(crawlState.newestDateReached).getTime()) / (1000 * 60 * 60 * 24) - ); - gaps.forward = daysSinceNewest > 1; - } - - // Check for backward gap (historical data) + // Only check for backward gap (historical data) if (!crawlState.finished) { gaps.backward = true; if (targetOldestDate && crawlState.oldestDateReached) { @@ -499,8 +507,7 @@ export class OperationTracker { } } } else { - // Never crawled, has both gaps - gaps.forward = true; + // Never crawled, needs backward crawl gaps.backward = true; } @@ -508,8 +515,11 @@ 
export class OperationTracker { symbol: doc[symbolField], lastRecordDate: opStatus?.lastRecordDate, operationStatus: opStatus, - gaps - }; + gaps, + // Include other potentially useful fields + exchange: doc.exchange, + qmSearchCode: doc.qmSearchCode || doc[symbolField] + } as SymbolWithOperations & { gaps?: { forward?: boolean; backward?: boolean } }; }); } diff --git a/apps/stock/data-ingestion/src/shared/operation-manager/types.ts b/apps/stock/data-ingestion/src/shared/operation-manager/types.ts index 978e53f..d384538 100644 --- a/apps/stock/data-ingestion/src/shared/operation-manager/types.ts +++ b/apps/stock/data-ingestion/src/shared/operation-manager/types.ts @@ -114,6 +114,8 @@ export interface StaleSymbolOptions { excludeSymbols?: string[]; /** Only include active symbols */ activeOnly?: boolean; + /** Filter for specific symbols */ + symbolFilter?: { symbol?: string }; } /** diff --git a/apps/stock/data-ingestion/test/test-intraday-flow.ts b/apps/stock/data-ingestion/test/test-intraday-flow.ts new file mode 100644 index 0000000..2eb1b7a --- /dev/null +++ b/apps/stock/data-ingestion/test/test-intraday-flow.ts @@ -0,0 +1,156 @@ +/** + * Test script to verify intraday crawl data flow + */ + +import { OperationTracker, OperationRegistry } from '../src/shared/operation-manager'; +import type { DataIngestionServices } from '../src/types'; + +async function testIntradayDataFlow() { + console.log('=== Testing Intraday Data Flow ===\n'); + + // Mock services + const mockServices: DataIngestionServices = { + mongodb: { + collection: (name: string) => ({ + find: () => ({ + toArray: async () => { + console.log(`Mock: Query collection ${name}`); + if (name === 'qmSymbols') { + return [{ + symbol: 'X', + symbolId: 123456, + qmSearchCode: 'X:NYSE', + exchange: 'NYSE', + operations: { + intraday_bars: { + lastRunAt: new Date('2024-01-01'), + lastSuccessAt: new Date('2024-01-01'), + status: 'success', + crawlState: { + finished: false, + oldestDateReached: new 
Date('2023-01-01'), + newestDateReached: new Date('2024-01-01'), + lastProcessedDate: new Date('2023-06-15'), + totalDaysProcessed: 180 + } + } + } + }]; + } + return []; + } + }), + createIndex: async () => ({ ok: 1 }) + }), + find: async (collection: string, filter: any, options?: any) => { + console.log(`Mock: Direct find on ${collection}`, { filter, options }); + return [{ + symbol: 'X', + qmSearchCode: 'X:NYSE', + exchange: 'NYSE', + operations: { + intraday_bars: { + crawlState: { + finished: false, + oldestDateReached: new Date('2023-01-01'), + newestDateReached: new Date('2024-01-01') + } + } + } + }]; + } + } as any, + logger: { + info: (msg: string, data?: any) => console.log(`[INFO] ${msg}`, JSON.stringify(data, null, 2)), + error: (msg: string, data?: any) => console.error(`[ERROR] ${msg}`, data || ''), + warn: (msg: string, data?: any) => console.warn(`[WARN] ${msg}`, data || ''), + debug: (msg: string, data?: any) => console.debug(`[DEBUG] ${msg}`, data || '') + } + } as DataIngestionServices; + + // Create registry and provider + const registry = new OperationRegistry(mockServices as any); + + // Mock provider + const mockProvider = { + getProviderConfig: () => ({ + name: 'qm', + collectionName: 'qmSymbols', + symbolField: 'qmSearchCode' + }), + getOperations: () => [{ + name: 'intraday_bars', + type: 'intraday_crawl' as const, + defaultStaleHours: 1 + }], + validateOperation: () => true, + getOperation: () => ({ name: 'intraday_bars', type: 'intraday_crawl' }), + getDefaultStaleHours: () => 1, + initialize: async () => {}, + beforeOperationUpdate: async () => {}, + afterOperationUpdate: async () => {} + }; + + const tracker = new OperationTracker(mockServices as any, mockProvider as any); + await tracker.initialize(); + + // Test 1: Get symbols for intraday crawl + console.log('\nTest 1: Get symbols for intraday crawl with symbol filter'); + const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', { + limit: 10, + targetOldestDate: 
new Date('2020-01-01'), + includeNewDataGaps: true, + symbolFilter: { symbol: 'X' } + }); + + console.log(`\nFound ${symbols.length} symbols:`); + symbols.forEach(sym => { + console.log({ + symbol: sym.symbol, + qmSearchCode: sym.qmSearchCode, + exchange: sym.exchange, + gaps: sym.gaps, + crawlState: sym.operationStatus?.crawlState + }); + }); + + // Test 2: Verify data preservation + console.log('\n\nTest 2: Verify data is preserved through mapping'); + const testSymbol = symbols[0]; + if (testSymbol) { + console.log('Original data:'); + console.log({ + symbol: testSymbol.symbol, + hasGaps: !!testSymbol.gaps, + gaps: testSymbol.gaps, + hasOperationStatus: !!testSymbol.operationStatus, + crawlState: testSymbol.operationStatus?.crawlState + }); + + // Simulate mapping + const mapped = { + ...testSymbol, + symbol: 'X', + exchange: 'NYSE', + qmSearchCode: 'X:NYSE', + gaps: testSymbol.gaps, + operationStatus: testSymbol.operationStatus + }; + + console.log('\nAfter mapping:'); + console.log({ + symbol: mapped.symbol, + exchange: mapped.exchange, + qmSearchCode: mapped.qmSearchCode, + hasGaps: !!mapped.gaps, + gaps: mapped.gaps, + hasOperationStatus: !!mapped.operationStatus, + crawlState: mapped.operationStatus?.crawlState + }); + } + + console.log('\n=== Tests Complete ==='); +} + +// Run tests +testIntradayDataFlow().catch(console.error); \ No newline at end of file