queue service simplification

This commit is contained in:
Boki 2025-06-10 23:35:33 -04:00
parent 423b40866c
commit 709fc347e9

View file

@ -1,18 +1,20 @@
import { Queue, Worker, QueueEvents } from 'bullmq'; import { Queue, Worker, QueueEvents, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import { providerRegistry, JobData } from './provider-registry.service'; import { providerRegistry, type JobData } from './provider-registry.service';
export class QueueService { export class QueueService {
private logger = getLogger('queue-service'); private logger = getLogger('queue-service');
private queue!: Queue; private queue!: Queue;
private workers: Worker[] = []; private workers: Worker[] = [];
private queueEvents!: QueueEvents; private queueEvents!: QueueEvents;
private isInitialized = false;
private get isInitialized() {
return !!this.queue;
}
constructor() { constructor() {
// Don't initialize in constructor to allow for proper async initialization // Don't initialize in constructor to allow for proper async initialization
} }
async initialize() { async initialize() {
if (this.isInitialized) { if (this.isInitialized) {
this.logger.warn('Queue service already initialized'); this.logger.warn('Queue service already initialized');
@ -24,26 +26,11 @@ export class QueueService {
// Register all providers first // Register all providers first
await this.registerProviders(); await this.registerProviders();
const connection = { const connection = this.getConnection();
host: process.env.DRAGONFLY_HOST || 'localhost', const queueName = '{data-service-queue}';
port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
// Add these Redis-specific options to fix the undeclared key issue
maxRetriesPerRequest: null,
retryDelayOnFailover: 100,
enableReadyCheck: false,
lazyConnect: false,
// Disable Redis Cluster mode if you're using standalone Redis/Dragonfly
enableOfflineQueue: true
};
// Worker configuration
const workerCount = parseInt(process.env.WORKER_COUNT || '5');
const concurrencyPerWorker = parseInt(process.env.WORKER_CONCURRENCY || '20');
this.logger.info('Connecting to Redis/Dragonfly', connection);
try { try {
this.queue = new Queue('{data-service-queue}', { this.queue = new Queue(queueName, {
connection, connection,
defaultJobOptions: { defaultJobOptions: {
removeOnComplete: 10, removeOnComplete: 10,
@ -55,48 +42,47 @@ export class QueueService {
} }
} }
}); });
// Create multiple workers
// Create workers (keeping same count as before)
const workerCount = parseInt(process.env.WORKER_COUNT || '5');
const concurrencyPerWorker = parseInt(process.env.WORKER_CONCURRENCY || '20');
for (let i = 0; i < workerCount; i++) { for (let i = 0; i < workerCount; i++) {
const worker = new Worker( const worker = new Worker(
'{data-service-queue}', queueName,
this.processJob.bind(this), this.processJob.bind(this),
{ {
connection: { ...connection }, // Each worker gets its own connection connection: { ...connection },
concurrency: concurrencyPerWorker, concurrency: concurrencyPerWorker,
maxStalledCount: 1, maxStalledCount: 1,
stalledInterval: 30000, stalledInterval: 30000,
} }
); );
// Add worker-specific logging
worker.on('ready', () => {
this.logger.info(`Worker ${i + 1} ready`, { workerId: i + 1 });
});
worker.on('error', (error) => {
this.logger.error(`Worker ${i + 1} error`, { workerId: i + 1, error });
});
this.setupWorkerEvents(worker, i);
this.workers.push(worker); this.workers.push(worker);
} }
this.queueEvents = new QueueEvents('{data-service-queue}', { connection }); // Test connection
// Wait for all workers to be ready this.queueEvents = new QueueEvents(queueName, { connection });
// Wait for readiness
await this.queue.waitUntilReady(); await this.queue.waitUntilReady();
await Promise.all(this.workers.map(worker => worker.waitUntilReady())); await Promise.all(this.workers.map(worker => worker.waitUntilReady()));
await this.queueEvents.waitUntilReady(); await this.queueEvents.waitUntilReady();
this.setupEventListeners(); this.setupQueueEvents();
this.isInitialized = true;
this.logger.info('Queue service initialized successfully');
await this.setupScheduledTasks(); await this.setupScheduledTasks();
this.logger.info('Queue service initialized successfully', {
workers: this.workers.length,
totalConcurrency: workerCount * concurrencyPerWorker
});
} catch (error) { } catch (error) {
this.logger.error('Failed to initialize queue service', { error }); this.logger.error('Failed to initialize queue service', { error });
throw error; throw error;
} }
} }
// Update getTotalConcurrency method // Update getTotalConcurrency method
getTotalConcurrency() { getTotalConcurrency() {
if (!this.isInitialized) { if (!this.isInitialized) {
@ -107,25 +93,57 @@ export class QueueService {
}, 0); }, 0);
} }
private async registerProviders() { private getConnection() {
return {
host: process.env.DRAGONFLY_HOST || 'localhost',
port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
maxRetriesPerRequest: null,
retryDelayOnFailover: 100,
lazyConnect: false
};
}
private setupWorkerEvents(worker: Worker, index: number) {
worker.on('ready', () => {
this.logger.info(`Worker ${index + 1} ready`);
});
worker.on('error', (error) => {
this.logger.error(`Worker ${index + 1} error`, { error });
});
}
private setupQueueEvents() {
this.queueEvents.on('completed', (job) => {
this.logger.debug('Job completed', { id: job.jobId });
});
this.queueEvents.on('failed', (job) => {
this.logger.error('Job failed', { id: job.jobId, error: job.failedReason });
});
} private async registerProviders() {
this.logger.info('Registering providers...'); this.logger.info('Registering providers...');
try { try {
// Import and register all providers // Define providers to register
const { proxyProvider } = await import('../providers/proxy.provider'); const providers = [
const { quotemediaProvider } = await import('../providers/quotemedia.provider'); { module: '../providers/proxy.provider', export: 'proxyProvider' },
const { yahooProvider } = await import('../providers/yahoo.provider'); { module: '../providers/quotemedia.provider', export: 'quotemediaProvider' },
{ module: '../providers/yahoo.provider', export: 'yahooProvider' }
];
providerRegistry.registerProvider(proxyProvider); // Import and register all providers
providerRegistry.registerProvider(quotemediaProvider); for (const { module, export: exportName } of providers) {
providerRegistry.registerProvider(yahooProvider); const providerModule = await import(module);
providerRegistry.registerProvider(providerModule[exportName]);
}
this.logger.info('All providers registered successfully'); this.logger.info('All providers registered successfully');
} catch (error) { } catch (error) {
this.logger.error('Failed to register providers', { error }); this.logger.error('Failed to register providers', { error });
throw error; throw error;
} }
} private async processJob(job: any) { }private async processJob(job: Job) {
const { provider, operation, payload }: JobData = job.data; const { provider, operation, payload }: JobData = job.data;
this.logger.info('Processing job', { this.logger.info('Processing job', {
@ -133,24 +151,23 @@ export class QueueService {
provider, provider,
operation, operation,
payloadKeys: Object.keys(payload || {}) payloadKeys: Object.keys(payload || {})
}); }); try {
let result;
try {
// Handle special batch processing jobs
if (operation === 'process-batch-items') { if (operation === 'process-batch-items') {
// Special handling for batch processing - requires 2 parameters
const { processBatchJob } = await import('../utils/batch-helpers'); const { processBatchJob } = await import('../utils/batch-helpers');
return await processBatchJob(payload, this); result = await processBatchJob(payload, this);
} } else {
// Regular handler lookup - requires 1 parameter
// Get handler from registry
const handler = providerRegistry.getHandler(provider, operation); const handler = providerRegistry.getHandler(provider, operation);
if (!handler) { if (!handler) {
throw new Error(`No handler found for ${provider}:${operation}`); throw new Error(`No handler found for ${provider}:${operation}`);
} }
// Execute the handler result = await handler(payload);
const result = await handler(payload); }
this.logger.info('Job completed successfully', { this.logger.info('Job completed successfully', {
id: job.id, id: job.id,
@ -171,27 +188,14 @@ export class QueueService {
throw error; throw error;
} }
} }
async addBulk(jobs: any[]): Promise<any[]> { async addBulk(jobs: any[]): Promise<any[]> {
return await this.queue.addBulk(jobs) return await this.queue.addBulk(jobs);
} }
private setupEventListeners() {
this.queueEvents.on('completed', (job) => {
this.logger.info('Job completed', { id: job.jobId });
});
this.queueEvents.on('failed', (job) => {
this.logger.error('Job failed', { id: job.jobId, error: job.failedReason });
});
// Note: Worker-specific events are already set up during worker creation
// No need for additional progress events since we handle them per-worker
}
private async setupScheduledTasks() { private async setupScheduledTasks() {
try { try {
this.logger.info('Setting up scheduled tasks from providers...'); this.logger.info('Setting up scheduled tasks from providers...');
// Get all scheduled jobs from all providers
const allScheduledJobs = providerRegistry.getAllScheduledJobs(); const allScheduledJobs = providerRegistry.getAllScheduledJobs();
if (allScheduledJobs.length === 0) { if (allScheduledJobs.length === 0) {
@ -199,46 +203,12 @@ export class QueueService {
return; return;
} }
// Get existing repeatable jobs for comparison
const existingJobs = await this.queue.getRepeatableJobs();
this.logger.info(`Found ${existingJobs.length} existing repeatable jobs`);
let successCount = 0; let successCount = 0;
let failureCount = 0; let failureCount = 0;
let updatedCount = 0;
let newCount = 0; // Process each scheduled job // Process each scheduled job - simplified without complex update logic
for (const { provider, job } of allScheduledJobs) { for (const { provider, job } of allScheduledJobs) {
try { try {
const jobKey = `${provider}-${job.operation}`;
// Check if this job already exists
const existingJob = existingJobs.find(existing =>
existing.key?.includes(jobKey) || existing.name === job.type
);
if (existingJob) {
// Check if the job needs updating (different cron pattern or config)
const needsUpdate = existingJob.pattern !== job.cronPattern;
if (needsUpdate) {
this.logger.info('Job configuration changed, updating', {
jobKey,
oldPattern: existingJob.pattern,
newPattern: job.cronPattern
});
updatedCount++;
} else {
this.logger.debug('Job unchanged, skipping', { jobKey });
successCount++;
continue;
}
} else {
newCount++;
}
// Add delay between job registrations
await new Promise(resolve => setTimeout(resolve, 100));
await this.addRecurringJob({ await this.addRecurringJob({
type: job.type, type: job.type,
provider: provider, provider: provider,
@ -272,9 +242,7 @@ export class QueueService {
this.logger.info(`Scheduled tasks setup complete`, { this.logger.info(`Scheduled tasks setup complete`, {
total: allScheduledJobs.length, total: allScheduledJobs.length,
successful: successCount, successful: successCount,
failed: failureCount, failed: failureCount
updated: updatedCount,
new: newCount
}); });
} catch (error) { } catch (error) {
@ -293,39 +261,14 @@ export class QueueService {
...options ...options
}); });
} }
async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) { async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) {
if (!this.isInitialized) { if (!this.isInitialized) {
throw new Error('Queue service not initialized. Call initialize() first.'); throw new Error('Queue service not initialized. Call initialize() first.');
} }
try { // Create a unique job key for this specific job const jobKey = `recurring-${jobData.provider}-${jobData.operation}`;
const jobKey = `${jobData.provider}-${jobData.operation}`;
// Get all existing repeatable jobs // Let BullMQ handle duplicate prevention with consistent jobId
const existingJobs = await this.queue.getRepeatableJobs();
// Find and remove the existing job with the same key if it exists
const existingJob = existingJobs.find(job => {
// Check if this is the same job by comparing key components
return job.key?.includes(jobKey) || job.name === jobData.type;
});
if (existingJob) {
this.logger.info('Updating existing recurring job', {
jobKey,
existingPattern: existingJob.pattern,
newPattern: cronPattern
}); // Remove the existing job
if (existingJob.key) {
await this.queue.removeRepeatableByKey(existingJob.key);
}
// Small delay to ensure cleanup is complete
await new Promise(resolve => setTimeout(resolve, 100));
} else {
this.logger.info('Creating new recurring job', { jobKey, cronPattern });
} // Add the new/updated recurring job
const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`;
const job = await this.queue.add(jobType, jobData, { const job = await this.queue.add(jobType, jobData, {
repeat: { repeat: {
@ -333,8 +276,7 @@ export class QueueService {
tz: 'UTC', tz: 'UTC',
immediately: jobData.immediately || false, immediately: jobData.immediately || false,
}, },
// Use a consistent jobId for this specific recurring job jobId: jobKey, // Consistent ID prevents duplicates
jobId: `recurring-${jobKey}`,
removeOnComplete: 1, removeOnComplete: 1,
removeOnFail: 1, removeOnFail: 1,
attempts: 2, attempts: 2,
@ -345,7 +287,7 @@ export class QueueService {
...options ...options
}); });
this.logger.info('Recurring job added/updated successfully', { this.logger.info('Recurring job added successfully', {
jobKey, jobKey,
type: jobData.type, type: jobData.type,
cronPattern, cronPattern,
@ -353,17 +295,7 @@ export class QueueService {
}); });
return job; return job;
} catch (error) {
this.logger.error('Failed to add/update recurring job', {
jobData,
cronPattern,
error: error instanceof Error ? error.message : String(error)
});
throw error;
} }
}
async getJobStats() { async getJobStats() {
if (!this.isInitialized) { if (!this.isInitialized) {
throw new Error('Queue service not initialized. Call initialize() first.'); throw new Error('Queue service not initialized. Call initialize() first.');
@ -386,8 +318,8 @@ export class QueueService {
} }
async drainQueue() { async drainQueue() {
if (!this.isInitialized) { if (this.isInitialized) {
await this.queue.drain() await this.queue.drain();
} }
} }
@ -398,7 +330,7 @@ export class QueueService {
const stats = await this.getJobStats(); const stats = await this.getJobStats();
return { return {
...stats, ...stats,
workers: this.getWorkerCount(), workers: this.workers.length,
totalConcurrency: this.getTotalConcurrency(), totalConcurrency: this.getTotalConcurrency(),
queue: this.queue.name, queue: this.queue.name,
connection: { connection: {
@ -409,11 +341,9 @@ export class QueueService {
} }
getWorkerCount() { getWorkerCount() {
if (!this.isInitialized) {
return 0;
}
return this.workers.length; return this.workers.length;
} }
getRegisteredProviders() { getRegisteredProviders() {
return providerRegistry.getProviders().map(({ key, config }) => ({ return providerRegistry.getProviders().map(({ key, config }) => ({
key, key,
@ -422,6 +352,7 @@ export class QueueService {
scheduledJobs: config.scheduledJobs?.length || 0 scheduledJobs: config.scheduledJobs?.length || 0
})); }));
} }
getScheduledJobsInfo() { getScheduledJobsInfo() {
return providerRegistry.getAllScheduledJobs().map(({ provider, job }) => ({ return providerRegistry.getAllScheduledJobs().map(({ provider, job }) => ({
id: `${provider}-${job.type}`, id: `${provider}-${job.type}`,
@ -434,11 +365,13 @@ export class QueueService {
immediately: job.immediately || false immediately: job.immediately || false
})); }));
} }
async shutdown() { async shutdown() {
if (!this.isInitialized) { if (!this.isInitialized) {
this.logger.warn('Queue service not initialized, nothing to shutdown'); this.logger.warn('Queue service not initialized, nothing to shutdown');
return; return;
} }
this.logger.info('Shutting down queue service'); this.logger.info('Shutting down queue service');
// Close all workers // Close all workers
@ -450,7 +383,6 @@ export class QueueService {
await this.queue.close(); await this.queue.close();
await this.queueEvents.close(); await this.queueEvents.close();
this.isInitialized = false;
this.logger.info('Queue service shutdown complete'); this.logger.info('Queue service shutdown complete');
} }
} }