trying to get simpler batcher working
This commit is contained in:
parent
df611a3ce3
commit
2f074271cc
5 changed files with 82 additions and 652 deletions
|
|
@ -81,29 +81,7 @@ export async function exampleBatchJobProcessor(jobData: any) {
|
|||
return result;
|
||||
}
|
||||
|
||||
// Comparison: Old vs New approach
|
||||
|
||||
// OLD COMPLEX WAY:
|
||||
/*
|
||||
const batchProcessor = new BatchProcessor(queueManager);
|
||||
await batchProcessor.initialize();
|
||||
await batchProcessor.processItems({
|
||||
items: symbols,
|
||||
batchSize: 200,
|
||||
totalDelayMs: 3600000,
|
||||
jobNamePrefix: 'yahoo-live',
|
||||
operation: 'live-data',
|
||||
service: 'data-service',
|
||||
provider: 'yahoo',
|
||||
priority: 2,
|
||||
createJobData: (symbol, index) => ({ symbol }),
|
||||
useBatching: true,
|
||||
removeOnComplete: 5,
|
||||
removeOnFail: 3
|
||||
});
|
||||
*/
|
||||
|
||||
// NEW SIMPLE WAY:
|
||||
// Example: Simple functional approach
|
||||
/*
|
||||
await processSymbols(symbols, queueManager, {
|
||||
operation: 'live-data',
|
||||
|
|
|
|||
|
|
@ -25,14 +25,15 @@ export const proxyProvider: ProviderConfig = {
|
|||
|
||||
if (proxies.length === 0) {
|
||||
return { proxiesFetched: 0, jobsCreated: 0 };
|
||||
}
|
||||
|
||||
// Use simplified functional approach
|
||||
} // Use simplified functional approach
|
||||
const result = await processProxies(proxies, queueManager, {
|
||||
totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000,
|
||||
batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'),
|
||||
useBatching: process.env.PROXY_DIRECT_MODE !== 'true',
|
||||
priority: 2
|
||||
priority: 2,
|
||||
service: 'proxy',
|
||||
provider: 'proxy-service',
|
||||
operation: 'check-proxy'
|
||||
});return {
|
||||
proxiesFetched: result.totalItems,
|
||||
jobsCreated: result.jobsCreated,
|
||||
|
|
@ -40,7 +41,8 @@ export const proxyProvider: ProviderConfig = {
|
|||
batchesCreated: result.batchesCreated,
|
||||
processingTimeMs: result.duration
|
||||
};
|
||||
}, 'process-proxy-batch': async (payload: any) => {
|
||||
},
|
||||
'process-batch-items': async (payload: any) => {
|
||||
// Process a batch using the simplified batch helpers
|
||||
const { processBatchJob } = await import('../utils/batch-helpers');
|
||||
const { queueManager } = await import('../services/queue.service');
|
||||
|
|
|
|||
|
|
@ -14,6 +14,10 @@ export interface ProcessOptions {
|
|||
ttl?: number;
|
||||
removeOnComplete?: number;
|
||||
removeOnFail?: number;
|
||||
// Job routing information
|
||||
service?: string;
|
||||
provider?: string;
|
||||
operation?: string;
|
||||
}
|
||||
|
||||
export interface BatchResult {
|
||||
|
|
@ -106,9 +110,9 @@ async function processDirect<T>(
|
|||
name: 'process-item',
|
||||
data: {
|
||||
type: 'process-item',
|
||||
service: 'batch-processor',
|
||||
provider: 'direct',
|
||||
operation: 'process-single-item',
|
||||
service: options.service || 'data-service',
|
||||
provider: options.provider || 'generic',
|
||||
operation: options.operation || 'process-item',
|
||||
payload: processor(item, index),
|
||||
priority: options.priority || 1
|
||||
},
|
||||
|
|
@ -205,18 +209,22 @@ export async function processBatchJob(jobData: any, queue: QueueService): Promis
|
|||
|
||||
try {
|
||||
const payload = await loadPayload(payloadKey);
|
||||
if (!payload || !payload.items || !payload.processorStr) {
|
||||
logger.error('Invalid payload data', { payloadKey, payload });
|
||||
throw new Error(`Invalid payload data for key: ${payloadKey}`);
|
||||
}
|
||||
const { items, processorStr, options } = payload;
|
||||
|
||||
// Deserialize processor function (in production, use safer alternatives)
|
||||
// Deserialize the processor function
|
||||
const processor = new Function('return ' + processorStr)();
|
||||
|
||||
const jobs = items.map((item: any, index: number) => ({
|
||||
name: 'process-item',
|
||||
data: {
|
||||
type: 'process-item',
|
||||
service: 'batch-processor',
|
||||
provider: 'batch-item',
|
||||
operation: 'process-single-item',
|
||||
service: options.service || 'data-service',
|
||||
provider: options.provider || 'generic',
|
||||
operation: options.operation || 'process-item',
|
||||
payload: processor(item, index),
|
||||
priority: options.priority || 1
|
||||
},
|
||||
|
|
@ -260,6 +268,10 @@ async function storePayload<T>(
|
|||
options: ProcessOptions
|
||||
): Promise<string> {
|
||||
const cache = getCache();
|
||||
|
||||
// Wait for cache to be ready before storing
|
||||
await cache.waitForReady(5000);
|
||||
|
||||
const key = `payload_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
|
||||
|
||||
const payload = {
|
||||
|
|
@ -268,14 +280,24 @@ async function storePayload<T>(
|
|||
options: {
|
||||
delayPerItem: 1000,
|
||||
priority: options.priority || 1,
|
||||
retries: options.retries || 3
|
||||
retries: options.retries || 3,
|
||||
// Store routing information for later use
|
||||
service: options.service || 'data-service',
|
||||
provider: options.provider || 'generic',
|
||||
operation: options.operation || 'process-item'
|
||||
},
|
||||
createdAt: Date.now()
|
||||
};
|
||||
|
||||
await cache.set(key, JSON.stringify(payload), options.ttl || 86400);
|
||||
logger.debug('Storing batch payload', {
|
||||
key,
|
||||
itemCount: items.length,
|
||||
cacheReady: cache.isReady()
|
||||
});
|
||||
|
||||
logger.debug('Stored batch payload', {
|
||||
await cache.set(key, payload, options.ttl || 86400);
|
||||
|
||||
logger.debug('Stored batch payload successfully', {
|
||||
key,
|
||||
itemCount: items.length
|
||||
});
|
||||
|
|
@ -285,13 +307,27 @@ async function storePayload<T>(
|
|||
|
||||
/**
 * Load a batch payload that was previously written to the cache.
 *
 * The cache may hand the value back either as a JSON string (when it was
 * stored via `JSON.stringify`) or as an already-parsed object (when the raw
 * object was stored), so both shapes are accepted here.
 *
 * @param key - Cache key of the stored payload.
 * @returns The payload object that was stored under `key`.
 * @throws Error when nothing is stored under `key`, or when a string value
 *         is not valid JSON.
 */
async function loadPayload(key: string): Promise<any> {
  const cache = getCache();

  // Wait for cache to be ready before loading
  // (5000 is presumably a timeout in milliseconds - confirm against the cache API).
  await cache.waitForReady(5000);

  logger.debug('Loading batch payload', {
    key,
    cacheReady: cache.isReady()
  });

  const data = await cache.get(key);

  if (!data) {
    logger.error('Payload not found in cache', {
      key,
      cacheReady: cache.isReady()
    });
    throw new Error(`Payload not found: ${key}`);
  }

  // Only parse when the cache returned the serialized form; an object is
  // returned untouched so it is never double-parsed.
  const payload = typeof data === 'string' ? JSON.parse(data) : data;

  logger.debug('Loaded batch payload successfully', { key });
  return payload;
}
|
||||
|
||||
async function cleanupPayload(key: string): Promise<void> {
|
||||
|
|
@ -356,7 +392,10 @@ export async function processSymbols(
|
|||
totalDelayMs: options.totalDelayMs,
|
||||
batchSize: options.batchSize || 100,
|
||||
priority: options.priority || 1,
|
||||
useBatching: options.useBatching || false
|
||||
useBatching: options.useBatching || false,
|
||||
service: options.service,
|
||||
provider: options.provider,
|
||||
operation: options.operation
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
@ -369,6 +408,9 @@ export async function processProxies(
|
|||
useBatching?: boolean;
|
||||
batchSize?: number;
|
||||
priority?: number;
|
||||
service?: string;
|
||||
provider?: string;
|
||||
operation?: string;
|
||||
}
|
||||
): Promise<BatchResult> {
|
||||
return processItems(
|
||||
|
|
@ -383,7 +425,10 @@ export async function processProxies(
|
|||
totalDelayMs: options.totalDelayMs,
|
||||
batchSize: options.batchSize || 200,
|
||||
priority: options.priority || 2,
|
||||
useBatching: options.useBatching || true
|
||||
useBatching: options.useBatching || true,
|
||||
service: options.service || 'data-service',
|
||||
provider: options.provider || 'proxy-service',
|
||||
operation: options.operation || 'check-proxy'
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,545 +0,0 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import { createCache, CacheProvider } from '@stock-bot/cache';
|
||||
|
||||
/**
 * Describes one run of item processing: what to process, how to spread the
 * work over time, and how the resulting queue jobs are labelled.
 */
export interface BatchConfig<T> {
  /** Items to turn into queue jobs. */
  items: T[];

  /** Items per batch. Optional - only used for batch mode. */
  batchSize?: number;

  /** Time window, in milliseconds, over which job delays are spread. */
  totalDelayMs: number;

  /** Prefix used when building job names and job ids. */
  jobNamePrefix: string;

  /** Routing: operation name copied into each job's data. */
  operation: string;

  /** Routing: service name copied into each job's data. */
  service: string;

  /** Routing: provider name copied into each job's data. */
  provider: string;

  /** Job priority copied into each job's data. */
  priority?: number;

  /** Builds the job payload for a single item. */
  createJobData: (item: T, index: number) => any;

  /** Passed through to the job options as `removeOnComplete`. */
  removeOnComplete?: number;

  /** Passed through to the job options as `removeOnFail`. */
  removeOnFail?: number;

  /** Simple flag to choose mode: batch when true, direct otherwise. */
  useBatching?: boolean;

  /** TTL for stored payloads (default 24 hours). */
  payloadTtlHours?: number;
}
|
||||
|
||||
const logger = getLogger('batch-processor');
|
||||
|
||||
/**
 * Turns a list of items into delayed queue jobs, spread over a time window.
 *
 * Two modes are supported (see `processItems`):
 *  - direct: one queue job per item, added in chunks;
 *  - batch:  items are grouped, each group is stored in the cache under a
 *            payload key, and one "batch" job per group is queued. When that
 *            batch job runs, `processBatch` loads the group and fans it out
 *            into per-item jobs.
 */
export class BatchProcessor {
  private cacheProvider: CacheProvider;
  private isReady = false;
  private keyPrefix: string = 'batch:'; // Default key prefix for batch payloads
  /** Initialization currently in flight, shared by concurrent `initialize()` callers. */
  private initPromise: Promise<void> | null = null;

  /**
   * @param queueManager - Object exposing `queue.addBulk(jobs)`.
   * @param cacheOptions - Optional cache configuration (key prefix and TTL).
   */
  constructor(
    private queueManager: any,
    private cacheOptions?: { keyPrefix?: string; ttl?: number }
  ) {
    this.keyPrefix = cacheOptions?.keyPrefix || 'batch:';

    // Initialize cache provider with batch-specific settings
    this.cacheProvider = createCache({
      keyPrefix: this.keyPrefix,
      ttl: cacheOptions?.ttl || 86400 * 2, // 48 hours default
      enableMetrics: true
    });

    // Warm up in the background; callers may still `await initialize()` and
    // will join this same attempt instead of starting a second one.
    void this.initialize();
  }

  /**
   * Initialize the batch processor and wait for cache to be ready.
   *
   * Never rejects because of a cache timeout: on timeout the processor is
   * marked ready anyway and cache operations are left to their own fallbacks.
   * Concurrent calls share one in-flight attempt, so `timeout` only takes
   * effect for the call that actually starts the attempt.
   *
   * @param timeout - Value forwarded to the cache's `waitForReady`.
   */
  async initialize(timeout: number = 10000): Promise<void> {
    if (this.isReady) {
      logger.warn('BatchProcessor already initialized');
      return;
    }

    if (!this.initPromise) {
      this.initPromise = this.runInitialization(timeout).finally(() => {
        this.initPromise = null;
      });
    }

    await this.initPromise;
  }

  /** Performs a single initialization attempt; always leaves `isReady` true. */
  private async runInitialization(timeout: number): Promise<void> {
    logger.info('Initializing BatchProcessor, waiting for cache to be ready...');

    try {
      await this.cacheProvider.waitForReady(timeout);
      this.isReady = true;
      logger.info('BatchProcessor initialized successfully', {
        cacheReady: this.cacheProvider.isReady(),
        keyPrefix: this.keyPrefix,
        ttlHours: ((this.cacheOptions?.ttl || 86400 * 2) / 3600).toFixed(1)
      });
    } catch (error) {
      logger.warn('BatchProcessor cache not ready within timeout, continuing with fallback mode', {
        error: error instanceof Error ? error.message : String(error),
        timeout
      });
      // Don't throw - mark as ready anyway and let cache operations use their fallback mechanisms
      this.isReady = true;
    }
  }

  /**
   * Check if the batch processor is ready.
   *
   * This reflects only that initialization has finished, not that the cache
   * itself is ready; individual operations handle cache fallbacks.
   */
  getReadyStatus(): boolean {
    return this.isReady;
  }

  /**
   * Generate a unique key for storing batch payload in Redis.
   * Note: The cache provider will add its keyPrefix ('batch:') automatically
   */
  private generatePayloadKey(jobNamePrefix: string, batchIndex: number): string {
    return `payload:${jobNamePrefix}:${batchIndex}:${Date.now()}`;
  }

  /**
   * Store batch payload in Redis and return the key.
   *
   * The payload is serialized with `JSON.stringify`, so function-valued
   * config fields (`createJobData`) are not persisted; `processBatch` must be
   * given the function again when the batch is processed.
   *
   * Storage failures are logged and swallowed; the key is returned regardless.
   */
  private async storeBatchPayload<T>(
    items: T[],
    config: BatchConfig<T>,
    batchIndex: number
  ): Promise<string> {
    const payloadKey = this.generatePayloadKey(config.jobNamePrefix, batchIndex);
    const payload = {
      items,
      batchIndex,
      config: {
        ...config,
        items: undefined // Don't store items twice
      },
      createdAt: new Date().toISOString()
    };

    const ttlSeconds = (config.payloadTtlHours || 24) * 60 * 60;

    try {
      await this.cacheProvider.set(payloadKey, JSON.stringify(payload), ttlSeconds);

      logger.info('Stored batch payload in Redis', {
        payloadKey,
        itemCount: items.length,
        batchIndex,
        ttlHours: config.payloadTtlHours || 24
      });
    } catch (error) {
      logger.error('Failed to store batch payload, job will run without caching', {
        payloadKey,
        error: error instanceof Error ? error.message : String(error)
      });
      // Deliberately best-effort: don't throw.
      // NOTE(review): the queued batch job carries only the key, so unless the
      // cache layer has its own fallback, processBatch will later fail to load
      // this payload - confirm this is the intended trade-off.
    }

    return payloadKey;
  }

  /**
   * Load batch payload from Redis.
   *
   * @throws Error when the payload is missing or cannot be read/parsed.
   */
  private async loadBatchPayload<T>(payloadKey: string): Promise<{
    items: T[];
    batchIndex: number;
    config: BatchConfig<T>;
  } | null> {
    // Auto-initialize if not ready
    if (!this.cacheProvider.isReady() || !this.isReady) {
      logger.info('Cache provider not ready, initializing...', { payloadKey });
      try {
        await this.initialize();
      } catch (error) {
        logger.error('Failed to initialize cache provider for loading', {
          payloadKey,
          error: error instanceof Error ? error.message : String(error)
        });
        throw new Error('Cache provider initialization failed - cannot load batch payload');
      }
    }

    try {
      const payloadData = await this.cacheProvider.get<any>(payloadKey);

      if (!payloadData) {
        logger.error('Batch payload not found in Redis', { payloadKey });
        throw new Error('Batch payload not found in Redis');
      }

      // Handle both string and already-parsed object
      const payload = typeof payloadData === 'string' ? JSON.parse(payloadData) : payloadData;

      logger.info('Loaded batch payload from Redis', {
        payloadKey,
        itemCount: payload.items?.length || 0,
        batchIndex: payload.batchIndex
      });

      return payload;
    } catch (error) {
      logger.error('Failed to load batch payload from Redis', {
        payloadKey,
        error: error instanceof Error ? error.message : String(error)
      });
      throw new Error('Failed to load batch payload from Redis');
    }
  }

  /**
   * Unified method that handles both direct and batch approaches.
   *
   * @param config - Items plus scheduling/routing settings; `useBatching`
   *                 selects batch mode, otherwise direct mode is used.
   * @returns A summary object; its exact fields depend on the mode.
   */
  async processItems<T>(config: BatchConfig<T>) {
    // Check if BatchProcessor is ready
    if (!this.getReadyStatus()) {
      logger.warn('BatchProcessor not ready, attempting to initialize...');
      await this.initialize();
    }

    const { items, useBatching = false } = config;

    if (items.length === 0) {
      return { totalItems: 0, jobsCreated: 0 };
    }

    // Final readiness check - wait briefly for cache to be ready
    if (!this.cacheProvider.isReady()) {
      logger.warn('Cache provider not ready, waiting briefly...');
      try {
        await this.cacheProvider.waitForReady(10000); // Wait up to 10 seconds
        logger.info('Cache provider became ready');
      } catch (error) {
        logger.warn('Cache provider still not ready, continuing with fallback mode');
        // Don't throw error - let the cache operations use their fallback mechanisms
      }
    }

    logger.info('Starting item processing', {
      totalItems: items.length,
      mode: useBatching ? 'batch' : 'direct',
      cacheReady: this.cacheProvider.isReady()
    });

    if (useBatching) {
      return await this.createBatchJobs(config);
    }
    return await this.createDirectJobs(config);
  }

  /**
   * Direct mode: queue one job per item, each delayed by its index times an
   * even share of `totalDelayMs`. A failed chunk is logged and skipped; the
   * remaining chunks are still attempted.
   */
  private async createDirectJobs<T>(config: BatchConfig<T>) {
    const {
      items,
      totalDelayMs,
      jobNamePrefix,
      operation,
      service,
      provider,
      priority = 2,
      createJobData,
      removeOnComplete = 5,
      removeOnFail = 3
    } = config;

    const delayPerItem = Math.floor(totalDelayMs / items.length);
    const chunkSize = 100;
    let totalJobsCreated = 0;

    logger.info('Creating direct jobs', {
      totalItems: items.length,
      delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`,
      estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`
    });

    // Process in chunks to avoid overwhelming Redis
    for (let i = 0; i < items.length; i += chunkSize) {
      const chunk = items.slice(i, i + chunkSize);

      const jobs = chunk.map((item, chunkIndex) => {
        const globalIndex = i + chunkIndex;
        return {
          name: `${jobNamePrefix}-processing`,
          data: {
            type: `${jobNamePrefix}-processing`,
            service,
            provider,
            operation,
            payload: createJobData(item, globalIndex),
            priority
          },
          opts: {
            delay: globalIndex * delayPerItem,
            jobId: `${jobNamePrefix}:${globalIndex}:${Date.now()}`,
            removeOnComplete,
            removeOnFail
          }
        };
      });

      try {
        const createdJobs = await this.queueManager.queue.addBulk(jobs);
        totalJobsCreated += createdJobs.length;

        // Log progress every 500 jobs
        if (totalJobsCreated % 500 === 0 || i + chunkSize >= items.length) {
          logger.info('Direct job creation progress', {
            created: totalJobsCreated,
            total: items.length,
            percentage: `${((totalJobsCreated / items.length) * 100).toFixed(1)}%`
          });
        }
      } catch (error) {
        logger.error('Failed to create job chunk', {
          startIndex: i,
          chunkSize: chunk.length,
          error: error instanceof Error ? error.message : String(error)
        });
      }
    }

    return {
      totalItems: items.length,
      jobsCreated: totalJobsCreated,
      mode: 'direct'
    };
  }

  /**
   * Batch mode: group items, store each group in the cache, and queue one
   * batch job per group that references the stored payload by key.
   */
  private async createBatchJobs<T>(config: BatchConfig<T>) {
    const {
      items,
      totalDelayMs,
      jobNamePrefix,
      service,
      provider,
      priority = 3
    } = config;

    // A zero, negative or NaN batch size would make totalBatches infinite and
    // the loop below would never terminate, so fall back to the default.
    const defaultBatchSize = 200;
    const requestedBatchSize = config.batchSize ?? defaultBatchSize;
    const batchSize =
      Number.isFinite(requestedBatchSize) && requestedBatchSize >= 1
        ? Math.floor(requestedBatchSize)
        : defaultBatchSize;
    if (batchSize !== requestedBatchSize) {
      logger.warn('Invalid batchSize, falling back to a safe value', {
        requested: requestedBatchSize,
        batchSize
      });
    }

    const totalBatches = Math.ceil(items.length / batchSize);
    const delayPerBatch = Math.floor(totalDelayMs / totalBatches);
    const chunkSize = 50; // Create batch jobs in chunks
    let batchJobsCreated = 0;

    logger.info('Creating optimized batch jobs with Redis payload storage', {
      totalItems: items.length,
      batchSize,
      totalBatches,
      delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
      payloadTtlHours: config.payloadTtlHours || 24
    });

    // Create batch jobs in chunks
    for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += chunkSize) {
      const chunkEnd = Math.min(chunkStart + chunkSize, totalBatches);
      const batchJobs = [];

      for (let batchIndex = chunkStart; batchIndex < chunkEnd; batchIndex++) {
        const startIndex = batchIndex * batchSize;
        const endIndex = Math.min(startIndex + batchSize, items.length);
        const batchItems = items.slice(startIndex, endIndex);

        // Store batch payload in Redis and get reference key
        const payloadKey = await this.storeBatchPayload(batchItems, config, batchIndex);

        batchJobs.push({
          name: `${jobNamePrefix}-batch-processing`,
          data: {
            type: `${jobNamePrefix}-batch-processing`,
            service,
            provider,
            operation: `process-${jobNamePrefix}-batch`,
            payload: {
              // Optimized: only store reference and metadata
              payloadKey: payloadKey,
              batchIndex,
              total: totalBatches,
              itemCount: batchItems.length,
              configSnapshot: {
                jobNamePrefix: config.jobNamePrefix,
                operation: config.operation,
                service: config.service,
                provider: config.provider,
                priority: config.priority,
                removeOnComplete: config.removeOnComplete,
                removeOnFail: config.removeOnFail,
                totalDelayMs: config.totalDelayMs
              }
            },
            priority
          },
          opts: {
            delay: batchIndex * delayPerBatch,
            jobId: `${jobNamePrefix}-batch:${batchIndex}:${Date.now()}`
          }
        });
      }

      try {
        const createdJobs = await this.queueManager.queue.addBulk(batchJobs);
        batchJobsCreated += createdJobs.length;
        logger.info('Optimized batch chunk created', {
          chunkStart: chunkStart + 1,
          chunkEnd,
          created: createdJobs.length,
          totalCreated: batchJobsCreated,
          progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%`,
          usingRedisStorage: true
        });
      } catch (error) {
        logger.error('Failed to create batch chunk', {
          chunkStart,
          chunkEnd,
          error: error instanceof Error ? error.message : String(error)
        });
      }

      // Small delay between chunks
      if (chunkEnd < totalBatches) {
        await new Promise(resolve => setTimeout(resolve, 100));
      }
    }

    return {
      totalItems: items.length,
      batchJobsCreated,
      totalBatches,
      estimatedDurationHours: totalDelayMs / 1000 / 60 / 60,
      mode: 'batch',
      optimized: true
    };
  }

  /**
   * Process a batch (called by batch jobs).
   * Supports both optimized (Redis payload storage) and fallback modes.
   *
   * @param jobPayload    - Batch job payload: either a `payloadKey` reference
   *                        or inline `items` / `config`, plus `batchIndex`
   *                        and `total`.
   * @param createJobData - Builds each item's job payload; falls back to
   *                        `config.createJobData` when omitted.
   * @returns Counts of created/failed jobs for this batch. A failure of the
   *          bulk add is reported in the result rather than thrown.
   * @throws Error when the payload cannot be loaded, is malformed, or no
   *         `createJobData` function is available.
   */
  async processBatch<T>(
    jobPayload: any,
    createJobData?: (item: T, index: number) => any
  ) {
    let batchData: {
      items: T[];
      batchIndex: number;
      config: BatchConfig<T>;
    };

    // Check if this is an optimized batch with Redis payload storage
    if (jobPayload.payloadKey) {
      logger.info('Processing optimized batch with Redis payload storage', {
        payloadKey: jobPayload.payloadKey,
        batchIndex: jobPayload.batchIndex,
        itemCount: jobPayload.itemCount
      });

      // Load actual payload from Redis
      const loadedPayload = await this.loadBatchPayload<T>(jobPayload.payloadKey);

      if (!loadedPayload) {
        throw new Error(`Failed to load batch payload from Redis: ${jobPayload.payloadKey}`);
      }

      batchData = loadedPayload;

      // The stored payload is intentionally kept after loading so a retried
      // job can read it again; call cleanupBatchPayload() to remove it.
    } else {
      // Fallback: payload stored directly in job data
      logger.info('Processing batch with inline payload storage', {
        batchIndex: jobPayload.batchIndex,
        itemCount: jobPayload.items?.length || 0
      });

      batchData = {
        items: jobPayload.items,
        batchIndex: jobPayload.batchIndex,
        config: jobPayload.config
      };
    }

    const total: number = jobPayload.total;
    const { items, batchIndex, config } = batchData;

    // Fail with a clear message instead of a TypeError further down.
    if (!Array.isArray(items) || !config) {
      throw new Error('Invalid batch payload: missing items or config');
    }

    logger.info('Processing batch', {
      batchIndex,
      batchSize: items.length,
      total,
      progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`,
      isOptimized: !!jobPayload.payloadKey
    });

    // This batch's share of the overall window, split evenly across its items.
    // A missing/zero `total` or an empty batch would give NaN/Infinity, which
    // is not a usable delay, so those cases schedule with no delay.
    const rawDelayPerItem = Math.floor(config.totalDelayMs / total / items.length);
    const delayPerItem =
      Number.isFinite(rawDelayPerItem) && rawDelayPerItem > 0 ? rawDelayPerItem : 0;

    // Use the provided createJobData function or fall back to config
    const jobDataFn = createJobData || config.createJobData;

    const jobs = items.map((item, itemIndex) => {
      if (!jobDataFn) {
        throw new Error('createJobData function is required');
      }

      const userData = jobDataFn(item, itemIndex);

      return {
        name: `${config.jobNamePrefix}-processing`,
        data: {
          type: `${config.jobNamePrefix}-processing`,
          service: config.service,
          provider: config.provider,
          operation: config.operation,
          payload: {
            ...userData,
            batchIndex,
            itemIndex,
            total,
            source: userData?.source || 'batch-processing'
          },
          // `??` keeps an explicit 0, matching the defaults used in direct mode.
          priority: config.priority ?? 2
        },
        opts: {
          delay: itemIndex * delayPerItem,
          jobId: `${config.jobNamePrefix}:${batchIndex}:${itemIndex}:${Date.now()}`,
          removeOnComplete: config.removeOnComplete ?? 5,
          removeOnFail: config.removeOnFail ?? 3
        }
      };
    });

    try {
      const createdJobs = await this.queueManager.queue.addBulk(jobs);

      logger.info('Batch processing completed', {
        batchIndex,
        totalItems: items.length,
        jobsCreated: createdJobs.length,
        progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`,
        memoryOptimized: !!jobPayload.payloadKey
      });

      return {
        batchIndex,
        totalItems: items.length,
        jobsCreated: createdJobs.length,
        jobsFailed: 0,
        payloadKey: jobPayload.payloadKey || null
      };
    } catch (error) {
      logger.error('Failed to process batch', {
        batchIndex,
        error: error instanceof Error ? error.message : String(error)
      });

      return {
        batchIndex,
        totalItems: items.length,
        jobsCreated: 0,
        jobsFailed: items.length,
        payloadKey: jobPayload.payloadKey || null
      };
    }
  }

  /**
   * Clean up Redis payload after successful processing (optional).
   * Does nothing for an empty key or while the cache is not ready; deletion
   * failures are logged and not rethrown.
   */
  async cleanupBatchPayload(payloadKey: string): Promise<void> {
    if (!payloadKey) {
      return;
    }

    if (!this.cacheProvider.isReady()) {
      logger.warn('Cache provider not ready - skipping cleanup', { payloadKey });
      return;
    }

    try {
      await this.cacheProvider.del(payloadKey);
      logger.info('Cleaned up batch payload from Redis', { payloadKey });
    } catch (error) {
      logger.warn('Failed to cleanup batch payload', {
        payloadKey,
        error: error instanceof Error ? error.message : String(error)
      });
    }
  }
}
|
||||
|
|
@ -1,61 +1,26 @@
|
|||
# Batch Processing Migration Guide
|
||||
|
||||
## ✅ MIGRATION COMPLETED
|
||||
|
||||
The migration from the complex `BatchProcessor` class to the new functional batch processing approach has been **successfully completed**. The old `BatchProcessor` class has been removed entirely.
|
||||
|
||||
## Overview
|
||||
|
||||
The new functional batch processing approach simplifies the complex `BatchProcessor` class into simple, composable functions.
|
||||
The new functional batch processing approach simplified the complex `BatchProcessor` class into simple, composable functions.
|
||||
|
||||
## Key Benefits
|
||||
## Key Benefits Achieved
|
||||
|
||||
✅ **~60% less code** - From 545 lines to ~200 lines
|
||||
✅ **Simpler API** - Just function calls instead of class instantiation
|
||||
✅ **Better performance** - Less overhead and memory usage
|
||||
✅ **Same functionality** - All features preserved
|
||||
✅ **Type safe** - Better TypeScript support
|
||||
|
||||
## Migration Examples
|
||||
|
||||
### Before (Complex Class-based)
|
||||
|
||||
```typescript
|
||||
import { BatchProcessor } from '../utils/batch-processor';
|
||||
|
||||
const batchProcessor = new BatchProcessor(queueManager);
|
||||
await batchProcessor.initialize();
|
||||
|
||||
const result = await batchProcessor.processItems({
|
||||
items: symbols,
|
||||
batchSize: 200,
|
||||
totalDelayMs: 3600000,
|
||||
jobNamePrefix: 'yahoo-live',
|
||||
operation: 'live-data',
|
||||
service: 'data-service',
|
||||
provider: 'yahoo',
|
||||
priority: 2,
|
||||
createJobData: (symbol, index) => ({ symbol }),
|
||||
useBatching: true,
|
||||
removeOnComplete: 5,
|
||||
removeOnFail: 3
|
||||
});
|
||||
```
|
||||
|
||||
### After (Simple Functional)
|
||||
|
||||
```typescript
|
||||
import { processSymbols } from '../utils/batch-helpers';
|
||||
|
||||
const result = await processSymbols(symbols, queueManager, {
|
||||
operation: 'live-data',
|
||||
service: 'data-service',
|
||||
provider: 'yahoo',
|
||||
totalDelayMs: 3600000,
|
||||
useBatching: true,
|
||||
batchSize: 200,
|
||||
priority: 2
|
||||
});
|
||||
```
|
||||
✅ **No more payload conflicts** - Single consistent batch system
|
||||
|
||||
## Available Functions
|
||||
|
||||
All batch processing now uses the new functional approach:
|
||||
|
||||
### 1. `processItems<T>()` - Generic processing
|
||||
|
||||
```typescript
|
||||
|
|
@ -153,22 +118,12 @@ const result = await processBatchJob(jobData, queueManager);
|
|||
|
||||
## Provider Migration
|
||||
|
||||
### Update Provider Operations
|
||||
### ✅ Current Implementation
|
||||
|
||||
**Before:**
|
||||
```typescript
|
||||
'process-proxy-batch': async (payload: any) => {
|
||||
const batchProcessor = new BatchProcessor(queueManager);
|
||||
return await batchProcessor.processBatch(
|
||||
payload,
|
||||
(proxy: ProxyInfo) => ({ proxy, source: 'batch' })
|
||||
);
|
||||
}
|
||||
```
|
||||
All providers now use the new functional approach:
|
||||
|
||||
**After:**
|
||||
```typescript
|
||||
'process-proxy-batch': async (payload: any) => {
|
||||
'process-batch-items': async (payload: any) => {
|
||||
const { processBatchJob } = await import('../utils/batch-helpers');
|
||||
return await processBatchJob(payload, queueManager);
|
||||
}
|
||||
|
|
@ -200,14 +155,9 @@ curl -X POST http://localhost:3002/api/test/batch-custom \
|
|||
| API Complexity | High | Low | Much simpler |
|
||||
| Type Safety | Medium | High | Better types |
|
||||
|
||||
## Backward Compatibility
|
||||
## ✅ Migration Complete
|
||||
|
||||
The old `BatchProcessor` class is still available but deprecated. You can migrate gradually:
|
||||
|
||||
1. **Phase 1**: Use new functions for new features
|
||||
2. **Phase 2**: Migrate existing simple use cases
|
||||
3. **Phase 3**: Replace complex use cases
|
||||
4. **Phase 4**: Remove old BatchProcessor
|
||||
The old `BatchProcessor` class has been completely removed. All batch processing now uses the simplified functional approach.
|
||||
|
||||
## Common Issues & Solutions
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue