From d3ef73ae00b39849ea0d472f301e301bbec9e4b0 Mon Sep 17 00:00:00 2001 From: Boki Date: Thu, 19 Jun 2025 08:22:00 -0400 Subject: [PATCH] queue work --- libs/queue/src/batch-processor.ts | 95 +-- libs/queue/src/index.ts | 5 +- libs/queue/src/queue-factory.ts | 80 +-- libs/queue/src/queue-instance.ts | 273 -------- libs/queue/src/queue-manager.ts | 818 +++++++++--------------- libs/queue/src/queue.ts | 324 ++++++++++ libs/queue/src/rate-limiter.ts | 292 +++++---- libs/queue/src/types.ts | 88 ++- libs/queue/test/batch-processor.test.ts | 49 +- 9 files changed, 938 insertions(+), 1086 deletions(-) delete mode 100644 libs/queue/src/queue-instance.ts create mode 100644 libs/queue/src/queue.ts diff --git a/libs/queue/src/batch-processor.ts b/libs/queue/src/batch-processor.ts index f49d187..a7627f2 100644 --- a/libs/queue/src/batch-processor.ts +++ b/libs/queue/src/batch-processor.ts @@ -1,48 +1,20 @@ -import { CacheProvider, createCache } from '@stock-bot/cache'; import { getLogger } from '@stock-bot/logger'; -import type { Queue } from './queue-instance'; +import { QueueManager } from './queue-manager'; import type { BatchJobData, BatchResult, JobData, ProcessOptions } from './types'; const logger = getLogger('batch-processor'); -const cacheProviders = new Map(); - -function getCache(queueName: string, redisConfig: any): CacheProvider { - if (!cacheProviders.has(queueName)) { - const cacheProvider = createCache({ - redisConfig, - keyPrefix: `batch:${queueName}:`, - ttl: 86400, // 24 hours default - enableMetrics: true, - }); - cacheProviders.set(queueName, cacheProvider); - } - return cacheProviders.get(queueName) as CacheProvider; -} - -/** - * Initialize the batch cache before any batch operations - * This should be called during application startup - */ -export async function initializeBatchCache(queue: Queue): Promise { - const queueName = queue.getName(); - const redisConfig = queue.getRedisConfig(); - logger.info('Initializing batch cache...', { queueName }); - - const cache = getCache(queueName, redisConfig); - await cache.waitForReady(10000); - logger.info('Batch cache initialized successfully', { queueName }); -} - /** * Main function - processes items either directly or in batches * Each item becomes payload: item (no processing needed) */ export async function processItems( items: T[], - queue: Queue, + queueName: string, options: ProcessOptions ): Promise { + const queueManager = QueueManager.getInstance(); + const queue = queueManager.getQueue(queueName); const startTime = Date.now(); if (items.length === 0) { @@ -63,8 +35,8 @@ export async function processItems( try { const result = options.useBatching - ? await processBatched(items, queue, options) - : await processDirect(items, queue, options); + ? await processBatched(items, queueName, options) + : await processDirect(items, queueName, options); const duration = Date.now() - startTime; @@ -85,9 +57,11 @@ export async function processItems( */ async function processDirect( items: T[], - queue: Queue, + queueName: string, options: ProcessOptions ): Promise> { + const queueManager = QueueManager.getInstance(); + const queue = queueManager.getQueue(queueName); const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds const delayPerItem = totalDelayMs / items.length; @@ -114,7 +88,7 @@ async function processDirect( }, })); - const createdJobs = await addJobsInChunks(queue, jobs); + const createdJobs = await addJobsInChunks(queueName, jobs); return { @@ -129,9 +103,11 @@ async function processDirect( */ async function processBatched( items: T[], - queue: Queue, + queueName: string, options: ProcessOptions ): Promise> { + const queueManager = QueueManager.getInstance(); + const queue = queueManager.getQueue(queueName); const batchSize = options.batchSize || 100; const batches = createBatches(items, batchSize); const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds @@ -147,7 +123,7 @@ async function processBatched( const batchJobs = await Promise.all( batches.map(async (batch, batchIndex) => { // Just store the items directly - no processing needed - const payloadKey = await storeItems(batch, queue, options); + const payloadKey = await storeItems(batch, queueName, options); return { name: 'process-batch', @@ -174,7 +150,7 @@ async function processBatched( }) ); - const createdJobs = await addJobsInChunks(queue, batchJobs); + const createdJobs = await addJobsInChunks(queueName, batchJobs); return { totalItems: items.length, @@ -189,8 +165,10 @@ async function processBatched( */ export async function processBatchJob( jobData: BatchJobData, - queue: Queue + queueName: string ): Promise { + const queueManager = QueueManager.getInstance(); + const queue = queueManager.getQueue(queueName); const { payloadKey, batchIndex, totalBatches, itemCount } = jobData; logger.debug('Processing batch job', { @@ -200,7 +178,7 @@ export async function processBatchJob( }); try { - const payload = await loadPayload(payloadKey, queue); + const payload = await loadPayload(payloadKey, queueName); if (!payload || !payload.items || !payload.options) { logger.error('Invalid payload data', { payloadKey, payload }); throw new Error(`Invalid payload data for key: ${payloadKey}`); @@ -225,10 +203,10 @@ export async function processBatchJob( }, })); - const createdJobs = await addJobsInChunks(queue, jobs); + const createdJobs = await addJobsInChunks(queueName, jobs); // Cleanup payload after successful processing - await cleanupPayload(payloadKey, queue); + await cleanupPayload(payloadKey, queueName); return { batchIndex, @@ -253,14 +231,11 @@ function createBatches(items: T[], batchSize: number): T[][] { async function storeItems( items: T[], - queue: Queue, + queueName: string, options: ProcessOptions ): Promise { - if (!queue) { - throw new Error('Batch cache not initialized. Call initializeBatchCache() first.'); - } - - const cache = getCache(queue.getName(), queue.getRedisConfig()); + const queueManager = QueueManager.getInstance(); + const cache = queueManager.getCache(queueName); const payloadKey = `payload:${Date.now()}:${Math.random().toString(36).substr(2, 9)}`; const payload = { @@ -283,7 +258,7 @@ async function storeItems( async function loadPayload( key: string, - queue: Queue + queueName: string ): Promise<{ items: T[]; options: { @@ -294,11 +269,8 @@ async function loadPayload( operation: string; }; } | null> { - if (!queue) { - throw new Error('Batch cache not initialized. Call initializeBatchCache() first.'); - } - - const cache = getCache(queue.getName(), queue.getRedisConfig()); + const queueManager = QueueManager.getInstance(); + const cache = queueManager.getCache(queueName); return (await cache.get(key)) as { items: T[]; options: { @@ -311,20 +283,19 @@ async function loadPayload( } | null; } -async function cleanupPayload(key: string, queue: Queue): Promise { - if (!queue) { - throw new Error('Batch cache not initialized. Call initializeBatchCache() first.'); - } - - const cache = getCache(queue.getName(), queue.getRedisConfig()); +async function cleanupPayload(key: string, queueName: string): Promise { + const queueManager = QueueManager.getInstance(); + const cache = queueManager.getCache(queueName); await cache.del(key); } async function addJobsInChunks( - queue: Queue, + queueName: string, jobs: Array<{ name: string; data: JobData; opts?: Record }>, chunkSize = 100 ): Promise { + const queueManager = QueueManager.getInstance(); + const queue = queueManager.getQueue(queueName); const allCreatedJobs = []; for (let i = 0; i < jobs.length; i += chunkSize) { diff --git a/libs/queue/src/index.ts b/libs/queue/src/index.ts index 43861e2..04eb8a7 100644 --- a/libs/queue/src/index.ts +++ b/libs/queue/src/index.ts @@ -1,7 +1,6 @@ export * from './batch-processor'; export * from './handler-registry'; export * from './queue-manager'; -export * from './queue-instance'; export * from './queue-factory'; export * from './types'; export * from './dlq-handler'; @@ -9,10 +8,10 @@ export * from './queue-metrics'; export * from './rate-limiter'; // Re-export commonly used functions -export { initializeBatchCache, processBatchJob, processItems } from './batch-processor'; +export { processBatchJob, processItems } from './batch-processor'; export { QueueManager } from './queue-manager'; -export { Queue } from './queue-instance'; +export { Queue, type QueueConfig } from './queue'; export { handlerRegistry } from './handler-registry'; diff --git a/libs/queue/src/queue-factory.ts b/libs/queue/src/queue-factory.ts index 200dc99..315d654 100644 --- a/libs/queue/src/queue-factory.ts +++ b/libs/queue/src/queue-factory.ts @@ -1,18 +1,14 @@ import { getLogger } from '@stock-bot/logger'; import { QueueManager } from './queue-manager'; -import { Queue } from './queue-instance'; -import type { ProcessOptions, BatchResult } from './types'; +import { Queue } from './queue'; +import { processItems } from './batch-processor'; +import type { ProcessOptions, BatchResult, QueueManagerConfig } from './types'; const logger = getLogger('queue-factory'); -// Global queue manager (manages workers and handlers) -let queueManager: QueueManager | null = null; -// Registry of individual queues -const queues = new Map(); -let globalRedisConfig: any = null; - /** * Initialize the queue system with global configuration + * This now uses the singleton QueueManager pattern */ export async function initializeQueueSystem(config: { redis: any; @@ -22,68 +18,55 @@ export async function initializeQueueSystem(config: { }): Promise { logger.info('Initializing global queue system...'); - globalRedisConfig = config.redis; - - // Initialize the global queue manager for worker management - queueManager = new QueueManager({ - queueName: 'system-queue-manager', - redis: globalRedisConfig, - workers: config.workers || 5, - concurrency: config.concurrency || 20, - defaultJobOptions: config.defaultJobOptions, - handlers: [], // Will be set by individual services - }); + const queueManagerConfig: QueueManagerConfig = { + redis: config.redis, + defaultQueueOptions: { + defaultJobOptions: config.defaultJobOptions, + workers: config.workers || 5, + concurrency: config.concurrency || 20, + }, + }; - await queueManager.initialize(); + // Initialize the singleton QueueManager + QueueManager.initialize(queueManagerConfig); - logger.info('Queue system initialized'); + logger.info('Queue system initialized with singleton QueueManager'); } /** * Get or create a queue for the given queue name + * Now uses the singleton QueueManager */ export function getQueue(queueName: string): Queue { - if (!globalRedisConfig) { - throw new Error('Queue system not initialized. Call initializeQueueSystem() first.'); - } - - if (!queues.has(queueName)) { - logger.info(`Creating new queue: ${queueName}`); - - const queue = new Queue(queueName, globalRedisConfig); - queues.set(queueName, queue); - } - - return queues.get(queueName)!; + const queueManager = QueueManager.getInstance(); + return queueManager.getQueue(queueName); } /** * Process items using the specified queue + * Now uses the batch processor directly */ export async function processItemsWithQueue( queueName: string, items: T[], options: ProcessOptions ): Promise { - const queue = getQueue(queueName); - return queue.processItems(items, options); + return processItems(items, queueName, options); } /** * Get all active queue names */ export function getActiveQueueNames(): string[] { - return Array.from(queues.keys()); + const queueManager = QueueManager.getInstance(); + return queueManager.getQueueNames(); } /** * Get the global queue manager (for advanced operations) */ export function getQueueManager(): QueueManager { - if (!queueManager) { - throw new Error('Queue system not initialized. Call initializeQueueSystem() first.'); - } - return queueManager; + return QueueManager.getInstance(); } /** @@ -92,21 +75,8 @@ export function getQueueManager(): QueueManager { export async function shutdownAllQueues(): Promise { logger.info('Shutting down all queues...'); - // Shutdown individual queues - const queueShutdownPromises = Array.from(queues.values()).map(queue => - queue.shutdown().catch(error => { - logger.error('Error shutting down queue', { error }); - }) - ); - - await Promise.all(queueShutdownPromises); - queues.clear(); - - // Shutdown the global queue manager - if (queueManager) { - await queueManager.shutdown(); - queueManager = null; - } + // Reset the singleton QueueManager (handles all cleanup) + await QueueManager.reset(); logger.info('All queues shut down'); } \ No newline at end of file diff --git a/libs/queue/src/queue-instance.ts b/libs/queue/src/queue-instance.ts deleted file mode 100644 index 1f06137..0000000 --- a/libs/queue/src/queue-instance.ts +++ /dev/null @@ -1,273 +0,0 @@ -import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq'; -import { getLogger } from '@stock-bot/logger'; -import { processItems, processBatchJob } from './batch-processor'; -import { handlerRegistry } from './handler-registry'; -import type { JobData, ProcessOptions, BatchResult, BatchJobData } from './types'; -import { getRedisConnection } from './utils'; - -const logger = getLogger('queue-instance'); - -export class Queue { - private bullQueue: BullQueue; - private workers: Worker[] = []; - private queueEvents: QueueEvents; - private queueName: string; - private redisConfig: any; - private initialized = false; - - constructor(queueName: string, redisConfig: any, options: { startWorker?: boolean } = {}) { - this.queueName = queueName; - this.redisConfig = redisConfig; - - const connection = getRedisConnection(redisConfig); - - // Initialize BullMQ queue - this.bullQueue = new BullQueue(`{${queueName}}`, { - connection, - defaultJobOptions: { - removeOnComplete: 10, - removeOnFail: 5, - attempts: 3, - backoff: { - type: 'exponential', - delay: 1000, - }, - }, - }); - - // Initialize queue events - this.queueEvents = new QueueEvents(`{${queueName}}`, { connection }); - - // Start a worker for this queue unless explicitly disabled - if (options.startWorker !== false) { - this.startWorker(); - } - } - - /** - * Get the queue name - */ - getName(): string { - return this.queueName; - } - - /** - * Get the redis configuration - */ - getRedisConfig(): any { - return this.redisConfig; - } - - /** - * Initialize batch cache for this queue - */ - async initialize(): Promise { - if (this.initialized) { - return; - } - - const { initializeBatchCache } = await import('./batch-processor'); - await initializeBatchCache(this); - this.initialized = true; - - logger.info(`Queue initialized: ${this.queueName}`); - } - - /** - * Process items using this queue - */ - async processItems(items: T[], options: ProcessOptions): Promise { - // Ensure queue is initialized - if (!this.initialized) { - await this.initialize(); - } - - return processItems(items, this, options); - } - - /** - * Add a single job to the queue - */ - async add(name: string, data: JobData, options: Record = {}): Promise { - return await this.bullQueue.add(name, data, options); - } - - /** - * Add multiple jobs to the queue in bulk - */ - async addBulk( - jobs: Array<{ name: string; data: JobData; opts?: Record }> - ): Promise { - const createdJobs = await this.bullQueue.addBulk(jobs); - - return createdJobs; - } - - /** - * Get queue statistics - */ - async getStats(): Promise<{ - waiting: number; - active: number; - completed: number; - failed: number; - delayed: number; - }> { - const [waiting, active, completed, failed, delayed] = await Promise.all([ - this.bullQueue.getWaiting(), - this.bullQueue.getActive(), - this.bullQueue.getCompleted(), - this.bullQueue.getFailed(), - this.bullQueue.getDelayed(), - ]); - - return { - waiting: waiting.length, - active: active.length, - completed: completed.length, - failed: failed.length, - delayed: delayed.length, - }; - } - - /** - * Pause the queue - */ - async pause(): Promise { - await this.bullQueue.pause(); - logger.info(`Queue paused: ${this.queueName}`); - } - - /** - * Resume the queue - */ - async resume(): Promise { - await this.bullQueue.resume(); - logger.info(`Queue resumed: ${this.queueName}`); - } - - /** - * Clean completed and failed jobs - */ - async clean(grace: number = 0, limit: number = 100): Promise { - await Promise.all([ - this.bullQueue.clean(grace, limit, 'completed'), - this.bullQueue.clean(grace, limit, 'failed'), - ]); - logger.info(`Queue cleaned: ${this.queueName}`, { grace, limit }); - } - - /** - * Shutdown this queue - */ - async shutdown(): Promise { - logger.info(`Shutting down queue: ${this.queueName}`); - - try { - // Close workers - await Promise.all(this.workers.map(worker => worker.close())); - this.workers = []; - - // Close queue events - await this.queueEvents.close(); - - // Close queue - await this.bullQueue.close(); - - logger.info(`Queue shutdown complete: ${this.queueName}`); - } catch (error) { - logger.error(`Error during queue shutdown: ${this.queueName}`, { error }); - throw error; - } - } - - /** - * Start a worker for this queue - */ - private startWorker(): void { - const connection = getRedisConnection(this.redisConfig); - - const worker = new Worker(`{${this.queueName}}`, this.processJob.bind(this), { - connection, - concurrency: 20, - }); - - worker.on('completed', job => { - logger.debug('Job completed', { - id: job.id, - name: job.name, - queue: this.queueName, - }); - }); - - worker.on('failed', (job, err) => { - logger.error('Job failed', { - id: job?.id, - name: job?.name, - queue: this.queueName, - error: err.message, - }); - }); - - this.workers.push(worker); - logger.info(`Started worker for queue: ${this.queueName}`); - } - - /** - * Process a job - */ - private async processJob(job: Job) { - const { handler, operation, payload }: JobData = job.data; - - logger.info('Processing job', { - id: job.id, - handler, - operation, - queue: this.queueName, - payloadKeys: Object.keys(payload || {}), - }); - - try { - let result; - - if (operation === 'process-batch-items') { - // Special handling for batch processing - result = await processBatchJob(payload as BatchJobData, this); - } else { - // Regular handler lookup - const jobHandler = handlerRegistry.getHandler(handler, operation); - - if (!jobHandler) { - throw new Error(`No handler found for ${handler}:${operation}`); - } - - result = await jobHandler(payload); - } - - logger.info('Job completed successfully', { - id: job.id, - handler, - operation, - queue: this.queueName, - }); - - return result; - } catch (error) { - logger.error('Job processing failed', { - id: job.id, - handler, - operation, - queue: this.queueName, - error: error instanceof Error ? error.message : String(error), - }); - throw error; - } - } - - /** - * Get the BullMQ queue instance (for advanced operations) - */ - getBullQueue(): BullQueue { - return this.bullQueue; - } -} \ No newline at end of file diff --git a/libs/queue/src/queue-manager.ts b/libs/queue/src/queue-manager.ts index 6228508..2fe33cb 100644 --- a/libs/queue/src/queue-manager.ts +++ b/libs/queue/src/queue-manager.ts @@ -1,578 +1,394 @@ -import { Queue, QueueEvents, Worker, type Job } from 'bullmq'; import { getLogger } from '@stock-bot/logger'; -import { DeadLetterQueueHandler } from './dlq-handler'; -import { handlerRegistry } from './handler-registry'; -import { QueueMetricsCollector } from './queue-metrics'; -import { QueueRateLimiter, type RateLimitRule } from './rate-limiter'; -import type { HandlerConfig, HandlerInitializer, JobData, QueueConfig } from './types'; +import { QueueRateLimiter } from './rate-limiter'; +import { Queue, type QueueConfig } from './queue'; +import { CacheProvider, createCache } from '@stock-bot/cache'; +import type { + QueueManagerConfig, + QueueOptions, + GlobalStats, + QueueStats, + RateLimitRule +} from './types'; import { getRedisConnection } from './utils'; const logger = getLogger('queue-manager'); +/** + * Singleton QueueManager that provides unified queue and cache management + * Main entry point for all queue operations with getQueue() method + */ export class QueueManager { - private queue!: Queue; - private workers: Worker[] = []; - private queueEvents!: QueueEvents; - private config: Required; - private handlers: HandlerInitializer[]; - private enableScheduledJobs: boolean; - private dlqHandler?: DeadLetterQueueHandler; - private metricsCollector?: QueueMetricsCollector; + private static instance: QueueManager | null = null; + private queues = new Map(); + private caches = new Map(); private rateLimiter?: QueueRateLimiter; + private redisConnection: any; + private isShuttingDown = false; + private isInitialized = false; - private get isInitialized() { - return !!this.queue; + private constructor(private config: QueueManagerConfig) { + this.redisConnection = getRedisConnection(config.redis); + + // Initialize rate limiter if rules are provided + if (config.rateLimitRules && config.rateLimitRules.length > 0) { + this.rateLimiter = new QueueRateLimiter(this.redisConnection); + config.rateLimitRules.forEach(rule => { + this.rateLimiter!.addRule(rule); + }); + } + + this.isInitialized = true; + logger.info('QueueManager singleton initialized'); } /** - * Get the queue name + * Get the singleton instance */ - get queueName(): string { - return this.config.queueName; - } - - constructor(config: QueueConfig = {}) { - // Enhanced configuration - this.handlers = config.handlers || []; - this.enableScheduledJobs = config.enableScheduledJobs ?? true; - - // Set default configuration - this.config = { - workers: config.workers ?? 5, - concurrency: config.concurrency ?? 20, - redis: { - host: config.redis?.host || 'localhost', - port: config.redis?.port || 6379, - password: config.redis?.password || '', - db: config.redis?.db || 0, - }, - queueName: config.queueName || 'default-queue', - defaultJobOptions: { - removeOnComplete: 10, - removeOnFail: 5, - attempts: 3, - backoff: { - type: 'exponential', - delay: 1000, - }, - ...config.defaultJobOptions, - }, - handlers: this.handlers, - enableScheduledJobs: this.enableScheduledJobs, - enableRateLimit: config.enableRateLimit || false, - globalRateLimit: config.globalRateLimit, - enableDLQ: config.enableDLQ || false, - dlqConfig: config.dlqConfig, - enableMetrics: config.enableMetrics || false, - rateLimitRules: config.rateLimitRules || [], - }; + static getInstance(config?: QueueManagerConfig): QueueManager { + if (!QueueManager.instance) { + if (!config) { + throw new Error('QueueManager not initialized. Provide config on first call.'); + } + QueueManager.instance = new QueueManager(config); + } + return QueueManager.instance; } /** - * Initialize the queue manager with enhanced handler and scheduled job support + * Initialize the singleton with config */ - async initialize(): Promise { - if (this.isInitialized) { - logger.warn('Queue manager already initialized'); - return; + static initialize(config: QueueManagerConfig): QueueManager { + if (QueueManager.instance) { + logger.warn('QueueManager already initialized, returning existing instance'); + return QueueManager.instance; + } + QueueManager.instance = new QueueManager(config); + return QueueManager.instance; + } + + /** + * Reset the singleton (mainly for testing) + */ + static async reset(): Promise { + if (QueueManager.instance) { + await QueueManager.instance.shutdown(); + QueueManager.instance = null; + } + } + + /** + * Get or create a queue - unified method that handles both scenarios + * This is the main method for accessing queues + */ + getQueue(queueName: string, options: QueueOptions = {}): Queue { + // Return existing queue if it exists + if (this.queues.has(queueName)) { + return this.queues.get(queueName)!; } - logger.info('Initializing enhanced queue manager...', { - queueName: this.config.queueName, - workers: this.config.workers, - concurrency: this.config.concurrency, - handlers: this.handlers.length, - enableScheduledJobs: this.enableScheduledJobs, + // Create new queue with merged options + const mergedOptions = { + ...this.config.defaultQueueOptions, + ...options, + }; + + // Prepare queue configuration + const queueConfig: QueueConfig = { + workers: mergedOptions.workers, + concurrency: mergedOptions.concurrency, + startWorker: mergedOptions.workers && mergedOptions.workers > 0, + }; + + const queue = new Queue( + queueName, + this.config.redis, + mergedOptions.defaultJobOptions || {}, + queueConfig + ); + + // Store the queue + this.queues.set(queueName, queue); + + // Automatically initialize batch cache for the queue + this.initializeBatchCacheSync(queueName); + + // Add queue-specific rate limit rules + if (this.rateLimiter && mergedOptions.rateLimitRules) { + mergedOptions.rateLimitRules.forEach(rule => { + // Ensure queue name is set for queue-specific rules + const ruleWithQueue = { ...rule, queueName }; + this.rateLimiter!.addRule(ruleWithQueue); + }); + } + + logger.info('Queue created with batch cache', { + queueName, + workers: mergedOptions.workers || 0, + concurrency: mergedOptions.concurrency || 1 }); - try { - // Step 1: Register all handlers - await this.registerHandlers(); + return queue; + } - // Step 2: Initialize core queue infrastructure - const connection = this.getConnection(); - const queueName = `{${this.config.queueName}}`; + /** + * Check if a queue exists + */ + hasQueue(queueName: string): boolean { + return this.queues.has(queueName); + } - // Initialize queue - this.queue = new Queue(queueName, { - connection, - defaultJobOptions: this.config.defaultJobOptions, + /** + * Get all queue names + */ + getQueueNames(): string[] { + return Array.from(this.queues.keys()); + } + + /** + * Get or create a cache for a queue + */ + getCache(queueName: string): CacheProvider { + if (!this.caches.has(queueName)) { + const cacheProvider = createCache({ + redisConfig: this.config.redis, + keyPrefix: `batch:${queueName}:`, + ttl: 86400, // 24 hours default + enableMetrics: true, }); - - // Initialize queue events - this.queueEvents = new QueueEvents(queueName, { connection }); - - // Wait for queue to be ready - await this.queue.waitUntilReady(); - - // Step 3: Initialize DLQ handler if enabled - if (this.config.enableDLQ) { - this.dlqHandler = new DeadLetterQueueHandler(this.queue, connection, this.config.dlqConfig); - } - - // Step 4: Initialize metrics collector if enabled - if (this.config.enableMetrics) { - this.metricsCollector = new QueueMetricsCollector(this.queue, this.queueEvents); - } - - // Step 5: Initialize rate limiter if enabled - if (this.config.enableRateLimit && this.config.rateLimitRules) { - const redis = await this.getRedisClient(); - this.rateLimiter = new QueueRateLimiter(redis); - - // Add configured rate limit rules - for (const rule of this.config.rateLimitRules) { - this.rateLimiter.addRule(rule); - } - } - - // Step 6: Start workers - await this.startWorkers(); - - // Step 7: Setup event listeners - this.setupEventListeners(); - - // Step 8: Batch cache will be initialized by individual Queue instances - - // Step 9: Set up scheduled jobs - if (this.enableScheduledJobs) { - await this.setupScheduledJobs(); - } - - logger.info('Enhanced queue manager initialized successfully'); - } catch (error) { - logger.error('Failed to initialize enhanced queue manager', { error }); - throw error; + this.caches.set(queueName, cacheProvider); + logger.debug('Cache created for queue', { queueName }); } + return this.caches.get(queueName)!; } /** - * Register all configured handlers + * Initialize cache for a queue (ensures it's ready) */ - private async registerHandlers(): Promise { - logger.info('Registering queue handlers...', { count: this.handlers.length }); - - // Initialize handlers using the configured handler initializers - for (const handlerInitializer of this.handlers) { - try { - await handlerInitializer(); - } catch (error) { - logger.error('Failed to initialize handler', { error }); - throw error; - } - } - - // Now register all handlers from the registry with the queue manager - const allHandlers = handlerRegistry.getAllHandlers(); - for (const [handlerName, config] of allHandlers) { - this.registerHandler(handlerName, config.operations); - logger.info(`Registered handler: ${handlerName}`); - } - - // Log scheduled jobs - const scheduledJobs = handlerRegistry.getAllScheduledJobs(); - logger.info(`Registered ${scheduledJobs.length} scheduled jobs across all handlers`); - for (const { handler, job } of scheduledJobs) { - logger.info( - `Scheduled job: ${handler}.${job.type} - ${job.description} (${job.cronPattern})` - ); - } - - logger.info('All handlers registered successfully'); + async initializeCache(queueName: string): Promise { + const cache = this.getCache(queueName); + await cache.waitForReady(10000); + logger.info('Cache initialized for queue', { queueName }); } /** - * Set up scheduled jobs from handler registry + * Initialize batch cache synchronously (for automatic initialization) + * The cache will be ready for use, but we don't wait for Redis connection */ - private async setupScheduledJobs(): Promise { - const scheduledJobs = handlerRegistry.getAllScheduledJobs(); + private initializeBatchCacheSync(queueName: string): void { + // Just create the cache - it will connect automatically when first used + this.getCache(queueName); + logger.debug('Batch cache initialized synchronously for queue', { queueName }); + } - if (scheduledJobs.length === 0) { - logger.info('No scheduled jobs found'); - return; + /** + * Get statistics for all queues + */ + async getGlobalStats(): Promise { + const queueStats: Record = {}; + let totalJobs = 0; + let totalWorkers = 0; + + for (const [queueName, queue] of this.queues) { + const stats = await queue.getStats(); + queueStats[queueName] = stats; + + totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed; + totalWorkers += stats.workers || 0; } - logger.info(`Setting up ${scheduledJobs.length} scheduled jobs...`); - - for (const { handler, job } of scheduledJobs) { - try { - const jobData: JobData = { - type: job.type, - handler, - operation: job.operation, - payload: job.payload, - priority: job.priority, - }; - - await this.add(`recurring-${handler}-${job.operation}`, jobData, { - repeat: { - pattern: job.cronPattern, - tz: 'UTC', - immediately: job.immediately || false, - }, - delay: job.delay || 0, - removeOnComplete: 1, - removeOnFail: 1, - attempts: 2, - backoff: { - type: 'fixed', - delay: 5000, - }, - }); - - logger.info(`Scheduled job registered: ${handler}.${job.type} (${job.cronPattern})`); - } catch (error) { - logger.error(`Failed to register scheduled job: ${handler}.${job.type}`, { error }); - } - } - - logger.info('Scheduled jobs setup complete'); - } - - /** - * Register a handler with its operations - */ - registerHandler(handlerName: string, config: HandlerConfig): void { - handlerRegistry.register(handlerName, config); - } - - /** - * Add a single job to the queue - */ - async add(name: string, data: JobData, options: Record = {}): Promise { - this.ensureInitialized(); - return await this.queue.add(name, data, options); - } - - /** - * Add multiple jobs to the queue in bulk - */ - async addBulk( - jobs: Array<{ name: string; data: JobData; opts?: Record }> - ): Promise { - this.ensureInitialized(); - return await this.queue.addBulk(jobs); - } - - /** - * Get queue statistics - */ - async getStats(): Promise<{ - waiting: number; - active: number; - completed: number; - failed: number; - delayed: number; - }> { - this.ensureInitialized(); - const [waiting, active, completed, failed, delayed] = await Promise.all([ - this.queue.getWaiting(), - this.queue.getActive(), - this.queue.getCompleted(), - this.queue.getFailed(), - this.queue.getDelayed(), - ]); - return { - waiting: waiting.length, - active: active.length, - completed: completed.length, - failed: failed.length, - delayed: delayed.length, + queues: queueStats, + totalJobs, + totalWorkers, + uptime: process.uptime(), }; } /** - * Pause the queue + * Get statistics for a specific queue */ - async pause(): Promise { - this.ensureInitialized(); - await this.queue.pause(); - logger.info('Queue paused'); - } - - /** - * Resume the queue - */ - async resume(): Promise { - this.ensureInitialized(); - await this.queue.resume(); - logger.info('Queue resumed'); - } - - /** - * Clean completed and failed jobs - */ - async clean(grace: number = 0, limit: number = 100): Promise { - this.ensureInitialized(); - await Promise.all([ - this.queue.clean(grace, limit, 'completed'), - this.queue.clean(grace, limit, 'failed'), - ]); - logger.info('Queue cleaned', { grace, limit }); - } - - /** - * Get the queue name - */ - getQueueName(): string { - return this.config.queueName; - } - - /** - * Get the redis configuration - */ - getRedisConfig(): any { - return this.config.redis; - } - - /** - * Get queue metrics - */ - async getMetrics() { - if (!this.metricsCollector) { - throw new Error('Metrics not enabled. Set enableMetrics: true in config'); + async getQueueStats(queueName: string): Promise { + const queue = this.queues.get(queueName); + if (!queue) { + return undefined; } - return this.metricsCollector.collect(); + return await queue.getStats(); } /** - * Get metrics report - */ - async getMetricsReport(): Promise { - if (!this.metricsCollector) { - throw new Error('Metrics not enabled. Set enableMetrics: true in config'); - } - return this.metricsCollector.getReport(); - } - - /** - * Get DLQ stats - */ - async getDLQStats() { - if (!this.dlqHandler) { - throw new Error('DLQ not enabled. Set enableDLQ: true in config'); - } - return this.dlqHandler.getStats(); - } - - /** - * Retry jobs from DLQ - */ - async retryDLQJobs(limit = 10) { - if (!this.dlqHandler) { - throw new Error('DLQ not enabled. Set enableDLQ: true in config'); - } - return this.dlqHandler.retryDLQJobs(limit); - } - - /** - * Add rate limit rule + * Add a rate limit rule */ addRateLimitRule(rule: RateLimitRule): void { if (!this.rateLimiter) { - throw new Error('Rate limiting not enabled. Set enableRateLimit: true in config'); + this.rateLimiter = new QueueRateLimiter(this.redisConnection); } this.rateLimiter.addRule(rule); } /** - * Get rate limit status + * Check rate limits for a job */ - async getRateLimitStatus(handler: string, operation: string) { + async checkRateLimit(queueName: string, handler: string, operation: string): Promise<{ + allowed: boolean; + retryAfter?: number; + remainingPoints?: number; + appliedRule?: RateLimitRule; + }> { if (!this.rateLimiter) { - throw new Error('Rate limiting not enabled. Set enableRateLimit: true in config'); + return { allowed: true }; } - return this.rateLimiter.getStatus(handler, operation); + + return await this.rateLimiter.checkLimit(queueName, handler, operation); } /** - * Shutdown the queue manager + * Get rate limit status + */ + async getRateLimitStatus(queueName: string, handler: string, operation: string) { + if (!this.rateLimiter) { + return { + queueName, + handler, + operation, + }; + } + + return await this.rateLimiter.getStatus(queueName, handler, operation); + } + + /** + * Pause all queues + */ + async pauseAll(): Promise { + const pausePromises = Array.from(this.queues.values()).map(queue => queue.pause()); + await Promise.all(pausePromises); + logger.info('All queues paused'); + } + + /** + * Resume all queues + */ + async resumeAll(): Promise { + const resumePromises = Array.from(this.queues.values()).map(queue => queue.resume()); + await Promise.all(resumePromises); + logger.info('All queues resumed'); + } + + /** + * Pause a specific queue + */ + async pauseQueue(queueName: string): Promise { + const queue = this.queues.get(queueName); + if (!queue) { + return false; + } + + await queue.pause(); + return true; + } + + /** + * Resume a specific queue + */ + async resumeQueue(queueName: string): Promise { + const queue = this.queues.get(queueName); + if (!queue) { + return false; + } + + await queue.resume(); + return true; + } + + /** + * Drain all queues + */ + async drainAll(delayed = false): Promise { + const drainPromises = Array.from(this.queues.values()).map(queue => queue.drain(delayed)); + await Promise.all(drainPromises); + logger.info('All queues drained', { delayed }); + } + + /** + * Clean all queues + */ + async cleanAll( + grace: number = 0, + limit: number = 100, + type: 'completed' | 'failed' = 'completed' + ): Promise { + const cleanPromises = Array.from(this.queues.values()).map(queue => + queue.clean(grace, limit, type) + ); + await Promise.all(cleanPromises); + logger.info('All queues cleaned', { type, grace, limit }); + } + + + /** + * Shutdown all queues and workers */ async shutdown(): Promise { - logger.info('Shutting down queue manager...'); - - const shutdownTasks: Promise[] = []; - - try { - // Shutdown DLQ handler - if (this.dlqHandler) { - shutdownTasks.push( - this.dlqHandler.shutdown().catch(err => - logger.warn('Error shutting down DLQ handler', { error: err }) - ) - ); - } - - // Close workers - if (this.workers.length > 0) { - shutdownTasks.push( - Promise.all( - this.workers.map(worker => - worker.close().catch(err => - logger.warn('Error closing worker', { error: err }) - ) - ) - ).then(() => { - this.workers = []; - }) - ); - } - - // Close queue events - if (this.queueEvents) { - shutdownTasks.push( - this.queueEvents.close().catch(err => - logger.warn('Error closing queue events', { error: err }) - ) - ); - } - - // Close queue - if (this.queue) { - shutdownTasks.push( - this.queue.close().catch(err => - logger.warn('Error closing queue', { error: err }) - ) - ); - } - - // Wait for all shutdown tasks with a timeout - await Promise.race([ - Promise.all(shutdownTasks), - new Promise((_, reject) => - setTimeout(() => reject(new Error('Shutdown timeout')), 5000) - ) - ]).catch(err => { - logger.warn('Some shutdown tasks did not complete cleanly', { error: err }); - }); - - logger.info('Queue manager shutdown complete'); - } catch (error) { - logger.error('Error during queue manager shutdown', { error }); - // Don't throw in shutdown to avoid hanging tests - } - } - - private getConnection() { - return getRedisConnection(this.config.redis); - } - - private async getRedisClient() { - // Create a redis client for rate limiting - const Redis = require('ioredis'); - return new Redis(this.getConnection()); - } - - private async startWorkers(): Promise { - const connection = this.getConnection(); - const queueName = `{${this.config.queueName}}`; - - for (let i = 0; i < this.config.workers; i++) { - const worker = new Worker(queueName, this.processJob.bind(this), { - connection, - concurrency: this.config.concurrency, - }); - - worker.on('completed', job => { - logger.debug('Job completed', { - id: job.id, - name: job.name, - }); - }); - - worker.on('failed', (job, err) => { - logger.error('Job failed', { - id: job?.id, - name: job?.name, - error: err.message, - }); - }); - - this.workers.push(worker); + if (this.isShuttingDown) { + return; } - logger.info(`Started ${this.config.workers} workers`); - } - - private async processJob(job: Job) { - const { handler, operation, payload }: JobData = job.data; - - logger.info('Processing job', { - id: job.id, - handler, - operation, - payloadKeys: Object.keys(payload || {}), - }); + this.isShuttingDown = true; + logger.info('Shutting down QueueManager...'); try { - // Check rate limits if enabled - if (this.rateLimiter) { - const rateLimit = await this.rateLimiter.checkLimit(handler, operation); - if (!rateLimit.allowed) { - // Reschedule job with delay - const delay = rateLimit.retryAfter || 60000; - logger.warn('Job rate limited, rescheduling', { - id: job.id, - handler, - operation, - retryAfter: delay, - }); - - throw new Error(`Rate limited. Retry after ${delay}ms`); + // Close all queues (this now includes workers since they're managed by Queue class) + const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => { + try { + await queue.close(); + } catch (error) { + logger.warn('Error closing queue', { error: error.message }); } - } - - // Regular handler lookup - const jobHandler = handlerRegistry.getHandler(handler, operation); - - if (!jobHandler) { - throw new Error(`No handler found for ${handler}:${operation}`); - } - - const result = await jobHandler(payload); - - logger.info('Job completed successfully', { - id: job.id, - handler, - operation, }); - return result; + await Promise.all(queueShutdownPromises); + + // Close all caches + const cacheShutdownPromises = Array.from(this.caches.values()).map(async (cache) => { + try { + // Try different disconnect methods as different cache providers may use different names + if (typeof cache.disconnect === 'function') { + await cache.disconnect(); + } else if (typeof cache.close === 'function') { + await cache.close(); + } else if (typeof cache.quit === 'function') { + await cache.quit(); + } + } catch (error) { + logger.warn('Error closing cache', { error: error.message }); + } + }); + + await Promise.all(cacheShutdownPromises); + + // Clear collections + this.queues.clear(); + this.caches.clear(); + + logger.info('QueueManager shutdown complete'); } catch (error) { - logger.error('Job processing failed', { - id: job.id, - handler, - operation, - error: error instanceof Error ? error.message : String(error), - }); - - // Handle DLQ if enabled - if (this.dlqHandler && error instanceof Error) { - await this.dlqHandler.handleFailedJob(job, error); - } - + logger.error('Error during shutdown', { error: error.message }); throw error; } } - private setupEventListeners(): void { - this.queueEvents.on('completed', ({ jobId }) => { - logger.debug('Job completed event', { jobId }); - }); - - this.queueEvents.on('failed', ({ jobId, failedReason }) => { - logger.warn('Job failed event', { jobId, failedReason }); - }); - - this.queueEvents.on('stalled', ({ jobId }) => { - logger.warn('Job stalled event', { jobId }); - }); + /** + * Wait for all queues to be ready + */ + async waitUntilReady(): Promise { + const readyPromises = Array.from(this.queues.values()).map(queue => queue.waitUntilReady()); + await Promise.all(readyPromises); } - private ensureInitialized(): void { - if (!this.isInitialized) { - throw new Error('Queue manager not initialized. Call initialize() first.'); - } + /** + * Get Redis configuration (for backward compatibility) + */ + getRedisConfig() { + return this.config.redis; } } diff --git a/libs/queue/src/queue.ts b/libs/queue/src/queue.ts new file mode 100644 index 0000000..2aec8cc --- /dev/null +++ b/libs/queue/src/queue.ts @@ -0,0 +1,324 @@ +import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq'; +import { getLogger } from '@stock-bot/logger'; +import { handlerRegistry } from './handler-registry'; +import type { JobData, JobOptions, QueueStats } from './types'; +import { getRedisConnection } from './utils'; + +const logger = getLogger('queue'); + +export interface QueueConfig { + workers?: number; + concurrency?: number; + startWorker?: boolean; +} + +/** + * Consolidated Queue class that handles both job operations and optional worker management + * Can be used as a simple job queue or with workers for automatic processing + */ +export class Queue { + private bullQueue: BullQueue; + private workers: Worker[] = []; + private queueEvents?: QueueEvents; + private queueName: string; + private redisConfig: any; + + constructor( + queueName: string, + redisConfig: any, + defaultJobOptions: JobOptions = {}, + config: QueueConfig = {} + ) { + this.queueName = queueName; + this.redisConfig = redisConfig; + + const connection = getRedisConnection(redisConfig); + + // Initialize BullMQ queue + this.bullQueue = new BullQueue(`{${queueName}}`, { + connection, + defaultJobOptions: { + removeOnComplete: 10, + removeOnFail: 5, + attempts: 3, + backoff: { + type: 'exponential', + delay: 1000, + }, + ...defaultJobOptions, + }, + }); + + // Initialize queue events if workers will be used + if (config.workers && config.workers > 0) { + this.queueEvents = new QueueEvents(`{${queueName}}`, { connection }); + } + + // Start workers if requested and not explicitly disabled + if (config.workers && config.workers > 0 && config.startWorker !== false) { + this.startWorkers(config.workers, config.concurrency || 1); + } + + logger.debug('Queue created', { + queueName, + workers: config.workers || 0, + concurrency: config.concurrency || 1 + }); + } + + /** + * Get the queue name + */ + getName(): string { + return this.queueName; + } + + /** + * Add a single job to the queue + */ + async add(name: string, data: JobData, options: JobOptions = {}): Promise { + logger.debug('Adding job', { queueName: this.queueName, jobName: name }); + return await this.bullQueue.add(name, data, options); + } + + /** + * Add multiple jobs to the queue in bulk + */ + async addBulk( + jobs: Array<{ name: string; data: JobData; opts?: JobOptions }> + ): Promise { + logger.debug('Adding bulk jobs', { + queueName: this.queueName, + jobCount: jobs.length + }); + return await this.bullQueue.addBulk(jobs); + } + + /** + * Get queue statistics + */ + async getStats(): Promise { + const [waiting, active, completed, failed, delayed] = await Promise.all([ + this.bullQueue.getWaiting(), + this.bullQueue.getActive(), + this.bullQueue.getCompleted(), + this.bullQueue.getFailed(), + this.bullQueue.getDelayed(), + ]); + + const isPaused = await this.bullQueue.isPaused(); + + return { + waiting: waiting.length, + active: active.length, + completed: completed.length, + failed: failed.length, + delayed: delayed.length, + paused: isPaused, + workers: this.workers.length, + }; + } + + /** + * Get a specific job by ID + */ + async getJob(jobId: string): Promise { + return await this.bullQueue.getJob(jobId); + } + + /** + * Get jobs by state + */ + async getJobs( + states: Array<'waiting' | 'active' | 'completed' | 'failed' | 'delayed'>, + start = 0, + end = 100 + ): Promise { + return await this.bullQueue.getJobs(states, start, end); + } + + /** + * Pause the queue (stops processing new jobs) + */ + async pause(): Promise { + await this.bullQueue.pause(); + logger.info('Queue paused', { queueName: this.queueName }); + } + + /** + * Resume the queue + */ + async resume(): Promise { + await this.bullQueue.resume(); + logger.info('Queue resumed', { queueName: this.queueName }); + } + + /** + * Drain the queue (remove all jobs) + */ + async drain(delayed = false): Promise { + await this.bullQueue.drain(delayed); + logger.info('Queue drained', { queueName: this.queueName, delayed }); + } + + /** + * Clean completed and failed jobs + */ + async clean( + grace: number = 0, + limit: number = 100, + type: 'completed' | 'failed' = 'completed' + ): Promise { + await this.bullQueue.clean(grace, limit, type); + logger.debug('Queue cleaned', { queueName: this.queueName, type, grace, limit }); + } + + /** + * Wait until the queue is ready + */ + async waitUntilReady(): Promise { + await this.bullQueue.waitUntilReady(); + } + + /** + * Close the queue (cleanup resources) + */ + async close(): Promise { + try { + // Close workers first + if (this.workers.length > 0) { + await Promise.all(this.workers.map(worker => worker.close())); + this.workers = []; + logger.debug('Workers closed', { queueName: this.queueName }); + } + + // Close queue events + if (this.queueEvents) { + await this.queueEvents.close(); + logger.debug('Queue events closed', { queueName: this.queueName }); + } + + // Close the queue itself + await this.bullQueue.close(); + logger.info('Queue closed', { queueName: this.queueName }); + } catch (error) { + logger.error('Error closing queue', { queueName: this.queueName, error }); + throw error; + } + } + + /** + * Start workers for this queue + */ + private startWorkers(workerCount: number, concurrency: number): void { + const connection = getRedisConnection(this.redisConfig); + + for (let i = 0; i < workerCount; i++) { + const worker = new Worker( + `{${this.queueName}}`, + this.processJob.bind(this), + { + connection, + concurrency, + maxStalledCount: 3, + stalledInterval: 30000, + maxStalledTime: 60000, + } + ); + + // Setup worker event handlers + worker.on('completed', (job) => { + logger.debug('Job completed', { + queueName: this.queueName, + jobId: job.id, + handler: job.data?.handler, + operation: job.data?.operation, + }); + }); + + worker.on('failed', (job, err) => { + logger.error('Job failed', { + queueName: this.queueName, + jobId: job?.id, + handler: job?.data?.handler, + operation: job?.data?.operation, + error: err.message, + }); + }); + + worker.on('error', (error) => { + logger.error('Worker error', { + queueName: this.queueName, + workerId: i, + error: error.message, + }); + }); + + this.workers.push(worker); + } + + logger.info('Workers started', { + queueName: this.queueName, + workerCount, + concurrency, + }); + } + + /** + * Process a job using the handler registry + */ + private async processJob(job: Job): Promise { + const { handler, operation, payload }: JobData = job.data; + + logger.debug('Processing job', { + id: job.id, + handler, + operation, + queueName: this.queueName, + }); + + try { + // Look up handler in registry + const jobHandler = handlerRegistry.getHandler(handler, operation); + + if (!jobHandler) { + throw new Error(`No handler found for ${handler}:${operation}`); + } + + const result = await jobHandler(payload); + + logger.debug('Job completed successfully', { + id: job.id, + handler, + operation, + queueName: this.queueName, + }); + + return result; + } catch (error) { + logger.error('Job processing failed', { + id: job.id, + handler, + operation, + queueName: this.queueName, + error: error instanceof Error ? error.message : String(error), + }); + throw error; + } + } + + /** + * Get the number of active workers + */ + getWorkerCount(): number { + return this.workers.length; + } + + /** + * Get the underlying BullMQ queue (for advanced operations) + * @deprecated Use direct methods instead + */ + getBullQueue(): BullQueue { + return this.bullQueue; + } +} \ No newline at end of file diff --git a/libs/queue/src/rate-limiter.ts b/libs/queue/src/rate-limiter.ts index d4057f1..2e4d123 100644 --- a/libs/queue/src/rate-limiter.ts +++ b/libs/queue/src/rate-limiter.ts @@ -11,9 +11,10 @@ export interface RateLimitConfig { } export interface RateLimitRule { - level: 'global' | 'handler' | 'operation'; - handler?: string; - operation?: string; + level: 'global' | 'queue' | 'handler' | 'operation'; + queueName?: string; // For queue-level limits + handler?: string; // For handler-level limits + operation?: string; // For operation-level limits (most specific) config: RateLimitConfig; } @@ -29,7 +30,7 @@ export class QueueRateLimiter { addRule(rule: RateLimitRule): void { this.rules.push(rule); - const key = this.getRuleKey(rule.level, rule.handler, rule.operation); + const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation); const limiter = new RateLimiterRedis({ storeClient: this.redisClient, keyPrefix: rule.config.keyPrefix || `rl:${key}`, @@ -42,6 +43,7 @@ export class QueueRateLimiter { logger.info('Rate limit rule added', { level: rule.level, + queueName: rule.queueName, handler: rule.handler, operation: rule.operation, points: rule.config.points, @@ -51,44 +53,77 @@ export class QueueRateLimiter { /** * Check if a job can be processed based on rate limits + * Uses hierarchical precedence: operation > handler > queue > global + * The most specific matching rule takes precedence */ - async checkLimit(handler: string, operation: string): Promise<{ + async checkLimit(queueName: string, handler: string, operation: string): Promise<{ allowed: boolean; retryAfter?: number; remainingPoints?: number; + appliedRule?: RateLimitRule; }> { - const limiters = this.getApplicableLimiters(handler, operation); + const applicableRule = this.getMostSpecificRule(queueName, handler, operation); - if (limiters.length === 0) { + if (!applicableRule) { + return { allowed: true }; + } + + const key = this.getRuleKey(applicableRule.level, applicableRule.queueName, applicableRule.handler, applicableRule.operation); + const limiter = this.limiters.get(key); + + if (!limiter) { + logger.warn('Rate limiter not found for rule', { key, rule: applicableRule }); return { allowed: true }; } try { - // Check all applicable rate limiters - const results = await Promise.all( - limiters.map(({ limiter, key }) => this.consumePoint(limiter, key)) - ); - - // All limiters must allow the request - const blocked = results.find(r => !r.allowed); - if (blocked) { - return blocked; - } - - // Return the most restrictive remaining points - const minRemainingPoints = Math.min(...results.map(r => r.remainingPoints || Infinity)); + const result = await this.consumePoint(limiter, this.getConsumerKey(queueName, handler, operation)); return { - allowed: true, - remainingPoints: minRemainingPoints === Infinity ? undefined : minRemainingPoints, + ...result, + appliedRule: applicableRule, }; } catch (error) { - logger.error('Rate limit check failed', { handler, operation, error }); + logger.error('Rate limit check failed', { queueName, handler, operation, error }); // On error, allow the request to proceed return { allowed: true }; } } + /** + * Get the most specific rule that applies to this job + * Precedence: operation > handler > queue > global + */ + private getMostSpecificRule(queueName: string, handler: string, operation: string): RateLimitRule | undefined { + // 1. Check for operation-specific rule (most specific) + let rule = this.rules.find(r => + r.level === 'operation' && + r.queueName === queueName && + r.handler === handler && + r.operation === operation + ); + if (rule) return rule; + + // 2. Check for handler-specific rule + rule = this.rules.find(r => + r.level === 'handler' && + r.queueName === queueName && + r.handler === handler + ); + if (rule) return rule; + + // 3. Check for queue-specific rule + rule = this.rules.find(r => + r.level === 'queue' && + r.queueName === queueName + ); + if (rule) return rule; + + // 4. Check for global rule (least specific) + rule = this.rules.find(r => r.level === 'global'); + return rule; + } + /** * Consume a point from the rate limiter */ @@ -120,148 +155,120 @@ export class QueueRateLimiter { } /** - * Get applicable rate limiters for a handler/operation + * Get rule key for storing rate limiter */ - private getApplicableLimiters(handler: string, operation: string): Array<{ limiter: RateLimiterRedis; key: string }> { - const applicable: Array<{ limiter: RateLimiterRedis; key: string }> = []; - - for (const rule of this.rules) { - let applies = false; - let consumerKey = ''; - - switch (rule.level) { - case 'global': - // Global limit applies to all - applies = true; - consumerKey = 'global'; - break; - - case 'handler': - // Handler limit applies if handler matches - if (rule.handler === handler) { - applies = true; - consumerKey = handler; - } - break; - - case 'operation': - // Operation limit applies if both handler and operation match - if (rule.handler === handler && rule.operation === operation) { - applies = true; - consumerKey = `${handler}:${operation}`; - } - break; - } - - if (applies) { - const ruleKey = this.getRuleKey(rule.level, rule.handler, rule.operation); - const limiter = this.limiters.get(ruleKey); - if (limiter) { - applicable.push({ limiter, key: consumerKey }); - } - } - } - - return applicable; - } - - /** - * Get rule key - */ - private getRuleKey(level: string, handler?: string, operation?: string): string { + private getRuleKey(level: string, queueName?: string, handler?: string, operation?: string): string { switch (level) { case 'global': return 'global'; + case 'queue': + return `queue:${queueName}`; case 'handler': - return `handler:${handler}`; + return `handler:${queueName}:${handler}`; case 'operation': - return `operation:${handler}:${operation}`; + return `operation:${queueName}:${handler}:${operation}`; default: return level; } } /** - * Get current rate limit status for a handler/operation + * Get consumer key for rate limiting (what gets counted) */ - async getStatus(handler: string, operation: string): Promise<{ + private getConsumerKey(queueName: string, handler: string, operation: string): string { + return `${queueName}:${handler}:${operation}`; + } + + /** + * Get current rate limit status for a queue/handler/operation + */ + async getStatus(queueName: string, handler: string, operation: string): Promise<{ + queueName: string; handler: string; operation: string; - limits: Array<{ + appliedRule?: RateLimitRule; + limit?: { level: string; points: number; duration: number; remaining: number; resetIn: number; - }>; - }> { - const applicable = this.getApplicableLimiters(handler, operation); - - const limits = await Promise.all( - applicable.map(async ({ limiter, key }) => { - const rule = this.rules.find(r => { - const ruleKey = this.getRuleKey(r.level, r.handler, r.operation); - return this.limiters.get(ruleKey) === limiter; - }); - - try { - const result = await limiter.get(key); - if (!result) { - return { - level: rule?.level || 'unknown', - points: limiter.points, - duration: limiter.duration, - remaining: limiter.points, - resetIn: 0, - }; - } - - return { - level: rule?.level || 'unknown', - points: limiter.points, - duration: limiter.duration, - remaining: result.remainingPoints, - resetIn: result.msBeforeNext, - }; - } catch (error) { - return { - level: rule?.level || 'unknown', - points: limiter.points, - duration: limiter.duration, - remaining: 0, - resetIn: 0, - }; - } - }) - ); - - return { - handler, - operation, - limits, }; + }> { + const applicableRule = this.getMostSpecificRule(queueName, handler, operation); + + if (!applicableRule) { + return { + queueName, + handler, + operation, + }; + } + + const key = this.getRuleKey(applicableRule.level, applicableRule.queueName, applicableRule.handler, applicableRule.operation); + const limiter = this.limiters.get(key); + + if (!limiter) { + return { + queueName, + handler, + operation, + appliedRule: applicableRule, + }; + } + + try { + const consumerKey = this.getConsumerKey(queueName, handler, operation); + const result = await limiter.get(consumerKey); + + const limit = { + level: applicableRule.level, + points: limiter.points, + duration: limiter.duration, + remaining: result?.remainingPoints ?? limiter.points, + resetIn: result?.msBeforeNext ?? 0, + }; + + return { + queueName, + handler, + operation, + appliedRule, + limit, + }; + } catch (error) { + logger.error('Failed to get rate limit status', { queueName, handler, operation, error }); + return { + queueName, + handler, + operation, + appliedRule, + }; + } } /** - * Reset rate limits for a handler/operation + * Reset rate limits for a specific consumer */ - async reset(handler: string, operation?: string): Promise { - const applicable = operation - ? this.getApplicableLimiters(handler, operation) - : this.rules - .filter(r => !handler || r.handler === handler) - .map(r => { - const key = this.getRuleKey(r.level, r.handler, r.operation); - const limiter = this.limiters.get(key); - return limiter ? { limiter, key: handler || 'global' } : null; - }) - .filter(Boolean) as Array<{ limiter: RateLimiterRedis; key: string }>; + async reset(queueName: string, handler?: string, operation?: string): Promise { + if (handler && operation) { + // Reset specific operation + const consumerKey = this.getConsumerKey(queueName, handler, operation); + const rule = this.getMostSpecificRule(queueName, handler, operation); + + if (rule) { + const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation); + const limiter = this.limiters.get(key); + if (limiter) { + await limiter.delete(consumerKey); + } + } + } else { + // Reset broader scope - this is more complex with the new hierarchy + logger.warn('Broad reset not implemented yet', { queueName, handler, operation }); + } - await Promise.all( - applicable.map(({ limiter, key }) => limiter.delete(key)) - ); - - logger.info('Rate limits reset', { handler, operation }); + logger.info('Rate limits reset', { queueName, handler, operation }); } /** @@ -274,10 +281,11 @@ export class QueueRateLimiter { /** * Remove a rate limit rule */ - removeRule(level: string, handler?: string, operation?: string): boolean { - const key = this.getRuleKey(level, handler, operation); + removeRule(level: string, queueName?: string, handler?: string, operation?: string): boolean { + const key = this.getRuleKey(level, queueName, handler, operation); const ruleIndex = this.rules.findIndex(r => r.level === level && + (!queueName || r.queueName === queueName) && (!handler || r.handler === handler) && (!operation || r.operation === operation) ); @@ -286,7 +294,7 @@ export class QueueRateLimiter { this.rules.splice(ruleIndex, 1); this.limiters.delete(key); - logger.info('Rate limit rule removed', { level, handler, operation }); + logger.info('Rate limit rule removed', { level, queueName, handler, operation }); return true; } diff --git a/libs/queue/src/types.ts b/libs/queue/src/types.ts index f4d24fd..ba2219f 100644 --- a/libs/queue/src/types.ts +++ b/libs/queue/src/types.ts @@ -31,34 +31,69 @@ export interface BatchResult { duration: number; } -export interface QueueConfig { +// New improved types for the refactored architecture +export interface RedisConfig { + host?: string; + port?: number; + password?: string; + db?: number; +} + +export interface JobOptions { + priority?: number; + delay?: number; + attempts?: number; + removeOnComplete?: number; + removeOnFail?: number; + backoff?: { + type: 'exponential' | 'fixed'; + delay: number; + }; +} + +export interface QueueOptions { + defaultJobOptions?: JobOptions; workers?: number; concurrency?: number; - redis?: { - host?: string; - port?: number; - password?: string; - db?: number; - }; - queueName?: string; - defaultJobOptions?: { - removeOnComplete?: number; - removeOnFail?: number; - attempts?: number; - backoff?: { - type: string; - delay: number; - }; - }; - handlers?: HandlerInitializer[]; - enableScheduledJobs?: boolean; - // Rate limiting - enableRateLimit?: boolean; - globalRateLimit?: RateLimitConfig; + enableMetrics?: boolean; enableDLQ?: boolean; + enableRateLimit?: boolean; + rateLimitRules?: RateLimitRule[]; // Queue-specific rate limit rules +} + +export interface QueueManagerConfig { + redis: RedisConfig; + defaultQueueOptions?: QueueOptions; + enableScheduledJobs?: boolean; + globalRateLimit?: RateLimitConfig; + rateLimitRules?: RateLimitRule[]; // Global rate limit rules +} + +export interface QueueStats { + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + paused: boolean; + workers?: number; +} + +export interface GlobalStats { + queues: Record; + totalJobs: number; + totalWorkers: number; + uptime: number; +} + +// Legacy type for backward compatibility +export interface QueueConfig extends QueueManagerConfig { + queueName?: string; + workers?: number; + concurrency?: number; + handlers?: HandlerInitializer[]; dlqConfig?: DLQConfig; enableMetrics?: boolean; - rateLimitRules?: RateLimitRule[]; } export interface JobHandler { @@ -108,9 +143,10 @@ export interface RateLimitConfig { } export interface RateLimitRule { - level: 'global' | 'handler' | 'operation'; - handler?: string; - operation?: string; + level: 'global' | 'queue' | 'handler' | 'operation'; + queueName?: string; // For queue-level limits + handler?: string; // For handler-level limits + operation?: string; // For operation-level limits (most specific) config: RateLimitConfig; } diff --git a/libs/queue/test/batch-processor.test.ts b/libs/queue/test/batch-processor.test.ts index e4c7412..35b041e 100644 --- a/libs/queue/test/batch-processor.test.ts +++ b/libs/queue/test/batch-processor.test.ts @@ -1,5 +1,5 @@ import { describe, test, expect, beforeEach, afterEach } from 'bun:test'; -import { QueueManager, Queue, handlerRegistry, processItems, initializeBatchCache } from '../src'; +import { QueueManager, Queue, handlerRegistry, processItems } from '../src'; // Suppress Redis connection errors in tests process.on('unhandledRejection', (reason, promise) => { @@ -16,6 +16,7 @@ process.on('unhandledRejection', (reason, promise) => { describe('Batch Processor', () => { let queueManager: QueueManager; let queue: Queue; + let queueName: string; const redisConfig = { host: 'localhost', @@ -44,21 +45,21 @@ describe('Batch Processor', () => { }); // Use unique queue name per test to avoid conflicts - const uniqueQueueName = `batch-test-queue-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; + queueName = `batch-test-queue-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; - // Initialize queue manager with no workers to prevent immediate processing - queueManager = new QueueManager({ - queueName: uniqueQueueName, + // Reset and initialize singleton QueueManager for tests + await QueueManager.reset(); + queueManager = QueueManager.initialize({ redis: redisConfig, - workers: 0, // No workers in tests - concurrency: 5, + defaultQueueOptions: { + workers: 0, // No workers in tests + concurrency: 5, + }, }); - await queueManager.initialize(); - - // Create Queue instance without worker to prevent immediate job processing - queue = new Queue(queueManager.getQueueName(), queueManager.getRedisConfig(), { startWorker: false }); - await initializeBatchCache(queue); + // Get queue using the new getQueue() method (batch cache is now auto-initialized) + queue = queueManager.getQueue(queueName); + // Note: Batch cache is now automatically initialized when getting the queue // Ensure completely clean state - wait for queue to be ready first await queue.getBullQueue().waitUntilReady(); @@ -89,12 +90,12 @@ describe('Batch Processor', () => { } catch (error) { // Ignore cleanup errors } - await queue.shutdown(); + await queue.close(); } if (queueManager) { await Promise.race([ - queueManager.shutdown(), + QueueManager.reset(), new Promise((_, reject) => setTimeout(() => reject(new Error('Shutdown timeout')), 3000) ) @@ -112,7 +113,7 @@ describe('Batch Processor', () => { test('should process items directly without batching', async () => { const items = ['item1', 'item2', 'item3', 'item4', 'item5']; - const result = await processItems(items, queue, { + const result = await processItems(items, queueName, { totalDelayHours: 0.001, // 3.6 seconds total useBatching: false, handler: 'batch-test', @@ -158,7 +159,7 @@ describe('Batch Processor', () => { { id: 3, name: 'Product C', price: 300 }, ]; - const result = await processItems(items, queue, { + const result = await processItems(items, queueName, { totalDelayHours: 0.001, useBatching: false, handler: 'batch-test', @@ -182,7 +183,7 @@ describe('Batch Processor', () => { test('should process items in batches', async () => { const items = Array.from({ length: 50 }, (_, i) => ({ id: i, value: `item-${i}` })); - const result = await processItems(items, queue, { + const result = await processItems(items, queueName, { totalDelayHours: 0.001, useBatching: true, batchSize: 10, @@ -204,7 +205,7 @@ describe('Batch Processor', () => { test('should handle different batch sizes', async () => { const items = Array.from({ length: 23 }, (_, i) => i); - const result = await processItems(items, queue, { + const result = await processItems(items, queueName, { totalDelayHours: 0.001, useBatching: true, batchSize: 7, @@ -222,7 +223,7 @@ describe('Batch Processor', () => { { type: 'B', data: 'test2' }, ]; - const result = await processItems(items, queue, { + const result = await processItems(items, queueName, { totalDelayHours: 0.001, useBatching: true, batchSize: 2, @@ -245,7 +246,7 @@ describe('Batch Processor', () => { describe('Empty and Edge Cases', () => { test('should handle empty item list', async () => { - const result = await processItems([], queue, { + const result = await processItems([], queueName, { totalDelayHours: 1, handler: 'batch-test', operation: 'process-item', @@ -257,7 +258,7 @@ describe('Batch Processor', () => { }); test('should handle single item', async () => { - const result = await processItems(['single-item'], queue, { + const result = await processItems(['single-item'], queueName, { totalDelayHours: 0.001, handler: 'batch-test', operation: 'process-item', @@ -270,7 +271,7 @@ describe('Batch Processor', () => { test('should handle large batch with delays', async () => { const items = Array.from({ length: 100 }, (_, i) => ({ index: i })); - const result = await processItems(items, queue, { + const result = await processItems(items, queueName, { totalDelayHours: 0.01, // 36 seconds total useBatching: true, batchSize: 25, @@ -294,7 +295,7 @@ describe('Batch Processor', () => { test('should respect custom job options', async () => { const items = ['a', 'b', 'c']; - await processItems(items, queue, { + await processItems(items, queueName, { totalDelayHours: 0, handler: 'batch-test', operation: 'process-item', @@ -339,7 +340,7 @@ describe('Batch Processor', () => { }, }); - await processItems(['test'], queue, { + await processItems(['test'], queueName, { totalDelayHours: 0, handler: 'custom-handler', operation: 'custom-operation',