From 19ecd953462cd29ec26fb87ad9cc7913f3630841 Mon Sep 17 00:00:00 2001 From: Boki Date: Sat, 21 Jun 2025 07:43:37 -0400 Subject: [PATCH] added start worker delay --- .env | 2 +- apps/data-service/src/index.ts | 12 ++++++++--- libs/logger/src/logger.ts | 37 +++++++++++++++++++++++++++++++-- libs/queue/src/queue-manager.ts | 29 +++++++++++++++++++++++++- libs/queue/src/queue.ts | 18 ++++++++++++++++ libs/queue/src/types.ts | 1 + 6 files changed, 92 insertions(+), 7 deletions(-) diff --git a/.env b/.env index 6378a24..e648d2f 100644 --- a/.env +++ b/.env @@ -4,7 +4,7 @@ # Core Application Settings NODE_ENV=development -LOG_LEVEL=warn +LOG_LEVEL=debug LOG_HIDE_OBJECT=true # Data Service Configuration diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index 19440a6..56c7f15 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -93,7 +93,7 @@ async function initializeServices() { }); logger.info('PostgreSQL connected'); - // Initialize queue system + // Initialize queue system (with delayed worker start) logger.debug('Initializing queue system...'); const queueManagerConfig: QueueManagerConfig = { redis: queueConfig?.redis || { @@ -111,12 +111,13 @@ async function initializeServices() { removeOnComplete: true, removeOnFail: false, }, - workers: 5, - concurrency: 20, + workers: 2, + concurrency: 1, enableMetrics: true, enableDLQ: true, }, enableScheduledJobs: true, + delayWorkerStart: true, // Prevent workers from starting until all singletons are ready }; queueManager = QueueManager.getOrInitialize(queueManagerConfig); @@ -189,6 +190,11 @@ async function initializeServices() { } logger.info('Scheduled jobs created', { totalJobs: totalScheduledJobs }); + // Now that all singletons are initialized and jobs are scheduled, start the workers + logger.debug('Starting queue workers...'); + queueManager.startAllWorkers(); + logger.info('Queue workers started'); + logger.info('All services initialized successfully'); } catch (error) { logger.error('Failed to initialize services', { error }); diff --git a/libs/logger/src/logger.ts b/libs/logger/src/logger.ts index 8af4b25..fa7bf01 100644 --- a/libs/logger/src/logger.ts +++ b/libs/logger/src/logger.ts @@ -23,6 +23,7 @@ let globalConfig: LoggerConfig = { logFilePath: './logs', logLoki: false, environment: 'development', + hideObject: false, }; // Log level priorities for comparison @@ -40,6 +41,7 @@ const LOG_LEVELS: Record = { */ export function setLoggerConfig(config: LoggerConfig): void { globalConfig = { ...globalConfig, ...config }; + // Clear cache to force recreation with new config loggerCache.clear(); } /** @@ -173,15 +175,46 @@ export class Logger { return; } - const data = { ...this.context, ...metadata }; + let data = { ...this.context, ...metadata }; + + // Filter out objects if hideObject is enabled + if (globalConfig.hideObject) { + data = this.filterObjects(data); + } if (typeof message === 'string') { (this.pino as any)[level](data, message); } else { - (this.pino as any)[level]({ ...data, data: message }, 'Object logged'); + if (globalConfig.hideObject) { + (this.pino as any)[level]({}, `Object logged (hidden)`); + } else { + (this.pino as any)[level]({ ...data, data: message }, 'Object logged'); + } } } + /** + * Filter out objects from metadata when hideObject is enabled + */ + private filterObjects(data: Record): Record { + const filtered: Record = {}; + + for (const [key, value] of Object.entries(data)) { + if (typeof value === 'object' && value !== null && !Array.isArray(value)) { + // Keep error objects for debugging + if (key === 'err' || key === 'error' || value instanceof Error) { + filtered[key] = value; + } else { + filtered[key] = '[object hidden]'; + } + } else { + filtered[key] = value; + } + } + + return filtered; + } + // Simple log level methods trace(message: string | object, metadata?: LogMetadata): void { this.log('trace', message, metadata); diff --git a/libs/queue/src/queue-manager.ts b/libs/queue/src/queue-manager.ts index b555824..ee2ac14 100644 --- a/libs/queue/src/queue-manager.ts +++ b/libs/queue/src/queue-manager.ts @@ -129,7 +129,7 @@ export class QueueManager { const queueConfig: QueueWorkerConfig = { workers: mergedOptions.workers, concurrency: mergedOptions.concurrency, - startWorker: !!mergedOptions.workers && mergedOptions.workers > 0, + startWorker: !!mergedOptions.workers && mergedOptions.workers > 0 && !this.config.delayWorkerStart, }; const queue = new Queue( @@ -437,6 +437,33 @@ export class QueueManager { } } + /** + * Start workers for all queues (used when delayWorkerStart is enabled) + */ + startAllWorkers(): void { + if (!this.config.delayWorkerStart) { + logger.warn('startAllWorkers() called but delayWorkerStart is not enabled'); + return; + } + + let workersStarted = 0; + for (const queue of this.queues.values()) { + const workerCount = this.config.defaultQueueOptions?.workers || 1; + const concurrency = this.config.defaultQueueOptions?.concurrency || 1; + + if (workerCount > 0) { + queue.startWorkersManually(workerCount, concurrency); + workersStarted++; + } + } + + logger.info('All workers started', { + totalQueues: this.queues.size, + queuesWithWorkers: workersStarted, + delayWorkerStart: this.config.delayWorkerStart + }); + } + /** * Wait for all queues to be ready */ diff --git a/libs/queue/src/queue.ts b/libs/queue/src/queue.ts index 8f9bdb2..efc4cc1 100644 --- a/libs/queue/src/queue.ts +++ b/libs/queue/src/queue.ts @@ -337,6 +337,24 @@ export class Queue { } } + /** + * Start workers manually (for delayed initialization) + */ + startWorkersManually(workerCount: number, concurrency: number = 1): void { + if (this.workers.length > 0) { + logger.warn('Workers already started for queue', { queueName: this.queueName }); + return; + } + + // Initialize queue events if not already done + if (!this.queueEvents) { + const connection = getRedisConnection(this.redisConfig); + this.queueEvents = new QueueEvents(`{${this.queueName}}`, { connection }); + } + + this.startWorkers(workerCount, concurrency); + } + /** * Get the number of active workers */ diff --git a/libs/queue/src/types.ts b/libs/queue/src/types.ts index 0801fd0..be562b8 100644 --- a/libs/queue/src/types.ts +++ b/libs/queue/src/types.ts @@ -71,6 +71,7 @@ export interface QueueManagerConfig { enableScheduledJobs?: boolean; globalRateLimit?: RateLimitConfig; rateLimitRules?: RateLimitRule[]; // Global rate limit rules + delayWorkerStart?: boolean; // If true, workers won't start automatically } export interface QueueStats {