import { Queue as BullQueue, QueueEvents, Worker, type Job } from 'bullmq';
// Handler registry will be injected
import type { HandlerRegistry } from '@stock-bot/handler-registry';
import type { ExtendedJobOptions, JobData, JobOptions, QueueStats, RedisConfig } from './types';
import { getRedisConnection } from './utils';

/**
 * Minimal structured-logger contract used by {@link Queue}.
 * `console` is used as the fallback implementation when no logger is supplied.
 */
interface Logger {
  info(message: string, meta?: Record<string, unknown>): void;
  error(message: string, meta?: Record<string, unknown>): void;
  warn(message: string, meta?: Record<string, unknown>): void;
  debug(message: string, meta?: Record<string, unknown>): void;
  trace(message: string, meta?: Record<string, unknown>): void;
  /** Optional: loggers without `child` (e.g. `console`) are used as-is. */
  child?(name: string, context?: Record<string, unknown>): Logger;
}

/** Worker-related options accepted by the {@link Queue} constructor. */
export interface QueueWorkerConfig {
  /** Number of BullMQ workers to create; workers are only created when this is > 0. */
  workers?: number;
  /** Per-worker concurrency; falls back to 1 when omitted or 0. */
  concurrency?: number;
  /** Set to `false` to defer worker start-up (see `startWorkersManually`). */
  startWorker?: boolean;
  /** Registry used by workers to resolve `handler`/`operation` pairs. */
  handlerRegistry?: HandlerRegistry;
  /** Used to build worker names (`<serviceName>_worker_<index>`). */
  serviceName?: string;
}

/** One entry of the list returned by BullMQ's `Queue.getWorkers()`. */
type BullWorkerInfo = Awaited<ReturnType<BullQueue['getWorkers']>>[number];

/**
 * Consolidated Queue class that handles both job operations and optional worker management.
 * Can be used as a simple job queue or with workers for automatic processing.
 */
export class Queue {
  private bullQueue: BullQueue;
  private workers: Worker[] = [];
  private queueEvents?: QueueEvents;
  private queueName: string;
  private redisConfig: RedisConfig;
  private readonly logger: Logger;
  private readonly handlerRegistry?: HandlerRegistry;
  private serviceName?: string;

  /**
   * @param queueName - Name of the BullMQ queue.
   * @param redisConfig - Redis settings, resolved through `getRedisConnection`.
   * @param defaultJobOptions - Overrides merged on top of the built-in job defaults.
   * @param config - Worker configuration; workers start immediately unless
   *   `startWorker` is `false` or `workers` is not a positive number.
   * @param logger - Structured logger; defaults to `console`.
   */
  constructor(
    queueName: string,
    redisConfig: RedisConfig,
    defaultJobOptions: JobOptions = {},
    config: QueueWorkerConfig = {},
    logger?: Logger
  ) {
    this.queueName = queueName;
    this.redisConfig = redisConfig;
    this.logger = logger ?? console;
    this.handlerRegistry = config.handlerRegistry;
    this.serviceName = config.serviceName;

    this.logger.debug('Queue constructor called', {
      queueName,
      serviceName: this.serviceName,
      hasHandlerRegistry: !!config.handlerRegistry,
      handlerRegistryType: config.handlerRegistry ? typeof config.handlerRegistry : 'undefined',
      configKeys: Object.keys(config),
    });

    const connection = getRedisConnection(redisConfig);

    // Initialize BullMQ queue; caller-supplied options win over the defaults below.
    this.bullQueue = new BullQueue(queueName, {
      connection,
      defaultJobOptions: {
        removeOnComplete: 10,
        removeOnFail: 5,
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
        ...defaultJobOptions,
      },
    });

    // Evaluate the "workers requested" condition once and reuse it.
    const workerCount = config.workers || 0;
    const concurrency = config.concurrency || 1;
    const workersRequested = workerCount > 0;

    // Initialize queue events if workers will be used
    if (workersRequested) {
      this.queueEvents = new QueueEvents(queueName, { connection });
    }

    // Start workers if requested and not explicitly disabled
    if (workersRequested && config.startWorker !== false) {
      this.logger.info('Starting workers for queue', {
        queueName,
        workers: workerCount,
        concurrency,
        hasHandlerRegistry: !!this.handlerRegistry,
      });
      this.startWorkers(workerCount, concurrency);
    } else {
      this.logger.info('Not starting workers for queue', {
        queueName,
        workers: workerCount,
        startWorker: config.startWorker,
        hasHandlerRegistry: !!this.handlerRegistry,
      });
    }

    this.logger.trace('Queue created', {
      queueName,
      workers: workerCount,
      concurrency,
    });
  }

  /**
   * Get the queue name
   */
  getName(): string {
    return this.queueName;
  }

  /**
   * Get the underlying BullMQ queue instance (for monitoring/admin purposes)
   */
  getBullQueue(): BullQueue {
    return this.bullQueue;
  }

  /**
   * Add a single job to the queue
   *
   * @returns The job created by BullMQ.
   */
  async add(name: string, data: JobData, options: JobOptions = {}): Promise<Job> {
    this.logger.trace('Adding job', { queueName: this.queueName, jobName: name });
    return await this.bullQueue.add(name, data, options);
  }

  /**
   * Add multiple jobs to the queue in bulk
   *
   * @returns The jobs created by BullMQ.
   */
  async addBulk(jobs: Array<{ name: string; data: JobData; opts?: JobOptions }>): Promise<Job[]> {
    this.logger.trace('Adding bulk jobs', {
      queueName: this.queueName,
      jobCount: jobs.length,
    });
    return await this.bullQueue.addBulk(jobs);
  }

  /**
   * Add a scheduled job with cron-like pattern
   *
   * The repeat key defaults to `<queueName>:<name>`; anything in
   * `options.repeat` overrides the defaults set here (including `pattern`).
   */
  async addScheduledJob(
    name: string,
    data: JobData,
    cronPattern: string,
    options: ExtendedJobOptions = {}
  ): Promise<Job> {
    const scheduledOptions: ExtendedJobOptions = {
      ...options,
      repeat: {
        pattern: cronPattern,
        // Use job name as repeat key to prevent duplicates
        key: `${this.queueName}:${name}`,
        ...options.repeat,
      },
    };

    this.logger.info('Adding scheduled job', {
      queueName: this.queueName,
      jobName: name,
      cronPattern,
      repeatKey: scheduledOptions.repeat?.key,
      immediately: scheduledOptions.repeat?.immediately,
    });

    return await this.bullQueue.add(name, data, scheduledOptions);
  }

  /**
   * Get queue statistics
   *
   * NOTE(review): the per-state counts are derived by loading the job lists and
   * taking their length; a count-only BullMQ call may be cheaper — confirm that
   * it reports the same numbers for paused queues before switching.
   */
  async getStats(): Promise<QueueStats> {
    // All lookups are independent, so issue them together.
    const [waiting, active, completed, failed, delayed, isPaused, activeWorkerCount] =
      await Promise.all([
        this.bullQueue.getWaiting(),
        this.bullQueue.getActive(),
        this.bullQueue.getCompleted(),
        this.bullQueue.getFailed(),
        this.bullQueue.getDelayed(),
        this.bullQueue.isPaused(),
        // Get actual active worker count from BullMQ (never rejects; returns 0 on failure)
        this.getActiveWorkerCount(),
      ]);

    return {
      waiting: waiting.length,
      active: active.length,
      completed: completed.length,
      failed: failed.length,
      delayed: delayed.length,
      paused: isPaused,
      workers: activeWorkerCount, // Use BullMQ's tracked count instead of local array
    };
  }

  /**
   * Get a specific job by ID
   */
  async getJob(jobId: string): Promise<Job | undefined> {
    return await this.bullQueue.getJob(jobId);
  }

  /**
   * Get jobs by state
   */
  async getJobs(
    states: Array<'waiting' | 'active' | 'completed' | 'failed' | 'delayed'>,
    start = 0,
    end = 100
  ): Promise<Job[]> {
    return await this.bullQueue.getJobs(states, start, end);
  }

  /**
   * Pause the queue (stops processing new jobs)
   */
  async pause(): Promise<void> {
    await this.bullQueue.pause();
    this.logger.info('Queue paused', { queueName: this.queueName });
  }

