fixed batch timings
This commit is contained in:
parent
c1d04723e1
commit
65fb9116fb
2 changed files with 18 additions and 7 deletions
|
|
@ -89,7 +89,6 @@ async function processDirect<T>(
|
||||||
|
|
||||||
const createdJobs = await addJobsInChunks(queueName, jobs);
|
const createdJobs = await addJobsInChunks(queueName, jobs);
|
||||||
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
totalItems: items.length,
|
totalItems: items.length,
|
||||||
jobsCreated: createdJobs.length,
|
jobsCreated: createdJobs.length,
|
||||||
|
|
@ -134,6 +133,7 @@ async function processBatched<T>(
|
||||||
batchIndex,
|
batchIndex,
|
||||||
totalBatches: batches.length,
|
totalBatches: batches.length,
|
||||||
itemCount: batch.length,
|
itemCount: batch.length,
|
||||||
|
totalDelayHours: options.totalDelayHours,
|
||||||
} as BatchJobData,
|
} as BatchJobData,
|
||||||
priority: options.priority || undefined,
|
priority: options.priority || undefined,
|
||||||
},
|
},
|
||||||
|
|
@ -161,18 +161,16 @@ async function processBatched<T>(
|
||||||
/**
|
/**
|
||||||
* Process a batch job - loads items and creates individual jobs
|
* Process a batch job - loads items and creates individual jobs
|
||||||
*/
|
*/
|
||||||
export async function processBatchJob(
|
export async function processBatchJob(jobData: BatchJobData, queueName: string): Promise<unknown> {
|
||||||
jobData: BatchJobData,
|
|
||||||
queueName: string
|
|
||||||
): Promise<unknown> {
|
|
||||||
const queueManager = QueueManager.getInstance();
|
const queueManager = QueueManager.getInstance();
|
||||||
queueManager.getQueue(queueName);
|
queueManager.getQueue(queueName);
|
||||||
const { payloadKey, batchIndex, totalBatches, itemCount } = jobData;
|
const { payloadKey, batchIndex, totalBatches, itemCount, totalDelayHours } = jobData;
|
||||||
|
|
||||||
logger.debug('Processing batch job', {
|
logger.debug('Processing batch job', {
|
||||||
batchIndex,
|
batchIndex,
|
||||||
totalBatches,
|
totalBatches,
|
||||||
itemCount,
|
itemCount,
|
||||||
|
totalDelayHours,
|
||||||
});
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
@ -184,6 +182,18 @@ export async function processBatchJob(
|
||||||
|
|
||||||
const { items, options } = payload;
|
const { items, options } = payload;
|
||||||
|
|
||||||
|
// Calculate the time window for this batch
|
||||||
|
const totalDelayMs = totalDelayHours * 60 * 60 * 1000; // Convert hours to ms
|
||||||
|
const delayPerBatch = totalDelayMs / totalBatches; // Time allocated for each batch
|
||||||
|
const delayPerItem = delayPerBatch / items.length; // Distribute items evenly within batch window
|
||||||
|
|
||||||
|
logger.debug('Calculating job delays', {
|
||||||
|
batchIndex,
|
||||||
|
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
|
||||||
|
delayPerItem: `${(delayPerItem / 1000).toFixed(2)} seconds`,
|
||||||
|
itemsInBatch: items.length,
|
||||||
|
});
|
||||||
|
|
||||||
// Create jobs directly from items - each item becomes payload: item
|
// Create jobs directly from items - each item becomes payload: item
|
||||||
const jobs = items.map((item: unknown, index: number) => ({
|
const jobs = items.map((item: unknown, index: number) => ({
|
||||||
name: 'process-item',
|
name: 'process-item',
|
||||||
|
|
@ -194,7 +204,7 @@ export async function processBatchJob(
|
||||||
priority: options.priority || undefined,
|
priority: options.priority || undefined,
|
||||||
},
|
},
|
||||||
opts: {
|
opts: {
|
||||||
delay: index * (options.delayPerItem || 1000),
|
delay: index * delayPerItem, // Distribute evenly within batch window
|
||||||
priority: options.priority || undefined,
|
priority: options.priority || undefined,
|
||||||
attempts: options.retries || 3,
|
attempts: options.retries || 3,
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -133,6 +133,7 @@ export interface BatchJobData {
|
||||||
batchIndex: number;
|
batchIndex: number;
|
||||||
totalBatches: number;
|
totalBatches: number;
|
||||||
itemCount: number;
|
itemCount: number;
|
||||||
|
totalDelayHours: number; // Total time to distribute all batches
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface HandlerInitializer {
|
export interface HandlerInitializer {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue