no idea- added loki and other stuff to market-data-gateway, also added config lib

This commit is contained in:
Bojan Kucera 2025-06-03 11:37:58 -04:00
parent b957fb99aa
commit 1b71fc87ab
72 changed files with 6178 additions and 153 deletions

View file

@ -16,7 +16,6 @@
"hono": "^4.6.3",
"@hono/node-server": "^1.8.0",
"ws": "^8.18.0",
"axios": "^1.6.0",
"bull": "^4.12.0",
"ioredis": "^5.4.1",
"zod": "^3.22.0",
@ -29,6 +28,7 @@
"fast-json-stringify": "^5.10.0",
"pino": "^8.17.0",
"dotenv": "^16.3.0",
"@stock-bot/http-client": "*",
"@stock-bot/config": "*",
"@stock-bot/shared-types": "*",
"@stock-bot/event-bus": "*",

View file

@ -0,0 +1,114 @@
// Data Provider Configuration
export interface DataProviderConfig {
name: string;
type: 'rest' | 'websocket' | 'both';
enabled: boolean;
endpoints: {
quotes?: string;
candles?: string;
trades?: string;
websocket?: string;
};
authentication?: {
type: 'api_key' | 'bearer' | 'basic';
key?: string;
secret?: string;
token?: string;
};
rateLimits: {
requestsPerSecond: number;
requestsPerMinute: number;
requestsPerHour: number;
};
retryPolicy: {
maxRetries: number;
backoffMultiplier: number;
initialDelayMs: number;
};
timeout: number;
priority: number; // 1-10, higher is better
}
export const dataProviderConfigs: Record<string, DataProviderConfig> = {
'alpha-vantage': {
name: 'Alpha Vantage',
type: 'rest',
enabled: true,
endpoints: {
quotes: 'https://www.alphavantage.co/query',
candles: 'https://www.alphavantage.co/query',
},
authentication: {
type: 'api_key',
key: process.env.ALPHA_VANTAGE_API_KEY,
},
rateLimits: {
requestsPerSecond: 5,
requestsPerMinute: 500,
requestsPerHour: 25000,
},
retryPolicy: {
maxRetries: 3,
backoffMultiplier: 2,
initialDelayMs: 1000,
},
timeout: 10000,
priority: 7,
},
'yahoo-finance': {
name: 'Yahoo Finance',
type: 'rest',
enabled: true,
endpoints: {
quotes: 'https://query1.finance.yahoo.com/v8/finance/chart',
candles: 'https://query1.finance.yahoo.com/v8/finance/chart',
},
rateLimits: {
requestsPerSecond: 10,
requestsPerMinute: 2000,
requestsPerHour: 100000,
},
retryPolicy: {
maxRetries: 3,
backoffMultiplier: 1.5,
initialDelayMs: 500,
},
timeout: 8000,
priority: 8,
},
'polygon': {
name: 'Polygon.io',
type: 'both',
enabled: false,
endpoints: {
quotes: 'https://api.polygon.io/v2/last/nbbo',
candles: 'https://api.polygon.io/v2/aggs/ticker',
trades: 'https://api.polygon.io/v3/trades',
websocket: 'wss://socket.polygon.io/stocks',
},
authentication: {
type: 'api_key',
key: process.env.POLYGON_API_KEY,
},
rateLimits: {
requestsPerSecond: 100,
requestsPerMinute: 5000,
requestsPerHour: 100000,
},
retryPolicy: {
maxRetries: 5,
backoffMultiplier: 2,
initialDelayMs: 200,
},
timeout: 5000,
priority: 9,
},
};
export function getEnabledProviders(): DataProviderConfig[] {
return Object.values(dataProviderConfigs).filter(config => config.enabled);
}
export function getProviderByPriority(): DataProviderConfig[] {
return getEnabledProviders().sort((a, b) => b.priority - a.priority);
}

View file

