import { Queue as BullQueue, type Job } from 'bullmq';
import { createCache } from '@stock-bot/cache';
import type { CacheProvider } from '@stock-bot/cache';
import type { HandlerRegistry } from '@stock-bot/handler-registry';
import { getLogger } from '@stock-bot/logger';
import { Queue, type QueueWorkerConfig } from './queue';
import { QueueRateLimiter } from './rate-limiter';
import { getFullQueueName, parseQueueName } from './service-utils';
import type {
  GlobalStats,
  JobData,
  JobOptions,
  QueueManagerConfig,
  QueueOptions,
  QueueRoute,
  QueueStats,
  RateLimitRule,
} from './types';
import { getRedisConnection } from './utils';

/**
 * Minimal logger contract used by the manager, so any compatible logger
 * implementation can be injected through the constructor.
 */
interface Logger {
  info(message: string, meta?: Record<string, unknown>): void;
  error(message: string, meta?: Record<string, unknown>): void;
  warn(message: string, meta?: Record<string, unknown>): void;
  debug(message: string, meta?: Record<string, unknown>): void;
  trace(message: string, meta?: Record<string, unknown>): void;
  child?(name: string, context?: Record<string, unknown>): Logger;
}

/**
 * Turn an arbitrary thrown value into a loggable message.
 * `catch` variables are `unknown`, so narrow instead of asserting `Error`.
 */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * QueueManager provides unified queue and cache management with service discovery.
 * Handles both local and cross-service queue operations.
 *
 * Queues are kept in an internal map keyed by their resolved name. When a
 * `serviceName` is configured, bare handler names are expanded with
 * `getFullQueueName` before being stored or looked up.
 */
export class QueueManager {
  private queues = new Map<string, Queue>();
  private caches = new Map<string, CacheProvider>();
  private rateLimiter?: QueueRateLimiter;
  private redisConnection: ReturnType<typeof getRedisConnection>;
  private isShuttingDown = false;
  private shutdownPromise: Promise<void> | null = null;
  private config: QueueManagerConfig;
  private readonly logger: Logger;

  // Service discovery features
  private serviceName?: string;
  private queueRoutes = new Map<string, QueueRoute>();
  private producerQueues = new Map<string, BullQueue>(); // For cross-service sending
  private handlerRegistry?: HandlerRegistry;

  /**
   * @param config - Manager configuration. When `serviceName` is set, the Redis
   *   `db` is forced to 0 on a copy of the config (the caller's object is not mutated).
   * @param handlerRegistry - Optional registry used for route discovery and
   *   handed to queues owned by this service.
   * @param logger - Optional logger; defaults to `getLogger('QueueManager')`.
   */
  constructor(config: QueueManagerConfig, handlerRegistry?: HandlerRegistry, logger?: Logger) {
    // Always use DB 0 for queues if service name is provided
    if (config.serviceName) {
      config = {
        ...config,
        redis: {
          ...config.redis,
          db: 0, // All queues in DB 0 for cross-service communication
        },
      };
    }

    this.config = config;
    this.serviceName = config.serviceName;
    this.handlerRegistry = handlerRegistry;
    this.logger = logger ?? getLogger('QueueManager');
    this.redisConnection = getRedisConnection(config.redis);

    // Initialize rate limiter if rules are provided
    if (config.rateLimitRules && config.rateLimitRules.length > 0) {
      const limiter = new QueueRateLimiter(this.redisConnection, this.logger);
      for (const rule of config.rateLimitRules) {
        limiter.addRule(rule);
      }
      this.rateLimiter = limiter;
    }

    // Auto-discover routes if enabled and registry provided
    if (config.serviceName && config.autoDiscoverHandlers !== false && handlerRegistry) {
      this.discoverQueueRoutes();
    }

    this.logger.info('QueueManager initialized', {
      redis: `${config.redis.host}:${config.redis.port}`,
      service: this.serviceName,
      discoveredRoutes: this.queueRoutes.size,
      hasRegistry: !!handlerRegistry,
    });
  }

  /**
   * Get or create a queue - the main entry point for accessing queues.
   *
   * If `serviceName` is configured, a bare handler name is namespaced to the
   * current service. A name that `parseQueueName` recognises as belonging to
   * another service yields a producer-only queue (`workers: 0`).
   *
   * @param queueName - Handler name or full service-qualified queue name.
   * @param options - Per-queue options, merged over `config.defaultQueueOptions`.
   * @returns The existing queue for the resolved name, or a newly created one.
   */
  getQueue(queueName: string, options: QueueOptions = {}): Queue {
    let resolvedName = queueName;
    let effectiveOptions = options;
    let isOwnQueue = true;

    // Handle service namespacing if service name is configured
    if (this.serviceName) {
      const parsed = parseQueueName(queueName);
      if (parsed) {
        // Already in service:handler format
        isOwnQueue = parsed.service === this.serviceName;
      } else {
        // Just handler name, assume it's for current service
        resolvedName = getFullQueueName(this.serviceName, queueName);
      }

      effectiveOptions = isOwnQueue
        ? // For own service queues, include handler registry
          { ...options, handlerRegistry: this.handlerRegistry }
        : // For cross-service queues, create without workers (producer-only)
          { ...options, workers: 0 };
    }

    // Return existing queue if it exists
    const existingQueue = this.queues.get(resolvedName);
    if (existingQueue) {
      return existingQueue;
    }

    // Create new queue with merged options
    const mergedOptions = {
      ...this.config.defaultQueueOptions,
      ...effectiveOptions,
    };

    // The second fallback matters when a caller passes an explicit `undefined`,
    // which would otherwise overwrite the default during the spread above.
    const workers = mergedOptions.workers ?? this.config.defaultQueueOptions?.workers ?? 1;
    const concurrency =
      mergedOptions.concurrency ?? this.config.defaultQueueOptions?.concurrency ?? 1;
    const willStartWorkers = workers > 0 && !this.config.delayWorkerStart;

    const queueConfig: QueueWorkerConfig = {
      workers,
      concurrency,
      startWorker: willStartWorkers,
      handlerRegistry: effectiveOptions.handlerRegistry || this.handlerRegistry,
    };

    const queue = new Queue(
      resolvedName,
      this.config.redis,
      mergedOptions.defaultJobOptions || {},
      queueConfig,
      this.logger
    );

    // Store the queue
    this.queues.set(resolvedName, queue);

    // Automatically initialize batch cache for the queue
    this.initializeBatchCacheSync(resolvedName);

    // Add queue-specific rate limit rules
    const limiter = this.rateLimiter;
    if (limiter && mergedOptions.rateLimitRules) {
      for (const rule of mergedOptions.rateLimitRules) {
        // Ensure queue name is set for queue-specific rules
        limiter.addRule({ ...rule, queueName: resolvedName });
      }
    }

    this.logger.info('Queue created with batch cache', {
      queueName: resolvedName,
      // NOTE(review): despite its name, this field reports whether a handler
      // registry was attached to the options, not the caller-supplied name.
      originalQueueName: effectiveOptions.handlerRegistry
        ? 'has-handler-registry'
        : 'no-handler-registry',
      workers: workers,
      concurrency: concurrency,
      handlerRegistryProvided: !!this.handlerRegistry,
      willStartWorkers,
      isOwnQueue,
      serviceName: this.serviceName,
    });

    return queue;
  }

  /**
   * Map a caller-supplied name onto the key used in the queues map.
   *
   * An exact match always wins. Otherwise, when a service name is configured
   * and the name is not already service-qualified, the name is expanded the
   * same way `getQueue` expands it, so lookups agree with creation.
   */
  private resolveQueueKey(queueName: string): string {
    if (this.queues.has(queueName) || !this.serviceName || parseQueueName(queueName)) {
      return queueName;
    }
    return getFullQueueName(this.serviceName, queueName);
  }

  /**
   * Look up an already-created queue by raw or handler-only name.
   */
  private findQueue(queueName: string): Queue | undefined {
    return this.queues.get(this.resolveQueueKey(queueName));
  }

