From 1caa2d516822eabc7e5383b486b14f03d7bc1ff2 Mon Sep 17 00:00:00 2001 From: Boki Date: Tue, 10 Jun 2025 11:11:02 -0400 Subject: [PATCH] fixed batching to work better --- .../data-service/src/utils/batch-processor.ts | 54 ++++++++++++------- 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/apps/data-service/src/utils/batch-processor.ts b/apps/data-service/src/utils/batch-processor.ts index 7d4222c..5f3222f 100644 --- a/apps/data-service/src/utils/batch-processor.ts +++ b/apps/data-service/src/utils/batch-processor.ts @@ -22,14 +22,16 @@ const logger = getLogger('batch-processor'); export class BatchProcessor { private cacheProvider: CacheProvider; private isReady = false; + private keyPrefix: string = 'batch:'; // Default key prefix for batch payloads constructor( private queueManager: any, private cacheOptions?: { keyPrefix?: string; ttl?: number } // Optional cache configuration ) { + this.keyPrefix = cacheOptions?.keyPrefix || 'batch:'; // Initialize cache provider with batch-specific settings this.cacheProvider = createCache('redis', { - keyPrefix: cacheOptions?.keyPrefix || 'batch:', + keyPrefix: this.keyPrefix, ttl: cacheOptions?.ttl || 86400 * 2, // 48 hours default enableMetrics: true }); @@ -51,7 +53,7 @@ export class BatchProcessor { this.isReady = true; logger.info('BatchProcessor initialized successfully', { cacheReady: this.cacheProvider.isReady(), - keyPrefix: this.cacheOptions?.keyPrefix || 'batch:', + keyPrefix: this.keyPrefix, ttlHours: ((this.cacheOptions?.ttl || 86400 * 2) / 3600).toFixed(1) }); } catch (error) { @@ -69,13 +71,13 @@ export class BatchProcessor { getReadyStatus(): boolean { return this.isReady && this.cacheProvider.isReady(); } - /** * Generate a unique key for storing batch payload in Redis + * Note: The cache provider will add its keyPrefix ('batch:') automatically */ private generatePayloadKey(jobNamePrefix: string, batchIndex: number): string { - return `batch:payload:${jobNamePrefix}:${batchIndex}:${Date.now()}`; - } /** + return `payload:${jobNamePrefix}:${batchIndex}:${Date.now()}`; + }/** * Store batch payload in Redis and return the key */ private async storeBatchPayload( @@ -123,21 +125,36 @@ export class BatchProcessor { batchIndex: number; config: BatchConfig; } | null> { - // Ensure cache is ready before loading - if (!this.cacheProvider.isReady()) { - logger.error('Cache provider not ready - cannot load batch payload', { payloadKey }); - return null; + // Auto-initialize if not ready + if (!this.cacheProvider.isReady() || !this.isReady) { + logger.info('Cache provider not ready, initializing...', { payloadKey }); + try { + await this.initialize(); + } catch (error) { + logger.error('Failed to initialize cache provider for loading', { + payloadKey, + error: error instanceof Error ? error.message : String(error) + }); + throw new Error('Cache provider initialization failed - cannot load batch payload'); + } } try { - const payloadJson = await this.cacheProvider.get(payloadKey); + const payloadData = await this.cacheProvider.get(payloadKey); - if (!payloadJson) { + if (!payloadData) { logger.error('Batch payload not found in Redis', { payloadKey }); - return null; + throw new Error('Batch payload not found in Redis'); } - const payload = JSON.parse(payloadJson); + // Handle both string and already-parsed object + let payload; + if (typeof payloadData === 'string') { + payload = JSON.parse(payloadData); + } else { + // Already parsed by cache provider + payload = payloadData; + } logger.info('Loaded batch payload from Redis', { payloadKey, @@ -151,7 +168,7 @@ export class BatchProcessor { payloadKey, error: error instanceof Error ? error.message : String(error) }); - return null; + throw new Error('Failed to load batch payload from Redis'); } } /** @@ -230,7 +247,7 @@ export class BatchProcessor { }, opts: { delay: globalIndex * delayPerItem, - jobId: `${jobNamePrefix}-${globalIndex}-${Date.now()}`, + jobId: `${jobNamePrefix}:${globalIndex}:${Date.now()}`, removeOnComplete, removeOnFail } @@ -300,7 +317,6 @@ export class BatchProcessor { const batchItems = items.slice(startIndex, endIndex); // Store batch payload in Redis and get reference key const payloadKey = await this.storeBatchPayload(batchItems, config, batchIndex); - batchJobs.push({ name: `${jobNamePrefix}-batch-processing`, data: { @@ -310,7 +326,7 @@ export class BatchProcessor { operation: `process-${jobNamePrefix}-batch`, payload: { // Optimized: only store reference and metadata - payloadKey, + payloadKey: payloadKey, batchIndex, total: totalBatches, itemCount: batchItems.length, @@ -329,7 +345,7 @@ export class BatchProcessor { }, opts: { delay: batchIndex * delayPerBatch, - jobId: `${jobNamePrefix}-batch-${batchIndex}-${Date.now()}` + jobId: `${jobNamePrefix}-batch:${batchIndex}:${Date.now()}` } }); } @@ -459,7 +475,7 @@ export class BatchProcessor { }, opts: { delay: itemIndex * delayPerItem, - jobId: `${config.jobNamePrefix}-${batchIndex}-${itemIndex}-${Date.now()}`, + jobId: `${config.jobNamePrefix}:${batchIndex}:${itemIndex}:${Date.now()}`, removeOnComplete: config.removeOnComplete || 5, removeOnFail: config.removeOnFail || 3 }