initial setup

This commit is contained in:
Bojan Kucera 2025-06-02 08:15:20 -04:00
commit 232a63dfe8
61 changed files with 4985 additions and 0 deletions

View file

@ -0,0 +1,20 @@
{
"name": "market-data-gateway",
"version": "1.0.0",
"description": "Market data ingestion service",
"main": "src/index.ts",
"scripts": {
"dev": "bun run --watch src/index.ts",
"start": "bun run src/index.ts",
"test": "echo 'No tests yet'"
}, "dependencies": {
"hono": "^4.6.3",
"ioredis": "^5.4.1",
"@stock-bot/config": "workspace:*",
"@stock-bot/shared-types": "workspace:*",
"ws": "^8.18.0"
}, "devDependencies": {
"bun-types": "^1.2.15",
"@types/ws": "^8.5.12"
}
}

View file

@ -0,0 +1,83 @@
import { Hono } from 'hono';
import { serve } from 'bun';
const app = new Hono();
// Health check endpoint
app.get('/health', (c) => {
return c.json({
service: 'market-data-gateway',
status: 'healthy',
timestamp: new Date(),
version: '1.0.0'
});
});
// Demo market data endpoint
app.get('/api/market-data/:symbol', (c) => {
const symbol = c.req.param('symbol');
// Generate demo data
const demoData = {
symbol: symbol.toUpperCase(),
price: 150 + Math.random() * 50, // Random price between 150-200
bid: 149.99,
ask: 150.01,
volume: Math.floor(Math.random() * 1000000),
timestamp: new Date()
};
return c.json({
success: true,
data: demoData,
timestamp: new Date()
});
});
// Demo OHLCV endpoint
app.get('/api/ohlcv/:symbol', (c) => {
const symbol = c.req.param('symbol');
const limit = parseInt(c.req.query('limit') || '10');
// Generate demo OHLCV data
const data = [];
let basePrice = 150;
for (let i = limit - 1; i >= 0; i--) {
const open = basePrice + (Math.random() - 0.5) * 10;
const close = open + (Math.random() - 0.5) * 5;
const high = Math.max(open, close) + Math.random() * 3;
const low = Math.min(open, close) - Math.random() * 3;
data.push({
symbol: symbol.toUpperCase(),
timestamp: new Date(Date.now() - i * 60000), // 1 minute intervals
open: Math.round(open * 100) / 100,
high: Math.round(high * 100) / 100,
low: Math.round(low * 100) / 100,
close: Math.round(close * 100) / 100,
volume: Math.floor(Math.random() * 50000) + 10000
});
basePrice = close;
}
return c.json({
success: true,
data,
timestamp: new Date()
});
});
const PORT = 3001;
console.log(`🚀 Market Data Gateway starting on port ${PORT}`);
serve({
port: PORT,
fetch: app.fetch,
});
console.log(`📊 Market Data Gateway running on http://localhost:${PORT}`);
console.log(`🔍 Health check: http://localhost:${PORT}/health`);
console.log(`📈 Demo data: http://localhost:${PORT}/api/market-data/AAPL`);

View file

