simplified queue service

This commit is contained in:
Boki 2025-06-11 07:28:47 -04:00
parent 709fc347e9
commit b645b58102
2 changed files with 137 additions and 194 deletions

View file

@ -35,7 +35,7 @@ export const proxyProvider: ProviderConfig = {
source: 'batch-processing' source: 'batch-processing'
}), }),
queueManager, { queueManager, {
totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000, totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '7') * 60 * 60 * 1000,
batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'), batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'),
useBatching: process.env.PROXY_DIRECT_MODE !== 'true', useBatching: process.env.PROXY_DIRECT_MODE !== 'true',
priority: 2, priority: 2,
@ -114,16 +114,16 @@ export const proxyProvider: ProviderConfig = {
} }
}, },
scheduledJobs: [ scheduledJobs: [
{ // {
type: 'proxy-maintenance', // type: 'proxy-maintenance',
operation: 'fetch-and-check', // operation: 'fetch-and-check',
payload: {}, // payload: {},
// should remove and just run at the same time so app restarts don't keep adding the same jobs // // should remove and just run at the same time so app restarts don't keep adding the same jobs
cronPattern: getEvery24HourCron(), // cronPattern: getEvery24HourCron(),
priority: 5, // priority: 5,
immediately: true, // NOTE(review): flag is true but the comment said not to run at startup — contradictory; confirm intent // immediately: true, // NOTE(review): flag is true but the comment said not to run at startup — contradictory; confirm intent
description: 'Fetch and validate proxy list from sources' // description: 'Fetch and validate proxy list from sources'
} // }
] ]
}; };

View file

@ -8,14 +8,22 @@ export class QueueService {
private workers: Worker[] = []; private workers: Worker[] = [];
private queueEvents!: QueueEvents; private queueEvents!: QueueEvents;
private config = {
workers: parseInt(process.env.WORKER_COUNT || '5'),
concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'),
redis: {
host: process.env.DRAGONFLY_HOST || 'localhost',
port: parseInt(process.env.DRAGONFLY_PORT || '6379')
}
};
private get isInitialized() { private get isInitialized() {
return !!this.queue; return !!this.queue;
} }
constructor() { constructor() {
// Don't initialize in constructor to allow for proper async initialization // Don't initialize in constructor to allow for proper async initialization
} } async initialize() {
async initialize() {
if (this.isInitialized) { if (this.isInitialized) {
this.logger.warn('Queue service already initialized'); this.logger.warn('Queue service already initialized');
return; return;
@ -23,105 +31,94 @@ export class QueueService {
this.logger.info('Initializing queue service...'); this.logger.info('Initializing queue service...');
// Register all providers first
await this.registerProviders();
const connection = this.getConnection();
const queueName = '{data-service-queue}';
try { try {
// Step 1: Register providers
await this.registerProviders();
// Step 2: Setup queue and workers
const connection = this.getConnection();
const queueName = '{data-service-queue}';
this.queue = new Queue(queueName, { this.queue = new Queue(queueName, {
connection, connection,
defaultJobOptions: { defaultJobOptions: {
removeOnComplete: 10, removeOnComplete: 10,
removeOnFail: 5, removeOnFail: 5,
attempts: 3, attempts: 3,
backoff: { backoff: { type: 'exponential', delay: 1000 }
type: 'exponential',
delay: 1000,
}
} }
}); });
// Create workers (keeping same count as before)
const workerCount = parseInt(process.env.WORKER_COUNT || '5');
const concurrencyPerWorker = parseInt(process.env.WORKER_CONCURRENCY || '20');
for (let i = 0; i < workerCount; i++) {
const worker = new Worker(
queueName,
this.processJob.bind(this),
{
connection: { ...connection },
concurrency: concurrencyPerWorker,
maxStalledCount: 1,
stalledInterval: 30000,
}
);
this.setupWorkerEvents(worker, i);
this.workers.push(worker);
}
this.queueEvents = new QueueEvents(queueName, { connection }); this.queueEvents = new QueueEvents(queueName, { connection });
// Wait for readiness // Step 3: Create workers
await this.queue.waitUntilReady(); const { workerCount, totalConcurrency } = this.createWorkers(queueName, connection);
await Promise.all(this.workers.map(worker => worker.waitUntilReady()));
await this.queueEvents.waitUntilReady();
// Step 4: Wait for readiness (parallel)
await Promise.all([
this.queue.waitUntilReady(),
this.queueEvents.waitUntilReady(),
...this.workers.map(worker => worker.waitUntilReady())
]);
// Step 5: Setup events and scheduled tasks
this.setupQueueEvents(); this.setupQueueEvents();
await this.setupScheduledTasks(); await this.setupScheduledTasks();
this.logger.info('Queue service initialized successfully', { this.logger.info('Queue service initialized successfully', {
workers: this.workers.length, workers: workerCount,
totalConcurrency: workerCount * concurrencyPerWorker totalConcurrency
}); });
} catch (error) { } catch (error) {
this.logger.error('Failed to initialize queue service', { error }); this.logger.error('Failed to initialize queue service', { error });
throw error; throw error;
} }
} } private getConnection() {
// Update getTotalConcurrency method
getTotalConcurrency() {
if (!this.isInitialized) {
return 0;
}
return this.workers.reduce((total, worker) => {
return total + (worker.opts.concurrency || 1);
}, 0);
}
private getConnection() {
return { return {
host: process.env.DRAGONFLY_HOST || 'localhost', ...this.config.redis,
port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
maxRetriesPerRequest: null, maxRetriesPerRequest: null,
retryDelayOnFailover: 100, retryDelayOnFailover: 100,
lazyConnect: false lazyConnect: false
}; };
} }
private setupWorkerEvents(worker: Worker, index: number) { private createWorkers(queueName: string, connection: any) {
worker.on('ready', () => { for (let i = 0; i < this.config.workers; i++) {
this.logger.info(`Worker ${index + 1} ready`); const worker = new Worker(queueName, this.processJob.bind(this), {
connection: { ...connection },
concurrency: this.config.concurrency,
maxStalledCount: 1,
stalledInterval: 30000,
});
// Setup events inline
worker.on('ready', () => this.logger.info(`Worker ${i + 1} ready`));
worker.on('error', (error) => this.logger.error(`Worker ${i + 1} error`, { error }));
this.workers.push(worker);
}
return {
workerCount: this.config.workers,
totalConcurrency: this.config.workers * this.config.concurrency
};
} private setupQueueEvents() {
// Only log failures, not every completion
this.queueEvents.on('failed', (job, error) => {
this.logger.error('Job failed', {
id: job.jobId,
error: String(error)
});
}); });
worker.on('error', (error) => { // Only log completions in debug mode
this.logger.error(`Worker ${index + 1} error`, { error }); if (process.env.LOG_LEVEL === 'debug') {
}); this.queueEvents.on('completed', (job) => {
} this.logger.debug('Job completed', { id: job.jobId });
});
private setupQueueEvents() { }
this.queueEvents.on('completed', (job) => { }private async registerProviders() {
this.logger.debug('Job completed', { id: job.jobId });
});
this.queueEvents.on('failed', (job) => {
this.logger.error('Job failed', { id: job.jobId, error: job.failedReason });
});
} private async registerProviders() {
this.logger.info('Registering providers...'); this.logger.info('Registering providers...');
try { try {
@ -187,72 +184,63 @@ export class QueueService {
}); });
throw error; throw error;
} }
} } async addBulk(jobs: any[]): Promise<any[]> {
async addBulk(jobs: any[]): Promise<any[]> {
return await this.queue.addBulk(jobs); return await this.queue.addBulk(jobs);
} }
private async setupScheduledTasks() { private getTotalConcurrency() {
try { return this.workers.reduce((total, worker) => total + (worker.opts.concurrency || 1), 0);
this.logger.info('Setting up scheduled tasks from providers...');
const allScheduledJobs = providerRegistry.getAllScheduledJobs();
if (allScheduledJobs.length === 0) {
this.logger.warn('No scheduled jobs found in providers');
return;
}
let successCount = 0;
let failureCount = 0;
// Process each scheduled job - simplified without complex update logic
for (const { provider, job } of allScheduledJobs) {
try {
await this.addRecurringJob({
type: job.type,
provider: provider,
operation: job.operation,
payload: job.payload,
priority: job.priority,
immediately: job.immediately || false
}, job.cronPattern);
this.logger.info('Scheduled job registered', {
type: job.type,
provider,
operation: job.operation,
cronPattern: job.cronPattern,
description: job.description,
immediately: job.immediately || false
});
successCount++;
} catch (error) {
this.logger.error('Failed to register scheduled job', {
type: job.type,
provider,
error: error instanceof Error ? error.message : String(error)
});
failureCount++;
}
}
this.logger.info(`Scheduled tasks setup complete`, {
total: allScheduledJobs.length,
successful: successCount,
failed: failureCount
});
} catch (error) {
this.logger.error('Failed to setup scheduled tasks', error);
}
} }
async addJob(jobData: JobData, options?: any) { private async setupScheduledTasks() {
if (!this.isInitialized) { const allScheduledJobs = providerRegistry.getAllScheduledJobs();
throw new Error('Queue service not initialized. Call initialize() first.');
if (allScheduledJobs.length === 0) {
this.logger.warn('No scheduled jobs found in providers');
return;
} }
this.logger.info('Setting up scheduled tasks...', { count: allScheduledJobs.length });
// Use Promise.allSettled for parallel processing + better error handling
const results = await Promise.allSettled(
allScheduledJobs.map(async ({ provider, job }) => {
await this.addRecurringJob({
type: job.type,
provider,
operation: job.operation,
payload: job.payload,
priority: job.priority,
immediately: job.immediately || false
}, job.cronPattern);
return { provider, operation: job.operation };
})
);
// Log results
const successful = results.filter(r => r.status === 'fulfilled');
const failed = results.filter(r => r.status === 'rejected');
if (failed.length > 0) {
failed.forEach((result, index) => {
const { provider, job } = allScheduledJobs[index];
this.logger.error('Failed to register scheduled job', {
provider,
operation: job.operation,
error: result.reason
});
});
}
this.logger.info('Scheduled tasks setup complete', {
successful: successful.length,
failed: failed.length
});
} private async addJobInternal(jobData: JobData, options: any = {}) {
if (!this.isInitialized) {
throw new Error('Queue service not initialized');
}
const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`;
return this.queue.add(jobType, jobData, { return this.queue.add(jobType, jobData, {
priority: jobData.priority || 0, priority: jobData.priority || 0,
@ -261,22 +249,19 @@ export class QueueService {
...options ...options
}); });
} }
async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) {
if (!this.isInitialized) {
throw new Error('Queue service not initialized. Call initialize() first.');
}
async addJob(jobData: JobData, options?: any) {
return this.addJobInternal(jobData, options);
} async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) {
const jobKey = `recurring-${jobData.provider}-${jobData.operation}`; const jobKey = `recurring-${jobData.provider}-${jobData.operation}`;
// Let BullMQ handle duplicate prevention with consistent jobId return this.addJobInternal(jobData, {
const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`;
const job = await this.queue.add(jobType, jobData, {
repeat: { repeat: {
pattern: cronPattern, pattern: cronPattern,
tz: 'UTC', tz: 'UTC',
immediately: jobData.immediately || false, immediately: jobData.immediately || false,
}, },
jobId: jobKey, // Consistent ID prevents duplicates jobId: jobKey,
removeOnComplete: 1, removeOnComplete: 1,
removeOnFail: 1, removeOnFail: 1,
attempts: 2, attempts: 2,
@ -286,15 +271,6 @@ export class QueueService {
}, },
...options ...options
}); });
this.logger.info('Recurring job added successfully', {
jobKey,
type: jobData.type,
cronPattern,
immediately: jobData.immediately || false
});
return job;
} }
async getJobStats() { async getJobStats() {
if (!this.isInitialized) { if (!this.isInitialized) {
@ -322,50 +298,18 @@ export class QueueService {
await this.queue.drain(); await this.queue.drain();
} }
} }
async getQueueStatus() { async getQueueStatus() {
if (!this.isInitialized) { if (!this.isInitialized) {
throw new Error('Queue service not initialized. Call initialize() first.'); throw new Error('Queue service not initialized');
} }
const stats = await this.getJobStats(); const stats = await this.getJobStats();
return { return {
...stats, ...stats,
workers: this.workers.length, workers: this.workers.length,
totalConcurrency: this.getTotalConcurrency(), concurrency: this.getTotalConcurrency()
queue: this.queue.name,
connection: {
host: process.env.DRAGONFLY_HOST || 'localhost',
port: parseInt(process.env.DRAGONFLY_PORT || '6379')
}
}; };
} }
getWorkerCount() {
return this.workers.length;
}
getRegisteredProviders() {
return providerRegistry.getProviders().map(({ key, config }) => ({
key,
name: config.name,
operations: Object.keys(config.operations),
scheduledJobs: config.scheduledJobs?.length || 0
}));
}
getScheduledJobsInfo() {
return providerRegistry.getAllScheduledJobs().map(({ provider, job }) => ({
id: `${provider}-${job.type}`,
provider,
type: job.type,
operation: job.operation,
cronPattern: job.cronPattern,
priority: job.priority,
description: job.description,
immediately: job.immediately || false
}));
}
async shutdown() { async shutdown() {
if (!this.isInitialized) { if (!this.isInitialized) {
this.logger.warn('Queue service not initialized, nothing to shutdown'); this.logger.warn('Queue service not initialized, nothing to shutdown');
@ -375,7 +319,6 @@ export class QueueService {
this.logger.info('Shutting down queue service'); this.logger.info('Shutting down queue service');
// Close all workers // Close all workers
this.logger.info(`Closing ${this.workers.length} workers...`);
await Promise.all(this.workers.map((worker, index) => { await Promise.all(this.workers.map((worker, index) => {
this.logger.debug(`Closing worker ${index + 1}`); this.logger.debug(`Closing worker ${index + 1}`);
return worker.close(); return worker.close();