finished initial event-bus with streams

This commit is contained in:
Bojan Kucera 2025-06-04 23:50:34 -04:00
parent 5cd24ade09
commit fc4cf71a70

View file

@ -1,169 +0,0 @@
import { EventEmitter } from 'eventemitter3';
import Redis from 'ioredis';
import { createLogger } from '@stock-bot/logger';
import { dragonflyConfig } from '@stock-bot/config';
export interface EventBusMessage {
id: string;
type: string;
source: string;
timestamp: number;
data: any;
metadata?: Record<string, any>;
}
export interface EventHandler<T = any> {
(message: EventBusMessage & { data: T }): Promise<void> | void;
}
export interface EventBusOptions {
serviceName: string;
enablePersistence?: boolean;
}
export class EventBus extends EventEmitter {
private redis?: Redis;
private subscriber?: Redis;
private serviceName: string;
private logger
private enablePersistence: boolean;
constructor(options: EventBusOptions) {
super();
this.serviceName = options.serviceName;
this.enablePersistence = options.enablePersistence ?? true;
this.logger = createLogger(`event-bus:${this.serviceName}`);
this.redis = new Redis({
host: dragonflyConfig.DRAGONFLY_HOST,
port: dragonflyConfig.DRAGONFLY_PORT,
password: dragonflyConfig.DRAGONFLY_PASSWORD,
db: dragonflyConfig.DRAGONFLY_DATABASE || 0,
maxRetriesPerRequest: dragonflyConfig.DRAGONFLY_MAX_RETRIES,
});
this.subscriber = new Redis({
host: dragonflyConfig.DRAGONFLY_HOST,
port: dragonflyConfig.DRAGONFLY_PORT,
password: dragonflyConfig.DRAGONFLY_PASSWORD,
db: dragonflyConfig.DRAGONFLY_DATABASE || 0,
});
this.subscriber.on('message', this.handleRedisMessage.bind(this));
this.logger.info('Redis event bus initialized');
}
private handleRedisMessage(channel: string, message: string) {
try {
const eventMessage: EventBusMessage = JSON.parse(message);
// Don't process our own messages
if (eventMessage.source === this.serviceName) {
return;
}
this.emit(eventMessage.type, eventMessage);
this.logger.debug(`Received event: ${eventMessage.type} from ${eventMessage.source}`);
} catch (error) {
this.logger.error('Failed to parse Redis message', { error, message });
}
}
async publish<T = any>(type: string, data: T, metadata?: Record<string, any>): Promise<void> {
const message: EventBusMessage = {
id: this.generateId(),
type,
source: this.serviceName,
timestamp: Date.now(),
data,
metadata,
};
// Emit locally first
this.emit(type, message);
// Publish to Redis if available
if (this.redis && this.enablePersistence) {
try {
await this.redis.publish(`events:${type}`, JSON.stringify(message));
this.logger.debug(`Published event: ${type}`, { messageId: message.id });
} catch (error) {
this.logger.error(`Failed to publish event: ${type}`, { error, messageId: message.id });
}
}
}
async subscribe<T = any>(eventType: string, handler: EventHandler<T>): Promise<void> {
// Subscribe locally
this.on(eventType, handler);
// Subscribe to Redis channel if available
if (this.subscriber && this.enablePersistence) {
try {
await this.subscriber.subscribe(`events:${eventType}`);
this.logger.debug(`Subscribed to event: ${eventType}`);
} catch (error) {
this.logger.error(`Failed to subscribe to event: ${eventType}`, { error });
}
}
}
async unsubscribe(eventType: string, handler?: EventHandler): Promise<void> {
if (handler) {
this.off(eventType, handler);
} else {
this.removeAllListeners(eventType);
}
if (this.subscriber && this.enablePersistence) {
try {
await this.subscriber.unsubscribe(`events:${eventType}`);
this.logger.debug(`Unsubscribed from event: ${eventType}`);
} catch (error) {
this.logger.error(`Failed to unsubscribe from event: ${eventType}`, { error });
}
}
}
async close(): Promise<void> {
if (this.redis) {
await this.redis.quit();
}
if (this.subscriber) {
await this.subscriber.quit();
}
this.removeAllListeners();
this.logger.info('Event bus closed');
}
private generateId(): string {
return `${this.serviceName}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
// Utility methods for common event patterns
async publishMarketData(symbol: string, data: any): Promise<void> {
await this.publish('market.data', { symbol, ...data });
}
async publishOrderUpdate(orderId: string, status: string, data: any): Promise<void> {
await this.publish('order.update', { orderId, status, ...data });
}
async publishStrategySignal(strategyId: string, signal: any): Promise<void> {
await this.publish('strategy.signal', { strategyId, ...signal });
}
async publishPortfolioUpdate(portfolioId: string, data: any): Promise<void> {
await this.publish('portfolio.update', { portfolioId, ...data });
}
async publishBacktestUpdate(backtestId: string, progress: number, data?: any): Promise<void> {
await this.publish('backtest.update', { backtestId, progress, ...data });
}
}
// Factory function for easy initialization
export function createEventBus(options: EventBusOptions): EventBus {
return new EventBus(options);
}