work on eod

This commit is contained in:
Boki 2025-07-06 23:42:43 -04:00
parent 8f65c19d46
commit 5ca8fafe7e
10 changed files with 271 additions and 49 deletions

View file

@ -38,7 +38,15 @@ export async function scheduleFetchCorporateActions(
return { success: true, jobsScheduled: 0 };
}
logger.info(`Found ${symbols.length} symbols needing corporate actions update`);
logger.info(`Found ${symbols.length} symbols needing corporate actions update`, {
count: symbols.length,
samples: symbols.slice(0, 5).map(s => ({
symbol: s.Code,
exchange: s.Exchange,
name: s.Name,
lastUpdate: s.lastCorporateActionsUpdate
}))
});
let jobsScheduled = 0;
@ -97,7 +105,7 @@ export async function fetchCorporateActions(
const { symbol, exchange, actionType } = input;
try {
logger.info('Fetching corporate actions', { symbol, exchange, actionType });
logger.info(`Fetching ${actionType} for ${symbol}.${exchange}`);
// Get API key
const apiKey = EOD_CONFIG.API_TOKEN;
@ -127,6 +135,18 @@ export async function fetchCorporateActions(
logger.info(`Fetched ${data.length} ${actionType} records for ${symbol}.${exchange}`);
// Log sample records if any
if (data.length > 0) {
logger.debug(`Sample ${actionType} for ${symbol}.${exchange}:`, {
count: data.length,
samples: data.slice(0, 3).map(record => ({
date: record.date,
value: record.value || record.split || record.amount,
...record
}))
});
}
if (data.length === 0) {
// Update symbol to indicate we checked but found no data
await this.mongodb.collection('eodSymbols').updateOne(

View file

@ -34,10 +34,26 @@ export async function fetchExchanges(
throw new Error('Invalid response format from EOD API - expected array');
}
logger.info(`Fetched ${exchanges.length} exchanges from EOD`);
// Log some example exchanges
if (exchanges.length > 0) {
logger.debug('Sample exchanges:', {
count: exchanges.length,
samples: exchanges.slice(0, 5).map(e => ({
code: e.Code,
name: e.Name,
country: e.Country
}))
});
}
const result = await this.mongodb.batchUpsert('eodExchanges', exchanges, ['Code']);
logger.info(`Successfully saved ${result.insertedCount} exchanges to MongoDB`);
logger.info(`Successfully saved exchanges to MongoDB`, {
total: exchanges.length,
inserted: result.insertedCount,
updated: exchanges.length - result.insertedCount
});
return {
success: true,

View file

@ -6,6 +6,11 @@ interface BulkFundamentalsInput {
symbols: Array<{ symbol: string; exchange: string }>;
}
interface FetchSingleFundamentalsInput {
symbol: string;
exchange: string;
}
export async function scheduleFetchFundamentals(
this: BaseHandler<DataIngestionServices>
): Promise<{ success: boolean; jobsScheduled: number }> {
@ -38,12 +43,52 @@ export async function scheduleFetchFundamentals(
logger.info(`Found ${symbols.length} symbols needing fundamentals update`);
// Separate ETFs from regular stocks
const etfs = symbols.filter(s => s.Type === 'ETF');
const nonEtfs = symbols.filter(s => s.Type !== 'ETF');
logger.info(`Found ${etfs.length} ETFs and ${nonEtfs.length} non-ETFs`, {
etfSamples: etfs.slice(0, 5).map(e => ({
symbol: e.Code,
exchange: e.Exchange,
name: e.Name
})),
nonEtfSamples: nonEtfs.slice(0, 5).map(s => ({
symbol: s.Code,
exchange: s.Exchange,
name: s.Name,
type: s.Type
}))
});
let jobsScheduled = 0;
// Schedule individual jobs for ETFs
for (let i = 0; i < etfs.length; i++) {
const etf = etfs[i];
await this.scheduleOperation('fetch-single-fundamentals', {
symbol: etf.Code,
exchange: etf.Exchange
}, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 10000
},
delay: jobsScheduled * 1000 // Stagger by 1 second
});
jobsScheduled++;
}
if (etfs.length > 0) {
logger.info(`Scheduled ${etfs.length} individual ETF fundamentals jobs`);
}
// Create batches of 500 symbols for non-ETFs
const batchSize = 500; // EOD allows up to 500 symbols per bulk request
// Create batches of 500 symbols
for (let i = 0; i < symbols.length; i += batchSize) {
const batch = symbols.slice(i, i + batchSize);
for (let i = 0; i < nonEtfs.length; i += batchSize) {
const batch = nonEtfs.slice(i, i + batchSize);
// Convert to array of {symbol, exchange} objects
const symbolBatch = batch.map(s => ({
@ -63,7 +108,7 @@ export async function scheduleFetchFundamentals(
});
jobsScheduled++;
logger.info(`Scheduled fundamentals batch ${jobsScheduled} with ${symbolBatch.length} symbols`);
logger.info(`Scheduled fundamentals batch with ${symbolBatch.length} non-ETF symbols`);
}
logger.info(`Successfully scheduled ${jobsScheduled} fundamentals fetch jobs`);
@ -113,7 +158,11 @@ export async function fetchBulkFundamentals(
url.searchParams.append('fmt', 'json');
url.searchParams.append('symbols', symbolList.join(','));
logger.info(`Fetching fundamentals for ${symbolList.length} ${exchange} symbols`);
logger.info(`Fetching bulk fundamentals for ${symbolList.length} ${exchange} symbols`, {
exchange,
count: symbolList.length,
samples: symbolList.slice(0, 5)
});
// Fetch data
const response = await fetch(url.toString());
@ -194,4 +243,80 @@ export async function fetchBulkFundamentals(
logger.error('Failed to fetch bulk fundamentals', { error });
throw error;
}
}
export async function fetchSingleFundamentals(
this: BaseHandler<DataIngestionServices>,
input: FetchSingleFundamentalsInput
): Promise<{ success: boolean; saved: boolean }> {
const logger = this.logger;
const { symbol, exchange } = input;
try {
logger.info(`Fetching single fundamentals for ${symbol}.${exchange}`);
// Get API key
const apiKey = EOD_CONFIG.API_TOKEN;
if (!apiKey) {
throw new Error('EOD API key not configured');
}
// Build URL for single fundamentals endpoint
const url = new URL(`https://eodhd.com/api/fundamentals/${symbol}.${exchange}`);
url.searchParams.append('api_token', apiKey);
url.searchParams.append('fmt', 'json');
// Fetch data
const response = await fetch(url.toString());
if (!response.ok) {
throw new Error(`EOD Single Fundamentals API returned ${response.status}: ${response.statusText}`);
}
const fundamentals = await response.json();
// Check if we got valid data
if (!fundamentals || typeof fundamentals !== 'object') {
logger.warn(`No fundamentals data returned for ${symbol}.${exchange}`);
return { success: true, saved: false };
}
// Add metadata
const fundamentalsWithMetadata = {
symbol,
exchange,
symbolExchange: `${symbol}.${exchange}`,
...fundamentals,
updatedAt: new Date(),
source: 'eod'
};
// Save to MongoDB
const result = await this.mongodb.batchUpsert(
'eodFundamentals',
[fundamentalsWithMetadata],
['symbolExchange']
);
// Update symbol with last update timestamp
await this.mongodb.collection('eodSymbols').updateOne(
{ Code: symbol, Exchange: exchange },
{
$set: {
lastFundamentalsUpdate: new Date(),
hasFundamentals: true
}
}
);
logger.info(`Successfully saved fundamentals for ${symbol}.${exchange}`);
return {
success: true,
saved: result.insertedCount > 0
};
} catch (error) {
logger.error('Failed to fetch single fundamentals', { error, symbol, exchange });
throw error;
}
}

View file

@ -2,6 +2,6 @@ export { fetchExchanges } from './exchanges';
export { fetchSymbols, scheduleFetchSymbols } from './symbols';
export { fetchPrices, scheduleFetchPrices } from './prices';
export { fetchIntraday, crawlIntraday, scheduleIntradayCrawl } from './intraday';
export { fetchBulkFundamentals, scheduleFetchFundamentals } from './fundamentals';
export { fetchBulkFundamentals, fetchSingleFundamentals, scheduleFetchFundamentals } from './fundamentals';
export { fetchSymbolChangeHistory } from './symbol-change';
export { fetchCorporateActions, scheduleFetchCorporateActions } from './corporate-actions';

View file

@ -61,7 +61,15 @@ export async function scheduleIntradayCrawl(
return { success: true, jobsScheduled: 0 };
}
logger.info(`Found ${symbols.length} symbols needing intraday data`);
logger.info(`Found ${symbols.length} symbols needing intraday data`, {
count: symbols.length,
samples: symbols.slice(0, 5).map(s => ({
symbol: s.Code,
exchange: s.Exchange,
name: s.Name,
intradayState: s.intradayState
}))
});
let jobsScheduled = 0;
const intervals: Array<'1m' | '5m' | '1h'> = ['1m', '5m', '1h'];
@ -111,7 +119,7 @@ export async function crawlIntraday(
const { symbol, exchange, interval } = input;
try {
logger.info('Starting intraday crawl', { symbol, exchange, interval });
logger.info(`Starting intraday crawl for ${symbol}.${exchange} - ${interval}`);
// Get current crawl state
const symbolDoc = await this.mongodb.collection('eodSymbols').findOne({
@ -172,11 +180,11 @@ export async function crawlIntraday(
// Check if we're finished (no data returned means we've reached the end)
if (result.recordsSaved === 0) {
newState.finished = true;
logger.info('Intraday crawl finished - no more data', {
symbol,
exchange,
interval,
totalRecords: newState.totalRecordsProcessed
logger.info(`Intraday crawl finished for ${symbol}.${exchange} - ${interval}`, {
totalRecords: newState.totalRecordsProcessed,
oldestDate: newState.oldestDateReached,
newestDate: newState.newestDateReached,
batches: newState.totalBatchesProcessed
});
}
@ -233,12 +241,9 @@ export async function fetchIntraday(
const { symbol, exchange, interval, fromDate, toDate } = input;
try {
logger.info('Fetching intraday data', {
symbol,
exchange,
interval,
from: fromDate?.toISOString(),
to: toDate?.toISOString()
logger.info(`Fetching intraday data for ${symbol}.${exchange} - ${interval}`, {
from: fromDate?.toISOString().split('T')[0],
to: toDate?.toISOString().split('T')[0]
});
// Get API key
@ -280,7 +285,7 @@ export async function fetchIntraday(
return { success: true, recordsSaved: 0 };
}
logger.info(`Fetched ${data.length} intraday records`, { symbol, exchange, interval });
logger.info(`Fetched ${data.length} intraday records for ${symbol}.${exchange} - ${interval}`);
// Add metadata to each record
const recordsWithMetadata = data.map(bar => ({

View file

@ -13,18 +13,14 @@ export async function scheduleFetchPrices(
const logger = this.logger;
try {
logger.info('Scheduling price fetch jobs for Canadian symbols');
logger.info('Scheduling price fetch jobs for all symbols');
// Calculate date one week ago
const oneWeekAgo = new Date();
oneWeekAgo.setDate(oneWeekAgo.getDate() - 7);
// Get Canadian exchanges (TSX, TSV, CNQ, NEO)
const canadianExchanges = ['TO', 'V', 'CN', 'NEO'];
// Find symbols that haven't been updated in the last week
// Find ALL symbols that haven't been updated in the last week
const symbols = await this.mongodb.collection('eodSymbols').find({
Exchange: { $in: canadianExchanges },
delisted: false,
$or: [
{ lastPriceUpdate: { $lt: oneWeekAgo } },
@ -33,17 +29,30 @@ export async function scheduleFetchPrices(
}).toArray();
if (!symbols || symbols.length === 0) {
logger.info('No Canadian symbols need price updates');
logger.info('No symbols need price updates');
return { success: true, jobsScheduled: 0 };
}
logger.info(`Found ${symbols.length} Canadian symbols needing price updates`);
logger.info(`Found ${symbols.length} symbols needing price updates`, {
symbols: symbols.map(s => ({
symbol: s.Code,
exchange: s.Exchange,
name: s.Name,
lastUpdate: s.lastPriceUpdate
}))
});
let jobsScheduled = 0;
// Schedule jobs with staggered delays
for (let i = 0; i < symbols.length; i++) {
const symbol = symbols[i];
logger.debug(`Scheduling price fetch for ${symbol.Code}.${symbol.Exchange}`, {
name: symbol.Name,
lastUpdate: symbol.lastPriceUpdate,
delay: i * 100
});
await this.scheduleOperation('fetch-prices', {
symbol: symbol.Code,
exchange: symbol.Exchange
@ -78,7 +87,7 @@ export async function fetchPrices(
const { symbol, exchange } = input;
try {
logger.info('Fetching prices', { symbol, exchange });
logger.info(`Fetching prices for ${symbol}.${exchange}`);
// Get API key from config
const apiKey = EOD_CONFIG.API_TOKEN;
@ -90,7 +99,6 @@ export async function fetchPrices(
const url = new URL(`https://eodhd.com/api/eod/${symbol}.${exchange}`);
url.searchParams.append('api_token', apiKey);
url.searchParams.append('fmt', 'json');
// Fetch price data from EOD API
const response = await fetch(url.toString());
@ -107,6 +115,15 @@ export async function fetchPrices(
logger.info(`Fetched ${priceData.length} price records for ${symbol}.${exchange}`);
// Log date range of prices
if (priceData.length > 0) {
logger.debug(`Price data range for ${symbol}.${exchange}:`, {
oldest: priceData[0].date,
newest: priceData[priceData.length - 1].date,
count: priceData.length
});
}
// Add metadata to each price record
const pricesWithMetadata = priceData.map(price => ({
symbol,

View file

@ -38,9 +38,22 @@ export async function fetchSymbolChangeHistory(
logger.info(`Fetched ${data.length} symbol change records`);
if (data.length === 0) {
logger.info('No symbol changes found');
return { success: true, changesCount: 0 };
}
// Log some sample changes
logger.debug('Sample symbol changes:', {
count: data.length,
samples: data.slice(0, 5).map(c => ({
oldSymbol: c.oldSymbol,
newSymbol: c.newSymbol,
exchange: c.exchange,
date: c.date,
note: c.note
}))
});
// Add metadata to each record
const changesWithMetadata = data.map(change => ({
...change,
@ -73,7 +86,10 @@ export async function fetchSymbolChangeHistory(
if (updateResult.modifiedCount > 0) {
updatedSymbols++;
logger.debug(`Updated symbol ${change.oldSymbol} to ${change.newSymbol} on ${change.exchange}`);
logger.info(`Updated symbol ${change.oldSymbol} to ${change.newSymbol} on ${change.exchange}`, {
date: change.date,
note: change.note
});
}
}
}

View file

@ -14,17 +14,19 @@ export async function scheduleFetchSymbols(
const logger = this.logger;
try {
logger.info('Scheduling symbol fetch jobs for all exchanges');
logger.info('Starting symbol fetch job scheduling');
// Get all exchanges from MongoDB
const exchanges = await this.mongodb.collection('eodExchanges').find({}).toArray();
if (!exchanges || exchanges.length === 0) {
logger.warn('No exchanges found in database');
logger.warn('No exchanges found in database - run fetch-exchanges first');
return { success: true, jobsCreated: 0 };
}
logger.info(`Found ${exchanges.length} exchanges to process`);
logger.info(`Found ${exchanges.length} exchanges to process`, {
exchanges: exchanges.map(e => ({ code: e.Code, name: e.Name, country: e.Country }))
});
let jobsCreated = 0;
@ -79,7 +81,7 @@ export async function fetchSymbols(
const { exchangeCode, delisted } = input;
try {
logger.info('Fetching symbols for exchange', { exchangeCode, delisted });
logger.info(`Fetching ${delisted ? 'delisted' : 'active'} symbols for ${exchangeCode}`);
// Get API key from config
const apiKey = EOD_CONFIG.API_TOKEN;
@ -111,6 +113,18 @@ export async function fetchSymbols(
logger.info(`Fetched ${symbols.length} ${delisted ? 'delisted' : 'active'} symbols for ${exchangeCode}`);
// Log some sample symbols
if (symbols.length > 0) {
logger.debug(`Sample ${delisted ? 'delisted' : 'active'} symbols for ${exchangeCode}:`, {
count: symbols.length,
samples: symbols.slice(0, 5).map(s => ({
code: s.Code,
name: s.Name,
type: s.Type
}))
});
}
// Add metadata to each symbol
const symbolsWithMetadata = symbols.map(symbol => ({
...symbol,

View file

@ -16,6 +16,7 @@ import {
crawlIntraday,
scheduleIntradayCrawl,
fetchBulkFundamentals,
fetchSingleFundamentals,
scheduleFetchFundamentals,
fetchSymbolChangeHistory,
fetchCorporateActions,
@ -63,11 +64,11 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
* Called by schedule-fetch-symbols for each exchange
*/
@Operation('fetch-symbols')
@RateLimit(10) // 10 points per exchange
@RateLimit(1) // 1 point per exchange
fetchSymbols = fetchSymbols;
/**
* Schedule price fetching for Canadian symbols not updated in last week
* Schedule price fetching for all symbols not updated in last week
* Runs daily at 2 AM
*/
@Operation('schedule-fetch-prices')
@ -80,7 +81,7 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
* Called by schedule-fetch-prices for each symbol
*/
@Operation('fetch-prices')
@RateLimit(10) // 10 points per price fetch
@RateLimit(1) // 1 point per price fetch
fetchPrices = fetchPrices;
/**
@ -97,7 +98,7 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
* Handles resumption and batch processing
*/
@Operation('crawl-intraday')
@RateLimit(50) // 50 points per crawl batch
@RateLimit(5) // 5 points per crawl batch
crawlIntraday = crawlIntraday;
/**
@ -105,7 +106,7 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
* Called by crawl-intraday
*/
@Operation('fetch-intraday')
@RateLimit(50) // 50 points per intraday fetch
@RateLimit(5) // 5 points per intraday fetch
fetchIntraday = fetchIntraday;
/**
@ -119,27 +120,35 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
/**
* Fetch fundamentals for up to 500 symbols in bulk
* Called by schedule-fetch-fundamentals
* Called by schedule-fetch-fundamentals for non-ETFs
*/
@Operation('fetch-bulk-fundamentals')
@RateLimit(100) // 100 points per bulk request
@RateLimit(600) // 100 base + up to 500 symbols = 600 points max
fetchBulkFundamentals = fetchBulkFundamentals;
/**
* Fetch fundamentals for a single symbol
* Called by schedule-fetch-fundamentals for ETFs
*/
@Operation('fetch-single-fundamentals')
@RateLimit(10) // 10 points per single fundamentals fetch
fetchSingleFundamentals = fetchSingleFundamentals;
/**
* Fetch symbol change history
* Runs weekly on Mondays at 5 AM
*/
@Operation('fetch-symbol-change-history')
@ScheduledOperation('fetch-symbol-change-history', '0 5 * * 1')
@RateLimit(10) // 10 points per request
@RateLimit(5) // 5 points per request
fetchSymbolChangeHistory = fetchSymbolChangeHistory;
/**
* Schedule corporate actions (dividends and splits) fetch
* Runs monthly on the 1st at 6 AM
* DISABLED - Corporate actions likely come with fundamentals
*/
@Operation('schedule-fetch-corporate-actions')
@ScheduledOperation('schedule-fetch-corporate-actions', '0 6 1 * *')
// @ScheduledOperation('schedule-fetch-corporate-actions', '0 6 1 * *') // Disabled - would run monthly on the 1st at 6 AM
@RateLimit(1) // 1 point for scheduling
scheduleFetchCorporateActions = scheduleFetchCorporateActions;

View file

@ -1,5 +1,5 @@
export const EOD_CONFIG = {
// API configuration
API_BASE_URL: 'https://eodhd.com/api/',
API_TOKEN: '657fe003583a32.85708911"',
API_TOKEN: '657fe003583a32.85708911',
};