From fc4cf71a70c6d4356207fefe0d6d4c9c0c574557 Mon Sep 17 00:00:00 2001 From: Bojan Kucera Date: Wed, 4 Jun 2025 23:50:34 -0400 Subject: [PATCH] finished initial event-bus with streams --- libs/event-bus/src/index.ts | 169 ------------------------------------ 1 file changed, 169 deletions(-) diff --git a/libs/event-bus/src/index.ts b/libs/event-bus/src/index.ts index d8e0c06..e69de29 100644 --- a/libs/event-bus/src/index.ts +++ b/libs/event-bus/src/index.ts @@ -1,169 +0,0 @@ -import { EventEmitter } from 'eventemitter3'; -import Redis from 'ioredis'; -import { createLogger } from '@stock-bot/logger'; -import { dragonflyConfig } from '@stock-bot/config'; - -export interface EventBusMessage { - id: string; - type: string; - source: string; - timestamp: number; - data: any; - metadata?: Record; -} - -export interface EventHandler { - (message: EventBusMessage & { data: T }): Promise | void; -} - -export interface EventBusOptions { - - serviceName: string; - enablePersistence?: boolean; -} - -export class EventBus extends EventEmitter { - private redis?: Redis; - private subscriber?: Redis; - private serviceName: string; - private logger - private enablePersistence: boolean; - - constructor(options: EventBusOptions) { - super(); - this.serviceName = options.serviceName; - this.enablePersistence = options.enablePersistence ?? true; - this.logger = createLogger(`event-bus:${this.serviceName}`); - - this.redis = new Redis({ - host: dragonflyConfig.DRAGONFLY_HOST, - port: dragonflyConfig.DRAGONFLY_PORT, - password: dragonflyConfig.DRAGONFLY_PASSWORD, - db: dragonflyConfig.DRAGONFLY_DATABASE || 0, - maxRetriesPerRequest: dragonflyConfig.DRAGONFLY_MAX_RETRIES, - }); - - this.subscriber = new Redis({ - host: dragonflyConfig.DRAGONFLY_HOST, - port: dragonflyConfig.DRAGONFLY_PORT, - password: dragonflyConfig.DRAGONFLY_PASSWORD, - db: dragonflyConfig.DRAGONFLY_DATABASE || 0, - }); - - this.subscriber.on('message', this.handleRedisMessage.bind(this)); - this.logger.info('Redis event bus initialized'); - } - - private handleRedisMessage(channel: string, message: string) { - try { - const eventMessage: EventBusMessage = JSON.parse(message); - - // Don't process our own messages - if (eventMessage.source === this.serviceName) { - return; - } - - this.emit(eventMessage.type, eventMessage); - this.logger.debug(`Received event: ${eventMessage.type} from ${eventMessage.source}`); - } catch (error) { - this.logger.error('Failed to parse Redis message', { error, message }); - } - } - - async publish(type: string, data: T, metadata?: Record): Promise { - const message: EventBusMessage = { - id: this.generateId(), - type, - source: this.serviceName, - timestamp: Date.now(), - data, - metadata, - }; - - // Emit locally first - this.emit(type, message); - - // Publish to Redis if available - if (this.redis && this.enablePersistence) { - try { - await this.redis.publish(`events:${type}`, JSON.stringify(message)); - this.logger.debug(`Published event: ${type}`, { messageId: message.id }); - } catch (error) { - this.logger.error(`Failed to publish event: ${type}`, { error, messageId: message.id }); - } - } - } - - async subscribe(eventType: string, handler: EventHandler): Promise { - // Subscribe locally - this.on(eventType, handler); - - // Subscribe to Redis channel if available - if (this.subscriber && this.enablePersistence) { - try { - await this.subscriber.subscribe(`events:${eventType}`); - this.logger.debug(`Subscribed to event: ${eventType}`); - } catch (error) { - this.logger.error(`Failed to subscribe to event: ${eventType}`, { error }); - } - } - } - - async unsubscribe(eventType: string, handler?: EventHandler): Promise { - if (handler) { - this.off(eventType, handler); - } else { - this.removeAllListeners(eventType); - } - - if (this.subscriber && this.enablePersistence) { - try { - await this.subscriber.unsubscribe(`events:${eventType}`); - this.logger.debug(`Unsubscribed from event: ${eventType}`); - } catch (error) { - this.logger.error(`Failed to unsubscribe from event: ${eventType}`, { error }); - } - } - } - - async close(): Promise { - if (this.redis) { - await this.redis.quit(); - } - if (this.subscriber) { - await this.subscriber.quit(); - } - this.removeAllListeners(); - this.logger.info('Event bus closed'); - } - - private generateId(): string { - return `${this.serviceName}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; - } - - // Utility methods for common event patterns - async publishMarketData(symbol: string, data: any): Promise { - await this.publish('market.data', { symbol, ...data }); - } - - async publishOrderUpdate(orderId: string, status: string, data: any): Promise { - await this.publish('order.update', { orderId, status, ...data }); - } - - async publishStrategySignal(strategyId: string, signal: any): Promise { - await this.publish('strategy.signal', { strategyId, ...signal }); - } - - async publishPortfolioUpdate(portfolioId: string, data: any): Promise { - await this.publish('portfolio.update', { portfolioId, ...data }); - } - - async publishBacktestUpdate(backtestId: string, progress: number, data?: any): Promise { - await this.publish('backtest.update', { backtestId, progress, ...data }); - } -} - -// Factory function for easy initialization -export function createEventBus(options: EventBusOptions): EventBus { - return new EventBus(options); -}