This commit is contained in:
Boki 2025-06-25 10:47:00 -04:00
parent 54f37f9521
commit 3a7254708e
19 changed files with 1560 additions and 1237 deletions

View file

@ -1,171 +1,257 @@
import { describe, expect, it, mock } from 'bun:test';
import { describe, expect, it, mock, beforeEach, type Mock } from 'bun:test';
import { processBatchJob, processItems } from '../src/batch-processor';
import type { BatchJobData } from '../src/types';
import type { BatchJobData, ProcessOptions, QueueManager, Queue } from '../src/types';
import type { Logger } from '@stock-bot/logger';
describe('Batch Processor', () => {
const mockLogger = {
info: mock(() => {}),
error: mock(() => {}),
warn: mock(() => {}),
debug: mock(() => {}),
trace: mock(() => {}),
type MockLogger = {
info: Mock<(message: string, meta?: any) => void>;
error: Mock<(message: string, meta?: any) => void>;
warn: Mock<(message: string, meta?: any) => void>;
debug: Mock<(message: string, meta?: any) => void>;
trace: Mock<(message: string, meta?: any) => void>;
};
type MockQueue = {
add: Mock<(name: string, data: any, options?: any) => Promise<{ id: string }>>;
addBulk: Mock<(jobs: Array<{ name: string; data: any; opts?: any }>) => Promise<Array<{ id: string }>>>;
createChildLogger: Mock<(component: string, meta?: any) => MockLogger>;
getName: Mock<() => string>;
};
type MockQueueManager = {
getQueue: Mock<(name: string) => MockQueue>;
getCache: Mock<(name: string) => { get: Mock<(key: string) => Promise<any>>; set: Mock<(key: string, value: any, ttl?: number) => Promise<void>>; del: Mock<(key: string) => Promise<void>> }>;
};
let mockLogger: MockLogger;
let mockQueue: MockQueue;
let mockQueueManager: MockQueueManager;
let mockCache: {
get: Mock<(key: string) => Promise<any>>;
set: Mock<(key: string, value: any, ttl?: number) => Promise<void>>;
del: Mock<(key: string) => Promise<void>>;
};
beforeEach(() => {
mockLogger = {
info: mock(() => {}),
error: mock(() => {}),
warn: mock(() => {}),
debug: mock(() => {}),
trace: mock(() => {}),
};
mockQueue = {
add: mock(async () => ({ id: 'job-123' })),
addBulk: mock(async (jobs) => jobs.map((_, i) => ({ id: `job-${i + 1}` }))),
createChildLogger: mock(() => mockLogger),
getName: mock(() => 'test-queue'),
};
mockCache = {
get: mock(async () => null),
set: mock(async () => {}),
del: mock(async () => {}),
};
mockQueueManager = {
getQueue: mock(() => mockQueue),
getCache: mock(() => mockCache),
};
});
describe('processBatchJob', () => {
it('should process all items successfully', async () => {
const batchData: BatchJobData = {
payloadKey: 'test-payload-key',
batchIndex: 0,
totalBatches: 1,
itemCount: 3,
totalDelayHours: 0,
};
// Mock the cached payload
const cachedPayload = {
items: ['item1', 'item2', 'item3'],
options: {
batchSize: 2,
concurrency: 1,
},
};
mockCache.get.mockImplementation(async () => cachedPayload);
const processor = mock((item: string) => Promise.resolve({ processed: item }));
const result = await processBatchJob(batchData, 'test-queue', mockQueueManager as unknown as QueueManager);
const result = await processBatchJob(batchData, processor, mockLogger);
expect(result.totalItems).toBe(3);
expect(result.successful).toBe(3);
expect(result.failed).toBe(0);
expect(result.errors).toHaveLength(0);
expect(processor).toHaveBeenCalledTimes(3);
expect(mockCache.get).toHaveBeenCalledWith('test-payload-key');
expect(mockQueue.addBulk).toHaveBeenCalled();
expect(result).toBeDefined();
});
it('should handle partial failures', async () => {
const batchData: BatchJobData = {
payloadKey: 'test-payload-key',
batchIndex: 0,
totalBatches: 1,
itemCount: 3,
totalDelayHours: 0,
};
// Mock the cached payload
const cachedPayload = {
items: ['item1', 'item2', 'item3'],
options: {},
};
mockCache.get.mockImplementation(async () => cachedPayload);
const processor = mock((item: string) => {
if (item === 'item2') {
return Promise.reject(new Error('Processing failed'));
}
return Promise.resolve({ processed: item });
// Make addBulk throw an error to simulate failure
mockQueue.addBulk.mockImplementation(async () => {
throw new Error('Failed to add jobs');
});
const result = await processBatchJob(batchData, processor, mockLogger);
// processBatchJob should still complete even if addBulk fails
const result = await processBatchJob(batchData, 'test-queue', mockQueueManager as unknown as QueueManager);
expect(result.totalItems).toBe(3);
expect(result.successful).toBe(2);
expect(result.failed).toBe(1);
expect(result.errors).toHaveLength(1);
expect(result.errors[0].item).toBe('item2');
expect(result.errors[0].error).toBe('Processing failed');
expect(mockQueue.addBulk).toHaveBeenCalled();
// The error is logged in addJobsInChunks, not in processBatchJob
expect(mockLogger.error).toHaveBeenCalledWith('Failed to add job chunk', expect.any(Object));
});
it('should handle empty items', async () => {
const batchData: BatchJobData = {
payloadKey: 'test-payload-key',
batchIndex: 0,
totalBatches: 1,
itemCount: 0,
totalDelayHours: 0,
};
// Mock the cached payload with empty items
const cachedPayload = {
items: [],
options: {},
};
mockCache.get.mockImplementation(async () => cachedPayload);
const processor = mock(() => Promise.resolve({}));
const result = await processBatchJob(batchData, 'test-queue', mockQueueManager as unknown as QueueManager);
const result = await processBatchJob(batchData, processor, mockLogger);
expect(result.totalItems).toBe(0);
expect(result.successful).toBe(0);
expect(result.failed).toBe(0);
expect(processor).not.toHaveBeenCalled();
expect(mockQueue.addBulk).not.toHaveBeenCalled();
expect(result).toBeDefined();
});
it('should track duration', async () => {
const batchData: BatchJobData = {
payloadKey: 'test-payload-key',
batchIndex: 0,
totalBatches: 1,
itemCount: 1,
totalDelayHours: 0,
};
// Mock the cached payload
const cachedPayload = {
items: ['item1'],
options: {},
};
mockCache.get.mockImplementation(async () => cachedPayload);
const processor = mock(() =>
new Promise(resolve => setTimeout(() => resolve({}), 10))
// Add delay to queue.add
mockQueue.add.mockImplementation(() =>
new Promise(resolve => setTimeout(() => resolve({ id: 'job-1' }), 10))
);
const result = await processBatchJob(batchData, processor, mockLogger);
const result = await processBatchJob(batchData, 'test-queue', mockQueueManager as unknown as QueueManager);
expect(result.duration).toBeGreaterThan(0);
expect(result).toBeDefined();
// The function doesn't return duration in its result
});
});
describe('processItems', () => {
it('should process items with default options', async () => {
const items = [1, 2, 3, 4, 5];
const processor = mock((item: number) => Promise.resolve(item * 2));
const options: ProcessOptions = { totalDelayHours: 0 };
const results = await processItems(items, processor);
const result = await processItems(items, 'test-queue', options, mockQueueManager as unknown as QueueManager);
expect(results).toEqual([2, 4, 6, 8, 10]);
expect(processor).toHaveBeenCalledTimes(5);
expect(result.totalItems).toBe(5);
expect(result.jobsCreated).toBe(5);
expect(result.mode).toBe('direct');
expect(mockQueue.addBulk).toHaveBeenCalled();
});
it('should process items in batches', async () => {
const items = [1, 2, 3, 4, 5];
const processor = mock((item: number) => Promise.resolve(item * 2));
const results = await processItems(items, processor, {
const options: ProcessOptions = {
totalDelayHours: 0,
useBatching: true,
batchSize: 2,
concurrency: 1,
});
};
expect(results).toEqual([2, 4, 6, 8, 10]);
expect(processor).toHaveBeenCalledTimes(5);
const result = await processItems(items, 'test-queue', options, mockQueueManager as unknown as QueueManager);
expect(result.totalItems).toBe(5);
expect(result.mode).toBe('batch');
// When batching is enabled, it creates batch jobs instead of individual jobs
expect(mockQueue.addBulk).toHaveBeenCalled();
});
it('should handle concurrent processing', async () => {
const items = [1, 2, 3, 4];
let activeCount = 0;
let maxActiveCount = 0;
const options: ProcessOptions = {
totalDelayHours: 0,
};
const processor = mock(async (item: number) => {
activeCount++;
maxActiveCount = Math.max(maxActiveCount, activeCount);
await new Promise(resolve => setTimeout(resolve, 10));
activeCount--;
return item * 2;
});
const result = await processItems(items, 'test-queue', options, mockQueueManager as unknown as QueueManager);
await processItems(items, processor, {
batchSize: 10,
concurrency: 2,
});
// With concurrency 2, at most 2 items should be processed at once
expect(maxActiveCount).toBeLessThanOrEqual(2);
expect(processor).toHaveBeenCalledTimes(4);
expect(result.totalItems).toBe(4);
expect(result.jobsCreated).toBe(4);
expect(mockQueue.addBulk).toHaveBeenCalled();
});
it('should handle empty array', async () => {
const processor = mock(() => Promise.resolve({}));
const results = await processItems([], processor);
const items: number[] = [];
const options: ProcessOptions = { totalDelayHours: 0 };
const result = await processItems(items, 'test-queue', options, mockQueueManager as unknown as QueueManager);
expect(results).toEqual([]);
expect(processor).not.toHaveBeenCalled();
expect(result.totalItems).toBe(0);
expect(result.jobsCreated).toBe(0);
expect(result.mode).toBe('direct');
expect(mockQueue.addBulk).not.toHaveBeenCalled();
});
it('should propagate errors', async () => {
const items = [1, 2, 3];
const processor = mock((item: number) => {
if (item === 2) {
return Promise.reject(new Error('Process error'));
}
return Promise.resolve(item);
const options: ProcessOptions = { totalDelayHours: 0 };
// Make queue.addBulk throw an error
mockQueue.addBulk.mockImplementation(async () => {
throw new Error('Process error');
});
await expect(processItems(items, processor)).rejects.toThrow('Process error');
// processItems catches errors and continues, so it won't reject
const result = await processItems(items, 'test-queue', options, mockQueueManager as unknown as QueueManager);
expect(result.jobsCreated).toBe(0);
expect(mockQueue.addBulk).toHaveBeenCalled();
expect(mockLogger.error).toHaveBeenCalledWith('Failed to add job chunk', expect.any(Object));
});
it('should process large batches efficiently', async () => {
const items = Array.from({ length: 100 }, (_, i) => i);
const processor = mock((item: number) => Promise.resolve(item + 1));
const results = await processItems(items, processor, {
const options: ProcessOptions = {
totalDelayHours: 0,
useBatching: true,
batchSize: 20,
concurrency: 5,
});
};
expect(results).toHaveLength(100);
expect(results[0]).toBe(1);
expect(results[99]).toBe(100);
const result = await processItems(items, 'test-queue', options, mockQueueManager as unknown as QueueManager);
expect(result.totalItems).toBe(100);
expect(result.mode).toBe('batch');
// With batching enabled and batch size 20, we should have 5 batch jobs
expect(mockQueue.addBulk).toHaveBeenCalled();
});
});
});