import { getLogger } from '@stock-bot/logger';
import { QueueManager } from './queue-manager';
import type { BatchJobData, BatchResult, JobData, ProcessOptions } from './types';

const logger = getLogger('batch-processor');

/**
 * Shape of the cache entry written by `storeItems` and read back by
 * `loadPayload`. (`storeItems` additionally writes a `createdAt` timestamp
 * that nothing in this module reads.)
 */
interface StoredPayload<T> {
  items: T[];
  options: {
    delayPerItem: number;
    priority?: number;
    retries: number;
    handler: string;
    operation: string;
  };
}

/**
 * Entry point: enqueue `items` on `queueName`, either as one job per item
 * (direct mode) or as batch jobs that are later expanded by
 * `processBatchJob` (batch mode, selected via `options.useBatching`).
 *
 * Each item is used verbatim as the job's `payload`.
 *
 * @param items - Items to enqueue. An empty array short-circuits with a zeroed result.
 * @param queueName - Name of the queue handed to `QueueManager`.
 * @param options - Scheduling/routing options (delay spread, batch size, handler, ...).
 * @returns Summary of what was created, including `duration` in milliseconds.
 * @throws Re-throws (after logging) any error raised while creating jobs,
 *         including a `RangeError` for a `batchSize` below 1.
 */
export async function processItems<T>(
  items: T[],
  queueName: string,
  options: ProcessOptions
): Promise<BatchResult> {
  // Resolve the queue before doing any work.
  // NOTE(review): presumably getQueue throws for an unknown queue name - confirm against QueueManager.
  const queueManager = QueueManager.getInstance();
  queueManager.getQueue(queueName);

  const startTime = Date.now();

  if (items.length === 0) {
    return {
      jobsCreated: 0,
      mode: 'direct',
      totalItems: 0,
      duration: 0,
    };
  }

  logger.info('Starting batch processing', {
    totalItems: items.length,
    mode: options.useBatching ? 'batch' : 'direct',
    batchSize: options.batchSize,
    totalDelayHours: options.totalDelayHours,
  });

  try {
    const result = options.useBatching
      ? await processBatched(items, queueName, options)
      : await processDirect(items, queueName, options);

    const duration = Date.now() - startTime;

    logger.info('Batch processing completed', {
      ...result,
      duration: `${(duration / 1000).toFixed(1)}s`,
    });

    return { ...result, duration };
  } catch (error) {
    logger.error('Batch processing failed', error);
    throw error;
  }
}

/**
 * Direct mode: every item becomes its own `process-item` job. Job delays are
 * spread evenly so the last job starts just before `options.totalDelayHours`
 * has elapsed.
 *
 * Callers must pass a non-empty `items` array (`processItems` guarantees this);
 * the per-item delay is computed by dividing by `items.length`.
 */
