work on queue
This commit is contained in:
parent
bf2fa003b9
commit
9b8a7bdd4b
6 changed files with 488 additions and 420 deletions
|
|
@ -9,12 +9,13 @@ export interface JobData {
|
|||
operation: string;
|
||||
payload: any;
|
||||
priority?: number;
|
||||
immediately?: boolean;
|
||||
}
|
||||
|
||||
export class QueueService {
|
||||
private logger = new Logger('queue-service');
|
||||
private queue!: Queue;
|
||||
private worker!: Worker;
|
||||
private workers: Worker[] = [];
|
||||
private queueEvents!: QueueEvents;
|
||||
private isInitialized = false;
|
||||
|
||||
|
|
@ -45,6 +46,10 @@ export class QueueService {
|
|||
enableOfflineQueue: false
|
||||
};
|
||||
|
||||
// Worker configuration
|
||||
const workerCount = parseInt(process.env.WORKER_COUNT || '4');
|
||||
const concurrencyPerWorker = parseInt(process.env.WORKER_CONCURRENCY || '20');
|
||||
|
||||
this.logger.info('Connecting to Redis/Dragonfly', connection);
|
||||
|
||||
try {
|
||||
|
|
@ -60,13 +65,34 @@ export class QueueService {
|
|||
}
|
||||
}
|
||||
});
|
||||
this.worker = new Worker('{data-service-queue}', this.processJob.bind(this), {
|
||||
connection,
|
||||
concurrency: 5, // Reduce concurrency to avoid overwhelming Redis
|
||||
});
|
||||
// Create multiple workers
|
||||
for (let i = 0; i < workerCount; i++) {
|
||||
const worker = new Worker(
|
||||
'{data-service-queue}',
|
||||
this.processJob.bind(this),
|
||||
{
|
||||
connection: { ...connection }, // Each worker gets its own connection
|
||||
concurrency: concurrencyPerWorker,
|
||||
maxStalledCount: 1,
|
||||
stalledInterval: 30000,
|
||||
}
|
||||
);
|
||||
// Add worker-specific logging
|
||||
worker.on('ready', () => {
|
||||
this.logger.info(`Worker ${i + 1} ready`, { workerId: i + 1 });
|
||||
});
|
||||
|
||||
worker.on('error', (error) => {
|
||||
this.logger.error(`Worker ${i + 1} error`, { workerId: i + 1, error });
|
||||
});
|
||||
|
||||
this.workers.push(worker);
|
||||
}
|
||||
this.queueEvents = new QueueEvents('{data-service-queue}', { connection }); // Test connection
|
||||
|
||||
// Wait for all workers to be ready
|
||||
await this.queue.waitUntilReady();
|
||||
await this.worker.waitUntilReady();
|
||||
await Promise.all(this.workers.map(worker => worker.waitUntilReady()));
|
||||
await this.queueEvents.waitUntilReady();
|
||||
|
||||
this.setupEventListeners();
|
||||
|
|
@ -81,6 +107,16 @@ export class QueueService {
|
|||
}
|
||||
}
|
||||
|
||||
// Update getTotalConcurrency method
|
||||
getTotalConcurrency() {
|
||||
if (!this.isInitialized) {
|
||||
return 0;
|
||||
}
|
||||
return this.workers.reduce((total, worker) => {
|
||||
return total + (worker.opts.concurrency || 1);
|
||||
}, 0);
|
||||
}
|
||||
|
||||
private async registerProviders() {
|
||||
this.logger.info('Registering providers...');
|
||||
|
||||
|
|
@ -159,79 +195,104 @@ export class QueueService {
|
|||
});
|
||||
}
|
||||
private async setupScheduledTasks() {
|
||||
try {
|
||||
this.logger.info('Setting up scheduled tasks from providers...');
|
||||
|
||||
// Clear any existing repeatable jobs first
|
||||
const repeatableJobs = await this.queue.getRepeatableJobs();
|
||||
this.logger.info(`Found ${repeatableJobs.length} existing repeatable jobs`);
|
||||
|
||||
for (const job of repeatableJobs) {
|
||||
try {
|
||||
await this.queue.removeJobScheduler(job.name);
|
||||
this.logger.debug('Removed existing repeatable job', { name: job.name });
|
||||
} catch (error) {
|
||||
this.logger.warn('Failed to remove existing repeatable job', {
|
||||
name: job.name,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Get all scheduled jobs from all providers
|
||||
const allScheduledJobs = providerRegistry.getAllScheduledJobs();
|
||||
|
||||
if (allScheduledJobs.length === 0) {
|
||||
this.logger.warn('No scheduled jobs found in providers');
|
||||
return;
|
||||
}
|
||||
|
||||
let successCount = 0;
|
||||
let failureCount = 0;
|
||||
|
||||
// Register each scheduled job with delay between registrations
|
||||
for (const { service, provider, job } of allScheduledJobs) {
|
||||
try {
|
||||
// Add a small delay between job registrations to avoid overwhelming Redis
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
|
||||
await this.addRecurringJob({
|
||||
type: job.type,
|
||||
service: service,
|
||||
provider: provider,
|
||||
operation: job.operation,
|
||||
payload: job.payload,
|
||||
priority: job.priority
|
||||
}, job.cronPattern);
|
||||
|
||||
this.logger.info('Scheduled job registered', {
|
||||
type: job.type,
|
||||
service,
|
||||
provider,
|
||||
operation: job.operation,
|
||||
cronPattern: job.cronPattern,
|
||||
description: job.description
|
||||
});
|
||||
|
||||
successCount++;
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to register scheduled job', {
|
||||
type: job.type,
|
||||
service,
|
||||
provider,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
failureCount++;
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.info(`Scheduled tasks setup complete: ${successCount} successful, ${failureCount} failed`);
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to setup scheduled tasks', error);
|
||||
try {
|
||||
this.logger.info('Setting up scheduled tasks from providers...');
|
||||
|
||||
// Get all scheduled jobs from all providers
|
||||
const allScheduledJobs = providerRegistry.getAllScheduledJobs();
|
||||
|
||||
if (allScheduledJobs.length === 0) {
|
||||
this.logger.warn('No scheduled jobs found in providers');
|
||||
return;
|
||||
}
|
||||
|
||||
// Get existing repeatable jobs for comparison
|
||||
const existingJobs = await this.queue.getRepeatableJobs();
|
||||
this.logger.info(`Found ${existingJobs.length} existing repeatable jobs`);
|
||||
|
||||
let successCount = 0;
|
||||
let failureCount = 0;
|
||||
let updatedCount = 0;
|
||||
let newCount = 0;
|
||||
|
||||
// Process each scheduled job
|
||||
for (const { service, provider, job } of allScheduledJobs) {
|
||||
try {
|
||||
const jobKey = `${service}-${provider}-${job.operation}`;
|
||||
|
||||
// Check if this job already exists
|
||||
const existingJob = existingJobs.find(existing =>
|
||||
existing.key?.includes(jobKey) || existing.name === job.type
|
||||
);
|
||||
|
||||
if (existingJob) {
|
||||
// Check if the job needs updating (different cron pattern or config)
|
||||
const needsUpdate = existingJob.pattern !== job.cronPattern;
|
||||
|
||||
if (needsUpdate) {
|
||||
this.logger.info('Job configuration changed, updating', {
|
||||
jobKey,
|
||||
oldPattern: existingJob.pattern,
|
||||
newPattern: job.cronPattern
|
||||
});
|
||||
updatedCount++;
|
||||
} else {
|
||||
this.logger.debug('Job unchanged, skipping', { jobKey });
|
||||
successCount++;
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
newCount++;
|
||||
}
|
||||
|
||||
// Add delay between job registrations
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
|
||||
await this.addRecurringJob({
|
||||
type: job.type,
|
||||
service: service,
|
||||
provider: provider,
|
||||
operation: job.operation,
|
||||
payload: job.payload,
|
||||
priority: job.priority,
|
||||
immediately: job.immediately || false
|
||||
}, job.cronPattern);
|
||||
|
||||
this.logger.info('Scheduled job registered', {
|
||||
type: job.type,
|
||||
service,
|
||||
provider,
|
||||
operation: job.operation,
|
||||
cronPattern: job.cronPattern,
|
||||
description: job.description,
|
||||
immediately: job.immediately || false
|
||||
});
|
||||
|
||||
successCount++;
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to register scheduled job', {
|
||||
type: job.type,
|
||||
service,
|
||||
provider,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
failureCount++;
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.info(`Scheduled tasks setup complete`, {
|
||||
total: allScheduledJobs.length,
|
||||
successful: successCount,
|
||||
failed: failureCount,
|
||||
updated: updatedCount,
|
||||
new: newCount
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to setup scheduled tasks', error);
|
||||
}
|
||||
}
|
||||
|
||||
async addJob(jobData: JobData, options?: any) {
|
||||
if (!this.isInitialized) {
|
||||
|
|
@ -251,28 +312,43 @@ export class QueueService {
|
|||
}
|
||||
|
||||
try {
|
||||
// Create a unique job ID to avoid Redis key conflicts
|
||||
const jobId = `recurring-${jobData.service}-${jobData.provider}-${jobData.operation}`;
|
||||
// Create a unique job key for this specific job
|
||||
const jobKey = `${jobData.service}-${jobData.provider}-${jobData.operation}`;
|
||||
|
||||
// First, try to remove any existing recurring job with the same ID
|
||||
try {
|
||||
await this.queue.removeRepeatable(jobData.type, {
|
||||
pattern: cronPattern,
|
||||
jobId: jobId
|
||||
// Get all existing repeatable jobs
|
||||
const existingJobs = await this.queue.getRepeatableJobs();
|
||||
|
||||
// Find and remove the existing job with the same key if it exists
|
||||
const existingJob = existingJobs.find(job => {
|
||||
// Check if this is the same job by comparing key components
|
||||
return job.key?.includes(jobKey) || job.name === jobData.type;
|
||||
});
|
||||
|
||||
if (existingJob) {
|
||||
this.logger.info('Updating existing recurring job', {
|
||||
jobKey,
|
||||
existingPattern: existingJob.pattern,
|
||||
newPattern: cronPattern
|
||||
});
|
||||
} catch (removeError) {
|
||||
// Ignore errors when removing non-existent jobs
|
||||
this.logger.debug('No existing recurring job to remove', { jobId });
|
||||
|
||||
// Remove the existing job
|
||||
await this.queue.removeRepeatableByKey(existingJob.key);
|
||||
|
||||
// Small delay to ensure cleanup is complete
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
} else {
|
||||
this.logger.info('Creating new recurring job', { jobKey, cronPattern });
|
||||
}
|
||||
|
||||
// Add the new recurring job with proper options
|
||||
// Add the new/updated recurring job
|
||||
const job = await this.queue.add(jobData.type, jobData, {
|
||||
repeat: {
|
||||
pattern: cronPattern,
|
||||
// Use UTC timezone to avoid timezone issues
|
||||
tz: 'UTC'
|
||||
tz: 'UTC',
|
||||
immediately: jobData.immediately || false,
|
||||
},
|
||||
jobId: jobId,
|
||||
// Use a consistent jobId for this specific recurring job
|
||||
jobId: `recurring-${jobKey}`,
|
||||
removeOnComplete: 1,
|
||||
removeOnFail: 1,
|
||||
attempts: 2,
|
||||
|
|
@ -283,16 +359,17 @@ export class QueueService {
|
|||
...options
|
||||
});
|
||||
|
||||
this.logger.info('Recurring job added successfully', {
|
||||
jobId: jobId,
|
||||
this.logger.info('Recurring job added/updated successfully', {
|
||||
jobKey,
|
||||
type: jobData.type,
|
||||
cronPattern: cronPattern
|
||||
cronPattern,
|
||||
immediately: jobData.immediately || false
|
||||
});
|
||||
|
||||
return job;
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to add recurring job', {
|
||||
this.logger.error('Failed to add/update recurring job', {
|
||||
jobData,
|
||||
cronPattern,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
|
|
@ -363,7 +440,8 @@ export class QueueService {
|
|||
operation: job.operation,
|
||||
cronPattern: job.cronPattern,
|
||||
priority: job.priority,
|
||||
description: job.description
|
||||
description: job.description,
|
||||
immediately: job.immediately || false
|
||||
}));
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue