import { Queue as BullQueue, type Job } from 'bullmq';
import { handlerRegistry } from '@stock-bot/handlers';
import { getLogger, type Logger } from '@stock-bot/logger';
import { QueueManager } from './queue-manager';
import { Queue } from './queue';
import type { SmartQueueConfig, QueueRoute, JobData, JobOptions, RedisConfig } from './types';
import { getFullQueueName, parseQueueName } from './service-utils';
import { getRedisConnection } from './utils';

/** Redis DB that hosts every queue, regardless of the owning service. */
const QUEUE_DB = 0;

/** Stats payload produced by the parent manager's `getGlobalStats()`. */
type GlobalStats = Awaited<ReturnType<QueueManager['getGlobalStats']>>;

/**
 * Strip the service prefix from a queue name for display purposes.
 * Names that `parseQueueName` does not recognise are returned unchanged.
 */
function toDisplayName(queueName: string): string {
  const parsed = parseQueueName(queueName);
  return parsed ? parsed.handler : queueName;
}

/**
 * Smart Queue Manager with automatic service discovery and routing.
 *
 * Extends {@link QueueManager} so that:
 * - queue names are namespaced by service (via `getFullQueueName`),
 * - only queues owned by the current service get workers,
 * - jobs can be sent to any service's queue through cached, producer-only
 *   BullMQ queues.
 *
 * All queues live in Redis DB 0; the constructor forces `redis.db` to 0
 * before handing the config to the parent.
 */
export class SmartQueueManager extends QueueManager {
  private readonly serviceName: string;

  /** Discovered routes, keyed by handler name. */
  private readonly queueRoutes = new Map<string, QueueRoute>();

  /**
   * Redis connections keyed by DB index. The value is whatever
   * `getRedisConnection` returns; it is kept as `any` because that helper's
   * return type is not visible from this file.
   */
  private readonly connections = new Map<number, any>();

  /** Producer-only BullMQ queues used for sending, keyed by full queue name. */
  private readonly producerQueues = new Map<string, BullQueue>();

  private readonly _logger: Logger;

  /**
   * @param config - Manager configuration. `redis.db` is overridden to 0.
   *   Route discovery runs unless `autoDiscoverHandlers` is explicitly `false`.
   * @param logger - Optional logger; defaults to `getLogger('SmartQueueManager')`.
   */
  constructor(config: SmartQueueConfig, logger?: Logger) {
    // Always use DB 0 for queues (unified queue database)
    const modifiedConfig = {
      ...config,
      redis: {
        ...config.redis,
        db: QUEUE_DB,
      },
    };

    super(modifiedConfig, logger);

    this.serviceName = config.serviceName;
    this._logger = logger || getLogger('SmartQueueManager');

    // Auto-discover routes if enabled (opt-out: only an explicit `false` disables it)
    if (config.autoDiscoverHandlers !== false) {
      this.discoverQueueRoutes();
    }

    this._logger.info('SmartQueueManager initialized', {
      service: this.serviceName,
      discoveredRoutes: this.queueRoutes.size,
    });
  }

  /**
   * Populate `queueRoutes` from the handler registry.
   *
   * First registers every handler that has an owning service, then adds any
   * handler of the current service that is not yet present. Failures are
   * logged and swallowed so that construction never throws because of
   * discovery.
   */
  private discoverQueueRoutes(): void {
    try {
      const handlers = handlerRegistry.getAllHandlers();

      for (const [handlerName, handlerConfig] of handlers) {
        // Get the service that registered this handler
        const ownerService = handlerRegistry.getHandlerService(handlerName);

        if (!ownerService) {
          this._logger.warn('Handler has no service ownership', { handlerName });
          continue;
        }

        const operations = Object.keys(handlerConfig.operations || {});

        this.queueRoutes.set(handlerName, {
          fullName: getFullQueueName(ownerService, handlerName),
          service: ownerService,
          handler: handlerName,
          db: QUEUE_DB,
          operations,
        });

        this._logger.trace('Discovered queue route', {
          handler: handlerName,
          service: ownerService,
          operations: operations.length,
        });
      }

      // Also discover handlers registered by the current service
      const myHandlers = handlerRegistry.getServiceHandlers(this.serviceName);
      for (const handlerName of myHandlers) {
        if (this.queueRoutes.has(handlerName)) {
          continue;
        }
        this.queueRoutes.set(handlerName, {
          fullName: getFullQueueName(this.serviceName, handlerName),
          service: this.serviceName,
          handler: handlerName,
          db: QUEUE_DB,
        });
      }

      this._logger.info('Queue routes discovered', {
        totalRoutes: this.queueRoutes.size,
        routes: Array.from(this.queueRoutes.values()).map(r => ({
          handler: r.handler,
          service: r.service,
        })),
      });
    } catch (error) {
      this._logger.error('Failed to discover queue routes', { error });
    }
  }

  /**
   * Get or create the Redis connection for a specific DB.
   *
   * @param db - Redis DB index.
   * @returns The cached value produced by `getRedisConnection` for that DB.
   */
  private getConnection(db: number): any {
    let connection = this.connections.get(db);

    if (connection === undefined) {
      const redisConfig: RedisConfig = {
        ...this.getRedisConfig(),
        db,
      };
      connection = getRedisConnection(redisConfig);
      this.connections.set(db, connection);
      this._logger.debug('Created Redis connection', { db });
    }

    return connection;
  }

  /**
   * Get a queue by handler name or full queue name.
   *
   * Overrides the parent to namespace bare handler names with the current
   * service, and to create queues owned by other services without workers.
   *
   * @param queueName - Bare handler name, or a name `parseQueueName` recognises.
   * @param options - Queue options forwarded to the parent.
   */
  override getQueue(queueName: string, options = {}): Queue {
    const parsed = parseQueueName(queueName);

    if (!parsed) {
      // Just a handler name: assume it belongs to the current service
      return super.getQueue(getFullQueueName(this.serviceName, queueName), options);
    }

    if (parsed.service === this.serviceName) {
      // Already a full name for our own service: use configured workers
      return super.getQueue(queueName, options);
    }

    // Cross-service queue: producer-only, never start workers for it
    return super.getQueue(queueName, {
      ...options,
      workers: 0,
    });
  }

  /**
   * Send a job to any queue (local or remote).
   * This is the main entry point for cross-service communication.
   *
   * @param targetQueue - Handler name or full queue name.
   * @param operation - Operation name; also used as the BullMQ job name.
   * @param payload - Job payload, passed through untouched.
   * @param options - Job options forwarded to `add`.
   * @returns The created job.
   * @throws Error if the target queue cannot be resolved to a route.
   */
  async send(
    targetQueue: string,
    operation: string,
    payload: unknown,
    options: JobOptions = {}
  ): Promise<Job> {
    const route = this.resolveQueueRoute(targetQueue);
    if (!route) {
      throw new Error(`Unknown queue: ${targetQueue}`);
    }

    // Operation metadata is advisory: an unknown operation is only warned
    // about and the job is still sent.
    if (route.operations && !route.operations.includes(operation)) {
      this._logger.warn('Operation not found in handler metadata', {
        queue: targetQueue,
        operation,
        available: route.operations,
      });
    }

    const producerQueue = this.getProducerQueue(route);

    const jobData: JobData = {
      handler: route.handler,
      operation,
      payload,
    };

    const job = await producerQueue.add(operation, jobData, options);

    this._logger.debug('Job sent to queue', {
      from: this.serviceName,
      to: route.service,
      queue: route.handler,
      operation,
      jobId: job.id,
    });

    return job;
  }

  /**
   * Variant of {@link send} that takes the target service and handler
   * separately.
   *
   * The full queue name is built with `getFullQueueName`, the same helper
   * the rest of this class uses, so the name format cannot drift.
   */
  async sendTo(
    targetService: string,
    handler: string,
    operation: string,
    payload: unknown,
    options: JobOptions = {}
  ): Promise<Job> {
    const fullQueueName = getFullQueueName(targetService, handler);
    return this.send(fullQueueName, operation, payload, options);
  }

  /**
   * Resolve a queue name to a route.
   *
   * Resolution order:
   * 1. a full name whose handler and service match a discovered route,
   * 2. a full name with no matching route (a route is created on the fly),
   * 3. a bare handler name present in the discovered routes,
   * 4. a bare handler name whose owner the handler registry knows.
   *
   * @returns The route, or `null` when nothing matches.
   */
  private resolveQueueRoute(queueName: string): QueueRoute | null {
    const parsed = parseQueueName(queueName);

    if (parsed) {
      const discovered = this.queueRoutes.get(parsed.handler);
      if (discovered && discovered.service === parsed.service) {
        return discovered;
      }

      // Not discovered (or owned by a different service): create a route on the fly
      return {
        fullName: queueName,
        service: parsed.service,
        handler: parsed.handler,
        db: QUEUE_DB,
      };
    }

    const known = this.queueRoutes.get(queueName);
    if (known) {
      return known;
    }

    const ownerService = handlerRegistry.getHandlerService(queueName);
    if (ownerService) {
      return {
        fullName: getFullQueueName(ownerService, queueName),
        service: ownerService,
        handler: queueName,
        db: QUEUE_DB,
      };
    }

    return null;
  }

  /**
   * Get or create the producer-only BullMQ queue for a route.
   * Instances are cached by full queue name and closed in {@link shutdown}.
   */
  private getProducerQueue(route: QueueRoute): BullQueue {
    let queue = this.producerQueues.get(route.fullName);

    if (!queue) {
      // Use the same queue name format as workers
      queue = new BullQueue(route.fullName, {
        connection: this.getConnection(route.db),
        defaultJobOptions: this.getConfig().defaultQueueOptions?.defaultJobOptions || {},
      });
      this.producerQueues.set(route.fullName, queue);
    }

    return queue;
  }

