created lots of tests
This commit is contained in:
parent
42baadae38
commit
54f37f9521
21 changed files with 4577 additions and 215 deletions
171
libs/core/queue/test/batch-processor.test.ts
Normal file
171
libs/core/queue/test/batch-processor.test.ts
Normal file
|
|
@ -0,0 +1,171 @@
|
|||
import { describe, expect, it, mock } from 'bun:test';
|
||||
import { processBatchJob, processItems } from '../src/batch-processor';
|
||||
import type { BatchJobData } from '../src/types';
|
||||
|
||||
describe('Batch Processor', () => {
|
||||
const mockLogger = {
|
||||
info: mock(() => {}),
|
||||
error: mock(() => {}),
|
||||
warn: mock(() => {}),
|
||||
debug: mock(() => {}),
|
||||
trace: mock(() => {}),
|
||||
};
|
||||
|
||||
describe('processBatchJob', () => {
|
||||
it('should process all items successfully', async () => {
|
||||
const batchData: BatchJobData = {
|
||||
items: ['item1', 'item2', 'item3'],
|
||||
options: {
|
||||
batchSize: 2,
|
||||
concurrency: 1,
|
||||
},
|
||||
};
|
||||
|
||||
const processor = mock((item: string) => Promise.resolve({ processed: item }));
|
||||
|
||||
const result = await processBatchJob(batchData, processor, mockLogger);
|
||||
|
||||
expect(result.totalItems).toBe(3);
|
||||
expect(result.successful).toBe(3);
|
||||
expect(result.failed).toBe(0);
|
||||
expect(result.errors).toHaveLength(0);
|
||||
expect(processor).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
|
||||
it('should handle partial failures', async () => {
|
||||
const batchData: BatchJobData = {
|
||||
items: ['item1', 'item2', 'item3'],
|
||||
options: {},
|
||||
};
|
||||
|
||||
const processor = mock((item: string) => {
|
||||
if (item === 'item2') {
|
||||
return Promise.reject(new Error('Processing failed'));
|
||||
}
|
||||
return Promise.resolve({ processed: item });
|
||||
});
|
||||
|
||||
const result = await processBatchJob(batchData, processor, mockLogger);
|
||||
|
||||
expect(result.totalItems).toBe(3);
|
||||
expect(result.successful).toBe(2);
|
||||
expect(result.failed).toBe(1);
|
||||
expect(result.errors).toHaveLength(1);
|
||||
expect(result.errors[0].item).toBe('item2');
|
||||
expect(result.errors[0].error).toBe('Processing failed');
|
||||
});
|
||||
|
||||
it('should handle empty items', async () => {
|
||||
const batchData: BatchJobData = {
|
||||
items: [],
|
||||
options: {},
|
||||
};
|
||||
|
||||
const processor = mock(() => Promise.resolve({}));
|
||||
|
||||
const result = await processBatchJob(batchData, processor, mockLogger);
|
||||
|
||||
expect(result.totalItems).toBe(0);
|
||||
expect(result.successful).toBe(0);
|
||||
expect(result.failed).toBe(0);
|
||||
expect(processor).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should track duration', async () => {
|
||||
const batchData: BatchJobData = {
|
||||
items: ['item1'],
|
||||
options: {},
|
||||
};
|
||||
|
||||
const processor = mock(() =>
|
||||
new Promise(resolve => setTimeout(() => resolve({}), 10))
|
||||
);
|
||||
|
||||
const result = await processBatchJob(batchData, processor, mockLogger);
|
||||
|
||||
expect(result.duration).toBeGreaterThan(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('processItems', () => {
|
||||
it('should process items with default options', async () => {
|
||||
const items = [1, 2, 3, 4, 5];
|
||||
const processor = mock((item: number) => Promise.resolve(item * 2));
|
||||
|
||||
const results = await processItems(items, processor);
|
||||
|
||||
expect(results).toEqual([2, 4, 6, 8, 10]);
|
||||
expect(processor).toHaveBeenCalledTimes(5);
|
||||
});
|
||||
|
||||
it('should process items in batches', async () => {
|
||||
const items = [1, 2, 3, 4, 5];
|
||||
const processor = mock((item: number) => Promise.resolve(item * 2));
|
||||
|
||||
const results = await processItems(items, processor, {
|
||||
batchSize: 2,
|
||||
concurrency: 1,
|
||||
});
|
||||
|
||||
expect(results).toEqual([2, 4, 6, 8, 10]);
|
||||
expect(processor).toHaveBeenCalledTimes(5);
|
||||
});
|
||||
|
||||
it('should handle concurrent processing', async () => {
|
||||
const items = [1, 2, 3, 4];
|
||||
let activeCount = 0;
|
||||
let maxActiveCount = 0;
|
||||
|
||||
const processor = mock(async (item: number) => {
|
||||
activeCount++;
|
||||
maxActiveCount = Math.max(maxActiveCount, activeCount);
|
||||
await new Promise(resolve => setTimeout(resolve, 10));
|
||||
activeCount--;
|
||||
return item * 2;
|
||||
});
|
||||
|
||||
await processItems(items, processor, {
|
||||
batchSize: 10,
|
||||
concurrency: 2,
|
||||
});
|
||||
|
||||
// With concurrency 2, at most 2 items should be processed at once
|
||||
expect(maxActiveCount).toBeLessThanOrEqual(2);
|
||||
expect(processor).toHaveBeenCalledTimes(4);
|
||||
});
|
||||
|
||||
it('should handle empty array', async () => {
|
||||
const processor = mock(() => Promise.resolve({}));
|
||||
const results = await processItems([], processor);
|
||||
|
||||
expect(results).toEqual([]);
|
||||
expect(processor).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should propagate errors', async () => {
|
||||
const items = [1, 2, 3];
|
||||
const processor = mock((item: number) => {
|
||||
if (item === 2) {
|
||||
return Promise.reject(new Error('Process error'));
|
||||
}
|
||||
return Promise.resolve(item);
|
||||
});
|
||||
|
||||
await expect(processItems(items, processor)).rejects.toThrow('Process error');
|
||||
});
|
||||
|
||||
it('should process large batches efficiently', async () => {
|
||||
const items = Array.from({ length: 100 }, (_, i) => i);
|
||||
const processor = mock((item: number) => Promise.resolve(item + 1));
|
||||
|
||||
const results = await processItems(items, processor, {
|
||||
batchSize: 20,
|
||||
concurrency: 5,
|
||||
});
|
||||
|
||||
expect(results).toHaveLength(100);
|
||||
expect(results[0]).toBe(1);
|
||||
expect(results[99]).toBe(100);
|
||||
});
|
||||
});
|
||||
});
|
||||
253
libs/core/queue/test/dlq-handler.test.ts
Normal file
253
libs/core/queue/test/dlq-handler.test.ts
Normal file
|
|
@ -0,0 +1,253 @@
|
|||
import { beforeEach, describe, expect, it, mock } from 'bun:test';
|
||||
import { DeadLetterQueueHandler } from '../src/dlq-handler';
|
||||
import type { Job, Queue } from 'bullmq';
|
||||
import type { RedisConfig } from '../src/types';
|
||||
|
||||
describe('DeadLetterQueueHandler', () => {
|
||||
const mockLogger = {
|
||||
info: mock(() => {}),
|
||||
error: mock(() => {}),
|
||||
warn: mock(() => {}),
|
||||
debug: mock(() => {}),
|
||||
trace: mock(() => {}),
|
||||
};
|
||||
|
||||
const mockQueue = {
|
||||
name: 'test-queue',
|
||||
add: mock(() => Promise.resolve({})),
|
||||
getCompleted: mock(() => Promise.resolve([])),
|
||||
getFailed: mock(() => Promise.resolve([])),
|
||||
getWaiting: mock(() => Promise.resolve([])),
|
||||
} as unknown as Queue;
|
||||
|
||||
const mockRedisConfig: RedisConfig = {
|
||||
host: 'localhost',
|
||||
port: 6379,
|
||||
};
|
||||
|
||||
let dlqHandler: DeadLetterQueueHandler;
|
||||
|
||||
beforeEach(() => {
|
||||
dlqHandler = new DeadLetterQueueHandler(mockQueue, mockRedisConfig, {}, mockLogger);
|
||||
// Reset mocks
|
||||
mockLogger.info = mock(() => {});
|
||||
mockLogger.error = mock(() => {});
|
||||
mockLogger.warn = mock(() => {});
|
||||
});
|
||||
|
||||
describe('handleFailedJob', () => {
|
||||
it('should log failed job details', async () => {
|
||||
const mockJob = {
|
||||
id: 'job-123',
|
||||
name: 'test-job',
|
||||
queueName: 'test-queue',
|
||||
data: {
|
||||
handler: 'testHandler',
|
||||
operation: 'testOp',
|
||||
payload: { test: true },
|
||||
},
|
||||
attemptsMade: 3,
|
||||
failedReason: 'Test error',
|
||||
finishedOn: Date.now(),
|
||||
processedOn: Date.now() - 5000,
|
||||
} as Job;
|
||||
|
||||
const error = new Error('Job processing failed');
|
||||
|
||||
await dlqHandler.handleFailedJob(mockJob, error);
|
||||
|
||||
expect(mockLogger.error).toHaveBeenCalledWith(
|
||||
'Job moved to DLQ',
|
||||
expect.objectContaining({
|
||||
jobId: 'job-123',
|
||||
jobName: 'test-job',
|
||||
error: 'Job processing failed',
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it('should handle jobs without data gracefully', async () => {
|
||||
const mockJob = {
|
||||
id: 'job-123',
|
||||
name: 'test-job',
|
||||
queueName: 'test-queue',
|
||||
data: null,
|
||||
attemptsMade: 1,
|
||||
} as any;
|
||||
|
||||
const error = new Error('No data');
|
||||
|
||||
await dlqHandler.handleFailedJob(mockJob, error);
|
||||
|
||||
expect(mockLogger.error).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should check alert threshold', async () => {
|
||||
const mockJob = {
|
||||
id: 'job-123',
|
||||
name: 'critical-job',
|
||||
queueName: 'critical-queue',
|
||||
data: { handler: 'critical', operation: 'process' },
|
||||
attemptsMade: 3,
|
||||
opts: { attempts: 3 },
|
||||
} as Job;
|
||||
|
||||
const error = new Error('Critical failure');
|
||||
|
||||
await dlqHandler.handleFailedJob(mockJob, error);
|
||||
|
||||
expect(mockLogger.warn).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('getStats', () => {
|
||||
it('should return DLQ statistics', async () => {
|
||||
const mockJobs = [
|
||||
{
|
||||
id: 'job-1',
|
||||
data: { originalJob: { name: 'process' } },
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
{
|
||||
id: 'job-2',
|
||||
data: { originalJob: { name: 'process' } },
|
||||
timestamp: Date.now() - 100000,
|
||||
},
|
||||
{
|
||||
id: 'job-3',
|
||||
data: { originalJob: { name: 'validate' } },
|
||||
timestamp: Date.now() - 200000,
|
||||
},
|
||||
];
|
||||
|
||||
(mockQueue.getCompleted as any) = mock(() => Promise.resolve(mockJobs));
|
||||
(mockQueue.getFailed as any) = mock(() => Promise.resolve([]));
|
||||
(mockQueue.getWaiting as any) = mock(() => Promise.resolve([]));
|
||||
|
||||
const stats = await dlqHandler.getStats();
|
||||
|
||||
expect(stats.total).toBe(3);
|
||||
expect(stats.byJobName['process']).toBe(2);
|
||||
expect(stats.byJobName['validate']).toBe(1);
|
||||
expect(stats.oldestJob).toBeDefined();
|
||||
});
|
||||
|
||||
it('should handle empty DLQ', async () => {
|
||||
const stats = await dlqHandler.getStats();
|
||||
|
||||
expect(stats.total).toBe(0);
|
||||
expect(stats.byJobName).toEqual({});
|
||||
expect(stats.oldestJob).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe('retryDLQJobs', () => {
|
||||
it('should retry jobs from DLQ', async () => {
|
||||
const mockDLQJobs = [
|
||||
{
|
||||
id: 'dlq-1',
|
||||
data: {
|
||||
originalJob: {
|
||||
id: 'orig-1',
|
||||
name: 'retry-job',
|
||||
data: { test: true },
|
||||
opts: {},
|
||||
},
|
||||
},
|
||||
remove: mock(() => Promise.resolve()),
|
||||
},
|
||||
];
|
||||
|
||||
(dlqHandler as any).dlq = {
|
||||
getCompleted: mock(() => Promise.resolve(mockDLQJobs)),
|
||||
};
|
||||
|
||||
const retriedCount = await dlqHandler.retryDLQJobs(1);
|
||||
|
||||
expect(retriedCount).toBe(1);
|
||||
expect(mockQueue.add).toHaveBeenCalledWith(
|
||||
'retry-job',
|
||||
{ test: true },
|
||||
expect.objectContaining({ delay: expect.any(Number) })
|
||||
);
|
||||
expect(mockDLQJobs[0].remove).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('cleanup', () => {
|
||||
it('should clean up old DLQ entries', async () => {
|
||||
const oldJob = {
|
||||
id: 'old-1',
|
||||
timestamp: Date.now() - 8 * 24 * 60 * 60 * 1000, // 8 days old
|
||||
remove: mock(() => Promise.resolve()),
|
||||
};
|
||||
|
||||
const newJob = {
|
||||
id: 'new-1',
|
||||
timestamp: Date.now() - 1 * 24 * 60 * 60 * 1000, // 1 day old
|
||||
remove: mock(() => Promise.resolve()),
|
||||
};
|
||||
|
||||
(dlqHandler as any).dlq = {
|
||||
getCompleted: mock(() => Promise.resolve([oldJob, newJob])),
|
||||
};
|
||||
|
||||
const removedCount = await dlqHandler.cleanup();
|
||||
|
||||
expect(removedCount).toBe(1);
|
||||
expect(oldJob.remove).toHaveBeenCalled();
|
||||
expect(newJob.remove).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('inspectFailedJobs', () => {
|
||||
it('should return formatted failed jobs', async () => {
|
||||
const mockDLQJobs = [
|
||||
{
|
||||
data: {
|
||||
originalJob: {
|
||||
id: 'orig-1',
|
||||
name: 'test-job',
|
||||
data: { test: true },
|
||||
attemptsMade: 3,
|
||||
},
|
||||
error: {
|
||||
message: 'Test error',
|
||||
stack: 'Error stack',
|
||||
},
|
||||
movedToDLQAt: '2024-01-01T00:00:00Z',
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
(dlqHandler as any).dlq = {
|
||||
getCompleted: mock(() => Promise.resolve(mockDLQJobs)),
|
||||
};
|
||||
|
||||
const inspected = await dlqHandler.inspectFailedJobs(10);
|
||||
|
||||
expect(inspected).toHaveLength(1);
|
||||
expect(inspected[0]).toEqual({
|
||||
id: 'orig-1',
|
||||
name: 'test-job',
|
||||
data: { test: true },
|
||||
error: { message: 'Test error', stack: 'Error stack' },
|
||||
failedAt: '2024-01-01T00:00:00Z',
|
||||
attempts: 3,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('shutdown', () => {
|
||||
it('should close DLQ and clear state', async () => {
|
||||
const mockClose = mock(() => Promise.resolve());
|
||||
(dlqHandler as any).dlq = {
|
||||
close: mockClose,
|
||||
};
|
||||
|
||||
await dlqHandler.shutdown();
|
||||
|
||||
expect(mockClose).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
})
|
||||
125
libs/core/queue/test/queue-class.test.ts
Normal file
125
libs/core/queue/test/queue-class.test.ts
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
import { beforeEach, describe, expect, it, mock } from 'bun:test';
|
||||
import { Queue } from '../src/queue';
|
||||
import type { RedisConfig, JobData, QueueWorkerConfig } from '../src/types';
|
||||
|
||||
describe('Queue Class', () => {
|
||||
const mockRedisConfig: RedisConfig = {
|
||||
host: 'localhost',
|
||||
port: 6379,
|
||||
};
|
||||
|
||||
describe('basic functionality', () => {
|
||||
it('should create queue with minimal config', () => {
|
||||
const queue = new Queue('test-queue', mockRedisConfig);
|
||||
expect(queue).toBeDefined();
|
||||
expect(queue.getName()).toBe('test-queue');
|
||||
});
|
||||
|
||||
it('should create queue with default job options', () => {
|
||||
const defaultJobOptions = {
|
||||
attempts: 5,
|
||||
backoff: { type: 'exponential' as const, delay: 2000 },
|
||||
};
|
||||
|
||||
const queue = new Queue('test-queue', mockRedisConfig, defaultJobOptions);
|
||||
expect(queue).toBeDefined();
|
||||
expect(queue.getName()).toBe('test-queue');
|
||||
});
|
||||
|
||||
it('should create queue with custom logger', () => {
|
||||
const mockLogger = {
|
||||
info: mock(() => {}),
|
||||
error: mock(() => {}),
|
||||
warn: mock(() => {}),
|
||||
debug: mock(() => {}),
|
||||
trace: mock(() => {}),
|
||||
};
|
||||
|
||||
const queue = new Queue('test-queue', mockRedisConfig, {}, {}, mockLogger);
|
||||
expect(queue).toBeDefined();
|
||||
});
|
||||
|
||||
it('should create queue with worker config', () => {
|
||||
const workerConfig: QueueWorkerConfig = {
|
||||
workers: 2,
|
||||
concurrency: 5,
|
||||
startWorker: false, // Don't actually start workers
|
||||
serviceName: 'test-service',
|
||||
};
|
||||
|
||||
const queue = new Queue('test-queue', mockRedisConfig, {}, workerConfig);
|
||||
expect(queue).toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('queue naming and utilities', () => {
|
||||
it('should return queue name', () => {
|
||||
const queue = new Queue('my-test-queue', mockRedisConfig);
|
||||
expect(queue.getName()).toBe('my-test-queue');
|
||||
});
|
||||
|
||||
it('should get bull queue instance', () => {
|
||||
const queue = new Queue('test-queue', mockRedisConfig);
|
||||
const bullQueue = queue.getBullQueue();
|
||||
expect(bullQueue).toBeDefined();
|
||||
});
|
||||
|
||||
it('should create child logger with logger that supports child', () => {
|
||||
const mockChildLogger = {
|
||||
info: mock(() => {}),
|
||||
error: mock(() => {}),
|
||||
warn: mock(() => {}),
|
||||
debug: mock(() => {}),
|
||||
trace: mock(() => {}),
|
||||
};
|
||||
|
||||
const mockLogger = {
|
||||
info: mock(() => {}),
|
||||
error: mock(() => {}),
|
||||
warn: mock(() => {}),
|
||||
debug: mock(() => {}),
|
||||
trace: mock(() => {}),
|
||||
child: mock(() => mockChildLogger),
|
||||
};
|
||||
|
||||
const queue = new Queue('test-queue', mockRedisConfig, {}, {}, mockLogger);
|
||||
const childLogger = queue.createChildLogger('batch', { batchId: '123' });
|
||||
|
||||
expect(childLogger).toBe(mockChildLogger);
|
||||
expect(mockLogger.child).toHaveBeenCalledWith('batch', { batchId: '123' });
|
||||
});
|
||||
|
||||
it('should fallback to main logger if child not supported', () => {
|
||||
const mockLogger = {
|
||||
info: mock(() => {}),
|
||||
error: mock(() => {}),
|
||||
warn: mock(() => {}),
|
||||
debug: mock(() => {}),
|
||||
trace: mock(() => {}),
|
||||
};
|
||||
|
||||
const queue = new Queue('test-queue', mockRedisConfig, {}, {}, mockLogger);
|
||||
const childLogger = queue.createChildLogger('batch', { batchId: '123' });
|
||||
|
||||
expect(childLogger).toBe(mockLogger);
|
||||
});
|
||||
});
|
||||
|
||||
describe('worker count methods', () => {
|
||||
it('should get worker count when no workers', () => {
|
||||
const queue = new Queue('test-queue', mockRedisConfig);
|
||||
expect(queue.getWorkerCount()).toBe(0);
|
||||
});
|
||||
|
||||
it('should handle worker count with workers config', () => {
|
||||
const workerConfig: QueueWorkerConfig = {
|
||||
workers: 3,
|
||||
startWorker: false, // Don't actually start
|
||||
};
|
||||
|
||||
const queue = new Queue('test-queue', mockRedisConfig, {}, workerConfig);
|
||||
// Workers aren't actually started with startWorker: false
|
||||
expect(queue.getWorkerCount()).toBe(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
232
libs/core/queue/test/queue-manager.test.ts
Normal file
232
libs/core/queue/test/queue-manager.test.ts
Normal file
|
|
@ -0,0 +1,232 @@
|
|||
import { beforeEach, describe, expect, it, mock } from 'bun:test';
|
||||
import { QueueManager } from '../src/queue-manager';
|
||||
import type { RedisConfig, QueueManagerConfig } from '../src/types';
|
||||
|
||||
describe.skip('QueueManager', () => {
|
||||
// Skipping these tests as they require real Redis connection
|
||||
// TODO: Create mock implementation or use testcontainers
|
||||
|
||||
const mockRedisConfig: RedisConfig = {
|
||||
host: 'localhost',
|
||||
port: 6379,
|
||||
};
|
||||
|
||||
const mockLogger = {
|
||||
info: mock(() => {}),
|
||||
error: mock(() => {}),
|
||||
warn: mock(() => {}),
|
||||
debug: mock(() => {}),
|
||||
trace: mock(() => {}),
|
||||
};
|
||||
|
||||
describe('constructor', () => {
|
||||
it('should create queue manager with default config', () => {
|
||||
const manager = new QueueManager(mockRedisConfig);
|
||||
expect(manager).toBeDefined();
|
||||
});
|
||||
|
||||
it('should create queue manager with custom config', () => {
|
||||
const config: QueueManagerConfig = {
|
||||
defaultJobOptions: {
|
||||
attempts: 5,
|
||||
removeOnComplete: 50,
|
||||
},
|
||||
enableMetrics: true,
|
||||
enableScheduler: true,
|
||||
};
|
||||
|
||||
const manager = new QueueManager(mockRedisConfig, config, mockLogger);
|
||||
expect(manager).toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('queue operations', () => {
|
||||
let manager: QueueManager;
|
||||
|
||||
beforeEach(() => {
|
||||
manager = new QueueManager(mockRedisConfig, {}, mockLogger);
|
||||
});
|
||||
|
||||
it('should create or get queue', () => {
|
||||
const queue = manager.createQueue('test-queue');
|
||||
expect(queue).toBeDefined();
|
||||
expect(queue.getName()).toBe('test-queue');
|
||||
});
|
||||
|
||||
it('should return same queue instance', () => {
|
||||
const queue1 = manager.createQueue('test-queue');
|
||||
const queue2 = manager.createQueue('test-queue');
|
||||
expect(queue1).toBe(queue2);
|
||||
});
|
||||
|
||||
it('should create queue with options', () => {
|
||||
const queue = manager.createQueue('test-queue', {
|
||||
concurrency: 5,
|
||||
workers: 2,
|
||||
});
|
||||
expect(queue).toBeDefined();
|
||||
});
|
||||
|
||||
it('should get existing queue', () => {
|
||||
manager.createQueue('test-queue');
|
||||
const queue = manager.getQueue('test-queue');
|
||||
expect(queue).toBeDefined();
|
||||
});
|
||||
|
||||
it('should return undefined for non-existent queue', () => {
|
||||
const queue = manager.getQueue('non-existent');
|
||||
expect(queue).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should list all queues', () => {
|
||||
manager.createQueue('queue1');
|
||||
manager.createQueue('queue2');
|
||||
const queues = manager.getQueues();
|
||||
expect(queues).toHaveLength(2);
|
||||
expect(queues.map(q => q.getName())).toContain('queue1');
|
||||
expect(queues.map(q => q.getName())).toContain('queue2');
|
||||
});
|
||||
|
||||
it('should check if queue exists', () => {
|
||||
manager.createQueue('test-queue');
|
||||
expect(manager.hasQueue('test-queue')).toBe(true);
|
||||
expect(manager.hasQueue('non-existent')).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('cache operations', () => {
|
||||
let manager: QueueManager;
|
||||
|
||||
beforeEach(() => {
|
||||
manager = new QueueManager(mockRedisConfig, {}, mockLogger);
|
||||
});
|
||||
|
||||
it('should create cache', () => {
|
||||
const cache = manager.createCache('test-cache');
|
||||
expect(cache).toBeDefined();
|
||||
});
|
||||
|
||||
it('should get existing cache', () => {
|
||||
manager.createCache('test-cache');
|
||||
const cache = manager.getCache('test-cache');
|
||||
expect(cache).toBeDefined();
|
||||
});
|
||||
|
||||
it('should return same cache instance', () => {
|
||||
const cache1 = manager.createCache('test-cache');
|
||||
const cache2 = manager.createCache('test-cache');
|
||||
expect(cache1).toBe(cache2);
|
||||
});
|
||||
});
|
||||
|
||||
describe('service discovery', () => {
|
||||
let manager: QueueManager;
|
||||
|
||||
beforeEach(() => {
|
||||
manager = new QueueManager(mockRedisConfig, {}, mockLogger);
|
||||
});
|
||||
|
||||
it('should configure service name', () => {
|
||||
manager.configureService('test-service');
|
||||
expect((manager as any).serviceName).toBe('test-service');
|
||||
});
|
||||
|
||||
it('should register queue route', () => {
|
||||
manager.configureService('test-service');
|
||||
manager.registerQueueRoute({
|
||||
service: 'remote-service',
|
||||
handler: 'process',
|
||||
queueName: '{remote-service_process}',
|
||||
});
|
||||
|
||||
expect(manager.hasRoute('remote-service', 'process')).toBe(true);
|
||||
});
|
||||
|
||||
it('should send to remote queue', async () => {
|
||||
manager.configureService('test-service');
|
||||
manager.registerQueueRoute({
|
||||
service: 'remote-service',
|
||||
handler: 'process',
|
||||
queueName: '{remote-service_process}',
|
||||
});
|
||||
|
||||
const jobId = await manager.sendToQueue('remote-service', 'process', { data: 'test' });
|
||||
expect(jobId).toBeDefined();
|
||||
});
|
||||
|
||||
it('should send to local queue', async () => {
|
||||
manager.configureService('test-service');
|
||||
manager.createQueue('{test-service_process}');
|
||||
|
||||
const jobId = await manager.sendToQueue('test-service', 'process', { data: 'test' });
|
||||
expect(jobId).toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('shutdown', () => {
|
||||
it('should shutdown gracefully', async () => {
|
||||
const manager = new QueueManager(mockRedisConfig, {}, mockLogger);
|
||||
manager.createQueue('test-queue');
|
||||
|
||||
await manager.shutdown();
|
||||
expect((manager as any).isShuttingDown).toBe(true);
|
||||
});
|
||||
|
||||
it('should handle multiple shutdown calls', async () => {
|
||||
const manager = new QueueManager(mockRedisConfig, {}, mockLogger);
|
||||
|
||||
const promise1 = manager.shutdown();
|
||||
const promise2 = manager.shutdown();
|
||||
|
||||
expect(promise1).toBe(promise2);
|
||||
await promise1;
|
||||
});
|
||||
});
|
||||
|
||||
describe('metrics', () => {
|
||||
it('should get global stats', async () => {
|
||||
const manager = new QueueManager(mockRedisConfig, {
|
||||
enableMetrics: true,
|
||||
}, mockLogger);
|
||||
|
||||
manager.createQueue('queue1');
|
||||
manager.createQueue('queue2');
|
||||
|
||||
const stats = await manager.getGlobalStats();
|
||||
expect(stats).toBeDefined();
|
||||
expect(stats.totalQueues).toBe(2);
|
||||
});
|
||||
|
||||
it('should get queue stats', async () => {
|
||||
const manager = new QueueManager(mockRedisConfig, {
|
||||
enableMetrics: true,
|
||||
}, mockLogger);
|
||||
|
||||
const queue = manager.createQueue('test-queue');
|
||||
const stats = await manager.getQueueStats('test-queue');
|
||||
|
||||
expect(stats).toBeDefined();
|
||||
expect(stats.name).toBe('test-queue');
|
||||
});
|
||||
});
|
||||
|
||||
describe('rate limiting', () => {
|
||||
it('should apply rate limit rules', () => {
|
||||
const manager = new QueueManager(mockRedisConfig, {
|
||||
rateLimiter: {
|
||||
rules: [
|
||||
{
|
||||
name: 'api-limit',
|
||||
max: 100,
|
||||
duration: 60000,
|
||||
scope: 'global',
|
||||
},
|
||||
],
|
||||
},
|
||||
}, mockLogger);
|
||||
|
||||
const rateLimiter = (manager as any).rateLimiter;
|
||||
expect(rateLimiter).toBeDefined();
|
||||
});
|
||||
});
|
||||
});
|
||||
185
libs/core/queue/test/queue-metrics.test.ts
Normal file
185
libs/core/queue/test/queue-metrics.test.ts
Normal file
|
|
@ -0,0 +1,185 @@
|
|||
import { beforeEach, describe, expect, it, mock } from 'bun:test';
|
||||
import { QueueMetricsCollector } from '../src/queue-metrics';
|
||||
import type { Queue, QueueEvents } from 'bullmq';
|
||||
|
||||
describe('QueueMetricsCollector', () => {
|
||||
let metrics: QueueMetricsCollector;
|
||||
|
||||
const mockQueue = {
|
||||
name: 'test-queue',
|
||||
getWaitingCount: mock(() => Promise.resolve(0)),
|
||||
getActiveCount: mock(() => Promise.resolve(0)),
|
||||
getCompletedCount: mock(() => Promise.resolve(0)),
|
||||
getFailedCount: mock(() => Promise.resolve(0)),
|
||||
getDelayedCount: mock(() => Promise.resolve(0)),
|
||||
isPaused: mock(() => Promise.resolve(false)),
|
||||
getWaiting: mock(() => Promise.resolve([])),
|
||||
} as unknown as Queue;
|
||||
|
||||
const mockQueueEvents = {
|
||||
on: mock(() => {}),
|
||||
} as unknown as QueueEvents;
|
||||
|
||||
beforeEach(() => {
|
||||
metrics = new QueueMetricsCollector(mockQueue, mockQueueEvents);
|
||||
});
|
||||
|
||||
describe('collect metrics', () => {
|
||||
it('should collect current metrics', async () => {
|
||||
(mockQueue.getWaitingCount as any) = mock(() => Promise.resolve(5));
|
||||
(mockQueue.getActiveCount as any) = mock(() => Promise.resolve(2));
|
||||
(mockQueue.getCompletedCount as any) = mock(() => Promise.resolve(100));
|
||||
(mockQueue.getFailedCount as any) = mock(() => Promise.resolve(3));
|
||||
(mockQueue.getDelayedCount as any) = mock(() => Promise.resolve(1));
|
||||
|
||||
const result = await metrics.collect();
|
||||
|
||||
expect(result.waiting).toBe(5);
|
||||
expect(result.active).toBe(2);
|
||||
expect(result.completed).toBe(100);
|
||||
expect(result.failed).toBe(3);
|
||||
expect(result.delayed).toBe(1);
|
||||
expect(result.isHealthy).toBe(true);
|
||||
});
|
||||
|
||||
it('should detect health issues', async () => {
|
||||
(mockQueue.getWaitingCount as any) = mock(() => Promise.resolve(2000)); // High backlog
|
||||
(mockQueue.getActiveCount as any) = mock(() => Promise.resolve(150)); // High active
|
||||
(mockQueue.getFailedCount as any) = mock(() => Promise.resolve(50));
|
||||
|
||||
const result = await metrics.collect();
|
||||
|
||||
expect(result.isHealthy).toBe(false);
|
||||
expect(result.healthIssues.length).toBeGreaterThan(0);
|
||||
expect(result.healthIssues.some(issue => issue.includes('queue backlog'))).toBe(true);
|
||||
expect(result.healthIssues.some(issue => issue.includes('active jobs'))).toBe(true);
|
||||
});
|
||||
|
||||
it('should handle paused queue', async () => {
|
||||
(mockQueue.getWaitingCount as any) = mock(() => Promise.resolve(10));
|
||||
(mockQueue.isPaused as any) = mock(() => Promise.resolve(true));
|
||||
|
||||
const result = await metrics.collect();
|
||||
|
||||
expect(result.paused).toBe(10);
|
||||
});
|
||||
});
|
||||
|
||||
describe('processing time metrics', () => {
|
||||
it('should calculate processing time metrics', async () => {
|
||||
// Simulate some processing times
|
||||
(metrics as any).processingTimes = [1000, 2000, 3000, 4000, 5000];
|
||||
|
||||
const result = await metrics.collect();
|
||||
|
||||
expect(result.processingTime.avg).toBe(3000);
|
||||
expect(result.processingTime.min).toBe(1000);
|
||||
expect(result.processingTime.max).toBe(5000);
|
||||
expect(result.processingTime.p95).toBe(5000);
|
||||
});
|
||||
|
||||
it('should handle no processing times', async () => {
|
||||
const result = await metrics.collect();
|
||||
|
||||
expect(result.processingTime.avg).toBe(0);
|
||||
expect(result.processingTime.min).toBe(0);
|
||||
expect(result.processingTime.max).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('throughput metrics', () => {
|
||||
it('should calculate throughput', async () => {
|
||||
// Simulate completed and failed timestamps
|
||||
const now = Date.now();
|
||||
(metrics as any).completedTimestamps = [
|
||||
now - 30000, // 30 seconds ago
|
||||
now - 20000,
|
||||
now - 10000,
|
||||
];
|
||||
(metrics as any).failedTimestamps = [
|
||||
now - 25000,
|
||||
now - 5000,
|
||||
];
|
||||
|
||||
const result = await metrics.collect();
|
||||
|
||||
expect(result.throughput.completedPerMinute).toBe(3);
|
||||
expect(result.throughput.failedPerMinute).toBe(2);
|
||||
expect(result.throughput.totalPerMinute).toBe(5);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getReport', () => {
|
||||
it('should generate formatted report', async () => {
|
||||
(mockQueue.getWaitingCount as any) = mock(() => Promise.resolve(5));
|
||||
(mockQueue.getActiveCount as any) = mock(() => Promise.resolve(2));
|
||||
(mockQueue.getCompletedCount as any) = mock(() => Promise.resolve(100));
|
||||
(mockQueue.getFailedCount as any) = mock(() => Promise.resolve(3));
|
||||
|
||||
const report = await metrics.getReport();
|
||||
|
||||
expect(report).toContain('Queue Metrics Report');
|
||||
expect(report).toContain('✅ Healthy');
|
||||
expect(report).toContain('Waiting: 5');
|
||||
expect(report).toContain('Active: 2');
|
||||
expect(report).toContain('Completed: 100');
|
||||
expect(report).toContain('Failed: 3');
|
||||
});
|
||||
});
|
||||
|
||||
describe('getPrometheusMetrics', () => {
|
||||
it('should generate Prometheus formatted metrics', async () => {
|
||||
(mockQueue.getWaitingCount as any) = mock(() => Promise.resolve(5));
|
||||
(mockQueue.getActiveCount as any) = mock(() => Promise.resolve(2));
|
||||
(mockQueue.getCompletedCount as any) = mock(() => Promise.resolve(100));
|
||||
(mockQueue.getFailedCount as any) = mock(() => Promise.resolve(3));
|
||||
|
||||
const prometheusMetrics = await metrics.getPrometheusMetrics();
|
||||
|
||||
expect(prometheusMetrics).toContain('# HELP queue_jobs_total');
|
||||
expect(prometheusMetrics).toContain('queue_jobs_total{queue="test-queue",status="waiting"} 5');
|
||||
expect(prometheusMetrics).toContain('queue_jobs_total{queue="test-queue",status="active"} 2');
|
||||
expect(prometheusMetrics).toContain('queue_jobs_total{queue="test-queue",status="completed"} 100');
|
||||
expect(prometheusMetrics).toContain('# HELP queue_processing_time_seconds');
|
||||
expect(prometheusMetrics).toContain('# HELP queue_throughput_per_minute');
|
||||
expect(prometheusMetrics).toContain('# HELP queue_health');
|
||||
});
|
||||
});
|
||||
|
||||
describe('event listeners', () => {
|
||||
it('should setup event listeners on construction', () => {
|
||||
const newMockQueueEvents = {
|
||||
on: mock(() => {}),
|
||||
} as unknown as QueueEvents;
|
||||
|
||||
new QueueMetricsCollector(mockQueue, newMockQueueEvents);
|
||||
|
||||
expect(newMockQueueEvents.on).toHaveBeenCalledWith('completed', expect.any(Function));
|
||||
expect(newMockQueueEvents.on).toHaveBeenCalledWith('failed', expect.any(Function));
|
||||
expect(newMockQueueEvents.on).toHaveBeenCalledWith('active', expect.any(Function));
|
||||
});
|
||||
});
|
||||
|
||||
describe('oldest waiting job', () => {
|
||||
it('should get oldest waiting job date', async () => {
|
||||
const oldJob = {
|
||||
timestamp: Date.now() - 60000, // 1 minute ago
|
||||
};
|
||||
|
||||
(mockQueue.getWaiting as any) = mock(() => Promise.resolve([oldJob]));
|
||||
|
||||
const result = await metrics.collect();
|
||||
|
||||
expect(result.oldestWaitingJob).toBeDefined();
|
||||
expect(result.oldestWaitingJob).toBeInstanceOf(Date);
|
||||
});
|
||||
|
||||
it('should return null when no waiting jobs', async () => {
|
||||
(mockQueue.getWaiting as any) = mock(() => Promise.resolve([]));
|
||||
|
||||
const result = await metrics.collect();
|
||||
|
||||
expect(result.oldestWaitingJob).toBeNull();
|
||||
});
|
||||
});
|
||||
})
|
||||
336
libs/core/queue/test/rate-limiter.test.ts
Normal file
336
libs/core/queue/test/rate-limiter.test.ts
Normal file
|
|
@ -0,0 +1,336 @@
|
|||
import { beforeEach, describe, expect, it, mock } from 'bun:test';
|
||||
import { QueueRateLimiter } from '../src/rate-limiter';
|
||||
import type { RateLimitRule } from '../src/types';
|
||||
|
||||
describe('QueueRateLimiter', () => {
|
||||
const mockRedisClient = {
|
||||
host: 'localhost',
|
||||
port: 6379,
|
||||
};
|
||||
|
||||
const mockLogger = {
|
||||
info: mock(() => {}),
|
||||
error: mock(() => {}),
|
||||
warn: mock(() => {}),
|
||||
debug: mock(() => {}),
|
||||
};
|
||||
|
||||
describe('constructor', () => {
|
||||
it('should create rate limiter', () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
expect(limiter).toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('addRule', () => {
|
||||
it('should add a rate limit rule', () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
const rule: RateLimitRule = {
|
||||
level: 'queue',
|
||||
queueName: 'test-queue',
|
||||
config: {
|
||||
points: 100,
|
||||
duration: 60,
|
||||
},
|
||||
};
|
||||
|
||||
limiter.addRule(rule);
|
||||
|
||||
expect(mockLogger.info).toHaveBeenCalledWith(
|
||||
'Rate limit rule added',
|
||||
expect.objectContaining({
|
||||
level: 'queue',
|
||||
queueName: 'test-queue',
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it('should add operation-level rule', () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
const rule: RateLimitRule = {
|
||||
level: 'operation',
|
||||
queueName: 'test-queue',
|
||||
handler: 'user-service',
|
||||
operation: 'process-user',
|
||||
config: {
|
||||
points: 10,
|
||||
duration: 60,
|
||||
blockDuration: 300,
|
||||
},
|
||||
};
|
||||
|
||||
limiter.addRule(rule);
|
||||
const rules = limiter.getRules();
|
||||
expect(rules).toHaveLength(1);
|
||||
expect(rules[0]).toEqual(rule);
|
||||
});
|
||||
});
|
||||
|
||||
describe('checkLimit', () => {
|
||||
it('should allow when no rules apply', async () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
const result = await limiter.checkLimit('test-queue', 'handler', 'operation');
|
||||
expect(result.allowed).toBe(true);
|
||||
expect(result.appliedRule).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should check against global rule', async () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
const globalRule: RateLimitRule = {
|
||||
level: 'global',
|
||||
config: { points: 1000, duration: 60 },
|
||||
};
|
||||
|
||||
limiter.addRule(globalRule);
|
||||
|
||||
const result = await limiter.checkLimit('any-queue', 'any-handler', 'any-op');
|
||||
expect(result.appliedRule).toEqual(globalRule);
|
||||
});
|
||||
|
||||
it('should prefer more specific rules', async () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
// Add rules from least to most specific
|
||||
const globalRule: RateLimitRule = {
|
||||
level: 'global',
|
||||
config: { points: 1000, duration: 60 },
|
||||
};
|
||||
|
||||
const queueRule: RateLimitRule = {
|
||||
level: 'queue',
|
||||
queueName: 'test-queue',
|
||||
config: { points: 100, duration: 60 },
|
||||
};
|
||||
|
||||
const handlerRule: RateLimitRule = {
|
||||
level: 'handler',
|
||||
queueName: 'test-queue',
|
||||
handler: 'test-handler',
|
||||
config: { points: 50, duration: 60 },
|
||||
};
|
||||
|
||||
const operationRule: RateLimitRule = {
|
||||
level: 'operation',
|
||||
queueName: 'test-queue',
|
||||
handler: 'test-handler',
|
||||
operation: 'test-op',
|
||||
config: { points: 10, duration: 60 },
|
||||
};
|
||||
|
||||
limiter.addRule(globalRule);
|
||||
limiter.addRule(queueRule);
|
||||
limiter.addRule(handlerRule);
|
||||
limiter.addRule(operationRule);
|
||||
|
||||
// Operation level should take precedence
|
||||
const result = await limiter.checkLimit('test-queue', 'test-handler', 'test-op');
|
||||
expect(result.appliedRule?.level).toBe('operation');
|
||||
|
||||
// Handler level for different operation
|
||||
const result2 = await limiter.checkLimit('test-queue', 'test-handler', 'other-op');
|
||||
expect(result2.appliedRule?.level).toBe('handler');
|
||||
|
||||
// Queue level for different handler
|
||||
const result3 = await limiter.checkLimit('test-queue', 'other-handler', 'some-op');
|
||||
expect(result3.appliedRule?.level).toBe('queue');
|
||||
|
||||
// Global for different queue
|
||||
const result4 = await limiter.checkLimit('other-queue', 'handler', 'op');
|
||||
expect(result4.appliedRule?.level).toBe('global');
|
||||
});
|
||||
});
|
||||
|
||||
describe('getStatus', () => {
|
||||
it('should get rate limit status', async () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
const rule: RateLimitRule = {
|
||||
level: 'queue',
|
||||
queueName: 'test-queue',
|
||||
config: { points: 100, duration: 60 },
|
||||
};
|
||||
|
||||
limiter.addRule(rule);
|
||||
|
||||
const status = await limiter.getStatus('test-queue', 'handler', 'operation');
|
||||
|
||||
expect(status.queueName).toBe('test-queue');
|
||||
expect(status.handler).toBe('handler');
|
||||
expect(status.operation).toBe('operation');
|
||||
expect(status.appliedRule).toEqual(rule);
|
||||
});
|
||||
|
||||
it('should return status without rule when none apply', async () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
const status = await limiter.getStatus('test-queue', 'handler', 'operation');
|
||||
|
||||
expect(status.queueName).toBe('test-queue');
|
||||
expect(status.appliedRule).toBeUndefined();
|
||||
expect(status.limit).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('reset', () => {
|
||||
it('should reset rate limits for specific operation', async () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
const rule: RateLimitRule = {
|
||||
level: 'operation',
|
||||
queueName: 'test-queue',
|
||||
handler: 'test-handler',
|
||||
operation: 'test-op',
|
||||
config: { points: 10, duration: 60 },
|
||||
};
|
||||
|
||||
limiter.addRule(rule);
|
||||
|
||||
await limiter.reset('test-queue', 'test-handler', 'test-op');
|
||||
|
||||
expect(mockLogger.info).toHaveBeenCalledWith(
|
||||
'Rate limits reset',
|
||||
expect.objectContaining({
|
||||
queueName: 'test-queue',
|
||||
handler: 'test-handler',
|
||||
operation: 'test-op',
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it('should warn about broad reset', async () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
await limiter.reset('test-queue');
|
||||
|
||||
expect(mockLogger.warn).toHaveBeenCalledWith(
|
||||
'Broad reset not implemented yet',
|
||||
expect.objectContaining({ queueName: 'test-queue' })
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('removeRule', () => {
|
||||
it('should remove a rule', () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
const rule: RateLimitRule = {
|
||||
level: 'queue',
|
||||
queueName: 'test-queue',
|
||||
config: { points: 100, duration: 60 },
|
||||
};
|
||||
|
||||
limiter.addRule(rule);
|
||||
expect(limiter.getRules()).toHaveLength(1);
|
||||
|
||||
const removed = limiter.removeRule('queue', 'test-queue');
|
||||
expect(removed).toBe(true);
|
||||
expect(limiter.getRules()).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('should return false when rule not found', () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
const removed = limiter.removeRule('queue', 'non-existent');
|
||||
expect(removed).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getRules', () => {
|
||||
it('should return all configured rules', () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
const rule1: RateLimitRule = {
|
||||
level: 'global',
|
||||
config: { points: 1000, duration: 60 },
|
||||
};
|
||||
|
||||
const rule2: RateLimitRule = {
|
||||
level: 'queue',
|
||||
queueName: 'test-queue',
|
||||
config: { points: 100, duration: 60 },
|
||||
};
|
||||
|
||||
limiter.addRule(rule1);
|
||||
limiter.addRule(rule2);
|
||||
|
||||
const rules = limiter.getRules();
|
||||
expect(rules).toHaveLength(2);
|
||||
expect(rules).toContain(rule1);
|
||||
expect(rules).toContain(rule2);
|
||||
});
|
||||
});
|
||||
|
||||
describe('error handling', () => {
|
||||
it('should allow on rate limiter error', async () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
// Add a rule but don't set up the actual limiter to cause an error
|
||||
const rule: RateLimitRule = {
|
||||
level: 'queue',
|
||||
queueName: 'test-queue',
|
||||
config: { points: 100, duration: 60 },
|
||||
};
|
||||
|
||||
limiter.addRule(rule);
|
||||
|
||||
// Force the limiter map to be empty to simulate error
|
||||
(limiter as any).limiters.clear();
|
||||
|
||||
const result = await limiter.checkLimit('test-queue', 'handler', 'operation');
|
||||
|
||||
expect(result.allowed).toBe(true); // Should allow on error
|
||||
expect(mockLogger.warn).toHaveBeenCalledWith(
|
||||
'Rate limiter not found for rule',
|
||||
expect.any(Object)
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('hierarchical rule precedence', () => {
|
||||
it('should correctly apply rule hierarchy', () => {
|
||||
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
|
||||
|
||||
// Add multiple rules at different levels
|
||||
const rules: RateLimitRule[] = [
|
||||
{
|
||||
level: 'global',
|
||||
config: { points: 10000, duration: 60 },
|
||||
},
|
||||
{
|
||||
level: 'queue',
|
||||
queueName: 'email-queue',
|
||||
config: { points: 1000, duration: 60 },
|
||||
},
|
||||
{
|
||||
level: 'handler',
|
||||
queueName: 'email-queue',
|
||||
handler: 'email-service',
|
||||
config: { points: 100, duration: 60 },
|
||||
},
|
||||
{
|
||||
level: 'operation',
|
||||
queueName: 'email-queue',
|
||||
handler: 'email-service',
|
||||
operation: 'send-bulk',
|
||||
config: { points: 10, duration: 60 },
|
||||
},
|
||||
];
|
||||
|
||||
rules.forEach(rule => limiter.addRule(rule));
|
||||
|
||||
// Test that getMostSpecificRule works correctly
|
||||
const specificRule = (limiter as any).getMostSpecificRule(
|
||||
'email-queue',
|
||||
'email-service',
|
||||
'send-bulk'
|
||||
);
|
||||
|
||||
expect(specificRule?.level).toBe('operation');
|
||||
expect(specificRule?.config.points).toBe(10);
|
||||
});
|
||||
});
|
||||
})
|
||||
57
libs/core/queue/test/service-cache.test.ts
Normal file
57
libs/core/queue/test/service-cache.test.ts
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
import { describe, expect, it } from 'bun:test';
|
||||
import { normalizeServiceName, generateCachePrefix } from '../src/service-utils';
|
||||
|
||||
describe('ServiceCache Integration', () => {
|
||||
// Since ServiceCache depends on external createCache, we'll test the utility functions it uses
|
||||
|
||||
describe('generateCachePrefix usage', () => {
|
||||
it('should generate correct cache prefix for service', () => {
|
||||
const prefix = generateCachePrefix('userService');
|
||||
expect(prefix).toBe('cache:user-service');
|
||||
});
|
||||
|
||||
it('should handle global cache prefix', () => {
|
||||
// This simulates what ServiceCache does for global cache
|
||||
const globalPrefix = 'stock-bot:shared';
|
||||
expect(globalPrefix).toBe('stock-bot:shared');
|
||||
});
|
||||
});
|
||||
|
||||
describe('cache key patterns', () => {
|
||||
it('should follow correct key pattern for job results', () => {
|
||||
const jobId = 'job-123';
|
||||
const expectedKey = `job:result:${jobId}`;
|
||||
expect(expectedKey).toBe('job:result:job-123');
|
||||
});
|
||||
|
||||
it('should follow correct key pattern for operation metrics', () => {
|
||||
const handler = 'test-handler';
|
||||
const operation = 'test-op';
|
||||
const expectedKey = `metrics:${handler}:${operation}`;
|
||||
expect(expectedKey).toBe('metrics:test-handler:test-op');
|
||||
});
|
||||
|
||||
it('should follow correct key pattern for service health', () => {
|
||||
const serviceName = 'test-service';
|
||||
const expectedKey = `health:${serviceName}`;
|
||||
expect(expectedKey).toBe('health:test-service');
|
||||
});
|
||||
|
||||
it('should follow correct key pattern for queue stats', () => {
|
||||
const queueName = 'test-queue';
|
||||
const expectedKey = `queue:stats:${queueName}`;
|
||||
expect(expectedKey).toBe('queue:stats:test-queue');
|
||||
});
|
||||
});
|
||||
|
||||
describe('service name normalization', () => {
|
||||
it('should normalize service names in cache prefix', () => {
|
||||
const serviceName = 'UserService';
|
||||
const normalized = normalizeServiceName(serviceName);
|
||||
expect(normalized).toBe('user-service');
|
||||
|
||||
const prefix = generateCachePrefix(normalized);
|
||||
expect(prefix).toBe('cache:user-service');
|
||||
});
|
||||
});
|
||||
})
|
||||
120
libs/core/queue/test/service-utils.test.ts
Normal file
120
libs/core/queue/test/service-utils.test.ts
Normal file
|
|
@ -0,0 +1,120 @@
|
|||
import { describe, expect, it } from 'bun:test';
|
||||
import {
|
||||
normalizeServiceName,
|
||||
generateCachePrefix,
|
||||
getFullQueueName,
|
||||
parseQueueName,
|
||||
} from '../src/service-utils';
|
||||
|
||||
describe('Service Utils', () => {
|
||||
describe('normalizeServiceName', () => {
|
||||
it('should convert camelCase to kebab-case', () => {
|
||||
expect(normalizeServiceName('myService')).toBe('my-service');
|
||||
expect(normalizeServiceName('userAuthService')).toBe('user-auth-service');
|
||||
expect(normalizeServiceName('APIGateway')).toBe('apigateway');
|
||||
});
|
||||
|
||||
it('should handle already kebab-case names', () => {
|
||||
expect(normalizeServiceName('my-service')).toBe('my-service');
|
||||
expect(normalizeServiceName('user-auth-service')).toBe('user-auth-service');
|
||||
});
|
||||
|
||||
it('should handle single word names', () => {
|
||||
expect(normalizeServiceName('service')).toBe('service');
|
||||
expect(normalizeServiceName('API')).toBe('api');
|
||||
});
|
||||
|
||||
it('should handle empty string', () => {
|
||||
expect(normalizeServiceName('')).toBe('');
|
||||
});
|
||||
});
|
||||
|
||||
describe('getFullQueueName', () => {
|
||||
it('should generate queue name with service and handler', () => {
|
||||
const name = getFullQueueName('userService', 'processUser');
|
||||
expect(name).toBe('{user-service_processUser}');
|
||||
});
|
||||
|
||||
it('should normalize service name but keep handler as-is', () => {
|
||||
const name = getFullQueueName('APIGateway', 'handleRequest');
|
||||
expect(name).toBe('{apigateway_handleRequest}');
|
||||
});
|
||||
|
||||
it('should handle already normalized service name', () => {
|
||||
const name = getFullQueueName('user-service', 'process-user');
|
||||
expect(name).toBe('{user-service_process-user}');
|
||||
});
|
||||
});
|
||||
|
||||
describe('generateCachePrefix', () => {
|
||||
it('should generate cache prefix for service', () => {
|
||||
const prefix = generateCachePrefix('userService');
|
||||
expect(prefix).toBe('cache:user-service');
|
||||
});
|
||||
|
||||
it('should generate cache prefix for service only', () => {
|
||||
const prefix = generateCachePrefix('userService');
|
||||
expect(prefix).toBe('cache:user-service');
|
||||
});
|
||||
|
||||
it('should normalize service names', () => {
|
||||
const prefix = generateCachePrefix('APIGateway');
|
||||
expect(prefix).toBe('cache:apigateway');
|
||||
});
|
||||
});
|
||||
|
||||
describe('parseQueueName', () => {
|
||||
it('should parse valid queue name', () => {
|
||||
const result = parseQueueName('{user-service_process-user}');
|
||||
expect(result).toEqual({
|
||||
service: 'user-service',
|
||||
handler: 'process-user',
|
||||
});
|
||||
});
|
||||
|
||||
it('should return null for invalid format', () => {
|
||||
expect(parseQueueName('invalid-queue-name')).toBeNull();
|
||||
expect(parseQueueName('user-service_process-user')).toBeNull(); // Missing braces
|
||||
expect(parseQueueName('{user-service-process-user}')).toBeNull(); // Missing underscore
|
||||
expect(parseQueueName('{user-service_}')).toBeNull(); // Missing handler
|
||||
expect(parseQueueName('{_process-user}')).toBeNull(); // Missing service
|
||||
});
|
||||
|
||||
it('should handle queue names with multiple underscores', () => {
|
||||
const result = parseQueueName('{user_service_process_user}');
|
||||
expect(result).toEqual({
|
||||
service: 'user',
|
||||
handler: 'service_process_user',
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('complete service utils workflow', () => {
|
||||
it('should handle full queue name generation and parsing', () => {
|
||||
// Generate a queue name
|
||||
const serviceName = 'userService';
|
||||
const handlerName = 'processOrder';
|
||||
const queueName = getFullQueueName(serviceName, handlerName);
|
||||
|
||||
expect(queueName).toBe('{user-service_processOrder}');
|
||||
|
||||
// Parse it back
|
||||
const parsed = parseQueueName(queueName);
|
||||
expect(parsed).toEqual({
|
||||
service: 'user-service',
|
||||
handler: 'processOrder',
|
||||
});
|
||||
});
|
||||
|
||||
it('should handle cache prefix generation', () => {
|
||||
const serviceName = 'orderService';
|
||||
const cachePrefix = generateCachePrefix(serviceName);
|
||||
|
||||
expect(cachePrefix).toBe('cache:order-service');
|
||||
|
||||
// Use it for cache keys
|
||||
const cacheKey = `${cachePrefix}:user:123`;
|
||||
expect(cacheKey).toBe('cache:order-service:user:123');
|
||||
});
|
||||
});
|
||||
})
|
||||
106
libs/core/queue/test/utils.test.ts
Normal file
106
libs/core/queue/test/utils.test.ts
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
import { describe, expect, it, beforeEach, afterEach } from 'bun:test';
|
||||
import { getRedisConnection } from '../src/utils';
|
||||
import type { RedisConfig } from '../src/types';
|
||||
|
||||
describe('Queue Utils', () => {
|
||||
describe('getRedisConnection', () => {
|
||||
const originalEnv = process.env;
|
||||
|
||||
beforeEach(() => {
|
||||
process.env = { ...originalEnv };
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
process.env = originalEnv;
|
||||
});
|
||||
|
||||
it('should return test connection in test environment', () => {
|
||||
process.env.NODE_ENV = 'test';
|
||||
|
||||
const config: RedisConfig = {
|
||||
host: 'production.redis.com',
|
||||
port: 6380,
|
||||
password: 'secret',
|
||||
};
|
||||
|
||||
const connection = getRedisConnection(config);
|
||||
|
||||
expect(connection.host).toBe('localhost');
|
||||
expect(connection.port).toBe(6379);
|
||||
expect(connection.password).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should return test connection when BUNIT is set', () => {
|
||||
process.env.BUNIT = '1';
|
||||
|
||||
const config: RedisConfig = {
|
||||
host: 'production.redis.com',
|
||||
port: 6380,
|
||||
};
|
||||
|
||||
const connection = getRedisConnection(config);
|
||||
|
||||
expect(connection.host).toBe('localhost');
|
||||
expect(connection.port).toBe(6379);
|
||||
});
|
||||
|
||||
it('should return actual config in non-test environment', () => {
|
||||
process.env.NODE_ENV = 'production';
|
||||
delete process.env.BUNIT;
|
||||
|
||||
const config: RedisConfig = {
|
||||
host: 'production.redis.com',
|
||||
port: 6380,
|
||||
password: 'secret',
|
||||
db: 1,
|
||||
username: 'user',
|
||||
};
|
||||
|
||||
const connection = getRedisConnection(config);
|
||||
|
||||
expect(connection).toEqual(config);
|
||||
expect(connection.host).toBe('production.redis.com');
|
||||
expect(connection.port).toBe(6380);
|
||||
expect(connection.password).toBe('secret');
|
||||
expect(connection.db).toBe(1);
|
||||
expect(connection.username).toBe('user');
|
||||
});
|
||||
|
||||
it('should handle minimal config', () => {
|
||||
process.env.NODE_ENV = 'development';
|
||||
|
||||
const config: RedisConfig = {
|
||||
host: 'localhost',
|
||||
port: 6379,
|
||||
};
|
||||
|
||||
const connection = getRedisConnection(config);
|
||||
|
||||
expect(connection.host).toBe('localhost');
|
||||
expect(connection.port).toBe(6379);
|
||||
expect(connection.password).toBeUndefined();
|
||||
expect(connection.db).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should preserve all config properties in non-test mode', () => {
|
||||
delete process.env.NODE_ENV;
|
||||
delete process.env.BUNIT;
|
||||
|
||||
const config: RedisConfig = {
|
||||
host: 'redis.example.com',
|
||||
port: 6379,
|
||||
password: 'pass123',
|
||||
db: 2,
|
||||
username: 'admin',
|
||||
// Additional properties that might be passed
|
||||
maxRetriesPerRequest: 3,
|
||||
enableReadyCheck: true,
|
||||
enableOfflineQueue: false,
|
||||
} as RedisConfig & Record<string, any>;
|
||||
|
||||
const connection = getRedisConnection(config);
|
||||
|
||||
expect(connection).toEqual(config);
|
||||
});
|
||||
});
|
||||
})
|
||||
Loading…
Add table
Add a link
Reference in a new issue