@ -0,0 +1,117 @@
import type { MarketData, OHLCV } from '@stock-bot/shared-types';
import { dataProviderConfigs } from '@stock-bot/config';
export class DataNormalizer {
/**
* Normalize market data from different providers to our standard format
*/
normalizeMarketData(rawData: any, source: string): MarketData {
switch (source) {
case 'alpha-vantage':
return this.normalizeAlphaVantage(rawData);
case 'yahoo-finance':
return this.normalizeYahooFinance(rawData);
default:
throw new Error(`Unsupported data source: ${source}`);
}
}
/**
* Normalize OHLCV data from different providers
*/
normalizeOHLCV(rawData: any, source: string): OHLCV[] {
switch (source) {
case 'alpha-vantage':
return this.normalizeAlphaVantageOHLCV(rawData);
case 'yahoo-finance':
return this.normalizeYahooFinanceOHLCV(rawData);
default:
throw new Error(`Unsupported data source: ${source}`);
}
}
private normalizeAlphaVantage(data: any): MarketData {
const quote = data['Global Quote'];
return {
symbol: quote['01. symbol'],
price: parseFloat(quote['05. price']),
bid: parseFloat(quote['05. price']) - 0.01, // Approximate bid/ask
ask: parseFloat(quote['05. price']) + 0.01,
volume: parseInt(quote['06. volume']),
timestamp: new Date(),
};
}
private normalizeYahooFinance(data: any): MarketData {
return {
symbol: data.symbol,
price: data.regularMarketPrice,
bid: data.bid || data.regularMarketPrice - 0.01,
ask: data.ask || data.regularMarketPrice + 0.01,
volume: data.regularMarketVolume,
timestamp: new Date(data.regularMarketTime * 1000),
};
}
private normalizeAlphaVantageOHLCV(data: any): OHLCV[] {
const timeSeries = data['Time Series (1min)'] || data['Time Series (5min)'] || data['Time Series (Daily)'];
const symbol = data['Meta Data']['2. Symbol'];
return Object.entries(timeSeries).map(([timestamp, values]: [string, any]) => ({
symbol,
timestamp: new Date(timestamp),
open: parseFloat(values['1. open']),
high: parseFloat(values['2. high']),
low: parseFloat(values['3. low']),
close: parseFloat(values['4. close']),
volume: parseInt(values['5. volume']),
})).sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime());
}
private normalizeYahooFinanceOHLCV(data: any): OHLCV[] {
const result = data.chart.result[0];
const timestamps = result.timestamp;
const quotes = result.indicators.quote[0];
return timestamps.map((timestamp: number, index: number) => ({
symbol: result.meta.symbol,
timestamp: new Date(timestamp * 1000),
open: quotes.open[index],
high: quotes.high[index],
low: quotes.low[index],
close: quotes.close[index],
volume: quotes.volume[index],
}));
}
/**
* Validate market data quality
*/
validateMarketData(data: MarketData): boolean {
return (
data.symbol &&
typeof data.price === 'number' &&
data.price > 0 &&
typeof data.volume === 'number' &&
data.volume >= 0 &&
data.timestamp instanceof Date
) as boolean;
}
/**
* Validate OHLCV data quality
*/
validateOHLCV(data: OHLCV): boolean {
return (
data.symbol &&
typeof data.open === 'number' && data.open > 0 &&
typeof data.high === 'number' && data.high > 0 &&
typeof data.low === 'number' && data.low > 0 &&
typeof data.close === 'number' && data.close > 0 &&
data.high >= Math.max(data.open, data.close) &&
data.low <= Math.min(data.open, data.close) &&
typeof data.volume === 'number' && data.volume >= 0 &&
data.timestamp instanceof Date
) as boolean;
}
}

View file

@ -0,0 +1,138 @@
import Redis from 'ioredis';
import { databaseConfig } from '@stock-bot/config';
import type { MarketDataEvent, SignalEvent, TradingEvent } from '@stock-bot/shared-types';
export class EventPublisher {
private dragonfly: Redis;
private readonly STREAM_NAME = 'trading-events'; constructor() {
this.dragonfly = new Redis({
host: databaseConfig.dragonfly.host,
port: databaseConfig.dragonfly.port,
password: databaseConfig.dragonfly.password,
maxRetriesPerRequest: 3,
});
this.dragonfly.on('connect', () => {
console.log('🐉 Connected to Dragonfly for event publishing');
});
this.dragonfly.on('error', (error) => {
console.error('❌ Dragonfly connection error:', error);
});
}
/**
* Publish a market data event to the event stream
*/ async publishMarketData(event: MarketDataEvent): Promise<void> {
try {
await this.dragonfly.xadd(
this.STREAM_NAME,
'*',
'type', event.type,
'data', JSON.stringify(event.data),
'timestamp', event.timestamp.toISOString()
);
} catch (error) {
console.error('Error publishing market data event:', error);
throw error;
}
}
/**
* Publish a trading signal event
*/
async publishSignal(event: SignalEvent): Promise<void> {
try {
await this.dragonfly.xadd(
this.STREAM_NAME,
'*',
'type', event.type,
'signal', JSON.stringify(event.signal),
'timestamp', event.timestamp.toISOString()
);
} catch (error) {
console.error('Error publishing signal event:', error);
throw error;
}
}
/**
* Publish any trading event
*/
async publishEvent(event: TradingEvent): Promise<void> {
try {
const fields: string[] = ['type', event.type, 'timestamp', event.timestamp.toISOString()];
if ('data' in event) {
fields.push('data', JSON.stringify(event.data));
}
if ('order' in event) {
fields.push('order', JSON.stringify(event.order));
}
if ('signal' in event) {
fields.push('signal', JSON.stringify(event.signal));
}
await this.dragonfly.xadd(this.STREAM_NAME, '*', ...fields);
} catch (error) {
console.error('Error publishing event:', error);
throw error;
}
}
/**
* Cache market data in Dragonfly for quick access
*/
async cacheMarketData(symbol: string, data: any, ttl: number = 60): Promise<void> {
try {
const key = `market-data:${symbol}`;
await this.dragonfly.setex(key, ttl, JSON.stringify(data));
} catch (error) {
console.error('Error caching market data:', error);
}
}
/**
* Get cached market data from Dragonfly
*/
async getCachedMarketData(symbol: string): Promise<any | null> {
try {
const key = `market-data:${symbol}`;
const cached = await this.dragonfly.get(key);
return cached ? JSON.parse(cached) : null;
} catch (error) {
console.error('Error getting cached market data:', error);
return null;
}
}
/**
* Publish to a specific channel for real-time subscriptions
*/
async publishToChannel(channel: string, data: any): Promise<void> {
try {
await this.dragonfly.publish(channel, JSON.stringify(data));
} catch (error) {
console.error(`Error publishing to channel ${channel}:`, error);
throw error;
}
}
/**
* Set up health monitoring
*/
async setServiceHealth(serviceName: string, status: 'healthy' | 'unhealthy'): Promise<void> {
try {
const key = `health:${serviceName}`;
const healthData = {
status,
timestamp: new Date().toISOString(),
lastSeen: Date.now(),
};
await this.dragonfly.setex(key, 300, JSON.stringify(healthData)); // 5 minutes TTL
} catch (error) {
console.error('Error setting service health:', error);
}
}
/**
* Close Dragonfly connection
*/
async disconnect(): Promise<void> {
await this.dragonfly.quit();
}
}

