This commit is contained in:
Bojan Kucera 2025-06-04 22:46:01 -04:00
parent 7993148a95
commit 528be93804
38 changed files with 4617 additions and 1081 deletions

View file

@ -0,0 +1,59 @@
/**
* Data Service - Combined live and historical data ingestion
*/
import { createLogger } from '@stock-bot/logger';
import { loadEnvVariables } from '@stock-bot/config';
import { Hono } from 'hono';
import { serve } from '@hono/node-server';
// Load environment variables
loadEnvVariables();
const app = new Hono();
const logger = createLogger('data-service');
const PORT = parseInt(process.env.DATA_SERVICE_PORT || '3002');
// Health check endpoint
app.get('/health', (c) => {
return c.json({
service: 'data-service',
status: 'healthy',
timestamp: new Date().toISOString()
});
});
// API routes
app.get('/api/live/:symbol', async (c) => {
const symbol = c.req.param('symbol');
logger.info('Live data request', { symbol });
// TODO: Implement live data fetching
return c.json({
symbol,
message: 'Live data endpoint - not implemented yet'
});
});
app.get('/api/historical/:symbol', async (c) => {
const symbol = c.req.param('symbol');
const from = c.req.query('from');
const to = c.req.query('to');
logger.info('Historical data request', { symbol, from, to });
// TODO: Implement historical data fetching
return c.json({
symbol,
from,
to,
message: 'Historical data endpoint - not implemented yet'
});
});
// Start server
serve({
fetch: app.fetch,
port: PORT,
});
logger.info(`Data Service started on port ${PORT}`);

View file