  /**
   * Get all queues (for monitoring purposes), keyed by handler name without
   * the service prefix.
   *
   * Worker queues take precedence over producer queues that share a display
   * name. When neither exists, producer queues are created for every
   * discovered route; these are cached, so repeated calls reuse the same
   * instances and `shutdown()` closes them.
   */
  getAllQueues(): Record<string, BullQueue> {
    const allQueues: Record<string, BullQueue> = {};

    // Worker queues, via the public API of the parent
    for (const name of this.getQueueNames()) {
      const queue = this.getQueue(name);
      if (queue && typeof queue.getBullQueue === 'function') {
        const simpleName = toDisplayName(name);
        if (simpleName) {
          allQueues[simpleName] = queue.getBullQueue();
        }
      }
    }

    // Producer queues, without overwriting a worker queue of the same name
    for (const [name, queue] of this.producerQueues) {
      const simpleName = toDisplayName(name);
      if (simpleName && !allQueues[simpleName]) {
        allQueues[simpleName] = queue;
      }
    }

    // If no queues found, create from discovered routes
    if (Object.keys(allQueues).length === 0) {
      for (const [handlerName, route] of this.queueRoutes) {
        allQueues[handlerName] = this.getProducerQueue(route);
      }
    }

    return allQueues;
  }

  /**
   * Get statistics keyed by service name.
   *
   * Currently only the current service's entry is populated, from
   * `getGlobalStats()`; stats for other services are not implemented.
   */
  async getAllStats(): Promise<Record<string, GlobalStats>> {
    const stats: Record<string, GlobalStats> = {};

    stats[this.serviceName] = await this.getGlobalStats();

    return stats;
  }

  /**
   * Start workers for all queues belonging to this service.
   *
   * Does nothing except log when `delayWorkerStart` is falsy. Queues whose
   * parsed service differs from the current service are skipped.
   */
  override startAllWorkers(): void {
    const config = this.getConfig();

    if (!config.delayWorkerStart) {
      this._logger.info(
        'startAllWorkers() called but workers already started automatically (delayWorkerStart is false)'
      );
      return;
    }

    // `??` rather than `||`: an explicit `workers: 0` must stay 0 so the
    // `workerCount > 0` guard below can actually skip worker creation.
    const workerCount = config.defaultQueueOptions?.workers ?? 1;
    // A concurrency of 0 is treated as unset and falls back to 1.
    const concurrency = config.defaultQueueOptions?.concurrency || 1;

    let workersStarted = 0;
    const queues = this.getQueues();

    for (const [queueName, queue] of queues) {
      const parsed = parseQueueName(queueName);

      // Skip if not our service's queue
      if (parsed && parsed.service !== this.serviceName) {
        this._logger.trace('Skipping workers for cross-service queue', {
          queueName,
          ownerService: parsed.service,
          currentService: this.serviceName,
        });
        continue;
      }

      if (workerCount > 0) {
        queue.startWorkersManually(workerCount, concurrency);
        workersStarted++;
        this._logger.debug('Started workers for queue', {
          queueName,
          workers: workerCount,
          concurrency,
        });
      }
    }

    this._logger.info('Service workers started', {
      service: this.serviceName,
      totalQueues: queues.size,
      queuesWithWorkers: workersStarted,
      delayWorkerStart: config.delayWorkerStart,
    });
  }

  /**
   * Graceful shutdown.
   *
   * Closes every producer queue, disconnects connections for DBs other than
   * 0, then delegates to the parent. A failure while closing one resource is
   * logged and does not prevent the remaining resources, or the parent
   * shutdown, from being processed.
   */
  override async shutdown(): Promise<void> {
    // Close producer queues
    for (const [name, queue] of this.producerQueues) {
      try {
        await queue.close();
        this._logger.debug('Closed producer queue', { queue: name });
      } catch (error) {
        this._logger.error('Failed to close producer queue', { queue: name, error });
      }
    }
    // Drop the references so a closed queue is never handed out again.
    this.producerQueues.clear();

    // Close additional connections
    for (const [db, connection] of this.connections) {
      if (db === QUEUE_DB) {
        // Don't close our main connection (DB 0 for queues)
        continue;
      }
      try {
        // The connection value is opaque here; only call disconnect if it exists.
        if (typeof connection?.disconnect === 'function') {
          connection.disconnect();
          this._logger.debug('Closed Redis connection', { db });
        }
      } catch (error) {
        this._logger.error('Failed to close Redis connection', { db, error });
      }
      this.connections.delete(db);
    }

    // Call parent shutdown
    await super.shutdown();
  }
}