From 39f6c4204428c59fcfbb86882b6c5c8bb279007d Mon Sep 17 00:00:00 2001 From: Bojan Kucera Date: Sun, 8 Jun 2025 19:11:05 -0400 Subject: [PATCH] added all multi worker support --- .../src/services/queue.service.ts | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index d9c272f..77a3023 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -180,7 +180,6 @@ export class QueueService { throw error; } } - private setupEventListeners() { this.queueEvents.on('completed', (job) => { this.logger.info('Job completed', { id: job.jobId }); @@ -190,9 +189,8 @@ export class QueueService { this.logger.error('Job failed', { id: job.jobId, error: job.failedReason }); }); - this.worker.on('progress', (job, progress) => { - this.logger.debug('Job progress', { id: job.id, progress }); - }); + // Note: Worker-specific events are already set up during worker creation + // No need for additional progress events since we handle them per-worker } private async setupScheduledTasks() { try { @@ -398,7 +396,6 @@ export class QueueService { delayed: delayed.length }; } - async getQueueStatus() { if (!this.isInitialized) { throw new Error('Queue service not initialized. Call initialize() first.'); @@ -407,6 +404,7 @@ export class QueueService { return { ...stats, workers: this.getWorkerCount(), + totalConcurrency: this.getTotalConcurrency(), queue: this.queue.name, connection: { host: process.env.DRAGONFLY_HOST || 'localhost', @@ -414,12 +412,11 @@ export class QueueService { } }; } - getWorkerCount() { if (!this.isInitialized) { return 0; } - return this.worker.opts.concurrency || 1; + return this.workers.length; } getRegisteredProviders() { return providerRegistry.getProviders().map(({ key, config }) => ({ @@ -444,17 +441,24 @@ export class QueueService { immediately: job.immediately || false })); } - async shutdown() { if (!this.isInitialized) { this.logger.warn('Queue service not initialized, nothing to shutdown'); return; } this.logger.info('Shutting down queue service'); - await this.worker.close(); + + // Close all workers + this.logger.info(`Closing ${this.workers.length} workers...`); + await Promise.all(this.workers.map((worker, index) => { + this.logger.debug(`Closing worker ${index + 1}`); + return worker.close(); + })); + await this.queue.close(); await this.queueEvents.close(); this.isInitialized = false; + this.logger.info('Queue service shutdown complete'); } }