diff --git a/apps/stock/data-ingestion/src/handlers/index.ts b/apps/stock/data-ingestion/src/handlers/index.ts index 024e928..e572141 100644 --- a/apps/stock/data-ingestion/src/handlers/index.ts +++ b/apps/stock/data-ingestion/src/handlers/index.ts @@ -27,6 +27,7 @@ export async function initializeAllHandlers(serviceContainer: IServiceContainer) pattern: '.handler.', exclude: ['test', 'spec'], dryRun: false, + serviceName: 'data-ingestion', }); logger.info('Handler auto-registration complete', { diff --git a/apps/stock/web-api/src/routes/monitoring.routes.ts b/apps/stock/web-api/src/routes/monitoring.routes.ts index e99adf3..89be314 100644 --- a/apps/stock/web-api/src/routes/monitoring.routes.ts +++ b/apps/stock/web-api/src/routes/monitoring.routes.ts @@ -234,17 +234,16 @@ export function createMonitoringRoutes(container: IServiceContainer) { const connection = { host: 'localhost', port: 6379, - db: 1, + db: 0, // All queues in DB 0 }; - const queue = new Queue(`{${queueName}}`, { connection }); + const queue = new Queue(queueName, { connection }); try { const counts = await queue.getJobCounts(); await queue.close(); return c.json({ queueName, - bullmqName: `{${queueName}}`, counts }); } catch (error: any) { diff --git a/apps/stock/web-api/src/services/monitoring.service.ts b/apps/stock/web-api/src/services/monitoring.service.ts index 87cc872..bf42e57 100644 --- a/apps/stock/web-api/src/services/monitoring.service.ts +++ b/apps/stock/web-api/src/services/monitoring.service.ts @@ -91,28 +91,42 @@ export class MonitoringService { }); // Always use the known queue names since web-api doesn't create worker queues - const queueNames = ['proxy', 'qm', 'ib', 'ceo', 'webshare', 'exchanges', 'symbols']; + const handlerMapping = { + 'proxy': 'data-ingestion', + 'qm': 'data-ingestion', + 'ib': 'data-ingestion', + 'ceo': 'data-ingestion', + 'webshare': 'data-ingestion', + 'exchanges': 'data-pipeline', + 'symbols': 'data-pipeline', + }; + + const queueNames = Object.keys(handlerMapping); this.logger.debug('Using known queue names', { count: queueNames.length, names: queueNames }); // Create BullMQ queues directly with the correct format - for (const queueName of queueNames) { + for (const handlerName of queueNames) { try { // Import BullMQ directly to create queue instances const { Queue: BullMQQueue } = await import('bullmq'); const connection = { host: 'localhost', port: 6379, - db: 1, // Queue DB + db: 0, // All queues now in DB 0 }; - // Create BullMQ queue with the correct format - const bullQueue = new BullMQQueue(`{${queueName}}`, { connection }); + // Get the service that owns this handler + const serviceName = handlerMapping[handlerName as keyof typeof handlerMapping]; + + // Create BullMQ queue with the new naming format {service_handler} + const fullQueueName = `{${serviceName}_${handlerName}}`; + const bullQueue = new BullMQQueue(fullQueueName, { connection }); // Get stats directly from BullMQ - const queueStats = await this.getQueueStatsForBullQueue(bullQueue, queueName); + const queueStats = await this.getQueueStatsForBullQueue(bullQueue, handlerName); stats.push({ - name: queueName, + name: handlerName, connected: true, jobs: queueStats, workers: { @@ -124,9 +138,9 @@ export class MonitoringService { // Close the queue connection after getting stats await bullQueue.close(); } catch (error) { - this.logger.warn(`Failed to get stats for queue ${queueName}`, { error }); + this.logger.warn(`Failed to get stats for queue ${handlerName}`, { error }); stats.push({ - name: queueName, + name: handlerName, connected: false, jobs: { waiting: 0, @@ -535,6 +549,20 @@ export class MonitoringService { for (const service of serviceEndpoints) { try { + // For the current service (web-api), add it directly without health check + if (service.name === 'web-api') { + services.push({ + name: 'web-api', + version: '1.0.0', + status: 'running', + port: process.env.PORT ? parseInt(process.env.PORT) : 2003, + uptime: Date.now() - this.startTime, + lastCheck: new Date().toISOString(), + healthy: true, + }); + continue; + } + const startTime = Date.now(); const response = await fetch(`http://localhost:${service.port}${service.path}`, { signal: AbortSignal.timeout(5000), // 5 second timeout @@ -578,17 +606,6 @@ export class MonitoringService { } } - // Add current service (web-api) - services.push({ - name: 'web-api', - version: '1.0.0', - status: 'running', - port: process.env.PORT ? parseInt(process.env.PORT) : 2003, - uptime: Date.now() - this.startTime, - lastCheck: new Date().toISOString(), - healthy: true, - }); - return services; } @@ -597,7 +614,9 @@ export class MonitoringService { */ async getProxyStats(): Promise { try { - if (!this.container.proxy) { + // Since web-api doesn't have proxy manager, query the cache directly + // The proxy manager stores data with cache:proxy: prefix + if (!this.container.cache) { return { enabled: false, totalProxies: 0, @@ -606,10 +625,55 @@ export class MonitoringService { }; } - const proxyManager = this.container.proxy as any; - - // Check if proxy manager is ready - if (!proxyManager.isReady || !proxyManager.isReady()) { + try { + // Get proxy data from cache using getRaw method + // The proxy manager uses cache:proxy: prefix, but web-api cache uses cache:api: + const cacheProvider = this.container.cache; + + if (cacheProvider.getRaw) { + // Use getRaw to access data with different cache prefix + // The proxy manager now uses a global cache:proxy: prefix + this.logger.debug('Attempting to fetch proxy data from cache'); + + const [cachedProxies, lastUpdateStr] = await Promise.all([ + cacheProvider.getRaw('cache:proxy:active'), + cacheProvider.getRaw('cache:proxy:last-update') + ]); + + this.logger.debug('Proxy cache data retrieved', { + hasProxies: !!cachedProxies, + isArray: Array.isArray(cachedProxies), + proxyCount: cachedProxies ? cachedProxies.length : 0, + lastUpdate: lastUpdateStr + }); + + if (cachedProxies && Array.isArray(cachedProxies)) { + const workingCount = cachedProxies.filter((p: any) => p.isWorking !== false).length; + const failedCount = cachedProxies.filter((p: any) => p.isWorking === false).length; + + return { + enabled: true, + totalProxies: cachedProxies.length, + workingProxies: workingCount, + failedProxies: failedCount, + lastUpdate: lastUpdateStr || undefined, + }; + } + } else { + this.logger.debug('Cache provider does not support getRaw method'); + } + + // No cached data found - proxies might not be initialized yet + return { + enabled: true, + totalProxies: 0, + workingProxies: 0, + failedProxies: 0, + }; + } catch (cacheError) { + this.logger.debug('Could not retrieve proxy data from cache', { error: cacheError }); + + // Return basic stats if cache query fails return { enabled: true, totalProxies: 0, @@ -617,18 +681,6 @@ export class MonitoringService { failedProxies: 0, }; } - - const stats = proxyManager.getStats ? proxyManager.getStats() : null; - const lastFetchTime = proxyManager.getLastFetchTime ? proxyManager.getLastFetchTime() : null; - - return { - enabled: true, - totalProxies: stats?.total || 0, - workingProxies: stats?.working || 0, - failedProxies: stats?.failed || 0, - lastUpdate: stats?.lastUpdate ? new Date(stats.lastUpdate).toISOString() : undefined, - lastFetchTime: lastFetchTime ? new Date(lastFetchTime).toISOString() : undefined, - }; } catch (error) { this.logger.error('Failed to get proxy stats', { error }); return null; diff --git a/docker-compose.yml b/docker-compose.yml index c89a242..a7f3eea 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -213,8 +213,8 @@ services: # Dragonfly - Redis replacement for caching and events - REDIS_HOST=dragonfly - REDIS_PORT=6379 - REDIS_PASSWORD= - - REDIS_DB=1 - - REDIS_URL=redis://dragonfly:6379 + - REDIS_DB=0 + - REDIS_URL=redis://dragonfly:6379/0 depends_on: - dragonfly restart: unless-stopped diff --git a/libs/core/cache/src/namespaced-cache.ts b/libs/core/cache/src/namespaced-cache.ts index 2ecf832..c42ed63 100644 --- a/libs/core/cache/src/namespaced-cache.ts +++ b/libs/core/cache/src/namespaced-cache.ts @@ -86,4 +86,16 @@ export class NamespacedCache implements CacheProvider { getFullPrefix(): string { return this.prefix; } + + /** + * Get a value using a raw Redis key (bypassing the namespace prefix) + * Delegates to the underlying cache's getRaw method if available + */ + async getRaw(key: string): Promise { + if (this.cache.getRaw) { + return this.cache.getRaw(key); + } + // Fallback for caches that don't implement getRaw + return null; + } } \ No newline at end of file diff --git a/libs/core/cache/src/redis-cache.ts b/libs/core/cache/src/redis-cache.ts index 3f932e2..51b811d 100644 --- a/libs/core/cache/src/redis-cache.ts +++ b/libs/core/cache/src/redis-cache.ts @@ -291,6 +291,29 @@ export class RedisCache implements CacheProvider { ); } + /** + * Get a value using a raw Redis key (bypassing the keyPrefix) + * Useful for accessing cache data from other services with different prefixes + */ + async getRaw(key: string): Promise { + return this.safeExecute( + async () => { + // Use the key directly without adding our prefix + const value = await this.redis.get(key); + if (!value) { + this.updateStats(false); + return null; + } + this.updateStats(true); + const parsed = JSON.parse(value); + this.logger.debug('Cache raw get hit', { key }); + return parsed; + }, + null, + 'getRaw' + ); + } + async keys(pattern: string): Promise { return this.safeExecute( async () => { diff --git a/libs/core/cache/src/types.ts b/libs/core/cache/src/types.ts index 6678f94..1e84061 100644 --- a/libs/core/cache/src/types.ts +++ b/libs/core/cache/src/types.ts @@ -76,6 +76,12 @@ export interface CacheProvider { * Atomically update field with transformation function */ updateField?(key: string, updater: (current: T | null) => T, ttl?: number): Promise; + + /** + * Get a value using a raw Redis key (bypassing the keyPrefix) + * Useful for accessing cache data from other services with different prefixes + */ + getRaw?(key: string): Promise; } export interface CacheOptions { diff --git a/libs/core/di/src/registrations/service.registration.ts b/libs/core/di/src/registrations/service.registration.ts index 9941e03..6fe294f 100644 --- a/libs/core/di/src/registrations/service.registration.ts +++ b/libs/core/di/src/registrations/service.registration.ts @@ -30,10 +30,22 @@ export function registerApplicationServices( // Proxy Manager if (config.proxy && config.redis.enabled) { container.register({ - proxyManager: asFunction(({ cache, logger }) => { - if (!cache) {return null;} + proxyManager: asFunction(({ logger }) => { + // Create a separate cache instance for proxy with global prefix + const { createCache } = require('@stock-bot/cache'); + const proxyCache = createCache({ + redisConfig: { + host: config.redis.host, + port: config.redis.port, + password: config.redis.password, + db: 1, // Use cache DB (usually DB 1) + }, + keyPrefix: 'cache:proxy:', + ttl: 86400, // 24 hours default + enableMetrics: true, + logger, + }); - const proxyCache = new NamespacedCache(cache, 'proxy'); const proxyManager = new ProxyManager(proxyCache, config.proxy, logger); // Note: Initialization will be handled by the lifecycle manager diff --git a/libs/core/di/src/service-application.ts b/libs/core/di/src/service-application.ts index cdf165e..289c92d 100644 --- a/libs/core/di/src/service-application.ts +++ b/libs/core/di/src/service-application.ts @@ -338,6 +338,18 @@ export class ServiceApplication { let totalScheduledJobs = 0; for (const [handlerName, config] of allHandlers) { if (config.scheduledJobs && config.scheduledJobs.length > 0) { + // Check if this handler belongs to the current service + const ownerService = handlerRegistry.getHandlerService(handlerName); + + if (ownerService !== this.config.service.serviceName) { + this.logger.trace('Skipping scheduled jobs for handler from different service', { + handler: handlerName, + ownerService, + currentService: this.config.service.serviceName, + }); + continue; + } + const queueManager = this.container.resolve('queueManager'); if (!queueManager) { this.logger.error('Queue manager is not initialized, cannot create scheduled jobs'); diff --git a/libs/core/handlers/src/base/BaseHandler.ts b/libs/core/handlers/src/base/BaseHandler.ts index 75eedd6..015ef92 100644 --- a/libs/core/handlers/src/base/BaseHandler.ts +++ b/libs/core/handlers/src/base/BaseHandler.ts @@ -159,7 +159,7 @@ export abstract class BaseHandler implements IHandler { /** * Create a sub-namespaced cache for specific operations - * Example: handler 'webshare' creates namespace 'webshare:api' -> keys will be 'cache:webshare:api:*' + * Example: handler 'webshare' creates namespace 'webshare:api' -> keys will be 'cache:data-ingestion:webshare:api:*' */ protected createNamespacedCache(subNamespace: string) { return createNamespacedCache(this.cache, `${this.handlerName}:${subNamespace}`); @@ -172,7 +172,8 @@ export abstract class BaseHandler implements IHandler { if (!this.cache) { return; } - return this.cache.set(`cache:${this.handlerName}:${key}`, value, ttl); + // Don't add 'cache:' prefix since the cache already has its own prefix + return this.cache.set(`${this.handlerName}:${key}`, value, ttl); } /** @@ -182,7 +183,8 @@ export abstract class BaseHandler implements IHandler { if (!this.cache) { return null; } - return this.cache.get(`cache:${this.handlerName}:${key}`); + // Don't add 'cache:' prefix since the cache already has its own prefix + return this.cache.get(`${this.handlerName}:${key}`); } /** @@ -192,7 +194,8 @@ export abstract class BaseHandler implements IHandler { if (!this.cache) { return; } - return this.cache.del(`cache:${this.handlerName}:${key}`); + // Don't add 'cache:' prefix since the cache already has its own prefix + return this.cache.del(`${this.handlerName}:${key}`); } /** @@ -294,7 +297,7 @@ export abstract class BaseHandler implements IHandler { * Register this handler using decorator metadata * Automatically reads @Handler, @Operation, and @QueueSchedule decorators */ - register(): void { + register(serviceName?: string): void { const constructor = this.constructor as any; const handlerName = constructor.__handlerName || this.handlerName; const operations = constructor.__operations || []; @@ -333,9 +336,10 @@ export abstract class BaseHandler implements IHandler { scheduledJobs, }; - handlerRegistry.registerWithSchedule(config); + handlerRegistry.registerWithSchedule(config, serviceName); this.logger.info('Handler registered using decorator metadata', { handlerName, + service: serviceName, operations: operations.map((op: any) => ({ name: op.name, method: op.method })), scheduledJobs: scheduledJobs.map((job: any) => ({ operation: job.operation, diff --git a/libs/core/handlers/src/registry/auto-register.ts b/libs/core/handlers/src/registry/auto-register.ts index fa684f0..7337875 100644 --- a/libs/core/handlers/src/registry/auto-register.ts +++ b/libs/core/handlers/src/registry/auto-register.ts @@ -73,9 +73,10 @@ export async function autoRegisterHandlers( pattern?: string; exclude?: string[]; dryRun?: boolean; + serviceName?: string; } = {} ): Promise<{ registered: string[]; failed: string[] }> { - const { pattern = '.handler.', exclude = [], dryRun = false } = options; + const { pattern = '.handler.', exclude = [], dryRun = false, serviceName } = options; const registered: string[] = []; const failed: string[] = []; @@ -124,10 +125,12 @@ export async function autoRegisterHandlers( // Create instance and register const handler = new HandlerClass(services); - handler.register(); + handler.register(serviceName); + + // No need to set service ownership separately - it's done in register() registered.push(handlerName); - logger.info(`Successfully registered handler: ${handlerName}`); + logger.info(`Successfully registered handler: ${handlerName}`, { service: serviceName }); } } } catch (error) { diff --git a/libs/core/handlers/src/registry/handler-registry.ts b/libs/core/handlers/src/registry/handler-registry.ts index 43bfbb2..19401b5 100644 --- a/libs/core/handlers/src/registry/handler-registry.ts +++ b/libs/core/handlers/src/registry/handler-registry.ts @@ -15,25 +15,33 @@ class HandlerRegistry { private readonly logger = getLogger('handler-registry'); private handlers = new Map(); private handlerSchedules = new Map(); + private handlerServices = new Map(); // Maps handler to service name /** * Register a handler with its operations (simple config) */ - register(handlerName: string, config: HandlerConfig): void { + register(handlerName: string, config: HandlerConfig, serviceName?: string): void { this.logger.info(`Registering handler: ${handlerName}`, { operations: Object.keys(config), + service: serviceName, }); this.handlers.set(handlerName, config); + + // Track service ownership if provided + if (serviceName) { + this.handlerServices.set(handlerName, serviceName); + } } /** * Register a handler with scheduled jobs (enhanced config) */ - registerWithSchedule(config: HandlerConfigWithSchedule): void { + registerWithSchedule(config: HandlerConfigWithSchedule, serviceName?: string): void { this.logger.info(`Registering handler with schedule: ${config.name}`, { operations: Object.keys(config.operations), scheduledJobs: config.scheduledJobs?.length || 0, + service: serviceName, }); this.handlers.set(config.name, config.operations); @@ -41,6 +49,11 @@ class HandlerRegistry { if (config.scheduledJobs && config.scheduledJobs.length > 0) { this.handlerSchedules.set(config.name, config.scheduledJobs); } + + // Track service ownership if provided + if (serviceName) { + this.handlerServices.set(config.name, serviceName); + } } /** @@ -130,12 +143,40 @@ class HandlerRegistry { }; } + /** + * Get the service that owns a handler + */ + getHandlerService(handlerName: string): string | undefined { + return this.handlerServices.get(handlerName); + } + + /** + * Get all handlers for a specific service + */ + getServiceHandlers(serviceName: string): string[] { + const handlers: string[] = []; + for (const [handler, service] of this.handlerServices) { + if (service === serviceName) { + handlers.push(handler); + } + } + return handlers; + } + + /** + * Set service ownership for a handler (used during auto-discovery) + */ + setHandlerService(handlerName: string, serviceName: string): void { + this.handlerServices.set(handlerName, serviceName); + } + /** * Clear all registrations (useful for testing) */ clear(): void { this.handlers.clear(); this.handlerSchedules.clear(); + this.handlerServices.clear(); } } diff --git a/libs/core/queue/src/index.ts b/libs/core/queue/src/index.ts index 5504861..2f986af 100644 --- a/libs/core/queue/src/index.ts +++ b/libs/core/queue/src/index.ts @@ -3,13 +3,13 @@ export { Queue } from './queue'; export { QueueManager } from './queue-manager'; export { SmartQueueManager } from './smart-queue-manager'; export { ServiceCache, createServiceCache } from './service-cache'; +// Service utilities export { - SERVICE_REGISTRY, - getServiceConfig, - findServiceForHandler, + normalizeServiceName, + generateCachePrefix, getFullQueueName, parseQueueName -} from './service-registry'; +} from './service-utils'; // Re-export handler registry and utilities from handlers package export { handlerRegistry, createJobHandler } from '@stock-bot/handlers'; @@ -71,5 +71,3 @@ export type { } from './types'; -// Re-export service registry types -export type { ServiceConfig } from './service-registry'; diff --git a/libs/core/queue/src/queue-manager.ts b/libs/core/queue/src/queue-manager.ts index 7f1acee..ad18385 100644 --- a/libs/core/queue/src/queue-manager.ts +++ b/libs/core/queue/src/queue-manager.ts @@ -8,6 +8,7 @@ import type { QueueOptions, QueueStats, RateLimitRule, + RedisConfig, } from './types'; import { getRedisConnection } from './utils'; @@ -173,6 +174,14 @@ export class QueueManager { this.logger.trace('Batch cache initialized synchronously for queue', { queueName }); } + /** + * Get the queues map (for subclasses) + */ + protected getQueues(): Map { + return this.queues; + } + + /** * Get statistics for all queues */ diff --git a/libs/core/queue/src/queue.ts b/libs/core/queue/src/queue.ts index 3fc4572..e8abd24 100644 --- a/libs/core/queue/src/queue.ts +++ b/libs/core/queue/src/queue.ts @@ -45,7 +45,7 @@ export class Queue { const connection = getRedisConnection(redisConfig); // Initialize BullMQ queue - this.bullQueue = new BullQueue(`{${queueName}}`, { + this.bullQueue = new BullQueue(queueName, { connection, defaultJobOptions: { removeOnComplete: 10, @@ -61,7 +61,7 @@ export class Queue { // Initialize queue events if workers will be used if (config.workers && config.workers > 0) { - this.queueEvents = new QueueEvents(`{${queueName}}`, { connection }); + this.queueEvents = new QueueEvents(queueName, { connection }); } // Start workers if requested and not explicitly disabled @@ -278,7 +278,7 @@ export class Queue { const connection = getRedisConnection(this.redisConfig); for (let i = 0; i < workerCount; i++) { - const worker = new Worker(`{${this.queueName}}`, this.processJob.bind(this), { + const worker = new Worker(this.queueName, this.processJob.bind(this), { connection, concurrency, maxStalledCount: 3, @@ -378,7 +378,7 @@ export class Queue { // Initialize queue events if not already done if (!this.queueEvents) { const connection = getRedisConnection(this.redisConfig); - this.queueEvents = new QueueEvents(`{${this.queueName}}`, { connection }); + this.queueEvents = new QueueEvents(this.queueName, { connection }); } this.startWorkers(workerCount, concurrency); diff --git a/libs/core/queue/src/service-cache.ts b/libs/core/queue/src/service-cache.ts index fa04fde..4ffb329 100644 --- a/libs/core/queue/src/service-cache.ts +++ b/libs/core/queue/src/service-cache.ts @@ -1,6 +1,6 @@ import { createCache, type CacheProvider, type CacheStats } from '@stock-bot/cache'; import type { RedisConfig } from './types'; -import { getServiceConfig } from './service-registry'; +import { generateCachePrefix } from './service-utils'; /** * Service-aware cache that uses the service's Redis DB @@ -16,24 +16,18 @@ export class ServiceCache implements CacheProvider { isGlobalCache: boolean = false, logger?: any ) { - // Get service configuration - const serviceConfig = getServiceConfig(serviceName); - if (!serviceConfig && !isGlobalCache) { - throw new Error(`Unknown service: ${serviceName}`); - } - // Determine Redis DB and prefix let db: number; let prefix: string; if (isGlobalCache) { - // Global cache uses db:0 - db = 0; + // Global cache uses db:1 + db = 1; prefix = 'stock-bot:shared'; } else { - // Service cache uses service's DB - db = serviceConfig!.db; - prefix = serviceConfig!.cachePrefix; + // Service cache also uses db:1 with service-specific prefix + db = 1; + prefix = generateCachePrefix(serviceName); } // Create underlying cache with correct DB @@ -148,6 +142,18 @@ export class ServiceCache implements CacheProvider { return this.cache.set(key, updated, ttl); } + /** + * Get a value using a raw Redis key (bypassing the keyPrefix) + * Delegates to the underlying cache's getRaw method if available + */ + async getRaw(key: string): Promise { + if (this.cache.getRaw) { + return this.cache.getRaw(key); + } + // Fallback: if underlying cache doesn't support getRaw, return null + return null; + } + /** * Get the actual Redis key with prefix */ diff --git a/libs/core/queue/src/service-registry.ts b/libs/core/queue/src/service-registry.deprecated.ts similarity index 50% rename from libs/core/queue/src/service-registry.ts rename to libs/core/queue/src/service-registry.deprecated.ts index 4891392..176d624 100644 --- a/libs/core/queue/src/service-registry.ts +++ b/libs/core/queue/src/service-registry.deprecated.ts @@ -1,67 +1,54 @@ /** * Service Registry Configuration * Maps services to their Redis databases and configurations + * + * @deprecated This static service registry has been replaced by runtime discovery + * using the handler registry. Service ownership is now tracked when handlers are + * registered, eliminating the need for static configuration. + * + * Migration: + * - Service names are auto-discovered from handler registration + * - Cache prefixes are generated using generateCachePrefix() + * - Queue names use getFullQueueName() from service-utils + * - Handler ownership is tracked by handlerRegistry.getHandlerService() */ export interface ServiceConfig { - /** Redis database number for this service (used for both queues and cache) */ - db: number; - /** Prefix for queue keys (e.g., 'bull:di') */ - queuePrefix: string; - /** Prefix for cache keys (e.g., 'cache:di') */ + /** Prefix for cache keys (e.g., 'cache:data-ingestion') */ cachePrefix: string; - /** Whether this service only produces jobs (doesn't process them) */ - producerOnly?: boolean; /** List of handlers this service owns (auto-discovered if not provided) */ handlers?: string[]; } /** * Central registry of all services and their configurations - * Each service gets one Redis DB for both queues and cache * * Database assignments: - * - db:0 = Global shared cache - * - db:1 = data-ingestion (queues + cache) - * - db:2 = data-pipeline (queues + cache) - * - db:3 = web-api (cache only, producer-only for queues) + * - db:0 = All queues (unified queue database) + * - db:1 = Global shared cache + service-specific caches */ export const SERVICE_REGISTRY: Record = { 'data-ingestion': { - db: 1, - queuePrefix: 'bull:di', - cachePrefix: 'cache:di', + cachePrefix: 'cache:data-ingestion', handlers: ['ceo', 'qm', 'webshare', 'ib', 'proxy'], }, 'data-pipeline': { - db: 2, - queuePrefix: 'bull:dp', - cachePrefix: 'cache:dp', + cachePrefix: 'cache:data-pipeline', handlers: ['exchanges', 'symbols'], }, 'web-api': { - db: 3, - queuePrefix: 'bull:api', // Not used since producer-only - cachePrefix: 'cache:api', - producerOnly: true, + cachePrefix: 'cache:web-api', }, // Add aliases for services with different naming conventions 'webApi': { - db: 3, - queuePrefix: 'bull:api', - cachePrefix: 'cache:api', - producerOnly: true, + cachePrefix: 'cache:web-api', }, 'dataIngestion': { - db: 1, - queuePrefix: 'bull:di', - cachePrefix: 'cache:di', + cachePrefix: 'cache:data-ingestion', handlers: ['ceo', 'qm', 'webshare', 'ib', 'proxy'], }, 'dataPipeline': { - db: 2, - queuePrefix: 'bull:dp', - cachePrefix: 'cache:dp', + cachePrefix: 'cache:data-pipeline', handlers: ['exchanges', 'symbols'], }, }; @@ -86,30 +73,26 @@ export function findServiceForHandler(handlerName: string): string | undefined { } /** - * Get full queue name - just the handler name since each service has its own Redis DB + * Get full queue name with service namespace */ export function getFullQueueName(serviceName: string, handlerName: string): string { - // Just return the handler name since DB isolation provides namespace separation - return handlerName; + // Use {service_handler} format for Dragonfly optimization and BullMQ compatibility + return `{${serviceName}_${handlerName}}`; } /** * Parse a full queue name into service and handler - * Since queue names are just handler names now, we need to find the service from the handler */ export function parseQueueName(fullQueueName: string): { service: string; handler: string } | null { - // Queue name is just the handler name now - const handlerName = fullQueueName; + // Match pattern {service_handler} + const match = fullQueueName.match(/^\{([^_]+)_([^}]+)\}$/); - // Find which service owns this handler - const serviceName = findServiceForHandler(handlerName); - - if (!serviceName) { + if (!match || !match[1] || !match[2]) { return null; } return { - service: serviceName, - handler: handlerName, + service: match[1], + handler: match[2], }; } \ No newline at end of file diff --git a/libs/core/queue/src/service-utils.ts b/libs/core/queue/src/service-utils.ts new file mode 100644 index 0000000..d6b3a5e --- /dev/null +++ b/libs/core/queue/src/service-utils.ts @@ -0,0 +1,53 @@ +/** + * Service utilities for name normalization and auto-discovery + */ + +/** + * Normalize service name to kebab-case format + * Examples: + * - webApi -> web-api + * - dataIngestion -> data-ingestion + * - data-pipeline -> data-pipeline (unchanged) + */ +export function normalizeServiceName(serviceName: string): string { + // Handle camelCase to kebab-case conversion + const kebabCase = serviceName + .replace(/([a-z])([A-Z])/g, '$1-$2') + .toLowerCase(); + + return kebabCase; +} + +/** + * Generate cache prefix for a service + */ +export function generateCachePrefix(serviceName: string): string { + const normalized = normalizeServiceName(serviceName); + return `cache:${normalized}`; +} + +/** + * Generate full queue name with service namespace + */ +export function getFullQueueName(serviceName: string, handlerName: string): string { + const normalized = normalizeServiceName(serviceName); + // Use {service_handler} format for Dragonfly optimization and BullMQ compatibility + return `{${normalized}_${handlerName}}`; +} + +/** + * Parse a full queue name into service and handler + */ +export function parseQueueName(fullQueueName: string): { service: string; handler: string } | null { + // Match pattern {service_handler} + const match = fullQueueName.match(/^\{([^_]+)_([^}]+)\}$/); + + if (!match || !match[1] || !match[2]) { + return null; + } + + return { + service: match[1], + handler: match[2], + }; +} \ No newline at end of file diff --git a/libs/core/queue/src/smart-queue-manager.ts b/libs/core/queue/src/smart-queue-manager.ts index 052bb67..29cd599 100644 --- a/libs/core/queue/src/smart-queue-manager.ts +++ b/libs/core/queue/src/smart-queue-manager.ts @@ -10,14 +10,7 @@ import type { JobOptions, RedisConfig } from './types'; -import { - SERVICE_REGISTRY, - getServiceConfig, - findServiceForHandler, - getFullQueueName, - parseQueueName, - type ServiceConfig -} from './service-registry'; +import { getFullQueueName, parseQueueName } from './service-utils'; import { getRedisConnection } from './utils'; /** @@ -26,32 +19,24 @@ import { getRedisConnection } from './utils'; */ export class SmartQueueManager extends QueueManager { private serviceName: string; - private serviceConfig: ServiceConfig; private queueRoutes = new Map(); private connections = new Map(); // Redis connections by DB private producerQueues = new Map(); // For cross-service sending private _logger: Logger; constructor(config: SmartQueueConfig, logger?: Logger) { - // Get service config - const serviceConfig = getServiceConfig(config.serviceName); - if (!serviceConfig) { - throw new Error(`Unknown service: ${config.serviceName}`); - } - - // Update Redis config to use service's DB + // Always use DB 0 for queues (unified queue database) const modifiedConfig = { ...config, redis: { ...config.redis, - db: serviceConfig.db, + db: 0, // All queues in DB 0 }, }; super(modifiedConfig, logger); this.serviceName = config.serviceName; - this.serviceConfig = serviceConfig; this._logger = logger || getLogger('SmartQueueManager'); // Auto-discover routes if enabled @@ -61,9 +46,7 @@ export class SmartQueueManager extends QueueManager { this._logger.info('SmartQueueManager initialized', { service: this.serviceName, - db: serviceConfig.db, - handlers: serviceConfig.handlers, - producerOnly: serviceConfig.producerOnly, + discoveredRoutes: this.queueRoutes.size, }); } @@ -71,51 +54,56 @@ export class SmartQueueManager extends QueueManager { * Discover all available queue routes from handler registry */ private discoverQueueRoutes(): void { - // Discover from handler registry if available try { const handlers = handlerRegistry.getAllHandlers(); for (const [handlerName, handlerConfig] of handlers) { - // Find which service owns this handler - const ownerService = findServiceForHandler(handlerName); + // Get the service that registered this handler + const ownerService = handlerRegistry.getHandlerService(handlerName); if (ownerService) { - const ownerConfig = getServiceConfig(ownerService)!; const fullName = getFullQueueName(ownerService, handlerName); this.queueRoutes.set(handlerName, { fullName, service: ownerService, handler: handlerName, - db: ownerConfig.db, + db: 0, // All queues in DB 0 operations: Object.keys(handlerConfig.operations || {}), }); this._logger.trace('Discovered queue route', { handler: handlerName, service: ownerService, - db: ownerConfig.db, + operations: Object.keys(handlerConfig.operations || {}).length, + }); + } else { + this._logger.warn('Handler has no service ownership', { handlerName }); + } + } + + // Also discover handlers registered by the current service + const myHandlers = handlerRegistry.getServiceHandlers(this.serviceName); + for (const handlerName of myHandlers) { + if (!this.queueRoutes.has(handlerName)) { + const fullName = getFullQueueName(this.serviceName, handlerName); + this.queueRoutes.set(handlerName, { + fullName, + service: this.serviceName, + handler: handlerName, + db: 0, // All queues in DB 0 }); } } - } catch (error) { - this._logger.warn('Handler registry not available, using static configuration', { error }); - } - // Also add routes from static configuration - Object.entries(SERVICE_REGISTRY).forEach(([serviceName, config]) => { - if (config.handlers) { - config.handlers.forEach(handlerName => { - if (!this.queueRoutes.has(handlerName)) { - const fullName = getFullQueueName(serviceName, handlerName); - this.queueRoutes.set(handlerName, { - fullName, - service: serviceName, - handler: handlerName, - db: config.db, - }); - } - }); - } - }); + this._logger.info('Queue routes discovered', { + totalRoutes: this.queueRoutes.size, + routes: Array.from(this.queueRoutes.values()).map(r => ({ + handler: r.handler, + service: r.service + })), + }); + } catch (error) { + this._logger.error('Failed to discover queue routes', { error }); + } } /** @@ -136,11 +124,34 @@ export class SmartQueueManager extends QueueManager { /** * Get a queue for the current service (for processing) - * Overrides parent to use namespaced queue names + * Overrides parent to use namespaced queue names and ensure service-specific workers */ override getQueue(queueName: string, options = {}): Queue { - // For local queues, use the service namespace - const fullQueueName = getFullQueueName(this.serviceName, queueName); + // Check if this is already a full queue name (service:handler format) + const parsed = parseQueueName(queueName); + + let fullQueueName: string; + let isOwnQueue: boolean; + + if (parsed) { + // Already in service:handler format + fullQueueName = queueName; + isOwnQueue = parsed.service === this.serviceName; + } else { + // Just handler name, assume it's for current service + fullQueueName = getFullQueueName(this.serviceName, queueName); + isOwnQueue = true; + } + + // For cross-service queues, create without workers (producer-only) + if (!isOwnQueue) { + return super.getQueue(fullQueueName, { + ...options, + workers: 0, // No workers for other services' queues + }); + } + + // For own service queues, use configured workers return super.getQueue(fullQueueName, options); } @@ -212,35 +223,37 @@ export class SmartQueueManager extends QueueManager { * Resolve a queue name to a route */ private resolveQueueRoute(queueName: string): QueueRoute | null { - // Check if it's a handler name (which is now the full queue name) + // Check if it's a full queue name with service prefix const parsed = parseQueueName(queueName); if (parsed) { - const config = getServiceConfig(parsed.service); - if (config) { - return { - fullName: queueName, - service: parsed.service, - handler: parsed.handler, - db: config.db, - }; + // Try to find in discovered routes by handler name + const route = this.queueRoutes.get(parsed.handler); + if (route && route.service === parsed.service) { + return route; } + // Create a route on the fly + return { + fullName: queueName, + service: parsed.service, + handler: parsed.handler, + db: 0, // All queues in DB 0 + }; } - // Check if it's just a handler name + // Check if it's just a handler name in our routes const route = this.queueRoutes.get(queueName); if (route) { return route; } - // Try to find in static config - const ownerService = findServiceForHandler(queueName); + // Try to find in handler registry + const ownerService = handlerRegistry.getHandlerService(queueName); if (ownerService) { - const config = getServiceConfig(ownerService)!; return { fullName: getFullQueueName(ownerService, queueName), service: ownerService, handler: queueName, - db: config.db, + db: 0, // All queues in DB 0 }; } @@ -253,8 +266,8 @@ export class SmartQueueManager extends QueueManager { private getProducerQueue(route: QueueRoute): BullQueue { if (!this.producerQueues.has(route.fullName)) { const connection = this.getConnection(route.db); - // Match the queue name format used by workers: {queueName} - const queue = new BullQueue(`{${route.fullName}}`, { + // Use the same queue name format as workers + const queue = new BullQueue(route.fullName, { connection, defaultJobOptions: this.getConfig().defaultQueueOptions?.defaultJobOptions || {}, }); @@ -276,8 +289,8 @@ export class SmartQueueManager extends QueueManager { if (queue && typeof queue.getBullQueue === 'function') { // Extract the underlying BullMQ queue using the public getter // Use the simple handler name without service prefix for display - const parts = name.split(':'); - const simpleName = parts.length > 1 ? parts[1] : name; + const parsed = parseQueueName(name); + const simpleName = parsed ? parsed.handler : name; if (simpleName) { allQueues[simpleName] = queue.getBullQueue(); } @@ -287,20 +300,18 @@ export class SmartQueueManager extends QueueManager { // Add producer queues for (const [name, queue] of this.producerQueues) { // Use the simple handler name without service prefix for display - const parts = name.split(':'); - const simpleName = parts.length > 1 ? parts[1] : name; + const parsed = parseQueueName(name); + const simpleName = parsed ? parsed.handler : name; if (simpleName && !allQueues[simpleName]) { allQueues[simpleName] = queue; } } - // If no queues found, return all registered handlers as BullMQ queues + // If no queues found, create from discovered routes if (Object.keys(allQueues).length === 0) { - // Create BullMQ queue instances for known handlers - const handlers = ['proxy', 'qm', 'ib', 'ceo', 'webshare', 'exchanges', 'symbols']; - for (const handler of handlers) { - const connection = this.getConnection(1); // Use default DB - allQueues[handler] = new BullQueue(`{${handler}}`, { + for (const [handlerName, route] of this.queueRoutes) { + const connection = this.getConnection(0); // Use unified queue DB + allQueues[handlerName] = new BullQueue(route.fullName, { connection, defaultJobOptions: this.getConfig().defaultQueueOptions?.defaultJobOptions || {}, }); @@ -325,6 +336,57 @@ export class SmartQueueManager extends QueueManager { return stats; } + /** + * Start workers for all queues belonging to this service + * Overrides parent to ensure only own queues get workers + */ + override startAllWorkers(): void { + if (!this.getConfig().delayWorkerStart) { + this._logger.info( + 'startAllWorkers() called but workers already started automatically (delayWorkerStart is false)' + ); + return; + } + + let workersStarted = 0; + const queues = this.getQueues(); + + for (const [queueName, queue] of queues) { + // Parse queue name to check if it belongs to this service + const parsed = parseQueueName(queueName); + + // Skip if not our service's queue + if (parsed && parsed.service !== this.serviceName) { + this._logger.trace('Skipping workers for cross-service queue', { + queueName, + ownerService: parsed.service, + currentService: this.serviceName, + }); + continue; + } + + const workerCount = this.getConfig().defaultQueueOptions?.workers || 1; + const concurrency = this.getConfig().defaultQueueOptions?.concurrency || 1; + + if (workerCount > 0) { + queue.startWorkersManually(workerCount, concurrency); + workersStarted++; + this._logger.debug('Started workers for queue', { + queueName, + workers: workerCount, + concurrency, + }); + } + } + + this._logger.info('Service workers started', { + service: this.serviceName, + totalQueues: queues.size, + queuesWithWorkers: workersStarted, + delayWorkerStart: this.getConfig().delayWorkerStart, + }); + } + /** * Graceful shutdown */ @@ -337,7 +399,7 @@ export class SmartQueueManager extends QueueManager { // Close additional connections for (const [db, connection] of this.connections) { - if (db !== this.serviceConfig.db) { // Don't close our main connection + if (db !== 0) { // Don't close our main connection (DB 0 for queues) connection.disconnect(); this._logger.debug('Closed Redis connection', { db }); }