diff --git a/apps/stock/config/config/default.json b/apps/stock/config/config/default.json index 9cf829b..b7bc587 100644 --- a/apps/stock/config/config/default.json +++ b/apps/stock/config/config/default.json @@ -80,7 +80,7 @@ "workers": 1, "concurrency": 1, "enableScheduledJobs": true, - "delayWorkerStart": false, + "delayWorkerStart": true, "defaultJobOptions": { "attempts": 3, "backoff": { diff --git a/apps/stock/data-ingestion/scripts/trigger-job.ts b/apps/stock/data-ingestion/scripts/trigger-job.ts new file mode 100644 index 0000000..0158c63 --- /dev/null +++ b/apps/stock/data-ingestion/scripts/trigger-job.ts @@ -0,0 +1,31 @@ +#!/usr/bin/env bun + +import Redis from 'ioredis'; +import { Queue } from 'bullmq'; + +const redis = new Redis({ + host: 'localhost', + port: 6379, + db: 0, +}); + +const queue = new Queue('{data-ingestion_webshare}', { + connection: redis, +}); + +async function triggerJob() { + console.log('Triggering webshare fetch-proxies job...'); + + const job = await queue.add('fetch-proxies', { + handler: 'webshare', + operation: 'fetch-proxies', + payload: {}, + }); + + console.log(`Job ${job.id} added to queue`); + + await redis.quit(); + process.exit(0); +} + +triggerJob().catch(console.error); \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/ceo/ceo.handler.ts b/apps/stock/data-ingestion/src/handlers/ceo/ceo.handler.ts index 443f97e..a3a813d 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/ceo.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/ceo.handler.ts @@ -22,7 +22,7 @@ export class CeoHandler extends BaseHandler { updateCeoChannels = updateCeoChannels; @Operation('update-unique-symbols') - @ScheduledOperation('process-unique-symbols', '0 0 1 * *', { + @ScheduledOperation('update-unique-symbols', '0 0 1 * *', { priority: 5, immediately: false, description: 'Process unique CEO symbols and schedule individual jobs', diff --git a/apps/stock/data-ingestion/src/handlers/index.ts b/apps/stock/data-ingestion/src/handlers/index.ts index f193237..acf9ca6 100644 --- a/apps/stock/data-ingestion/src/handlers/index.ts +++ b/apps/stock/data-ingestion/src/handlers/index.ts @@ -32,14 +32,43 @@ export async function initializeAllHandlers(serviceContainer: IServiceContainer) count: handlers.length, handlers: handlers.map(h => (h as any).__handlerName || h.name), }); + + // Log what metadata the handlers have + for (const HandlerClass of handlers) { + const metadata = (HandlerClass as any).__handlerMetadata; + logger.info(`Handler ${HandlerClass.name} metadata`, { + hasMetadata: !!metadata, + handlerName: metadata?.name, + operationCount: metadata?.operations?.length || 0, + scheduledJobCount: metadata?.scheduledJobs?.length || 0, + }); + } - // If the container has a handler scanner, we can manually register these - const scanner = (serviceContainer as any).handlerScanner; - if (scanner?.registerHandlerClass) { - for (const HandlerClass of handlers) { - scanner.registerHandlerClass(HandlerClass, { serviceName: 'data-ingestion' }); + // Get the DI container from the service container + const diContainer = (serviceContainer as any)._diContainer; + + if (diContainer?.resolve) { + // Get handler scanner from DI container + const scanner = diContainer.resolve('handlerScanner'); + if (scanner?.registerHandlerClass) { + for (const HandlerClass of handlers) { + scanner.registerHandlerClass(HandlerClass, { serviceName: 'data-ingestion' }); + } + logger.info('Handlers registered with scanner'); + } else { + logger.warn('Handler scanner not found or missing registerHandlerClass method'); } - logger.info('Handlers registered with scanner'); + + // Also check the handler registry directly + const handlerRegistry = diContainer.resolve('handlerRegistry'); + if (handlerRegistry) { + logger.info('Handler registry state after registration', { + registeredHandlers: handlerRegistry.getHandlerNames(), + handlersWithSchedule: handlerRegistry.getAllHandlersWithSchedule().size, + }); + } + } else { + logger.error('Could not access DI container from service container'); } } catch (error) { logger.error('Handler initialization failed', { error }); diff --git a/libs/core/config/src/schemas/service.schema.ts b/libs/core/config/src/schemas/service.schema.ts index 004725e..a8db927 100644 --- a/libs/core/config/src/schemas/service.schema.ts +++ b/libs/core/config/src/schemas/service.schema.ts @@ -45,7 +45,7 @@ export const queueConfigSchema = z.object({ workers: z.number().default(1), concurrency: z.number().default(1), enableScheduledJobs: z.boolean().default(true), - delayWorkerStart: z.boolean().default(false), + delayWorkerStart: z.boolean().default(true), // ServiceApplication handles worker startup defaultJobOptions: z .object({ attempts: z.number().default(3), diff --git a/libs/core/di/src/container/builder.ts b/libs/core/di/src/container/builder.ts index 9dffb6a..be2255e 100644 --- a/libs/core/di/src/container/builder.ts +++ b/libs/core/di/src/container/builder.ts @@ -149,7 +149,12 @@ export class ServiceContainerBuilder { // Register handler infrastructure first container.register({ handlerRegistry: asClass(HandlerRegistry).singleton(), - handlerScanner: asClass(HandlerScanner).singleton(), + handlerScanner: asFunction(({ handlerRegistry }) => { + return new HandlerScanner(handlerRegistry, container, { + serviceName: config.service?.serviceName || config.service?.name, + autoRegister: true, + }); + }).singleton(), }); registerCoreServices(container, config); @@ -177,7 +182,7 @@ export class ServiceContainerBuilder { globalCache, proxy: proxyManager, // Map proxyManager to proxy browser, - queue: queueManager, // Map queueManager to queue + queueManager, // Provide queueManager directly mongodb: mongoClient, // Map mongoClient to mongodb postgres: postgresClient, // Map postgresClient to postgres questdb: questdbClient, // Map questdbClient to questdb diff --git a/libs/core/di/src/registrations/service.registration.ts b/libs/core/di/src/registrations/service.registration.ts index ed2d588..4a12673 100644 --- a/libs/core/di/src/registrations/service.registration.ts +++ b/libs/core/di/src/registrations/service.registration.ts @@ -76,7 +76,7 @@ export function registerApplicationServices( defaultJobOptions: config.queue!.defaultJobOptions, }, enableScheduledJobs: config.queue!.enableScheduledJobs ?? true, - delayWorkerStart: config.queue!.delayWorkerStart ?? false, + delayWorkerStart: config.queue!.delayWorkerStart ?? true, // Changed to true so ServiceApplication can start workers autoDiscoverHandlers: true, }; return new SmartQueueManager(queueConfig, handlerRegistry, logger); diff --git a/libs/core/di/src/scanner/handler-scanner.ts b/libs/core/di/src/scanner/handler-scanner.ts index 40bc994..a15eb0f 100644 --- a/libs/core/di/src/scanner/handler-scanner.ts +++ b/libs/core/di/src/scanner/handler-scanner.ts @@ -3,7 +3,7 @@ * Discovers and registers handlers with the DI container */ -import { asClass, type AwilixContainer } from 'awilix'; +import { asClass, asFunction, type AwilixContainer } from 'awilix'; import { glob } from 'glob'; import type { HandlerConfiguration, @@ -157,7 +157,9 @@ export class HandlerScanner { // Register with DI container if auto-register is enabled if (this.options.autoRegister !== false) { this.container.register({ - [handlerName]: asClass(HandlerClass).singleton(), + [handlerName]: asFunction(({ serviceContainer }) => { + return new HandlerClass(serviceContainer); + }).singleton(), }); } diff --git a/libs/core/di/src/service-application.ts b/libs/core/di/src/service-application.ts index 74b841e..7657890 100644 --- a/libs/core/di/src/service-application.ts +++ b/libs/core/di/src/service-application.ts @@ -278,7 +278,11 @@ export class ServiceApplication { // Initialize handlers if enabled if (this.serviceConfig.enableHandlers && handlerInitializer) { this.logger.debug('Initializing handlers...'); - await handlerInitializer(this.serviceContainer); + // Pass the service container with the DI container attached + const containerWithDI = Object.assign({}, this.serviceContainer, { + _diContainer: this.container + }); + await handlerInitializer(containerWithDI); this.logger.info('Handlers initialized'); } @@ -335,6 +339,8 @@ export class ServiceApplication { this.logger.debug('Creating scheduled jobs from registered handlers...'); const handlerRegistry = this.container.resolve('handlerRegistry'); const allHandlers = handlerRegistry.getAllHandlersWithSchedule(); + + this.logger.info(`Found ${allHandlers.size} handlers with scheduled jobs: ${Array.from(allHandlers.keys()).join(', ')}`); let totalScheduledJobs = 0; for (const [handlerName, config] of allHandlers) { @@ -356,7 +362,16 @@ export class ServiceApplication { this.logger.error('Queue manager is not initialized, cannot create scheduled jobs'); continue; } - const queue = queueManager.getQueue(handlerName); + // Pass the handler registry explicitly when creating queues for scheduled jobs + this.logger.debug('Creating queue for scheduled jobs', { + handlerName, + hasHandlerRegistry: !!handlerRegistry, + registeredHandlers: handlerRegistry.getHandlerNames(), + }); + + const queue = queueManager.getQueue(handlerName, { + handlerRegistry: handlerRegistry + }); for (const scheduledJob of config.scheduledJobs) { // Include handler and operation info in job data @@ -375,6 +390,12 @@ export class ServiceApplication { }, }; + this.logger.debug('Adding scheduled job', { + handler: handlerName, + operation: scheduledJob.operation, + hasOperation: !!handlerRegistry.getOperation(handlerName, scheduledJob.operation), + }); + await queue.addScheduledJob( scheduledJob.operation, jobData, diff --git a/libs/core/handlers/src/base/BaseHandler.ts b/libs/core/handlers/src/base/BaseHandler.ts index b9f3273..7c2127f 100644 --- a/libs/core/handlers/src/base/BaseHandler.ts +++ b/libs/core/handlers/src/base/BaseHandler.ts @@ -1,4 +1,3 @@ -import type { Collection } from 'mongodb'; import { createNamespacedCache } from '@stock-bot/cache'; import { getLogger } from '@stock-bot/logger'; import type { @@ -11,6 +10,7 @@ import type { ServiceTypes, } from '@stock-bot/types'; import { fetch } from '@stock-bot/utils'; +import type { Collection } from 'mongodb'; // Handler registry is now injected, not imported import { createJobHandler } from '../utils/create-job-handler'; @@ -45,7 +45,8 @@ export abstract class BaseHandler implements IHandler { readonly logger: ServiceTypes['logger']; readonly cache: ServiceTypes['cache']; readonly globalCache: ServiceTypes['globalCache']; - readonly queue: ServiceTypes['queue']; + readonly queueManager: ServiceTypes['queueManager']; + readonly queue: ServiceTypes['queue']; // Specific queue for this handler readonly proxy: ServiceTypes['proxy']; readonly browser: ServiceTypes['browser']; readonly mongodb: ServiceTypes['mongodb']; @@ -55,21 +56,26 @@ export abstract class BaseHandler implements IHandler { private handlerName: string; constructor(services: IServiceContainer, handlerName?: string) { + // Read handler name from decorator first, then fallback to parameter or class name + const constructor = this.constructor as any; + this.handlerName = + constructor.__handlerName || handlerName || this.constructor.name.toLowerCase(); + // Flatten all services onto the handler instance this.logger = getLogger(this.constructor.name); this.cache = services.cache; this.globalCache = services.globalCache; - this.queue = services.queue; + this.queueManager = services.queueManager; this.proxy = services.proxy; this.browser = services.browser; this.mongodb = services.mongodb; this.postgres = services.postgres; this.questdb = services.questdb; - - // Read handler name from decorator first, then fallback to parameter or class name - const constructor = this.constructor as any; - this.handlerName = - constructor.__handlerName || handlerName || this.constructor.name.toLowerCase(); + + // Get the specific queue for this handler + if (this.queueManager) { + this.queue = this.queueManager.getQueue(this.handlerName); + } } /** @@ -117,16 +123,15 @@ export abstract class BaseHandler implements IHandler { options?: JobScheduleOptions ): Promise { if (!this.queue) { - throw new Error('Queue service is not available'); + throw new Error('Queue service is not available for this handler'); } - const queue = this.queue.getQueue(this.handlerName); const jobData = { handler: this.handlerName, operation, payload, }; - await queue.add(operation, jobData, options || {}); + await this.queue.add(operation, jobData, options || {}); } /** @@ -331,7 +336,9 @@ export abstract class BaseHandler implements IHandler { static extractMetadata(): HandlerMetadata | null { const constructor = this as any; const handlerName = constructor.__handlerName; - if (!handlerName) return null; + if (!handlerName){ + return null; + } const operations = constructor.__operations || []; const schedules = constructor.__schedules || []; diff --git a/libs/core/queue/src/queue-manager.ts b/libs/core/queue/src/queue-manager.ts index 264d823..104b95a 100644 --- a/libs/core/queue/src/queue-manager.ts +++ b/libs/core/queue/src/queue-manager.ts @@ -1,11 +1,18 @@ +import { Queue as BullQueue, type Job } from 'bullmq'; import { createCache } from '@stock-bot/cache'; import type { CacheProvider } from '@stock-bot/cache'; +import type { HandlerRegistry } from '@stock-bot/handler-registry'; +import { getLogger } from '@stock-bot/logger'; import { Queue, type QueueWorkerConfig } from './queue'; import { QueueRateLimiter } from './rate-limiter'; +import { getFullQueueName, parseQueueName } from './service-utils'; import type { GlobalStats, + JobData, + JobOptions, QueueManagerConfig, QueueOptions, + QueueRoute, QueueStats, RateLimitRule, } from './types'; @@ -22,8 +29,8 @@ interface Logger { } /** - * QueueManager provides unified queue and cache management - * Main entry point for all queue operations with getQueue() method + * QueueManager provides unified queue and cache management with service discovery + * Handles both local and cross-service queue operations */ export class QueueManager { private queues = new Map(); @@ -34,10 +41,29 @@ export class QueueManager { private shutdownPromise: Promise | null = null; private config: QueueManagerConfig; private readonly logger: Logger; + + // Service discovery features + private serviceName?: string; + private queueRoutes = new Map(); + private producerQueues = new Map(); // For cross-service sending + private handlerRegistry?: HandlerRegistry; - constructor(config: QueueManagerConfig, logger?: Logger) { + constructor(config: QueueManagerConfig, handlerRegistry?: HandlerRegistry, logger?: Logger) { + // Always use DB 0 for queues if service name is provided + if (config.serviceName) { + config = { + ...config, + redis: { + ...config.redis, + db: 0, // All queues in DB 0 for cross-service communication + }, + }; + } + this.config = config; - this.logger = logger || console; + this.serviceName = config.serviceName; + this.handlerRegistry = handlerRegistry; + this.logger = logger || getLogger('QueueManager'); this.redisConnection = getRedisConnection(config.redis); // Initialize rate limiter if rules are provided @@ -50,16 +76,58 @@ export class QueueManager { }); } + // Auto-discover routes if enabled and registry provided + if (config.serviceName && config.autoDiscoverHandlers !== false && handlerRegistry) { + this.discoverQueueRoutes(); + } + this.logger.info('QueueManager initialized', { redis: `${config.redis.host}:${config.redis.port}`, + service: this.serviceName, + discoveredRoutes: this.queueRoutes.size, + hasRegistry: !!handlerRegistry, }); } /** * Get or create a queue - unified method that handles both scenarios * This is the main method for accessing queues + * If serviceName is configured, automatically handles namespacing */ getQueue(queueName: string, options: QueueOptions = {}): Queue { + let fullQueueName = queueName; + let isOwnQueue = true; + + // Handle service namespacing if service name is configured + if (this.serviceName) { + const parsed = parseQueueName(queueName); + + if (parsed) { + // Already in service:handler format + fullQueueName = queueName; + isOwnQueue = parsed.service === this.serviceName; + } else { + // Just handler name, assume it's for current service + fullQueueName = getFullQueueName(this.serviceName, queueName); + isOwnQueue = true; + } + + // For cross-service queues, create without workers (producer-only) + if (!isOwnQueue) { + options = { + ...options, + workers: 0, // No workers for other services' queues + }; + } else { + // For own service queues, include handler registry + options = { + ...options, + handlerRegistry: this.handlerRegistry + }; + } + + queueName = fullQueueName; + } // Return existing queue if it exists if (this.queues.has(queueName)) { const existingQueue = this.queues.get(queueName); @@ -83,6 +151,7 @@ export class QueueManager { workers, concurrency, startWorker: workers > 0 && !this.config.delayWorkerStart, + handlerRegistry: options.handlerRegistry || this.handlerRegistry, }; const queue = new Queue( @@ -112,8 +181,13 @@ export class QueueManager { this.logger.info('Queue created with batch cache', { queueName, + originalQueueName: options.handlerRegistry ? 'has-handler-registry' : 'no-handler-registry', workers: workers, concurrency: concurrency, + handlerRegistryProvided: !!this.handlerRegistry, + willStartWorkers: workers > 0 && !this.config.delayWorkerStart, + isOwnQueue, + serviceName: this.serviceName, }); return queue; @@ -411,18 +485,42 @@ export class QueueManager { } let workersStarted = 0; - for (const queue of this.queues.values()) { + const queues = this.queues; + + this.logger.info(`Starting workers for ${queues.size} queues: ${Array.from(queues.keys()).join(', ')} (service: ${this.serviceName})`); + + for (const [queueName, queue] of queues) { + // If we have a service name, check if this queue belongs to us + if (this.serviceName) { + const parsed = parseQueueName(queueName); + // Skip if not our service's queue + if (parsed && parsed.service !== this.serviceName) { + this.logger.trace('Skipping workers for cross-service queue', { + queueName, + ownerService: parsed.service, + currentService: this.serviceName, + }); + continue; + } + } + const workerCount = this.config.defaultQueueOptions?.workers || 1; const concurrency = this.config.defaultQueueOptions?.concurrency || 1; if (workerCount > 0) { queue.startWorkersManually(workerCount, concurrency); workersStarted++; + this.logger.debug('Started workers for queue', { + queueName, + workers: workerCount, + concurrency, + }); } } - this.logger.info('All workers started', { - totalQueues: this.queues.size, + this.logger.info('Service workers started', { + service: this.serviceName || 'default', + totalQueues: queues.size, queuesWithWorkers: workersStarted, delayWorkerStart: this.config.delayWorkerStart, }); @@ -449,4 +547,169 @@ export class QueueManager { getConfig(): Readonly { return { ...this.config }; } + + /** + * Send a job to any queue (local or remote) + * This is the main method for cross-service communication + */ + async send( + targetQueue: string, + operation: string, + payload: unknown, + options: JobOptions = {} + ): Promise { + if (!this.serviceName) { + // If no service name, just use regular queue + const queue = this.getQueue(targetQueue); + return queue.add(operation, { handler: targetQueue, operation, payload }, options); + } + + // Resolve the target queue + const route = this.resolveQueueRoute(targetQueue); + if (!route) { + throw new Error(`Unknown queue: ${targetQueue}`); + } + + // Validate operation if we have metadata + if (route.operations && !route.operations.includes(operation)) { + this.logger.warn('Operation not found in handler metadata', { + handler: route.handler, + operation, + availableOperations: route.operations, + }); + } + + // Use a producer queue for cross-service sending + const producerQueue = this.getProducerQueue(route.fullName); + + const jobData: JobData = { + handler: route.handler, + operation, + payload, + }; + + this.logger.trace('Sending job to queue', { + targetQueue: route.fullName, + handler: route.handler, + operation, + fromService: this.serviceName, + toService: route.service, + }); + + return producerQueue.add(operation, jobData, options); + } + + /** + * Resolve a queue name to a route + */ + private resolveQueueRoute(queueName: string): QueueRoute | null { + // Check if it's a full queue name with service prefix + const parsed = parseQueueName(queueName); + if (parsed) { + // Try to find in discovered routes by handler name + const route = this.queueRoutes.get(parsed.handler); + if (route && route.service === parsed.service) { + return route; + } + // Create a route on the fly + return { + fullName: queueName, + service: parsed.service, + handler: parsed.handler, + db: 0, // All queues in DB 0 + }; + } + + // Check if it's just a handler name in our routes + const route = this.queueRoutes.get(queueName); + if (route) { + return route; + } + + // Try to find in handler registry + const ownerService = this.handlerRegistry?.getHandlerService(queueName); + if (ownerService) { + return { + fullName: getFullQueueName(ownerService, queueName), + service: ownerService, + handler: queueName, + db: 0, // All queues in DB 0 + }; + } + + return null; + } + + /** + * Get a producer queue for sending to other services + */ + private getProducerQueue(queueName: string): BullQueue { + if (!this.producerQueues.has(queueName)) { + const queue = new BullQueue(queueName, { + connection: this.redisConnection, + defaultJobOptions: this.config.defaultQueueOptions?.defaultJobOptions, + }); + this.producerQueues.set(queueName, queue); + } + return this.producerQueues.get(queueName)!; + } + + /** + * Discover all available queue routes from handler registry + */ + private discoverQueueRoutes(): void { + if (!this.handlerRegistry) { + this.logger.warn('No handler registry provided, skipping route discovery'); + return; + } + + try { + const handlers = this.handlerRegistry.getAllMetadata(); + for (const [handlerName, metadata] of handlers) { + // Get the service that registered this handler + const ownerService = metadata.service; + if (ownerService) { + const fullName = getFullQueueName(ownerService, handlerName); + + this.queueRoutes.set(handlerName, { + fullName, + service: ownerService, + handler: handlerName, + db: 0, // All queues in DB 0 + operations: metadata.operations.map((op: any) => op.name), + }); + + this.logger.trace('Discovered queue route', { + handler: handlerName, + service: ownerService, + operations: metadata.operations.length, + }); + } + } + + // Also discover handlers registered by the current service + if (this.serviceName) { + const myHandlers = this.handlerRegistry.getServiceHandlers(this.serviceName); + for (const metadata of myHandlers) { + const handlerName = metadata.name; + if (!this.queueRoutes.has(handlerName)) { + const fullName = getFullQueueName(this.serviceName, handlerName); + this.queueRoutes.set(handlerName, { + fullName, + service: this.serviceName, + handler: handlerName, + db: 0, // All queues in DB 0 + }); + } + } + } + + this.logger.info('Queue routes discovered', { + totalRoutes: this.queueRoutes.size, + routes: Array.from(this.queueRoutes.keys()), + }); + } catch (error) { + this.logger.error('Failed to discover queue routes', { error }); + } + } } diff --git a/libs/core/queue/src/queue.ts b/libs/core/queue/src/queue.ts index 0328310..ebdb572 100644 --- a/libs/core/queue/src/queue.ts +++ b/libs/core/queue/src/queue.ts @@ -45,6 +45,13 @@ export class Queue { this.redisConfig = redisConfig; this.logger = logger || console; this.handlerRegistry = config.handlerRegistry; + + this.logger.debug('Queue constructor called', { + queueName, + hasHandlerRegistry: !!config.handlerRegistry, + handlerRegistryType: config.handlerRegistry ? typeof config.handlerRegistry : 'undefined', + configKeys: Object.keys(config), + }); const connection = getRedisConnection(redisConfig); @@ -70,7 +77,20 @@ export class Queue { // Start workers if requested and not explicitly disabled if (config.workers && config.workers > 0 && config.startWorker !== false) { + this.logger.info('Starting workers for queue', { + queueName, + workers: config.workers, + concurrency: config.concurrency || 1, + hasHandlerRegistry: !!this.handlerRegistry, + }); this.startWorkers(config.workers, config.concurrency || 1); + } else { + this.logger.info('Not starting workers for queue', { + queueName, + workers: config.workers || 0, + startWorker: config.startWorker, + hasHandlerRegistry: !!this.handlerRegistry, + }); } this.logger.trace('Queue created', { @@ -288,6 +308,12 @@ export class Queue { maxStalledCount: 3, stalledInterval: 30000, }); + + this.logger.info(`Starting worker ${i + 1}/${workerCount} for queue`, { + queueName: this.queueName, + workerId: i, + concurrency, + }); // Setup worker event handlers worker.on('completed', job => { @@ -345,6 +371,14 @@ export class Queue { if (!this.handlerRegistry) { throw new Error('Handler registry not configured for worker processing'); } + + this.logger.debug('Looking up handler in registry', { + handler, + operation, + queueName: this.queueName, + registeredHandlers: this.handlerRegistry.getHandlerNames(), + }); + const jobHandler = this.handlerRegistry.getOperation(handler, operation); if (!jobHandler) { @@ -381,6 +415,13 @@ export class Queue { this.logger.warn('Workers already started for queue', { queueName: this.queueName }); return; } + + this.logger.info('Starting workers manually', { + queueName: this.queueName, + workerCount, + concurrency, + hasHandlerRegistry: !!this.handlerRegistry, + }); // Initialize queue events if not already done if (!this.queueEvents) { diff --git a/libs/core/queue/src/smart-queue-manager.ts b/libs/core/queue/src/smart-queue-manager.ts index d760706..e6404cc 100644 --- a/libs/core/queue/src/smart-queue-manager.ts +++ b/libs/core/queue/src/smart-queue-manager.ts @@ -1,414 +1,18 @@ -import { Queue as BullQueue, type Job } from 'bullmq'; -import type { HandlerRegistry } from '@stock-bot/handler-registry'; -import { getLogger, type Logger } from '@stock-bot/logger'; -import { Queue } from './queue'; +// SmartQueueManager has been merged into QueueManager +// This file is kept for backward compatibility + import { QueueManager } from './queue-manager'; -import { getFullQueueName, parseQueueName } from './service-utils'; -import type { JobData, JobOptions, QueueRoute, RedisConfig, SmartQueueConfig } from './types'; -import { getRedisConnection } from './utils'; +import type { SmartQueueConfig } from './types'; +import type { HandlerRegistry } from '@stock-bot/handler-registry'; +import type { Logger } from '@stock-bot/logger'; /** - * Smart Queue Manager with automatic service discovery and routing - * Handles cross-service communication seamlessly + * @deprecated Use QueueManager directly with serviceName config + * SmartQueueManager functionality has been merged into QueueManager */ export class SmartQueueManager extends QueueManager { - private serviceName: string; - private queueRoutes = new Map(); - private connections = new Map(); // Redis connections by DB - private producerQueues = new Map(); // For cross-service sending - private _logger: Logger; - private handlerRegistry?: HandlerRegistry; - constructor(config: SmartQueueConfig, handlerRegistry?: HandlerRegistry, logger?: Logger) { - // Always use DB 0 for queues (unified queue database) - const modifiedConfig = { - ...config, - redis: { - ...config.redis, - db: 0, // All queues in DB 0 - }, - }; - - super(modifiedConfig, logger); - - this.serviceName = config.serviceName; - this.handlerRegistry = handlerRegistry; - this._logger = logger || getLogger('SmartQueueManager'); - - // Auto-discover routes if enabled and registry provided - if (config.autoDiscoverHandlers !== false && handlerRegistry) { - this.discoverQueueRoutes(); - } - - this._logger.info('SmartQueueManager initialized', { - service: this.serviceName, - discoveredRoutes: this.queueRoutes.size, - hasRegistry: !!handlerRegistry, - }); + // SmartQueueConfig already has serviceName, just pass it to QueueManager + super(config, handlerRegistry, logger); } - - /** - * Discover all available queue routes from handler registry - */ - private discoverQueueRoutes(): void { - if (!this.handlerRegistry) { - this._logger.warn('No handler registry provided, skipping route discovery'); - return; - } - - try { - const handlers = this.handlerRegistry.getAllMetadata(); - for (const [handlerName, metadata] of handlers) { - // Get the service that registered this handler - const ownerService = metadata.service; - if (ownerService) { - const fullName = getFullQueueName(ownerService, handlerName); - - this.queueRoutes.set(handlerName, { - fullName, - service: ownerService, - handler: handlerName, - db: 0, // All queues in DB 0 - operations: metadata.operations.map((op: any) => op.name), - }); - - this._logger.trace('Discovered queue route', { - handler: handlerName, - service: ownerService, - operations: metadata.operations.length, - }); - } else { - this._logger.warn('Handler has no service ownership', { handlerName }); - } - } - - // Also discover handlers registered by the current service - const myHandlers = this.handlerRegistry.getServiceHandlers(this.serviceName); - for (const metadata of myHandlers) { - const handlerName = metadata.name; - if (!this.queueRoutes.has(handlerName)) { - const fullName = getFullQueueName(this.serviceName, handlerName); - this.queueRoutes.set(handlerName, { - fullName, - service: this.serviceName, - handler: handlerName, - db: 0, // All queues in DB 0 - }); - } - } - - this._logger.info('Queue routes discovered', { - totalRoutes: this.queueRoutes.size, - routes: Array.from(this.queueRoutes.values()).map(r => ({ - handler: r.handler, - service: r.service, - })), - }); - } catch (error) { - this._logger.error('Failed to discover queue routes', { error }); - } - } - - /** - * Get or create a Redis connection for a specific DB - */ - private getConnection(db: number): any { - if (!this.connections.has(db)) { - const redisConfig: RedisConfig = { - ...this.getRedisConfig(), - db, - }; - const connection = getRedisConnection(redisConfig); - this.connections.set(db, connection); - this._logger.debug('Created Redis connection', { db }); - } - return this.connections.get(db); - } - - /** - * Get a queue for the current service (for processing) - * Overrides parent to use namespaced queue names and ensure service-specific workers - */ - override getQueue(queueName: string, options = {}): Queue { - // Check if this is already a full queue name (service:handler format) - const parsed = parseQueueName(queueName); - - let fullQueueName: string; - let isOwnQueue: boolean; - - if (parsed) { - // Already in service:handler format - fullQueueName = queueName; - isOwnQueue = parsed.service === this.serviceName; - } else { - // Just handler name, assume it's for current service - fullQueueName = getFullQueueName(this.serviceName, queueName); - isOwnQueue = true; - } - - // For cross-service queues, create without workers (producer-only) - if (!isOwnQueue) { - return super.getQueue(fullQueueName, { - ...options, - workers: 0, // No workers for other services' queues - }); - } - - // For own service queues, use configured workers - return super.getQueue(fullQueueName, options); - } - - /** - * Send a job to any queue (local or remote) - * This is the main method for cross-service communication - */ - async send( - targetQueue: string, - operation: string, - payload: unknown, - options: JobOptions = {} - ): Promise { - // Resolve the target queue - const route = this.resolveQueueRoute(targetQueue); - if (!route) { - throw new Error(`Unknown queue: ${targetQueue}`); - } - - // Validate operation if we have metadata - if (route.operations && !route.operations.includes(operation)) { - this._logger.warn('Operation not found in handler metadata', { - queue: targetQueue, - operation, - available: route.operations, - }); - } - - // Get or create producer queue for the target - const producerQueue = this.getProducerQueue(route); - - // Create job data - const jobData: JobData = { - handler: route.handler, - operation, - payload, - }; - - // Send the job - const job = await producerQueue.add(operation, jobData, options); - - this._logger.debug('Job sent to queue', { - from: this.serviceName, - to: route.service, - queue: route.handler, - operation, - jobId: job.id, - }); - - return job; - } - - /** - * Alias for send() with more explicit name - */ - async sendTo( - targetService: string, - handler: string, - operation: string, - payload: unknown, - options: JobOptions = {} - ): Promise { - const fullQueueName = `${targetService}:${handler}`; - return this.send(fullQueueName, operation, payload, options); - } - - /** - * Resolve a queue name to a route - */ - private resolveQueueRoute(queueName: string): QueueRoute | null { - // Check if it's a full queue name with service prefix - const parsed = parseQueueName(queueName); - if (parsed) { - // Try to find in discovered routes by handler name - const route = this.queueRoutes.get(parsed.handler); - if (route && route.service === parsed.service) { - return route; - } - // Create a route on the fly - return { - fullName: queueName, - service: parsed.service, - handler: parsed.handler, - db: 0, // All queues in DB 0 - }; - } - - // Check if it's just a handler name in our routes - const route = this.queueRoutes.get(queueName); - if (route) { - return route; - } - - // Try to find in handler registry - const ownerService = this.handlerRegistry?.getHandlerService(queueName); - if (ownerService) { - return { - fullName: getFullQueueName(ownerService, queueName), - service: ownerService, - handler: queueName, - db: 0, // All queues in DB 0 - }; - } - - return null; - } - - /** - * Get or create a producer queue for cross-service communication - */ - private getProducerQueue(route: QueueRoute): BullQueue { - if (!this.producerQueues.has(route.fullName)) { - const connection = this.getConnection(route.db); - // Use the same queue name format as workers - const queue = new BullQueue(route.fullName, { - connection, - defaultJobOptions: this.getConfig().defaultQueueOptions?.defaultJobOptions || {}, - }); - this.producerQueues.set(route.fullName, queue); - } - return this.producerQueues.get(route.fullName)!; - } - - /** - * Get all queues (for monitoring purposes) - */ - getAllQueues(): Record { - const allQueues: Record = {}; - - // Get all worker queues using public API - const workerQueueNames = this.getQueueNames(); - for (const name of workerQueueNames) { - const queue = this.getQueue(name); - if (queue && typeof queue.getBullQueue === 'function') { - // Extract the underlying BullMQ queue using the public getter - // Use the simple handler name without service prefix for display - const parsed = parseQueueName(name); - const simpleName = parsed ? parsed.handler : name; - if (simpleName) { - allQueues[simpleName] = queue.getBullQueue(); - } - } - } - - // Add producer queues - for (const [name, queue] of this.producerQueues) { - // Use the simple handler name without service prefix for display - const parsed = parseQueueName(name); - const simpleName = parsed ? parsed.handler : name; - if (simpleName && !allQueues[simpleName]) { - allQueues[simpleName] = queue; - } - } - - // If no queues found, create from discovered routes - if (Object.keys(allQueues).length === 0) { - for (const [handlerName, route] of this.queueRoutes) { - const connection = this.getConnection(0); // Use unified queue DB - allQueues[handlerName] = new BullQueue(route.fullName, { - connection, - defaultJobOptions: this.getConfig().defaultQueueOptions?.defaultJobOptions || {}, - }); - } - } - - return allQueues; - } - - /** - * Get statistics for all queues across all services - */ - async getAllStats(): Promise> { - const stats: Record = {}; - - // Get stats for local queues - stats[this.serviceName] = await this.getGlobalStats(); - - // Get stats for other services if we have access - // This would require additional implementation - - return stats; - } - - /** - * Start workers for all queues belonging to this service - * Overrides parent to ensure only own queues get workers - */ - override startAllWorkers(): void { - if (!this.getConfig().delayWorkerStart) { - this._logger.info( - 'startAllWorkers() called but workers already started automatically (delayWorkerStart is false)' - ); - return; - } - - let workersStarted = 0; - const queues = this.getQueues(); - - for (const [queueName, queue] of queues) { - // Parse queue name to check if it belongs to this service - const parsed = parseQueueName(queueName); - - // Skip if not our service's queue - if (parsed && parsed.service !== this.serviceName) { - this._logger.trace('Skipping workers for cross-service queue', { - queueName, - ownerService: parsed.service, - currentService: this.serviceName, - }); - continue; - } - - const workerCount = this.getConfig().defaultQueueOptions?.workers || 1; - const concurrency = this.getConfig().defaultQueueOptions?.concurrency || 1; - - if (workerCount > 0) { - queue.startWorkersManually(workerCount, concurrency); - workersStarted++; - this._logger.debug('Started workers for queue', { - queueName, - workers: workerCount, - concurrency, - }); - } - } - - this._logger.info('Service workers started', { - service: this.serviceName, - totalQueues: queues.size, - queuesWithWorkers: workersStarted, - delayWorkerStart: this.getConfig().delayWorkerStart, - }); - } - - /** - * Graceful shutdown - */ - override async shutdown(): Promise { - // Close producer queues - for (const [name, queue] of this.producerQueues) { - await queue.close(); - this._logger.debug('Closed producer queue', { queue: name }); - } - - // Close additional connections - for (const [db, connection] of this.connections) { - if (db !== 0) { - // Don't close our main connection (DB 0 for queues) - connection.disconnect(); - this._logger.debug('Closed Redis connection', { db }); - } - } - - // Call parent shutdown - await super.shutdown(); - } -} +} \ No newline at end of file diff --git a/libs/core/queue/src/types.ts b/libs/core/queue/src/types.ts index 2712d74..df58707 100644 --- a/libs/core/queue/src/types.ts +++ b/libs/core/queue/src/types.ts @@ -63,6 +63,7 @@ export interface QueueOptions { enableDLQ?: boolean; enableRateLimit?: boolean; rateLimitRules?: RateLimitRule[]; // Queue-specific rate limit rules + handlerRegistry?: any; // HandlerRegistry from @stock-bot/handler-registry } export interface QueueManagerConfig { @@ -72,6 +73,8 @@ export interface QueueManagerConfig { globalRateLimit?: RateLimitConfig; rateLimitRules?: RateLimitRule[]; // Global rate limit rules delayWorkerStart?: boolean; // If true, workers won't start automatically + serviceName?: string; // For service discovery and namespacing + autoDiscoverHandlers?: boolean; // Auto-discover queue routes from handler registry } // Queue-specific stats that extend the base types diff --git a/libs/core/types/src/services.ts b/libs/core/types/src/services.ts index 280970f..dbda07c 100644 --- a/libs/core/types/src/services.ts +++ b/libs/core/types/src/services.ts @@ -302,7 +302,8 @@ export interface ServiceTypes { logger: Logger; cache?: CacheProvider; globalCache?: CacheProvider; - queue?: QueueManager; + queueManager?: QueueManager; + queue?: Queue; // Individual queue instance proxy?: ProxyManager; browser?: Browser; mongodb?: MongoDBClient; diff --git a/test-queue.ts b/test-queue.ts new file mode 100644 index 0000000..3a8fb1a --- /dev/null +++ b/test-queue.ts @@ -0,0 +1,22 @@ +import { Queue } from 'bullmq'; + +const queue = new Queue('{data-ingestion_ceo}', { + connection: { + host: 'localhost', + port: 6379, + db: 0, + }, +}); + +async function testJob() { + const job = await queue.add('fetchCompany', { + handler: 'ceo', + operation: 'fetchCompany', + payload: { symbol: 'AAPL' }, + }); + + console.log('Job added:', job.id); + process.exit(0); +} + +testJob().catch(console.error); \ No newline at end of file