import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq'; import { getLogger } from '@stock-bot/logger'; import { handlerRegistry } from './handler-registry'; import type { JobData, JobOptions, QueueStats, RedisConfig } from './types'; import { getRedisConnection } from './utils'; const logger = getLogger('queue'); export interface QueueWorkerConfig { workers?: number; concurrency?: number; startWorker?: boolean; } /** * Consolidated Queue class that handles both job operations and optional worker management * Can be used as a simple job queue or with workers for automatic processing */ export class Queue { private bullQueue: BullQueue; private workers: Worker[] = []; private queueEvents?: QueueEvents; private queueName: string; private redisConfig: RedisConfig; constructor( queueName: string, redisConfig: RedisConfig, defaultJobOptions: JobOptions = {}, config: QueueWorkerConfig = {} ) { this.queueName = queueName; this.redisConfig = redisConfig; const connection = getRedisConnection(redisConfig); // Initialize BullMQ queue this.bullQueue = new BullQueue(`{${queueName}}`, { connection, defaultJobOptions: { removeOnComplete: 10, removeOnFail: 5, attempts: 3, backoff: { type: 'exponential', delay: 1000, }, ...defaultJobOptions, }, }); // Initialize queue events if workers will be used if (config.workers && config.workers > 0) { this.queueEvents = new QueueEvents(`{${queueName}}`, { connection }); } // Start workers if requested and not explicitly disabled if (config.workers && config.workers > 0 && config.startWorker !== false) { this.startWorkers(config.workers, config.concurrency || 1); } logger.trace('Queue created', { queueName, workers: config.workers || 0, concurrency: config.concurrency || 1 }); } /** * Get the queue name */ getName(): string { return this.queueName; } /** * Add a single job to the queue */ async add(name: string, data: JobData, options: JobOptions = {}): Promise { logger.trace('Adding job', { queueName: this.queueName, jobName: name }); return await this.bullQueue.add(name, data, options); } /** * Add multiple jobs to the queue in bulk */ async addBulk( jobs: Array<{ name: string; data: JobData; opts?: JobOptions }> ): Promise { logger.trace('Adding bulk jobs', { queueName: this.queueName, jobCount: jobs.length }); return await this.bullQueue.addBulk(jobs); } /** * Add a scheduled job with cron-like pattern */ async addScheduledJob( name: string, data: JobData, cronPattern: string, options: JobOptions = {} ): Promise { const scheduledOptions: JobOptions = { ...options, repeat: { pattern: cronPattern, // Use job name as repeat key to prevent duplicates key: `${this.queueName}:${name}`, ...options.repeat, }, }; logger.info('Adding scheduled job', { queueName: this.queueName, jobName: name, cronPattern, repeatKey: scheduledOptions.repeat?.key, immediately: scheduledOptions.repeat?.immediately }); return await this.bullQueue.add(name, data, scheduledOptions); } /** * Get queue statistics */ async getStats(): Promise { const [waiting, active, completed, failed, delayed] = await Promise.all([ this.bullQueue.getWaiting(), this.bullQueue.getActive(), this.bullQueue.getCompleted(), this.bullQueue.getFailed(), this.bullQueue.getDelayed(), ]); const isPaused = await this.bullQueue.isPaused(); return { waiting: waiting.length, active: active.length, completed: completed.length, failed: failed.length, delayed: delayed.length, paused: isPaused, workers: this.workers.length, }; } /** * Get a specific job by ID */ async getJob(jobId: string): Promise { return await this.bullQueue.getJob(jobId); } /** * Get jobs by state */ async getJobs( states: Array<'waiting' | 'active' | 'completed' | 'failed' | 'delayed'>, start = 0, end = 100 ): Promise { return await this.bullQueue.getJobs(states, start, end); } /** * Pause the queue (stops processing new jobs) */ async pause(): Promise { await this.bullQueue.pause(); logger.info('Queue paused', { queueName: this.queueName }); } /** * Resume the queue */ async resume(): Promise { await this.bullQueue.resume(); logger.info('Queue resumed', { queueName: this.queueName }); } /** * Drain the queue (remove all jobs) */ async drain(delayed = false): Promise { await this.bullQueue.drain(delayed); logger.info('Queue drained', { queueName: this.queueName, delayed }); } /** * Clean completed and failed jobs */ async clean( grace: number = 0, limit: number = 100, type: 'completed' | 'failed' = 'completed' ): Promise { await this.bullQueue.clean(grace, limit, type); logger.debug('Queue cleaned', { queueName: this.queueName, type, grace, limit }); } /** * Wait until the queue is ready */ async waitUntilReady(): Promise { await this.bullQueue.waitUntilReady(); } /** * Close the queue (cleanup resources) */ async close(): Promise { try { // Close workers first if (this.workers.length > 0) { await Promise.all(this.workers.map(worker => worker.close())); this.workers = []; logger.debug('Workers closed', { queueName: this.queueName }); } // Close queue events if (this.queueEvents) { await this.queueEvents.close(); logger.debug('Queue events closed', { queueName: this.queueName }); } // Close the queue itself await this.bullQueue.close(); logger.info('Queue closed', { queueName: this.queueName }); } catch (error) { logger.error('Error closing queue', { queueName: this.queueName, error }); throw error; } } /** * Start workers for this queue */ private startWorkers(workerCount: number, concurrency: number): void { const connection = getRedisConnection(this.redisConfig); for (let i = 0; i < workerCount; i++) { const worker = new Worker( `{${this.queueName}}`, this.processJob.bind(this), { connection, concurrency, maxStalledCount: 3, stalledInterval: 30000, } ); // Setup worker event handlers worker.on('completed', (job) => { logger.trace('Job completed', { queueName: this.queueName, jobId: job.id, handler: job.data?.handler, operation: job.data?.operation, }); }); worker.on('failed', (job, err) => { logger.error('Job failed', { queueName: this.queueName, jobId: job?.id, handler: job?.data?.handler, operation: job?.data?.operation, error: err.message, }); }); worker.on('error', (error) => { logger.error('Worker error', { queueName: this.queueName, workerId: i, error: error.message, }); }); this.workers.push(worker); } logger.info('Workers started', { queueName: this.queueName, workerCount, concurrency, }); } /** * Process a job using the handler registry */ private async processJob(job: Job): Promise { const { handler, operation, payload }: JobData = job.data; logger.trace('Processing job', { id: job.id, handler, operation, queueName: this.queueName, }); try { // Look up handler in registry const jobHandler = handlerRegistry.getHandler(handler, operation); if (!jobHandler) { throw new Error(`No handler found for ${handler}:${operation}`); } const result = await jobHandler(payload); logger.trace('Job completed successfully', { id: job.id, handler, operation, queueName: this.queueName, }); return result; } catch (error) { logger.error('Job processing failed', { id: job.id, handler, operation, queueName: this.queueName, error: error instanceof Error ? error.message : String(error), }); throw error; } } /** * Get the number of active workers */ getWorkerCount(): number { return this.workers.length; } /** * Get the underlying BullMQ queue (for advanced operations) * @deprecated Use direct methods instead */ getBullQueue(): BullQueue { return this.bullQueue; } }