finished proxy-provider - need to test

This commit is contained in:
Bojan Kucera 2025-06-09 08:07:45 -04:00
parent f78fa459d2
commit b2817656b3
2 changed files with 228 additions and 312 deletions

View file

@ -17,96 +17,50 @@ const getEvery24HourCron = (): string => {
export const proxyProvider: ProviderConfig = {
name: 'proxy-service',
service: 'proxy',
operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => {
operations: {
'fetch-and-check': async (payload: { sources?: string[] }) => {
const { proxyService } = await import('./proxy.tasks');
const { queueManager } = await import('../services/queue.service');
await queueManager.drainQueue();
const proxies = await proxyService.fetchProxiesFromSources();
const proxiesCount = proxies.length;
if (proxiesCount === 0) {
logger.info('No proxies fetched, skipping job creation');
if (proxies.length === 0) {
return { proxiesFetched: 0, jobsCreated: 0 };
}
const batchProcessor = new BatchProcessor(queueManager);
// Environment-configurable settings
const targetHours = parseInt(process.env.PROXY_VALIDATION_HOURS || '8');
const batchSize = parseInt(process.env.PROXY_BATCH_SIZE || '200');
const useDirectMode = process.env.PROXY_DIRECT_MODE === 'true';
logger.info('Proxy validation configuration', {
targetHours,
batchSize,
useDirectMode,
totalProxies: proxies.length
// Simplified configuration
const result = await batchProcessor.processItems({
items: proxies,
batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'),
totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '8') * 60 * 60 * 1000,
jobNamePrefix: 'proxy',
operation: 'check-proxy',
service: 'proxy',
provider: 'proxy-service',
priority: 2,
useBatching: process.env.PROXY_DIRECT_MODE !== 'true', // Simple boolean flag
createJobData: (proxy: ProxyInfo) => ({
proxy,
source: 'fetch-and-check'
}),
removeOnComplete: 5,
removeOnFail: 3
});
if (useDirectMode) {
// Direct approach - simpler, creates all jobs immediately
const result = await batchProcessor.createDirectJobs({
items: proxies,
batchSize: 0, // Not used in direct mode
totalDelayMs: targetHours * 60 * 60 * 1000,
jobNamePrefix: 'proxy',
operation: 'check-proxy',
service: 'proxy',
provider: 'proxy-service',
priority: 2,
createJobData: (proxy: ProxyInfo) => ({
proxy,
source: 'fetch-and-check'
}),
removeOnComplete: 5,
removeOnFail: 3
});
return {
proxiesFetched: result.totalItems,
jobsCreated: result.jobsCreated,
mode: 'direct'
};
} else { // Batch approach - creates batch jobs that create individual jobs
const result = await batchProcessor.createBatchJobs({
items: proxies,
batchSize,
totalDelayMs: targetHours * 60 * 60 * 1000,
jobNamePrefix: 'proxy',
operation: 'check-proxy',
service: 'proxy',
provider: 'proxy-service',
priority: 3,
createJobData: (proxy: ProxyInfo) => ({
proxy,
source: 'fetch-and-check'
}),
removeOnComplete: 3,
removeOnFail: 5
});
return {
proxiesFetched: result.totalItems,
batchJobsCreated: result.batchJobsCreated,
totalBatches: result.totalBatches,
estimatedDurationHours: result.estimatedDurationHours,
mode: 'batch'
};
}
return {
proxiesFetched: result.totalItems,
...result
};
},
'process-proxy-batch': async (payload: any) => {
'process-proxy-batch': async (payload: any) => {
// Process a batch of proxies - uses the fetch-and-check JobNamePrefix process-(proxy)-batch
const { queueManager } = await import('../services/queue.service');
const batchProcessor = new BatchProcessor(queueManager);
return await batchProcessor.processBatch(
payload,
(proxy: ProxyInfo, index: number) => ({
proxy,
source: payload.config?.source || 'batch-processing'
})
);
return await batchProcessor.processBatch(payload);
},
'check-proxy': async (payload: {

View file

@ -2,31 +2,17 @@ import { getLogger } from '@stock-bot/logger';
export interface BatchConfig<T> {
items: T[];
batchSize: number;
batchSize?: number; // Optional - only used for batch mode
totalDelayMs: number;
jobNamePrefix: string;
operation: string;
service: string;
provider: string;
priority?: number;
createJobData: (item: T, index: number) => any; // Simplified - no batchInfo parameter
createJobData: (item: T, index: number) => any;
removeOnComplete?: number;
removeOnFail?: number;
}
export interface BatchInfo {
batchIndex: number;
itemIndex: number; // Changed to match proxy provider
total: number; // Changed to match proxy provider
totalItems: number;
}
export interface BatchResult {
totalItems: number;
batchJobsCreated: number;
totalBatches: number;
avgItemsPerBatch: number;
estimatedDurationHours: number;
useBatching?: boolean; // Simple flag to choose mode
}
const logger = getLogger('batch-processor');
@ -35,218 +21,23 @@ export class BatchProcessor {
constructor(private queueManager: any) {}
/**
* Create batch jobs that will later create individual item jobs
* Unified method that handles both direct and batch approaches
*/
async createBatchJobs<T>(config: BatchConfig<T>): Promise<BatchResult> {
const {
items,
batchSize,
totalDelayMs,
jobNamePrefix,
operation,
service,
provider,
priority = 3
} = config;
async processItems<T>(config: BatchConfig<T>) {
const { items, useBatching = false } = config;
if (items.length === 0) {
return {
totalItems: 0,
batchJobsCreated: 0,
totalBatches: 0,
avgItemsPerBatch: 0,
estimatedDurationHours: 0
};
return { totalItems: 0, jobsCreated: 0 };
}
const totalBatches = Math.ceil(items.length / batchSize);
const delayPerBatch = Math.floor(totalDelayMs / totalBatches);
logger.info('Creating batch jobs', {
totalItems: items.length,
batchSize,
totalBatches,
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`,
jobPrefix: jobNamePrefix
});
const batchCreationChunkSize = 50;
let batchJobsCreated = 0;
for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += batchCreationChunkSize) {
const chunkEnd = Math.min(chunkStart + batchCreationChunkSize, totalBatches);
const batchPromises = [];
for (let i = chunkStart; i < chunkEnd; i++) {
const startIndex = i * batchSize;
const endIndex = Math.min(startIndex + batchSize, items.length);
const batchItems = items.slice(startIndex, endIndex);
const delay = i * delayPerBatch;
const batchPromise = this.queueManager.addJob({
type: `${jobNamePrefix}-batch-processing`,
service,
provider,
operation: `process-${jobNamePrefix}-batch`, payload: {
items: batchItems,
batchIndex: i,
total: totalBatches, // Changed to total to match proxy provider
batchSize,
config: {
jobNamePrefix,
operation,
service,
provider,
priority: priority - 1, // Individual jobs get slightly lower priority
removeOnComplete: config.removeOnComplete || 5,
removeOnFail: config.removeOnFail || 5
}
},
priority
}, {
delay: delay,
jobId: `${jobNamePrefix}-batch-${i}-${Date.now()}`
});
batchPromises.push(batchPromise);
}
const results = await Promise.allSettled(batchPromises);
const successful = results.filter(r => r.status === 'fulfilled').length;
const failed = results.filter(r => r.status === 'rejected').length;
batchJobsCreated += successful;
logger.info('Batch chunk created', {
chunkStart: chunkStart + 1,
chunkEnd,
successful,
failed,
totalCreated: batchJobsCreated,
progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%`
});
if (chunkEnd < totalBatches) {
await new Promise(resolve => setTimeout(resolve, 100));
}
}
const result = {
totalItems: items.length,
batchJobsCreated,
totalBatches,
avgItemsPerBatch: Math.floor(items.length / totalBatches),
estimatedDurationHours: totalDelayMs / 1000 / 60 / 60
};
logger.info('Batch jobs creation completed', result);
return result;
}
/**
* Process a batch by creating individual item jobs
*/ async processBatch<T>(payload: {
items: T[];
batchIndex: number;
total: number; // Changed to match proxy provider
batchSize: number;
config: {
jobNamePrefix: string;
operation: string;
service: string;
provider: string;
priority: number;
removeOnComplete: number;
removeOnFail: number;
};
}, createJobData: (item: T, index: number) => any): Promise<{
batchIndex: number;
totalItems: number;
jobsCreated: number;
jobsFailed: number;
}> {
const { items, batchIndex, total, config } = payload;
logger.info('Processing batch', {
batchIndex,
batchSize: items.length,
total,
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`
});
// Spread items over a reasonable time period
const batchDelayMs = 15 * 60 * 1000; // 15 minutes per batch
const delayPerItem = Math.floor(batchDelayMs / items.length);
const jobsToCreate = items.map((item, i) => {
// Get user data first
const userData = createJobData(item, i);
// Automatically merge with batch info using your property names
const finalPayload = {
...userData,
batchIndex,
itemIndex: i, // Changed to match proxy provider
total, // Changed to match proxy provider
source: userData.source || 'batch-processing'
};
return {
name: `${config.jobNamePrefix}-processing`,
data: {
type: `${config.jobNamePrefix}-processing`,
service: config.service,
provider: config.provider,
operation: config.operation,
payload: finalPayload,
priority: config.priority
},
opts: {
delay: i * delayPerItem,
jobId: `${config.jobNamePrefix}-${batchIndex}-${i}-${Date.now()}`,
removeOnComplete: config.removeOnComplete,
removeOnFail: config.removeOnFail
}
};
});
try {
const jobs = await this.queueManager.queue.addBulk(jobsToCreate);
logger.info('Batch processing completed', {
batchIndex,
totalItems: items.length,
jobsCreated: jobs.length,
batchDelay: '15 minutes',
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`
});
return {
batchIndex,
totalItems: items.length,
jobsCreated: jobs.length,
jobsFailed: 0
};
} catch (error) {
logger.error('Failed to create batch jobs', {
batchIndex,
batchSize: items.length,
error: error instanceof Error ? error.message : String(error)
});
return {
batchIndex,
totalItems: items.length,
jobsCreated: 0,
jobsFailed: items.length
};
if (useBatching) {
return await this.createBatchJobs(config);
} else {
return await this.createDirectJobs(config);
}
}
/**
* Directly create individual jobs without batching (simplified approach)
*/
async createDirectJobs<T>(config: BatchConfig<T>): Promise<{
totalItems: number;
jobsCreated: number;
}> {
private async createDirectJobs<T>(config: BatchConfig<T>) {
const {
items,
totalDelayMs,
@ -260,12 +51,8 @@ export class BatchProcessor {
removeOnFail = 3
} = config;
if (items.length === 0) {
return { totalItems: 0, jobsCreated: 0 };
}
const delayPerItem = Math.floor(totalDelayMs / items.length);
const createBatchSize = 100; // Create jobs in chunks
const chunkSize = 100;
let totalJobsCreated = 0;
logger.info('Creating direct jobs', {
@ -274,11 +61,12 @@ export class BatchProcessor {
estimatedDuration: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`
});
for (let i = 0; i < items.length; i += createBatchSize) {
const batch = items.slice(i, i + createBatchSize);
const jobsToCreate = batch.map((item, batchIndex) => {
const globalIndex = i + batchIndex;
// Process in chunks to avoid overwhelming Redis
for (let i = 0; i < items.length; i += chunkSize) {
const chunk = items.slice(i, i + chunkSize);
const jobs = chunk.map((item, chunkIndex) => {
const globalIndex = i + chunkIndex;
return {
name: `${jobNamePrefix}-processing`,
data: {
@ -299,10 +87,11 @@ export class BatchProcessor {
});
try {
const jobs = await this.queueManager.queue.addBulk(jobsToCreate);
totalJobsCreated += jobs.length;
const createdJobs = await this.queueManager.queue.addBulk(jobs);
totalJobsCreated += createdJobs.length;
if ((i + createBatchSize) % 500 === 0 || i + createBatchSize >= items.length) {
// Log progress every 500 jobs
if (totalJobsCreated % 500 === 0 || i + chunkSize >= items.length) {
logger.info('Direct job creation progress', {
created: totalJobsCreated,
total: items.length,
@ -310,14 +99,187 @@ export class BatchProcessor {
});
}
} catch (error) {
logger.error('Failed to create direct job batch', {
logger.error('Failed to create job chunk', {
startIndex: i,
batchSize: batch.length,
chunkSize: chunk.length,
error: error instanceof Error ? error.message : String(error)
});
}
}
return { totalItems: items.length, jobsCreated: totalJobsCreated };
return {
totalItems: items.length,
jobsCreated: totalJobsCreated,
mode: 'direct'
};
}
}
private async createBatchJobs<T>(config: BatchConfig<T>) {
const {
items,
batchSize = 200,
totalDelayMs,
jobNamePrefix,
operation,
service,
provider,
priority = 3
} = config;
const totalBatches = Math.ceil(items.length / batchSize);
const delayPerBatch = Math.floor(totalDelayMs / totalBatches);
const chunkSize = 50; // Create batch jobs in chunks
let batchJobsCreated = 0;
logger.info('Creating batch jobs', {
totalItems: items.length,
batchSize,
totalBatches,
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`
});
// Create batch jobs in chunks
for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += chunkSize) {
const chunkEnd = Math.min(chunkStart + chunkSize, totalBatches);
const batchJobs = [];
for (let batchIndex = chunkStart; batchIndex < chunkEnd; batchIndex++) {
const startIndex = batchIndex * batchSize;
const endIndex = Math.min(startIndex + batchSize, items.length);
const batchItems = items.slice(startIndex, endIndex);
batchJobs.push({
name: `${jobNamePrefix}-batch-processing`,
data: {
type: `${jobNamePrefix}-batch-processing`,
service,
provider,
operation: `process-${jobNamePrefix}-batch`,
payload: {
items: batchItems,
batchIndex,
total: totalBatches,
config: { ...config, priority: priority - 1 }
},
priority
},
opts: {
delay: batchIndex * delayPerBatch,
jobId: `${jobNamePrefix}-batch-${batchIndex}-${Date.now()}`
}
});
}
try {
const createdJobs = await this.queueManager.queue.addBulk(batchJobs);
batchJobsCreated += createdJobs.length;
logger.info('Batch chunk created', {
chunkStart: chunkStart + 1,
chunkEnd,
created: createdJobs.length,
totalCreated: batchJobsCreated,
progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%`
});
} catch (error) {
logger.error('Failed to create batch chunk', {
chunkStart,
chunkEnd,
error: error instanceof Error ? error.message : String(error)
});
}
// Small delay between chunks
if (chunkEnd < totalBatches) {
await new Promise(resolve => setTimeout(resolve, 100));
}
}
return {
totalItems: items.length,
batchJobsCreated,
totalBatches,
estimatedDurationHours: totalDelayMs / 1000 / 60 / 60,
mode: 'batch'
};
}
/**
* Process a batch (called by batch jobs)
*/
async processBatch<T>(payload: {
items: T[];
batchIndex: number;
total: number;
config: BatchConfig<T>;
}) {
const { items, batchIndex, total, config } = payload;
logger.info('Processing batch', {
batchIndex,
batchSize: items.length,
total,
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`
});
const delayPerItem = Math.floor((15 * 60 * 1000) / items.length); // 15 min per batch
const jobs = items.map((item, itemIndex) => {
const userData = config.createJobData(item, itemIndex);
return {
name: `${config.jobNamePrefix}-processing`,
data: {
type: `${config.jobNamePrefix}-processing`,
service: config.service,
provider: config.provider,
operation: config.operation,
payload: {
...userData,
batchIndex,
itemIndex,
total,
source: userData.source || 'batch-processing'
},
priority: config.priority || 2
},
opts: {
delay: itemIndex * delayPerItem,
jobId: `${config.jobNamePrefix}-${batchIndex}-${itemIndex}-${Date.now()}`,
removeOnComplete: config.removeOnComplete || 5,
removeOnFail: config.removeOnFail || 3
}
};
});
try {
const createdJobs = await this.queueManager.queue.addBulk(jobs);
logger.info('Batch processing completed', {
batchIndex,
totalItems: items.length,
jobsCreated: createdJobs.length,
progress: `${((batchIndex + 1) / total * 100).toFixed(2)}%`
});
return {
batchIndex,
totalItems: items.length,
jobsCreated: createdJobs.length,
jobsFailed: 0
};
} catch (error) {
logger.error('Failed to process batch', {
batchIndex,
error: error instanceof Error ? error.message : String(error)
});
return {
batchIndex,
totalItems: items.length,
jobsCreated: 0,
jobsFailed: items.length
};
}
}
}