diff --git a/apps/strategy-service/src/backtesting/modes/event-mode.ts b/apps/strategy-service/src/backtesting/modes/event-mode.ts index 3555a72..c9efdbd 100644 --- a/apps/strategy-service/src/backtesting/modes/event-mode.ts +++ b/apps/strategy-service/src/backtesting/modes/event-mode.ts @@ -12,7 +12,7 @@ export interface BacktestConfig { commissionModel?: string; } -export class EventBacktestMode extends ExecutionMode { +export class EventMode extends ExecutionMode { name = 'event-driven'; private simulationTime: Date; private historicalData: Map = new Map(); diff --git a/apps/strategy-service/src/backtesting/modes/hybrid-mode.ts b/apps/strategy-service/src/backtesting/modes/hybrid-mode.ts index 2adfffc..a545f82 100644 --- a/apps/strategy-service/src/backtesting/modes/hybrid-mode.ts +++ b/apps/strategy-service/src/backtesting/modes/hybrid-mode.ts @@ -3,7 +3,7 @@ import { EventBus } from '@stock-bot/event-bus'; import { VectorEngine, VectorizedBacktestResult } from '@stock-bot/vector-engine'; import { DataFrame } from '@stock-bot/data-frame'; import { ExecutionMode, BacktestContext, BacktestResult } from '../framework/execution-mode'; -import EventMode from './event-mode'; +import { EventMode } from './event-mode'; import VectorizedMode from './vectorized-mode'; import { create } from 'domain'; diff --git a/apps/strategy-service/src/cli/index.ts b/apps/strategy-service/src/cli/index.ts index fccbd00..c69cbb5 100644 --- a/apps/strategy-service/src/cli/index.ts +++ b/apps/strategy-service/src/cli/index.ts @@ -8,8 +8,8 @@ import { program } from 'commander'; import { createLogger } from '@stock-bot/logger'; import { createEventBus } from '@stock-bot/event-bus'; import { BacktestContext } from '../framework/execution-mode'; -import LiveMode from '../backtesting/modes/live-mode'; -import EventMode from '../backtesting/modes/event-mode'; +import { LiveMode } from '../backtesting/modes/live-mode'; +import { EventMode } from '../backtesting/modes/event-mode'; import VectorizedMode from '../backtesting/modes/vectorized-mode'; import HybridMode from '../backtesting/modes/hybrid-mode'; @@ -17,6 +17,7 @@ const logger = createLogger('strategy-cli'); interface CLIBacktestConfig { strategy: string; + strategies: string; symbol: string; startDate: string; endDate: string; @@ -102,7 +103,6 @@ async function runBacktest(options: CLIBacktestConfig): Promise { } catch (error) { logger.error('Backtest failed', { error }); - console.error('Backtest failed:', error.message); process.exit(1); } } @@ -118,7 +118,7 @@ async function loadConfig(configPath: string): Promise { } } catch (error) { logger.error('Failed to load config', { configPath, error }); - throw new Error(`Failed to load config from ${configPath}: ${error.message}`); + throw new Error(`Failed to load config from ${configPath}: ${(error as Error).message}`); } } @@ -167,10 +167,9 @@ async function saveResults(result: any, outputPath: string): Promise { await Bun.write(outputPath + '.json', JSON.stringify(result, null, 2)); } - console.log(`\nResults saved to: ${outputPath}`); + logger.info(`\nResults saved to: ${outputPath}`); } catch (error) { logger.error('Failed to save results', { outputPath, error }); - console.error(`Failed to save results: ${error.message}`); } } @@ -231,18 +230,8 @@ program .option('--config ', 'Configuration file path') .option('-o, --output ', 'Output file path') .option('-v, --verbose', 'Verbose output') - .action(async (options) => { - await runBacktest({ - strategy: options.strategy, - symbol: options.symbol, - startDate: options.startDate, - endDate: options.endDate, - mode: options.mode, - initialCapital: parseFloat(options.initialCapital), - config: options.config, - output: options.output, - verbose: options.verbose - }); + .action(async (options: CLIBacktestConfig) => { + await runBacktest(options); }); program @@ -254,7 +243,7 @@ program .command('validate') .description('Validate a strategy') .requiredOption('-s, --strategy ', 'Strategy to validate') - .action(async (options) => { + .action(async (options: CLIBacktestConfig) => { await validateStrategy(options.strategy); }); @@ -268,7 +257,7 @@ program .option('-m, --mode ', 'Execution mode', 'vectorized') .option('-c, --initial-capital ', 'Initial capital', '10000') .option('-o, --output ', 'Output directory') - .action(async (options) => { + .action(async (options: CLIBacktestConfig) => { const strategies = options.strategies.split(',').map((s: string) => s.trim()); console.log(`Comparing strategies: ${strategies.join(', ')}`); @@ -278,16 +267,12 @@ program console.log(`\nRunning ${strategy}...`); try { await runBacktest({ + ...options, strategy, - symbol: options.symbol, - startDate: options.startDate, - endDate: options.endDate, - mode: options.mode, - initialCapital: parseFloat(options.initialCapital), output: options.output ? `${options.output}/${strategy}.json` : undefined }); } catch (error) { - console.error(`Failed to run ${strategy}:`, error.message); + console.error(`Failed to run ${strategy}:`, (error as Error).message); } } diff --git a/bun.lock b/bun.lock index 16ea79f..e01550d 100644 --- a/bun.lock +++ b/bun.lock @@ -151,6 +151,7 @@ "@stock-bot/event-bus": "workspace:*", "@stock-bot/logger": "workspace:*", "@stock-bot/utils": "workspace:*", + "commander": "^14.0.0", "eventemitter3": "^5.0.1", }, "devDependencies": { @@ -478,7 +479,7 @@ "combined-stream": ["combined-stream@1.0.8", "", { "dependencies": { "delayed-stream": "~1.0.0" } }, "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg=="], - "commander": ["commander@2.20.3", "", {}, "sha512-GpVkmM8vF2vQUkj2LvZmD35JxeJOLCwJ9cUkugyk2nuhbv3+mJvpLYYt+0+USMxE+oj+ey/lJEnhZw75x/OMcQ=="], + "commander": ["commander@14.0.0", "", {}, "sha512-2uM9rYjPvyq39NwLRqaiLtWHyDC1FvryJDa2ATTVims5YAS4PupsEQsDvP14FqhFr0P49CYDugi59xaxJlTXRA=="], "commondir": ["commondir@1.0.1", "", {}, "sha512-W9pAhw0ja1Edb5GVdIF1mjZw/ASI0AlShXM83UUGe2DVr5TdAPEA1OA8m/g8zWp9x6On7gqufY+FatDbC3MDQg=="], @@ -1172,6 +1173,8 @@ "mongodb-memory-server-core/mongodb": ["mongodb@5.9.2", "", { "dependencies": { "bson": "^5.5.0", "mongodb-connection-string-url": "^2.6.0", "socks": "^2.7.1" }, "optionalDependencies": { "@mongodb-js/saslprep": "^1.1.0" }, "peerDependencies": { "@aws-sdk/credential-providers": "^3.188.0", "@mongodb-js/zstd": "^1.0.0", "kerberos": "^1.0.0 || ^2.0.0", "mongodb-client-encryption": ">=2.3.0 <3", "snappy": "^7.2.2" }, "optionalPeers": ["@aws-sdk/credential-providers", "@mongodb-js/zstd", "kerberos", "mongodb-client-encryption", "snappy"] }, "sha512-H60HecKO4Bc+7dhOv4sJlgvenK4fQNqqUIlXxZYQNbfEWSALGAwGoyJd/0Qwk4TttFXUOHJ2ZJQe/52ScaUwtQ=="], + "nearley/commander": ["commander@2.20.3", "", {}, "sha512-GpVkmM8vF2vQUkj2LvZmD35JxeJOLCwJ9cUkugyk2nuhbv3+mJvpLYYt+0+USMxE+oj+ey/lJEnhZw75x/OMcQ=="], + "node-fetch/whatwg-url": ["whatwg-url@5.0.0", "", { "dependencies": { "tr46": "~0.0.3", "webidl-conversions": "^3.0.0" } }, "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw=="], "path-scurry/lru-cache": ["lru-cache@10.4.3", "", {}, "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ=="], diff --git a/libs/event-bus/src/index.ts b/libs/event-bus/src/index.ts index e69de29..fef31e9 100644 --- a/libs/event-bus/src/index.ts +++ b/libs/event-bus/src/index.ts @@ -0,0 +1,550 @@ +import { EventEmitter } from 'eventemitter3'; +import Redis from 'ioredis'; +import { createLogger } from '@stock-bot/logger'; +import { dragonflyConfig } from '@stock-bot/config'; + +export interface EventBusMessage { + id: string; + type: string; + source: string; + timestamp: number; + data: any; + metadata?: Record; +} + +export interface EventHandler { + (message: EventBusMessage & { data: T }): Promise | void; +} + +export interface EventBusOptions { + serviceName: string; + enablePersistence?: boolean; + useStreams?: boolean; + maxRetries?: number; + retryDelay?: number; +} + +export interface StreamConsumerInfo { + streamKey: string; + groupName: string; + consumerName: string; + handler: EventHandler; + isRunning: boolean; +} + +export class EventBus extends EventEmitter { + private redis: Redis; + private subscriber?: Redis; + private serviceName: string; + private logger: any; + private enablePersistence: boolean; + private useStreams: boolean; + private maxRetries: number; + private retryDelay: number; + private consumers: Map = new Map(); + private isRunning: boolean = true; + + constructor(options: EventBusOptions) { + super(); + this.serviceName = options.serviceName; + this.enablePersistence = options.enablePersistence ?? true; + this.useStreams = options.useStreams ?? true; + this.maxRetries = options.maxRetries ?? 3; + this.retryDelay = options.retryDelay ?? 1000; + this.logger = createLogger(`event-bus:${this.serviceName}`); + + this.redis = new Redis({ + host: dragonflyConfig.DRAGONFLY_HOST, + port: dragonflyConfig.DRAGONFLY_PORT, + password: dragonflyConfig.DRAGONFLY_PASSWORD, + db: dragonflyConfig.DRAGONFLY_DATABASE || 0, + maxRetriesPerRequest: dragonflyConfig.DRAGONFLY_MAX_RETRIES, + lazyConnect: true, + }); + + if (!this.useStreams) { + this.subscriber = new Redis({ + host: dragonflyConfig.DRAGONFLY_HOST, + port: dragonflyConfig.DRAGONFLY_PORT, + password: dragonflyConfig.DRAGONFLY_PASSWORD, + db: dragonflyConfig.DRAGONFLY_DATABASE || 0, + }); + this.subscriber.on('message', this.handleRedisMessage.bind(this)); + } + + this.logger.info(`Redis event bus initialized (mode: ${this.useStreams ? 'streams' : 'pub/sub'})`); + } + + private handleRedisMessage(channel: string, message: string) { + try { + const eventMessage: EventBusMessage = JSON.parse(message); + + if (eventMessage.source === this.serviceName) { + return; + } + + this.emit(eventMessage.type, eventMessage); + this.logger.debug(`Received event: ${eventMessage.type} from ${eventMessage.source}`); + } catch (error) { + this.logger.error('Failed to parse Redis message', { error, message }); + } + } + + async publish(type: string, data: T, metadata?: Record): Promise { + const message: EventBusMessage = { + id: this.generateId(), + type, + source: this.serviceName, + timestamp: Date.now(), + data, + metadata, + }; + + this.emit(type, message); + + if (this.redis && this.enablePersistence) { + try { + if (this.useStreams) { + const streamKey = `events:${type}`; + const messageId = await this.redis.xadd( + streamKey, + '*', + 'id', message.id, + 'type', message.type, + 'source', message.source, + 'timestamp', message.timestamp.toString(), + 'data', JSON.stringify(message.data), + 'metadata', JSON.stringify(message.metadata || {}) + ); + + this.logger.debug(`Published event to stream: ${type}`, { + messageId, + streamId: messageId + }); + return messageId as string; + } else { + await this.redis.publish(`events:${type}`, JSON.stringify(message)); + this.logger.debug(`Published event via pub/sub: ${type}`, { messageId: message.id }); + return message.id; + } + } catch (error) { + this.logger.error(`Failed to publish event: ${type}`, { error, messageId: message.id }); + throw error; + } + } + + return null; + } + + async subscribe(eventType: string, handler: EventHandler): Promise { + this.on(eventType, handler); + + if (this.redis && this.enablePersistence) { + try { + if (this.useStreams) { + await this.subscribeToStream(eventType, handler); + } else { + if (this.subscriber) { + await this.subscriber.subscribe(`events:${eventType}`); + this.logger.debug(`Subscribed to event: ${eventType}`); + } + } + } catch (error) { + this.logger.error(`Failed to subscribe to event: ${eventType}`, { error }); + throw error; + } + } + } + + private async subscribeToStream(eventType: string, handler: EventHandler): Promise { + const streamKey = `events:${eventType}`; + const groupName = `${eventType}-consumers`; + const consumerName = `${this.serviceName}-${Date.now()}`; + + try { + await this.redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM'); + this.logger.debug(`Created consumer group: ${groupName} for stream: ${streamKey}`); + } catch (error: any) { + if (error.message.includes('BUSYGROUP')) { + this.logger.debug(`Consumer group already exists: ${groupName}`); + } else { + throw error; + } + } + + const consumerInfo: StreamConsumerInfo = { + streamKey, + groupName, + consumerName, + handler, + isRunning: true, + }; + + this.consumers.set(`${eventType}-${consumerName}`, consumerInfo); + this.startStreamConsumer(consumerInfo); + this.logger.debug(`Started stream consumer for: ${eventType}`); + } + + private async startStreamConsumer(consumerInfo: StreamConsumerInfo): Promise { + const { streamKey, groupName, consumerName, handler } = consumerInfo; + let retryCount = 0; + + while (consumerInfo.isRunning && this.isRunning) { + try { + await this.claimPendingMessages(streamKey, groupName, consumerName, handler); + + const messages = await this.redis.xreadgroup( + 'GROUP', groupName, consumerName, + 'COUNT', 10, + 'BLOCK', 1000, + 'STREAMS', streamKey, '>' + ); + + if (!messages || messages.length === 0) { + retryCount = 0; + continue; + } + + for (const [stream, msgs] of messages as [string, [string, string[]][]][]) { + for (const [msgId, fields] of msgs) { + await this.processStreamMessage(msgId, fields, streamKey, groupName, handler); + } + } + + retryCount = 0; + } catch (error: any) { + retryCount++; + + if (error.message.includes('NOGROUP')) { + this.logger.warn(`Consumer group deleted, recreating: ${groupName}`); + try { + await this.redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM'); + retryCount = 0; + } catch (createError) { + this.logger.error('Failed to recreate consumer group:', { error: createError }); + } + } else { + this.logger.error('Error reading from stream:', { error, retryCount }); + } + + if (retryCount >= this.maxRetries) { + this.logger.error(`Max retries reached for consumer ${consumerName}, stopping`); + consumerInfo.isRunning = false; + break; + } + + const backoffDelay = Math.min(this.retryDelay * Math.pow(2, retryCount - 1), 30000); + await this.sleep(backoffDelay); + } + } + + this.logger.info(`Stream consumer stopped: ${consumerName}`); + } + + private async processStreamMessage( + msgId: string, + fields: string[], + streamKey: string, + groupName: string, + handler: EventHandler + ): Promise { + let retryCount = 0; + + while (retryCount < this.maxRetries) { + try { + const message = this.parseStreamMessage(fields); + + if (message.source === this.serviceName) { + await this.redis.xack(streamKey, groupName, msgId); + return; + } + + await handler(message); + await this.redis.xack(streamKey, groupName, msgId); + + this.logger.debug(`Processed stream message: ${msgId}`, { + eventType: message.type, + source: message.source + }); + + return; + + } catch (error) { + retryCount++; + this.logger.error(`Error processing stream message ${msgId} (attempt ${retryCount}):`, { error }); + + if (retryCount >= this.maxRetries) { + await this.moveToDeadLetterQueue(msgId, fields, streamKey, groupName, error); + return; + } + + await this.sleep(this.retryDelay * retryCount); + } + } + } + + private async claimPendingMessages( + streamKey: string, + groupName: string, + consumerName: string, + handler: EventHandler + ): Promise { + try { + const pendingMessages = await this.redis.xpending( + streamKey, + groupName, + '-', + '+', + 10 + ) as any[]; + + if (!pendingMessages || pendingMessages.length === 0) { + return; + } + + const oldMessages = pendingMessages.filter((msg: any[]) => { + return msg[2] > 60000; + }); + + if (oldMessages.length === 0) { + return; + } + + const messageIds = oldMessages.map((msg: any[]) => msg[0]); + const claimedMessages = await this.redis.xclaim( + streamKey, + groupName, + consumerName, + 60000, + ...messageIds + ) as [string, string[]][]; + + for (const [msgId, fields] of claimedMessages) { + await this.processStreamMessage(msgId, fields, streamKey, groupName, handler); + } + + this.logger.debug(`Claimed and processed ${claimedMessages.length} pending messages`); + } catch (error) { + this.logger.error('Error claiming pending messages:', { error }); + } + } + + private async moveToDeadLetterQueue( + msgId: string, + fields: string[], + streamKey: string, + groupName: string, + error: any + ): Promise { + try { + const dlqKey = `${streamKey}:dlq`; + const message = this.parseStreamMessage(fields); + + await this.redis.xadd( + dlqKey, + '*', + 'original_id', msgId, + 'original_stream', streamKey, + 'error', (error as Error).message || 'Unknown error', + 'timestamp', Date.now().toString(), + 'id', message.id, + 'type', message.type, + 'source', message.source, + 'data', JSON.stringify(message.data), + 'metadata', JSON.stringify(message.metadata || {}) + ); + + await this.redis.xack(streamKey, groupName, msgId); + + this.logger.warn(`Moved message ${msgId} to dead letter queue: ${dlqKey}`, { error: (error as Error).message }); + } catch (dlqError) { + this.logger.error(`Failed to move message ${msgId} to dead letter queue:`, { error: dlqError }); + } + } + + private parseStreamMessage(fields: string[]): EventBusMessage { + const fieldMap: Record = {}; + + for (let i = 0; i < fields.length; i += 2) { + fieldMap[fields[i]] = fields[i + 1]; + } + + return { + id: fieldMap.id, + type: fieldMap.type || 'unknown', + source: fieldMap.source, + timestamp: parseInt(fieldMap.timestamp) || Date.now(), + data: fieldMap.data ? JSON.parse(fieldMap.data) : {}, + metadata: fieldMap.metadata ? JSON.parse(fieldMap.metadata) : {}, + }; + } + + private sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); + } + + async unsubscribe(eventType: string, handler?: EventHandler): Promise { + if (handler) { + this.off(eventType, handler); + } else { + this.removeAllListeners(eventType); + } + + if (this.enablePersistence) { + try { + if (this.useStreams) { + const consumersToStop = Array.from(this.consumers.entries()) + .filter(([key]) => key.startsWith(`${eventType}-`)); + + for (const [key, consumerInfo] of consumersToStop) { + consumerInfo.isRunning = false; + this.consumers.delete(key); + } + + this.logger.debug(`Stopped stream consumers for: ${eventType}`); + } else { + if (this.subscriber) { + await this.subscriber.unsubscribe(`events:${eventType}`); + this.logger.debug(`Unsubscribed from event: ${eventType}`); + } + } + } catch (error) { + this.logger.error(`Failed to unsubscribe from event: ${eventType}`, { error }); + } + } + } + + async close(): Promise { + this.isRunning = false; + + for (const consumerInfo of this.consumers.values()) { + consumerInfo.isRunning = false; + } + this.consumers.clear(); + + if (this.redis) { + await this.redis.quit(); + } + if (this.subscriber) { + await this.subscriber.quit(); + } + + this.removeAllListeners(); + this.logger.info('Event bus closed'); + } + + private generateId(): string { + return `${this.serviceName}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; + } + + async getStreamInfo(eventType: string): Promise { + if (!this.useStreams) { + throw new Error('Stream info only available when using Redis Streams'); + } + + const streamKey = `events:${eventType}`; + try { + return await this.redis.xinfo('STREAM', streamKey); + } catch (error) { + this.logger.error(`Failed to get stream info for: ${eventType}`, { error }); + throw error; + } + } + + async getStreamLength(eventType: string): Promise { + if (!this.useStreams) { + throw new Error('Stream length only available when using Redis Streams'); + } + + const streamKey = `events:${eventType}`; + try { + return await this.redis.xlen(streamKey); + } catch (error) { + this.logger.error(`Failed to get stream length for: ${eventType}`, { error }); + return 0; + } + } + async readStreamHistory( + eventType: string, + startId: string = '-', + endId: string = '+', + count?: number + ): Promise { + if (!this.useStreams) { + throw new Error('Stream history only available when using Redis Streams'); + } + + const streamKey = `events:${eventType}`; + try { + let messages: [string, string[]][]; + + if (count) { + messages = await this.redis.xrange(streamKey, startId, endId, 'COUNT', count) as [string, string[]][]; + } else { + messages = await this.redis.xrange(streamKey, startId, endId) as [string, string[]][]; + } + + return messages.map(([id, fields]) => ({ + ...this.parseStreamMessage(fields), + id + })); + } catch (error) { + this.logger.error(`Failed to read stream history for: ${eventType}`, { error }); + return []; + } + } + + async trimStream(eventType: string, maxLength: number): Promise { + if (!this.useStreams) { + throw new Error('Stream trimming only available when using Redis Streams'); + } + + const streamKey = `events:${eventType}`; + try { + return await this.redis.xtrim(streamKey, 'MAXLEN', '~', maxLength); + } catch (error) { + this.logger.error(`Failed to trim stream: ${eventType}`, { error }); + return 0; + } + } + + async replayEventsFromTimestamp( + eventType: string, + fromTimestamp: number, + handler: EventHandler, + speed: number = 1 + ): Promise { + if (!this.useStreams) { + throw new Error('Event replay only available when using Redis Streams'); + } + + const events = await this.readStreamHistory(eventType); + const filteredEvents = events.filter(event => event.timestamp >= fromTimestamp); + + this.logger.info(`Replaying ${filteredEvents.length} events from ${new Date(fromTimestamp)}`); + + for (let i = 0; i < filteredEvents.length; i++) { + const event = filteredEvents[i]; + const nextEvent = filteredEvents[i + 1]; + + try { + await handler(event); + + if (nextEvent && speed > 0) { + const delay = (nextEvent.timestamp - event.timestamp) / speed; + if (delay > 0) { + await this.sleep(Math.min(delay, 1000)); + } + } + } catch (error) { + this.logger.error(`Error replaying event: ${event.id}`, { error }); + } + } + + this.logger.info('Event replay completed'); + } +} + +export function createEventBus(options: EventBusOptions): EventBus { + return new EventBus(options); +} diff --git a/libs/strategy-engine/package.json b/libs/strategy-engine/package.json index 300a072..f6c1590 100644 --- a/libs/strategy-engine/package.json +++ b/libs/strategy-engine/package.json @@ -10,11 +10,12 @@ "test": "bun test" }, "dependencies": { - "@stock-bot/logger": "workspace:*", "@stock-bot/config": "workspace:*", - "@stock-bot/utils": "workspace:*", - "@stock-bot/event-bus": "workspace:*", "@stock-bot/data-frame": "workspace:*", + "@stock-bot/event-bus": "workspace:*", + "@stock-bot/logger": "workspace:*", + "@stock-bot/utils": "workspace:*", + "commander": "^14.0.0", "eventemitter3": "^5.0.1" }, "devDependencies": { diff --git a/libs/strategy-engine/src/index.ts b/libs/strategy-engine/src/index.ts index 1623efa..388095e 100644 --- a/libs/strategy-engine/src/index.ts +++ b/libs/strategy-engine/src/index.ts @@ -143,7 +143,7 @@ export abstract class BaseStrategy extends EventEmitter { // Utility methods protected async emitSignal(signal: TradingSignal): Promise { - await this.eventBus.publishStrategySignal(this.config.id, signal); + await this.eventBus.publish(this.config.id, signal); this.emit('signal', signal); this.logger.info('Signal generated', { signal: signal.type,