278 lines
7.7 KiB
TypeScript
278 lines
7.7 KiB
TypeScript
import type { Job, Queue } from 'bullmq';
|
|
import { beforeEach, describe, expect, it, mock } from 'bun:test';
|
|
import { DeadLetterQueueHandler } from '../src/dlq-handler';
|
|
import type { RedisConfig } from '../src/types';
|
|
|
|
describe('DeadLetterQueueHandler', () => {
|
|
// Mock the DLQ Queue that will be created
|
|
const mockDLQQueue = {
|
|
name: 'test-queue-dlq',
|
|
add: mock(() => Promise.resolve({})),
|
|
getCompleted: mock(() => Promise.resolve([])),
|
|
getFailed: mock(() => Promise.resolve([])),
|
|
getWaiting: mock(() => Promise.resolve([])),
|
|
close: mock(() => Promise.resolve()),
|
|
} as unknown as Queue;
|
|
const mockLogger = {
|
|
info: mock(() => {}),
|
|
error: mock(() => {}),
|
|
warn: mock(() => {}),
|
|
debug: mock(() => {}),
|
|
trace: mock(() => {}),
|
|
};
|
|
|
|
const mockQueue = {
|
|
name: 'test-queue',
|
|
add: mock(() => Promise.resolve({})),
|
|
getCompleted: mock(() => Promise.resolve([])),
|
|
getFailed: mock(() => Promise.resolve([])),
|
|
getWaiting: mock(() => Promise.resolve([])),
|
|
} as unknown as Queue;
|
|
|
|
const mockRedisConfig: RedisConfig = {
|
|
host: 'localhost',
|
|
port: 6379,
|
|
};
|
|
|
|
let dlqHandler: DeadLetterQueueHandler;
|
|
|
|
beforeEach(() => {
|
|
// Reset DLQ queue mocks
|
|
mockDLQQueue.add = mock(() => Promise.resolve({}));
|
|
mockDLQQueue.getCompleted = mock(() => Promise.resolve([]));
|
|
mockDLQQueue.getFailed = mock(() => Promise.resolve([]));
|
|
mockDLQQueue.getWaiting = mock(() => Promise.resolve([]));
|
|
mockDLQQueue.close = mock(() => Promise.resolve());
|
|
|
|
// Create handler with mocked DLQ queue
|
|
dlqHandler = new DeadLetterQueueHandler(mockQueue, mockRedisConfig, {}, mockLogger);
|
|
// Override the dlq property to use our mock
|
|
(dlqHandler as any).dlq = mockDLQQueue;
|
|
|
|
// Reset mocks
|
|
mockLogger.info = mock(() => {});
|
|
mockLogger.error = mock(() => {});
|
|
mockLogger.warn = mock(() => {});
|
|
});
|
|
|
|
describe('handleFailedJob', () => {
|
|
it('should log failed job details', async () => {
|
|
const mockJob = {
|
|
id: 'job-123',
|
|
name: 'test-job',
|
|
queueName: 'test-queue',
|
|
data: {
|
|
handler: 'testHandler',
|
|
operation: 'testOp',
|
|
payload: { test: true },
|
|
},
|
|
attemptsMade: 3,
|
|
opts: { attempts: 3 },
|
|
failedReason: 'Test error',
|
|
finishedOn: Date.now(),
|
|
processedOn: Date.now() - 5000,
|
|
timestamp: Date.now() - 10000,
|
|
} as Job;
|
|
|
|
const error = new Error('Job processing failed');
|
|
|
|
await dlqHandler.handleFailedJob(mockJob, error);
|
|
|
|
expect(mockLogger.error).toHaveBeenCalledWith(
|
|
'Job moved to DLQ',
|
|
expect.objectContaining({
|
|
jobId: 'job-123',
|
|
jobName: 'test-job',
|
|
error: 'Job processing failed',
|
|
})
|
|
);
|
|
});
|
|
|
|
it('should handle jobs without data gracefully', async () => {
|
|
const mockJob = {
|
|
id: 'job-123',
|
|
name: 'test-job',
|
|
queueName: 'test-queue',
|
|
data: null,
|
|
attemptsMade: 3,
|
|
opts: { attempts: 3 },
|
|
timestamp: Date.now() - 10000,
|
|
processedOn: Date.now() - 5000,
|
|
} as any;
|
|
|
|
const error = new Error('No data');
|
|
|
|
await dlqHandler.handleFailedJob(mockJob, error);
|
|
|
|
expect(mockLogger.error).toHaveBeenCalled();
|
|
});
|
|
|
|
it('should check alert threshold', async () => {
|
|
const mockJob = {
|
|
id: 'job-123',
|
|
name: 'critical-job',
|
|
queueName: 'critical-queue',
|
|
data: { handler: 'critical', operation: 'process' },
|
|
attemptsMade: 3,
|
|
opts: { attempts: 3 },
|
|
} as Job;
|
|
|
|
const error = new Error('Critical failure');
|
|
|
|
await dlqHandler.handleFailedJob(mockJob, error);
|
|
|
|
expect(mockLogger.warn).toHaveBeenCalled();
|
|
});
|
|
});
|
|
|
|
describe('getStats', () => {
|
|
it('should return DLQ statistics', async () => {
|
|
const mockJobs = [
|
|
{
|
|
id: 'job-1',
|
|
data: { originalJob: { name: 'process' } },
|
|
timestamp: Date.now(),
|
|
},
|
|
{
|
|
id: 'job-2',
|
|
data: { originalJob: { name: 'process' } },
|
|
timestamp: Date.now() - 100000,
|
|
},
|
|
{
|
|
id: 'job-3',
|
|
data: { originalJob: { name: 'validate' } },
|
|
timestamp: Date.now() - 200000,
|
|
},
|
|
];
|
|
|
|
mockDLQQueue.getCompleted = mock(() => Promise.resolve(mockJobs));
|
|
mockDLQQueue.getFailed = mock(() => Promise.resolve([]));
|
|
mockDLQQueue.getWaiting = mock(() => Promise.resolve([]));
|
|
|
|
const stats = await dlqHandler.getStats();
|
|
|
|
expect(stats.total).toBe(3);
|
|
expect(stats.byJobName['process']).toBe(2);
|
|
expect(stats.byJobName['validate']).toBe(1);
|
|
expect(stats.oldestJob).toBeDefined();
|
|
});
|
|
|
|
it('should handle empty DLQ', async () => {
|
|
const stats = await dlqHandler.getStats();
|
|
|
|
expect(stats.total).toBe(0);
|
|
expect(stats.byJobName).toEqual({});
|
|
expect(stats.oldestJob).toBeNull();
|
|
});
|
|
});
|
|
|
|
describe('retryDLQJobs', () => {
|
|
it('should retry jobs from DLQ', async () => {
|
|
const mockDLQJobs = [
|
|
{
|
|
id: 'dlq-1',
|
|
data: {
|
|
originalJob: {
|
|
id: 'orig-1',
|
|
name: 'retry-job',
|
|
data: { test: true },
|
|
opts: {},
|
|
},
|
|
},
|
|
remove: mock(() => Promise.resolve()),
|
|
},
|
|
];
|
|
|
|
(dlqHandler as any).dlq = {
|
|
getCompleted: mock(() => Promise.resolve(mockDLQJobs)),
|
|
};
|
|
|
|
const retriedCount = await dlqHandler.retryDLQJobs(1);
|
|
|
|
expect(retriedCount).toBe(1);
|
|
expect(mockQueue.add).toHaveBeenCalledWith(
|
|
'retry-job',
|
|
{ test: true },
|
|
expect.objectContaining({ delay: expect.any(Number) })
|
|
);
|
|
expect(mockDLQJobs[0].remove).toHaveBeenCalled();
|
|
});
|
|
});
|
|
|
|
describe('cleanup', () => {
|
|
it('should clean up old DLQ entries', async () => {
|
|
const oldJob = {
|
|
id: 'old-1',
|
|
timestamp: Date.now() - 8 * 24 * 60 * 60 * 1000, // 8 days old
|
|
remove: mock(() => Promise.resolve()),
|
|
};
|
|
|
|
const newJob = {
|
|
id: 'new-1',
|
|
timestamp: Date.now() - 1 * 24 * 60 * 60 * 1000, // 1 day old
|
|
remove: mock(() => Promise.resolve()),
|
|
};
|
|
|
|
(dlqHandler as any).dlq = {
|
|
getCompleted: mock(() => Promise.resolve([oldJob, newJob])),
|
|
};
|
|
|
|
const removedCount = await dlqHandler.cleanup();
|
|
|
|
expect(removedCount).toBe(1);
|
|
expect(oldJob.remove).toHaveBeenCalled();
|
|
expect(newJob.remove).not.toHaveBeenCalled();
|
|
});
|
|
});
|
|
|
|
describe('inspectFailedJobs', () => {
|
|
it('should return formatted failed jobs', async () => {
|
|
const mockDLQJobs = [
|
|
{
|
|
data: {
|
|
originalJob: {
|
|
id: 'orig-1',
|
|
name: 'test-job',
|
|
data: { test: true },
|
|
attemptsMade: 3,
|
|
},
|
|
error: {
|
|
message: 'Test error',
|
|
stack: 'Error stack',
|
|
},
|
|
movedToDLQAt: '2024-01-01T00:00:00Z',
|
|
},
|
|
},
|
|
];
|
|
|
|
(dlqHandler as any).dlq = {
|
|
getCompleted: mock(() => Promise.resolve(mockDLQJobs)),
|
|
};
|
|
|
|
const inspected = await dlqHandler.inspectFailedJobs(10);
|
|
|
|
expect(inspected).toHaveLength(1);
|
|
expect(inspected[0]).toEqual({
|
|
id: 'orig-1',
|
|
name: 'test-job',
|
|
data: { test: true },
|
|
error: { message: 'Test error', stack: 'Error stack' },
|
|
failedAt: '2024-01-01T00:00:00Z',
|
|
attempts: 3,
|
|
});
|
|
});
|
|
});
|
|
|
|
describe('shutdown', () => {
|
|
it('should close DLQ and clear state', async () => {
|
|
const mockClose = mock(() => Promise.resolve());
|
|
(dlqHandler as any).dlq = {
|
|
close: mockClose,
|
|
};
|
|
|
|
await dlqHandler.shutdown();
|
|
|
|
expect(mockClose).toHaveBeenCalled();
|
|
});
|
|
});
|
|
});
|