From d22f7aafa0b7c796f1f64f9ea8bbd7674c881e29 Mon Sep 17 00:00:00 2001 From: Bojan Kucera Date: Tue, 3 Jun 2025 19:43:22 -0400 Subject: [PATCH] work up marketdatagateway --- .../market-data-gateway/src/index.ts | 16 +- .../market-data-gateway/src/index_clean.ts | 12 +- .../src/services/MarketDataGatewayService.ts | 380 ---------------- .../MarketDataGatewayService.ts.backup | 404 ++++++++++++++++++ .../src/types/MarketDataGateway.ts | 144 +++++-- .../market-data-gateway/tsconfig.json | 28 +- .../src/controllers/HealthController.ts | 4 +- .../src/controllers/JobController.ts | 4 +- .../src/controllers/PipelineController.ts | 4 +- .../src/core/DataPipelineOrchestrator.ts | 76 ++-- .../data-processor/src/core/JobQueue.ts | 4 +- .../src/core/PipelineScheduler.ts | 9 +- .../src/services/DataIngestionService.ts | 12 +- .../src/services/DataQualityService.ts | 11 +- .../src/services/DataTransformationService.ts | 15 +- .../src/services/DataValidationService.ts | 4 +- libs/types/src/events/events.ts | 20 +- 17 files changed, 653 insertions(+), 494 deletions(-) create mode 100644 apps/core-services/market-data-gateway/src/services/MarketDataGatewayService.ts.backup diff --git a/apps/core-services/market-data-gateway/src/index.ts b/apps/core-services/market-data-gateway/src/index.ts index a664f25..0ed49f4 100644 --- a/apps/core-services/market-data-gateway/src/index.ts +++ b/apps/core-services/market-data-gateway/src/index.ts @@ -55,11 +55,11 @@ const config: GatewayConfig = { methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'], headers: ['Content-Type', 'Authorization'], }, - }, - dataSources: getEnabledProviders().map(provider => ({ + }, dataSources: getEnabledProviders().map(provider => ({ id: provider.name.toLowerCase().replace(/\s+/g, '-'), name: provider.name, type: provider.type === 'both' ? 'websocket' : provider.type as any, + provider: provider.name, enabled: provider.enabled, priority: provider.priority, rateLimit: { @@ -114,21 +114,17 @@ const config: GatewayConfig = { candles: 86400000, // 24 hours orderbook: 30000, // 30 seconds }, - }, - monitoring: { + }, monitoring: { metrics: { enabled: true, port: parseInt(process.env.METRICS_PORT || '9090'), - path: '/metrics', - }, - logging: { - level: 'info', - format: 'json', - outputs: ['console'], + intervalMs: 5000, + retention: '24h', }, alerts: { enabled: true, thresholds: { + latency: 1000, latencyMs: 1000, errorRate: 0.05, connectionLoss: 3, diff --git a/apps/core-services/market-data-gateway/src/index_clean.ts b/apps/core-services/market-data-gateway/src/index_clean.ts index 6da5aa2..126d00e 100644 --- a/apps/core-services/market-data-gateway/src/index_clean.ts +++ b/apps/core-services/market-data-gateway/src/index_clean.ts @@ -55,21 +55,17 @@ const config: GatewayConfig = { candles: 86400000, // 24 hours orderbook: 30000, // 30 seconds }, - }, - monitoring: { + }, monitoring: { metrics: { enabled: true, port: parseInt(process.env.METRICS_PORT || '9090'), - path: '/metrics', - }, - logging: { - level: 'info', - format: 'json', - outputs: ['console'], + intervalMs: 5000, + retention: '24h', }, alerts: { enabled: true, thresholds: { + latency: 1000, latencyMs: 1000, errorRate: 0.05, connectionLoss: 3, diff --git a/apps/core-services/market-data-gateway/src/services/MarketDataGatewayService.ts b/apps/core-services/market-data-gateway/src/services/MarketDataGatewayService.ts index 122835c..e69de29 100644 --- a/apps/core-services/market-data-gateway/src/services/MarketDataGatewayService.ts +++ b/apps/core-services/market-data-gateway/src/services/MarketDataGatewayService.ts @@ -1,380 +0,0 @@ -import { EventEmitter } from 'eventemitter3'; -// Local logger interface to avoid pino dependency issues -interface Logger { - info(msg: string, ...args: any[]): void; - error(msg: string, ...args: any[]): void; - warn(msg: string, ...args: any[]): void; - debug(msg: string, ...args: any[]): void; - child(options: any): Logger; -} - -// Simple logger implementation -const createLogger = (name: string): Logger => ({ - info: (msg: string, ...args: any[]) => console.log(`[${name}] INFO:`, msg, ...args), - error: (msg: string, ...args: any[]) => console.error(`[${name}] ERROR:`, msg, ...args), - warn: (msg: string, ...args: any[]) => console.warn(`[${name}] WARN:`, msg, ...args), - debug: (msg: string, ...args: any[]) => console.debug(`[${name}] DEBUG:`, msg, ...args), - child: (options: any) => createLogger(`${name}.${options.component || 'child'}`) -}); -import { - GatewayConfig, - DataSourceConfig, - ProcessingPipeline, - ClientSubscription, - SubscriptionRequest, - DataSourceMetrics, - GatewayMetrics, - MarketDataTick, - MarketDataCandle, - MarketDataTrade, - MarketDataOrder, - HealthStatus -} from '../types/MarketDataGateway'; -import { DataSourceManager } from './DataSourceManager'; -import { ProcessingEngine } from './ProcessingEngine'; -import { SubscriptionManager } from './SubscriptionManager'; -import { CacheManager } from './CacheManager'; -import { MetricsCollector } from './MetricsCollector'; -import { ServiceIntegrationManager } from './ServiceIntegrationManager'; - -export class MarketDataGatewayService extends EventEmitter { - private config: GatewayConfig; - private logger: Logger; - private dataSourceManager: DataSourceManager; - private processingEngine: ProcessingEngine; - private subscriptionManager: SubscriptionManager; - private cacheManager: CacheManager; - private metricsCollector: MetricsCollector; - private serviceIntegration: ServiceIntegrationManager; - private isRunning = false; - private startTime: Date = new Date(); - - constructor(config: GatewayConfig, logger: Logger) { - super(); - this.config = config; - this.logger = logger; - - this.initializeComponents(); - this.setupEventHandlers(); - } - - private initializeComponents() { - this.logger.info('Initializing Market Data Gateway components'); - - // Initialize core components - this.dataSourceManager = new DataSourceManager( - this.config.dataSources, - this.logger.child({ component: 'DataSourceManager' }) - ); - - this.processingEngine = new ProcessingEngine( - this.config.processing, - this.logger.child({ component: 'ProcessingEngine' }) - ); - - this.subscriptionManager = new SubscriptionManager( - this.logger.child({ component: 'SubscriptionManager' }) - ); - - this.cacheManager = new CacheManager( - this.config.cache, - this.logger.child({ component: 'CacheManager' }) - ); - - this.metricsCollector = new MetricsCollector( - this.logger.child({ component: 'MetricsCollector' }) - ); - - this.serviceIntegration = new ServiceIntegrationManager( - this.logger.child({ component: 'ServiceIntegration' }) - ); - } - - private setupEventHandlers() { - // Data source events - this.dataSourceManager.on('data', this.handleIncomingData.bind(this)); - this.dataSourceManager.on('error', this.handleDataSourceError.bind(this)); - this.dataSourceManager.on('connected', this.handleDataSourceConnected.bind(this)); - this.dataSourceManager.on('disconnected', this.handleDataSourceDisconnected.bind(this)); - - // Processing engine events - this.processingEngine.on('processed', this.handleProcessedData.bind(this)); - this.processingEngine.on('error', this.handleProcessingError.bind(this)); - - // Subscription events - this.subscriptionManager.on('subscribed', this.handleClientSubscribed.bind(this)); - this.subscriptionManager.on('unsubscribed', this.handleClientUnsubscribed.bind(this)); - this.subscriptionManager.on('error', this.handleSubscriptionError.bind(this)); - - // Cache events - this.cacheManager.on('cached', this.handleDataCached.bind(this)); - this.cacheManager.on('error', this.handleCacheError.bind(this)); - - // Service integration events - this.serviceIntegration.on('data-forwarded', this.handleDataForwarded.bind(this)); - this.serviceIntegration.on('integration-error', this.handleIntegrationError.bind(this)); - } - - public async start(): Promise { - if (this.isRunning) { - this.logger.warn('Gateway is already running'); - return; - } - - try { - this.logger.info('Starting Market Data Gateway'); - this.startTime = new Date(); - - // Start components in order - await this.cacheManager.start(); - await this.metricsCollector.start(); - await this.serviceIntegration.start(); - await this.processingEngine.start(); - await this.subscriptionManager.start(); - await this.dataSourceManager.start(); - - this.isRunning = true; - this.logger.info('Market Data Gateway started successfully'); - this.emit('started'); - - } catch (error) { - this.logger.error({ error }, 'Failed to start Market Data Gateway'); - await this.stop(); - throw error; - } - } - - public async stop(): Promise { - if (!this.isRunning) { - return; - } - - try { - this.logger.info('Stopping Market Data Gateway'); - - // Stop components in reverse order - await this.dataSourceManager.stop(); - await this.subscriptionManager.stop(); - await this.processingEngine.stop(); - await this.serviceIntegration.stop(); - await this.metricsCollector.stop(); - await this.cacheManager.stop(); - - this.isRunning = false; - this.logger.info('Market Data Gateway stopped'); - this.emit('stopped'); - - } catch (error) { - this.logger.error({ error }, 'Error stopping Market Data Gateway'); - throw error; - } - } - - // Data handling methods - private async handleIncomingData(sourceId: string, data: any): Promise { - try { - this.metricsCollector.recordMessage(sourceId, 'received'); - - // Process data through pipeline - const processedData = await this.processingEngine.process(data); - - // Cache processed data - await this.cacheManager.cache(processedData); - - // Forward to subscribers - await this.subscriptionManager.broadcast(processedData); - - // Forward to integrated services - await this.serviceIntegration.forwardData(processedData); - - this.emit('data-processed', { sourceId, data: processedData }); - - } catch (error) { - this.logger.error({ error, sourceId, data }, 'Error handling incoming data'); - this.metricsCollector.recordError(sourceId); - } - } - - private async handleProcessedData(data: any): Promise { - this.logger.debug({ data }, 'Data processed successfully'); - this.metricsCollector.recordMessage('processing', 'processed'); - } - - private handleDataSourceError(sourceId: string, error: Error): void { - this.logger.error({ sourceId, error }, 'Data source error'); - this.metricsCollector.recordError(sourceId); - this.emit('source-error', { sourceId, error }); - } - - private handleDataSourceConnected(sourceId: string): void { - this.logger.info({ sourceId }, 'Data source connected'); - this.metricsCollector.recordConnection(sourceId, 'connected'); - } - - private handleDataSourceDisconnected(sourceId: string): void { - this.logger.warn({ sourceId }, 'Data source disconnected'); - this.metricsCollector.recordConnection(sourceId, 'disconnected'); - } - - private handleProcessingError(error: Error, data: any): void { - this.logger.error({ error, data }, 'Processing error'); - this.emit('processing-error', { error, data }); - } - - private handleClientSubscribed(subscription: ClientSubscription): void { - this.logger.info({ - clientId: subscription.request.clientId, - symbols: subscription.request.symbols - }, 'Client subscribed'); - } - - private handleClientUnsubscribed(clientId: string): void { - this.logger.info({ clientId }, 'Client unsubscribed'); - } - - private handleSubscriptionError(error: Error, clientId: string): void { - this.logger.error({ error, clientId }, 'Subscription error'); - } - - private handleDataCached(key: string, data: any): void { - this.logger.debug({ key }, 'Data cached'); - } - - private handleCacheError(error: Error, operation: string): void { - this.logger.error({ error, operation }, 'Cache error'); - } - - private handleDataForwarded(service: string, data: any): void { - this.logger.debug({ service }, 'Data forwarded to service'); - } - - private handleIntegrationError(service: string, error: Error): void { - this.logger.error({ service, error }, 'Service integration error'); - } - - // Public API methods - public async subscribe(request: SubscriptionRequest): Promise { - return this.subscriptionManager.subscribe(request); - } - - public async unsubscribe(subscriptionId: string): Promise { - return this.subscriptionManager.unsubscribe(subscriptionId); - } - - public async getSubscriptions(clientId?: string): Promise { - return this.subscriptionManager.getSubscriptions(clientId); - } - - public async addDataSource(config: DataSourceConfig): Promise { - return this.dataSourceManager.addDataSource(config); - } - - public async removeDataSource(sourceId: string): Promise { - return this.dataSourceManager.removeDataSource(sourceId); - } - - public async updateDataSource(sourceId: string, config: Partial): Promise { - return this.dataSourceManager.updateDataSource(sourceId, config); - } - - public async getDataSources(): Promise { - return this.dataSourceManager.getDataSources(); - } - - public async addProcessingPipeline(pipeline: ProcessingPipeline): Promise { - return this.processingEngine.addPipeline(pipeline); - } - - public async removeProcessingPipeline(pipelineId: string): Promise { - return this.processingEngine.removePipeline(pipelineId); - } - - public async getProcessingPipelines(): Promise { - return this.processingEngine.getPipelines(); - } - - public async getMetrics(): Promise { - return this.metricsCollector.getMetrics(); - } - - public async getDataSourceMetrics(sourceId?: string): Promise { - return this.metricsCollector.getDataSourceMetrics(sourceId); - } - - public async getHealthStatus(): Promise { - const metrics = await this.getMetrics(); - const dataSources = await this.getDataSources(); - - // Check component health - const dependencies = [ - { - name: 'cache', - status: await this.cacheManager.isHealthy() ? 'healthy' : 'unhealthy' as const - }, - { - name: 'processing-engine', - status: this.processingEngine.isHealthy() ? 'healthy' : 'unhealthy' as const - }, - { - name: 'data-sources', - status: dataSources.every(ds => ds.enabled) ? 'healthy' : 'unhealthy' as const - } - ]; - - const hasUnhealthyDependencies = dependencies.some(dep => dep.status === 'unhealthy'); - - return { - service: 'market-data-gateway', - status: hasUnhealthyDependencies ? 'degraded' : 'healthy', - timestamp: new Date(), - uptime: Date.now() - this.startTime.getTime(), - version: process.env.SERVICE_VERSION || '1.0.0', - dependencies, - metrics: { - connectionsActive: metrics.subscriptions.active, - messagesPerSecond: metrics.processing.messagesPerSecond, - errorRate: metrics.processing.errorRate, - avgLatencyMs: metrics.dataSources.reduce((sum, ds) => sum + ds.latency.avgMs, 0) / metrics.dataSources.length || 0 - } - }; - } - - // Cache operations - public async getCachedData(key: string): Promise { - return this.cacheManager.get(key); - } - - public async setCachedData(key: string, data: any, ttl?: number): Promise { - return this.cacheManager.set(key, data, ttl); - } - - // Configuration management - public getConfig(): GatewayConfig { - return { ...this.config }; - } - - public async updateConfig(updates: Partial): Promise { - this.config = { ...this.config, ...updates }; - this.logger.info('Gateway configuration updated'); - - // Notify components of config changes - if (updates.dataSources) { - await this.dataSourceManager.updateConfig(updates.dataSources); - } - - if (updates.processing) { - await this.processingEngine.updateConfig(updates.processing); - } - - this.emit('config-updated', this.config); - } - - // Utility methods - public isRunning(): boolean { - return this.isRunning; - } - - public getUptime(): number { - return Date.now() - this.startTime.getTime(); - } -} diff --git a/apps/core-services/market-data-gateway/src/services/MarketDataGatewayService.ts.backup b/apps/core-services/market-data-gateway/src/services/MarketDataGatewayService.ts.backup new file mode 100644 index 0000000..44a548a --- /dev/null +++ b/apps/core-services/market-data-gateway/src/services/MarketDataGatewayService.ts.backup @@ -0,0 +1,404 @@ +import { EventEmitter } from 'eventemitter3'; +// Local logger interface to avoid pino dependency issues +interface Logger { + info(msg: string | object, ...args: any[]): void; + error(msg: string | object, ...args: any[]): void; + warn(msg: string | object, ...args: any[]): void; + debug(msg: string | object, ...args: any[]): void; + child(options: any): Logger; +} + +// Simple logger implementation +const createLogger = (name: string): Logger => ({ + info: (msg: string | object, ...args: any[]) => { + if (typeof msg === 'object') { + console.log(`[${name}] INFO:`, JSON.stringify(msg), ...args); + } else { + console.log(`[${name}] INFO:`, msg, ...args); + } + }, + error: (msg: string | object, ...args: any[]) => { + if (typeof msg === 'object') { + console.error(`[${name}] ERROR:`, JSON.stringify(msg), ...args); + } else { + console.error(`[${name}] ERROR:`, msg, ...args); + } + }, + warn: (msg: string | object, ...args: any[]) => { + if (typeof msg === 'object') { + console.warn(`[${name}] WARN:`, JSON.stringify(msg), ...args); + } else { + console.warn(`[${name}] WARN:`, msg, ...args); + } + }, + debug: (msg: string | object, ...args: any[]) => { + if (typeof msg === 'object') { + console.debug(`[${name}] DEBUG:`, JSON.stringify(msg), ...args); + } else { + console.debug(`[${name}] DEBUG:`, msg, ...args); + } + }, + child: (options: any) => createLogger(`${name}.${options.component || 'child'}`) +}); +import { + GatewayConfig, + DataSourceConfig, + ProcessingPipeline, + ClientSubscription, + SubscriptionRequest, + DataSourceMetrics, + GatewayMetrics, + MarketDataTick, + MarketDataCandle, + MarketDataTrade, + MarketDataOrder, + HealthStatus +} from '../types/MarketDataGateway'; +import { DataSourceManager } from './DataSourceManager'; +import { ProcessingEngine } from './ProcessingEngine'; +import { SubscriptionManager } from './SubscriptionManager'; +import { CacheManager } from './CacheManager'; +import { MetricsCollector } from './MetricsCollector'; +import { ServiceIntegrationManager } from './ServiceIntegrationManager'; + +export class MarketDataGatewayService extends EventEmitter { + private config: GatewayConfig; + private logger: Logger; + private dataSourceManager!: DataSourceManager; + private processingEngine!: ProcessingEngine; + private subscriptionManager!: SubscriptionManager; + private cacheManager!: CacheManager; + private metricsCollector!: MetricsCollector; + private serviceIntegration!: ServiceIntegrationManager; + private _isRunning = false; + private startTime: Date = new Date(); + + constructor(config: GatewayConfig, logger: Logger) { + super(); + this.config = config; + this.logger = logger; + + this.initializeComponents(); + this.setupEventHandlers(); + } + + private initializeComponents() { + this.logger.info('Initializing Market Data Gateway components'); + + // Initialize core components + this.dataSourceManager = new DataSourceManager( + this.config.dataSources, + this.logger.child({ component: 'DataSourceManager' }) + ); + + this.processingEngine = new ProcessingEngine( + this.config.processing, + this.logger.child({ component: 'ProcessingEngine' }) + ); + + this.subscriptionManager = new SubscriptionManager( + this.logger.child({ component: 'SubscriptionManager' }) + ); + + this.cacheManager = new CacheManager( + this.config.cache, + this.logger.child({ component: 'CacheManager' }) + ); + + this.metricsCollector = new MetricsCollector( + this.logger.child({ component: 'MetricsCollector' }) + ); + + this.serviceIntegration = new ServiceIntegrationManager( + this.logger.child({ component: 'ServiceIntegration' }) + ); + } + + private setupEventHandlers() { + // Data source events + this.dataSourceManager.on('data', this.handleIncomingData.bind(this)); + this.dataSourceManager.on('error', this.handleDataSourceError.bind(this)); + this.dataSourceManager.on('connected', this.handleDataSourceConnected.bind(this)); + this.dataSourceManager.on('disconnected', this.handleDataSourceDisconnected.bind(this)); + + // Processing engine events + this.processingEngine.on('processed', this.handleProcessedData.bind(this)); + this.processingEngine.on('error', this.handleProcessingError.bind(this)); + + // Subscription events + this.subscriptionManager.on('subscribed', this.handleClientSubscribed.bind(this)); + this.subscriptionManager.on('unsubscribed', this.handleClientUnsubscribed.bind(this)); + this.subscriptionManager.on('error', this.handleSubscriptionError.bind(this)); + + // Cache events + this.cacheManager.on('cached', this.handleDataCached.bind(this)); + this.cacheManager.on('error', this.handleCacheError.bind(this)); + + // Service integration events + this.serviceIntegration.on('data-forwarded', this.handleDataForwarded.bind(this)); + this.serviceIntegration.on('integration-error', this.handleIntegrationError.bind(this)); + } + + public async start(): Promise { + if (this.isRunning) { + this.logger.warn('Gateway is already running'); + return; + } + + try { + this.logger.info('Starting Market Data Gateway'); + this.startTime = new Date(); + + // Start components in order + await this.cacheManager.start(); + await this.metricsCollector.start(); + await this.serviceIntegration.start(); + await this.processingEngine.start(); + await this.subscriptionManager.start(); + await this.dataSourceManager.start(); + + this.isRunning = true; + this.logger.info('Market Data Gateway started successfully'); + this.emit('started'); + + } catch (error) { + this.logger.error({ error }, 'Failed to start Market Data Gateway'); + await this.stop(); + throw error; + } + } + + public async stop(): Promise { + if (!this.isRunning) { + return; + } + + try { + this.logger.info('Stopping Market Data Gateway'); + + // Stop components in reverse order + await this.dataSourceManager.stop(); + await this.subscriptionManager.stop(); + await this.processingEngine.stop(); + await this.serviceIntegration.stop(); + await this.metricsCollector.stop(); + await this.cacheManager.stop(); + + this.isRunning = false; + this.logger.info('Market Data Gateway stopped'); + this.emit('stopped'); + + } catch (error) { + this.logger.error({ error }, 'Error stopping Market Data Gateway'); + throw error; + } + } + + // Data handling methods + private async handleIncomingData(sourceId: string, data: any): Promise { + try { + this.metricsCollector.recordMessage(sourceId, 'received'); + + // Process data through pipeline + const processedData = await this.processingEngine.process(data); + + // Cache processed data + await this.cacheManager.cache(processedData); + + // Forward to subscribers + await this.subscriptionManager.broadcast(processedData); + + // Forward to integrated services + await this.serviceIntegration.forwardData(processedData); + + this.emit('data-processed', { sourceId, data: processedData }); + + } catch (error) { + this.logger.error({ error, sourceId, data }, 'Error handling incoming data'); + this.metricsCollector.recordError(sourceId); + } + } + + private async handleProcessedData(data: any): Promise { + this.logger.debug({ data }, 'Data processed successfully'); + this.metricsCollector.recordMessage('processing', 'processed'); + } + + private handleDataSourceError(sourceId: string, error: Error): void { + this.logger.error({ sourceId, error }, 'Data source error'); + this.metricsCollector.recordError(sourceId); + this.emit('source-error', { sourceId, error }); + } + + private handleDataSourceConnected(sourceId: string): void { + this.logger.info({ sourceId }, 'Data source connected'); + this.metricsCollector.recordConnection(sourceId, 'connected'); + } + + private handleDataSourceDisconnected(sourceId: string): void { + this.logger.warn({ sourceId }, 'Data source disconnected'); + this.metricsCollector.recordConnection(sourceId, 'disconnected'); + } + + private handleProcessingError(error: Error, data: any): void { + this.logger.error({ error, data }, 'Processing error'); + this.emit('processing-error', { error, data }); + } + + private handleClientSubscribed(subscription: ClientSubscription): void { + this.logger.info({ + clientId: subscription.request.clientId, + symbols: subscription.request.symbols + }, 'Client subscribed'); + } + + private handleClientUnsubscribed(clientId: string): void { + this.logger.info({ clientId }, 'Client unsubscribed'); + } + + private handleSubscriptionError(error: Error, clientId: string): void { + this.logger.error({ error, clientId }, 'Subscription error'); + } + + private handleDataCached(key: string, data: any): void { + this.logger.debug({ key }, 'Data cached'); + } + + private handleCacheError(error: Error, operation: string): void { + this.logger.error({ error, operation }, 'Cache error'); + } + + private handleDataForwarded(service: string, data: any): void { + this.logger.debug({ service }, 'Data forwarded to service'); + } + + private handleIntegrationError(service: string, error: Error): void { + this.logger.error({ service, error }, 'Service integration error'); + } + + // Public API methods + public async subscribe(request: SubscriptionRequest): Promise { + return this.subscriptionManager.subscribe(request); + } + + public async unsubscribe(subscriptionId: string): Promise { + return this.subscriptionManager.unsubscribe(subscriptionId); + } + + public async getSubscriptions(clientId?: string): Promise { + return this.subscriptionManager.getSubscriptions(clientId); + } + + public async addDataSource(config: DataSourceConfig): Promise { + return this.dataSourceManager.addDataSource(config); + } + + public async removeDataSource(sourceId: string): Promise { + return this.dataSourceManager.removeDataSource(sourceId); + } + + public async updateDataSource(sourceId: string, config: Partial): Promise { + return this.dataSourceManager.updateDataSource(sourceId, config); + } + + public async getDataSources(): Promise { + return this.dataSourceManager.getDataSources(); + } + + public async addProcessingPipeline(pipeline: ProcessingPipeline): Promise { + return this.processingEngine.addPipeline(pipeline); + } + + public async removeProcessingPipeline(pipelineId: string): Promise { + return this.processingEngine.removePipeline(pipelineId); + } + + public async getProcessingPipelines(): Promise { + return this.processingEngine.getPipelines(); + } + + public async getMetrics(): Promise { + return this.metricsCollector.getMetrics(); + } + + public async getDataSourceMetrics(sourceId?: string): Promise { + return this.metricsCollector.getDataSourceMetrics(sourceId); + } + + public async getHealthStatus(): Promise { + const metrics = await this.getMetrics(); + const dataSources = await this.getDataSources(); + + // Check component health + const dependencies = [ + { + name: 'cache', + status: await this.cacheManager.isHealthy() ? 'healthy' : 'unhealthy' as const + }, + { + name: 'processing-engine', + status: this.processingEngine.isHealthy() ? 'healthy' : 'unhealthy' as const + }, + { + name: 'data-sources', + status: dataSources.every(ds => ds.enabled) ? 'healthy' : 'unhealthy' as const + } + ]; + + const hasUnhealthyDependencies = dependencies.some(dep => dep.status === 'unhealthy'); + + return { + service: 'market-data-gateway', + status: hasUnhealthyDependencies ? 'degraded' : 'healthy', + timestamp: new Date(), + uptime: Date.now() - this.startTime.getTime(), + version: process.env.SERVICE_VERSION || '1.0.0', + dependencies, + metrics: { + connectionsActive: metrics.subscriptions.active, + messagesPerSecond: metrics.processing.messagesPerSecond, + errorRate: metrics.processing.errorRate, + avgLatencyMs: metrics.dataSources.reduce((sum, ds) => sum + ds.latency.avgMs, 0) / metrics.dataSources.length || 0 + } + }; + } + + // Cache operations + public async getCachedData(key: string): Promise { + return this.cacheManager.get(key); + } + + public async setCachedData(key: string, data: any, ttl?: number): Promise { + return this.cacheManager.set(key, data, ttl); + } + + // Configuration management + public getConfig(): GatewayConfig { + return { ...this.config }; + } + + public async updateConfig(updates: Partial): Promise { + this.config = { ...this.config, ...updates }; + this.logger.info('Gateway configuration updated'); + + // Notify components of config changes + if (updates.dataSources) { + await this.dataSourceManager.updateConfig(updates.dataSources); + } + + if (updates.processing) { + await this.processingEngine.updateConfig(updates.processing); + } + + this.emit('config-updated', this.config); + } + + // Utility methods + public isRunning(): boolean { + return this.isRunning; + } + + public getUptime(): number { + return Date.now() - this.startTime.getTime(); + } +} diff --git a/apps/core-services/market-data-gateway/src/types/MarketDataGateway.ts b/apps/core-services/market-data-gateway/src/types/MarketDataGateway.ts index dabd1eb..fb00621 100644 --- a/apps/core-services/market-data-gateway/src/types/MarketDataGateway.ts +++ b/apps/core-services/market-data-gateway/src/types/MarketDataGateway.ts @@ -66,6 +66,7 @@ export interface DataSourceConfig { id: string; name: string; type: 'websocket' | 'rest' | 'fix' | 'stream'; + provider: string; enabled: boolean; priority: number; rateLimit: { @@ -131,35 +132,8 @@ export interface ProcessingPipeline { }; } -// Data Processing Pipeline -export interface DataProcessor { - id: string; - name: string; - type: 'enrichment' | 'validation' | 'normalization' | 'aggregation' | 'filter'; - enabled: boolean; - priority: number; - config: Record; - process(data: MarketDataTick | MarketDataCandle | MarketDataTrade): Promise; -} - -export interface ProcessingPipeline { - id: string; - name: string; - processors: DataProcessor[]; - inputFilter: { - symbols?: string[]; - sources?: string[]; - dataTypes?: string[]; - }; - outputTargets: { - eventBus?: boolean; - database?: boolean; - cache?: boolean; - websocket?: boolean; - dataProcessor?: boolean; - featureStore?: boolean; - }; -} +// ProcessingPipelineConfig is an alias for ProcessingPipeline +export type ProcessingPipelineConfig = ProcessingPipeline; // Subscription Management export interface SubscriptionRequest { @@ -228,10 +202,10 @@ export interface GatewayConfig { candles: number; orderbook: number; }; - }; - monitoring: { + }; monitoring: { metrics: { enabled: boolean; + port: number; intervalMs: number; retention: string; }; @@ -240,6 +214,7 @@ export interface GatewayConfig { thresholds: { errorRate: number; latency: number; + latencyMs: number; connectionLoss: number; }; }; @@ -320,6 +295,7 @@ export interface WebSocketSubscribeMessage extends WebSocketMessage { export interface WebSocketDataMessage extends WebSocketMessage { type: 'data'; payload: MarketDataTick | MarketDataTrade | MarketDataCandle | MarketDataOrder; + dataType?: string; } // Error Types @@ -342,3 +318,109 @@ export interface MarketDataEvent { data: MarketDataTick | MarketDataTrade | MarketDataCandle | MarketDataOrder; metadata?: Record; } + +// Processing and Integration Types +export interface ProcessingError { + code: string; + message: string; + timestamp: Date; + data?: any; + source?: string; +} + +export interface ServiceIntegration { + serviceName: string; + endpoint: string; + enabled: boolean; + config: Record; + dataProcessor: { + enabled: boolean; + endpoint: string; + timeout: number; + retries: number; + }; + featureStore: { + enabled: boolean; + endpoint: string; + timeout: number; + retries: number; + }; + dataCatalog: { + enabled: boolean; + endpoint: string; + timeout: number; + retries: number; + }; +} + +export interface Logger { + info(message: string, ...args: any[]): void; + error(message: string, ...args: any[]): void; + warn(message: string, ...args: any[]): void; + debug(message: string, ...args: any[]): void; +} + +export interface ProcessedData { + source: string; + timestamp: Date; + data: any; + processedAt: Date; + metadata?: Record; +} + +export interface DataPipelineJob { + id: string; + type: string; + status: 'pending' | 'running' | 'completed' | 'failed'; + data: any; + createdAt: Date; + startedAt?: Date; + completedAt?: Date; +} + +export interface FeatureComputationRequest { + featureGroupId: string; + features: string[]; + data: any; + timestamp: Date; + metadata?: Record; +} + +export interface DataAsset { + id: string; + name: string; + type: string; + source: string; + metadata: Record; + createdAt: Date; + updatedAt: Date; +} + +// Add missing types +export interface CacheConfig { + redis: { + host: string; + port: number; + password?: string; + db: number; + }; + ttl: { + quotes: number; + trades: number; + candles: number; + orderbook: number; + }; +} + +export interface ProcessingMetrics { + totalProcessed: number; + processedPerSecond: number; + processingLatency: number; + errorCount: number; +} + +export interface SubscriptionMetrics { + totalSubscriptions: number; + messagesSent: number; + sendRate: number; +} diff --git a/apps/core-services/market-data-gateway/tsconfig.json b/apps/core-services/market-data-gateway/tsconfig.json index 168d88b..175764e 100644 --- a/apps/core-services/market-data-gateway/tsconfig.json +++ b/apps/core-services/market-data-gateway/tsconfig.json @@ -2,11 +2,31 @@ "extends": "../../../tsconfig.json", "compilerOptions": { "outDir": "./dist", - "rootDir": "./src", "module": "ESNext", "moduleResolution": "bundler", - "types": ["bun-types"] + "types": ["bun-types"], + "baseUrl": "../../../", + "paths": { + "@stock-bot/*": ["libs/*/src", "libs/*/dist"] + }, + "rootDir": "../../../" }, - "include": ["src/**/*"], - "exclude": ["node_modules", "dist"] + "include": [ + "src/**/*", + "../../../libs/*/src/**/*" + ], + "exclude": [ + "node_modules", + "dist", + "../../../libs/*/examples/**/*", + "../../../libs/**/*.test.ts", + "../../../libs/**/*.spec.ts" + ], + "references": [ + { "path": "../../../libs/config" }, + { "path": "../../../libs/types" }, + { "path": "../../../libs/logger" }, + { "path": "../../../libs/http-client" }, + { "path": "../../../libs/event-bus" } + ] } diff --git a/apps/data-services/data-processor/src/controllers/HealthController.ts b/apps/data-services/data-processor/src/controllers/HealthController.ts index 5f78f94..123943e 100644 --- a/apps/data-services/data-processor/src/controllers/HealthController.ts +++ b/apps/data-services/data-processor/src/controllers/HealthController.ts @@ -1,5 +1,7 @@ import { Context } from 'hono'; -import { logger } from '@stock-bot/utils'; +import { getLogger } from '@stock-bot/logger'; + +const logger = getLogger('HealthController'); export class HealthController { async getHealth(c: Context): Promise { diff --git a/apps/data-services/data-processor/src/controllers/JobController.ts b/apps/data-services/data-processor/src/controllers/JobController.ts index 6d002c5..3a365e9 100644 --- a/apps/data-services/data-processor/src/controllers/JobController.ts +++ b/apps/data-services/data-processor/src/controllers/JobController.ts @@ -1,5 +1,7 @@ import { Context } from 'hono'; -import { logger } from '@stock-bot/utils'; +import { getLogger } from '@stock-bot/logger'; + +const logger = getLogger('JobController'); import { DataPipelineOrchestrator } from '../core/DataPipelineOrchestrator'; import { JobStatus } from '../types/DataPipeline'; diff --git a/apps/data-services/data-processor/src/controllers/PipelineController.ts b/apps/data-services/data-processor/src/controllers/PipelineController.ts index f899da7..fce48f4 100644 --- a/apps/data-services/data-processor/src/controllers/PipelineController.ts +++ b/apps/data-services/data-processor/src/controllers/PipelineController.ts @@ -1,8 +1,10 @@ import { Context } from 'hono'; -import { logger } from '@stock-bot/utils'; +import { getLogger } from '@stock-bot/logger'; import { DataPipelineOrchestrator } from '../core/DataPipelineOrchestrator'; import { DataPipeline, PipelineStatus } from '../types/DataPipeline'; +const logger = getLogger('pipeline-controller'); + export class PipelineController { constructor(private orchestrator: DataPipelineOrchestrator) {} diff --git a/apps/data-services/data-processor/src/core/DataPipelineOrchestrator.ts b/apps/data-services/data-processor/src/core/DataPipelineOrchestrator.ts index 6607429..03921c7 100644 --- a/apps/data-services/data-processor/src/core/DataPipelineOrchestrator.ts +++ b/apps/data-services/data-processor/src/core/DataPipelineOrchestrator.ts @@ -1,5 +1,6 @@ -import { EventBus } from '@stock-bot/event-bus'; -import { logger } from '@stock-bot/utils'; +import { EventBus, EventBusConfig } from '@stock-bot/event-bus'; +import { DataPipelineEvent, DataJobEvent } from '@stock-bot/types'; +import { getLogger } from '@stock-bot/logger'; import { DataPipeline, PipelineStatus, PipelineJob, JobStatus } from '../types/DataPipeline'; import { DataIngestionService } from '../services/DataIngestionService'; import { DataTransformationService } from '../services/DataTransformationService'; @@ -8,34 +9,39 @@ import { DataQualityService } from '../services/DataQualityService'; import { PipelineScheduler } from './PipelineScheduler'; import { JobQueue } from './JobQueue'; +const logger = getLogger('data-pipeline-orchestrator'); + export class DataPipelineOrchestrator { private eventBus: EventBus; private scheduler: PipelineScheduler; private jobQueue: JobQueue; private pipelines: Map = new Map(); private runningJobs: Map = new Map(); - constructor( private ingestionService: DataIngestionService, private transformationService: DataTransformationService, private validationService: DataValidationService, private qualityService: DataQualityService ) { - this.eventBus = new EventBus(); + const eventBusConfig: EventBusConfig = { + redisHost: process.env.REDIS_HOST || 'localhost', + redisPort: parseInt(process.env.REDIS_PORT || '6379'), + redisPassword: process.env.REDIS_PASSWORD + }; + this.eventBus = new EventBus(eventBusConfig); this.scheduler = new PipelineScheduler(this); this.jobQueue = new JobQueue(this); } - async initialize(): Promise { logger.info('🔄 Initializing Data Pipeline Orchestrator...'); - await this.eventBus.initialize(); + // EventBus doesn't have initialize method, it connects automatically await this.scheduler.initialize(); await this.jobQueue.initialize(); // Subscribe to pipeline events - await this.eventBus.subscribe('data.pipeline.*', this.handlePipelineEvent.bind(this)); - await this.eventBus.subscribe('data.job.*', this.handleJobEvent.bind(this)); + this.eventBus.subscribe('data.pipeline.*', this.handlePipelineEvent.bind(this)); + this.eventBus.subscribe('data.job.*', this.handleJobEvent.bind(this)); // Load existing pipelines await this.loadPipelines(); @@ -50,14 +56,14 @@ export class DataPipelineOrchestrator { status: PipelineStatus.DRAFT, createdAt: new Date(), updatedAt: new Date(), - }; - - this.pipelines.set(pipelineWithId.id, pipelineWithId); + }; this.pipelines.set(pipelineWithId.id, pipelineWithId); await this.eventBus.publish('data.pipeline.created', { + type: 'PIPELINE_CREATED', pipelineId: pipelineWithId.id, - pipeline: pipelineWithId, - }); + pipelineName: pipelineWithId.name, + timestamp: new Date() + } as DataPipelineEvent); logger.info(`📋 Created pipeline: ${pipelineWithId.name} (${pipelineWithId.id})`); return pipelineWithId; @@ -91,15 +97,15 @@ export class DataPipelineOrchestrator { }; this.runningJobs.set(job.id, job); - - // Queue the job for execution + // Queue the job for execution await this.jobQueue.enqueueJob(job); await this.eventBus.publish('data.job.queued', { + type: 'JOB_STARTED', jobId: job.id, pipelineId, - job, - }); + timestamp: new Date() + } as DataJobEvent); logger.info(`🚀 Queued pipeline job: ${job.id} for pipeline: ${pipeline.name}`); return job; @@ -111,15 +117,15 @@ export class DataPipelineOrchestrator { throw new Error(`Pipeline not found: ${job.pipelineId}`); } - const startTime = Date.now(); - job.status = JobStatus.RUNNING; + const startTime = Date.now(); job.status = JobStatus.RUNNING; job.startedAt = new Date(); await this.eventBus.publish('data.job.started', { + type: 'JOB_STARTED', jobId: job.id, pipelineId: job.pipelineId, - job, - }); + timestamp: new Date() + } as DataJobEvent); try { logger.info(`⚙️ Executing pipeline job: ${job.id}`); @@ -131,30 +137,30 @@ export class DataPipelineOrchestrator { await this.executeQualityChecks(pipeline, job); // Complete the job - job.status = JobStatus.COMPLETED; - job.completedAt = new Date(); + job.status = JobStatus.COMPLETED; job.completedAt = new Date(); job.metrics.processingTimeMs = Date.now() - startTime; await this.eventBus.publish('data.job.completed', { + type: 'JOB_COMPLETED', jobId: job.id, pipelineId: job.pipelineId, - job, - }); + timestamp: new Date() + } as DataJobEvent); logger.info(`✅ Pipeline job completed: ${job.id} in ${job.metrics.processingTimeMs}ms`); } catch (error) { job.status = JobStatus.FAILED; - job.completedAt = new Date(); - job.error = error instanceof Error ? error.message : 'Unknown error'; + job.completedAt = new Date(); job.error = error instanceof Error ? error.message : 'Unknown error'; job.metrics.processingTimeMs = Date.now() - startTime; await this.eventBus.publish('data.job.failed', { + type: 'JOB_FAILED', jobId: job.id, pipelineId: job.pipelineId, - job, error: job.error, - }); + timestamp: new Date() + } as DataJobEvent); logger.error(`❌ Pipeline job failed: ${job.id}`, error); throw error; @@ -228,14 +234,15 @@ export class DataPipelineOrchestrator { pipeline.schedule = { cronExpression, enabled: true, - lastRun: null, - nextRun: this.scheduler.getNextRunTime(cronExpression), + lastRun: null, nextRun: this.scheduler.getNextRunTime(cronExpression), }; await this.eventBus.publish('data.pipeline.scheduled', { + type: 'PIPELINE_STARTED', pipelineId, - cronExpression, - }); + pipelineName: pipeline.name, + timestamp: new Date() + } as DataPipelineEvent); logger.info(`📅 Scheduled pipeline: ${pipeline.name} with cron: ${cronExpression}`); } @@ -280,13 +287,12 @@ export class DataPipelineOrchestrator { private generateJobId(): string { return `job_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; } - async shutdown(): Promise { logger.info('🔄 Shutting down Data Pipeline Orchestrator...'); await this.scheduler.shutdown(); await this.jobQueue.shutdown(); - await this.eventBus.disconnect(); + await this.eventBus.close(); logger.info('✅ Data Pipeline Orchestrator shutdown complete'); } diff --git a/apps/data-services/data-processor/src/core/JobQueue.ts b/apps/data-services/data-processor/src/core/JobQueue.ts index c84ebb3..672da25 100644 --- a/apps/data-services/data-processor/src/core/JobQueue.ts +++ b/apps/data-services/data-processor/src/core/JobQueue.ts @@ -1,8 +1,10 @@ import Queue from 'bull'; -import { logger } from '@stock-bot/utils'; +import { getLogger } from '@stock-bot/logger'; import { PipelineJob } from '../types/DataPipeline'; import { DataPipelineOrchestrator } from './DataPipelineOrchestrator'; +const logger = getLogger('job-queue'); + export class JobQueue { private queue: Queue.Queue; diff --git a/apps/data-services/data-processor/src/core/PipelineScheduler.ts b/apps/data-services/data-processor/src/core/PipelineScheduler.ts index b0e62b5..99c373c 100644 --- a/apps/data-services/data-processor/src/core/PipelineScheduler.ts +++ b/apps/data-services/data-processor/src/core/PipelineScheduler.ts @@ -1,7 +1,9 @@ import { CronJob } from 'cron'; -import { logger } from '@stock-bot/utils'; +import { getLogger } from '@stock-bot/logger'; import { DataPipelineOrchestrator } from './DataPipelineOrchestrator'; +const logger = getLogger('pipeline-scheduler'); + export class PipelineScheduler { private scheduledJobs: Map = new Map(); @@ -45,10 +47,9 @@ export class PipelineScheduler { logger.info(`🚫 Cancelled schedule for pipeline: ${pipelineId}`); } } - getNextRunTime(cronExpression: string): Date { - const job = new CronJob(cronExpression); - return job.nextDate().toDate(); + const job = new CronJob(cronExpression, () => {}, null, false); + return job.nextDate().toJSDate(); } getScheduledPipelines(): string[] { diff --git a/apps/data-services/data-processor/src/services/DataIngestionService.ts b/apps/data-services/data-processor/src/services/DataIngestionService.ts index 3b3e57d..c924142 100644 --- a/apps/data-services/data-processor/src/services/DataIngestionService.ts +++ b/apps/data-services/data-processor/src/services/DataIngestionService.ts @@ -1,7 +1,9 @@ -import { logger } from '@stock-bot/utils'; +import { getLogger } from '@stock-bot/logger'; + +const logger = getLogger('DataIngestionService'); import { IngestionStep, ProcessingResult, DataSource } from '../types/DataPipeline'; import axios from 'axios'; -import * as csv from 'csv-parser'; +import csv from 'csv-parser'; import * as fs from 'fs'; export class DataIngestionService { @@ -112,11 +114,9 @@ export class DataIngestionService { return new Promise((resolve, reject) => { const records: any[] = []; const errors: any[] = []; - let recordCount = 0; - - fs.createReadStream(filePath) + let recordCount = 0; fs.createReadStream(filePath) .pipe(csv()) - .on('data', (data) => { + .on('data', (data: any) => { recordCount++; try { records.push(data); diff --git a/apps/data-services/data-processor/src/services/DataQualityService.ts b/apps/data-services/data-processor/src/services/DataQualityService.ts index af3b125..1d35a14 100644 --- a/apps/data-services/data-processor/src/services/DataQualityService.ts +++ b/apps/data-services/data-processor/src/services/DataQualityService.ts @@ -1,4 +1,6 @@ -import { logger } from '@stock-bot/utils'; +import { getLogger } from '@stock-bot/logger'; + +const logger = getLogger('DataQualityService'); import { QualityCheckStep, ProcessingResult, QualityCheck, QualityThresholds } from '../types/DataPipeline'; export class DataQualityService { @@ -277,11 +279,12 @@ export class DataQualityService { private storeQualityMetrics(metrics: any): void { const key = `metrics_${Date.now()}`; this.qualityMetrics.set(key, metrics); - - // Keep only last 100 metrics + // Keep only last 100 metrics if (this.qualityMetrics.size > 100) { const oldestKey = this.qualityMetrics.keys().next().value; - this.qualityMetrics.delete(oldestKey); + if (oldestKey) { + this.qualityMetrics.delete(oldestKey); + } } } diff --git a/apps/data-services/data-processor/src/services/DataTransformationService.ts b/apps/data-services/data-processor/src/services/DataTransformationService.ts index 9eb5c48..59eb97d 100644 --- a/apps/data-services/data-processor/src/services/DataTransformationService.ts +++ b/apps/data-services/data-processor/src/services/DataTransformationService.ts @@ -1,4 +1,6 @@ -import { logger } from '@stock-bot/utils'; +import { getLogger } from '@stock-bot/logger'; + +const logger = getLogger('DataTransformationService'); import { TransformationStep, ProcessingResult } from '../types/DataPipeline'; export class DataTransformationService { @@ -163,21 +165,22 @@ export class DataTransformationService { } acc[key].push(record); return acc; - }, {}); + }, {} as Record); - const aggregated = Object.entries(grouped).map(([key, records]: [string, any[]]) => { + const aggregated = Object.entries(grouped).map(([key, records]) => { + const recordsArray = records as any[]; const result: any = { [groupBy]: key }; if (aggregations.includes('avg')) { - result.avgPrice = records.reduce((sum, r) => sum + (r.price || 0), 0) / records.length; + result.avgPrice = recordsArray.reduce((sum: number, r: any) => sum + (r.price || 0), 0) / recordsArray.length; } if (aggregations.includes('sum')) { - result.totalVolume = records.reduce((sum, r) => sum + (r.volume || 0), 0); + result.totalVolume = recordsArray.reduce((sum: number, r: any) => sum + (r.volume || 0), 0); } if (aggregations.includes('count')) { - result.count = records.length; + result.count = recordsArray.length; } return result; diff --git a/apps/data-services/data-processor/src/services/DataValidationService.ts b/apps/data-services/data-processor/src/services/DataValidationService.ts index dc5090f..29c009e 100644 --- a/apps/data-services/data-processor/src/services/DataValidationService.ts +++ b/apps/data-services/data-processor/src/services/DataValidationService.ts @@ -1,4 +1,6 @@ -import { logger } from '@stock-bot/utils'; +import { getLogger } from '@stock-bot/logger'; + +const logger = getLogger('DataValidationService'); import { ValidationStep, ProcessingResult, ValidationRule } from '../types/DataPipeline'; import Joi from 'joi'; diff --git a/libs/types/src/events/events.ts b/libs/types/src/events/events.ts index b0be0fc..969ad52 100644 --- a/libs/types/src/events/events.ts +++ b/libs/types/src/events/events.ts @@ -38,5 +38,23 @@ export interface SystemEvent { timestamp: Date; } +// Data Processing Events +export interface DataPipelineEvent { + type: 'PIPELINE_CREATED' | 'PIPELINE_STARTED' | 'PIPELINE_COMPLETED' | 'PIPELINE_FAILED'; + pipelineId: string; + pipelineName?: string; + timestamp: Date; +} + +export interface DataJobEvent { + type: 'JOB_STARTED' | 'JOB_COMPLETED' | 'JOB_FAILED' | 'JOB_PROGRESS'; + jobId: string; + pipelineId?: string; + progress?: number; + error?: string; + timestamp: Date; +} + export type TradingEvent = MarketDataEvent | OrderEvent | SignalEvent; -export type Event = TradingEvent | RiskAlertEvent | SystemEvent; +export type DataEvent = DataPipelineEvent | DataJobEvent; +export type Event = TradingEvent | RiskAlertEvent | SystemEvent | DataEvent;