more work

This commit is contained in:
Boki 2025-07-09 23:49:08 -04:00
parent 7a99d08d04
commit b87a931a2b
11 changed files with 595 additions and 183 deletions

View file

@ -0,0 +1,93 @@
import { MongoClient } from 'mongodb';
async function migratePriceTracking() {
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
const mongodb = client.db('stock');
try {
console.log('Starting price tracking migration...');
const collection = mongodb.collection('eodSymbols');
const batchSize = 100;
let processedCount = 0;
let hasMore = true;
while (hasMore) {
// Find documents that need migration
const documents = await collection.find({
lastPriceUpdate: { $exists: true },
'operations.price_update': { $exists: false }
}).limit(batchSize).toArray();
if (documents.length === 0) {
hasMore = false;
break;
}
// Process each document
for (const doc of documents) {
// Normalize date to 00:00:00 UTC
const lastPriceUpdate = new Date(doc.lastPriceUpdate);
const normalizedDate = new Date(Date.UTC(
lastPriceUpdate.getUTCFullYear(),
lastPriceUpdate.getUTCMonth(),
lastPriceUpdate.getUTCDate(),
0, 0, 0, 0
));
// Parse lastPriceDate if it exists
let lastRecordDate = null;
if (doc.lastPriceDate) {
try {
lastRecordDate = new Date(doc.lastPriceDate);
} catch (e) {
console.warn(`Failed to parse lastPriceDate for ${doc.Code}: ${doc.lastPriceDate}`);
}
}
// Update the document
await collection.updateOne(
{ _id: doc._id },
{
$set: {
'operations.price_update': {
lastRunAt: normalizedDate,
lastSuccessAt: normalizedDate,
status: 'success',
...(lastRecordDate && { lastRecordDate })
}
}
}
);
processedCount++;
if (processedCount % 1000 === 0) {
console.log(`Processed ${processedCount} documents...`);
}
}
}
console.log(`Migration completed. Total documents migrated: ${processedCount}`);
// Optional: Remove old fields
const removeOldFields = false; // Set to true to remove old fields
if (removeOldFields) {
console.log('Removing old fields...');
const result = await collection.updateMany(
{},
{ $unset: { lastPriceUpdate: '', lastPriceDate: '' } }
);
console.log(`Removed old fields from ${result.modifiedCount} documents`);
}
} catch (error) {
console.error('Migration failed:', error);
} finally {
await client.close();
}
}
// Run the migration
migratePriceTracking().catch(console.error);

View file

