This commit is contained in:
Boki 2025-06-10 15:07:37 -04:00
parent 32d0eaac2d
commit 9825e99540
3 changed files with 68 additions and 59 deletions

View file

@ -120,14 +120,17 @@ async function initializeSharedResources() {
httpClient = new HttpClient({ timeout: 10000 }, logger); httpClient = new HttpClient({ timeout: 10000 }, logger);
concurrencyLimit = pLimit(PROXY_CONFIG.CONCURRENCY_LIMIT); concurrencyLimit = pLimit(PROXY_CONFIG.CONCURRENCY_LIMIT);
// Try to connect to cache, but don't block initialization if it fails // Check if cache is ready, but don't block initialization
try { if (cache.isReady()) {
// Use longer timeout for cache connection logger.info('Cache already ready');
await cache.waitForReady(30000); // 30 seconds } else {
logger.info('Cache connection established'); logger.info('Cache not ready yet, tasks will use fallback mode');
} catch (error) { // Try to wait briefly for cache to be ready, but don't block
logger.warn('Cache connection failed, continuing with degraded functionality:', {error}); cache.waitForReady(5000).then(() => {
// Don't throw - allow the service to continue with cache fallbacks logger.info('Cache became ready after initialization');
}).catch(error => {
logger.warn('Cache connection timeout, continuing with fallback mode:', {error: error.message});
});
} }
logger.info('Proxy tasks initialized'); logger.info('Proxy tasks initialized');

View file

@ -33,8 +33,8 @@ export class BatchProcessor {
ttl: cacheOptions?.ttl || 86400 * 2, // 48 hours default ttl: cacheOptions?.ttl || 86400 * 2, // 48 hours default
enableMetrics: true enableMetrics: true
}); });
this.initialize();
} }
/** /**
* Initialize the batch processor and wait for cache to be ready * Initialize the batch processor and wait for cache to be ready
*/ */
@ -55,19 +55,19 @@ export class BatchProcessor {
ttlHours: ((this.cacheOptions?.ttl || 86400 * 2) / 3600).toFixed(1) ttlHours: ((this.cacheOptions?.ttl || 86400 * 2) / 3600).toFixed(1)
}); });
} catch (error) { } catch (error) {
logger.error('Failed to initialize BatchProcessor', { logger.warn('BatchProcessor cache not ready within timeout, continuing with fallback mode', {
error: error instanceof Error ? error.message : String(error), error: error instanceof Error ? error.message : String(error),
timeout timeout
}); });
throw new Error(`BatchProcessor initialization failed: ${error instanceof Error ? error.message : String(error)}`); // Don't throw - mark as ready anyway and let cache operations use their fallback mechanisms
this.isReady = true;
} }
} }
/** /**
* Check if the batch processor is ready * Check if the batch processor is ready
*/ */
getReadyStatus(): boolean { getReadyStatus(): boolean {
return this.isReady && this.cacheProvider.isReady(); return this.isReady; // Don't require cache to be ready, let individual operations handle fallbacks
} }
/** /**
* Generate a unique key for storing batch payload in Redis * Generate a unique key for storing batch payload in Redis
@ -77,17 +77,11 @@ export class BatchProcessor {
return `payload:${jobNamePrefix}:${batchIndex}:${Date.now()}`; return `payload:${jobNamePrefix}:${batchIndex}:${Date.now()}`;
}/** }/**
* Store batch payload in Redis and return the key * Store batch payload in Redis and return the key
*/ */ private async storeBatchPayload<T>(
private async storeBatchPayload<T>(
items: T[], items: T[],
config: BatchConfig<T>, config: BatchConfig<T>,
batchIndex: number batchIndex: number
): Promise<string> { ): Promise<string> {
// Ensure cache is ready before storing
if (!this.cacheProvider.isReady()) {
throw new Error('Cache provider not ready - cannot store batch payload');
}
const payloadKey = this.generatePayloadKey(config.jobNamePrefix, batchIndex); const payloadKey = this.generatePayloadKey(config.jobNamePrefix, batchIndex);
const payload = { const payload = {
items, items,
@ -101,21 +95,29 @@ export class BatchProcessor {
const ttlSeconds = (config.payloadTtlHours || 24) * 60 * 60; const ttlSeconds = (config.payloadTtlHours || 24) * 60 * 60;
await this.cacheProvider.set( try {
payloadKey, await this.cacheProvider.set(
JSON.stringify(payload), payloadKey,
ttlSeconds JSON.stringify(payload),
); ttlSeconds
);
logger.info('Stored batch payload in Redis', { logger.info('Stored batch payload in Redis', {
payloadKey, payloadKey,
itemCount: items.length, itemCount: items.length,
batchIndex, batchIndex,
ttlHours: config.payloadTtlHours || 24 ttlHours: config.payloadTtlHours || 24
}); });
} catch (error) {
logger.error('Failed to store batch payload, job will run without caching', {
payloadKey,
error: error instanceof Error ? error.message : String(error)
});
// Don't throw - the job can still run, just without the cached payload
}
return payloadKey; return payloadKey;
} /** }/**
* Load batch payload from Redis * Load batch payload from Redis
*/ */
private async loadBatchPayload<T>(payloadKey: string): Promise<{ private async loadBatchPayload<T>(payloadKey: string): Promise<{
@ -183,11 +185,16 @@ export class BatchProcessor {
if (items.length === 0) { if (items.length === 0) {
return { totalItems: 0, jobsCreated: 0 }; return { totalItems: 0, jobsCreated: 0 };
} } // Final readiness check - wait briefly for cache to be ready
// Final readiness check
if (!this.cacheProvider.isReady()) { if (!this.cacheProvider.isReady()) {
throw new Error('Cache provider is not ready - cannot process items'); logger.warn('Cache provider not ready, waiting briefly...');
try {
await this.cacheProvider.waitForReady(10000); // Wait up to 10 seconds
logger.info('Cache provider became ready');
} catch (error) {
logger.warn('Cache provider still not ready, continuing with fallback mode');
// Don't throw error - let the cache operations use their fallback mechanisms
}
} }
logger.info('Starting item processing', { logger.info('Starting item processing', {
@ -324,7 +331,7 @@ export class BatchProcessor {
operation: `process-${jobNamePrefix}-batch`, operation: `process-${jobNamePrefix}-batch`,
payload: { payload: {
// Optimized: only store reference and metadata // Optimized: only store reference and metadata
payloadKey: this.keyPrefix + payloadKey, payloadKey: payloadKey,
batchIndex, batchIndex,
total: totalBatches, total: totalBatches,
itemCount: batchItems.length, itemCount: batchItems.length,

View file

@ -46,32 +46,31 @@ export class RedisCache implements CacheProvider {
} }
private setupEventHandlers(): void { private setupEventHandlers(): void {
// this.redis.on('connect', () => { this.redis.on('connect', () => {
// this.logger.info('Redis cache connected'); this.logger.info('Redis cache connected');
// }); });
// this.redis.on('ready', () => { this.redis.on('ready', () => {
// this.isConnected = true; this.isConnected = true;
// this.logger.info('Redis cache ready'); this.logger.info('Redis cache ready');
// }); });
// this.redis.on('error', (error: any) => { this.redis.on('error', (error: any) => {
// this.isConnected = false; this.isConnected = false;
// this.logger.error('Redis cache connection error', { error: error.message }); this.logger.error('Redis cache connection error', { error: error.message });
// }); });
// this.redis.on('close', () => { this.redis.on('close', () => {
// this.isConnected = false; this.isConnected = false;
// this.logger.warn('Redis cache connection closed'); this.logger.warn('Redis cache connection closed');
// }); });
// this.redis.on('reconnecting', () => { this.redis.on('reconnecting', () => {
// this.logger.info('Redis cache reconnecting...'); this.logger.info('Redis cache reconnecting...');
// }); });
} }
private getKey(key: string): string { private getKey(key: string): string {
console.log(`Using key prefix: ${this.keyPrefix}`);
return `${this.keyPrefix}${key}`; return `${this.keyPrefix}${key}`;
} }
@ -97,8 +96,8 @@ export class RedisCache implements CacheProvider {
operationName: string operationName: string
): Promise<T> { ): Promise<T> {
try { try {
if (!this.isConnected) { if (!this.isReady()) {
this.logger.warn(`Redis not connected for ${operationName}, using fallback`); this.logger.warn(`Redis not ready for ${operationName}, using fallback`);
this.updateStats(false, true); this.updateStats(false, true);
return fallback; return fallback;
} }
@ -236,6 +235,6 @@ export class RedisCache implements CacheProvider {
} }
isReady(): boolean { isReady(): boolean {
return this.isConnected && this.redis.status === 'ready'; return this.redis.status === 'ready';
} }
} }