working on queue

This commit is contained in:
Boki 2025-06-11 09:53:04 -04:00
parent be807378a3
commit 24b7ed15e4
5 changed files with 105 additions and 34 deletions

View file

@ -34,8 +34,9 @@ export const proxyProvider: ProviderConfig = {
index,
source: 'batch-processing'
}),
queueManager, {
totalDelayHours: 0.1,//parseFloat(process.env.PROXY_VALIDATION_HOURS || '1'),
queueManager,
{
totalDelayHours: 4,//parseFloat(process.env.PROXY_VALIDATION_HOURS || '1'),
batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'),
useBatching: process.env.PROXY_DIRECT_MODE !== 'true',
priority: 2,

View file

@ -33,25 +33,24 @@ const PROXY_CONFIG = {
{id: 'speedx', url: 'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/http.txt', protocol: 'http'},
{id: 'monosans', url: 'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt', protocol: 'http'},
{id: 'murong', url: 'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/http.txt', protocol: 'http'},
{id: 'vakhov-fresh', url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/master/http.txt', protocol: 'http'},
{id: 'kangproxy', url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/http/http.txt', protocol: 'http'},
{id: 'gfpcom', url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/http.txt', protocol: 'http'},
{id: 'dpangestuw', url: 'https://raw.githubusercontent.com/dpangestuw/Free-Proxy/refs/heads/main/http_proxies.txt', protocol: 'http'},
{id: 'gitrecon', url: 'https://raw.githubusercontent.com/gitrecon1455/fresh-proxy-list/refs/heads/main/proxylist.txt', protocol: 'http'},
{id: 'vakhov-master', url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/http.txt', protocol: 'http'},
{id: 'breaking-tech', url: 'https://raw.githubusercontent.com/BreakingTechFr/Proxy_Free/refs/heads/main/proxies/http.txt', protocol: 'http'},
{id: 'ercindedeoglu', url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/http.txt', protocol: 'http'},
{id: 'tuanminpay', url: 'https://raw.githubusercontent.com/TuanMinPay/live-proxy/master/http.txt', protocol: 'http'},
// {id: 'murong', url: 'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/http.txt', protocol: 'http'},
// {id: 'vakhov-fresh', url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/master/http.txt', protocol: 'http'},
// {id: 'kangproxy', url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/http/http.txt', protocol: 'http'},
// {id: 'gfpcom', url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/http.txt', protocol: 'http'},
// {id: 'dpangestuw', url: 'https://raw.githubusercontent.com/dpangestuw/Free-Proxy/refs/heads/main/http_proxies.txt', protocol: 'http'},
// {id: 'gitrecon', url: 'https://raw.githubusercontent.com/gitrecon1455/fresh-proxy-list/refs/heads/main/proxylist.txt', protocol: 'http'},
// {id: 'vakhov-master', url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/http.txt', protocol: 'http'},
// {id: 'breaking-tech', url: 'https://raw.githubusercontent.com/BreakingTechFr/Proxy_Free/refs/heads/main/proxies/http.txt', protocol: 'http'},
// {id: 'ercindedeoglu', url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/http.txt', protocol: 'http'},
// {id: 'tuanminpay', url: 'https://raw.githubusercontent.com/TuanMinPay/live-proxy/master/http.txt', protocol: 'http'},
// {id: 'r00tee-https', url: 'https://raw.githubusercontent.com/r00tee/Proxy-List/refs/heads/main/Https.txt', protocol: 'https'},
// {id: 'ercindedeoglu-https', url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/https.txt', protocol: 'https'},
{id: 'r00tee-https', url: 'https://raw.githubusercontent.com/r00tee/Proxy-List/refs/heads/main/Https.txt', protocol: 'https'},
{id: 'ercindedeoglu-https', url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/https.txt', protocol: 'https'},
{id: 'vakhov-fresh-https', url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/https.txt', protocol: 'https'},
// {id: 'databay-https', url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/https.txt', protocol: 'https'},
// {id: 'kangproxy-https', url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/https/https.txt', protocol: 'https'},
// {id: 'zloi-user-https', url: 'https://raw.githubusercontent.com/zloi-user/hideip.me/refs/heads/master/https.txt', protocol: 'https'},
// {id: 'gfpcom-https', url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/https.txt', protocol: 'https'},
{id: 'databay-https', url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/https.txt', protocol: 'https'},
{id: 'kangproxy-https', url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/https/https.txt', protocol: 'https'},
{id: 'zloi-user-https', url: 'https://raw.githubusercontent.com/zloi-user/hideip.me/refs/heads/master/https.txt', protocol: 'https'},
{id: 'gfpcom-https', url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/https.txt', protocol: 'https'},
]
};
@ -107,6 +106,74 @@ async function resetProxyStats(): Promise<void> {
return Promise.resolve();
}
/**
* Update proxy data in cache with working/total stats and average response time
* @param proxy - The proxy to update
* @param isWorking - Whether the proxy is currently working
*/
async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean): Promise<void> {
const cacheKey = `${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`;
try {
const existing: any = await cache.get(cacheKey);
// For failed proxies, only update if they already exist
if (!isWorking && !existing) {
logger.debug('Proxy not in cache, skipping failed update', {
proxy: `${proxy.host}:${proxy.port}`
});
return;
}
// Calculate new average response time if we have a response time
let newAverageResponseTime = existing?.averageResponseTime;
if (proxy.responseTime !== undefined) {
const existingAvg = existing?.averageResponseTime || 0;
const existingTotal = existing?.total || 0;
// Calculate weighted average: (existing_avg * existing_count + new_response) / (existing_count + 1)
newAverageResponseTime = existingTotal > 0
? ((existingAvg * existingTotal) + proxy.responseTime) / (existingTotal + 1)
: proxy.responseTime;
}
// Build updated proxy data
const updated = {
...existing,
...proxy, // Keep latest proxy info
total: (existing?.total || 0) + 1,
working: isWorking ? (existing?.working || 0) + 1 : (existing?.working || 0),
isWorking,
lastChecked: new Date(),
// Add firstSeen only for new entries
...(existing ? {} : { firstSeen: new Date() }),
// Update average response time if we calculated a new one
...(newAverageResponseTime !== undefined ? { averageResponseTime: newAverageResponseTime } : {})
};
// Calculate success rate
updated.successRate = updated.total > 0 ? (updated.working / updated.total) * 100 : 0;
// Save to cache: reset TTL for working proxies, keep existing TTL for failed ones
const cacheOptions = isWorking ? PROXY_CONFIG.CACHE_TTL : undefined;
await cache.set(cacheKey, updated, cacheOptions);
logger.debug(`Updated ${isWorking ? 'working' : 'failed'} proxy in cache`, {
proxy: `${proxy.host}:${proxy.port}`,
working: updated.working,
total: updated.total,
successRate: updated.successRate.toFixed(1) + '%',
avgResponseTime: updated.averageResponseTime ? `${updated.averageResponseTime.toFixed(0)}ms` : 'N/A'
});
} catch (error) {
logger.error('Failed to update proxy in cache', {
proxy: `${proxy.host}:${proxy.port}`,
error: error instanceof Error ? error.message : String(error)
});
}
}
/**
* Initialize proxy cache for use during application startup
* This should be called before any proxy operations
@ -281,19 +348,18 @@ export async function checkProxy(proxy: ProxyInfo): Promise<ProxyInfo> {
});
const isWorking = response.status >= 200 && response.status < 300;
const result: ProxyInfo = {
...proxy,
isWorking,
checkedAt: new Date(),
lastChecked: new Date(),
responseTime: response.responseTime,
};
if (isWorking && !JSON.stringify(response.data).includes(PROXY_CONFIG.CHECK_IP)) {
success = true;
await cache.set(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`, result, { ttl: PROXY_CONFIG.CACHE_TTL });
await updateProxyInCache(result, true);
} else {
await cache.del(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`);
await updateProxyInCache(result, false);
}
if( proxy.source ){
@ -307,21 +373,18 @@ export async function checkProxy(proxy: ProxyInfo): Promise<ProxyInfo> {
});
return result;
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
const result: ProxyInfo = {
...proxy,
isWorking: false,
error: errorMessage,
checkedAt: new Date()
lastChecked: new Date()
};
// If the proxy check failed, remove it from cache - success is here cause i think abort signal fails sometimes
// if (!success) {
// await cache.set(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`, result);
// }
// Update cache for failed proxy (increment total, don't update TTL)
await updateProxyInCache(result, false);
if( proxy.source ){
await updateProxyStats(proxy.source, success);
}

View file

@ -308,7 +308,8 @@ export class QueueService {
workers: this.workers.length,
concurrency: this.getTotalConcurrency()
};
} async shutdown() {
}
async shutdown() {
if (!this.isInitialized) {
this.logger.warn('Queue service not initialized, nothing to shutdown');
return;

View file

@ -110,7 +110,7 @@ async function processDirect<T>(
): Promise<Omit<BatchResult, 'duration'>> {
const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000;
const delayPerItem = Math.floor(totalDelayMs / items.length);
const delayPerItem = totalDelayMs / items.length;
logger.info('Creating direct jobs', {
totalItems: items.length,
@ -157,7 +157,7 @@ async function processBatched<T>(
const batchSize = options.batchSize || 100;
const batches = createBatches(items, batchSize);
const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000;
const delayPerBatch = Math.floor(totalDelayMs / batches.length);
const delayPerBatch = totalDelayMs / batches.length;
logger.info('Creating batch jobs', {
totalItems: items.length,

View file

@ -12,7 +12,13 @@ export interface ProxyInfo {
isWorking?: boolean;
responseTime?: number;
error?: string;
checkedAt?: Date;
// Enhanced tracking properties
working?: number; // Number of successful checks
total?: number; // Total number of checks
successRate?: number; // Success rate percentage
averageResponseTime?: number; // Average response time in milliseconds
firstSeen?: Date; // When the proxy was first added to cache
lastChecked?: Date; // When the proxy was last checked
}
export interface HttpClientConfig {