diff --git a/apps/data-service/package.json b/apps/data-service/package.json index bf8141f..d56e9db 100644 --- a/apps/data-service/package.json +++ b/apps/data-service/package.json @@ -10,17 +10,19 @@ "start": "bun dist/index.js", "test": "bun test", "clean": "rm -rf dist" - }, "dependencies": { + }, + "dependencies": { + "@stock-bot/cache": "*", "@stock-bot/config": "*", - "@stock-bot/logger": "*", - "@stock-bot/shutdown": "*", - "@stock-bot/types": "*", - "@stock-bot/questdb-client": "*", - "@stock-bot/mongodb-client": "*", "@stock-bot/event-bus": "*", "@stock-bot/http": "*", - "@stock-bot/cache": "*", + "@stock-bot/logger": "*", + "@stock-bot/mongodb-client": "*", + "@stock-bot/questdb-client": "*", + "@stock-bot/shutdown": "*", + "@stock-bot/types": "*", "hono": "^4.0.0", + "p-limit": "^6.2.0", "ws": "^8.0.0" }, "devDependencies": { diff --git a/apps/data-service/src/services/proxy.service.ts b/apps/data-service/src/services/proxy.service.ts index bf8001f..e231d3e 100644 --- a/apps/data-service/src/services/proxy.service.ts +++ b/apps/data-service/src/services/proxy.service.ts @@ -1,6 +1,7 @@ import { Logger } from '@stock-bot/logger'; import createCache, { type CacheProvider } from '@stock-bot/cache'; import { HttpClient, HttpClientConfig, ProxyConfig , RequestConfig } from '@stock-bot/http'; +import pLimit from 'p-limit'; export interface ProxySource { url: string; @@ -35,6 +36,7 @@ export class ProxyService { private logger; private cache: CacheProvider; private httpClient: HttpClient; + private readonly concurrencyLimit = pLimit(200); private readonly CACHE_PREFIX = 'proxy:'; private readonly WORKING_PROXIES_KEY = 'proxy:working'; private readonly PROXY_STATS_KEY = 'proxy:stats'; @@ -434,16 +436,14 @@ export class ProxyService { }; } } - /** * Validate proxies in background */ private async validateProxiesInBackground(proxies: ProxyConfig[]): Promise { this.logger.info('Starting background proxy validation', { count: proxies.length }); - const concurrency = 50; // Process 50 proxies concurrently - const chunks = this.chunkArray(proxies, concurrency); - for (const chunk of chunks) { - const validationPromises = chunk.map(proxy => + + const validationPromises = proxies.map(proxy => + this.concurrencyLimit(() => this.checkProxy(proxy).catch(error => { this.logger.error('Error validating proxy', { host: proxy.host, @@ -452,15 +452,12 @@ export class ProxyService { }); return null; }) - ); - await Promise.allSettled(validationPromises); - // Small delay between chunks to avoid overwhelming the system - await new Promise(resolve => setTimeout(resolve, 1000)); - } + ) + ); + await Promise.allSettled(validationPromises); this.logger.info('Background proxy validation completed'); } - /** * Start periodic proxy health checks */ @@ -470,7 +467,9 @@ export class ProxyService { setInterval(async () => { try { const workingProxies = await this.getWorkingProxies(100); // Check up to 100 working proxies - const validationPromises = workingProxies.map(proxy => this.checkProxy(proxy)); + const validationPromises = workingProxies.map(proxy => + this.concurrencyLimit(() => this.checkProxy(proxy)) + ); const results = await Promise.allSettled(validationPromises); const successCount = results.filter(r => @@ -502,7 +501,6 @@ export class ProxyService { this.logger.error('Error clearing proxy data', error); } } - /** * Get cache key for a proxy */ @@ -510,17 +508,6 @@ export class ProxyService { return `${this.CACHE_PREFIX}${proxy.host}:${proxy.port}`; } - /** - * Split array into chunks - */ - private chunkArray(array: T[], size: number): T[][] { - const chunks: T[][] = []; - for (let i = 0; i < array.length; i += size) { - chunks.push(array.slice(i, i + size)); - } - return chunks; - } - /** * Graceful shutdown */ diff --git a/bun.lock b/bun.lock index 4a6509d..52a70d9 100644 --- a/bun.lock +++ b/bun.lock @@ -71,6 +71,7 @@ "@stock-bot/shutdown": "*", "@stock-bot/types": "*", "hono": "^4.0.0", + "p-limit": "^6.2.0", "ws": "^8.0.0", }, "devDependencies": { @@ -1646,7 +1647,7 @@ "p-cancelable": ["p-cancelable@4.0.1", "", {}, "sha512-wBowNApzd45EIKdO1LaU+LrMBwAcjfPaYtVzV3lmfM3gf8Z4CHZsiIqlM8TZZ8okYvh5A1cP6gTfCRQtwUpaUg=="], - "p-limit": ["p-limit@3.1.0", "", { "dependencies": { "yocto-queue": "^0.1.0" } }, "sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ=="], + "p-limit": ["p-limit@6.2.0", "", { "dependencies": { "yocto-queue": "^1.1.1" } }, "sha512-kuUqqHNUqoIWp/c467RI4X6mmyuojY5jGutNU0wVTmEOOfcuwLqyMVoAi9MKi2Ak+5i9+nhmrK4ufZE8069kHA=="], "p-locate": ["p-locate@5.0.0", "", { "dependencies": { "p-limit": "^3.0.2" } }, "sha512-LaNjtRWUBY++zB5nE/NwcaoMylSPk+S+ZHNB1TzdbMJMny6dynpAGt7X/tl/QYq3TIeE6nxHppbo2LGymrG5Pw=="], @@ -2060,7 +2061,7 @@ "yauzl": ["yauzl@3.2.0", "", { "dependencies": { "buffer-crc32": "~0.2.3", "pend": "~1.2.0" } }, "sha512-Ow9nuGZE+qp1u4JIPvg+uCiUr7xGQWdff7JQSk5VGYTAZMDe2q8lxJ10ygv10qmSj031Ty/6FNJpLO4o1Sgc+w=="], - "yocto-queue": ["yocto-queue@0.1.0", "", {}, "sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q=="], + "yocto-queue": ["yocto-queue@1.2.1", "", {}, "sha512-AyeEbWOu/TAXdxlV9wmGcR0+yh2j3vYPGOECcIj2S7MkrLyC7ne+oye2BKTItt0ii2PHk4cDy+95+LshzbXnGg=="], "yoctocolors-cjs": ["yoctocolors-cjs@2.1.2", "", {}, "sha512-cYVsTjKl8b+FrnidjibDWskAv7UKOfcwaVZdp/it9n1s9fU3IkgDbhdIRKCW4JDsAlECJY0ytoVPT3sK6kideA=="], @@ -2278,6 +2279,8 @@ "ora/strip-ansi": ["strip-ansi@7.1.0", "", { "dependencies": { "ansi-regex": "^6.0.1" } }, "sha512-iq6eVVI64nQQTRYq2KtEg2d2uU7LElhTJwsH4YzIHZshxlgZms/wIc4VoDQTlG/IvVIrBKG06CrZnp0qv7hkcQ=="], + "p-locate/p-limit": ["p-limit@3.1.0", "", { "dependencies": { "yocto-queue": "^0.1.0" } }, "sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ=="], + "path-scurry/lru-cache": ["lru-cache@10.4.3", "", {}, "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ=="], "pkg-dir/find-up": ["find-up@4.1.0", "", { "dependencies": { "locate-path": "^5.0.0", "path-exists": "^4.0.0" } }, "sha512-PpOwAdQ/YlXQ2vj8a3h8IipDuYRi3wceVQQGYWxNINccq40Anw7BlsEXCMbt1Zt+OLA6Fq9suIpIWD0OsnISlw=="], @@ -2434,6 +2437,8 @@ "ora/strip-ansi/ansi-regex": ["ansi-regex@6.1.0", "", {}, "sha512-7HSX4QQb4CspciLpVFwyRe79O3xsIZDDLER21kERQ71oaPodF8jL725AgJMFAYbooIqolJoRLuM81SpeUkpkvA=="], + "p-locate/p-limit/yocto-queue": ["yocto-queue@0.1.0", "", {}, "sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q=="], + "pkg-dir/find-up/locate-path": ["locate-path@5.0.0", "", { "dependencies": { "p-locate": "^4.1.0" } }, "sha512-t7hw9pI+WvuwNJXwk5zVHpyhIqzg2qTlklJOf0mVxGSbe3Fp2VieZcduNYjaLDoy6p9uGpQEGWG87WpMKlNq8g=="], "readdir-glob/minimatch/brace-expansion": ["brace-expansion@2.0.1", "", { "dependencies": { "balanced-match": "^1.0.0" } }, "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA=="],