  /**
   * Resume the queue
   */
  async resume(): Promise<void> {
    await this.bullQueue.resume();
    this.logger.info('Queue resumed', { queueName: this.queueName });
  }

  /**
   * Drain the queue (remove all jobs)
   *
   * @param delayed - Forwarded to BullMQ's `drain`.
   */
  async drain(delayed = false): Promise<void> {
    await this.bullQueue.drain(delayed);
    this.logger.info('Queue drained', { queueName: this.queueName, delayed });
  }

  /**
   * Clean completed and failed jobs
   *
   * All three arguments are forwarded unchanged to BullMQ's `clean`.
   */
  async clean(
    grace: number = 0,
    limit: number = 100,
    type: 'completed' | 'failed' = 'completed'
  ): Promise<void> {
    await this.bullQueue.clean(grace, limit, type);
    this.logger.debug('Queue cleaned', { queueName: this.queueName, type, grace, limit });
  }

  /**
   * Wait until the queue is ready
   */
  async waitUntilReady(): Promise<void> {
    await this.bullQueue.waitUntilReady();
  }

  /**
   * Close the queue (cleanup resources)
   *
   * Shutdown order is workers, then queue events, then the queue itself, so
   * that consumers are stopped before the resources they rely on go away.
   *
   * @throws Re-throws the first error raised while closing, after logging it.
   */
  async close(): Promise<void> {
    try {
      // Close workers first
      if (this.workers.length > 0) {
        await Promise.all(this.workers.map(worker => worker.close()));
        this.workers = [];
        this.logger.debug('Workers closed', { queueName: this.queueName });
      }

      // Close queue events
      if (this.queueEvents) {
        await this.queueEvents.close();
        this.queueEvents = undefined;
        this.logger.debug('Queue events closed', { queueName: this.queueName });
      }

      // Close the queue itself
      await this.bullQueue.close();
      this.logger.info('Queue closed', { queueName: this.queueName });
    } catch (error) {
      this.logger.error('Error closing queue', { queueName: this.queueName, error });
      throw error;
    }
  }

  /**
   * Create a child logger with additional context
   * Useful for batch processing and other queue operations
   */
  createChildLogger(name: string, context?: Record<string, unknown>): Logger {
    if (this.logger && typeof this.logger.child === 'function') {
      return this.logger.child(name, context);
    }
    // Fallback to main logger if child not supported (e.g., console)
    return this.logger;
  }

  /**
   * Start workers for this queue
   *
   * Creates `workerCount` BullMQ workers, wires their event logging, and
   * records them in `this.workers`.
   */
  private startWorkers(workerCount: number, concurrency: number): void {
    const connection = getRedisConnection(this.redisConfig);

    for (let i = 0; i < workerCount; i++) {
      const worker = new Worker(this.queueName, this.processJob.bind(this), {
        connection,
        concurrency,
        maxStalledCount: 3,
        stalledInterval: 30000,
        // Add a name to identify the worker
        name: `${this.serviceName || 'unknown'}_worker_${i}`,
      });

      this.logger.info(`Starting worker ${i + 1}/${workerCount} for queue`, {
        queueName: this.queueName,
        workerName: worker.name,
        concurrency,
      });

      // Setup worker event handlers
      worker.on('completed', job => {
        this.logger.trace('Job completed', {
          queueName: this.queueName,
          jobId: job.id,
          handler: job.data?.handler,
          operation: job.data?.operation,
        });
      });

      // `job` may be undefined here, hence the optional chaining.
      worker.on('failed', (job, err) => {
        this.logger.error('Job failed', {
          queueName: this.queueName,
          jobId: job?.id,
          handler: job?.data?.handler,
          operation: job?.data?.operation,
          error: err.message,
        });
      });

      worker.on('error', error => {
        this.logger.error('Worker error', {
          queueName: this.queueName,
          workerId: i,
          error: error.message,
        });
      });

      this.workers.push(worker);
    }

    this.logger.info('Workers started', {
      queueName: this.queueName,
      workerCount,
      concurrency,
    });
  }

