removed streams for eventbus- i have no idea where its going

This commit is contained in:
Boki 2025-06-19 21:23:17 -04:00
parent 08c81197fe
commit 8c2f98e010
5 changed files with 570 additions and 595 deletions

View file

@ -0,0 +1,251 @@
import { EventEmitter } from 'eventemitter3';
import Redis from 'ioredis';
import { getLogger } from '@stock-bot/logger';
import type {
EventBusConfig,
EventBusMessage,
EventHandler,
EventSubscription,
} from './types';
/**
* Lightweight Event Bus for inter-service communication
* Uses Redis pub/sub for simple, real-time event distribution
*/
export class EventBus extends EventEmitter {
private publisher: Redis;
private subscriber: Redis;
private readonly serviceName: string;
private readonly logger: ReturnType<typeof getLogger>;
private subscriptions: Map<string, EventSubscription> = new Map();
private isConnected: boolean = false;
constructor(config: EventBusConfig) {
super();
this.serviceName = config.serviceName;
this.logger = getLogger(`event-bus:${this.serviceName}`);
// Create Redis connections
const redisOptions = {
host: config.redisConfig.host,
port: config.redisConfig.port,
password: config.redisConfig.password,
db: config.redisConfig.db || 0,
lazyConnect: false,
enableOfflineQueue: true,
reconnectOnError: (err: Error) => {
this.logger.error('Redis connection error:', err);
return true;
},
};
this.publisher = new Redis(redisOptions);
this.subscriber = new Redis(redisOptions);
this.setupRedisHandlers();
}
private setupRedisHandlers(): void {
// Publisher handlers
this.publisher.on('connect', () => {
this.logger.info('Publisher connected to Redis');
this.isConnected = true;
});
this.publisher.on('error', (error) => {
this.logger.error('Publisher Redis error:', error);
});
// Subscriber handlers
this.subscriber.on('connect', () => {
this.logger.info('Subscriber connected to Redis');
// Resubscribe to all channels on reconnect
this.resubscribeAll();
});
this.subscriber.on('error', (error) => {
this.logger.error('Subscriber Redis error:', error);
});
// Handle incoming messages
this.subscriber.on('message', this.handleMessage.bind(this));
}
private handleMessage(channel: string, message: string): void {
try {
const eventMessage: EventBusMessage = JSON.parse(message);
// Skip messages from self
if (eventMessage.source === this.serviceName) {
return;
}
// Extract event type from channel (remove 'events:' prefix)
const eventType = channel.replace('events:', '');
// Emit locally
this.emit(eventType, eventMessage);
// Call registered handler if exists
const subscription = this.subscriptions.get(eventType);
if (subscription?.handler) {
Promise.resolve(subscription.handler(eventMessage)).catch((error) => {
this.logger.error(`Handler error for event ${eventType}:`, error);
});
}
this.logger.debug(`Received event: ${eventType} from ${eventMessage.source}`);
} catch (error) {
this.logger.error('Failed to handle message:', { error, channel, message });
}
}
/**
* Publish an event
*/
async publish<T = any>(
type: string,
data: T,
metadata?: Record<string, any>
): Promise<void> {
const message: EventBusMessage<T> = {
id: this.generateId(),
type,
source: this.serviceName,
timestamp: Date.now(),
data,
metadata,
};
// Emit locally first
this.emit(type, message);
// Publish to Redis
if (this.isConnected) {
try {
const channel = `events:${type}`;
await this.publisher.publish(channel, JSON.stringify(message));
this.logger.debug(`Published event: ${type}`, { messageId: message.id });
} catch (error) {
this.logger.error(`Failed to publish event: ${type}`, { error, messageId: message.id });
throw error;
}
} else {
this.logger.warn(`Not connected to Redis, event ${type} only emitted locally`);
}
}
/**
* Subscribe to an event
*/
async subscribe<T = any>(eventType: string, handler: EventHandler<T>): Promise<void> {
// Register handler
this.subscriptions.set(eventType, { channel: `events:${eventType}`, handler });
// Add local listener
this.on(eventType, handler);
// Subscribe to Redis channel
try {
const channel = `events:${eventType}`;
await this.subscriber.subscribe(channel);
this.logger.debug(`Subscribed to event: ${eventType}`);
} catch (error) {
this.logger.error(`Failed to subscribe to event: ${eventType}`, error);
throw error;
}
}
/**
* Unsubscribe from an event
*/
async unsubscribe(eventType: string, handler?: EventHandler): Promise<void> {
// Remove specific handler or all handlers
if (handler) {
this.off(eventType, handler);
} else {
this.removeAllListeners(eventType);
}
// Remove from subscriptions
this.subscriptions.delete(eventType);
// Unsubscribe from Redis
try {
const channel = `events:${eventType}`;
await this.subscriber.unsubscribe(channel);
this.logger.debug(`Unsubscribed from event: ${eventType}`);
} catch (error) {
this.logger.error(`Failed to unsubscribe from event: ${eventType}`, error);
}
}
/**
* Resubscribe to all channels (used on reconnect)
*/
private async resubscribeAll(): Promise<void> {
for (const [eventType, subscription] of this.subscriptions.entries()) {
try {
await this.subscriber.subscribe(subscription.channel);
this.logger.debug(`Resubscribed to event: ${eventType}`);
} catch (error) {
this.logger.error(`Failed to resubscribe to event: ${eventType}`, error);
}
}
}
/**
* Wait for connection to be established
*/
async waitForConnection(timeout: number = 5000): Promise<void> {
const startTime = Date.now();
while (!this.isConnected && Date.now() - startTime < timeout) {
await new Promise(resolve => setTimeout(resolve, 100));
}
if (!this.isConnected) {
throw new Error(`Failed to connect to Redis within ${timeout}ms`);
}
}
/**
* Close all connections
*/
async close(): Promise<void> {
this.isConnected = false;
// Clear all subscriptions
this.subscriptions.clear();
this.removeAllListeners();
// Close Redis connections
await Promise.all([
this.publisher.quit(),
this.subscriber.quit(),
]);
this.logger.info('Event bus closed');
}
/**
* Generate unique message ID
*/
private generateId(): string {
return `${this.serviceName}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
/**
* Check if connected to Redis
*/
get connected(): boolean {
return this.isConnected;
}
/**
* Get service name
*/
get service(): string {
return this.serviceName;
}
}

View file

@ -1,599 +1,16 @@
import { EventEmitter } from 'eventemitter3';
import Redis from 'ioredis';
import { getLogger } from '@stock-bot/logger';
import { EventBus } from './event-bus';
import type { EventBusConfig } from './types';
export interface EventBusMessage {
id: string;
type: string;
source: string;
timestamp: number;
data: any;
metadata?: Record<string, any>;
/**
* Create a new event bus instance
*/
export function createEventBus(config: EventBusConfig): EventBus {
return new EventBus(config);
}
export interface EventHandler<T = any> {
(message: EventBusMessage & { data: T }): Promise<void> | void;
}
// Re-export everything
export { EventBus } from './event-bus';
export * from './types';
export interface RedisConfig {
host: string;
port: number;
password?: string;
db?: number;
maxRetriesPerRequest?: number;
}
export interface EventBusOptions {
serviceName: string;
enablePersistence?: boolean;
useStreams?: boolean;
maxRetries?: number;
retryDelay?: number;
redisConfig: RedisConfig;
}
export interface StreamConsumerInfo {
streamKey: string;
groupName: string;
consumerName: string;
handler: EventHandler;
isRunning: boolean;
}
export class EventBus extends EventEmitter {
private redis: Redis;
private subscriber?: Redis;
private serviceName: string;
private logger: any;
private enablePersistence: boolean;
private useStreams: boolean;
private maxRetries: number;
private retryDelay: number;
private consumers: Map<string, StreamConsumerInfo> = new Map();
private isRunning: boolean = true;
constructor(options: EventBusOptions) {
super();
this.serviceName = options.serviceName;
this.enablePersistence = options.enablePersistence ?? true;
this.useStreams = options.useStreams ?? true;
this.maxRetries = options.maxRetries ?? 3;
this.retryDelay = options.retryDelay ?? 1000;
this.logger = getLogger(`event-bus:${this.serviceName}`);
const { redisConfig } = options;
this.redis = new Redis({
host: redisConfig.host,
port: redisConfig.port,
password: redisConfig.password,
db: redisConfig.db || 0,
maxRetriesPerRequest: redisConfig.maxRetriesPerRequest || 3,
lazyConnect: false,
});
if (!this.useStreams) {
this.subscriber = new Redis({
host: redisConfig.host,
port: redisConfig.port,
password: redisConfig.password,
db: redisConfig.db || 0,
});
this.subscriber.on('message', this.handleRedisMessage.bind(this));
}
this.logger.info(
`Redis event bus initialized (mode: ${this.useStreams ? 'streams' : 'pub/sub'})`
);
}
private handleRedisMessage(channel: string, message: string) {
try {
const eventMessage: EventBusMessage = JSON.parse(message);
if (eventMessage.source === this.serviceName) {
return;
}
this.emit(eventMessage.type, eventMessage);
this.logger.debug(`Received event: ${eventMessage.type} from ${eventMessage.source}`);
} catch (error) {
this.logger.error('Failed to parse Redis message', { error, message });
}
}
async publish<T = any>(
type: string,
data: T,
metadata?: Record<string, any>
): Promise<string | null> {
const message: EventBusMessage = {
id: this.generateId(),
type,
source: this.serviceName,
timestamp: Date.now(),
data,
metadata,
};
this.emit(type, message);
if (this.redis && this.enablePersistence) {
try {
if (this.useStreams) {
const streamKey = `events:${type}`;
const messageId = await this.redis.xadd(
streamKey,
'*',
'id',
message.id,
'type',
message.type,
'source',
message.source,
'timestamp',
message.timestamp.toString(),
'data',
JSON.stringify(message.data),
'metadata',
JSON.stringify(message.metadata || {})
);
this.logger.debug(`Published event to stream: ${type}`, {
messageId,
streamId: messageId,
});
return messageId as string;
} else {
await this.redis.publish(`events:${type}`, JSON.stringify(message));
this.logger.debug(`Published event via pub/sub: ${type}`, { messageId: message.id });
return message.id;
}
} catch (error) {
this.logger.error(`Failed to publish event: ${type}`, { error, messageId: message.id });
throw error;
}
}
return null;
}
async subscribe<T = any>(eventType: string, handler: EventHandler<T>): Promise<void> {
this.on(eventType, handler);
if (this.redis && this.enablePersistence) {
try {
if (this.useStreams) {
await this.subscribeToStream(eventType, handler);
} else {
if (this.subscriber) {
await this.subscriber.subscribe(`events:${eventType}`);
this.logger.debug(`Subscribed to event: ${eventType}`);
}
}
} catch (error) {
this.logger.error(`Failed to subscribe to event: ${eventType}`, error);
throw error;
}
}
}
private async subscribeToStream<T = any>(
eventType: string,
handler: EventHandler<T>
): Promise<void> {
const streamKey = `events:${eventType}`;
const groupName = `${eventType}-consumers`;
const consumerName = `${this.serviceName}-${Date.now()}`;
try {
await this.redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM');
this.logger.debug(`Created consumer group: ${groupName} for stream: ${streamKey}`);
} catch (error: any) {
if (error.message.includes('BUSYGROUP')) {
this.logger.debug(`Consumer group already exists: ${groupName}`);
} else {
throw error;
}
}
const consumerInfo: StreamConsumerInfo = {
streamKey,
groupName,
consumerName,
handler,
isRunning: true,
};
this.consumers.set(`${eventType}-${consumerName}`, consumerInfo);
this.startStreamConsumer(consumerInfo);
this.logger.debug(`Started stream consumer for: ${eventType}`);
}
private async startStreamConsumer(consumerInfo: StreamConsumerInfo): Promise<void> {
const { streamKey, groupName, consumerName, handler } = consumerInfo;
let retryCount = 0;
while (consumerInfo.isRunning && this.isRunning) {
try {
await this.claimPendingMessages(streamKey, groupName, consumerName, handler);
const messages = await this.redis.xreadgroup(
'GROUP',
groupName,
consumerName,
'COUNT',
10,
'BLOCK',
1000,
'STREAMS',
streamKey,
'>'
);
if (!messages || messages.length === 0) {
retryCount = 0;
continue;
}
for (const [stream, msgs] of messages as [string, [string, string[]][]][]) {
for (const [msgId, fields] of msgs) {
await this.processStreamMessage(msgId, fields, streamKey, groupName, handler);
}
}
retryCount = 0;
} catch (error: any) {
retryCount++;
if (error.message.includes('NOGROUP')) {
this.logger.warn(`Consumer group deleted, recreating: ${groupName}`);
try {
await this.redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM');
retryCount = 0;
} catch (createError) {
this.logger.error('Failed to recreate consumer group:', { error: createError });
}
} else {
this.logger.error('Error reading from stream:', { error, retryCount });
}
if (retryCount >= this.maxRetries) {
this.logger.error(`Max retries reached for consumer ${consumerName}, stopping`);
consumerInfo.isRunning = false;
break;
}
const backoffDelay = Math.min(this.retryDelay * Math.pow(2, retryCount - 1), 30000);
await this.sleep(backoffDelay);
}
}
this.logger.info(`Stream consumer stopped: ${consumerName}`);
}
private async processStreamMessage(
msgId: string,
fields: string[],
streamKey: string,
groupName: string,
handler: EventHandler
): Promise<void> {
let retryCount = 0;
while (retryCount < this.maxRetries) {
try {
const message = this.parseStreamMessage(fields);
if (message.source === this.serviceName) {
await this.redis.xack(streamKey, groupName, msgId);
return;
}
await handler(message);
await this.redis.xack(streamKey, groupName, msgId);
this.logger.debug(`Processed stream message: ${msgId}`, {
eventType: message.type,
source: message.source,
});
return;
} catch (error) {
retryCount++;
this.logger.error(
`Error processing stream message ${msgId} (attempt ${retryCount}):`,
error
);
if (retryCount >= this.maxRetries) {
await this.moveToDeadLetterQueue(msgId, fields, streamKey, groupName, error);
return;
}
await this.sleep(this.retryDelay * retryCount);
}
}
}
private async claimPendingMessages(
streamKey: string,
groupName: string,
consumerName: string,
handler: EventHandler
): Promise<void> {
try {
const pendingMessages = (await this.redis.xpending(
streamKey,
groupName,
'-',
'+',
10
)) as any[];
if (!pendingMessages || pendingMessages.length === 0) {
return;
}
const oldMessages = pendingMessages.filter((msg: any[]) => {
return msg[2] > 60000;
});
if (oldMessages.length === 0) {
return;
}
const messageIds = oldMessages.map((msg: any[]) => msg[0]);
const claimedMessages = (await this.redis.xclaim(
streamKey,
groupName,
consumerName,
60000,
...messageIds
)) as [string, string[]][];
for (const [msgId, fields] of claimedMessages) {
await this.processStreamMessage(msgId, fields, streamKey, groupName, handler);
}
this.logger.debug(`Claimed and processed ${claimedMessages.length} pending messages`);
} catch (error) {
this.logger.error('Error claiming pending messages:', error);
}
}
private async moveToDeadLetterQueue(
msgId: string,
fields: string[],
streamKey: string,
groupName: string,
error: any
): Promise<void> {
try {
const dlqKey = `${streamKey}:dlq`;
const message = this.parseStreamMessage(fields);
await this.redis.xadd(
dlqKey,
'*',
'original_id',
msgId,
'original_stream',
streamKey,
'error',
(error as Error).message || 'Unknown error',
'timestamp',
Date.now().toString(),
'id',
message.id,
'type',
message.type,
'source',
message.source,
'data',
JSON.stringify(message.data),
'metadata',
JSON.stringify(message.metadata || {})
);
await this.redis.xack(streamKey, groupName, msgId);
this.logger.warn(`Moved message ${msgId} to dead letter queue: ${dlqKey}`, {
error: (error as Error).message,
});
} catch (dlqError) {
this.logger.error(`Failed to move message ${msgId} to dead letter queue:`, {
error: dlqError,
});
}
}
private parseStreamMessage(fields: string[]): EventBusMessage {
const fieldMap: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) {
fieldMap[fields[i]] = fields[i + 1];
}
return {
id: fieldMap.id,
type: fieldMap.type || 'unknown',
source: fieldMap.source,
timestamp: parseInt(fieldMap.timestamp) || Date.now(),
data: fieldMap.data ? JSON.parse(fieldMap.data) : {},
metadata: fieldMap.metadata ? JSON.parse(fieldMap.metadata) : {},
};
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
async unsubscribe(eventType: string, handler?: EventHandler): Promise<void> {
if (handler) {
this.off(eventType, handler);
} else {
this.removeAllListeners(eventType);
}
if (this.enablePersistence) {
try {
if (this.useStreams) {
const consumersToStop = Array.from(this.consumers.entries()).filter(([key]) =>
key.startsWith(`${eventType}-`)
);
for (const [key, consumerInfo] of consumersToStop) {
consumerInfo.isRunning = false;
this.consumers.delete(key);
}
this.logger.debug(`Stopped stream consumers for: ${eventType}`);
} else {
if (this.subscriber) {
await this.subscriber.unsubscribe(`events:${eventType}`);
this.logger.debug(`Unsubscribed from event: ${eventType}`);
}
}
} catch (error) {
this.logger.error(`Failed to unsubscribe from event: ${eventType}`, error);
}
}
}
async close(): Promise<void> {
this.isRunning = false;
for (const consumerInfo of this.consumers.values()) {
consumerInfo.isRunning = false;
}
this.consumers.clear();
if (this.redis) {
await this.redis.quit();
}
if (this.subscriber) {
await this.subscriber.quit();
}
this.removeAllListeners();
this.logger.info('Event bus closed');
}
private generateId(): string {
return `${this.serviceName}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
async getStreamInfo(eventType: string): Promise<any> {
if (!this.useStreams) {
throw new Error('Stream info only available when using Redis Streams');
}
const streamKey = `events:${eventType}`;
try {
return await this.redis.xinfo('STREAM', streamKey);
} catch (error) {
this.logger.error(`Failed to get stream info for: ${eventType}`, error);
throw error;
}
}
async getStreamLength(eventType: string): Promise<number> {
if (!this.useStreams) {
throw new Error('Stream length only available when using Redis Streams');
}
const streamKey = `events:${eventType}`;
try {
return await this.redis.xlen(streamKey);
} catch (error) {
this.logger.error(`Failed to get stream length for: ${eventType}`, error);
return 0;
}
}
async readStreamHistory(
eventType: string,
startId: string = '-',
endId: string = '+',
count?: number
): Promise<EventBusMessage[]> {
if (!this.useStreams) {
throw new Error('Stream history only available when using Redis Streams');
}
const streamKey = `events:${eventType}`;
try {
let messages: [string, string[]][];
if (count) {
messages = (await this.redis.xrange(streamKey, startId, endId, 'COUNT', count)) as [
string,
string[],
][];
} else {
messages = (await this.redis.xrange(streamKey, startId, endId)) as [string, string[]][];
}
return messages.map(([id, fields]) => ({
...this.parseStreamMessage(fields),
id,
}));
} catch (error) {
this.logger.error(`Failed to read stream history for: ${eventType}`, error);
return [];
}
}
async trimStream(eventType: string, maxLength: number): Promise<number> {
if (!this.useStreams) {
throw new Error('Stream trimming only available when using Redis Streams');
}
const streamKey = `events:${eventType}`;
try {
return await this.redis.xtrim(streamKey, 'MAXLEN', '~', maxLength);
} catch (error) {
this.logger.error(`Failed to trim stream: ${eventType}`, error);
return 0;
}
}
async replayEventsFromTimestamp(
eventType: string,
fromTimestamp: number,
handler: EventHandler,
speed: number = 1
): Promise<void> {
if (!this.useStreams) {
throw new Error('Event replay only available when using Redis Streams');
}
const events = await this.readStreamHistory(eventType);
const filteredEvents = events.filter(event => event.timestamp >= fromTimestamp);
this.logger.info(`Replaying ${filteredEvents.length} events from ${new Date(fromTimestamp)}`);
for (let i = 0; i < filteredEvents.length; i++) {
const event = filteredEvents[i];
const nextEvent = filteredEvents[i + 1];
try {
await handler(event);
if (nextEvent && speed > 0) {
const delay = (nextEvent.timestamp - event.timestamp) / speed;
if (delay > 0) {
await this.sleep(Math.min(delay, 1000));
}
}
} catch (error) {
this.logger.error(`Error replaying event: ${event.id}`, error);
}
}
this.logger.info('Event replay completed');
}
}
export function createEventBus(options: EventBusOptions): EventBus {
return new EventBus(options);
}
// Default export
export default createEventBus;

111
libs/event-bus/src/types.ts Normal file
View file

@ -0,0 +1,111 @@
export interface EventBusMessage<T = any> {
id: string;
type: string;
source: string;
timestamp: number;
data: T;
metadata?: Record<string, any>;
}
export interface EventHandler<T = any> {
(message: EventBusMessage<T>): Promise<void> | void;
}
export interface EventBusConfig {
serviceName: string;
redisConfig: {
host: string;
port: number;
password?: string;
db?: number;
};
enableLogging?: boolean;
}
export interface EventSubscription {
channel: string;
handler: EventHandler;
}
// Trading-specific event types
export enum TradingEventType {
// Market data events
PRICE_UPDATE = 'market.price.update',
ORDERBOOK_UPDATE = 'market.orderbook.update',
TRADE_EXECUTED = 'market.trade.executed',
// Order events
ORDER_CREATED = 'order.created',
ORDER_FILLED = 'order.filled',
ORDER_CANCELLED = 'order.cancelled',
ORDER_REJECTED = 'order.rejected',
// Position events
POSITION_OPENED = 'position.opened',
POSITION_CLOSED = 'position.closed',
POSITION_UPDATED = 'position.updated',
// Strategy events
STRATEGY_SIGNAL = 'strategy.signal',
STRATEGY_STARTED = 'strategy.started',
STRATEGY_STOPPED = 'strategy.stopped',
// Risk events
RISK_LIMIT_BREACH = 'risk.limit.breach',
RISK_WARNING = 'risk.warning',
// System events
SERVICE_STARTED = 'system.service.started',
SERVICE_STOPPED = 'system.service.stopped',
SERVICE_ERROR = 'system.service.error',
}
// Event data types
export interface PriceUpdateEvent {
symbol: string;
price: number;
volume: number;
timestamp: number;
}
export interface OrderEvent {
orderId: string;
symbol: string;
side: 'buy' | 'sell';
quantity: number;
price?: number;
type: 'market' | 'limit' | 'stop' | 'stop_limit';
status: string;
portfolioId: string;
strategyId?: string;
}
export interface PositionEvent {
positionId: string;
symbol: string;
quantity: number;
averageCost: number;
currentPrice: number;
unrealizedPnl: number;
realizedPnl: number;
portfolioId: string;
}
export interface StrategySignalEvent {
strategyId: string;
signal: 'buy' | 'sell' | 'hold';
symbol: string;
confidence: number;
indicators: Record<string, number>;
timestamp: number;
}
export interface RiskEvent {
type: 'position_size' | 'daily_loss' | 'max_drawdown' | 'concentration';
severity: 'warning' | 'critical';
currentValue: number;
limit: number;
portfolioId?: string;
strategyId?: string;
message: string;
}