import { afterEach, beforeEach, describe, expect, test } from 'bun:test'; import { handlerRegistry, QueueManager } from '../src'; // Suppress Redis connection errors in tests process.on('unhandledRejection', (reason, promise) => { if (reason && typeof reason === 'object' && 'message' in reason) { const message = (reason as Error).message; if ( message.includes('Connection is closed') || message.includes('Connection is in monitoring mode') ) { return; } } console.error('Unhandled Rejection at:', promise, 'reason:', reason); }); describe('QueueManager', () => { let queueManager: QueueManager; // Use local Redis/Dragonfly const redisConfig = { host: 'localhost', port: 6379, password: '', db: 0, }; beforeEach(() => { handlerRegistry.clear(); }); afterEach(async () => { if (queueManager) { try { await Promise.race([ queueManager.shutdown(), new Promise((_, reject) => setTimeout(() => reject(new Error('Shutdown timeout')), 3000)), ]); } catch (error) { console.warn('Shutdown error:', error.message); } finally { queueManager = null as any; } } handlerRegistry.clear(); await new Promise(resolve => setTimeout(resolve, 100)); }); describe('Basic Operations', () => { test('should initialize queue manager', async () => { queueManager = new QueueManager({ queueName: 'test-queue', redis: redisConfig, workers: 1, concurrency: 5, }); await queueManager.initialize(); expect(queueManager.queueName).toBe('test-queue'); }); test('should add and process a job', async () => { let processedPayload: any; // Register handler handlerRegistry.register('test-handler', { 'test-operation': async payload => { processedPayload = payload; return { success: true, data: payload }; }, }); queueManager = new QueueManager({ queueName: 'test-queue', redis: redisConfig, workers: 1, }); await queueManager.initialize(); // Add job const job = await queueManager.add('test-job', { handler: 'test-handler', operation: 'test-operation', payload: { message: 'Hello, Queue!' }, }); expect(job.name).toBe('test-job'); // Wait for processing await new Promise(resolve => setTimeout(resolve, 100)); expect(processedPayload).toEqual({ message: 'Hello, Queue!' }); }); test('should handle missing handler gracefully', async () => { queueManager = new QueueManager({ queueName: 'test-queue', redis: redisConfig, workers: 1, }); await queueManager.initialize(); const job = await queueManager.add('test-job', { handler: 'non-existent', operation: 'test-operation', payload: { test: true }, }); // Wait for job to fail await new Promise(resolve => setTimeout(resolve, 100)); const failed = await job.isFailed(); expect(failed).toBe(true); }); test('should add multiple jobs in bulk', async () => { let processedCount = 0; handlerRegistry.register('bulk-handler', { process: async _payload => { processedCount++; return { processed: true }; }, }); queueManager = new QueueManager({ queueName: 'test-queue', redis: redisConfig, workers: 2, concurrency: 5, }); await queueManager.initialize(); const jobs = await queueManager.addBulk([ { name: 'job1', data: { handler: 'bulk-handler', operation: 'process', payload: { id: 1 } }, }, { name: 'job2', data: { handler: 'bulk-handler', operation: 'process', payload: { id: 2 } }, }, { name: 'job3', data: { handler: 'bulk-handler', operation: 'process', payload: { id: 3 } }, }, ]); expect(jobs.length).toBe(3); // Wait for processing await new Promise(resolve => setTimeout(resolve, 200)); expect(processedCount).toBe(3); }); test('should get queue statistics', async () => { queueManager = new QueueManager({ queueName: 'test-queue', redis: redisConfig, workers: 0, // No workers, jobs will stay in waiting }); await queueManager.initialize(); // Add some jobs await queueManager.add('job1', { handler: 'test', operation: 'test', payload: { id: 1 }, }); await queueManager.add('job2', { handler: 'test', operation: 'test', payload: { id: 2 }, }); const stats = await queueManager.getStats(); expect(stats.waiting).toBe(2); expect(stats.active).toBe(0); expect(stats.completed).toBe(0); expect(stats.failed).toBe(0); }); test('should pause and resume queue', async () => { let processedCount = 0; handlerRegistry.register('pause-test', { process: async () => { processedCount++; return { ok: true }; }, }); queueManager = new QueueManager({ queueName: 'test-queue', redis: redisConfig, workers: 1, }); await queueManager.initialize(); // Pause queue await queueManager.pause(); // Add job while paused await queueManager.add('job1', { handler: 'pause-test', operation: 'process', payload: {}, }); // Wait a bit - job should not be processed await new Promise(resolve => setTimeout(resolve, 100)); expect(processedCount).toBe(0); // Resume queue await queueManager.resume(); // Wait for processing await new Promise(resolve => setTimeout(resolve, 100)); expect(processedCount).toBe(1); }); }); describe('Scheduled Jobs', () => { test('should register and process scheduled jobs', async () => { let executionCount = 0; handlerRegistry.registerWithSchedule({ name: 'scheduled-handler', operations: { 'scheduled-task': async _payload => { executionCount++; return { executed: true, timestamp: Date.now() }; }, }, scheduledJobs: [ { type: 'test-schedule', operation: 'scheduled-task', payload: { test: true }, cronPattern: '*/1 * * * * *', // Every second description: 'Test scheduled job', }, ], }); queueManager = new QueueManager({ queueName: 'test-queue', redis: redisConfig, workers: 1, enableScheduledJobs: true, }); await queueManager.initialize(); // Wait for scheduled job to execute await new Promise(resolve => setTimeout(resolve, 2500)); expect(executionCount).toBeGreaterThanOrEqual(2); }); }); describe('Error Handling', () => { test('should handle job errors with retries', async () => { let attemptCount = 0; handlerRegistry.register('retry-handler', { 'failing-operation': async () => { attemptCount++; if (attemptCount < 3) { throw new Error(`Attempt ${attemptCount} failed`); } return { success: true }; }, }); queueManager = new QueueManager({ queueName: 'test-queue', redis: redisConfig, workers: 1, defaultJobOptions: { attempts: 3, backoff: { type: 'fixed', delay: 50, }, }, }); await queueManager.initialize(); const job = await queueManager.add('retry-job', { handler: 'retry-handler', operation: 'failing-operation', payload: {}, }); // Wait for retries await new Promise(resolve => setTimeout(resolve, 500)); const completed = await job.isCompleted(); expect(completed).toBe(true); expect(attemptCount).toBe(3); }); }); describe('Multiple Handlers', () => { test('should handle multiple handlers with different operations', async () => { const results: any[] = []; handlerRegistry.register('handler-a', { 'operation-1': async payload => { results.push({ handler: 'a', op: '1', payload }); return { handler: 'a', op: '1' }; }, 'operation-2': async payload => { results.push({ handler: 'a', op: '2', payload }); return { handler: 'a', op: '2' }; }, }); handlerRegistry.register('handler-b', { 'operation-1': async payload => { results.push({ handler: 'b', op: '1', payload }); return { handler: 'b', op: '1' }; }, }); queueManager = new QueueManager({ queueName: 'test-queue', redis: redisConfig, workers: 2, }); await queueManager.initialize(); // Add jobs for different handlers await queueManager.addBulk([ { name: 'job1', data: { handler: 'handler-a', operation: 'operation-1', payload: { id: 1 } }, }, { name: 'job2', data: { handler: 'handler-a', operation: 'operation-2', payload: { id: 2 } }, }, { name: 'job3', data: { handler: 'handler-b', operation: 'operation-1', payload: { id: 3 } }, }, ]); // Wait for processing await new Promise(resolve => setTimeout(resolve, 200)); expect(results.length).toBe(3); expect(results).toContainEqual({ handler: 'a', op: '1', payload: { id: 1 } }); expect(results).toContainEqual({ handler: 'a', op: '2', payload: { id: 2 } }); expect(results).toContainEqual({ handler: 'b', op: '1', payload: { id: 3 } }); }); }); });