  /**
   * Process a job using the handler registry
   *
   * @returns Whatever the resolved handler operation returns.
   * @throws If no registry is configured, no operation matches
   *   `handler:operation`, or the operation itself throws (re-thrown after logging).
   */
  private async processJob(job: Job): Promise<unknown> {
    const { handler, operation, payload }: JobData = job.data;

    this.logger.trace('Processing job', {
      id: job.id,
      handler,
      operation,
      queueName: this.queueName,
    });

    try {
      // Look up handler in registry
      if (!this.handlerRegistry) {
        throw new Error('Handler registry not configured for worker processing');
      }

      this.logger.debug('Looking up handler in registry', {
        handler,
        operation,
        queueName: this.queueName,
        registeredHandlers: this.handlerRegistry.getHandlerNames(),
      });

      const jobHandler = this.handlerRegistry.getOperation(handler, operation);
      if (!jobHandler) {
        throw new Error(`No handler found for ${handler}:${operation}`);
      }

      const result = await jobHandler(payload);

      this.logger.trace('Job completed successfully', {
        id: job.id,
        handler,
        operation,
        queueName: this.queueName,
      });

      return result;
    } catch (error) {
      this.logger.error('Job processing failed', {
        id: job.id,
        handler,
        operation,
        queueName: this.queueName,
        error: error instanceof Error ? error.message : String(error),
      });
      // Re-throw so the failure propagates to the BullMQ worker.
      throw error;
    }
  }

  /**
   * Start workers manually (for delayed initialization)
   *
   * No-op (with a warning) when this instance already has workers.
   */
  startWorkersManually(workerCount: number, concurrency: number = 1): void {
    if (this.workers.length > 0) {
      this.logger.warn('Workers already started for queue', { queueName: this.queueName });
      return;
    }

    this.logger.info('Starting workers manually', {
      queueName: this.queueName,
      workerCount,
      concurrency,
      hasHandlerRegistry: !!this.handlerRegistry,
    });

    // Initialize queue events if not already done
    if (!this.queueEvents) {
      const connection = getRedisConnection(this.redisConfig);
      this.queueEvents = new QueueEvents(this.queueName, { connection });
    }

    this.startWorkers(workerCount, concurrency);
  }

  /**
   * Get the number of workers created by this instance (length of the local array)
   */
  getWorkerCount(): number {
    return this.workers.length;
  }

  /**
   * Get active workers from BullMQ
   * This uses BullMQ's built-in worker tracking
   *
   * @returns The worker list, or an empty array if the lookup fails (the error is logged).
   */
  async getActiveWorkers(): Promise<BullWorkerInfo[]> {
    try {
      return await this.bullQueue.getWorkers();
    } catch (error) {
      this.logger.error('Failed to get active workers', { queueName: this.queueName, error });
      return [];
    }
  }

  /**
   * Get count of active workers from BullMQ
   *
   * @returns The worker count, or 0 if the lookup fails (the error is logged).
   */
  async getActiveWorkerCount(): Promise<number> {
    try {
      const workers = await this.bullQueue.getWorkers();
      return workers.length;
    } catch (error) {
      this.logger.error('Failed to get active worker count', { queueName: this.queueName, error });
      return 0;
    }
  }

  /**
   * Get detailed worker information
   *
   * String-valued `age`, `idle` and `started` fields are parsed as base-10 integers.
   *
   * @returns One entry per worker, or an empty array if the lookup fails (the error is logged).
   */
  async getWorkerDetails(): Promise<
    Array<{
      id: string;
      name?: string;
      addr?: string;
      age?: number;
      idle?: number;
      started?: number;
    }>
  > {
    // Explicit radix: never let parseInt guess the base from the string's prefix.
    const toOptionalNumber = (value: unknown): number | undefined => {
      if (typeof value === 'string') {
        return parseInt(value, 10);
      }
      return typeof value === 'number' ? value : undefined;
    };

    try {
      const workers = await this.bullQueue.getWorkers();
      return workers.map(worker => ({
        id: worker.id || 'unknown',
        name: worker.name,
        addr: worker.addr,
        age: toOptionalNumber(worker.age),
        idle: toOptionalNumber(worker.idle),
        started: toOptionalNumber(worker.started),
      }));
    } catch (error) {
      this.logger.error('Failed to get worker details', { queueName: this.queueName, error });
      return [];
    }
  }
}