/** * Data Service - Combined live and historical data ingestion with queue-based architecture */ import { getLogger } from '@stock-bot/logger'; import { loadEnvVariables } from '@stock-bot/config'; import { Hono } from 'hono'; import { serve } from '@hono/node-server'; import { queueManager } from './services/queue-manager.service'; import { proxyService } from './services/proxy.service'; import { marketDataProvider } from './providers/market-data.provider'; // Load environment variables loadEnvVariables(); const app = new Hono(); const logger = getLogger('data-service'); const PORT = parseInt(process.env.DATA_SERVICE_PORT || '3002'); // Health check endpoint app.get('/health', (c) => { return c.json({ service: 'data-service', status: 'healthy', timestamp: new Date().toISOString(), queue: { status: 'running', workers: queueManager.getWorkerCount() } }); }); // Queue management endpoints app.get('/api/queue/status', async (c) => { try { const status = await queueManager.getQueueStatus(); return c.json({ status: 'success', data: status }); } catch (error) { logger.error('Failed to get queue status', { error }); return c.json({ status: 'error', message: 'Failed to get queue status' }, 500); } }); app.post('/api/queue/job', async (c) => { try { const jobData = await c.req.json(); const job = await queueManager.addJob(jobData); return c.json({ status: 'success', jobId: job.id }); } catch (error) { logger.error('Failed to add job', { error }); return c.json({ status: 'error', message: 'Failed to add job' }, 500); } }); // Market data endpoints app.get('/api/live/:symbol', async (c) => { const symbol = c.req.param('symbol'); logger.info('Live data request', { symbol }); try { const data = await marketDataProvider.getLiveData(symbol); return c.json({ status: 'success', symbol, data }); } catch (error) { logger.error('Failed to get live data', { symbol, error }); return c.json({ status: 'error', message: 'Failed to get live data' }, 500); } }); app.get('/api/historical/:symbol', async (c) => { const symbol = c.req.param('symbol'); const from = c.req.query('from'); const to = c.req.query('to'); logger.info('Historical data request', { symbol, from, to }); try { const fromDate = from ? new Date(from) : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // 30 days ago const toDate = to ? new Date(to) : new Date(); // Now const data = await marketDataProvider.getHistoricalData(symbol, fromDate, toDate); return c.json({ status: 'success', symbol, from, to, data }); } catch (error) { logger.error('Failed to get historical data', { symbol, from, to, error }); return c.json({ status: 'error', message: 'Failed to get historical data' }, 500); } }); // Proxy management endpoints app.post('/api/proxy/fetch', async (c) => { try { const jobId = await proxyService.queueProxyFetch(); return c.json({ status: 'success', jobId, message: 'Proxy fetch job queued' }); } catch (error) { logger.error('Failed to queue proxy fetch', { error }); return c.json({ status: 'error', message: 'Failed to queue proxy fetch' }, 500); } }); app.post('/api/proxy/check', async (c) => { try { const { proxies } = await c.req.json(); const jobId = await proxyService.queueProxyCheck(proxies); return c.json({ status: 'success', jobId, message: 'Proxy check job queued' }); } catch (error) { logger.error('Failed to queue proxy check', { error }); return c.json({ status: 'error', message: 'Failed to queue proxy check' }, 500); } }); // Provider registry endpoints app.get('/api/providers', async (c) => { try { const providers = queueManager.getRegisteredProviders(); return c.json({ status: 'success', providers }); } catch (error) { logger.error('Failed to get providers', { error }); return c.json({ status: 'error', message: 'Failed to get providers' }, 500); } }); // Initialize services async function initializeServices() { logger.info('Initializing data service...'); try { // Queue manager is initialized automatically when imported logger.info('Queue manager initialized'); // Initialize providers logger.info('All services initialized successfully'); } catch (error) { logger.error('Failed to initialize services', { error }); process.exit(1); } } // Start server async function startServer() { await initializeServices(); serve({ fetch: app.fetch, port: PORT, }); logger.info(`Data Service started on port ${PORT}`); logger.info('Available endpoints:'); logger.info(' GET /health - Health check'); logger.info(' GET /api/queue/status - Queue status'); logger.info(' POST /api/queue/job - Add job to queue'); logger.info(' GET /api/live/:symbol - Live market data'); logger.info(' GET /api/historical/:symbol - Historical market data'); logger.info(' POST /api/proxy/fetch - Queue proxy fetch'); logger.info(' POST /api/proxy/check - Queue proxy check'); logger.info(' GET /api/providers - List registered providers'); } startServer().catch(error => { logger.error('Failed to start server', { error }); process.exit(1); });