added some more api's to eod

This commit is contained in:
Boki 2025-07-06 21:45:12 -04:00
parent 8630852dba
commit 8f65c19d46
5 changed files with 530 additions and 1 deletions

View file

@ -0,0 +1,188 @@
import type { BaseHandler } from '@stock-bot/handlers';
import type { DataIngestionServices } from '../../../types';
import { EOD_CONFIG } from '../shared';
interface FetchCorporateActionsInput {
symbol: string;
exchange: string;
actionType: 'dividends' | 'splits';
}
export async function scheduleFetchCorporateActions(
this: BaseHandler<DataIngestionServices>
): Promise<{ success: boolean; jobsScheduled: number }> {
const logger = this.logger;
try {
logger.info('Scheduling corporate actions fetch jobs');
// Calculate date one month ago for stale check
const oneMonthAgo = new Date();
oneMonthAgo.setMonth(oneMonthAgo.getMonth() - 1);
// Get Canadian exchanges for now
const canadianExchanges = ['TO', 'V', 'CN', 'NEO'];
// Find symbols that need corporate actions update
const symbols = await this.mongodb.collection('eodSymbols').find({
Exchange: { $in: canadianExchanges },
delisted: false,
$or: [
{ lastCorporateActionsUpdate: { $lt: oneMonthAgo } },
{ lastCorporateActionsUpdate: { $exists: false } }
]
}).limit(500).toArray(); // Limit to avoid too many jobs at once
if (!symbols || symbols.length === 0) {
logger.info('No symbols need corporate actions update');
return { success: true, jobsScheduled: 0 };
}
logger.info(`Found ${symbols.length} symbols needing corporate actions update`);
let jobsScheduled = 0;
// Schedule jobs for each symbol - both dividends and splits
for (let i = 0; i < symbols.length; i++) {
const symbol = symbols[i];
// Schedule dividends fetch
await this.scheduleOperation('fetch-corporate-actions', {
symbol: symbol.Code,
exchange: symbol.Exchange,
actionType: 'dividends'
}, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 5000
},
delay: i * 200 // Stagger jobs by 200ms per symbol
});
jobsScheduled++;
// Schedule splits fetch
await this.scheduleOperation('fetch-corporate-actions', {
symbol: symbol.Code,
exchange: symbol.Exchange,
actionType: 'splits'
}, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 5000
},
delay: (i * 200) + 100 // Offset splits by 100ms from dividends
});
jobsScheduled++;
}
logger.info(`Successfully scheduled ${jobsScheduled} corporate actions fetch jobs`);
return {
success: true,
jobsScheduled
};
} catch (error) {
logger.error('Failed to schedule corporate actions fetch jobs', { error });
throw error;
}
}
export async function fetchCorporateActions(
this: BaseHandler<DataIngestionServices>,
input: FetchCorporateActionsInput
): Promise<{ success: boolean; recordsCount: number }> {
const logger = this.logger;
const { symbol, exchange, actionType } = input;
try {
logger.info('Fetching corporate actions', { symbol, exchange, actionType });
// Get API key
const apiKey = EOD_CONFIG.API_TOKEN;
if (!apiKey) {
throw new Error('EOD API key not configured');
}
// Build URL based on action type
const endpoint = actionType === 'dividends' ? 'div' : 'splits';
const url = new URL(`https://eodhd.com/api/${endpoint}/${symbol}.${exchange}`);
url.searchParams.append('api_token', apiKey);
url.searchParams.append('fmt', 'json');
// Fetch data
const response = await fetch(url.toString());
if (!response.ok) {
throw new Error(`EOD ${actionType} API returned ${response.status}: ${response.statusText}`);
}
const data = await response.json();
// EOD returns an array of corporate actions
if (!Array.isArray(data)) {
throw new Error('Invalid response format from EOD API - expected array');
}
logger.info(`Fetched ${data.length} ${actionType} records for ${symbol}.${exchange}`);
if (data.length === 0) {
// Update symbol to indicate we checked but found no data
await this.mongodb.collection('eodSymbols').updateOne(
{ Code: symbol, Exchange: exchange },
{
$set: {
[`last${actionType.charAt(0).toUpperCase() + actionType.slice(1)}Update`]: new Date(),
[`has${actionType.charAt(0).toUpperCase() + actionType.slice(1)}`]: false
}
}
);
return { success: true, recordsCount: 0 };
}
// Add metadata to each record
const recordsWithMetadata = data.map(record => ({
symbol,
exchange,
symbolExchange: `${symbol}.${exchange}`,
...record,
actionType,
updatedAt: new Date(),
source: 'eod'
}));
// Determine collection name based on action type
const collectionName = actionType === 'dividends' ? 'eodDividends' : 'eodSplits';
// Save to MongoDB - use date and symbol as unique identifier
const result = await this.mongodb.batchUpsert(
collectionName,
recordsWithMetadata,
['date', 'symbolExchange']
);
// Update symbol with last update timestamp
await this.mongodb.collection('eodSymbols').updateOne(
{ Code: symbol, Exchange: exchange },
{
$set: {
lastCorporateActionsUpdate: new Date(),
[`last${actionType.charAt(0).toUpperCase() + actionType.slice(1)}Update`]: new Date(),
[`has${actionType.charAt(0).toUpperCase() + actionType.slice(1)}`]: true
}
}
);
logger.info(`Successfully saved ${result.insertedCount} ${actionType} records for ${symbol}.${exchange}`);
return {
success: true,
recordsCount: result.insertedCount
};
} catch (error) {
logger.error('Failed to fetch corporate actions', { error, symbol, exchange, actionType });
throw error;
}
}

