import { Queue, Worker, QueueEvents } from 'bullmq'; import { getLogger } from '@stock-bot/logger'; import { providerRegistry, JobData } from './provider-registry.service'; export class QueueService { private logger = getLogger('queue-service'); private queue!: Queue; private workers: Worker[] = []; private queueEvents!: QueueEvents; private isInitialized = false; constructor() { // Don't initialize in constructor to allow for proper async initialization } async initialize() { if (this.isInitialized) { this.logger.warn('Queue service already initialized'); return; } this.logger.info('Initializing queue service...'); // Register all providers first await this.registerProviders(); const connection = { host: process.env.DRAGONFLY_HOST || 'localhost', port: parseInt(process.env.DRAGONFLY_PORT || '6379'), // Add these Redis-specific options to fix the undeclared key issue maxRetriesPerRequest: null, retryDelayOnFailover: 100, enableReadyCheck: false, lazyConnect: false, // Disable Redis Cluster mode if you're using standalone Redis/Dragonfly enableOfflineQueue: true }; // Worker configuration const workerCount = parseInt(process.env.WORKER_COUNT || '5'); const concurrencyPerWorker = parseInt(process.env.WORKER_CONCURRENCY || '20'); this.logger.info('Connecting to Redis/Dragonfly', connection); try { this.queue = new Queue('{data-service-queue}', { connection, defaultJobOptions: { removeOnComplete: 10, removeOnFail: 5, attempts: 3, backoff: { type: 'exponential', delay: 1000, } } }); // Create multiple workers for (let i = 0; i < workerCount; i++) { const worker = new Worker( '{data-service-queue}', this.processJob.bind(this), { connection: { ...connection }, // Each worker gets its own connection concurrency: concurrencyPerWorker, maxStalledCount: 1, stalledInterval: 30000, } ); // Add worker-specific logging worker.on('ready', () => { this.logger.info(`Worker ${i + 1} ready`, { workerId: i + 1 }); }); worker.on('error', (error) => { this.logger.error(`Worker ${i + 1} error`, { workerId: i + 1, error }); }); this.workers.push(worker); } this.queueEvents = new QueueEvents('{data-service-queue}', { connection }); // Test connection // Wait for all workers to be ready await this.queue.waitUntilReady(); await Promise.all(this.workers.map(worker => worker.waitUntilReady())); await this.queueEvents.waitUntilReady(); this.setupEventListeners(); this.isInitialized = true; this.logger.info('Queue service initialized successfully'); await this.setupScheduledTasks(); } catch (error) { this.logger.error('Failed to initialize queue service', { error }); throw error; } } // Update getTotalConcurrency method getTotalConcurrency() { if (!this.isInitialized) { return 0; } return this.workers.reduce((total, worker) => { return total + (worker.opts.concurrency || 1); }, 0); } private async registerProviders() { this.logger.info('Registering providers...'); try { // Import and register all providers const { proxyProvider } = await import('../providers/proxy.provider'); const { quotemediaProvider } = await import('../providers/quotemedia.provider'); const { yahooProvider } = await import('../providers/yahoo.provider'); providerRegistry.registerProvider(proxyProvider); providerRegistry.registerProvider(quotemediaProvider); providerRegistry.registerProvider(yahooProvider); this.logger.info('All providers registered successfully'); } catch (error) { this.logger.error('Failed to register providers', { error }); throw error; } } private async processJob(job: any) { const { provider, operation, payload }: JobData = job.data; this.logger.info('Processing job', { id: job.id, provider, operation, payloadKeys: Object.keys(payload || {}) }); try { // Handle special batch processing jobs if (operation === 'process-batch-items') { const { processBatchJob } = await import('../utils/batch-helpers'); return await processBatchJob(payload, this); } // Get handler from registry const handler = providerRegistry.getHandler(provider, operation); if (!handler) { throw new Error(`No handler found for ${provider}:${operation}`); } // Execute the handler const result = await handler(payload); this.logger.info('Job completed successfully', { id: job.id, provider, operation }); return result; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); this.logger.error('Job failed', { id: job.id, provider, operation, error: errorMessage }); throw error; } } async addBulk(jobs: any[]) : Promise { return await this.queue.addBulk(jobs) } private setupEventListeners() { this.queueEvents.on('completed', (job) => { this.logger.info('Job completed', { id: job.jobId }); }); this.queueEvents.on('failed', (job) => { this.logger.error('Job failed', { id: job.jobId, error: job.failedReason }); }); // Note: Worker-specific events are already set up during worker creation // No need for additional progress events since we handle them per-worker } private async setupScheduledTasks() { try { this.logger.info('Setting up scheduled tasks from providers...'); // Get all scheduled jobs from all providers const allScheduledJobs = providerRegistry.getAllScheduledJobs(); if (allScheduledJobs.length === 0) { this.logger.warn('No scheduled jobs found in providers'); return; } // Get existing repeatable jobs for comparison const existingJobs = await this.queue.getRepeatableJobs(); this.logger.info(`Found ${existingJobs.length} existing repeatable jobs`); let successCount = 0; let failureCount = 0; let updatedCount = 0; let newCount = 0; // Process each scheduled job for (const { provider, job } of allScheduledJobs) { try { const jobKey = `${provider}-${job.operation}`; // Check if this job already exists const existingJob = existingJobs.find(existing => existing.key?.includes(jobKey) || existing.name === job.type ); if (existingJob) { // Check if the job needs updating (different cron pattern or config) const needsUpdate = existingJob.pattern !== job.cronPattern; if (needsUpdate) { this.logger.info('Job configuration changed, updating', { jobKey, oldPattern: existingJob.pattern, newPattern: job.cronPattern }); updatedCount++; } else { this.logger.debug('Job unchanged, skipping', { jobKey }); successCount++; continue; } } else { newCount++; } // Add delay between job registrations await new Promise(resolve => setTimeout(resolve, 100)); await this.addRecurringJob({ type: job.type, provider: provider, operation: job.operation, payload: job.payload, priority: job.priority, immediately: job.immediately || false }, job.cronPattern); this.logger.info('Scheduled job registered', { type: job.type, provider, operation: job.operation, cronPattern: job.cronPattern, description: job.description, immediately: job.immediately || false }); successCount++; } catch (error) { this.logger.error('Failed to register scheduled job', { type: job.type, provider, error: error instanceof Error ? error.message : String(error) }); failureCount++; } } this.logger.info(`Scheduled tasks setup complete`, { total: allScheduledJobs.length, successful: successCount, failed: failureCount, updated: updatedCount, new: newCount }); } catch (error) { this.logger.error('Failed to setup scheduled tasks', error); } } async addJob(jobData: JobData, options?: any) { if (!this.isInitialized) { throw new Error('Queue service not initialized. Call initialize() first.'); } const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; return this.queue.add(jobType, jobData, { priority: jobData.priority || 0, removeOnComplete: 10, removeOnFail: 5, ...options }); } async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) { if (!this.isInitialized) { throw new Error('Queue service not initialized. Call initialize() first.'); } try { // Create a unique job key for this specific job const jobKey = `${jobData.provider}-${jobData.operation}`; // Get all existing repeatable jobs const existingJobs = await this.queue.getRepeatableJobs(); // Find and remove the existing job with the same key if it exists const existingJob = existingJobs.find(job => { // Check if this is the same job by comparing key components return job.key?.includes(jobKey) || job.name === jobData.type; }); if (existingJob) { this.logger.info('Updating existing recurring job', { jobKey, existingPattern: existingJob.pattern, newPattern: cronPattern }); // Remove the existing job if (existingJob.key) { await this.queue.removeRepeatableByKey(existingJob.key); } // Small delay to ensure cleanup is complete await new Promise(resolve => setTimeout(resolve, 100)); } else { this.logger.info('Creating new recurring job', { jobKey, cronPattern }); } // Add the new/updated recurring job const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; const job = await this.queue.add(jobType, jobData, { repeat: { pattern: cronPattern, tz: 'UTC', immediately: jobData.immediately || false, }, // Use a consistent jobId for this specific recurring job jobId: `recurring-${jobKey}`, removeOnComplete: 1, removeOnFail: 1, attempts: 2, backoff: { type: 'fixed', delay: 5000 }, ...options }); this.logger.info('Recurring job added/updated successfully', { jobKey, type: jobData.type, cronPattern, immediately: jobData.immediately || false }); return job; } catch (error) { this.logger.error('Failed to add/update recurring job', { jobData, cronPattern, error: error instanceof Error ? error.message : String(error) }); throw error; } } async getJobStats() { if (!this.isInitialized) { throw new Error('Queue service not initialized. Call initialize() first.'); } const [waiting, active, completed, failed, delayed] = await Promise.all([ this.queue.getWaiting(), this.queue.getActive(), this.queue.getCompleted(), this.queue.getFailed(), this.queue.getDelayed() ]); return { waiting: waiting.length, active: active.length, completed: completed.length, failed: failed.length, delayed: delayed.length }; } async drainQueue() { if (!this.isInitialized) { await this.queue.drain() } } async getQueueStatus() { if (!this.isInitialized) { throw new Error('Queue service not initialized. Call initialize() first.'); } const stats = await this.getJobStats(); return { ...stats, workers: this.getWorkerCount(), totalConcurrency: this.getTotalConcurrency(), queue: this.queue.name, connection: { host: process.env.DRAGONFLY_HOST || 'localhost', port: parseInt(process.env.DRAGONFLY_PORT || '6379') } }; } getWorkerCount() { if (!this.isInitialized) { return 0; } return this.workers.length; } getRegisteredProviders() { return providerRegistry.getProviders().map(({ key, config }) => ({ key, name: config.name, operations: Object.keys(config.operations), scheduledJobs: config.scheduledJobs?.length || 0 })); } getScheduledJobsInfo() { return providerRegistry.getAllScheduledJobs().map(({ provider, job }) => ({ id: `${provider}-${job.type}`, provider, type: job.type, operation: job.operation, cronPattern: job.cronPattern, priority: job.priority, description: job.description, immediately: job.immediately || false })); } async shutdown() { if (!this.isInitialized) { this.logger.warn('Queue service not initialized, nothing to shutdown'); return; } this.logger.info('Shutting down queue service'); // Close all workers this.logger.info(`Closing ${this.workers.length} workers...`); await Promise.all(this.workers.map((worker, index) => { this.logger.debug(`Closing worker ${index + 1}`); return worker.close(); })); await this.queue.close(); await this.queueEvents.close(); this.isInitialized = false; this.logger.info('Queue service shutdown complete'); } } export const queueManager = new QueueService();