linxus fs fixes
This commit is contained in:
parent
ac23b70146
commit
0b7846fe67
292 changed files with 41947 additions and 41947 deletions
|
|
@ -1,293 +1,293 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
|
||||
export interface BatchConfig<T> {
|
||||
items: T[];
|
||||
batchSize?: number; // Optional - only used for batch mode
|
||||
totalDelayMs: number;
|
||||
jobNamePrefix: string;
|
||||
operation: string;
|
||||
service: string;
|
||||
provider: string;
|
||||
priority?: number;
|
||||
createJobData: (item: T, index: number) => any;
|
||||
removeOnComplete?: number;
|
||||
removeOnFail?: number;
|
||||
useBatching?: boolean; // Simple flag to choose mode
|
||||
}
|
||||
|
||||
const logger = getLogger('batch-processor');
|
||||
|
||||
export class BatchProcessor {
|
||||
constructor(private queueManager: any) {}
|
||||
|
||||
/**
|
||||
* Unified method that handles both direct and batch approaches
|
||||
*/
|
||||
async processItems<T>(config: BatchConfig<T>) {
|
||||
const { items, useBatching = false } = config;
|
||||
|
||||
if (items.length === 0) {
|
||||
return { totalItems: 0, jobsCreated: 0 };
|
||||
}
|
||||
|
||||
if (useBatching) {
|
||||
return await this.createBatchJobs(config);
|
||||
} else {
|
||||
return await this.createDirectJobs(config);
|
||||
}
|
||||
}
|
||||
|
||||
private async createDirectJobs<T>(config: BatchConfig<T>) {
|
||||
const {
|
||||
items,
|
||||
totalDelayMs,
|
||||
jobNamePrefix,
|
||||
operation,
|
||||
service,
|
||||
provider,
|
||||
priority = 2,
|
||||
createJobData,
|
||||
removeOnComplete = 5,
|
||||
removeOnFail = 3
|
||||
} = config;
|
||||
|
||||
const delayPerItem = Math.floor(totalDelayMs / items.length);
|
||||
const chunkSize = 100;
|
||||
let totalJobsCreated = 0;
|
||||
|
||||
logger.info('Creating direct jobs', {
|
||||
totalItems: items.length,
|
||||
delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`,
|
||||
estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`
|
||||
});
|
||||
|
||||
// Process in chunks to avoid overwhelming Redis
|
||||
for (let i = 0; i < items.length; i += chunkSize) {
|
||||
const chunk = items.slice(i, i + chunkSize);
|
||||
|
||||
const jobs = chunk.map((item, chunkIndex) => {
|
||||
const globalIndex = i + chunkIndex;
|
||||
return {
|
||||
name: `${jobNamePrefix}-processing`,
|
||||
data: {
|
||||
type: `${jobNamePrefix}-processing`,
|
||||
service,
|
||||
provider,
|
||||
operation,
|
||||
payload: createJobData(item, globalIndex),
|
||||
priority
|
||||
},
|
||||
opts: {
|
||||
delay: globalIndex * delayPerItem,
|
||||
jobId: `${jobNamePrefix}-${globalIndex}-${Date.now()}`,
|
||||
removeOnComplete,
|
||||
removeOnFail
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
try {
|
||||
const createdJobs = await this.queueManager.queue.addBulk(jobs);
|
||||
totalJobsCreated += createdJobs.length;
|
||||
|
||||
// Log progress every 500 jobs
|
||||
if (totalJobsCreated % 500 === 0 || i + chunkSize >= items.length) {
|
||||
logger.info('Direct job creation progress', {
|
||||
created: totalJobsCreated,
|
||||
total: items.length,
|
||||
percentage: `${((totalJobsCreated / items.length) * 100).toFixed(1)}%`
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to create job chunk', {
|
||||
startIndex: i,
|
||||
chunkSize: chunk.length,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
totalItems: items.length,
|
||||
jobsCreated: totalJobsCreated,
|
||||
mode: 'direct'
|
||||
};
|
||||
}
|
||||
|
||||
private async createBatchJobs<T>(config: BatchConfig<T>) {
|
||||
const {
|
||||
items,
|
||||
batchSize = 200,
|
||||
totalDelayMs,
|
||||
jobNamePrefix,
|
||||
operation,
|
||||
service,
|
||||
provider,
|
||||
priority = 3
|
||||
} = config;
|
||||
|
||||
const totalBatches = Math.ceil(items.length / batchSize);
|
||||
const delayPerBatch = Math.floor(totalDelayMs / totalBatches);
|
||||
const chunkSize = 50; // Create batch jobs in chunks
|
||||
let batchJobsCreated = 0;
|
||||
|
||||
logger.info('Creating batch jobs', {
|
||||
totalItems: items.length,
|
||||
batchSize,
|
||||
totalBatches,
|
||||
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`
|
||||
});
|
||||
|
||||
// Create batch jobs in chunks
|
||||
for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += chunkSize) {
|
||||
const chunkEnd = Math.min(chunkStart + chunkSize, totalBatches);
|
||||
const batchJobs = [];
|
||||
|
||||
for (let batchIndex = chunkStart; batchIndex < chunkEnd; batchIndex++) {
|
||||
const startIndex = batchIndex * batchSize;
|
||||
const endIndex = Math.min(startIndex + batchSize, items.length);
|
||||
const batchItems = items.slice(startIndex, endIndex);
|
||||
|
||||
batchJobs.push({
|
||||
name: `${jobNamePrefix}-batch-processing`,
|
||||
data: {
|
||||
type: `${jobNamePrefix}-batch-processing`,
|
||||
service,
|
||||
provider,
|
||||
operation: `process-${jobNamePrefix}-batch`,
|
||||
payload: {
|
||||
items: batchItems,
|
||||
batchIndex,
|
||||
total: totalBatches,
|
||||
config: { ...config, priority: priority - 1 }
|
||||
},
|
||||
priority
|
||||
},
|
||||
opts: {
|
||||
delay: batchIndex * delayPerBatch,
|
||||
jobId: `${jobNamePrefix}-batch-${batchIndex}-${Date.now()}`
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
const createdJobs = await this.queueManager.queue.addBulk(batchJobs);
|
||||
batchJobsCreated += createdJobs.length;
|
||||
|
||||
logger.info('Batch chunk created', {
|
||||
chunkStart: chunkStart + 1,
|
||||
chunkEnd,
|
||||
created: createdJobs.length,
|
||||
totalCreated: batchJobsCreated,
|
||||
progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%`
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('Failed to create batch chunk', {
|
||||
chunkStart,
|
||||
chunkEnd,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
}
|
||||
|
||||
// Small delay between chunks
|
||||
if (chunkEnd < totalBatches) {
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
totalItems: items.length,
|
||||
batchJobsCreated,
|
||||
totalBatches,
|
||||
estimatedDurationHours: totalDelayMs / 1000 / 60 / 60,
|
||||
mode: 'batch'
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a batch (called by batch jobs)
|
||||
*/
|
||||
async processBatch<T>(payload: {
|
||||
items: T[];
|
||||
batchIndex: number;
|
||||
total: number;
|
||||
config: BatchConfig<T>;
|
||||
}, createJobData?: (item: T, index: number) => any) {
|
||||
const { items, batchIndex, total, config } = payload;
|
||||
|
||||
logger.info('Processing batch', {
|
||||
batchIndex,
|
||||
batchSize: items.length,
|
||||
total,
|
||||
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`
|
||||
});
|
||||
|
||||
const totalBatchDelayMs = config.totalDelayMs / total;
|
||||
const delayPerItem = Math.floor(totalBatchDelayMs / items.length);
|
||||
|
||||
const jobs = items.map((item, itemIndex) => {
|
||||
// Use the provided createJobData function or fall back to config
|
||||
const jobDataFn = createJobData || config.createJobData;
|
||||
|
||||
if (!jobDataFn) {
|
||||
throw new Error('createJobData function is required');
|
||||
}
|
||||
|
||||
const userData = jobDataFn(item, itemIndex);
|
||||
|
||||
return {
|
||||
name: `${config.jobNamePrefix}-processing`,
|
||||
data: {
|
||||
type: `${config.jobNamePrefix}-processing`,
|
||||
service: config.service,
|
||||
provider: config.provider,
|
||||
operation: config.operation,
|
||||
payload: {
|
||||
...userData,
|
||||
batchIndex,
|
||||
itemIndex,
|
||||
total,
|
||||
source: userData.source || 'batch-processing'
|
||||
},
|
||||
priority: config.priority || 2
|
||||
},
|
||||
opts: {
|
||||
delay: itemIndex * delayPerItem,
|
||||
jobId: `${config.jobNamePrefix}-${batchIndex}-${itemIndex}-${Date.now()}`,
|
||||
removeOnComplete: config.removeOnComplete || 5,
|
||||
removeOnFail: config.removeOnFail || 3
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
try {
|
||||
const createdJobs = await this.queueManager.queue.addBulk(jobs);
|
||||
|
||||
logger.info('Batch processing completed', {
|
||||
batchIndex,
|
||||
totalItems: items.length,
|
||||
jobsCreated: createdJobs.length,
|
||||
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`
|
||||
});
|
||||
|
||||
return {
|
||||
batchIndex,
|
||||
totalItems: items.length,
|
||||
jobsCreated: createdJobs.length,
|
||||
jobsFailed: 0
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Failed to process batch', {
|
||||
batchIndex,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
|
||||
return {
|
||||
batchIndex,
|
||||
totalItems: items.length,
|
||||
jobsCreated: 0,
|
||||
jobsFailed: items.length
|
||||
};
|
||||
}
|
||||
}
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
|
||||
export interface BatchConfig<T> {
|
||||
items: T[];
|
||||
batchSize?: number; // Optional - only used for batch mode
|
||||
totalDelayMs: number;
|
||||
jobNamePrefix: string;
|
||||
operation: string;
|
||||
service: string;
|
||||
provider: string;
|
||||
priority?: number;
|
||||
createJobData: (item: T, index: number) => any;
|
||||
removeOnComplete?: number;
|
||||
removeOnFail?: number;
|
||||
useBatching?: boolean; // Simple flag to choose mode
|
||||
}
|
||||
|
||||
const logger = getLogger('batch-processor');
|
||||
|
||||
export class BatchProcessor {
|
||||
constructor(private queueManager: any) {}
|
||||
|
||||
/**
|
||||
* Unified method that handles both direct and batch approaches
|
||||
*/
|
||||
async processItems<T>(config: BatchConfig<T>) {
|
||||
const { items, useBatching = false } = config;
|
||||
|
||||
if (items.length === 0) {
|
||||
return { totalItems: 0, jobsCreated: 0 };
|
||||
}
|
||||
|
||||
if (useBatching) {
|
||||
return await this.createBatchJobs(config);
|
||||
} else {
|
||||
return await this.createDirectJobs(config);
|
||||
}
|
||||
}
|
||||
|
||||
private async createDirectJobs<T>(config: BatchConfig<T>) {
|
||||
const {
|
||||
items,
|
||||
totalDelayMs,
|
||||
jobNamePrefix,
|
||||
operation,
|
||||
service,
|
||||
provider,
|
||||
priority = 2,
|
||||
createJobData,
|
||||
removeOnComplete = 5,
|
||||
removeOnFail = 3
|
||||
} = config;
|
||||
|
||||
const delayPerItem = Math.floor(totalDelayMs / items.length);
|
||||
const chunkSize = 100;
|
||||
let totalJobsCreated = 0;
|
||||
|
||||
logger.info('Creating direct jobs', {
|
||||
totalItems: items.length,
|
||||
delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`,
|
||||
estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`
|
||||
});
|
||||
|
||||
// Process in chunks to avoid overwhelming Redis
|
||||
for (let i = 0; i < items.length; i += chunkSize) {
|
||||
const chunk = items.slice(i, i + chunkSize);
|
||||
|
||||
const jobs = chunk.map((item, chunkIndex) => {
|
||||
const globalIndex = i + chunkIndex;
|
||||
return {
|
||||
name: `${jobNamePrefix}-processing`,
|
||||
data: {
|
||||
type: `${jobNamePrefix}-processing`,
|
||||
service,
|
||||
provider,
|
||||
operation,
|
||||
payload: createJobData(item, globalIndex),
|
||||
priority
|
||||
},
|
||||
opts: {
|
||||
delay: globalIndex * delayPerItem,
|
||||
jobId: `${jobNamePrefix}-${globalIndex}-${Date.now()}`,
|
||||
removeOnComplete,
|
||||
removeOnFail
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
try {
|
||||
const createdJobs = await this.queueManager.queue.addBulk(jobs);
|
||||
totalJobsCreated += createdJobs.length;
|
||||
|
||||
// Log progress every 500 jobs
|
||||
if (totalJobsCreated % 500 === 0 || i + chunkSize >= items.length) {
|
||||
logger.info('Direct job creation progress', {
|
||||
created: totalJobsCreated,
|
||||
total: items.length,
|
||||
percentage: `${((totalJobsCreated / items.length) * 100).toFixed(1)}%`
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to create job chunk', {
|
||||
startIndex: i,
|
||||
chunkSize: chunk.length,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
totalItems: items.length,
|
||||
jobsCreated: totalJobsCreated,
|
||||
mode: 'direct'
|
||||
};
|
||||
}
|
||||
|
||||
private async createBatchJobs<T>(config: BatchConfig<T>) {
|
||||
const {
|
||||
items,
|
||||
batchSize = 200,
|
||||
totalDelayMs,
|
||||
jobNamePrefix,
|
||||
operation,
|
||||
service,
|
||||
provider,
|
||||
priority = 3
|
||||
} = config;
|
||||
|
||||
const totalBatches = Math.ceil(items.length / batchSize);
|
||||
const delayPerBatch = Math.floor(totalDelayMs / totalBatches);
|
||||
const chunkSize = 50; // Create batch jobs in chunks
|
||||
let batchJobsCreated = 0;
|
||||
|
||||
logger.info('Creating batch jobs', {
|
||||
totalItems: items.length,
|
||||
batchSize,
|
||||
totalBatches,
|
||||
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`
|
||||
});
|
||||
|
||||
// Create batch jobs in chunks
|
||||
for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += chunkSize) {
|
||||
const chunkEnd = Math.min(chunkStart + chunkSize, totalBatches);
|
||||
const batchJobs = [];
|
||||
|
||||
for (let batchIndex = chunkStart; batchIndex < chunkEnd; batchIndex++) {
|
||||
const startIndex = batchIndex * batchSize;
|
||||
const endIndex = Math.min(startIndex + batchSize, items.length);
|
||||
const batchItems = items.slice(startIndex, endIndex);
|
||||
|
||||
batchJobs.push({
|
||||
name: `${jobNamePrefix}-batch-processing`,
|
||||
data: {
|
||||
type: `${jobNamePrefix}-batch-processing`,
|
||||
service,
|
||||
provider,
|
||||
operation: `process-${jobNamePrefix}-batch`,
|
||||
payload: {
|
||||
items: batchItems,
|
||||
batchIndex,
|
||||
total: totalBatches,
|
||||
config: { ...config, priority: priority - 1 }
|
||||
},
|
||||
priority
|
||||
},
|
||||
opts: {
|
||||
delay: batchIndex * delayPerBatch,
|
||||
jobId: `${jobNamePrefix}-batch-${batchIndex}-${Date.now()}`
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
const createdJobs = await this.queueManager.queue.addBulk(batchJobs);
|
||||
batchJobsCreated += createdJobs.length;
|
||||
|
||||
logger.info('Batch chunk created', {
|
||||
chunkStart: chunkStart + 1,
|
||||
chunkEnd,
|
||||
created: createdJobs.length,
|
||||
totalCreated: batchJobsCreated,
|
||||
progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%`
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('Failed to create batch chunk', {
|
||||
chunkStart,
|
||||
chunkEnd,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
}
|
||||
|
||||
// Small delay between chunks
|
||||
if (chunkEnd < totalBatches) {
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
totalItems: items.length,
|
||||
batchJobsCreated,
|
||||
totalBatches,
|
||||
estimatedDurationHours: totalDelayMs / 1000 / 60 / 60,
|
||||
mode: 'batch'
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a batch (called by batch jobs)
|
||||
*/
|
||||
async processBatch<T>(payload: {
|
||||
items: T[];
|
||||
batchIndex: number;
|
||||
total: number;
|
||||
config: BatchConfig<T>;
|
||||
}, createJobData?: (item: T, index: number) => any) {
|
||||
const { items, batchIndex, total, config } = payload;
|
||||
|
||||
logger.info('Processing batch', {
|
||||
batchIndex,
|
||||
batchSize: items.length,
|
||||
total,
|
||||
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`
|
||||
});
|
||||
|
||||
const totalBatchDelayMs = config.totalDelayMs / total;
|
||||
const delayPerItem = Math.floor(totalBatchDelayMs / items.length);
|
||||
|
||||
const jobs = items.map((item, itemIndex) => {
|
||||
// Use the provided createJobData function or fall back to config
|
||||
const jobDataFn = createJobData || config.createJobData;
|
||||
|
||||
if (!jobDataFn) {
|
||||
throw new Error('createJobData function is required');
|
||||
}
|
||||
|
||||
const userData = jobDataFn(item, itemIndex);
|
||||
|
||||
return {
|
||||
name: `${config.jobNamePrefix}-processing`,
|
||||
data: {
|
||||
type: `${config.jobNamePrefix}-processing`,
|
||||
service: config.service,
|
||||
provider: config.provider,
|
||||
operation: config.operation,
|
||||
payload: {
|
||||
...userData,
|
||||
batchIndex,
|
||||
itemIndex,
|
||||
total,
|
||||
source: userData.source || 'batch-processing'
|
||||
},
|
||||
priority: config.priority || 2
|
||||
},
|
||||
opts: {
|
||||
delay: itemIndex * delayPerItem,
|
||||
jobId: `${config.jobNamePrefix}-${batchIndex}-${itemIndex}-${Date.now()}`,
|
||||
removeOnComplete: config.removeOnComplete || 5,
|
||||
removeOnFail: config.removeOnFail || 3
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
try {
|
||||
const createdJobs = await this.queueManager.queue.addBulk(jobs);
|
||||
|
||||
logger.info('Batch processing completed', {
|
||||
batchIndex,
|
||||
totalItems: items.length,
|
||||
jobsCreated: createdJobs.length,
|
||||
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`
|
||||
});
|
||||
|
||||
return {
|
||||
batchIndex,
|
||||
totalItems: items.length,
|
||||
jobsCreated: createdJobs.length,
|
||||
jobsFailed: 0
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Failed to process batch', {
|
||||
batchIndex,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
|
||||
return {
|
||||
batchIndex,
|
||||
totalItems: items.length,
|
||||
jobsCreated: 0,
|
||||
jobsFailed: items.length
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue