diff --git a/apps/stock/data-ingestion/docs/INTRADAY_CRAWL.md b/apps/stock/data-ingestion/docs/INTRADAY_CRAWL.md
new file mode 100644
index 0000000..652b271
--- /dev/null
+++ b/apps/stock/data-ingestion/docs/INTRADAY_CRAWL.md
@@ -0,0 +1,122 @@
+# Intraday Crawl System
+
+## Overview
+
+The intraday crawl system is designed to handle large-scale historical data collection with proper resumption support. It tracks the oldest and newest dates reached, allowing it to resume from where it left off if interrupted.
+
+## Key Features
+
+1. **Bidirectional Crawling**: Can crawl both forward (for new data) and backward (for historical data)
+2. **Resumption Support**: Tracks progress and can resume from where it left off
+3. **Gap Detection**: Automatically detects gaps in data coverage
+4. **Batch Processing**: Processes data in configurable batches (default: 7 days)
+5. **Completion Tracking**: Knows when a symbol's full history has been fetched
+
+## Crawl State Fields
+
+The system tracks the following state for each symbol:
+
+```typescript
+interface CrawlState {
+  finished: boolean;            // Whether crawl is complete
+  oldestDateReached?: Date;     // Oldest date we've fetched
+  newestDateReached?: Date;     // Newest date we've fetched
+  lastProcessedDate?: Date;     // Last date processed (for resumption)
+  totalDaysProcessed?: number;  // Total days processed so far
+  lastCrawlDirection?: 'forward' | 'backward';
+  targetOldestDate?: Date;      // Target date to reach
+}
+```
+
+## How It Works
+
+### Initial Crawl
+1. Starts from today and fetches current data
+2. Then begins crawling backward in weekly batches
+3. Continues until it reaches the target oldest date (default: 2020-01-01)
+4. Marks as finished when complete
+
+### Resumption After Interruption
+1. Checks for forward gap: If `newestDateReached < yesterday`, fetches new data first
+2. Checks for backward gap: If not finished and `oldestDateReached > targetOldestDate`, continues backward crawl
+3. Resumes from `lastProcessedDate` to avoid re-fetching data
+
+### Daily Updates
+Once a symbol is fully crawled:
+- Only needs to fetch new data (forward crawl)
+- Much faster as it's typically just 1-2 days of data
+
+## Usage
+
+### Manual Crawl for Single Symbol
+```typescript
+await handler.crawlIntradayData({
+  symbol: 'AAPL',
+  symbolId: 12345,
+  qmSearchCode: 'AAPL',
+  targetOldestDate: '2020-01-01',
+  batchSize: 7 // Days per batch
+});
+```
+
+### Schedule Crawls for Multiple Symbols
+```typescript
+await handler.scheduleIntradayCrawls({
+  limit: 50,
+  targetOldestDate: '2020-01-01',
+  priorityMode: 'incomplete' // or 'never_run', 'stale', 'all'
+});
+```
+
+### Check Crawl Status
+```typescript
+const tracker = handler.operationRegistry.getTracker('qm');
+const isComplete = await tracker.isIntradayCrawlComplete('AAPL', 'intraday_bars', new Date('2020-01-01'));
+```
+
+### Get Symbols Needing Crawl
+```typescript
+const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
+  limit: 100,
+  targetOldestDate: new Date('2020-01-01'),
+  includeNewDataGaps: true // Include symbols needing updates
+});
+```
+
+## Priority Modes
+
+- **never_run**: Symbols that have never been crawled (highest priority)
+- **incomplete**: Symbols with unfinished crawls
+- **stale**: Symbols with complete crawls but new data available
+- **all**: All symbols needing any processing
+
+## Scheduled Operations
+
+The system includes scheduled operations:
+- `schedule-intraday-crawls-batch`: Runs every 4 hours, processes incomplete crawls
+
+## Monitoring
+
+Use the provided scripts to monitor crawl progress:
+
+```bash
+# Check overall status
+bun run scripts/check-intraday-status.ts
+
+# Test crawl for specific symbol
+bun run test/intraday-crawl.test.ts
+```
+
+## Performance Considerations
+
+1. **Rate Limiting**: Delays between API calls to avoid rate limits
+2. **Weekend Skipping**: Automatically skips weekends to save API calls
+3. **Batch Size**: Configurable batch size (default 7 days) balances progress vs memory
+4. **Priority Scheduling**: Higher priority for current data updates
+
+## Error Handling
+
+- Failed batches don't stop the entire crawl
+- Errors are logged and stored in the operation status
+- Partial success is tracked separately from complete failure
+- Session failures trigger automatic session rotation
\ No newline at end of file
diff --git a/apps/stock/data-ingestion/docs/OPERATION_TRACKER_ENHANCEMENTS.md b/apps/stock/data-ingestion/docs/OPERATION_TRACKER_ENHANCEMENTS.md
new file mode 100644
index 0000000..12f4b26
--- /dev/null
+++ b/apps/stock/data-ingestion/docs/OPERATION_TRACKER_ENHANCEMENTS.md
@@ -0,0 +1,107 @@
+# Operation Tracker Enhancements for Intraday Crawling
+
+## Summary of Changes
+
+This document summarizes the enhancements made to the operation tracker to support sophisticated intraday data crawling with resumption capabilities.
+
+## Changes Made
+
+### 1. Enhanced CrawlState Interface (`types.ts`)
+Added new fields to track crawl progress:
+- `newestDateReached`: Track the most recent date processed
+- `lastProcessedDate`: For resumption after interruption
+- `totalDaysProcessed`: Progress tracking
+- `targetOldestDate`: The goal date to reach
+
+### 2. Updated OperationTracker (`OperationTracker.ts`)
+- Modified `updateSymbolOperation` to handle new crawl state fields
+- Updated `bulkUpdateSymbolOperations` for proper Date handling
+- Enhanced `markCrawlFinished` to track both oldest and newest dates
+- Added `getSymbolsForIntradayCrawl`: Specialized method for intraday crawls with gap detection
+- Added `isIntradayCrawlComplete`: Check if a crawl has reached its target
+- Added new indexes for efficient querying on crawl state fields
+
+### 3. New Intraday Crawl Action (`intraday-crawl.action.ts`)
+Created a sophisticated crawl system with:
+- **Bidirectional crawling**: Handles both forward (new data) and backward (historical) gaps
+- **Batch processing**: Processes data in weekly batches by default
+- **Resumption logic**: Can resume from where it left off if interrupted
+- **Gap detection**: Automatically identifies missing date ranges
+- **Completion tracking**: Knows when the full history has been fetched
+
+### 4. Integration with QM Handler
+- Added new operations: `crawl-intraday-data` and `schedule-intraday-crawls`
+- Added scheduled operation to automatically process incomplete crawls every 4 hours
+- Integrated with the existing operation registry system
+
+### 5. Testing and Monitoring Tools
+- Created test script to verify crawl functionality
+- Created status checking script to monitor crawl progress
+- Added comprehensive documentation
+
+## How It Works
+
+### Initial Crawl Flow
+1. Symbol starts with no crawl state
+2. First crawl fetches today's data and sets `newestDateReached`
+3. Subsequent batches crawl backward in time
+4. Each batch updates `oldestDateReached` and `lastProcessedDate`
+5. When `oldestDateReached <= targetOldestDate`, crawl is marked finished
+
+### Resumption Flow
+1. Check if `newestDateReached < yesterday` (forward gap)
+2. If yes, fetch new data first to stay current
+3. Check if `finished = false` (backward gap)
+4. If yes, continue backward crawl from `lastProcessedDate`
+5. Process in batches until complete
+
+### Daily Update Flow
+1. For finished crawls, only check for forward gaps
+2. Fetch data from `newestDateReached + 1` to today
+3. Update `newestDateReached` to maintain currency
+
+## Benefits
+
+1. **Resilient**: Can handle interruptions gracefully
+2. **Efficient**: Avoids re-fetching data
+3. **Trackable**: Clear progress visibility
+4. **Scalable**: Can handle thousands of symbols
+5. **Flexible**: Configurable batch sizes and target dates
+
+## Usage Examples
+
+### Check if symbol X needs crawling:
+```typescript
+const tracker = operationRegistry.getTracker('qm');
+const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
+  limit: 1,
+  targetOldestDate: new Date('2020-01-01')
+});
+const symbolX = symbols.find(s => s.symbol === 'X');
+```
+
+### Start crawl for symbol X only:
+```typescript
+await handler.crawlIntradayData({
+  symbol: 'X',
+  symbolId: symbolData.symbolId,
+  qmSearchCode: symbolData.qmSearchCode,
+  targetOldestDate: '2020-01-01'
+});
+```
+
+### Schedule crawls for never-run symbols:
+```typescript
+await handler.scheduleIntradayCrawls({
+  limit: 50,
+  priorityMode: 'never_run',
+  targetOldestDate: '2020-01-01'
+});
+```
+
+## Next Steps
+
+1. Monitor the crawl progress using the provided scripts
+2. Adjust batch sizes based on API rate limits
+3. Consider adding more sophisticated retry logic for failed batches
+4. Implement data validation to ensure quality
\ No newline at end of file
diff --git a/apps/stock/data-ingestion/scripts/check-intraday-status.ts b/apps/stock/data-ingestion/scripts/check-intraday-status.ts
new file mode 100644
index 0000000..742a4b9
--- /dev/null
+++ b/apps/stock/data-ingestion/scripts/check-intraday-status.ts
@@ -0,0 +1,66 @@
+#!/usr/bin/env bun
+
+/**
+ * Script to check intraday crawl status
+ */
+
+import { createTestContext } from '../src/test-utils';
+import { QMHandler } from '../src/handlers/qm/qm.handler';
+
+async function checkIntradayStatus() {
+  const context = await createTestContext();
+  const handler = new QMHandler(context.services);
+
+  // Wait for operation registry to initialize
+  await new Promise(resolve => setTimeout(resolve, 1000));
+
+  try {
+    const tracker = handler.operationRegistry.getTracker('qm');
+
+    // Get operation stats
+    const stats = await tracker.getOperationStats('intraday_bars');
+    console.log('Intraday Bars Operation Stats:');
+    console.log('------------------------------');
+    console.log(`Total symbols: ${stats.totalSymbols}`);
+    console.log(`Processed symbols: ${stats.processedSymbols}`);
+    console.log(`Successful symbols: ${stats.successfulSymbols}`);
+    console.log(`Failed symbols: ${stats.failedSymbols}`);
+    console.log(`Stale symbols: ${stats.staleSymbols}`);
+    console.log(`Finished crawls: ${stats.finishedCrawls || 0}`);
+    console.log('');
+
+    // Get symbols needing crawl
+    console.log('Symbols needing crawl (top 10):');
+    console.log('--------------------------------');
+    const needsCrawl = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
+      limit: 10,
+      targetOldestDate: new Date('2020-01-01')
+    });
+
+    for (const symbol of needsCrawl) {
+      const crawlState = symbol.operationStatus?.crawlState;
+      console.log(`\n${symbol.symbol}:`);
+      console.log(`  Status: ${symbol.operationStatus?.status || 'never run'}`);
+      console.log(`  Last run: ${symbol.operationStatus?.lastRunAt || 'never'}`);
+
+      if (crawlState) {
+        console.log(`  Finished: ${crawlState.finished}`);
+        console.log(`  Oldest date: ${crawlState.oldestDateReached || 'none'}`);
+        console.log(`  Newest date: ${crawlState.newestDateReached || 'none'}`);
+        console.log(`  Days processed: ${crawlState.totalDaysProcessed || 0}`);
+      }
+
+      if (symbol.gaps) {
+        console.log(`  Gaps: ${symbol.gaps.forward ? 'new data' : ''} ${symbol.gaps.backward ?
'historical' : ''}`); + } + } + + } catch (error) { + console.error('Failed to check status:', error); + } finally { + await context.cleanup(); + } +} + +// Run the check +checkIntradayStatus().catch(console.error); \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts index 9978c56..15cf20e 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts @@ -14,8 +14,11 @@ export async function updateFilings( this: QMHandler, input: { symbol: string; - symbolId: number; + exchange: string; + lastRecordDate?: Date | null; qmSearchCode: string; + page: number; + totalPages?: number; }, _context?: ExecutionContext ): Promise<{ @@ -24,9 +27,9 @@ export async function updateFilings( message: string; data?: any; }> { - const { symbol, symbolId, qmSearchCode } = input; + const { qmSearchCode, page, symbol, exchange, lastRecordDate, totalPages } = input; - this.logger.info('Fetching filings', { symbol, symbolId }); + this.logger.info(`Fetching filings ${qmSearchCode} - ${page}/${totalPages}`, { qmSearchCode, page }); const sessionManager = QMSessionManager.getInstance(); await sessionManager.initialize(this.cache, this.logger); @@ -42,15 +45,17 @@ export async function updateFilings( try { // Build API request for filings const searchParams = new URLSearchParams({ - symbol: symbol, - symbolId: symbolId.toString(), - qmodTool: 'Filings', - webmasterId: '500', - limit: '50' // Get recent filings + symbol: qmSearchCode, + webmasterId: "500", + page: "1", + xbrlSubDoc: "true", + inclIxbrl: "true", + inclXbrl: "true", + resultsPerPage: "25", }); // TODO: Update with correct filings endpoint - const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/filings.json?${searchParams.toString()}`; + const apiUrl = `${QM_CONFIG.FILING_URL}?${searchParams.toString()}`; const response = 
await fetch(apiUrl, { method: 'GET', @@ -75,10 +80,9 @@ export async function updateFilings( filingsData.map((filing: any) => ({ ...filing, symbol, - symbolId, - updated_at: new Date() + exchange, })), - ['symbol', 'filingDate', 'formType', 'accessionNumber'] // Unique keys + ['qmSearchCode', 'filingId'] // Unique keys ); // Update symbol to track last filings update @@ -88,6 +92,8 @@ export async function updateFilings( recordCount: filingsData.length }); + + this.logger.info('Filings updated successfully', { symbol, filingsCount: filingsData.length @@ -155,16 +161,18 @@ export async function scheduleFilingsUpdates( symbolsQueued: number; errors: number; }> { - const { limit = 100, forceUpdate = false } = input; + const { limit = 1, forceUpdate = false } = input; this.logger.info('Scheduling filings updates', { limit, forceUpdate }); try { // Get symbols that need updating - const staleSymbols = await this.operationRegistry.getStaleSymbols('qm', 'filings_update', { - minHoursSinceRun: forceUpdate ? 0 : 24, // Daily for filings - limit - }); + // const staleSymbols = await this.operationRegistry.getStaleSymbols('qm', 'filings_update', { + // minHoursSinceRun: forceUpdate ? 
0 : 24, // Daily for filings + // limit + // }); + + const staleSymbols = ['X:CA'] if (staleSymbols.length === 0) { this.logger.info('No symbols need filings updates'); @@ -181,7 +189,7 @@ export async function scheduleFilingsUpdates( const symbolDocs = await this.mongodb.find('qmSymbols', { qmSearchCode: { $in: staleSymbols } }, { - projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } + projection: { qmSearchCode: 1, operations: 1, symbol: 1, exchange: 1 } }); let queued = 0; @@ -197,11 +205,11 @@ export async function scheduleFilingsUpdates( await this.scheduleOperation('update-filings', { symbol: doc.symbol, - symbolId: doc.symbolId, - qmSearchCode: doc.qmSearchCode + exchange: doc.exchange, + qmSearchCode: doc.qmSearchCode, + lastRecordDate: doc.operations?.filings_update?.lastRecordDate || null, }, { priority: 5, // Lower priority than financial data - delay: queued * 2000 // 2 seconds between jobs }); queued++; diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts index cc0f464..0105f30 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts @@ -6,9 +6,12 @@ export { scheduleEventsUpdates, updateEvents } from './events.action'; export { scheduleFilingsUpdates, updateFilings } from './filings.action'; export { scheduleFinancialsUpdates, updateFinancials } from './financials.action'; export { scheduleIntradayUpdates, updateIntradayBars } from './intraday.action'; +export { crawlIntradayData, scheduleIntradayCrawls } from './intraday-crawl.action'; export { schedulePriceUpdates, updatePrices } from './prices.action'; export { checkSessions, createSession } from './session.action'; export { scheduleSymbolInfoUpdates, updateSymbolInfo } from './symbol-info.action'; export { searchSymbols, spiderSymbol } from './symbol.action'; export { deduplicateSymbols, updateExchangeStats, 
updateExchangeStatsAndDeduplicate } from './symbol-dedup.action'; +export { scheduleInsidersUpdates, updateInsiders } from './insiders.action'; +export { scheduleSymbolNewsUpdates, updateSymbolNews, updateGeneralNews } from './news.action'; diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/insiders.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/insiders.action.ts new file mode 100644 index 0000000..c4d649c --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/insiders.action.ts @@ -0,0 +1,286 @@ +/** + * QM Insiders Actions - Fetch and update insider trading data + */ + +import type { ExecutionContext } from '@stock-bot/handlers'; +import type { QMHandler } from '../qm.handler'; +import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config'; +import { QMSessionManager } from '../shared/session-manager'; + +/** + * Update insider transactions for a single symbol + */ +export async function updateInsiders( + this: QMHandler, + input: { + symbol: string; + symbolId: number; + qmSearchCode: string; + lookbackDays?: number; + }, + _context?: ExecutionContext +): Promise<{ + success: boolean; + symbol: string; + message: string; + data?: any; +}> { + const { symbol, symbolId, qmSearchCode, lookbackDays = 365 } = input; + + this.logger.info('Fetching insider transactions', { symbol, symbolId, lookbackDays }); + + const sessionManager = QMSessionManager.getInstance(); + await sessionManager.initialize(this.cache, this.logger); + + const sessionId = QM_SESSION_IDS.LOOKUP; + const session = await sessionManager.getSession(sessionId); + + if (!session || !session.uuid) { + throw new Error(`No active session found for QM insiders`); + } + + try { + // Calculate date range + const endDate = new Date(); + const startDate = new Date(); + startDate.setDate(startDate.getDate() - lookbackDays); + + // Build API request for insider transactions + const searchParams = new URLSearchParams({ + symbol: symbol, + symbolId: symbolId.toString(), + 
qmodTool: 'InsiderActivity', + webmasterId: '500', + startDate: startDate.toISOString().split('T')[0], + endDate: endDate.toISOString().split('T')[0], + includeOptions: 'true', + pageSize: '100' + } as Record); + + const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/insiders.json?${searchParams.toString()}`; + + const response = await fetch(apiUrl, { + method: 'GET', + headers: session.headers, + proxy: session.proxy, + }); + + if (!response.ok) { + throw new Error(`QM API request failed: ${response.status} ${response.statusText}`); + } + + const insiderData = await response.json(); + + // Update session success stats + await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); + + // Process and store insider data + if (insiderData && insiderData.transactions && insiderData.transactions.length > 0) { + const processedTransactions = insiderData.transactions.map((transaction: any) => ({ + symbol, + symbolId, + transactionDate: new Date(transaction.transactionDate), + filingDate: new Date(transaction.filingDate), + insiderName: transaction.insiderName, + insiderTitle: transaction.insiderTitle || 'Unknown', + transactionType: transaction.transactionType, + shares: parseFloat(transaction.shares) || 0, + pricePerShare: parseFloat(transaction.pricePerShare) || 0, + totalValue: parseFloat(transaction.totalValue) || 0, + sharesOwned: parseFloat(transaction.sharesOwned) || 0, + ownershipType: transaction.ownershipType || 'Direct', + formType: transaction.formType || 'Form 4', + transactionCode: transaction.transactionCode, + updated_at: new Date() + })); + + // Store in MongoDB + await this.mongodb.batchUpsert( + 'qmInsiders', + processedTransactions, + ['symbol', 'transactionDate', 'insiderName', 'transactionType'] // Unique keys + ); + + // Calculate summary statistics + const totalBuys = processedTransactions.filter((t: any) => + t.transactionType === 'Buy' || t.transactionType === 'Purchase' + ).length; + + const totalSells = processedTransactions.filter((t: 
any) => + t.transactionType === 'Sell' || t.transactionType === 'Sale' + ).length; + + const totalBuyValue = processedTransactions + .filter((t: any) => t.transactionType === 'Buy' || t.transactionType === 'Purchase') + .reduce((sum: number, t: any) => sum + t.totalValue, 0); + + const totalSellValue = processedTransactions + .filter((t: any) => t.transactionType === 'Sell' || t.transactionType === 'Sale') + .reduce((sum: number, t: any) => sum + t.totalValue, 0); + + // Update operation tracking + await this.operationRegistry.updateOperation('qm', qmSearchCode, 'insiders_update', { + status: 'success', + lastRecordDate: endDate, + recordCount: processedTransactions.length, + metadata: { + totalBuys, + totalSells, + totalBuyValue, + totalSellValue, + netValue: totalBuyValue - totalSellValue, + uniqueInsiders: new Set(processedTransactions.map((t: any) => t.insiderName)).size + } + }); + + this.logger.info('Insider transactions updated successfully', { + symbol, + transactionCount: processedTransactions.length, + totalBuys, + totalSells, + netValue: totalBuyValue - totalSellValue + }); + + return { + success: true, + symbol, + message: `Updated ${processedTransactions.length} insider transactions`, + data: { + count: processedTransactions.length, + totalBuys, + totalSells, + totalBuyValue, + totalSellValue, + netValue: totalBuyValue - totalSellValue + } + }; + } else { + // No insider data + this.logger.info('No insider transactions found', { symbol }); + + await this.operationRegistry.updateOperation('qm', qmSearchCode, 'insiders_update', { + status: 'success', + lastRecordDate: endDate, + recordCount: 0 + }); + + return { + success: true, + symbol, + message: 'No insider transactions found', + data: { count: 0 } + }; + } + + } catch (error) { + // Update session failure stats + if (session.uuid) { + await sessionManager.incrementFailedCalls(sessionId, session.uuid); + } + + this.logger.error('Error fetching insider transactions', { + symbol, + error: error 
instanceof Error ? error.message : 'Unknown error' + }); + + // Update operation tracking for failure + await this.operationRegistry.updateOperation('qm', qmSearchCode, 'insiders_update', { + status: 'failure', + error: error instanceof Error ? error.message : 'Unknown error' + }); + + return { + success: false, + symbol, + message: `Failed to fetch insider transactions: ${error instanceof Error ? error.message : 'Unknown error'}` + }; + } +} + +/** + * Schedule insider updates for symbols + */ +export async function scheduleInsidersUpdates( + this: QMHandler, + input: { + limit?: number; + minHoursSinceRun?: number; + forceUpdate?: boolean; + } = {}, + _context?: ExecutionContext +): Promise<{ + message: string; + symbolsQueued: number; + errors: number; +}> { + const { limit = 100, minHoursSinceRun = 24 * 7, forceUpdate = false } = input; + + this.logger.info('Scheduling insider updates', { limit, minHoursSinceRun, forceUpdate }); + + try { + // Get symbols that need insider updates + const staleSymbols = await this.operationRegistry.getStaleSymbols('qm', 'insiders_update', { + minHoursSinceRun: forceUpdate ? 
0 : minHoursSinceRun, + limit + }); + + if (staleSymbols.length === 0) { + this.logger.info('No symbols need insider updates'); + return { + message: 'No symbols need insider updates', + symbolsQueued: 0, + errors: 0 + }; + } + + // Get full symbol data + const symbolsToProcess = await this.mongodb.find('qmSymbols', { + qmSearchCode: { $in: staleSymbols } + }, { + projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } + }); + + this.logger.info(`Found ${symbolsToProcess.length} symbols for insider updates`); + + let symbolsQueued = 0; + let errors = 0; + + // Schedule update jobs + for (const doc of symbolsToProcess) { + try { + if (!doc.symbolId) { + this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); + continue; + } + + await this.scheduleOperation('update-insiders', { + symbol: doc.symbol, + symbolId: doc.symbolId, + qmSearchCode: doc.qmSearchCode + }, { + priority: 5, // Medium priority + delay: symbolsQueued * 1000 // 1 second between jobs + }); + + symbolsQueued++; + } catch (error) { + this.logger.error(`Failed to schedule insider update for ${doc.symbol}`, { error }); + errors++; + } + } + + this.logger.info('Insider update scheduling completed', { + symbolsQueued, + errors + }); + + return { + message: `Scheduled insider updates for ${symbolsQueued} symbols`, + symbolsQueued, + errors + }; + } catch (error) { + this.logger.error('Insider scheduling failed', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/intraday-crawl.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/intraday-crawl.action.ts new file mode 100644 index 0000000..9fc89c8 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/intraday-crawl.action.ts @@ -0,0 +1,500 @@ +/** + * QM Intraday Crawl Actions - Sophisticated crawling with resumption support + */ + +import type { ExecutionContext } from '@stock-bot/handlers'; +import type { QMHandler } from '../qm.handler'; 
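As an illustrative aside before the rest of this new action file: the forward/backward gap detection that `crawlIntradayData` implements can be sketched as a standalone pure function. This is not code from the diff — the names `computeCrawlRanges`, `SketchCrawlState`, and `SketchRange` are invented here, and the real action reads its `CrawlState` from MongoDB rather than taking it as a parameter.

```typescript
// Sketch of the gap-detection logic: given the crawl state, derive at most one
// forward range (new data since the last crawl) and one backward batch
// (historical data toward the target date). Illustration only.

interface SketchCrawlState {
  finished: boolean;
  oldestDateReached?: Date;
  newestDateReached?: Date;
  lastProcessedDate?: Date;
}

interface SketchRange {
  start: Date;
  end: Date;
  direction: 'forward' | 'backward';
}

function computeCrawlRanges(
  state: SketchCrawlState,
  today: Date,
  targetOldest: Date,
  batchSize = 7
): SketchRange[] {
  const ranges: SketchRange[] = [];

  // Forward gap: trading days that appeared since the last crawl.
  if (state.newestDateReached) {
    const days = Math.floor((today.getTime() - state.newestDateReached.getTime()) / 86_400_000);
    if (days > 1) {
      const start = new Date(state.newestDateReached);
      start.setDate(start.getDate() + 1);
      ranges.push({ start, end: today, direction: 'forward' });
    }
  } else if (!state.oldestDateReached) {
    // Never crawled: begin with today's data.
    ranges.push({ start: today, end: today, direction: 'forward' });
  }

  // Backward gap: one batch of historical data, clamped to the target date.
  if (!state.finished) {
    const start = state.lastProcessedDate ?? state.oldestDateReached ?? today;
    if (start > targetOldest) {
      const end = new Date(start);
      end.setDate(end.getDate() - batchSize);
      if (end < targetOldest) end.setTime(targetOldest.getTime());
      ranges.push({ start, end, direction: 'backward' });
    }
  }

  return ranges;
}
```

In the actual handler the equivalent ranges are fed to `processIntradayBatch`; clamping the backward batch to the target date is what lets the final batch mark the crawl finished.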
+import type { CrawlState } from '../../../shared/operation-manager/types'; +import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config'; +import { QMSessionManager } from '../shared/session-manager'; + +interface IntradayCrawlInput { + symbol: string; + symbolId: number; + qmSearchCode: string; + targetOldestDate?: string; // ISO date string for how far back to crawl + batchSize?: number; // Days per batch +} + +interface DateRange { + start: Date; + end: Date; + direction: 'forward' | 'backward'; +} + +/** + * Process a batch of intraday data for a date range + */ +export async function processIntradayBatch( + this: QMHandler, + input: { + symbol: string; + symbolId: number; + qmSearchCode: string; + dateRange: DateRange; + }, + _context?: ExecutionContext +): Promise<{ + success: boolean; + recordsProcessed: number; + datesProcessed: number; + errors: string[]; +}> { + const { symbol, symbolId, qmSearchCode, dateRange } = input; + const errors: string[] = []; + let recordsProcessed = 0; + let datesProcessed = 0; + + const sessionManager = QMSessionManager.getInstance(); + await sessionManager.initialize(this.cache, this.logger); + + // Get a session + const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID + const session = await sessionManager.getSession(sessionId); + + if (!session || !session.uuid) { + throw new Error(`No active session found for QM intraday`); + } + + // Process each date in the range + const currentDate = new Date(dateRange.start); + const endDate = new Date(dateRange.end); + + while ( + (dateRange.direction === 'backward' && currentDate >= endDate) || + (dateRange.direction === 'forward' && currentDate <= endDate) + ) { + try { + // Skip weekends + if (currentDate.getDay() === 0 || currentDate.getDay() === 6) { + if (dateRange.direction === 'backward') { + currentDate.setDate(currentDate.getDate() - 1); + } else { + currentDate.setDate(currentDate.getDate() + 1); + } + continue; + } + + // Build API request + 
const searchParams = new URLSearchParams({ + symbol: symbol, + symbolId: symbolId.toString(), + qmodTool: 'IntradayBars', + webmasterId: '500', + date: currentDate.toISOString().split('T')[0], + interval: '1' // 1-minute bars + } as Record); + + const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/intraday.json?${searchParams.toString()}`; + + const response = await fetch(apiUrl, { + method: 'GET', + headers: session.headers, + proxy: session.proxy, + }); + + if (!response.ok) { + throw new Error(`API request failed: ${response.status}`); + } + + const barsData = await response.json(); + + // Update session success stats + await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); + + // Process and store data if we got any + if (barsData && barsData.length > 0) { + const processedBars = barsData.map((bar: any) => ({ + ...bar, + symbol, + symbolId, + timestamp: new Date(bar.timestamp), + date: new Date(currentDate), + updated_at: new Date() + })); + + await this.mongodb.batchUpsert( + 'qmIntradayBars', + processedBars, + ['symbol', 'timestamp'] + ); + + recordsProcessed += barsData.length; + } + + datesProcessed++; + + } catch (error) { + const errorMsg = `Failed to fetch ${symbol} for ${currentDate.toISOString().split('T')[0]}: ${error}`; + errors.push(errorMsg); + this.logger.error(errorMsg); + + // Update session failure stats + if (session.uuid) { + await sessionManager.incrementFailedCalls(sessionId, session.uuid); + } + } + + // Move to next date + if (dateRange.direction === 'backward') { + currentDate.setDate(currentDate.getDate() - 1); + } else { + currentDate.setDate(currentDate.getDate() + 1); + } + } + + return { + success: errors.length === 0, + recordsProcessed, + datesProcessed, + errors + }; +} + +/** + * Main intraday crawl handler with sophisticated resumption logic + */ +export async function crawlIntradayData( + this: QMHandler, + input: IntradayCrawlInput, + _context?: ExecutionContext +): Promise<{ + success: boolean; + symbol: string; 
+ message: string; + data?: any; +}> { + const { + symbol, + symbolId, + qmSearchCode, + targetOldestDate = '2020-01-01', // Default to ~5 years of data + batchSize = 7 // Process a week at a time + } = input; + + this.logger.info('Starting intraday crawl', { + symbol, + symbolId, + targetOldestDate, + batchSize + }); + + try { + // Get current crawl state + const symbolData = await this.mongodb.findOne('qmSymbols', { + qmSearchCode + }); + + const currentCrawlState: CrawlState = symbolData?.operations?.intraday_bars?.crawlState || { + finished: false + }; + + // Determine what needs to be processed + const today = new Date(); + today.setHours(0, 0, 0, 0); + + const targetOldest = new Date(targetOldestDate); + targetOldest.setHours(0, 0, 0, 0); + + const ranges: DateRange[] = []; + + // 1. Check for forward gap (new data since last crawl) + if (currentCrawlState.newestDateReached) { + const newestDate = new Date(currentCrawlState.newestDateReached); + const daysSinceNewest = Math.floor((today.getTime() - newestDate.getTime()) / (1000 * 60 * 60 * 24)); + + if (daysSinceNewest > 1) { + // We have new data to fetch + const forwardStart = new Date(newestDate); + forwardStart.setDate(forwardStart.getDate() + 1); + + ranges.push({ + start: forwardStart, + end: today, + direction: 'forward' + }); + } + } else if (!currentCrawlState.oldestDateReached) { + // Never crawled, start from today + ranges.push({ + start: today, + end: today, + direction: 'forward' + }); + } + + // 2. Check for backward gap (historical data) + if (!currentCrawlState.finished) { + const startDate = currentCrawlState.lastProcessedDate + ? new Date(currentCrawlState.lastProcessedDate) + : currentCrawlState.oldestDateReached + ? 
new Date(currentCrawlState.oldestDateReached)
+          : today;
+
+      if (startDate > targetOldest) {
+        // Calculate batch end date
+        const batchEnd = new Date(startDate);
+        batchEnd.setDate(batchEnd.getDate() - batchSize);
+
+        // Don't go past target
+        if (batchEnd < targetOldest) {
+          batchEnd.setTime(targetOldest.getTime());
+        }
+
+        ranges.push({
+          start: startDate,
+          end: batchEnd,
+          direction: 'backward'
+        });
+      }
+    }
+
+    if (ranges.length === 0) {
+      // Nothing to do
+      this.logger.info('Intraday crawl already complete', { symbol });
+      return {
+        success: true,
+        symbol,
+        message: 'Intraday crawl already complete'
+      };
+    }
+
+    // Process the ranges
+    let totalRecords = 0;
+    let totalDates = 0;
+    const allErrors: string[] = [];
+
+    for (const range of ranges) {
+      this.logger.info('Processing date range', {
+        symbol,
+        start: range.start.toISOString().split('T')[0],
+        end: range.end.toISOString().split('T')[0],
+        direction: range.direction
+      });
+
+      const result = await processIntradayBatch.call(this, {
+        symbol,
+        symbolId,
+        qmSearchCode,
+        dateRange: range
+      });
+
+      totalRecords += result.recordsProcessed;
+      totalDates += result.datesProcessed;
+      allErrors.push(...result.errors);
+
+      // Update crawl state after each batch
+      const updatedCrawlState: Partial<CrawlState> = {
+        lastProcessedDate: range.end,
+        lastCrawlDirection: range.direction,
+        totalDaysProcessed: (currentCrawlState.totalDaysProcessed || 0) + result.datesProcessed
+      };
+
+      if (range.direction === 'forward') {
+        updatedCrawlState.newestDateReached = range.end;
+        if (!currentCrawlState.oldestDateReached) {
+          updatedCrawlState.oldestDateReached = range.start;
+        }
+      } else {
+        updatedCrawlState.oldestDateReached = range.end;
+        if (!currentCrawlState.newestDateReached) {
+          updatedCrawlState.newestDateReached = range.start;
+        }
+      }
+
+      // Check if we've completed the crawl
+      if (range.direction === 'backward' && range.end <= targetOldest) {
+        updatedCrawlState.finished = true;
+        updatedCrawlState.targetOldestDate =
targetOldest; + } + + await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', { + status: allErrors.length > 0 ? 'partial' : 'success', + lastRecordDate: today, + recordCount: totalRecords, + crawlState: updatedCrawlState, + error: allErrors.length > 0 ? allErrors.join('; ') : undefined + }); + } + + const message = `Processed ${totalDates} days, ${totalRecords} records for ${symbol}`; + this.logger.info('Intraday crawl batch completed', { + symbol, + totalDates, + totalRecords, + errors: allErrors.length, + finished: ranges.some(r => r.direction === 'backward' && r.end <= targetOldest) + }); + + return { + success: allErrors.length === 0, + symbol, + message, + data: { + datesProcessed: totalDates, + recordsProcessed: totalRecords, + errors: allErrors, + crawlComplete: ranges.some(r => r.direction === 'backward' && r.end <= targetOldest) + } + }; + + } catch (error) { + this.logger.error('Intraday crawl failed', { + symbol, + error: error instanceof Error ? error.message : 'Unknown error' + }); + + await this.operationRegistry.updateOperation('qm', qmSearchCode, 'intraday_bars', { + status: 'failure', + error: error instanceof Error ? error.message : 'Unknown error' + }); + + return { + success: false, + symbol, + message: `Intraday crawl failed: ${error instanceof Error ? 
error.message : 'Unknown error'}` + }; + } +} + +/** + * Schedule intraday crawls for multiple symbols + */ +export async function scheduleIntradayCrawls( + this: QMHandler, + input: { + limit?: number; + targetOldestDate?: string; + priorityMode?: 'never_run' | 'incomplete' | 'stale' | 'all'; + } = {}, + _context?: ExecutionContext +): Promise<{ + message: string; + symbolsQueued: number; + errors: number; +}> { + const { + limit = 50, + targetOldestDate = '2020-01-01', + priorityMode = 'all' + } = input; + + this.logger.info('Scheduling intraday crawls', { + limit, + targetOldestDate, + priorityMode + }); + + try { + // Get symbols based on priority mode + let symbolsToProcess: any[] = []; + const tracker = this.operationRegistry.getTracker('qm'); + + switch (priorityMode) { + case 'never_run': + // Get symbols that have never been crawled + symbolsToProcess = await this.mongodb.find('qmSymbols', { + 'operations.intraday_bars': { $exists: false }, + active: { $ne: false } + }, { + limit, + projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } + }); + break; + + case 'incomplete': + // Get symbols with incomplete crawls + symbolsToProcess = await tracker.getSymbolsForIntradayCrawl( + 'intraday_bars', + { limit, targetOldestDate: new Date(targetOldestDate), includeNewDataGaps: false } + ); + break; + + case 'stale': + // Get symbols that need updates (new data) + symbolsToProcess = await tracker.getSymbolsForIntradayCrawl( + 'intraday_bars', + { limit, targetOldestDate: new Date(targetOldestDate), includeNewDataGaps: true } + ); + symbolsToProcess = symbolsToProcess.filter(s => s.gaps?.forward); + break; + + case 'all': + default: + // Get all symbols that need any processing + symbolsToProcess = await tracker.getSymbolsForIntradayCrawl( + 'intraday_bars', + { limit, targetOldestDate: new Date(targetOldestDate) } + ); + break; + } + + if (symbolsToProcess.length === 0) { + return { + message: `No symbols found for priority mode: ${priorityMode}`, + symbolsQueued: 
0, + errors: 0 + }; + } + + // Get full symbol data if needed + if (priorityMode !== 'never_run') { + const qmSearchCodes = symbolsToProcess.map(s => s.symbol); + const fullSymbols = await this.mongodb.find('qmSymbols', { + qmSearchCode: { $in: qmSearchCodes } + }, { + projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } + }); + + // Map back the full data + symbolsToProcess = symbolsToProcess.map(sp => { + const full = fullSymbols.find(f => f.qmSearchCode === sp.symbol); + return full || sp; + }); + } + + let symbolsQueued = 0; + let errors = 0; + + // Schedule crawl jobs + for (const doc of symbolsToProcess) { + try { + if (!doc.symbolId) { + this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); + continue; + } + + await this.scheduleOperation('crawl-intraday-data', { + symbol: doc.symbol, + symbolId: doc.symbolId, + qmSearchCode: doc.qmSearchCode, + targetOldestDate + }, { + priority: priorityMode === 'stale' ? 9 : 5, // Higher priority for updates + delay: symbolsQueued * 2000 // 2 seconds between jobs + }); + + symbolsQueued++; + } catch (error) { + this.logger.error(`Failed to schedule intraday crawl for ${doc.symbol}`, { error }); + errors++; + } + } + + this.logger.info('Intraday crawl scheduling completed', { + priorityMode, + symbolsQueued, + errors + }); + + return { + message: `Scheduled ${symbolsQueued} symbols for intraday crawl (${priorityMode} mode)`, + symbolsQueued, + errors + }; + + } catch (error) { + this.logger.error('Intraday crawl scheduling failed', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/news.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/news.action.ts new file mode 100644 index 0000000..bce518f --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/news.action.ts @@ -0,0 +1,448 @@ +/** + * QM News Actions - Fetch symbol-specific and general market news + */ + +import type { ExecutionContext } from 
'@stock-bot/handlers';
+import type { QMHandler } from '../qm.handler';
+import { QM_CONFIG, QM_SESSION_IDS } from '../shared/config';
+import { QMSessionManager } from '../shared/session-manager';
+
+interface NewsArticle {
+ id: string;
+ publishedDate: Date;
+ title: string;
+ summary: string;
+ source: string;
+ url: string;
+ symbols?: string[];
+ categories?: string[];
+ sentiment?: {
+ score: number;
+ label: string; // positive, negative, neutral
+ };
+ imageUrl?: string;
+}
+
+/**
+ * Update news for a single symbol
+ */
+export async function updateSymbolNews(
+ this: QMHandler,
+ input: {
+ symbol: string;
+ symbolId: number;
+ qmSearchCode: string;
+ lookbackDays?: number;
+ },
+ _context?: ExecutionContext
+): Promise<{
+ success: boolean;
+ symbol: string;
+ message: string;
+ data?: any;
+}> {
+ const { symbol, symbolId, qmSearchCode, lookbackDays = 30 } = input;
+
+ this.logger.info('Fetching symbol news', { symbol, symbolId, lookbackDays });
+
+ const sessionManager = QMSessionManager.getInstance();
+ await sessionManager.initialize(this.cache, this.logger);
+
+ const sessionId = QM_SESSION_IDS.LOOKUP;
+ const session = await sessionManager.getSession(sessionId);
+
+ if (!session || !session.uuid) {
+ throw new Error(`No active session found for QM news`);
+ }
+
+ try {
+ // Calculate date range
+ const endDate = new Date();
+ const startDate = new Date();
+ startDate.setDate(startDate.getDate() - lookbackDays);
+
+ // Build API request for symbol news
+ const searchParams = new URLSearchParams({
+ symbol: symbol,
+ symbolId: symbolId.toString(),
+ qmodTool: 'News',
+ webmasterId: '500',
+ startDate: startDate.toISOString().split('T')[0],
+ endDate: endDate.toISOString().split('T')[0],
+ includeContent: 'true',
+ pageSize: '50'
+ } as Record<string, string>);
+
+ const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/news.json?${searchParams.toString()}`;
+
+ const response = await fetch(apiUrl, {
+ method: 'GET',
+ headers: session.headers,
+ proxy: session.proxy,
+ });
+ + if (!response.ok) { + throw new Error(`QM API request failed: ${response.status} ${response.statusText}`); + } + + const newsData = await response.json(); + + // Update session success stats + await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); + + // Process and store news data + if (newsData && newsData.articles && newsData.articles.length > 0) { + const processedArticles = newsData.articles.map((article: any) => ({ + articleId: article.id || `${symbol}_${article.publishedDate}_${article.title.substring(0, 20)}`, + symbol, + symbolId, + publishedDate: new Date(article.publishedDate), + title: article.title, + summary: article.summary || article.content?.substring(0, 500), + source: article.source || 'Unknown', + url: article.url, + symbols: article.symbols || [symbol], + categories: article.categories || [], + sentiment: article.sentiment ? { + score: parseFloat(article.sentiment.score) || 0, + label: article.sentiment.label || 'neutral' + } : null, + imageUrl: article.imageUrl, + isSymbolSpecific: true, + updated_at: new Date() + })); + + // Store in MongoDB + await this.mongodb.batchUpsert( + 'qmNews', + processedArticles, + ['articleId'] // Unique key + ); + + // Calculate sentiment summary + const sentimentCounts = processedArticles.reduce((acc: any, article: any) => { + if (article.sentiment) { + acc[article.sentiment.label] = (acc[article.sentiment.label] || 0) + 1; + } + return acc; + }, {}); + + // Update operation tracking + await this.operationRegistry.updateOperation('qm', qmSearchCode, 'news_update', { + status: 'success', + lastRecordDate: endDate, + recordCount: processedArticles.length, + metadata: { + sentimentCounts, + uniqueSources: new Set(processedArticles.map((a: any) => a.source)).size, + avgSentimentScore: processedArticles + .filter((a: any) => a.sentiment?.score) + .reduce((sum: number, a: any, i: number, arr: any[]) => + i === arr.length - 1 ? 
(sum + a.sentiment.score) / arr.length : sum + a.sentiment.score, 0 + ) + } + }); + + this.logger.info('Symbol news updated successfully', { + symbol, + articleCount: processedArticles.length, + sentimentCounts + }); + + return { + success: true, + symbol, + message: `Updated ${processedArticles.length} news articles`, + data: { + count: processedArticles.length, + sentimentCounts, + sources: new Set(processedArticles.map((a: any) => a.source)).size + } + }; + } else { + // No news found + this.logger.info('No news articles found', { symbol }); + + await this.operationRegistry.updateOperation('qm', qmSearchCode, 'news_update', { + status: 'success', + lastRecordDate: endDate, + recordCount: 0 + }); + + return { + success: true, + symbol, + message: 'No news articles found', + data: { count: 0 } + }; + } + + } catch (error) { + // Update session failure stats + if (session.uuid) { + await sessionManager.incrementFailedCalls(sessionId, session.uuid); + } + + this.logger.error('Error fetching symbol news', { + symbol, + error: error instanceof Error ? error.message : 'Unknown error' + }); + + // Update operation tracking for failure + await this.operationRegistry.updateOperation('qm', qmSearchCode, 'news_update', { + status: 'failure', + error: error instanceof Error ? error.message : 'Unknown error' + }); + + return { + success: false, + symbol, + message: `Failed to fetch news: ${error instanceof Error ? 
error.message : 'Unknown error'}`
+ };
+ }
+}
+
+/**
+ * Update general market news
+ */
+export async function updateGeneralNews(
+ this: QMHandler,
+ input: {
+ categories?: string[];
+ lookbackMinutes?: number;
+ } = {},
+ _context?: ExecutionContext
+): Promise<{
+ success: boolean;
+ message: string;
+ data?: any;
+}> {
+ const { categories = ['market', 'economy', 'politics'], lookbackMinutes = 60 } = input;
+
+ this.logger.info('Fetching general news', { categories, lookbackMinutes });
+
+ const sessionManager = QMSessionManager.getInstance();
+ await sessionManager.initialize(this.cache, this.logger);
+
+ const sessionId = QM_SESSION_IDS.LOOKUP;
+ const session = await sessionManager.getSession(sessionId);
+
+ if (!session || !session.uuid) {
+ throw new Error(`No active session found for QM general news`);
+ }
+
+ try {
+ // Calculate time range
+ const endDate = new Date();
+ const startDate = new Date();
+ startDate.setMinutes(startDate.getMinutes() - lookbackMinutes);
+
+ // Build API request for general news
+ const searchParams = new URLSearchParams({
+ qmodTool: 'MarketNews',
+ webmasterId: '500',
+ categories: categories.join(','),
+ startDateTime: startDate.toISOString(),
+ endDateTime: endDate.toISOString(),
+ includeContent: 'true',
+ pageSize: '100'
+ } as Record<string, string>);
+
+ const apiUrl = `${QM_CONFIG.BASE_URL}/datatool/marketnews.json?${searchParams.toString()}`;
+
+ const response = await fetch(apiUrl, {
+ method: 'GET',
+ headers: session.headers,
+ proxy: session.proxy,
+ });
+
+ if (!response.ok) {
+ throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
+ }
+
+ const newsData = await response.json();
+
+ // Update session success stats
+ await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
+
+ // Process and store general news
+ if (newsData && newsData.articles && newsData.articles.length > 0) {
+ const processedArticles = newsData.articles.map((article: any) => ({
+ articleId: article.id ||
`general_${article.publishedDate}_${article.title.substring(0, 20)}`, + publishedDate: new Date(article.publishedDate), + title: article.title, + summary: article.summary || article.content?.substring(0, 500), + source: article.source || 'Unknown', + url: article.url, + symbols: article.symbols || [], + categories: article.categories || categories, + sentiment: article.sentiment ? { + score: parseFloat(article.sentiment.score) || 0, + label: article.sentiment.label || 'neutral' + } : null, + imageUrl: article.imageUrl, + isSymbolSpecific: false, + isMarketMoving: article.isMarketMoving || false, + importance: article.importance || 'medium', + updated_at: new Date() + })); + + // Store in MongoDB + await this.mongodb.batchUpsert( + 'qmNews', + processedArticles, + ['articleId'] // Unique key + ); + + // Find high-importance articles + const highImportanceCount = processedArticles.filter((a: any) => + a.importance === 'high' || a.isMarketMoving + ).length; + + // Update a general tracking document + await this.mongodb.updateOne( + 'qmOperationStats', + { operation: 'general_news_update' }, + { + $set: { + lastRunAt: new Date(), + lastRecordCount: processedArticles.length, + highImportanceCount, + categories, + updated_at: new Date() + } + }, + { upsert: true } + ); + + this.logger.info('General news updated successfully', { + articleCount: processedArticles.length, + highImportanceCount, + categories + }); + + return { + success: true, + message: `Updated ${processedArticles.length} general news articles`, + data: { + count: processedArticles.length, + highImportanceCount, + categories, + sources: new Set(processedArticles.map((a: any) => a.source)).size + } + }; + } else { + // No news found + this.logger.info('No general news articles found'); + + return { + success: true, + message: 'No general news articles found', + data: { count: 0 } + }; + } + + } catch (error) { + // Update session failure stats + if (session.uuid) { + await 
sessionManager.incrementFailedCalls(sessionId, session.uuid); + } + + this.logger.error('Error fetching general news', { + error: error instanceof Error ? error.message : 'Unknown error' + }); + + return { + success: false, + message: `Failed to fetch general news: ${error instanceof Error ? error.message : 'Unknown error'}` + }; + } +} + +/** + * Schedule symbol news updates + */ +export async function scheduleSymbolNewsUpdates( + this: QMHandler, + input: { + limit?: number; + minHoursSinceRun?: number; + forceUpdate?: boolean; + } = {}, + _context?: ExecutionContext +): Promise<{ + message: string; + symbolsQueued: number; + errors: number; +}> { + const { limit = 200, minHoursSinceRun = 24 * 7, forceUpdate = false } = input; + + this.logger.info('Scheduling symbol news updates', { limit, minHoursSinceRun, forceUpdate }); + + try { + // Get symbols that need news updates + const staleSymbols = await this.operationRegistry.getStaleSymbols('qm', 'news_update', { + minHoursSinceRun: forceUpdate ? 
0 : minHoursSinceRun, + limit + }); + + if (staleSymbols.length === 0) { + this.logger.info('No symbols need news updates'); + return { + message: 'No symbols need news updates', + symbolsQueued: 0, + errors: 0 + }; + } + + // Get full symbol data + const symbolsToProcess = await this.mongodb.find('qmSymbols', { + qmSearchCode: { $in: staleSymbols } + }, { + projection: { symbol: 1, symbolId: 1, qmSearchCode: 1 } + }); + + this.logger.info(`Found ${symbolsToProcess.length} symbols for news updates`); + + let symbolsQueued = 0; + let errors = 0; + + // Schedule update jobs + for (const doc of symbolsToProcess) { + try { + if (!doc.symbolId) { + this.logger.warn(`Symbol ${doc.symbol} missing symbolId, skipping`); + continue; + } + + await this.scheduleOperation('update-symbol-news', { + symbol: doc.symbol, + symbolId: doc.symbolId, + qmSearchCode: doc.qmSearchCode + }, { + priority: 4, // Lower priority than price data + delay: symbolsQueued * 500 // 0.5 seconds between jobs + }); + + symbolsQueued++; + } catch (error) { + this.logger.error(`Failed to schedule news update for ${doc.symbol}`, { error }); + errors++; + } + } + + this.logger.info('Symbol news update scheduling completed', { + symbolsQueued, + errors + }); + + return { + message: `Scheduled news updates for ${symbolsQueued} symbols`, + symbolsQueued, + errors + }; + } catch (error) { + this.logger.error('Symbol news scheduling failed', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts index 0124451..d00853f 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts @@ -14,9 +14,11 @@ import { scheduleEventsUpdates, scheduleFilingsUpdates, scheduleFinancialsUpdates, + scheduleInsidersUpdates, scheduleIntradayUpdates, schedulePriceUpdates, scheduleSymbolInfoUpdates, + scheduleSymbolNewsUpdates, 
searchSymbols, spiderSymbol, updateEvents, @@ -24,10 +26,14 @@ import { updateExchangeStatsAndDeduplicate, updateFilings, updateFinancials, + updateGeneralNews, + updateInsiders, updateIntradayBars, updatePrices, - updateSymbolInfo + updateSymbolInfo, + updateSymbolNews } from './actions'; +import { crawlIntradayData, scheduleIntradayCrawls } from './actions/intraday-crawl.action'; import { createQMOperationRegistry } from './shared/operation-provider'; @Handler('qm') @@ -169,6 +175,12 @@ export class QMHandler extends BaseHandler { @Operation('update-intraday-bars') updateIntradayBars = updateIntradayBars; + @Operation('crawl-intraday-data') + crawlIntradayData = crawlIntradayData; + + @Operation('schedule-intraday-crawls') + scheduleIntradayCrawls = scheduleIntradayCrawls; + @Disabled() @ScheduledOperation('schedule-intraday-updates', '*/30 * * * *', { priority: 9, @@ -176,4 +188,60 @@ export class QMHandler extends BaseHandler { description: 'Check for symbols needing intraday updates every 30 minutes' }) scheduleIntradayUpdates = scheduleIntradayUpdates; + + @ScheduledOperation('schedule-intraday-crawls-batch', '0 */4 * * *', { + priority: 5, + immediately: false, + description: 'Schedule intraday crawls for incomplete symbols every 4 hours' + }) + scheduleIntradayCrawlsBatch = async () => { + return scheduleIntradayCrawls.call(this, { + limit: 25, + priorityMode: 'incomplete' + }); + }; + + /** + * INSIDER TRADING + */ + @Operation('update-insiders') + updateInsiders = updateInsiders; + + @Disabled() + @ScheduledOperation('schedule-insiders-updates', '0 4 * * 1', { + priority: 5, + immediately: false, + description: 'Check for symbols needing insider updates weekly on Monday at 4 AM' + }) + scheduleInsidersUpdates = scheduleInsidersUpdates; + + /** + * NEWS + */ + @Operation('update-symbol-news') + updateSymbolNews = updateSymbolNews; + + @Operation('update-general-news') + updateGeneralNews = updateGeneralNews; + + @Disabled() + 
@ScheduledOperation('schedule-symbol-news-updates', '0 5 * * 1', { + priority: 4, + immediately: false, + description: 'Check for symbols needing news updates weekly on Monday at 5 AM' + }) + scheduleSymbolNewsUpdates = scheduleSymbolNewsUpdates; + + @Disabled() + @ScheduledOperation('update-general-news-frequent', '*/1 * * * *', { + priority: 9, + immediately: true, + description: 'Update general market news every minute' + }) + updateGeneralNewsFrequent = async () => { + return updateGeneralNews.call(this, { + categories: ['market', 'economy', 'politics', 'breaking'], + lookbackMinutes: 5 // Only look back 5 minutes to avoid duplicates + }); + }; } \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts index 775418e..9fc78f4 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts @@ -10,7 +10,7 @@ export const QM_SESSION_IDS = { SYMBOL: '1e1d7cb1de1fd2fe52684abdea41a446919a5fe12776dfab88615ac1ce1ec2f6', // getProfiles PRICES: '5ad521e05faf5778d567f6d0012ec34d6cdbaeb2462f41568f66558bc7b4ced9', // getEnhancedChartData FINANCIALS: '4e4f1565fb7c9f2a8b4b32b9aa3137af684f3da8a2ce97799d3a7117b14f07be', // getFinancialsEnhancedBySymbol - // FILINGS: '', // + FILINGS: 'a863d519e38f80e45d10e280fb1afc729816e23f0218db2f3e8b23005a9ad8dd', // getCompanyFilings // INTRADAY: '', // // '5ad521e05faf5778d567f6d0012ec34d6cdbaeb2462f41568f66558bc7b4ced9' // getEhnachedChartData // '5ad521e05faf5778d567f6d0012ec34d6cdbaeb2462f41568f66558bc7b4ced9': [], //4488d072b @@ -31,6 +31,7 @@ export const QM_SESSION_IDS = { // QM API Configuration export const QM_CONFIG = { + PROXY_URL: 'http://5.79.66.2:13010', BASE_URL: 'https://app.quotemedia.com', SESSION_PATH: '/auth/g/authenticate/dataTool/v0/500', LOOKUP_URL: 'https://app.quotemedia.com/datatool/lookup.json', @@ -38,6 +39,7 @@ export const QM_CONFIG = { 
PRICES_URL: 'https://app.quotemedia.com/datatool/getEnhancedChartData.json', EVENTS_URL: 'https://app.quotemedia.com/datatool/getIndicatorsBySymbol.json', FINANCIALS_URL: 'https://app.quotemedia.com/datatool/getFinancialsEnhancedBySymbol.json', + FILING_URL: 'https://app.quotemedia.com/datatool/getCompanyFilings.json', } as const; // Session management settings diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-provider.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-provider.ts index c75f1dd..39ced65 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-provider.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-provider.ts @@ -59,6 +59,22 @@ export const QM_OPERATIONS: OperationConfig[] = [ type: 'standard', description: 'Update SEC filings', defaultStaleHours: 24 // Daily + }, + + // Insider trading + { + name: 'insiders_update', + type: 'standard', + description: 'Update insider transactions', + defaultStaleHours: 24 * 7 // Weekly + }, + + // News + { + name: 'news_update', + type: 'standard', + description: 'Update symbol-specific news', + defaultStaleHours: 24 * 7 // Weekly } ]; diff --git a/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts b/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts index 3cd4cc2..8b71596 100644 --- a/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts +++ b/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts @@ -82,12 +82,22 @@ export class OperationTracker { } ]; - // Add crawl state index for crawl operations + // Add crawl state indexes for crawl operations if (operation.type === 'crawl' || operation.type === 'intraday_crawl') { - indexes.push({ - [`operations.${operation.name}.crawlState.finished`]: 1, - [symbolField]: 1 - }); + indexes.push( + { + [`operations.${operation.name}.crawlState.finished`]: 1, + [symbolField]: 1 + }, + { + 
[`operations.${operation.name}.crawlState.newestDateReached`]: 1,
+ [symbolField]: 1
+ },
+ {
+ [`operations.${operation.name}.crawlState.oldestDateReached`]: 1,
+ [symbolField]: 1
+ }
+ );
 }

 const collection = this.mongodb.collection(collectionName);
@@ -168,9 +178,21 @@ export class OperationTracker {
 if (data.crawlState.oldestDateReached) {
 update.$set[`${existingPath}.oldestDateReached`] = data.crawlState.oldestDateReached;
 }
+ if (data.crawlState.newestDateReached) {
+ update.$set[`${existingPath}.newestDateReached`] = data.crawlState.newestDateReached;
+ }
+ if (data.crawlState.lastProcessedDate) {
+ update.$set[`${existingPath}.lastProcessedDate`] = data.crawlState.lastProcessedDate;
+ }
+ if (data.crawlState.totalDaysProcessed !== undefined) {
+ update.$set[`${existingPath}.totalDaysProcessed`] = data.crawlState.totalDaysProcessed;
+ }
 if (data.crawlState.lastCrawlDirection) {
 update.$set[`${existingPath}.lastCrawlDirection`] = data.crawlState.lastCrawlDirection;
 }
+ if (data.crawlState.targetOldestDate) {
+ update.$set[`${existingPath}.targetOldestDate`] = data.crawlState.targetOldestDate;
+ }
 if (data.crawlState.metadata) {
 update.$set[`${existingPath}.metadata`] = data.crawlState.metadata;
 }
@@ -250,7 +272,12 @@ export class OperationTracker {
 const basePath = `operations.${operation}.crawlState`;
 Object.entries(data.crawlState).forEach(([key, value]) => {
 if (value !== undefined) {
- update.$set[`${basePath}.${key}`] = value;
+ // Handle Date objects properly
+ if (value instanceof Date || (typeof value === 'string' && key.includes('Date'))) {
+ update.$set[`${basePath}.${key}`] = new Date(value);
+ } else {
+ update.$set[`${basePath}.${key}`] = value;
+ }
 }
 });
 }
@@ -374,13 +401,15 @@ export class OperationTracker {
 async markCrawlFinished(
 symbol: string,
 operationName: string,
- oldestDateReached: Date
+ oldestDateReached: Date,
+ newestDateReached?: Date
 ): Promise<void> {
 await this.updateSymbolOperation(symbol, operationName, {
 status: 'success',
crawlState: {
 finished: true,
- oldestDateReached
+ oldestDateReached,
+ newestDateReached: newestDateReached || new Date()
 }
 });
@@ -388,10 +417,137 @@
 provider: this.provider.getProviderConfig().name,
 symbol,
 operation: operationName,
- oldestDateReached
+ oldestDateReached,
+ newestDateReached
 });
 }

+ /**
+ * Get symbols for intraday crawl with gap detection
+ */
+ async getSymbolsForIntradayCrawl(
+ operationName: string,
+ options: {
+ limit?: number;
+ targetOldestDate?: Date;
+ includeNewDataGaps?: boolean;
+ } = {}
+ ): Promise<Array<{ symbol: string; lastRecordDate?: Date; operationStatus?: any; gaps?: { forward?: boolean; backward?: boolean } }>> {
+ const { collectionName, symbolField } = this.provider.getProviderConfig();
+ const { limit = 100, targetOldestDate, includeNewDataGaps = true } = options;
+
+ this.provider.validateOperation(operationName);
+
+ // Build filter
+ const filter: any = {
+ active: { $ne: false }
+ };
+
+ // Get all symbols that either:
+ // 1. Have never been crawled
+ // 2. Are not finished
+ // 3. Have gaps (new data since last crawl)
+ const orConditions: any[] = [
+ { [`operations.${operationName}`]: { $exists: false } },
+ { [`operations.${operationName}.crawlState.finished`]: { $ne: true } }
+ ];
+
+ if (includeNewDataGaps) {
+ const yesterday = new Date();
+ yesterday.setDate(yesterday.getDate() - 1);
+ yesterday.setHours(0, 0, 0, 0);
+
+ orConditions.push({
+ [`operations.${operationName}.crawlState.newestDateReached`]: { $lt: yesterday }
+ });
+ }
+
+ filter.$or = orConditions;
+
+ const symbols = await this.mongodb.find(collectionName, filter, {
+ limit,
+ projection: {
+ [symbolField]: 1,
+ [`operations.${operationName}`]: 1
+ },
+ sort: {
+ [`operations.${operationName}.lastRunAt`]: 1
+ }
+ });
+
+ return symbols.map(doc => {
+ const opStatus = doc.operations?.[operationName];
+ const crawlState = opStatus?.crawlState;
+
+ // Determine gaps
+ const gaps: { forward?: boolean; backward?: boolean } = {};
+
+ if (crawlState) {
+ // Check for forward gap (new data)
+ if (crawlState.newestDateReached) {
+ const
daysSinceNewest = Math.floor(
+ (Date.now() - new Date(crawlState.newestDateReached).getTime()) / (1000 * 60 * 60 * 24)
+ );
+ gaps.forward = daysSinceNewest > 1;
+ }
+
+ // Check for backward gap (historical data)
+ if (!crawlState.finished) {
+ gaps.backward = true;
+ if (targetOldestDate && crawlState.oldestDateReached) {
+ gaps.backward = new Date(crawlState.oldestDateReached) > targetOldestDate;
+ }
+ }
+ } else {
+ // Never crawled, has both gaps
+ gaps.forward = true;
+ gaps.backward = true;
+ }
+
+ return {
+ symbol: doc[symbolField],
+ lastRecordDate: opStatus?.lastRecordDate,
+ operationStatus: opStatus,
+ gaps
+ };
+ });
+ }
+
+ /**
+ * Check if intraday crawl is complete
+ */
+ async isIntradayCrawlComplete(
+ symbol: string,
+ operationName: string,
+ targetOldestDate: Date
+ ): Promise<boolean> {
+ const { collectionName, symbolField } = this.provider.getProviderConfig();
+
+ const doc = await this.mongodb.findOne(collectionName, {
+ [symbolField]: symbol
+ }, {
+ projection: { [`operations.${operationName}.crawlState`]: 1 }
+ });
+
+ if (!doc?.operations?.[operationName]?.crawlState) {
+ return false;
+ }
+
+ const crawlState = doc.operations[operationName].crawlState;
+
+ // Check if explicitly marked as finished
+ if (crawlState.finished) {
+ return true;
+ }
+
+ // Check if we've reached the target oldest date
+ if (crawlState.oldestDateReached && targetOldestDate) {
+ return new Date(crawlState.oldestDateReached) <= targetOldestDate;
+ }
+
+ return false;
+ }
+
 /**
 * Get symbols that need data updates based on last record date
 */
diff --git a/apps/stock/data-ingestion/src/shared/operation-manager/types.ts b/apps/stock/data-ingestion/src/shared/operation-manager/types.ts
index 4e408c1..978e53f 100644
--- a/apps/stock/data-ingestion/src/shared/operation-manager/types.ts
+++ b/apps/stock/data-ingestion/src/shared/operation-manager/types.ts
@@ -54,8 +54,16 @@ export interface CrawlState {
 finished: boolean;
 /** Oldest date reached during crawl */
oldestDateReached?: Date;
+ /** Newest date reached during crawl */
+ newestDateReached?: Date;
+ /** Last date that was processed (for resumption) */
+ lastProcessedDate?: Date;
+ /** Total days processed so far */
+ totalDaysProcessed?: number;
 /** Direction of last crawl */
 lastCrawlDirection?: 'forward' | 'backward';
+ /** Target oldest date to reach */
+ targetOldestDate?: Date;
 /** Custom crawl metadata */
 metadata?: Record<string, any>;
 }
diff --git a/apps/stock/data-ingestion/test/intraday-crawl.test.ts b/apps/stock/data-ingestion/test/intraday-crawl.test.ts
new file mode 100644
index 0000000..3ea9f48
--- /dev/null
+++ b/apps/stock/data-ingestion/test/intraday-crawl.test.ts
@@ -0,0 +1,77 @@
+#!/usr/bin/env bun
+
+/**
+ * Test script for intraday crawl functionality
+ */
+
+import { createTestContext } from '../src/test-utils';
+import { QMHandler } from '../src/handlers/qm/qm.handler';
+
+async function testIntradayCrawl() {
+ console.log('Testing intraday crawl functionality...\n');
+
+ const context = await createTestContext();
+ const handler = new QMHandler(context.services);
+
+ // Wait for operation registry to initialize
+ await new Promise(resolve => setTimeout(resolve, 1000));
+
+ try {
+ // Test 1: Schedule crawls for never-run symbols
+ console.log('Test 1: Scheduling crawls for never-run symbols...');
+ const result1 = await handler.scheduleIntradayCrawls({
+ limit: 5,
+ priorityMode: 'never_run',
+ targetOldestDate: '2023-01-01' // Just 1 year for testing
+ });
+ console.log('Result:', result1);
+ console.log('');
+
+ // Test 2: Check crawl state for a specific symbol
+ console.log('Test 2: Checking crawl state for symbol X...');
+ const tracker = handler.operationRegistry.getTracker('qm');
+ const isComplete = await tracker.isIntradayCrawlComplete('X', 'intraday_bars', new Date('2023-01-01'));
+ console.log('Is crawl complete for X?', isComplete);
+
+ // Get detailed state
+ const symbols = await tracker.getSymbolsForIntradayCrawl('intraday_bars', {
+
limit: 1, + targetOldestDate: new Date('2023-01-01') + }); + + const symbolX = symbols.find(s => s.symbol === 'X'); + if (symbolX) { + console.log('Symbol X state:', JSON.stringify(symbolX, null, 2)); + } + console.log(''); + + // Test 3: Manually crawl a single symbol + console.log('Test 3: Manually crawling intraday data for X...'); + + // First get symbol data + const symbolData = await context.services.mongodb.findOne('qmSymbols', { + symbol: 'X' + }); + + if (symbolData && symbolData.symbolId) { + const crawlResult = await handler.crawlIntradayData({ + symbol: 'X', + symbolId: symbolData.symbolId, + qmSearchCode: symbolData.qmSearchCode, + targetOldestDate: '2024-01-01', // Just current year for quick test + batchSize: 7 + }); + console.log('Crawl result:', crawlResult); + } else { + console.log('Symbol X not found or missing symbolId'); + } + + } catch (error) { + console.error('Test failed:', error); + } finally { + await context.cleanup(); + } +} + +// Run the test +testIntradayCrawl().catch(console.error); \ No newline at end of file diff --git a/apps/stock/data-ingestion/test/test-modified-functions.ts b/apps/stock/data-ingestion/test/test-modified-functions.ts new file mode 100644 index 0000000..83fca7e --- /dev/null +++ b/apps/stock/data-ingestion/test/test-modified-functions.ts @@ -0,0 +1,139 @@ +/** + * Examples of modifying existing functions to only work with symbol 'X' + */ + +import { OperationTracker } from '../src/shared/operation-manager'; + +// Example 1: Modified getStaleSymbols to only return symbol X +async function getStaleSymbolsOnlyX( + operationTracker: OperationTracker, + providerName: string, + operationName: string, + options: any = {} +) { + // Method 1: Add symbolFilter to options + const modifiedOptions = { + ...options, + symbolFilter: { symbol: 'X' } + }; + + return operationTracker.getStaleSymbols(providerName, operationName, modifiedOptions); +} + +// Example 2: Modified sophisticated backtest function for symbol X only +async 
function runSophisticatedBacktestOnlyX(orchestrator: any) { + const symbols = ['X']; // Only test symbol X + + const backtestConfig = { + symbols, + startDate: new Date('2023-01-01'), + endDate: new Date('2024-01-01'), + strategies: ['momentum', 'mean_reversion'], + // ... rest of config + }; + + return orchestrator.runBacktest(backtestConfig); +} + +// Example 3: Modified schedule function to only process symbol X +async function scheduleOperationsOnlyX(handler: any) { + // Get all symbols that need updates, then filter for X + const staleSymbols = await handler.operationRegistry.getStaleSymbols('qm', 'price_update', { + minHoursSinceRun: 24, + limit: 1000 + }); + + // Filter to only include symbol X (startsWith avoids matching other codes that merely contain 'X:') + const symbolXOnly = staleSymbols.filter((s: string) => s.startsWith('X:')); + + if (symbolXOnly.length > 0) { + const symbolData = await handler.mongodb.find('qmSymbols', { + qmSearchCode: symbolXOnly[0] + }); + + if (symbolData.length > 0) { + await handler.scheduleOperation('update-prices', { + symbol: symbolData[0].symbol, + symbolId: symbolData[0].symbolId, + qmSearchCode: symbolData[0].qmSearchCode + }); + } + } +} + +// Example 4: Modified intraday crawl for symbol X only +async function crawlIntradayOnlyX(handler: any) { + // Direct approach - just process symbol X + const symbolX = await handler.mongodb.findOne('qmSymbols', { symbol: 'X' }); + + if (symbolX) { + return handler.crawlIntradayData({ + symbol: symbolX.symbol, + symbolId: symbolX.symbolId, + qmSearchCode: symbolX.qmSearchCode, + mode: 'full', + targetOldestDate: new Date('2020-01-01') + }); + } +} + +// Example 5: Test wrapper that ensures only symbol X is processed +function createSymbolXTestWrapper(originalFunction: Function) { + return async function(...args: any[]) { + // Check if first argument has symbol property + if (args[0] && typeof args[0] === 'object') { + // If it's a symbol-specific call, only proceed if symbol is X + if (args[0].symbol && args[0].symbol !== 'X') { + 
console.log(`Skipping symbol ${args[0].symbol} - only testing X`); + return { success: false, message: 'Test mode - only symbol X allowed' }; + } + + // If it's a batch operation, filter to only X + if (args[0].symbols && Array.isArray(args[0].symbols)) { + args[0].symbols = args[0].symbols.filter((s: string) => s === 'X'); + } + } + + // Call original function with potentially modified args + return originalFunction.apply(this, args); + }; +} + +// Example usage +async function demonstrateUsage() { + console.log('=== Demonstration of Symbol X Only Modifications ===\n'); + + // Mock tracker for demonstration + const mockTracker = { + getStaleSymbols: async (provider: string, operation: string, options: any) => { + console.log(`Getting stale symbols with options:`, options); + if (options.symbolFilter?.symbol === 'X') { + return ['X:NYSE']; + } + return ['AAPL:NASDAQ', 'GOOGL:NASDAQ', 'X:NYSE', 'MSFT:NASDAQ']; + } + } as any; + + // Test the modified function + console.log('1. Testing getStaleSymbols with X filter:'); + const xOnlySymbols = await getStaleSymbolsOnlyX(mockTracker, 'qm', 'price_update', { + minHoursSinceRun: 24 + }); + console.log('Results:', xOnlySymbols); + + console.log('\n2. 
Example of wrapper usage:'); + const originalUpdate = async (input: any) => { + console.log(`Processing symbol: ${input.symbol}`); + return { success: true, symbol: input.symbol }; + }; + + const wrappedUpdate = createSymbolXTestWrapper(originalUpdate); + + await wrappedUpdate({ symbol: 'X', symbolId: 123 }); + await wrappedUpdate({ symbol: 'AAPL', symbolId: 456 }); // Will be skipped + + console.log('\n=== Demonstration Complete ==='); +} + +// Run demonstration +demonstrateUsage().catch(console.error); \ No newline at end of file diff --git a/apps/stock/data-ingestion/test/test-qm-operations.ts b/apps/stock/data-ingestion/test/test-qm-operations.ts new file mode 100644 index 0000000..666b702 --- /dev/null +++ b/apps/stock/data-ingestion/test/test-qm-operations.ts @@ -0,0 +1,163 @@ +/** + * Test script for QM operations + */ + +import { QMHandler } from '../src/handlers/qm/qm.handler'; +import type { DataIngestionServices } from '../src/types'; + +// Mock services for testing +const mockServices: Partial<DataIngestionServices> = { + mongodb: { + batchUpsert: async (collection: string, data: any[], uniqueKeys: string[]) => { + console.log(`Mock: Batch upsert to ${collection}`, { + recordCount: data.length, + uniqueKeys + }); + return { insertedCount: data.length, modifiedCount: 0 }; + }, + find: async (collection: string, query: any, options?: any) => { + console.log(`Mock: Find from ${collection}`, { query, options }); + // Return test symbol for testing + if (collection === 'qmSymbols' && query.symbol === 'X') { + return [{ + symbol: 'X', + symbolId: 123456, + qmSearchCode: 'X:NYSE', + exchange: 'NYSE', + name: 'United States Steel Corporation' + }]; + } + return []; + }, + updateOne: async (collection: string, filter: any, update: any, options?: any) => { + console.log(`Mock: Update ${collection}`, { filter, update, options }); + return { modifiedCount: 1 }; + } + }, + cache: { + get: async (key: string) => { + console.log(`Mock: Cache get ${key}`); + return null; + }, + set: async 
(key: string, value: any, ttl?: number) => { + console.log(`Mock: Cache set ${key}`, { ttl }); + return true; + } + }, + logger: { + info: (message: string, data?: any) => { + console.log(`[INFO] ${message}`, data || ''); + }, + error: (message: string, data?: any) => { + console.error(`[ERROR] ${message}`, data || ''); + }, + warn: (message: string, data?: any) => { + console.warn(`[WARN] ${message}`, data || ''); + }, + debug: (message: string, data?: any) => { + console.debug(`[DEBUG] ${message}`, data || ''); + } + }, + // Mock operation registry + operationRegistry: { + updateOperation: async (provider: string, symbol: string, operation: string, data: any) => { + console.log(`Mock: Update operation ${provider}/${operation} for ${symbol}`, data); + return true; + }, + getStaleSymbols: async (provider: string, operation: string, options: any) => { + console.log(`Mock: Get stale symbols for ${provider}/${operation}`, options); + // Return test symbol + if (options.symbolFilter?.symbol === 'X') { + return ['X:NYSE']; + } + return []; + } + } +} as DataIngestionServices; + +async function testQMOperations() { + console.log('=== Testing QM Operations ===\n'); + + // Create handler instance + const handler = new QMHandler(mockServices); + + // Wait a bit for initialization + await new Promise(resolve => setTimeout(resolve, 1000)); + + // Test 1: Update Insiders for symbol X + console.log('Test 1: Update Insiders for symbol X'); + console.log('-------------------------------------'); + try { + const insidersResult = await handler.updateInsiders({ + symbol: 'X', + symbolId: 123456, + qmSearchCode: 'X:NYSE', + lookbackDays: 30 + }); + console.log('Result:', JSON.stringify(insidersResult, null, 2)); + } catch (error) { + console.error('Failed:', error); + } + + console.log('\n'); + + // Test 2: Update Symbol News for symbol X + console.log('Test 2: Update Symbol News for symbol X'); + console.log('----------------------------------------'); + try { + const newsResult = 
await handler.updateSymbolNews({ + symbol: 'X', + symbolId: 123456, + qmSearchCode: 'X:NYSE', + lookbackDays: 7 + }); + console.log('Result:', JSON.stringify(newsResult, null, 2)); + } catch (error) { + console.error('Failed:', error); + } + + console.log('\n'); + + // Test 3: Update General News + console.log('Test 3: Update General News'); + console.log('---------------------------'); + try { + const generalNewsResult = await handler.updateGeneralNews({ + categories: ['market', 'economy'], + lookbackMinutes: 60 + }); + console.log('Result:', JSON.stringify(generalNewsResult, null, 2)); + } catch (error) { + console.error('Failed:', error); + } + + console.log('\n'); + + // Test 4: Check available operations + console.log('Test 4: List Available Operations'); + console.log('---------------------------------'); + const operations = [ + 'create-session', + 'search-symbols', + 'update-symbol-info', + 'update-financials', + 'update-events', + 'update-filings', + 'update-prices', + 'update-intraday-bars', + 'crawl-intraday-data', + 'update-insiders', + 'update-symbol-news', + 'update-general-news' + ]; + + for (const op of operations) { + // Convert kebab-case to the handler's camelCase method name (e.g. 'update-prices' -> 'updatePrices') + const methodName = op.replace(/-([a-z])/g, (_, c) => c.toUpperCase()); + const hasOperation = typeof (handler as any)[methodName] === 'function'; + console.log(`${op}: ${hasOperation ? 
'✓' : '✗'}`); + } + + console.log('\n=== Tests Complete ==='); +} + +// Run tests +testQMOperations().catch(console.error); \ No newline at end of file diff --git a/apps/stock/data-ingestion/test/test-symbol-x.ts b/apps/stock/data-ingestion/test/test-symbol-x.ts new file mode 100644 index 0000000..a0e88ac --- /dev/null +++ b/apps/stock/data-ingestion/test/test-symbol-x.ts @@ -0,0 +1,118 @@ +/** + * Test script specifically for symbol X operations + */ + +import { QMHandler } from '../src/handlers/qm/qm.handler'; +import { OperationTracker } from '../src/shared/operation-manager'; +import type { DataIngestionServices } from '../src/types'; + +// Simple test to check operations for symbol X +async function testSymbolXOperations() { + console.log('=== Testing Operations for Symbol X ===\n'); + + // Mock minimal services needed + const mockServices: Partial<DataIngestionServices> = { + mongodb: { + collection: (name: string) => ({ + find: () => ({ + toArray: async () => { + console.log(`Querying collection: ${name}`); + if (name === 'qmSymbols') { + return [{ + symbol: 'X', + symbolId: 123456, + qmSearchCode: 'X:NYSE', + exchange: 'NYSE', + name: 'United States Steel Corporation' + }]; + } + return []; + } + }), + findOne: async (query: any) => { + console.log(`Finding one in ${name}:`, query); + return null; + }, + updateOne: async (filter: any, update: any, options: any) => { + console.log(`Updating ${name}:`, { filter, update }); + return { modifiedCount: 1 }; + } + }), + find: async (collection: string, query: any) => { + console.log(`Direct find on ${collection}:`, query); + if (collection === 'qmSymbols' && query.symbol === 'X') { + return [{ + symbol: 'X', + symbolId: 123456, + qmSearchCode: 'X:NYSE' + }]; + } + return []; + } + } as any, + logger: { + info: (msg: string, data?: any) => console.log(`[INFO] ${msg}`, data || ''), + error: (msg: string, data?: any) => console.error(`[ERROR] ${msg}`, data || ''), + warn: (msg: string, data?: any) => console.warn(`[WARN] ${msg}`, data || 
''), + debug: (msg: string, data?: any) => console.debug(`[DEBUG] ${msg}`, data || '') + } + } as DataIngestionServices; + + // Test 1: Check stale operations for symbol X + console.log('Test 1: Get stale operations for symbol X'); + console.log('------------------------------------------'); + + const tracker = new OperationTracker(mockServices as any); + + try { + // Check each operation type + const operations = [ + 'symbol_info', + 'price_update', + 'intraday_bars', + 'financials_update_quarterly', + 'financials_update_annual', + 'events_update', + 'filings_update', + 'insiders_update', + 'news_update' + ]; + + for (const operation of operations) { + console.log(`\nChecking ${operation}:`); + + const staleSymbols = await tracker.getStaleSymbols('qm', operation, { + minHoursSinceRun: 0, // Get all symbols regardless of last run + limit: 10, + symbolFilter: { symbol: 'X' } // Only get symbol X + }); + + console.log(`Found ${staleSymbols.length} stale symbols:`, staleSymbols); + } + } catch (error) { + console.error('Error checking stale symbols:', error); + } + + // Test 2: Check intraday crawl status for symbol X + console.log('\n\nTest 2: Check intraday crawl status for symbol X'); + console.log('------------------------------------------------'); + + try { + const symbolsForCrawl = await tracker.getSymbolsForIntradayCrawl('intraday_bars', { + limit: 10, + symbolFilter: { symbol: 'X' } + }); + + console.log(`Found ${symbolsForCrawl.length} symbols for intraday crawl`); + if (symbolsForCrawl.length > 0) { + console.log('Symbol details:', JSON.stringify(symbolsForCrawl[0], null, 2)); + } + } catch (error) { + console.error('Error checking intraday crawl:', error); + } + + console.log('\n=== Tests Complete ==='); +} + +// Run the test +testSymbolXOperations().catch(console.error); \ No newline at end of file