fixed batching and waiting priority plus cleanup

This commit is contained in:
Boki 2025-06-11 12:56:07 -04:00
parent e4d5dba73a
commit 6fb98c69f2
6 changed files with 121 additions and 155 deletions

2
.env
View file

@ -10,7 +10,7 @@ LOG_LEVEL=info
DATA_SERVICE_PORT=2001 DATA_SERVICE_PORT=2001
# Queue and Worker Configuration # Queue and Worker Configuration
WORKER_COUNT=5 WORKER_COUNT=4
WORKER_CONCURRENCY=20 WORKER_CONCURRENCY=20
# =========================================== # ===========================================

View file

@ -37,21 +37,14 @@ export const proxyProvider: ProviderConfig = {
}), }),
queueManager, queueManager,
{ {
totalDelayHours: 0.1, //parseFloat(process.env.PROXY_VALIDATION_HOURS || '1'), totalDelayHours: 12, //parseFloat(process.env.PROXY_VALIDATION_HOURS || '1'),
batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'), batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'),
useBatching: process.env.PROXY_DIRECT_MODE !== 'true', useBatching: process.env.PROXY_DIRECT_MODE !== 'true',
priority: 2,
provider: 'proxy-provider', provider: 'proxy-provider',
operation: 'check-proxy', operation: 'check-proxy',
} }
); );
return { return result;
proxiesFetched: result.totalItems,
jobsCreated: result.jobsCreated,
mode: result.mode,
batchesCreated: result.batchesCreated,
processingTimeMs: result.duration,
};
}, },
'process-batch-items': async (payload: any) => { 'process-batch-items': async (payload: any) => {
// Process a batch using the simplified batch helpers // Process a batch using the simplified batch helpers
@ -77,55 +70,29 @@ export const proxyProvider: ProviderConfig = {
proxy: `${payload.proxy.host}:${payload.proxy.port}`, proxy: `${payload.proxy.host}:${payload.proxy.port}`,
isWorking: result.isWorking, isWorking: result.isWorking,
responseTime: result.responseTime, responseTime: result.responseTime,
batchIndex: payload.batchIndex,
}); });
return { return { result, proxy: payload.proxy };
result,
proxy: payload.proxy,
// Only include batch info if it exists (for batch mode)
...(payload.batchIndex !== undefined && {
batchInfo: {
batchIndex: payload.batchIndex,
itemIndex: payload.itemIndex,
total: payload.total,
source: payload.source,
},
}),
};
} catch (error) { } catch (error) {
logger.warn('Proxy validation failed', { logger.warn('Proxy validation failed', {
proxy: `${payload.proxy.host}:${payload.proxy.port}`, proxy: `${payload.proxy.host}:${payload.proxy.port}`,
error: error instanceof Error ? error.message : String(error), error: error instanceof Error ? error.message : String(error),
batchIndex: payload.batchIndex,
}); });
return { return { result: { isWorking: false, error: String(error) }, proxy: payload.proxy };
result: { isWorking: false, error: String(error) },
proxy: payload.proxy,
// Only include batch info if it exists (for batch mode)
...(payload.batchIndex !== undefined && {
batchInfo: {
batchIndex: payload.batchIndex,
itemIndex: payload.itemIndex,
total: payload.total,
source: payload.source,
},
}),
};
} }
}, },
}, },
scheduledJobs: [ scheduledJobs: [
{ // {
type: 'proxy-maintenance', // type: 'proxy-maintenance',
operation: 'fetch-and-check', // operation: 'fetch-and-check',
payload: {}, // payload: {},
// should remove and just run at the same time so app restarts dont keeping adding same jobs // // should remove and just run at the same time so app restarts dont keeping adding same jobs
cronPattern: getEvery24HourCron(), // cronPattern: getEvery24HourCron(),
priority: 5, // priority: 5,
immediately: true, // Don't run immediately during startup to avoid conflicts // immediately: true, // Don't run immediately during startup to avoid conflicts
description: 'Fetch and validate proxy list from sources', // description: 'Fetch and validate proxy list from sources',
}, // },
], ],
}; };

View file

@ -54,103 +54,102 @@ const PROXY_CONFIG = {
url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/http.txt', url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/http.txt',
protocol: 'http', protocol: 'http',
}, },
// { {
// id: 'speedx', id: 'speedx',
// url: 'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/http.txt', url: 'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/http.txt',
// protocol: 'http', protocol: 'http',
// }, },
// { {
// id: 'monosans', id: 'monosans',
// url: 'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt', url: 'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt',
// protocol: 'http', protocol: 'http',
// }, },
{
id: 'murong',
url: 'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/http.txt',
protocol: 'http',
},
{
id: 'vakhov-fresh',
url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/master/http.txt',
protocol: 'http',
},
{
id: 'kangproxy',
url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/http/http.txt',
protocol: 'http',
},
{
id: 'gfpcom',
url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/http.txt',
protocol: 'http',
},
{
id: 'dpangestuw',
url: 'https://raw.githubusercontent.com/dpangestuw/Free-Proxy/refs/heads/main/http_proxies.txt',
protocol: 'http',
},
{
id: 'gitrecon',
url: 'https://raw.githubusercontent.com/gitrecon1455/fresh-proxy-list/refs/heads/main/proxylist.txt',
protocol: 'http',
},
{
id: 'vakhov-master',
url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/http.txt',
protocol: 'http',
},
{
id: 'breaking-tech',
url: 'https://raw.githubusercontent.com/BreakingTechFr/Proxy_Free/refs/heads/main/proxies/http.txt',
protocol: 'http',
},
{
id: 'ercindedeoglu',
url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/http.txt',
protocol: 'http',
},
{
id: 'tuanminpay',
url: 'https://raw.githubusercontent.com/TuanMinPay/live-proxy/master/http.txt',
protocol: 'http',
},
// { {
// id: 'murong', id: 'r00tee-https',
// url: 'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/http.txt', url: 'https://raw.githubusercontent.com/r00tee/Proxy-List/refs/heads/main/Https.txt',
// protocol: 'http', protocol: 'https',
// }, },
// { {
// id: 'vakhov-fresh', id: 'ercindedeoglu-https',
// url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/master/http.txt', url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/https.txt',
// protocol: 'http', protocol: 'https',
// }, },
// { {
// id: 'kangproxy', id: 'vakhov-fresh-https',
// url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/http/http.txt', url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/https.txt',
// protocol: 'http', protocol: 'https',
// }, },
// { {
// id: 'gfpcom', id: 'databay-https',
// url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/http.txt', url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/https.txt',
// protocol: 'http', protocol: 'https',
// }, },
// { {
// id: 'dpangestuw', id: 'kangproxy-https',
// url: 'https://raw.githubusercontent.com/dpangestuw/Free-Proxy/refs/heads/main/http_proxies.txt', url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/https/https.txt',
// protocol: 'http', protocol: 'https',
// }, },
// { {
// id: 'gitrecon', id: 'zloi-user-https',
// url: 'https://raw.githubusercontent.com/gitrecon1455/fresh-proxy-list/refs/heads/main/proxylist.txt', url: 'https://raw.githubusercontent.com/zloi-user/hideip.me/refs/heads/master/https.txt',
// protocol: 'http', protocol: 'https',
// }, },
// { {
// id: 'vakhov-master', id: 'gfpcom-https',
// url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/http.txt', url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/https.txt',
// protocol: 'http', protocol: 'https',
// }, },
// {
// id: 'breaking-tech',
// url: 'https://raw.githubusercontent.com/BreakingTechFr/Proxy_Free/refs/heads/main/proxies/http.txt',
// protocol: 'http',
// },
// {
// id: 'ercindedeoglu',
// url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/http.txt',
// protocol: 'http',
// },
// {
// id: 'tuanminpay',
// url: 'https://raw.githubusercontent.com/TuanMinPay/live-proxy/master/http.txt',
// protocol: 'http',
// },
// {
// id: 'r00tee-https',
// url: 'https://raw.githubusercontent.com/r00tee/Proxy-List/refs/heads/main/Https.txt',
// protocol: 'https',
// },
// {
// id: 'ercindedeoglu-https',
// url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/https.txt',
// protocol: 'https',
// },
// {
// id: 'vakhov-fresh-https',
// url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/https.txt',
// protocol: 'https',
// },
// {
// id: 'databay-https',
// url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/https.txt',
// protocol: 'https',
// },
// {
// id: 'kangproxy-https',
// url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/https/https.txt',
// protocol: 'https',
// },
// {
// id: 'zloi-user-https',
// url: 'https://raw.githubusercontent.com/zloi-user/hideip.me/refs/heads/master/https.txt',
// protocol: 'https',
// },
// {
// id: 'gfpcom-https',
// url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/https.txt',
// protocol: 'https',
// },
], ],
}; };

