added initial jobs for proxies, will prob have to be optimized
This commit is contained in:
parent
39f6c42044
commit
1ccdbddb71
2 changed files with 141 additions and 34 deletions
|
|
@ -1,4 +1,9 @@
|
||||||
|
import { ProxyInfo } from 'libs/http/src/types';
|
||||||
import { ProviderConfig } from '../services/provider-registry.service';
|
import { ProviderConfig } from '../services/provider-registry.service';
|
||||||
|
import { Logger } from '@stock-bot/logger';
|
||||||
|
|
||||||
|
// Create logger for this provider
|
||||||
|
const logger = new Logger('proxy-provider');
|
||||||
|
|
||||||
// This will run at the same time each day as when the app started
|
// This will run at the same time each day as when the app started
|
||||||
const getEvery24HourCron = (): string => {
|
const getEvery24HourCron = (): string => {
|
||||||
|
|
@ -14,12 +19,140 @@ export const proxyProvider: ProviderConfig = {
|
||||||
operations: {
|
operations: {
|
||||||
'fetch-and-check': async (payload: { sources?: string[] }) => {
|
'fetch-and-check': async (payload: { sources?: string[] }) => {
|
||||||
const { proxyService } = await import('./proxy.tasks');
|
const { proxyService } = await import('./proxy.tasks');
|
||||||
return await proxyService.fetchProxiesFromSources();
|
const proxies = await proxyService.fetchProxiesFromSources();
|
||||||
|
const proxiesCount = proxies.length;
|
||||||
|
// Get the actual proxies to create individual jobs
|
||||||
|
if (proxiesCount > 0) {
|
||||||
|
try {
|
||||||
|
const { queueManager } = await import('../services/queue.service');
|
||||||
|
if (proxies && proxies.length > 0) {
|
||||||
|
// Calculate delay distribution over 24 hours
|
||||||
|
const totalDelayMs = 24 * 60 * 60 * 1000; // 24 hours in milliseconds
|
||||||
|
const delayPerProxy = Math.floor(totalDelayMs / proxies.length);
|
||||||
|
|
||||||
|
logger.info('Creating individual proxy validation jobs', {
|
||||||
|
proxyCount: proxies.length,
|
||||||
|
distributionPeriod: '24 hours',
|
||||||
|
delayPerProxy: `${(delayPerProxy / 1000 / 60).toFixed(2)} minutes`
|
||||||
|
});
|
||||||
|
|
||||||
|
let queuedCount = 0;
|
||||||
|
|
||||||
|
for (let i = 0; i < proxies.length; i++) {
|
||||||
|
const proxy = proxies[i];
|
||||||
|
const delay = i * delayPerProxy;
|
||||||
|
|
||||||
|
try {
|
||||||
|
await queueManager.addJob({
|
||||||
|
type: 'proxy-validation',
|
||||||
|
service: 'proxy',
|
||||||
|
provider: 'proxy-service',
|
||||||
|
operation: 'check-proxy',
|
||||||
|
payload: {
|
||||||
|
proxy: proxy,
|
||||||
|
source: 'fetch-and-check',
|
||||||
|
autoTriggered: true,
|
||||||
|
batchIndex: i,
|
||||||
|
totalBatch: proxies.length
|
||||||
|
},
|
||||||
|
priority: 3
|
||||||
|
}, {
|
||||||
|
delay: delay
|
||||||
|
});
|
||||||
|
|
||||||
|
queuedCount++;
|
||||||
|
// Log progress every 100 jobs
|
||||||
|
if ((i + 1) % 100 === 0 || i === proxies.length - 1) {
|
||||||
|
logger.info('Proxy validation jobs queued progress', {
|
||||||
|
queued: i + 1,
|
||||||
|
total: proxies.length,
|
||||||
|
percentage: `${((i + 1) / proxies.length * 100).toFixed(1)}%`
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Failed to queue proxy validation job', {
|
||||||
|
proxy: `${proxy.host}:${proxy.port}`,
|
||||||
|
batchIndex: i,
|
||||||
|
error: error instanceof Error ? error.message : String(error)
|
||||||
|
});
|
||||||
|
} }
|
||||||
|
|
||||||
|
logger.info('Proxy validation jobs queuing completed', {
|
||||||
|
total: proxies.length,
|
||||||
|
successful: queuedCount,
|
||||||
|
failed: proxies.length - queuedCount,
|
||||||
|
totalDelay: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`,
|
||||||
|
avgDelayPerJob: `${(delayPerProxy / 1000 / 60).toFixed(2)} minutes`
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
proxiesFetched: proxiesCount,
|
||||||
|
jobsQueued: queuedCount,
|
||||||
|
totalDelay: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`,
|
||||||
|
avgDelayPerJob: `${(delayPerProxy / 1000 / 60).toFixed(2)} minutes`
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
logger.warn('No proxies found to create validation jobs', {
|
||||||
|
proxiesFetched: proxiesCount
|
||||||
|
});
|
||||||
|
return {
|
||||||
|
proxiesFetched: proxiesCount,
|
||||||
|
jobsQueued: 0,
|
||||||
|
message: 'No cached proxies found'
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Failed to create individual proxy validation jobs', {
|
||||||
|
proxiesCount,
|
||||||
|
error: error instanceof Error ? error.message : String(error)
|
||||||
|
});
|
||||||
|
return {
|
||||||
|
proxiesFetched: proxiesCount,
|
||||||
|
jobsQueued: 0,
|
||||||
|
error: error instanceof Error ? error.message : String(error)
|
||||||
|
}; }
|
||||||
|
} else { logger.info('No proxies fetched, skipping job creation');
|
||||||
|
return {
|
||||||
|
proxiesFetched: 0,
|
||||||
|
jobsQueued: 0,
|
||||||
|
message: 'No proxies fetched'
|
||||||
|
};
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|
'check-proxy': async (payload: {
|
||||||
'check-specific': async (payload: { proxies: any[] }) => {
|
proxy: ProxyInfo,
|
||||||
const { proxyService } = await import('./proxy.tasks');
|
source?: string,
|
||||||
return await proxyService.checkProxies(payload.proxies);
|
batchIndex?: number,
|
||||||
|
totalBatch?: number
|
||||||
|
}) => {
|
||||||
|
const { checkProxy } = await import('./proxy.tasks');
|
||||||
|
|
||||||
|
logger.debug('Checking individual proxy', {
|
||||||
|
proxy: `${payload.proxy.host}:${payload.proxy.port}`,
|
||||||
|
batchIndex: payload.batchIndex,
|
||||||
|
totalBatch: payload.totalBatch,
|
||||||
|
source: payload.source
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = await checkProxy(payload.proxy);
|
||||||
|
|
||||||
|
logger.debug('Proxy check completed', {
|
||||||
|
proxy: `${payload.proxy.host}:${payload.proxy.port}`,
|
||||||
|
isWorking: result.isWorking,
|
||||||
|
responseTime: result.responseTime,
|
||||||
|
batchIndex: payload.batchIndex
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
result: result,
|
||||||
|
batchInfo: {
|
||||||
|
index: payload.batchIndex,
|
||||||
|
total: payload.totalBatch,
|
||||||
|
source: payload.source
|
||||||
|
}
|
||||||
|
};
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -114,7 +114,7 @@ export async function queueProxyCheck(proxies: ProxyInfo[]): Promise<string> {
|
||||||
return jobId;
|
return jobId;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function fetchProxiesFromSources(): Promise<number> {
|
export async function fetchProxiesFromSources(): Promise<ProxyInfo[]> {
|
||||||
initializeSharedResources();
|
initializeSharedResources();
|
||||||
|
|
||||||
const sources = PROXY_CONFIG.PROXY_SOURCES.map(source =>
|
const sources = PROXY_CONFIG.PROXY_SOURCES.map(source =>
|
||||||
|
|
@ -124,7 +124,7 @@ export async function fetchProxiesFromSources(): Promise<number> {
|
||||||
let allProxies: ProxyInfo[] = result.flat();
|
let allProxies: ProxyInfo[] = result.flat();
|
||||||
allProxies = removeDuplicateProxies(allProxies);
|
allProxies = removeDuplicateProxies(allProxies);
|
||||||
// await checkProxies(allProxies);
|
// await checkProxies(allProxies);
|
||||||
return allProxies.length;
|
return allProxies;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function fetchProxiesFromSource(source: { url: string; protocol: string }): Promise<ProxyInfo[]> {
|
export async function fetchProxiesFromSource(source: { url: string; protocol: string }): Promise<ProxyInfo[]> {
|
||||||
|
|
@ -177,29 +177,6 @@ export async function fetchProxiesFromSource(source: { url: string; protocol: st
|
||||||
return allProxies;
|
return allProxies;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Check multiple proxies concurrently
|
|
||||||
*/
|
|
||||||
export async function checkProxies(proxies: ProxyInfo[]): Promise<ProxyInfo[]> {
|
|
||||||
initializeSharedResources();
|
|
||||||
|
|
||||||
logger.info('Checking proxies', { count: proxies.length });
|
|
||||||
|
|
||||||
const checkPromises = proxies.map(proxy =>
|
|
||||||
concurrencyLimit(() => checkProxy(proxy))
|
|
||||||
);
|
|
||||||
const results = await Promise.all(checkPromises);
|
|
||||||
const workingCount = results.filter((r: ProxyInfo) => r.isWorking).length;
|
|
||||||
|
|
||||||
logger.info('Proxy check completed', {
|
|
||||||
total: proxies.length,
|
|
||||||
working: workingCount,
|
|
||||||
failed: proxies.length - workingCount
|
|
||||||
});
|
|
||||||
|
|
||||||
return results;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if a proxy is working
|
* Check if a proxy is working
|
||||||
*/
|
*/
|
||||||
|
|
@ -207,7 +184,7 @@ export async function checkProxy(proxy: ProxyInfo): Promise<ProxyInfo> {
|
||||||
initializeSharedResources();
|
initializeSharedResources();
|
||||||
|
|
||||||
let success = false;
|
let success = false;
|
||||||
logger.debug(`Checking Proxy with ${concurrencyLimit.pendingCount} pending:`, {
|
logger.debug(`Checking Proxy:`, {
|
||||||
protocol: proxy.protocol,
|
protocol: proxy.protocol,
|
||||||
host: proxy.host,
|
host: proxy.host,
|
||||||
port: proxy.port,
|
port: proxy.port,
|
||||||
|
|
@ -269,8 +246,6 @@ export async function checkProxy(proxy: ProxyInfo): Promise<ProxyInfo> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// Utility functions
|
// Utility functions
|
||||||
function cleanProxyUrl(url: string): string {
|
function cleanProxyUrl(url: string): string {
|
||||||
return url
|
return url
|
||||||
|
|
@ -301,7 +276,6 @@ export const proxyTasks = {
|
||||||
fetchProxiesFromSources,
|
fetchProxiesFromSources,
|
||||||
fetchProxiesFromSource,
|
fetchProxiesFromSource,
|
||||||
checkProxy,
|
checkProxy,
|
||||||
checkProxies,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Export singleton instance for backward compatibility (optional)
|
// Export singleton instance for backward compatibility (optional)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue