import { getLogger } from '@stock-bot/logger';
/**
 * Configuration for spreading work items across delayed queue jobs.
 *
 * @typeParam T - Type of the individual work items.
 */
export interface BatchConfig<T> {
  /** Work items to process. */
  items: T[];
  /** Number of items packed into each batch job. */
  batchSize: number;
  /** Total time window (ms) over which all batches/items are spread. */
  totalDelayMs: number;
  /** Prefix used to build job names, types, and job IDs. */
  jobNamePrefix: string;
  /** Operation name recorded on each created job. */
  operation: string;
  /** Service identifier recorded on each created job. */
  service: string;
  /** Provider identifier recorded on each created job. */
  provider: string;
  /** Queue priority; defaults to 3 for batch jobs and 2 for direct jobs. */
  priority?: number;
  /** Builds the per-item job payload from an item and its global index. */
  createJobData: (item: T, index: number) => any; // Simplified - no batchInfo parameter
  /** How many completed jobs to retain; defaults to 5 when omitted. */
  removeOnComplete?: number;
  /** How many failed jobs to retain; default differs per method (5 or 3). */
  removeOnFail?: number;
}
/**
 * Positional metadata describing an item's place within a batch.
 *
 * NOTE(review): this interface is not referenced anywhere else in this file —
 * presumably consumed by external callers; confirm before removing. The
 * `total` / `totalItems` pair looks redundant — verify which one callers use.
 */
export interface BatchInfo {
  /** Zero-based index of the batch this item belongs to. */
  batchIndex: number;
  itemIndex: number; // Changed to match proxy provider
  total: number; // Changed to match proxy provider
  /** Total number of items across all batches. */
  totalItems: number;
}
/**
 * Summary returned by {@link BatchProcessor.createBatchJobs}.
 */
export interface BatchResult {
  /** Number of items covered by the created batch jobs. */
  totalItems: number;
  /** Batch jobs actually enqueued (excludes rejected addJob calls). */
  batchJobsCreated: number;
  /** Number of batches the items were partitioned into. */
  totalBatches: number;
  /** Floor of items-per-batch average. */
  avgItemsPerBatch: number;
  /** totalDelayMs expressed in hours. */
  estimatedDurationHours: number;
}
// Module-level logger scoped to this batch-processing utility.
const logger = getLogger('batch-processor');
export class BatchProcessor {
|
|
constructor(private queueManager: any) {}
|
|
|
|
/**
|
|
* Create batch jobs that will later create individual item jobs
|
|
*/
|
|
async createBatchJobs<T>(config: BatchConfig<T>): Promise<BatchResult> {
|
|
const {
|
|
items,
|
|
batchSize,
|
|
totalDelayMs,
|
|
jobNamePrefix,
|
|
operation,
|
|
service,
|
|
provider,
|
|
priority = 3
|
|
} = config;
|
|
|
|
if (items.length === 0) {
|
|
return {
|
|
totalItems: 0,
|
|
batchJobsCreated: 0,
|
|
totalBatches: 0,
|
|
avgItemsPerBatch: 0,
|
|
estimatedDurationHours: 0
|
|
};
|
|
}
|
|
|
|
const totalBatches = Math.ceil(items.length / batchSize);
|
|
const delayPerBatch = Math.floor(totalDelayMs / totalBatches);
|
|
|
|
logger.info('Creating batch jobs', {
|
|
totalItems: items.length,
|
|
batchSize,
|
|
totalBatches,
|
|
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
|
|
estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`,
|
|
jobPrefix: jobNamePrefix
|
|
});
|
|
|
|
const batchCreationChunkSize = 50;
|
|
let batchJobsCreated = 0;
|
|
|
|
for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += batchCreationChunkSize) {
|
|
const chunkEnd = Math.min(chunkStart + batchCreationChunkSize, totalBatches);
|
|
const batchPromises = [];
|
|
|
|
for (let i = chunkStart; i < chunkEnd; i++) {
|
|
const startIndex = i * batchSize;
|
|
const endIndex = Math.min(startIndex + batchSize, items.length);
|
|
const batchItems = items.slice(startIndex, endIndex);
|
|
const delay = i * delayPerBatch;
|
|
const batchPromise = this.queueManager.addJob({
|
|
type: `${jobNamePrefix}-batch-processing`,
|
|
service,
|
|
provider,
|
|
operation: `process-${jobNamePrefix}-batch`,
|
|
payload: {
|
|
items: batchItems,
|
|
batchIndex: i,
|
|
totalBatch: totalBatches, // Changed to match your property name
|
|
batchSize,
|
|
config: {
|
|
jobNamePrefix,
|
|
operation,
|
|
service,
|
|
provider,
|
|
priority: priority - 1, // Individual jobs get slightly lower priority
|
|
removeOnComplete: config.removeOnComplete || 5,
|
|
removeOnFail: config.removeOnFail || 5
|
|
}
|
|
},
|
|
priority
|
|
}, {
|
|
delay: delay,
|
|
jobId: `${jobNamePrefix}-batch-${i}-${Date.now()}`
|
|
});
|
|
|
|
batchPromises.push(batchPromise);
|
|
}
|
|
|
|
const results = await Promise.allSettled(batchPromises);
|
|
const successful = results.filter(r => r.status === 'fulfilled').length;
|
|
const failed = results.filter(r => r.status === 'rejected').length;
|
|
|
|
batchJobsCreated += successful;
|
|
|
|
logger.info('Batch chunk created', {
|
|
chunkStart: chunkStart + 1,
|
|
chunkEnd,
|
|
successful,
|
|
failed,
|
|
totalCreated: batchJobsCreated,
|
|
progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%`
|
|
});
|
|
|
|
if (chunkEnd < totalBatches) {
|
|
await new Promise(resolve => setTimeout(resolve, 100));
|
|
}
|
|
}
|
|
|
|
const result = {
|
|
totalItems: items.length,
|
|
batchJobsCreated,
|
|
totalBatches,
|
|
avgItemsPerBatch: Math.floor(items.length / totalBatches),
|
|
estimatedDurationHours: totalDelayMs / 1000 / 60 / 60
|
|
};
|
|
|
|
logger.info('Batch jobs creation completed', result);
|
|
return result;
|
|
}
|
|
/**
|
|
* Process a batch by creating individual item jobs
|
|
*/
|
|
async processBatch<T>(payload: {
|
|
items: T[];
|
|
batchIndex: number;
|
|
totalBatch: number; // Changed to match common property name
|
|
batchSize: number;
|
|
config: {
|
|
jobNamePrefix: string;
|
|
operation: string;
|
|
service: string;
|
|
provider: string;
|
|
priority: number;
|
|
removeOnComplete: number;
|
|
removeOnFail: number;
|
|
};
|
|
}, createJobData: (item: T, index: number) => any): Promise<{
|
|
batchIndex: number;
|
|
totalItems: number;
|
|
jobsCreated: number;
|
|
jobsFailed: number;
|
|
}> {
|
|
const { items, batchIndex, totalBatch, config } = payload;
|
|
|
|
logger.info('Processing batch', {
|
|
batchIndex,
|
|
batchSize: items.length,
|
|
totalBatch,
|
|
progress: `${((batchIndex + 1) / totalBatch * 100).toFixed(2)}%`
|
|
});
|
|
|
|
// Spread items over a reasonable time period
|
|
const batchDelayMs = 15 * 60 * 1000; // 15 minutes per batch
|
|
const delayPerItem = Math.floor(batchDelayMs / items.length);
|
|
|
|
const jobsToCreate = items.map((item, i) => {
|
|
// Get user data first
|
|
const userData = createJobData(item, i);
|
|
|
|
// Automatically merge with batch info using generic property names
|
|
const finalPayload = {
|
|
...userData,
|
|
batchIndex,
|
|
itemIndexInBatch: i, // Generic property name
|
|
totalBatch, // Generic property name
|
|
source: userData.source || 'batch-processing'
|
|
};
|
|
|
|
return {
|
|
name: `${config.jobNamePrefix}-processing`,
|
|
data: {
|
|
type: `${config.jobNamePrefix}-processing`,
|
|
service: config.service,
|
|
provider: config.provider,
|
|
operation: config.operation,
|
|
payload: finalPayload,
|
|
priority: config.priority
|
|
},
|
|
opts: {
|
|
delay: i * delayPerItem,
|
|
jobId: `${config.jobNamePrefix}-${batchIndex}-${i}-${Date.now()}`,
|
|
removeOnComplete: config.removeOnComplete,
|
|
removeOnFail: config.removeOnFail
|
|
}
|
|
};
|
|
});
|
|
|
|
try {
|
|
const jobs = await this.queueManager.queue.addBulk(jobsToCreate);
|
|
|
|
logger.info('Batch processing completed', {
|
|
batchIndex,
|
|
totalItems: items.length,
|
|
jobsCreated: jobs.length,
|
|
batchDelay: '15 minutes',
|
|
progress: `${((batchIndex + 1) / totalBatch * 100).toFixed(2)}%`
|
|
});
|
|
|
|
return {
|
|
batchIndex,
|
|
totalItems: items.length,
|
|
jobsCreated: jobs.length,
|
|
jobsFailed: 0
|
|
};
|
|
} catch (error) {
|
|
logger.error('Failed to create batch jobs', {
|
|
batchIndex,
|
|
batchSize: items.length,
|
|
error: error instanceof Error ? error.message : String(error)
|
|
});
|
|
|
|
return {
|
|
batchIndex,
|
|
totalItems: items.length,
|
|
jobsCreated: 0,
|
|
jobsFailed: items.length
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Directly create individual jobs without batching (simplified approach)
|
|
*/
|
|
async createDirectJobs<T>(config: BatchConfig<T>): Promise<{
|
|
totalItems: number;
|
|
jobsCreated: number;
|
|
}> {
|
|
const {
|
|
items,
|
|
totalDelayMs,
|
|
jobNamePrefix,
|
|
operation,
|
|
service,
|
|
provider,
|
|
priority = 2,
|
|
createJobData,
|
|
removeOnComplete = 5,
|
|
removeOnFail = 3
|
|
} = config;
|
|
|
|
if (items.length === 0) {
|
|
return { totalItems: 0, jobsCreated: 0 };
|
|
}
|
|
|
|
const delayPerItem = Math.floor(totalDelayMs / items.length);
|
|
const createBatchSize = 100; // Create jobs in chunks
|
|
let totalJobsCreated = 0;
|
|
|
|
logger.info('Creating direct jobs', {
|
|
totalItems: items.length,
|
|
delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`,
|
|
estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`
|
|
});
|
|
|
|
for (let i = 0; i < items.length; i += createBatchSize) {
|
|
const batch = items.slice(i, i + createBatchSize);
|
|
const jobsToCreate = batch.map((item, batchIndex) => {
|
|
const globalIndex = i + batchIndex;
|
|
|
|
return {
|
|
name: `${jobNamePrefix}-processing`,
|
|
data: {
|
|
type: `${jobNamePrefix}-processing`,
|
|
service,
|
|
provider,
|
|
operation,
|
|
payload: createJobData(item, globalIndex),
|
|
priority
|
|
},
|
|
opts: {
|
|
delay: globalIndex * delayPerItem,
|
|
jobId: `${jobNamePrefix}-${globalIndex}-${Date.now()}`,
|
|
removeOnComplete,
|
|
removeOnFail
|
|
}
|
|
};
|
|
});
|
|
|
|
try {
|
|
const jobs = await this.queueManager.queue.addBulk(jobsToCreate);
|
|
totalJobsCreated += jobs.length;
|
|
|
|
if ((i + createBatchSize) % 500 === 0 || i + createBatchSize >= items.length) {
|
|
logger.info('Direct job creation progress', {
|
|
created: totalJobsCreated,
|
|
total: items.length,
|
|
percentage: `${((totalJobsCreated / items.length) * 100).toFixed(1)}%`
|
|
});
|
|
}
|
|
} catch (error) {
|
|
logger.error('Failed to create direct job batch', {
|
|
startIndex: i,
|
|
batchSize: batch.length,
|
|
error: error instanceof Error ? error.message : String(error)
|
|
});
|
|
}
|
|
}
|
|
|
|
return { totalItems: items.length, jobsCreated: totalJobsCreated };
|
|
}
|
|
}
|