diff --git a/libs/queue/src/improved-queue-manager.ts b/libs/queue/src/improved-queue-manager.ts deleted file mode 100644 index 04e8421..0000000 --- a/libs/queue/src/improved-queue-manager.ts +++ /dev/null @@ -1,347 +0,0 @@ -import { getLogger } from '@stock-bot/logger'; -import { QueueRateLimiter } from './rate-limiter'; -import { Queue, type QueueWorkerConfig } from './queue'; -import { CacheProvider, createCache } from '@stock-bot/cache'; -import type { - QueueManagerConfig, - QueueOptions, - GlobalStats, - QueueStats, - RateLimitRule, - RedisConfig -} from './types'; -import { getRedisConnection } from './utils'; - -const logger = getLogger('queue-manager'); - -/** - * Improved Singleton QueueManager with better patterns - */ -export class QueueManager { - private static instance: QueueManager | null = null; - private static initializationPromise: Promise | null = null; - private static config: QueueManagerConfig | null = null; - - private queues = new Map(); - private caches = new Map(); - private rateLimiter?: QueueRateLimiter; - private redisConnection: ReturnType; - private isShuttingDown = false; - private shutdownPromise: Promise | null = null; - - private constructor(config: QueueManagerConfig) { - this.redisConnection = getRedisConnection(config.redis); - - // Initialize rate limiter if rules are provided - if (config.rateLimitRules && config.rateLimitRules.length > 0) { - this.rateLimiter = new QueueRateLimiter(this.redisConnection); - config.rateLimitRules.forEach(rule => { - this.rateLimiter!.addRule(rule); - }); - } - - logger.info('QueueManager singleton initialized'); - } - - /** - * Initialize the singleton with config (thread-safe) - * Should be called once at application startup - */ - static async initialize(config: QueueManagerConfig): Promise { - // If already initializing, return the same promise (prevents race conditions) - if (QueueManager.initializationPromise) { - logger.debug('QueueManager initialization already in progress'); - return QueueManager.initializationPromise; - } - - // If already initialized, validate config matches - if (QueueManager.instance) { - if (!QueueManager.isConfigEqual(config, QueueManager.config!)) { - throw new Error('QueueManager already initialized with different config'); - } - return QueueManager.instance; - } - - // Start initialization - QueueManager.initializationPromise = (async () => { - try { - QueueManager.config = config; - QueueManager.instance = new QueueManager(config); - - // Wait for connections to be ready - await QueueManager.instance.waitUntilReady(); - - return QueueManager.instance; - } catch (error) { - // Clean up on failure - QueueManager.instance = null; - QueueManager.config = null; - QueueManager.initializationPromise = null; - throw error; - } - })(); - - return QueueManager.initializationPromise; - } - - /** - * Get the singleton instance - * Throws if not initialized - forces explicit initialization - */ - static getInstance(): QueueManager { - if (!QueueManager.instance) { - throw new Error( - 'QueueManager not initialized. Call QueueManager.initialize(config) first.' - ); - } - return QueueManager.instance; - } - - /** - * Check if QueueManager is initialized - */ - static isInitialized(): boolean { - return QueueManager.instance !== null; - } - - /** - * Get current configuration (readonly) - */ - static getConfig(): Readonly | null { - return QueueManager.config ? { ...QueueManager.config } : null; - } - - /** - * Reset the singleton (thread-safe, mainly for testing) - */ - static async reset(): Promise { - // Wait for any ongoing initialization - if (QueueManager.initializationPromise) { - try { - await QueueManager.initializationPromise; - } catch { - // Ignore initialization errors during reset - } - } - - if (QueueManager.instance) { - await QueueManager.instance.shutdown(); - } - - QueueManager.instance = null; - QueueManager.config = null; - QueueManager.initializationPromise = null; - } - - /** - * Compare two configs for equality - */ - private static isConfigEqual(a: QueueManagerConfig, b: QueueManagerConfig): boolean { - return ( - a.redis.host === b.redis.host && - a.redis.port === b.redis.port && - a.redis.password === b.redis.password && - a.redis.db === b.redis.db - ); - } - - /** - * Wait until all connections are ready - */ - async waitUntilReady(timeout: number = 5000): Promise { - const startTime = Date.now(); - - // Wait for all queues to be ready - const readyPromises = Array.from(this.queues.values()).map(queue => - queue.waitUntilReady() - ); - - // Add timeout - await Promise.race([ - Promise.all(readyPromises), - new Promise((_, reject) => - setTimeout(() => reject(new Error('QueueManager ready timeout')), timeout) - ) - ]); - - logger.debug('QueueManager ready', { - duration: Date.now() - startTime - }); - } - - /** - * Get or create a queue - unified method that handles both scenarios - */ - getQueue(queueName: string, options: QueueOptions = {}): Queue { - if (this.isShuttingDown) { - throw new Error('QueueManager is shutting down'); - } - - // Return existing queue if it exists - if (this.queues.has(queueName)) { - return this.queues.get(queueName)!; - } - - // Create new queue with merged options - const mergedOptions = { - ...QueueManager.config!.defaultQueueOptions, - ...options, - }; - - // Prepare queue configuration - const queueConfig: QueueWorkerConfig = { - workers: mergedOptions.workers, - concurrency: mergedOptions.concurrency, - startWorker: mergedOptions.workers && mergedOptions.workers > 0, - }; - - const queue = new Queue( - queueName, - QueueManager.config!.redis, - mergedOptions.defaultJobOptions || {}, - queueConfig - ); - - // Store the queue - this.queues.set(queueName, queue); - - // Automatically initialize batch cache for the queue - this.initializeBatchCacheSync(queueName); - - // Add queue-specific rate limit rules - if (this.rateLimiter && mergedOptions.rateLimitRules) { - mergedOptions.rateLimitRules.forEach(rule => { - const ruleWithQueue = { ...rule, queueName }; - this.rateLimiter!.addRule(ruleWithQueue); - }); - } - - logger.info('Queue created with batch cache', { - queueName, - workers: mergedOptions.workers || 0, - concurrency: mergedOptions.concurrency || 1 - }); - - return queue; - } - - /** - * Initialize batch cache synchronously - */ - private initializeBatchCacheSync(queueName: string): void { - this.getCache(queueName); - logger.debug('Batch cache initialized synchronously for queue', { queueName }); - } - - /** - * Get or create a cache for a queue - */ - getCache(queueName: string): CacheProvider { - if (!this.caches.has(queueName)) { - const cacheProvider = createCache({ - redisConfig: QueueManager.config!.redis, - keyPrefix: `batch:${queueName}:`, - ttl: 86400, // 24 hours default - enableMetrics: true, - }); - this.caches.set(queueName, cacheProvider); - logger.debug('Cache created for queue', { queueName }); - } - return this.caches.get(queueName)!; - } - - /** - * Shutdown all queues and workers (thread-safe) - */ - async shutdown(): Promise { - // If already shutting down, return the existing promise - if (this.shutdownPromise) { - return this.shutdownPromise; - } - - if (this.isShuttingDown) { - return; - } - - this.isShuttingDown = true; - logger.info('Shutting down QueueManager...'); - - this.shutdownPromise = this.performShutdown(); - return this.shutdownPromise; - } - - /** - * Perform the actual shutdown - */ - private async performShutdown(): Promise { - try { - // Close all queues (this now includes workers) - const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => { - try { - await queue.close(); - } catch (error) { - logger.warn('Error closing queue', { error: error.message }); - } - }); - - await Promise.all(queueShutdownPromises); - - // Close all caches - const cacheShutdownPromises = Array.from(this.caches.values()).map(async (cache) => { - try { - if (typeof cache.disconnect === 'function') { - await cache.disconnect(); - } else if (typeof cache.close === 'function') { - await cache.close(); - } else if (typeof cache.quit === 'function') { - await cache.quit(); - } - } catch (error) { - logger.warn('Error closing cache', { error: error.message }); - } - }); - - await Promise.all(cacheShutdownPromises); - - // Clear collections - this.queues.clear(); - this.caches.clear(); - - logger.info('QueueManager shutdown complete'); - } catch (error) { - logger.error('Error during shutdown', { error: error.message }); - throw error; - } - } - - // ... rest of the methods remain the same ... - - hasQueue(queueName: string): boolean { - return this.queues.has(queueName); - } - - getQueueNames(): string[] { - return Array.from(this.queues.keys()); - } - - async getGlobalStats(): Promise { - const queueStats: Record = {}; - let totalJobs = 0; - let totalWorkers = 0; - - for (const [queueName, queue] of this.queues) { - const stats = await queue.getStats(); - queueStats[queueName] = stats; - - totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed; - totalWorkers += stats.workers || 0; - } - - return { - queues: queueStats, - totalJobs, - totalWorkers, - uptime: process.uptime(), - }; - } -} \ No newline at end of file diff --git a/libs/queue/src/queue-manager.ts b/libs/queue/src/queue-manager.ts index 88bcdfe..4fc5529 100644 --- a/libs/queue/src/queue-manager.ts +++ b/libs/queue/src/queue-manager.ts @@ -25,9 +25,11 @@ export class QueueManager { private rateLimiter?: QueueRateLimiter; private redisConnection: ReturnType; private isShuttingDown = false; - private isInitialized = false; + private shutdownPromise: Promise | null = null; + private config: QueueManagerConfig; - private constructor(private config: QueueManagerConfig) { + private constructor(config: QueueManagerConfig) { + this.config = config; this.redisConnection = getRedisConnection(config.redis); // Initialize rate limiter if rules are provided @@ -38,25 +40,27 @@ export class QueueManager { }); } - this.isInitialized = true; - logger.info('QueueManager singleton initialized'); + logger.info('QueueManager singleton initialized', { + redis: `${config.redis.host}:${config.redis.port}`, + }); } /** * Get the singleton instance + * @throws Error if not initialized - use initialize() first */ - static getInstance(config?: QueueManagerConfig): QueueManager { + static getInstance(): QueueManager { if (!QueueManager.instance) { - if (!config) { - throw new Error('QueueManager not initialized. Provide config on first call.'); - } - QueueManager.instance = new QueueManager(config); + throw new Error( + 'QueueManager not initialized. Call QueueManager.initialize(config) first.' + ); } return QueueManager.instance; } /** * Initialize the singleton with config + * Must be called before getInstance() */ static initialize(config: QueueManagerConfig): QueueManager { if (QueueManager.instance) { @@ -67,6 +71,32 @@ export class QueueManager { return QueueManager.instance; } + /** + * Get or initialize the singleton + * Convenience method that combines initialize and getInstance + */ + static getOrInitialize(config?: QueueManagerConfig): QueueManager { + if (QueueManager.instance) { + return QueueManager.instance; + } + + if (!config) { + throw new Error( + 'QueueManager not initialized and no config provided. ' + + 'Either call initialize(config) first or provide config to getOrInitialize(config).' + ); + } + + return QueueManager.initialize(config); + } + + /** + * Check if the QueueManager is initialized + */ + static isInitialized(): boolean { + return QueueManager.instance !== null; + } + /** * Reset the singleton (mainly for testing) */ @@ -327,9 +357,14 @@ export class QueueManager { /** - * Shutdown all queues and workers + * Shutdown all queues and workers (thread-safe) */ async shutdown(): Promise { + // If already shutting down, return the existing promise + if (this.shutdownPromise) { + return this.shutdownPromise; + } + if (this.isShuttingDown) { return; } @@ -337,6 +372,15 @@ export class QueueManager { this.isShuttingDown = true; logger.info('Shutting down QueueManager...'); + // Create shutdown promise + this.shutdownPromise = this.performShutdown(); + return this.shutdownPromise; + } + + /** + * Perform the actual shutdown + */ + private async performShutdown(): Promise { try { // Close all queues (this now includes workers since they're managed by Queue class) const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => { @@ -375,6 +419,10 @@ export class QueueManager { } catch (error) { logger.error('Error during shutdown', { error: error.message }); throw error; + } finally { + // Reset shutdown state + this.shutdownPromise = null; + this.isShuttingDown = false; } } @@ -392,4 +440,11 @@ export class QueueManager { getRedisConfig() { return this.config.redis; } + + /** + * Get the current configuration + */ + getConfig(): Readonly { + return { ...this.config }; + } }