View file

@ -0,0 +1,278 @@
import type { MarketData, OHLCV, MarketDataEvent } from '@stock-bot/shared-types';
import { dataProviderConfigs } from '@stock-bot/config';
import { EventPublisher } from './EventPublisher';
import { DataNormalizer } from './DataNormalizer';
export class MarketDataService {
private wsClients: Set<any> = new Set();
private subscriptions: Map<string, Set<any>> = new Map();
private dataUpdateInterval: Timer | null = null;
private readonly UPDATE_INTERVAL = 5000; // 5 seconds
constructor(
private eventPublisher: EventPublisher,
private dataNormalizer: DataNormalizer
) {}
/**
* Initialize the market data service
*/
async initialize(): Promise<void> {
console.log('🔄 Initializing Market Data Service...');
// Set up periodic data updates for demo purposes
this.startDataUpdates();
// Set service health
await this.eventPublisher.setServiceHealth('market-data-gateway', 'healthy');
console.log('✅ Market Data Service initialized');
}
/**
* Get latest market data for a symbol
*/
async getLatestData(symbol: string): Promise<MarketData> {
// First check cache
const cached = await this.eventPublisher.getCachedMarketData(symbol);
if (cached) {
return cached;
}
// Fetch fresh data (using demo data for now)
const marketData = this.generateDemoData(symbol);
// Cache the data
await this.eventPublisher.cacheMarketData(symbol, marketData, 60);
// Publish market data event
const event: MarketDataEvent = {
type: 'MARKET_DATA',
data: marketData,
timestamp: new Date(),
};
await this.eventPublisher.publishMarketData(event);
return marketData;
}
/**
* Get OHLCV data for a symbol
*/
async getOHLCV(symbol: string, interval: string, limit: number): Promise<OHLCV[]> {
// Generate demo OHLCV data
const ohlcvData = this.generateDemoOHLCV(symbol, limit);
// Cache the data
await this.eventPublisher.cacheMarketData(`ohlcv:${symbol}:${interval}`, ohlcvData, 300);
return ohlcvData;
}
/**
* Add WebSocket client for real-time updates
*/
addWebSocketClient(ws: any): void {
this.wsClients.add(ws);
}
/**
* Remove WebSocket client
*/
removeWebSocketClient(ws: any): void {
this.wsClients.delete(ws);
// Remove from all subscriptions
for (const [symbol, clients] of this.subscriptions) {
clients.delete(ws);
if (clients.size === 0) {
this.subscriptions.delete(symbol);
}
}
}
/**
* Handle WebSocket messages
*/
handleWebSocketMessage(ws: any, data: any): void {
try {
const message = typeof data === 'string' ? JSON.parse(data) : data;
switch (message.type) {
case 'subscribe':
this.subscribeToSymbol(ws, message.symbol);
break;
case 'unsubscribe':
this.unsubscribeFromSymbol(ws, message.symbol);
break;
default:
console.log('Unknown WebSocket message type:', message.type);
}
} catch (error) {
console.error('Error handling WebSocket message:', error);
}
}
/**
* Subscribe WebSocket client to symbol updates
*/
private subscribeToSymbol(ws: any, symbol: string): void {
if (!this.subscriptions.has(symbol)) {
this.subscriptions.set(symbol, new Set());
}
this.subscriptions.get(symbol)!.add(ws);
ws.send(JSON.stringify({
type: 'subscribed',
symbol,
timestamp: new Date().toISOString(),
}));
}
/**
* Unsubscribe WebSocket client from symbol updates
*/
private unsubscribeFromSymbol(ws: any, symbol: string): void {
const clients = this.subscriptions.get(symbol);
if (clients) {
clients.delete(ws);
if (clients.size === 0) {
this.subscriptions.delete(symbol);
}
}
ws.send(JSON.stringify({
type: 'unsubscribed',
symbol,
timestamp: new Date().toISOString(),
}));
}
/**
* Start periodic data updates for demo
*/
private startDataUpdates(): void {
this.dataUpdateInterval = setInterval(async () => {
const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN'];
for (const symbol of symbols) {
if (this.subscriptions.has(symbol)) {
const marketData = this.generateDemoData(symbol);
// Send to subscribed WebSocket clients
const clients = this.subscriptions.get(symbol)!;
const message = JSON.stringify({
type: 'market_data',
data: marketData,
timestamp: new Date().toISOString(),
});
for (const client of clients) {
try {
client.send(message);
} catch (error) {
console.error('Error sending WebSocket message:', error);
clients.delete(client);
}
}
// Publish event
const event: MarketDataEvent = {
type: 'MARKET_DATA',
data: marketData,
timestamp: new Date(),
};
await this.eventPublisher.publishMarketData(event);
}
}
}, this.UPDATE_INTERVAL);
}
/**
* Generate demo market data
*/
private generateDemoData(symbol: string): MarketData {
const basePrice = this.getBasePrice(symbol);
const variation = (Math.random() - 0.5) * 0.02; // ±1% variation
const price = basePrice * (1 + variation);
return {
symbol,
price: Math.round(price * 100) / 100,
bid: Math.round((price - 0.01) * 100) / 100,
ask: Math.round((price + 0.01) * 100) / 100,
volume: Math.floor(Math.random() * 1000000) + 100000,
timestamp: new Date(),
};
}
/**
* Generate demo OHLCV data
*/
private generateDemoOHLCV(symbol: string, limit: number): OHLCV[] {
const basePrice = this.getBasePrice(symbol);
const data: OHLCV[] = [];
let currentPrice = basePrice;
for (let i = limit - 1; i >= 0; i--) {
const variation = (Math.random() - 0.5) * 0.05; // ±2.5% variation
const open = currentPrice;
const close = open * (1 + variation);
const high = Math.max(open, close) * (1 + Math.random() * 0.02);
const low = Math.min(open, close) * (1 - Math.random() * 0.02);
data.push({
symbol,
timestamp: new Date(Date.now() - i * 60000), // 1 minute intervals
open: Math.round(open * 100) / 100,
high: Math.round(high * 100) / 100,
low: Math.round(low * 100) / 100,
close: Math.round(close * 100) / 100,
volume: Math.floor(Math.random() * 50000) + 10000,
});
currentPrice = close;
}
return data;
}
/**
* Get base price for demo data
*/
private getBasePrice(symbol: string): number {
const prices: Record<string, number> = {
'AAPL': 175.50,
'GOOGL': 142.30,
'MSFT': 378.85,
'TSLA': 208.75,
'AMZN': 151.20,
'NVDA': 465.80,
'META': 298.45,
'NFLX': 425.60,
};
return prices[symbol] || 100.00;
}
/**
* Shutdown the service
*/
async shutdown(): Promise<void> {
console.log('🔄 Shutting down Market Data Service...');
if (this.dataUpdateInterval) {
clearInterval(this.dataUpdateInterval);
}
// Close all WebSocket connections
for (const client of this.wsClients) {
client.close();
}
await this.eventPublisher.setServiceHealth('market-data-gateway', 'unhealthy');
await this.eventPublisher.disconnect();
console.log('✅ Market Data Service shutdown complete');
}
}

View file

@ -0,0 +1,18 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "bundler",
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"allowSyntheticDefaultImports": true,
"resolveJsonModule": true,
"types": ["bun-types"]
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}

View file

@ -0,0 +1,22 @@
{
"name": "risk-guardian",
"version": "1.0.0",
"description": "Real-time risk monitoring and controls service",
"main": "src/index.ts",
"scripts": {
"dev": "bun run --watch src/index.ts",
"start": "bun run src/index.ts",
"test": "echo 'No tests yet'"
},
"dependencies": {
"hono": "^4.6.3",
"ioredis": "^5.4.1",
"@stock-bot/config": "workspace:*",
"@stock-bot/shared-types": "workspace:*",
"ws": "^8.18.0"
},
"devDependencies": {
"bun-types": "^1.2.15",
"@types/ws": "^8.5.12"
}
}