import { EventEmitter } from 'eventemitter3';
import Redis from 'ioredis';
import { getLogger } from '@stock-bot/logger';
import type {
  EventBusConfig,
  EventBusMessage,
  EventHandler,
  EventSubscription,
} from './types';

/**
 * Lightweight event bus for inter-service communication.
 *
 * Events are distributed over Redis pub/sub: an event of type `foo` travels on
 * the Redis channel `events:foo`. The bus holds two Redis connections, one used
 * for publishing and one used for subscribing.
 *
 * Every published event is also emitted locally through the inherited
 * `EventEmitter` API. Events received from Redis whose `source` equals this
 * service's name are dropped, so a service never sees its own event twice.
 */
export class EventBus extends EventEmitter {
  /** Prefix prepended to an event type to form its Redis channel name. */
  private static readonly CHANNEL_PREFIX = 'events:';

  private publisher: Redis;
  private subscriber: Redis;
  private readonly serviceName: string;
  private readonly logger: ReturnType<typeof getLogger>;
  /** Active Redis subscriptions keyed by event type; replayed by `resubscribeAll`. */
  private subscriptions: Map<string, EventSubscription> = new Map();
  /**
   * Set to `true` when the publisher connection emits `connect` and reset only
   * by `close()`. It is not cleared when the connection drops.
   */
  private isConnected = false;

  /**
   * Creates the publisher and subscriber connections and wires their handlers.
   * With `lazyConnect: false` both connections are opened by the constructor.
   *
   * @param config - Service name (used as the message `source` and in the
   *   logger name) and the Redis connection settings.
   */
  constructor(config: EventBusConfig) {
    super();
    this.serviceName = config.serviceName;
    this.logger = getLogger(`event-bus:${this.serviceName}`);

    const redisOptions = {
      host: config.redisConfig.host,
      port: config.redisConfig.port,
      password: config.redisConfig.password,
      db: config.redisConfig.db ?? 0,
      lazyConnect: false,
      enableOfflineQueue: true,
      // NOTE(review): this hook always returns true, i.e. it asks ioredis to
      // reconnect for every error it is consulted about. Confirm against the
      // ioredis docs that this is the intended policy.
      reconnectOnError: (err: Error) => {
        this.logger.error('Redis connection error:', err);
        return true;
      },
    };

    this.publisher = new Redis(redisOptions);
    this.subscriber = new Redis(redisOptions);

    this.setupRedisHandlers();
  }

  /** Returns the Redis channel name that carries the given event type. */
  private channelFor(eventType: string): string {
    return `${EventBus.CHANNEL_PREFIX}${eventType}`;
  }

  /** Attaches connection, error and message listeners to both Redis clients. */
  private setupRedisHandlers(): void {
    this.publisher.on('connect', () => {
      this.logger.info('Publisher connected to Redis');
      this.isConnected = true;
    });

    this.publisher.on('error', (error: Error) => {
      this.logger.error('Publisher Redis error:', error);
    });

    this.subscriber.on('connect', () => {
      this.logger.info('Subscriber connected to Redis');
      // Replay every known subscription on (re)connect. resubscribeAll logs
      // its own failures and never rejects, so it is safe to fire and forget.
      void this.resubscribeAll();
    });

    this.subscriber.on('error', (error: Error) => {
      this.logger.error('Subscriber Redis error:', error);
    });

    this.subscriber.on('message', (channel: string, message: string) => {
      this.handleMessage(channel, message);
    });
  }

  /**
   * Handles a raw message delivered by the subscriber connection.
   *
   * The payload is parsed as JSON, messages that originate from this service
   * are skipped, and everything else is emitted locally under the event type
   * derived from the channel name. Handlers registered through `subscribe()`
   * are ordinary listeners of that event, so the single `emit` below invokes
   * each of them exactly once.
   *
   * Any error thrown while parsing or dispatching is logged, not rethrown.
   */
  private handleMessage(channel: string, message: string): void {
    try {
      const parsed: unknown = JSON.parse(message);
      if (typeof parsed !== 'object' || parsed === null) {
        throw new Error('Event payload is not a JSON object');
      }
      const eventMessage = parsed as EventBusMessage;

      // Skip messages from self: publish() already emitted them locally.
      if (eventMessage.source === this.serviceName) {
        return;
      }

      const prefix = EventBus.CHANNEL_PREFIX;
      const eventType = channel.startsWith(prefix) ? channel.slice(prefix.length) : channel;

      this.emit(eventType, eventMessage);

      this.logger.debug(`Received event: ${eventType} from ${eventMessage.source}`);
    } catch (error) {
      this.logger.error('Failed to handle message:', { error, channel, message });
    }
  }

