From 9825e99540c59b459ee9344c5d214ae9cf419c1d Mon Sep 17 00:00:00 2001 From: Boki Date: Tue, 10 Jun 2025 15:07:37 -0400 Subject: [PATCH] fixes --- .../data-service/src/providers/proxy.tasks.ts | 19 +++--- .../data-service/src/utils/batch-processor.ts | 65 ++++++++++--------- libs/cache/src/redis-cache.ts | 43 ++++++------ 3 files changed, 68 insertions(+), 59 deletions(-) diff --git a/apps/data-service/src/providers/proxy.tasks.ts b/apps/data-service/src/providers/proxy.tasks.ts index 86461c2..573102f 100644 --- a/apps/data-service/src/providers/proxy.tasks.ts +++ b/apps/data-service/src/providers/proxy.tasks.ts @@ -120,14 +120,17 @@ async function initializeSharedResources() { httpClient = new HttpClient({ timeout: 10000 }, logger); concurrencyLimit = pLimit(PROXY_CONFIG.CONCURRENCY_LIMIT); - // Try to connect to cache, but don't block initialization if it fails - try { - // Use longer timeout for cache connection - await cache.waitForReady(30000); // 30 seconds - logger.info('Cache connection established'); - } catch (error) { - logger.warn('Cache connection failed, continuing with degraded functionality:', {error}); - // Don't throw - allow the service to continue with cache fallbacks + // Check if cache is ready, but don't block initialization + if (cache.isReady()) { + logger.info('Cache already ready'); + } else { + logger.info('Cache not ready yet, tasks will use fallback mode'); + // Try to wait briefly for cache to be ready, but don't block + cache.waitForReady(5000).then(() => { + logger.info('Cache became ready after initialization'); + }).catch(error => { + logger.warn('Cache connection timeout, continuing with fallback mode:', {error: error.message}); + }); } logger.info('Proxy tasks initialized'); diff --git a/apps/data-service/src/utils/batch-processor.ts b/apps/data-service/src/utils/batch-processor.ts index 3c4120d..8b6e4ff 100644 --- a/apps/data-service/src/utils/batch-processor.ts +++ b/apps/data-service/src/utils/batch-processor.ts @@ -33,8 +33,8 @@ export class BatchProcessor { ttl: cacheOptions?.ttl || 86400 * 2, // 48 hours default enableMetrics: true }); + this.initialize(); } - /** * Initialize the batch processor and wait for cache to be ready */ @@ -55,19 +55,19 @@ export class BatchProcessor { ttlHours: ((this.cacheOptions?.ttl || 86400 * 2) / 3600).toFixed(1) }); } catch (error) { - logger.error('Failed to initialize BatchProcessor', { + logger.warn('BatchProcessor cache not ready within timeout, continuing with fallback mode', { error: error instanceof Error ? error.message : String(error), timeout }); - throw new Error(`BatchProcessor initialization failed: ${error instanceof Error ? error.message : String(error)}`); + // Don't throw - mark as ready anyway and let cache operations use their fallback mechanisms + this.isReady = true; } } - /** * Check if the batch processor is ready */ getReadyStatus(): boolean { - return this.isReady && this.cacheProvider.isReady(); + return this.isReady; // Don't require cache to be ready, let individual operations handle fallbacks } /** * Generate a unique key for storing batch payload in Redis @@ -77,17 +77,11 @@ export class BatchProcessor { return `payload:${jobNamePrefix}:${batchIndex}:${Date.now()}`; }/** * Store batch payload in Redis and return the key - */ - private async storeBatchPayload( + */ private async storeBatchPayload( items: T[], config: BatchConfig, batchIndex: number ): Promise { - // Ensure cache is ready before storing - if (!this.cacheProvider.isReady()) { - throw new Error('Cache provider not ready - cannot store batch payload'); - } - const payloadKey = this.generatePayloadKey(config.jobNamePrefix, batchIndex); const payload = { items, @@ -101,21 +95,29 @@ export class BatchProcessor { const ttlSeconds = (config.payloadTtlHours || 24) * 60 * 60; - await this.cacheProvider.set( - payloadKey, - JSON.stringify(payload), - ttlSeconds - ); + try { + await this.cacheProvider.set( + payloadKey, + JSON.stringify(payload), + ttlSeconds + ); - logger.info('Stored batch payload in Redis', { - payloadKey, - itemCount: items.length, - batchIndex, - ttlHours: config.payloadTtlHours || 24 - }); + logger.info('Stored batch payload in Redis', { + payloadKey, + itemCount: items.length, + batchIndex, + ttlHours: config.payloadTtlHours || 24 + }); + } catch (error) { + logger.error('Failed to store batch payload, job will run without caching', { + payloadKey, + error: error instanceof Error ? error.message : String(error) + }); + // Don't throw - the job can still run, just without the cached payload + } return payloadKey; - } /** + }/** * Load batch payload from Redis */ private async loadBatchPayload(payloadKey: string): Promise<{ @@ -183,11 +185,16 @@ export class BatchProcessor { if (items.length === 0) { return { totalItems: 0, jobsCreated: 0 }; - } - - // Final readiness check + } // Final readiness check - wait briefly for cache to be ready if (!this.cacheProvider.isReady()) { - throw new Error('Cache provider is not ready - cannot process items'); + logger.warn('Cache provider not ready, waiting briefly...'); + try { + await this.cacheProvider.waitForReady(10000); // Wait up to 10 seconds + logger.info('Cache provider became ready'); + } catch (error) { + logger.warn('Cache provider still not ready, continuing with fallback mode'); + // Don't throw error - let the cache operations use their fallback mechanisms + } } logger.info('Starting item processing', { @@ -324,7 +331,7 @@ export class BatchProcessor { operation: `process-${jobNamePrefix}-batch`, payload: { // Optimized: only store reference and metadata - payloadKey: this.keyPrefix + payloadKey, + payloadKey: payloadKey, batchIndex, total: totalBatches, itemCount: batchItems.length, diff --git a/libs/cache/src/redis-cache.ts b/libs/cache/src/redis-cache.ts index 274e67b..f8fcd8f 100644 --- a/libs/cache/src/redis-cache.ts +++ b/libs/cache/src/redis-cache.ts @@ -46,32 +46,31 @@ export class RedisCache implements CacheProvider { } private setupEventHandlers(): void { - // this.redis.on('connect', () => { - // this.logger.info('Redis cache connected'); - // }); + this.redis.on('connect', () => { + this.logger.info('Redis cache connected'); + }); - // this.redis.on('ready', () => { - // this.isConnected = true; - // this.logger.info('Redis cache ready'); - // }); + this.redis.on('ready', () => { + this.isConnected = true; + this.logger.info('Redis cache ready'); + }); - // this.redis.on('error', (error: any) => { - // this.isConnected = false; - // this.logger.error('Redis cache connection error', { error: error.message }); - // }); + this.redis.on('error', (error: any) => { + this.isConnected = false; + this.logger.error('Redis cache connection error', { error: error.message }); + }); - // this.redis.on('close', () => { - // this.isConnected = false; - // this.logger.warn('Redis cache connection closed'); - // }); + this.redis.on('close', () => { + this.isConnected = false; + this.logger.warn('Redis cache connection closed'); + }); - // this.redis.on('reconnecting', () => { - // this.logger.info('Redis cache reconnecting...'); - // }); + this.redis.on('reconnecting', () => { + this.logger.info('Redis cache reconnecting...'); + }); } private getKey(key: string): string { - console.log(`Using key prefix: ${this.keyPrefix}`); return `${this.keyPrefix}${key}`; } @@ -97,8 +96,8 @@ export class RedisCache implements CacheProvider { operationName: string ): Promise { try { - if (!this.isConnected) { - this.logger.warn(`Redis not connected for ${operationName}, using fallback`); + if (!this.isReady()) { + this.logger.warn(`Redis not ready for ${operationName}, using fallback`); this.updateStats(false, true); return fallback; } @@ -236,6 +235,6 @@ export class RedisCache implements CacheProvider { } isReady(): boolean { - return this.isConnected && this.redis.status === 'ready'; + return this.redis.status === 'ready'; } }