From fad9e62d580aba3df10197e6a5911332c78415dd Mon Sep 17 00:00:00 2001 From: Boki Date: Tue, 10 Jun 2025 23:35:33 -0400 Subject: [PATCH] queue service simplification --- .../src/services/queue.service.ts | 412 ++++++++---------- 1 file changed, 172 insertions(+), 240 deletions(-) diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index 438dc6c..e34c674 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -1,18 +1,20 @@ -import { Queue, Worker, QueueEvents } from 'bullmq'; +import { Queue, Worker, QueueEvents, type Job } from 'bullmq'; import { getLogger } from '@stock-bot/logger'; -import { providerRegistry, JobData } from './provider-registry.service'; +import { providerRegistry, type JobData } from './provider-registry.service'; export class QueueService { private logger = getLogger('queue-service'); private queue!: Queue; private workers: Worker[] = []; private queueEvents!: QueueEvents; - private isInitialized = false; + + private get isInitialized() { + return !!this.queue; + } constructor() { // Don't initialize in constructor to allow for proper async initialization } - async initialize() { if (this.isInitialized) { this.logger.warn('Queue service already initialized'); @@ -24,26 +26,11 @@ export class QueueService { // Register all providers first await this.registerProviders(); - const connection = { - host: process.env.DRAGONFLY_HOST || 'localhost', - port: parseInt(process.env.DRAGONFLY_PORT || '6379'), - // Add these Redis-specific options to fix the undeclared key issue - maxRetriesPerRequest: null, - retryDelayOnFailover: 100, - enableReadyCheck: false, - lazyConnect: false, - // Disable Redis Cluster mode if you're using standalone Redis/Dragonfly - enableOfflineQueue: true - }; - - // Worker configuration - const workerCount = parseInt(process.env.WORKER_COUNT || '5'); - const concurrencyPerWorker = parseInt(process.env.WORKER_CONCURRENCY || '20'); - - this.logger.info('Connecting to Redis/Dragonfly', connection); + const connection = this.getConnection(); + const queueName = '{data-service-queue}'; try { - this.queue = new Queue('{data-service-queue}', { + this.queue = new Queue(queueName, { connection, defaultJobOptions: { removeOnComplete: 10, @@ -55,48 +42,47 @@ export class QueueService { } } }); - // Create multiple workers + + // Create workers (keeping same count as before) + const workerCount = parseInt(process.env.WORKER_COUNT || '5'); + const concurrencyPerWorker = parseInt(process.env.WORKER_CONCURRENCY || '20'); + for (let i = 0; i < workerCount; i++) { const worker = new Worker( - '{data-service-queue}', + queueName, this.processJob.bind(this), { - connection: { ...connection }, // Each worker gets its own connection + connection: { ...connection }, concurrency: concurrencyPerWorker, maxStalledCount: 1, stalledInterval: 30000, } ); - // Add worker-specific logging - worker.on('ready', () => { - this.logger.info(`Worker ${i + 1} ready`, { workerId: i + 1 }); - }); - - worker.on('error', (error) => { - this.logger.error(`Worker ${i + 1} error`, { workerId: i + 1, error }); - }); - + + this.setupWorkerEvents(worker, i); this.workers.push(worker); } - this.queueEvents = new QueueEvents('{data-service-queue}', { connection }); // Test connection - // Wait for all workers to be ready + this.queueEvents = new QueueEvents(queueName, { connection }); + + // Wait for readiness await this.queue.waitUntilReady(); await Promise.all(this.workers.map(worker => worker.waitUntilReady())); await this.queueEvents.waitUntilReady(); - this.setupEventListeners(); - this.isInitialized = true; - this.logger.info('Queue service initialized successfully'); - + this.setupQueueEvents(); await this.setupScheduledTasks(); + this.logger.info('Queue service initialized successfully', { + workers: this.workers.length, + totalConcurrency: workerCount * concurrencyPerWorker + }); + } catch (error) { this.logger.error('Failed to initialize queue service', { error }); throw error; } } - // Update getTotalConcurrency method getTotalConcurrency() { if (!this.isInitialized) { @@ -107,25 +93,57 @@ export class QueueService { }, 0); } - private async registerProviders() { + private getConnection() { + return { + host: process.env.DRAGONFLY_HOST || 'localhost', + port: parseInt(process.env.DRAGONFLY_PORT || '6379'), + maxRetriesPerRequest: null, + retryDelayOnFailover: 100, + lazyConnect: false + }; + } + + private setupWorkerEvents(worker: Worker, index: number) { + worker.on('ready', () => { + this.logger.info(`Worker ${index + 1} ready`); + }); + + worker.on('error', (error) => { + this.logger.error(`Worker ${index + 1} error`, { error }); + }); + } + + private setupQueueEvents() { + this.queueEvents.on('completed', (job) => { + this.logger.debug('Job completed', { id: job.jobId }); + }); + + this.queueEvents.on('failed', (job) => { + this.logger.error('Job failed', { id: job.jobId, error: job.failedReason }); + }); + } private async registerProviders() { this.logger.info('Registering providers...'); try { - // Import and register all providers - const { proxyProvider } = await import('../providers/proxy.provider'); - const { quotemediaProvider } = await import('../providers/quotemedia.provider'); - const { yahooProvider } = await import('../providers/yahoo.provider'); + // Define providers to register + const providers = [ + { module: '../providers/proxy.provider', export: 'proxyProvider' }, + { module: '../providers/quotemedia.provider', export: 'quotemediaProvider' }, + { module: '../providers/yahoo.provider', export: 'yahooProvider' } + ]; - providerRegistry.registerProvider(proxyProvider); - providerRegistry.registerProvider(quotemediaProvider); - providerRegistry.registerProvider(yahooProvider); + // Import and register all providers + for (const { module, export: exportName } of providers) { + const providerModule = await import(module); + providerRegistry.registerProvider(providerModule[exportName]); + } this.logger.info('All providers registered successfully'); } catch (error) { this.logger.error('Failed to register providers', { error }); throw error; } - } private async processJob(job: any) { + }private async processJob(job: Job) { const { provider, operation, payload }: JobData = job.data; this.logger.info('Processing job', { @@ -133,24 +151,23 @@ export class QueueService { provider, operation, payloadKeys: Object.keys(payload || {}) - }); - - try { - // Handle special batch processing jobs - if (operation === 'process-batch-items') { - const { processBatchJob } = await import('../utils/batch-helpers'); - return await processBatchJob(payload, this); - } - - // Get handler from registry - const handler = providerRegistry.getHandler(provider, operation); + }); try { + let result; - if (!handler) { - throw new Error(`No handler found for ${provider}:${operation}`); + if (operation === 'process-batch-items') { + // Special handling for batch processing - requires 2 parameters + const { processBatchJob } = await import('../utils/batch-helpers'); + result = await processBatchJob(payload, this); + } else { + // Regular handler lookup - requires 1 parameter + const handler = providerRegistry.getHandler(provider, operation); + + if (!handler) { + throw new Error(`No handler found for ${provider}:${operation}`); + } + + result = await handler(payload); } - - // Execute the handler - const result = await handler(payload); this.logger.info('Job completed successfully', { id: job.id, @@ -171,116 +188,67 @@ export class QueueService { throw error; } } - - async addBulk(jobs: any[]) : Promise { - return await this.queue.addBulk(jobs) + async addBulk(jobs: any[]): Promise { + return await this.queue.addBulk(jobs); } - private setupEventListeners() { - this.queueEvents.on('completed', (job) => { - this.logger.info('Job completed', { id: job.jobId }); - }); - this.queueEvents.on('failed', (job) => { - this.logger.error('Job failed', { id: job.jobId, error: job.failedReason }); - }); - - // Note: Worker-specific events are already set up during worker creation - // No need for additional progress events since we handle them per-worker - } private async setupScheduledTasks() { - try { - this.logger.info('Setting up scheduled tasks from providers...'); - - // Get all scheduled jobs from all providers - const allScheduledJobs = providerRegistry.getAllScheduledJobs(); - - if (allScheduledJobs.length === 0) { - this.logger.warn('No scheduled jobs found in providers'); - return; - } - - // Get existing repeatable jobs for comparison - const existingJobs = await this.queue.getRepeatableJobs(); - this.logger.info(`Found ${existingJobs.length} existing repeatable jobs`); - - let successCount = 0; - let failureCount = 0; - let updatedCount = 0; - let newCount = 0; // Process each scheduled job - for (const { provider, job } of allScheduledJobs) { - try { - const jobKey = `${provider}-${job.operation}`; - - // Check if this job already exists - const existingJob = existingJobs.find(existing => - existing.key?.includes(jobKey) || existing.name === job.type - ); - - if (existingJob) { - // Check if the job needs updating (different cron pattern or config) - const needsUpdate = existingJob.pattern !== job.cronPattern; - - if (needsUpdate) { - this.logger.info('Job configuration changed, updating', { - jobKey, - oldPattern: existingJob.pattern, - newPattern: job.cronPattern - }); - updatedCount++; - } else { - this.logger.debug('Job unchanged, skipping', { jobKey }); - successCount++; - continue; - } - } else { - newCount++; - } - - // Add delay between job registrations - await new Promise(resolve => setTimeout(resolve, 100)); - - await this.addRecurringJob({ - type: job.type, - provider: provider, - operation: job.operation, - payload: job.payload, - priority: job.priority, - immediately: job.immediately || false - }, job.cronPattern); - - this.logger.info('Scheduled job registered', { - type: job.type, - provider, - operation: job.operation, - cronPattern: job.cronPattern, - description: job.description, - immediately: job.immediately || false - }); - - successCount++; - - } catch (error) { - this.logger.error('Failed to register scheduled job', { - type: job.type, - provider, - error: error instanceof Error ? error.message : String(error) - }); - failureCount++; + try { + this.logger.info('Setting up scheduled tasks from providers...'); + + const allScheduledJobs = providerRegistry.getAllScheduledJobs(); + + if (allScheduledJobs.length === 0) { + this.logger.warn('No scheduled jobs found in providers'); + return; } - } - this.logger.info(`Scheduled tasks setup complete`, { - total: allScheduledJobs.length, - successful: successCount, - failed: failureCount, - updated: updatedCount, - new: newCount - }); - - } catch (error) { - this.logger.error('Failed to setup scheduled tasks', error); + let successCount = 0; + let failureCount = 0; + + // Process each scheduled job - simplified without complex update logic + for (const { provider, job } of allScheduledJobs) { + try { + await this.addRecurringJob({ + type: job.type, + provider: provider, + operation: job.operation, + payload: job.payload, + priority: job.priority, + immediately: job.immediately || false + }, job.cronPattern); + + this.logger.info('Scheduled job registered', { + type: job.type, + provider, + operation: job.operation, + cronPattern: job.cronPattern, + description: job.description, + immediately: job.immediately || false + }); + + successCount++; + + } catch (error) { + this.logger.error('Failed to register scheduled job', { + type: job.type, + provider, + error: error instanceof Error ? error.message : String(error) + }); + failureCount++; + } + } + + this.logger.info(`Scheduled tasks setup complete`, { + total: allScheduledJobs.length, + successful: successCount, + failed: failureCount + }); + + } catch (error) { + this.logger.error('Failed to setup scheduled tasks', error); + } } -} async addJob(jobData: JobData, options?: any) { if (!this.isInitialized) { throw new Error('Queue service not initialized. Call initialize() first.'); @@ -293,77 +261,41 @@ export class QueueService { ...options }); } - async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) { if (!this.isInitialized) { throw new Error('Queue service not initialized. Call initialize() first.'); } - try { // Create a unique job key for this specific job - const jobKey = `${jobData.provider}-${jobData.operation}`; - - // Get all existing repeatable jobs - const existingJobs = await this.queue.getRepeatableJobs(); - - // Find and remove the existing job with the same key if it exists - const existingJob = existingJobs.find(job => { - // Check if this is the same job by comparing key components - return job.key?.includes(jobKey) || job.name === jobData.type; - }); + const jobKey = `recurring-${jobData.provider}-${jobData.operation}`; + + // Let BullMQ handle duplicate prevention with consistent jobId + const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; + const job = await this.queue.add(jobType, jobData, { + repeat: { + pattern: cronPattern, + tz: 'UTC', + immediately: jobData.immediately || false, + }, + jobId: jobKey, // Consistent ID prevents duplicates + removeOnComplete: 1, + removeOnFail: 1, + attempts: 2, + backoff: { + type: 'fixed', + delay: 5000 + }, + ...options + }); - if (existingJob) { - this.logger.info('Updating existing recurring job', { - jobKey, - existingPattern: existingJob.pattern, - newPattern: cronPattern - }); // Remove the existing job - if (existingJob.key) { - await this.queue.removeRepeatableByKey(existingJob.key); - } - - // Small delay to ensure cleanup is complete - await new Promise(resolve => setTimeout(resolve, 100)); - } else { - this.logger.info('Creating new recurring job', { jobKey, cronPattern }); - } // Add the new/updated recurring job - const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; - const job = await this.queue.add(jobType, jobData, { - repeat: { - pattern: cronPattern, - tz: 'UTC', - immediately: jobData.immediately || false, - }, - // Use a consistent jobId for this specific recurring job - jobId: `recurring-${jobKey}`, - removeOnComplete: 1, - removeOnFail: 1, - attempts: 2, - backoff: { - type: 'fixed', - delay: 5000 - }, - ...options - }); + this.logger.info('Recurring job added successfully', { + jobKey, + type: jobData.type, + cronPattern, + immediately: jobData.immediately || false + }); - this.logger.info('Recurring job added/updated successfully', { - jobKey, - type: jobData.type, - cronPattern, - immediately: jobData.immediately || false - }); - - return job; - - } catch (error) { - this.logger.error('Failed to add/update recurring job', { - jobData, - cronPattern, - error: error instanceof Error ? error.message : String(error) - }); - throw error; - } + return job; } - async getJobStats() { if (!this.isInitialized) { throw new Error('Queue service not initialized. Call initialize() first.'); @@ -386,8 +318,8 @@ export class QueueService { } async drainQueue() { - if (!this.isInitialized) { - await this.queue.drain() + if (this.isInitialized) { + await this.queue.drain(); } } @@ -398,7 +330,7 @@ export class QueueService { const stats = await this.getJobStats(); return { ...stats, - workers: this.getWorkerCount(), + workers: this.workers.length, totalConcurrency: this.getTotalConcurrency(), queue: this.queue.name, connection: { @@ -409,11 +341,9 @@ export class QueueService { } getWorkerCount() { - if (!this.isInitialized) { - return 0; - } return this.workers.length; } + getRegisteredProviders() { return providerRegistry.getProviders().map(({ key, config }) => ({ key, @@ -422,6 +352,7 @@ export class QueueService { scheduledJobs: config.scheduledJobs?.length || 0 })); } + getScheduledJobsInfo() { return providerRegistry.getAllScheduledJobs().map(({ provider, job }) => ({ id: `${provider}-${job.type}`, @@ -434,11 +365,13 @@ export class QueueService { immediately: job.immediately || false })); } + async shutdown() { if (!this.isInitialized) { this.logger.warn('Queue service not initialized, nothing to shutdown'); return; } + this.logger.info('Shutting down queue service'); // Close all workers @@ -450,7 +383,6 @@ export class QueueService { await this.queue.close(); await this.queueEvents.close(); - this.isInitialized = false; this.logger.info('Queue service shutdown complete'); } }