added p-limit instead ot batch to proxy-service

This commit is contained in:
Bojan Kucera 2025-06-07 17:05:14 -04:00
parent 2ae1e73661
commit baa34a3805
3 changed files with 27 additions and 33 deletions

View file

@ -10,17 +10,19 @@
"start": "bun dist/index.js", "start": "bun dist/index.js",
"test": "bun test", "test": "bun test",
"clean": "rm -rf dist" "clean": "rm -rf dist"
}, "dependencies": { },
"dependencies": {
"@stock-bot/cache": "*",
"@stock-bot/config": "*", "@stock-bot/config": "*",
"@stock-bot/logger": "*",
"@stock-bot/shutdown": "*",
"@stock-bot/types": "*",
"@stock-bot/questdb-client": "*",
"@stock-bot/mongodb-client": "*",
"@stock-bot/event-bus": "*", "@stock-bot/event-bus": "*",
"@stock-bot/http": "*", "@stock-bot/http": "*",
"@stock-bot/cache": "*", "@stock-bot/logger": "*",
"@stock-bot/mongodb-client": "*",
"@stock-bot/questdb-client": "*",
"@stock-bot/shutdown": "*",
"@stock-bot/types": "*",
"hono": "^4.0.0", "hono": "^4.0.0",
"p-limit": "^6.2.0",
"ws": "^8.0.0" "ws": "^8.0.0"
}, },
"devDependencies": { "devDependencies": {

View file

@ -1,6 +1,7 @@
import { Logger } from '@stock-bot/logger'; import { Logger } from '@stock-bot/logger';
import createCache, { type CacheProvider } from '@stock-bot/cache'; import createCache, { type CacheProvider } from '@stock-bot/cache';
import { HttpClient, HttpClientConfig, ProxyConfig , RequestConfig } from '@stock-bot/http'; import { HttpClient, HttpClientConfig, ProxyConfig , RequestConfig } from '@stock-bot/http';
import pLimit from 'p-limit';
export interface ProxySource { export interface ProxySource {
url: string; url: string;
@ -35,6 +36,7 @@ export class ProxyService {
private logger; private logger;
private cache: CacheProvider; private cache: CacheProvider;
private httpClient: HttpClient; private httpClient: HttpClient;
private readonly concurrencyLimit = pLimit(200);
private readonly CACHE_PREFIX = 'proxy:'; private readonly CACHE_PREFIX = 'proxy:';
private readonly WORKING_PROXIES_KEY = 'proxy:working'; private readonly WORKING_PROXIES_KEY = 'proxy:working';
private readonly PROXY_STATS_KEY = 'proxy:stats'; private readonly PROXY_STATS_KEY = 'proxy:stats';
@ -434,16 +436,14 @@ export class ProxyService {
}; };
} }
} }
/** /**
* Validate proxies in background * Validate proxies in background
*/ */
private async validateProxiesInBackground(proxies: ProxyConfig[]): Promise<void> { private async validateProxiesInBackground(proxies: ProxyConfig[]): Promise<void> {
this.logger.info('Starting background proxy validation', { count: proxies.length }); this.logger.info('Starting background proxy validation', { count: proxies.length });
const concurrency = 50; // Process 50 proxies concurrently
const chunks = this.chunkArray(proxies, concurrency); const validationPromises = proxies.map(proxy =>
for (const chunk of chunks) { this.concurrencyLimit(() =>
const validationPromises = chunk.map(proxy =>
this.checkProxy(proxy).catch(error => { this.checkProxy(proxy).catch(error => {
this.logger.error('Error validating proxy', { this.logger.error('Error validating proxy', {
host: proxy.host, host: proxy.host,
@ -452,15 +452,12 @@ export class ProxyService {
}); });
return null; return null;
}) })
)
); );
await Promise.allSettled(validationPromises);
// Small delay between chunks to avoid overwhelming the system
await new Promise(resolve => setTimeout(resolve, 1000));
}
await Promise.allSettled(validationPromises);
this.logger.info('Background proxy validation completed'); this.logger.info('Background proxy validation completed');
} }
/** /**
* Start periodic proxy health checks * Start periodic proxy health checks
*/ */
@ -470,7 +467,9 @@ export class ProxyService {
setInterval(async () => { setInterval(async () => {
try { try {
const workingProxies = await this.getWorkingProxies(100); // Check up to 100 working proxies const workingProxies = await this.getWorkingProxies(100); // Check up to 100 working proxies
const validationPromises = workingProxies.map(proxy => this.checkProxy(proxy)); const validationPromises = workingProxies.map(proxy =>
this.concurrencyLimit(() => this.checkProxy(proxy))
);
const results = await Promise.allSettled(validationPromises); const results = await Promise.allSettled(validationPromises);
const successCount = results.filter(r => const successCount = results.filter(r =>
@ -502,7 +501,6 @@ export class ProxyService {
this.logger.error('Error clearing proxy data', error); this.logger.error('Error clearing proxy data', error);
} }
} }
/** /**
* Get cache key for a proxy * Get cache key for a proxy
*/ */
@ -510,17 +508,6 @@ export class ProxyService {
return `${this.CACHE_PREFIX}${proxy.host}:${proxy.port}`; return `${this.CACHE_PREFIX}${proxy.host}:${proxy.port}`;
} }
/**
* Split array into chunks
*/
private chunkArray<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
/** /**
* Graceful shutdown * Graceful shutdown
*/ */

View file

@ -71,6 +71,7 @@
"@stock-bot/shutdown": "*", "@stock-bot/shutdown": "*",
"@stock-bot/types": "*", "@stock-bot/types": "*",
"hono": "^4.0.0", "hono": "^4.0.0",
"p-limit": "^6.2.0",
"ws": "^8.0.0", "ws": "^8.0.0",
}, },
"devDependencies": { "devDependencies": {
@ -1646,7 +1647,7 @@
"p-cancelable": ["p-cancelable@4.0.1", "", {}, "sha512-wBowNApzd45EIKdO1LaU+LrMBwAcjfPaYtVzV3lmfM3gf8Z4CHZsiIqlM8TZZ8okYvh5A1cP6gTfCRQtwUpaUg=="], "p-cancelable": ["p-cancelable@4.0.1", "", {}, "sha512-wBowNApzd45EIKdO1LaU+LrMBwAcjfPaYtVzV3lmfM3gf8Z4CHZsiIqlM8TZZ8okYvh5A1cP6gTfCRQtwUpaUg=="],
"p-limit": ["p-limit@3.1.0", "", { "dependencies": { "yocto-queue": "^0.1.0" } }, "sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ=="], "p-limit": ["p-limit@6.2.0", "", { "dependencies": { "yocto-queue": "^1.1.1" } }, "sha512-kuUqqHNUqoIWp/c467RI4X6mmyuojY5jGutNU0wVTmEOOfcuwLqyMVoAi9MKi2Ak+5i9+nhmrK4ufZE8069kHA=="],
"p-locate": ["p-locate@5.0.0", "", { "dependencies": { "p-limit": "^3.0.2" } }, "sha512-LaNjtRWUBY++zB5nE/NwcaoMylSPk+S+ZHNB1TzdbMJMny6dynpAGt7X/tl/QYq3TIeE6nxHppbo2LGymrG5Pw=="], "p-locate": ["p-locate@5.0.0", "", { "dependencies": { "p-limit": "^3.0.2" } }, "sha512-LaNjtRWUBY++zB5nE/NwcaoMylSPk+S+ZHNB1TzdbMJMny6dynpAGt7X/tl/QYq3TIeE6nxHppbo2LGymrG5Pw=="],
@ -2060,7 +2061,7 @@
"yauzl": ["yauzl@3.2.0", "", { "dependencies": { "buffer-crc32": "~0.2.3", "pend": "~1.2.0" } }, "sha512-Ow9nuGZE+qp1u4JIPvg+uCiUr7xGQWdff7JQSk5VGYTAZMDe2q8lxJ10ygv10qmSj031Ty/6FNJpLO4o1Sgc+w=="], "yauzl": ["yauzl@3.2.0", "", { "dependencies": { "buffer-crc32": "~0.2.3", "pend": "~1.2.0" } }, "sha512-Ow9nuGZE+qp1u4JIPvg+uCiUr7xGQWdff7JQSk5VGYTAZMDe2q8lxJ10ygv10qmSj031Ty/6FNJpLO4o1Sgc+w=="],
"yocto-queue": ["yocto-queue@0.1.0", "", {}, "sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q=="], "yocto-queue": ["yocto-queue@1.2.1", "", {}, "sha512-AyeEbWOu/TAXdxlV9wmGcR0+yh2j3vYPGOECcIj2S7MkrLyC7ne+oye2BKTItt0ii2PHk4cDy+95+LshzbXnGg=="],
"yoctocolors-cjs": ["yoctocolors-cjs@2.1.2", "", {}, "sha512-cYVsTjKl8b+FrnidjibDWskAv7UKOfcwaVZdp/it9n1s9fU3IkgDbhdIRKCW4JDsAlECJY0ytoVPT3sK6kideA=="], "yoctocolors-cjs": ["yoctocolors-cjs@2.1.2", "", {}, "sha512-cYVsTjKl8b+FrnidjibDWskAv7UKOfcwaVZdp/it9n1s9fU3IkgDbhdIRKCW4JDsAlECJY0ytoVPT3sK6kideA=="],
@ -2278,6 +2279,8 @@
"ora/strip-ansi": ["strip-ansi@7.1.0", "", { "dependencies": { "ansi-regex": "^6.0.1" } }, "sha512-iq6eVVI64nQQTRYq2KtEg2d2uU7LElhTJwsH4YzIHZshxlgZms/wIc4VoDQTlG/IvVIrBKG06CrZnp0qv7hkcQ=="], "ora/strip-ansi": ["strip-ansi@7.1.0", "", { "dependencies": { "ansi-regex": "^6.0.1" } }, "sha512-iq6eVVI64nQQTRYq2KtEg2d2uU7LElhTJwsH4YzIHZshxlgZms/wIc4VoDQTlG/IvVIrBKG06CrZnp0qv7hkcQ=="],
"p-locate/p-limit": ["p-limit@3.1.0", "", { "dependencies": { "yocto-queue": "^0.1.0" } }, "sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ=="],
"path-scurry/lru-cache": ["lru-cache@10.4.3", "", {}, "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ=="], "path-scurry/lru-cache": ["lru-cache@10.4.3", "", {}, "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ=="],
"pkg-dir/find-up": ["find-up@4.1.0", "", { "dependencies": { "locate-path": "^5.0.0", "path-exists": "^4.0.0" } }, "sha512-PpOwAdQ/YlXQ2vj8a3h8IipDuYRi3wceVQQGYWxNINccq40Anw7BlsEXCMbt1Zt+OLA6Fq9suIpIWD0OsnISlw=="], "pkg-dir/find-up": ["find-up@4.1.0", "", { "dependencies": { "locate-path": "^5.0.0", "path-exists": "^4.0.0" } }, "sha512-PpOwAdQ/YlXQ2vj8a3h8IipDuYRi3wceVQQGYWxNINccq40Anw7BlsEXCMbt1Zt+OLA6Fq9suIpIWD0OsnISlw=="],
@ -2434,6 +2437,8 @@
"ora/strip-ansi/ansi-regex": ["ansi-regex@6.1.0", "", {}, "sha512-7HSX4QQb4CspciLpVFwyRe79O3xsIZDDLER21kERQ71oaPodF8jL725AgJMFAYbooIqolJoRLuM81SpeUkpkvA=="], "ora/strip-ansi/ansi-regex": ["ansi-regex@6.1.0", "", {}, "sha512-7HSX4QQb4CspciLpVFwyRe79O3xsIZDDLER21kERQ71oaPodF8jL725AgJMFAYbooIqolJoRLuM81SpeUkpkvA=="],
"p-locate/p-limit/yocto-queue": ["yocto-queue@0.1.0", "", {}, "sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q=="],
"pkg-dir/find-up/locate-path": ["locate-path@5.0.0", "", { "dependencies": { "p-locate": "^4.1.0" } }, "sha512-t7hw9pI+WvuwNJXwk5zVHpyhIqzg2qTlklJOf0mVxGSbe3Fp2VieZcduNYjaLDoy6p9uGpQEGWG87WpMKlNq8g=="], "pkg-dir/find-up/locate-path": ["locate-path@5.0.0", "", { "dependencies": { "p-locate": "^4.1.0" } }, "sha512-t7hw9pI+WvuwNJXwk5zVHpyhIqzg2qTlklJOf0mVxGSbe3Fp2VieZcduNYjaLDoy6p9uGpQEGWG87WpMKlNq8g=="],
"readdir-glob/minimatch/brace-expansion": ["brace-expansion@2.0.1", "", { "dependencies": { "balanced-match": "^1.0.0" } }, "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA=="], "readdir-glob/minimatch/brace-expansion": ["brace-expansion@2.0.1", "", { "dependencies": { "balanced-match": "^1.0.0" } }, "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA=="],