import { EventEmitter } from 'eventemitter3'; import Redis from 'ioredis'; import { getLogger } from '@stock-bot/logger'; import { dragonflyConfig } from '@stock-bot/config'; export interface EventBusMessage { id: string; type: string; source: string; timestamp: number; data: any; metadata?: Record; } export interface EventHandler { (message: EventBusMessage & { data: T }): Promise | void; } export interface EventBusOptions { serviceName: string; enablePersistence?: boolean; useStreams?: boolean; maxRetries?: number; retryDelay?: number; } export interface StreamConsumerInfo { streamKey: string; groupName: string; consumerName: string; handler: EventHandler; isRunning: boolean; } export class EventBus extends EventEmitter { private redis: Redis; private subscriber?: Redis; private serviceName: string; private logger: any; private enablePersistence: boolean; private useStreams: boolean; private maxRetries: number; private retryDelay: number; private consumers: Map = new Map(); private isRunning: boolean = true; constructor(options: EventBusOptions) { super(); this.serviceName = options.serviceName; this.enablePersistence = options.enablePersistence ?? true; this.useStreams = options.useStreams ?? true; this.maxRetries = options.maxRetries ?? 3; this.retryDelay = options.retryDelay ?? 1000; this.logger = getLogger(`event-bus:${this.serviceName}`); this.redis = new Redis({ host: dragonflyConfig.DRAGONFLY_HOST, port: dragonflyConfig.DRAGONFLY_PORT, password: dragonflyConfig.DRAGONFLY_PASSWORD, db: dragonflyConfig.DRAGONFLY_DATABASE || 0, maxRetriesPerRequest: dragonflyConfig.DRAGONFLY_MAX_RETRIES, lazyConnect: false, }); if (!this.useStreams) { this.subscriber = new Redis({ host: dragonflyConfig.DRAGONFLY_HOST, port: dragonflyConfig.DRAGONFLY_PORT, password: dragonflyConfig.DRAGONFLY_PASSWORD, db: dragonflyConfig.DRAGONFLY_DATABASE || 0, }); this.subscriber.on('message', this.handleRedisMessage.bind(this)); } this.logger.info(`Redis event bus initialized (mode: ${this.useStreams ? 'streams' : 'pub/sub'})`); } private handleRedisMessage(channel: string, message: string) { try { const eventMessage: EventBusMessage = JSON.parse(message); if (eventMessage.source === this.serviceName) { return; } this.emit(eventMessage.type, eventMessage); this.logger.debug(`Received event: ${eventMessage.type} from ${eventMessage.source}`); } catch (error) { this.logger.error('Failed to parse Redis message', { error, message }); } } async publish(type: string, data: T, metadata?: Record): Promise { const message: EventBusMessage = { id: this.generateId(), type, source: this.serviceName, timestamp: Date.now(), data, metadata, }; this.emit(type, message); if (this.redis && this.enablePersistence) { try { if (this.useStreams) { const streamKey = `events:${type}`; const messageId = await this.redis.xadd( streamKey, '*', 'id', message.id, 'type', message.type, 'source', message.source, 'timestamp', message.timestamp.toString(), 'data', JSON.stringify(message.data), 'metadata', JSON.stringify(message.metadata || {}) ); this.logger.debug(`Published event to stream: ${type}`, { messageId, streamId: messageId }); return messageId as string; } else { await this.redis.publish(`events:${type}`, JSON.stringify(message)); this.logger.debug(`Published event via pub/sub: ${type}`, { messageId: message.id }); return message.id; } } catch (error) { this.logger.error(`Failed to publish event: ${type}`, { error, messageId: message.id }); throw error; } } return null; } async subscribe(eventType: string, handler: EventHandler): Promise { this.on(eventType, handler); if (this.redis && this.enablePersistence) { try { if (this.useStreams) { await this.subscribeToStream(eventType, handler); } else { if (this.subscriber) { await this.subscriber.subscribe(`events:${eventType}`); this.logger.debug(`Subscribed to event: ${eventType}`); } } } catch (error) { this.logger.error(`Failed to subscribe to event: ${eventType}`, error); throw error; } } } private async subscribeToStream(eventType: string, handler: EventHandler): Promise { const streamKey = `events:${eventType}`; const groupName = `${eventType}-consumers`; const consumerName = `${this.serviceName}-${Date.now()}`; try { await this.redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM'); this.logger.debug(`Created consumer group: ${groupName} for stream: ${streamKey}`); } catch (error: any) { if (error.message.includes('BUSYGROUP')) { this.logger.debug(`Consumer group already exists: ${groupName}`); } else { throw error; } } const consumerInfo: StreamConsumerInfo = { streamKey, groupName, consumerName, handler, isRunning: true, }; this.consumers.set(`${eventType}-${consumerName}`, consumerInfo); this.startStreamConsumer(consumerInfo); this.logger.debug(`Started stream consumer for: ${eventType}`); } private async startStreamConsumer(consumerInfo: StreamConsumerInfo): Promise { const { streamKey, groupName, consumerName, handler } = consumerInfo; let retryCount = 0; while (consumerInfo.isRunning && this.isRunning) { try { await this.claimPendingMessages(streamKey, groupName, consumerName, handler); const messages = await this.redis.xreadgroup( 'GROUP', groupName, consumerName, 'COUNT', 10, 'BLOCK', 1000, 'STREAMS', streamKey, '>' ); if (!messages || messages.length === 0) { retryCount = 0; continue; } for (const [stream, msgs] of messages as [string, [string, string[]][]][]) { for (const [msgId, fields] of msgs) { await this.processStreamMessage(msgId, fields, streamKey, groupName, handler); } } retryCount = 0; } catch (error: any) { retryCount++; if (error.message.includes('NOGROUP')) { this.logger.warn(`Consumer group deleted, recreating: ${groupName}`); try { await this.redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM'); retryCount = 0; } catch (createError) { this.logger.error('Failed to recreate consumer group:', { error: createError }); } } else { this.logger.error('Error reading from stream:', { error, retryCount }); } if (retryCount >= this.maxRetries) { this.logger.error(`Max retries reached for consumer ${consumerName}, stopping`); consumerInfo.isRunning = false; break; } const backoffDelay = Math.min(this.retryDelay * Math.pow(2, retryCount - 1), 30000); await this.sleep(backoffDelay); } } this.logger.info(`Stream consumer stopped: ${consumerName}`); } private async processStreamMessage( msgId: string, fields: string[], streamKey: string, groupName: string, handler: EventHandler ): Promise { let retryCount = 0; while (retryCount < this.maxRetries) { try { const message = this.parseStreamMessage(fields); if (message.source === this.serviceName) { await this.redis.xack(streamKey, groupName, msgId); return; } await handler(message); await this.redis.xack(streamKey, groupName, msgId); this.logger.debug(`Processed stream message: ${msgId}`, { eventType: message.type, source: message.source }); return; } catch (error) { retryCount++; this.logger.error(`Error processing stream message ${msgId} (attempt ${retryCount}):`, error); if (retryCount >= this.maxRetries) { await this.moveToDeadLetterQueue(msgId, fields, streamKey, groupName, error); return; } await this.sleep(this.retryDelay * retryCount); } } } private async claimPendingMessages( streamKey: string, groupName: string, consumerName: string, handler: EventHandler ): Promise { try { const pendingMessages = await this.redis.xpending( streamKey, groupName, '-', '+', 10 ) as any[]; if (!pendingMessages || pendingMessages.length === 0) { return; } const oldMessages = pendingMessages.filter((msg: any[]) => { return msg[2] > 60000; }); if (oldMessages.length === 0) { return; } const messageIds = oldMessages.map((msg: any[]) => msg[0]); const claimedMessages = await this.redis.xclaim( streamKey, groupName, consumerName, 60000, ...messageIds ) as [string, string[]][]; for (const [msgId, fields] of claimedMessages) { await this.processStreamMessage(msgId, fields, streamKey, groupName, handler); } this.logger.debug(`Claimed and processed ${claimedMessages.length} pending messages`); } catch (error) { this.logger.error('Error claiming pending messages:', error); } } private async moveToDeadLetterQueue( msgId: string, fields: string[], streamKey: string, groupName: string, error: any ): Promise { try { const dlqKey = `${streamKey}:dlq`; const message = this.parseStreamMessage(fields); await this.redis.xadd( dlqKey, '*', 'original_id', msgId, 'original_stream', streamKey, 'error', (error as Error).message || 'Unknown error', 'timestamp', Date.now().toString(), 'id', message.id, 'type', message.type, 'source', message.source, 'data', JSON.stringify(message.data), 'metadata', JSON.stringify(message.metadata || {}) ); await this.redis.xack(streamKey, groupName, msgId); this.logger.warn(`Moved message ${msgId} to dead letter queue: ${dlqKey}`, { error: (error as Error).message }); } catch (dlqError) { this.logger.error(`Failed to move message ${msgId} to dead letter queue:`, { error: dlqError }); } } private parseStreamMessage(fields: string[]): EventBusMessage { const fieldMap: Record = {}; for (let i = 0; i < fields.length; i += 2) { fieldMap[fields[i]] = fields[i + 1]; } return { id: fieldMap.id, type: fieldMap.type || 'unknown', source: fieldMap.source, timestamp: parseInt(fieldMap.timestamp) || Date.now(), data: fieldMap.data ? JSON.parse(fieldMap.data) : {}, metadata: fieldMap.metadata ? JSON.parse(fieldMap.metadata) : {}, }; } private sleep(ms: number): Promise { return new Promise(resolve => setTimeout(resolve, ms)); } async unsubscribe(eventType: string, handler?: EventHandler): Promise { if (handler) { this.off(eventType, handler); } else { this.removeAllListeners(eventType); } if (this.enablePersistence) { try { if (this.useStreams) { const consumersToStop = Array.from(this.consumers.entries()) .filter(([key]) => key.startsWith(`${eventType}-`)); for (const [key, consumerInfo] of consumersToStop) { consumerInfo.isRunning = false; this.consumers.delete(key); } this.logger.debug(`Stopped stream consumers for: ${eventType}`); } else { if (this.subscriber) { await this.subscriber.unsubscribe(`events:${eventType}`); this.logger.debug(`Unsubscribed from event: ${eventType}`); } } } catch (error) { this.logger.error(`Failed to unsubscribe from event: ${eventType}`, error); } } } async close(): Promise { this.isRunning = false; for (const consumerInfo of this.consumers.values()) { consumerInfo.isRunning = false; } this.consumers.clear(); if (this.redis) { await this.redis.quit(); } if (this.subscriber) { await this.subscriber.quit(); } this.removeAllListeners(); this.logger.info('Event bus closed'); } private generateId(): string { return `${this.serviceName}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; } async getStreamInfo(eventType: string): Promise { if (!this.useStreams) { throw new Error('Stream info only available when using Redis Streams'); } const streamKey = `events:${eventType}`; try { return await this.redis.xinfo('STREAM', streamKey); } catch (error) { this.logger.error(`Failed to get stream info for: ${eventType}`, error); throw error; } } async getStreamLength(eventType: string): Promise { if (!this.useStreams) { throw new Error('Stream length only available when using Redis Streams'); } const streamKey = `events:${eventType}`; try { return await this.redis.xlen(streamKey); } catch (error) { this.logger.error(`Failed to get stream length for: ${eventType}`, error); return 0; } } async readStreamHistory( eventType: string, startId: string = '-', endId: string = '+', count?: number ): Promise { if (!this.useStreams) { throw new Error('Stream history only available when using Redis Streams'); } const streamKey = `events:${eventType}`; try { let messages: [string, string[]][]; if (count) { messages = await this.redis.xrange(streamKey, startId, endId, 'COUNT', count) as [string, string[]][]; } else { messages = await this.redis.xrange(streamKey, startId, endId) as [string, string[]][]; } return messages.map(([id, fields]) => ({ ...this.parseStreamMessage(fields), id })); } catch (error) { this.logger.error(`Failed to read stream history for: ${eventType}`, error); return []; } } async trimStream(eventType: string, maxLength: number): Promise { if (!this.useStreams) { throw new Error('Stream trimming only available when using Redis Streams'); } const streamKey = `events:${eventType}`; try { return await this.redis.xtrim(streamKey, 'MAXLEN', '~', maxLength); } catch (error) { this.logger.error(`Failed to trim stream: ${eventType}`, error); return 0; } } async replayEventsFromTimestamp( eventType: string, fromTimestamp: number, handler: EventHandler, speed: number = 1 ): Promise { if (!this.useStreams) { throw new Error('Event replay only available when using Redis Streams'); } const events = await this.readStreamHistory(eventType); const filteredEvents = events.filter(event => event.timestamp >= fromTimestamp); this.logger.info(`Replaying ${filteredEvents.length} events from ${new Date(fromTimestamp)}`); for (let i = 0; i < filteredEvents.length; i++) { const event = filteredEvents[i]; const nextEvent = filteredEvents[i + 1]; try { await handler(event); if (nextEvent && speed > 0) { const delay = (nextEvent.timestamp - event.timestamp) / speed; if (delay > 0) { await this.sleep(Math.min(delay, 1000)); } } } catch (error) { this.logger.error(`Error replaying event: ${event.id}`, error); } } this.logger.info('Event replay completed'); } } export function createEventBus(options: EventBusOptions): EventBus { return new EventBus(options); }