batch processor changes
This commit is contained in:
parent
8b6f6008e4
commit
f78fa459d2
1 changed files with 12 additions and 16 deletions
|
|
@ -87,11 +87,10 @@ export class BatchProcessor {
|
||||||
type: `${jobNamePrefix}-batch-processing`,
|
type: `${jobNamePrefix}-batch-processing`,
|
||||||
service,
|
service,
|
||||||
provider,
|
provider,
|
||||||
operation: `process-${jobNamePrefix}-batch`,
|
operation: `process-${jobNamePrefix}-batch`, payload: {
|
||||||
payload: {
|
|
||||||
items: batchItems,
|
items: batchItems,
|
||||||
batchIndex: i,
|
batchIndex: i,
|
||||||
totalBatch: totalBatches, // Changed to match your property name
|
total: totalBatches, // Changed to total to match proxy provider
|
||||||
batchSize,
|
batchSize,
|
||||||
config: {
|
config: {
|
||||||
jobNamePrefix,
|
jobNamePrefix,
|
||||||
|
|
@ -145,11 +144,10 @@ export class BatchProcessor {
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Process a batch by creating individual item jobs
|
* Process a batch by creating individual item jobs
|
||||||
*/
|
*/ async processBatch<T>(payload: {
|
||||||
async processBatch<T>(payload: {
|
|
||||||
items: T[];
|
items: T[];
|
||||||
batchIndex: number;
|
batchIndex: number;
|
||||||
totalBatch: number; // Changed to match common property name
|
total: number; // Changed to match proxy provider
|
||||||
batchSize: number;
|
batchSize: number;
|
||||||
config: {
|
config: {
|
||||||
jobNamePrefix: string;
|
jobNamePrefix: string;
|
||||||
|
|
@ -166,13 +164,12 @@ export class BatchProcessor {
|
||||||
jobsCreated: number;
|
jobsCreated: number;
|
||||||
jobsFailed: number;
|
jobsFailed: number;
|
||||||
}> {
|
}> {
|
||||||
const { items, batchIndex, totalBatch, config } = payload;
|
const { items, batchIndex, total, config } = payload;
|
||||||
|
|
||||||
logger.info('Processing batch', {
|
logger.info('Processing batch', {
|
||||||
batchIndex,
|
batchIndex,
|
||||||
batchSize: items.length,
|
batchSize: items.length,
|
||||||
totalBatch,
|
total,
|
||||||
progress: `${((batchIndex + 1) / totalBatch * 100).toFixed(2)}%`
|
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`
|
||||||
});
|
});
|
||||||
|
|
||||||
// Spread items over a reasonable time period
|
// Spread items over a reasonable time period
|
||||||
|
|
@ -183,12 +180,12 @@ export class BatchProcessor {
|
||||||
// Get user data first
|
// Get user data first
|
||||||
const userData = createJobData(item, i);
|
const userData = createJobData(item, i);
|
||||||
|
|
||||||
// Automatically merge with batch info using generic property names
|
// Automatically merge with batch info using your property names
|
||||||
const finalPayload = {
|
const finalPayload = {
|
||||||
...userData,
|
...userData,
|
||||||
batchIndex,
|
batchIndex,
|
||||||
itemIndexInBatch: i, // Generic property name
|
itemIndex: i, // Changed to match proxy provider
|
||||||
totalBatch, // Generic property name
|
total, // Changed to match proxy provider
|
||||||
source: userData.source || 'batch-processing'
|
source: userData.source || 'batch-processing'
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -213,13 +210,12 @@ export class BatchProcessor {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const jobs = await this.queueManager.queue.addBulk(jobsToCreate);
|
const jobs = await this.queueManager.queue.addBulk(jobsToCreate);
|
||||||
|
logger.info('Batch processing completed', {
|
||||||
logger.info('Batch processing completed', {
|
|
||||||
batchIndex,
|
batchIndex,
|
||||||
totalItems: items.length,
|
totalItems: items.length,
|
||||||
jobsCreated: jobs.length,
|
jobsCreated: jobs.length,
|
||||||
batchDelay: '15 minutes',
|
batchDelay: '15 minutes',
|
||||||
progress: `${((batchIndex + 1) / totalBatch * 100).toFixed(2)}%`
|
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`
|
||||||
});
|
});
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue