From 3097686849f0d162a08755baca0f377b17d922d6 Mon Sep 17 00:00:00 2001 From: Boki Date: Wed, 11 Jun 2025 12:56:07 -0400 Subject: [PATCH] fixed batching and waiting priority plus cleanup --- .env | 2 +- apps/data-service/src/config/app.config.ts | 0 .../src/providers/proxy.provider.ts | 61 ++---- .../data-service/src/providers/proxy.tasks.ts | 191 +++++++++--------- .../src/services/queue.service.ts | 8 +- apps/data-service/src/utils/batch-helpers.ts | 14 +- 6 files changed, 121 insertions(+), 155 deletions(-) delete mode 100644 apps/data-service/src/config/app.config.ts diff --git a/.env b/.env index b1ae1aa..5674a15 100644 --- a/.env +++ b/.env @@ -10,7 +10,7 @@ LOG_LEVEL=info DATA_SERVICE_PORT=2001 # Queue and Worker Configuration -WORKER_COUNT=5 +WORKER_COUNT=4 WORKER_CONCURRENCY=20 # =========================================== diff --git a/apps/data-service/src/config/app.config.ts b/apps/data-service/src/config/app.config.ts deleted file mode 100644 index e69de29..0000000 diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index c1ea4a8..0cb492d 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -37,21 +37,14 @@ export const proxyProvider: ProviderConfig = { }), queueManager, { - totalDelayHours: 0.1, //parseFloat(process.env.PROXY_VALIDATION_HOURS || '1'), + totalDelayHours: 12, //parseFloat(process.env.PROXY_VALIDATION_HOURS || '1'), batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'), useBatching: process.env.PROXY_DIRECT_MODE !== 'true', - priority: 2, provider: 'proxy-provider', operation: 'check-proxy', } ); - return { - proxiesFetched: result.totalItems, - jobsCreated: result.jobsCreated, - mode: result.mode, - batchesCreated: result.batchesCreated, - processingTimeMs: result.duration, - }; + return result; }, 'process-batch-items': async (payload: any) => { // Process a batch using the simplified batch helpers @@ -77,55 +70,29 @@ export const proxyProvider: ProviderConfig = { proxy: `${payload.proxy.host}:${payload.proxy.port}`, isWorking: result.isWorking, responseTime: result.responseTime, - batchIndex: payload.batchIndex, }); - return { - result, - proxy: payload.proxy, - // Only include batch info if it exists (for batch mode) - ...(payload.batchIndex !== undefined && { - batchInfo: { - batchIndex: payload.batchIndex, - itemIndex: payload.itemIndex, - total: payload.total, - source: payload.source, - }, - }), - }; + return { result, proxy: payload.proxy }; } catch (error) { logger.warn('Proxy validation failed', { proxy: `${payload.proxy.host}:${payload.proxy.port}`, error: error instanceof Error ? error.message : String(error), - batchIndex: payload.batchIndex, }); - return { - result: { isWorking: false, error: String(error) }, - proxy: payload.proxy, - // Only include batch info if it exists (for batch mode) - ...(payload.batchIndex !== undefined && { - batchInfo: { - batchIndex: payload.batchIndex, - itemIndex: payload.itemIndex, - total: payload.total, - source: payload.source, - }, - }), - }; + return { result: { isWorking: false, error: String(error) }, proxy: payload.proxy }; } }, }, scheduledJobs: [ - { - type: 'proxy-maintenance', - operation: 'fetch-and-check', - payload: {}, - // should remove and just run at the same time so app restarts dont keeping adding same jobs - cronPattern: getEvery24HourCron(), - priority: 5, - immediately: true, // Don't run immediately during startup to avoid conflicts - description: 'Fetch and validate proxy list from sources', - }, + // { + // type: 'proxy-maintenance', + // operation: 'fetch-and-check', + // payload: {}, + // // should remove and just run at the same time so app restarts dont keeping adding same jobs + // cronPattern: getEvery24HourCron(), + // priority: 5, + // immediately: true, // Don't run immediately during startup to avoid conflicts + // description: 'Fetch and validate proxy list from sources', + // }, ], }; diff --git a/apps/data-service/src/providers/proxy.tasks.ts b/apps/data-service/src/providers/proxy.tasks.ts index 92cbc89..9e45804 100644 --- a/apps/data-service/src/providers/proxy.tasks.ts +++ b/apps/data-service/src/providers/proxy.tasks.ts @@ -54,103 +54,102 @@ const PROXY_CONFIG = { url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/http.txt', protocol: 'http', }, - // { - // id: 'speedx', - // url: 'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/http.txt', - // protocol: 'http', - // }, - // { - // id: 'monosans', - // url: 'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt', - // protocol: 'http', - // }, + { + id: 'speedx', + url: 'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/http.txt', + protocol: 'http', + }, + { + id: 'monosans', + url: 'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt', + protocol: 'http', + }, + { + id: 'murong', + url: 'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/http.txt', + protocol: 'http', + }, + { + id: 'vakhov-fresh', + url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/master/http.txt', + protocol: 'http', + }, + { + id: 'kangproxy', + url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/http/http.txt', + protocol: 'http', + }, + { + id: 'gfpcom', + url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/http.txt', + protocol: 'http', + }, + { + id: 'dpangestuw', + url: 'https://raw.githubusercontent.com/dpangestuw/Free-Proxy/refs/heads/main/http_proxies.txt', + protocol: 'http', + }, + { + id: 'gitrecon', + url: 'https://raw.githubusercontent.com/gitrecon1455/fresh-proxy-list/refs/heads/main/proxylist.txt', + protocol: 'http', + }, + { + id: 'vakhov-master', + url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/http.txt', + protocol: 'http', + }, + { + id: 'breaking-tech', + url: 'https://raw.githubusercontent.com/BreakingTechFr/Proxy_Free/refs/heads/main/proxies/http.txt', + protocol: 'http', + }, + { + id: 'ercindedeoglu', + url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/http.txt', + protocol: 'http', + }, + { + id: 'tuanminpay', + url: 'https://raw.githubusercontent.com/TuanMinPay/live-proxy/master/http.txt', + protocol: 'http', + }, - // { - // id: 'murong', - // url: 'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/http.txt', - // protocol: 'http', - // }, - // { - // id: 'vakhov-fresh', - // url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/master/http.txt', - // protocol: 'http', - // }, - // { - // id: 'kangproxy', - // url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/http/http.txt', - // protocol: 'http', - // }, - // { - // id: 'gfpcom', - // url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/http.txt', - // protocol: 'http', - // }, - // { - // id: 'dpangestuw', - // url: 'https://raw.githubusercontent.com/dpangestuw/Free-Proxy/refs/heads/main/http_proxies.txt', - // protocol: 'http', - // }, - // { - // id: 'gitrecon', - // url: 'https://raw.githubusercontent.com/gitrecon1455/fresh-proxy-list/refs/heads/main/proxylist.txt', - // protocol: 'http', - // }, - // { - // id: 'vakhov-master', - // url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/http.txt', - // protocol: 'http', - // }, - // { - // id: 'breaking-tech', - // url: 'https://raw.githubusercontent.com/BreakingTechFr/Proxy_Free/refs/heads/main/proxies/http.txt', - // protocol: 'http', - // }, - // { - // id: 'ercindedeoglu', - // url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/http.txt', - // protocol: 'http', - // }, - // { - // id: 'tuanminpay', - // url: 'https://raw.githubusercontent.com/TuanMinPay/live-proxy/master/http.txt', - // protocol: 'http', - // }, - - // { - // id: 'r00tee-https', - // url: 'https://raw.githubusercontent.com/r00tee/Proxy-List/refs/heads/main/Https.txt', - // protocol: 'https', - // }, - // { - // id: 'ercindedeoglu-https', - // url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/https.txt', - // protocol: 'https', - // }, - // { - // id: 'vakhov-fresh-https', - // url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/https.txt', - // protocol: 'https', - // }, - // { - // id: 'databay-https', - // url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/https.txt', - // protocol: 'https', - // }, - // { - // id: 'kangproxy-https', - // url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/https/https.txt', - // protocol: 'https', - // }, - // { - // id: 'zloi-user-https', - // url: 'https://raw.githubusercontent.com/zloi-user/hideip.me/refs/heads/master/https.txt', - // protocol: 'https', - // }, - // { - // id: 'gfpcom-https', - // url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/https.txt', - // protocol: 'https', - // }, + { + id: 'r00tee-https', + url: 'https://raw.githubusercontent.com/r00tee/Proxy-List/refs/heads/main/Https.txt', + protocol: 'https', + }, + { + id: 'ercindedeoglu-https', + url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/https.txt', + protocol: 'https', + }, + { + id: 'vakhov-fresh-https', + url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/https.txt', + protocol: 'https', + }, + { + id: 'databay-https', + url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/https.txt', + protocol: 'https', + }, + { + id: 'kangproxy-https', + url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/https/https.txt', + protocol: 'https', + }, + { + id: 'zloi-user-https', + url: 'https://raw.githubusercontent.com/zloi-user/hideip.me/refs/heads/master/https.txt', + protocol: 'https', + }, + { + id: 'gfpcom-https', + url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/https.txt', + protocol: 'https', + }, ], }; diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index 4885f8d..d7bdbee 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -9,8 +9,8 @@ export class QueueService { private queueEvents!: QueueEvents; private config = { - workers: 1, //parseInt(process.env.WORKER_COUNT || '5'), - concurrency: 1, //parseInt(process.env.WORKER_CONCURRENCY || '20'), + workers: parseInt(process.env.WORKER_COUNT || '5'), + concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'), redis: { host: process.env.DRAGONFLY_HOST || 'localhost', port: parseInt(process.env.DRAGONFLY_PORT || '6379'), @@ -141,7 +141,7 @@ export class QueueService { }); this.queueEvents.on('failed', (job, error) => { - this.logger.error('Job failed', { + this.logger.debug('Job failed', { id: job.jobId, error: String(error), }); @@ -280,7 +280,7 @@ export class QueueService { const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; return this.queue.add(jobType, jobData, { - priority: jobData.priority || 0, + priority: jobData.priority || undefined, removeOnComplete: 10, removeOnFail: 5, ...options, diff --git a/apps/data-service/src/utils/batch-helpers.ts b/apps/data-service/src/utils/batch-helpers.ts index 70ef019..d859eaa 100644 --- a/apps/data-service/src/utils/batch-helpers.ts +++ b/apps/data-service/src/utils/batch-helpers.ts @@ -122,11 +122,11 @@ async function processDirect( provider: options.provider || 'generic', operation: options.operation || 'process-item', payload: processor(item, index), - priority: options.priority || 1, + priority: options.priority || undefined, }, opts: { delay: index * delayPerItem, - priority: options.priority || 1, + priority: options.priority || undefined, attempts: options.retries || 3, removeOnComplete: options.removeOnComplete || 10, removeOnFail: options.removeOnFail || 5, @@ -179,11 +179,11 @@ async function processBatched( totalBatches: batches.length, itemCount: batch.length, }, - priority: options.priority || 2, + priority: options.priority || undefined, }, opts: { delay: batchIndex * delayPerBatch, - priority: options.priority || 2, + priority: options.priority || undefined, attempts: options.retries || 3, removeOnComplete: options.removeOnComplete || 10, removeOnFail: options.removeOnFail || 5, @@ -233,11 +233,11 @@ export async function processBatchJob(jobData: any, queue: QueueService): Promis provider: options.provider || 'generic', operation: options.operation || 'generic', payload: processor(item, index), - priority: options.priority || 1, + priority: options.priority || undefined, }, opts: { delay: index * (options.delayPerItem || 1000), - priority: options.priority || 1, + priority: options.priority || undefined, attempts: options.retries || 3, }, })); @@ -288,7 +288,7 @@ async function storePayload( processorStr: processor.toString(), options: { delayPerItem: 1000, - priority: options.priority || 1, + priority: options.priority || undefined, retries: options.retries || 3, // Store routing information for later use provider: options.provider || 'generic',