View file

@ -0,0 +1,197 @@
import type { BaseHandler } from '@stock-bot/handlers';
import type { DataIngestionServices } from '../../../types';
import { EOD_CONFIG } from '../shared';
interface BulkFundamentalsInput {
symbols: Array<{ symbol: string; exchange: string }>;
}
export async function scheduleFetchFundamentals(
this: BaseHandler<DataIngestionServices>
): Promise<{ success: boolean; jobsScheduled: number }> {
const logger = this.logger;
try {
logger.info('Scheduling fundamentals fetch jobs');
// Calculate date one week ago for stale check
const oneWeekAgo = new Date();
oneWeekAgo.setDate(oneWeekAgo.getDate() - 7);
// Get Canadian exchanges for now
const canadianExchanges = ['TO', 'V', 'CN', 'NEO'];
// Find symbols that need fundamentals update
const symbols = await this.mongodb.collection('eodSymbols').find({
Exchange: { $in: canadianExchanges },
delisted: false,
$or: [
{ lastFundamentalsUpdate: { $lt: oneWeekAgo } },
{ lastFundamentalsUpdate: { $exists: false } }
]
}).toArray();
if (!symbols || symbols.length === 0) {
logger.info('No symbols need fundamentals update');
return { success: true, jobsScheduled: 0 };
}
logger.info(`Found ${symbols.length} symbols needing fundamentals update`);
let jobsScheduled = 0;
const batchSize = 500; // EOD allows up to 500 symbols per bulk request
// Create batches of 500 symbols
for (let i = 0; i < symbols.length; i += batchSize) {
const batch = symbols.slice(i, i + batchSize);
// Convert to array of {symbol, exchange} objects
const symbolBatch = batch.map(s => ({
symbol: s.Code,
exchange: s.Exchange
}));
await this.scheduleOperation('fetch-bulk-fundamentals', {
symbols: symbolBatch
}, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 10000
},
delay: jobsScheduled * 5000 // Stagger batches by 5 seconds
});
jobsScheduled++;
logger.info(`Scheduled fundamentals batch ${jobsScheduled} with ${symbolBatch.length} symbols`);
}
logger.info(`Successfully scheduled ${jobsScheduled} fundamentals fetch jobs`);
return {
success: true,
jobsScheduled
};
} catch (error) {
logger.error('Failed to schedule fundamentals fetch jobs', { error });
throw error;
}
}
export async function fetchBulkFundamentals(
this: BaseHandler<DataIngestionServices>,
input: BulkFundamentalsInput
): Promise<{ success: boolean; symbolsProcessed: number }> {
const logger = this.logger;
const { symbols } = input;
try {
logger.info('Fetching bulk fundamentals', { symbolCount: symbols.length });
// Get API key
const apiKey = EOD_CONFIG.API_TOKEN;
if (!apiKey) {
throw new Error('EOD API key not configured');
}
// Group symbols by exchange for the API call
const exchangeGroups = symbols.reduce((acc, { symbol, exchange }) => {
if (!acc[exchange]) {
acc[exchange] = [];
}
acc[exchange].push(`${symbol}.${exchange}`);
return acc;
}, {} as Record<string, string[]>);
let totalProcessed = 0;
// Process each exchange group
for (const [exchange, symbolList] of Object.entries(exchangeGroups)) {
// Build URL - using the exchange as the endpoint
const url = new URL(`https://eodhd.com/api/bulk-fundamentals/${exchange}`);
url.searchParams.append('api_token', apiKey);
url.searchParams.append('fmt', 'json');
url.searchParams.append('symbols', symbolList.join(','));
logger.info(`Fetching fundamentals for ${symbolList.length} ${exchange} symbols`);
// Fetch data
const response = await fetch(url.toString());
if (!response.ok) {
throw new Error(`EOD Bulk Fundamentals API returned ${response.status}: ${response.statusText}`);
}
const data = await response.json();
// EOD bulk fundamentals returns an object with symbol keys
if (typeof data !== 'object' || !data) {
throw new Error('Invalid response format from EOD API - expected object');
}
// Process each symbol's fundamentals
const fundamentalsToSave = [];
const symbolsToUpdate = [];
for (const [symbolExchange, fundamentals] of Object.entries(data)) {
if (!fundamentals || typeof fundamentals !== 'object') {
logger.warn(`No fundamentals data for ${symbolExchange}`);
continue;
}
// Extract symbol and exchange from the key
const [symbol, exc] = symbolExchange.split('.');
// Add metadata
const fundamentalsWithMetadata = {
symbol,
exchange: exc,
symbolExchange,
...fundamentals,
updatedAt: new Date(),
source: 'eod'
};
fundamentalsToSave.push(fundamentalsWithMetadata);
symbolsToUpdate.push({ symbol, exchange: exc });
}
if (fundamentalsToSave.length > 0) {
// Save fundamentals to MongoDB
const result = await this.mongodb.batchUpsert(
'eodFundamentals',
fundamentalsToSave,
['symbolExchange']
);
logger.info(`Saved ${result.insertedCount} fundamentals records for ${exchange}`);
// Update symbols with last update timestamp
const updatePromises = symbolsToUpdate.map(({ symbol, exchange }) =>
this.mongodb.collection('eodSymbols').updateOne(
{ Code: symbol, Exchange: exchange },
{
$set: {
lastFundamentalsUpdate: new Date(),
hasFundamentals: true
}
}
)
);
await Promise.all(updatePromises);
totalProcessed += fundamentalsToSave.length;
}
}
logger.info(`Successfully processed fundamentals for ${totalProcessed} symbols`);
return {
success: true,
symbolsProcessed: totalProcessed
};
} catch (error) {
logger.error('Failed to fetch bulk fundamentals', { error });
throw error;
}
}

View file

@ -2,3 +2,6 @@ export { fetchExchanges } from './exchanges';
export { fetchSymbols, scheduleFetchSymbols } from './symbols'; export { fetchSymbols, scheduleFetchSymbols } from './symbols';
export { fetchPrices, scheduleFetchPrices } from './prices'; export { fetchPrices, scheduleFetchPrices } from './prices';
export { fetchIntraday, crawlIntraday, scheduleIntradayCrawl } from './intraday'; export { fetchIntraday, crawlIntraday, scheduleIntradayCrawl } from './intraday';
export { fetchBulkFundamentals, scheduleFetchFundamentals } from './fundamentals';
export { fetchSymbolChangeHistory } from './symbol-change';
export { fetchCorporateActions, scheduleFetchCorporateActions } from './corporate-actions';

View file

@ -0,0 +1,93 @@
import type { BaseHandler } from '@stock-bot/handlers';
import type { DataIngestionServices } from '../../../types';
import { EOD_CONFIG } from '../shared';
export async function fetchSymbolChangeHistory(
this: BaseHandler<DataIngestionServices>
): Promise<{ success: boolean; changesCount: number }> {
const logger = this.logger;
try {
logger.info('Fetching symbol change history');
// Get API key
const apiKey = EOD_CONFIG.API_TOKEN;
if (!apiKey) {
throw new Error('EOD API key not configured');
}
// Build URL
const url = new URL('https://eodhd.com/api/symbol-change-history');
url.searchParams.append('api_token', apiKey);
url.searchParams.append('fmt', 'json');
// Fetch data
const response = await fetch(url.toString());
if (!response.ok) {
throw new Error(`EOD Symbol Change API returned ${response.status}: ${response.statusText}`);
}
const data = await response.json();
// EOD returns an array of symbol changes
if (!Array.isArray(data)) {
throw new Error('Invalid response format from EOD API - expected array');
}
logger.info(`Fetched ${data.length} symbol change records`);
if (data.length === 0) {
return { success: true, changesCount: 0 };
}
// Add metadata to each record
const changesWithMetadata = data.map(change => ({
...change,
updatedAt: new Date(),
source: 'eod'
}));
// Clear existing data and insert new (symbol changes are a complete dataset)
await this.mongodb.collection('eodSymbolChange').deleteMany({});
const result = await this.mongodb.collection('eodSymbolChange').insertMany(changesWithMetadata);
logger.info(`Successfully saved ${result.insertedCount} symbol change records`);
// Process symbol changes to update affected symbols in eodSymbols collection
let updatedSymbols = 0;
for (const change of data) {
if (change.oldSymbol && change.newSymbol && change.exchange) {
// Update the symbol if it exists
const updateResult = await this.mongodb.collection('eodSymbols').updateOne(
{ Code: change.oldSymbol, Exchange: change.exchange },
{
$set: {
Code: change.newSymbol,
previousCode: change.oldSymbol,
symbolChangeDate: change.date,
symbolChangeNote: change.note || 'Symbol changed'
}
}
);
if (updateResult.modifiedCount > 0) {
updatedSymbols++;
logger.debug(`Updated symbol ${change.oldSymbol} to ${change.newSymbol} on ${change.exchange}`);
}
}
}
if (updatedSymbols > 0) {
logger.info(`Updated ${updatedSymbols} symbols based on change history`);
}
return {
success: true,
changesCount: result.insertedCount
};
} catch (error) {
logger.error('Failed to fetch symbol change history', { error });
throw error;
}
}

