fixed batching to work better

This commit is contained in:
Boki 2025-06-10 11:11:02 -04:00
parent b974753d8b
commit 1caa2d5168

View file

@ -22,14 +22,16 @@ const logger = getLogger('batch-processor');
export class BatchProcessor { export class BatchProcessor {
private cacheProvider: CacheProvider; private cacheProvider: CacheProvider;
private isReady = false; private isReady = false;
private keyPrefix: string = 'batch:'; // Default key prefix for batch payloads
constructor( constructor(
private queueManager: any, private queueManager: any,
private cacheOptions?: { keyPrefix?: string; ttl?: number } // Optional cache configuration private cacheOptions?: { keyPrefix?: string; ttl?: number } // Optional cache configuration
) { ) {
this.keyPrefix = cacheOptions?.keyPrefix || 'batch:';
// Initialize cache provider with batch-specific settings // Initialize cache provider with batch-specific settings
this.cacheProvider = createCache('redis', { this.cacheProvider = createCache('redis', {
keyPrefix: cacheOptions?.keyPrefix || 'batch:', keyPrefix: this.keyPrefix,
ttl: cacheOptions?.ttl || 86400 * 2, // 48 hours default ttl: cacheOptions?.ttl || 86400 * 2, // 48 hours default
enableMetrics: true enableMetrics: true
}); });
@ -51,7 +53,7 @@ export class BatchProcessor {
this.isReady = true; this.isReady = true;
logger.info('BatchProcessor initialized successfully', { logger.info('BatchProcessor initialized successfully', {
cacheReady: this.cacheProvider.isReady(), cacheReady: this.cacheProvider.isReady(),
keyPrefix: this.cacheOptions?.keyPrefix || 'batch:', keyPrefix: this.keyPrefix,
ttlHours: ((this.cacheOptions?.ttl || 86400 * 2) / 3600).toFixed(1) ttlHours: ((this.cacheOptions?.ttl || 86400 * 2) / 3600).toFixed(1)
}); });
} catch (error) { } catch (error) {
@ -69,13 +71,13 @@ export class BatchProcessor {
getReadyStatus(): boolean { getReadyStatus(): boolean {
return this.isReady && this.cacheProvider.isReady(); return this.isReady && this.cacheProvider.isReady();
} }
/** /**
* Generate a unique key for storing batch payload in Redis * Generate a unique key for storing batch payload in Redis
* Note: The cache provider will add its keyPrefix ('batch:') automatically
*/ */
private generatePayloadKey(jobNamePrefix: string, batchIndex: number): string { private generatePayloadKey(jobNamePrefix: string, batchIndex: number): string {
return `batch:payload:${jobNamePrefix}:${batchIndex}:${Date.now()}`; return `payload:${jobNamePrefix}:${batchIndex}:${Date.now()}`;
} /** }/**
* Store batch payload in Redis and return the key * Store batch payload in Redis and return the key
*/ */
private async storeBatchPayload<T>( private async storeBatchPayload<T>(
@ -123,21 +125,36 @@ export class BatchProcessor {
batchIndex: number; batchIndex: number;
config: BatchConfig<T>; config: BatchConfig<T>;
} | null> { } | null> {
// Ensure cache is ready before loading // Auto-initialize if not ready
if (!this.cacheProvider.isReady()) { if (!this.cacheProvider.isReady() || !this.isReady) {
logger.error('Cache provider not ready - cannot load batch payload', { payloadKey }); logger.info('Cache provider not ready, initializing...', { payloadKey });
return null; try {
await this.initialize();
} catch (error) {
logger.error('Failed to initialize cache provider for loading', {
payloadKey,
error: error instanceof Error ? error.message : String(error)
});
throw new Error('Cache provider initialization failed - cannot load batch payload');
}
} }
try { try {
const payloadJson = await this.cacheProvider.get<string>(payloadKey); const payloadData = await this.cacheProvider.get<any>(payloadKey);
if (!payloadJson) { if (!payloadData) {
logger.error('Batch payload not found in Redis', { payloadKey }); logger.error('Batch payload not found in Redis', { payloadKey });
return null; throw new Error('Batch payload not found in Redis');
} }
const payload = JSON.parse(payloadJson); // Handle both string and already-parsed object
let payload;
if (typeof payloadData === 'string') {
payload = JSON.parse(payloadData);
} else {
// Already parsed by cache provider
payload = payloadData;
}
logger.info('Loaded batch payload from Redis', { logger.info('Loaded batch payload from Redis', {
payloadKey, payloadKey,
@ -151,7 +168,7 @@ export class BatchProcessor {
payloadKey, payloadKey,
error: error instanceof Error ? error.message : String(error) error: error instanceof Error ? error.message : String(error)
}); });
return null; throw new Error('Failed to load batch payload from Redis');
} }
} }
/** /**
@ -230,7 +247,7 @@ export class BatchProcessor {
}, },
opts: { opts: {
delay: globalIndex * delayPerItem, delay: globalIndex * delayPerItem,
jobId: `${jobNamePrefix}-${globalIndex}-${Date.now()}`, jobId: `${jobNamePrefix}:${globalIndex}:${Date.now()}`,
removeOnComplete, removeOnComplete,
removeOnFail removeOnFail
} }
@ -300,7 +317,6 @@ export class BatchProcessor {
const batchItems = items.slice(startIndex, endIndex); const batchItems = items.slice(startIndex, endIndex);
// Store batch payload in Redis and get reference key // Store batch payload in Redis and get reference key
const payloadKey = await this.storeBatchPayload(batchItems, config, batchIndex); const payloadKey = await this.storeBatchPayload(batchItems, config, batchIndex);
batchJobs.push({ batchJobs.push({
name: `${jobNamePrefix}-batch-processing`, name: `${jobNamePrefix}-batch-processing`,
data: { data: {
@ -310,7 +326,7 @@ export class BatchProcessor {
operation: `process-${jobNamePrefix}-batch`, operation: `process-${jobNamePrefix}-batch`,
payload: { payload: {
// Optimized: only store reference and metadata // Optimized: only store reference and metadata
payloadKey, payloadKey: payloadKey,
batchIndex, batchIndex,
total: totalBatches, total: totalBatches,
itemCount: batchItems.length, itemCount: batchItems.length,
@ -329,7 +345,7 @@ export class BatchProcessor {
}, },
opts: { opts: {
delay: batchIndex * delayPerBatch, delay: batchIndex * delayPerBatch,
jobId: `${jobNamePrefix}-batch-${batchIndex}-${Date.now()}` jobId: `${jobNamePrefix}-batch:${batchIndex}:${Date.now()}`
} }
}); });
} }
@ -459,7 +475,7 @@ export class BatchProcessor {
}, },
opts: { opts: {
delay: itemIndex * delayPerItem, delay: itemIndex * delayPerItem,
jobId: `${config.jobNamePrefix}-${batchIndex}-${itemIndex}-${Date.now()}`, jobId: `${config.jobNamePrefix}:${batchIndex}:${itemIndex}:${Date.now()}`,
removeOnComplete: config.removeOnComplete || 5, removeOnComplete: config.removeOnComplete || 5,
removeOnFail: config.removeOnFail || 3 removeOnFail: config.removeOnFail || 3
} }