From fe7733aeb5d93e6e7ffd3d45fcfc1040442a9a64 Mon Sep 17 00:00:00 2001 From: Boki Date: Wed, 11 Jun 2025 09:53:04 -0400 Subject: [PATCH] working on queue --- .../src/providers/proxy.provider.ts | 5 +- .../data-service/src/providers/proxy.tasks.ts | 119 +++++++++++++----- .../src/services/queue.service.ts | 3 +- apps/data-service/src/utils/batch-helpers.ts | 4 +- libs/http/src/types.ts | 8 +- 5 files changed, 105 insertions(+), 34 deletions(-) diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index 4da7b95..73d59dc 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -34,8 +34,9 @@ export const proxyProvider: ProviderConfig = { index, source: 'batch-processing' }), - queueManager, { - totalDelayHours: 0.1,//parseFloat(process.env.PROXY_VALIDATION_HOURS || '1'), + queueManager, + { + totalDelayHours: 4,//parseFloat(process.env.PROXY_VALIDATION_HOURS || '1'), batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'), useBatching: process.env.PROXY_DIRECT_MODE !== 'true', priority: 2, diff --git a/apps/data-service/src/providers/proxy.tasks.ts b/apps/data-service/src/providers/proxy.tasks.ts index 1c7ea48..e4396f7 100644 --- a/apps/data-service/src/providers/proxy.tasks.ts +++ b/apps/data-service/src/providers/proxy.tasks.ts @@ -33,25 +33,24 @@ const PROXY_CONFIG = { {id: 'speedx', url: 'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/http.txt', protocol: 'http'}, {id: 'monosans', url: 'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt', protocol: 'http'}, + {id: 'murong', url: 'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/http.txt', protocol: 'http'}, + {id: 'vakhov-fresh', url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/master/http.txt', protocol: 'http'}, + {id: 'kangproxy', url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/http/http.txt', protocol: 'http'}, + {id: 'gfpcom', url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/http.txt', protocol: 'http'}, + {id: 'dpangestuw', url: 'https://raw.githubusercontent.com/dpangestuw/Free-Proxy/refs/heads/main/http_proxies.txt', protocol: 'http'}, + {id: 'gitrecon', url: 'https://raw.githubusercontent.com/gitrecon1455/fresh-proxy-list/refs/heads/main/proxylist.txt', protocol: 'http'}, + {id: 'vakhov-master', url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/http.txt', protocol: 'http'}, + {id: 'breaking-tech', url: 'https://raw.githubusercontent.com/BreakingTechFr/Proxy_Free/refs/heads/main/proxies/http.txt', protocol: 'http'}, + {id: 'ercindedeoglu', url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/http.txt', protocol: 'http'}, + {id: 'tuanminpay', url: 'https://raw.githubusercontent.com/TuanMinPay/live-proxy/master/http.txt', protocol: 'http'}, - // {id: 'murong', url: 'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/http.txt', protocol: 'http'}, - // {id: 'vakhov-fresh', url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/master/http.txt', protocol: 'http'}, - // {id: 'kangproxy', url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/http/http.txt', protocol: 'http'}, - // {id: 'gfpcom', url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/http.txt', protocol: 'http'}, - // {id: 'dpangestuw', url: 'https://raw.githubusercontent.com/dpangestuw/Free-Proxy/refs/heads/main/http_proxies.txt', protocol: 'http'}, - // {id: 'gitrecon', url: 'https://raw.githubusercontent.com/gitrecon1455/fresh-proxy-list/refs/heads/main/proxylist.txt', protocol: 'http'}, - // {id: 'vakhov-master', url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/http.txt', protocol: 'http'}, - // {id: 'breaking-tech', url: 'https://raw.githubusercontent.com/BreakingTechFr/Proxy_Free/refs/heads/main/proxies/http.txt', protocol: 'http'}, - // {id: 'ercindedeoglu', url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/http.txt', protocol: 'http'}, - // {id: 'tuanminpay', url: 'https://raw.githubusercontent.com/TuanMinPay/live-proxy/master/http.txt', protocol: 'http'}, - - // {id: 'r00tee-https', url: 'https://raw.githubusercontent.com/r00tee/Proxy-List/refs/heads/main/Https.txt', protocol: 'https'}, - // {id: 'ercindedeoglu-https', url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/https.txt', protocol: 'https'}, + {id: 'r00tee-https', url: 'https://raw.githubusercontent.com/r00tee/Proxy-List/refs/heads/main/Https.txt', protocol: 'https'}, + {id: 'ercindedeoglu-https', url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/https.txt', protocol: 'https'}, {id: 'vakhov-fresh-https', url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/https.txt', protocol: 'https'}, - // {id: 'databay-https', url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/https.txt', protocol: 'https'}, - // {id: 'kangproxy-https', url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/https/https.txt', protocol: 'https'}, - // {id: 'zloi-user-https', url: 'https://raw.githubusercontent.com/zloi-user/hideip.me/refs/heads/master/https.txt', protocol: 'https'}, - // {id: 'gfpcom-https', url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/https.txt', protocol: 'https'}, + {id: 'databay-https', url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/https.txt', protocol: 'https'}, + {id: 'kangproxy-https', url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/https/https.txt', protocol: 'https'}, + {id: 'zloi-user-https', url: 'https://raw.githubusercontent.com/zloi-user/hideip.me/refs/heads/master/https.txt', protocol: 'https'}, + {id: 'gfpcom-https', url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/https.txt', protocol: 'https'}, ] }; @@ -107,6 +106,74 @@ async function resetProxyStats(): Promise { return Promise.resolve(); } +/** + * Update proxy data in cache with working/total stats and average response time + * @param proxy - The proxy to update + * @param isWorking - Whether the proxy is currently working + */ +async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean): Promise { + const cacheKey = `${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`; + + try { + const existing: any = await cache.get(cacheKey); + + // For failed proxies, only update if they already exist + if (!isWorking && !existing) { + logger.debug('Proxy not in cache, skipping failed update', { + proxy: `${proxy.host}:${proxy.port}` + }); + return; + } + + // Calculate new average response time if we have a response time + let newAverageResponseTime = existing?.averageResponseTime; + if (proxy.responseTime !== undefined) { + const existingAvg = existing?.averageResponseTime || 0; + const existingTotal = existing?.total || 0; + + // Calculate weighted average: (existing_avg * existing_count + new_response) / (existing_count + 1) + newAverageResponseTime = existingTotal > 0 + ? ((existingAvg * existingTotal) + proxy.responseTime) / (existingTotal + 1) + : proxy.responseTime; + } + + // Build updated proxy data + const updated = { + ...existing, + ...proxy, // Keep latest proxy info + total: (existing?.total || 0) + 1, + working: isWorking ? (existing?.working || 0) + 1 : (existing?.working || 0), + isWorking, + lastChecked: new Date(), + // Add firstSeen only for new entries + ...(existing ? {} : { firstSeen: new Date() }), + // Update average response time if we calculated a new one + ...(newAverageResponseTime !== undefined ? { averageResponseTime: newAverageResponseTime } : {}) + }; + + // Calculate success rate + updated.successRate = updated.total > 0 ? (updated.working / updated.total) * 100 : 0; + + // Save to cache: reset TTL for working proxies, keep existing TTL for failed ones + const cacheOptions = isWorking ? PROXY_CONFIG.CACHE_TTL : undefined; + await cache.set(cacheKey, updated, cacheOptions); + + logger.debug(`Updated ${isWorking ? 'working' : 'failed'} proxy in cache`, { + proxy: `${proxy.host}:${proxy.port}`, + working: updated.working, + total: updated.total, + successRate: updated.successRate.toFixed(1) + '%', + avgResponseTime: updated.averageResponseTime ? `${updated.averageResponseTime.toFixed(0)}ms` : 'N/A' + }); + + } catch (error) { + logger.error('Failed to update proxy in cache', { + proxy: `${proxy.host}:${proxy.port}`, + error: error instanceof Error ? error.message : String(error) + }); + } +} + /** * Initialize proxy cache for use during application startup * This should be called before any proxy operations @@ -281,19 +348,18 @@ export async function checkProxy(proxy: ProxyInfo): Promise { }); const isWorking = response.status >= 200 && response.status < 300; - const result: ProxyInfo = { ...proxy, isWorking, - checkedAt: new Date(), + lastChecked: new Date(), responseTime: response.responseTime, }; if (isWorking && !JSON.stringify(response.data).includes(PROXY_CONFIG.CHECK_IP)) { success = true; - await cache.set(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`, result, { ttl: PROXY_CONFIG.CACHE_TTL }); + await updateProxyInCache(result, true); } else { - await cache.del(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`); + await updateProxyInCache(result, false); } if( proxy.source ){ @@ -307,21 +373,18 @@ export async function checkProxy(proxy: ProxyInfo): Promise { }); return result; - } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); - const result: ProxyInfo = { ...proxy, isWorking: false, error: errorMessage, - checkedAt: new Date() + lastChecked: new Date() }; - // If the proxy check failed, remove it from cache - success is here cause i think abort signal fails sometimes - // if (!success) { - // await cache.set(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`, result); - // } + // Update cache for failed proxy (increment total, don't update TTL) + await updateProxyInCache(result, false); + if( proxy.source ){ await updateProxyStats(proxy.source, success); } diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index cce2548..9067729 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -308,7 +308,8 @@ export class QueueService { workers: this.workers.length, concurrency: this.getTotalConcurrency() }; - } async shutdown() { + } + async shutdown() { if (!this.isInitialized) { this.logger.warn('Queue service not initialized, nothing to shutdown'); return; diff --git a/apps/data-service/src/utils/batch-helpers.ts b/apps/data-service/src/utils/batch-helpers.ts index e730dfc..5be4892 100644 --- a/apps/data-service/src/utils/batch-helpers.ts +++ b/apps/data-service/src/utils/batch-helpers.ts @@ -110,7 +110,7 @@ async function processDirect( ): Promise> { const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; - const delayPerItem = Math.floor(totalDelayMs / items.length); + const delayPerItem = totalDelayMs / items.length; logger.info('Creating direct jobs', { totalItems: items.length, @@ -157,7 +157,7 @@ async function processBatched( const batchSize = options.batchSize || 100; const batches = createBatches(items, batchSize); const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; - const delayPerBatch = Math.floor(totalDelayMs / batches.length); + const delayPerBatch = totalDelayMs / batches.length; logger.info('Creating batch jobs', { totalItems: items.length, diff --git a/libs/http/src/types.ts b/libs/http/src/types.ts index 1f173a9..19ec210 100644 --- a/libs/http/src/types.ts +++ b/libs/http/src/types.ts @@ -12,7 +12,13 @@ export interface ProxyInfo { isWorking?: boolean; responseTime?: number; error?: string; - checkedAt?: Date; + // Enhanced tracking properties + working?: number; // Number of successful checks + total?: number; // Total number of checks + successRate?: number; // Success rate percentage + averageResponseTime?: number; // Average response time in milliseconds + firstSeen?: Date; // When the proxy was first added to cache + lastChecked?: Date; // When the proxy was last checked } export interface HttpClientConfig {