got app working with jobs scheduling
This commit is contained in:
parent
52c2f08db2
commit
ead13257b3
2 changed files with 102 additions and 16 deletions
|
|
@ -36,27 +36,45 @@ export class QueueService {
|
||||||
const connection = {
|
const connection = {
|
||||||
host: process.env.DRAGONFLY_HOST || 'localhost',
|
host: process.env.DRAGONFLY_HOST || 'localhost',
|
||||||
port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
|
port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
|
||||||
|
// Add these Redis-specific options to fix the undeclared key issue
|
||||||
|
maxRetriesPerRequest: 3,
|
||||||
|
retryDelayOnFailover: 100,
|
||||||
|
enableReadyCheck: false,
|
||||||
|
lazyConnect: true,
|
||||||
|
// Disable Redis Cluster mode if you're using standalone Redis/Dragonfly
|
||||||
|
enableOfflineQueue: false
|
||||||
};
|
};
|
||||||
|
|
||||||
this.logger.info('Connecting to Redis/Dragonfly', connection);
|
this.logger.info('Connecting to Redis/Dragonfly', connection);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.queue = new Queue('data-service-queue', { connection });
|
this.queue = new Queue('{data-service-queue}', {
|
||||||
this.worker = new Worker('data-service-queue', this.processJob.bind(this), {
|
|
||||||
connection,
|
connection,
|
||||||
concurrency: 10
|
defaultJobOptions: {
|
||||||
|
removeOnComplete: 10,
|
||||||
|
removeOnFail: 5,
|
||||||
|
attempts: 3,
|
||||||
|
backoff: {
|
||||||
|
type: 'exponential',
|
||||||
|
delay: 1000,
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
this.queueEvents = new QueueEvents('data-service-queue', { connection }); // Test connection
|
this.worker = new Worker('{data-service-queue}', this.processJob.bind(this), {
|
||||||
|
connection,
|
||||||
|
concurrency: 5, // Reduce concurrency to avoid overwhelming Redis
|
||||||
|
});
|
||||||
|
this.queueEvents = new QueueEvents('{data-service-queue}', { connection }); // Test connection
|
||||||
await this.queue.waitUntilReady();
|
await this.queue.waitUntilReady();
|
||||||
await this.worker.waitUntilReady();
|
await this.worker.waitUntilReady();
|
||||||
await this.queueEvents.waitUntilReady();
|
await this.queueEvents.waitUntilReady();
|
||||||
|
|
||||||
this.setupEventListeners();
|
this.setupEventListeners();
|
||||||
|
|
||||||
this.isInitialized = true;
|
this.isInitialized = true;
|
||||||
this.logger.info('Queue service initialized successfully');
|
this.logger.info('Queue service initialized successfully');
|
||||||
|
|
||||||
await this.setupScheduledTasks();
|
await this.setupScheduledTasks();
|
||||||
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error('Failed to initialize queue service', { error });
|
this.logger.error('Failed to initialize queue service', { error });
|
||||||
throw error;
|
throw error;
|
||||||
|
|
@ -144,6 +162,22 @@ export class QueueService {
|
||||||
try {
|
try {
|
||||||
this.logger.info('Setting up scheduled tasks from providers...');
|
this.logger.info('Setting up scheduled tasks from providers...');
|
||||||
|
|
||||||
|
// Clear any existing repeatable jobs first
|
||||||
|
const repeatableJobs = await this.queue.getRepeatableJobs();
|
||||||
|
this.logger.info(`Found ${repeatableJobs.length} existing repeatable jobs`);
|
||||||
|
|
||||||
|
for (const job of repeatableJobs) {
|
||||||
|
try {
|
||||||
|
await this.queue.removeJobScheduler(job.name);
|
||||||
|
this.logger.debug('Removed existing repeatable job', { name: job.name });
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.warn('Failed to remove existing repeatable job', {
|
||||||
|
name: job.name,
|
||||||
|
error: error instanceof Error ? error.message : String(error)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Get all scheduled jobs from all providers
|
// Get all scheduled jobs from all providers
|
||||||
const allScheduledJobs = providerRegistry.getAllScheduledJobs();
|
const allScheduledJobs = providerRegistry.getAllScheduledJobs();
|
||||||
|
|
||||||
|
|
@ -152,9 +186,15 @@ export class QueueService {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register each scheduled job
|
let successCount = 0;
|
||||||
|
let failureCount = 0;
|
||||||
|
|
||||||
|
// Register each scheduled job with delay between registrations
|
||||||
for (const { service, provider, job } of allScheduledJobs) {
|
for (const { service, provider, job } of allScheduledJobs) {
|
||||||
try {
|
try {
|
||||||
|
// Add a small delay between job registrations to avoid overwhelming Redis
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 100));
|
||||||
|
|
||||||
await this.addRecurringJob({
|
await this.addRecurringJob({
|
||||||
type: job.type,
|
type: job.type,
|
||||||
service: service,
|
service: service,
|
||||||
|
|
@ -172,6 +212,9 @@ export class QueueService {
|
||||||
cronPattern: job.cronPattern,
|
cronPattern: job.cronPattern,
|
||||||
description: job.description
|
description: job.description
|
||||||
});
|
});
|
||||||
|
|
||||||
|
successCount++;
|
||||||
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error('Failed to register scheduled job', {
|
this.logger.error('Failed to register scheduled job', {
|
||||||
type: job.type,
|
type: job.type,
|
||||||
|
|
@ -179,10 +222,11 @@ export class QueueService {
|
||||||
provider,
|
provider,
|
||||||
error: error instanceof Error ? error.message : String(error)
|
error: error instanceof Error ? error.message : String(error)
|
||||||
});
|
});
|
||||||
|
failureCount++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.info(`Successfully configured ${allScheduledJobs.length} scheduled tasks`);
|
this.logger.info(`Scheduled tasks setup complete: ${successCount} successful, ${failureCount} failed`);
|
||||||
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error('Failed to setup scheduled tasks', error);
|
this.logger.error('Failed to setup scheduled tasks', error);
|
||||||
|
|
@ -201,20 +245,60 @@ export class QueueService {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async addRecurringJob(jobData: JobData, cronPattern: string) {
|
async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) {
|
||||||
if (!this.isInitialized) {
|
if (!this.isInitialized) {
|
||||||
throw new Error('Queue service not initialized. Call initialize() first.');
|
throw new Error('Queue service not initialized. Call initialize() first.');
|
||||||
}
|
}
|
||||||
return this.queue.add(
|
|
||||||
`recurring-${jobData.type}`,
|
try {
|
||||||
jobData,
|
// Create a unique job ID to avoid Redis key conflicts
|
||||||
{
|
const jobId = `recurring-${jobData.service}-${jobData.provider}-${jobData.operation}`;
|
||||||
repeat: { pattern: cronPattern },
|
|
||||||
|
// First, try to remove any existing recurring job with the same ID
|
||||||
|
try {
|
||||||
|
await this.queue.removeRepeatable(jobData.type, {
|
||||||
|
pattern: cronPattern,
|
||||||
|
jobId: jobId
|
||||||
|
});
|
||||||
|
} catch (removeError) {
|
||||||
|
// Ignore errors when removing non-existent jobs
|
||||||
|
this.logger.debug('No existing recurring job to remove', { jobId });
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the new recurring job with proper options
|
||||||
|
const job = await this.queue.add(jobData.type, jobData, {
|
||||||
|
repeat: {
|
||||||
|
pattern: cronPattern,
|
||||||
|
// Use UTC timezone to avoid timezone issues
|
||||||
|
tz: 'UTC'
|
||||||
|
},
|
||||||
|
jobId: jobId,
|
||||||
removeOnComplete: 1,
|
removeOnComplete: 1,
|
||||||
removeOnFail: 1,
|
removeOnFail: 1,
|
||||||
jobId: `recurring-${jobData.service}-${jobData.provider}-${jobData.operation}`
|
attempts: 2,
|
||||||
|
backoff: {
|
||||||
|
type: 'fixed',
|
||||||
|
delay: 5000
|
||||||
|
},
|
||||||
|
...options
|
||||||
|
});
|
||||||
|
|
||||||
|
this.logger.info('Recurring job added successfully', {
|
||||||
|
jobId: jobId,
|
||||||
|
type: jobData.type,
|
||||||
|
cronPattern: cronPattern
|
||||||
|
});
|
||||||
|
|
||||||
|
return job;
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error('Failed to add recurring job', {
|
||||||
|
jobData,
|
||||||
|
cronPattern,
|
||||||
|
error: error instanceof Error ? error.message : String(error)
|
||||||
|
});
|
||||||
|
throw error;
|
||||||
}
|
}
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async getJobStats() {
|
async getJobStats() {
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,8 @@ services:
|
||||||
- --proactor_threads=8
|
- --proactor_threads=8
|
||||||
- --bind=0.0.0.0
|
- --bind=0.0.0.0
|
||||||
- --admin_port=6380
|
- --admin_port=6380
|
||||||
|
- --cluster_mode=emulated
|
||||||
|
- --lock_on_hashtags
|
||||||
volumes:
|
volumes:
|
||||||
- dragonfly_data:/data
|
- dragonfly_data:/data
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue