diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts index 2f988d5..f721b4c 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts @@ -98,7 +98,7 @@ export async function processIndividualSymbol( await this.scheduleOperation('process-individual-symbol', { ceoId: ceoId, timestamp: latestSpielTime, - }); + }, {priority: 1}); } this.logger.info( diff --git a/libs/data/cache/src/connection-manager.ts b/libs/data/cache/src/connection-manager.ts index 6da09b3..9339f67 100644 --- a/libs/data/cache/src/connection-manager.ts +++ b/libs/data/cache/src/connection-manager.ts @@ -41,7 +41,7 @@ export class RedisConnectionManager { if (singleton) { // Use shared connection across all instances if (!RedisConnectionManager.sharedConnections.has(name)) { - const connection = this.createConnection(name, redisConfig, db); + const connection = this.createConnection(name, redisConfig, db, logger); RedisConnectionManager.sharedConnections.set(name, connection); this.logger.info(`Created shared Redis connection: ${name}`); } @@ -53,7 +53,7 @@ export class RedisConnectionManager { } else { // Create unique connection per instance const uniqueName = `${name}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; - const connection = this.createConnection(uniqueName, redisConfig, db); + const connection = this.createConnection(uniqueName, redisConfig, db, logger); this.connections.set(uniqueName, connection); this.logger.debug(`Created unique Redis connection: ${uniqueName}`); return connection; @@ -63,7 +63,7 @@ export class RedisConnectionManager { /** * Create a new Redis connection with configuration */ - private createConnection(name: string, config: RedisConfig, db?: number): Redis { + private createConnection(name: string, config: RedisConfig, db?: number, logger?: any): Redis { const redisOptions = { host: config.host, port: config.port, @@ -88,26 +88,29 @@ export class RedisConnectionManager { }; const redis = new Redis(redisOptions); + + // Use the provided logger or fall back to instance logger + const log = logger || this.logger; // Setup event handlers redis.on('connect', () => { - this.logger.info(`Redis connection established: ${name}`); + log.info(`Redis connection established: ${name}`); }); redis.on('ready', () => { - this.logger.info(`Redis connection ready: ${name}`); + log.info(`Redis connection ready: ${name}`); }); redis.on('error', err => { - this.logger.error(`Redis connection error for ${name}:`, err); + log.error(`Redis connection error for ${name}:`, err); }); redis.on('close', () => { - this.logger.warn(`Redis connection closed: ${name}`); + log.warn(`Redis connection closed: ${name}`); }); redis.on('reconnecting', () => { - this.logger.warn(`Redis reconnecting: ${name}`); + log.warn(`Redis reconnecting: ${name}`); }); return redis; diff --git a/libs/services/queue/src/queue-manager.ts b/libs/services/queue/src/queue-manager.ts index b67c3e7..7f1acee 100644 --- a/libs/services/queue/src/queue-manager.ts +++ b/libs/services/queue/src/queue-manager.ts @@ -75,11 +75,13 @@ export class QueueManager { }; // Prepare queue configuration + const workers = mergedOptions.workers ?? this.config.defaultQueueOptions?.workers ?? 1; + const concurrency = mergedOptions.concurrency ?? this.config.defaultQueueOptions?.concurrency ?? 1; + const queueConfig: QueueWorkerConfig = { - workers: mergedOptions.workers, - concurrency: mergedOptions.concurrency, - startWorker: - !!mergedOptions.workers && mergedOptions.workers > 0 && !this.config.delayWorkerStart, + workers, + concurrency, + startWorker: workers > 0 && !this.config.delayWorkerStart, }; const queue = new Queue( @@ -109,8 +111,8 @@ export class QueueManager { this.logger.info('Queue created with batch cache', { queueName, - workers: mergedOptions.workers || 0, - concurrency: mergedOptions.concurrency || 1, + workers: workers, + concurrency: concurrency, }); return queue; @@ -140,6 +142,7 @@ export class QueueManager { keyPrefix: `batch:${queueName}:`, ttl: 86400, // 24 hours default enableMetrics: true, + logger: this.logger, }); this.caches.set(queueName, cacheProvider); this.logger.trace('Cache created for queue', { queueName }); @@ -210,7 +213,7 @@ export class QueueManager { */ addRateLimitRule(rule: RateLimitRule): void { if (!this.rateLimiter) { - this.rateLimiter = new QueueRateLimiter(this.redisConnection); + this.rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger); } this.rateLimiter.addRule(rule); }