diff --git a/apps/data-service/src/providers/proxy.tasks.ts b/apps/data-service/src/providers/proxy.tasks.ts index 4974d54..9b0b537 100644 --- a/apps/data-service/src/providers/proxy.tasks.ts +++ b/apps/data-service/src/providers/proxy.tasks.ts @@ -10,6 +10,7 @@ export interface ProxySource { protocol: string; working?: number; // Optional, used for stats total?: number; // Optional, used for stats + percentWorking?: number; // Optional, used for stats lastChecked?: Date; // Optional, used for stats } @@ -57,17 +58,31 @@ let logger: ReturnType; let cache: CacheProvider; let httpClient: HttpClient; let concurrencyLimit: ReturnType; -let proxyStats: ProxySource[] = [] +let proxyStats: ProxySource[] = PROXY_CONFIG.PROXY_SOURCES.map(source => ({ + id: source.id, + total: 0, + working: 0, + lastChecked: new Date(), + protocol: source.protocol, + url: source.url, + })); // make a function that takes in source id and a boolean success and updates the proxyStats array -function updateProxyStats(sourceId: string, success: boolean) { +async function updateProxyStats(sourceId: string, success: boolean) { const source = proxyStats.find(s => s.id === sourceId); - if (source !== undefined && source !== null && source.working && source.total) { + if (source !== undefined) { + if(typeof source.working !== 'number') + source.working = 0; + if(typeof source.total !== 'number') + source.total = 0; source.total += 1; if (success) { source.working += 1; } + source.percentWorking = source.working / source.total * 100; + source.lastChecked = new Date(); + await cache.set(`${PROXY_CONFIG.CACHE_STATS_KEY}:${source.id}`, source, PROXY_CONFIG.CACHE_TTL); return source; } else { logger.warn(`Unknown proxy source: ${sourceId}`); @@ -84,18 +99,24 @@ async function resetProxyStats(): Promise { protocol: source.protocol, url: source.url, })); - // for (const source of proxyStats) { - // await cache.set(`${PROXY_CONFIG.CACHE_STATS_KEY}:${source.id}`, source, PROXY_CONFIG.CACHE_TTL); - // } + for (const source of proxyStats) { + await cache.set(`${PROXY_CONFIG.CACHE_STATS_KEY}:${source.id}`, source, PROXY_CONFIG.CACHE_TTL); + } return Promise.resolve(); } // Initialize shared resources -function initializeSharedResources() { +async function initializeSharedResources() { if (!logger) { logger = getLogger('proxy-tasks'); - cache = createCache('hybrid'); + cache = createCache('hybrid', { + name: 'proxy-tasks', + keyPrefix: 'proxy:', + ttl: PROXY_CONFIG.CACHE_TTL, + enableMetrics: true + }); + await cache.waitForReady(); httpClient = new HttpClient({ timeout: 10000 }, logger); concurrencyLimit = pLimit(PROXY_CONFIG.CONCURRENCY_LIMIT); logger.info('Proxy tasks initialized'); @@ -104,7 +125,7 @@ function initializeSharedResources() { // Individual task functions export async function queueProxyFetch(): Promise { - initializeSharedResources(); + await initializeSharedResources(); const { queueManager } = await import('../services/queue.service'); const job = await queueManager.addJob({ @@ -122,7 +143,7 @@ export async function queueProxyFetch(): Promise { } export async function queueProxyCheck(proxies: ProxyInfo[]): Promise { - initializeSharedResources(); + await initializeSharedResources(); const { queueManager } = await import('../services/queue.service'); const job = await queueManager.addJob({ @@ -140,7 +161,7 @@ export async function queueProxyCheck(proxies: ProxyInfo[]): Promise { } export async function fetchProxiesFromSources(): Promise { - initializeSharedResources(); + await initializeSharedResources(); await resetProxyStats(); const sources = PROXY_CONFIG.PROXY_SOURCES.map(source => @@ -154,7 +175,7 @@ export async function fetchProxiesFromSources(): Promise { } export async function fetchProxiesFromSource(source: ProxySource): Promise { - initializeSharedResources(); + await initializeSharedResources(); const allProxies: ProxyInfo[] = []; @@ -208,7 +229,7 @@ export async function fetchProxiesFromSource(source: ProxySource): Promise { - initializeSharedResources(); + await initializeSharedResources(); let success = false; logger.debug(`Checking Proxy:`, { @@ -233,16 +254,16 @@ export async function checkProxy(proxy: ProxyInfo): Promise { responseTime: response.responseTime, }; - // if (isWorking && !JSON.stringify(response.data).includes(PROXY_CONFIG.CHECK_IP)) { - // success = true; - // await cache.set(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`, result, PROXY_CONFIG.CACHE_TTL); - // } else { - // await cache.del(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`); - // } + if (isWorking && !JSON.stringify(response.data).includes(PROXY_CONFIG.CHECK_IP)) { + success = true; + await cache.set(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`, result, PROXY_CONFIG.CACHE_TTL); + } else { + await cache.del(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`); + } - // if( proxy.source ){ - // updateProxyStats(proxy.source, success); - // } + if( proxy.source ){ + await updateProxyStats(proxy.source, success); + } logger.debug('Proxy check completed', { host: proxy.host, @@ -266,9 +287,9 @@ export async function checkProxy(proxy: ProxyInfo): Promise { // if (!success) { // await cache.set(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`, result); // } - // if( proxy.source ){ - // updateProxyStats(proxy.source, success); - // } + if( proxy.source ){ + await updateProxyStats(proxy.source, success); + } logger.debug('Proxy check failed', { host: proxy.host, diff --git a/apps/data-service/src/utils/batch-processor.ts b/apps/data-service/src/utils/batch-processor.ts index 5f3222f..acc2394 100644 --- a/apps/data-service/src/utils/batch-processor.ts +++ b/apps/data-service/src/utils/batch-processor.ts @@ -23,7 +23,6 @@ export class BatchProcessor { private cacheProvider: CacheProvider; private isReady = false; private keyPrefix: string = 'batch:'; // Default key prefix for batch payloads - constructor( private queueManager: any, private cacheOptions?: { keyPrefix?: string; ttl?: number } // Optional cache configuration @@ -31,6 +30,7 @@ export class BatchProcessor { this.keyPrefix = cacheOptions?.keyPrefix || 'batch:'; // Initialize cache provider with batch-specific settings this.cacheProvider = createCache('redis', { + name: 'batch-processor', keyPrefix: this.keyPrefix, ttl: cacheOptions?.ttl || 86400 * 2, // 48 hours default enableMetrics: true diff --git a/libs/cache/src/providers/redis-cache.ts b/libs/cache/src/providers/redis-cache.ts index 2ffd1fd..0dc8a22 100644 --- a/libs/cache/src/providers/redis-cache.ts +++ b/libs/cache/src/providers/redis-cache.ts @@ -22,13 +22,15 @@ export class RedisCache implements CacheProvider { hitRate: 0, total: 0, uptime: 0 - }; - - constructor(options: CacheOptions = {}) { + }; constructor(options: CacheOptions = {}) { this.defaultTTL = options.ttl ?? 3600; // 1 hour default this.keyPrefix = options.keyPrefix ?? 'cache:'; this.enableMetrics = options.enableMetrics ?? true; + // Generate a connection name for monitoring + const baseName = options.name || this.keyPrefix.replace(':', '-').replace(/[^a-zA-Z0-9-]/g, ''); + const connectionName = `${baseName}-${Date.now()}`; + const redisConfig = { host: dragonflyConfig.DRAGONFLY_HOST, port: dragonflyConfig.DRAGONFLY_PORT, @@ -40,6 +42,7 @@ export class RedisCache implements CacheProvider { connectTimeout: dragonflyConfig.DRAGONFLY_CONNECT_TIMEOUT, commandTimeout: dragonflyConfig.DRAGONFLY_COMMAND_TIMEOUT, keepAlive: dragonflyConfig.DRAGONFLY_ENABLE_KEEPALIVE ? dragonflyConfig.DRAGONFLY_KEEPALIVE_INTERVAL * 1000 : 0, + connectionName, // Add connection name for monitoring ...(dragonflyConfig.DRAGONFLY_TLS && { tls: { cert: dragonflyConfig.DRAGONFLY_TLS_CERT_FILE || undefined, diff --git a/libs/cache/src/types.ts b/libs/cache/src/types.ts index 4b2364f..b58299f 100644 --- a/libs/cache/src/types.ts +++ b/libs/cache/src/types.ts @@ -26,6 +26,7 @@ export interface CacheOptions { enableMetrics?: boolean; maxMemoryItems?: number; memoryTTL?: number; + name?: string; // Add name for connection identification } export interface CacheStats { diff --git a/libs/http/src/client.ts b/libs/http/src/client.ts index f88013e..4960a9b 100644 --- a/libs/http/src/client.ts +++ b/libs/http/src/client.ts @@ -64,7 +64,7 @@ export class HttpClient { return response; } catch (error) { - if( this.logger?.getServiceName() === 'proxy-service' ) { + if( this.logger?.getServiceName() === 'proxy-tasks' ) { this.logger?.debug('HTTP request failed', { method: finalConfig.method, url: finalConfig.url,