From b2817656b3a0849ae80a163b6a77490f90082064 Mon Sep 17 00:00:00 2001 From: Bojan Kucera Date: Mon, 9 Jun 2025 08:07:45 -0400 Subject: [PATCH] finished proxy-provider - need to test --- .../src/providers/proxy.provider.ts | 102 ++-- .../data-service/src/utils/batch-processor.ts | 438 ++++++++---------- 2 files changed, 228 insertions(+), 312 deletions(-) diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index ab32165..b565ef0 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -17,96 +17,50 @@ const getEvery24HourCron = (): string => { export const proxyProvider: ProviderConfig = { name: 'proxy-service', service: 'proxy', - operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => { + operations: { + 'fetch-and-check': async (payload: { sources?: string[] }) => { const { proxyService } = await import('./proxy.tasks'); const { queueManager } = await import('../services/queue.service'); await queueManager.drainQueue(); - const proxies = await proxyService.fetchProxiesFromSources(); - const proxiesCount = proxies.length; - if (proxiesCount === 0) { - logger.info('No proxies fetched, skipping job creation'); + if (proxies.length === 0) { return { proxiesFetched: 0, jobsCreated: 0 }; } const batchProcessor = new BatchProcessor(queueManager); - // Environment-configurable settings - const targetHours = parseInt(process.env.PROXY_VALIDATION_HOURS || '8'); - const batchSize = parseInt(process.env.PROXY_BATCH_SIZE || '200'); - const useDirectMode = process.env.PROXY_DIRECT_MODE === 'true'; - - logger.info('Proxy validation configuration', { - targetHours, - batchSize, - useDirectMode, - totalProxies: proxies.length + // Simplified configuration + const result = await batchProcessor.processItems({ + items: proxies, + batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'), + totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '8') * 60 * 60 * 1000, + jobNamePrefix: 'proxy', + operation: 'check-proxy', + service: 'proxy', + provider: 'proxy-service', + priority: 2, + useBatching: process.env.PROXY_DIRECT_MODE !== 'true', // Simple boolean flag + createJobData: (proxy: ProxyInfo) => ({ + proxy, + source: 'fetch-and-check' + }), + removeOnComplete: 5, + removeOnFail: 3 }); - if (useDirectMode) { - // Direct approach - simpler, creates all jobs immediately - const result = await batchProcessor.createDirectJobs({ - items: proxies, - batchSize: 0, // Not used in direct mode - totalDelayMs: targetHours * 60 * 60 * 1000, - jobNamePrefix: 'proxy', - operation: 'check-proxy', - service: 'proxy', - provider: 'proxy-service', - priority: 2, - createJobData: (proxy: ProxyInfo) => ({ - proxy, - source: 'fetch-and-check' - }), - removeOnComplete: 5, - removeOnFail: 3 - }); - - return { - proxiesFetched: result.totalItems, - jobsCreated: result.jobsCreated, - mode: 'direct' - }; - } else { // Batch approach - creates batch jobs that create individual jobs - const result = await batchProcessor.createBatchJobs({ - items: proxies, - batchSize, - totalDelayMs: targetHours * 60 * 60 * 1000, - jobNamePrefix: 'proxy', - operation: 'check-proxy', - service: 'proxy', - provider: 'proxy-service', - priority: 3, - createJobData: (proxy: ProxyInfo) => ({ - proxy, - source: 'fetch-and-check' - }), - removeOnComplete: 3, - removeOnFail: 5 - }); - - return { - proxiesFetched: result.totalItems, - batchJobsCreated: result.batchJobsCreated, - totalBatches: result.totalBatches, - estimatedDurationHours: result.estimatedDurationHours, - mode: 'batch' - }; - } + return { + proxiesFetched: result.totalItems, + ...result + }; }, - 'process-proxy-batch': async (payload: any) => { + + 'process-proxy-batch': async (payload: any) => { + // Process a batch of proxies - uses the fetch-and-check JobNamePrefix process-(proxy)-batch const { queueManager } = await import('../services/queue.service'); const batchProcessor = new BatchProcessor(queueManager); - - return await batchProcessor.processBatch( - payload, - (proxy: ProxyInfo, index: number) => ({ - proxy, - source: payload.config?.source || 'batch-processing' - }) - ); + return await batchProcessor.processBatch(payload); }, 'check-proxy': async (payload: { diff --git a/apps/data-service/src/utils/batch-processor.ts b/apps/data-service/src/utils/batch-processor.ts index 61e54c2..b460122 100644 --- a/apps/data-service/src/utils/batch-processor.ts +++ b/apps/data-service/src/utils/batch-processor.ts @@ -2,31 +2,17 @@ import { getLogger } from '@stock-bot/logger'; export interface BatchConfig { items: T[]; - batchSize: number; + batchSize?: number; // Optional - only used for batch mode totalDelayMs: number; jobNamePrefix: string; operation: string; service: string; provider: string; priority?: number; - createJobData: (item: T, index: number) => any; // Simplified - no batchInfo parameter + createJobData: (item: T, index: number) => any; removeOnComplete?: number; removeOnFail?: number; -} - -export interface BatchInfo { - batchIndex: number; - itemIndex: number; // Changed to match proxy provider - total: number; // Changed to match proxy provider - totalItems: number; -} - -export interface BatchResult { - totalItems: number; - batchJobsCreated: number; - totalBatches: number; - avgItemsPerBatch: number; - estimatedDurationHours: number; + useBatching?: boolean; // Simple flag to choose mode } const logger = getLogger('batch-processor'); @@ -35,218 +21,23 @@ export class BatchProcessor { constructor(private queueManager: any) {} /** - * Create batch jobs that will later create individual item jobs + * Unified method that handles both direct and batch approaches */ - async createBatchJobs(config: BatchConfig): Promise { - const { - items, - batchSize, - totalDelayMs, - jobNamePrefix, - operation, - service, - provider, - priority = 3 - } = config; + async processItems(config: BatchConfig) { + const { items, useBatching = false } = config; if (items.length === 0) { - return { - totalItems: 0, - batchJobsCreated: 0, - totalBatches: 0, - avgItemsPerBatch: 0, - estimatedDurationHours: 0 - }; + return { totalItems: 0, jobsCreated: 0 }; } - const totalBatches = Math.ceil(items.length / batchSize); - const delayPerBatch = Math.floor(totalDelayMs / totalBatches); - - logger.info('Creating batch jobs', { - totalItems: items.length, - batchSize, - totalBatches, - delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`, - estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`, - jobPrefix: jobNamePrefix - }); - - const batchCreationChunkSize = 50; - let batchJobsCreated = 0; - - for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += batchCreationChunkSize) { - const chunkEnd = Math.min(chunkStart + batchCreationChunkSize, totalBatches); - const batchPromises = []; - - for (let i = chunkStart; i < chunkEnd; i++) { - const startIndex = i * batchSize; - const endIndex = Math.min(startIndex + batchSize, items.length); - const batchItems = items.slice(startIndex, endIndex); - const delay = i * delayPerBatch; - const batchPromise = this.queueManager.addJob({ - type: `${jobNamePrefix}-batch-processing`, - service, - provider, - operation: `process-${jobNamePrefix}-batch`, payload: { - items: batchItems, - batchIndex: i, - total: totalBatches, // Changed to total to match proxy provider - batchSize, - config: { - jobNamePrefix, - operation, - service, - provider, - priority: priority - 1, // Individual jobs get slightly lower priority - removeOnComplete: config.removeOnComplete || 5, - removeOnFail: config.removeOnFail || 5 - } - }, - priority - }, { - delay: delay, - jobId: `${jobNamePrefix}-batch-${i}-${Date.now()}` - }); - - batchPromises.push(batchPromise); - } - - const results = await Promise.allSettled(batchPromises); - const successful = results.filter(r => r.status === 'fulfilled').length; - const failed = results.filter(r => r.status === 'rejected').length; - - batchJobsCreated += successful; - - logger.info('Batch chunk created', { - chunkStart: chunkStart + 1, - chunkEnd, - successful, - failed, - totalCreated: batchJobsCreated, - progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%` - }); - - if (chunkEnd < totalBatches) { - await new Promise(resolve => setTimeout(resolve, 100)); - } - } - - const result = { - totalItems: items.length, - batchJobsCreated, - totalBatches, - avgItemsPerBatch: Math.floor(items.length / totalBatches), - estimatedDurationHours: totalDelayMs / 1000 / 60 / 60 - }; - - logger.info('Batch jobs creation completed', result); - return result; - } - /** - * Process a batch by creating individual item jobs - */ async processBatch(payload: { - items: T[]; - batchIndex: number; - total: number; // Changed to match proxy provider - batchSize: number; - config: { - jobNamePrefix: string; - operation: string; - service: string; - provider: string; - priority: number; - removeOnComplete: number; - removeOnFail: number; - }; - }, createJobData: (item: T, index: number) => any): Promise<{ - batchIndex: number; - totalItems: number; - jobsCreated: number; - jobsFailed: number; - }> { - const { items, batchIndex, total, config } = payload; - logger.info('Processing batch', { - batchIndex, - batchSize: items.length, - total, - progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%` - }); - - // Spread items over a reasonable time period - const batchDelayMs = 15 * 60 * 1000; // 15 minutes per batch - const delayPerItem = Math.floor(batchDelayMs / items.length); - - const jobsToCreate = items.map((item, i) => { - // Get user data first - const userData = createJobData(item, i); - - // Automatically merge with batch info using your property names - const finalPayload = { - ...userData, - batchIndex, - itemIndex: i, // Changed to match proxy provider - total, // Changed to match proxy provider - source: userData.source || 'batch-processing' - }; - - return { - name: `${config.jobNamePrefix}-processing`, - data: { - type: `${config.jobNamePrefix}-processing`, - service: config.service, - provider: config.provider, - operation: config.operation, - payload: finalPayload, - priority: config.priority - }, - opts: { - delay: i * delayPerItem, - jobId: `${config.jobNamePrefix}-${batchIndex}-${i}-${Date.now()}`, - removeOnComplete: config.removeOnComplete, - removeOnFail: config.removeOnFail - } - }; - }); - - try { - const jobs = await this.queueManager.queue.addBulk(jobsToCreate); - logger.info('Batch processing completed', { - batchIndex, - totalItems: items.length, - jobsCreated: jobs.length, - batchDelay: '15 minutes', - progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%` - }); - - return { - batchIndex, - totalItems: items.length, - jobsCreated: jobs.length, - jobsFailed: 0 - }; - } catch (error) { - logger.error('Failed to create batch jobs', { - batchIndex, - batchSize: items.length, - error: error instanceof Error ? error.message : String(error) - }); - - return { - batchIndex, - totalItems: items.length, - jobsCreated: 0, - jobsFailed: items.length - }; + if (useBatching) { + return await this.createBatchJobs(config); + } else { + return await this.createDirectJobs(config); } } - /** - * Directly create individual jobs without batching (simplified approach) - */ - async createDirectJobs(config: BatchConfig): Promise<{ - totalItems: number; - jobsCreated: number; - }> { + private async createDirectJobs(config: BatchConfig) { const { items, totalDelayMs, @@ -260,12 +51,8 @@ export class BatchProcessor { removeOnFail = 3 } = config; - if (items.length === 0) { - return { totalItems: 0, jobsCreated: 0 }; - } - const delayPerItem = Math.floor(totalDelayMs / items.length); - const createBatchSize = 100; // Create jobs in chunks + const chunkSize = 100; let totalJobsCreated = 0; logger.info('Creating direct jobs', { @@ -274,11 +61,12 @@ export class BatchProcessor { estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours` }); - for (let i = 0; i < items.length; i += createBatchSize) { - const batch = items.slice(i, i + createBatchSize); - const jobsToCreate = batch.map((item, batchIndex) => { - const globalIndex = i + batchIndex; - + // Process in chunks to avoid overwhelming Redis + for (let i = 0; i < items.length; i += chunkSize) { + const chunk = items.slice(i, i + chunkSize); + + const jobs = chunk.map((item, chunkIndex) => { + const globalIndex = i + chunkIndex; return { name: `${jobNamePrefix}-processing`, data: { @@ -299,10 +87,11 @@ export class BatchProcessor { }); try { - const jobs = await this.queueManager.queue.addBulk(jobsToCreate); - totalJobsCreated += jobs.length; + const createdJobs = await this.queueManager.queue.addBulk(jobs); + totalJobsCreated += createdJobs.length; - if ((i + createBatchSize) % 500 === 0 || i + createBatchSize >= items.length) { + // Log progress every 500 jobs + if (totalJobsCreated % 500 === 0 || i + chunkSize >= items.length) { logger.info('Direct job creation progress', { created: totalJobsCreated, total: items.length, @@ -310,14 +99,187 @@ export class BatchProcessor { }); } } catch (error) { - logger.error('Failed to create direct job batch', { + logger.error('Failed to create job chunk', { startIndex: i, - batchSize: batch.length, + chunkSize: chunk.length, error: error instanceof Error ? error.message : String(error) }); } } - return { totalItems: items.length, jobsCreated: totalJobsCreated }; + return { + totalItems: items.length, + jobsCreated: totalJobsCreated, + mode: 'direct' + }; } -} + + private async createBatchJobs(config: BatchConfig) { + const { + items, + batchSize = 200, + totalDelayMs, + jobNamePrefix, + operation, + service, + provider, + priority = 3 + } = config; + + const totalBatches = Math.ceil(items.length / batchSize); + const delayPerBatch = Math.floor(totalDelayMs / totalBatches); + const chunkSize = 50; // Create batch jobs in chunks + let batchJobsCreated = 0; + + logger.info('Creating batch jobs', { + totalItems: items.length, + batchSize, + totalBatches, + delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes` + }); + + // Create batch jobs in chunks + for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += chunkSize) { + const chunkEnd = Math.min(chunkStart + chunkSize, totalBatches); + const batchJobs = []; + + for (let batchIndex = chunkStart; batchIndex < chunkEnd; batchIndex++) { + const startIndex = batchIndex * batchSize; + const endIndex = Math.min(startIndex + batchSize, items.length); + const batchItems = items.slice(startIndex, endIndex); + + batchJobs.push({ + name: `${jobNamePrefix}-batch-processing`, + data: { + type: `${jobNamePrefix}-batch-processing`, + service, + provider, + operation: `process-${jobNamePrefix}-batch`, + payload: { + items: batchItems, + batchIndex, + total: totalBatches, + config: { ...config, priority: priority - 1 } + }, + priority + }, + opts: { + delay: batchIndex * delayPerBatch, + jobId: `${jobNamePrefix}-batch-${batchIndex}-${Date.now()}` + } + }); + } + + try { + const createdJobs = await this.queueManager.queue.addBulk(batchJobs); + batchJobsCreated += createdJobs.length; + + logger.info('Batch chunk created', { + chunkStart: chunkStart + 1, + chunkEnd, + created: createdJobs.length, + totalCreated: batchJobsCreated, + progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%` + }); + } catch (error) { + logger.error('Failed to create batch chunk', { + chunkStart, + chunkEnd, + error: error instanceof Error ? error.message : String(error) + }); + } + + // Small delay between chunks + if (chunkEnd < totalBatches) { + await new Promise(resolve => setTimeout(resolve, 100)); + } + } + + return { + totalItems: items.length, + batchJobsCreated, + totalBatches, + estimatedDurationHours: totalDelayMs / 1000 / 60 / 60, + mode: 'batch' + }; + } + + /** + * Process a batch (called by batch jobs) + */ + async processBatch(payload: { + items: T[]; + batchIndex: number; + total: number; + config: BatchConfig; + }) { + const { items, batchIndex, total, config } = payload; + + logger.info('Processing batch', { + batchIndex, + batchSize: items.length, + total, + progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%` + }); + + const delayPerItem = Math.floor((15 * 60 * 1000) / items.length); // 15 min per batch + + const jobs = items.map((item, itemIndex) => { + const userData = config.createJobData(item, itemIndex); + + return { + name: `${config.jobNamePrefix}-processing`, + data: { + type: `${config.jobNamePrefix}-processing`, + service: config.service, + provider: config.provider, + operation: config.operation, + payload: { + ...userData, + batchIndex, + itemIndex, + total, + source: userData.source || 'batch-processing' + }, + priority: config.priority || 2 + }, + opts: { + delay: itemIndex * delayPerItem, + jobId: `${config.jobNamePrefix}-${batchIndex}-${itemIndex}-${Date.now()}`, + removeOnComplete: config.removeOnComplete || 5, + removeOnFail: config.removeOnFail || 3 + } + }; + }); + + try { + const createdJobs = await this.queueManager.queue.addBulk(jobs); + + logger.info('Batch processing completed', { + batchIndex, + totalItems: items.length, + jobsCreated: createdJobs.length, + progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%` + }); + + return { + batchIndex, + totalItems: items.length, + jobsCreated: createdJobs.length, + jobsFailed: 0 + }; + } catch (error) { + logger.error('Failed to process batch', { + batchIndex, + error: error instanceof Error ? error.message : String(error) + }); + + return { + batchIndex, + totalItems: items.length, + jobsCreated: 0, + jobsFailed: items.length + }; + } + } +} \ No newline at end of file