From 0d8119e3de3e144c91fe696b00d9ec25555e94b6 Mon Sep 17 00:00:00 2001 From: Boki Date: Thu, 19 Jun 2025 08:34:16 -0400 Subject: [PATCH] fixes --- libs/queue/src/improved-queue-manager.ts | 347 +++++++++++++++++++++++ libs/queue/src/index.ts | 2 +- 2 files changed, 348 insertions(+), 1 deletion(-) create mode 100644 libs/queue/src/improved-queue-manager.ts diff --git a/libs/queue/src/improved-queue-manager.ts b/libs/queue/src/improved-queue-manager.ts new file mode 100644 index 0000000..04e8421 --- /dev/null +++ b/libs/queue/src/improved-queue-manager.ts @@ -0,0 +1,347 @@ +import { getLogger } from '@stock-bot/logger'; +import { QueueRateLimiter } from './rate-limiter'; +import { Queue, type QueueWorkerConfig } from './queue'; +import { CacheProvider, createCache } from '@stock-bot/cache'; +import type { + QueueManagerConfig, + QueueOptions, + GlobalStats, + QueueStats, + RateLimitRule, + RedisConfig +} from './types'; +import { getRedisConnection } from './utils'; + +const logger = getLogger('queue-manager'); + +/** + * Improved Singleton QueueManager with better patterns + */ +export class QueueManager { + private static instance: QueueManager | null = null; + private static initializationPromise: Promise | null = null; + private static config: QueueManagerConfig | null = null; + + private queues = new Map(); + private caches = new Map(); + private rateLimiter?: QueueRateLimiter; + private redisConnection: ReturnType; + private isShuttingDown = false; + private shutdownPromise: Promise | null = null; + + private constructor(config: QueueManagerConfig) { + this.redisConnection = getRedisConnection(config.redis); + + // Initialize rate limiter if rules are provided + if (config.rateLimitRules && config.rateLimitRules.length > 0) { + this.rateLimiter = new QueueRateLimiter(this.redisConnection); + config.rateLimitRules.forEach(rule => { + this.rateLimiter!.addRule(rule); + }); + } + + logger.info('QueueManager singleton initialized'); + } + + /** + * Initialize the singleton with config (thread-safe) + * Should be called once at application startup + */ + static async initialize(config: QueueManagerConfig): Promise { + // If already initializing, return the same promise (prevents race conditions) + if (QueueManager.initializationPromise) { + logger.debug('QueueManager initialization already in progress'); + return QueueManager.initializationPromise; + } + + // If already initialized, validate config matches + if (QueueManager.instance) { + if (!QueueManager.isConfigEqual(config, QueueManager.config!)) { + throw new Error('QueueManager already initialized with different config'); + } + return QueueManager.instance; + } + + // Start initialization + QueueManager.initializationPromise = (async () => { + try { + QueueManager.config = config; + QueueManager.instance = new QueueManager(config); + + // Wait for connections to be ready + await QueueManager.instance.waitUntilReady(); + + return QueueManager.instance; + } catch (error) { + // Clean up on failure + QueueManager.instance = null; + QueueManager.config = null; + QueueManager.initializationPromise = null; + throw error; + } + })(); + + return QueueManager.initializationPromise; + } + + /** + * Get the singleton instance + * Throws if not initialized - forces explicit initialization + */ + static getInstance(): QueueManager { + if (!QueueManager.instance) { + throw new Error( + 'QueueManager not initialized. Call QueueManager.initialize(config) first.' + ); + } + return QueueManager.instance; + } + + /** + * Check if QueueManager is initialized + */ + static isInitialized(): boolean { + return QueueManager.instance !== null; + } + + /** + * Get current configuration (readonly) + */ + static getConfig(): Readonly | null { + return QueueManager.config ? { ...QueueManager.config } : null; + } + + /** + * Reset the singleton (thread-safe, mainly for testing) + */ + static async reset(): Promise { + // Wait for any ongoing initialization + if (QueueManager.initializationPromise) { + try { + await QueueManager.initializationPromise; + } catch { + // Ignore initialization errors during reset + } + } + + if (QueueManager.instance) { + await QueueManager.instance.shutdown(); + } + + QueueManager.instance = null; + QueueManager.config = null; + QueueManager.initializationPromise = null; + } + + /** + * Compare two configs for equality + */ + private static isConfigEqual(a: QueueManagerConfig, b: QueueManagerConfig): boolean { + return ( + a.redis.host === b.redis.host && + a.redis.port === b.redis.port && + a.redis.password === b.redis.password && + a.redis.db === b.redis.db + ); + } + + /** + * Wait until all connections are ready + */ + async waitUntilReady(timeout: number = 5000): Promise { + const startTime = Date.now(); + + // Wait for all queues to be ready + const readyPromises = Array.from(this.queues.values()).map(queue => + queue.waitUntilReady() + ); + + // Add timeout + await Promise.race([ + Promise.all(readyPromises), + new Promise((_, reject) => + setTimeout(() => reject(new Error('QueueManager ready timeout')), timeout) + ) + ]); + + logger.debug('QueueManager ready', { + duration: Date.now() - startTime + }); + } + + /** + * Get or create a queue - unified method that handles both scenarios + */ + getQueue(queueName: string, options: QueueOptions = {}): Queue { + if (this.isShuttingDown) { + throw new Error('QueueManager is shutting down'); + } + + // Return existing queue if it exists + if (this.queues.has(queueName)) { + return this.queues.get(queueName)!; + } + + // Create new queue with merged options + const mergedOptions = { + ...QueueManager.config!.defaultQueueOptions, + ...options, + }; + + // Prepare queue configuration + const queueConfig: QueueWorkerConfig = { + workers: mergedOptions.workers, + concurrency: mergedOptions.concurrency, + startWorker: mergedOptions.workers && mergedOptions.workers > 0, + }; + + const queue = new Queue( + queueName, + QueueManager.config!.redis, + mergedOptions.defaultJobOptions || {}, + queueConfig + ); + + // Store the queue + this.queues.set(queueName, queue); + + // Automatically initialize batch cache for the queue + this.initializeBatchCacheSync(queueName); + + // Add queue-specific rate limit rules + if (this.rateLimiter && mergedOptions.rateLimitRules) { + mergedOptions.rateLimitRules.forEach(rule => { + const ruleWithQueue = { ...rule, queueName }; + this.rateLimiter!.addRule(ruleWithQueue); + }); + } + + logger.info('Queue created with batch cache', { + queueName, + workers: mergedOptions.workers || 0, + concurrency: mergedOptions.concurrency || 1 + }); + + return queue; + } + + /** + * Initialize batch cache synchronously + */ + private initializeBatchCacheSync(queueName: string): void { + this.getCache(queueName); + logger.debug('Batch cache initialized synchronously for queue', { queueName }); + } + + /** + * Get or create a cache for a queue + */ + getCache(queueName: string): CacheProvider { + if (!this.caches.has(queueName)) { + const cacheProvider = createCache({ + redisConfig: QueueManager.config!.redis, + keyPrefix: `batch:${queueName}:`, + ttl: 86400, // 24 hours default + enableMetrics: true, + }); + this.caches.set(queueName, cacheProvider); + logger.debug('Cache created for queue', { queueName }); + } + return this.caches.get(queueName)!; + } + + /** + * Shutdown all queues and workers (thread-safe) + */ + async shutdown(): Promise { + // If already shutting down, return the existing promise + if (this.shutdownPromise) { + return this.shutdownPromise; + } + + if (this.isShuttingDown) { + return; + } + + this.isShuttingDown = true; + logger.info('Shutting down QueueManager...'); + + this.shutdownPromise = this.performShutdown(); + return this.shutdownPromise; + } + + /** + * Perform the actual shutdown + */ + private async performShutdown(): Promise { + try { + // Close all queues (this now includes workers) + const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => { + try { + await queue.close(); + } catch (error) { + logger.warn('Error closing queue', { error: error.message }); + } + }); + + await Promise.all(queueShutdownPromises); + + // Close all caches + const cacheShutdownPromises = Array.from(this.caches.values()).map(async (cache) => { + try { + if (typeof cache.disconnect === 'function') { + await cache.disconnect(); + } else if (typeof cache.close === 'function') { + await cache.close(); + } else if (typeof cache.quit === 'function') { + await cache.quit(); + } + } catch (error) { + logger.warn('Error closing cache', { error: error.message }); + } + }); + + await Promise.all(cacheShutdownPromises); + + // Clear collections + this.queues.clear(); + this.caches.clear(); + + logger.info('QueueManager shutdown complete'); + } catch (error) { + logger.error('Error during shutdown', { error: error.message }); + throw error; + } + } + + // ... rest of the methods remain the same ... + + hasQueue(queueName: string): boolean { + return this.queues.has(queueName); + } + + getQueueNames(): string[] { + return Array.from(this.queues.keys()); + } + + async getGlobalStats(): Promise { + const queueStats: Record = {}; + let totalJobs = 0; + let totalWorkers = 0; + + for (const [queueName, queue] of this.queues) { + const stats = await queue.getStats(); + queueStats[queueName] = stats; + + totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed; + totalWorkers += stats.workers || 0; + } + + return { + queues: queueStats, + totalJobs, + totalWorkers, + uptime: process.uptime(), + }; + } +} \ No newline at end of file diff --git a/libs/queue/src/index.ts b/libs/queue/src/index.ts index 0f66ada..77af269 100644 --- a/libs/queue/src/index.ts +++ b/libs/queue/src/index.ts @@ -17,7 +17,7 @@ export { } from './queue-factory'; // DLQ handling -export { DLQHandler } from './dlq-handler'; +export { DeadLetterQueueHandler, DeadLetterQueueHandler as DLQHandler } from './dlq-handler'; // Metrics export { QueueMetricsCollector } from './queue-metrics';