// Framework imports import { initializeConfig } from '@stock-bot/config'; import { Hono } from 'hono'; import { cors } from 'hono/cors'; // Library imports import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger'; import { connectMongoDB } from '@stock-bot/mongodb-client'; import { connectPostgreSQL } from '@stock-bot/postgres-client'; import { QueueManager, type QueueManagerConfig } from '@stock-bot/queue'; import { Shutdown } from '@stock-bot/shutdown'; import { ProxyManager } from '@stock-bot/utils'; // Local imports import { exchangeRoutes, healthRoutes, queueRoutes } from './routes'; const config = initializeConfig(); const serviceConfig = config.service; const databaseConfig = config.database; const queueConfig = config.queue; if (config.log) { setLoggerConfig({ logLevel: config.log.level, logConsole: true, logFile: false, environment: config.environment, }); } const app = new Hono(); // Add CORS middleware app.use( '*', cors({ origin: '*', allowMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH'], allowHeaders: ['Content-Type', 'Authorization'], credentials: false, }) ); const logger = getLogger('data-service'); const PORT = serviceConfig.port; let server: ReturnType | null = null; // Singleton clients are managed in libraries let queueManager: QueueManager | null = null; // Initialize shutdown manager const shutdown = Shutdown.getInstance({ timeout: 15000 }); // Mount routes app.route('/health', healthRoutes); app.route('/api/exchanges', exchangeRoutes); app.route('/api/queue', queueRoutes); // Initialize services async function initializeServices() { logger.info('Initializing data service...'); try { // Initialize MongoDB client singleton logger.debug('Connecting to MongoDB...'); const mongoConfig = databaseConfig.mongodb; await connectMongoDB({ uri: mongoConfig.uri, database: mongoConfig.database, host: mongoConfig.host || 'localhost', port: mongoConfig.port || 27017, timeouts: { connectTimeout: 30000, socketTimeout: 30000, serverSelectionTimeout: 5000, }, }); logger.info('MongoDB connected'); // Initialize PostgreSQL client singleton logger.debug('Connecting to PostgreSQL...'); const pgConfig = databaseConfig.postgres; await connectPostgreSQL({ host: pgConfig.host, port: pgConfig.port, database: pgConfig.database, username: pgConfig.user, password: pgConfig.password, poolSettings: { min: 2, max: pgConfig.poolSize || 10, idleTimeoutMillis: pgConfig.idleTimeout || 30000, }, }); logger.info('PostgreSQL connected'); // Initialize queue system logger.debug('Initializing queue system...'); const queueManagerConfig: QueueManagerConfig = { redis: queueConfig?.redis || { host: 'localhost', port: 6379, db: 1, }, defaultQueueOptions: { defaultJobOptions: queueConfig?.defaultJobOptions || { attempts: 3, backoff: { type: 'exponential', delay: 1000, }, removeOnComplete: true, removeOnFail: false, }, workers: 5, concurrency: 20, enableMetrics: true, enableDLQ: true, }, enableScheduledJobs: true, }; queueManager = QueueManager.getOrInitialize(queueManagerConfig); logger.info('Queue system initialized'); // Initialize proxy manager logger.debug('Initializing proxy manager...'); await ProxyManager.initialize(); logger.info('Proxy manager initialized'); // Initialize providers (register handlers and scheduled jobs) logger.debug('Initializing data providers...'); const { initializeWebShareProvider } = await import('./providers/webshare.provider'); const { initializeExchangeSyncProvider } = await import('./providers/exchange-sync.provider'); const { initializeIBProvider } = await import('./providers/ib.provider'); const { initializeProxyProvider } = await import('./providers/proxy.provider'); const { initializeQMProvider } = await import('./providers/qm.provider'); initializeExchangeSyncProvider(); initializeIBProvider(); initializeProxyProvider(); initializeQMProvider(); initializeWebShareProvider(); logger.info('Data providers initialized'); // Create scheduled jobs from registered handlers logger.debug('Creating scheduled jobs from registered handlers...'); const { handlerRegistry } = await import('@stock-bot/queue'); const allHandlers = handlerRegistry.getAllHandlers(); let totalScheduledJobs = 0; for (const [handlerName, config] of allHandlers) { if (config.scheduledJobs && config.scheduledJobs.length > 0) { const queue = queueManager.getQueue(handlerName); for (const scheduledJob of config.scheduledJobs) { // Include handler and operation info in job data const jobData = { handler: handlerName, operation: scheduledJob.operation, ...(scheduledJob.payload || {}), }; // Build job options from scheduled job config const jobOptions = { priority: scheduledJob.priority, delay: scheduledJob.delay, repeat: { immediately: scheduledJob.immediately, }, }; await queue.addScheduledJob( scheduledJob.operation, jobData, scheduledJob.cronPattern, jobOptions ); totalScheduledJobs++; logger.debug('Scheduled job created', { handler: handlerName, operation: scheduledJob.operation, cronPattern: scheduledJob.cronPattern, immediately: scheduledJob.immediately, priority: scheduledJob.priority, }); } } } logger.info('Scheduled jobs created', { totalJobs: totalScheduledJobs }); logger.info('All services initialized successfully'); } catch (error) { logger.error('Failed to initialize services', { error }); throw error; } } // Start server async function startServer() { await initializeServices(); server = Bun.serve({ port: PORT, fetch: app.fetch, development: config.environment === 'development', }); logger.info(`Data Service started on port ${PORT}`); } // Register shutdown handlers shutdown.onShutdown(async () => { if (server) { logger.info('Stopping HTTP server...'); try { server.stop(); logger.info('HTTP server stopped'); } catch (error) { logger.error('Error stopping HTTP server', { error }); } } }); shutdown.onShutdown(async () => { logger.info('Shutting down queue system...'); try { if (queueManager) { await queueManager.shutdown(); } logger.info('Queue system shut down'); } catch (error) { logger.error('Error shutting down queue system', { error }); } }); shutdown.onShutdown(async () => { logger.info('Disconnecting from databases...'); try { const { disconnectMongoDB } = await import('@stock-bot/mongodb-client'); const { disconnectPostgreSQL } = await import('@stock-bot/postgres-client'); await disconnectMongoDB(); await disconnectPostgreSQL(); logger.info('Database connections closed'); } catch (error) { logger.error('Error closing database connections', { error }); } }); // Logger shutdown - registered last so it runs after all other shutdown operations shutdown.onShutdown(async () => { try { logger.info('Shutting down loggers...'); await shutdownLoggers(); // Don't log after shutdown } catch { // Silently ignore logger shutdown errors } }); // Start the service startServer().catch(error => { logger.fatal('Failed to start data service', { error }); process.exit(1); }); logger.info('Data service startup initiated'); // ProxyManager class and singleton instance are available via @stock-bot/utils