work on strategy-engine
This commit is contained in:
parent
fc4cf71a70
commit
eee6135867
7 changed files with 572 additions and 33 deletions
|
|
@ -12,7 +12,7 @@ export interface BacktestConfig {
|
|||
commissionModel?: string;
|
||||
}
|
||||
|
||||
export class EventBacktestMode extends ExecutionMode {
|
||||
export class EventMode extends ExecutionMode {
|
||||
name = 'event-driven';
|
||||
private simulationTime: Date;
|
||||
private historicalData: Map<string, MarketData[]> = new Map();
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ import { EventBus } from '@stock-bot/event-bus';
|
|||
import { VectorEngine, VectorizedBacktestResult } from '@stock-bot/vector-engine';
|
||||
import { DataFrame } from '@stock-bot/data-frame';
|
||||
import { ExecutionMode, BacktestContext, BacktestResult } from '../framework/execution-mode';
|
||||
import EventMode from './event-mode';
|
||||
import { EventMode } from './event-mode';
|
||||
import VectorizedMode from './vectorized-mode';
|
||||
import { create } from 'domain';
|
||||
|
||||
|
|
|
|||
|
|
@ -8,8 +8,8 @@ import { program } from 'commander';
|
|||
import { createLogger } from '@stock-bot/logger';
|
||||
import { createEventBus } from '@stock-bot/event-bus';
|
||||
import { BacktestContext } from '../framework/execution-mode';
|
||||
import LiveMode from '../backtesting/modes/live-mode';
|
||||
import EventMode from '../backtesting/modes/event-mode';
|
||||
import { LiveMode } from '../backtesting/modes/live-mode';
|
||||
import { EventMode } from '../backtesting/modes/event-mode';
|
||||
import VectorizedMode from '../backtesting/modes/vectorized-mode';
|
||||
import HybridMode from '../backtesting/modes/hybrid-mode';
|
||||
|
||||
|
|
@ -17,6 +17,7 @@ const logger = createLogger('strategy-cli');
|
|||
|
||||
interface CLIBacktestConfig {
|
||||
strategy: string;
|
||||
strategies: string;
|
||||
symbol: string;
|
||||
startDate: string;
|
||||
endDate: string;
|
||||
|
|
@ -102,7 +103,6 @@ async function runBacktest(options: CLIBacktestConfig): Promise<void> {
|
|||
|
||||
} catch (error) {
|
||||
logger.error('Backtest failed', { error });
|
||||
console.error('Backtest failed:', error.message);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
|
@ -118,7 +118,7 @@ async function loadConfig(configPath: string): Promise<any> {
|
|||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to load config', { configPath, error });
|
||||
throw new Error(`Failed to load config from ${configPath}: ${error.message}`);
|
||||
throw new Error(`Failed to load config from ${configPath}: ${(error as Error).message}`);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -167,10 +167,9 @@ async function saveResults(result: any, outputPath: string): Promise<void> {
|
|||
await Bun.write(outputPath + '.json', JSON.stringify(result, null, 2));
|
||||
}
|
||||
|
||||
console.log(`\nResults saved to: ${outputPath}`);
|
||||
logger.info(`\nResults saved to: ${outputPath}`);
|
||||
} catch (error) {
|
||||
logger.error('Failed to save results', { outputPath, error });
|
||||
console.error(`Failed to save results: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -231,18 +230,8 @@ program
|
|||
.option('--config <path>', 'Configuration file path')
|
||||
.option('-o, --output <path>', 'Output file path')
|
||||
.option('-v, --verbose', 'Verbose output')
|
||||
.action(async (options) => {
|
||||
await runBacktest({
|
||||
strategy: options.strategy,
|
||||
symbol: options.symbol,
|
||||
startDate: options.startDate,
|
||||
endDate: options.endDate,
|
||||
mode: options.mode,
|
||||
initialCapital: parseFloat(options.initialCapital),
|
||||
config: options.config,
|
||||
output: options.output,
|
||||
verbose: options.verbose
|
||||
});
|
||||
.action(async (options: CLIBacktestConfig) => {
|
||||
await runBacktest(options);
|
||||
});
|
||||
|
||||
program
|
||||
|
|
@ -254,7 +243,7 @@ program
|
|||
.command('validate')
|
||||
.description('Validate a strategy')
|
||||
.requiredOption('-s, --strategy <strategy>', 'Strategy to validate')
|
||||
.action(async (options) => {
|
||||
.action(async (options: CLIBacktestConfig) => {
|
||||
await validateStrategy(options.strategy);
|
||||
});
|
||||
|
||||
|
|
@ -268,7 +257,7 @@ program
|
|||
.option('-m, --mode <mode>', 'Execution mode', 'vectorized')
|
||||
.option('-c, --initial-capital <amount>', 'Initial capital', '10000')
|
||||
.option('-o, --output <path>', 'Output directory')
|
||||
.action(async (options) => {
|
||||
.action(async (options: CLIBacktestConfig) => {
|
||||
const strategies = options.strategies.split(',').map((s: string) => s.trim());
|
||||
console.log(`Comparing strategies: ${strategies.join(', ')}`);
|
||||
|
||||
|
|
@ -278,16 +267,12 @@ program
|
|||
console.log(`\nRunning ${strategy}...`);
|
||||
try {
|
||||
await runBacktest({
|
||||
...options,
|
||||
strategy,
|
||||
symbol: options.symbol,
|
||||
startDate: options.startDate,
|
||||
endDate: options.endDate,
|
||||
mode: options.mode,
|
||||
initialCapital: parseFloat(options.initialCapital),
|
||||
output: options.output ? `${options.output}/${strategy}.json` : undefined
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(`Failed to run ${strategy}:`, error.message);
|
||||
console.error(`Failed to run ${strategy}:`, (error as Error).message);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
5
bun.lock
5
bun.lock
|
|
@ -151,6 +151,7 @@
|
|||
"@stock-bot/event-bus": "workspace:*",
|
||||
"@stock-bot/logger": "workspace:*",
|
||||
"@stock-bot/utils": "workspace:*",
|
||||
"commander": "^14.0.0",
|
||||
"eventemitter3": "^5.0.1",
|
||||
},
|
||||
"devDependencies": {
|
||||
|
|
@ -478,7 +479,7 @@
|
|||
|
||||
"combined-stream": ["combined-stream@1.0.8", "", { "dependencies": { "delayed-stream": "~1.0.0" } }, "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg=="],
|
||||
|
||||
"commander": ["commander@2.20.3", "", {}, "sha512-GpVkmM8vF2vQUkj2LvZmD35JxeJOLCwJ9cUkugyk2nuhbv3+mJvpLYYt+0+USMxE+oj+ey/lJEnhZw75x/OMcQ=="],
|
||||
"commander": ["commander@14.0.0", "", {}, "sha512-2uM9rYjPvyq39NwLRqaiLtWHyDC1FvryJDa2ATTVims5YAS4PupsEQsDvP14FqhFr0P49CYDugi59xaxJlTXRA=="],
|
||||
|
||||
"commondir": ["commondir@1.0.1", "", {}, "sha512-W9pAhw0ja1Edb5GVdIF1mjZw/ASI0AlShXM83UUGe2DVr5TdAPEA1OA8m/g8zWp9x6On7gqufY+FatDbC3MDQg=="],
|
||||
|
||||
|
|
@ -1172,6 +1173,8 @@
|
|||
|
||||
"mongodb-memory-server-core/mongodb": ["mongodb@5.9.2", "", { "dependencies": { "bson": "^5.5.0", "mongodb-connection-string-url": "^2.6.0", "socks": "^2.7.1" }, "optionalDependencies": { "@mongodb-js/saslprep": "^1.1.0" }, "peerDependencies": { "@aws-sdk/credential-providers": "^3.188.0", "@mongodb-js/zstd": "^1.0.0", "kerberos": "^1.0.0 || ^2.0.0", "mongodb-client-encryption": ">=2.3.0 <3", "snappy": "^7.2.2" }, "optionalPeers": ["@aws-sdk/credential-providers", "@mongodb-js/zstd", "kerberos", "mongodb-client-encryption", "snappy"] }, "sha512-H60HecKO4Bc+7dhOv4sJlgvenK4fQNqqUIlXxZYQNbfEWSALGAwGoyJd/0Qwk4TttFXUOHJ2ZJQe/52ScaUwtQ=="],
|
||||
|
||||
"nearley/commander": ["commander@2.20.3", "", {}, "sha512-GpVkmM8vF2vQUkj2LvZmD35JxeJOLCwJ9cUkugyk2nuhbv3+mJvpLYYt+0+USMxE+oj+ey/lJEnhZw75x/OMcQ=="],
|
||||
|
||||
"node-fetch/whatwg-url": ["whatwg-url@5.0.0", "", { "dependencies": { "tr46": "~0.0.3", "webidl-conversions": "^3.0.0" } }, "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw=="],
|
||||
|
||||
"path-scurry/lru-cache": ["lru-cache@10.4.3", "", {}, "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ=="],
|
||||
|
|
|
|||
|
|
@ -0,0 +1,550 @@
|
|||
import { EventEmitter } from 'eventemitter3';
|
||||
import Redis from 'ioredis';
|
||||
import { createLogger } from '@stock-bot/logger';
|
||||
import { dragonflyConfig } from '@stock-bot/config';
|
||||
|
||||
export interface EventBusMessage {
|
||||
id: string;
|
||||
type: string;
|
||||
source: string;
|
||||
timestamp: number;
|
||||
data: any;
|
||||
metadata?: Record<string, any>;
|
||||
}
|
||||
|
||||
export interface EventHandler<T = any> {
|
||||
(message: EventBusMessage & { data: T }): Promise<void> | void;
|
||||
}
|
||||
|
||||
export interface EventBusOptions {
|
||||
serviceName: string;
|
||||
enablePersistence?: boolean;
|
||||
useStreams?: boolean;
|
||||
maxRetries?: number;
|
||||
retryDelay?: number;
|
||||
}
|
||||
|
||||
export interface StreamConsumerInfo {
|
||||
streamKey: string;
|
||||
groupName: string;
|
||||
consumerName: string;
|
||||
handler: EventHandler;
|
||||
isRunning: boolean;
|
||||
}
|
||||
|
||||
export class EventBus extends EventEmitter {
|
||||
private redis: Redis;
|
||||
private subscriber?: Redis;
|
||||
private serviceName: string;
|
||||
private logger: any;
|
||||
private enablePersistence: boolean;
|
||||
private useStreams: boolean;
|
||||
private maxRetries: number;
|
||||
private retryDelay: number;
|
||||
private consumers: Map<string, StreamConsumerInfo> = new Map();
|
||||
private isRunning: boolean = true;
|
||||
|
||||
constructor(options: EventBusOptions) {
|
||||
super();
|
||||
this.serviceName = options.serviceName;
|
||||
this.enablePersistence = options.enablePersistence ?? true;
|
||||
this.useStreams = options.useStreams ?? true;
|
||||
this.maxRetries = options.maxRetries ?? 3;
|
||||
this.retryDelay = options.retryDelay ?? 1000;
|
||||
this.logger = createLogger(`event-bus:${this.serviceName}`);
|
||||
|
||||
this.redis = new Redis({
|
||||
host: dragonflyConfig.DRAGONFLY_HOST,
|
||||
port: dragonflyConfig.DRAGONFLY_PORT,
|
||||
password: dragonflyConfig.DRAGONFLY_PASSWORD,
|
||||
db: dragonflyConfig.DRAGONFLY_DATABASE || 0,
|
||||
maxRetriesPerRequest: dragonflyConfig.DRAGONFLY_MAX_RETRIES,
|
||||
lazyConnect: true,
|
||||
});
|
||||
|
||||
if (!this.useStreams) {
|
||||
this.subscriber = new Redis({
|
||||
host: dragonflyConfig.DRAGONFLY_HOST,
|
||||
port: dragonflyConfig.DRAGONFLY_PORT,
|
||||
password: dragonflyConfig.DRAGONFLY_PASSWORD,
|
||||
db: dragonflyConfig.DRAGONFLY_DATABASE || 0,
|
||||
});
|
||||
this.subscriber.on('message', this.handleRedisMessage.bind(this));
|
||||
}
|
||||
|
||||
this.logger.info(`Redis event bus initialized (mode: ${this.useStreams ? 'streams' : 'pub/sub'})`);
|
||||
}
|
||||
|
||||
private handleRedisMessage(channel: string, message: string) {
|
||||
try {
|
||||
const eventMessage: EventBusMessage = JSON.parse(message);
|
||||
|
||||
if (eventMessage.source === this.serviceName) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.emit(eventMessage.type, eventMessage);
|
||||
this.logger.debug(`Received event: ${eventMessage.type} from ${eventMessage.source}`);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to parse Redis message', { error, message });
|
||||
}
|
||||
}
|
||||
|
||||
async publish<T = any>(type: string, data: T, metadata?: Record<string, any>): Promise<string | null> {
|
||||
const message: EventBusMessage = {
|
||||
id: this.generateId(),
|
||||
type,
|
||||
source: this.serviceName,
|
||||
timestamp: Date.now(),
|
||||
data,
|
||||
metadata,
|
||||
};
|
||||
|
||||
this.emit(type, message);
|
||||
|
||||
if (this.redis && this.enablePersistence) {
|
||||
try {
|
||||
if (this.useStreams) {
|
||||
const streamKey = `events:${type}`;
|
||||
const messageId = await this.redis.xadd(
|
||||
streamKey,
|
||||
'*',
|
||||
'id', message.id,
|
||||
'type', message.type,
|
||||
'source', message.source,
|
||||
'timestamp', message.timestamp.toString(),
|
||||
'data', JSON.stringify(message.data),
|
||||
'metadata', JSON.stringify(message.metadata || {})
|
||||
);
|
||||
|
||||
this.logger.debug(`Published event to stream: ${type}`, {
|
||||
messageId,
|
||||
streamId: messageId
|
||||
});
|
||||
return messageId as string;
|
||||
} else {
|
||||
await this.redis.publish(`events:${type}`, JSON.stringify(message));
|
||||
this.logger.debug(`Published event via pub/sub: ${type}`, { messageId: message.id });
|
||||
return message.id;
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to publish event: ${type}`, { error, messageId: message.id });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
async subscribe<T = any>(eventType: string, handler: EventHandler<T>): Promise<void> {
|
||||
this.on(eventType, handler);
|
||||
|
||||
if (this.redis && this.enablePersistence) {
|
||||
try {
|
||||
if (this.useStreams) {
|
||||
await this.subscribeToStream(eventType, handler);
|
||||
} else {
|
||||
if (this.subscriber) {
|
||||
await this.subscriber.subscribe(`events:${eventType}`);
|
||||
this.logger.debug(`Subscribed to event: ${eventType}`);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to subscribe to event: ${eventType}`, { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async subscribeToStream<T = any>(eventType: string, handler: EventHandler<T>): Promise<void> {
|
||||
const streamKey = `events:${eventType}`;
|
||||
const groupName = `${eventType}-consumers`;
|
||||
const consumerName = `${this.serviceName}-${Date.now()}`;
|
||||
|
||||
try {
|
||||
await this.redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM');
|
||||
this.logger.debug(`Created consumer group: ${groupName} for stream: ${streamKey}`);
|
||||
} catch (error: any) {
|
||||
if (error.message.includes('BUSYGROUP')) {
|
||||
this.logger.debug(`Consumer group already exists: ${groupName}`);
|
||||
} else {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
const consumerInfo: StreamConsumerInfo = {
|
||||
streamKey,
|
||||
groupName,
|
||||
consumerName,
|
||||
handler,
|
||||
isRunning: true,
|
||||
};
|
||||
|
||||
this.consumers.set(`${eventType}-${consumerName}`, consumerInfo);
|
||||
this.startStreamConsumer(consumerInfo);
|
||||
this.logger.debug(`Started stream consumer for: ${eventType}`);
|
||||
}
|
||||
|
||||
private async startStreamConsumer(consumerInfo: StreamConsumerInfo): Promise<void> {
|
||||
const { streamKey, groupName, consumerName, handler } = consumerInfo;
|
||||
let retryCount = 0;
|
||||
|
||||
while (consumerInfo.isRunning && this.isRunning) {
|
||||
try {
|
||||
await this.claimPendingMessages(streamKey, groupName, consumerName, handler);
|
||||
|
||||
const messages = await this.redis.xreadgroup(
|
||||
'GROUP', groupName, consumerName,
|
||||
'COUNT', 10,
|
||||
'BLOCK', 1000,
|
||||
'STREAMS', streamKey, '>'
|
||||
);
|
||||
|
||||
if (!messages || messages.length === 0) {
|
||||
retryCount = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
for (const [stream, msgs] of messages as [string, [string, string[]][]][]) {
|
||||
for (const [msgId, fields] of msgs) {
|
||||
await this.processStreamMessage(msgId, fields, streamKey, groupName, handler);
|
||||
}
|
||||
}
|
||||
|
||||
retryCount = 0;
|
||||
} catch (error: any) {
|
||||
retryCount++;
|
||||
|
||||
if (error.message.includes('NOGROUP')) {
|
||||
this.logger.warn(`Consumer group deleted, recreating: ${groupName}`);
|
||||
try {
|
||||
await this.redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM');
|
||||
retryCount = 0;
|
||||
} catch (createError) {
|
||||
this.logger.error('Failed to recreate consumer group:', { error: createError });
|
||||
}
|
||||
} else {
|
||||
this.logger.error('Error reading from stream:', { error, retryCount });
|
||||
}
|
||||
|
||||
if (retryCount >= this.maxRetries) {
|
||||
this.logger.error(`Max retries reached for consumer ${consumerName}, stopping`);
|
||||
consumerInfo.isRunning = false;
|
||||
break;
|
||||
}
|
||||
|
||||
const backoffDelay = Math.min(this.retryDelay * Math.pow(2, retryCount - 1), 30000);
|
||||
await this.sleep(backoffDelay);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.info(`Stream consumer stopped: ${consumerName}`);
|
||||
}
|
||||
|
||||
private async processStreamMessage(
|
||||
msgId: string,
|
||||
fields: string[],
|
||||
streamKey: string,
|
||||
groupName: string,
|
||||
handler: EventHandler
|
||||
): Promise<void> {
|
||||
let retryCount = 0;
|
||||
|
||||
while (retryCount < this.maxRetries) {
|
||||
try {
|
||||
const message = this.parseStreamMessage(fields);
|
||||
|
||||
if (message.source === this.serviceName) {
|
||||
await this.redis.xack(streamKey, groupName, msgId);
|
||||
return;
|
||||
}
|
||||
|
||||
await handler(message);
|
||||
await this.redis.xack(streamKey, groupName, msgId);
|
||||
|
||||
this.logger.debug(`Processed stream message: ${msgId}`, {
|
||||
eventType: message.type,
|
||||
source: message.source
|
||||
});
|
||||
|
||||
return;
|
||||
|
||||
} catch (error) {
|
||||
retryCount++;
|
||||
this.logger.error(`Error processing stream message ${msgId} (attempt ${retryCount}):`, { error });
|
||||
|
||||
if (retryCount >= this.maxRetries) {
|
||||
await this.moveToDeadLetterQueue(msgId, fields, streamKey, groupName, error);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.sleep(this.retryDelay * retryCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async claimPendingMessages(
|
||||
streamKey: string,
|
||||
groupName: string,
|
||||
consumerName: string,
|
||||
handler: EventHandler
|
||||
): Promise<void> {
|
||||
try {
|
||||
const pendingMessages = await this.redis.xpending(
|
||||
streamKey,
|
||||
groupName,
|
||||
'-',
|
||||
'+',
|
||||
10
|
||||
) as any[];
|
||||
|
||||
if (!pendingMessages || pendingMessages.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const oldMessages = pendingMessages.filter((msg: any[]) => {
|
||||
return msg[2] > 60000;
|
||||
});
|
||||
|
||||
if (oldMessages.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const messageIds = oldMessages.map((msg: any[]) => msg[0]);
|
||||
const claimedMessages = await this.redis.xclaim(
|
||||
streamKey,
|
||||
groupName,
|
||||
consumerName,
|
||||
60000,
|
||||
...messageIds
|
||||
) as [string, string[]][];
|
||||
|
||||
for (const [msgId, fields] of claimedMessages) {
|
||||
await this.processStreamMessage(msgId, fields, streamKey, groupName, handler);
|
||||
}
|
||||
|
||||
this.logger.debug(`Claimed and processed ${claimedMessages.length} pending messages`);
|
||||
} catch (error) {
|
||||
this.logger.error('Error claiming pending messages:', { error });
|
||||
}
|
||||
}
|
||||
|
||||
private async moveToDeadLetterQueue(
|
||||
msgId: string,
|
||||
fields: string[],
|
||||
streamKey: string,
|
||||
groupName: string,
|
||||
error: any
|
||||
): Promise<void> {
|
||||
try {
|
||||
const dlqKey = `${streamKey}:dlq`;
|
||||
const message = this.parseStreamMessage(fields);
|
||||
|
||||
await this.redis.xadd(
|
||||
dlqKey,
|
||||
'*',
|
||||
'original_id', msgId,
|
||||
'original_stream', streamKey,
|
||||
'error', (error as Error).message || 'Unknown error',
|
||||
'timestamp', Date.now().toString(),
|
||||
'id', message.id,
|
||||
'type', message.type,
|
||||
'source', message.source,
|
||||
'data', JSON.stringify(message.data),
|
||||
'metadata', JSON.stringify(message.metadata || {})
|
||||
);
|
||||
|
||||
await this.redis.xack(streamKey, groupName, msgId);
|
||||
|
||||
this.logger.warn(`Moved message ${msgId} to dead letter queue: ${dlqKey}`, { error: (error as Error).message });
|
||||
} catch (dlqError) {
|
||||
this.logger.error(`Failed to move message ${msgId} to dead letter queue:`, { error: dlqError });
|
||||
}
|
||||
}
|
||||
|
||||
private parseStreamMessage(fields: string[]): EventBusMessage {
|
||||
const fieldMap: Record<string, string> = {};
|
||||
|
||||
for (let i = 0; i < fields.length; i += 2) {
|
||||
fieldMap[fields[i]] = fields[i + 1];
|
||||
}
|
||||
|
||||
return {
|
||||
id: fieldMap.id,
|
||||
type: fieldMap.type || 'unknown',
|
||||
source: fieldMap.source,
|
||||
timestamp: parseInt(fieldMap.timestamp) || Date.now(),
|
||||
data: fieldMap.data ? JSON.parse(fieldMap.data) : {},
|
||||
metadata: fieldMap.metadata ? JSON.parse(fieldMap.metadata) : {},
|
||||
};
|
||||
}
|
||||
|
||||
private sleep(ms: number): Promise<void> {
|
||||
return new Promise(resolve => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
async unsubscribe(eventType: string, handler?: EventHandler): Promise<void> {
|
||||
if (handler) {
|
||||
this.off(eventType, handler);
|
||||
} else {
|
||||
this.removeAllListeners(eventType);
|
||||
}
|
||||
|
||||
if (this.enablePersistence) {
|
||||
try {
|
||||
if (this.useStreams) {
|
||||
const consumersToStop = Array.from(this.consumers.entries())
|
||||
.filter(([key]) => key.startsWith(`${eventType}-`));
|
||||
|
||||
for (const [key, consumerInfo] of consumersToStop) {
|
||||
consumerInfo.isRunning = false;
|
||||
this.consumers.delete(key);
|
||||
}
|
||||
|
||||
this.logger.debug(`Stopped stream consumers for: ${eventType}`);
|
||||
} else {
|
||||
if (this.subscriber) {
|
||||
await this.subscriber.unsubscribe(`events:${eventType}`);
|
||||
this.logger.debug(`Unsubscribed from event: ${eventType}`);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to unsubscribe from event: ${eventType}`, { error });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
this.isRunning = false;
|
||||
|
||||
for (const consumerInfo of this.consumers.values()) {
|
||||
consumerInfo.isRunning = false;
|
||||
}
|
||||
this.consumers.clear();
|
||||
|
||||
if (this.redis) {
|
||||
await this.redis.quit();
|
||||
}
|
||||
if (this.subscriber) {
|
||||
await this.subscriber.quit();
|
||||
}
|
||||
|
||||
this.removeAllListeners();
|
||||
this.logger.info('Event bus closed');
|
||||
}
|
||||
|
||||
private generateId(): string {
|
||||
return `${this.serviceName}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
|
||||
}
|
||||
|
||||
async getStreamInfo(eventType: string): Promise<any> {
|
||||
if (!this.useStreams) {
|
||||
throw new Error('Stream info only available when using Redis Streams');
|
||||
}
|
||||
|
||||
const streamKey = `events:${eventType}`;
|
||||
try {
|
||||
return await this.redis.xinfo('STREAM', streamKey);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to get stream info for: ${eventType}`, { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async getStreamLength(eventType: string): Promise<number> {
|
||||
if (!this.useStreams) {
|
||||
throw new Error('Stream length only available when using Redis Streams');
|
||||
}
|
||||
|
||||
const streamKey = `events:${eventType}`;
|
||||
try {
|
||||
return await this.redis.xlen(streamKey);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to get stream length for: ${eventType}`, { error });
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
async readStreamHistory(
|
||||
eventType: string,
|
||||
startId: string = '-',
|
||||
endId: string = '+',
|
||||
count?: number
|
||||
): Promise<EventBusMessage[]> {
|
||||
if (!this.useStreams) {
|
||||
throw new Error('Stream history only available when using Redis Streams');
|
||||
}
|
||||
|
||||
const streamKey = `events:${eventType}`;
|
||||
try {
|
||||
let messages: [string, string[]][];
|
||||
|
||||
if (count) {
|
||||
messages = await this.redis.xrange(streamKey, startId, endId, 'COUNT', count) as [string, string[]][];
|
||||
} else {
|
||||
messages = await this.redis.xrange(streamKey, startId, endId) as [string, string[]][];
|
||||
}
|
||||
|
||||
return messages.map(([id, fields]) => ({
|
||||
...this.parseStreamMessage(fields),
|
||||
id
|
||||
}));
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to read stream history for: ${eventType}`, { error });
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async trimStream(eventType: string, maxLength: number): Promise<number> {
|
||||
if (!this.useStreams) {
|
||||
throw new Error('Stream trimming only available when using Redis Streams');
|
||||
}
|
||||
|
||||
const streamKey = `events:${eventType}`;
|
||||
try {
|
||||
return await this.redis.xtrim(streamKey, 'MAXLEN', '~', maxLength);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to trim stream: ${eventType}`, { error });
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
async replayEventsFromTimestamp(
|
||||
eventType: string,
|
||||
fromTimestamp: number,
|
||||
handler: EventHandler,
|
||||
speed: number = 1
|
||||
): Promise<void> {
|
||||
if (!this.useStreams) {
|
||||
throw new Error('Event replay only available when using Redis Streams');
|
||||
}
|
||||
|
||||
const events = await this.readStreamHistory(eventType);
|
||||
const filteredEvents = events.filter(event => event.timestamp >= fromTimestamp);
|
||||
|
||||
this.logger.info(`Replaying ${filteredEvents.length} events from ${new Date(fromTimestamp)}`);
|
||||
|
||||
for (let i = 0; i < filteredEvents.length; i++) {
|
||||
const event = filteredEvents[i];
|
||||
const nextEvent = filteredEvents[i + 1];
|
||||
|
||||
try {
|
||||
await handler(event);
|
||||
|
||||
if (nextEvent && speed > 0) {
|
||||
const delay = (nextEvent.timestamp - event.timestamp) / speed;
|
||||
if (delay > 0) {
|
||||
await this.sleep(Math.min(delay, 1000));
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`Error replaying event: ${event.id}`, { error });
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.info('Event replay completed');
|
||||
}
|
||||
}
|
||||
|
||||
export function createEventBus(options: EventBusOptions): EventBus {
|
||||
return new EventBus(options);
|
||||
}
|
||||
|
|
@ -10,11 +10,12 @@
|
|||
"test": "bun test"
|
||||
},
|
||||
"dependencies": {
|
||||
"@stock-bot/logger": "workspace:*",
|
||||
"@stock-bot/config": "workspace:*",
|
||||
"@stock-bot/utils": "workspace:*",
|
||||
"@stock-bot/event-bus": "workspace:*",
|
||||
"@stock-bot/data-frame": "workspace:*",
|
||||
"@stock-bot/event-bus": "workspace:*",
|
||||
"@stock-bot/logger": "workspace:*",
|
||||
"@stock-bot/utils": "workspace:*",
|
||||
"commander": "^14.0.0",
|
||||
"eventemitter3": "^5.0.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
|
|
|||
|
|
@ -143,7 +143,7 @@ export abstract class BaseStrategy extends EventEmitter {
|
|||
|
||||
// Utility methods
|
||||
protected async emitSignal(signal: TradingSignal): Promise<void> {
|
||||
await this.eventBus.publishStrategySignal(this.config.id, signal);
|
||||
await this.eventBus.publish(this.config.id, signal);
|
||||
this.emit('signal', signal);
|
||||
this.logger.info('Signal generated', {
|
||||
signal: signal.type,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue