From cdc2f44e86934fc3d236ba713e58c594ea89d7d5 Mon Sep 17 00:00:00 2001 From: Boki Date: Sun, 22 Jun 2025 21:06:33 -0400 Subject: [PATCH] fixed proxy handler --- apps/data-ingestion/src/handlers/index.ts | 1 + .../proxy/operations/fetch.operations.ts | 4 - .../src/handlers/proxy/proxy.handler.ts | 165 +++++++++--------- 3 files changed, 79 insertions(+), 91 deletions(-) diff --git a/apps/data-ingestion/src/handlers/index.ts b/apps/data-ingestion/src/handlers/index.ts index 7fe1c89..024e928 100644 --- a/apps/data-ingestion/src/handlers/index.ts +++ b/apps/data-ingestion/src/handlers/index.ts @@ -9,6 +9,7 @@ import { getLogger } from '@stock-bot/logger'; // Import handlers for bundling (ensures they're included in the build) import './ceo/ceo.handler'; import './ib/ib.handler'; +import './proxy/proxy.handler'; import './qm/qm.handler'; import './webshare/webshare.handler'; diff --git a/apps/data-ingestion/src/handlers/proxy/operations/fetch.operations.ts b/apps/data-ingestion/src/handlers/proxy/operations/fetch.operations.ts index f92bf5d..335cc9d 100644 --- a/apps/data-ingestion/src/handlers/proxy/operations/fetch.operations.ts +++ b/apps/data-ingestion/src/handlers/proxy/operations/fetch.operations.ts @@ -8,10 +8,6 @@ import { fetch } from '@stock-bot/utils'; import { PROXY_CONFIG } from '../shared/config'; import type { ProxySource } from '../shared/types'; - httpClient = new HttpClient({ timeout: 10000 }, ctx.logger); - } - return httpClient; -} export async function fetchProxiesFromSources(): Promise { const ctx = { diff --git a/apps/data-ingestion/src/handlers/proxy/proxy.handler.ts b/apps/data-ingestion/src/handlers/proxy/proxy.handler.ts index 6261728..b64f1d6 100644 --- a/apps/data-ingestion/src/handlers/proxy/proxy.handler.ts +++ b/apps/data-ingestion/src/handlers/proxy/proxy.handler.ts @@ -1,95 +1,86 @@ -/** - * Proxy Provider for new queue system - */ -import type { ServiceContainer } from '@stock-bot/di'; -import { getLogger } from '@stock-bot/logger'; -import type { ProxyInfo } from '@stock-bot/proxy'; import { - createJobHandler, - handlerRegistry, - type HandlerConfigWithSchedule, -} from '@stock-bot/queue'; + BaseHandler, + Handler, + Operation, + ScheduledOperation, + type IServiceContainer, +} from '@stock-bot/handlers'; +import type { ProxyInfo } from '@stock-bot/proxy'; +import { processItems } from '@stock-bot/queue'; +import { fetchProxiesFromSources } from './operations/fetch.operations'; +import { checkProxy } from './operations/check.operations'; -const handlerLogger = getLogger('proxy-handler'); +@Handler('proxy') +export class ProxyHandler extends BaseHandler { + constructor(services: IServiceContainer) { + super(services); + } -// Initialize and register the Proxy provider -export function initializeProxyProvider(_container: ServiceContainer) { - handlerLogger.debug('Registering proxy provider with scheduled jobs...'); + @Operation('fetch-from-sources') + @ScheduledOperation('proxy-fetch-and-check', '0 0 * * 0', { + priority: 0, + description: 'Fetch and validate proxy list from sources', + // immediately: true, // Don't run immediately during startup to avoid conflicts + }) + async fetchFromSources(): Promise<{ + processed: number; + jobsCreated: number; + batchesCreated?: number; + mode: string; + }> { + // Fetch proxies from all configured sources + this.logger.info('Processing fetch proxies from sources request'); + + const proxies = await fetchProxiesFromSources(); + this.logger.info('Fetched proxies from sources', { count: proxies.length }); - const proxyProviderConfig: HandlerConfigWithSchedule = { - name: 'proxy', + if (proxies.length === 0) { + this.logger.warn('No proxies fetched from sources'); + return { processed: 0, jobsCreated: 0, mode: 'direct' }; + } - operations: { - 'fetch-from-sources': createJobHandler(async () => { - // Fetch proxies from all configured sources - handlerLogger.info('Processing fetch proxies from sources request'); - const { fetchProxiesFromSources } = await import('./operations/fetch.operations'); - const { processItems } = await import('@stock-bot/queue'); + // Get QueueManager from service container + const queueManager = this.queue; + if (!queueManager) { + throw new Error('Queue manager not available'); + } + + // Batch process the proxies through check-proxy operation + const batchResult = await processItems(proxies, 'proxy', { + handler: 'proxy', + operation: 'check-proxy', + totalDelayHours: 0.083, // 5 minutes (5/60 hours) + batchSize: 50, // Process 50 proxies per batch + priority: 3, + useBatching: true, + retries: 1, + ttl: 30000, // 30 second timeout per proxy check + removeOnComplete: 5, + removeOnFail: 3, + }, queueManager); - // Fetch all proxies from sources - const proxies = await fetchProxiesFromSources(); - handlerLogger.info('Fetched proxies from sources', { count: proxies.length }); + this.logger.info('Batch proxy validation completed', { + totalProxies: proxies.length, + jobsCreated: batchResult.jobsCreated, + mode: batchResult.mode, + batchesCreated: batchResult.batchesCreated, + duration: `${batchResult.duration}ms`, + }); - if (proxies.length === 0) { - handlerLogger.warn('No proxies fetched from sources'); - return { processed: 0, successful: 0 }; - } + return { + processed: proxies.length, + jobsCreated: batchResult.jobsCreated, + batchesCreated: batchResult.batchesCreated, + mode: batchResult.mode, + }; + } - // Get QueueManager instance - we have to use getInstance for now until handlers get container access - const { QueueManager } = await import('@stock-bot/queue'); - const queueManager = QueueManager.getInstance(); - - // Batch process the proxies through check-proxy operation - const batchResult = await processItems(proxies, 'proxy', { - handler: 'proxy', - operation: 'check-proxy', - totalDelayHours: 0.083, // 5 minutes (5/60 hours) - batchSize: 50, // Process 50 proxies per batch - priority: 3, - useBatching: true, - retries: 1, - ttl: 30000, // 30 second timeout per proxy check - removeOnComplete: 5, - removeOnFail: 3, - }, queueManager); - - handlerLogger.info('Batch proxy validation completed', { - totalProxies: proxies.length, - jobsCreated: batchResult.jobsCreated, - mode: batchResult.mode, - batchesCreated: batchResult.batchesCreated, - duration: `${batchResult.duration}ms`, - }); - - return { - processed: proxies.length, - jobsCreated: batchResult.jobsCreated, - batchesCreated: batchResult.batchesCreated, - mode: batchResult.mode, - }; - }), - - 'check-proxy': createJobHandler(async (payload: ProxyInfo) => { - // payload is now the raw proxy info object - handlerLogger.debug('Processing proxy check request', { - proxy: `${payload.host}:${payload.port}`, - }); - const { checkProxy } = await import('./operations/check.operations'); - return checkProxy(payload); - }), - }, - scheduledJobs: [ - { - type: 'proxy-fetch-and-check', - operation: 'fetch-from-sources', - cronPattern: '0 0 * * 0', // Every week at midnight on Sunday - priority: 0, - description: 'Fetch and validate proxy list from sources', - // immediately: true, // Don't run immediately during startup to avoid conflicts - }, - ], - }; - - handlerRegistry.registerWithSchedule(proxyProviderConfig); - handlerLogger.debug('Proxy provider registered successfully with scheduled jobs'); -} + @Operation('check-proxy') + async checkProxyOperation(payload: ProxyInfo): Promise { + // payload is now the raw proxy info object + this.logger.debug('Processing proxy check request', { + proxy: `${payload.host}:${payload.port}`, + }); + return checkProxy(payload); + } +} \ No newline at end of file