diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index 650e0b8..6e8b4e8 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -36,27 +36,45 @@ export class QueueService { const connection = { host: process.env.DRAGONFLY_HOST || 'localhost', port: parseInt(process.env.DRAGONFLY_PORT || '6379'), + // Add these Redis-specific options to fix the undeclared key issue + maxRetriesPerRequest: 3, + retryDelayOnFailover: 100, + enableReadyCheck: false, + lazyConnect: true, + // Disable Redis Cluster mode if you're using standalone Redis/Dragonfly + enableOfflineQueue: false }; this.logger.info('Connecting to Redis/Dragonfly', connection); try { - this.queue = new Queue('data-service-queue', { connection }); - this.worker = new Worker('data-service-queue', this.processJob.bind(this), { + this.queue = new Queue('{data-service-queue}', { connection, - concurrency: 10 + defaultJobOptions: { + removeOnComplete: 10, + removeOnFail: 5, + attempts: 3, + backoff: { + type: 'exponential', + delay: 1000, + } + } }); - this.queueEvents = new QueueEvents('data-service-queue', { connection }); // Test connection + this.worker = new Worker('{data-service-queue}', this.processJob.bind(this), { + connection, + concurrency: 5, // Reduce concurrency to avoid overwhelming Redis + }); + this.queueEvents = new QueueEvents('{data-service-queue}', { connection }); // Test connection await this.queue.waitUntilReady(); await this.worker.waitUntilReady(); await this.queueEvents.waitUntilReady(); this.setupEventListeners(); - this.isInitialized = true; this.logger.info('Queue service initialized successfully'); await this.setupScheduledTasks(); + } catch (error) { this.logger.error('Failed to initialize queue service', { error }); throw error; @@ -144,6 +162,22 @@ export class QueueService { try { this.logger.info('Setting up scheduled tasks from providers...'); + // Clear any existing repeatable jobs first + const repeatableJobs = await this.queue.getRepeatableJobs(); + this.logger.info(`Found ${repeatableJobs.length} existing repeatable jobs`); + + for (const job of repeatableJobs) { + try { + await this.queue.removeJobScheduler(job.name); + this.logger.debug('Removed existing repeatable job', { name: job.name }); + } catch (error) { + this.logger.warn('Failed to remove existing repeatable job', { + name: job.name, + error: error instanceof Error ? error.message : String(error) + }); + } + } + // Get all scheduled jobs from all providers const allScheduledJobs = providerRegistry.getAllScheduledJobs(); @@ -152,9 +186,15 @@ export class QueueService { return; } - // Register each scheduled job + let successCount = 0; + let failureCount = 0; + + // Register each scheduled job with delay between registrations for (const { service, provider, job } of allScheduledJobs) { try { + // Add a small delay between job registrations to avoid overwhelming Redis + await new Promise(resolve => setTimeout(resolve, 100)); + await this.addRecurringJob({ type: job.type, service: service, @@ -172,6 +212,9 @@ export class QueueService { cronPattern: job.cronPattern, description: job.description }); + + successCount++; + } catch (error) { this.logger.error('Failed to register scheduled job', { type: job.type, @@ -179,10 +222,11 @@ export class QueueService { provider, error: error instanceof Error ? error.message : String(error) }); + failureCount++; } } - this.logger.info(`Successfully configured ${allScheduledJobs.length} scheduled tasks`); + this.logger.info(`Scheduled tasks setup complete: ${successCount} successful, ${failureCount} failed`); } catch (error) { this.logger.error('Failed to setup scheduled tasks', error); @@ -201,20 +245,60 @@ export class QueueService { }); } - async addRecurringJob(jobData: JobData, cronPattern: string) { + async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) { if (!this.isInitialized) { throw new Error('Queue service not initialized. Call initialize() first.'); } - return this.queue.add( - `recurring-${jobData.type}`, - jobData, - { - repeat: { pattern: cronPattern }, + + try { + // Create a unique job ID to avoid Redis key conflicts + const jobId = `recurring-${jobData.service}-${jobData.provider}-${jobData.operation}`; + + // First, try to remove any existing recurring job with the same ID + try { + await this.queue.removeRepeatable(jobData.type, { + pattern: cronPattern, + jobId: jobId + }); + } catch (removeError) { + // Ignore errors when removing non-existent jobs + this.logger.debug('No existing recurring job to remove', { jobId }); + } + + // Add the new recurring job with proper options + const job = await this.queue.add(jobData.type, jobData, { + repeat: { + pattern: cronPattern, + // Use UTC timezone to avoid timezone issues + tz: 'UTC' + }, + jobId: jobId, removeOnComplete: 1, removeOnFail: 1, - jobId: `recurring-${jobData.service}-${jobData.provider}-${jobData.operation}` - } - ); + attempts: 2, + backoff: { + type: 'fixed', + delay: 5000 + }, + ...options + }); + + this.logger.info('Recurring job added successfully', { + jobId: jobId, + type: jobData.type, + cronPattern: cronPattern + }); + + return job; + + } catch (error) { + this.logger.error('Failed to add recurring job', { + jobData, + cronPattern, + error: error instanceof Error ? error.message : String(error) + }); + throw error; + } } async getJobStats() { diff --git a/docker-compose.yml b/docker-compose.yml index 7734bd8..6c0c487 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,8 @@ services: - --proactor_threads=8 - --bind=0.0.0.0 - --admin_port=6380 + - --cluster_mode=emulated + - --lock_on_hashtags volumes: - dragonfly_data:/data restart: unless-stopped