diff --git a/libs/event-bus/README.md b/libs/event-bus/README.md new file mode 100644 index 0000000..a6c03e3 --- /dev/null +++ b/libs/event-bus/README.md @@ -0,0 +1,191 @@ +# @stock-bot/event-bus + +Lightweight event bus for inter-service communication in the Stock Bot platform. + +## Overview + +This library provides a simple pub/sub event system using Redis, designed for real-time event distribution between microservices. It focuses on simplicity and reliability for event-driven communication. + +## Features + +- Simple pub/sub pattern using Redis +- Automatic reconnection and resubscription +- Local event emission (works even without Redis) +- TypeScript support with predefined trading event types +- Lightweight with minimal dependencies + +## Installation + +```bash +bun add @stock-bot/event-bus +``` + +## Usage + +### Basic Setup + +```typescript +import { createEventBus, TradingEventType } from '@stock-bot/event-bus'; + +const eventBus = createEventBus({ + serviceName: 'data-service', + redisConfig: { + host: 'localhost', + port: 6379, + }, + enableLogging: true, +}); + +// Wait for connection +await eventBus.waitForConnection(); +``` + +### Publishing Events + +```typescript +// Publish a price update +await eventBus.publish(TradingEventType.PRICE_UPDATE, { + symbol: 'AAPL', + price: 150.25, + volume: 1000000, + timestamp: Date.now(), +}); + +// Publish with metadata +await eventBus.publish(TradingEventType.ORDER_FILLED, + { + orderId: '12345', + symbol: 'TSLA', + side: 'buy', + quantity: 100, + price: 250.50, + }, + { source: 'ib-gateway', region: 'us' } +); +``` + +### Subscribing to Events + +```typescript +// Subscribe to price updates +await eventBus.subscribe(TradingEventType.PRICE_UPDATE, async (message) => { + console.log(`Price update for ${message.data.symbol}: $${message.data.price}`); +}); + +// Subscribe to order events +await eventBus.subscribe(TradingEventType.ORDER_FILLED, async (message) => { + const { orderId, symbol, quantity, price } = message.data; + console.log(`Order ${orderId} filled: ${quantity} ${symbol} @ $${price}`); +}); +``` + +### Event Types + +The library includes predefined event types for common trading operations: + +```typescript +enum TradingEventType { + // Market data events + PRICE_UPDATE = 'market.price.update', + ORDERBOOK_UPDATE = 'market.orderbook.update', + TRADE_EXECUTED = 'market.trade.executed', + + // Order events + ORDER_CREATED = 'order.created', + ORDER_FILLED = 'order.filled', + ORDER_CANCELLED = 'order.cancelled', + ORDER_REJECTED = 'order.rejected', + + // Position events + POSITION_OPENED = 'position.opened', + POSITION_CLOSED = 'position.closed', + POSITION_UPDATED = 'position.updated', + + // Strategy events + STRATEGY_SIGNAL = 'strategy.signal', + STRATEGY_STARTED = 'strategy.started', + STRATEGY_STOPPED = 'strategy.stopped', + + // Risk events + RISK_LIMIT_BREACH = 'risk.limit.breach', + RISK_WARNING = 'risk.warning', + + // System events + SERVICE_STARTED = 'system.service.started', + SERVICE_STOPPED = 'system.service.stopped', + SERVICE_ERROR = 'system.service.error', +} +``` + +### Typed Events + +Use TypeScript generics for type-safe event handling: + +```typescript +import type { PriceUpdateEvent, OrderEvent } from '@stock-bot/event-bus'; + +// Type-safe subscription +await eventBus.subscribe( + TradingEventType.PRICE_UPDATE, + async (message) => { + // message.data is typed as PriceUpdateEvent + const { symbol, price, volume } = message.data; + } +); +``` + +### Cleanup + +```typescript +// Unsubscribe from specific event +await eventBus.unsubscribe(TradingEventType.PRICE_UPDATE); + +// Close all connections +await eventBus.close(); +``` + +## Architecture Notes + +This library is designed for lightweight, real-time event distribution. For reliable job processing, retries, and persistence, use the `@stock-bot/queue` library with BullMQ instead. + +### When to Use Event Bus + +- Real-time notifications (price updates, trade executions) +- Service coordination (strategy signals, risk alerts) +- System monitoring (service status, errors) + +### When to Use Queue + +- Data processing jobs +- Batch operations +- Tasks requiring persistence and retries +- Scheduled operations + +## Error Handling + +The event bus handles connection failures gracefully: + +```typescript +try { + await eventBus.publish(TradingEventType.PRICE_UPDATE, data); +} catch (error) { + // Event will still be emitted locally + console.error('Failed to publish to Redis:', error); +} +``` + +## Development + +```bash +# Install dependencies +bun install + +# Build +bun run build + +# Run tests +bun test + +# Clean build artifacts +bun run clean +``` \ No newline at end of file diff --git a/libs/event-bus/package.json b/libs/event-bus/package.json index e6595b6..4438893 100644 --- a/libs/event-bus/package.json +++ b/libs/event-bus/package.json @@ -25,6 +25,11 @@ "import": "./dist/index.js", "require": "./dist/index.js", "types": "./dist/index.d.ts" + }, + "./types": { + "import": "./dist/types.js", + "require": "./dist/types.js", + "types": "./dist/types.d.ts" } }, "files": [ diff --git a/libs/event-bus/src/event-bus.ts b/libs/event-bus/src/event-bus.ts new file mode 100644 index 0000000..749f613 --- /dev/null +++ b/libs/event-bus/src/event-bus.ts @@ -0,0 +1,251 @@ +import { EventEmitter } from 'eventemitter3'; +import Redis from 'ioredis'; +import { getLogger } from '@stock-bot/logger'; +import type { + EventBusConfig, + EventBusMessage, + EventHandler, + EventSubscription, +} from './types'; + +/** + * Lightweight Event Bus for inter-service communication + * Uses Redis pub/sub for simple, real-time event distribution + */ +export class EventBus extends EventEmitter { + private publisher: Redis; + private subscriber: Redis; + private readonly serviceName: string; + private readonly logger: ReturnType; + private subscriptions: Map = new Map(); + private isConnected: boolean = false; + + constructor(config: EventBusConfig) { + super(); + this.serviceName = config.serviceName; + this.logger = getLogger(`event-bus:${this.serviceName}`); + + // Create Redis connections + const redisOptions = { + host: config.redisConfig.host, + port: config.redisConfig.port, + password: config.redisConfig.password, + db: config.redisConfig.db || 0, + lazyConnect: false, + enableOfflineQueue: true, + reconnectOnError: (err: Error) => { + this.logger.error('Redis connection error:', err); + return true; + }, + }; + + this.publisher = new Redis(redisOptions); + this.subscriber = new Redis(redisOptions); + + this.setupRedisHandlers(); + } + + private setupRedisHandlers(): void { + // Publisher handlers + this.publisher.on('connect', () => { + this.logger.info('Publisher connected to Redis'); + this.isConnected = true; + }); + + this.publisher.on('error', (error) => { + this.logger.error('Publisher Redis error:', error); + }); + + // Subscriber handlers + this.subscriber.on('connect', () => { + this.logger.info('Subscriber connected to Redis'); + // Resubscribe to all channels on reconnect + this.resubscribeAll(); + }); + + this.subscriber.on('error', (error) => { + this.logger.error('Subscriber Redis error:', error); + }); + + // Handle incoming messages + this.subscriber.on('message', this.handleMessage.bind(this)); + } + + private handleMessage(channel: string, message: string): void { + try { + const eventMessage: EventBusMessage = JSON.parse(message); + + // Skip messages from self + if (eventMessage.source === this.serviceName) { + return; + } + + // Extract event type from channel (remove 'events:' prefix) + const eventType = channel.replace('events:', ''); + + // Emit locally + this.emit(eventType, eventMessage); + + // Call registered handler if exists + const subscription = this.subscriptions.get(eventType); + if (subscription?.handler) { + Promise.resolve(subscription.handler(eventMessage)).catch((error) => { + this.logger.error(`Handler error for event ${eventType}:`, error); + }); + } + + this.logger.debug(`Received event: ${eventType} from ${eventMessage.source}`); + } catch (error) { + this.logger.error('Failed to handle message:', { error, channel, message }); + } + } + + /** + * Publish an event + */ + async publish( + type: string, + data: T, + metadata?: Record + ): Promise { + const message: EventBusMessage = { + id: this.generateId(), + type, + source: this.serviceName, + timestamp: Date.now(), + data, + metadata, + }; + + // Emit locally first + this.emit(type, message); + + // Publish to Redis + if (this.isConnected) { + try { + const channel = `events:${type}`; + await this.publisher.publish(channel, JSON.stringify(message)); + this.logger.debug(`Published event: ${type}`, { messageId: message.id }); + } catch (error) { + this.logger.error(`Failed to publish event: ${type}`, { error, messageId: message.id }); + throw error; + } + } else { + this.logger.warn(`Not connected to Redis, event ${type} only emitted locally`); + } + } + + /** + * Subscribe to an event + */ + async subscribe(eventType: string, handler: EventHandler): Promise { + // Register handler + this.subscriptions.set(eventType, { channel: `events:${eventType}`, handler }); + + // Add local listener + this.on(eventType, handler); + + // Subscribe to Redis channel + try { + const channel = `events:${eventType}`; + await this.subscriber.subscribe(channel); + this.logger.debug(`Subscribed to event: ${eventType}`); + } catch (error) { + this.logger.error(`Failed to subscribe to event: ${eventType}`, error); + throw error; + } + } + + /** + * Unsubscribe from an event + */ + async unsubscribe(eventType: string, handler?: EventHandler): Promise { + // Remove specific handler or all handlers + if (handler) { + this.off(eventType, handler); + } else { + this.removeAllListeners(eventType); + } + + // Remove from subscriptions + this.subscriptions.delete(eventType); + + // Unsubscribe from Redis + try { + const channel = `events:${eventType}`; + await this.subscriber.unsubscribe(channel); + this.logger.debug(`Unsubscribed from event: ${eventType}`); + } catch (error) { + this.logger.error(`Failed to unsubscribe from event: ${eventType}`, error); + } + } + + /** + * Resubscribe to all channels (used on reconnect) + */ + private async resubscribeAll(): Promise { + for (const [eventType, subscription] of this.subscriptions.entries()) { + try { + await this.subscriber.subscribe(subscription.channel); + this.logger.debug(`Resubscribed to event: ${eventType}`); + } catch (error) { + this.logger.error(`Failed to resubscribe to event: ${eventType}`, error); + } + } + } + + /** + * Wait for connection to be established + */ + async waitForConnection(timeout: number = 5000): Promise { + const startTime = Date.now(); + + while (!this.isConnected && Date.now() - startTime < timeout) { + await new Promise(resolve => setTimeout(resolve, 100)); + } + + if (!this.isConnected) { + throw new Error(`Failed to connect to Redis within ${timeout}ms`); + } + } + + /** + * Close all connections + */ + async close(): Promise { + this.isConnected = false; + + // Clear all subscriptions + this.subscriptions.clear(); + this.removeAllListeners(); + + // Close Redis connections + await Promise.all([ + this.publisher.quit(), + this.subscriber.quit(), + ]); + + this.logger.info('Event bus closed'); + } + + /** + * Generate unique message ID + */ + private generateId(): string { + return `${this.serviceName}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; + } + + /** + * Check if connected to Redis + */ + get connected(): boolean { + return this.isConnected; + } + + /** + * Get service name + */ + get service(): string { + return this.serviceName; + } +} \ No newline at end of file diff --git a/libs/event-bus/src/index.ts b/libs/event-bus/src/index.ts index 085712d..d92979e 100644 --- a/libs/event-bus/src/index.ts +++ b/libs/event-bus/src/index.ts @@ -1,599 +1,16 @@ -import { EventEmitter } from 'eventemitter3'; -import Redis from 'ioredis'; -import { getLogger } from '@stock-bot/logger'; +import { EventBus } from './event-bus'; +import type { EventBusConfig } from './types'; -export interface EventBusMessage { - id: string; - type: string; - source: string; - timestamp: number; - data: any; - metadata?: Record; +/** + * Create a new event bus instance + */ +export function createEventBus(config: EventBusConfig): EventBus { + return new EventBus(config); } -export interface EventHandler { - (message: EventBusMessage & { data: T }): Promise | void; -} +// Re-export everything +export { EventBus } from './event-bus'; +export * from './types'; -export interface RedisConfig { - host: string; - port: number; - password?: string; - db?: number; - maxRetriesPerRequest?: number; -} - -export interface EventBusOptions { - serviceName: string; - enablePersistence?: boolean; - useStreams?: boolean; - maxRetries?: number; - retryDelay?: number; - redisConfig: RedisConfig; -} - -export interface StreamConsumerInfo { - streamKey: string; - groupName: string; - consumerName: string; - handler: EventHandler; - isRunning: boolean; -} - -export class EventBus extends EventEmitter { - private redis: Redis; - private subscriber?: Redis; - private serviceName: string; - private logger: any; - private enablePersistence: boolean; - private useStreams: boolean; - private maxRetries: number; - private retryDelay: number; - private consumers: Map = new Map(); - private isRunning: boolean = true; - - constructor(options: EventBusOptions) { - super(); - this.serviceName = options.serviceName; - this.enablePersistence = options.enablePersistence ?? true; - this.useStreams = options.useStreams ?? true; - this.maxRetries = options.maxRetries ?? 3; - this.retryDelay = options.retryDelay ?? 1000; - this.logger = getLogger(`event-bus:${this.serviceName}`); - - const { redisConfig } = options; - this.redis = new Redis({ - host: redisConfig.host, - port: redisConfig.port, - password: redisConfig.password, - db: redisConfig.db || 0, - maxRetriesPerRequest: redisConfig.maxRetriesPerRequest || 3, - lazyConnect: false, - }); - - if (!this.useStreams) { - this.subscriber = new Redis({ - host: redisConfig.host, - port: redisConfig.port, - password: redisConfig.password, - db: redisConfig.db || 0, - }); - this.subscriber.on('message', this.handleRedisMessage.bind(this)); - } - - this.logger.info( - `Redis event bus initialized (mode: ${this.useStreams ? 'streams' : 'pub/sub'})` - ); - } - - private handleRedisMessage(channel: string, message: string) { - try { - const eventMessage: EventBusMessage = JSON.parse(message); - - if (eventMessage.source === this.serviceName) { - return; - } - - this.emit(eventMessage.type, eventMessage); - this.logger.debug(`Received event: ${eventMessage.type} from ${eventMessage.source}`); - } catch (error) { - this.logger.error('Failed to parse Redis message', { error, message }); - } - } - - async publish( - type: string, - data: T, - metadata?: Record - ): Promise { - const message: EventBusMessage = { - id: this.generateId(), - type, - source: this.serviceName, - timestamp: Date.now(), - data, - metadata, - }; - - this.emit(type, message); - - if (this.redis && this.enablePersistence) { - try { - if (this.useStreams) { - const streamKey = `events:${type}`; - const messageId = await this.redis.xadd( - streamKey, - '*', - 'id', - message.id, - 'type', - message.type, - 'source', - message.source, - 'timestamp', - message.timestamp.toString(), - 'data', - JSON.stringify(message.data), - 'metadata', - JSON.stringify(message.metadata || {}) - ); - - this.logger.debug(`Published event to stream: ${type}`, { - messageId, - streamId: messageId, - }); - return messageId as string; - } else { - await this.redis.publish(`events:${type}`, JSON.stringify(message)); - this.logger.debug(`Published event via pub/sub: ${type}`, { messageId: message.id }); - return message.id; - } - } catch (error) { - this.logger.error(`Failed to publish event: ${type}`, { error, messageId: message.id }); - throw error; - } - } - - return null; - } - - async subscribe(eventType: string, handler: EventHandler): Promise { - this.on(eventType, handler); - - if (this.redis && this.enablePersistence) { - try { - if (this.useStreams) { - await this.subscribeToStream(eventType, handler); - } else { - if (this.subscriber) { - await this.subscriber.subscribe(`events:${eventType}`); - this.logger.debug(`Subscribed to event: ${eventType}`); - } - } - } catch (error) { - this.logger.error(`Failed to subscribe to event: ${eventType}`, error); - throw error; - } - } - } - - private async subscribeToStream( - eventType: string, - handler: EventHandler - ): Promise { - const streamKey = `events:${eventType}`; - const groupName = `${eventType}-consumers`; - const consumerName = `${this.serviceName}-${Date.now()}`; - - try { - await this.redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM'); - this.logger.debug(`Created consumer group: ${groupName} for stream: ${streamKey}`); - } catch (error: any) { - if (error.message.includes('BUSYGROUP')) { - this.logger.debug(`Consumer group already exists: ${groupName}`); - } else { - throw error; - } - } - - const consumerInfo: StreamConsumerInfo = { - streamKey, - groupName, - consumerName, - handler, - isRunning: true, - }; - - this.consumers.set(`${eventType}-${consumerName}`, consumerInfo); - this.startStreamConsumer(consumerInfo); - this.logger.debug(`Started stream consumer for: ${eventType}`); - } - - private async startStreamConsumer(consumerInfo: StreamConsumerInfo): Promise { - const { streamKey, groupName, consumerName, handler } = consumerInfo; - let retryCount = 0; - - while (consumerInfo.isRunning && this.isRunning) { - try { - await this.claimPendingMessages(streamKey, groupName, consumerName, handler); - - const messages = await this.redis.xreadgroup( - 'GROUP', - groupName, - consumerName, - 'COUNT', - 10, - 'BLOCK', - 1000, - 'STREAMS', - streamKey, - '>' - ); - - if (!messages || messages.length === 0) { - retryCount = 0; - continue; - } - - for (const [stream, msgs] of messages as [string, [string, string[]][]][]) { - for (const [msgId, fields] of msgs) { - await this.processStreamMessage(msgId, fields, streamKey, groupName, handler); - } - } - - retryCount = 0; - } catch (error: any) { - retryCount++; - - if (error.message.includes('NOGROUP')) { - this.logger.warn(`Consumer group deleted, recreating: ${groupName}`); - try { - await this.redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM'); - retryCount = 0; - } catch (createError) { - this.logger.error('Failed to recreate consumer group:', { error: createError }); - } - } else { - this.logger.error('Error reading from stream:', { error, retryCount }); - } - - if (retryCount >= this.maxRetries) { - this.logger.error(`Max retries reached for consumer ${consumerName}, stopping`); - consumerInfo.isRunning = false; - break; - } - - const backoffDelay = Math.min(this.retryDelay * Math.pow(2, retryCount - 1), 30000); - await this.sleep(backoffDelay); - } - } - - this.logger.info(`Stream consumer stopped: ${consumerName}`); - } - - private async processStreamMessage( - msgId: string, - fields: string[], - streamKey: string, - groupName: string, - handler: EventHandler - ): Promise { - let retryCount = 0; - - while (retryCount < this.maxRetries) { - try { - const message = this.parseStreamMessage(fields); - - if (message.source === this.serviceName) { - await this.redis.xack(streamKey, groupName, msgId); - return; - } - - await handler(message); - await this.redis.xack(streamKey, groupName, msgId); - - this.logger.debug(`Processed stream message: ${msgId}`, { - eventType: message.type, - source: message.source, - }); - - return; - } catch (error) { - retryCount++; - this.logger.error( - `Error processing stream message ${msgId} (attempt ${retryCount}):`, - error - ); - - if (retryCount >= this.maxRetries) { - await this.moveToDeadLetterQueue(msgId, fields, streamKey, groupName, error); - return; - } - - await this.sleep(this.retryDelay * retryCount); - } - } - } - - private async claimPendingMessages( - streamKey: string, - groupName: string, - consumerName: string, - handler: EventHandler - ): Promise { - try { - const pendingMessages = (await this.redis.xpending( - streamKey, - groupName, - '-', - '+', - 10 - )) as any[]; - - if (!pendingMessages || pendingMessages.length === 0) { - return; - } - - const oldMessages = pendingMessages.filter((msg: any[]) => { - return msg[2] > 60000; - }); - - if (oldMessages.length === 0) { - return; - } - - const messageIds = oldMessages.map((msg: any[]) => msg[0]); - const claimedMessages = (await this.redis.xclaim( - streamKey, - groupName, - consumerName, - 60000, - ...messageIds - )) as [string, string[]][]; - - for (const [msgId, fields] of claimedMessages) { - await this.processStreamMessage(msgId, fields, streamKey, groupName, handler); - } - - this.logger.debug(`Claimed and processed ${claimedMessages.length} pending messages`); - } catch (error) { - this.logger.error('Error claiming pending messages:', error); - } - } - - private async moveToDeadLetterQueue( - msgId: string, - fields: string[], - streamKey: string, - groupName: string, - error: any - ): Promise { - try { - const dlqKey = `${streamKey}:dlq`; - const message = this.parseStreamMessage(fields); - - await this.redis.xadd( - dlqKey, - '*', - 'original_id', - msgId, - 'original_stream', - streamKey, - 'error', - (error as Error).message || 'Unknown error', - 'timestamp', - Date.now().toString(), - 'id', - message.id, - 'type', - message.type, - 'source', - message.source, - 'data', - JSON.stringify(message.data), - 'metadata', - JSON.stringify(message.metadata || {}) - ); - - await this.redis.xack(streamKey, groupName, msgId); - - this.logger.warn(`Moved message ${msgId} to dead letter queue: ${dlqKey}`, { - error: (error as Error).message, - }); - } catch (dlqError) { - this.logger.error(`Failed to move message ${msgId} to dead letter queue:`, { - error: dlqError, - }); - } - } - - private parseStreamMessage(fields: string[]): EventBusMessage { - const fieldMap: Record = {}; - - for (let i = 0; i < fields.length; i += 2) { - fieldMap[fields[i]] = fields[i + 1]; - } - - return { - id: fieldMap.id, - type: fieldMap.type || 'unknown', - source: fieldMap.source, - timestamp: parseInt(fieldMap.timestamp) || Date.now(), - data: fieldMap.data ? JSON.parse(fieldMap.data) : {}, - metadata: fieldMap.metadata ? JSON.parse(fieldMap.metadata) : {}, - }; - } - - private sleep(ms: number): Promise { - return new Promise(resolve => setTimeout(resolve, ms)); - } - - async unsubscribe(eventType: string, handler?: EventHandler): Promise { - if (handler) { - this.off(eventType, handler); - } else { - this.removeAllListeners(eventType); - } - - if (this.enablePersistence) { - try { - if (this.useStreams) { - const consumersToStop = Array.from(this.consumers.entries()).filter(([key]) => - key.startsWith(`${eventType}-`) - ); - - for (const [key, consumerInfo] of consumersToStop) { - consumerInfo.isRunning = false; - this.consumers.delete(key); - } - - this.logger.debug(`Stopped stream consumers for: ${eventType}`); - } else { - if (this.subscriber) { - await this.subscriber.unsubscribe(`events:${eventType}`); - this.logger.debug(`Unsubscribed from event: ${eventType}`); - } - } - } catch (error) { - this.logger.error(`Failed to unsubscribe from event: ${eventType}`, error); - } - } - } - - async close(): Promise { - this.isRunning = false; - - for (const consumerInfo of this.consumers.values()) { - consumerInfo.isRunning = false; - } - this.consumers.clear(); - - if (this.redis) { - await this.redis.quit(); - } - if (this.subscriber) { - await this.subscriber.quit(); - } - - this.removeAllListeners(); - this.logger.info('Event bus closed'); - } - - private generateId(): string { - return `${this.serviceName}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; - } - - async getStreamInfo(eventType: string): Promise { - if (!this.useStreams) { - throw new Error('Stream info only available when using Redis Streams'); - } - - const streamKey = `events:${eventType}`; - try { - return await this.redis.xinfo('STREAM', streamKey); - } catch (error) { - this.logger.error(`Failed to get stream info for: ${eventType}`, error); - throw error; - } - } - - async getStreamLength(eventType: string): Promise { - if (!this.useStreams) { - throw new Error('Stream length only available when using Redis Streams'); - } - - const streamKey = `events:${eventType}`; - try { - return await this.redis.xlen(streamKey); - } catch (error) { - this.logger.error(`Failed to get stream length for: ${eventType}`, error); - return 0; - } - } - async readStreamHistory( - eventType: string, - startId: string = '-', - endId: string = '+', - count?: number - ): Promise { - if (!this.useStreams) { - throw new Error('Stream history only available when using Redis Streams'); - } - - const streamKey = `events:${eventType}`; - try { - let messages: [string, string[]][]; - - if (count) { - messages = (await this.redis.xrange(streamKey, startId, endId, 'COUNT', count)) as [ - string, - string[], - ][]; - } else { - messages = (await this.redis.xrange(streamKey, startId, endId)) as [string, string[]][]; - } - - return messages.map(([id, fields]) => ({ - ...this.parseStreamMessage(fields), - id, - })); - } catch (error) { - this.logger.error(`Failed to read stream history for: ${eventType}`, error); - return []; - } - } - - async trimStream(eventType: string, maxLength: number): Promise { - if (!this.useStreams) { - throw new Error('Stream trimming only available when using Redis Streams'); - } - - const streamKey = `events:${eventType}`; - try { - return await this.redis.xtrim(streamKey, 'MAXLEN', '~', maxLength); - } catch (error) { - this.logger.error(`Failed to trim stream: ${eventType}`, error); - return 0; - } - } - - async replayEventsFromTimestamp( - eventType: string, - fromTimestamp: number, - handler: EventHandler, - speed: number = 1 - ): Promise { - if (!this.useStreams) { - throw new Error('Event replay only available when using Redis Streams'); - } - - const events = await this.readStreamHistory(eventType); - const filteredEvents = events.filter(event => event.timestamp >= fromTimestamp); - - this.logger.info(`Replaying ${filteredEvents.length} events from ${new Date(fromTimestamp)}`); - - for (let i = 0; i < filteredEvents.length; i++) { - const event = filteredEvents[i]; - const nextEvent = filteredEvents[i + 1]; - - try { - await handler(event); - - if (nextEvent && speed > 0) { - const delay = (nextEvent.timestamp - event.timestamp) / speed; - if (delay > 0) { - await this.sleep(Math.min(delay, 1000)); - } - } - } catch (error) { - this.logger.error(`Error replaying event: ${event.id}`, error); - } - } - - this.logger.info('Event replay completed'); - } -} - -export function createEventBus(options: EventBusOptions): EventBus { - return new EventBus(options); -} +// Default export +export default createEventBus; \ No newline at end of file diff --git a/libs/event-bus/src/types.ts b/libs/event-bus/src/types.ts new file mode 100644 index 0000000..c835a1d --- /dev/null +++ b/libs/event-bus/src/types.ts @@ -0,0 +1,111 @@ +export interface EventBusMessage { + id: string; + type: string; + source: string; + timestamp: number; + data: T; + metadata?: Record; +} + +export interface EventHandler { + (message: EventBusMessage): Promise | void; +} + +export interface EventBusConfig { + serviceName: string; + redisConfig: { + host: string; + port: number; + password?: string; + db?: number; + }; + enableLogging?: boolean; +} + +export interface EventSubscription { + channel: string; + handler: EventHandler; +} + +// Trading-specific event types +export enum TradingEventType { + // Market data events + PRICE_UPDATE = 'market.price.update', + ORDERBOOK_UPDATE = 'market.orderbook.update', + TRADE_EXECUTED = 'market.trade.executed', + + // Order events + ORDER_CREATED = 'order.created', + ORDER_FILLED = 'order.filled', + ORDER_CANCELLED = 'order.cancelled', + ORDER_REJECTED = 'order.rejected', + + // Position events + POSITION_OPENED = 'position.opened', + POSITION_CLOSED = 'position.closed', + POSITION_UPDATED = 'position.updated', + + // Strategy events + STRATEGY_SIGNAL = 'strategy.signal', + STRATEGY_STARTED = 'strategy.started', + STRATEGY_STOPPED = 'strategy.stopped', + + // Risk events + RISK_LIMIT_BREACH = 'risk.limit.breach', + RISK_WARNING = 'risk.warning', + + // System events + SERVICE_STARTED = 'system.service.started', + SERVICE_STOPPED = 'system.service.stopped', + SERVICE_ERROR = 'system.service.error', +} + +// Event data types +export interface PriceUpdateEvent { + symbol: string; + price: number; + volume: number; + timestamp: number; +} + +export interface OrderEvent { + orderId: string; + symbol: string; + side: 'buy' | 'sell'; + quantity: number; + price?: number; + type: 'market' | 'limit' | 'stop' | 'stop_limit'; + status: string; + portfolioId: string; + strategyId?: string; +} + +export interface PositionEvent { + positionId: string; + symbol: string; + quantity: number; + averageCost: number; + currentPrice: number; + unrealizedPnl: number; + realizedPnl: number; + portfolioId: string; +} + +export interface StrategySignalEvent { + strategyId: string; + signal: 'buy' | 'sell' | 'hold'; + symbol: string; + confidence: number; + indicators: Record; + timestamp: number; +} + +export interface RiskEvent { + type: 'position_size' | 'daily_loss' | 'max_drawdown' | 'concentration'; + severity: 'warning' | 'critical'; + currentValue: number; + limit: number; + portfolioId?: string; + strategyId?: string; + message: string; +} \ No newline at end of file