diff --git a/.vscode/tasks.json b/.vscode/tasks.json index b76deee..e0861e3 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -2,98 +2,18 @@ "version": "2.0.0", "tasks": [ { - "label": "Start Market Data Gateway", + "label": "Start Data Service", "type": "shell", "command": "bun", - "args": ["run", "dev"], - "options": { - "cwd": "${workspaceFolder}/apps/core-services/market-data-gateway" - }, - "group": "build", - "presentation": { - "echo": true, - "reveal": "always", - "focus": false, - "panel": "new", - "showReuseMessage": true, - "clear": false - }, - "isBackground": true, - "problemMatcher": [] - }, - { - "label": "Start Trading Dashboard", - "type": "shell", - "command": "bun", - "args": ["run", "dev"], - "options": { - "cwd": "${workspaceFolder}/apps/interface-services/trading-dashboard" - }, - "group": "build", - "presentation": { - "echo": true, - "reveal": "always", - "focus": false, - "panel": "new", - "showReuseMessage": true, - "clear": false - }, - "isBackground": true, - "problemMatcher": [] - }, - { - "label": "Start Strategy Orchestrator", - "type": "shell", - "command": "bun", - "args": ["run", "dev"], - "options": { - "cwd": "${workspaceFolder}/apps/intelligence-services/strategy-orchestrator" - }, - "group": "build", - "presentation": { - "echo": true, - "reveal": "always", - "focus": false, - "panel": "new", - "showReuseMessage": true, - "clear": false - }, - "isBackground": true, - "problemMatcher": [] - }, - { - "label": "Start Risk Guardian", - "type": "shell", - "command": "bun", - "args": ["run", "dev"], - "options": { - "cwd": "${workspaceFolder}/apps/core-services/risk-guardian" - }, - "group": "build", - "presentation": { - "echo": true, - "reveal": "always", - "focus": false, - "panel": "new", - "showReuseMessage": true, - "clear": false - }, - "isBackground": true, - "problemMatcher": [] - }, - { - "label": "Start All Services", - "dependsOn": [ - "Start Market Data Gateway", - "Start Trading Dashboard", - "Start Strategy Orchestrator", - "Start Risk Guardian" + "args": [ + "run", + "dev" ], - "dependsOrder": "parallel", - "group": { - "kind": "build", - "isDefault": true - } + "group": "build", + "isBackground": true, + "problemMatcher": [ + "$tsc" + ] } ] } diff --git a/apps/data-service/src/graceful-shutdown-test.ts b/apps/data-service/src/graceful-shutdown-test.ts deleted file mode 100644 index e4dfd28..0000000 --- a/apps/data-service/src/graceful-shutdown-test.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { getLogger, shutdownLoggers } from '@stock-bot/logger'; -import { onShutdown, setShutdownTimeout, initiateShutdown } from '@stock-bot/shutdown'; - -const logger = getLogger('shutdown-test'); - -logger.info('🚀 Starting graceful shutdown test...'); - -// Configure shutdown -setShutdownTimeout(10000); // 10 seconds - -// Register multiple shutdown handlers -onShutdown(async () => { - logger.info('🔧 Shutdown handler 1: Cleaning up resources...'); - await new Promise(resolve => setTimeout(resolve, 1000)); - logger.info('✅ Shutdown handler 1 completed'); -}); - -onShutdown(async () => { - logger.info('🔧 Shutdown handler 2: Closing connections...'); - await new Promise(resolve => setTimeout(resolve, 500)); - logger.info('✅ Shutdown handler 2 completed'); -}); - -onShutdown(() => { - logger.info('🔧 Shutdown handler 3: Final cleanup (sync)'); - logger.info('✅ Shutdown handler 3 completed'); -}); - -onShutdown(async () => { - logger.info('🔧 Shutdown handler 4: Logger cleanup'); - try { - await shutdownLoggers(); - console.log('✅ Logger shutdown completed'); - } catch (error) { - console.error('❌ Logger shutdown failed:', error); - } -}); - -// Simulate some work -let counter = 0; -const workInterval = setInterval(() => { - counter++; - logger.info(`🔄 Working... ${counter}`); - - if (counter >= 5) { - logger.info('🎯 Work completed, triggering graceful shutdown in 2 seconds...'); - setTimeout(async () => { - logger.info('📡 Initiating manual graceful shutdown...'); - try { - await initiateShutdown(); - } catch (error) { - logger.error('Manual shutdown failed', error); - process.exit(1); - } - }, 2000); - clearInterval(workInterval); - } -}, 1000); - -// Log process info -logger.info('📊 Process info', { - pid: process.pid, - platform: process.platform, - node: process.version -}); diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index 24e0eb6..0301cf9 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -1,10 +1,13 @@ /** - * Data Service - Combined live and historical data ingestion + * Data Service - Combined live and historical data ingestion with queue-based architecture */ import { getLogger } from '@stock-bot/logger'; import { loadEnvVariables } from '@stock-bot/config'; import { Hono } from 'hono'; import { serve } from '@hono/node-server'; +import { queueManager } from './services/queue-manager.service'; +import { proxyService } from './services/proxy.service'; +import { marketDataProvider } from './providers/market-data.provider'; // Load environment variables loadEnvVariables(); @@ -18,20 +21,48 @@ app.get('/health', (c) => { return c.json({ service: 'data-service', status: 'healthy', - timestamp: new Date().toISOString() + timestamp: new Date().toISOString(), + queue: { + status: 'running', + workers: queueManager.getWorkerCount() + } }); }); -// API routes +// Queue management endpoints +app.get('/api/queue/status', async (c) => { + try { + const status = await queueManager.getQueueStatus(); + return c.json({ status: 'success', data: status }); + } catch (error) { + logger.error('Failed to get queue status', { error }); + return c.json({ status: 'error', message: 'Failed to get queue status' }, 500); + } +}); + +app.post('/api/queue/job', async (c) => { + try { + const jobData = await c.req.json(); + const job = await queueManager.addJob(jobData); + return c.json({ status: 'success', jobId: job.id }); + } catch (error) { + logger.error('Failed to add job', { error }); + return c.json({ status: 'error', message: 'Failed to add job' }, 500); + } +}); + +// Market data endpoints app.get('/api/live/:symbol', async (c) => { const symbol = c.req.param('symbol'); logger.info('Live data request', { symbol }); - // TODO: Implement live data fetching - return c.json({ - symbol, - message: 'Live data endpoint - not implemented yet' - }); + try { + const data = await marketDataProvider.getLiveData(symbol); + return c.json({ status: 'success', symbol, data }); + } catch (error) { + logger.error('Failed to get live data', { symbol, error }); + return c.json({ status: 'error', message: 'Failed to get live data' }, 500); + } }); app.get('/api/historical/:symbol', async (c) => { @@ -41,19 +72,89 @@ app.get('/api/historical/:symbol', async (c) => { logger.info('Historical data request', { symbol, from, to }); - // TODO: Implement historical data fetching - return c.json({ - symbol, - from, - to, - message: 'Historical data endpoint - not implemented yet' - }); + try { + const fromDate = from ? new Date(from) : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // 30 days ago + const toDate = to ? new Date(to) : new Date(); // Now + + const data = await marketDataProvider.getHistoricalData(symbol, fromDate, toDate); + return c.json({ status: 'success', symbol, from, to, data }); + } catch (error) { + logger.error('Failed to get historical data', { symbol, from, to, error }); + return c.json({ status: 'error', message: 'Failed to get historical data' }, 500); + } }); +// Proxy management endpoints +app.post('/api/proxy/fetch', async (c) => { + try { + const jobId = await proxyService.queueProxyFetch(); + return c.json({ status: 'success', jobId, message: 'Proxy fetch job queued' }); + } catch (error) { + logger.error('Failed to queue proxy fetch', { error }); + return c.json({ status: 'error', message: 'Failed to queue proxy fetch' }, 500); + } +}); + +app.post('/api/proxy/check', async (c) => { + try { + const { proxies } = await c.req.json(); + const jobId = await proxyService.queueProxyCheck(proxies); + return c.json({ status: 'success', jobId, message: 'Proxy check job queued' }); + } catch (error) { + logger.error('Failed to queue proxy check', { error }); + return c.json({ status: 'error', message: 'Failed to queue proxy check' }, 500); + } +}); + +// Provider registry endpoints +app.get('/api/providers', async (c) => { + try { + const providers = queueManager.getRegisteredProviders(); + return c.json({ status: 'success', providers }); + } catch (error) { + logger.error('Failed to get providers', { error }); + return c.json({ status: 'error', message: 'Failed to get providers' }, 500); + } +}); + +// Initialize services +async function initializeServices() { + logger.info('Initializing data service...'); + + try { + // Queue manager is initialized automatically when imported + logger.info('Queue manager initialized'); + + // Initialize providers + logger.info('All services initialized successfully'); + } catch (error) { + logger.error('Failed to initialize services', { error }); + process.exit(1); + } +} + // Start server -serve({ - fetch: app.fetch, - port: PORT, -}); +async function startServer() { + await initializeServices(); + + serve({ + fetch: app.fetch, + port: PORT, + }); -logger.info(`Data Service started on port ${PORT}`); + logger.info(`Data Service started on port ${PORT}`); + logger.info('Available endpoints:'); + logger.info(' GET /health - Health check'); + logger.info(' GET /api/queue/status - Queue status'); + logger.info(' POST /api/queue/job - Add job to queue'); + logger.info(' GET /api/live/:symbol - Live market data'); + logger.info(' GET /api/historical/:symbol - Historical market data'); + logger.info(' POST /api/proxy/fetch - Queue proxy fetch'); + logger.info(' POST /api/proxy/check - Queue proxy check'); + logger.info(' GET /api/providers - List registered providers'); +} + +startServer().catch(error => { + logger.error('Failed to start server', { error }); + process.exit(1); +}); diff --git a/apps/data-service/src/providers/market-data.provider.ts b/apps/data-service/src/providers/market-data.provider.ts new file mode 100644 index 0000000..6c0a2d0 --- /dev/null +++ b/apps/data-service/src/providers/market-data.provider.ts @@ -0,0 +1,179 @@ +import { Logger } from '@stock-bot/logger'; +import { HttpClient } from '@stock-bot/http'; +import createCache, { type CacheProvider } from '@stock-bot/cache'; + +export interface MarketDataResponse { + symbol: string; + price: number; + timestamp: Date; + volume?: number; + change?: number; + open?: number; + high?: number; + low?: number; + close?: number; +} + +export class MarketDataProvider { + private logger = new Logger('market-data-provider'); + private httpClient: HttpClient; + private cache: CacheProvider = createCache('hybrid'); + private readonly CACHE_TTL = 60; // 1 minute + + constructor() { + this.httpClient = new HttpClient({ + timeout: 10000, + }, this.logger); + } + async getLiveData(symbol: string): Promise { + const cacheKey = `market-data:${symbol}`; + + try { + // Check cache first + const cached = await this.cache.get(cacheKey) as MarketDataResponse | null; + if (cached) { + this.logger.debug('Returning cached market data', { symbol }); + return cached; + } + + // Generate simulated data for demo + const data = this.generateSimulatedData(symbol); + + // Cache the result + await this.cache.set(cacheKey, data, this.CACHE_TTL); + + this.logger.info('Generated live market data', { symbol, price: data.price }); + return data; + } catch (error) { + this.logger.error('Error fetching market data', { symbol, error }); + throw error; + } + } + async getHistoricalData(symbol: string, from: Date, to: Date, interval: string = '1m'): Promise { + const cacheKey = `historical:${symbol}:${from.toISOString()}:${to.toISOString()}:${interval}`; + + try { + const cached = await this.cache.get(cacheKey) as MarketDataResponse[] | null; + if (cached) { + this.logger.debug('Returning cached historical data', { symbol, from, to }); + return cached; + } + + // Generate simulated historical data + const data = this.generateHistoricalData(symbol, from, to, interval); + + // Cache for longer time (1 hour) + await this.cache.set(cacheKey, data, 3600); + + this.logger.info('Generated historical market data', { symbol, from, to, count: data.length }); + return data; + } catch (error) { + this.logger.error('Error fetching historical data', { symbol, error }); + throw error; + } + } + + private generateSimulatedData(symbol: string): MarketDataResponse { + // Base prices for different symbols + const basePrices: { [key: string]: number } = { + 'AAPL': 150, + 'GOOGL': 2500, + 'MSFT': 300, + 'TSLA': 200, + 'AMZN': 3000, + 'NVDA': 400, + 'META': 250, + 'NFLX': 400 + }; + + const basePrice = basePrices[symbol] || 100; + + // Add some randomness (+/- 2%) + const variation = (Math.random() - 0.5) * 0.04; // ±2% + const price = basePrice * (1 + variation); + const change = variation * basePrice; + + return { + symbol, + price: Math.round(price * 100) / 100, + timestamp: new Date(), + volume: Math.floor(Math.random() * 1000000) + 500000, + change: Math.round(change * 100) / 100, + open: Math.round((price - change) * 100) / 100, + high: Math.round((price + Math.abs(change * 0.5)) * 100) / 100, + low: Math.round((price - Math.abs(change * 0.5)) * 100) / 100, + close: Math.round(price * 100) / 100 + }; + } + + private generateHistoricalData(symbol: string, from: Date, to: Date, interval: string): MarketDataResponse[] { + const data: MarketDataResponse[] = []; + const intervalMs = this.parseInterval(interval); + + let currentTime = new Date(from); + const endTime = new Date(to); + + // Base price for the symbol + const basePrices: { [key: string]: number } = { + 'AAPL': 150, + 'GOOGL': 2500, + 'MSFT': 300, + 'TSLA': 200, + 'AMZN': 3000, + 'NVDA': 400, + 'META': 250, + 'NFLX': 400 + }; + + let basePrice = basePrices[symbol] || 100; + + while (currentTime <= endTime) { + // Add some trend and randomness + const trend = (Math.random() - 0.5) * 0.01; // Small trend + const variation = (Math.random() - 0.5) * 0.02; // Random variation + + basePrice = basePrice * (1 + trend + variation); + const change = basePrice * variation; + + data.push({ + symbol, + price: Math.round(basePrice * 100) / 100, + timestamp: new Date(currentTime), + volume: Math.floor(Math.random() * 1000000) + 500000, + change: Math.round(change * 100) / 100, + open: Math.round((basePrice - change) * 100) / 100, + high: Math.round((basePrice + Math.abs(change * 0.5)) * 100) / 100, + low: Math.round((basePrice - Math.abs(change * 0.5)) * 100) / 100, + close: Math.round(basePrice * 100) / 100 + }); + + currentTime = new Date(currentTime.getTime() + intervalMs); + } + + return data; + } + + private parseInterval(interval: string): number { + const value = parseInt(interval.slice(0, -1)); + const unit = interval.slice(-1).toLowerCase(); + + switch (unit) { + case 's': return value * 1000; + case 'm': return value * 60 * 1000; + case 'h': return value * 60 * 60 * 1000; + case 'd': return value * 24 * 60 * 60 * 1000; + default: return 60 * 1000; // Default to 1 minute + } + } + + async clearCache(): Promise { + this.logger.info('Clearing market data cache'); + // Note: Cache provider limitations - would need proper key tracking + } + + async shutdown(): Promise { + this.logger.info('Shutting down MarketDataProvider'); + } +} + +export const marketDataProvider = new MarketDataProvider(); diff --git a/apps/data-service/src/services/proxy.service.ts b/apps/data-service/src/services/proxy.service.ts index d61b594..ae0edad 100644 --- a/apps/data-service/src/services/proxy.service.ts +++ b/apps/data-service/src/services/proxy.service.ts @@ -72,15 +72,47 @@ export class ProxyService { this.logger.info('ProxyService initialized'); } - - private async initializeScheduling() { + private async initializeScheduling() { try { - await queueService.scheduleRecurringTasks(); - this.logger.info('Proxy scheduling initialized'); + // Queue manager will handle scheduling + this.logger.info('Proxy scheduling will be handled by queue manager'); } catch (error) { this.logger.error('Failed to initialize scheduling', { error }); } } + + // Add queue integration methods + async queueProxyFetch(): Promise { + const { queueManager } = await import('./queue-manager.service'); + const job = await queueManager.addJob({ + type: 'proxy-fetch', + service: 'proxy', + provider: 'proxy-service', + operation: 'fetch-and-check', + payload: {}, + priority: 5 + }); + + const jobId = job.id || 'unknown'; + this.logger.info('Proxy fetch job queued', { jobId }); + return jobId; + } + + async queueProxyCheck(proxies: ProxyInfo[]): Promise { + const { queueManager } = await import('./queue-manager.service'); + const job = await queueManager.addJob({ + type: 'proxy-check', + service: 'proxy', + provider: 'proxy-service', + operation: 'check-specific', + payload: { proxies }, + priority: 3 + }); + + const jobId = job.id || 'unknown'; + this.logger.info('Proxy check job queued', { jobId, count: proxies.length }); + return jobId; + } async fetchProxiesFromSources() : Promise { diff --git a/apps/data-service/src/services/queue-manager.service.ts b/apps/data-service/src/services/queue-manager.service.ts new file mode 100644 index 0000000..6ee2f38 --- /dev/null +++ b/apps/data-service/src/services/queue-manager.service.ts @@ -0,0 +1,186 @@ +import { Queue, Worker, QueueEvents } from 'bullmq'; +import { Logger } from '@stock-bot/logger'; + +export interface JobData { + type: 'proxy-fetch' | 'proxy-check' | 'market-data' | 'historical-data'; + service: 'proxy' | 'market-data' | 'analytics'; + provider: string; + operation: string; + payload: any; + priority?: number; +} + +export class QueueManagerService { + private logger = new Logger('queue-manager'); + private queue: Queue; + private worker: Worker; + private queueEvents: QueueEvents; + + constructor() { + const connection = { + host: process.env.DRAGONFLY_HOST || 'localhost', + port: parseInt(process.env.DRAGONFLY_PORT || '6379'), + }; + + this.queue = new Queue('data-service-queue', { connection }); + this.worker = new Worker('data-service-queue', this.processJob.bind(this), { + connection, + concurrency: 10 + }); + this.queueEvents = new QueueEvents('data-service-queue', { connection }); + + this.setupEventListeners(); + this.setupScheduledTasks(); + } + + private async processJob(job: any) { + const { type, service, provider, operation, payload }: JobData = job.data; + + this.logger.info('Processing job', { id: job.id, type, service, provider, operation }); + + try { + switch (type) { + case 'proxy-fetch': + return await this.handleProxyFetch(payload); + case 'proxy-check': + return await this.handleProxyCheck(payload); + case 'market-data': + return await this.handleMarketData(payload); + case 'historical-data': + return await this.handleHistoricalData(payload); + default: + throw new Error(`Unknown job type: ${type}`); + } + } catch (error) { + this.logger.error('Job failed', { id: job.id, type, error }); + throw error; + } + } + + private async handleProxyFetch(payload: any) { + const { proxyService } = await import('./proxy.service'); + return await proxyService.fetchProxiesFromSources(); + } + + private async handleProxyCheck(payload: { proxies: any[] }) { + const { proxyService } = await import('./proxy.service'); + return await proxyService.checkProxies(payload.proxies); + } + private async handleMarketData(payload: { symbol: string }) { + const { marketDataProvider } = await import('../providers/market-data.provider.js'); + return await marketDataProvider.getLiveData(payload.symbol); + } + + private async handleHistoricalData(payload: { symbol: string; from: Date; to: Date; interval: string }) { + const { marketDataProvider } = await import('../providers/market-data.provider.js'); + return await marketDataProvider.getHistoricalData(payload.symbol, payload.from, payload.to, payload.interval); + } + + private setupEventListeners() { + this.queueEvents.on('completed', (job) => { + this.logger.info('Job completed', { id: job.jobId }); + }); + + this.queueEvents.on('failed', (job) => { + this.logger.error('Job failed', { id: job.jobId, error: job.failedReason }); + }); + + this.worker.on('progress', (job, progress) => { + this.logger.debug('Job progress', { id: job.id, progress }); + }); + } + + private setupScheduledTasks() { + // Market data refresh every minute + this.addRecurringJob({ + type: 'market-data', + service: 'market-data', + provider: 'unified-data', + operation: 'refresh-cache', + payload: { symbols: ['AAPL', 'GOOGL', 'MSFT'] } + }, '*/1 * * * *'); + + // Proxy check every 15 minutes + this.addRecurringJob({ + type: 'proxy-fetch', + service: 'proxy', + provider: 'proxy-service', + operation: 'fetch-and-check', + payload: {} + }, '*/15 * * * *'); + + this.logger.info('Scheduled tasks configured'); + } + + async addJob(jobData: JobData, options?: any) { + return this.queue.add(jobData.type, jobData, { + priority: jobData.priority || 0, + removeOnComplete: 10, + removeOnFail: 5, + ...options + }); + } + + async addRecurringJob(jobData: JobData, cronPattern: string) { + return this.queue.add( + `recurring-${jobData.type}`, + jobData, + { + repeat: { pattern: cronPattern }, + removeOnComplete: 1, + removeOnFail: 1, + jobId: `recurring-${jobData.service}-${jobData.provider}-${jobData.operation}` + } + ); + } + + async getJobStats() { + const [waiting, active, completed, failed, delayed] = await Promise.all([ + this.queue.getWaiting(), + this.queue.getActive(), + this.queue.getCompleted(), + this.queue.getFailed(), + this.queue.getDelayed() + ]); + + return { + waiting: waiting.length, + active: active.length, + completed: completed.length, + failed: failed.length, + delayed: delayed.length + }; + } + async getQueueStatus() { + const stats = await this.getJobStats(); + return { + ...stats, + workers: this.getWorkerCount(), + queue: this.queue.name, + connection: { + host: process.env.DRAGONFLY_HOST || 'localhost', + port: parseInt(process.env.DRAGONFLY_PORT || '6379') + } + }; + } + + getWorkerCount() { + return this.worker.opts.concurrency || 1; + } + + getRegisteredProviders() { + return [ + { name: 'proxy-service', type: 'proxy', operations: ['fetch-and-check', 'check-specific'] }, + { name: 'market-data-provider', type: 'market-data', operations: ['live-data', 'historical-data'] } + ]; + } + + async shutdown() { + this.logger.info('Shutting down queue manager'); + await this.worker.close(); + await this.queue.close(); + await this.queueEvents.close(); + } +} + +export const queueManager = new QueueManagerService();