/** * Data Service - Combined live and historical data ingestion with queue-based architecture */ import { getLogger } from '@stock-bot/logger'; import { loadEnvVariables } from '@stock-bot/config'; import { Hono } from 'hono'; import { onShutdown, setShutdownTimeout } from '@stock-bot/shutdown'; import { queueManager } from './services/queue.service'; import { initializeBatchCache } from './utils/batch-helpers'; import { initializeProxyCache } from './providers/proxy.tasks'; import { healthRoutes, queueRoutes, marketDataRoutes, proxyRoutes, testRoutes } from './routes'; // Load environment variables loadEnvVariables(); const app = new Hono(); const logger = getLogger('data-service'); const PORT = parseInt(process.env.DATA_SERVICE_PORT || '3002'); let server: any = null; // Register all routes app.route('', healthRoutes); app.route('', queueRoutes); app.route('', marketDataRoutes); app.route('', proxyRoutes); app.route('', testRoutes); // Initialize services async function initializeServices() { logger.info('Initializing data service...'); try { // Initialize batch cache FIRST - before queue service logger.info('Starting batch cache initialization...'); await initializeBatchCache(); logger.info('Batch cache initialized'); // Initialize proxy cache - before queue service logger.info('Starting proxy cache initialization...'); await initializeProxyCache(); logger.info('Proxy cache initialized'); // Initialize queue service (Redis connections should be ready now) logger.info('Starting queue service initialization...'); await queueManager.initialize(); logger.info('Queue service initialized'); logger.info('All services initialized successfully'); } catch (error) { logger.error('Failed to initialize services', { error }); throw error; } } // Start server async function startServer() { await initializeServices(); // Start the HTTP server using Bun's native serve server = Bun.serve({ port: PORT, fetch: app.fetch, development: process.env.NODE_ENV === 'development', }); logger.info(`Data Service started on port ${PORT}`); } // Setup shutdown handling setShutdownTimeout(15000); // Register cleanup for HTTP server onShutdown(async () => { if (server) { logger.info('Stopping HTTP server...'); server.stop(); } }); // Register cleanup for queue manager onShutdown(async () => { logger.info('Shutting down queue manager...'); await queueManager.shutdown(); }); startServer().catch(error => { logger.error('Failed to start server', { error }); process.exit(1); }); logger.info('Shutdown handlers registered');