/**
 * Data Service - Combined live and historical data ingestion with queue-based architecture
 *
 * Exposes an HTTP API (Hono) whose handlers enqueue work on the shared
 * `queueManager` and report the resulting job id back to the caller.
 */
import { getLogger } from '@stock-bot/logger';
import { loadEnvVariables } from '@stock-bot/config';
import { Hono } from 'hono';
import { serve } from '@hono/node-server';
import { queueManager } from './services/queue.service';

// Load environment variables
loadEnvVariables();

const app = new Hono();
const logger = getLogger('data-service');

const DEFAULT_PORT = 3002;
// Default look-back window for historical requests that omit `from`.
const THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000;

// Always parse with an explicit radix; fall back to the default when the
// environment variable is missing, empty or not a number.
const parsedPort = Number.parseInt(process.env.DATA_SERVICE_PORT || String(DEFAULT_PORT), 10);
const PORT = Number.isNaN(parsedPort) ? DEFAULT_PORT : parsedPort;

// Handle of the running HTTP server; assigned by startServer().
let server = null;

/**
 * Parse an optional date query parameter.
 *
 * @param {string | undefined} value - Raw query-string value.
 * @param {Date} fallback - Date used when the parameter is absent or empty.
 * @returns {Date | null} The parsed date, the fallback, or `null` when the
 *   value is present but cannot be parsed into a valid date.
 */
function parseDateParam(value, fallback) {
  if (!value) {
    return fallback;
  }
  const parsed = new Date(value);
  return Number.isNaN(parsed.getTime()) ? null : parsed;
}

// Health check endpoint
app.get('/health', (c) => {
  return c.json({
    service: 'data-service',
    status: 'healthy',
    timestamp: new Date().toISOString(),
    queue: {
      status: 'running',
      workers: queueManager.getWorkerCount(),
    },
  });
});

// Queue management endpoints
app.get('/api/queue/status', async (c) => {
  try {
    const status = await queueManager.getQueueStatus();
    return c.json({ status: 'success', data: status });
  } catch (error) {
    logger.error('Failed to get queue status', { error });
    return c.json({ status: 'error', message: 'Failed to get queue status' }, 500);
  }
});

app.post('/api/queue/job', async (c) => {
  try {
    const jobData = await c.req.json();
    const job = await queueManager.addJob(jobData);
    return c.json({ status: 'success', jobId: job.id });
  } catch (error) {
    logger.error('Failed to add job', { error });
    return c.json({ status: 'error', message: 'Failed to add job' }, 500);
  }
});

// Market data endpoints
app.get('/api/live/:symbol', async (c) => {
  const symbol = c.req.param('symbol');
  logger.info('Live data request', { symbol });

  try {
    // Queue job for live data using Yahoo provider
    const job = await queueManager.addJob({
      type: 'market-data-live',
      service: 'market-data',
      provider: 'yahoo-finance',
      operation: 'live-data',
      payload: { symbol },
    });
    return c.json({
      status: 'success',
      message: 'Live data job queued',
      jobId: job.id,
      symbol,
    });
  } catch (error) {
    logger.error('Failed to queue live data job', { symbol, error });
    return c.json({ status: 'error', message: 'Failed to queue live data job' }, 500);
  }
});

app.get('/api/historical/:symbol', async (c) => {
  const symbol = c.req.param('symbol');
  const from = c.req.query('from');
  const to = c.req.query('to');
  logger.info('Historical data request', { symbol, from, to });

  const now = Date.now();
  const fromDate = parseDateParam(from, new Date(now - THIRTY_DAYS_MS)); // 30 days ago
  const toDate = parseDateParam(to, new Date(now)); // Now

  // An unparseable date would make toISOString() throw a RangeError below;
  // report it as a client error instead of a 500.
  if (fromDate === null || toDate === null) {
    logger.warn('Invalid date range in historical data request', { symbol, from, to });
    return c.json({ status: 'error', message: 'Invalid "from" or "to" date' }, 400);
  }

  try {
    // Queue job for historical data using Yahoo provider
    const job = await queueManager.addJob({
      type: 'market-data-historical',
      service: 'market-data',
      provider: 'yahoo-finance',
      operation: 'historical-data',
      payload: {
        symbol,
        from: fromDate.toISOString(),
        to: toDate.toISOString(),
      },
    });
    return c.json({
      status: 'success',
      message: 'Historical data job queued',
      jobId: job.id,
      symbol,
      from: fromDate,
      to: toDate,
    });
  } catch (error) {
    logger.error('Failed to queue historical data job', { symbol, from, to, error });
    return c.json({ status: 'error', message: 'Failed to queue historical data job' }, 500);
  }
});

// Proxy management endpoints
app.post('/api/proxy/fetch', async (c) => {
  try {
    const job = await queueManager.addJob({
      type: 'proxy-fetch',
      service: 'proxy',
      provider: 'proxy-service',
      operation: 'fetch-and-check',
      payload: {},
      priority: 5,
    });
    return c.json({
      status: 'success',
      jobId: job.id,
      message: 'Proxy fetch job queued',
    });
  } catch (error) {
    logger.error('Failed to queue proxy fetch', { error });
    return c.json({ status: 'error', message: 'Failed to queue proxy fetch' }, 500);
  }
});

app.post('/api/proxy/check', async (c) => {
  try {
    const body = await c.req.json();
    const proxies = body !== null && typeof body === 'object' ? body.proxies : undefined;

    // Validate BEFORE queueing: previously a missing `proxies` field queued a
    // job with an undefined payload and only then failed on `proxies.length`.
    if (!Array.isArray(proxies)) {
      return c.json({ status: 'error', message: 'Request body must contain a "proxies" array' }, 400);
    }

    const job = await queueManager.addJob({
      type: 'proxy-check',
      service: 'proxy',
      provider: 'proxy-service',
      operation: 'check-specific',
      payload: { proxies },
      priority: 8,
    });
    return c.json({
      status: 'success',
      jobId: job.id,
      message: `Proxy check job queued for ${proxies.length} proxies`,
    });
  } catch (error) {
    logger.error('Failed to queue proxy check', { error });
    return c.json({ status: 'error', message: 'Failed to queue proxy check' }, 500);
  }
});

// Get proxy stats via queue
app.get('/api/proxy/stats', async (c) => {
  try {
    const job = await queueManager.addJob({
      type: 'proxy-stats',
      service: 'proxy',
      provider: 'proxy-service',
      operation: 'get-stats',
      payload: {},
      priority: 3,
    });
    return c.json({
      status: 'success',
      jobId: job.id,
      message: 'Proxy stats job queued',
    });
  } catch (error) {
    logger.error('Failed to queue proxy stats', { error });
    return c.json({ status: 'error', message: 'Failed to queue proxy stats' }, 500);
  }
});

// Provider registry endpoints
app.get('/api/providers', async (c) => {
  try {
    const providers = queueManager.getRegisteredProviders();
    return c.json({ status: 'success', providers });
  } catch (error) {
    logger.error('Failed to get providers', { error });
    return c.json({ status: 'error', message: 'Failed to get providers' }, 500);
  }
});

// Scheduled jobs overview
app.get('/api/scheduled-jobs', async (c) => {
  try {
    const jobs = queueManager.getScheduledJobsInfo();
    return c.json({ status: 'success', count: jobs.length, jobs });
  } catch (error) {
    logger.error('Failed to get scheduled jobs info', { error });
    return c.json({ status: 'error', message: 'Failed to get scheduled jobs' }, 500);
  }
});

/**
 * Initialize the services this process depends on (currently the queue).
 *
 * @returns {Promise<void>}
 * @throws Re-throws whatever `queueManager.initialize()` rejects with, after
 *   logging it.
 */
async function initializeServices() {
  logger.info('Initializing data service...');
  try {
    // Initialize queue service (Redis connections should be ready now)
    logger.info('Starting queue service initialization...');
    await queueManager.initialize();
    logger.info('Queue service initialized');
    logger.info('All services initialized successfully');
  } catch (error) {
    logger.error('Failed to initialize services', { error });
    throw error;
  }
}

/**
 * Initialize services, then start the HTTP server on PORT.
 *
 * The original implementation never called `serve`, so the routes above were
 * registered but the process never listened for requests.
 *
 * @returns {Promise<void>}
 */
async function startServer() {
  await initializeServices();
  server = serve({ fetch: app.fetch, port: PORT });
  logger.info('Data service listening', { port: PORT });
}

/**
 * Shared handler for termination signals: stop accepting connections, shut
 * the queue down and exit. Exits with code 1 when shutdown itself fails so
 * the failure is visible to the process supervisor.
 *
 * @param {string} signal - Name of the received signal (used for logging).
 * @returns {Promise<void>}
 */
async function shutdown(signal) {
  logger.info(`Received ${signal}, shutting down gracefully...`);
  try {
    if (server) {
      server.close();
    }
    await queueManager.shutdown();
    process.exit(0);
  } catch (error) {
    logger.error('Error during shutdown', { error });
    process.exit(1);
  }
}

// Graceful shutdown
process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));

startServer().catch((error) => {
  logger.error('Failed to start server', { error });
  process.exit(1);
});