added start worker delay

This commit is contained in:
Boki 2025-06-21 07:43:37 -04:00
parent a0e1593af9
commit 19ecd95346
6 changed files with 92 additions and 7 deletions

2
.env
View file

@ -4,7 +4,7 @@
# Core Application Settings # Core Application Settings
NODE_ENV=development NODE_ENV=development
LOG_LEVEL=warn LOG_LEVEL=debug
LOG_HIDE_OBJECT=true LOG_HIDE_OBJECT=true
# Data Service Configuration # Data Service Configuration

View file

@ -93,7 +93,7 @@ async function initializeServices() {
}); });
logger.info('PostgreSQL connected'); logger.info('PostgreSQL connected');
// Initialize queue system // Initialize queue system (with delayed worker start)
logger.debug('Initializing queue system...'); logger.debug('Initializing queue system...');
const queueManagerConfig: QueueManagerConfig = { const queueManagerConfig: QueueManagerConfig = {
redis: queueConfig?.redis || { redis: queueConfig?.redis || {
@ -111,12 +111,13 @@ async function initializeServices() {
removeOnComplete: true, removeOnComplete: true,
removeOnFail: false, removeOnFail: false,
}, },
workers: 5, workers: 2,
concurrency: 20, concurrency: 1,
enableMetrics: true, enableMetrics: true,
enableDLQ: true, enableDLQ: true,
}, },
enableScheduledJobs: true, enableScheduledJobs: true,
delayWorkerStart: true, // Prevent workers from starting until all singletons are ready
}; };
queueManager = QueueManager.getOrInitialize(queueManagerConfig); queueManager = QueueManager.getOrInitialize(queueManagerConfig);
@ -189,6 +190,11 @@ async function initializeServices() {
} }
logger.info('Scheduled jobs created', { totalJobs: totalScheduledJobs }); logger.info('Scheduled jobs created', { totalJobs: totalScheduledJobs });
// Now that all singletons are initialized and jobs are scheduled, start the workers
logger.debug('Starting queue workers...');
queueManager.startAllWorkers();
logger.info('Queue workers started');
logger.info('All services initialized successfully'); logger.info('All services initialized successfully');
} catch (error) { } catch (error) {
logger.error('Failed to initialize services', { error }); logger.error('Failed to initialize services', { error });

View file

@ -23,6 +23,7 @@ let globalConfig: LoggerConfig = {
logFilePath: './logs', logFilePath: './logs',
logLoki: false, logLoki: false,
environment: 'development', environment: 'development',
hideObject: false,
}; };
// Log level priorities for comparison // Log level priorities for comparison
@ -40,6 +41,7 @@ const LOG_LEVELS: Record<LogLevel, number> = {
*/ */
export function setLoggerConfig(config: LoggerConfig): void { export function setLoggerConfig(config: LoggerConfig): void {
globalConfig = { ...globalConfig, ...config }; globalConfig = { ...globalConfig, ...config };
// Clear cache to force recreation with new config
loggerCache.clear(); loggerCache.clear();
} }
/** /**
@ -173,14 +175,45 @@ export class Logger {
return; return;
} }
const data = { ...this.context, ...metadata }; let data = { ...this.context, ...metadata };
// Filter out objects if hideObject is enabled
if (globalConfig.hideObject) {
data = this.filterObjects(data);
}
if (typeof message === 'string') { if (typeof message === 'string') {
(this.pino as any)[level](data, message); (this.pino as any)[level](data, message);
} else {
if (globalConfig.hideObject) {
(this.pino as any)[level]({}, `Object logged (hidden)`);
} else { } else {
(this.pino as any)[level]({ ...data, data: message }, 'Object logged'); (this.pino as any)[level]({ ...data, data: message }, 'Object logged');
} }
} }
}
/**
* Filter out objects from metadata when hideObject is enabled
*/
private filterObjects(data: Record<string, any>): Record<string, any> {
const filtered: Record<string, any> = {};
for (const [key, value] of Object.entries(data)) {
if (typeof value === 'object' && value !== null && !Array.isArray(value)) {
// Keep error objects for debugging
if (key === 'err' || key === 'error' || value instanceof Error) {
filtered[key] = value;
} else {
filtered[key] = '[object hidden]';
}
} else {
filtered[key] = value;
}
}
return filtered;
}
// Simple log level methods // Simple log level methods
trace(message: string | object, metadata?: LogMetadata): void { trace(message: string | object, metadata?: LogMetadata): void {

View file

@ -129,7 +129,7 @@ export class QueueManager {
const queueConfig: QueueWorkerConfig = { const queueConfig: QueueWorkerConfig = {
workers: mergedOptions.workers, workers: mergedOptions.workers,
concurrency: mergedOptions.concurrency, concurrency: mergedOptions.concurrency,
startWorker: !!mergedOptions.workers && mergedOptions.workers > 0, startWorker: !!mergedOptions.workers && mergedOptions.workers > 0 && !this.config.delayWorkerStart,
}; };
const queue = new Queue( const queue = new Queue(
@ -437,6 +437,33 @@ export class QueueManager {
} }
} }
/**
* Start workers for all queues (used when delayWorkerStart is enabled)
*/
startAllWorkers(): void {
if (!this.config.delayWorkerStart) {
logger.warn('startAllWorkers() called but delayWorkerStart is not enabled');
return;
}
let workersStarted = 0;
for (const queue of this.queues.values()) {
const workerCount = this.config.defaultQueueOptions?.workers || 1;
const concurrency = this.config.defaultQueueOptions?.concurrency || 1;
if (workerCount > 0) {
queue.startWorkersManually(workerCount, concurrency);
workersStarted++;
}
}
logger.info('All workers started', {
totalQueues: this.queues.size,
queuesWithWorkers: workersStarted,
delayWorkerStart: this.config.delayWorkerStart
});
}
/** /**
* Wait for all queues to be ready * Wait for all queues to be ready
*/ */

View file

@ -337,6 +337,24 @@ export class Queue {
} }
} }
/**
* Start workers manually (for delayed initialization)
*/
startWorkersManually(workerCount: number, concurrency: number = 1): void {
if (this.workers.length > 0) {
logger.warn('Workers already started for queue', { queueName: this.queueName });
return;
}
// Initialize queue events if not already done
if (!this.queueEvents) {
const connection = getRedisConnection(this.redisConfig);
this.queueEvents = new QueueEvents(`{${this.queueName}}`, { connection });
}
this.startWorkers(workerCount, concurrency);
}
/** /**
* Get the number of active workers * Get the number of active workers
*/ */

View file

@ -71,6 +71,7 @@ export interface QueueManagerConfig {
enableScheduledJobs?: boolean; enableScheduledJobs?: boolean;
globalRateLimit?: RateLimitConfig; globalRateLimit?: RateLimitConfig;
rateLimitRules?: RateLimitRule[]; // Global rate limit rules rateLimitRules?: RateLimitRule[]; // Global rate limit rules
delayWorkerStart?: boolean; // If true, workers won't start automatically
} }
export interface QueueStats { export interface QueueStats {