// BatchProcessor: creates queued jobs either one-per-item ("direct" mode) or
// grouped into coordinator batch jobs ("batch" mode), storing large batch
// payloads in Redis so queue job data stays small.
import { getLogger } from '@stock-bot/logger';
|
|
import { createCache, CacheProvider } from '@stock-bot/cache';
|
|
|
|
/** Configuration for a single batch-processing run. */
export interface BatchConfig<T> {
  /** Items to enqueue. */
  items: T[];
  batchSize?: number; // Optional - only used for batch mode
  /** Total time budget (ms) across which all jobs are spread via delays. */
  totalDelayMs: number;
  /** Prefix used for job names, job ids, and payload cache keys. */
  jobNamePrefix: string;
  /** Operation name placed into each job's data. */
  operation: string;
  /** Service identifier placed into each job's data. */
  service: string;
  /** Provider identifier placed into each job's data. */
  provider: string;
  /** Job priority (defaults: 2 direct mode, 3 batch mode). */
  priority?: number;
  /** Builds the per-item job payload. Note: functions are dropped by
   *  JSON.stringify when the config is cached in Redis, so batch workers
   *  must pass their own createJobData to processBatch. */
  createJobData: (item: T, index: number) => any;
  /** Completed-job retention count (default 5). */
  removeOnComplete?: number;
  /** Failed-job retention count (default 3). */
  removeOnFail?: number;
  useBatching?: boolean; // Simple flag to choose mode
  payloadTtlHours?: number; // TTL for stored payloads (default 24 hours)
}
|
|
|
|
// Module-level logger tagged for this component.
const logger = getLogger('batch-processor');
|
|
|
|
export class BatchProcessor {
|
|
  // Redis-backed cache used to persist batch payloads between producer and worker.
  private cacheProvider: CacheProvider;
  // Set once initialize() has completed (or timed out into fallback mode).
  private isReady = false;
  private keyPrefix: string = 'batch:'; // Default key prefix for batch payloads
|
|
constructor(
|
|
private queueManager: any,
|
|
private cacheOptions?: { keyPrefix?: string; ttl?: number } // Optional cache configuration
|
|
) {
|
|
this.keyPrefix = cacheOptions?.keyPrefix || 'batch:'; // Initialize cache provider with batch-specific settings
|
|
this.cacheProvider = createCache({
|
|
keyPrefix: this.keyPrefix,
|
|
ttl: cacheOptions?.ttl || 86400 * 2, // 48 hours default
|
|
enableMetrics: true
|
|
});
|
|
this.initialize();
|
|
}
|
|
/**
|
|
* Initialize the batch processor and wait for cache to be ready
|
|
*/
|
|
async initialize(timeout: number = 10000): Promise<void> {
|
|
if (this.isReady) {
|
|
logger.warn('BatchProcessor already initialized');
|
|
return;
|
|
}
|
|
|
|
logger.info('Initializing BatchProcessor, waiting for cache to be ready...');
|
|
|
|
try {
|
|
await this.cacheProvider.waitForReady(timeout);
|
|
this.isReady = true;
|
|
logger.info('BatchProcessor initialized successfully', {
|
|
cacheReady: this.cacheProvider.isReady(),
|
|
keyPrefix: this.keyPrefix,
|
|
ttlHours: ((this.cacheOptions?.ttl || 86400 * 2) / 3600).toFixed(1)
|
|
});
|
|
} catch (error) {
|
|
logger.warn('BatchProcessor cache not ready within timeout, continuing with fallback mode', {
|
|
error: error instanceof Error ? error.message : String(error),
|
|
timeout
|
|
});
|
|
// Don't throw - mark as ready anyway and let cache operations use their fallback mechanisms
|
|
this.isReady = true;
|
|
}
|
|
}
|
|
/**
|
|
* Check if the batch processor is ready
|
|
*/
|
|
getReadyStatus(): boolean {
|
|
return this.isReady; // Don't require cache to be ready, let individual operations handle fallbacks
|
|
}
|
|
/**
|
|
* Generate a unique key for storing batch payload in Redis
|
|
* Note: The cache provider will add its keyPrefix ('batch:') automatically
|
|
*/
|
|
private generatePayloadKey(jobNamePrefix: string, batchIndex: number): string {
|
|
return `payload:${jobNamePrefix}:${batchIndex}:${Date.now()}`;
|
|
}/**
|
|
* Store batch payload in Redis and return the key
|
|
*/ private async storeBatchPayload<T>(
|
|
items: T[],
|
|
config: BatchConfig<T>,
|
|
batchIndex: number
|
|
): Promise<string> {
|
|
const payloadKey = this.generatePayloadKey(config.jobNamePrefix, batchIndex);
|
|
const payload = {
|
|
items,
|
|
batchIndex,
|
|
config: {
|
|
...config,
|
|
items: undefined // Don't store items twice
|
|
},
|
|
createdAt: new Date().toISOString()
|
|
};
|
|
|
|
const ttlSeconds = (config.payloadTtlHours || 24) * 60 * 60;
|
|
|
|
try {
|
|
await this.cacheProvider.set(
|
|
payloadKey,
|
|
JSON.stringify(payload),
|
|
ttlSeconds
|
|
);
|
|
|
|
logger.info('Stored batch payload in Redis', {
|
|
payloadKey,
|
|
itemCount: items.length,
|
|
batchIndex,
|
|
ttlHours: config.payloadTtlHours || 24
|
|
});
|
|
} catch (error) {
|
|
logger.error('Failed to store batch payload, job will run without caching', {
|
|
payloadKey,
|
|
error: error instanceof Error ? error.message : String(error)
|
|
});
|
|
// Don't throw - the job can still run, just without the cached payload
|
|
}
|
|
|
|
return payloadKey;
|
|
}/**
|
|
* Load batch payload from Redis
|
|
*/
|
|
private async loadBatchPayload<T>(payloadKey: string): Promise<{
|
|
items: T[];
|
|
batchIndex: number;
|
|
config: BatchConfig<T>;
|
|
} | null> {
|
|
// Auto-initialize if not ready
|
|
if (!this.cacheProvider.isReady() || !this.isReady) {
|
|
logger.info('Cache provider not ready, initializing...', { payloadKey });
|
|
try {
|
|
await this.initialize();
|
|
} catch (error) {
|
|
logger.error('Failed to initialize cache provider for loading', {
|
|
payloadKey,
|
|
error: error instanceof Error ? error.message : String(error)
|
|
});
|
|
throw new Error('Cache provider initialization failed - cannot load batch payload');
|
|
}
|
|
}
|
|
|
|
try {
|
|
const payloadData = await this.cacheProvider.get<any>(payloadKey);
|
|
|
|
if (!payloadData) {
|
|
logger.error('Batch payload not found in Redis', { payloadKey });
|
|
throw new Error('Batch payload not found in Redis');
|
|
}
|
|
|
|
// Handle both string and already-parsed object
|
|
let payload;
|
|
if (typeof payloadData === 'string') {
|
|
payload = JSON.parse(payloadData);
|
|
} else {
|
|
// Already parsed by cache provider
|
|
payload = payloadData;
|
|
}
|
|
|
|
logger.info('Loaded batch payload from Redis', {
|
|
payloadKey,
|
|
itemCount: payload.items?.length || 0,
|
|
batchIndex: payload.batchIndex
|
|
});
|
|
|
|
return payload;
|
|
} catch (error) {
|
|
logger.error('Failed to load batch payload from Redis', {
|
|
payloadKey,
|
|
error: error instanceof Error ? error.message : String(error)
|
|
});
|
|
throw new Error('Failed to load batch payload from Redis');
|
|
}
|
|
}
|
|
/**
|
|
* Unified method that handles both direct and batch approaches
|
|
*/
|
|
async processItems<T>(config: BatchConfig<T>) {
|
|
// Check if BatchProcessor is ready
|
|
if (!this.getReadyStatus()) {
|
|
logger.warn('BatchProcessor not ready, attempting to initialize...');
|
|
await this.initialize();
|
|
}
|
|
|
|
const { items, useBatching = false } = config;
|
|
|
|
if (items.length === 0) {
|
|
return { totalItems: 0, jobsCreated: 0 };
|
|
} // Final readiness check - wait briefly for cache to be ready
|
|
if (!this.cacheProvider.isReady()) {
|
|
logger.warn('Cache provider not ready, waiting briefly...');
|
|
try {
|
|
await this.cacheProvider.waitForReady(10000); // Wait up to 10 seconds
|
|
logger.info('Cache provider became ready');
|
|
} catch (error) {
|
|
logger.warn('Cache provider still not ready, continuing with fallback mode');
|
|
// Don't throw error - let the cache operations use their fallback mechanisms
|
|
}
|
|
}
|
|
|
|
logger.info('Starting item processing', {
|
|
totalItems: items.length,
|
|
mode: useBatching ? 'batch' : 'direct',
|
|
cacheReady: this.cacheProvider.isReady()
|
|
});
|
|
|
|
if (useBatching) {
|
|
return await this.createBatchJobs(config);
|
|
} else {
|
|
return await this.createDirectJobs(config);
|
|
}
|
|
}
|
|
|
|
private async createDirectJobs<T>(config: BatchConfig<T>) {
|
|
const {
|
|
items,
|
|
totalDelayMs,
|
|
jobNamePrefix,
|
|
operation,
|
|
service,
|
|
provider,
|
|
priority = 2,
|
|
createJobData,
|
|
removeOnComplete = 5,
|
|
removeOnFail = 3
|
|
} = config;
|
|
|
|
const delayPerItem = Math.floor(totalDelayMs / items.length);
|
|
const chunkSize = 100;
|
|
let totalJobsCreated = 0;
|
|
|
|
logger.info('Creating direct jobs', {
|
|
totalItems: items.length,
|
|
delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`,
|
|
estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`
|
|
});
|
|
|
|
// Process in chunks to avoid overwhelming Redis
|
|
for (let i = 0; i < items.length; i += chunkSize) {
|
|
const chunk = items.slice(i, i + chunkSize);
|
|
|
|
const jobs = chunk.map((item, chunkIndex) => {
|
|
const globalIndex = i + chunkIndex;
|
|
return {
|
|
name: `${jobNamePrefix}-processing`,
|
|
data: {
|
|
type: `${jobNamePrefix}-processing`,
|
|
service,
|
|
provider,
|
|
operation,
|
|
payload: createJobData(item, globalIndex),
|
|
priority
|
|
},
|
|
opts: {
|
|
delay: globalIndex * delayPerItem,
|
|
jobId: `${jobNamePrefix}:${globalIndex}:${Date.now()}`,
|
|
removeOnComplete,
|
|
removeOnFail
|
|
}
|
|
};
|
|
});
|
|
|
|
try {
|
|
const createdJobs = await this.queueManager.queue.addBulk(jobs);
|
|
totalJobsCreated += createdJobs.length;
|
|
|
|
// Log progress every 500 jobs
|
|
if (totalJobsCreated % 500 === 0 || i + chunkSize >= items.length) {
|
|
logger.info('Direct job creation progress', {
|
|
created: totalJobsCreated,
|
|
total: items.length,
|
|
percentage: `${((totalJobsCreated / items.length) * 100).toFixed(1)}%`
|
|
});
|
|
}
|
|
} catch (error) {
|
|
logger.error('Failed to create job chunk', {
|
|
startIndex: i,
|
|
chunkSize: chunk.length,
|
|
error: error instanceof Error ? error.message : String(error)
|
|
});
|
|
}
|
|
}
|
|
|
|
return {
|
|
totalItems: items.length,
|
|
jobsCreated: totalJobsCreated,
|
|
mode: 'direct'
|
|
};
|
|
}
|
|
private async createBatchJobs<T>(config: BatchConfig<T>) {
|
|
const {
|
|
items,
|
|
batchSize = 200,
|
|
totalDelayMs,
|
|
jobNamePrefix,
|
|
operation,
|
|
service,
|
|
provider,
|
|
priority = 3
|
|
} = config;
|
|
|
|
const totalBatches = Math.ceil(items.length / batchSize);
|
|
const delayPerBatch = Math.floor(totalDelayMs / totalBatches);
|
|
const chunkSize = 50; // Create batch jobs in chunks
|
|
let batchJobsCreated = 0;
|
|
|
|
logger.info('Creating optimized batch jobs with Redis payload storage', {
|
|
totalItems: items.length,
|
|
batchSize,
|
|
totalBatches,
|
|
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
|
|
payloadTtlHours: config.payloadTtlHours || 24
|
|
});
|
|
|
|
// Create batch jobs in chunks
|
|
for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += chunkSize) {
|
|
const chunkEnd = Math.min(chunkStart + chunkSize, totalBatches);
|
|
const batchJobs = [];
|
|
|
|
for (let batchIndex = chunkStart; batchIndex < chunkEnd; batchIndex++) {
|
|
const startIndex = batchIndex * batchSize;
|
|
const endIndex = Math.min(startIndex + batchSize, items.length);
|
|
const batchItems = items.slice(startIndex, endIndex);
|
|
// Store batch payload in Redis and get reference key
|
|
const payloadKey = await this.storeBatchPayload(batchItems, config, batchIndex);
|
|
batchJobs.push({
|
|
name: `${jobNamePrefix}-batch-processing`,
|
|
data: {
|
|
type: `${jobNamePrefix}-batch-processing`,
|
|
service,
|
|
provider,
|
|
operation: `process-${jobNamePrefix}-batch`,
|
|
payload: {
|
|
// Optimized: only store reference and metadata
|
|
payloadKey: payloadKey,
|
|
batchIndex,
|
|
total: totalBatches,
|
|
itemCount: batchItems.length,
|
|
configSnapshot: {
|
|
jobNamePrefix: config.jobNamePrefix,
|
|
operation: config.operation,
|
|
service: config.service,
|
|
provider: config.provider,
|
|
priority: config.priority,
|
|
removeOnComplete: config.removeOnComplete,
|
|
removeOnFail: config.removeOnFail,
|
|
totalDelayMs: config.totalDelayMs
|
|
}
|
|
},
|
|
priority
|
|
},
|
|
opts: {
|
|
delay: batchIndex * delayPerBatch,
|
|
jobId: `${jobNamePrefix}-batch:${batchIndex}:${Date.now()}`
|
|
}
|
|
});
|
|
}
|
|
|
|
try {
|
|
const createdJobs = await this.queueManager.queue.addBulk(batchJobs);
|
|
batchJobsCreated += createdJobs.length;
|
|
logger.info('Optimized batch chunk created', {
|
|
chunkStart: chunkStart + 1,
|
|
chunkEnd,
|
|
created: createdJobs.length,
|
|
totalCreated: batchJobsCreated,
|
|
progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%`,
|
|
usingRedisStorage: true
|
|
});
|
|
} catch (error) {
|
|
logger.error('Failed to create batch chunk', {
|
|
chunkStart,
|
|
chunkEnd,
|
|
error: error instanceof Error ? error.message : String(error)
|
|
});
|
|
}
|
|
|
|
// Small delay between chunks
|
|
if (chunkEnd < totalBatches) {
|
|
await new Promise(resolve => setTimeout(resolve, 100));
|
|
}
|
|
} return {
|
|
totalItems: items.length,
|
|
batchJobsCreated,
|
|
totalBatches,
|
|
estimatedDurationHours: totalDelayMs / 1000 / 60 / 60,
|
|
mode: 'batch',
|
|
optimized: true
|
|
};
|
|
}
|
|
  /**
   * Process a batch coordinator job: expand one batch into per-item queue
   * jobs (called by batch workers).
   *
   * Supports two payload modes:
   *  - optimized: `jobPayload.payloadKey` references items stored in Redis
   *    by storeBatchPayload;
   *  - fallback: `jobPayload.items` carries the items inline.
   *
   * @param jobPayload    The batch job's payload (see createBatchJobs).
   * @param createJobData Per-item payload builder. Required for optimized
   *                      batches: config.createJobData is a function and is
   *                      dropped by JSON.stringify when the config is cached
   *                      in Redis, so it cannot be recovered from the payload.
   * @returns Per-batch result; on addBulk failure the error is logged and a
   *          result with jobsFailed = items.length is returned (no rethrow).
   */
  async processBatch<T>(
    jobPayload: any,
    createJobData?: (item: T, index: number) => any
  ) {
    let batchData: {
      items: T[];
      batchIndex: number;
      config: BatchConfig<T>;
    };

    // NOTE(review): both paths take `total` straight from jobPayload.total;
    // if a producer omits it, the progress/delay math below yields NaN -
    // confirm producers always set it.
    let total: number;

    // Check if this is an optimized batch with Redis payload storage
    if (jobPayload.payloadKey) {
      logger.info('Processing optimized batch with Redis payload storage', {
        payloadKey: jobPayload.payloadKey,
        batchIndex: jobPayload.batchIndex,
        itemCount: jobPayload.itemCount
      });

      // Load actual payload from Redis
      const loadedPayload = await this.loadBatchPayload<T>(jobPayload.payloadKey);

      if (!loadedPayload) {
        throw new Error(`Failed to load batch payload from Redis: ${jobPayload.payloadKey}`);
      }

      batchData = loadedPayload;
      total = jobPayload.total;

      // Clean up Redis payload after loading (optional - you might want to keep it for retry scenarios)
      // await this.redisClient?.del(jobPayload.payloadKey);

    } else {
      // Fallback: payload stored directly in job data
      logger.info('Processing batch with inline payload storage', {
        batchIndex: jobPayload.batchIndex,
        itemCount: jobPayload.items?.length || 0
      });

      batchData = {
        items: jobPayload.items,
        batchIndex: jobPayload.batchIndex,
        config: jobPayload.config
      };
      total = jobPayload.total;
    }

    const { items, batchIndex, config } = batchData;

    logger.info('Processing batch', {
      batchIndex,
      batchSize: items.length,
      total,
      progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`,
      isOptimized: !!jobPayload.payloadKey
    });

    // Spread this batch's items across its share of the overall time budget.
    const totalBatchDelayMs = config.totalDelayMs / total;
    const delayPerItem = Math.floor(totalBatchDelayMs / items.length);

    const jobs = items.map((item, itemIndex) => {
      // Use the provided createJobData function or fall back to config
      // (the config fallback only works for inline payloads - see @param note).
      const jobDataFn = createJobData || config.createJobData;

      if (!jobDataFn) {
        throw new Error('createJobData function is required');
      }

      const userData = jobDataFn(item, itemIndex);

      return {
        name: `${config.jobNamePrefix}-processing`,
        data: {
          type: `${config.jobNamePrefix}-processing`,
          service: config.service,
          provider: config.provider,
          operation: config.operation,
          payload: {
            ...userData,
            // Batch bookkeeping merged on top of the user payload.
            batchIndex,
            itemIndex,
            total,
            source: userData.source || 'batch-processing'
          },
          priority: config.priority || 2
        },
        opts: {
          delay: itemIndex * delayPerItem,
          jobId: `${config.jobNamePrefix}:${batchIndex}:${itemIndex}:${Date.now()}`,
          removeOnComplete: config.removeOnComplete || 5,
          removeOnFail: config.removeOnFail || 3
        }
      };
    });

    try {
      const createdJobs = await this.queueManager.queue.addBulk(jobs);

      logger.info('Batch processing completed', {
        batchIndex,
        totalItems: items.length,
        jobsCreated: createdJobs.length,
        progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`,
        memoryOptimized: !!jobPayload.payloadKey
      });

      return {
        batchIndex,
        totalItems: items.length,
        jobsCreated: createdJobs.length,
        jobsFailed: 0,
        payloadKey: jobPayload.payloadKey || null
      };
    } catch (error) {
      // addBulk is all-or-nothing here: on failure the whole batch is
      // reported as failed rather than rethrowing.
      logger.error('Failed to process batch', {
        batchIndex,
        error: error instanceof Error ? error.message : String(error)
      });

      return {
        batchIndex,
        totalItems: items.length,
        jobsCreated: 0,
        jobsFailed: items.length,
        payloadKey: jobPayload.payloadKey || null
      };
    }
  }

  /**
|
|
* Clean up Redis payload after successful processing (optional)
|
|
*/
|
|
async cleanupBatchPayload(payloadKey: string): Promise<void> {
|
|
if (!payloadKey) {
|
|
return;
|
|
}
|
|
|
|
if (!this.cacheProvider.isReady()) {
|
|
logger.warn('Cache provider not ready - skipping cleanup', { payloadKey });
|
|
return;
|
|
}
|
|
|
|
try {
|
|
await this.cacheProvider.del(payloadKey);
|
|
logger.info('Cleaned up batch payload from Redis', { payloadKey });
|
|
} catch (error) {
|
|
logger.warn('Failed to cleanup batch payload', {
|
|
payloadKey,
|
|
error: error instanceof Error ? error.message : String(error)
|
|
});
|
|
}
|
|
}
|
|
} |