diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts index fda77a7..977e98d 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/corporate-actions.ts @@ -209,7 +209,8 @@ export async function fetchCorporateActions( // Update operation tracker based on action type const operationName = actionType === 'dividends' ? 'dividends_update' : 'splits_update'; - await this.operationRegistry.updateOperation('eod', symbol, operationName, { + const eodSearchCode = `${symbol}.${exchange}`; + await this.operationRegistry.updateOperation('eod', eodSearchCode, operationName, { status: 'success', recordCount: result.insertedCount, metadata: { @@ -230,7 +231,8 @@ export async function fetchCorporateActions( // Update operation tracker with failure const operationName = actionType === 'dividends' ? 'dividends_update' : 'splits_update'; - await this.operationRegistry.updateOperation('eod', symbol, operationName, { + const eodSearchCode = `${symbol}.${exchange}`; + await this.operationRegistry.updateOperation('eod', eodSearchCode, operationName, { status: 'failure', error: error.message }); diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts index 21c65b4..ef37d4d 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/fundamentals.ts @@ -220,16 +220,17 @@ export async function fetchBulkFundamentals( logger.info(`Saved ${result.insertedCount} fundamentals records for ${exchange}`); // Update operation tracker for each symbol - const updatePromises = symbolsToUpdate.map(({ symbol, exchange }) => - this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', { + const updatePromises = symbolsToUpdate.map(({ symbol, exchange }) => { + const eodSearchCode = `${symbol}.${exchange}`; + return this.operationRegistry.updateOperation('eod', eodSearchCode, 'fundamentals_update', { status: 'success', recordCount: 1, metadata: { hasData: true, exchange: exchange } - }) - ); + }); + }); await Promise.all(updatePromises); totalProcessed += fundamentalsToSave.length; @@ -246,12 +247,13 @@ export async function fetchBulkFundamentals( logger.error('Failed to fetch bulk fundamentals', { error }); // Mark all symbols as failed - const failPromises = input.symbols.map(({ symbol }) => - this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', { + const failPromises = input.symbols.map(({ symbol, exchange }) => { + const eodSearchCode = `${symbol}.${exchange}`; + return this.operationRegistry.updateOperation('eod', eodSearchCode, 'fundamentals_update', { status: 'failure', error: error.message - }) - ); + }); + }); await Promise.all(failPromises); throw error; @@ -329,7 +331,8 @@ export async function fetchSingleFundamentals( ); // Update operation tracker - await this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', { + const eodSearchCode = `${symbol}.${exchange}`; + await this.operationRegistry.updateOperation('eod', eodSearchCode, 'fundamentals_update', { status: 'success', recordCount: result.insertedCount, metadata: { @@ -349,7 +352,8 @@ export async function fetchSingleFundamentals( logger.error('Failed to fetch single fundamentals', { error, symbol, exchange }); // Update operation tracker with failure - await this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', { + const eodSearchCode = `${symbol}.${exchange}`; + await this.operationRegistry.updateOperation('eod', eodSearchCode, 'fundamentals_update', { status: 'failure', error: error.message }); diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts index 3edeed3..48e00c4 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/intraday.ts @@ -1,5 +1,4 @@ -import type { CrawlState } from '../../../shared/operation-manager/types'; import type { EodHandler } from '../eod.handler'; import { EOD_CONFIG } from '../shared'; @@ -19,7 +18,6 @@ interface CrawlIntradayInput { country?: string; } -// CrawlState is imported from operation-manager types // Max days per interval based on EOD limits const MAX_DAYS_PER_INTERVAL = { @@ -47,16 +45,18 @@ export async function scheduleIntradayCrawl( const interval = intervals[i]; const operationName = operationNames[i]!; // Non-null assertion since we know the array has 3 elements - const symbolsForInterval = await this.operationRegistry.getStaleSymbols('eod', operationName, { - limit: 1000, // Get more to filter - symbolFilter: { symbol: 'AAPL' } // Filter for AAPL only - }); + // For intraday, we want to check even finished crawls for new data + // So we'll query the symbols directly + const symbolsForInterval = await this.mongodb.collection('eodSymbols').find({ + eodSearchCode: 'AAPL.US', + delisted: false + }).toArray(); // Filter out delisted symbols and ensure we get AAPL with US exchange const activeSymbols = symbolsForInterval.filter(item => item.symbol.delisted === false && item.symbol.Code === 'AAPL' && - (item.symbol.eodExchange === 'US' || item.symbol.Exchange === 'US') + item.symbol.eodExchange === 'US' ); // Add interval info to each symbol @@ -153,27 +153,38 @@ export async function crawlIntraday( throw new Error(`Symbol ${symbol}.${exchange} not found`); } + logger.debug('Found symbol document', { + symbol, + exchange, + hasOperations: !!symbolDoc.operations, + operationKeys: Object.keys(symbolDoc.operations || {}) + }); + // Get operation status from tracker const operationName = `intraday_${interval}`; - const operationStatus = symbolDoc.operations?.[operationName]; - const crawlState: CrawlState = operationStatus?.crawlState || { - finished: false - }; + const operationStatus = symbolDoc.operations?.[operationName] || {}; + + logger.info(`Current crawl state for ${symbol}.${exchange} - ${interval}`, { + hasOperationStatus: !!symbolDoc.operations?.[operationName], + operationStatus: operationStatus, + lastProcessedDate: operationStatus.lastProcessedDate, + finished: operationStatus.finished + }); // Determine date range for this batch const maxDays = MAX_DAYS_PER_INTERVAL[interval]; let toDate = new Date(); let fromDate = new Date(); - if (crawlState.lastProcessedDate) { - // Continue from where we left off - toDate = new Date(crawlState.lastProcessedDate); - toDate.setDate(toDate.getDate() - 1); // Start from day before last processed + if (operationStatus.lastProcessedDate) { + // Continue from where we left off - the last processed date becomes the new toDate + toDate = new Date(operationStatus.lastProcessedDate); + // No need to subtract a day - lastProcessedDate is the fromDate of the last batch } // Calculate from date (going backwards) fromDate = new Date(toDate); - fromDate.setDate(fromDate.getDate() - maxDays + 1); + fromDate.setDate(fromDate.getDate() - maxDays); logger.info(`Fetching intraday batch for ${symbol}.${exchange} - ${interval} from ${fromDate.toISOString().split('T')[0]} to ${toDate.toISOString().split('T')[0]}`, { symbol, @@ -182,9 +193,9 @@ export async function crawlIntraday( fromDate: fromDate.toISOString(), toDate: toDate.toISOString(), maxDays, - crawlState: { - lastProcessedDate: crawlState.lastProcessedDate, - totalDaysProcessed: crawlState.totalDaysProcessed || 0 + operationStatus: { + lastProcessedDate: operationStatus.lastProcessedDate, + totalDaysProcessed: operationStatus.totalDaysProcessed || 0 } }); @@ -198,47 +209,73 @@ export async function crawlIntraday( country }); - // Update crawl state - const newState: CrawlState = { - ...crawlState, + // Prepare update data + const updateData: any = { + status: 'partial', + recordCount: result.recordsSaved, finished: false, - lastProcessedDate: fromDate, - totalDaysProcessed: (crawlState.totalDaysProcessed || 0) + 1 + lastProcessedDate: fromDate, // Store the fromDate so next batch continues from here + totalDaysProcessed: (operationStatus.totalDaysProcessed || 0) + maxDays }; // Set oldest date reached - if (!newState.oldestDateReached || fromDate < newState.oldestDateReached) { - newState.oldestDateReached = fromDate; + if (!operationStatus.oldestDateReached || fromDate < operationStatus.oldestDateReached) { + updateData.oldestDateReached = fromDate; + } else { + updateData.oldestDateReached = operationStatus.oldestDateReached; } // Set newest date reached - if (!newState.newestDateReached || toDate > newState.newestDateReached) { - newState.newestDateReached = toDate; + if (!operationStatus.newestDateReached || toDate > operationStatus.newestDateReached) { + updateData.newestDateReached = toDate; + } else { + updateData.newestDateReached = operationStatus.newestDateReached; } - // Check if we're finished (no data returned means we've reached the end) - if (result.recordsSaved === 0) { - newState.finished = true; - logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval} (${newState.oldestDateReached?.toISOString().split('T')[0]} to ${newState.newestDateReached?.toISOString().split('T')[0]})`, { + // Check if we're finished + // Only mark as finished if: + // 1. We got no data from the API (empty response) + // 2. We've been crawling for a while and consistently getting no new records + if (result.recordsSaved === 0 && result.recordsFetched === 0) { + // No data returned from API - we've reached the end + updateData.finished = true; + updateData.status = 'success'; + logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval} (no more data available)`, { symbol, exchange, interval, - oldestDate: newState.oldestDateReached?.toISOString(), - newestDate: newState.newestDateReached?.toISOString(), - totalDaysProcessed: newState.totalDaysProcessed, + oldestDate: updateData.oldestDateReached?.toISOString(), + newestDate: updateData.newestDateReached?.toISOString(), + totalDaysProcessed: updateData.totalDaysProcessed, noDataReturned: true }); + } else if (result.recordsSaved === 0 && result.recordsFetched > 0) { + // Data was fetched but all records already exist - continue crawling + logger.info(`All ${result.recordsFetched} records already exist for ${symbol}.${exchange} - ${interval}, continuing crawl`, { + symbol, + exchange, + interval, + fromDate: fromDate.toISOString(), + toDate: toDate.toISOString(), + recordsFetched: result.recordsFetched + }); } - // Update operation tracker with crawl state - await this.operationRegistry.updateOperation('eod', symbol, operationName, { - status: newState.finished ? 'success' : 'partial', + // Update operation tracker + logger.info(`Updating operation tracker for ${symbol}.${exchange} - ${interval}`, { + status: updateData.status, recordCount: result.recordsSaved, - crawlState: newState + lastProcessedDate: updateData.lastProcessedDate, + finished: updateData.finished }); + const eodSearchCode = `${symbol}.${exchange}`; + await this.operationRegistry.updateOperation('eod', eodSearchCode, operationName, updateData); + + logger.info(`Operation tracker updated for ${symbol}.${exchange} - ${interval}`); + // If not finished, schedule next batch - if (!newState.finished) { + if (!updateData.finished) { await this.scheduleOperation('crawl-intraday', { symbol, exchange, @@ -261,14 +298,14 @@ export async function crawlIntraday( currentBatchFrom: fromDate.toISOString(), currentBatchTo: toDate.toISOString(), recordsSaved: result.recordsSaved, - totalDaysProcessed: newState.totalDaysProcessed + totalDaysProcessed: updateData.totalDaysProcessed }); } return { success: true, recordsProcessed: result.recordsSaved, - finished: newState.finished + finished: updateData.finished }; } catch (error) { logger.error('Failed to crawl intraday data', { error, symbol, exchange, interval }); @@ -279,7 +316,7 @@ export async function crawlIntraday( export async function fetchIntraday( this: EodHandler, input: FetchIntradayInput -): Promise<{ success: boolean; recordsSaved: number }> { +): Promise<{ success: boolean; recordsSaved: number; recordsFetched: number }> { const logger = this.logger; const { symbol, exchange, interval, fromDate, toDate, country } = input; @@ -345,7 +382,7 @@ export async function fetchIntraday( if (data.length === 0) { logger.info('No intraday data returned', { symbol, exchange, interval }); - return { success: true, recordsSaved: 0 }; + return { success: true, recordsSaved: 0, recordsFetched: 0 }; } logger.info(`Fetched ${data.length} intraday records for ${symbol}.${exchange} - ${interval}`); @@ -383,7 +420,8 @@ export async function fetchIntraday( return { success: true, - recordsSaved: result.insertedCount + recordsSaved: result.insertedCount, + recordsFetched: data.length }; } catch (error) { logger.error('Failed to fetch intraday data', { error, symbol, exchange, interval }); diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts index b78cd81..4280ed0 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts @@ -157,7 +157,8 @@ export async function fetchPrices( ); // Update operation tracker instead of directly updating the symbol - await this.operationRegistry.updateOperation('eod', symbol, 'price_update', { + const eodSearchCode = `${symbol}.${exchange}`; + await this.operationRegistry.updateOperation('eod', eodSearchCode, 'price_update', { status: 'success', lastRecordDate: priceData.length > 0 ? priceData[priceData.length - 1].date : null, recordCount: priceData.length, @@ -177,7 +178,8 @@ export async function fetchPrices( logger.error('Failed to fetch or save prices', { error, symbol, exchange }); // Update operation tracker with failure - await this.operationRegistry.updateOperation('eod', symbol, 'price_update', { + const eodSearchCode = `${symbol}.${exchange}`; + await this.operationRegistry.updateOperation('eod', eodSearchCode, 'price_update', { status: 'failure', error: error.message }); diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/symbols.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/symbols.ts index b7741ae..f3db48a 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/actions/symbols.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/symbols.ts @@ -130,6 +130,7 @@ export async function fetchSymbols( ...symbol, Exchange: symbol.Exchange || exchangeCode, // Keep the original exchange (might be wrong) eodExchange: exchangeCode, // Store the correct exchange code used to fetch this symbol + eodSearchCode: `${symbol.Code}.${exchangeCode}`, // Create unique search code like AAPL.US delisted: delisted, })); diff --git a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts index e181481..a574f7f 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts @@ -109,7 +109,9 @@ export class EodHandler extends BaseHandler { * Runs daily at 3 AM */ @Operation('schedule-intraday-crawl') - @ScheduledOperation('schedule-intraday-crawl', '0 3 * * *') + @ScheduledOperation('schedule-intraday-crawl', '0 3 * * *', { + immediately: true, + }) @RateLimit(1) // 1 point for scheduling scheduleIntradayCrawl = scheduleIntradayCrawl; diff --git a/apps/stock/data-ingestion/src/handlers/eod/shared/operation-provider.ts b/apps/stock/data-ingestion/src/handlers/eod/shared/operation-provider.ts index 86d5221..aaf4fe0 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/shared/operation-provider.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/shared/operation-provider.ts @@ -94,7 +94,7 @@ export class EODOperationProvider extends BaseOperationProvider { return { name: 'eod', collectionName: 'eodSymbols', - symbolField: 'Code', + symbolField: 'eodSearchCode', description: 'EOD Historical Data provider' }; } diff --git a/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts b/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts index 907e130..b9f076c 100644 --- a/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts +++ b/apps/stock/data-ingestion/src/shared/operation-manager/OperationTracker.ts @@ -82,19 +82,19 @@ export class OperationTracker { } ]; - // Add crawl state indexes for crawl operations + // Add crawl-specific indexes for crawl operations if (operation.type === 'crawl' || operation.type === 'intraday_crawl') { indexes.push( { - [`operations.${operation.name}.crawlState.finished`]: 1, + [`operations.${operation.name}.finished`]: 1, [symbolField]: 1 }, { - [`operations.${operation.name}.crawlState.newestDateReached`]: 1, + [`operations.${operation.name}.newestDateReached`]: 1, [symbolField]: 1 }, { - [`operations.${operation.name}.crawlState.oldestDateReached`]: 1, + [`operations.${operation.name}.oldestDateReached`]: 1, [symbolField]: 1 } ); @@ -103,10 +103,29 @@ export class OperationTracker { const collection = this.mongodb.collection(collectionName); for (const indexSpec of indexes) { - await collection.createIndex(indexSpec, { - background: true, - name: `op_${operation.name}_${Object.keys(indexSpec).join('_')}` - }); + try { + await collection.createIndex(indexSpec, { + background: true, + name: `op_${operation.name}_${Object.keys(indexSpec).join('_')}` + }); + } catch (indexError) { + // Log but don't fail if index already exists or we hit the limit + if (indexError.message?.includes('too many indexes') || indexError.code === 86) { + this.logger.warn(`Skipping index creation due to limit`, { + operation: operation.name, + index: Object.keys(indexSpec).join('_'), + error: indexError.message + }); + } else if (indexError.code === 11000 || indexError.code === 85) { + // Index already exists + this.logger.debug(`Index already exists`, { + operation: operation.name, + index: Object.keys(indexSpec).join('_') + }); + } else { + throw indexError; + } + } } this.indexesCreated.add(operation.name); @@ -170,40 +189,50 @@ export class OperationTracker { update.$set[`operations.${operationName}.metadata`] = data.metadata; } - if (data.crawlState) { - const existingPath = `operations.${operationName}.crawlState`; - if (data.crawlState.finished !== undefined) { - update.$set[`${existingPath}.finished`] = data.crawlState.finished; - } - if (data.crawlState.oldestDateReached) { - update.$set[`${existingPath}.oldestDateReached`] = data.crawlState.oldestDateReached; - } - if (data.crawlState.newestDateReached) { - update.$set[`${existingPath}.newestDateReached`] = data.crawlState.newestDateReached; - } - if (data.crawlState.lastProcessedDate) { - update.$set[`${existingPath}.lastProcessedDate`] = data.crawlState.lastProcessedDate; - } - if (data.crawlState.totalDaysProcessed !== undefined) { - update.$set[`${existingPath}.totalDaysProcessed`] = data.crawlState.totalDaysProcessed; - } - if (data.crawlState.lastCrawlDirection) { - update.$set[`${existingPath}.lastCrawlDirection`] = data.crawlState.lastCrawlDirection; - } - if (data.crawlState.targetOldestDate) { - update.$set[`${existingPath}.targetOldestDate`] = data.crawlState.targetOldestDate; - } - if (data.crawlState.metadata) { - update.$set[`${existingPath}.metadata`] = data.crawlState.metadata; - } + // Handle crawl-specific fields + if (data.finished !== undefined) { + update.$set[`operations.${operationName}.finished`] = data.finished; + } + if (data.oldestDateReached) { + update.$set[`operations.${operationName}.oldestDateReached`] = data.oldestDateReached; + } + if (data.newestDateReached) { + update.$set[`operations.${operationName}.newestDateReached`] = data.newestDateReached; + } + if (data.lastProcessedDate) { + update.$set[`operations.${operationName}.lastProcessedDate`] = data.lastProcessedDate; + } + if (data.totalDaysProcessed !== undefined) { + update.$set[`operations.${operationName}.totalDaysProcessed`] = data.totalDaysProcessed; + } + if (data.lastCrawlDirection) { + update.$set[`operations.${operationName}.lastCrawlDirection`] = data.lastCrawlDirection; + } + if (data.targetOldestDate) { + update.$set[`operations.${operationName}.targetOldestDate`] = data.targetOldestDate; } - await this.mongodb.updateOne( + this.logger.debug('Updating symbol operation with filter', { + collectionName, + filter: { [symbolField]: symbol }, + symbolField, + symbol, + operation: operationName + }); + + const updateResult = await this.mongodb.updateOne( collectionName, { [symbolField]: symbol }, update ); + this.logger.debug('Update result', { + matched: updateResult.matchedCount, + modified: updateResult.modifiedCount, + symbol, + operation: operationName + }); + // Call after hook await this.provider.afterOperationUpdate(symbol, operationName, data); @@ -268,18 +297,27 @@ export class OperationTracker { update.$set[`operations.${operation}.metadata`] = data.metadata; } - if (data.crawlState) { - const basePath = `operations.${operation}.crawlState`; - Object.entries(data.crawlState).forEach(([key, value]) => { - if (value !== undefined) { - // Handle Date objects properly - if (value instanceof Date || (typeof value === 'string' && key.includes('Date'))) { - update.$set[`${basePath}.${key}`] = new Date(value); - } else { - update.$set[`${basePath}.${key}`] = value; - } - } - }); + // Handle crawl-specific fields + if (data.finished !== undefined) { + update.$set[`operations.${operation}.finished`] = data.finished; + } + if (data.oldestDateReached) { + update.$set[`operations.${operation}.oldestDateReached`] = data.oldestDateReached; + } + if (data.newestDateReached) { + update.$set[`operations.${operation}.newestDateReached`] = data.newestDateReached; + } + if (data.lastProcessedDate) { + update.$set[`operations.${operation}.lastProcessedDate`] = data.lastProcessedDate; + } + if (data.totalDaysProcessed !== undefined) { + update.$set[`operations.${operation}.totalDaysProcessed`] = data.totalDaysProcessed; + } + if (data.lastCrawlDirection) { + update.$set[`operations.${operation}.lastCrawlDirection`] = data.lastCrawlDirection; + } + if (data.targetOldestDate) { + update.$set[`operations.${operation}.targetOldestDate`] = data.targetOldestDate; } return { @@ -348,7 +386,7 @@ export class OperationTracker { // Add symbol filter if provided if (symbolFilter?.symbol) { - filter.symbol = symbolFilter.symbol; + filter[symbolField] = symbolFilter.symbol; } const symbols = await this.mongodb.find(collectionName, filter, { @@ -385,7 +423,7 @@ export class OperationTracker { }; if (!includeFinished) { - filter[`operations.${operationName}.crawlState.finished`] = { $ne: true }; + filter[`operations.${operationName}.finished`] = { $ne: true }; } const symbols = await this.mongodb.find(collectionName, filter, { @@ -417,11 +455,9 @@ export class OperationTracker { ): Promise { await this.updateSymbolOperation(symbol, operationName, { status: 'success', - crawlState: { - finished: true, - oldestDateReached, - newestDateReached: newestDateReached || new Date() - } + finished: true, + oldestDateReached, + newestDateReached: newestDateReached || new Date() }); this.logger.info('Marked crawl as finished', { @@ -457,7 +493,7 @@ export class OperationTracker { // Add symbol filter if provided if (symbolFilter?.symbol) { - filter.symbol = symbolFilter.symbol; + filter[symbolField] = symbolFilter.symbol; } // Get all symbols that either: @@ -466,7 +502,7 @@ export class OperationTracker { // 3. Have gaps (new data since last crawl) const orConditions = [ { [`operations.${operationName}`]: { $exists: false } }, - { [`operations.${operationName}.crawlState.finished`]: { $ne: true } } + { [`operations.${operationName}.finished`]: { $ne: true } } ]; if (includeNewDataGaps) { @@ -475,7 +511,7 @@ export class OperationTracker { yesterday.setHours(0, 0, 0, 0); orConditions.push({ - [`operations.${operationName}.crawlState.newestDateReached`]: { $lt: yesterday } + [`operations.${operationName}.newestDateReached`]: { $lt: yesterday } }); } @@ -498,17 +534,16 @@ export class OperationTracker { return symbols.map(doc => { const opStatus = doc.operations?.[operationName]; - const crawlState = opStatus?.crawlState; // Determine gaps (only backward since we removed forward crawling) const gaps: { forward?: boolean; backward?: boolean } = {}; - if (crawlState) { + if (opStatus) { // Only check for backward gap (historical data) - if (!crawlState.finished) { + if (!opStatus.finished) { gaps.backward = true; - if (targetOldestDate && crawlState.oldestDateReached) { - gaps.backward = new Date(crawlState.oldestDateReached) > targetOldestDate; + if (targetOldestDate && opStatus.oldestDateReached) { + gaps.backward = new Date(opStatus.oldestDateReached) > targetOldestDate; } } } else { @@ -541,23 +576,23 @@ export class OperationTracker { const doc = await this.mongodb.findOne(collectionName, { [symbolField]: symbol }, { - projection: { [`operations.${operationName}.crawlState`]: 1 } + projection: { [`operations.${operationName}`]: 1 } }); - if (!doc?.operations?.[operationName]?.crawlState) { + if (!doc?.operations?.[operationName]) { return false; } - const crawlState = doc.operations[operationName].crawlState; + const opStatus = doc.operations[operationName]; // Check if explicitly marked as finished - if (crawlState.finished) { + if (opStatus.finished) { return true; } // Check if we've reached the target oldest date - if (crawlState.oldestDateReached && targetOldestDate) { - return new Date(crawlState.oldestDateReached) <= targetOldestDate; + if (opStatus.oldestDateReached && targetOldestDate) { + return new Date(opStatus.oldestDateReached) <= targetOldestDate; } return false; @@ -655,7 +690,7 @@ export class OperationTracker { // Crawl stats (if applicable) (operation.type === 'crawl' || operation.type === 'intraday_crawl') ? collection.countDocuments({ - [`operations.${operationName}.crawlState.finished`]: true + [`operations.${operationName}.finished`]: true }) : Promise.resolve(undefined), diff --git a/apps/stock/data-ingestion/src/shared/operation-manager/types.ts b/apps/stock/data-ingestion/src/shared/operation-manager/types.ts index d384538..13ab9d7 100644 --- a/apps/stock/data-ingestion/src/shared/operation-manager/types.ts +++ b/apps/stock/data-ingestion/src/shared/operation-manager/types.ts @@ -42,16 +42,10 @@ export interface OperationStatus { error?: string; /** Provider-specific metadata */ metadata?: Record; - /** Crawl-specific state */ - crawlState?: CrawlState; -} - -/** - * State for crawl-type operations - */ -export interface CrawlState { + + // Crawl-specific fields (used for crawl and intraday_crawl operations) /** Whether the crawl has completed */ - finished: boolean; + finished?: boolean; /** Oldest date reached during crawl */ oldestDateReached?: Date; /** Newest date reached during crawl */ @@ -64,10 +58,10 @@ export interface CrawlState { lastCrawlDirection?: 'forward' | 'backward'; /** Target oldest date to reach */ targetOldestDate?: Date; - /** Custom crawl metadata */ - metadata?: Record; } +// CrawlState interface removed - fields moved directly to OperationStatus + /** * Configuration for a data provider */ @@ -94,10 +88,24 @@ export interface OperationUpdate { recordCount?: number; /** Error message for failures */ error?: string; - /** Crawl state for crawl operations */ - crawlState?: Partial; /** Additional metadata */ metadata?: Record; + + // Crawl-specific update fields + /** Whether the crawl has completed */ + finished?: boolean; + /** Oldest date reached during crawl */ + oldestDateReached?: Date; + /** Newest date reached during crawl */ + newestDateReached?: Date; + /** Last date that was processed (for resumption) */ + lastProcessedDate?: Date; + /** Total days processed so far */ + totalDaysProcessed?: number; + /** Direction of last crawl */ + lastCrawlDirection?: 'forward' | 'backward'; + /** Target oldest date to reach */ + targetOldestDate?: Date; } /** diff --git a/bun.lock b/bun.lock index ae9f5d7..8c120f7 100644 --- a/bun.lock +++ b/bun.lock @@ -25,6 +25,7 @@ "eslint-plugin-node": "^11.1.0", "eslint-plugin-promise": "^7.2.1", "knip": "^5.61.2", + "mongodb": "^6.17.0", "mongodb-mcp-server": "^0.1.1", "mongodb-memory-server": "^9.1.6", "pg-mem": "^2.8.1", diff --git a/package.json b/package.json index 4f3342a..d0e3cba 100644 --- a/package.json +++ b/package.json @@ -89,6 +89,7 @@ "eslint-plugin-node": "^11.1.0", "eslint-plugin-promise": "^7.2.1", "knip": "^5.61.2", + "mongodb": "^6.17.0", "mongodb-mcp-server": "^0.1.1", "mongodb-memory-server": "^9.1.6", "pg-mem": "^2.8.1",