stock-bot/apps/data-service/src/providers/proxy.provider.ts
2025-06-09 18:12:41 -04:00

141 lines
4.5 KiB
TypeScript

import { ProxyInfo } from 'libs/http/src/types';
import { ProviderConfig } from '../services/provider-registry.service';
import { getLogger } from '@stock-bot/logger';
import { BatchProcessor } from '../utils/batch-processor';
// Module-level logger shared by every operation in this provider,
// obtained from getLogger with the 'proxy-provider' identifier.
const logger = getLogger('proxy-provider');
/**
 * Builds a five-field cron expression that fires once a day at the local
 * wall-clock hour and minute observed at the moment this function is called.
 *
 * @returns A pattern of the form `<minute> <hour> * * *`.
 */
const getEvery24HourCron = (): string => {
  const calledAt = new Date();
  // Field order: minute, hour, day-of-month, month, day-of-week.
  const cronFields = [calledAt.getMinutes(), calledAt.getHours(), '*', '*', '*'];
  return cronFields.join(' ');
};
/** Batch size used when PROXY_BATCH_SIZE is unset, non-numeric, or below 1. */
const DEFAULT_PROXY_BATCH_SIZE = 200;
/** Hours used when PROXY_VALIDATION_HOURS is unset, non-numeric, or negative. */
const DEFAULT_PROXY_VALIDATION_HOURS = 4;
const MS_PER_HOUR = 60 * 60 * 1000;

/**
 * Payload accepted by the 'check-proxy' operation. The batch/index fields are
 * optional and are only echoed back when `batchIndex` is present.
 */
interface CheckProxyPayload {
  proxy: ProxyInfo;
  source?: string;
  batchIndex?: number;
  itemIndex?: number;
  total?: number;
}

/**
 * Parses a base-10 integer from an environment-variable string.
 *
 * @param raw - Raw environment value (may be undefined or empty).
 * @param fallback - Value returned when `raw` is missing, not a number, or below `min`.
 * @param min - Smallest accepted value (inclusive).
 * @returns The parsed integer, or `fallback` when the input is unusable.
 */
const readIntEnv = (raw: string | undefined, fallback: number, min: number): number => {
  // Explicit radix so values such as '0x10' are not read as hexadecimal.
  const parsed = Number.parseInt(raw ?? '', 10);
  return Number.isNaN(parsed) || parsed < min ? fallback : parsed;
};

/**
 * Builds the optional `batchInfo` fragment spread into a 'check-proxy' result.
 *
 * @returns `{ batchInfo }` when the payload carries a `batchIndex`, otherwise an
 *   empty object so that spreading it adds nothing.
 */
const buildBatchInfo = (payload: CheckProxyPayload) =>
  payload.batchIndex !== undefined
    ? {
        batchInfo: {
          batchIndex: payload.batchIndex,
          itemIndex: payload.itemIndex,
          total: payload.total,
          source: payload.source
        }
      }
    : {};

/**
 * Provider definition for the proxy service.
 *
 * Operations:
 * - 'fetch-and-check': drains the queue, fetches proxies, and hands them to
 *   BatchProcessor.processItems to create 'check-proxy' work.
 * - 'process-proxy-batch': forwards a batch payload to BatchProcessor.processBatch.
 * - 'check-proxy': validates a single proxy and never throws on a validation
 *   failure; it reports `isWorking: false` instead.
 *
 * A single scheduled job runs 'fetch-and-check' on a daily cron pattern.
 */
export const proxyProvider: ProviderConfig = {
  name: 'proxy-service',
  service: 'proxy',
  operations: {
    // NOTE: `sources` is part of the accepted payload shape but is not read here;
    // fetchProxiesFromSources() is called without arguments.
    'fetch-and-check': async (_payload: { sources?: string[] }) => {
      // Loaded lazily at call time rather than at module load.
      const { proxyService } = await import('./proxy.tasks');
      const { queueManager } = await import('../services/queue.service');

      await queueManager.drainQueue();

      const proxies = await proxyService.fetchProxiesFromSources();
      if (proxies.length === 0) {
        return { proxiesFetched: 0, jobsCreated: 0 };
      }

      // Malformed or out-of-range environment values fall back to the defaults
      // instead of passing NaN (or a non-positive batch size) to the processor.
      const batchSize = readIntEnv(process.env.PROXY_BATCH_SIZE, DEFAULT_PROXY_BATCH_SIZE, 1);
      const validationHours = readIntEnv(
        process.env.PROXY_VALIDATION_HOURS,
        DEFAULT_PROXY_VALIDATION_HOURS,
        0
      );

      const batchProcessor = new BatchProcessor(queueManager);
      const result = await batchProcessor.processItems({
        items: proxies,
        batchSize,
        totalDelayMs: validationHours * MS_PER_HOUR,
        jobNamePrefix: 'proxy',
        operation: 'check-proxy',
        service: 'proxy',
        provider: 'proxy-service',
        priority: 2,
        // Batching is on unless PROXY_DIRECT_MODE is exactly 'true'.
        useBatching: process.env.PROXY_DIRECT_MODE !== 'true',
        createJobData: (proxy: ProxyInfo) => ({
          proxy,
          source: 'fetch-and-check'
        }),
        removeOnComplete: 5,
        removeOnFail: 3
      });

      return {
        proxiesFetched: result.totalItems,
        ...result
      };
    },

    // Processes one batch of proxies. The operation name follows the
    // process-<jobNamePrefix>-batch form, using the 'proxy' prefix set above.
    'process-proxy-batch': async (payload: any) => {
      const { queueManager } = await import('../services/queue.service');
      const batchProcessor = new BatchProcessor(queueManager);
      return await batchProcessor.processBatch(
        payload,
        (proxy: ProxyInfo) => ({
          proxy,
          source: payload.config?.source || 'batch-processing'
        })
      );
    },

    'check-proxy': async (payload: CheckProxyPayload) => {
      const { checkProxy } = await import('./proxy.tasks');
      const proxyLabel = `${payload.proxy.host}:${payload.proxy.port}`;

      try {
        const result = await checkProxy(payload.proxy);
        logger.debug('Proxy validated', {
          proxy: proxyLabel,
          isWorking: result.isWorking,
          responseTime: result.responseTime,
          batchIndex: payload.batchIndex
        });
        return {
          result,
          proxy: payload.proxy,
          // Batch info is only included when the payload carries a batchIndex.
          ...buildBatchInfo(payload)
        };
      } catch (error) {
        // A failed check is reported as a non-working proxy rather than rethrown.
        logger.warn('Proxy validation failed', {
          proxy: proxyLabel,
          error: error instanceof Error ? error.message : String(error),
          batchIndex: payload.batchIndex
        });
        return {
          result: { isWorking: false, error: String(error) },
          proxy: payload.proxy,
          ...buildBatchInfo(payload)
        };
      }
    }
  },
  scheduledJobs: [
    {
      type: 'proxy-maintenance',
      operation: 'fetch-and-check',
      payload: {},
      // TODO: replace with a fixed time of day. The pattern is derived from the
      // startup time, so it differs after each restart and restarts may keep
      // adding further copies of this job (to be confirmed against the scheduler).
      cronPattern: getEvery24HourCron(),
      priority: 5,
      immediately: true,
      description: 'Fetch and validate proxy list from sources'
    }
  ]
};