View file

@ -9,8 +9,8 @@ export class QueueService {
private queueEvents!: QueueEvents; private queueEvents!: QueueEvents;
private config = { private config = {
workers: 1, //parseInt(process.env.WORKER_COUNT || '5'), workers: parseInt(process.env.WORKER_COUNT || '5'),
concurrency: 1, //parseInt(process.env.WORKER_CONCURRENCY || '20'), concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'),
redis: { redis: {
host: process.env.DRAGONFLY_HOST || 'localhost', host: process.env.DRAGONFLY_HOST || 'localhost',
port: parseInt(process.env.DRAGONFLY_PORT || '6379'), port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
@ -141,7 +141,7 @@ export class QueueService {
}); });
this.queueEvents.on('failed', (job, error) => { this.queueEvents.on('failed', (job, error) => {
this.logger.error('Job failed', { this.logger.debug('Job failed', {
id: job.jobId, id: job.jobId,
error: String(error), error: String(error),
}); });
@ -280,7 +280,7 @@ export class QueueService {
const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`;
return this.queue.add(jobType, jobData, { return this.queue.add(jobType, jobData, {
priority: jobData.priority || 0, priority: jobData.priority || undefined,
removeOnComplete: 10, removeOnComplete: 10,
removeOnFail: 5, removeOnFail: 5,
...options, ...options,

View file

@ -122,11 +122,11 @@ async function processDirect<T>(
provider: options.provider || 'generic', provider: options.provider || 'generic',
operation: options.operation || 'process-item', operation: options.operation || 'process-item',
payload: processor(item, index), payload: processor(item, index),
priority: options.priority || 1, priority: options.priority || undefined,
}, },
opts: { opts: {
delay: index * delayPerItem, delay: index * delayPerItem,
priority: options.priority || 1, priority: options.priority || undefined,
attempts: options.retries || 3, attempts: options.retries || 3,
removeOnComplete: options.removeOnComplete || 10, removeOnComplete: options.removeOnComplete || 10,
removeOnFail: options.removeOnFail || 5, removeOnFail: options.removeOnFail || 5,
@ -179,11 +179,11 @@ async function processBatched<T>(
totalBatches: batches.length, totalBatches: batches.length,
itemCount: batch.length, itemCount: batch.length,
}, },
priority: options.priority || 2, priority: options.priority || undefined,
}, },
opts: { opts: {
delay: batchIndex * delayPerBatch, delay: batchIndex * delayPerBatch,
priority: options.priority || 2, priority: options.priority || undefined,
attempts: options.retries || 3, attempts: options.retries || 3,
removeOnComplete: options.removeOnComplete || 10, removeOnComplete: options.removeOnComplete || 10,
removeOnFail: options.removeOnFail || 5, removeOnFail: options.removeOnFail || 5,
@ -233,11 +233,11 @@ export async function processBatchJob(jobData: any, queue: QueueService): Promis
provider: options.provider || 'generic', provider: options.provider || 'generic',
operation: options.operation || 'generic', operation: options.operation || 'generic',
payload: processor(item, index), payload: processor(item, index),
priority: options.priority || 1, priority: options.priority || undefined,
}, },
opts: { opts: {
delay: index * (options.delayPerItem || 1000), delay: index * (options.delayPerItem || 1000),
priority: options.priority || 1, priority: options.priority || undefined,
attempts: options.retries || 3, attempts: options.retries || 3,
}, },
})); }));
@ -288,7 +288,7 @@ async function storePayload<T>(
processorStr: processor.toString(), processorStr: processor.toString(),
options: { options: {
delayPerItem: 1000, delayPerItem: 1000,
priority: options.priority || 1, priority: options.priority || undefined,
retries: options.retries || 3, retries: options.retries || 3,
// Store routing information for later use // Store routing information for later use
provider: options.provider || 'generic', provider: options.provider || 'generic',