From 1ccdbddb71a72d4390bb3201613d39977ccf97b4 Mon Sep 17 00:00:00 2001 From: Bojan Kucera Date: Sun, 8 Jun 2025 19:32:53 -0400 Subject: [PATCH] added initial jobs for proxies, will prob have to be optimized --- .../src/providers/proxy.provider.ts | 143 +++++++++++++++++- .../data-service/src/providers/proxy.tasks.ts | 32 +--- 2 files changed, 141 insertions(+), 34 deletions(-) diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index f82190a..8102478 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -1,4 +1,9 @@ +import { ProxyInfo } from 'libs/http/src/types'; import { ProviderConfig } from '../services/provider-registry.service'; +import { Logger } from '@stock-bot/logger'; + +// Create logger for this provider +const logger = new Logger('proxy-provider'); // This will run at the same time each day as when the app started const getEvery24HourCron = (): string => { @@ -14,12 +19,140 @@ export const proxyProvider: ProviderConfig = { operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => { const { proxyService } = await import('./proxy.tasks'); - return await proxyService.fetchProxiesFromSources(); + const proxies = await proxyService.fetchProxiesFromSources(); + const proxiesCount = proxies.length; + // Get the actual proxies to create individual jobs + if (proxiesCount > 0) { + try { + const { queueManager } = await import('../services/queue.service'); + if (proxies && proxies.length > 0) { + // Calculate delay distribution over 24 hours + const totalDelayMs = 24 * 60 * 60 * 1000; // 24 hours in milliseconds + const delayPerProxy = Math.floor(totalDelayMs / proxies.length); + + logger.info('Creating individual proxy validation jobs', { + proxyCount: proxies.length, + distributionPeriod: '24 hours', + delayPerProxy: `${(delayPerProxy / 1000 / 60).toFixed(2)} minutes` + }); + + let queuedCount = 0; + + for (let i = 0; i < proxies.length; i++) { + const proxy = proxies[i]; + const delay = i * delayPerProxy; + + try { + await queueManager.addJob({ + type: 'proxy-validation', + service: 'proxy', + provider: 'proxy-service', + operation: 'check-proxy', + payload: { + proxy: proxy, + source: 'fetch-and-check', + autoTriggered: true, + batchIndex: i, + totalBatch: proxies.length + }, + priority: 3 + }, { + delay: delay + }); + + queuedCount++; + // Log progress every 100 jobs + if ((i + 1) % 100 === 0 || i === proxies.length - 1) { + logger.info('Proxy validation jobs queued progress', { + queued: i + 1, + total: proxies.length, + percentage: `${((i + 1) / proxies.length * 100).toFixed(1)}%` + }); + } + + } catch (error) { + logger.error('Failed to queue proxy validation job', { + proxy: `${proxy.host}:${proxy.port}`, + batchIndex: i, + error: error instanceof Error ? error.message : String(error) + }); + } } + + logger.info('Proxy validation jobs queuing completed', { + total: proxies.length, + successful: queuedCount, + failed: proxies.length - queuedCount, + totalDelay: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`, + avgDelayPerJob: `${(delayPerProxy / 1000 / 60).toFixed(2)} minutes` + }); + + return { + proxiesFetched: proxiesCount, + jobsQueued: queuedCount, + totalDelay: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`, + avgDelayPerJob: `${(delayPerProxy / 1000 / 60).toFixed(2)} minutes` + }; + } else { + logger.warn('No proxies found to create validation jobs', { + proxiesFetched: proxiesCount + }); + return { + proxiesFetched: proxiesCount, + jobsQueued: 0, + message: 'No cached proxies found' + }; + } + + } catch (error) { + logger.error('Failed to create individual proxy validation jobs', { + proxiesCount, + error: error instanceof Error ? error.message : String(error) + }); + return { + proxiesFetched: proxiesCount, + jobsQueued: 0, + error: error instanceof Error ? error.message : String(error) + }; } + } else { logger.info('No proxies fetched, skipping job creation'); + return { + proxiesFetched: 0, + jobsQueued: 0, + message: 'No proxies fetched' + }; + } }, - - 'check-specific': async (payload: { proxies: any[] }) => { - const { proxyService } = await import('./proxy.tasks'); - return await proxyService.checkProxies(payload.proxies); + 'check-proxy': async (payload: { + proxy: ProxyInfo, + source?: string, + batchIndex?: number, + totalBatch?: number + }) => { + const { checkProxy } = await import('./proxy.tasks'); + + logger.debug('Checking individual proxy', { + proxy: `${payload.proxy.host}:${payload.proxy.port}`, + batchIndex: payload.batchIndex, + totalBatch: payload.totalBatch, + source: payload.source + }); + + const result = await checkProxy(payload.proxy); + + logger.debug('Proxy check completed', { + proxy: `${payload.proxy.host}:${payload.proxy.port}`, + isWorking: result.isWorking, + responseTime: result.responseTime, + batchIndex: payload.batchIndex + }); + + return { + result: result, + batchInfo: { + index: payload.batchIndex, + total: payload.totalBatch, + source: payload.source + } + }; }, }, diff --git a/apps/data-service/src/providers/proxy.tasks.ts b/apps/data-service/src/providers/proxy.tasks.ts index 936833f..4481287 100644 --- a/apps/data-service/src/providers/proxy.tasks.ts +++ b/apps/data-service/src/providers/proxy.tasks.ts @@ -114,7 +114,7 @@ export async function queueProxyCheck(proxies: ProxyInfo[]): Promise { return jobId; } -export async function fetchProxiesFromSources(): Promise { +export async function fetchProxiesFromSources(): Promise { initializeSharedResources(); const sources = PROXY_CONFIG.PROXY_SOURCES.map(source => @@ -124,7 +124,7 @@ export async function fetchProxiesFromSources(): Promise { let allProxies: ProxyInfo[] = result.flat(); allProxies = removeDuplicateProxies(allProxies); // await checkProxies(allProxies); - return allProxies.length; + return allProxies; } export async function fetchProxiesFromSource(source: { url: string; protocol: string }): Promise { @@ -177,29 +177,6 @@ export async function fetchProxiesFromSource(source: { url: string; protocol: st return allProxies; } -/** - * Check multiple proxies concurrently - */ -export async function checkProxies(proxies: ProxyInfo[]): Promise { - initializeSharedResources(); - - logger.info('Checking proxies', { count: proxies.length }); - - const checkPromises = proxies.map(proxy => - concurrencyLimit(() => checkProxy(proxy)) - ); - const results = await Promise.all(checkPromises); - const workingCount = results.filter((r: ProxyInfo) => r.isWorking).length; - - logger.info('Proxy check completed', { - total: proxies.length, - working: workingCount, - failed: proxies.length - workingCount - }); - - return results; -} - /** * Check if a proxy is working */ @@ -207,7 +184,7 @@ export async function checkProxy(proxy: ProxyInfo): Promise { initializeSharedResources(); let success = false; - logger.debug(`Checking Proxy with ${concurrencyLimit.pendingCount} pending:`, { + logger.debug(`Checking Proxy:`, { protocol: proxy.protocol, host: proxy.host, port: proxy.port, @@ -269,8 +246,6 @@ export async function checkProxy(proxy: ProxyInfo): Promise { } } - - // Utility functions function cleanProxyUrl(url: string): string { return url @@ -301,7 +276,6 @@ export const proxyTasks = { fetchProxiesFromSources, fetchProxiesFromSource, checkProxy, - checkProxies, }; // Export singleton instance for backward compatibility (optional)