diff --git a/libs/queue/src/batch-processor.ts b/libs/queue/src/batch-processor.ts index 93e55da..8e8bb2b 100644 --- a/libs/queue/src/batch-processor.ts +++ b/libs/queue/src/batch-processor.ts @@ -89,7 +89,6 @@ async function processDirect( const createdJobs = await addJobsInChunks(queueName, jobs); - return { totalItems: items.length, jobsCreated: createdJobs.length, @@ -134,6 +133,7 @@ async function processBatched( batchIndex, totalBatches: batches.length, itemCount: batch.length, + totalDelayHours: options.totalDelayHours, } as BatchJobData, priority: options.priority || undefined, }, @@ -161,18 +161,16 @@ async function processBatched( /** * Process a batch job - loads items and creates individual jobs */ -export async function processBatchJob( - jobData: BatchJobData, - queueName: string -): Promise { +export async function processBatchJob(jobData: BatchJobData, queueName: string): Promise { const queueManager = QueueManager.getInstance(); queueManager.getQueue(queueName); - const { payloadKey, batchIndex, totalBatches, itemCount } = jobData; + const { payloadKey, batchIndex, totalBatches, itemCount, totalDelayHours } = jobData; logger.debug('Processing batch job', { batchIndex, totalBatches, itemCount, + totalDelayHours, }); try { @@ -184,6 +182,18 @@ export async function processBatchJob( const { items, options } = payload; + // Calculate the time window for this batch + const totalDelayMs = totalDelayHours * 60 * 60 * 1000; // Convert hours to ms + const delayPerBatch = totalDelayMs / totalBatches; // Time allocated for each batch + const delayPerItem = delayPerBatch / items.length; // Distribute items evenly within batch window + + logger.debug('Calculating job delays', { + batchIndex, + delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`, + delayPerItem: `${(delayPerItem / 1000).toFixed(2)} seconds`, + itemsInBatch: items.length, + }); + // Create jobs directly from items - each item becomes payload: item const jobs = items.map((item: unknown, index: number) => ({ name: 'process-item', @@ -194,7 +204,7 @@ export async function processBatchJob( priority: options.priority || undefined, }, opts: { - delay: index * (options.delayPerItem || 1000), + delay: index * delayPerItem, // Distribute evenly within batch window priority: options.priority || undefined, attempts: options.retries || 3, }, diff --git a/libs/queue/src/types.ts b/libs/queue/src/types.ts index a8ed887..1c21286 100644 --- a/libs/queue/src/types.ts +++ b/libs/queue/src/types.ts @@ -133,6 +133,7 @@ export interface BatchJobData { batchIndex: number; totalBatches: number; itemCount: number; + totalDelayHours: number; // Total time to distribute all batches } export interface HandlerInitializer {