separating batch payload from job queue

This commit is contained in:
Boki 2025-06-10 09:58:20 -04:00
parent c57733ebca
commit b974753d8b
5 changed files with 360 additions and 25 deletions

View file

@ -1,4 +1,5 @@
import { getLogger } from '@stock-bot/logger';
import { createCache, CacheProvider } from '@stock-bot/cache';
export interface BatchConfig<T> {
items: T[];
@ -13,23 +14,173 @@ export interface BatchConfig<T> {
removeOnComplete?: number;
removeOnFail?: number;
useBatching?: boolean; // Simple flag to choose mode
payloadTtlHours?: number; // TTL for stored payloads (default 24 hours)
}
const logger = getLogger('batch-processor');
export class BatchProcessor {
constructor(private queueManager: any) {}
private cacheProvider: CacheProvider;
private isReady = false;
constructor(
private queueManager: any,
private cacheOptions?: { keyPrefix?: string; ttl?: number } // Optional cache configuration
) {
// Initialize cache provider with batch-specific settings
this.cacheProvider = createCache('redis', {
keyPrefix: cacheOptions?.keyPrefix || 'batch:',
ttl: cacheOptions?.ttl || 86400 * 2, // 48 hours default
enableMetrics: true
});
}
/**
 * Prepare the processor for use: block until the underlying cache
 * connection reports ready, or fail with a descriptive error.
 *
 * @param timeout Maximum milliseconds to wait for the cache (default 10s).
 * @throws Error when the cache does not become ready within the timeout.
 */
async initialize(timeout: number = 10000): Promise<void> {
  // Calling initialize twice is harmless; note it and bail out early.
  if (this.isReady) {
    logger.warn('BatchProcessor already initialized');
    return;
  }
  logger.info('Initializing BatchProcessor, waiting for cache to be ready...');
  try {
    await this.cacheProvider.waitForReady(timeout);
    this.isReady = true;
    // Re-derive the effective cache settings purely for the log record.
    const effectivePrefix = this.cacheOptions?.keyPrefix || 'batch:';
    const effectiveTtl = this.cacheOptions?.ttl || 86400 * 2;
    logger.info('BatchProcessor initialized successfully', {
      cacheReady: this.cacheProvider.isReady(),
      keyPrefix: effectivePrefix,
      ttlHours: (effectiveTtl / 3600).toFixed(1)
    });
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    logger.error('Failed to initialize BatchProcessor', { error: reason, timeout });
    throw new Error(`BatchProcessor initialization failed: ${reason}`);
  }
}
/**
 * Report whether both this processor and its cache connection are usable.
 *
 * @returns true only when initialize() has completed AND the cache
 *          provider currently reports itself ready.
 */
getReadyStatus(): boolean {
  if (!this.isReady) {
    return false;
  }
  return this.cacheProvider.isReady();
}
/**
 * Build a unique Redis key for a stored batch payload. Uniqueness comes
 * from the job-name prefix, the batch index, and a millisecond timestamp.
 *
 * NOTE(review): the cache provider is constructed with its own keyPrefix
 * ('batch:' by default), so stored keys presumably end up double-prefixed
 * as 'batch:batch:payload:...' — confirm this is intended.
 */
private generatePayloadKey(jobNamePrefix: string, batchIndex: number): string {
  const createdAtMs = Date.now();
  return `batch:payload:${jobNamePrefix}:${batchIndex}:${createdAtMs}`;
}

/**
* Store batch payload in Redis and return the key
*/
private async storeBatchPayload<T>(
items: T[],
config: BatchConfig<T>,
batchIndex: number
): Promise<string> {
// Ensure cache is ready before storing
if (!this.cacheProvider.isReady()) {
throw new Error('Cache provider not ready - cannot store batch payload');
}
const payloadKey = this.generatePayloadKey(config.jobNamePrefix, batchIndex);
const payload = {
items,
batchIndex,
config: {
...config,
items: undefined // Don't store items twice
},
createdAt: new Date().toISOString()
};
const ttlSeconds = (config.payloadTtlHours || 24) * 60 * 60;
await this.cacheProvider.set(
payloadKey,
JSON.stringify(payload),
ttlSeconds
);
logger.info('Stored batch payload in Redis', {
payloadKey,
itemCount: items.length,
batchIndex,
ttlHours: config.payloadTtlHours || 24
});
return payloadKey;
} /**
* Load batch payload from Redis
*/
private async loadBatchPayload<T>(payloadKey: string): Promise<{
items: T[];
batchIndex: number;
config: BatchConfig<T>;
} | null> {
// Ensure cache is ready before loading
if (!this.cacheProvider.isReady()) {
logger.error('Cache provider not ready - cannot load batch payload', { payloadKey });
return null;
}
try {
const payloadJson = await this.cacheProvider.get<string>(payloadKey);
if (!payloadJson) {
logger.error('Batch payload not found in Redis', { payloadKey });
return null;
}
const payload = JSON.parse(payloadJson);
logger.info('Loaded batch payload from Redis', {
payloadKey,
itemCount: payload.items?.length || 0,
batchIndex: payload.batchIndex
});
return payload;
} catch (error) {
logger.error('Failed to load batch payload from Redis', {
payloadKey,
error: error instanceof Error ? error.message : String(error)
});
return null;
}
}
/**
* Unified method that handles both direct and batch approaches
*/
async processItems<T>(config: BatchConfig<T>) {
// Check if BatchProcessor is ready
if (!this.getReadyStatus()) {
logger.warn('BatchProcessor not ready, attempting to initialize...');
await this.initialize();
}
const { items, useBatching = false } = config;
if (items.length === 0) {
return { totalItems: 0, jobsCreated: 0 };
}
// Final readiness check
if (!this.cacheProvider.isReady()) {
throw new Error('Cache provider is not ready - cannot process items');
}
logger.info('Starting item processing', {
totalItems: items.length,
mode: useBatching ? 'batch' : 'direct',
cacheReady: this.cacheProvider.isReady()
});
if (useBatching) {
return await this.createBatchJobs(config);
} else {
@ -113,7 +264,6 @@ export class BatchProcessor {
mode: 'direct'
};
}
private async createBatchJobs<T>(config: BatchConfig<T>) {
const {
items,
@ -131,11 +281,12 @@ export class BatchProcessor {
const chunkSize = 50; // Create batch jobs in chunks
let batchJobsCreated = 0;
logger.info('Creating batch jobs', {
logger.info('Creating optimized batch jobs with Redis payload storage', {
totalItems: items.length,
batchSize,
totalBatches,
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
payloadTtlHours: config.payloadTtlHours || 24
});
// Create batch jobs in chunks
@ -147,6 +298,8 @@ export class BatchProcessor {
const startIndex = batchIndex * batchSize;
const endIndex = Math.min(startIndex + batchSize, items.length);
const batchItems = items.slice(startIndex, endIndex);
// Store batch payload in Redis and get reference key
const payloadKey = await this.storeBatchPayload(batchItems, config, batchIndex);
batchJobs.push({
name: `${jobNamePrefix}-batch-processing`,
@ -156,10 +309,21 @@ export class BatchProcessor {
provider,
operation: `process-${jobNamePrefix}-batch`,
payload: {
items: batchItems,
// Optimized: only store reference and metadata
payloadKey,
batchIndex,
total: totalBatches,
config: { ...config, priority: priority - 1 }
itemCount: batchItems.length,
configSnapshot: {
jobNamePrefix: config.jobNamePrefix,
operation: config.operation,
service: config.service,
provider: config.provider,
priority: config.priority,
removeOnComplete: config.removeOnComplete,
removeOnFail: config.removeOnFail,
totalDelayMs: config.totalDelayMs
}
},
priority
},
@ -173,13 +337,13 @@ export class BatchProcessor {
try {
const createdJobs = await this.queueManager.queue.addBulk(batchJobs);
batchJobsCreated += createdJobs.length;
logger.info('Batch chunk created', {
logger.info('Optimized batch chunk created', {
chunkStart: chunkStart + 1,
chunkEnd,
created: createdJobs.length,
totalCreated: batchJobsCreated,
progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%`
progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%`,
usingRedisStorage: true
});
} catch (error) {
logger.error('Failed to create batch chunk', {
@ -193,33 +357,75 @@ export class BatchProcessor {
if (chunkEnd < totalBatches) {
await new Promise(resolve => setTimeout(resolve, 100));
}
}
return {
} return {
totalItems: items.length,
batchJobsCreated,
totalBatches,
estimatedDurationHours: totalDelayMs / 1000 / 60 / 60,
mode: 'batch'
mode: 'batch',
optimized: true
};
}
/**
* Process a batch (called by batch jobs)
* Supports both optimized (Redis payload storage) and fallback modes
*/
async processBatch<T>(payload: {
items: T[];
batchIndex: number;
total: number;
config: BatchConfig<T>;
}, createJobData?: (item: T, index: number) => any) {
const { items, batchIndex, total, config } = payload;
async processBatch<T>(
jobPayload: any,
createJobData?: (item: T, index: number) => any
) {
let batchData: {
items: T[];
batchIndex: number;
config: BatchConfig<T>;
};
let total: number;
// Check if this is an optimized batch with Redis payload storage
if (jobPayload.payloadKey) {
logger.info('Processing optimized batch with Redis payload storage', {
payloadKey: jobPayload.payloadKey,
batchIndex: jobPayload.batchIndex,
itemCount: jobPayload.itemCount
});
// Load actual payload from Redis
const loadedPayload = await this.loadBatchPayload<T>(jobPayload.payloadKey);
if (!loadedPayload) {
throw new Error(`Failed to load batch payload from Redis: ${jobPayload.payloadKey}`);
}
batchData = loadedPayload;
total = jobPayload.total;
// Clean up Redis payload after loading (optional - you might want to keep it for retry scenarios)
// await this.redisClient?.del(jobPayload.payloadKey);
} else {
// Fallback: payload stored directly in job data
logger.info('Processing batch with inline payload storage', {
batchIndex: jobPayload.batchIndex,
itemCount: jobPayload.items?.length || 0
});
batchData = {
items: jobPayload.items,
batchIndex: jobPayload.batchIndex,
config: jobPayload.config
};
total = jobPayload.total;
}
const { items, batchIndex, config } = batchData;
logger.info('Processing batch', {
batchIndex,
batchSize: items.length,
total,
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`,
isOptimized: !!jobPayload.payloadKey
});
const totalBatchDelayMs = config.totalDelayMs / total;
@ -267,14 +473,16 @@ export class BatchProcessor {
batchIndex,
totalItems: items.length,
jobsCreated: createdJobs.length,
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`,
memoryOptimized: !!jobPayload.payloadKey
});
return {
batchIndex,
totalItems: items.length,
jobsCreated: createdJobs.length,
jobsFailed: 0
jobsFailed: 0,
payloadKey: jobPayload.payloadKey || null
};
} catch (error) {
logger.error('Failed to process batch', {
@ -286,8 +494,31 @@ export class BatchProcessor {
batchIndex,
totalItems: items.length,
jobsCreated: 0,
jobsFailed: items.length
jobsFailed: items.length,
payloadKey: jobPayload.payloadKey || null
};
}
} /**
* Clean up Redis payload after successful processing (optional)
*/
async cleanupBatchPayload(payloadKey: string): Promise<void> {
  // Nothing to delete for an empty/absent key.
  if (!payloadKey) {
    return;
  }
  // Cleanup is best-effort: if the cache is down, skip rather than fail —
  // the stored payload will expire on its own via TTL.
  if (!this.cacheProvider.isReady()) {
    logger.warn('Cache provider not ready - skipping cleanup', { payloadKey });
    return;
  }
  try {
    await this.cacheProvider.del(payloadKey);
    logger.info('Cleaned up batch payload from Redis', { payloadKey });
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    logger.warn('Failed to cleanup batch payload', {
      payloadKey,
      error: reason
    });
  }
}
}