import { getLogger } from '@stock-bot/logger'; import { createCache, CacheProvider } from '@stock-bot/cache'; export interface BatchConfig { items: T[]; batchSize?: number; // Optional - only used for batch mode totalDelayMs: number; jobNamePrefix: string; operation: string; service: string; provider: string; priority?: number; createJobData: (item: T, index: number) => any; removeOnComplete?: number; removeOnFail?: number; useBatching?: boolean; // Simple flag to choose mode payloadTtlHours?: number; // TTL for stored payloads (default 24 hours) } const logger = getLogger('batch-processor'); export class BatchProcessor { private cacheProvider: CacheProvider; private isReady = false; private keyPrefix: string = 'batch:'; // Default key prefix for batch payloads constructor( private queueManager: any, private cacheOptions?: { keyPrefix?: string; ttl?: number } // Optional cache configuration ) { this.keyPrefix = cacheOptions?.keyPrefix || 'batch:'; // Initialize cache provider with batch-specific settings this.cacheProvider = createCache('redis', { name: 'batch-processor', keyPrefix: this.keyPrefix, ttl: cacheOptions?.ttl || 86400 * 2, // 48 hours default enableMetrics: true }); } /** * Initialize the batch processor and wait for cache to be ready */ async initialize(timeout: number = 10000): Promise { if (this.isReady) { logger.warn('BatchProcessor already initialized'); return; } logger.info('Initializing BatchProcessor, waiting for cache to be ready...'); try { await this.cacheProvider.waitForReady(timeout); this.isReady = true; logger.info('BatchProcessor initialized successfully', { cacheReady: this.cacheProvider.isReady(), keyPrefix: this.keyPrefix, ttlHours: ((this.cacheOptions?.ttl || 86400 * 2) / 3600).toFixed(1) }); } catch (error) { logger.error('Failed to initialize BatchProcessor', { error: error instanceof Error ? error.message : String(error), timeout }); throw new Error(`BatchProcessor initialization failed: ${error instanceof Error ? error.message : String(error)}`); } } /** * Check if the batch processor is ready */ getReadyStatus(): boolean { return this.isReady && this.cacheProvider.isReady(); } /** * Generate a unique key for storing batch payload in Redis * Note: The cache provider will add its keyPrefix ('batch:') automatically */ private generatePayloadKey(jobNamePrefix: string, batchIndex: number): string { return `payload:${jobNamePrefix}:${batchIndex}:${Date.now()}`; }/** * Store batch payload in Redis and return the key */ private async storeBatchPayload( items: T[], config: BatchConfig, batchIndex: number ): Promise { // Ensure cache is ready before storing if (!this.cacheProvider.isReady()) { throw new Error('Cache provider not ready - cannot store batch payload'); } const payloadKey = this.generatePayloadKey(config.jobNamePrefix, batchIndex); const payload = { items, batchIndex, config: { ...config, items: undefined // Don't store items twice }, createdAt: new Date().toISOString() }; const ttlSeconds = (config.payloadTtlHours || 24) * 60 * 60; await this.cacheProvider.set( payloadKey, JSON.stringify(payload), ttlSeconds ); logger.info('Stored batch payload in Redis', { payloadKey, itemCount: items.length, batchIndex, ttlHours: config.payloadTtlHours || 24 }); return payloadKey; } /** * Load batch payload from Redis */ private async loadBatchPayload(payloadKey: string): Promise<{ items: T[]; batchIndex: number; config: BatchConfig; } | null> { // Auto-initialize if not ready if (!this.cacheProvider.isReady() || !this.isReady) { logger.info('Cache provider not ready, initializing...', { payloadKey }); try { await this.initialize(); } catch (error) { logger.error('Failed to initialize cache provider for loading', { payloadKey, error: error instanceof Error ? error.message : String(error) }); throw new Error('Cache provider initialization failed - cannot load batch payload'); } } try { const payloadData = await this.cacheProvider.get(payloadKey); if (!payloadData) { logger.error('Batch payload not found in Redis', { payloadKey }); throw new Error('Batch payload not found in Redis'); } // Handle both string and already-parsed object let payload; if (typeof payloadData === 'string') { payload = JSON.parse(payloadData); } else { // Already parsed by cache provider payload = payloadData; } logger.info('Loaded batch payload from Redis', { payloadKey, itemCount: payload.items?.length || 0, batchIndex: payload.batchIndex }); return payload; } catch (error) { logger.error('Failed to load batch payload from Redis', { payloadKey, error: error instanceof Error ? error.message : String(error) }); throw new Error('Failed to load batch payload from Redis'); } } /** * Unified method that handles both direct and batch approaches */ async processItems(config: BatchConfig) { // Check if BatchProcessor is ready if (!this.getReadyStatus()) { logger.warn('BatchProcessor not ready, attempting to initialize...'); await this.initialize(); } const { items, useBatching = false } = config; if (items.length === 0) { return { totalItems: 0, jobsCreated: 0 }; } // Final readiness check if (!this.cacheProvider.isReady()) { throw new Error('Cache provider is not ready - cannot process items'); } logger.info('Starting item processing', { totalItems: items.length, mode: useBatching ? 'batch' : 'direct', cacheReady: this.cacheProvider.isReady() }); if (useBatching) { return await this.createBatchJobs(config); } else { return await this.createDirectJobs(config); } } private async createDirectJobs(config: BatchConfig) { const { items, totalDelayMs, jobNamePrefix, operation, service, provider, priority = 2, createJobData, removeOnComplete = 5, removeOnFail = 3 } = config; const delayPerItem = Math.floor(totalDelayMs / items.length); const chunkSize = 100; let totalJobsCreated = 0; logger.info('Creating direct jobs', { totalItems: items.length, delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`, estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours` }); // Process in chunks to avoid overwhelming Redis for (let i = 0; i < items.length; i += chunkSize) { const chunk = items.slice(i, i + chunkSize); const jobs = chunk.map((item, chunkIndex) => { const globalIndex = i + chunkIndex; return { name: `${jobNamePrefix}-processing`, data: { type: `${jobNamePrefix}-processing`, service, provider, operation, payload: createJobData(item, globalIndex), priority }, opts: { delay: globalIndex * delayPerItem, jobId: `${jobNamePrefix}:${globalIndex}:${Date.now()}`, removeOnComplete, removeOnFail } }; }); try { const createdJobs = await this.queueManager.queue.addBulk(jobs); totalJobsCreated += createdJobs.length; // Log progress every 500 jobs if (totalJobsCreated % 500 === 0 || i + chunkSize >= items.length) { logger.info('Direct job creation progress', { created: totalJobsCreated, total: items.length, percentage: `${((totalJobsCreated / items.length) * 100).toFixed(1)}%` }); } } catch (error) { logger.error('Failed to create job chunk', { startIndex: i, chunkSize: chunk.length, error: error instanceof Error ? error.message : String(error) }); } } return { totalItems: items.length, jobsCreated: totalJobsCreated, mode: 'direct' }; } private async createBatchJobs(config: BatchConfig) { const { items, batchSize = 200, totalDelayMs, jobNamePrefix, operation, service, provider, priority = 3 } = config; const totalBatches = Math.ceil(items.length / batchSize); const delayPerBatch = Math.floor(totalDelayMs / totalBatches); const chunkSize = 50; // Create batch jobs in chunks let batchJobsCreated = 0; logger.info('Creating optimized batch jobs with Redis payload storage', { totalItems: items.length, batchSize, totalBatches, delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`, payloadTtlHours: config.payloadTtlHours || 24 }); // Create batch jobs in chunks for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += chunkSize) { const chunkEnd = Math.min(chunkStart + chunkSize, totalBatches); const batchJobs = []; for (let batchIndex = chunkStart; batchIndex < chunkEnd; batchIndex++) { const startIndex = batchIndex * batchSize; const endIndex = Math.min(startIndex + batchSize, items.length); const batchItems = items.slice(startIndex, endIndex); // Store batch payload in Redis and get reference key const payloadKey = await this.storeBatchPayload(batchItems, config, batchIndex); batchJobs.push({ name: `${jobNamePrefix}-batch-processing`, data: { type: `${jobNamePrefix}-batch-processing`, service, provider, operation: `process-${jobNamePrefix}-batch`, payload: { // Optimized: only store reference and metadata payloadKey: payloadKey, batchIndex, total: totalBatches, itemCount: batchItems.length, configSnapshot: { jobNamePrefix: config.jobNamePrefix, operation: config.operation, service: config.service, provider: config.provider, priority: config.priority, removeOnComplete: config.removeOnComplete, removeOnFail: config.removeOnFail, totalDelayMs: config.totalDelayMs } }, priority }, opts: { delay: batchIndex * delayPerBatch, jobId: `${jobNamePrefix}-batch:${batchIndex}:${Date.now()}` } }); } try { const createdJobs = await this.queueManager.queue.addBulk(batchJobs); batchJobsCreated += createdJobs.length; logger.info('Optimized batch chunk created', { chunkStart: chunkStart + 1, chunkEnd, created: createdJobs.length, totalCreated: batchJobsCreated, progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%`, usingRedisStorage: true }); } catch (error) { logger.error('Failed to create batch chunk', { chunkStart, chunkEnd, error: error instanceof Error ? error.message : String(error) }); } // Small delay between chunks if (chunkEnd < totalBatches) { await new Promise(resolve => setTimeout(resolve, 100)); } } return { totalItems: items.length, batchJobsCreated, totalBatches, estimatedDurationHours: totalDelayMs / 1000 / 60 / 60, mode: 'batch', optimized: true }; } /** * Process a batch (called by batch jobs) * Supports both optimized (Redis payload storage) and fallback modes */ async processBatch( jobPayload: any, createJobData?: (item: T, index: number) => any ) { let batchData: { items: T[]; batchIndex: number; config: BatchConfig; }; let total: number; // Check if this is an optimized batch with Redis payload storage if (jobPayload.payloadKey) { logger.info('Processing optimized batch with Redis payload storage', { payloadKey: jobPayload.payloadKey, batchIndex: jobPayload.batchIndex, itemCount: jobPayload.itemCount }); // Load actual payload from Redis const loadedPayload = await this.loadBatchPayload(jobPayload.payloadKey); if (!loadedPayload) { throw new Error(`Failed to load batch payload from Redis: ${jobPayload.payloadKey}`); } batchData = loadedPayload; total = jobPayload.total; // Clean up Redis payload after loading (optional - you might want to keep it for retry scenarios) // await this.redisClient?.del(jobPayload.payloadKey); } else { // Fallback: payload stored directly in job data logger.info('Processing batch with inline payload storage', { batchIndex: jobPayload.batchIndex, itemCount: jobPayload.items?.length || 0 }); batchData = { items: jobPayload.items, batchIndex: jobPayload.batchIndex, config: jobPayload.config }; total = jobPayload.total; } const { items, batchIndex, config } = batchData; logger.info('Processing batch', { batchIndex, batchSize: items.length, total, progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`, isOptimized: !!jobPayload.payloadKey }); const totalBatchDelayMs = config.totalDelayMs / total; const delayPerItem = Math.floor(totalBatchDelayMs / items.length); const jobs = items.map((item, itemIndex) => { // Use the provided createJobData function or fall back to config const jobDataFn = createJobData || config.createJobData; if (!jobDataFn) { throw new Error('createJobData function is required'); } const userData = jobDataFn(item, itemIndex); return { name: `${config.jobNamePrefix}-processing`, data: { type: `${config.jobNamePrefix}-processing`, service: config.service, provider: config.provider, operation: config.operation, payload: { ...userData, batchIndex, itemIndex, total, source: userData.source || 'batch-processing' }, priority: config.priority || 2 }, opts: { delay: itemIndex * delayPerItem, jobId: `${config.jobNamePrefix}:${batchIndex}:${itemIndex}:${Date.now()}`, removeOnComplete: config.removeOnComplete || 5, removeOnFail: config.removeOnFail || 3 } }; }); try { const createdJobs = await this.queueManager.queue.addBulk(jobs); logger.info('Batch processing completed', { batchIndex, totalItems: items.length, jobsCreated: createdJobs.length, progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`, memoryOptimized: !!jobPayload.payloadKey }); return { batchIndex, totalItems: items.length, jobsCreated: createdJobs.length, jobsFailed: 0, payloadKey: jobPayload.payloadKey || null }; } catch (error) { logger.error('Failed to process batch', { batchIndex, error: error instanceof Error ? error.message : String(error) }); return { batchIndex, totalItems: items.length, jobsCreated: 0, jobsFailed: items.length, payloadKey: jobPayload.payloadKey || null }; } } /** * Clean up Redis payload after successful processing (optional) */ async cleanupBatchPayload(payloadKey: string): Promise { if (!payloadKey) { return; } if (!this.cacheProvider.isReady()) { logger.warn('Cache provider not ready - skipping cleanup', { payloadKey }); return; } try { await this.cacheProvider.del(payloadKey); logger.info('Cleaned up batch payload from Redis', { payloadKey }); } catch (error) { logger.warn('Failed to cleanup batch payload', { payloadKey, error: error instanceof Error ? error.message : String(error) }); } } }