View file

@ -14,7 +14,12 @@ import {
scheduleFetchPrices, scheduleFetchPrices,
fetchIntraday, fetchIntraday,
crawlIntraday, crawlIntraday,
scheduleIntradayCrawl scheduleIntradayCrawl,
fetchBulkFundamentals,
scheduleFetchFundamentals,
fetchSymbolChangeHistory,
fetchCorporateActions,
scheduleFetchCorporateActions
} from './actions'; } from './actions';
/** /**
@ -102,4 +107,47 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
@Operation('fetch-intraday') @Operation('fetch-intraday')
@RateLimit(50) // 50 points per intraday fetch @RateLimit(50) // 50 points per intraday fetch
fetchIntraday = fetchIntraday; fetchIntraday = fetchIntraday;
/**
* Schedule bulk fundamentals fetch for symbols
* DISABLED - Not scheduled automatically
*/
@Operation('schedule-fetch-fundamentals')
// @ScheduledOperation('schedule-fetch-fundamentals', '0 4 * * 0') // Disabled - would run weekly at 4 AM on Sundays
@RateLimit(1) // 1 point for scheduling
scheduleFetchFundamentals = scheduleFetchFundamentals;
/**
* Fetch fundamentals for up to 500 symbols in bulk
* Called by schedule-fetch-fundamentals
*/
@Operation('fetch-bulk-fundamentals')
@RateLimit(100) // 100 points per bulk request
fetchBulkFundamentals = fetchBulkFundamentals;
/**
* Fetch symbol change history
* Runs weekly on Mondays at 5 AM
*/
@Operation('fetch-symbol-change-history')
@ScheduledOperation('fetch-symbol-change-history', '0 5 * * 1')
@RateLimit(10) // 10 points per request
fetchSymbolChangeHistory = fetchSymbolChangeHistory;
/**
* Schedule corporate actions (dividends and splits) fetch
* Runs monthly on the 1st at 6 AM
*/
@Operation('schedule-fetch-corporate-actions')
@ScheduledOperation('schedule-fetch-corporate-actions', '0 6 1 * *')
@RateLimit(1) // 1 point for scheduling
scheduleFetchCorporateActions = scheduleFetchCorporateActions;
/**
* Fetch dividends or splits for a specific symbol
* Called by schedule-fetch-corporate-actions
*/
@Operation('fetch-corporate-actions')
@RateLimit(10) // 10 points per fetch
fetchCorporateActions = fetchCorporateActions;
} }