diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index 59d3a12..a7a44f2 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -35,7 +35,7 @@ export const proxyProvider: ProviderConfig = { source: 'batch-processing' }), queueManager, { - totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000, + totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '7') * 60 * 60 * 1000, batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'), useBatching: process.env.PROXY_DIRECT_MODE !== 'true', priority: 2, @@ -114,16 +114,16 @@ export const proxyProvider: ProviderConfig = { } }, scheduledJobs: [ - { - type: 'proxy-maintenance', - operation: 'fetch-and-check', - payload: {}, - // should remove and just run at the same time so app restarts dont keeping adding same jobs - cronPattern: getEvery24HourCron(), - priority: 5, - immediately: true, // Don't run immediately during startup to avoid conflicts - description: 'Fetch and validate proxy list from sources' - } + // { + // type: 'proxy-maintenance', + // operation: 'fetch-and-check', + // payload: {}, + // // should remove and just run at the same time so app restarts dont keeping adding same jobs + // cronPattern: getEvery24HourCron(), + // priority: 5, + // immediately: true, // Don't run immediately during startup to avoid conflicts + // description: 'Fetch and validate proxy list from sources' + // } ] }; diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index e34c674..29b4f3e 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -8,14 +8,22 @@ export class QueueService { private workers: Worker[] = []; private queueEvents!: QueueEvents; + private config = { + workers: parseInt(process.env.WORKER_COUNT || '5'), + concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'), + redis: { + host: process.env.DRAGONFLY_HOST || 'localhost', + port: parseInt(process.env.DRAGONFLY_PORT || '6379') + } + }; + private get isInitialized() { return !!this.queue; } constructor() { // Don't initialize in constructor to allow for proper async initialization - } - async initialize() { + } async initialize() { if (this.isInitialized) { this.logger.warn('Queue service already initialized'); return; @@ -23,105 +31,94 @@ export class QueueService { this.logger.info('Initializing queue service...'); - // Register all providers first - await this.registerProviders(); - - const connection = this.getConnection(); - const queueName = '{data-service-queue}'; - try { + // Step 1: Register providers + await this.registerProviders(); + + // Step 2: Setup queue and workers + const connection = this.getConnection(); + const queueName = '{data-service-queue}'; + this.queue = new Queue(queueName, { connection, defaultJobOptions: { removeOnComplete: 10, removeOnFail: 5, attempts: 3, - backoff: { - type: 'exponential', - delay: 1000, - } + backoff: { type: 'exponential', delay: 1000 } } }); - // Create workers (keeping same count as before) - const workerCount = parseInt(process.env.WORKER_COUNT || '5'); - const concurrencyPerWorker = parseInt(process.env.WORKER_CONCURRENCY || '20'); - - for (let i = 0; i < workerCount; i++) { - const worker = new Worker( - queueName, - this.processJob.bind(this), - { - connection: { ...connection }, - concurrency: concurrencyPerWorker, - maxStalledCount: 1, - stalledInterval: 30000, - } - ); - - this.setupWorkerEvents(worker, i); - this.workers.push(worker); - } - this.queueEvents = new QueueEvents(queueName, { connection }); + + // Step 3: Create workers + const { workerCount, totalConcurrency } = this.createWorkers(queueName, connection); - // Wait for readiness - await this.queue.waitUntilReady(); - await Promise.all(this.workers.map(worker => worker.waitUntilReady())); - await this.queueEvents.waitUntilReady(); + // Step 4: Wait for readiness (parallel) + await Promise.all([ + this.queue.waitUntilReady(), + this.queueEvents.waitUntilReady(), + ...this.workers.map(worker => worker.waitUntilReady()) + ]); + // Step 5: Setup events and scheduled tasks this.setupQueueEvents(); await this.setupScheduledTasks(); this.logger.info('Queue service initialized successfully', { - workers: this.workers.length, - totalConcurrency: workerCount * concurrencyPerWorker + workers: workerCount, + totalConcurrency }); } catch (error) { this.logger.error('Failed to initialize queue service', { error }); throw error; } - } - // Update getTotalConcurrency method - getTotalConcurrency() { - if (!this.isInitialized) { - return 0; - } - return this.workers.reduce((total, worker) => { - return total + (worker.opts.concurrency || 1); - }, 0); - } - - private getConnection() { + } private getConnection() { return { - host: process.env.DRAGONFLY_HOST || 'localhost', - port: parseInt(process.env.DRAGONFLY_PORT || '6379'), + ...this.config.redis, maxRetriesPerRequest: null, retryDelayOnFailover: 100, lazyConnect: false }; } - private setupWorkerEvents(worker: Worker, index: number) { - worker.on('ready', () => { - this.logger.info(`Worker ${index + 1} ready`); - }); + private createWorkers(queueName: string, connection: any) { + for (let i = 0; i < this.config.workers; i++) { + const worker = new Worker(queueName, this.processJob.bind(this), { + connection: { ...connection }, + concurrency: this.config.concurrency, + maxStalledCount: 1, + stalledInterval: 30000, + }); + + // Setup events inline + worker.on('ready', () => this.logger.info(`Worker ${i + 1} ready`)); + worker.on('error', (error) => this.logger.error(`Worker ${i + 1} error`, { error })); + + this.workers.push(worker); + } - worker.on('error', (error) => { - this.logger.error(`Worker ${index + 1} error`, { error }); + return { + workerCount: this.config.workers, + totalConcurrency: this.config.workers * this.config.concurrency + }; + } private setupQueueEvents() { + // Only log failures, not every completion + this.queueEvents.on('failed', (job, error) => { + this.logger.error('Job failed', { + id: job.jobId, + error: String(error) + }); }); - } - - private setupQueueEvents() { - this.queueEvents.on('completed', (job) => { - this.logger.debug('Job completed', { id: job.jobId }); - }); - - this.queueEvents.on('failed', (job) => { - this.logger.error('Job failed', { id: job.jobId, error: job.failedReason }); - }); - } private async registerProviders() { + + // Only log completions in debug mode + if (process.env.LOG_LEVEL === 'debug') { + this.queueEvents.on('completed', (job) => { + this.logger.debug('Job completed', { id: job.jobId }); + }); + } + }private async registerProviders() { this.logger.info('Registering providers...'); try { @@ -187,72 +184,63 @@ export class QueueService { }); throw error; } - } - async addBulk(jobs: any[]): Promise { + } async addBulk(jobs: any[]): Promise { return await this.queue.addBulk(jobs); } - private async setupScheduledTasks() { - try { - this.logger.info('Setting up scheduled tasks from providers...'); - - const allScheduledJobs = providerRegistry.getAllScheduledJobs(); - - if (allScheduledJobs.length === 0) { - this.logger.warn('No scheduled jobs found in providers'); - return; - } - - let successCount = 0; - let failureCount = 0; - - // Process each scheduled job - simplified without complex update logic - for (const { provider, job } of allScheduledJobs) { - try { - await this.addRecurringJob({ - type: job.type, - provider: provider, - operation: job.operation, - payload: job.payload, - priority: job.priority, - immediately: job.immediately || false - }, job.cronPattern); - - this.logger.info('Scheduled job registered', { - type: job.type, - provider, - operation: job.operation, - cronPattern: job.cronPattern, - description: job.description, - immediately: job.immediately || false - }); - - successCount++; - - } catch (error) { - this.logger.error('Failed to register scheduled job', { - type: job.type, - provider, - error: error instanceof Error ? error.message : String(error) - }); - failureCount++; - } - } - - this.logger.info(`Scheduled tasks setup complete`, { - total: allScheduledJobs.length, - successful: successCount, - failed: failureCount - }); - - } catch (error) { - this.logger.error('Failed to setup scheduled tasks', error); - } + private getTotalConcurrency() { + return this.workers.reduce((total, worker) => total + (worker.opts.concurrency || 1), 0); } - async addJob(jobData: JobData, options?: any) { - if (!this.isInitialized) { - throw new Error('Queue service not initialized. Call initialize() first.'); + private async setupScheduledTasks() { + const allScheduledJobs = providerRegistry.getAllScheduledJobs(); + + if (allScheduledJobs.length === 0) { + this.logger.warn('No scheduled jobs found in providers'); + return; } + + this.logger.info('Setting up scheduled tasks...', { count: allScheduledJobs.length }); + + // Use Promise.allSettled for parallel processing + better error handling + const results = await Promise.allSettled( + allScheduledJobs.map(async ({ provider, job }) => { + await this.addRecurringJob({ + type: job.type, + provider, + operation: job.operation, + payload: job.payload, + priority: job.priority, + immediately: job.immediately || false + }, job.cronPattern); + + return { provider, operation: job.operation }; + }) + ); + + // Log results + const successful = results.filter(r => r.status === 'fulfilled'); + const failed = results.filter(r => r.status === 'rejected'); + + if (failed.length > 0) { + failed.forEach((result, index) => { + const { provider, job } = allScheduledJobs[index]; + this.logger.error('Failed to register scheduled job', { + provider, + operation: job.operation, + error: result.reason + }); + }); + } + + this.logger.info('Scheduled tasks setup complete', { + successful: successful.length, + failed: failed.length + }); + } private async addJobInternal(jobData: JobData, options: any = {}) { + if (!this.isInitialized) { + throw new Error('Queue service not initialized'); + } + const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; return this.queue.add(jobType, jobData, { priority: jobData.priority || 0, @@ -261,22 +249,19 @@ export class QueueService { ...options }); } - async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) { - if (!this.isInitialized) { - throw new Error('Queue service not initialized. Call initialize() first.'); - } + async addJob(jobData: JobData, options?: any) { + return this.addJobInternal(jobData, options); + } async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) { const jobKey = `recurring-${jobData.provider}-${jobData.operation}`; - // Let BullMQ handle duplicate prevention with consistent jobId - const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; - const job = await this.queue.add(jobType, jobData, { + return this.addJobInternal(jobData, { repeat: { pattern: cronPattern, tz: 'UTC', immediately: jobData.immediately || false, }, - jobId: jobKey, // Consistent ID prevents duplicates + jobId: jobKey, removeOnComplete: 1, removeOnFail: 1, attempts: 2, @@ -286,15 +271,6 @@ export class QueueService { }, ...options }); - - this.logger.info('Recurring job added successfully', { - jobKey, - type: jobData.type, - cronPattern, - immediately: jobData.immediately || false - }); - - return job; } async getJobStats() { if (!this.isInitialized) { @@ -322,50 +298,18 @@ export class QueueService { await this.queue.drain(); } } - async getQueueStatus() { if (!this.isInitialized) { - throw new Error('Queue service not initialized. Call initialize() first.'); + throw new Error('Queue service not initialized'); } + const stats = await this.getJobStats(); return { ...stats, workers: this.workers.length, - totalConcurrency: this.getTotalConcurrency(), - queue: this.queue.name, - connection: { - host: process.env.DRAGONFLY_HOST || 'localhost', - port: parseInt(process.env.DRAGONFLY_PORT || '6379') - } + concurrency: this.getTotalConcurrency() }; } - - getWorkerCount() { - return this.workers.length; - } - - getRegisteredProviders() { - return providerRegistry.getProviders().map(({ key, config }) => ({ - key, - name: config.name, - operations: Object.keys(config.operations), - scheduledJobs: config.scheduledJobs?.length || 0 - })); - } - - getScheduledJobsInfo() { - return providerRegistry.getAllScheduledJobs().map(({ provider, job }) => ({ - id: `${provider}-${job.type}`, - provider, - type: job.type, - operation: job.operation, - cronPattern: job.cronPattern, - priority: job.priority, - description: job.description, - immediately: job.immediately || false - })); - } - async shutdown() { if (!this.isInitialized) { this.logger.warn('Queue service not initialized, nothing to shutdown'); @@ -375,7 +319,6 @@ export class QueueService { this.logger.info('Shutting down queue service'); // Close all workers - this.logger.info(`Closing ${this.workers.length} workers...`); await Promise.all(this.workers.map((worker, index) => { this.logger.debug(`Closing worker ${index + 1}`); return worker.close();