import { afterEach, beforeEach, describe, expect, test } from 'bun:test'; import { handlerRegistry, processItems, Queue, QueueManager } from '../src'; // Suppress Redis connection errors in tests process.on('unhandledRejection', (reason, promise) => { if (reason && typeof reason === 'object' && 'message' in reason) { const message = (reason as Error).message; if ( message.includes('Connection is closed') || message.includes('Connection is in monitoring mode') ) { return; } } console.error('Unhandled Rejection at:', promise, 'reason:', reason); }); describe('Batch Processor', () => { let queueManager: QueueManager; let queue: Queue; let queueName: string; const redisConfig = { host: 'localhost', port: 6379, password: '', db: 0, }; beforeEach(async () => { // Clear handler registry handlerRegistry.clear(); // Register test handler handlerRegistry.register('batch-test', { 'process-item': async payload => { return { processed: true, data: payload }; }, generic: async payload => { return { processed: true, data: payload }; }, 'process-batch-items': async _batchData => { // This is called by the batch processor internally return { batchProcessed: true }; }, }); // Use unique queue name per test to avoid conflicts queueName = `batch-test-queue-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; // Reset and initialize singleton QueueManager for tests await QueueManager.reset(); queueManager = QueueManager.initialize({ redis: redisConfig, defaultQueueOptions: { workers: 0, // No workers in tests concurrency: 5, }, }); // Get queue using the new getQueue() method (batch cache is now auto-initialized) queue = queueManager.getQueue(queueName); // Note: Batch cache is now automatically initialized when getting the queue // Ensure completely clean state - wait for queue to be ready first await queue.getBullQueue().waitUntilReady(); // Clear all job states await queue.getBullQueue().drain(true); await queue.getBullQueue().clean(0, 1000, 'completed'); await queue.getBullQueue().clean(0, 1000, 'failed'); await queue.getBullQueue().clean(0, 1000, 'active'); await queue.getBullQueue().clean(0, 1000, 'waiting'); await queue.getBullQueue().clean(0, 1000, 'delayed'); // Add a small delay to ensure cleanup is complete await new Promise(resolve => setTimeout(resolve, 50)); }); afterEach(async () => { try { // Clean up jobs first if (queue) { try { await queue.getBullQueue().drain(true); await queue.getBullQueue().clean(0, 1000, 'completed'); await queue.getBullQueue().clean(0, 1000, 'failed'); await queue.getBullQueue().clean(0, 1000, 'active'); await queue.getBullQueue().clean(0, 1000, 'waiting'); await queue.getBullQueue().clean(0, 1000, 'delayed'); } catch { // Ignore cleanup errors } await queue.close(); } if (queueManager) { await Promise.race([ QueueManager.reset(), new Promise((_, reject) => setTimeout(() => reject(new Error('Shutdown timeout')), 3000)), ]); } } catch (error) { console.warn('Cleanup error:', error.message); } finally { handlerRegistry.clear(); await new Promise(resolve => setTimeout(resolve, 100)); } }); describe('Direct Processing', () => { test('should process items directly without batching', async () => { const items = ['item1', 'item2', 'item3', 'item4', 'item5']; const result = await processItems(items, queueName, { totalDelayHours: 0.001, // 3.6 seconds total useBatching: false, handler: 'batch-test', operation: 'process-item', priority: 1, }); expect(result.mode).toBe('direct'); expect(result.totalItems).toBe(5); expect(result.jobsCreated).toBe(5); // Verify jobs were created - BullMQ has an issue where job ID "1" doesn't show up in state queries // but exists when queried directly, so we need to check both ways const [delayedJobs, waitingJobs, activeJobs, completedJobs, failedJobs, job1] = await Promise.all([ queue.getBullQueue().getJobs(['delayed']), queue.getBullQueue().getJobs(['waiting']), queue.getBullQueue().getJobs(['active']), queue.getBullQueue().getJobs(['completed']), queue.getBullQueue().getJobs(['failed']), queue.getBullQueue().getJob('1'), // Job 1 often doesn't show up in state queries ]); const jobs = [...delayedJobs, ...waitingJobs, ...activeJobs, ...completedJobs, ...failedJobs]; const ourJobs = jobs.filter( j => j.name === 'process-item' && j.data.handler === 'batch-test' ); // Include job 1 if we found it directly but it wasn't in the state queries if ( job1 && job1.name === 'process-item' && job1.data.handler === 'batch-test' && !ourJobs.find(j => j.id === '1') ) { ourJobs.push(job1); } expect(ourJobs.length).toBe(5); // Check delays are distributed const delays = ourJobs.map(j => j.opts.delay || 0).sort((a, b) => a - b); expect(delays[0]).toBe(0); expect(delays[4]).toBeGreaterThan(delays[0]); }); test('should process complex objects directly', async () => { const items = [ { id: 1, name: 'Product A', price: 100 }, { id: 2, name: 'Product B', price: 200 }, { id: 3, name: 'Product C', price: 300 }, ]; const result = await processItems(items, queueName, { totalDelayHours: 0.001, useBatching: false, handler: 'batch-test', operation: 'process-item', }); expect(result.jobsCreated).toBe(3); // Check job payloads const jobs = await queue.getBullQueue().getJobs(['waiting', 'delayed']); const ourJobs = jobs.filter( j => j.name === 'process-item' && j.data.handler === 'batch-test' ); const payloads = ourJobs.map(j => j.data.payload); expect(payloads).toContainEqual({ id: 1, name: 'Product A', price: 100 }); expect(payloads).toContainEqual({ id: 2, name: 'Product B', price: 200 }); expect(payloads).toContainEqual({ id: 3, name: 'Product C', price: 300 }); }); }); describe('Batch Processing', () => { test('should process items in batches', async () => { const items = Array.from({ length: 50 }, (_, i) => ({ id: i, value: `item-${i}` })); const result = await processItems(items, queueName, { totalDelayHours: 0.001, useBatching: true, batchSize: 10, handler: 'batch-test', operation: 'process-item', }); expect(result.mode).toBe('batch'); expect(result.totalItems).toBe(50); expect(result.batchesCreated).toBe(5); // 50 items / 10 per batch expect(result.jobsCreated).toBe(5); // 5 batch jobs // Verify batch jobs were created const jobs = await queue.getBullQueue().getJobs(['delayed', 'waiting']); const batchJobs = jobs.filter(j => j.name === 'process-batch'); expect(batchJobs.length).toBe(5); }); test('should handle different batch sizes', async () => { const items = Array.from({ length: 23 }, (_, i) => i); const result = await processItems(items, queueName, { totalDelayHours: 0.001, useBatching: true, batchSize: 7, handler: 'batch-test', operation: 'process-item', }); expect(result.batchesCreated).toBe(4); // 23/7 = 3.28, rounded up to 4 expect(result.jobsCreated).toBe(4); }); test('should store batch payloads in cache', async () => { const items = [ { type: 'A', data: 'test1' }, { type: 'B', data: 'test2' }, ]; const result = await processItems(items, queueName, { totalDelayHours: 0.001, useBatching: true, batchSize: 2, handler: 'batch-test', operation: 'process-item', ttl: 3600, // 1 hour TTL }); expect(result.jobsCreated).toBe(1); // Get the batch job const jobs = await queue.getBullQueue().getJobs(['waiting', 'delayed']); expect(jobs.length).toBe(1); const batchJob = jobs[0]; expect(batchJob.data.payload.payloadKey).toBeDefined(); expect(batchJob.data.payload.itemCount).toBe(2); }); }); describe('Empty and Edge Cases', () => { test('should handle empty item list', async () => { const result = await processItems([], queueName, { totalDelayHours: 1, handler: 'batch-test', operation: 'process-item', }); expect(result.totalItems).toBe(0); expect(result.jobsCreated).toBe(0); expect(result.duration).toBeDefined(); }); test('should handle single item', async () => { const result = await processItems(['single-item'], queueName, { totalDelayHours: 0.001, handler: 'batch-test', operation: 'process-item', }); expect(result.totalItems).toBe(1); expect(result.jobsCreated).toBe(1); }); test('should handle large batch with delays', async () => { const items = Array.from({ length: 100 }, (_, i) => ({ index: i })); const result = await processItems(items, queueName, { totalDelayHours: 0.01, // 36 seconds total useBatching: true, batchSize: 25, handler: 'batch-test', operation: 'process-item', }); expect(result.batchesCreated).toBe(4); // 100/25 expect(result.jobsCreated).toBe(4); // Check delays are distributed const jobs = await queue.getBullQueue().getJobs(['delayed', 'waiting']); const delays = jobs.map(j => j.opts.delay || 0).sort((a, b) => a - b); expect(delays[0]).toBe(0); // First batch has no delay expect(delays[3]).toBeGreaterThan(0); // Last batch has delay }); }); describe('Job Options', () => { test('should respect custom job options', async () => { const items = ['a', 'b', 'c']; await processItems(items, queueName, { totalDelayHours: 0, handler: 'batch-test', operation: 'process-item', priority: 5, retries: 10, removeOnComplete: 100, removeOnFail: 100, }); // Check all states including job ID "1" specifically (as it often doesn't show up in state queries) const [waitingJobs, delayedJobs, job1, job2, job3] = await Promise.all([ queue.getBullQueue().getJobs(['waiting']), queue.getBullQueue().getJobs(['delayed']), queue.getBullQueue().getJob('1'), queue.getBullQueue().getJob('2'), queue.getBullQueue().getJob('3'), ]); const jobs = [...waitingJobs, ...delayedJobs]; // Add any missing jobs that exist but don't show up in state queries [job1, job2, job3].forEach(job => { if (job && !jobs.find(j => j.id === job.id)) { jobs.push(job); } }); expect(jobs.length).toBe(3); jobs.forEach(job => { expect(job.opts.priority).toBe(5); expect(job.opts.attempts).toBe(10); expect(job.opts.removeOnComplete).toBe(100); expect(job.opts.removeOnFail).toBe(100); }); }); test('should set handler and operation correctly', async () => { // Register custom handler for this test handlerRegistry.register('custom-handler', { 'custom-operation': async payload => { return { processed: true, data: payload }; }, }); await processItems(['test'], queueName, { totalDelayHours: 0, handler: 'custom-handler', operation: 'custom-operation', }); const jobs = await queue.getBullQueue().getJobs(['waiting']); expect(jobs.length).toBe(1); expect(jobs[0].data.handler).toBe('custom-handler'); expect(jobs[0].data.operation).toBe('custom-operation'); }); }); });