Refactor operationTracker crawl-state handling and add eodSearchCode symbol key

This commit is contained in:
Boki 2025-07-10 08:45:06 -04:00
parent 18289f0a04
commit 7486a1fa65
11 changed files with 237 additions and 143 deletions

View file

@ -209,7 +209,8 @@ export async function fetchCorporateActions(
// Update operation tracker based on action type // Update operation tracker based on action type
const operationName = actionType === 'dividends' ? 'dividends_update' : 'splits_update'; const operationName = actionType === 'dividends' ? 'dividends_update' : 'splits_update';
await this.operationRegistry.updateOperation('eod', symbol, operationName, { const eodSearchCode = `${symbol}.${exchange}`;
await this.operationRegistry.updateOperation('eod', eodSearchCode, operationName, {
status: 'success', status: 'success',
recordCount: result.insertedCount, recordCount: result.insertedCount,
metadata: { metadata: {
@ -230,7 +231,8 @@ export async function fetchCorporateActions(
// Update operation tracker with failure // Update operation tracker with failure
const operationName = actionType === 'dividends' ? 'dividends_update' : 'splits_update'; const operationName = actionType === 'dividends' ? 'dividends_update' : 'splits_update';
await this.operationRegistry.updateOperation('eod', symbol, operationName, { const eodSearchCode = `${symbol}.${exchange}`;
await this.operationRegistry.updateOperation('eod', eodSearchCode, operationName, {
status: 'failure', status: 'failure',
error: error.message error: error.message
}); });

View file

@ -220,16 +220,17 @@ export async function fetchBulkFundamentals(
logger.info(`Saved ${result.insertedCount} fundamentals records for ${exchange}`); logger.info(`Saved ${result.insertedCount} fundamentals records for ${exchange}`);
// Update operation tracker for each symbol // Update operation tracker for each symbol
const updatePromises = symbolsToUpdate.map(({ symbol, exchange }) => const updatePromises = symbolsToUpdate.map(({ symbol, exchange }) => {
this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', { const eodSearchCode = `${symbol}.${exchange}`;
return this.operationRegistry.updateOperation('eod', eodSearchCode, 'fundamentals_update', {
status: 'success', status: 'success',
recordCount: 1, recordCount: 1,
metadata: { metadata: {
hasData: true, hasData: true,
exchange: exchange exchange: exchange
} }
}) });
); });
await Promise.all(updatePromises); await Promise.all(updatePromises);
totalProcessed += fundamentalsToSave.length; totalProcessed += fundamentalsToSave.length;
@ -246,12 +247,13 @@ export async function fetchBulkFundamentals(
logger.error('Failed to fetch bulk fundamentals', { error }); logger.error('Failed to fetch bulk fundamentals', { error });
// Mark all symbols as failed // Mark all symbols as failed
const failPromises = input.symbols.map(({ symbol }) => const failPromises = input.symbols.map(({ symbol, exchange }) => {
this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', { const eodSearchCode = `${symbol}.${exchange}`;
return this.operationRegistry.updateOperation('eod', eodSearchCode, 'fundamentals_update', {
status: 'failure', status: 'failure',
error: error.message error: error.message
}) });
); });
await Promise.all(failPromises); await Promise.all(failPromises);
throw error; throw error;
@ -329,7 +331,8 @@ export async function fetchSingleFundamentals(
); );
// Update operation tracker // Update operation tracker
await this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', { const eodSearchCode = `${symbol}.${exchange}`;
await this.operationRegistry.updateOperation('eod', eodSearchCode, 'fundamentals_update', {
status: 'success', status: 'success',
recordCount: result.insertedCount, recordCount: result.insertedCount,
metadata: { metadata: {
@ -349,7 +352,8 @@ export async function fetchSingleFundamentals(
logger.error('Failed to fetch single fundamentals', { error, symbol, exchange }); logger.error('Failed to fetch single fundamentals', { error, symbol, exchange });
// Update operation tracker with failure // Update operation tracker with failure
await this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', { const eodSearchCode = `${symbol}.${exchange}`;
await this.operationRegistry.updateOperation('eod', eodSearchCode, 'fundamentals_update', {
status: 'failure', status: 'failure',
error: error.message error: error.message
}); });

View file

@ -1,5 +1,4 @@
import type { CrawlState } from '../../../shared/operation-manager/types';
import type { EodHandler } from '../eod.handler'; import type { EodHandler } from '../eod.handler';
import { EOD_CONFIG } from '../shared'; import { EOD_CONFIG } from '../shared';
@ -19,7 +18,6 @@ interface CrawlIntradayInput {
country?: string; country?: string;
} }
// CrawlState is imported from operation-manager types
// Max days per interval based on EOD limits // Max days per interval based on EOD limits
const MAX_DAYS_PER_INTERVAL = { const MAX_DAYS_PER_INTERVAL = {
@ -47,16 +45,18 @@ export async function scheduleIntradayCrawl(
const interval = intervals[i]; const interval = intervals[i];
const operationName = operationNames[i]!; // Non-null assertion since we know the array has 3 elements const operationName = operationNames[i]!; // Non-null assertion since we know the array has 3 elements
const symbolsForInterval = await this.operationRegistry.getStaleSymbols('eod', operationName, { // For intraday, we want to check even finished crawls for new data
limit: 1000, // Get more to filter // So we'll query the symbols directly
symbolFilter: { symbol: 'AAPL' } // Filter for AAPL only const symbolsForInterval = await this.mongodb.collection('eodSymbols').find({
}); eodSearchCode: 'AAPL.US',
delisted: false
}).toArray();
// Filter out delisted symbols and ensure we get AAPL with US exchange // Filter out delisted symbols and ensure we get AAPL with US exchange
const activeSymbols = symbolsForInterval.filter(item => const activeSymbols = symbolsForInterval.filter(item =>
item.symbol.delisted === false && item.symbol.delisted === false &&
item.symbol.Code === 'AAPL' && item.symbol.Code === 'AAPL' &&
(item.symbol.eodExchange === 'US' || item.symbol.Exchange === 'US') item.symbol.eodExchange === 'US'
); );
// Add interval info to each symbol // Add interval info to each symbol
@ -153,27 +153,38 @@ export async function crawlIntraday(
throw new Error(`Symbol ${symbol}.${exchange} not found`); throw new Error(`Symbol ${symbol}.${exchange} not found`);
} }
logger.debug('Found symbol document', {
symbol,
exchange,
hasOperations: !!symbolDoc.operations,
operationKeys: Object.keys(symbolDoc.operations || {})
});
// Get operation status from tracker // Get operation status from tracker
const operationName = `intraday_${interval}`; const operationName = `intraday_${interval}`;
const operationStatus = symbolDoc.operations?.[operationName]; const operationStatus = symbolDoc.operations?.[operationName] || {};
const crawlState: CrawlState = operationStatus?.crawlState || {
finished: false logger.info(`Current crawl state for ${symbol}.${exchange} - ${interval}`, {
}; hasOperationStatus: !!symbolDoc.operations?.[operationName],
operationStatus: operationStatus,
lastProcessedDate: operationStatus.lastProcessedDate,
finished: operationStatus.finished
});
// Determine date range for this batch // Determine date range for this batch
const maxDays = MAX_DAYS_PER_INTERVAL[interval]; const maxDays = MAX_DAYS_PER_INTERVAL[interval];
let toDate = new Date(); let toDate = new Date();
let fromDate = new Date(); let fromDate = new Date();
if (crawlState.lastProcessedDate) { if (operationStatus.lastProcessedDate) {
// Continue from where we left off // Continue from where we left off - the last processed date becomes the new toDate
toDate = new Date(crawlState.lastProcessedDate); toDate = new Date(operationStatus.lastProcessedDate);
toDate.setDate(toDate.getDate() - 1); // Start from day before last processed // No need to subtract a day - lastProcessedDate is the fromDate of the last batch
} }
// Calculate from date (going backwards) // Calculate from date (going backwards)
fromDate = new Date(toDate); fromDate = new Date(toDate);
fromDate.setDate(fromDate.getDate() - maxDays + 1); fromDate.setDate(fromDate.getDate() - maxDays);
logger.info(`Fetching intraday batch for ${symbol}.${exchange} - ${interval} from ${fromDate.toISOString().split('T')[0]} to ${toDate.toISOString().split('T')[0]}`, { logger.info(`Fetching intraday batch for ${symbol}.${exchange} - ${interval} from ${fromDate.toISOString().split('T')[0]} to ${toDate.toISOString().split('T')[0]}`, {
symbol, symbol,
@ -182,9 +193,9 @@ export async function crawlIntraday(
fromDate: fromDate.toISOString(), fromDate: fromDate.toISOString(),
toDate: toDate.toISOString(), toDate: toDate.toISOString(),
maxDays, maxDays,
crawlState: { operationStatus: {
lastProcessedDate: crawlState.lastProcessedDate, lastProcessedDate: operationStatus.lastProcessedDate,
totalDaysProcessed: crawlState.totalDaysProcessed || 0 totalDaysProcessed: operationStatus.totalDaysProcessed || 0
} }
}); });
@ -198,47 +209,73 @@ export async function crawlIntraday(
country country
}); });
// Update crawl state // Prepare update data
const newState: CrawlState = { const updateData: any = {
...crawlState, status: 'partial',
recordCount: result.recordsSaved,
finished: false, finished: false,
lastProcessedDate: fromDate, lastProcessedDate: fromDate, // Store the fromDate so next batch continues from here
totalDaysProcessed: (crawlState.totalDaysProcessed || 0) + 1 totalDaysProcessed: (operationStatus.totalDaysProcessed || 0) + maxDays
}; };
// Set oldest date reached // Set oldest date reached
if (!newState.oldestDateReached || fromDate < newState.oldestDateReached) { if (!operationStatus.oldestDateReached || fromDate < operationStatus.oldestDateReached) {
newState.oldestDateReached = fromDate; updateData.oldestDateReached = fromDate;
} else {
updateData.oldestDateReached = operationStatus.oldestDateReached;
} }
// Set newest date reached // Set newest date reached
if (!newState.newestDateReached || toDate > newState.newestDateReached) { if (!operationStatus.newestDateReached || toDate > operationStatus.newestDateReached) {
newState.newestDateReached = toDate; updateData.newestDateReached = toDate;
} else {
updateData.newestDateReached = operationStatus.newestDateReached;
} }
// Check if we're finished (no data returned means we've reached the end) // Check if we're finished
if (result.recordsSaved === 0) { // Only mark as finished if:
newState.finished = true; // 1. We got no data from the API (empty response)
logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval} (${newState.oldestDateReached?.toISOString().split('T')[0]} to ${newState.newestDateReached?.toISOString().split('T')[0]})`, { // 2. We've been crawling for a while and consistently getting no new records
if (result.recordsSaved === 0 && result.recordsFetched === 0) {
// No data returned from API - we've reached the end
updateData.finished = true;
updateData.status = 'success';
logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval} (no more data available)`, {
symbol, symbol,
exchange, exchange,
interval, interval,
oldestDate: newState.oldestDateReached?.toISOString(), oldestDate: updateData.oldestDateReached?.toISOString(),
newestDate: newState.newestDateReached?.toISOString(), newestDate: updateData.newestDateReached?.toISOString(),
totalDaysProcessed: newState.totalDaysProcessed, totalDaysProcessed: updateData.totalDaysProcessed,
noDataReturned: true noDataReturned: true
}); });
} else if (result.recordsSaved === 0 && result.recordsFetched > 0) {
// Data was fetched but all records already exist - continue crawling
logger.info(`All ${result.recordsFetched} records already exist for ${symbol}.${exchange} - ${interval}, continuing crawl`, {
symbol,
exchange,
interval,
fromDate: fromDate.toISOString(),
toDate: toDate.toISOString(),
recordsFetched: result.recordsFetched
});
} }
// Update operation tracker with crawl state // Update operation tracker
await this.operationRegistry.updateOperation('eod', symbol, operationName, { logger.info(`Updating operation tracker for ${symbol}.${exchange} - ${interval}`, {
status: newState.finished ? 'success' : 'partial', status: updateData.status,
recordCount: result.recordsSaved, recordCount: result.recordsSaved,
crawlState: newState lastProcessedDate: updateData.lastProcessedDate,
finished: updateData.finished
}); });
const eodSearchCode = `${symbol}.${exchange}`;
await this.operationRegistry.updateOperation('eod', eodSearchCode, operationName, updateData);
logger.info(`Operation tracker updated for ${symbol}.${exchange} - ${interval}`);
// If not finished, schedule next batch // If not finished, schedule next batch
if (!newState.finished) { if (!updateData.finished) {
await this.scheduleOperation('crawl-intraday', { await this.scheduleOperation('crawl-intraday', {
symbol, symbol,
exchange, exchange,
@ -261,14 +298,14 @@ export async function crawlIntraday(
currentBatchFrom: fromDate.toISOString(), currentBatchFrom: fromDate.toISOString(),
currentBatchTo: toDate.toISOString(), currentBatchTo: toDate.toISOString(),
recordsSaved: result.recordsSaved, recordsSaved: result.recordsSaved,
totalDaysProcessed: newState.totalDaysProcessed totalDaysProcessed: updateData.totalDaysProcessed
}); });
} }
return { return {
success: true, success: true,
recordsProcessed: result.recordsSaved, recordsProcessed: result.recordsSaved,
finished: newState.finished finished: updateData.finished
}; };
} catch (error) { } catch (error) {
logger.error('Failed to crawl intraday data', { error, symbol, exchange, interval }); logger.error('Failed to crawl intraday data', { error, symbol, exchange, interval });
@ -279,7 +316,7 @@ export async function crawlIntraday(
export async function fetchIntraday( export async function fetchIntraday(
this: EodHandler, this: EodHandler,
input: FetchIntradayInput input: FetchIntradayInput
): Promise<{ success: boolean; recordsSaved: number }> { ): Promise<{ success: boolean; recordsSaved: number; recordsFetched: number }> {
const logger = this.logger; const logger = this.logger;
const { symbol, exchange, interval, fromDate, toDate, country } = input; const { symbol, exchange, interval, fromDate, toDate, country } = input;
@ -345,7 +382,7 @@ export async function fetchIntraday(
if (data.length === 0) { if (data.length === 0) {
logger.info('No intraday data returned', { symbol, exchange, interval }); logger.info('No intraday data returned', { symbol, exchange, interval });
return { success: true, recordsSaved: 0 }; return { success: true, recordsSaved: 0, recordsFetched: 0 };
} }
logger.info(`Fetched ${data.length} intraday records for ${symbol}.${exchange} - ${interval}`); logger.info(`Fetched ${data.length} intraday records for ${symbol}.${exchange} - ${interval}`);
@ -383,7 +420,8 @@ export async function fetchIntraday(
return { return {
success: true, success: true,
recordsSaved: result.insertedCount recordsSaved: result.insertedCount,
recordsFetched: data.length
}; };
} catch (error) { } catch (error) {
logger.error('Failed to fetch intraday data', { error, symbol, exchange, interval }); logger.error('Failed to fetch intraday data', { error, symbol, exchange, interval });

View file

@ -157,7 +157,8 @@ export async function fetchPrices(
); );
// Update operation tracker instead of directly updating the symbol // Update operation tracker instead of directly updating the symbol
await this.operationRegistry.updateOperation('eod', symbol, 'price_update', { const eodSearchCode = `${symbol}.${exchange}`;
await this.operationRegistry.updateOperation('eod', eodSearchCode, 'price_update', {
status: 'success', status: 'success',
lastRecordDate: priceData.length > 0 ? priceData[priceData.length - 1].date : null, lastRecordDate: priceData.length > 0 ? priceData[priceData.length - 1].date : null,
recordCount: priceData.length, recordCount: priceData.length,
@ -177,7 +178,8 @@ export async function fetchPrices(
logger.error('Failed to fetch or save prices', { error, symbol, exchange }); logger.error('Failed to fetch or save prices', { error, symbol, exchange });
// Update operation tracker with failure // Update operation tracker with failure
await this.operationRegistry.updateOperation('eod', symbol, 'price_update', { const eodSearchCode = `${symbol}.${exchange}`;
await this.operationRegistry.updateOperation('eod', eodSearchCode, 'price_update', {
status: 'failure', status: 'failure',
error: error.message error: error.message
}); });

View file

@ -130,6 +130,7 @@ export async function fetchSymbols(
...symbol, ...symbol,
Exchange: symbol.Exchange || exchangeCode, // Keep the original exchange (might be wrong) Exchange: symbol.Exchange || exchangeCode, // Keep the original exchange (might be wrong)
eodExchange: exchangeCode, // Store the correct exchange code used to fetch this symbol eodExchange: exchangeCode, // Store the correct exchange code used to fetch this symbol
eodSearchCode: `${symbol.Code}.${exchangeCode}`, // Create unique search code like AAPL.US
delisted: delisted, delisted: delisted,
})); }));

View file

@ -109,7 +109,9 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
* Runs daily at 3 AM * Runs daily at 3 AM
*/ */
@Operation('schedule-intraday-crawl') @Operation('schedule-intraday-crawl')
@ScheduledOperation('schedule-intraday-crawl', '0 3 * * *') @ScheduledOperation('schedule-intraday-crawl', '0 3 * * *', {
immediately: true,
})
@RateLimit(1) // 1 point for scheduling @RateLimit(1) // 1 point for scheduling
scheduleIntradayCrawl = scheduleIntradayCrawl; scheduleIntradayCrawl = scheduleIntradayCrawl;

View file

@ -94,7 +94,7 @@ export class EODOperationProvider extends BaseOperationProvider {
return { return {
name: 'eod', name: 'eod',
collectionName: 'eodSymbols', collectionName: 'eodSymbols',
symbolField: 'Code', symbolField: 'eodSearchCode',
description: 'EOD Historical Data provider' description: 'EOD Historical Data provider'
}; };
} }

View file

@ -82,19 +82,19 @@ export class OperationTracker {
} }
]; ];
// Add crawl state indexes for crawl operations // Add crawl-specific indexes for crawl operations
if (operation.type === 'crawl' || operation.type === 'intraday_crawl') { if (operation.type === 'crawl' || operation.type === 'intraday_crawl') {
indexes.push( indexes.push(
{ {
[`operations.${operation.name}.crawlState.finished`]: 1, [`operations.${operation.name}.finished`]: 1,
[symbolField]: 1 [symbolField]: 1
}, },
{ {
[`operations.${operation.name}.crawlState.newestDateReached`]: 1, [`operations.${operation.name}.newestDateReached`]: 1,
[symbolField]: 1 [symbolField]: 1
}, },
{ {
[`operations.${operation.name}.crawlState.oldestDateReached`]: 1, [`operations.${operation.name}.oldestDateReached`]: 1,
[symbolField]: 1 [symbolField]: 1
} }
); );
@ -103,10 +103,29 @@ export class OperationTracker {
const collection = this.mongodb.collection(collectionName); const collection = this.mongodb.collection(collectionName);
for (const indexSpec of indexes) { for (const indexSpec of indexes) {
await collection.createIndex(indexSpec, { try {
background: true, await collection.createIndex(indexSpec, {
name: `op_${operation.name}_${Object.keys(indexSpec).join('_')}` background: true,
}); name: `op_${operation.name}_${Object.keys(indexSpec).join('_')}`
});
} catch (indexError) {
// Log but don't fail if index already exists or we hit the limit
if (indexError.message?.includes('too many indexes') || indexError.code === 86) {
this.logger.warn(`Skipping index creation due to limit`, {
operation: operation.name,
index: Object.keys(indexSpec).join('_'),
error: indexError.message
});
} else if (indexError.code === 11000 || indexError.code === 85) {
// Index already exists
this.logger.debug(`Index already exists`, {
operation: operation.name,
index: Object.keys(indexSpec).join('_')
});
} else {
throw indexError;
}
}
} }
this.indexesCreated.add(operation.name); this.indexesCreated.add(operation.name);
@ -170,40 +189,50 @@ export class OperationTracker {
update.$set[`operations.${operationName}.metadata`] = data.metadata; update.$set[`operations.${operationName}.metadata`] = data.metadata;
} }
if (data.crawlState) { // Handle crawl-specific fields
const existingPath = `operations.${operationName}.crawlState`; if (data.finished !== undefined) {
if (data.crawlState.finished !== undefined) { update.$set[`operations.${operationName}.finished`] = data.finished;
update.$set[`${existingPath}.finished`] = data.crawlState.finished; }
} if (data.oldestDateReached) {
if (data.crawlState.oldestDateReached) { update.$set[`operations.${operationName}.oldestDateReached`] = data.oldestDateReached;
update.$set[`${existingPath}.oldestDateReached`] = data.crawlState.oldestDateReached; }
} if (data.newestDateReached) {
if (data.crawlState.newestDateReached) { update.$set[`operations.${operationName}.newestDateReached`] = data.newestDateReached;
update.$set[`${existingPath}.newestDateReached`] = data.crawlState.newestDateReached; }
} if (data.lastProcessedDate) {
if (data.crawlState.lastProcessedDate) { update.$set[`operations.${operationName}.lastProcessedDate`] = data.lastProcessedDate;
update.$set[`${existingPath}.lastProcessedDate`] = data.crawlState.lastProcessedDate; }
} if (data.totalDaysProcessed !== undefined) {
if (data.crawlState.totalDaysProcessed !== undefined) { update.$set[`operations.${operationName}.totalDaysProcessed`] = data.totalDaysProcessed;
update.$set[`${existingPath}.totalDaysProcessed`] = data.crawlState.totalDaysProcessed; }
} if (data.lastCrawlDirection) {
if (data.crawlState.lastCrawlDirection) { update.$set[`operations.${operationName}.lastCrawlDirection`] = data.lastCrawlDirection;
update.$set[`${existingPath}.lastCrawlDirection`] = data.crawlState.lastCrawlDirection; }
} if (data.targetOldestDate) {
if (data.crawlState.targetOldestDate) { update.$set[`operations.${operationName}.targetOldestDate`] = data.targetOldestDate;
update.$set[`${existingPath}.targetOldestDate`] = data.crawlState.targetOldestDate;
}
if (data.crawlState.metadata) {
update.$set[`${existingPath}.metadata`] = data.crawlState.metadata;
}
} }
await this.mongodb.updateOne( this.logger.debug('Updating symbol operation with filter', {
collectionName,
filter: { [symbolField]: symbol },
symbolField,
symbol,
operation: operationName
});
const updateResult = await this.mongodb.updateOne(
collectionName, collectionName,
{ [symbolField]: symbol }, { [symbolField]: symbol },
update update
); );
this.logger.debug('Update result', {
matched: updateResult.matchedCount,
modified: updateResult.modifiedCount,
symbol,
operation: operationName
});
// Call after hook // Call after hook
await this.provider.afterOperationUpdate(symbol, operationName, data); await this.provider.afterOperationUpdate(symbol, operationName, data);
@ -268,18 +297,27 @@ export class OperationTracker {
update.$set[`operations.${operation}.metadata`] = data.metadata; update.$set[`operations.${operation}.metadata`] = data.metadata;
} }
if (data.crawlState) { // Handle crawl-specific fields
const basePath = `operations.${operation}.crawlState`; if (data.finished !== undefined) {
Object.entries(data.crawlState).forEach(([key, value]) => { update.$set[`operations.${operation}.finished`] = data.finished;
if (value !== undefined) { }
// Handle Date objects properly if (data.oldestDateReached) {
if (value instanceof Date || (typeof value === 'string' && key.includes('Date'))) { update.$set[`operations.${operation}.oldestDateReached`] = data.oldestDateReached;
update.$set[`${basePath}.${key}`] = new Date(value); }
} else { if (data.newestDateReached) {
update.$set[`${basePath}.${key}`] = value; update.$set[`operations.${operation}.newestDateReached`] = data.newestDateReached;
} }
} if (data.lastProcessedDate) {
}); update.$set[`operations.${operation}.lastProcessedDate`] = data.lastProcessedDate;
}
if (data.totalDaysProcessed !== undefined) {
update.$set[`operations.${operation}.totalDaysProcessed`] = data.totalDaysProcessed;
}
if (data.lastCrawlDirection) {
update.$set[`operations.${operation}.lastCrawlDirection`] = data.lastCrawlDirection;
}
if (data.targetOldestDate) {
update.$set[`operations.${operation}.targetOldestDate`] = data.targetOldestDate;
} }
return { return {
@ -348,7 +386,7 @@ export class OperationTracker {
// Add symbol filter if provided // Add symbol filter if provided
if (symbolFilter?.symbol) { if (symbolFilter?.symbol) {
filter.symbol = symbolFilter.symbol; filter[symbolField] = symbolFilter.symbol;
} }
const symbols = await this.mongodb.find(collectionName, filter, { const symbols = await this.mongodb.find(collectionName, filter, {
@ -385,7 +423,7 @@ export class OperationTracker {
}; };
if (!includeFinished) { if (!includeFinished) {
filter[`operations.${operationName}.crawlState.finished`] = { $ne: true }; filter[`operations.${operationName}.finished`] = { $ne: true };
} }
const symbols = await this.mongodb.find(collectionName, filter, { const symbols = await this.mongodb.find(collectionName, filter, {
@ -417,11 +455,9 @@ export class OperationTracker {
): Promise<void> { ): Promise<void> {
await this.updateSymbolOperation(symbol, operationName, { await this.updateSymbolOperation(symbol, operationName, {
status: 'success', status: 'success',
crawlState: { finished: true,
finished: true, oldestDateReached,
oldestDateReached, newestDateReached: newestDateReached || new Date()
newestDateReached: newestDateReached || new Date()
}
}); });
this.logger.info('Marked crawl as finished', { this.logger.info('Marked crawl as finished', {
@ -457,7 +493,7 @@ export class OperationTracker {
// Add symbol filter if provided // Add symbol filter if provided
if (symbolFilter?.symbol) { if (symbolFilter?.symbol) {
filter.symbol = symbolFilter.symbol; filter[symbolField] = symbolFilter.symbol;
} }
// Get all symbols that either: // Get all symbols that either:
@ -466,7 +502,7 @@ export class OperationTracker {
// 3. Have gaps (new data since last crawl) // 3. Have gaps (new data since last crawl)
const orConditions = [ const orConditions = [
{ [`operations.${operationName}`]: { $exists: false } }, { [`operations.${operationName}`]: { $exists: false } },
{ [`operations.${operationName}.crawlState.finished`]: { $ne: true } } { [`operations.${operationName}.finished`]: { $ne: true } }
]; ];
if (includeNewDataGaps) { if (includeNewDataGaps) {
@ -475,7 +511,7 @@ export class OperationTracker {
yesterday.setHours(0, 0, 0, 0); yesterday.setHours(0, 0, 0, 0);
orConditions.push({ orConditions.push({
[`operations.${operationName}.crawlState.newestDateReached`]: { $lt: yesterday } [`operations.${operationName}.newestDateReached`]: { $lt: yesterday }
}); });
} }
@ -498,17 +534,16 @@ export class OperationTracker {
return symbols.map(doc => { return symbols.map(doc => {
const opStatus = doc.operations?.[operationName]; const opStatus = doc.operations?.[operationName];
const crawlState = opStatus?.crawlState;
// Determine gaps (only backward since we removed forward crawling) // Determine gaps (only backward since we removed forward crawling)
const gaps: { forward?: boolean; backward?: boolean } = {}; const gaps: { forward?: boolean; backward?: boolean } = {};
if (crawlState) { if (opStatus) {
// Only check for backward gap (historical data) // Only check for backward gap (historical data)
if (!crawlState.finished) { if (!opStatus.finished) {
gaps.backward = true; gaps.backward = true;
if (targetOldestDate && crawlState.oldestDateReached) { if (targetOldestDate && opStatus.oldestDateReached) {
gaps.backward = new Date(crawlState.oldestDateReached) > targetOldestDate; gaps.backward = new Date(opStatus.oldestDateReached) > targetOldestDate;
} }
} }
} else { } else {
@ -541,23 +576,23 @@ export class OperationTracker {
const doc = await this.mongodb.findOne(collectionName, { const doc = await this.mongodb.findOne(collectionName, {
[symbolField]: symbol [symbolField]: symbol
}, { }, {
projection: { [`operations.${operationName}.crawlState`]: 1 } projection: { [`operations.${operationName}`]: 1 }
}); });
if (!doc?.operations?.[operationName]?.crawlState) { if (!doc?.operations?.[operationName]) {
return false; return false;
} }
const crawlState = doc.operations[operationName].crawlState; const opStatus = doc.operations[operationName];
// Check if explicitly marked as finished // Check if explicitly marked as finished
if (crawlState.finished) { if (opStatus.finished) {
return true; return true;
} }
// Check if we've reached the target oldest date // Check if we've reached the target oldest date
if (crawlState.oldestDateReached && targetOldestDate) { if (opStatus.oldestDateReached && targetOldestDate) {
return new Date(crawlState.oldestDateReached) <= targetOldestDate; return new Date(opStatus.oldestDateReached) <= targetOldestDate;
} }
return false; return false;
@ -655,7 +690,7 @@ export class OperationTracker {
// Crawl stats (if applicable) // Crawl stats (if applicable)
(operation.type === 'crawl' || operation.type === 'intraday_crawl') (operation.type === 'crawl' || operation.type === 'intraday_crawl')
? collection.countDocuments({ ? collection.countDocuments({
[`operations.${operationName}.crawlState.finished`]: true [`operations.${operationName}.finished`]: true
}) })
: Promise.resolve(undefined), : Promise.resolve(undefined),

View file

@ -42,16 +42,10 @@ export interface OperationStatus {
error?: string; error?: string;
/** Provider-specific metadata */ /** Provider-specific metadata */
metadata?: Record<string, any>; metadata?: Record<string, any>;
/** Crawl-specific state */
crawlState?: CrawlState; // Crawl-specific fields (used for crawl and intraday_crawl operations)
}
/**
* State for crawl-type operations
*/
export interface CrawlState {
/** Whether the crawl has completed */ /** Whether the crawl has completed */
finished: boolean; finished?: boolean;
/** Oldest date reached during crawl */ /** Oldest date reached during crawl */
oldestDateReached?: Date; oldestDateReached?: Date;
/** Newest date reached during crawl */ /** Newest date reached during crawl */
@ -64,10 +58,10 @@ export interface CrawlState {
lastCrawlDirection?: 'forward' | 'backward'; lastCrawlDirection?: 'forward' | 'backward';
/** Target oldest date to reach */ /** Target oldest date to reach */
targetOldestDate?: Date; targetOldestDate?: Date;
/** Custom crawl metadata */
metadata?: Record<string, any>;
} }
// CrawlState interface removed - fields moved directly to OperationStatus
/** /**
* Configuration for a data provider * Configuration for a data provider
*/ */
@ -94,10 +88,24 @@ export interface OperationUpdate {
recordCount?: number; recordCount?: number;
/** Error message for failures */ /** Error message for failures */
error?: string; error?: string;
/** Crawl state for crawl operations */
crawlState?: Partial<CrawlState>;
/** Additional metadata */ /** Additional metadata */
metadata?: Record<string, any>; metadata?: Record<string, any>;
// Crawl-specific update fields
/** Whether the crawl has completed */
finished?: boolean;
/** Oldest date reached during crawl */
oldestDateReached?: Date;
/** Newest date reached during crawl */
newestDateReached?: Date;
/** Last date that was processed (for resumption) */
lastProcessedDate?: Date;
/** Total days processed so far */
totalDaysProcessed?: number;
/** Direction of last crawl */
lastCrawlDirection?: 'forward' | 'backward';
/** Target oldest date to reach */
targetOldestDate?: Date;
} }
/** /**

View file

@ -25,6 +25,7 @@
"eslint-plugin-node": "^11.1.0", "eslint-plugin-node": "^11.1.0",
"eslint-plugin-promise": "^7.2.1", "eslint-plugin-promise": "^7.2.1",
"knip": "^5.61.2", "knip": "^5.61.2",
"mongodb": "^6.17.0",
"mongodb-mcp-server": "^0.1.1", "mongodb-mcp-server": "^0.1.1",
"mongodb-memory-server": "^9.1.6", "mongodb-memory-server": "^9.1.6",
"pg-mem": "^2.8.1", "pg-mem": "^2.8.1",

View file

@ -89,6 +89,7 @@
"eslint-plugin-node": "^11.1.0", "eslint-plugin-node": "^11.1.0",
"eslint-plugin-promise": "^7.2.1", "eslint-plugin-promise": "^7.2.1",
"knip": "^5.61.2", "knip": "^5.61.2",
"mongodb": "^6.17.0",
"mongodb-mcp-server": "^0.1.1", "mongodb-mcp-server": "^0.1.1",
"mongodb-memory-server": "^9.1.6", "mongodb-memory-server": "^9.1.6",
"pg-mem": "^2.8.1", "pg-mem": "^2.8.1",