  /**
   * Publishes an event.
   *
   * The message is first emitted to local listeners. It is then sent to Redis
   * if the bus is marked connected; otherwise a warning is logged and the
   * event stays local.
   *
   * @param type - Event type; also determines the Redis channel.
   * @param data - Event payload. It is serialized with `JSON.stringify`.
   * @param metadata - Optional extra fields attached to the message.
   * @throws Rethrows the error raised by the Redis publish call after logging it.
   */
  async publish<T = unknown>(
    type: string,
    data: T,
    // `any` keeps the parameter open to arbitrary object shapes.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    metadata?: Record<string, any>
  ): Promise<void> {
    const message: EventBusMessage = {
      id: this.generateId(),
      type,
      source: this.serviceName,
      timestamp: Date.now(),
      data,
      metadata,
    };

    // Emit locally first
    this.emit(type, message);

    if (!this.isConnected) {
      this.logger.warn(`Not connected to Redis, event ${type} only emitted locally`);
      return;
    }

    try {
      await this.publisher.publish(this.channelFor(type), JSON.stringify(message));
      this.logger.debug(`Published event: ${type}`, { messageId: message.id });
    } catch (error) {
      this.logger.error(`Failed to publish event: ${type}`, { error, messageId: message.id });
      throw error;
    }
  }

  /**
   * Subscribes a handler to an event type.
   *
   * The handler is added as a local listener for `eventType`, so it receives
   * both events published by this instance and events arriving from Redis.
   * The bus does not await or catch whatever the handler returns.
   *
   * @param eventType - Event type to listen for.
   * @param handler - Callback invoked with each event message.
   * @throws Rethrows the error raised by the Redis subscribe call after
   *   logging it. The local listener stays registered in that case.
   */
  async subscribe(eventType: string, handler: EventHandler): Promise<void> {
    const channel = this.channelFor(eventType);

    // Recorded so the channel can be re-subscribed after a reconnect.
    this.subscriptions.set(eventType, { channel, handler });
    this.on(eventType, handler);

    try {
      await this.subscriber.subscribe(channel);
      this.logger.debug(`Subscribed to event: ${eventType}`);
    } catch (error) {
      this.logger.error(`Failed to subscribe to event: ${eventType}`, error);
      throw error;
    }
  }

  /**
   * Unsubscribes from an event type.
   *
   * With a `handler`, only that listener is removed; the Redis channel is
   * released once no listeners remain for the event type. Without a
   * `handler`, all listeners for the event type are removed and the channel
   * is released. Failures of the Redis unsubscribe call are logged, not thrown.
   *
   * @param eventType - Event type to stop listening for.
   * @param handler - Specific handler to remove; omit to remove all.
   */
  async unsubscribe(eventType: string, handler?: EventHandler): Promise<void> {
    if (handler) {
      this.off(eventType, handler);
      if (this.listenerCount(eventType) > 0) {
        // Other listeners still depend on this channel, so keep the Redis
        // subscription (and its bookkeeping entry) alive.
        this.logger.debug(`Removed one handler for event: ${eventType}, channel still in use`);
        return;
      }
    } else {
      this.removeAllListeners(eventType);
    }

    this.subscriptions.delete(eventType);

    try {
      await this.subscriber.unsubscribe(this.channelFor(eventType));
      this.logger.debug(`Unsubscribed from event: ${eventType}`);
    } catch (error) {
      this.logger.error(`Failed to unsubscribe from event: ${eventType}`, error);
    }
  }

  /**
   * Re-subscribes to every recorded channel (used on reconnect).
   * Channels are processed one at a time; a failure is logged and the loop
   * continues with the next channel.
   */
  private async resubscribeAll(): Promise<void> {
    for (const [eventType, subscription] of this.subscriptions.entries()) {
      try {
        await this.subscriber.subscribe(subscription.channel);
        this.logger.debug(`Resubscribed to event: ${eventType}`);
      } catch (error) {
        this.logger.error(`Failed to resubscribe to event: ${eventType}`, error);
      }
    }
  }

  /**
   * Waits until the publisher connection has been established.
   * The connection flag is polled every 100 ms.
   *
   * @param timeout - Maximum time to wait, in milliseconds.
   * @throws Error if the bus is still not connected when the timeout elapses.
   */
  async waitForConnection(timeout: number = 5000): Promise<void> {
    const startTime = Date.now();

    while (!this.isConnected && Date.now() - startTime < timeout) {
      await new Promise((resolve) => setTimeout(resolve, 100));
    }

    if (!this.isConnected) {
      throw new Error(`Failed to connect to Redis within ${timeout}ms`);
    }
  }

  /**
   * Closes the bus: marks it disconnected, drops all recorded subscriptions
   * and local listeners, then quits both Redis connections.
   */
  async close(): Promise<void> {
    this.isConnected = false;

    this.subscriptions.clear();
    this.removeAllListeners();

    await Promise.all([this.publisher.quit(), this.subscriber.quit()]);

    this.logger.info('Event bus closed');
  }

  /**
   * Generates a message ID of the form `<service>-<epoch ms>-<random>`,
   * where the last part is up to 9 base-36 characters from `Math.random()`.
   */
  private generateId(): string {
    // slice(2, 11) drops the leading "0." and keeps up to 9 characters.
    const randomPart = Math.random().toString(36).slice(2, 11);
    return `${this.serviceName}-${Date.now()}-${randomPart}`;
  }

  /**
   * Whether the publisher connection has been established.
   * The flag is cleared only by `close()`, not when the connection drops.
   */
  get connected(): boolean {
    return this.isConnected;
  }

  /** Name of the service that owns this bus. */
  get service(): string {
    return this.serviceName;
  }
}