@ -0,0 +1,367 @@
/**
* Unified data interface for live and historical data
*/
import { createLogger } from '@stock-bot/logger';
import { QuestDBClient } from '@stock-bot/questdb-client';
import { EventBus } from '@stock-bot/event-bus';
const logger = createLogger('unified-data-provider');
export interface MarketData {
symbol: string;
timestamp: Date;
open: number;
high: number;
low: number;
close: number;
volume: number;
source?: string;
}
export interface DataProviderConfig {
questdb?: {
host: string;
port: number;
};
enableCache?: boolean;
cacheSize?: number;
}
export interface DataProvider {
getLiveData(symbol: string): Promise<MarketData>;
getHistoricalData(symbol: string, from: Date, to: Date, interval?: string): Promise<MarketData[]>;
subscribeLiveData(symbol: string, callback: (data: MarketData) => void): Promise<void>;
unsubscribeLiveData(symbol: string): Promise<void>;
}
export class UnifiedDataProvider implements DataProvider {
private questdb?: QuestDBClient;
private eventBus: EventBus;
private cache: Map<string, MarketData[]> = new Map();
private liveSubscriptions: Map<string, ((data: MarketData) => void)[]> = new Map();
private config: DataProviderConfig;
constructor(eventBus: EventBus, config: DataProviderConfig = {}) {
this.eventBus = eventBus;
this.config = {
enableCache: true,
cacheSize: 10000,
...config
};
// Initialize QuestDB client
if (config.questdb) {
this.questdb = new QuestDBClient({
host: config.questdb.host,
port: config.questdb.port
});
}
this.initializeEventSubscriptions();
}
private initializeEventSubscriptions(): void {
// Subscribe to live market data events
this.eventBus.subscribe('market.data.live', (message) => {
const data = message.data as MarketData;
this.handleLiveData(data);
});
// Subscribe to historical data requests
this.eventBus.subscribe('market.data.request', async (message) => {
const { symbol, from, to, interval, requestId } = message.data;
try {
const data = await this.getHistoricalData(symbol, new Date(from), new Date(to), interval);
await this.eventBus.publish('market.data.response', {
requestId,
symbol,
data,
status: 'success'
});
} catch (error) {
await this.eventBus.publish('market.data.response', {
requestId,
symbol,
error: error.message,
status: 'error'
});
}
});
}
async getLiveData(symbol: string): Promise<MarketData> {
logger.info('Fetching live data', { symbol });
try {
// First check cache for recent data
if (this.config.enableCache) {
const cached = this.getFromCache(symbol);
if (cached && this.isRecentData(cached)) {
return cached;
}
}
// For demo purposes, generate simulated live data
// In production, this would integrate with real market data providers
const liveData = this.generateSimulatedData(symbol);
// Store in cache
if (this.config.enableCache) {
this.addToCache(symbol, liveData);
}
// Publish live data event
await this.eventBus.publishMarketData(symbol, liveData);
return liveData;
} catch (error) {
logger.error('Failed to fetch live data', { symbol, error });
throw error;
}
}
async getHistoricalData(symbol: string, from: Date, to: Date, interval: string = '1m'): Promise<MarketData[]> {
logger.info('Fetching historical data', { symbol, from, to, interval });
try {
// Check cache first
const cacheKey = `${symbol}_${from.getTime()}_${to.getTime()}_${interval}`;
if (this.config.enableCache && this.cache.has(cacheKey)) {
logger.debug('Returning cached historical data', { symbol, cacheKey });
return this.cache.get(cacheKey)!;
}
// Try to fetch from QuestDB
let data: MarketData[] = [];
if (this.questdb) {
try {
data = await this.fetchFromQuestDB(symbol, from, to, interval);
} catch (error) {
logger.warn('Failed to fetch from QuestDB, generating simulated data', { error });
data = this.generateHistoricalData(symbol, from, to, interval);
}
} else {
// Generate simulated data if no QuestDB connection
data = this.generateHistoricalData(symbol, from, to, interval);
}
// Store in cache
if (this.config.enableCache) {
this.cache.set(cacheKey, data);
this.trimCache();
}
return data;
} catch (error) {
logger.error('Failed to fetch historical data', { symbol, from, to, error });
throw error;
}
}
async subscribeLiveData(symbol: string, callback: (data: MarketData) => void): Promise<void> {
logger.info('Subscribing to live data', { symbol });
if (!this.liveSubscriptions.has(symbol)) {
this.liveSubscriptions.set(symbol, []);
}
this.liveSubscriptions.get(symbol)!.push(callback);
// Start live data simulation for this symbol
this.startLiveDataSimulation(symbol);
}
async unsubscribeLiveData(symbol: string): Promise<void> {
logger.info('Unsubscribing from live data', { symbol });
this.liveSubscriptions.delete(symbol);
}
private async fetchFromQuestDB(symbol: string, from: Date, to: Date, interval: string): Promise<MarketData[]> {
if (!this.questdb) {
throw new Error('QuestDB client not initialized');
}
const query = `
SELECT symbol, timestamp, open, high, low, close, volume
FROM market_data
WHERE symbol = '${symbol}'
AND timestamp >= '${from.toISOString()}'
AND timestamp <= '${to.toISOString()}'
ORDER BY timestamp ASC
`;
const result = await this.questdb.query(query);
return result.rows.map(row => ({
symbol: row.symbol,
timestamp: new Date(row.timestamp),
open: parseFloat(row.open),
high: parseFloat(row.high),
low: parseFloat(row.low),
close: parseFloat(row.close),
volume: parseInt(row.volume),
source: 'questdb'
}));
}
private generateHistoricalData(symbol: string, from: Date, to: Date, interval: string): MarketData[] {
const data: MarketData[] = [];
const intervalMs = this.parseInterval(interval);
const current = new Date(from);
let basePrice = 100; // Starting price
while (current <= to) {
const timestamp = new Date(current);
// Generate realistic OHLCV data with some randomness
const volatility = 0.02;
const change = (Math.random() - 0.5) * volatility;
const open = basePrice;
const close = open * (1 + change);
const high = Math.max(open, close) * (1 + Math.random() * 0.01);
const low = Math.min(open, close) * (1 - Math.random() * 0.01);
const volume = Math.floor(Math.random() * 100000) + 10000;
data.push({
symbol,
timestamp,
open,
high,
low,
close,
volume,
source: 'simulated'
});
basePrice = close;
current.setTime(current.getTime() + intervalMs);
}
return data;
}
private generateSimulatedData(symbol: string): MarketData {
const now = new Date();
const basePrice = 100 + Math.sin(now.getTime() / 1000000) * 10;
const volatility = 0.001;
const open = basePrice + (Math.random() - 0.5) * volatility * basePrice;
const close = open + (Math.random() - 0.5) * volatility * basePrice;
const high = Math.max(open, close) + Math.random() * volatility * basePrice;
const low = Math.min(open, close) - Math.random() * volatility * basePrice;
const volume = Math.floor(Math.random() * 10000) + 1000;
return {
symbol,
timestamp: now,
open,
high,
low,
close,
volume,
source: 'simulated'
};
}
private parseInterval(interval: string): number {
const value = parseInt(interval.slice(0, -1));
const unit = interval.slice(-1).toLowerCase();
switch (unit) {
case 's': return value * 1000;
case 'm': return value * 60 * 1000;
case 'h': return value * 60 * 60 * 1000;
case 'd': return value * 24 * 60 * 60 * 1000;
default: return 60 * 1000; // Default to 1 minute
}
}
private handleLiveData(data: MarketData): void {
const callbacks = this.liveSubscriptions.get(data.symbol);
if (callbacks) {
callbacks.forEach(callback => {
try {
callback(data);
} catch (error) {
logger.error('Error in live data callback', { symbol: data.symbol, error });
}
});
}
}
private startLiveDataSimulation(symbol: string): void {
// Simulate live data updates every second
const interval = setInterval(async () => {
if (!this.liveSubscriptions.has(symbol)) {
clearInterval(interval);
return;
}
try {
const liveData = this.generateSimulatedData(symbol);
this.handleLiveData(liveData);
await this.eventBus.publishMarketData(symbol, liveData);
} catch (error) {
logger.error('Error in live data simulation', { symbol, error });
}
}, 1000);
}
private getFromCache(symbol: string): MarketData | null {
// Get most recent cached data for symbol
for (const [key, data] of this.cache.entries()) {
if (key.startsWith(symbol) && data.length > 0) {
return data[data.length - 1];
}
}
return null;
}
private isRecentData(data: MarketData): boolean {
const now = Date.now();
const dataTime = data.timestamp.getTime();
return (now - dataTime) < 60000; // Data is recent if less than 1 minute old
}
private addToCache(symbol: string, data: MarketData): void {
const key = `${symbol}_live`;
if (!this.cache.has(key)) {
this.cache.set(key, []);
}
const cached = this.cache.get(key)!;
cached.push(data);
// Keep only last 1000 data points for live data
if (cached.length > 1000) {
cached.splice(0, cached.length - 1000);
}
}
private trimCache(): void {
if (this.cache.size > this.config.cacheSize!) {
// Remove oldest entries
const entries = Array.from(this.cache.entries());
const toRemove = entries.slice(0, entries.length - this.config.cacheSize!);
toRemove.forEach(([key]) => this.cache.delete(key));
}
}
async close(): Promise<void> {
if (this.questdb) {
await this.questdb.close();
}
this.cache.clear();
this.liveSubscriptions.clear();
logger.info('Unified data provider closed');
}
}
// Factory function
export function createUnifiedDataProvider(eventBus: EventBus, config?: DataProviderConfig): UnifiedDataProvider {
return new UnifiedDataProvider(eventBus, config);
}