added all multi worker support

This commit is contained in:
Bojan Kucera 2025-06-08 19:11:05 -04:00
parent 9b8a7bdd4b
commit 39f6c42044

View file

@ -180,7 +180,6 @@ export class QueueService {
throw error; throw error;
} }
} }
private setupEventListeners() { private setupEventListeners() {
this.queueEvents.on('completed', (job) => { this.queueEvents.on('completed', (job) => {
this.logger.info('Job completed', { id: job.jobId }); this.logger.info('Job completed', { id: job.jobId });
@ -190,9 +189,8 @@ export class QueueService {
this.logger.error('Job failed', { id: job.jobId, error: job.failedReason }); this.logger.error('Job failed', { id: job.jobId, error: job.failedReason });
}); });
this.worker.on('progress', (job, progress) => { // Note: Worker-specific events are already set up during worker creation
this.logger.debug('Job progress', { id: job.id, progress }); // No need for additional progress events since we handle them per-worker
});
} }
private async setupScheduledTasks() { private async setupScheduledTasks() {
try { try {
@ -398,7 +396,6 @@ export class QueueService {
delayed: delayed.length delayed: delayed.length
}; };
} }
async getQueueStatus() { async getQueueStatus() {
if (!this.isInitialized) { if (!this.isInitialized) {
throw new Error('Queue service not initialized. Call initialize() first.'); throw new Error('Queue service not initialized. Call initialize() first.');
@ -407,6 +404,7 @@ export class QueueService {
return { return {
...stats, ...stats,
workers: this.getWorkerCount(), workers: this.getWorkerCount(),
totalConcurrency: this.getTotalConcurrency(),
queue: this.queue.name, queue: this.queue.name,
connection: { connection: {
host: process.env.DRAGONFLY_HOST || 'localhost', host: process.env.DRAGONFLY_HOST || 'localhost',
@ -414,12 +412,11 @@ export class QueueService {
} }
}; };
} }
getWorkerCount() { getWorkerCount() {
if (!this.isInitialized) { if (!this.isInitialized) {
return 0; return 0;
} }
return this.worker.opts.concurrency || 1; return this.workers.length;
} }
getRegisteredProviders() { getRegisteredProviders() {
return providerRegistry.getProviders().map(({ key, config }) => ({ return providerRegistry.getProviders().map(({ key, config }) => ({
@ -444,17 +441,24 @@ export class QueueService {
immediately: job.immediately || false immediately: job.immediately || false
})); }));
} }
async shutdown() { async shutdown() {
if (!this.isInitialized) { if (!this.isInitialized) {
this.logger.warn('Queue service not initialized, nothing to shutdown'); this.logger.warn('Queue service not initialized, nothing to shutdown');
return; return;
} }
this.logger.info('Shutting down queue service'); this.logger.info('Shutting down queue service');
await this.worker.close();
// Close all workers
this.logger.info(`Closing ${this.workers.length} workers...`);
await Promise.all(this.workers.map((worker, index) => {
this.logger.debug(`Closing worker ${index + 1}`);
return worker.close();
}));
await this.queue.close(); await this.queue.close();
await this.queueEvents.close(); await this.queueEvents.close();
this.isInitialized = false; this.isInitialized = false;
this.logger.info('Queue service shutdown complete');
} }
} }