From 67833a2fd743c3f34703a2ef6d9a0b2929b09ad9 Mon Sep 17 00:00:00 2001 From: Boki Date: Sat, 21 Jun 2025 11:13:50 -0400 Subject: [PATCH] refactored data-service fully --- .../src/handlers/ib/ib.handler.ts | 12 +- .../src/handlers/ib/ib.operations.ts | 309 ---------- .../ib/operations/exchanges.operations.ts | 67 ++ .../ib/operations/session.operations.ts | 88 +++ .../ib/operations/symbols.operations.ts | 125 ++++ .../src/handlers/ib/shared/config.ts | 23 + .../proxy/operations/check.operations.ts | 175 ++++++ .../proxy/operations/fetch.operations.ts | 112 ++++ .../proxy/operations/query.operations.ts | 79 +++ .../proxy/operations/queue.operations.ts | 40 ++ .../src/handlers/proxy/proxy.handler.ts | 4 +- .../src/handlers/proxy/proxy.operations.ts | 578 ------------------ .../src/handlers/proxy/shared/config.ts | 140 +++++ .../handlers/proxy/shared/proxy-manager.ts | 56 ++ .../src/handlers/proxy/shared/types.ts | 13 + .../fetch.operations.ts} | 23 +- .../src/handlers/webshare/shared/config.ts | 10 + .../src/handlers/webshare/webshare.handler.ts | 2 +- 18 files changed, 952 insertions(+), 904 deletions(-) delete mode 100644 apps/data-service/src/handlers/ib/ib.operations.ts create mode 100644 apps/data-service/src/handlers/ib/operations/exchanges.operations.ts create mode 100644 apps/data-service/src/handlers/ib/operations/session.operations.ts create mode 100644 apps/data-service/src/handlers/ib/operations/symbols.operations.ts create mode 100644 apps/data-service/src/handlers/ib/shared/config.ts create mode 100644 apps/data-service/src/handlers/proxy/operations/check.operations.ts create mode 100644 apps/data-service/src/handlers/proxy/operations/fetch.operations.ts create mode 100644 apps/data-service/src/handlers/proxy/operations/query.operations.ts create mode 100644 apps/data-service/src/handlers/proxy/operations/queue.operations.ts delete mode 100644 apps/data-service/src/handlers/proxy/proxy.operations.ts create mode 100644 apps/data-service/src/handlers/proxy/shared/config.ts create mode 100644 apps/data-service/src/handlers/proxy/shared/proxy-manager.ts create mode 100644 apps/data-service/src/handlers/proxy/shared/types.ts rename apps/data-service/src/handlers/webshare/{webshare.operations.ts => operations/fetch.operations.ts} (65%) create mode 100644 apps/data-service/src/handlers/webshare/shared/config.ts diff --git a/apps/data-service/src/handlers/ib/ib.handler.ts b/apps/data-service/src/handlers/ib/ib.handler.ts index 31dbecb..d4ef5d8 100644 --- a/apps/data-service/src/handlers/ib/ib.handler.ts +++ b/apps/data-service/src/handlers/ib/ib.handler.ts @@ -20,14 +20,15 @@ export function initializeIBProvider() { 'fetch-session': createJobHandler(async () => { // payload contains session configuration (not used in current implementation) logger.debug('Processing session fetch request'); - const { fetchSession } = await import('./ib.operations'); + const { fetchSession } = await import('./operations/session.operations'); return fetchSession(); }), 'fetch-exchanges': createJobHandler(async () => { // payload should contain session headers logger.debug('Processing exchanges fetch request'); - const { fetchSession, fetchExchanges } = await import('./ib.operations'); + const { fetchSession } = await import('./operations/session.operations'); + const { fetchExchanges } = await import('./operations/exchanges.operations'); const sessionHeaders = await fetchSession(); if (sessionHeaders) { return fetchExchanges(sessionHeaders); @@ -38,7 +39,8 @@ export function initializeIBProvider() { 'fetch-symbols': createJobHandler(async () => { // payload should contain session headers logger.debug('Processing symbols fetch request'); - const { fetchSession, fetchSymbols } = await import('./ib.operations'); + const { fetchSession } = await import('./operations/session.operations'); + const { fetchSymbols } = await import('./operations/symbols.operations'); const sessionHeaders = await fetchSession(); if (sessionHeaders) { return fetchSymbols(sessionHeaders); @@ -49,7 +51,9 @@ export function initializeIBProvider() { 'ib-exchanges-and-symbols': createJobHandler(async () => { // Legacy operation for scheduled jobs logger.info('Fetching symbol summary from IB'); - const { fetchSession, fetchExchanges, fetchSymbols } = await import('./ib.operations'); + const { fetchSession } = await import('./operations/session.operations'); + const { fetchExchanges } = await import('./operations/exchanges.operations'); + const { fetchSymbols } = await import('./operations/symbols.operations'); const sessionHeaders = await fetchSession(); logger.info('Fetched symbol summary from IB'); diff --git a/apps/data-service/src/handlers/ib/ib.operations.ts b/apps/data-service/src/handlers/ib/ib.operations.ts deleted file mode 100644 index 5fa6a9d..0000000 --- a/apps/data-service/src/handlers/ib/ib.operations.ts +++ /dev/null @@ -1,309 +0,0 @@ -import { Browser } from '@stock-bot/browser'; -import { getLogger } from '@stock-bot/logger'; -import { getMongoDBClient } from '@stock-bot/mongodb-client'; - -// Shared instances (module-scoped, not global) -let isInitialized = false; // Track if resources are initialized -let logger: ReturnType; -// let cache: CacheProvider; - -export async function initializeIBResources(): Promise { - // Skip if already initialized - if (isInitialized) { - return; - } - - logger = getLogger('proxy-tasks'); - // cache = createCache({ - // keyPrefix: 'proxy:', - // ttl: PROXY_CONFIG.CACHE_TTL, - // enableMetrics: true, - // }); - - // httpClient = new HttpClient({ timeout: 15000 }, logger); - - // if (waitForCache) { - // // logger.info('Initializing proxy cache...'); - // // await cache.waitForReady(10000); - // // logger.info('Proxy cache initialized successfully'); - // logger.info('Proxy tasks initialized'); - // } else { - // logger.info('Proxy tasks initialized (fallback mode)'); - // } - isInitialized = true; -} - -export async function fetchSession(): Promise | undefined> { - try { - await Browser.initialize({ headless: true, timeout: 10000, blockResources: false }); - logger.info('✅ Browser initialized'); - - const { page } = await Browser.createPageWithProxy( - 'https://www.interactivebrokers.com/en/trading/products-exchanges.php#/', - 'http://doimvbnb-US-rotate:w5fpiwrb9895@p.webshare.io:80' - ); - logger.info('✅ Page created with proxy'); - - const headersPromise = new Promise | undefined>(resolve => { - let resolved = false; - - page.onNetworkEvent(event => { - if (event.url.includes('/webrest/search/product-types/summary')) { - if (event.type === 'request') { - try { - resolve(event.headers); - } catch (e) { - resolve(undefined); - logger.debug('Raw Summary Response error', { error: (e as Error).message }); - } - } - } - }); - - // Timeout fallback - setTimeout(() => { - if (!resolved) { - resolved = true; - logger.warn('Timeout waiting for headers'); - resolve(undefined); - } - }, 30000); - }); - - logger.info('⏳ Waiting for page load...'); - await page.waitForLoadState('domcontentloaded', { timeout: 20000 }); - logger.info('✅ Page loaded'); - - //Products tabs - logger.info('🔍 Looking for Products tab...'); - const productsTab = page.locator('#productSearchTab[role="tab"][href="#products"]'); - await productsTab.waitFor({ timeout: 5000 }); - logger.info('✅ Found Products tab'); - logger.info('🖱️ Clicking Products tab...'); - await productsTab.click(); - logger.info('✅ Products tab clicked'); - - // New Products Checkbox - logger.info('🔍 Looking for "New Products Only" radio button...'); - const radioButton = page.locator('span.checkbox-text:has-text("New Products Only")'); - await radioButton.waitFor({ timeout: 5000 }); - logger.info(`🎯 Found "New Products Only" radio button`); - await radioButton.first().click(); - logger.info('✅ "New Products Only" radio button clicked'); - - // Wait for and return headers immediately when captured - logger.info('⏳ Waiting for headers to be captured...'); - const headers = await headersPromise; - page.close(); - if (headers) { - logger.info('✅ Headers captured successfully'); - } else { - logger.warn('⚠️ No headers were captured'); - } - - return headers; - } catch (error) { - logger.error('Failed to fetch IB symbol summary', { error }); - return; - } -} - -export async function fetchExchanges(sessionHeaders: Record): Promise { - try { - logger.info('🔍 Fetching exchanges with session headers...'); - - // The URL for the exchange data API - const exchangeUrl = 'https://www.interactivebrokers.com/webrest/exchanges'; - - // Configure the proxy - const proxyUrl = 'http://doimvbnb-US-rotate:w5fpiwrb9895@p.webshare.io:80'; - - // Prepare headers - include all session headers plus any additional ones - const requestHeaders = { - ...sessionHeaders, - Accept: 'application/json, text/plain, */*', - 'Accept-Language': 'en-US,en;q=0.9', - 'Cache-Control': 'no-cache', - Pragma: 'no-cache', - 'Sec-Fetch-Dest': 'empty', - 'Sec-Fetch-Mode': 'cors', - 'Sec-Fetch-Site': 'same-origin', - 'X-Requested-With': 'XMLHttpRequest', - }; - - logger.info('📤 Making request to exchange API...', { - url: exchangeUrl, - headerCount: Object.keys(requestHeaders).length, - }); - - // Use fetch with proxy configuration - const response = await fetch(exchangeUrl, { - method: 'GET', - headers: requestHeaders, - proxy: proxyUrl, - }); - - if (!response.ok) { - logger.error('❌ Exchange API request failed', { - status: response.status, - statusText: response.statusText, - }); - return null; - } - - const data = await response.json(); - const exchanges = data?.exchanges || []; - logger.info('✅ Exchange data fetched successfully'); - - logger.info('Saving IB exchanges to MongoDB...'); - const client = getMongoDBClient(); - await client.batchUpsert('ibExchanges', exchanges, ['id', 'country_code']); - logger.info('✅ Exchange IB data saved to MongoDB:', { - count: exchanges.length, - }); - - return exchanges; - } catch (error) { - logger.error('❌ Failed to fetch exchanges', { error }); - return null; - } -} - -// Fetch symbols from IB using the session headers -export async function fetchSymbols(sessionHeaders: Record): Promise { - try { - logger.info('🔍 Fetching symbols with session headers...'); - // Configure the proxy - const proxyUrl = 'http://doimvbnb-US-rotate:w5fpiwrb9895@p.webshare.io:80'; - // Prepare headers - include all session headers plus any additional ones - const requestHeaders = { - ...sessionHeaders, - Accept: 'application/json, text/plain, */*', - 'Accept-Language': 'en-US,en;q=0.9', - 'Cache-Control': 'no-cache', - Pragma: 'no-cache', - 'Sec-Fetch-Dest': 'empty', - 'Sec-Fetch-Mode': 'cors', - 'Sec-Fetch-Site': 'same-origin', - 'X-Requested-With': 'XMLHttpRequest', - }; - - const requestBody = { - domain: 'com', - newProduct: 'all', - pageNumber: 1, - pageSize: 100, - productCountry: ['CA', 'US'], - productSymbol: '', - productType: ['STK'], - sortDirection: 'asc', - sortField: 'symbol', - }; - - // Get Summary - const summaryResponse = await fetch( - 'https://www.interactivebrokers.com/webrest/search/product-types/summary', - { - method: 'POST', - headers: requestHeaders, - proxy: proxyUrl, - body: JSON.stringify(requestBody), - } - ); - - if (!summaryResponse.ok) { - logger.error('❌ Summary API request failed', { - status: summaryResponse.status, - statusText: summaryResponse.statusText, - }); - return null; - } - - const summaryData = await summaryResponse.json(); - logger.info('✅ IB Summary data fetched successfully', { - totalCount: summaryData[0].totalCount, - }); - - const symbols = []; - requestBody.pageSize = 500; - const pageCount = Math.ceil(summaryData[0].totalCount / 500) || 0; - logger.info('Fetching Symbols for IB', { pageCount }); - const symbolPromises = []; - for (let page = 1; page <= pageCount; page++) { - requestBody.pageNumber = page; - - // Fetch symbols for the current page - const symbolsResponse = fetch( - 'https://www.interactivebrokers.com/webrest/search/products-by-filters', - { - method: 'POST', - headers: requestHeaders, - proxy: proxyUrl, - body: JSON.stringify(requestBody), - } - ); - symbolPromises.push(symbolsResponse); - } - const responses = await Promise.all(symbolPromises); - for (const response of responses) { - if (!response.ok) { - logger.error('❌ Symbols API request failed', { - status: response.status, - statusText: response.statusText, - }); - return null; - } - const data = await response.json(); - const symJson = data?.products || []; - if (symJson && symJson.length > 0) { - symbols.push(...symJson); - } else { - logger.warn('⚠️ No symbols found in response'); - continue; - } - } - if (symbols.length === 0) { - logger.warn('⚠️ No symbols fetched from IB'); - return null; - } - - logger.info('✅ IB symbols fetched successfully, saving to DB...', { - totalSymbols: symbols.length, - }); - const client = getMongoDBClient(); - await client.batchUpsert('ib_symbols', symbols, ['symbol', 'exchangeId']); - logger.info('Saved IB symbols to DB', { - totalSymbols: symbols.length, - }); - - return symbols; - // logger.info('📤 Making request to exchange API...', { - // url: exchangeUrl, - // headerCount: Object.keys(requestHeaders).length, - // }); - - // // Use fetch with proxy configuration - // const response = await fetch(exchangeUrl, { - // method: 'GET', - // headers: requestHeaders, - // proxy: proxyUrl, - // }); - - // if (!response.ok) { - // logger.error('❌ Exchange API request failed', { - // status: response.status, - // statusText: response.statusText, - // }); - // return null; - // } - } catch (error) { - logger.error('❌ Failed to fetch symbols', { error }); - return null; - } -} - -export const ibTasks = { - fetchSymbols, - fetchSession, - fetchExchanges, -}; diff --git a/apps/data-service/src/handlers/ib/operations/exchanges.operations.ts b/apps/data-service/src/handlers/ib/operations/exchanges.operations.ts new file mode 100644 index 0000000..4260442 --- /dev/null +++ b/apps/data-service/src/handlers/ib/operations/exchanges.operations.ts @@ -0,0 +1,67 @@ +/** + * IB Exchanges Operations - Fetching exchange data from IB API + */ +import { getMongoDBClient } from '@stock-bot/mongodb-client'; +import { OperationContext } from '@stock-bot/utils'; + +import { IB_CONFIG } from '../shared/config'; + +export async function fetchExchanges(sessionHeaders: Record): Promise { + const ctx = OperationContext.create('ib', 'exchanges'); + + try { + ctx.logger.info('🔍 Fetching exchanges with session headers...'); + + // The URL for the exchange data API + const exchangeUrl = IB_CONFIG.BASE_URL + IB_CONFIG.EXCHANGE_API; + + // Prepare headers - include all session headers plus any additional ones + const requestHeaders = { + ...sessionHeaders, + Accept: 'application/json, text/plain, */*', + 'Accept-Language': 'en-US,en;q=0.9', + 'Cache-Control': 'no-cache', + Pragma: 'no-cache', + 'Sec-Fetch-Dest': 'empty', + 'Sec-Fetch-Mode': 'cors', + 'Sec-Fetch-Site': 'same-origin', + 'X-Requested-With': 'XMLHttpRequest', + }; + + ctx.logger.info('📤 Making request to exchange API...', { + url: exchangeUrl, + headerCount: Object.keys(requestHeaders).length, + }); + + // Use fetch with proxy configuration + const response = await fetch(exchangeUrl, { + method: 'GET', + headers: requestHeaders, + proxy: IB_CONFIG.DEFAULT_PROXY, + }); + + if (!response.ok) { + ctx.logger.error('❌ Exchange API request failed', { + status: response.status, + statusText: response.statusText, + }); + return null; + } + + const data = await response.json(); + const exchanges = data?.exchanges || []; + ctx.logger.info('✅ Exchange data fetched successfully'); + + ctx.logger.info('Saving IB exchanges to MongoDB...'); + const client = getMongoDBClient(); + await client.batchUpsert('ibExchanges', exchanges, ['id', 'country_code']); + ctx.logger.info('✅ Exchange IB data saved to MongoDB:', { + count: exchanges.length, + }); + + return exchanges; + } catch (error) { + ctx.logger.error('❌ Failed to fetch exchanges', { error }); + return null; + } +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/ib/operations/session.operations.ts b/apps/data-service/src/handlers/ib/operations/session.operations.ts new file mode 100644 index 0000000..e67f420 --- /dev/null +++ b/apps/data-service/src/handlers/ib/operations/session.operations.ts @@ -0,0 +1,88 @@ +/** + * IB Session Operations - Browser automation for session headers + */ +import { Browser } from '@stock-bot/browser'; +import { OperationContext } from '@stock-bot/utils'; + +import { IB_CONFIG } from '../shared/config'; + +export async function fetchSession(): Promise | undefined> { + const ctx = OperationContext.create('ib', 'session'); + + try { + await Browser.initialize({ + headless: true, + timeout: IB_CONFIG.BROWSER_TIMEOUT, + blockResources: false + }); + ctx.logger.info('✅ Browser initialized'); + + const { page } = await Browser.createPageWithProxy( + IB_CONFIG.BASE_URL + IB_CONFIG.PRODUCTS_PAGE, + IB_CONFIG.DEFAULT_PROXY + ); + ctx.logger.info('✅ Page created with proxy'); + + const headersPromise = new Promise | undefined>(resolve => { + let resolved = false; + + page.onNetworkEvent(event => { + if (event.url.includes('/webrest/search/product-types/summary')) { + if (event.type === 'request') { + try { + resolve(event.headers); + } catch (e) { + resolve(undefined); + ctx.logger.debug('Raw Summary Response error', { error: (e as Error).message }); + } + } + } + }); + + // Timeout fallback + setTimeout(() => { + if (!resolved) { + resolved = true; + ctx.logger.warn('Timeout waiting for headers'); + resolve(undefined); + } + }, IB_CONFIG.HEADERS_TIMEOUT); + }); + + ctx.logger.info('⏳ Waiting for page load...'); + await page.waitForLoadState('domcontentloaded', { timeout: IB_CONFIG.PAGE_LOAD_TIMEOUT }); + ctx.logger.info('✅ Page loaded'); + + //Products tabs + ctx.logger.info('🔍 Looking for Products tab...'); + const productsTab = page.locator('#productSearchTab[role=\"tab\"][href=\"#products\"]'); + await productsTab.waitFor({ timeout: IB_CONFIG.ELEMENT_TIMEOUT }); + ctx.logger.info('✅ Found Products tab'); + ctx.logger.info('🖱️ Clicking Products tab...'); + await productsTab.click(); + ctx.logger.info('✅ Products tab clicked'); + + // New Products Checkbox + ctx.logger.info('🔍 Looking for \"New Products Only\" radio button...'); + const radioButton = page.locator('span.checkbox-text:has-text(\"New Products Only\")'); + await radioButton.waitFor({ timeout: IB_CONFIG.ELEMENT_TIMEOUT }); + ctx.logger.info(`🎯 Found \"New Products Only\" radio button`); + await radioButton.first().click(); + ctx.logger.info('✅ \"New Products Only\" radio button clicked'); + + // Wait for and return headers immediately when captured + ctx.logger.info('⏳ Waiting for headers to be captured...'); + const headers = await headersPromise; + page.close(); + if (headers) { + ctx.logger.info('✅ Headers captured successfully'); + } else { + ctx.logger.warn('⚠️ No headers were captured'); + } + + return headers; + } catch (error) { + ctx.logger.error('Failed to fetch IB symbol summary', { error }); + return; + } +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/ib/operations/symbols.operations.ts b/apps/data-service/src/handlers/ib/operations/symbols.operations.ts new file mode 100644 index 0000000..94653df --- /dev/null +++ b/apps/data-service/src/handlers/ib/operations/symbols.operations.ts @@ -0,0 +1,125 @@ +/** + * IB Symbols Operations - Fetching symbol data from IB API + */ +import { getMongoDBClient } from '@stock-bot/mongodb-client'; +import { OperationContext } from '@stock-bot/utils'; + +import { IB_CONFIG } from '../shared/config'; + +// Fetch symbols from IB using the session headers +export async function fetchSymbols(sessionHeaders: Record): Promise { + const ctx = OperationContext.create('ib', 'symbols'); + + try { + ctx.logger.info('🔍 Fetching symbols with session headers...'); + + // Prepare headers - include all session headers plus any additional ones + const requestHeaders = { + ...sessionHeaders, + Accept: 'application/json, text/plain, */*', + 'Accept-Language': 'en-US,en;q=0.9', + 'Cache-Control': 'no-cache', + Pragma: 'no-cache', + 'Sec-Fetch-Dest': 'empty', + 'Sec-Fetch-Mode': 'cors', + 'Sec-Fetch-Site': 'same-origin', + 'X-Requested-With': 'XMLHttpRequest', + }; + + const requestBody = { + domain: 'com', + newProduct: 'all', + pageNumber: 1, + pageSize: 100, + productCountry: IB_CONFIG.PRODUCT_COUNTRIES, + productSymbol: '', + productType: IB_CONFIG.PRODUCT_TYPES, + sortDirection: 'asc', + sortField: 'symbol', + }; + + // Get Summary + const summaryResponse = await fetch( + IB_CONFIG.BASE_URL + IB_CONFIG.SUMMARY_API, + { + method: 'POST', + headers: requestHeaders, + proxy: IB_CONFIG.DEFAULT_PROXY, + body: JSON.stringify(requestBody), + } + ); + + if (!summaryResponse.ok) { + ctx.logger.error('❌ Summary API request failed', { + status: summaryResponse.status, + statusText: summaryResponse.statusText, + }); + return null; + } + + const summaryData = await summaryResponse.json(); + ctx.logger.info('✅ IB Summary data fetched successfully', { + totalCount: summaryData[0].totalCount, + }); + + const symbols = []; + requestBody.pageSize = IB_CONFIG.PAGE_SIZE; + const pageCount = Math.ceil(summaryData[0].totalCount / IB_CONFIG.PAGE_SIZE) || 0; + ctx.logger.info('Fetching Symbols for IB', { pageCount }); + + const symbolPromises = []; + for (let page = 1; page <= pageCount; page++) { + requestBody.pageNumber = page; + + // Fetch symbols for the current page + const symbolsResponse = fetch( + IB_CONFIG.BASE_URL + IB_CONFIG.PRODUCTS_API, + { + method: 'POST', + headers: requestHeaders, + proxy: IB_CONFIG.DEFAULT_PROXY, + body: JSON.stringify(requestBody), + } + ); + symbolPromises.push(symbolsResponse); + } + + const responses = await Promise.all(symbolPromises); + for (const response of responses) { + if (!response.ok) { + ctx.logger.error('❌ Symbols API request failed', { + status: response.status, + statusText: response.statusText, + }); + return null; + } + const data = await response.json(); + const symJson = data?.products || []; + if (symJson && symJson.length > 0) { + symbols.push(...symJson); + } else { + ctx.logger.warn('⚠️ No symbols found in response'); + continue; + } + } + + if (symbols.length === 0) { + ctx.logger.warn('⚠️ No symbols fetched from IB'); + return null; + } + + ctx.logger.info('✅ IB symbols fetched successfully, saving to DB...', { + totalSymbols: symbols.length, + }); + const client = getMongoDBClient(); + await client.batchUpsert('ib_symbols', symbols, ['symbol', 'exchangeId']); + ctx.logger.info('Saved IB symbols to DB', { + totalSymbols: symbols.length, + }); + + return symbols; + } catch (error) { + ctx.logger.error('❌ Failed to fetch symbols', { error }); + return null; + } +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/ib/shared/config.ts b/apps/data-service/src/handlers/ib/shared/config.ts new file mode 100644 index 0000000..1f09326 --- /dev/null +++ b/apps/data-service/src/handlers/ib/shared/config.ts @@ -0,0 +1,23 @@ +/** + * Interactive Brokers Configuration Constants + */ + +export const IB_CONFIG = { + BASE_URL: 'https://www.interactivebrokers.com', + PRODUCTS_PAGE: '/en/trading/products-exchanges.php#/', + EXCHANGE_API: '/webrest/exchanges', + SUMMARY_API: '/webrest/search/product-types/summary', + PRODUCTS_API: '/webrest/search/products-by-filters', + + // Browser configuration + BROWSER_TIMEOUT: 10000, + PAGE_LOAD_TIMEOUT: 20000, + ELEMENT_TIMEOUT: 5000, + HEADERS_TIMEOUT: 30000, + + // API configuration + DEFAULT_PROXY: 'http://doimvbnb-US-rotate:w5fpiwrb9895@p.webshare.io:80', + PAGE_SIZE: 500, + PRODUCT_COUNTRIES: ['CA', 'US'], + PRODUCT_TYPES: ['STK'], +}; \ No newline at end of file diff --git a/apps/data-service/src/handlers/proxy/operations/check.operations.ts b/apps/data-service/src/handlers/proxy/operations/check.operations.ts new file mode 100644 index 0000000..d49ead3 --- /dev/null +++ b/apps/data-service/src/handlers/proxy/operations/check.operations.ts @@ -0,0 +1,175 @@ +/** + * Proxy Check Operations - Checking proxy functionality + */ +import { HttpClient, ProxyInfo } from '@stock-bot/http'; +import { OperationContext } from '@stock-bot/utils'; + +import { PROXY_CONFIG } from '../shared/config'; +import { ProxyStatsManager } from '../shared/proxy-manager'; + +// Shared HTTP client +let httpClient: HttpClient; + +function getHttpClient(ctx: OperationContext): HttpClient { + if (!httpClient) { + httpClient = new HttpClient({ timeout: 10000 }, ctx.logger); + } + return httpClient; +} + +/** + * Check if a proxy is working + */ +export async function checkProxy(proxy: ProxyInfo): Promise { + const ctx = OperationContext.create('proxy', 'check'); + + let success = false; + ctx.logger.debug(`Checking Proxy:`, { + protocol: proxy.protocol, + host: proxy.host, + port: proxy.port, + }); + + try { + // Test the proxy + const client = getHttpClient(ctx); + const response = await client.get(PROXY_CONFIG.CHECK_URL, { + proxy, + timeout: PROXY_CONFIG.CHECK_TIMEOUT, + }); + + const isWorking = response.status >= 200 && response.status < 300; + const result: ProxyInfo = { + ...proxy, + isWorking, + lastChecked: new Date(), + responseTime: response.responseTime, + }; + + if (isWorking && !JSON.stringify(response.data).includes(PROXY_CONFIG.CHECK_IP)) { + success = true; + await updateProxyInCache(result, true, ctx); + } else { + await updateProxyInCache(result, false, ctx); + } + + if (proxy.source) { + updateProxyStats(proxy.source, success, ctx); + } + + ctx.logger.debug('Proxy check completed', { + host: proxy.host, + port: proxy.port, + isWorking, + }); + + return result; + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + const result: ProxyInfo = { + ...proxy, + isWorking: false, + error: errorMessage, + lastChecked: new Date(), + }; + + // Update cache for failed proxy (increment total, don't update TTL) + await updateProxyInCache(result, false, ctx); + + if (proxy.source) { + updateProxyStats(proxy.source, success, ctx); + } + + ctx.logger.debug('Proxy check failed', { + host: proxy.host, + port: proxy.port, + error: errorMessage, + }); + + return result; + } +} + +/** + * Update proxy data in cache with working/total stats and average response time + */ +async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean, ctx: OperationContext): Promise { + const cacheKey = `${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`; + + try { + const existing: ProxyInfo | null = await ctx.cache.get(cacheKey); + + // For failed proxies, only update if they already exist + if (!isWorking && !existing) { + ctx.logger.debug('Proxy not in cache, skipping failed update', { + proxy: `${proxy.host}:${proxy.port}`, + }); + return; + } + + // Calculate new average response time if we have a response time + let newAverageResponseTime = existing?.averageResponseTime; + if (proxy.responseTime !== undefined) { + const existingAvg = existing?.averageResponseTime || 0; + const existingTotal = existing?.total || 0; + + // Calculate weighted average: (existing_avg * existing_count + new_response) / (existing_count + 1) + newAverageResponseTime = + existingTotal > 0 + ? (existingAvg * existingTotal + proxy.responseTime) / (existingTotal + 1) + : proxy.responseTime; + } + + // Build updated proxy data + const updated = { + ...existing, + ...proxy, // Keep latest proxy info + total: (existing?.total || 0) + 1, + working: isWorking ? (existing?.working || 0) + 1 : existing?.working || 0, + isWorking, + lastChecked: new Date(), + // Add firstSeen only for new entries + ...(existing ? {} : { firstSeen: new Date() }), + // Update average response time if we calculated a new one + ...(newAverageResponseTime !== undefined + ? { averageResponseTime: newAverageResponseTime } + : {}), + }; + + // Calculate success rate + updated.successRate = updated.total > 0 ? (updated.working / updated.total) * 100 : 0; + + // Save to cache: reset TTL for working proxies, keep existing TTL for failed ones + const cacheOptions = isWorking ? { ttl: PROXY_CONFIG.CACHE_TTL } : undefined; + await ctx.cache.set(cacheKey, updated, cacheOptions); + + ctx.logger.debug(`Updated ${isWorking ? 'working' : 'failed'} proxy in cache`, { + proxy: `${proxy.host}:${proxy.port}`, + working: updated.working, + total: updated.total, + successRate: updated.successRate.toFixed(1) + '%', + avgResponseTime: updated.averageResponseTime + ? `${updated.averageResponseTime.toFixed(0)}ms` + : 'N/A', + }); + } catch (error) { + ctx.logger.error('Failed to update proxy in cache', { + proxy: `${proxy.host}:${proxy.port}`, + error: error instanceof Error ? error.message : String(error), + }); + } +} + +function updateProxyStats(sourceId: string, success: boolean, ctx: OperationContext) { + const statsManager = ProxyStatsManager.getInstance(); + const source = statsManager.updateSourceStats(sourceId, success); + + if (!source) { + ctx.logger.warn(`Unknown proxy source: ${sourceId}`); + return; + } + + // Cache the updated stats + ctx.cache.set(`${PROXY_CONFIG.CACHE_STATS_KEY}:${source.id}`, source, { ttl: PROXY_CONFIG.CACHE_TTL }) + .catch(error => ctx.logger.debug('Failed to cache proxy stats', { error })); +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/proxy/operations/fetch.operations.ts b/apps/data-service/src/handlers/proxy/operations/fetch.operations.ts new file mode 100644 index 0000000..ed6910c --- /dev/null +++ b/apps/data-service/src/handlers/proxy/operations/fetch.operations.ts @@ -0,0 +1,112 @@ +/** + * Proxy Fetch Operations - Fetching proxies from sources + */ +import { HttpClient, ProxyInfo } from '@stock-bot/http'; +import { OperationContext } from '@stock-bot/utils'; + +import { PROXY_CONFIG } from '../shared/config'; +import { ProxyStatsManager } from '../shared/proxy-manager'; +import type { ProxySource } from '../shared/types'; + +// Shared HTTP client +let httpClient: HttpClient; + +function getHttpClient(ctx: OperationContext): HttpClient { + if (!httpClient) { + httpClient = new HttpClient({ timeout: 10000 }, ctx.logger); + } + return httpClient; +} + +export async function fetchProxiesFromSources(): Promise { + const ctx = OperationContext.create('proxy', 'fetch-sources'); + + const statsManager = ProxyStatsManager.getInstance(); + statsManager.resetStats(); + + const fetchPromises = PROXY_CONFIG.PROXY_SOURCES.map(source => fetchProxiesFromSource(source, ctx)); + const results = await Promise.all(fetchPromises); + let allProxies: ProxyInfo[] = results.flat(); + allProxies = removeDuplicateProxies(allProxies); + + ctx.logger.info('Fetched proxies from all sources', { total: allProxies.length }); + return allProxies; +} + +export async function fetchProxiesFromSource(source: ProxySource, ctx?: OperationContext): Promise { + if (!ctx) { + ctx = OperationContext.create('proxy', 'fetch-source'); + } + + const allProxies: ProxyInfo[] = []; + + try { + ctx.logger.info(`Fetching proxies from ${source.url}`); + + const client = getHttpClient(ctx); + const response = await client.get(source.url, { + timeout: 10000, + }); + + if (response.status !== 200) { + ctx.logger.warn(`Failed to fetch from ${source.url}: ${response.status}`); + return []; + } + + const text = response.data; + const lines = text.split('\n').filter((line: string) => line.trim()); + + for (const line of lines) { + let trimmed = line.trim(); + trimmed = cleanProxyUrl(trimmed); + if (!trimmed || trimmed.startsWith('#')) { + continue; + } + + // Parse formats like \"host:port\" or \"host:port:user:pass\" + const parts = trimmed.split(':'); + if (parts.length >= 2) { + const proxy: ProxyInfo = { + source: source.id, + protocol: source.protocol as 'http' | 'https' | 'socks4' | 'socks5', + host: parts[0], + port: parseInt(parts[1]), + }; + + if (!isNaN(proxy.port) && proxy.host) { + allProxies.push(proxy); + } + } + } + + ctx.logger.info(`Parsed ${allProxies.length} proxies from ${source.url}`); + } catch (error) { + ctx.logger.error(`Error fetching proxies from ${source.url}`, error); + return []; + } + + return allProxies; +} + +// Utility functions +function cleanProxyUrl(url: string): string { + return url + .replace(/^https?:\/\//, '') + .replace(/^0+/, '') + .replace(/:0+(\d)/g, ':$1'); +} + +function removeDuplicateProxies(proxies: ProxyInfo[]): ProxyInfo[] { + const seen = new Set(); + const unique: ProxyInfo[] = []; + + for (const proxy of proxies) { + const key = `${proxy.protocol}://${proxy.host}:${proxy.port}`; + if (!seen.has(key)) { + seen.add(key); + unique.push(proxy); + } + } + + return unique; +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/proxy/operations/query.operations.ts b/apps/data-service/src/handlers/proxy/operations/query.operations.ts new file mode 100644 index 0000000..87165fd --- /dev/null +++ b/apps/data-service/src/handlers/proxy/operations/query.operations.ts @@ -0,0 +1,79 @@ +/** + * Proxy Query Operations - Getting active proxies from cache + */ +import { ProxyInfo } from '@stock-bot/http'; +import { OperationContext } from '@stock-bot/utils'; + +import { PROXY_CONFIG } from '../shared/config'; + +/** + * Get a random active proxy from the cache + * @param protocol - Optional protocol filter ('http' | 'https' | 'socks4' | 'socks5') + * @param minSuccessRate - Minimum success rate percentage (default: 50) + * @returns A random working proxy or null if none found + */ +export async function getRandomActiveProxy( + protocol?: 'http' | 'https' | 'socks4' | 'socks5', + minSuccessRate: number = 50 +): Promise { + const ctx = OperationContext.create('proxy', 'get-random'); + + try { + // Get all active proxy keys from cache + const pattern = protocol + ? `${PROXY_CONFIG.CACHE_KEY}:${protocol}://*` + : `${PROXY_CONFIG.CACHE_KEY}:*`; + + const keys = await ctx.cache.keys(pattern); + + if (keys.length === 0) { + ctx.logger.debug('No active proxies found in cache', { pattern }); + return null; + } + + // Shuffle the keys for randomness + const shuffledKeys = keys.sort(() => Math.random() - 0.5); + + // Find a working proxy that meets the criteria + for (const key of shuffledKeys) { + try { + const proxyData: ProxyInfo | null = await ctx.cache.get(key); + + if ( + proxyData && + proxyData.isWorking && + (!proxyData.successRate || proxyData.successRate >= minSuccessRate) + ) { + ctx.logger.debug('Random active proxy selected', { + proxy: `${proxyData.host}:${proxyData.port}`, + protocol: proxyData.protocol, + successRate: proxyData.successRate?.toFixed(1) + '%', + avgResponseTime: proxyData.averageResponseTime + ? `${proxyData.averageResponseTime.toFixed(0)}ms` + : 'N/A', + }); + + return proxyData; + } + } catch (error) { + ctx.logger.debug('Error reading proxy from cache', { key, error: (error as Error).message }); + continue; + } + } + + ctx.logger.debug('No working proxies found meeting criteria', { + protocol, + minSuccessRate, + keysChecked: shuffledKeys.length, + }); + + return null; + } catch (error) { + ctx.logger.error('Error getting random active proxy', { + error: error instanceof Error ? error.message : String(error), + protocol, + minSuccessRate, + }); + return null; + } +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/proxy/operations/queue.operations.ts b/apps/data-service/src/handlers/proxy/operations/queue.operations.ts new file mode 100644 index 0000000..22b23a8 --- /dev/null +++ b/apps/data-service/src/handlers/proxy/operations/queue.operations.ts @@ -0,0 +1,40 @@ +/** + * Proxy Queue Operations - Queueing proxy operations + */ +import { ProxyInfo } from '@stock-bot/http'; +import { QueueManager } from '@stock-bot/queue'; +import { OperationContext } from '@stock-bot/utils'; + +export async function queueProxyFetch(): Promise { + const ctx = OperationContext.create('proxy', 'queue-fetch'); + + const queueManager = QueueManager.getInstance(); + const queue = queueManager.getQueue('proxy'); + const job = await queue.add('proxy-fetch', { + handler: 'proxy', + operation: 'fetch-and-check', + payload: {}, + priority: 5, + }); + + const jobId = job.id || 'unknown'; + ctx.logger.info('Proxy fetch job queued', { jobId }); + return jobId; +} + +export async function queueProxyCheck(proxies: ProxyInfo[]): Promise { + const ctx = OperationContext.create('proxy', 'queue-check'); + + const queueManager = QueueManager.getInstance(); + const queue = queueManager.getQueue('proxy'); + const job = await queue.add('proxy-check', { + handler: 'proxy', + operation: 'check-specific', + payload: { proxies }, + priority: 3, + }); + + const jobId = job.id || 'unknown'; + ctx.logger.info('Proxy check job queued', { jobId, count: proxies.length }); + return jobId; +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/proxy/proxy.handler.ts b/apps/data-service/src/handlers/proxy/proxy.handler.ts index eb59569..7236f09 100644 --- a/apps/data-service/src/handlers/proxy/proxy.handler.ts +++ b/apps/data-service/src/handlers/proxy/proxy.handler.ts @@ -18,7 +18,7 @@ export function initializeProxyProvider() { 'fetch-from-sources': createJobHandler(async () => { // Fetch proxies from all configured sources handlerLogger.info('Processing fetch proxies from sources request'); - const { fetchProxiesFromSources } = await import('./proxy.operations'); + const { fetchProxiesFromSources } = await import('./operations/fetch.operations'); const { processItems } = await import('@stock-bot/queue'); // Fetch all proxies from sources @@ -65,7 +65,7 @@ export function initializeProxyProvider() { handlerLogger.debug('Processing proxy check request', { proxy: `${payload.host}:${payload.port}`, }); - const { checkProxy } = await import('./proxy.operations'); + const { checkProxy } = await import('./operations/check.operations'); return checkProxy(payload); }), }, diff --git a/apps/data-service/src/handlers/proxy/proxy.operations.ts b/apps/data-service/src/handlers/proxy/proxy.operations.ts deleted file mode 100644 index f468e34..0000000 --- a/apps/data-service/src/handlers/proxy/proxy.operations.ts +++ /dev/null @@ -1,578 +0,0 @@ -import { createCache, type CacheProvider } from '@stock-bot/cache'; -import { getDatabaseConfig } from '@stock-bot/config'; -import { HttpClient, ProxyInfo } from '@stock-bot/http'; -import { getLogger } from '@stock-bot/logger'; -import { QueueManager } from '@stock-bot/queue'; - -// Type definitions -export interface ProxySource { - id: string; - url: string; - protocol: string; - working?: number; // Optional, used for stats - total?: number; // Optional, used for stats - percentWorking?: number; // Optional, used for stats - lastChecked?: Date; // Optional, used for stats -} - -// Shared configuration and utilities -const PROXY_CONFIG = { - CACHE_KEY: 'active', - CACHE_STATS_KEY: 'stats', - CACHE_TTL: 86400, // 24 hours - CHECK_TIMEOUT: 7000, - CHECK_IP: '99.246.102.205', - CHECK_URL: 'https://proxy-detection.stare.gg/?api_key=bd406bf53ddc6abe1d9de5907830a955', - PROXY_SOURCES: [ - { - id: 'prxchk', - url: 'https://raw.githubusercontent.com/prxchk/proxy-list/main/http.txt', - protocol: 'http', - }, - { - id: 'casals', - url: 'https://raw.githubusercontent.com/casals-ar/proxy-list/main/http', - protocol: 'http', - }, - { - id: 'sunny9577', - url: 'https://raw.githubusercontent.com/sunny9577/proxy-scraper/master/proxies.txt', - protocol: 'http', - }, - { - id: 'themiralay', - url: 'https://raw.githubusercontent.com/themiralay/Proxy-List-World/refs/heads/master/data.txt', - protocol: 'http', - }, - { - id: 'casa-ls', - url: 'https://raw.githubusercontent.com/casa-ls/proxy-list/refs/heads/main/http', - protocol: 'http', - }, - { - id: 'databay', - url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/http.txt', - protocol: 'http', - }, - { - id: 'speedx', - url: 'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/http.txt', - protocol: 'http', - }, - { - id: 'monosans', - url: 'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt', - protocol: 'http', - }, - { - id: 'murong', - url: 'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/http.txt', - protocol: 'http', - }, - { - id: 'vakhov-fresh', - url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/master/http.txt', - protocol: 'http', - }, - { - id: 'kangproxy', - url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/http/http.txt', - protocol: 'http', - }, - { - id: 'gfpcom', - url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/http.txt', - protocol: 'http', - }, - { - id: 'dpangestuw', - url: 'https://raw.githubusercontent.com/dpangestuw/Free-Proxy/refs/heads/main/http_proxies.txt', - protocol: 'http', - }, - { - id: 'gitrecon', - url: 'https://raw.githubusercontent.com/gitrecon1455/fresh-proxy-list/refs/heads/main/proxylist.txt', - protocol: 'http', - }, - { - id: 'vakhov-master', - url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/http.txt', - protocol: 'http', - }, - { - id: 'breaking-tech', - url: 'https://raw.githubusercontent.com/BreakingTechFr/Proxy_Free/refs/heads/main/proxies/http.txt', - protocol: 'http', - }, - { - id: 'ercindedeoglu', - url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/http.txt', - protocol: 'http', - }, - { - id: 'tuanminpay', - url: 'https://raw.githubusercontent.com/TuanMinPay/live-proxy/master/http.txt', - protocol: 'http', - }, - - { - id: 'r00tee-https', - url: 'https://raw.githubusercontent.com/r00tee/Proxy-List/refs/heads/main/Https.txt', - protocol: 'https', - }, - { - id: 'ercindedeoglu-https', - url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/https.txt', - protocol: 'https', - }, - { - id: 'vakhov-fresh-https', - url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/https.txt', - protocol: 'https', - }, - { - id: 'databay-https', - url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/https.txt', - protocol: 'https', - }, - { - id: 'kangproxy-https', - url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/https/https.txt', - protocol: 'https', - }, - { - id: 'zloi-user-https', - url: 'https://raw.githubusercontent.com/zloi-user/hideip.me/refs/heads/master/https.txt', - protocol: 'https', - }, - { - id: 'gfpcom-https', - url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/https.txt', - protocol: 'https', - }, - ], -}; - -// Shared instances (module-scoped, not global) -let isInitialized = false; // Track if resources are initialized -let logger: ReturnType; -let cache: CacheProvider; -let httpClient: HttpClient; -let proxyStats: ProxySource[] = PROXY_CONFIG.PROXY_SOURCES.map(source => ({ - id: source.id, - total: 0, - working: 0, - lastChecked: new Date(), - protocol: source.protocol, - url: source.url, -})); - -/** - * Initialize proxy resources (cache and shared dependencies) - * This should be called before any proxy operations - * @param waitForCache - Whether to wait for cache readiness (default: false for fallback mode) - */ -export async function initializeProxyResources(waitForCache = false): Promise { - // Skip if already initialized - if (isInitialized) { - return; - } - - logger = getLogger('proxy-tasks'); - const databaseConfig = getDatabaseConfig(); - cache = createCache({ - redisConfig: databaseConfig.dragonfly, - keyPrefix: 'proxy:', - ttl: PROXY_CONFIG.CACHE_TTL, - enableMetrics: true, - }); - - httpClient = new HttpClient({ timeout: 10000 }, logger); - - if (waitForCache) { - logger.info('Initializing proxy cache...'); - await cache.waitForReady(10000); - logger.info('Proxy cache initialized successfully'); - logger.info('Proxy tasks initialized'); - } else { - logger.info('Proxy tasks initialized (fallback mode)'); - } - isInitialized = true; -} - -// make a function that takes in source id and a boolean success and updates the proxyStats array -async function updateProxyStats(sourceId: string, success: boolean) { - const source = proxyStats.find(s => s.id === sourceId); - if (source !== undefined) { - if (typeof source.working !== 'number') { - source.working = 0; - } - if (typeof source.total !== 'number') { - source.total = 0; - } - source.total += 1; - if (success) { - source.working += 1; - } - source.percentWorking = (source.working / source.total) * 100; - source.lastChecked = new Date(); - await cache.set(`${PROXY_CONFIG.CACHE_STATS_KEY}:${source.id}`, source, PROXY_CONFIG.CACHE_TTL); - return source; - } else { - logger.warn(`Unknown proxy source: ${sourceId}`); - } -} - -// make a function that resets proxyStats -async function resetProxyStats(): Promise { - proxyStats = PROXY_CONFIG.PROXY_SOURCES.map(source => ({ - id: source.id, - total: 0, - working: 0, - lastChecked: new Date(), - protocol: source.protocol, - url: source.url, - })); - for (const source of proxyStats) { - await cache.set(`${PROXY_CONFIG.CACHE_STATS_KEY}:${source.id}`, source, PROXY_CONFIG.CACHE_TTL); - } - return Promise.resolve(); -} - -/** - * Update proxy data in cache with working/total stats and average response time - * @param proxy - The proxy to update - * @param isWorking - Whether the proxy is currently working - */ -async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean): Promise { - const cacheKey = `${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`; - - try { - const existing: ProxyInfo | null = await cache.get(cacheKey); - - // For failed proxies, only update if they already exist - if (!isWorking && !existing) { - logger.debug('Proxy not in cache, skipping failed update', { - proxy: `${proxy.host}:${proxy.port}`, - }); - return; - } - - // Calculate new average response time if we have a response time - let newAverageResponseTime = existing?.averageResponseTime; - if (proxy.responseTime !== undefined) { - const existingAvg = existing?.averageResponseTime || 0; - const existingTotal = existing?.total || 0; - - // Calculate weighted average: (existing_avg * existing_count + new_response) / (existing_count + 1) - newAverageResponseTime = - existingTotal > 0 - ? (existingAvg * existingTotal + proxy.responseTime) / (existingTotal + 1) - : proxy.responseTime; - } - - // Build updated proxy data - const updated = { - ...existing, - ...proxy, // Keep latest proxy info - total: (existing?.total || 0) + 1, - working: isWorking ? (existing?.working || 0) + 1 : existing?.working || 0, - isWorking, - lastChecked: new Date(), - // Add firstSeen only for new entries - ...(existing ? {} : { firstSeen: new Date() }), - // Update average response time if we calculated a new one - ...(newAverageResponseTime !== undefined - ? { averageResponseTime: newAverageResponseTime } - : {}), - }; - - // Calculate success rate - updated.successRate = updated.total > 0 ? (updated.working / updated.total) * 100 : 0; - - // Save to cache: reset TTL for working proxies, keep existing TTL for failed ones - const cacheOptions = isWorking ? PROXY_CONFIG.CACHE_TTL : undefined; - await cache.set(cacheKey, updated, cacheOptions); - - logger.debug(`Updated ${isWorking ? 'working' : 'failed'} proxy in cache`, { - proxy: `${proxy.host}:${proxy.port}`, - working: updated.working, - total: updated.total, - successRate: updated.successRate.toFixed(1) + '%', - avgResponseTime: updated.averageResponseTime - ? `${updated.averageResponseTime.toFixed(0)}ms` - : 'N/A', - }); - } catch (error) { - logger.error('Failed to update proxy in cache', { - proxy: `${proxy.host}:${proxy.port}`, - error: error instanceof Error ? error.message : String(error), - }); - } -} - -// Individual task functions -export async function queueProxyFetch(): Promise { - const queueManager = QueueManager.getInstance(); - const queue = queueManager.getQueue('proxy'); - const job = await queue.add('proxy-fetch', { - handler: 'proxy', - operation: 'fetch-and-check', - payload: {}, - priority: 5, - }); - - const jobId = job.id || 'unknown'; - logger.info('Proxy fetch job queued', { jobId }); - return jobId; -} - -export async function queueProxyCheck(proxies: ProxyInfo[]): Promise { - const queueManager = QueueManager.getInstance(); - const queue = queueManager.getQueue('proxy'); - const job = await queue.add('proxy-check', { - handler: 'proxy', - operation: 'check-specific', - payload: { proxies }, - priority: 3, - }); - - const jobId = job.id || 'unknown'; - logger.info('Proxy check job queued', { jobId, count: proxies.length }); - return jobId; -} - -export async function fetchProxiesFromSources(): Promise { - await resetProxyStats(); - const fetchPromises = PROXY_CONFIG.PROXY_SOURCES.map(source => fetchProxiesFromSource(source)); - const results = await Promise.all(fetchPromises); - let allProxies: ProxyInfo[] = results.flat(); - allProxies = removeDuplicateProxies(allProxies); - return allProxies; -} - -export async function fetchProxiesFromSource(source: ProxySource): Promise { - const allProxies: ProxyInfo[] = []; - - try { - logger.info(`Fetching proxies from ${source.url}`); - - const response = await httpClient.get(source.url, { - timeout: 10000, - }); - - if (response.status !== 200) { - logger.warn(`Failed to fetch from ${source.url}: ${response.status}`); - return []; - } - - const text = response.data; - const lines = text.split('\n').filter((line: string) => line.trim()); - - for (const line of lines) { - let trimmed = line.trim(); - trimmed = cleanProxyUrl(trimmed); - if (!trimmed || trimmed.startsWith('#')) { - continue; - } - - // Parse formats like "host:port" or "host:port:user:pass" - const parts = trimmed.split(':'); - if (parts.length >= 2) { - const proxy: ProxyInfo = { - source: source.id, - protocol: source.protocol as 'http' | 'https' | 'socks4' | 'socks5', - host: parts[0], - port: parseInt(parts[1]), - }; - - if (!isNaN(proxy.port) && proxy.host) { - allProxies.push(proxy); - } - } - } - - logger.info(`Parsed ${allProxies.length} proxies from ${source.url}`); - } catch (error) { - logger.error(`Error fetching proxies from ${source.url}`, error); - return []; - } - - return allProxies; -} - -/** - * Check if a proxy is working - */ -export async function checkProxy(proxy: ProxyInfo): Promise { - let success = false; - logger.debug(`Checking Proxy:`, { - protocol: proxy.protocol, - host: proxy.host, - port: proxy.port, - }); - - try { - // Test the proxy - const response = await httpClient.get(PROXY_CONFIG.CHECK_URL, { - proxy, - timeout: PROXY_CONFIG.CHECK_TIMEOUT, - }); - - const isWorking = response.status >= 200 && response.status < 300; - const result: ProxyInfo = { - ...proxy, - isWorking, - lastChecked: new Date(), - responseTime: response.responseTime, - }; - - if (isWorking && !JSON.stringify(response.data).includes(PROXY_CONFIG.CHECK_IP)) { - success = true; - await updateProxyInCache(result, true); - } else { - await updateProxyInCache(result, false); - } - - if (proxy.source) { - await updateProxyStats(proxy.source, success); - } - - logger.debug('Proxy check completed', { - host: proxy.host, - port: proxy.port, - isWorking, - }); - - return result; - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); - const result: ProxyInfo = { - ...proxy, - isWorking: false, - error: errorMessage, - lastChecked: new Date(), - }; - - // Update cache for failed proxy (increment total, don't update TTL) - await updateProxyInCache(result, false); - - if (proxy.source) { - await updateProxyStats(proxy.source, success); - } - - logger.debug('Proxy check failed', { - host: proxy.host, - port: proxy.port, - error: errorMessage, - }); - - return result; - } -} - -/** - * Get a random active proxy from the cache - * @param protocol - Optional protocol filter ('http' | 'https' | 'socks4' | 'socks5') - * @param minSuccessRate - Minimum success rate percentage (default: 50) - * @returns A random working proxy or null if none found - */ -export async function getRandomActiveProxy( - protocol?: 'http' | 'https' | 'socks4' | 'socks5', - minSuccessRate: number = 50 -): Promise { - try { - // Get all active proxy keys from cache - const pattern = protocol - ? `${PROXY_CONFIG.CACHE_KEY}:${protocol}://*` - : `${PROXY_CONFIG.CACHE_KEY}:*`; - - const keys = await cache.keys(pattern); - - if (keys.length === 0) { - logger.debug('No active proxies found in cache', { pattern }); - return null; - } - - // Shuffle the keys for randomness - const shuffledKeys = keys.sort(() => Math.random() - 0.5); - - // Find a working proxy that meets the criteria - for (const key of shuffledKeys) { - try { - const proxyData: ProxyInfo | null = await cache.get(key); - - if ( - proxyData && - proxyData.isWorking && - (!proxyData.successRate || proxyData.successRate >= minSuccessRate) - ) { - logger.debug('Random active proxy selected', { - proxy: `${proxyData.host}:${proxyData.port}`, - protocol: proxyData.protocol, - successRate: proxyData.successRate?.toFixed(1) + '%', - avgResponseTime: proxyData.averageResponseTime - ? `${proxyData.averageResponseTime.toFixed(0)}ms` - : 'N/A', - }); - - return proxyData; - } - } catch (error) { - logger.debug('Error reading proxy from cache', { key, error: (error as Error).message }); - continue; - } - } - - logger.debug('No working proxies found meeting criteria', { - protocol, - minSuccessRate, - keysChecked: shuffledKeys.length, - }); - - return null; - } catch (error) { - logger.error('Error getting random active proxy', { - error: error instanceof Error ? error.message : String(error), - protocol, - minSuccessRate, - }); - return null; - } -} - -// Utility functions -function cleanProxyUrl(url: string): string { - return url - .replace(/^https?:\/\//, '') - .replace(/^0+/, '') - .replace(/:0+(\d)/g, ':$1'); -} - -function removeDuplicateProxies(proxies: ProxyInfo[]): ProxyInfo[] { - const seen = new Set(); - const unique: ProxyInfo[] = []; - - for (const proxy of proxies) { - const key = `${proxy.protocol}://${proxy.host}:${proxy.port}`; - if (!seen.has(key)) { - seen.add(key); - unique.push(proxy); - } - } - - return unique; -} - -// Optional: Export a convenience object that groups related tasks -export const proxyTasks = { - queueProxyFetch, - queueProxyCheck, - fetchProxiesFromSources, - fetchProxiesFromSource, - checkProxy, -}; - -// Export singleton instance for backward compatibility (optional) -// Remove this if you want to fully move to the task-based approach -export const proxyService = proxyTasks; diff --git a/apps/data-service/src/handlers/proxy/shared/config.ts b/apps/data-service/src/handlers/proxy/shared/config.ts new file mode 100644 index 0000000..260605b --- /dev/null +++ b/apps/data-service/src/handlers/proxy/shared/config.ts @@ -0,0 +1,140 @@ +/** + * Proxy Configuration Constants + */ + +export const PROXY_CONFIG = { + CACHE_KEY: 'active', + CACHE_STATS_KEY: 'stats', + CACHE_TTL: 86400, // 24 hours + CHECK_TIMEOUT: 7000, + CHECK_IP: '99.246.102.205', + CHECK_URL: 'https://proxy-detection.stare.gg/?api_key=bd406bf53ddc6abe1d9de5907830a955', + PROXY_SOURCES: [ + { + id: 'prxchk', + url: 'https://raw.githubusercontent.com/prxchk/proxy-list/main/http.txt', + protocol: 'http', + }, + { + id: 'casals', + url: 'https://raw.githubusercontent.com/casals-ar/proxy-list/main/http', + protocol: 'http', + }, + { + id: 'sunny9577', + url: 'https://raw.githubusercontent.com/sunny9577/proxy-scraper/master/proxies.txt', + protocol: 'http', + }, + { + id: 'themiralay', + url: 'https://raw.githubusercontent.com/themiralay/Proxy-List-World/refs/heads/master/data.txt', + protocol: 'http', + }, + { + id: 'casa-ls', + url: 'https://raw.githubusercontent.com/casa-ls/proxy-list/refs/heads/main/http', + protocol: 'http', + }, + { + id: 'databay', + url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/http.txt', + protocol: 'http', + }, + { + id: 'speedx', + url: 'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/http.txt', + protocol: 'http', + }, + { + id: 'monosans', + url: 'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt', + protocol: 'http', + }, + { + id: 'murong', + url: 'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/http.txt', + protocol: 'http', + }, + { + id: 'vakhov-fresh', + url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/master/http.txt', + protocol: 'http', + }, + { + id: 'kangproxy', + url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/http/http.txt', + protocol: 'http', + }, + { + id: 'gfpcom', + url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/http.txt', + protocol: 'http', + }, + { + id: 'dpangestuw', + url: 'https://raw.githubusercontent.com/dpangestuw/Free-Proxy/refs/heads/main/http_proxies.txt', + protocol: 'http', + }, + { + id: 'gitrecon', + url: 'https://raw.githubusercontent.com/gitrecon1455/fresh-proxy-list/refs/heads/main/proxylist.txt', + protocol: 'http', + }, + { + id: 'vakhov-master', + url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/http.txt', + protocol: 'http', + }, + { + id: 'breaking-tech', + url: 'https://raw.githubusercontent.com/BreakingTechFr/Proxy_Free/refs/heads/main/proxies/http.txt', + protocol: 'http', + }, + { + id: 'ercindedeoglu', + url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/http.txt', + protocol: 'http', + }, + { + id: 'tuanminpay', + url: 'https://raw.githubusercontent.com/TuanMinPay/live-proxy/master/http.txt', + protocol: 'http', + }, + + { + id: 'r00tee-https', + url: 'https://raw.githubusercontent.com/r00tee/Proxy-List/refs/heads/main/Https.txt', + protocol: 'https', + }, + { + id: 'ercindedeoglu-https', + url: 'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/https.txt', + protocol: 'https', + }, + { + id: 'vakhov-fresh-https', + url: 'https://raw.githubusercontent.com/vakhov/fresh-proxy-list/refs/heads/master/https.txt', + protocol: 'https', + }, + { + id: 'databay-https', + url: 'https://raw.githubusercontent.com/databay-labs/free-proxy-list/refs/heads/master/https.txt', + protocol: 'https', + }, + { + id: 'kangproxy-https', + url: 'https://raw.githubusercontent.com/officialputuid/KangProxy/refs/heads/KangProxy/https/https.txt', + protocol: 'https', + }, + { + id: 'zloi-user-https', + url: 'https://raw.githubusercontent.com/zloi-user/hideip.me/refs/heads/master/https.txt', + protocol: 'https', + }, + { + id: 'gfpcom-https', + url: 'https://raw.githubusercontent.com/gfpcom/free-proxy-list/refs/heads/main/list/https.txt', + protocol: 'https', + }, + ], +}; \ No newline at end of file diff --git a/apps/data-service/src/handlers/proxy/shared/proxy-manager.ts b/apps/data-service/src/handlers/proxy/shared/proxy-manager.ts new file mode 100644 index 0000000..9712d80 --- /dev/null +++ b/apps/data-service/src/handlers/proxy/shared/proxy-manager.ts @@ -0,0 +1,56 @@ +/** + * Proxy Stats Manager - Singleton for managing proxy statistics + */ +import type { ProxySource } from './types'; +import { PROXY_CONFIG } from './config'; + +export class ProxyStatsManager { + private static instance: ProxyStatsManager | null = null; + private proxyStats: ProxySource[] = []; + + private constructor() { + this.resetStats(); + } + + static getInstance(): ProxyStatsManager { + if (!ProxyStatsManager.instance) { + ProxyStatsManager.instance = new ProxyStatsManager(); + } + return ProxyStatsManager.instance; + } + + resetStats(): void { + this.proxyStats = PROXY_CONFIG.PROXY_SOURCES.map(source => ({ + id: source.id, + total: 0, + working: 0, + lastChecked: new Date(), + protocol: source.protocol, + url: source.url, + })); + } + + getStats(): ProxySource[] { + return [...this.proxyStats]; + } + + updateSourceStats(sourceId: string, success: boolean): ProxySource | undefined { + const source = this.proxyStats.find(s => s.id === sourceId); + if (source) { + if (typeof source.working !== 'number') { + source.working = 0; + } + if (typeof source.total !== 'number') { + source.total = 0; + } + source.total += 1; + if (success) { + source.working += 1; + } + source.percentWorking = (source.working / source.total) * 100; + source.lastChecked = new Date(); + return source; + } + return undefined; + } +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/proxy/shared/types.ts b/apps/data-service/src/handlers/proxy/shared/types.ts new file mode 100644 index 0000000..b28c618 --- /dev/null +++ b/apps/data-service/src/handlers/proxy/shared/types.ts @@ -0,0 +1,13 @@ +/** + * Proxy Shared Types + */ + +export interface ProxySource { + id: string; + url: string; + protocol: string; + working?: number; // Optional, used for stats + total?: number; // Optional, used for stats + percentWorking?: number; // Optional, used for stats + lastChecked?: Date; // Optional, used for stats +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/webshare/webshare.operations.ts b/apps/data-service/src/handlers/webshare/operations/fetch.operations.ts similarity index 65% rename from apps/data-service/src/handlers/webshare/webshare.operations.ts rename to apps/data-service/src/handlers/webshare/operations/fetch.operations.ts index 3024856..bc43682 100644 --- a/apps/data-service/src/handlers/webshare/webshare.operations.ts +++ b/apps/data-service/src/handlers/webshare/operations/fetch.operations.ts @@ -1,15 +1,17 @@ /** - * WebShare Tasks - API integration + * WebShare Fetch Operations - API integration */ -import { getLogger } from '@stock-bot/logger'; import { type ProxyInfo } from '@stock-bot/http'; +import { OperationContext } from '@stock-bot/utils'; -const logger = getLogger('webshare-tasks'); +import { WEBSHARE_CONFIG } from '../shared/config'; /** * Fetch proxies from WebShare API and convert to ProxyInfo format */ export async function fetchWebShareProxies(): Promise { + const ctx = OperationContext.create('webshare', 'fetch-proxies'); + try { // Get configuration from config system const { getConfig } = await import('@stock-bot/config'); @@ -19,25 +21,26 @@ export async function fetchWebShareProxies(): Promise { const apiUrl = config.webshare?.apiUrl; if (!apiKey || !apiUrl) { - logger.error('Missing WebShare configuration', { + ctx.logger.error('Missing WebShare configuration', { hasApiKey: !!apiKey, hasApiUrl: !!apiUrl, }); return []; } - logger.info('Fetching proxies from WebShare API', { apiUrl }); + ctx.logger.info('Fetching proxies from WebShare API', { apiUrl }); - const response = await fetch(`${apiUrl}proxy/list/?mode=direct&page=1&page_size=100`, { + const response = await fetch(`${apiUrl}proxy/list/?mode=${WEBSHARE_CONFIG.DEFAULT_MODE}&page=${WEBSHARE_CONFIG.DEFAULT_PAGE}&page_size=${WEBSHARE_CONFIG.DEFAULT_PAGE_SIZE}`, { method: 'GET', headers: { Authorization: `Token ${apiKey}`, 'Content-Type': 'application/json', }, + signal: AbortSignal.timeout(WEBSHARE_CONFIG.TIMEOUT), }); if (!response.ok) { - logger.error('WebShare API request failed', { + ctx.logger.error('WebShare API request failed', { status: response.status, statusText: response.statusText, }); @@ -47,7 +50,7 @@ export async function fetchWebShareProxies(): Promise { const data = await response.json(); if (!data.results || !Array.isArray(data.results)) { - logger.error('Invalid response format from WebShare API', { data }); + ctx.logger.error('Invalid response format from WebShare API', { data }); return []; } @@ -69,14 +72,14 @@ export async function fetchWebShareProxies(): Promise { lastChecked: new Date(), })); - logger.info('Successfully fetched proxies from WebShare', { + ctx.logger.info('Successfully fetched proxies from WebShare', { count: proxies.length, total: data.count || proxies.length, }); return proxies; } catch (error) { - logger.error('Failed to fetch proxies from WebShare', { error }); + ctx.logger.error('Failed to fetch proxies from WebShare', { error }); return []; } } \ No newline at end of file diff --git a/apps/data-service/src/handlers/webshare/shared/config.ts b/apps/data-service/src/handlers/webshare/shared/config.ts new file mode 100644 index 0000000..f34aa79 --- /dev/null +++ b/apps/data-service/src/handlers/webshare/shared/config.ts @@ -0,0 +1,10 @@ +/** + * WebShare Configuration Constants + */ + +export const WEBSHARE_CONFIG = { + DEFAULT_PAGE_SIZE: 100, + DEFAULT_MODE: 'direct', + DEFAULT_PAGE: 1, + TIMEOUT: 10000, +}; \ No newline at end of file diff --git a/apps/data-service/src/handlers/webshare/webshare.handler.ts b/apps/data-service/src/handlers/webshare/webshare.handler.ts index edd6dbe..fc6d650 100644 --- a/apps/data-service/src/handlers/webshare/webshare.handler.ts +++ b/apps/data-service/src/handlers/webshare/webshare.handler.ts @@ -21,7 +21,7 @@ export function initializeWebShareProvider() { operations: { 'fetch-proxies': createJobHandler(async () => { logger.info('Fetching proxies from WebShare API'); - const { fetchWebShareProxies } = await import('./webshare.operations'); + const { fetchWebShareProxies } = await import('./operations/fetch.operations'); try { const proxies = await fetchWebShareProxies();