  /**
   * Check if a queue exists.
   *
   * @param queueName - Stored queue name, or a bare handler name of this service.
   */
  hasQueue(queueName: string): boolean {
    return this.findQueue(queueName) !== undefined;
  }

  /**
   * Get all queue names (the resolved keys under which queues are stored).
   */
  getQueueNames(): string[] {
    return Array.from(this.queues.keys());
  }

  /**
   * Get or create a cache for a queue.
   *
   * The cache is created with the key prefix `batch:<queueName>:` and a TTL of
   * 86400 (24 hours), and is memoised per `queueName` exactly as given.
   *
   * @throws Error if the cache is unexpectedly missing after creation.
   */
  getCache(queueName: string): CacheProvider {
    if (!this.caches.has(queueName)) {
      const cacheProvider = createCache({
        redisConfig: this.config.redis,
        keyPrefix: `batch:${queueName}:`,
        ttl: 86400, // 24 hours default
        enableMetrics: true,
        logger: this.logger,
      });
      this.caches.set(queueName, cacheProvider);
      this.logger.trace('Cache created for queue', { queueName });
    }

    const cache = this.caches.get(queueName);
    if (!cache) {
      throw new Error(`Expected cache for queue ${queueName} to exist`);
    }
    return cache;
  }

  /**
   * Initialize cache for a queue and wait until it reports ready.
   * Passes 10000 to `waitForReady` (presumably a timeout in milliseconds - confirm
   * against the cache provider).
   */
  async initializeCache(queueName: string): Promise<void> {
    const cache = this.getCache(queueName);
    await cache.waitForReady(10000);
    this.logger.info('Cache initialized for queue', { queueName });
  }

  /**
   * Initialize batch cache synchronously (for automatic initialization).
   * Only creates the cache object; it does not wait for readiness.
   */
  private initializeBatchCacheSync(queueName: string): void {
    // Just create the cache - it will connect automatically when first used
    this.getCache(queueName);
    this.logger.trace('Batch cache initialized synchronously for queue', { queueName });
  }

  /**
   * Get the queues map (for subclasses). Returns the live map, not a copy.
   */
  protected getQueues(): Map<string, Queue> {
    return this.queues;
  }

  /**
   * Get statistics for all queues.
   *
   * Per-queue stats are fetched concurrently. `totalJobs` sums the waiting,
   * active, completed, failed and delayed counts of every queue.
   */
  async getGlobalStats(): Promise<GlobalStats> {
    const collected = await Promise.all(
      Array.from(this.queues, async ([queueName, queue]) => ({
        queueName,
        stats: await queue.getStats(),
      }))
    );

    const queueStats: Record<string, QueueStats> = {};
    let totalJobs = 0;
    let totalWorkers = 0;

    for (const { queueName, stats } of collected) {
      queueStats[queueName] = stats;
      totalJobs +=
        stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed;
      totalWorkers += stats.workers || 0;
    }

    return {
      queues: queueStats,
      totalJobs,
      totalWorkers,
      uptime: process.uptime(),
    };
  }

  /**
   * Get statistics for a specific queue.
   *
   * @returns The queue's stats, or `undefined` if no such queue has been created.
   */
  async getQueueStats(queueName: string): Promise<QueueStats | undefined> {
    const queue = this.findQueue(queueName);
    if (!queue) {
      return undefined;
    }
    return await queue.getStats();
  }

