diff --git a/libs/queue/src/queue-manager.ts b/libs/queue/src/queue-manager.ts index ab63d01..b555824 100644 --- a/libs/queue/src/queue-manager.ts +++ b/libs/queue/src/queue-manager.ts @@ -1,13 +1,13 @@ -import { getLogger } from '@stock-bot/logger'; -import { QueueRateLimiter } from './rate-limiter'; -import { Queue, type QueueWorkerConfig } from './queue'; import { CacheProvider, createCache } from '@stock-bot/cache'; -import type { - QueueManagerConfig, - QueueOptions, - GlobalStats, +import { getLogger } from '@stock-bot/logger'; +import { Queue, type QueueWorkerConfig } from './queue'; +import { QueueRateLimiter } from './rate-limiter'; +import type { + GlobalStats, + QueueManagerConfig, + QueueOptions, QueueStats, - RateLimitRule + RateLimitRule, } from './types'; import { getRedisConnection } from './utils'; @@ -30,7 +30,7 @@ export class QueueManager { private constructor(config: QueueManagerConfig) { this.config = config; this.redisConnection = getRedisConnection(config.redis); - + // Initialize rate limiter if rules are provided if (config.rateLimitRules && config.rateLimitRules.length > 0) { this.rateLimiter = new QueueRateLimiter(this.redisConnection); @@ -40,7 +40,7 @@ export class QueueManager { } }); } - + logger.info('QueueManager singleton initialized', { redis: `${config.redis.host}:${config.redis.port}`, }); @@ -52,9 +52,7 @@ export class QueueManager { */ static getInstance(): QueueManager { if (!QueueManager.instance) { - throw new Error( - 'QueueManager not initialized. Call QueueManager.initialize(config) first.' - ); + throw new Error('QueueManager not initialized. Call QueueManager.initialize(config) first.'); } return QueueManager.instance; } @@ -80,14 +78,14 @@ export class QueueManager { if (QueueManager.instance) { return QueueManager.instance; } - + if (!config) { throw new Error( 'QueueManager not initialized and no config provided. ' + - 'Either call initialize(config) first or provide config to getOrInitialize(config).' + 'Either call initialize(config) first or provide config to getOrInitialize(config).' ); } - + return QueueManager.initialize(config); } @@ -135,8 +133,8 @@ export class QueueManager { }; const queue = new Queue( - queueName, - this.config.redis, + queueName, + this.config.redis, mergedOptions.defaultJobOptions || {}, queueConfig ); @@ -158,10 +156,10 @@ export class QueueManager { }); } - logger.info('Queue created with batch cache', { - queueName, + logger.info('Queue created with batch cache', { + queueName, workers: mergedOptions.workers || 0, - concurrency: mergedOptions.concurrency || 1 + concurrency: mergedOptions.concurrency || 1, }); return queue; @@ -232,7 +230,7 @@ export class QueueManager { for (const [queueName, queue] of this.queues) { const stats = await queue.getStats(); queueStats[queueName] = stats; - + totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed; totalWorkers += stats.workers || 0; } @@ -269,7 +267,11 @@ export class QueueManager { /** * Check rate limits for a job */ - async checkRateLimit(queueName: string, handler: string, operation: string): Promise<{ + async checkRateLimit( + queueName: string, + handler: string, + operation: string + ): Promise<{ allowed: boolean; retryAfter?: number; remainingPoints?: number; @@ -278,7 +280,7 @@ export class QueueManager { if (!this.rateLimiter) { return { allowed: true }; } - + return await this.rateLimiter.checkLimit(queueName, handler, operation); } @@ -293,7 +295,7 @@ export class QueueManager { operation, }; } - + return await this.rateLimiter.getStatus(queueName, handler, operation); } @@ -323,7 +325,7 @@ export class QueueManager { if (!queue) { return false; } - + await queue.pause(); return true; } @@ -336,7 +338,7 @@ export class QueueManager { if (!queue) { return false; } - + await queue.resume(); return true; } @@ -354,18 +356,17 @@ export class QueueManager { * Clean all queues */ async cleanAll( - grace: number = 0, - limit: number = 100, + grace: number = 0, + limit: number = 100, type: 'completed' | 'failed' = 'completed' ): Promise { - const cleanPromises = Array.from(this.queues.values()).map(queue => + const cleanPromises = Array.from(this.queues.values()).map(queue => queue.clean(grace, limit, type) ); await Promise.all(cleanPromises); logger.info('All queues cleaned', { type, grace, limit }); } - /** * Shutdown all queues and workers (thread-safe) */ @@ -393,15 +394,15 @@ export class QueueManager { private async performShutdown(): Promise { try { // Close all queues (this now includes workers since they're managed by Queue class) - const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => { + const queueShutdownPromises = Array.from(this.queues.values()).map(async queue => { try { // Add timeout to queue.close() to prevent hanging - const closePromise = queue.close(); - const timeoutPromise = new Promise((_, reject) => - setTimeout(() => reject(new Error('Queue close timeout')), 100) - ); - - await Promise.race([closePromise, timeoutPromise]); + await queue.close(); + // const timeoutPromise = new Promise((_, reject) => + // setTimeout(() => reject(new Error('Queue close timeout')), 100) + // ); + + // await Promise.race([closePromise, timeoutPromise]); } catch (error) { logger.warn('Error closing queue', { error: (error as Error).message }); } @@ -410,7 +411,7 @@ export class QueueManager { await Promise.all(queueShutdownPromises); // Close all caches - const cacheShutdownPromises = Array.from(this.caches.values()).map(async (cache) => { + const cacheShutdownPromises = Array.from(this.caches.values()).map(async cache => { try { // Clear cache before shutdown await cache.clear(); diff --git a/libs/queue/src/queue.ts b/libs/queue/src/queue.ts index 734e7d4..d66cac0 100644 --- a/libs/queue/src/queue.ts +++ b/libs/queue/src/queue.ts @@ -1,4 +1,4 @@ -import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq'; +import { Queue as BullQueue, QueueEvents, Worker, type Job } from 'bullmq'; import { getLogger } from '@stock-bot/logger'; import { handlerRegistry } from './handler-registry'; import type { JobData, JobOptions, QueueStats, RedisConfig } from './types'; @@ -24,14 +24,14 @@ export class Queue { private redisConfig: RedisConfig; constructor( - queueName: string, - redisConfig: RedisConfig, + queueName: string, + redisConfig: RedisConfig, defaultJobOptions: JobOptions = {}, config: QueueWorkerConfig = {} ) { this.queueName = queueName; this.redisConfig = redisConfig; - + const connection = getRedisConnection(redisConfig); // Initialize BullMQ queue @@ -59,10 +59,10 @@ export class Queue { this.startWorkers(config.workers, config.concurrency || 1); } - logger.trace('Queue created', { - queueName, + logger.trace('Queue created', { + queueName, workers: config.workers || 0, - concurrency: config.concurrency || 1 + concurrency: config.concurrency || 1, }); } @@ -84,12 +84,10 @@ export class Queue { /** * Add multiple jobs to the queue in bulk */ - async addBulk( - jobs: Array<{ name: string; data: JobData; opts?: JobOptions }> - ): Promise { - logger.trace('Adding bulk jobs', { - queueName: this.queueName, - jobCount: jobs.length + async addBulk(jobs: Array<{ name: string; data: JobData; opts?: JobOptions }>): Promise { + logger.trace('Adding bulk jobs', { + queueName: this.queueName, + jobCount: jobs.length, }); return await this.bullQueue.addBulk(jobs); } @@ -98,9 +96,9 @@ export class Queue { * Add a scheduled job with cron-like pattern */ async addScheduledJob( - name: string, - data: JobData, - cronPattern: string, + name: string, + data: JobData, + cronPattern: string, options: JobOptions = {} ): Promise { const scheduledOptions: JobOptions = { @@ -112,15 +110,15 @@ export class Queue { ...options.repeat, }, }; - - logger.info('Adding scheduled job', { - queueName: this.queueName, - jobName: name, + + logger.info('Adding scheduled job', { + queueName: this.queueName, + jobName: name, cronPattern, repeatKey: scheduledOptions.repeat?.key, - immediately: scheduledOptions.repeat?.immediately + immediately: scheduledOptions.repeat?.immediately, }); - + return await this.bullQueue.add(name, data, scheduledOptions); } @@ -195,8 +193,8 @@ export class Queue { * Clean completed and failed jobs */ async clean( - grace: number = 0, - limit: number = 100, + grace: number = 0, + limit: number = 100, type: 'completed' | 'failed' = 'completed' ): Promise { await this.bullQueue.clean(grace, limit, type); @@ -210,62 +208,30 @@ export class Queue { await this.bullQueue.waitUntilReady(); } + /** + * Close the queue (cleanup resources) + */ /** * Close the queue (cleanup resources) */ async close(): Promise { try { - // Close workers first with timeout - if (this.workers.length > 0) { - const workerClosePromises = this.workers.map((worker) => { - return new Promise((resolve) => { - const timeout = setTimeout(() => { - resolve(); - }, 50); - - worker.close().then(() => { - clearTimeout(timeout); - resolve(); - }).catch(() => { - clearTimeout(timeout); - resolve(); - }); - }); - }); - - await Promise.all(workerClosePromises); - this.workers = []; - logger.debug('Workers closed', { queueName: this.queueName }); - } + // Close the queue itself + await this.bullQueue.close(); + logger.info('Queue closed', { queueName: this.queueName }); - // Close queue events with timeout + // Close queue events if (this.queueEvents) { - const eventsClosePromise = this.queueEvents.close(); - const eventsTimeoutPromise = new Promise((_, reject) => - setTimeout(() => reject(new Error('Queue events close timeout')), 50) - ); - - try { - await Promise.race([eventsClosePromise, eventsTimeoutPromise]); - } catch (error) { - // Silently ignore timeout - } + await this.queueEvents.close(); logger.debug('Queue events closed', { queueName: this.queueName }); } - // Close the queue itself with timeout - const queueClosePromise = this.bullQueue.close(); - const queueTimeoutPromise = new Promise((_, reject) => - setTimeout(() => reject(new Error('BullQueue close timeout')), 50) - ); - - try { - await Promise.race([queueClosePromise, queueTimeoutPromise]); - } catch (error) { - // Silently ignore timeout + // Close workers first + if (this.workers.length > 0) { + await Promise.all(this.workers.map(worker => worker.close())); + this.workers = []; + logger.debug('Workers closed', { queueName: this.queueName }); } - - logger.info('Queue closed', { queueName: this.queueName }); } catch (error) { logger.error('Error closing queue', { queueName: this.queueName, error }); throw error; @@ -279,19 +245,15 @@ export class Queue { const connection = getRedisConnection(this.redisConfig); for (let i = 0; i < workerCount; i++) { - const worker = new Worker( - `{${this.queueName}}`, - this.processJob.bind(this), - { - connection, - concurrency, - maxStalledCount: 3, - stalledInterval: 30000, - } - ); + const worker = new Worker(`{${this.queueName}}`, this.processJob.bind(this), { + connection, + concurrency, + maxStalledCount: 3, + stalledInterval: 30000, + }); // Setup worker event handlers - worker.on('completed', (job) => { + worker.on('completed', job => { logger.trace('Job completed', { queueName: this.queueName, jobId: job.id, @@ -310,7 +272,7 @@ export class Queue { }); }); - worker.on('error', (error) => { + worker.on('error', error => { logger.error('Worker error', { queueName: this.queueName, workerId: i, @@ -385,4 +347,4 @@ export class Queue { getBullQueue(): BullQueue { return this.bullQueue; } -} \ No newline at end of file +}