@ -1,5 +1,6 @@
import type { BaseHandler } from '@stock-bot/handlers';
import type { DataIngestionServices } from '../../../types';
import type { EodHandler } from '../eod.handler';
import { EOD_CONFIG } from '../shared';
import { getEodExchangeSuffix } from '../shared/utils';
@ -11,52 +12,50 @@ interface FetchCorporateActionsInput {
}
export async function scheduleFetchCorporateActions(
this: BaseHandler<DataIngestionServices>
this: EodHandler
): Promise<{ success: boolean; jobsScheduled: number }> {
const logger = this.logger;
try {
logger.info('Scheduling corporate actions fetch jobs');
// Calculate date one month ago for stale check
const oneMonthAgo = new Date();
oneMonthAgo.setMonth(oneMonthAgo.getMonth() - 1);
// Get Canadian exchanges for now
const canadianExchanges = ['TO', 'V', 'CN', 'NEO'];
// Find symbols that need corporate actions update
const symbols = await this.mongodb.collection('eodSymbols').find({
Exchange: { $in: canadianExchanges },
delisted: false,
$or: [
{ lastCorporateActionsUpdate: { $lt: oneMonthAgo } },
{ lastCorporateActionsUpdate: { $exists: false } }
]
}).limit(500).toArray(); // Limit to avoid too many jobs at once
// Use OperationTracker to find stale symbols for both dividends and splits
const allStaleDividends = await this.operationRegistry.getStaleSymbols('eod', 'dividends_update', {
limit: 2000 // Get more symbols to filter from
});
if (!symbols || symbols.length === 0) {
const allStaleSplits = await this.operationRegistry.getStaleSymbols('eod', 'splits_update', {
limit: 2000 // Get more symbols to filter from
});
// Filter for Canadian exchanges and non-delisted symbols
const staleSymbolsDividends = allStaleDividends.filter(item =>
canadianExchanges.includes(item.symbol.Exchange) &&
item.symbol.delisted === false
).slice(0, 500);
const staleSymbolsSplits = allStaleSplits.filter(item =>
canadianExchanges.includes(item.symbol.Exchange) &&
item.symbol.delisted === false
).slice(0, 500);
if ((!staleSymbolsDividends || staleSymbolsDividends.length === 0) &&
(!staleSymbolsSplits || staleSymbolsSplits.length === 0)) {
logger.info('No symbols need corporate actions update');
return { success: true, jobsScheduled: 0 };
}
logger.info(`Found ${symbols.length} symbols needing corporate actions update`, {
count: symbols.length,
samples: symbols.slice(0, 5).map(s => ({
symbol: s.Code,
exchange: s.Exchange,
name: s.Name,
lastUpdate: s.lastCorporateActionsUpdate
}))
});
logger.info(`Found ${staleSymbolsDividends.length} symbols needing dividends update and ${staleSymbolsSplits.length} symbols needing splits update`);
let jobsScheduled = 0;
// Schedule jobs for each symbol - both dividends and splits
for (let i = 0; i < symbols.length; i++) {
const symbol = symbols[i];
// Schedule dividends jobs
for (let i = 0; i < staleSymbolsDividends.length; i++) {
const { symbol } = staleSymbolsDividends[i];
// Schedule dividends fetch
await this.scheduleOperation('fetch-corporate-actions', {
symbol: symbol.Code,
exchange: symbol.eodExchange || symbol.Exchange, // Use eodExchange if available
@ -71,8 +70,12 @@ export async function scheduleFetchCorporateActions(
delay: i * 200 // Stagger jobs by 200ms per symbol
});
jobsScheduled++;
}
// Schedule splits jobs
for (let i = 0; i < staleSymbolsSplits.length; i++) {
const { symbol } = staleSymbolsSplits[i];
// Schedule splits fetch
await this.scheduleOperation('fetch-corporate-actions', {
symbol: symbol.Code,
exchange: symbol.eodExchange || symbol.Exchange, // Use eodExchange if available
@ -102,7 +105,7 @@ export async function scheduleFetchCorporateActions(
}
export async function fetchCorporateActions(
this: BaseHandler<DataIngestionServices>,
this: EodHandler,
input: FetchCorporateActionsInput
): Promise<{ success: boolean; recordsCount: number }> {
const logger = this.logger;
@ -204,17 +207,17 @@ export async function fetchCorporateActions(
['date', 'symbolExchange']
);
// Update symbol with last update timestamp
await this.mongodb.collection('eodSymbols').updateOne(
{ Code: symbol, Exchange: exchange },
{
$set: {
lastCorporateActionsUpdate: new Date(),
[`last${actionType.charAt(0).toUpperCase() + actionType.slice(1)}Update`]: new Date(),
[`has${actionType.charAt(0).toUpperCase() + actionType.slice(1)}`]: true
// Update operation tracker based on action type
const operationName = actionType === 'dividends' ? 'dividends_update' : 'splits_update';
await this.operationRegistry.updateOperation('eod', symbol, operationName, {
status: 'success',
recordCount: result.insertedCount,
metadata: {
actionType: actionType,
exchange: exchange,
hasData: result.insertedCount > 0
}
}
);
});
logger.info(`Successfully saved ${result.insertedCount} ${actionType} records for ${symbol}.${exchange}`);
@ -224,6 +227,14 @@ export async function fetchCorporateActions(
};
} catch (error) {
logger.error('Failed to fetch corporate actions', { error, symbol, exchange, actionType });
// Update operation tracker with failure
const operationName = actionType === 'dividends' ? 'dividends_update' : 'splits_update';
await this.operationRegistry.updateOperation('eod', symbol, operationName, {
status: 'failure',
error: error.message
});
throw error;
}
}

View file

@ -1,5 +1,6 @@
import type { BaseHandler } from '@stock-bot/handlers';
import type { DataIngestionServices } from '../../../types';
import type { EodHandler } from '../eod.handler';
import { EOD_CONFIG } from '../shared';
import { getEodExchangeSuffix } from '../shared/utils';
@ -14,52 +15,49 @@ interface FetchSingleFundamentalsInput {
}
export async function scheduleFetchFundamentals(
this: BaseHandler<DataIngestionServices>
this: EodHandler
): Promise<{ success: boolean; jobsScheduled: number }> {
const logger = this.logger;
try {
logger.info('Scheduling fundamentals fetch jobs');
// Calculate date one week ago for stale check
const oneWeekAgo = new Date();
oneWeekAgo.setDate(oneWeekAgo.getDate() - 7);
// Get Canadian exchanges for now
const canadianExchanges = ['TO', 'V', 'CN', 'NEO'];
// Find symbols that need fundamentals update
const symbols = await this.mongodb.collection('eodSymbols').find({
Exchange: { $in: canadianExchanges },
delisted: false,
$or: [
{ lastFundamentalsUpdate: { $lt: oneWeekAgo } },
{ lastFundamentalsUpdate: { $exists: false } }
]
}).toArray();
// Use OperationTracker to find stale symbols
const allStaleSymbols = await this.operationRegistry.getStaleSymbols('eod', 'fundamentals_update', {
limit: 5000 // Get more symbols to filter from
});
if (!symbols || symbols.length === 0) {
// Filter for Canadian exchanges and non-delisted symbols
const staleSymbols = allStaleSymbols.filter(item =>
canadianExchanges.includes(item.symbol.Exchange) &&
item.symbol.delisted === false
).slice(0, 1000); // Limit to 1000 after filtering
if (!staleSymbols || staleSymbols.length === 0) {
logger.info('No symbols need fundamentals update');
return { success: true, jobsScheduled: 0 };
}
logger.info(`Found ${symbols.length} symbols needing fundamentals update`);
logger.info(`Found ${staleSymbols.length} symbols needing fundamentals update`);
// Separate ETFs from regular stocks
const etfs = symbols.filter(s => s.Type === 'ETF');
const nonEtfs = symbols.filter(s => s.Type !== 'ETF');
const etfs = staleSymbols.filter(s => s.symbol.Type === 'ETF');
const nonEtfs = staleSymbols.filter(s => s.symbol.Type !== 'ETF');
logger.info(`Found ${etfs.length} ETFs and ${nonEtfs.length} non-ETFs`, {
etfSamples: etfs.slice(0, 5).map(e => ({
symbol: e.Code,
exchange: e.Exchange,
name: e.Name
symbol: e.symbol.Code,
exchange: e.symbol.Exchange,
name: e.symbol.Name
})),
nonEtfSamples: nonEtfs.slice(0, 5).map(s => ({
symbol: s.Code,
exchange: s.Exchange,
name: s.Name,
type: s.Type
symbol: s.symbol.Code,
exchange: s.symbol.Exchange,
name: s.symbol.Name,
type: s.symbol.Type
}))
});
@ -67,7 +65,7 @@ export async function scheduleFetchFundamentals(
// Schedule individual jobs for ETFs
for (let i = 0; i < etfs.length; i++) {
const etf = etfs[i];
const { symbol: etf } = etfs[i];
await this.scheduleOperation('fetch-single-fundamentals', {
symbol: etf.Code,
exchange: etf.eodExchange || etf.Exchange, // Use eodExchange if available
@ -95,9 +93,9 @@ export async function scheduleFetchFundamentals(
// Convert to array of {symbol, exchange, country} objects
const symbolBatch = batch.map(s => ({
symbol: s.Code,
exchange: s.eodExchange || s.Exchange, // Use eodExchange if available
country: s.Country
symbol: s.symbol.Code,
exchange: s.symbol.eodExchange || s.symbol.Exchange, // Use eodExchange if available
country: s.symbol.Country
}));
await this.scheduleOperation('fetch-bulk-fundamentals', {
@ -128,7 +126,7 @@ export async function scheduleFetchFundamentals(
}
export async function fetchBulkFundamentals(
this: BaseHandler<DataIngestionServices>,
this: EodHandler,
input: BulkFundamentalsInput
): Promise<{ success: boolean; symbolsProcessed: number }> {
const logger = this.logger;
@ -221,17 +219,16 @@ export async function fetchBulkFundamentals(
logger.info(`Saved ${result.insertedCount} fundamentals records for ${exchange}`);
// Update symbols with last update timestamp
// Update operation tracker for each symbol
const updatePromises = symbolsToUpdate.map(({ symbol, exchange }) =>
this.mongodb.collection('eodSymbols').updateOne(
{ Code: symbol, Exchange: exchange },
{
$set: {
lastFundamentalsUpdate: new Date(),
hasFundamentals: true
this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', {
status: 'success',
recordCount: 1,
metadata: {
hasData: true,
exchange: exchange
}
}
)
})
);
await Promise.all(updatePromises);
@ -247,12 +244,22 @@ export async function fetchBulkFundamentals(
};
} catch (error) {
logger.error('Failed to fetch bulk fundamentals', { error });
// Mark all symbols as failed
const failPromises = input.symbols.map(({ symbol }) =>
this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', {
status: 'failure',
error: error.message
})
);
await Promise.all(failPromises);
throw error;
}
}
export async function fetchSingleFundamentals(
this: BaseHandler<DataIngestionServices>,
this: EodHandler,
input: FetchSingleFundamentalsInput
): Promise<{ success: boolean; saved: boolean }> {
const logger = this.logger;
@ -321,16 +328,16 @@ export async function fetchSingleFundamentals(
['symbolExchange']
);
// Update symbol with last update timestamp
await this.mongodb.collection('eodSymbols').updateOne(
{ Code: symbol, Exchange: exchange },
{
$set: {
lastFundamentalsUpdate: new Date(),
hasFundamentals: true
// Update operation tracker
await this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', {
status: 'success',
recordCount: result.insertedCount,
metadata: {
hasData: true,
exchange: exchange,
saved: result.insertedCount > 0
}
}
);
});
logger.info(`Successfully saved fundamentals for ${symbol}.${exchange}`);
@ -340,6 +347,13 @@ export async function fetchSingleFundamentals(
};
} catch (error) {
logger.error('Failed to fetch single fundamentals', { error, symbol, exchange });
// Update operation tracker with failure
await this.operationRegistry.updateOperation('eod', symbol, 'fundamentals_update', {
status: 'failure',
error: error.message
});
throw error;
}
}

View file

@ -1,5 +1,6 @@
import type { BaseHandler } from '@stock-bot/handlers';
import type { DataIngestionServices } from '../../../types';
import type { EodHandler } from '../eod.handler';
import { EOD_CONFIG } from '../shared';
import { getEodExchangeSuffix } from '../shared/utils';
@ -36,7 +37,7 @@ const MAX_DAYS_PER_INTERVAL = {
};
export async function scheduleIntradayCrawl(
this: BaseHandler<DataIngestionServices>
this: EodHandler
): Promise<{ success: boolean; jobsScheduled: number }> {
const logger = this.logger;
@ -46,45 +47,59 @@ export async function scheduleIntradayCrawl(
// Get Canadian exchanges for now
const canadianExchanges = ['TO', 'V', 'CN', 'NEO'];
// Find active symbols that need intraday data
const symbols = await this.mongodb.collection('eodSymbols').find({
Exchange: { $in: canadianExchanges },
delisted: false,
// Only symbols without complete intraday data
$or: [
{ 'intradayState.1m.finished': { $ne: true } },
{ 'intradayState.5m.finished': { $ne: true } },
{ 'intradayState.1h.finished': { $ne: true } },
{ 'intradayState': { $exists: false } }
]
}).limit(100).toArray(); // Limit to avoid too many jobs at once
// Use OperationTracker to find symbols needing intraday crawl
const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h'];
const operationNames = ['intraday_1m', 'intraday_5m', 'intraday_1h'];
if (!symbols || symbols.length === 0) {
let allSymbolsForCrawl: any[] = [];
// Get symbols needing crawl for each interval
for (let i = 0; i < intervals.length; i++) {
const interval = intervals[i];
const operationName = operationNames[i];
const allSymbolsForInterval = await this.operationRegistry.getSymbolsForIntradayCrawl('eod', operationName, {
limit: 500 // Get more symbols to filter from
});
// Filter for Canadian exchanges and non-delisted symbols
const symbolsForInterval = allSymbolsForInterval.filter(item =>
canadianExchanges.includes(item.symbol.Exchange) &&
item.symbol.delisted === false
).slice(0, 100);
// Add interval info to each symbol
symbolsForInterval.forEach(item => {
allSymbolsForCrawl.push({
symbol: item.symbol,
interval: interval,
operationName: operationName,
crawlState: item.crawlState
});
});
}
if (!allSymbolsForCrawl || allSymbolsForCrawl.length === 0) {
logger.info('No symbols need intraday crawl');
return { success: true, jobsScheduled: 0 };
}
logger.info(`Found ${symbols.length} symbols needing intraday data`, {
count: symbols.length,
samples: symbols.slice(0, 5).map(s => ({
symbol: s.Code,
exchange: s.Exchange,
name: s.Name,
intradayState: s.intradayState
logger.info(`Found ${allSymbolsForCrawl.length} symbol/interval combinations needing intraday data`, {
count: allSymbolsForCrawl.length,
samples: allSymbolsForCrawl.slice(0, 5).map(s => ({
symbol: s.symbol.Code,
exchange: s.symbol.Exchange,
name: s.symbol.Name,
interval: s.interval,
crawlState: s.crawlState
}))
});
let jobsScheduled = 0;
const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h'];
// Schedule crawl jobs for each symbol and interval
for (const symbol of symbols) {
for (const interval of intervals) {
// Check if this interval is already finished
const isFinished = symbol.intradayState?.[interval]?.finished;
if (isFinished) {
continue;
}
// Schedule crawl jobs for each symbol/interval combination
for (const item of allSymbolsForCrawl) {
const { symbol, interval } = item;
await this.scheduleOperation('crawl-intraday', {
symbol: symbol.Code,
@ -101,7 +116,6 @@ export async function scheduleIntradayCrawl(
});
jobsScheduled++;
}
}
logger.info(`Successfully scheduled ${jobsScheduled} intraday crawl jobs`);
@ -116,7 +130,7 @@ export async function scheduleIntradayCrawl(
}
export async function crawlIntraday(
this: BaseHandler<DataIngestionServices>,
this: EodHandler,
input: CrawlIntradayInput
): Promise<{ success: boolean; recordsProcessed: number; finished: boolean }> {
const logger = this.logger;
@ -240,7 +254,7 @@ export async function crawlIntraday(
}
export async function fetchIntraday(
this: BaseHandler<DataIngestionServices>,
this: EodHandler,
input: FetchIntradayInput
): Promise<{ success: boolean; recordsSaved: number }> {
const logger = this.logger;

View file

@ -1,5 +1,6 @@
import type { BaseHandler } from '@stock-bot/handlers';
import type { DataIngestionServices } from '../../../types';
import type { EodHandler } from '../eod.handler';
import { EOD_CONFIG } from '../shared';
import { getEodExchangeSuffix } from '../shared/utils';
@ -10,48 +11,40 @@ interface FetchPricesInput {
}
export async function scheduleFetchPrices(
this: BaseHandler<DataIngestionServices>
this: EodHandler
): Promise<{ success: boolean; jobsScheduled: number }> {
const logger = this.logger;
try {
logger.info('Scheduling price fetch jobs for all symbols');
// Calculate date one week ago
const oneWeekAgo = new Date();
oneWeekAgo.setDate(oneWeekAgo.getDate() - 7);
// Use OperationTracker to find stale symbols
const staleSymbols = await this.operationRegistry.getStaleSymbols('eod', 'price_update', {
limit: 1000 // Process in batches to avoid overwhelming the system
});
// Find ALL symbols that haven't been updated in the last week
const symbols = await this.mongodb.collection('eodSymbols').find({
delisted: false,
$or: [
{ lastPriceUpdate: { $lt: oneWeekAgo } },
{ lastPriceUpdate: { $exists: false } }
]
}).toArray();
if (!symbols || symbols.length === 0) {
if (!staleSymbols || staleSymbols.length === 0) {
logger.info('No symbols need price updates');
return { success: true, jobsScheduled: 0 };
}
logger.info(`Found ${symbols.length} symbols needing price updates`, {
symbols: symbols.map(s => ({
symbol: s.Code,
exchange: s.Exchange,
name: s.Name,
lastUpdate: s.lastPriceUpdate
logger.info(`Found ${staleSymbols.length} symbols needing price updates`, {
symbols: staleSymbols.slice(0, 10).map(s => ({
symbol: s.symbol.Code,
exchange: s.symbol.Exchange,
name: s.symbol.Name,
lastUpdate: s.lastRun
}))
});
let jobsScheduled = 0;
// Schedule jobs with staggered delays
for (let i = 0; i < symbols.length; i++) {
const symbol = symbols[i];
for (let i = 0; i < staleSymbols.length; i++) {
const { symbol } = staleSymbols[i];
logger.debug(`Scheduling price fetch for ${symbol.Code}.${symbol.Exchange}`, {
name: symbol.Name,
lastUpdate: symbol.lastPriceUpdate,
lastUpdate: staleSymbols[i].lastRun,
delay: i * 100
});
@ -83,7 +76,7 @@ export async function scheduleFetchPrices(
}
export async function fetchPrices(
this: BaseHandler<DataIngestionServices>,
this: EodHandler,
input: FetchPricesInput
): Promise<{ success: boolean; priceCount: number }> {
const logger = this.logger;
@ -165,16 +158,16 @@ export async function fetchPrices(
['date', 'symbolExchange']
);
// Update the symbol's last price update timestamp
await this.mongodb.collection('eodSymbols').updateOne(
{ Code: symbol, Exchange: exchange },
{
$set: {
lastPriceUpdate: new Date(),
lastPriceDate: priceData.length > 0 ? priceData[priceData.length - 1].date : null
// Update operation tracker instead of directly updating the symbol
await this.operationRegistry.updateOperation('eod', symbol, 'price_update', {
status: 'success',
lastRecordDate: priceData.length > 0 ? priceData[priceData.length - 1].date : null,
recordCount: priceData.length,
metadata: {
insertedCount: result.insertedCount,
updatedCount: priceData.length - result.insertedCount
}
}
);
});
logger.info(`Successfully saved ${result.insertedCount} price records for ${symbol}.${exchange}`);
@ -184,6 +177,13 @@ export async function fetchPrices(
};
} catch (error) {
logger.error('Failed to fetch or save prices', { error, symbol, exchange });
// Update operation tracker with failure
await this.operationRegistry.updateOperation('eod', symbol, 'price_update', {
status: 'failure',
error: error.message
});
throw error;
}
}

View file

@ -5,23 +5,25 @@ import {
RateLimit,
ScheduledOperation
} from '@stock-bot/handlers';
import type { OperationRegistry } from '../../shared/operation-manager';
import type { DataIngestionServices } from '../../types';
import {
fetchExchanges,
fetchSymbols,
scheduleFetchSymbols,
fetchPrices,
scheduleFetchPrices,
fetchIntraday,
crawlIntraday,
scheduleIntradayCrawl,
fetchBulkFundamentals,
fetchSingleFundamentals,
scheduleFetchFundamentals,
fetchSymbolChangeHistory,
fetchCorporateActions,
scheduleFetchCorporateActions
fetchExchanges,
fetchIntraday,
fetchPrices,
fetchSingleFundamentals,
fetchSymbolChangeHistory,
fetchSymbols,
scheduleFetchCorporateActions,
scheduleFetchFundamentals,
scheduleFetchPrices,
scheduleFetchSymbols,
scheduleIntradayCrawl
} from './actions';
import { createEODOperationRegistry } from './shared';
/**
* EOD (End of Day) Handler demonstrating advanced rate limiting
@ -37,10 +39,18 @@ import {
],
})
export class EodHandler extends BaseHandler<DataIngestionServices> {
public operationRegistry!: OperationRegistry;
constructor(services: any) {
super(services);
}
async initialize(): Promise<void> {
// Initialize operation registry
this.operationRegistry = await createEODOperationRegistry(this.mongodb, this.logger);
this.logger.info('EOD operation registry initialized');
}
/**
* Fetch exchanges list from EOD
* Runs weekly on Sundays at midnight

View file

@ -1,2 +1,3 @@
export * from './config';
export * from './utils';
export * from './operation-provider';

View file

@ -0,0 +1,118 @@
/**
* EOD Operation Provider - Defines operations for EOD Historical Data source
*/
import { BaseOperationProvider, OperationRegistry, type OperationConfig, type ProviderConfig } from '../../../shared/operation-manager';
/**
* EOD operation definitions
*/
export const EOD_OPERATIONS: OperationConfig[] = [
// Exchange data
{
name: 'exchange_update',
type: 'standard',
description: 'Update exchange list',
defaultStaleHours: 24 * 7 // Weekly
},
// Symbol data
{
name: 'symbol_update',
type: 'standard',
description: 'Update symbol list for exchange',
defaultStaleHours: 24 // Daily
},
// Price data
{
name: 'price_update',
type: 'standard',
description: 'Update daily price data',
defaultStaleHours: 24 * 7 // Weekly - EOD's schedule-fetch-prices runs for symbols not updated in last week
},
// Intraday data
{
name: 'intraday_1m',
type: 'intraday_crawl',
description: 'Crawl 1-minute intraday data',
requiresFinishedFlag: true,
defaultStaleHours: 24 // Daily check for new data
},
{
name: 'intraday_5m',
type: 'intraday_crawl',
description: 'Crawl 5-minute intraday data',
requiresFinishedFlag: true,
defaultStaleHours: 24 // Daily check for new data
},
{
name: 'intraday_1h',
type: 'intraday_crawl',
description: 'Crawl 1-hour intraday data',
requiresFinishedFlag: true,
defaultStaleHours: 24 // Daily check for new data
},
// Fundamental data
{
name: 'fundamentals_update',
type: 'standard',
description: 'Update fundamental data',
defaultStaleHours: 24 * 7 // Weekly
},
// Corporate actions
{
name: 'dividends_update',
type: 'standard',
description: 'Update dividend data',
defaultStaleHours: 24 * 30 // Monthly
},
{
name: 'splits_update',
type: 'standard',
description: 'Update stock split data',
defaultStaleHours: 24 * 30 // Monthly
},
// Symbol changes
{
name: 'symbol_change_update',
type: 'standard',
description: 'Update symbol change history',
defaultStaleHours: 24 * 7 // Weekly
}
];
/**
* EOD Operation Provider
*/
export class EODOperationProvider extends BaseOperationProvider {
getProviderConfig(): ProviderConfig {
return {
name: 'eod',
collectionName: 'eodSymbols',
symbolField: 'Code',
description: 'EOD Historical Data provider'
};
}
getOperations(): OperationConfig[] {
return EOD_OPERATIONS;
}
}
/**
* Create and initialize EOD operation registry
*/
export async function createEODOperationRegistry(
mongodb: any,
logger: any
): Promise<OperationRegistry> {
const registry = new OperationRegistry({ mongodb, logger });
const provider = new EODOperationProvider({ mongodb, logger });
await registry.registerProvider(provider);
return registry;
}

View file

@ -85,7 +85,7 @@ services: # Dragonfly - Redis replacement for caching and events
MONGO_INITDB_DATABASE: stock
ports:
- "27017:27017"
command: --wiredTigerCacheSizeGB 8
command: --wiredTigerCacheSizeGB 20
volumes:
- mongodb_data:/data/db
- ./database/mongodb/init:/docker-entrypoint-initdb.d

View file

@ -0,0 +1,73 @@
// MongoDB Shell Script for Price Tracking Migration
// Run with: mongosh mongodb://username:password@localhost:27017/stock price-migration.mongodb.js
print("Starting price tracking migration...");
const batchSize = 1000;
let processedCount = 0;
let cursor = db.eodSymbols.find({
lastPriceUpdate: { $exists: true },
'operations.price_update': { $exists: false }
});
let batch = [];
cursor.forEach(doc => {
// Normalize date to 00:00:00 UTC
const normalizedDate = new Date(doc.lastPriceUpdate);
normalizedDate.setUTCHours(0, 0, 0, 0);
// Parse lastPriceDate if it exists
let lastRecordDate = null;
if (doc.lastPriceDate) {
try {
lastRecordDate = new Date(doc.lastPriceDate);
} catch (e) {
print(`Failed to parse lastPriceDate for ${doc.Code}: ${doc.lastPriceDate}`);
}
}
batch.push({
updateOne: {
filter: { _id: doc._id },
update: {
$set: {
'operations.price_update': {
lastRunAt: normalizedDate,
lastSuccessAt: normalizedDate,
status: 'success',
...(lastRecordDate && { lastRecordDate })
}
}
}
}
});
if (batch.length >= batchSize) {
db.eodSymbols.bulkWrite(batch);
processedCount += batch.length;
print(`Processed ${processedCount} documents...`);
batch = [];
}
});
// Process remaining batch
if (batch.length > 0) {
db.eodSymbols.bulkWrite(batch);
processedCount += batch.length;
}
print(`Migration completed. Processed ${processedCount} documents.`);
// Verify migration
const sampleDoc = db.eodSymbols.findOne({ 'operations.price_update': { $exists: true } });
print("\nSample migrated document:");
printjson(sampleDoc.operations);
// Count remaining documents with old fields
const remainingCount = db.eodSymbols.countDocuments({ lastPriceUpdate: { $exists: true } });
print(`\nDocuments still having lastPriceUpdate field: ${remainingCount}`);
// Optional: Remove old fields (uncomment to execute)
// print("\nRemoving old fields...");
// db.eodSymbols.updateMany({}, { $unset: { lastPriceUpdate: '', lastPriceDate: '' } });

78
run-price-migration.js Normal file
View file

@ -0,0 +1,78 @@
// This script performs the migration of lastPriceUpdate and lastPriceDate fields
// to the operations tracking system
// Since authentication is required, you'll need to run this script with proper MongoDB credentials
// Update the connection string below with your authentication details
const MONGODB_URI = 'mongodb://username:password@localhost:27017/stock?authSource=admin';
// Or use environment variable: process.env.MONGODB_URI
async function runMigration() {
console.log(`
========================================
PRICE TRACKING MIGRATION SCRIPT
========================================
This script will migrate lastPriceUpdate and lastPriceDate fields
from eodSymbols collection to the operations tracking system.
To run this migration:
1. Update the MONGODB_URI in this file with your MongoDB credentials
OR set the MONGODB_URI environment variable
2. Run the script:
node run-price-migration.js
The script will:
- Find all documents with lastPriceUpdate field
- Convert dates to normalized format (00:00:00 UTC)
- Create operations.price_update structure
- Process in batches of 1000 documents
Total documents to migrate: ~199,175
Migration query that will be executed:
`);
console.log(`
db.eodSymbols.find({
lastPriceUpdate: { $exists: true },
'operations.price_update': { $exists: false }
}).forEach(doc => {
const normalizedDate = new Date(doc.lastPriceUpdate);
normalizedDate.setUTCHours(0, 0, 0, 0);
let lastRecordDate = null;
if (doc.lastPriceDate) {
try {
lastRecordDate = new Date(doc.lastPriceDate);
} catch (e) {}
}
db.eodSymbols.updateOne(
{ _id: doc._id },
{
$set: {
'operations.price_update': {
lastRunAt: normalizedDate,
lastSuccessAt: normalizedDate,
status: 'success',
...(lastRecordDate && { lastRecordDate })
}
}
}
);
});
`);
console.log(`
After migration, you can optionally remove old fields:
db.eodSymbols.updateMany({}, { $unset: { lastPriceUpdate: '', lastPriceDate: '' } })
To verify migration:
db.eodSymbols.findOne({ 'operations.price_update': { $exists: true } })
`);
}
runMigration();