// stock-bot/libs/core/queue/src/batch-processor.ts
// 2025-06-24 18:09:32 -04:00
//
// 353 lines
// 9.9 KiB
// TypeScript

import { QueueManager } from './queue-manager';
import type { BatchJobData, BatchResult, JobData, ProcessOptions } from './types';
/**
 * Entry point: enqueue a list of items on the named queue.
 *
 * When `options.useBatching` is truthy the work is delegated to
 * `processBatched`; otherwise it is delegated to `processDirect`. Items are
 * handed to the chosen strategy as-is.
 *
 * @param items - Items to enqueue. An empty list short-circuits with a zeroed
 *   result whose `mode` is always `'direct'`.
 * @param queueName - Name used to look up the queue on `queueManager`.
 * @param options - Processing options (batching flag, batch size, delay window, ...).
 * @param queueManager - Supplies the queue, from which a child logger is created.
 * @returns The strategy's summary plus `duration`, the elapsed time in milliseconds.
 * @throws Re-throws any error raised by the chosen strategy after logging it.
 */
export async function processItems<T>(
  items: T[],
  queueName: string,
  options: ProcessOptions,
  queueManager: QueueManager
): Promise<BatchResult> {
  const targetQueue = queueManager.getQueue(queueName);
  const itemCount = items.length;
  const log = targetQueue.createChildLogger('batch-processor', {
    queueName,
    totalItems: itemCount,
    mode: options.useBatching ? 'batch' : 'direct',
  });
  const startedAt = Date.now();

  // Nothing to enqueue: report an empty run without touching the queue further.
  if (itemCount === 0) {
    return { jobsCreated: 0, mode: 'direct', totalItems: 0, duration: 0 };
  }

  log.info('Starting batch processing', {
    totalItems: itemCount,
    mode: options.useBatching ? 'batch' : 'direct',
    batchSize: options.batchSize,
    totalDelayHours: options.totalDelayHours,
  });

  try {
    let outcome: Omit<BatchResult, 'duration'>;
    if (options.useBatching) {
      outcome = await processBatched(items, queueName, options, queueManager);
    } else {
      outcome = await processDirect(items, queueName, options, queueManager);
    }

    const elapsedMs = Date.now() - startedAt;
    // The log shows seconds for readability; the returned value stays in ms.
    log.info('Batch processing completed', {
      ...outcome,
      duration: `${(elapsedMs / 1000).toFixed(1)}s`,
    });
    return { ...outcome, duration: elapsedMs };
  } catch (error) {
    log.error('Batch processing failed', { error });
    throw error;
  }
}
/**
 * Direct strategy: one job per item.
 *
 * The total delay window (`options.totalDelayHours`) is divided evenly by the
 * number of items, and the job for item `i` is delayed by `i` of those slices,
 * so the first job has no delay.
 *
 * @param items - Items to enqueue; each becomes the `payload` of its own job.
 * @param queueName - Queue to add the jobs to.
 * @param options - Handler/operation names, priority, retry and retention settings.
 * @param queueManager - Supplies the queue and its child logger.
 * @returns Item count, number of jobs actually created, and `mode: 'direct'`.
 */
async function processDirect<T>(
  items: T[],
  queueName: string,
  options: ProcessOptions,
  queueManager: QueueManager
): Promise<Omit<BatchResult, 'duration'>> {
  const log = queueManager.getQueue(queueName).createChildLogger('batch-direct', {
    queueName,
    totalItems: items.length,
  });

  // hours -> milliseconds, then an equal slice of the window per item
  const windowMs = options.totalDelayHours * 60 * 60 * 1000;
  const sliceMs = windowMs / items.length;

  log.info('Creating direct jobs', {
    totalItems: items.length,
    delayPerItem: `${(sliceMs / 1000).toFixed(1)}s`,
  });

  // Builds the job description for a single item at the given position.
  const toJob = (item: T, position: number) => {
    const data = {
      handler: options.handler || 'generic',
      operation: options.operation || 'process-item',
      payload: item, // the raw item, no wrapper object
      priority: options.priority || undefined,
    };
    const opts = {
      delay: position * sliceMs,
      priority: options.priority || undefined,
      attempts: options.retries || 3,
      removeOnComplete: options.removeOnComplete || 100,
      removeOnFail: options.removeOnFail || 100,
    };
    return { name: 'process-item', data, opts };
  };

  const created = await addJobsInChunks(queueName, items.map(toJob), queueManager);

  return {
    totalItems: items.length,
    jobsCreated: created.length,
    mode: 'direct',
  };
}
/**
 * Batch strategy: group items, stash each group in the cache, and enqueue one
 * `process-batch` job per group that carries only the cache key.
 *
 * The total delay window is split evenly across batches; batch `i` is delayed
 * by `i` slices. All batches are stored concurrently before any job is added.
 *
 * @param items - Items to split into batches.
 * @param queueName - Queue (and cache namespace) to use.
 * @param options - `batchSize` (falls back to 100), delay window, handler,
 *   priority, retry and retention settings.
 * @param queueManager - Supplies the queue, its child logger and the cache.
 * @returns Item count, jobs actually created, number of batches, and `mode: 'batch'`.
 */
async function processBatched<T>(
  items: T[],
  queueName: string,
  options: ProcessOptions,
  queueManager: QueueManager
): Promise<Omit<BatchResult, 'duration'>> {
  const log = queueManager.getQueue(queueName).createChildLogger('batch-batched', {
    queueName,
    totalItems: items.length,
  });

  const batchSize = options.batchSize || 100;
  const batches = createBatches(items, batchSize);
  const batchCount = batches.length;

  // hours -> milliseconds, then an equal slice of the window per batch
  const windowMs = options.totalDelayHours * 60 * 60 * 1000;
  const sliceMs = windowMs / batchCount;

  log.info('Creating batch jobs', {
    totalItems: items.length,
    batchSize,
    totalBatches: batchCount,
    delayPerBatch: `${(sliceMs / 1000 / 60).toFixed(2)} minutes`,
  });

  // Stores one batch in the cache and describes the job that will expand it later.
  const toBatchJob = async (batch: T[], batchIndex: number) => {
    const payloadKey = await storeItems(batch, queueName, options, queueManager);
    const payload = {
      payloadKey,
      batchIndex,
      totalBatches: batchCount,
      itemCount: batch.length,
      totalDelayHours: options.totalDelayHours,
    } as BatchJobData;

    return {
      name: 'process-batch',
      data: {
        handler: options.handler || 'generic',
        operation: 'process-batch-items',
        payload,
        priority: options.priority || undefined,
      },
      opts: {
        delay: batchIndex * sliceMs,
        priority: options.priority || undefined,
        attempts: options.retries || 3,
        removeOnComplete: options.removeOnComplete || 100,
        removeOnFail: options.removeOnFail || 100,
      },
    };
  };

  const batchJobs = await Promise.all(batches.map(toBatchJob));
  const created = await addJobsInChunks(queueName, batchJobs, queueManager);

  return {
    totalItems: items.length,
    jobsCreated: created.length,
    batchesCreated: batchCount,
    mode: 'batch',
  };
}
/**
 * Expands one stored batch into individual `process-item` jobs.
 *
 * Loads the batch payload referenced by `jobData.payloadKey` from the cache,
 * creates one job per stored item (spread evenly across this batch's share of
 * the total delay window), and removes the cached payload afterwards.
 *
 * The cached payload is removed only when every item job was created.
 * `addJobsInChunks` logs and skips chunks that fail to enqueue, so a shorter
 * result means some items never reached the queue; in that case the payload is
 * kept (until its cache TTL) so those items are not lost.
 *
 * @param jobData - Batch descriptor: cache key, batch index/count, item count
 *   and the total delay window in hours.
 * @param queueName - Queue (and cache namespace) to use.
 * @param queueManager - Supplies the queue, its child logger and the cache.
 * @returns `{ batchIndex, itemsProcessed, jobsCreated }`.
 * @throws Error when the payload is missing or malformed; any other error from
 *   loading, enqueuing or cleanup is logged and re-thrown.
 */
export async function processBatchJob(
  jobData: BatchJobData,
  queueName: string,
  queueManager: QueueManager
): Promise<unknown> {
  const queue = queueManager.getQueue(queueName);
  const logger = queue.createChildLogger('batch-job', {
    queueName,
    batchIndex: jobData.batchIndex,
    payloadKey: jobData.payloadKey,
  });
  const { payloadKey, batchIndex, totalBatches, itemCount, totalDelayHours } = jobData;

  logger.debug('Processing batch job', {
    batchIndex,
    totalBatches,
    itemCount,
    totalDelayHours,
  });

  try {
    const payload = await loadPayload(payloadKey, queueName, queueManager);
    // `items` must be a real array: it is mapped over below.
    if (!payload || !Array.isArray(payload.items) || !payload.options) {
      logger.error('Invalid payload data', { payloadKey, payload });
      throw new Error(`Invalid payload data for key: ${payloadKey}`);
    }
    const { items, options } = payload;

    // Time window for this batch. Each division is guarded so that a missing
    // delay window, a zero batch count or an empty batch yields "no delay"
    // instead of NaN/Infinity ending up in a job's `delay` option.
    const totalDelayMs = totalDelayHours * 60 * 60 * 1000; // hours -> ms
    const delayPerBatch =
      Number.isFinite(totalDelayMs) && totalBatches > 0 ? totalDelayMs / totalBatches : 0;
    const delayPerItem = items.length > 0 ? delayPerBatch / items.length : 0;

    logger.debug('Calculating job delays', {
      batchIndex,
      delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
      delayPerItem: `${(delayPerItem / 1000).toFixed(2)} seconds`,
      itemsInBatch: items.length,
    });

    // One job per item; the item itself is the job payload.
    const jobs = items.map((item: unknown, index: number) => ({
      name: 'process-item',
      data: {
        handler: options.handler || 'generic',
        operation: options.operation || 'generic',
        payload: item,
        priority: options.priority || undefined,
      },
      opts: {
        delay: index * delayPerItem, // spread evenly within the batch window
        priority: options.priority || undefined,
        attempts: options.retries || 3,
      },
    }));

    const createdJobs = await addJobsInChunks(queueName, jobs, queueManager);

    if (createdJobs.length === items.length) {
      // Everything was enqueued: the cached copy is no longer needed.
      await cleanupPayload(payloadKey, queueName, queueManager);
    } else {
      // Some chunks were dropped; deleting the payload now would lose the
      // un-enqueued items permanently.
      logger.error('Not all item jobs were created; keeping payload for recovery', {
        batchIndex,
        payloadKey,
        expected: items.length,
        created: createdJobs.length,
      });
    }

    return {
      batchIndex,
      itemsProcessed: items.length,
      jobsCreated: createdJobs.length,
    };
  } catch (error) {
    logger.error('Batch job processing failed', { batchIndex, error });
    throw error;
  }
}
// Helper functions
/**
 * Splits `items` into consecutive batches of at most `batchSize` elements.
 *
 * Order is preserved and only the last batch may be shorter. A fractional
 * `batchSize` is rounded down to a whole number.
 *
 * @param items - Source array (not mutated).
 * @param batchSize - Maximum number of elements per batch; must be at least 1.
 * @returns The batches in order; an empty array when `items` is empty.
 * @throws RangeError when `batchSize` is NaN or smaller than 1. Such values
 *   would otherwise loop forever (zero/negative step) or silently drop every
 *   item (NaN).
 */
function createBatches<T>(items: T[], batchSize: number): T[][] {
  const size = Math.floor(batchSize);
  // Written as a negated comparison so NaN is rejected as well.
  if (!(size >= 1)) {
    throw new RangeError(`batchSize must be a number >= 1, received ${batchSize}`);
  }

  const batches: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    batches.push(items.slice(start, start + size));
  }
  return batches;
}
/**
 * Saves a batch of items, together with the options needed to expand it
 * later, in the cache belonging to `queueName`.
 *
 * @param items - Items of one batch, stored unchanged.
 * @param queueName - Selects the cache via `queueManager.getCache`.
 * @param options - Source of priority, retries, handler, operation and `ttl`.
 * @param queueManager - Supplies the cache.
 * @returns The generated cache key, of the form `payload:<timestamp>:<random suffix>`.
 */
async function storeItems<T>(
  items: T[],
  queueName: string,
  options: ProcessOptions,
  queueManager: QueueManager
): Promise<string> {
  const cache = queueManager.getCache(queueName);

  // Key = current time in ms + up to nine random base-36 characters.
  const timestamp = Date.now();
  const randomSuffix = Math.random().toString(36).slice(2, 11);
  const payloadKey = `payload:${timestamp}:${randomSuffix}`;

  const storedOptions = {
    delayPerItem: 1000,
    priority: options.priority || undefined,
    retries: options.retries || 3,
    handler: options.handler || 'generic',
    operation: options.operation || 'generic',
  };

  // Entry lifetime in seconds; one day unless the caller supplies a ttl.
  const ttlSeconds = options.ttl || 86400;

  await cache.set(
    payloadKey,
    {
      items,
      options: storedOptions,
      createdAt: new Date().toISOString(),
    },
    ttlSeconds
  );

  return payloadKey;
}
/** Shape of a batch payload as it is read back from the cache. */
type StoredBatchPayload<T> = {
  items: T[];
  options: {
    delayPerItem: number;
    priority?: number;
    retries: number;
    handler: string;
    operation: string;
  };
};

/**
 * Reads a stored batch payload from the cache belonging to `queueName`.
 *
 * The cached value is returned as-is with a type assertion; nothing is
 * validated here, so callers must check the shape themselves.
 *
 * @param key - Cache key of the payload.
 * @param queueName - Selects the cache via `queueManager.getCache`.
 * @param queueManager - Supplies the cache.
 * @returns Whatever the cache holds for `key`, typed as the payload or `null`.
 */
async function loadPayload<T>(
  key: string,
  queueName: string,
  queueManager: QueueManager
): Promise<StoredBatchPayload<T> | null> {
  const cached = await queueManager.getCache(queueName).get(key);
  return cached as StoredBatchPayload<T> | null;
}
/**
 * Deletes a stored batch payload from the cache belonging to `queueName`.
 *
 * @param key - Cache key of the payload to delete.
 * @param queueName - Selects the cache via `queueManager.getCache`.
 * @param queueManager - Supplies the cache.
 */
async function cleanupPayload(
  key: string,
  queueName: string,
  queueManager: QueueManager
): Promise<void> {
  await queueManager.getCache(queueName).del(key);
}
/**
 * Adds jobs to the queue in fixed-size chunks via `queue.addBulk`.
 *
 * Best-effort by design: a chunk that fails to enqueue is logged and skipped,
 * and the remaining chunks are still attempted. Callers can detect dropped
 * jobs by comparing the length of the result with `jobs.length`.
 *
 * @param queueName - Queue to add the jobs to.
 * @param jobs - Job descriptions (name, data, optional opts) in enqueue order.
 * @param queueManager - Supplies the queue and its child logger.
 * @param chunkSize - Maximum jobs per `addBulk` call; must be at least 1.
 *   A fractional value is rounded down.
 * @returns Everything returned by the successful `addBulk` calls, in order.
 * @throws RangeError when `chunkSize` is NaN or smaller than 1. Such values
 *   would otherwise loop forever (zero/negative step) or silently enqueue
 *   nothing (NaN).
 */
async function addJobsInChunks(
  queueName: string,
  jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>,
  queueManager: QueueManager,
  chunkSize = 100
): Promise<unknown[]> {
  const step = Math.floor(chunkSize);
  // Written as a negated comparison so NaN is rejected as well.
  if (!(step >= 1)) {
    throw new RangeError(`chunkSize must be a number >= 1, received ${chunkSize}`);
  }

  const queue = queueManager.getQueue(queueName);
  const logger = queue.createChildLogger('batch-chunk', {
    queueName,
    totalJobs: jobs.length,
  });

  const allCreatedJobs: unknown[] = [];
  for (let i = 0; i < jobs.length; i += step) {
    const chunk = jobs.slice(i, i + step);
    try {
      const createdJobs = await queue.addBulk(chunk);
      allCreatedJobs.push(...createdJobs);
      // Small delay between chunks to avoid overwhelming Redis
      if (i + step < jobs.length) {
        await new Promise(resolve => setTimeout(resolve, 100));
      }
    } catch (error) {
      // Deliberately not re-thrown: later chunks are still attempted.
      logger.error('Failed to add job chunk', {
        startIndex: i,
        chunkSize: chunk.length,
        error,
      });
    }
  }
  return allCreatedJobs;
}