async function processDirect<T>(
  items: T[],
  queueName: string,
  options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> {
  const queueManager = QueueManager.getInstance();
  queueManager.getQueue(queueName);

  const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds
  const delayPerItem = totalDelayMs / items.length;

  logger.info('Creating direct jobs', {
    totalItems: items.length,
    delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`,
  });

  const jobs = items.map((item, index) => ({
    name: 'process-item',
    data: {
      handler: options.handler || 'generic',
      operation: options.operation || 'process-item',
      payload: item, // The item itself is the payload - no wrapper object
      priority: options.priority || undefined,
    },
    opts: {
      delay: index * delayPerItem,
      priority: options.priority || undefined,
      attempts: options.retries || 3,
      removeOnComplete: options.removeOnComplete || 10,
      removeOnFail: options.removeOnFail || 5,
    },
  }));

  const createdJobs = await addJobsInChunks(queueName, jobs);

  return {
    totalItems: items.length,
    jobsCreated: createdJobs.length,
    mode: 'direct',
  };
}

/**
 * Batch mode: items are split into batches of `options.batchSize` (default
 * 100). Each batch is written to the queue's cache and a single
 * `process-batch` job carrying only the cache key is enqueued. Batch job
 * delays are spread evenly across `options.totalDelayHours`.
 *
 * @throws RangeError if `options.batchSize` is below 1 (see `createBatches`).
 */
async function processBatched<T>(
  items: T[],
  queueName: string,
  options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> {
  const queueManager = QueueManager.getInstance();
  queueManager.getQueue(queueName);

  const batchSize = options.batchSize || 100;
  const batches = createBatches(items, batchSize);

  const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds
  const delayPerBatch = totalDelayMs / batches.length;

  logger.info('Creating batch jobs', {
    totalItems: items.length,
    batchSize,
    totalBatches: batches.length,
    delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
  });

  const batchJobs = await Promise.all(
    batches.map(async (batch, batchIndex) => {
      // Persist the raw items; the job itself only carries the cache key.
      const payloadKey = await storeItems(batch, queueName, options);

      return {
        name: 'process-batch',
        data: {
          handler: options.handler || 'generic',
          operation: 'process-batch-items',
          payload: {
            payloadKey,
            batchIndex,
            totalBatches: batches.length,
            itemCount: batch.length,
          } as BatchJobData,
          priority: options.priority || undefined,
        },
        opts: {
          delay: batchIndex * delayPerBatch,
          priority: options.priority || undefined,
          attempts: options.retries || 3,
          removeOnComplete: options.removeOnComplete || 10,
          removeOnFail: options.removeOnFail || 5,
        },
      };
    })
  );

  const createdJobs = await addJobsInChunks(queueName, batchJobs);

  return {
    totalItems: items.length,
    jobsCreated: createdJobs.length,
    batchesCreated: batches.length,
    mode: 'batch',
  };
}

/**
 * Expand one batch job: load the cached items referenced by
 * `jobData.payloadKey` and enqueue one `process-item` job per item.
 *
 * The cached payload is deleted only when every item job was created.
 * `addJobsInChunks` swallows per-chunk failures, so deleting unconditionally
 * would permanently lose the items of any failed chunk; on a partial failure
 * the payload is left in the cache (until its TTL expires) and an error is
 * logged.
 *
 * @param jobData - Batch descriptor produced by `processBatched`.
 * @param queueName - Queue (and cache) the batch belongs to.
 * @returns `{ batchIndex, itemsProcessed, jobsCreated }`.
 * @throws Error if the cached payload is missing or malformed; re-throws
 *         (after logging) any other error raised while loading or enqueueing.
 */
export async function processBatchJob(
  jobData: BatchJobData,
  queueName: string
): Promise<unknown> {
  const queueManager = QueueManager.getInstance();
  queueManager.getQueue(queueName);

  const { payloadKey, batchIndex, totalBatches, itemCount } = jobData;

  logger.debug('Processing batch job', {
    batchIndex,
    totalBatches,
    itemCount,
  });

  try {
    const payload = await loadPayload(payloadKey, queueName);

    if (!payload || !payload.items || !payload.options) {
      logger.error('Invalid payload data', { payloadKey, payload });
      throw new Error(`Invalid payload data for key: ${payloadKey}`);
    }

    const { items, options } = payload;

    // Each stored item becomes the payload of its own job.
    const jobs = items.map((item: unknown, index: number) => ({
      name: 'process-item',
      data: {
        handler: options.handler || 'generic',
        operation: options.operation || 'generic',
        payload: item,
        priority: options.priority || undefined,
      },
      opts: {
        delay: index * (options.delayPerItem || 1000),
        priority: options.priority || undefined,
        attempts: options.retries || 3,
      },
    }));

    const createdJobs = await addJobsInChunks(queueName, jobs);

    if (createdJobs.length === jobs.length) {
      // Every item made it onto the queue - the cached copy is no longer needed.
      await cleanupPayload(payloadKey, queueName);
    } else {
      // Some chunks failed: keep the payload so the missing items are not lost.
      logger.error('Not all item jobs were created; keeping payload for recovery', {
        batchIndex,
        payloadKey,
        expected: jobs.length,
        created: createdJobs.length,
      });
    }

    return {
      batchIndex,
      itemsProcessed: items.length,
      jobsCreated: createdJobs.length,
    };
  } catch (error) {
    logger.error('Batch job processing failed', { batchIndex, error });
    throw error;
  }
}

// Helper functions

/**
 * Split `items` into consecutive slices of at most `batchSize` elements.
 *
 * A fractional `batchSize` is floored. A size below 1 (including NaN) is
 * rejected: a zero or negative step would never advance the loop index and
 * the loop would not terminate.
 *
 * @throws RangeError if `batchSize` is not at least 1.
 */
function createBatches<T>(items: T[], batchSize: number): T[][] {
  if (!(batchSize >= 1)) {
    throw new RangeError(`batchSize must be at least 1, got: ${batchSize}`);
  }
  const step = Math.floor(batchSize);

  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += step) {
    batches.push(items.slice(i, i + step));
  }
  return batches;
}

/**
 * Write one batch of items, plus the options needed to turn them into jobs
 * later, to the queue's cache under a freshly generated key.
 *
 * @returns The cache key under which the payload was stored.
 */
async function storeItems<T>(
  items: T[],
  queueName: string,
  options: ProcessOptions
): Promise<string> {
  const queueManager = QueueManager.getInstance();
  const cache = queueManager.getCache(queueName);

  // Timestamp + 9 random base-36 chars; slice(2, 11) skips the leading "0.".
  const payloadKey = `payload:${Date.now()}:${Math.random().toString(36).slice(2, 11)}`;

  const payload = {
    items, // Stored as-is
    options: {
      delayPerItem: 1000,
      priority: options.priority || undefined,
      retries: options.retries || 3,
      handler: options.handler || 'generic',
      operation: options.operation || 'generic',
    },
    createdAt: new Date().toISOString(),
  };

  const ttlSeconds = options.ttl || 86400; // 24 hours default
  await cache.set(payloadKey, payload, ttlSeconds);

  return payloadKey;
}

/**
 * Read a payload previously written by `storeItems`.
 *
 * The cache result is cast, not validated; callers must check the shape
 * before use (as `processBatchJob` does).
 */
async function loadPayload<T>(
  key: string,
  queueName: string
): Promise<StoredPayload<T> | null> {
  const queueManager = QueueManager.getInstance();
  const cache = queueManager.getCache(queueName);
  return (await cache.get(key)) as StoredPayload<T> | null;
}

/** Delete a stored payload from the queue's cache. */
async function cleanupPayload(key: string, queueName: string): Promise<void> {
  const queueManager = QueueManager.getInstance();
  const cache = queueManager.getCache(queueName);
  await cache.del(key);
}

/**
 * Add `jobs` to the queue via `addBulk`, `chunkSize` jobs at a time, pausing
 * 100 ms between chunks.
 *
 * This is best-effort: a chunk whose `addBulk` call throws is logged and
 * skipped, and the remaining chunks are still attempted. Callers detect
 * partial failure by comparing the returned array's length with `jobs.length`.
 *
 * @returns The job objects returned by `addBulk` for every chunk that succeeded.
 */
async function addJobsInChunks(
  queueName: string,
  jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>,
  chunkSize = 100
): Promise<unknown[]> {
  const queueManager = QueueManager.getInstance();
  const queue = queueManager.getQueue(queueName);

  const allCreatedJobs: unknown[] = [];

  for (let i = 0; i < jobs.length; i += chunkSize) {
    const chunk = jobs.slice(i, i + chunkSize);
    try {
      const createdJobs = await queue.addBulk(chunk);
      allCreatedJobs.push(...createdJobs);

      // Small delay between chunks to avoid overwhelming Redis
      if (i + chunkSize < jobs.length) {
        await new Promise(resolve => setTimeout(resolve, 100));
      }
    } catch (error) {
      logger.error('Failed to add job chunk', {
        startIndex: i,
        chunkSize: chunk.length,
        error,
      });
    }
  }

  return allCreatedJobs;
}