removed singletop pattern from queue manager

This commit is contained in:
Boki 2025-06-22 19:16:25 -04:00
parent eeb5d1aca2
commit db3aa9c330
12 changed files with 504 additions and 380 deletions

View file

@ -11,9 +11,9 @@ const logger = getLogger('batch-processor');
export async function processItems<T>(
items: T[],
queueName: string,
options: ProcessOptions
options: ProcessOptions,
queueManager: QueueManager
): Promise<BatchResult> {
const queueManager = QueueManager.getInstance();
queueManager.getQueue(queueName);
const startTime = Date.now();
@ -35,8 +35,8 @@ export async function processItems<T>(
try {
const result = options.useBatching
? await processBatched(items, queueName, options)
: await processDirect(items, queueName, options);
? await processBatched(items, queueName, options, queueManager)
: await processDirect(items, queueName, options, queueManager);
const duration = Date.now() - startTime;
@ -58,9 +58,9 @@ export async function processItems<T>(
async function processDirect<T>(
items: T[],
queueName: string,
options: ProcessOptions
options: ProcessOptions,
queueManager: QueueManager
): Promise<Omit<BatchResult, 'duration'>> {
const queueManager = QueueManager.getInstance();
queueManager.getQueue(queueName);
const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds
const delayPerItem = totalDelayMs / items.length;
@ -87,7 +87,7 @@ async function processDirect<T>(
},
}));
const createdJobs = await addJobsInChunks(queueName, jobs);
const createdJobs = await addJobsInChunks(queueName, jobs, queueManager);
return {
totalItems: items.length,
@ -102,9 +102,9 @@ async function processDirect<T>(
async function processBatched<T>(
items: T[],
queueName: string,
options: ProcessOptions
options: ProcessOptions,
queueManager: QueueManager
): Promise<Omit<BatchResult, 'duration'>> {
const queueManager = QueueManager.getInstance();
queueManager.getQueue(queueName);
const batchSize = options.batchSize || 100;
const batches = createBatches(items, batchSize);
@ -121,7 +121,7 @@ async function processBatched<T>(
const batchJobs = await Promise.all(
batches.map(async (batch, batchIndex) => {
// Just store the items directly - no processing needed
const payloadKey = await storeItems(batch, queueName, options);
const payloadKey = await storeItems(batch, queueName, options, queueManager);
return {
name: 'process-batch',
@ -148,7 +148,7 @@ async function processBatched<T>(
})
);
const createdJobs = await addJobsInChunks(queueName, batchJobs);
const createdJobs = await addJobsInChunks(queueName, batchJobs, queueManager);
return {
totalItems: items.length,
@ -161,8 +161,7 @@ async function processBatched<T>(
/**
* Process a batch job - loads items and creates individual jobs
*/
export async function processBatchJob(jobData: BatchJobData, queueName: string): Promise<unknown> {
const queueManager = QueueManager.getInstance();
export async function processBatchJob(jobData: BatchJobData, queueName: string, queueManager: QueueManager): Promise<unknown> {
queueManager.getQueue(queueName);
const { payloadKey, batchIndex, totalBatches, itemCount, totalDelayHours } = jobData;
@ -174,7 +173,7 @@ export async function processBatchJob(jobData: BatchJobData, queueName: string):
});
try {
const payload = await loadPayload(payloadKey, queueName);
const payload = await loadPayload(payloadKey, queueName, queueManager);
if (!payload || !payload.items || !payload.options) {
logger.error('Invalid payload data', { payloadKey, payload });
throw new Error(`Invalid payload data for key: ${payloadKey}`);
@ -210,10 +209,10 @@ export async function processBatchJob(jobData: BatchJobData, queueName: string):
},
}));
const createdJobs = await addJobsInChunks(queueName, jobs);
const createdJobs = await addJobsInChunks(queueName, jobs, queueManager);
// Cleanup payload after successful processing
await cleanupPayload(payloadKey, queueName);
await cleanupPayload(payloadKey, queueName, queueManager);
return {
batchIndex,
@ -239,9 +238,9 @@ function createBatches<T>(items: T[], batchSize: number): T[][] {
async function storeItems<T>(
items: T[],
queueName: string,
options: ProcessOptions
options: ProcessOptions,
queueManager: QueueManager
): Promise<string> {
const queueManager = QueueManager.getInstance();
const cache = queueManager.getCache(queueName);
const payloadKey = `payload:${Date.now()}:${Math.random().toString(36).substr(2, 9)}`;
@ -265,7 +264,8 @@ async function storeItems<T>(
async function loadPayload<T>(
key: string,
queueName: string
queueName: string,
queueManager: QueueManager
): Promise<{
items: T[];
options: {
@ -276,7 +276,6 @@ async function loadPayload<T>(
operation: string;
};
} | null> {
const queueManager = QueueManager.getInstance();
const cache = queueManager.getCache(queueName);
return (await cache.get(key)) as {
items: T[];
@ -290,8 +289,7 @@ async function loadPayload<T>(
} | null;
}
async function cleanupPayload(key: string, queueName: string): Promise<void> {
const queueManager = QueueManager.getInstance();
async function cleanupPayload(key: string, queueName: string, queueManager: QueueManager): Promise<void> {
const cache = queueManager.getCache(queueName);
await cache.del(key);
}
@ -299,9 +297,9 @@ async function cleanupPayload(key: string, queueName: string): Promise<void> {
async function addJobsInChunks(
queueName: string,
jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>,
queueManager: QueueManager,
chunkSize = 100
): Promise<unknown[]> {
const queueManager = QueueManager.getInstance();
const queue = queueManager.getQueue(queueName);
const allCreatedJobs = [];