  /**
   * Add a rate limit rule, creating the rate limiter on first use.
   */
  addRateLimitRule(rule: RateLimitRule): void {
    if (!this.rateLimiter) {
      this.rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger);
    }
    this.rateLimiter.addRule(rule);
  }

  /**
   * Check rate limits for a job.
   *
   * @returns `{ allowed: true }` when no rate limiter is configured; otherwise
   *   whatever the rate limiter's `checkLimit` reports.
   */
  async checkRateLimit(
    queueName: string,
    handler: string,
    operation: string
  ): Promise<{
    allowed: boolean;
    retryAfter?: number;
    remainingPoints?: number;
    appliedRule?: RateLimitRule;
  }> {
    if (!this.rateLimiter) {
      return { allowed: true };
    }
    return await this.rateLimiter.checkLimit(queueName, handler, operation);
  }

  /**
   * Get rate limit status.
   * Without a rate limiter this just echoes the identifying fields back.
   */
  async getRateLimitStatus(queueName: string, handler: string, operation: string) {
    if (!this.rateLimiter) {
      return {
        queueName,
        handler,
        operation,
      };
    }
    return await this.rateLimiter.getStatus(queueName, handler, operation);
  }

  /**
   * Pause all queues.
   */
  async pauseAll(): Promise<void> {
    await Promise.all(Array.from(this.queues.values(), queue => queue.pause()));
    this.logger.info('All queues paused');
  }

  /**
   * Resume all queues.
   */
  async resumeAll(): Promise<void> {
    await Promise.all(Array.from(this.queues.values(), queue => queue.resume()));
    this.logger.info('All queues resumed');
  }

  /**
   * Pause a specific queue.
   *
   * @returns `false` if the queue does not exist, `true` once it has been paused.
   */
  async pauseQueue(queueName: string): Promise<boolean> {
    const queue = this.findQueue(queueName);
    if (!queue) {
      return false;
    }
    await queue.pause();
    return true;
  }

  /**
   * Resume a specific queue.
   *
   * @returns `false` if the queue does not exist, `true` once it has been resumed.
   */
  async resumeQueue(queueName: string): Promise<boolean> {
    const queue = this.findQueue(queueName);
    if (!queue) {
      return false;
    }
    await queue.resume();
    return true;
  }

  /**
   * Drain all queues.
   *
   * @param delayed - Forwarded to each queue's `drain`.
   */
  async drainAll(delayed = false): Promise<void> {
    await Promise.all(Array.from(this.queues.values(), queue => queue.drain(delayed)));
    this.logger.info('All queues drained', { delayed });
  }

  /**
   * Clean all queues.
   *
   * @param grace - Forwarded to each queue's `clean`.
   * @param limit - Forwarded to each queue's `clean`.
   * @param type - Which job state to clean.
   */
  async cleanAll(
    grace: number = 0,
    limit: number = 100,
    type: 'completed' | 'failed' = 'completed'
  ): Promise<void> {
    await Promise.all(
      Array.from(this.queues.values(), queue => queue.clean(grace, limit, type))
    );
    this.logger.info('All queues cleaned', { type, grace, limit });
  }

  /**
   * Shutdown all queues and workers.
   * Concurrent callers share the same in-flight shutdown promise.
   */
  async shutdown(): Promise<void> {
    // If already shutting down, return the existing promise
    if (this.shutdownPromise) {
      return this.shutdownPromise;
    }

    if (this.isShuttingDown) {
      return;
    }

    this.isShuttingDown = true;
    this.logger.info('Shutting down QueueManager...');

    // Create shutdown promise
    this.shutdownPromise = this.performShutdown();
    return this.shutdownPromise;
  }

  /**
   * Perform the actual shutdown: close queues, close producer queues, clear
   * caches, then empty the internal collections. Individual close/clear
   * failures are logged and do not abort the remaining steps. The shutdown
   * state is reset afterwards whether or not the shutdown succeeded.
   */
  private async performShutdown(): Promise<void> {
    try {
      // Close all queues (this now includes workers since they're managed by Queue class)
      await Promise.all(
        Array.from(this.queues, async ([queueName, queue]) => {
          try {
            await queue.close();
          } catch (error) {
            this.logger.warn('Error closing queue', { queueName, error: describeError(error) });
          }
        })
      );

      // Close producer-only BullMQ queues so their connections are released
      await Promise.all(
        Array.from(this.producerQueues, async ([queueName, producer]) => {
          try {
            await producer.close();
          } catch (error) {
            this.logger.warn('Error closing producer queue', {
              queueName,
              error: describeError(error),
            });
          }
        })
      );

      // Close all caches
      await Promise.all(
        Array.from(this.caches.values(), async cache => {
          try {
            // Clear cache before shutdown
            await cache.clear();
          } catch (error) {
            this.logger.warn('Error clearing cache', { error: describeError(error) });
          }
        })
      );

      // Clear collections
      this.queues.clear();
      this.producerQueues.clear();
      this.caches.clear();

      this.logger.info('QueueManager shutdown complete');
    } catch (error) {
      this.logger.error('Error during shutdown', { error: describeError(error) });
      throw error;
    } finally {
      // Reset shutdown state
      this.shutdownPromise = null;
      this.isShuttingDown = false;
    }
  }

  /**
   * Start workers for all queues (used when delayWorkerStart is enabled).
   *
   * Queues whose parsed name belongs to another service are skipped. Worker
   * count and concurrency come from `config.defaultQueueOptions`, defaulting
   * to 1 only when unset, so an explicit `workers: 0` starts no workers.
   */
  startAllWorkers(): void {
    if (!this.config.delayWorkerStart) {
      this.logger.info(
        'startAllWorkers() called but workers already started automatically (delayWorkerStart is false)'
      );
      return;
    }

    let workersStarted = 0;
    const queues = this.queues;

    this.logger.info(`Starting workers for ${queues.size} queues: ${Array.from(queues.keys()).join(', ')} (service: ${this.serviceName})`);

    // Loop-invariant: these depend only on the manager configuration.
    const workerCount = this.config.defaultQueueOptions?.workers ?? 1;
    const concurrency = this.config.defaultQueueOptions?.concurrency ?? 1;

    for (const [queueName, queue] of queues) {
      // If we have a service name, check if this queue belongs to us
      if (this.serviceName) {
        const parsed = parseQueueName(queueName);
        // Skip if not our service's queue
        if (parsed && parsed.service !== this.serviceName) {
          this.logger.trace('Skipping workers for cross-service queue', {
            queueName,
            ownerService: parsed.service,
            currentService: this.serviceName,
          });
          continue;
        }
      }

      if (workerCount > 0) {
        queue.startWorkersManually(workerCount, concurrency);
        workersStarted++;
        this.logger.debug('Started workers for queue', {
          queueName,
          workers: workerCount,
          concurrency,
        });
      }
    }

    this.logger.info('Service workers started', {
      service: this.serviceName || 'default',
      totalQueues: queues.size,
      queuesWithWorkers: workersStarted,
      delayWorkerStart: this.config.delayWorkerStart,
    });
  }

  /**
   * Wait for all queues to be ready.
   */
  async waitUntilReady(): Promise<void> {
    await Promise.all(Array.from(this.queues.values(), queue => queue.waitUntilReady()));
  }

  /**
   * Get Redis configuration (for backward compatibility).
   */
  getRedisConfig() {
    return this.config.redis;
  }

  /**
   * Get the current configuration as a shallow copy.
   */
  getConfig(): Readonly<QueueManagerConfig> {
    return { ...this.config };
  }

  /**
   * Send a job to any queue (local or remote).
   * This is the main method for cross-service communication.
   *
   * Without a configured service name the job is added through `getQueue`.
   * Otherwise the target is resolved to a route and the job is added through a
   * producer-only BullMQ queue. An operation missing from the route's known
   * operations is only logged as a warning; the job is still sent.
   *
   * @param targetQueue - Handler name or service-qualified queue name.
   * @param operation - Operation name; also used as the job name.
   * @param payload - Arbitrary job payload.
   * @param options - Job options forwarded to `add`.
   * @throws Error if the target queue cannot be resolved to a route.
   */
  async send(
    targetQueue: string,
    operation: string,
    payload: unknown,
    options: JobOptions = {}
  ): Promise<Job> {
    if (!this.serviceName) {
      // If no service name, just use regular queue
      const queue = this.getQueue(targetQueue);
      return queue.add(operation, { handler: targetQueue, operation, payload }, options);
    }

    // Resolve the target queue
    const route = this.resolveQueueRoute(targetQueue);
    if (!route) {
      throw new Error(`Unknown queue: ${targetQueue}`);
    }

    // Validate operation if we have metadata
    if (route.operations && !route.operations.includes(operation)) {
      this.logger.warn('Operation not found in handler metadata', {
        handler: route.handler,
        operation,
        availableOperations: route.operations,
      });
    }

    // Use a producer queue for cross-service sending
    const producerQueue = this.getProducerQueue(route.fullName);

    const jobData: JobData = {
      handler: route.handler,
      operation,
      payload,
    };

    this.logger.trace('Sending job to queue', {
      targetQueue: route.fullName,
      handler: route.handler,
      operation,
      fromService: this.serviceName,
      toService: route.service,
    });

    return producerQueue.add(operation, jobData, options);
  }

  /**
   * Resolve a queue name to a route.
   *
   * Resolution order: service-qualified name (discovered route if the service
   * matches, otherwise a route built on the fly), then discovered routes by
   * handler name, then the handler registry's owning service.
   *
   * @returns The route, or `null` if the name cannot be resolved.
   */
  private resolveQueueRoute(queueName: string): QueueRoute | null {
    // Check if it's a full queue name with service prefix
    const parsed = parseQueueName(queueName);
    if (parsed) {
      // Try to find in discovered routes by handler name
      const discovered = this.queueRoutes.get(parsed.handler);
      if (discovered && discovered.service === parsed.service) {
        return discovered;
      }

      // Create a route on the fly
      return {
        fullName: queueName,
        service: parsed.service,
        handler: parsed.handler,
        db: 0, // All queues in DB 0
      };
    }

    // Check if it's just a handler name in our routes
    const route = this.queueRoutes.get(queueName);
    if (route) {
      return route;
    }

    // Try to find in handler registry
    const ownerService = this.handlerRegistry?.getHandlerService(queueName);
    if (ownerService) {
      return {
        fullName: getFullQueueName(ownerService, queueName),
        service: ownerService,
        handler: queueName,
        db: 0, // All queues in DB 0
      };
    }

    return null;
  }

  /**
   * Get (or lazily create) a producer-only BullMQ queue for sending jobs.
   * Producer queues are closed during shutdown.
   */
  private getProducerQueue(queueName: string): BullQueue {
    let producer = this.producerQueues.get(queueName);
    if (!producer) {
      producer = new BullQueue(queueName, {
        connection: this.redisConnection,
        defaultJobOptions: this.config.defaultQueueOptions?.defaultJobOptions,
      });
      this.producerQueues.set(queueName, producer);
    }
    return producer;
  }

  /**
   * Discover all available queue routes from handler registry.
   *
   * Failures are logged and swallowed so that construction of the manager
   * does not fail because of discovery.
   */
  private discoverQueueRoutes(): void {
    if (!this.handlerRegistry) {
      this.logger.warn('No handler registry provided, skipping route discovery');
      return;
    }

    try {
      const handlers = this.handlerRegistry.getAllMetadata();
      for (const [handlerName, metadata] of handlers) {
        // Get the service that registered this handler
        const ownerService = metadata.service;
        if (ownerService) {
          const fullName = getFullQueueName(ownerService, handlerName);
          this.queueRoutes.set(handlerName, {
            fullName,
            service: ownerService,
            handler: handlerName,
            db: 0, // All queues in DB 0
            operations: metadata.operations.map((op: { name: string }) => op.name),
          });
          this.logger.trace('Discovered queue route', {
            handler: handlerName,
            service: ownerService,
            operations: metadata.operations.length,
          });
        }
      }

      // Also discover handlers registered by the current service
      if (this.serviceName) {
        const myHandlers = this.handlerRegistry.getServiceHandlers(this.serviceName);
        for (const metadata of myHandlers) {
          const handlerName = metadata.name;
          if (!this.queueRoutes.has(handlerName)) {
            const fullName = getFullQueueName(this.serviceName, handlerName);
            this.queueRoutes.set(handlerName, {
              fullName,
              service: this.serviceName,
              handler: handlerName,
              db: 0, // All queues in DB 0
            });
          }
        }
      }

      this.logger.info('Queue routes discovered', {
        totalRoutes: this.queueRoutes.size,
        routes: Array.from(this.queueRoutes.keys()),
      });
    } catch (error) {
      this.logger.error('Failed to discover queue routes', { error });
    }
  }
}