From 811fc86c92c217568f2dffff93e6abe9ea7708b8 Mon Sep 17 00:00:00 2001 From: Bojan Kucera Date: Sun, 8 Jun 2025 14:36:23 -0400 Subject: [PATCH] clean up --- apps/data-service/src/api/queue.routes.ts | 115 ------------------ apps/data-service/src/index.ts | 23 +++- .../src/services/queue.service.ts | 78 +++++++++--- 3 files changed, 78 insertions(+), 138 deletions(-) delete mode 100644 apps/data-service/src/api/queue.routes.ts diff --git a/apps/data-service/src/api/queue.routes.ts b/apps/data-service/src/api/queue.routes.ts deleted file mode 100644 index 03990a0..0000000 --- a/apps/data-service/src/api/queue.routes.ts +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Queue API Endpoints - * REST API for monitoring and controlling job queues - */ -import { Router } from 'express'; -import { getLogger } from '@stock-bot/logger'; -import { proxyQueueIntegration } from '../services/proxy-queue-integration'; - -const logger = getLogger('queue-api'); -const router = Router(); - -/** - * GET /api/queue/stats - * Get queue statistics - */ -router.get('/stats', async (req, res) => { - try { - const stats = await proxyQueueIntegration.getStats(); - res.json({ success: true, data: stats }); - } catch (error) { - logger.error('Failed to get queue stats', { error }); - res.status(500).json({ - success: false, - error: 'Failed to get queue statistics' - }); - } -}); - -/** - * POST /api/queue/proxy/fetch - * Manually trigger proxy fetching - */ -router.post('/proxy/fetch', async (req, res) => { - try { - const job = await proxyQueueIntegration.triggerProxyFetch(); - res.json({ - success: true, - data: { - jobId: job.id, - message: 'Proxy fetch job queued' - } - }); - } catch (error) { - logger.error('Failed to trigger proxy fetch', { error }); - res.status(500).json({ - success: false, - error: 'Failed to queue proxy fetch job' - }); - } -}); - -/** - * POST /api/queue/proxy/check - * Check specific proxies - */ -router.post('/proxy/check', async (req, res) => { - try { - const { proxies } = req.body; - - if (!Array.isArray(proxies) || proxies.length === 0) { - return res.status(400).json({ - success: false, - error: 'Proxies array is required' - }); - } - - const job = await proxyQueueIntegration.checkSpecificProxies(proxies); - res.json({ - success: true, - data: { - jobId: job.id, - proxiesCount: proxies.length, - message: 'Proxy check job queued' - } - }); - } catch (error) { - logger.error('Failed to queue proxy check', { error }); - res.status(500).json({ - success: false, - error: 'Failed to queue proxy check job' - }); - } -}); - -/** - * GET /api/queue/health - * Health check for queue service - */ -router.get('/health', async (req, res) => { - try { - const stats = await proxyQueueIntegration.getStats(); - const isHealthy = stats.active >= 0; // Basic health check - - res.status(isHealthy ? 200 : 503).json({ - success: isHealthy, - data: { - status: isHealthy ? 'healthy' : 'unhealthy', - stats, - timestamp: new Date().toISOString() - } - }); - } catch (error) { - logger.error('Queue health check failed', { error }); - res.status(503).json({ - success: false, - data: { - status: 'unhealthy', - error: 'Queue service unavailable', - timestamp: new Date().toISOString() - } - }); - } -}); - -export default router; diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index b5599d6..d367615 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -122,15 +122,13 @@ async function initializeServices() { logger.info('Initializing data service...'); try { - // Queue manager is initialized automatically when imported - logger.info('Queue manager initialized'); - - - // Initialize providers + // Initialize queue service + await queueManager.initialize(); + logger.info('Queue service initialized'); logger.info('All services initialized successfully'); } catch (error) { logger.error('Failed to initialize services', { error }); - process.exit(1); + throw error; } } @@ -155,6 +153,19 @@ async function startServer() { logger.info(' GET /api/providers - List registered providers'); } +// Graceful shutdown +process.on('SIGINT', async () => { + logger.info('Received SIGINT, shutting down gracefully...'); + await queueManager.shutdown(); + process.exit(0); +}); + +process.on('SIGTERM', async () => { + logger.info('Received SIGTERM, shutting down gracefully...'); + await queueManager.shutdown(); + process.exit(0); +}); + startServer().catch(error => { logger.error('Failed to start server', { error }); process.exit(1); diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index 5582828..0022f57 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -11,26 +11,53 @@ export interface JobData { } export class QueueService { - private logger = new Logger('queue-manager'); - private queue: Queue; - private worker: Worker; - private queueEvents: QueueEvents; + private logger = new Logger('queue-service'); + private queue!: Queue; + private worker!: Worker; + private queueEvents!: QueueEvents; + private isInitialized = false; constructor() { + // Don't initialize in constructor to allow for proper async initialization + } + + async initialize() { + if (this.isInitialized) { + this.logger.warn('Queue service already initialized'); + return; + } + + this.logger.info('Initializing queue service...'); + const connection = { host: process.env.DRAGONFLY_HOST || 'localhost', port: parseInt(process.env.DRAGONFLY_PORT || '6379'), }; - this.queue = new Queue('data-service-queue', { connection }); - this.worker = new Worker('data-service-queue', this.processJob.bind(this), { - connection, - concurrency: 10 - }); - this.queueEvents = new QueueEvents('data-service-queue', { connection }); + this.logger.info('Connecting to Redis/Dragonfly', connection); - this.setupEventListeners(); - this.setupScheduledTasks(); + try { + this.queue = new Queue('data-service-queue', { connection }); + this.worker = new Worker('data-service-queue', this.processJob.bind(this), { + connection, + concurrency: 10 + }); + this.queueEvents = new QueueEvents('data-service-queue', { connection }); + + // Test connection + await this.queue.waitUntilReady(); + await this.worker.waitUntilReady(); + await this.queueEvents.waitUntilReady(); + + this.setupEventListeners(); + this.setupScheduledTasks(); + + this.isInitialized = true; + this.logger.info('Queue service initialized successfully'); + } catch (error) { + this.logger.error('Failed to initialize queue service', { error }); + throw error; + } } private async processJob(job: any) { @@ -111,8 +138,10 @@ export class QueueService { this.logger.info('Scheduled tasks configured'); } - async addJob(jobData: JobData, options?: any) { + if (!this.isInitialized) { + throw new Error('Queue service not initialized. Call initialize() first.'); + } return this.queue.add(jobData.type, jobData, { priority: jobData.priority || 0, removeOnComplete: 10, @@ -122,6 +151,9 @@ export class QueueService { } async addRecurringJob(jobData: JobData, cronPattern: string) { + if (!this.isInitialized) { + throw new Error('Queue service not initialized. Call initialize() first.'); + } return this.queue.add( `recurring-${jobData.type}`, jobData, @@ -135,6 +167,9 @@ export class QueueService { } async getJobStats() { + if (!this.isInitialized) { + throw new Error('Queue service not initialized. Call initialize() first.'); + } const [waiting, active, completed, failed, delayed] = await Promise.all([ this.queue.getWaiting(), this.queue.getActive(), @@ -150,8 +185,10 @@ export class QueueService { failed: failed.length, delayed: delayed.length }; - } - async getQueueStatus() { + } async getQueueStatus() { + if (!this.isInitialized) { + throw new Error('Queue service not initialized. Call initialize() first.'); + } const stats = await this.getJobStats(); return { ...stats, @@ -165,6 +202,9 @@ export class QueueService { } getWorkerCount() { + if (!this.isInitialized) { + return 0; + } return this.worker.opts.concurrency || 1; } @@ -174,12 +214,16 @@ export class QueueService { { name: 'market-data-provider', type: 'market-data', operations: ['live-data', 'historical-data'] } ]; } - async shutdown() { - this.logger.info('Shutting down queue manager'); + if (!this.isInitialized) { + this.logger.warn('Queue service not initialized, nothing to shutdown'); + return; + } + this.logger.info('Shutting down queue service'); await this.worker.close(); await this.queue.close(); await this.queueEvents.close(); + this.isInitialized = false; } }