added batching mess
This commit is contained in:
parent
fe96cf6679
commit
22992cd393
4 changed files with 603 additions and 95 deletions
|
|
@ -19,112 +19,205 @@ export const proxyProvider: ProviderConfig = {
|
|||
operations: {
|
||||
'fetch-and-check': async (payload: { sources?: string[] }) => {
|
||||
const { proxyService } = await import('./proxy.tasks');
|
||||
const { queueManager } = await import('../services/queue.service');
|
||||
|
||||
await queueManager.drainQueue();
|
||||
|
||||
const proxies = await proxyService.fetchProxiesFromSources();
|
||||
const proxiesCount = proxies.length;
|
||||
// Get the actual proxies to create individual jobs
|
||||
if (proxiesCount > 0) {
|
||||
try {
|
||||
const { queueManager } = await import('../services/queue.service');
|
||||
if (proxies && proxies.length > 0) {
|
||||
// Calculate delay distribution over 24 hours
|
||||
const totalDelayMs = 24 * 60 * 60 * 1000; // 24 hours in milliseconds
|
||||
const delayPerProxy = Math.floor(totalDelayMs / proxies.length);
|
||||
|
||||
if (proxiesCount === 0) {
|
||||
logger.info('No proxies fetched, skipping job creation');
|
||||
return { proxiesFetched: 0, batchJobsCreated: 0 };
|
||||
}
|
||||
|
||||
try {
|
||||
// Optimized batch size for 800k proxies
|
||||
const batchSize = 200; // Process 200 proxies per batch job
|
||||
const totalBatches = Math.ceil(proxies.length / batchSize);
|
||||
const totalDelayMs = 24 * 60 * 60 * 1000; // 24 hours
|
||||
const delayPerBatch = Math.floor(totalDelayMs / totalBatches);
|
||||
|
||||
logger.info('Creating proxy validation batch jobs', {
|
||||
totalProxies: proxies.length,
|
||||
batchSize,
|
||||
totalBatches,
|
||||
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
|
||||
estimatedCompletion: '24 hours'
|
||||
});
|
||||
|
||||
// Process batches in chunks to avoid memory issues
|
||||
const batchCreationChunkSize = 50; // Create 50 batch jobs at a time
|
||||
let batchJobsCreated = 0;
|
||||
|
||||
for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += batchCreationChunkSize) {
|
||||
const chunkEnd = Math.min(chunkStart + batchCreationChunkSize, totalBatches);
|
||||
|
||||
// Create batch jobs in parallel for this chunk
|
||||
const batchPromises = [];
|
||||
|
||||
for (let i = chunkStart; i < chunkEnd; i++) {
|
||||
const startIndex = i * batchSize;
|
||||
const endIndex = Math.min(startIndex + batchSize, proxies.length);
|
||||
const batchProxies = proxies.slice(startIndex, endIndex);
|
||||
const delay = i * delayPerBatch;
|
||||
|
||||
logger.info('Creating individual proxy validation jobs', {
|
||||
proxyCount: proxies.length,
|
||||
distributionPeriod: '24 hours',
|
||||
delayPerProxy: `${(delayPerProxy / 1000 / 60).toFixed(2)} minutes`
|
||||
const batchPromise = queueManager.addJob({
|
||||
type: 'proxy-batch-validation',
|
||||
service: 'proxy',
|
||||
provider: 'proxy-service',
|
||||
operation: 'process-proxy-batch',
|
||||
payload: {
|
||||
proxies: batchProxies,
|
||||
batchIndex: i,
|
||||
totalBatches,
|
||||
source: 'fetch-and-check'
|
||||
},
|
||||
priority: 3
|
||||
}, {
|
||||
delay: delay,
|
||||
jobId: `proxy-batch-${i}-${Date.now()}`
|
||||
});
|
||||
|
||||
let queuedCount = 0;
|
||||
|
||||
for (let i = 0; i < proxies.length; i++) {
|
||||
const proxy = proxies[i];
|
||||
const delay = i * delayPerProxy;
|
||||
|
||||
try {
|
||||
await queueManager.addJob({
|
||||
type: 'proxy-validation',
|
||||
service: 'proxy',
|
||||
provider: 'proxy-service',
|
||||
operation: 'check-proxy',
|
||||
payload: {
|
||||
proxy: proxy,
|
||||
source: 'fetch-and-check',
|
||||
autoTriggered: true,
|
||||
batchIndex: i,
|
||||
totalBatch: proxies.length
|
||||
},
|
||||
priority: 3
|
||||
}, {
|
||||
delay: delay
|
||||
});
|
||||
|
||||
queuedCount++;
|
||||
// Log progress every 100 jobs
|
||||
if ((i + 1) % 100 === 0 || i === proxies.length - 1) {
|
||||
logger.info('Proxy validation jobs queued progress', {
|
||||
queued: i + 1,
|
||||
total: proxies.length,
|
||||
percentage: `${((i + 1) / proxies.length * 100).toFixed(1)}%`
|
||||
});
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue proxy validation job', {
|
||||
proxy: `${proxy.host}:${proxy.port}`,
|
||||
batchIndex: i,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
} }
|
||||
|
||||
logger.info('Proxy validation jobs queuing completed', {
|
||||
total: proxies.length,
|
||||
successful: queuedCount,
|
||||
failed: proxies.length - queuedCount,
|
||||
totalDelay: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`,
|
||||
avgDelayPerJob: `${(delayPerProxy / 1000 / 60).toFixed(2)} minutes`
|
||||
});
|
||||
|
||||
return {
|
||||
proxiesFetched: proxiesCount,
|
||||
jobsQueued: queuedCount,
|
||||
totalDelay: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`,
|
||||
avgDelayPerJob: `${(delayPerProxy / 1000 / 60).toFixed(2)} minutes`
|
||||
};
|
||||
} else {
|
||||
logger.warn('No proxies found to create validation jobs', {
|
||||
proxiesFetched: proxiesCount
|
||||
});
|
||||
return {
|
||||
proxiesFetched: proxiesCount,
|
||||
jobsQueued: 0,
|
||||
message: 'No cached proxies found'
|
||||
};
|
||||
batchPromises.push(batchPromise);
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
logger.error('Failed to create individual proxy validation jobs', {
|
||||
proxiesCount,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
// Wait for this chunk to complete
|
||||
const results = await Promise.allSettled(batchPromises);
|
||||
const successful = results.filter(r => r.status === 'fulfilled').length;
|
||||
const failed = results.filter(r => r.status === 'rejected').length;
|
||||
|
||||
batchJobsCreated += successful;
|
||||
|
||||
logger.info('Batch chunk created', {
|
||||
chunkStart: chunkStart + 1,
|
||||
chunkEnd,
|
||||
totalChunks: Math.ceil(totalBatches / batchCreationChunkSize),
|
||||
successful,
|
||||
failed,
|
||||
totalCreated: batchJobsCreated,
|
||||
progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%`
|
||||
});
|
||||
return {
|
||||
proxiesFetched: proxiesCount,
|
||||
jobsQueued: 0,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
}; }
|
||||
} else { logger.info('No proxies fetched, skipping job creation');
|
||||
|
||||
// Small delay between chunks to prevent overwhelming Redis
|
||||
if (chunkEnd < totalBatches) {
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
}
|
||||
}
|
||||
|
||||
logger.info('All batch jobs creation completed', {
|
||||
totalProxies: proxies.length,
|
||||
batchJobsCreated,
|
||||
totalBatches,
|
||||
avgProxiesPerBatch: Math.floor(proxies.length / totalBatches),
|
||||
estimatedDuration: '24 hours'
|
||||
});
|
||||
|
||||
return {
|
||||
proxiesFetched: 0,
|
||||
jobsQueued: 0,
|
||||
message: 'No proxies fetched'
|
||||
proxiesFetched: proxiesCount,
|
||||
batchJobsCreated,
|
||||
totalBatches,
|
||||
avgProxiesPerBatch: Math.floor(proxies.length / totalBatches)
|
||||
};
|
||||
|
||||
} catch (error) {
|
||||
logger.error('Failed to create batch jobs', {
|
||||
proxiesCount,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
|
||||
// Handler for a single batch job: expands the batch into one 'check-proxy'
// job per proxy and enqueues them all through queueManager.addBulk, with the
// jobs' delays spread evenly across a fixed 15-minute window.
//
// payload.proxies      - proxies belonging to this batch
// payload.batchIndex   - zero-based index of this batch (used for logging and job ids)
// payload.totalBatches - total number of batches (used for progress logging)
// payload.source       - forwarded unchanged into every per-proxy job payload
//
// Returns { batchIndex, totalProxies, jobsCreated, jobsFailed }. A failure of
// addBulk is logged and reported through jobsFailed rather than rethrown.
'process-proxy-batch': async (payload: {
  proxies: ProxyInfo[],
  batchIndex: number,
  totalBatches: number,
  source: string
}) => {
  const { queueManager } = await import('../services/queue.service');

  const proxyCount = payload.proxies.length;
  const progress = `${((payload.batchIndex + 1) / payload.totalBatches * 100).toFixed(2)}%`;

  logger.info('Processing proxy batch', {
    batchIndex: payload.batchIndex,
    batchSize: proxyCount,
    totalBatches: payload.totalBatches,
    progress
  });

  // Nothing to enqueue. Returning here also avoids dividing by zero below,
  // which previously produced an Infinity per-proxy delay in the logs.
  if (proxyCount === 0) {
    return {
      batchIndex: payload.batchIndex,
      totalProxies: 0,
      jobsCreated: 0,
      jobsFailed: 0
    };
  }

  // Single source of truth for the batch window; the log fields below are
  // derived from it so they can no longer disagree with the real delay
  // (the log used to claim 30 minutes while the code used 15).
  const batchDurationMinutes = 15;
  const batchDelayMs = batchDurationMinutes * 60 * 1000;
  const delayPerProxy = Math.floor(batchDelayMs / proxyCount);

  logger.info('Batch timing calculated', {
    batchIndex: payload.batchIndex,
    proxiesInBatch: proxyCount,
    batchDurationMinutes,
    delayPerProxySeconds: Math.floor(delayPerProxy / 1000),
    delayPerProxyMs: delayPerProxy
  });

  // Use BullMQ's addBulk for better performance
  const jobsToCreate = payload.proxies.map((proxy, i) => ({
    name: 'proxy-validation',
    data: {
      type: 'proxy-validation',
      service: 'proxy',
      provider: 'proxy-service',
      operation: 'check-proxy',
      payload: {
        proxy: proxy,
        source: payload.source,
        batchIndex: payload.batchIndex,
        proxyIndexInBatch: i,
        totalBatch: payload.totalBatches
      },
      priority: 2
    },
    opts: {
      // The i-th proxy is delayed by i slots, so the batch is spread over the window.
      delay: i * delayPerProxy,
      jobId: `proxy-${proxy.host}-${proxy.port}-batch${payload.batchIndex}-${Date.now()}-${i}`,
      removeOnComplete: 3,
      removeOnFail: 5
    }
  }));

  try {
    const jobs = await queueManager.addBulk(jobsToCreate);

    logger.info('Batch processing completed successfully', {
      batchIndex: payload.batchIndex,
      totalProxies: proxyCount,
      jobsCreated: jobs.length,
      batchDelay: `${batchDurationMinutes} minutes`,
      progress
    });

    return {
      batchIndex: payload.batchIndex,
      totalProxies: proxyCount,
      jobsCreated: jobs.length,
      jobsFailed: 0
    };
  } catch (error) {
    logger.error('Failed to create validation jobs for batch', {
      batchIndex: payload.batchIndex,
      batchSize: proxyCount,
      error: error instanceof Error ? error.message : String(error)
    });

    // Best effort: report the whole batch as failed instead of throwing.
    return {
      batchIndex: payload.batchIndex,
      totalProxies: proxyCount,
      jobsCreated: 0,
      jobsFailed: proxyCount
    };
  }
},
|
||||
'check-proxy': async (payload: {
|
||||
|
||||
'check-proxy': async (payload: {
|
||||
proxy: ProxyInfo,
|
||||
source?: string,
|
||||
batchIndex?: number,
|
||||
proxyIndexInBatch?: number,
|
||||
totalBatch?: number
|
||||
}) => {
|
||||
const { checkProxy } = await import('./proxy.tasks');
|
||||
|
|
@ -132,7 +225,7 @@ export const proxyProvider: ProviderConfig = {
|
|||
logger.debug('Checking individual proxy', {
|
||||
proxy: `${payload.proxy.host}:${payload.proxy.port}`,
|
||||
batchIndex: payload.batchIndex,
|
||||
totalBatch: payload.totalBatch,
|
||||
proxyIndex: payload.proxyIndexInBatch,
|
||||
source: payload.source
|
||||
});
|
||||
|
||||
|
|
@ -148,12 +241,13 @@ export const proxyProvider: ProviderConfig = {
|
|||
return {
|
||||
result: result,
|
||||
batchInfo: {
|
||||
index: payload.batchIndex,
|
||||
batchIndex: payload.batchIndex,
|
||||
proxyIndex: payload.proxyIndexInBatch,
|
||||
total: payload.totalBatch,
|
||||
source: payload.source
|
||||
}
|
||||
};
|
||||
},
|
||||
}
|
||||
},
|
||||
|
||||
scheduledJobs: [
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ export class QueueService {
|
|||
};
|
||||
|
||||
// Worker configuration
|
||||
const workerCount = parseInt(process.env.WORKER_COUNT || '4');
|
||||
const workerCount = parseInt(process.env.WORKER_COUNT || '5');
|
||||
const concurrencyPerWorker = parseInt(process.env.WORKER_CONCURRENCY || '20');
|
||||
|
||||
this.logger.info('Connecting to Redis/Dragonfly', connection);
|
||||
|
|
@ -180,6 +180,10 @@ export class QueueService {
|
|||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Adds several jobs to the queue in one call by delegating to
 * `this.queue.addBulk`.
 *
 * @param jobs Job descriptors, passed through to the queue unchanged.
 * @returns Whatever `this.queue.addBulk` resolves with.
 * @throws Error when the service has not been initialized, matching the
 *         guard used by `getQueueStatus`.
 */
async addBulk(jobs: any[]) : Promise<any[]> {
  if (!this.isInitialized) {
    throw new Error('Queue service not initialized. Call initialize() first.');
  }
  return await this.queue.addBulk(jobs);
}
|
||||
private setupEventListeners() {
|
||||
this.queueEvents.on('completed', (job) => {
|
||||
this.logger.info('Job completed', { id: job.jobId });
|
||||
|
|
@ -396,6 +400,13 @@ export class QueueService {
|
|||
delayed: delayed.length
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Drains the queue by calling `this.queue.drain()`.
 *
 * The guard used to be inverted: the drain ran only when the service was
 * NOT initialized and was skipped once it was, so callers that drain before
 * re-queuing work got a silent no-op. The drain now runs on an initialized
 * service, and an uninitialized one is rejected with the same error that
 * `getQueueStatus` uses.
 *
 * NOTE(review): `this.queue` is presumably a BullMQ `Queue`, whose `drain()`
 * leaves delayed jobs in place unless called with `true` - confirm whether
 * delayed jobs should be cleared here as well.
 *
 * @throws Error when the service has not been initialized.
 */
async drainQueue(): Promise<void> {
  if (!this.isInitialized) {
    throw new Error('Queue service not initialized. Call initialize() first.');
  }
  await this.queue.drain();
}
|
||||
|
||||
async getQueueStatus() {
|
||||
if (!this.isInitialized) {
|
||||
throw new Error('Queue service not initialized. Call initialize() first.');
|
||||
|
|
@ -412,12 +423,14 @@ export class QueueService {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Reports how many workers this service currently holds.
 *
 * @returns `this.workers.length` once the service is initialized, otherwise 0.
 */
getWorkerCount() {
  return this.isInitialized ? this.workers.length : 0;
}
|
||||
|
||||
getRegisteredProviders() {
|
||||
return providerRegistry.getProviders().map(({ key, config }) => ({
|
||||
key,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue