diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index d154e5a..ab32165 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -1,6 +1,7 @@ import { ProxyInfo } from 'libs/http/src/types'; import { ProviderConfig } from '../services/provider-registry.service'; import { getLogger } from '@stock-bot/logger'; +import { BatchProcessor } from '../utils/batch-processor'; // Create logger for this provider const logger = getLogger('proxy-provider'); @@ -16,8 +17,7 @@ const getEvery24HourCron = (): string => { export const proxyProvider: ProviderConfig = { name: 'proxy-service', service: 'proxy', - operations: { - 'fetch-and-check': async (payload: { sources?: string[] }) => { + operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => { const { proxyService } = await import('./proxy.tasks'); const { queueManager } = await import('../services/queue.service'); @@ -28,234 +28,148 @@ export const proxyProvider: ProviderConfig = { if (proxiesCount === 0) { logger.info('No proxies fetched, skipping job creation'); - return { proxiesFetched: 0, batchJobsCreated: 0 }; + return { proxiesFetched: 0, jobsCreated: 0 }; } - try { - // Optimized batch size for 800k proxies - const batchSize = 200; // Process 200 proxies per batch job - const totalBatches = Math.ceil(proxies.length / batchSize); - const totalDelayMs = 24 * 60 * 60 * 1000; // 24 hours - const delayPerBatch = Math.floor(totalDelayMs / totalBatches); - - logger.info('Creating proxy validation batch jobs', { - totalProxies: proxies.length, - batchSize, - totalBatches, - delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`, - estimatedCompletion: '24 hours' - }); - - // Process batches in chunks to avoid memory issues - const batchCreationChunkSize = 50; // Create 50 batch jobs at a time - let batchJobsCreated = 0; - - for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += batchCreationChunkSize) { - const chunkEnd = Math.min(chunkStart + batchCreationChunkSize, totalBatches); - - // Create batch jobs in parallel for this chunk - const batchPromises = []; - - for (let i = chunkStart; i < chunkEnd; i++) { - const startIndex = i * batchSize; - const endIndex = Math.min(startIndex + batchSize, proxies.length); - const batchProxies = proxies.slice(startIndex, endIndex); - const delay = i * delayPerBatch; - - const batchPromise = queueManager.addJob({ - type: 'proxy-batch-validation', - service: 'proxy', - provider: 'proxy-service', - operation: 'process-proxy-batch', - payload: { - proxies: batchProxies, - batchIndex: i, - totalBatches, - source: 'fetch-and-check' - }, - priority: 3 - }, { - delay: delay, - jobId: `proxy-batch-${i}-${Date.now()}` - }); - - batchPromises.push(batchPromise); - } - - // Wait for this chunk to complete - const results = await Promise.allSettled(batchPromises); - const successful = results.filter(r => r.status === 'fulfilled').length; - const failed = results.filter(r => r.status === 'rejected').length; - - batchJobsCreated += successful; - - logger.info('Batch chunk created', { - chunkStart: chunkStart + 1, - chunkEnd, - totalChunks: Math.ceil(totalBatches / batchCreationChunkSize), - successful, - failed, - totalCreated: batchJobsCreated, - progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%` - }); - - // Small delay between chunks to prevent overwhelming Redis - if (chunkEnd < totalBatches) { - await new Promise(resolve => setTimeout(resolve, 100)); - } - } - - logger.info('All batch jobs creation completed', { - totalProxies: proxies.length, - batchJobsCreated, - totalBatches, - avgProxiesPerBatch: Math.floor(proxies.length / totalBatches), - estimatedDuration: '24 hours' - }); - - return { - proxiesFetched: proxiesCount, - batchJobsCreated, - totalBatches, - avgProxiesPerBatch: Math.floor(proxies.length / totalBatches) - }; - - } catch (error) { - logger.error('Failed to create batch jobs', { - proxiesCount, - error: error instanceof Error ? error.message : String(error) - }); - throw error; - } - }, - - 'process-proxy-batch': async (payload: { - proxies: ProxyInfo[], - batchIndex: number, - totalBatches: number, - source: string - }) => { - const { queueManager } = await import('../services/queue.service'); + const batchProcessor = new BatchProcessor(queueManager); - logger.info('Processing proxy batch', { - batchIndex: payload.batchIndex, - batchSize: payload.proxies.length, - totalBatches: payload.totalBatches, - progress: `${((payload.batchIndex + 1) / payload.totalBatches * 100).toFixed(2)}%` + // Environment-configurable settings + const targetHours = parseInt(process.env.PROXY_VALIDATION_HOURS || '8'); + const batchSize = parseInt(process.env.PROXY_BATCH_SIZE || '200'); + const useDirectMode = process.env.PROXY_DIRECT_MODE === 'true'; + + logger.info('Proxy validation configuration', { + targetHours, + batchSize, + useDirectMode, + totalProxies: proxies.length }); - const batchDelayMs = 15 * 60 * 1000; // 15 minutes per batch - const delayPerProxy = Math.floor(batchDelayMs / payload.proxies.length); - - logger.info('Batch timing calculated', { - batchIndex: payload.batchIndex, - proxiesInBatch: payload.proxies.length, - batchDurationMinutes: 30, - delayPerProxySeconds: Math.floor(delayPerProxy / 1000), - delayPerProxyMs: delayPerProxy - }); - - // Use BullMQ's addBulk for better performance - const jobsToCreate = payload.proxies.map((proxy, i) => ({ - name: 'proxy-validation', - data: { - type: 'proxy-validation', + if (useDirectMode) { + // Direct approach - simpler, creates all jobs immediately + const result = await batchProcessor.createDirectJobs({ + items: proxies, + batchSize: 0, // Not used in direct mode + totalDelayMs: targetHours * 60 * 60 * 1000, + jobNamePrefix: 'proxy', + operation: 'check-proxy', service: 'proxy', provider: 'proxy-service', + priority: 2, + createJobData: (proxy: ProxyInfo) => ({ + proxy, + source: 'fetch-and-check' + }), + removeOnComplete: 5, + removeOnFail: 3 + }); + + return { + proxiesFetched: result.totalItems, + jobsCreated: result.jobsCreated, + mode: 'direct' + }; + } else { // Batch approach - creates batch jobs that create individual jobs + const result = await batchProcessor.createBatchJobs({ + items: proxies, + batchSize, + totalDelayMs: targetHours * 60 * 60 * 1000, + jobNamePrefix: 'proxy', operation: 'check-proxy', - payload: { - proxy: proxy, - source: payload.source, - batchIndex: payload.batchIndex, - proxyIndexInBatch: i, - totalBatch: payload.totalBatches - }, - priority: 2 - }, - opts: { - delay: i * delayPerProxy, - jobId: `proxy-${proxy.host}-${proxy.port}-batch${payload.batchIndex}-${Date.now()}-${i}`, + service: 'proxy', + provider: 'proxy-service', + priority: 3, + createJobData: (proxy: ProxyInfo) => ({ + proxy, + source: 'fetch-and-check' + }), removeOnComplete: 3, removeOnFail: 5 - } - })); - - try { - const jobs = await queueManager.addBulk(jobsToCreate); - - logger.info('Batch processing completed successfully', { - batchIndex: payload.batchIndex, - totalProxies: payload.proxies.length, - jobsCreated: jobs.length, - batchDelay: '15 minutes', - progress: `${((payload.batchIndex + 1) / payload.totalBatches * 100).toFixed(2)}%` }); return { - batchIndex: payload.batchIndex, - totalProxies: payload.proxies.length, - jobsCreated: jobs.length, - jobsFailed: 0 - }; - } catch (error) { - logger.error('Failed to create validation jobs for batch', { - batchIndex: payload.batchIndex, - batchSize: payload.proxies.length, - error: error instanceof Error ? error.message : String(error) - }); - - return { - batchIndex: payload.batchIndex, - totalProxies: payload.proxies.length, - jobsCreated: 0, - jobsFailed: payload.proxies.length + proxiesFetched: result.totalItems, + batchJobsCreated: result.batchJobsCreated, + totalBatches: result.totalBatches, + estimatedDurationHours: result.estimatedDurationHours, + mode: 'batch' }; } }, - + 'process-proxy-batch': async (payload: any) => { + const { queueManager } = await import('../services/queue.service'); + const batchProcessor = new BatchProcessor(queueManager); + + return await batchProcessor.processBatch( + payload, + (proxy: ProxyInfo, index: number) => ({ + proxy, + source: payload.config?.source || 'batch-processing' + }) + ); + }, + 'check-proxy': async (payload: { proxy: ProxyInfo, source?: string, batchIndex?: number, - proxyIndexInBatch?: number, - totalBatch?: number + itemIndex?: number, + total?: number }) => { const { checkProxy } = await import('./proxy.tasks'); - logger.debug('Checking individual proxy', { - proxy: `${payload.proxy.host}:${payload.proxy.port}`, - batchIndex: payload.batchIndex, - proxyIndex: payload.proxyIndexInBatch, - source: payload.source - }); - - const result = await checkProxy(payload.proxy); - - logger.debug('Proxy check completed', { - proxy: `${payload.proxy.host}:${payload.proxy.port}`, - isWorking: result.isWorking, - responseTime: result.responseTime, - batchIndex: payload.batchIndex - }); - - return { - result: result, - batchInfo: { - batchIndex: payload.batchIndex, - proxyIndex: payload.proxyIndexInBatch, - total: payload.totalBatch, - source: payload.source - } - }; + try { + const result = await checkProxy(payload.proxy); + + logger.debug('Proxy validated', { + proxy: `${payload.proxy.host}:${payload.proxy.port}`, + isWorking: result.isWorking, + responseTime: result.responseTime, + batchIndex: payload.batchIndex + }); + + return { + result, + proxy: payload.proxy, + // Only include batch info if it exists (for batch mode) + ...(payload.batchIndex !== undefined && { + batchInfo: { + batchIndex: payload.batchIndex, + itemIndex: payload.itemIndex, + total: payload.total, + source: payload.source + } + }) + }; + } catch (error) { + logger.warn('Proxy validation failed', { + proxy: `${payload.proxy.host}:${payload.proxy.port}`, + error: error instanceof Error ? error.message : String(error), + batchIndex: payload.batchIndex + }); + + return { + result: { isWorking: false, error: String(error) }, + proxy: payload.proxy, + // Only include batch info if it exists (for batch mode) + ...(payload.batchIndex !== undefined && { + batchInfo: { + batchIndex: payload.batchIndex, + itemIndex: payload.itemIndex, + total: payload.total, + source: payload.source + } + }) + }; + } } }, - - scheduledJobs: [ + scheduledJobs: [ { type: 'proxy-maintenance', operation: 'fetch-and-check', payload: {}, - cronPattern: getEvery24HourCron(), // Every 15 minutes + cronPattern: getEvery24HourCron(), priority: 5, immediately: true, description: 'Fetch and validate proxy list from sources' diff --git a/apps/data-service/src/utils/batch-processor.ts b/apps/data-service/src/utils/batch-processor.ts new file mode 100644 index 0000000..bcfb87b --- /dev/null +++ b/apps/data-service/src/utils/batch-processor.ts @@ -0,0 +1,327 @@ +import { getLogger } from '@stock-bot/logger'; + +export interface BatchConfig { + items: T[]; + batchSize: number; + totalDelayMs: number; + jobNamePrefix: string; + operation: string; + service: string; + provider: string; + priority?: number; + createJobData: (item: T, index: number) => any; // Simplified - no batchInfo parameter + removeOnComplete?: number; + removeOnFail?: number; +} + +export interface BatchInfo { + batchIndex: number; + itemIndex: number; // Changed to match proxy provider + total: number; // Changed to match proxy provider + totalItems: number; +} + +export interface BatchResult { + totalItems: number; + batchJobsCreated: number; + totalBatches: number; + avgItemsPerBatch: number; + estimatedDurationHours: number; +} + +const logger = getLogger('batch-processor'); + +export class BatchProcessor { + constructor(private queueManager: any) {} + + /** + * Create batch jobs that will later create individual item jobs + */ + async createBatchJobs(config: BatchConfig): Promise { + const { + items, + batchSize, + totalDelayMs, + jobNamePrefix, + operation, + service, + provider, + priority = 3 + } = config; + + if (items.length === 0) { + return { + totalItems: 0, + batchJobsCreated: 0, + totalBatches: 0, + avgItemsPerBatch: 0, + estimatedDurationHours: 0 + }; + } + + const totalBatches = Math.ceil(items.length / batchSize); + const delayPerBatch = Math.floor(totalDelayMs / totalBatches); + + logger.info('Creating batch jobs', { + totalItems: items.length, + batchSize, + totalBatches, + delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`, + estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`, + jobPrefix: jobNamePrefix + }); + + const batchCreationChunkSize = 50; + let batchJobsCreated = 0; + + for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += batchCreationChunkSize) { + const chunkEnd = Math.min(chunkStart + batchCreationChunkSize, totalBatches); + const batchPromises = []; + + for (let i = chunkStart; i < chunkEnd; i++) { + const startIndex = i * batchSize; + const endIndex = Math.min(startIndex + batchSize, items.length); + const batchItems = items.slice(startIndex, endIndex); + const delay = i * delayPerBatch; + const batchPromise = this.queueManager.addJob({ + type: `${jobNamePrefix}-batch-processing`, + service, + provider, + operation: `process-${jobNamePrefix}-batch`, + payload: { + items: batchItems, + batchIndex: i, + totalBatch: totalBatches, // Changed to match your property name + batchSize, + config: { + jobNamePrefix, + operation, + service, + provider, + priority: priority - 1, // Individual jobs get slightly lower priority + removeOnComplete: config.removeOnComplete || 5, + removeOnFail: config.removeOnFail || 5 + } + }, + priority + }, { + delay: delay, + jobId: `${jobNamePrefix}-batch-${i}-${Date.now()}` + }); + + batchPromises.push(batchPromise); + } + + const results = await Promise.allSettled(batchPromises); + const successful = results.filter(r => r.status === 'fulfilled').length; + const failed = results.filter(r => r.status === 'rejected').length; + + batchJobsCreated += successful; + + logger.info('Batch chunk created', { + chunkStart: chunkStart + 1, + chunkEnd, + successful, + failed, + totalCreated: batchJobsCreated, + progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%` + }); + + if (chunkEnd < totalBatches) { + await new Promise(resolve => setTimeout(resolve, 100)); + } + } + + const result = { + totalItems: items.length, + batchJobsCreated, + totalBatches, + avgItemsPerBatch: Math.floor(items.length / totalBatches), + estimatedDurationHours: totalDelayMs / 1000 / 60 / 60 + }; + + logger.info('Batch jobs creation completed', result); + return result; + } + /** + * Process a batch by creating individual item jobs + */ + async processBatch(payload: { + items: T[]; + batchIndex: number; + totalBatch: number; // Changed to match common property name + batchSize: number; + config: { + jobNamePrefix: string; + operation: string; + service: string; + provider: string; + priority: number; + removeOnComplete: number; + removeOnFail: number; + }; + }, createJobData: (item: T, index: number) => any): Promise<{ + batchIndex: number; + totalItems: number; + jobsCreated: number; + jobsFailed: number; + }> { + const { items, batchIndex, totalBatch, config } = payload; + + logger.info('Processing batch', { + batchIndex, + batchSize: items.length, + totalBatch, + progress: `${((batchIndex + 1) / totalBatch * 100).toFixed(2)}%` + }); + + // Spread items over a reasonable time period + const batchDelayMs = 15 * 60 * 1000; // 15 minutes per batch + const delayPerItem = Math.floor(batchDelayMs / items.length); + + const jobsToCreate = items.map((item, i) => { + // Get user data first + const userData = createJobData(item, i); + + // Automatically merge with batch info using generic property names + const finalPayload = { + ...userData, + batchIndex, + itemIndexInBatch: i, // Generic property name + totalBatch, // Generic property name + source: userData.source || 'batch-processing' + }; + + return { + name: `${config.jobNamePrefix}-processing`, + data: { + type: `${config.jobNamePrefix}-processing`, + service: config.service, + provider: config.provider, + operation: config.operation, + payload: finalPayload, + priority: config.priority + }, + opts: { + delay: i * delayPerItem, + jobId: `${config.jobNamePrefix}-${batchIndex}-${i}-${Date.now()}`, + removeOnComplete: config.removeOnComplete, + removeOnFail: config.removeOnFail + } + }; + }); + + try { + const jobs = await this.queueManager.queue.addBulk(jobsToCreate); + + logger.info('Batch processing completed', { + batchIndex, + totalItems: items.length, + jobsCreated: jobs.length, + batchDelay: '15 minutes', + progress: `${((batchIndex + 1) / totalBatch * 100).toFixed(2)}%` + }); + + return { + batchIndex, + totalItems: items.length, + jobsCreated: jobs.length, + jobsFailed: 0 + }; + } catch (error) { + logger.error('Failed to create batch jobs', { + batchIndex, + batchSize: items.length, + error: error instanceof Error ? error.message : String(error) + }); + + return { + batchIndex, + totalItems: items.length, + jobsCreated: 0, + jobsFailed: items.length + }; + } + } + + /** + * Directly create individual jobs without batching (simplified approach) + */ + async createDirectJobs(config: BatchConfig): Promise<{ + totalItems: number; + jobsCreated: number; + }> { + const { + items, + totalDelayMs, + jobNamePrefix, + operation, + service, + provider, + priority = 2, + createJobData, + removeOnComplete = 5, + removeOnFail = 3 + } = config; + + if (items.length === 0) { + return { totalItems: 0, jobsCreated: 0 }; + } + + const delayPerItem = Math.floor(totalDelayMs / items.length); + const createBatchSize = 100; // Create jobs in chunks + let totalJobsCreated = 0; + + logger.info('Creating direct jobs', { + totalItems: items.length, + delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`, + estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours` + }); + + for (let i = 0; i < items.length; i += createBatchSize) { + const batch = items.slice(i, i + createBatchSize); + const jobsToCreate = batch.map((item, batchIndex) => { + const globalIndex = i + batchIndex; + + return { + name: `${jobNamePrefix}-processing`, + data: { + type: `${jobNamePrefix}-processing`, + service, + provider, + operation, + payload: createJobData(item, globalIndex), + priority + }, + opts: { + delay: globalIndex * delayPerItem, + jobId: `${jobNamePrefix}-${globalIndex}-${Date.now()}`, + removeOnComplete, + removeOnFail + } + }; + }); + + try { + const jobs = await this.queueManager.queue.addBulk(jobsToCreate); + totalJobsCreated += jobs.length; + + if ((i + createBatchSize) % 500 === 0 || i + createBatchSize >= items.length) { + logger.info('Direct job creation progress', { + created: totalJobsCreated, + total: items.length, + percentage: `${((totalJobsCreated / items.length) * 100).toFixed(1)}%` + }); + } + } catch (error) { + logger.error('Failed to create direct job batch', { + startIndex: i, + batchSize: batch.length, + error: error instanceof Error ? error.message : String(error) + }); + } + } + + return { totalItems: items.length, jobsCreated: totalJobsCreated }; + } +}