stock-bot/apps/stock/data-ingestion/src/handlers/eod/actions/prices.ts

152 lines
No EOL
4.6 KiB
TypeScript

import type { BaseHandler } from '@stock-bot/handlers';
import type { DataIngestionServices } from '../../../types';
import { EOD_CONFIG } from '../shared';
interface FetchPricesInput {
symbol: string;
exchange: string;
}
export async function scheduleFetchPrices(
this: BaseHandler<DataIngestionServices>
): Promise<{ success: boolean; jobsScheduled: number }> {
const logger = this.logger;
try {
logger.info('Scheduling price fetch jobs for Canadian symbols');
// Calculate date one week ago
const oneWeekAgo = new Date();
oneWeekAgo.setDate(oneWeekAgo.getDate() - 7);
// Get Canadian exchanges (TSX, TSV, CNQ, NEO)
const canadianExchanges = ['TO', 'V', 'CN', 'NEO'];
// Find symbols that haven't been updated in the last week
const symbols = await this.mongodb.collection('eodSymbols').find({
Exchange: { $in: canadianExchanges },
delisted: false,
$or: [
{ lastPriceUpdate: { $lt: oneWeekAgo } },
{ lastPriceUpdate: { $exists: false } }
]
}).toArray();
if (!symbols || symbols.length === 0) {
logger.info('No Canadian symbols need price updates');
return { success: true, jobsScheduled: 0 };
}
logger.info(`Found ${symbols.length} Canadian symbols needing price updates`);
let jobsScheduled = 0;
// Schedule jobs with staggered delays
for (let i = 0; i < symbols.length; i++) {
const symbol = symbols[i];
await this.scheduleOperation('fetch-prices', {
symbol: symbol.Code,
exchange: symbol.Exchange
}, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 5000
},
delay: i * 100 // Stagger jobs by 100ms per symbol to avoid rate limit spikes
});
jobsScheduled++;
}
logger.info(`Successfully scheduled ${jobsScheduled} price fetch jobs`);
return {
success: true,
jobsScheduled
};
} catch (error) {
logger.error('Failed to schedule price fetch jobs', { error });
throw error;
}
}
export async function fetchPrices(
this: BaseHandler<DataIngestionServices>,
input: FetchPricesInput
): Promise<{ success: boolean; priceCount: number }> {
const logger = this.logger;
const { symbol, exchange } = input;
try {
logger.info('Fetching prices', { symbol, exchange });
// Get API key from config
const apiKey = EOD_CONFIG.API_TOKEN;
if (!apiKey) {
throw new Error('EOD API key not configured');
}
// Build URL for EOD price data
const url = new URL(`https://eodhd.com/api/eod/${symbol}.${exchange}`);
url.searchParams.append('api_token', apiKey);
url.searchParams.append('fmt', 'json');
// Fetch price data from EOD API
const response = await fetch(url.toString());
if (!response.ok) {
throw new Error(`EOD Prices API returned ${response.status}: ${response.statusText}`);
}
const priceData = await response.json();
// EOD returns an array of historical prices
if (!Array.isArray(priceData)) {
throw new Error('Invalid response format from EOD API - expected array');
}
logger.info(`Fetched ${priceData.length} price records for ${symbol}.${exchange}`);
// Add metadata to each price record
const pricesWithMetadata = priceData.map(price => ({
symbol,
exchange,
symbolExchange: `${symbol}.${exchange}`,
date: price.date,
open: price.open,
high: price.high,
low: price.low,
close: price.close,
adjustedClose: price.adjusted_close,
volume: price.volume,
}));
// Save to MongoDB - use date and symbol as unique identifier
const result = await this.mongodb.batchUpsert(
'eodPrices',
pricesWithMetadata,
['date', 'symbolExchange']
);
// Update the symbol's last price update timestamp
await this.mongodb.collection('eodSymbols').updateOne(
{ Code: symbol, Exchange: exchange },
{
$set: {
lastPriceUpdate: new Date(),
lastPriceDate: priceData.length > 0 ? priceData[priceData.length - 1].date : null
}
}
);
logger.info(`Successfully saved ${result.insertedCount} price records for ${symbol}.${exchange}`);
return {
success: true,
priceCount: result.insertedCount
};
} catch (error) {
logger.error('Failed to fetch or save prices', { error, symbol, exchange });
throw error;
}
}