diff --git a/apps/data-service/src/utils/batch-processor.ts b/apps/data-service/src/utils/batch-processor.ts index 5245c21..7d4222c 100644 --- a/apps/data-service/src/utils/batch-processor.ts +++ b/apps/data-service/src/utils/batch-processor.ts @@ -1,4 +1,5 @@ import { getLogger } from '@stock-bot/logger'; +import { createCache, CacheProvider } from '@stock-bot/cache'; export interface BatchConfig { items: T[]; @@ -13,23 +14,173 @@ export interface BatchConfig { removeOnComplete?: number; removeOnFail?: number; useBatching?: boolean; // Simple flag to choose mode + payloadTtlHours?: number; // TTL for stored payloads (default 24 hours) } const logger = getLogger('batch-processor'); export class BatchProcessor { - constructor(private queueManager: any) {} + private cacheProvider: CacheProvider; + private isReady = false; + constructor( + private queueManager: any, + private cacheOptions?: { keyPrefix?: string; ttl?: number } // Optional cache configuration + ) { + // Initialize cache provider with batch-specific settings + this.cacheProvider = createCache('redis', { + keyPrefix: cacheOptions?.keyPrefix || 'batch:', + ttl: cacheOptions?.ttl || 86400 * 2, // 48 hours default + enableMetrics: true + }); + } + + /** + * Initialize the batch processor and wait for cache to be ready + */ + async initialize(timeout: number = 10000): Promise { + if (this.isReady) { + logger.warn('BatchProcessor already initialized'); + return; + } + + logger.info('Initializing BatchProcessor, waiting for cache to be ready...'); + + try { + await this.cacheProvider.waitForReady(timeout); + this.isReady = true; + logger.info('BatchProcessor initialized successfully', { + cacheReady: this.cacheProvider.isReady(), + keyPrefix: this.cacheOptions?.keyPrefix || 'batch:', + ttlHours: ((this.cacheOptions?.ttl || 86400 * 2) / 3600).toFixed(1) + }); + } catch (error) { + logger.error('Failed to initialize BatchProcessor', { + error: error instanceof Error ? 
error.message : String(error), + timeout + }); + throw new Error(`BatchProcessor initialization failed: ${error instanceof Error ? error.message : String(error)}`); + } + } + + /** + * Check if the batch processor is ready + */ + getReadyStatus(): boolean { + return this.isReady && this.cacheProvider.isReady(); + } + + /** + * Generate a unique key for storing batch payload in Redis + */ + private generatePayloadKey(jobNamePrefix: string, batchIndex: number): string { + return `batch:payload:${jobNamePrefix}:${batchIndex}:${Date.now()}`; + } /** + * Store batch payload in Redis and return the key + */ + private async storeBatchPayload( + items: T[], + config: BatchConfig, + batchIndex: number + ): Promise { + // Ensure cache is ready before storing + if (!this.cacheProvider.isReady()) { + throw new Error('Cache provider not ready - cannot store batch payload'); + } + + const payloadKey = this.generatePayloadKey(config.jobNamePrefix, batchIndex); + const payload = { + items, + batchIndex, + config: { + ...config, + items: undefined // Don't store items twice + }, + createdAt: new Date().toISOString() + }; + + const ttlSeconds = (config.payloadTtlHours || 24) * 60 * 60; + + await this.cacheProvider.set( + payloadKey, + JSON.stringify(payload), + ttlSeconds + ); + + logger.info('Stored batch payload in Redis', { + payloadKey, + itemCount: items.length, + batchIndex, + ttlHours: config.payloadTtlHours || 24 + }); + + return payloadKey; + } /** + * Load batch payload from Redis + */ + private async loadBatchPayload(payloadKey: string): Promise<{ + items: T[]; + batchIndex: number; + config: BatchConfig; + } | null> { + // Ensure cache is ready before loading + if (!this.cacheProvider.isReady()) { + logger.error('Cache provider not ready - cannot load batch payload', { payloadKey }); + return null; + } + + try { + const payloadJson = await this.cacheProvider.get(payloadKey); + + if (!payloadJson) { + logger.error('Batch payload not found in Redis', { payloadKey }); + 
return null; + } + + const payload = JSON.parse(payloadJson); + + logger.info('Loaded batch payload from Redis', { + payloadKey, + itemCount: payload.items?.length || 0, + batchIndex: payload.batchIndex + }); + + return payload; + } catch (error) { + logger.error('Failed to load batch payload from Redis', { + payloadKey, + error: error instanceof Error ? error.message : String(error) + }); + return null; + } + } /** * Unified method that handles both direct and batch approaches */ async processItems(config: BatchConfig) { + // Check if BatchProcessor is ready + if (!this.getReadyStatus()) { + logger.warn('BatchProcessor not ready, attempting to initialize...'); + await this.initialize(); + } + const { items, useBatching = false } = config; if (items.length === 0) { return { totalItems: 0, jobsCreated: 0 }; } + // Final readiness check + if (!this.cacheProvider.isReady()) { + throw new Error('Cache provider is not ready - cannot process items'); + } + + logger.info('Starting item processing', { + totalItems: items.length, + mode: useBatching ? 
'batch' : 'direct', + cacheReady: this.cacheProvider.isReady() + }); + if (useBatching) { return await this.createBatchJobs(config); } else { @@ -113,7 +264,6 @@ export class BatchProcessor { mode: 'direct' }; } - private async createBatchJobs(config: BatchConfig) { const { items, @@ -131,11 +281,12 @@ export class BatchProcessor { const chunkSize = 50; // Create batch jobs in chunks let batchJobsCreated = 0; - logger.info('Creating batch jobs', { + logger.info('Creating optimized batch jobs with Redis payload storage', { totalItems: items.length, batchSize, totalBatches, - delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes` + delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`, + payloadTtlHours: config.payloadTtlHours || 24 }); // Create batch jobs in chunks @@ -147,6 +298,8 @@ export class BatchProcessor { const startIndex = batchIndex * batchSize; const endIndex = Math.min(startIndex + batchSize, items.length); const batchItems = items.slice(startIndex, endIndex); + // Store batch payload in Redis and get reference key + const payloadKey = await this.storeBatchPayload(batchItems, config, batchIndex); batchJobs.push({ name: `${jobNamePrefix}-batch-processing`, @@ -156,10 +309,21 @@ export class BatchProcessor { provider, operation: `process-${jobNamePrefix}-batch`, payload: { - items: batchItems, + // Optimized: only store reference and metadata + payloadKey, batchIndex, total: totalBatches, - config: { ...config, priority: priority - 1 } + itemCount: batchItems.length, + configSnapshot: { + jobNamePrefix: config.jobNamePrefix, + operation: config.operation, + service: config.service, + provider: config.provider, + priority: config.priority, + removeOnComplete: config.removeOnComplete, + removeOnFail: config.removeOnFail, + totalDelayMs: config.totalDelayMs + } }, priority }, @@ -173,13 +337,13 @@ export class BatchProcessor { try { const createdJobs = await this.queueManager.queue.addBulk(batchJobs); batchJobsCreated += 
createdJobs.length; - - logger.info('Batch chunk created', { + logger.info('Optimized batch chunk created', { chunkStart: chunkStart + 1, chunkEnd, created: createdJobs.length, totalCreated: batchJobsCreated, - progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%` + progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%`, + usingRedisStorage: true }); } catch (error) { logger.error('Failed to create batch chunk', { @@ -193,33 +357,75 @@ export class BatchProcessor { if (chunkEnd < totalBatches) { await new Promise(resolve => setTimeout(resolve, 100)); } - } - - return { + } return { totalItems: items.length, batchJobsCreated, totalBatches, estimatedDurationHours: totalDelayMs / 1000 / 60 / 60, - mode: 'batch' + mode: 'batch', + optimized: true }; } - /** * Process a batch (called by batch jobs) + * Supports both optimized (Redis payload storage) and fallback modes */ - async processBatch(payload: { - items: T[]; - batchIndex: number; - total: number; - config: BatchConfig; - }, createJobData?: (item: T, index: number) => any) { - const { items, batchIndex, total, config } = payload; + async processBatch( + jobPayload: any, + createJobData?: (item: T, index: number) => any + ) { + let batchData: { + items: T[]; + batchIndex: number; + config: BatchConfig; + }; + + let total: number; + + // Check if this is an optimized batch with Redis payload storage + if (jobPayload.payloadKey) { + logger.info('Processing optimized batch with Redis payload storage', { + payloadKey: jobPayload.payloadKey, + batchIndex: jobPayload.batchIndex, + itemCount: jobPayload.itemCount + }); + + // Load actual payload from Redis + const loadedPayload = await this.loadBatchPayload(jobPayload.payloadKey); + + if (!loadedPayload) { + throw new Error(`Failed to load batch payload from Redis: ${jobPayload.payloadKey}`); + } + + batchData = loadedPayload; + total = jobPayload.total; + + // Clean up Redis payload after loading (optional - you might want to keep it for retry scenarios) + 
// await this.cleanupBatchPayload(jobPayload.payloadKey); + + } else { + // Fallback: payload stored directly in job data + logger.info('Processing batch with inline payload storage', { + batchIndex: jobPayload.batchIndex, + itemCount: jobPayload.items?.length || 0 + }); + + batchData = { + items: jobPayload.items, + batchIndex: jobPayload.batchIndex, + config: jobPayload.config + }; + total = jobPayload.total; + } + + const { items, batchIndex, config } = batchData; logger.info('Processing batch', { batchIndex, batchSize: items.length, total, - progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%` + progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`, + isOptimized: !!jobPayload.payloadKey }); const totalBatchDelayMs = config.totalDelayMs / total; @@ -267,14 +473,16 @@ export class BatchProcessor { batchIndex, totalItems: items.length, jobsCreated: createdJobs.length, - progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%` + progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`, + memoryOptimized: !!jobPayload.payloadKey }); return { batchIndex, totalItems: items.length, jobsCreated: createdJobs.length, - jobsFailed: 0 + jobsFailed: 0, + payloadKey: jobPayload.payloadKey || null }; } catch (error) { logger.error('Failed to process batch', { @@ -286,8 +494,31 @@ export class BatchProcessor { batchIndex, totalItems: items.length, jobsCreated: 0, - jobsFailed: items.length + jobsFailed: items.length, + payloadKey: jobPayload.payloadKey || null }; } + } /** + * Clean up Redis payload after successful processing (optional) + */ + async cleanupBatchPayload(payloadKey: string): Promise<void> { + if (!payloadKey) { + return; + } + + if (!this.cacheProvider.isReady()) { + logger.warn('Cache provider not ready - skipping cleanup', { payloadKey }); + return; + } + + try { + await this.cacheProvider.del(payloadKey); + logger.info('Cleaned up batch payload from Redis', { payloadKey }); + } catch (error) { + logger.warn('Failed to cleanup batch payload', { + 
payloadKey, + error: error instanceof Error ? error.message : String(error) + }); + } } } \ No newline at end of file diff --git a/libs/cache/src/providers/hybrid-cache.ts index f3e351f..38cba8d 100644 --- a/libs/cache/src/providers/hybrid-cache.ts +++ b/libs/cache/src/providers/hybrid-cache.ts @@ -258,4 +258,37 @@ export class HybridCache implements CacheProvider { await this.redisCache.disconnect(); this.logger.info('Hybrid cache disconnected'); } + + async waitForReady(timeout: number = 5000): Promise<void> { + // Memory cache is always ready, only need to wait for Redis + await this.redisCache.waitForReady(timeout); + } + + isReady(): boolean { + // Ready only when both layers report ready (memory is effectively always ready) + return this.memoryCache.isReady() && this.redisCache.isReady(); + } + + /** + * Manually trigger a refresh of the Redis cache for a specific key + * Useful for updating the cache after a data change + */ + async refresh(key: string): Promise<void> { + try { + // Get the current value from memory (L1) + const currentValue = await this.memoryCache.get(key); + if (currentValue !== null) { + // If exists in memory, update Redis (L2) + await this.redisCache.set(key, currentValue); + this.logger.info('Cache refresh (L2)', { key }); + } else { + this.logger.debug('Cache refresh skipped, key not found in L1', { key }); + } + } catch (error) { + this.logger.error('Cache refresh error', { + key, + error: error instanceof Error ? 
error.message : String(error) + }); + } + } } diff --git a/libs/cache/src/providers/memory-cache.ts index 28e740c..b81015e 100644 --- a/libs/cache/src/providers/memory-cache.ts +++ b/libs/cache/src/providers/memory-cache.ts @@ -256,4 +256,25 @@ } return bytes; } + + async waitForReady(timeout: number = 5000): Promise<void> { + // Memory cache is always ready immediately + return Promise.resolve(); + } + + isReady(): boolean { + // Memory cache is always ready + return true; + } + + private getMemoryUsage(): number { + // Rough estimation of memory usage in bytes + let bytes = 0; + for (const [key, entry] of this.store.entries()) { + bytes += key.length * 2; // UTF-16 characters + bytes += JSON.stringify(entry.value).length * 2; + bytes += 24; // Overhead for entry object + } + return bytes; + } } diff --git a/libs/cache/src/providers/redis-cache.ts index dd86f2d..2ffd1fd 100644 --- a/libs/cache/src/providers/redis-cache.ts +++ b/libs/cache/src/providers/redis-cache.ts @@ -253,6 +253,44 @@ export class RedisCache implements CacheProvider { return this.get(key); } + async waitForReady(timeout: number = 5000): Promise<void> { + if (this.isConnected) { + return Promise.resolve(); + } + + return new Promise<void>((resolve, reject) => { + const timeoutId = setTimeout(() => { + reject(new Error(`Redis cache connection timeout after ${timeout}ms`)); + }, timeout); + + const onReady = () => { + clearTimeout(timeoutId); + this.redis.off('ready', onReady); + this.redis.off('error', onError); + resolve(); + }; + + const onError = (error: Error) => { + clearTimeout(timeoutId); + this.redis.off('ready', onReady); + this.redis.off('error', onError); + reject(new Error(`Redis cache connection failed: ${error.message}`)); + }; + + if (this.redis.status === 'ready') { + clearTimeout(timeoutId); + resolve(); + } else { + this.redis.once('ready', onReady); + 
this.redis.once('error', onError); + } + }); + } + + isReady(): boolean { + return this.isConnected && this.redis.status === 'ready'; + } + /** * Close the Redis connection */ diff --git a/libs/cache/src/types.ts index 1f19cce..4b2364f 100644 --- a/libs/cache/src/types.ts +++ b/libs/cache/src/types.ts @@ -6,6 +6,18 @@ export interface CacheProvider { clear(): Promise; getStats(): CacheStats; health(): Promise; + + /** + * Wait for the cache to be ready and connected + * @param timeout Maximum time to wait in milliseconds (default: 5000) + * @returns Promise that resolves when cache is ready + */ + waitForReady(timeout?: number): Promise<void>; + + /** + * Check if the cache is currently ready + */ + isReady(): boolean; } export interface CacheOptions {