@ -1,4 +1,4 @@
// Market Data Gateway - Unified Implementation
// Market Data Gateway - Enhanced Implementation
import { Hono } from 'hono';
import { cors } from 'hono/cors';
import { logger } from 'hono/logger';
@ -8,6 +8,12 @@ import { WebSocketServer } from 'ws';
// Types
import { GatewayConfig } from './types/MarketDataGateway';
// Services
import { DataNormalizer, DataNormalizationResult } from './services/DataNormalizer';
import { MarketDataCache } from './services/AdvancedCache';
import { ConnectionPoolManager } from './services/ConnectionPoolManager';
import { dataProviderConfigs, getEnabledProviders } from './config/DataProviderConfig';
// Simple logger interface
interface Logger {
info: (message: string, ...args: any[]) => void;
@ -18,12 +24,26 @@ interface Logger {
// Create application logger
const appLogger: Logger = {
info: (message: string, ...args: any[]) => console.log(`[MDG-UNIFIED] [INFO] ${message}`, ...args),
error: (message: string, ...args: any[]) => console.error(`[MDG-UNIFIED] [ERROR] ${message}`, ...args),
warn: (message: string, ...args: any[]) => console.warn(`[MDG-UNIFIED] [WARN] ${message}`, ...args),
debug: (message: string, ...args: any[]) => console.debug(`[MDG-UNIFIED] [DEBUG] ${message}`, ...args),
info: (message: string, ...args: any[]) => console.log(`[MDG-ENHANCED] [INFO] ${message}`, ...args),
error: (message: string, ...args: any[]) => console.error(`[MDG-ENHANCED] [ERROR] ${message}`, ...args),
warn: (message: string, ...args: any[]) => console.warn(`[MDG-ENHANCED] [WARN] ${message}`, ...args),
debug: (message: string, ...args: any[]) => console.debug(`[MDG-ENHANCED] [DEBUG] ${message}`, ...args),
};
// Initialize services
const dataNormalizer = new DataNormalizer();
const marketDataCache = new MarketDataCache();
const connectionPool = new ConnectionPoolManager({
maxConnections: 100,
maxConnectionsPerHost: 20,
connectionTimeout: 10000,
requestTimeout: 30000,
retryAttempts: 3,
retryDelay: 1000,
keepAlive: true,
maxIdleTime: 300000, // 5 minutes
});
// Configuration matching the GatewayConfig interface
const config: GatewayConfig = {
server: {
@ -36,7 +56,46 @@ const config: GatewayConfig = {
headers: ['Content-Type', 'Authorization'],
},
},
dataSources: [], // Array of DataSourceConfig, initially empty
dataSources: getEnabledProviders().map(provider => ({
id: provider.name.toLowerCase().replace(/\s+/g, '-'),
name: provider.name,
type: provider.type === 'both' ? 'websocket' : provider.type as any,
enabled: provider.enabled,
priority: provider.priority,
rateLimit: {
requestsPerSecond: provider.rateLimits.requestsPerSecond,
burstLimit: provider.rateLimits.requestsPerMinute,
},
connection: {
url: provider.endpoints.quotes || provider.endpoints.websocket || '',
authentication: provider.authentication ? {
type: provider.authentication.type === 'api_key' ? 'apikey' as const : 'basic' as const,
credentials: {
apiKey: provider.authentication.key || '',
secret: provider.authentication.secret || '',
token: provider.authentication.token || '',
},
} : undefined,
},
subscriptions: {
quotes: true,
trades: true,
orderbook: provider.endpoints.websocket ? true : false,
candles: true,
news: false,
},
symbols: ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA'], // Default symbols
retryPolicy: {
maxRetries: provider.retryPolicy.maxRetries,
backoffMultiplier: provider.retryPolicy.backoffMultiplier,
maxBackoffMs: provider.retryPolicy.initialDelayMs * 10,
},
healthCheck: {
intervalMs: 30000,
timeoutMs: provider.timeout,
expectedLatencyMs: 1000,
},
})),
processing: {
pipelines: [],
bufferSize: 10000,
@ -207,32 +266,134 @@ app.post('/api/v1/subscriptions', async (c) => {
}
});
// Market data endpoints
// Market data endpoints with enhanced functionality
app.get('/api/v1/data/tick/:symbol', async (c) => {
const symbol = c.req.param('symbol');
return c.json({
...mockTickData,
symbol: symbol.toUpperCase(),
});
const symbol = c.req.param('symbol').toUpperCase();
const source = c.req.query('source') || 'yahoo-finance';
try {
// Check cache first
const cacheKey = marketDataCache.getQuoteKey(symbol);
const cachedData = marketDataCache.get(cacheKey);
if (cachedData) {
appLogger.debug(`Cache hit for ${symbol}`);
return c.json({
...cachedData,
cached: true,
timestamp: new Date().toISOString(),
});
}
// Fetch from provider
const provider = dataProviderConfigs[source];
if (!provider || !provider.enabled) {
return c.json({ error: 'Data source not available' }, 400);
}
// Mock data for now (replace with actual API calls)
const mockData = {
symbol,
price: 150.25 + (Math.random() - 0.5) * 10,
volume: Math.floor(Math.random() * 100000),
timestamp: new Date().toISOString(),
bid: 150.20,
ask: 150.30,
source,
};
// Normalize the data
const normalizedResult = dataNormalizer.normalizeMarketData(mockData, source);
if (!normalizedResult.success) {
return c.json({ error: normalizedResult.error }, 500);
}
// Cache the result
marketDataCache.setQuote(symbol, normalizedResult.data);
return c.json({
...normalizedResult.data,
cached: false,
processingTimeMs: normalizedResult.processingTimeMs,
});
} catch (error) {
appLogger.error(`Error fetching tick data for ${symbol}:`, error);
return c.json({ error: 'Internal server error' }, 500);
}
});
app.get('/api/v1/data/candles/:symbol', async (c) => {
const symbol = c.req.param('symbol');
const symbol = c.req.param('symbol').toUpperCase();
const timeframe = c.req.query('timeframe') || '1m';
const limit = parseInt(c.req.query('limit') || '100');
const limit = Math.min(parseInt(c.req.query('limit') || '100'), 1000); // Max 1000
const source = c.req.query('source') || 'yahoo-finance';
const candles = Array.from({ length: limit }, (_, i) => ({
...mockCandleData,
symbol: symbol.toUpperCase(),
timeframe,
timestamp: new Date(Date.now() - i * 60000).toISOString(),
}));
return c.json({ candles });
try {
// Generate cache key
const cacheKey = `candles:${symbol}:${timeframe}:${limit}`;
const cachedData = marketDataCache.get(cacheKey);
if (cachedData) {
appLogger.debug(`Cache hit for candles ${symbol}:${timeframe}`);
return c.json({
candles: cachedData,
cached: true,
count: cachedData.length,
});
}
// Mock candle data generation (replace with actual API calls)
const candles = Array.from({ length: limit }, (_, i) => {
const timestamp = new Date(Date.now() - i * 60000);
const basePrice = 150 + (Math.random() - 0.5) * 20;
const variation = (Math.random() - 0.5) * 2;
return {
symbol,
timeframe,
timestamp: timestamp.toISOString(),
open: basePrice + variation,
high: basePrice + variation + Math.random() * 2,
low: basePrice + variation - Math.random() * 2,
close: basePrice + variation + (Math.random() - 0.5),
volume: Math.floor(Math.random() * 10000),
source,
};
}).reverse(); // Oldest first
// Normalize OHLCV data
const normalizedResult = dataNormalizer.normalizeOHLCV(
{ candles: candles.map(c => ({ ...c, timestamp: new Date(c.timestamp) })) },
source
);
if (!normalizedResult.success) {
return c.json({ error: normalizedResult.error }, 500);
}
// Cache the result
marketDataCache.set(cacheKey, normalizedResult.data, marketDataCache['getCandleTTL'](timeframe));
return c.json({
candles: normalizedResult.data,
cached: false,
count: normalizedResult.data?.length || 0,
processingTimeMs: normalizedResult.processingTimeMs,
});
} catch (error) {
appLogger.error(`Error fetching candles for ${symbol}:`, error);
return c.json({ error: 'Internal server error' }, 500);
}
});
// Metrics endpoints
// Enhanced metrics endpoints
app.get('/api/v1/metrics', async (c) => {
const cacheStats = marketDataCache.getStats();
const connectionStats = connectionPool.getStats();
return c.json({
system: {
uptime: process.uptime(),
@ -241,9 +402,91 @@ app.get('/api/v1/metrics', async (c) => {
},
gateway: {
activeConnections: webSocketServer ? webSocketServer.clients.size : 0,
dataSourcesCount: config.dataSources.length,
dataSourcesCount: config.dataSources.filter(ds => ds.enabled).length,
messagesProcessed: 0,
},
cache: cacheStats,
connectionPool: connectionStats,
timestamp: new Date().toISOString(),
});
});
// Data quality assessment endpoint
app.get('/api/v1/data/quality/:symbol', async (c) => {
const symbol = c.req.param('symbol').toUpperCase();
const source = c.req.query('source') || 'yahoo-finance';
try {
// Get recent data for quality assessment (mock for now)
const recentData = Array.from({ length: 10 }, (_, i) => ({
symbol,
price: 150 + (Math.random() - 0.5) * 10,
bid: 149.5,
ask: 150.5,
volume: Math.floor(Math.random() * 10000),
timestamp: new Date(Date.now() - i * 60000),
}));
const qualityMetrics = dataNormalizer.assessDataQuality(recentData, source);
return c.json({
symbol,
source,
dataPoints: recentData.length,
qualityMetrics,
timestamp: new Date().toISOString(),
});
} catch (error) {
appLogger.error(`Error assessing data quality for ${symbol}:`, error);
return c.json({ error: 'Internal server error' }, 500);
}
});
// Cache management endpoints
app.get('/api/v1/cache/stats', async (c) => {
return c.json({
stats: marketDataCache.getStats(),
keys: marketDataCache.keys().slice(0, 100), // Limit to first 100 keys
timestamp: new Date().toISOString(),
});
});
app.delete('/api/v1/cache/clear', async (c) => {
marketDataCache.clear();
return c.json({
message: 'Cache cleared successfully',
timestamp: new Date().toISOString(),
});
});
app.delete('/api/v1/cache/key/:key', async (c) => {
const key = c.req.param('key');
const deleted = marketDataCache.delete(key);
return c.json({
message: deleted ? 'Key deleted successfully' : 'Key not found',
key,
deleted,
timestamp: new Date().toISOString(),
});
});
// Data providers status endpoint
app.get('/api/v1/providers', async (c) => {
const providers = Object.values(dataProviderConfigs).map(provider => ({
name: provider.name,
enabled: provider.enabled,
type: provider.type,
priority: provider.priority,
rateLimits: provider.rateLimits,
endpoints: Object.keys(provider.endpoints),
}));
return c.json({
providers,
enabled: providers.filter(p => p.enabled).length,
total: providers.length,
timestamp: new Date().toISOString(),
});
});
@ -332,7 +575,7 @@ function setupWebSocketServer(): void {
appLogger.info(`WebSocket server listening on port ${wsPort}`);
}
// Graceful shutdown handler
// Enhanced graceful shutdown handler
async function gracefulShutdown(): Promise<void> {
if (isShuttingDown) return;
isShuttingDown = true;
@ -349,6 +592,14 @@ async function gracefulShutdown(): Promise<void> {
appLogger.info('WebSocket server closed');
}
// Close connection pool
await connectionPool.close();
appLogger.info('Connection pool closed');
// Clean up cache
marketDataCache.destroy();
appLogger.info('Cache destroyed');
appLogger.info('Graceful shutdown completed');
process.exit(0);
} catch (error) {
@ -357,10 +608,19 @@ async function gracefulShutdown(): Promise<void> {
}
}
// Start server function
// Enhanced start server function
async function startServer(): Promise<void> {
try {
appLogger.info('Starting Market Data Gateway...');
appLogger.info('Starting Enhanced Market Data Gateway...');
// Initialize cache event listeners
marketDataCache.on('hit', (key) => appLogger.debug(`Cache hit: ${key}`));
marketDataCache.on('miss', (key) => appLogger.debug(`Cache miss: ${key}`));
marketDataCache.on('evict', (key) => appLogger.debug(`Cache evict: ${key}`));
// Initialize connection pool event listeners
connectionPool.on('connectionCreated', (host) => appLogger.debug(`Connection created for: ${host}`));
connectionPool.on('error', ({ host, error }) => appLogger.warn(`Connection error for ${host}: ${error}`));
// Setup WebSocket server
setupWebSocketServer();
@ -369,9 +629,14 @@ async function startServer(): Promise<void> {
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
// Log service status
appLogger.info(`HTTP server starting on ${config.server.host}:${config.server.port}`);
appLogger.info(`WebSocket server running on port ${config.server.port + 1}`);
appLogger.info('Market Data Gateway started successfully');
appLogger.info(`Data sources configured: ${config.dataSources.length}`);
appLogger.info(`Enabled providers: ${config.dataSources.filter(ds => ds.enabled).length}`);
appLogger.info(`Cache max size: ${marketDataCache['config'].maxSize}`);
appLogger.info(`Connection pool max connections: ${connectionPool['config'].maxConnections}`);
appLogger.info('Enhanced Market Data Gateway started successfully');
} catch (error) {
appLogger.error('Failed to start server:', error);

View file

@ -0,0 +1,361 @@
import { EventEmitter } from 'events';
export interface CacheEntry<T> {
data: T;
timestamp: number;
ttl: number;
hits: number;
lastAccessed: number;
}
export interface CacheStats {
totalEntries: number;
memoryUsage: number;
hitRate: number;
totalHits: number;
totalMisses: number;
averageAccessTime: number;
}
export interface CacheConfig {
maxSize: number;
defaultTtl: number;
cleanupInterval: number;
enableStats: boolean;
compressionEnabled: boolean;
}
export class AdvancedCache<T = any> extends EventEmitter {
private cache = new Map<string, CacheEntry<T>>();
private stats = {
hits: 0,
misses: 0,
totalAccessTime: 0,
accessCount: 0,
};
private cleanupTimer: NodeJS.Timeout | null = null;
constructor(private config: CacheConfig) {
super();
this.startCleanupTimer();
}
/**
* Get value from cache
*/
get(key: string): T | null {
const startTime = Date.now();
const entry = this.cache.get(key);
if (!entry) {
this.stats.misses++;
this.emit('miss', key);
return null;
}
// Check if entry has expired
if (Date.now() > entry.timestamp + entry.ttl) {
this.cache.delete(key);
this.stats.misses++;
this.emit('expired', key, entry);
return null;
}
// Update access statistics
entry.hits++;
entry.lastAccessed = Date.now();
this.stats.hits++;
if (this.config.enableStats) {
this.stats.totalAccessTime += Date.now() - startTime;
this.stats.accessCount++;
}
this.emit('hit', key, entry);
return entry.data;
}
/**
* Set value in cache
*/
set(key: string, value: T, ttl?: number): void {
const effectiveTtl = ttl || this.config.defaultTtl;
// Check cache size limits
if (this.cache.size >= this.config.maxSize && !this.cache.has(key)) {
this.evictLeastUsed();
}
const entry: CacheEntry<T> = {
data: value,
timestamp: Date.now(),
ttl: effectiveTtl,
hits: 0,
lastAccessed: Date.now(),
};
this.cache.set(key, entry);
this.emit('set', key, entry);
}
/**
* Delete value from cache
*/
delete(key: string): boolean {
const deleted = this.cache.delete(key);
if (deleted) {
this.emit('delete', key);
}
return deleted;
}
/**
* Check if key exists in cache
*/
has(key: string): boolean {
const entry = this.cache.get(key);
if (!entry) return false;
// Check if expired
if (Date.now() > entry.timestamp + entry.ttl) {
this.cache.delete(key);
return false;
}
return true;
}
/**
* Clear all cache entries
*/
clear(): void {
this.cache.clear();
this.resetStats();
this.emit('clear');
}
/**
* Get cache statistics
*/
getStats(): CacheStats {
const memoryUsage = this.estimateMemoryUsage();
const hitRate = this.stats.hits + this.stats.misses > 0
? this.stats.hits / (this.stats.hits + this.stats.misses)
: 0;
const averageAccessTime = this.stats.accessCount > 0
? this.stats.totalAccessTime / this.stats.accessCount
: 0;
return {
totalEntries: this.cache.size,
memoryUsage,
hitRate,
totalHits: this.stats.hits,
totalMisses: this.stats.misses,
averageAccessTime,
};
}
/**
* Get all cache keys
*/
keys(): string[] {
return Array.from(this.cache.keys());
}
/**
* Get cache size
*/
size(): number {
return this.cache.size;
}
/**
* Get or set with async loader function
*/
async getOrSet<K>(
key: string,
loader: () => Promise<K>,
ttl?: number
): Promise<K> {
const cached = this.get(key) as K;
if (cached !== null) {
return cached;
}
try {
const value = await loader();
this.set(key, value as any, ttl);
return value;
} catch (error) {
this.emit('error', key, error);
throw error;
}
}
/**
* Batch get multiple keys
*/
mget(keys: string[]): Map<string, T | null> {
const result = new Map<string, T | null>();
for (const key of keys) {
result.set(key, this.get(key));
}
return result;
}
/**
* Batch set multiple key-value pairs
*/
mset(entries: Map<string, T>, ttl?: number): void {
for (const [key, value] of entries) {
this.set(key, value, ttl);
}
}
/**
* Clean up expired entries
*/
cleanup(): number {
const now = Date.now();
let removedCount = 0;
for (const [key, entry] of this.cache.entries()) {
if (now > entry.timestamp + entry.ttl) {
this.cache.delete(key);
removedCount++;
this.emit('expired', key, entry);
}
}
return removedCount;
}
/**
* Evict least recently used entries
*/
private evictLeastUsed(): void {
let oldestKey: string | null = null;
let oldestTime = Date.now();
for (const [key, entry] of this.cache.entries()) {
if (entry.lastAccessed < oldestTime) {
oldestTime = entry.lastAccessed;
oldestKey = key;
}
}
if (oldestKey) {
this.cache.delete(oldestKey);
this.emit('evict', oldestKey);
}
}
/**
* Estimate memory usage in bytes
*/
private estimateMemoryUsage(): number {
let totalSize = 0;
for (const [key, entry] of this.cache.entries()) {
// Rough estimation: key size + data size (as JSON string)
totalSize += key.length * 2; // UTF-16 encoding
totalSize += JSON.stringify(entry.data).length * 2;
totalSize += 64; // Overhead for entry metadata
}
return totalSize;
}
/**
* Reset statistics
*/
private resetStats(): void {
this.stats = {
hits: 0,
misses: 0,
totalAccessTime: 0,
accessCount: 0,
};
}
/**
* Start cleanup timer
*/
private startCleanupTimer(): void {
if (this.cleanupTimer) {
clearInterval(this.cleanupTimer);
}
this.cleanupTimer = setInterval(() => {
const removed = this.cleanup();
if (removed > 0) {
this.emit('cleanup', removed);
}
}, this.config.cleanupInterval);
}
/**
* Stop cleanup timer and close cache
*/
destroy(): void {
if (this.cleanupTimer) {
clearInterval(this.cleanupTimer);
this.cleanupTimer = null;
}
this.clear();
this.removeAllListeners();
}
}
// Specialized cache for market data
export class MarketDataCache extends AdvancedCache {
constructor() {
super({
maxSize: 10000,
defaultTtl: 60000, // 1 minute
cleanupInterval: 30000, // 30 seconds
enableStats: true,
compressionEnabled: false,
});
}
// Market data specific cache keys
getQuoteKey(symbol: string): string {
return `quote:${symbol}`;
}
getCandleKey(symbol: string, timeframe: string, timestamp: Date): string {
return `candle:${symbol}:${timeframe}:${timestamp.getTime()}`;
}
getOrderBookKey(symbol: string): string {
return `orderbook:${symbol}`;
}
// Market data specific TTLs
setQuote(symbol: string, data: any): void {
this.set(this.getQuoteKey(symbol), data, 60000); // 1 minute
}
setCandle(symbol: string, timeframe: string, timestamp: Date, data: any): void {
const ttl = this.getCandleTTL(timeframe);
this.set(this.getCandleKey(symbol, timeframe, timestamp), data, ttl);
}
setOrderBook(symbol: string, data: any): void {
this.set(this.getOrderBookKey(symbol), data, 30000); // 30 seconds
}
private getCandleTTL(timeframe: string): number {
const ttlMap: Record<string, number> = {
'1m': 60000, // 1 minute
'5m': 300000, // 5 minutes
'15m': 900000, // 15 minutes
'1h': 3600000, // 1 hour
'1d': 86400000, // 24 hours
};
return ttlMap[timeframe] || 300000; // Default 5 minutes
}
}

View file

@ -0,0 +1,346 @@
import { EventEmitter } from 'eventemitter3';
import {
BunHttpClient,
RequestConfig,
HttpResponse,
ConnectionStats,
HttpClientConfig
} from '@stock-bot/http-client';
export interface ConnectionPoolConfig {
maxConnections: number;
maxConnectionsPerHost: number;
connectionTimeout: number;
requestTimeout: number;
retryAttempts: number;
retryDelay: number;
keepAlive: boolean;
maxIdleTime: number;
}
export interface QueuedRequest {
id: string;
config: RequestConfig;
resolve: (value: any) => void;
reject: (error: any) => void;
timestamp: number;
retryCount: number;
}
export class ConnectionPoolManager extends EventEmitter {
private clients = new Map<string, BunHttpClient>();
private activeRequests = new Map<string, number>(); // host -> count
private requestQueue: QueuedRequest[] = [];
private stats = {
totalConnections: 0,
successfulRequests: 0,
failedRequests: 0,
totalResponseTime: 0,
requestCount: 0,
};
private isProcessingQueue = false;
private queueProcessor?: NodeJS.Timeout;
constructor(private config: ConnectionPoolConfig) {
super();
this.startQueueProcessor();
}
/**
* Get or create a client for a host
*/
private getClient(host: string): BunHttpClient {
if (!this.clients.has(host)) {
const client = new BunHttpClient({
baseURL: `https://${host}`,
timeout: this.config.requestTimeout,
retries: this.config.retryAttempts,
retryDelay: this.config.retryDelay,
keepAlive: this.config.keepAlive,
headers: {
'User-Agent': 'StockBot-MarketDataGateway/1.0',
'Accept': 'application/json',
},
validateStatus: (status: number) => status < 500,
});
// Listen for events from the client
client.on('response', (data) => {
const responseTime = data.response.timing.duration;
this.updateStats(true, responseTime);
this.emit('response', {
host,
responseTime,
status: data.response.status
});
});
client.on('error', (data) => {
const responseTime = data.error?.config?.metadata?.startTime
? Date.now() - data.error.config.metadata.startTime
: 0;
this.updateStats(false, responseTime);
this.emit('error', {
host,
error: data.error.message,
responseTime
});
});
this.clients.set(host, client);
this.activeRequests.set(host, 0);
this.stats.totalConnections++;
this.emit('connectionCreated', host);
}
return this.clients.get(host)!;
}
/**
* Make an HTTP request with connection pooling
*/
async request(config: RequestConfig): Promise<any> {
return new Promise((resolve, reject) => {
const requestId = this.generateRequestId();
const queuedRequest: QueuedRequest = {
id: requestId,
config,
resolve,
reject,
timestamp: Date.now(),
retryCount: 0,
};
this.requestQueue.push(queuedRequest);
this.processQueue();
});
}
/**
* Process the request queue
*/
private async processQueue(): Promise<void> {
if (this.isProcessingQueue || this.requestQueue.length === 0) {
return;
}
this.isProcessingQueue = true;
while (this.requestQueue.length > 0) {
const request = this.requestQueue.shift()!;
try {
const host = this.extractHost(request.config.url || '');
const currentConnections = this.activeRequests.get(host) || 0;
// Check connection limits
if (currentConnections >= this.config.maxConnectionsPerHost) {
// Put request back in queue
this.requestQueue.unshift(request);
break;
}
// Check global connection limit
const totalActive = Array.from(this.activeRequests.values()).reduce((sum, count) => sum + count, 0);
if (totalActive >= this.config.maxConnections) {
this.requestQueue.unshift(request);
break;
}
// Execute the request
this.executeRequest(request, host);
} catch (error) {
request.reject(error);
}
}
this.isProcessingQueue = false;
}
/**
* Execute a single request
*/
private async executeRequest(request: QueuedRequest, host: string): Promise<void> {
const client = this.getClient(host);
// Increment active connections
this.activeRequests.set(host, (this.activeRequests.get(host) || 0) + 1);
try {
// Add metadata to track timing
if (!request.config.metadata) {
request.config.metadata = {};
}
request.config.metadata.startTime = Date.now();
// Execute request using our client
const response = await client.request(request.config);
request.resolve(response.data);
} catch (error: any) {
// No need to handle retries explicitly as the BunHttpClient handles them internally
request.reject(error);
// Emit retry event for monitoring
if (error.retryCount) {
this.emit('retry', {
requestId: request.id,
retryCount: error.retryCount,
error
});
}
} finally {
// Decrement active connections
this.activeRequests.set(host, Math.max(0, (this.activeRequests.get(host) || 0) - 1));
}
}
/**
* Extract host from URL
*/
private extractHost(url: string): string {
try {
const urlObj = new URL(url);
return urlObj.host;
} catch {
return 'default';
}
}
/**
* Generate unique request ID
*/
private generateRequestId(): string {
return `req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
/**
* Update statistics
*/
private updateStats(success: boolean, responseTime: number): void {
this.stats.requestCount++;
this.stats.totalResponseTime += responseTime;
if (success) {
this.stats.successfulRequests++;
} else {
this.stats.failedRequests++;
}
}
/**
* Get connection pool statistics
*/
getStats(): ConnectionStats {
const totalActive = Array.from(this.activeRequests.values()).reduce((sum, count) => sum + count, 0);
const averageResponseTime = this.stats.requestCount > 0
? this.stats.totalResponseTime / this.stats.requestCount
: 0;
const utilization = this.config.maxConnections > 0
? totalActive / this.config.maxConnections
: 0;
// Combine our stats with the stats from all clients
const clientStats = Array.from(this.clients.values()).map(client => client.getStats());
let successfulRequests = this.stats.successfulRequests;
let failedRequests = this.stats.failedRequests;
for (const stats of clientStats) {
successfulRequests += stats.successfulRequests;
failedRequests += stats.failedRequests;
}
return {
activeConnections: totalActive,
totalConnections: this.stats.totalConnections,
successfulRequests,
failedRequests,
averageResponseTime,
connectionPoolUtilization: utilization,
requestsPerSecond: 0 // Will be calculated by the http-client
};
}
/**
* Start queue processor timer
*/
private startQueueProcessor(): void {
this.queueProcessor = setInterval(() => {
this.processQueue();
}, 100); // Process queue every 100ms
}
/**
* Close all connections and clean up
*/
async close(): Promise<void> {
// Stop the queue processor
if (this.queueProcessor) {
clearInterval(this.queueProcessor);
}
// Wait for pending requests to complete (with timeout)
const timeout = 30000; // 30 seconds
const startTime = Date.now();
while (this.requestQueue.length > 0 && Date.now() - startTime < timeout) {
await new Promise(resolve => setTimeout(resolve, 100));
}
// Clear remaining requests
while (this.requestQueue.length > 0) {
const request = this.requestQueue.shift()!;
request.reject(new Error('Connection pool closing'));
}
// Close all clients
const closePromises = Array.from(this.clients.values()).map(client => client.close());
await Promise.all(closePromises);
// Clear clients and requests
this.clients.clear();
this.activeRequests.clear();
this.emit('closed');
}
/**
* Health check for the connection pool
*/
async healthCheck(): Promise<{ healthy: boolean; details: any }> {
const stats = this.getStats();
const queueSize = this.requestQueue.length;
// Check health of all clients
const clientHealthChecks = await Promise.all(
Array.from(this.clients.entries()).map(async ([host, client]) => {
const health = await client.healthCheck();
return {
host,
healthy: health.healthy,
details: health.details
};
})
);
const healthy =
stats.connectionPoolUtilization < 0.9 && // Less than 90% utilization
queueSize < 100 && // Queue not too large
stats.averageResponseTime < 5000 && // Average response time under 5 seconds
clientHealthChecks.every(check => check.healthy); // All clients healthy
return {
healthy,
details: {
stats,
queueSize,
clients: clientHealthChecks,
connections: Array.from(this.clients.keys()),
},
};
}
}

View file

@ -1,36 +1,164 @@
import type { MarketData, OHLCV } from '@stock-bot/shared-types';
import { dataProviderConfigs } from '@stock-bot/config';
import { dataProviderConfigs, DataProviderConfig } from '../config/DataProviderConfig';
// Define local types for market data
interface MarketDataType {
symbol: string;
price: number;
bid: number;
ask: number;
volume: number;
timestamp: Date;
}
interface OHLCVType {
symbol: string;
timestamp: Date;
open: number;
high: number;
low: number;
close: number;
volume: number;
}
export interface DataNormalizationResult<T> {
success: boolean;
data?: T;
error?: string;
source: string;
timestamp: Date;
processingTimeMs: number;
}
export interface DataQualityMetrics {
completeness: number; // 0-1
accuracy: number; // 0-1
timeliness: number; // 0-1
consistency: number; // 0-1
overall: number; // 0-1
}
export class DataNormalizer {
private readonly providerConfigs: Record<string, DataProviderConfig>;
constructor() {
this.providerConfigs = dataProviderConfigs;
}
/**
* Normalize market data from different providers to our standard format
*/
normalizeMarketData(rawData: any, source: string): MarketData {
switch (source) {
case 'alpha-vantage':
return this.normalizeAlphaVantage(rawData);
case 'yahoo-finance':
return this.normalizeYahooFinance(rawData);
default:
throw new Error(`Unsupported data source: ${source}`);
}
}
normalizeMarketData(rawData: any, source: string): DataNormalizationResult<MarketDataType> {
const startTime = Date.now();
try {
let normalizedData: MarketDataType;
switch (source.toLowerCase()) {
case 'alpha-vantage':
normalizedData = this.normalizeAlphaVantage(rawData);
break;
case 'yahoo-finance':
normalizedData = this.normalizeYahooFinance(rawData);
break;
case 'polygon':
normalizedData = this.normalizePolygon(rawData);
break;
default:
return {
success: false,
error: `Unsupported data source: ${source}`,
source,
timestamp: new Date(),
processingTimeMs: Date.now() - startTime,
};
}
/**
// Validate the normalized data
if (!this.validateMarketData(normalizedData)) {
return {
success: false,
error: 'Data validation failed',
source,
timestamp: new Date(),
processingTimeMs: Date.now() - startTime,
};
}
return {
success: true,
data: normalizedData,
source,
timestamp: new Date(),
processingTimeMs: Date.now() - startTime,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
source,
timestamp: new Date(),
processingTimeMs: Date.now() - startTime,
};
}
} /**
* Normalize OHLCV data from different providers
*/
normalizeOHLCV(rawData: any, source: string): OHLCV[] {
switch (source) {
case 'alpha-vantage':
return this.normalizeAlphaVantageOHLCV(rawData);
case 'yahoo-finance':
return this.normalizeYahooFinanceOHLCV(rawData);
default:
throw new Error(`Unsupported data source: ${source}`);
normalizeOHLCV(rawData: any, source: string): DataNormalizationResult<OHLCVType[]> {
const startTime = Date.now();
try {
let normalizedData: OHLCVType[];
switch (source.toLowerCase()) {
case 'alpha-vantage':
normalizedData = this.normalizeAlphaVantageOHLCV(rawData);
break;
case 'yahoo-finance':
normalizedData = this.normalizeYahooFinanceOHLCV(rawData);
break;
case 'polygon':
normalizedData = this.normalizePolygonOHLCV(rawData);
break;
default:
return {
success: false,
error: `Unsupported data source: ${source}`,
source,
timestamp: new Date(),
processingTimeMs: Date.now() - startTime,
};
}
// Validate each OHLCV entry
const validData = normalizedData.filter(item => this.validateOHLCV(item));
if (validData.length === 0) {
return {
success: false,
error: 'No valid OHLCV data after normalization',
source,
timestamp: new Date(),
processingTimeMs: Date.now() - startTime,
};
}
return {
success: true,
data: validData,
source,
timestamp: new Date(),
processingTimeMs: Date.now() - startTime,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
source,
timestamp: new Date(),
processingTimeMs: Date.now() - startTime,
};
}
}
private normalizeAlphaVantage(data: any): MarketData {
private normalizeAlphaVantage(data: any): MarketDataType {
const quote = data['Global Quote'];
return {
symbol: quote['01. symbol'],
@ -41,8 +169,7 @@ export class DataNormalizer {
timestamp: new Date(),
};
}
private normalizeYahooFinance(data: any): MarketData {
private normalizeYahooFinance(data: any): MarketDataType {
return {
symbol: data.symbol,
price: data.regularMarketPrice,
@ -53,7 +180,19 @@ export class DataNormalizer {
};
}
private normalizeAlphaVantageOHLCV(data: any): OHLCV[] {
private normalizePolygon(data: any): MarketDataType {
// Polygon.io format normalization
return {
symbol: data.T || data.symbol,
price: data.c || data.price,
bid: data.b || data.bid,
ask: data.a || data.ask,
volume: data.v || data.volume,
timestamp: new Date(data.t || data.timestamp),
};
}
private normalizeAlphaVantageOHLCV(data: any): OHLCVType[] {
const timeSeries = data['Time Series (1min)'] || data['Time Series (5min)'] || data['Time Series (Daily)'];
const symbol = data['Meta Data']['2. Symbol'];
@ -67,8 +206,7 @@ export class DataNormalizer {
volume: parseInt(values['5. volume']),
})).sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime());
}
private normalizeYahooFinanceOHLCV(data: any): OHLCV[] {
private normalizeYahooFinanceOHLCV(data: any): OHLCVType[] {
const result = data.chart.result[0];
const timestamps = result.timestamp;
const quotes = result.indicators.quote[0];
@ -84,26 +222,48 @@ export class DataNormalizer {
}));
}
/**
private normalizePolygonOHLCV(data: any): OHLCVType[] {
// Polygon.io aggregates format
if (data.results && Array.isArray(data.results)) {
return data.results.map((candle: any) => ({
symbol: data.ticker || candle.T,
timestamp: new Date(candle.t),
open: candle.o,
high: candle.h,
low: candle.l,
close: candle.c,
volume: candle.v,
}));
}
return [];
} /**
* Validate market data quality
*/
validateMarketData(data: MarketData): boolean {
validateMarketData(data: MarketDataType): boolean {
return (
data.symbol &&
typeof data.symbol === 'string' &&
data.symbol.length > 0 &&
typeof data.price === 'number' &&
data.price > 0 &&
typeof data.volume === 'number' &&
data.volume >= 0 &&
data.timestamp instanceof Date
data.timestamp instanceof Date &&
!isNaN(data.timestamp.getTime()) &&
typeof data.bid === 'number' &&
typeof data.ask === 'number' &&
data.ask >= data.bid
) as boolean;
}
/**
* Validate OHLCV data quality
*/
validateOHLCV(data: OHLCV): boolean {
validateOHLCV(data: OHLCVType): boolean {
return (
data.symbol &&
typeof data.symbol === 'string' &&
data.symbol.length > 0 &&
typeof data.open === 'number' && data.open > 0 &&
typeof data.high === 'number' && data.high > 0 &&
typeof data.low === 'number' && data.low > 0 &&
@ -111,7 +271,126 @@ export class DataNormalizer {
data.high >= Math.max(data.open, data.close) &&
data.low <= Math.min(data.open, data.close) &&
typeof data.volume === 'number' && data.volume >= 0 &&
data.timestamp instanceof Date
data.timestamp instanceof Date &&
!isNaN(data.timestamp.getTime())
) as boolean;
}
/**
* Assess data quality metrics for market data
*/
assessDataQuality(data: MarketDataType[], source: string): DataQualityMetrics {
if (data.length === 0) {
return {
completeness: 0,
accuracy: 0,
timeliness: 0,
consistency: 0,
overall: 0,
};
}
// Completeness: percentage of valid data points
const validCount = data.filter(item => this.validateMarketData(item)).length;
const completeness = validCount / data.length;
// Accuracy: based on price consistency and reasonable ranges
const accuracyScore = this.assessAccuracy(data);
// Timeliness: based on data freshness
const timelinessScore = this.assessTimeliness(data);
// Consistency: based on data patterns and outliers
const consistencyScore = this.assessConsistency(data);
const overall = (completeness + accuracyScore + timelinessScore + consistencyScore) / 4;
return {
completeness,
accuracy: accuracyScore,
timeliness: timelinessScore,
consistency: consistencyScore,
overall,
};
}
private assessAccuracy(data: MarketDataType[]): number {
let accuracySum = 0;
for (const item of data) {
let score = 1.0;
// Check for reasonable price ranges
if (item.price <= 0 || item.price > 100000) score -= 0.3;
// Check bid/ask spread reasonableness
const spread = item.ask - item.bid;
const spreadPercentage = spread / item.price;
if (spreadPercentage > 0.1) score -= 0.2; // More than 10% spread is suspicious
// Check for negative volume
if (item.volume < 0) score -= 0.5;
accuracySum += Math.max(0, score);
}
return data.length > 0 ? accuracySum / data.length : 0;
}
private assessTimeliness(data: MarketDataType[]): number {
const now = new Date();
let timelinessSum = 0;
for (const item of data) {
const ageMs = now.getTime() - item.timestamp.getTime();
const ageMinutes = ageMs / (1000 * 60);
// Score based on data age (fresher is better)
let score = 1.0;
if (ageMinutes > 60) score = 0.1; // Very old data
else if (ageMinutes > 15) score = 0.5; // Moderately old
else if (ageMinutes > 5) score = 0.8; // Slightly old
timelinessSum += score;
}
return data.length > 0 ? timelinessSum / data.length : 0;
}
private assessConsistency(data: MarketDataType[]): number {
if (data.length < 2) return 1.0;
// Sort by timestamp
const sortedData = [...data].sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime());
let consistencySum = 0;
for (let i = 1; i < sortedData.length; i++) {
const current = sortedData[i];
const previous = sortedData[i - 1];
// Check for reasonable price movements
const priceChange = Math.abs(current.price - previous.price) / previous.price;
let score = 1.0;
if (priceChange > 0.5) score -= 0.7; // More than 50% change is suspicious
else if (priceChange > 0.1) score -= 0.3; // More than 10% change is notable
consistencySum += Math.max(0, score);
}
return consistencySum / (sortedData.length - 1);
}
/**
* Clean and sanitize market data
*/
sanitizeMarketData(data: MarketDataType): MarketDataType {
return {
symbol: data.symbol.toUpperCase().trim(),
price: Math.max(0, Number(data.price) || 0),
bid: Math.max(0, Number(data.bid) || 0),
ask: Math.max(0, Number(data.ask) || 0),
volume: Math.max(0, Math.floor(Number(data.volume) || 0)),
timestamp: new Date(data.timestamp),
};
}
}

View file

@ -1,7 +1,43 @@
import { EventEmitter } from 'eventemitter3';
import { Logger } from 'pino';
// Local logger interface to avoid pino dependency issues
interface Logger {
info(msg: string, ...args: any[]): void;
error(msg: string, ...args: any[]): void;
warn(msg: string, ...args: any[]): void;
debug(msg: string, ...args: any[]): void;
child(options: any): Logger;
}
// Simple logger implementation
const createLogger = (name: string): Logger => ({
info: (msg: string, ...args: any[]) => console.log(`[${name}] INFO:`, msg, ...args),
error: (msg: string, ...args: any[]) => console.error(`[${name}] ERROR:`, msg, ...args),
warn: (msg: string, ...args: any[]) => console.warn(`[${name}] WARN:`, msg, ...args),
debug: (msg: string, ...args: any[]) => console.debug(`[${name}] DEBUG:`, msg, ...args),
child: (options: any) => createLogger(`${name}.${options.component || 'child'}`)
});
import WebSocket from 'ws';
import axios, { AxiosInstance } from 'axios';
// Simple HTTP client to replace axios
interface HttpClient {
get(url: string): Promise<{ data: any }>;
post(url: string, data?: any): Promise<{ data: any }>;
}
const createHttpClient = (baseURL: string, headers?: Record<string, string>): HttpClient => ({
get: async (url: string) => {
const response = await fetch(`${baseURL}${url}`, { headers });
return { data: await response.json() };
},
post: async (url: string, data?: any) => {
const response = await fetch(`${baseURL}${url}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...headers },
body: data ? JSON.stringify(data) : undefined
});
return { data: await response.json() };
}
});
import {
DataSourceConfig,
DataSourceMetrics,
@ -13,7 +49,7 @@ import {
interface DataSourceConnection {
config: DataSourceConfig;
connection?: WebSocket | AxiosInstance;
connection?: WebSocket | HttpClient;
status: 'disconnected' | 'connecting' | 'connected' | 'error';
lastConnectedAt?: Date;
lastErrorAt?: Date;
@ -112,9 +148,8 @@ export class DataSourceManager extends EventEmitter {
await this.connectDataSource(config.id);
}
}
public async removeDataSource(sourceId: string): Promise<void> {
this.logger.info({ sourceId }, 'Removing data source');
this.logger.info(`Removing data source: ${sourceId}`);
await this.disconnectDataSource(sourceId);
this.dataSources.delete(sourceId);
@ -132,7 +167,7 @@ export class DataSourceManager extends EventEmitter {
throw new Error(`Data source ${sourceId} not found`);
}
this.logger.info({ sourceId, updates }, 'Updating data source');
this.logger.info(`Updating data source: ${sourceId}`, updates);
// Update configuration
dataSource.config = { ...dataSource.config, ...updates };

View file

@ -4,12 +4,14 @@ import type { MarketDataEvent, SignalEvent, TradingEvent } from '@stock-bot/shar
export class EventPublisher {
private dragonfly: Redis;
private readonly STREAM_NAME = 'trading-events'; constructor() {
private readonly STREAM_NAME = 'trading-events';
constructor() {
this.dragonfly = new Redis({
host: databaseConfig.dragonfly.host,
port: databaseConfig.dragonfly.port,
password: databaseConfig.dragonfly.password,
maxRetriesPerRequest: 3,
maxRetriesPerRequest: databaseConfig.dragonfly.maxRetriesPerRequest,
});
this.dragonfly.on('connect', () => {

View file

@ -1,5 +1,21 @@
import { EventEmitter } from 'eventemitter3';
import { Logger } from 'pino';
// Local logger interface to avoid pino dependency issues
interface Logger {
info(msg: string, ...args: any[]): void;
error(msg: string, ...args: any[]): void;
warn(msg: string, ...args: any[]): void;
debug(msg: string, ...args: any[]): void;
child(options: any): Logger;
}
// Simple logger implementation
const createLogger = (name: string): Logger => ({
info: (msg: string, ...args: any[]) => console.log(`[${name}] INFO:`, msg, ...args),
error: (msg: string, ...args: any[]) => console.error(`[${name}] ERROR:`, msg, ...args),
warn: (msg: string, ...args: any[]) => console.warn(`[${name}] WARN:`, msg, ...args),
debug: (msg: string, ...args: any[]) => console.debug(`[${name}] DEBUG:`, msg, ...args),
child: (options: any) => createLogger(`${name}.${options.component || 'child'}`)
});
import {
GatewayConfig,
DataSourceConfig,