/*
 * stock-bot/apps/data-service/src/services/queue.service.ts
 * 2025-06-12 08:03:09 -04:00
 *
 * 419 lines
 * 12 KiB
 * TypeScript
 */

import { Queue, QueueEvents, Worker, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { providerRegistry, type JobData } from './provider-registry.service';
/**
 * Owns the data-service BullMQ queue (`{data-service-queue}`), its pool of
 * workers and its QueueEvents stream, and routes each job to the handler that
 * the provider registry exposes for the job's `provider` / `operation` pair.
 *
 * Lifecycle: construct (no connections are opened) -> `initialize()` ->
 * add / process jobs -> `shutdown()`. After `shutdown()` or a failed
 * `initialize()` the instance is back in the "not initialized" state.
 */
export class QueueService {
  private logger = getLogger('queue-service');
  /** Defined only between a successful `initialize()` and `shutdown()`. */
  private queue: Queue | undefined;
  private workers: Worker[] = [];
  private queueEvents: QueueEvents | undefined;
  /** Runtime settings, read once from the environment at construction time. */
  private config = {
    workers: QueueService.readIntEnv('WORKER_COUNT', 5),
    concurrency: QueueService.readIntEnv('WORKER_CONCURRENCY', 20),
    redis: {
      host: process.env.DRAGONFLY_HOST || 'localhost',
      port: QueueService.readIntEnv('DRAGONFLY_PORT', 6379),
    },
  };

  /**
   * Reads an integer environment variable (base 10).
   *
   * @param name - Environment variable to read.
   * @param fallback - Value used when the variable is unset, empty or not a number.
   * @returns The parsed integer, or `fallback`.
   */
  private static readIntEnv(name: string, fallback: number): number {
    const parsed = Number.parseInt(process.env[name] ?? '', 10);
    // parseInt yields NaN for '' and for garbage; NaN would otherwise silently
    // produce zero workers (`i < NaN` is false) or an invalid port.
    return Number.isNaN(parsed) ? fallback : parsed;
  }

  /**
   * Races `task` against a timer and always clears the timer afterwards, so a
   * fast `task` does not leave a pending timeout keeping the event loop alive.
   *
   * @param task - Promise to wait for.
   * @param ms - Timeout in milliseconds.
   * @param message - Message of the Error thrown on timeout.
   * @returns The value `task` resolves to.
   * @throws Error with `message` if `task` has not settled within `ms`.
   */
  private static async withTimeout<T>(task: Promise<T>, ms: number, message: string): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error(message)), ms);
    });
    try {
      return await Promise.race([task, timeout]);
    } finally {
      clearTimeout(timer);
    }
  }

  /** True while a queue instance is held (set early in `initialize()`). */
  private get isInitialized(): boolean {
    return this.queue !== undefined;
  }

  constructor() {
    // Don't initialize in constructor to allow for proper async initialization
  }

  /**
   * Registers providers, creates the queue, event stream and workers, waits
   * for all of them to be ready, then wires event logging and scheduled jobs.
   * A second call while initialized only logs a warning.
   *
   * @throws Re-throws whatever step failed. Before re-throwing, everything
   * created so far is closed and the state is reset, so the call can be retried.
   */
  async initialize(): Promise<void> {
    if (this.isInitialized) {
      this.logger.warn('Queue service already initialized');
      return;
    }
    this.logger.info('Initializing queue service...');
    try {
      // Step 1: Register providers
      await this.registerProviders();

      // Step 2: Setup queue and event stream
      const connection = this.getConnection();
      const queueName = '{data-service-queue}';
      const queue = new Queue(queueName, {
        connection,
        defaultJobOptions: {
          removeOnComplete: 10,
          removeOnFail: 5,
          attempts: 3,
          backoff: {
            type: 'exponential',
            delay: 1000,
          },
        },
      });
      this.queue = queue;
      const queueEvents = new QueueEvents(queueName, { connection });
      this.queueEvents = queueEvents;

      // Step 3: Create workers
      const { workerCount, totalConcurrency } = this.createWorkers(queueName, connection);

      // Step 4: Wait for readiness (parallel)
      await Promise.all([
        queue.waitUntilReady(),
        queueEvents.waitUntilReady(),
        ...this.workers.map(worker => worker.waitUntilReady()),
      ]);

      // Step 5: Setup events and scheduled tasks
      this.setupQueueEvents();
      await this.setupScheduledTasks();
      this.logger.info('Queue service initialized successfully', {
        workers: workerCount,
        totalConcurrency,
      });
    } catch (error) {
      this.logger.error('Failed to initialize queue service', { error });
      // Without this, a half-built instance would report itself as initialized
      // and keep its workers and connections open.
      await this.releaseAfterFailedInit();
      throw error;
    }
  }

  /**
   * Best-effort teardown of whatever `initialize()` managed to create before
   * it failed. Never throws; close errors are logged as warnings.
   */
  private async releaseAfterFailedInit(): Promise<void> {
    const { queue, queueEvents } = this;
    const closers: Array<() => Promise<unknown>> = this.workers.map(
      worker => () => worker.close(true)
    );
    if (queue) {
      closers.push(() => queue.close());
    }
    if (queueEvents) {
      closers.push(() => queueEvents.close());
    }
    this.resetState();
    const outcomes = await Promise.allSettled(
      closers.map(close =>
        // `.then(close)` turns a synchronous throw into a rejection.
        QueueService.withTimeout(Promise.resolve().then(close), 3000, 'Cleanup close timeout')
      )
    );
    for (const outcome of outcomes) {
      if (outcome.status === 'rejected') {
        this.logger.warn('Cleanup after failed initialization hit an error', {
          error: outcome.reason,
        });
      }
    }
  }

  /** Drops all references so the instance reads as "not initialized". */
  private resetState(): void {
    this.workers = [];
    this.queue = undefined;
    this.queueEvents = undefined;
  }

  /** Connection options shared by the queue, the event stream and every worker. */
  private getConnection() {
    return {
      ...this.config.redis,
      maxRetriesPerRequest: null,
      retryDelayOnFailover: 100,
      lazyConnect: false,
    };
  }

  /**
   * Creates `config.workers` workers, each with `config.concurrency` slots and
   * its own copy of the connection options, and appends them to `this.workers`.
   *
   * @returns The configured worker count and the combined concurrency.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- connection shape is owned by BullMQ
  private createWorkers(queueName: string, connection: any) {
    for (let i = 0; i < this.config.workers; i++) {
      const label = i + 1;
      const worker = new Worker(queueName, this.processJob.bind(this), {
        connection: { ...connection },
        concurrency: this.config.concurrency,
        maxStalledCount: 1,
        stalledInterval: 30000,
      });
      worker.on('ready', () => this.logger.info(`Worker ${label} ready`));
      worker.on('error', error => this.logger.error(`Worker ${label} error`, { error }));
      this.workers.push(worker);
    }
    return {
      workerCount: this.config.workers,
      totalConcurrency: this.config.workers * this.config.concurrency,
    };
  }

  /**
   * Attaches debug-level logging to the queue's lifecycle events.
   *
   * @throws Error if called before the event stream exists.
   */
  private setupQueueEvents(): void {
    const events = this.queueEvents;
    if (!events) {
      throw new Error('Queue service not initialized');
    }
    events.on('added', ({ jobId }) => {
      this.logger.debug('Job added to queue', { id: jobId });
    });
    events.on('waiting', ({ jobId }) => {
      this.logger.debug('Job moved to waiting', { id: jobId });
    });
    events.on('active', ({ jobId }) => {
      this.logger.debug('Job became active', { id: jobId });
    });
    events.on('delayed', ({ jobId, delay }) => {
      this.logger.debug('Job delayed', { id: jobId, delay });
    });
    events.on('completed', ({ jobId }) => {
      this.logger.debug('Job completed', { id: jobId });
    });
    // The second listener argument is the event-stream id, not an error; the
    // failure text is carried on the payload as `failedReason`.
    events.on('failed', ({ jobId, failedReason }) => {
      this.logger.debug('Job failed', { id: jobId, error: failedReason });
    });
  }

  /**
   * Dynamically imports each provider module and registers its named export
   * with the provider registry.
   *
   * @throws Error if a module cannot be imported, lacks the expected export,
   * or the registry rejects it.
   */
  private async registerProviders(): Promise<void> {
    this.logger.info('Registering providers...');
    try {
      // Define providers to register
      const providers = [
        { module: '../providers/proxy.provider', export: 'proxyProvider' },
        { module: '../providers/ib.provider', export: 'ibProvider' },
        // { module: '../providers/yahoo.provider', export: 'yahooProvider' },
      ];
      // Sequential on purpose: registration order follows the list above.
      for (const { module, export: exportName } of providers) {
        const providerModule = await import(module);
        const provider = providerModule[exportName];
        if (!provider) {
          throw new Error(`Provider module '${module}' has no export '${exportName}'`);
        }
        providerRegistry.registerProvider(provider);
      }
      this.logger.info('All providers registered successfully');
    } catch (error) {
      this.logger.error('Failed to register providers', { error });
      throw error;
    }
  }

  /**
   * Worker entry point: runs one job and returns the handler's result.
   * `process-batch-items` jobs go to the batch helper (which also receives this
   * service); every other job goes to the registry handler for its
   * provider / operation.
   *
   * @throws Error when no handler is registered, or whatever the handler throws
   * (logged first, then re-thrown so BullMQ can apply its retry policy).
   */
  private async processJob(job: Job) {
    const { provider, operation, payload }: JobData = job.data;
    this.logger.info('Processing job', {
      id: job.id,
      provider,
      operation,
      payloadKeys: Object.keys(payload || {}),
    });
    try {
      let result;
      if (operation === 'process-batch-items') {
        // Special handling for batch processing - requires 2 parameters
        const { processBatchJob } = await import('../utils/batch-helpers');
        result = await processBatchJob(payload, this);
      } else {
        // Regular handler lookup - requires 1 parameter
        const handler = providerRegistry.getHandler(provider, operation);
        if (!handler) {
          throw new Error(`No handler found for ${provider}:${operation}`);
        }
        result = await handler(payload);
      }
      this.logger.info('Job completed successfully', {
        id: job.id,
        provider,
        operation,
      });
      return result;
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      this.logger.error('Job failed', {
        id: job.id,
        provider,
        operation,
        error: errorMessage,
      });
      throw error;
    }
  }

  /**
   * Adds several jobs in one call; `jobs` is handed to the queue unchanged.
   *
   * @throws Error if the service is not initialized.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- signature kept for existing callers
  async addBulk(jobs: any[]): Promise<any[]> {
    if (!this.queue) {
      throw new Error('Queue service not initialized');
    }
    return await this.queue.addBulk(jobs);
  }

  /** Sum of every worker's configured concurrency (a missing value counts as 1). */
  private getTotalConcurrency(): number {
    return this.workers.reduce((total, worker) => total + (worker.opts.concurrency || 1), 0);
  }

  /**
   * Registers every scheduled job the provider registry reports as a recurring
   * job. Registrations run in parallel; one failing does not stop the others,
   * and each failure is logged against the job that actually failed.
   */
  private async setupScheduledTasks(): Promise<void> {
    const allScheduledJobs = providerRegistry.getAllScheduledJobs();
    if (allScheduledJobs.length === 0) {
      this.logger.warn('No scheduled jobs found in providers');
      return;
    }
    this.logger.info('Setting up scheduled tasks...', { count: allScheduledJobs.length });
    // Use Promise.allSettled for parallel processing + better error handling
    const results = await Promise.allSettled(
      allScheduledJobs.map(async ({ provider, job }) => {
        await this.addRecurringJob(
          {
            type: job.type,
            provider,
            operation: job.operation,
            payload: job.payload,
            priority: job.priority,
            immediately: job.immediately || false,
          },
          job.cronPattern
        );
        return { provider, operation: job.operation };
      })
    );
    // `results` is index-aligned with `allScheduledJobs`, so walk it directly;
    // filtering first would lose the mapping back to the failing job.
    let successful = 0;
    let failed = 0;
    results.forEach((result, index) => {
      if (result.status === 'fulfilled') {
        successful += 1;
        return;
      }
      failed += 1;
      const { provider, job } = allScheduledJobs[index];
      this.logger.error('Failed to register scheduled job', {
        provider,
        operation: job.operation,
        error: result.reason,
      });
    });
    this.logger.info('Scheduled tasks setup complete', { successful, failed });
  }

  /**
   * Shared implementation behind `addJob` and `addRecurringJob`. The job name
   * is `jobData.type`, falling back to `<provider>-<operation>`; `options`
   * override the defaults set here.
   *
   * @throws Error if the service is not initialized.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- options are forwarded to BullMQ as-is
  private async addJobInternal(jobData: JobData, options: any = {}) {
    if (!this.queue) {
      throw new Error('Queue service not initialized');
    }
    const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`;
    return this.queue.add(jobType, jobData, {
      priority: jobData.priority || undefined,
      removeOnComplete: 10,
      removeOnFail: 5,
      ...options,
    });
  }

  /**
   * Enqueues a single job.
   *
   * @param jobData - Provider, operation and payload of the job.
   * @param options - Optional job options; they override the defaults.
   * @throws Error if the service is not initialized.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- options are forwarded to BullMQ as-is
  async addJob(jobData: JobData, options?: any) {
    return this.addJobInternal(jobData, options);
  }

  /**
   * Enqueues a repeating job on a cron pattern evaluated in UTC. The job id is
   * `recurring-<provider>-<operation>`.
   *
   * @param jobData - Job description; `immediately` is forwarded in the repeat options.
   * @param cronPattern - Cron expression for the repeat schedule.
   * @param options - Optional overrides, applied last.
   * @throws Error if the service is not initialized.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- options are forwarded to BullMQ as-is
  async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) {
    const jobKey = `recurring-${jobData.provider}-${jobData.operation}`;
    return this.addJobInternal(jobData, {
      repeat: {
        pattern: cronPattern,
        tz: 'UTC',
        immediately: jobData.immediately || false,
      },
      jobId: jobKey,
      removeOnComplete: 1,
      removeOnFail: 1,
      attempts: 2,
      backoff: {
        type: 'fixed',
        delay: 5000,
      },
      ...options,
    });
  }

  /**
   * Counts jobs in the waiting, active, completed, failed and delayed states.
   *
   * @throws Error if the service is not initialized.
   */
  async getJobStats() {
    const queue = this.queue;
    if (!queue) {
      throw new Error('Queue service not initialized. Call initialize() first.');
    }
    // NOTE(review): each call fetches the job list only to take its length;
    // BullMQ's getJobCounts may be a cheaper equivalent - confirm that its state
    // semantics match before switching.
    const [waiting, active, completed, failed, delayed] = await Promise.all([
      queue.getWaiting(),
      queue.getActive(),
      queue.getCompleted(),
      queue.getFailed(),
      queue.getDelayed(),
    ]);
    return {
      waiting: waiting.length,
      active: active.length,
      completed: completed.length,
      failed: failed.length,
      delayed: delayed.length,
    };
  }

  /** Drains the queue; a no-op when the service is not initialized. */
  async drainQueue(): Promise<void> {
    if (this.queue) {
      await this.queue.drain();
    }
  }

  /**
   * Job counts plus the number of workers and their combined concurrency.
   *
   * @throws Error if the service is not initialized.
   */
  async getQueueStatus() {
    if (!this.isInitialized) {
      throw new Error('Queue service not initialized');
    }
    const stats = await this.getJobStats();
    return {
      ...stats,
      workers: this.workers.length,
      concurrency: this.getTotalConcurrency(),
    };
  }

  /**
   * Closes one worker: graceful close with a 5 s limit, then a forced close if
   * that fails or times out. Never rejects; failures are logged.
   */
  private async closeWorker(worker: Worker, position: number, total: number): Promise<void> {
    this.logger.debug(`Closing worker ${position}/${total}`);
    try {
      // Wait for current jobs to finish, then close
      await QueueService.withTimeout(worker.close(), 5000, `Worker ${position} close timeout`);
      this.logger.debug(`Worker ${position} closed successfully`);
    } catch (error) {
      this.logger.error(`Failed to close worker ${position}`, { error });
      try {
        // Force close if graceful close fails
        await worker.close(true);
      } catch (forceError) {
        this.logger.error(`Failed to force close worker ${position}`, { error: forceError });
      }
    }
  }

  /**
   * Closes the workers first, then the queue and the event stream (3 s limit
   * each). Whatever happens, the instance ends up "not initialized", so a
   * repeated `shutdown()` only logs a warning and `initialize()` can be called
   * again.
   *
   * @throws Re-throws an unexpected error after a last-resort forced close.
   */
  async shutdown(): Promise<void> {
    const queue = this.queue;
    if (!queue) {
      this.logger.warn('Queue service not initialized, nothing to shutdown');
      return;
    }
    const queueEvents = this.queueEvents;
    const workers = this.workers;
    this.logger.info('Shutting down queue service gracefully...');
    try {
      // Step 1: Stop accepting new jobs and wait for current jobs to finish
      this.logger.debug('Closing workers gracefully...');
      await Promise.allSettled(
        workers.map((worker, index) => this.closeWorker(worker, index + 1, workers.length))
      );
      this.logger.debug('All workers closed');

      // Step 2: Close queue and events with timeout protection
      this.logger.debug('Closing queue and events...');
      await Promise.allSettled([
        QueueService.withTimeout(Promise.resolve(queue.close()), 3000, 'Queue close timeout').catch(
          error => this.logger.error('Queue close error', { error })
        ),
        QueueService.withTimeout(
          Promise.resolve(queueEvents?.close()),
          3000,
          'QueueEvents close timeout'
        ).catch(error => this.logger.error('QueueEvents close error', { error })),
      ]);
      this.logger.info('Queue service shutdown completed successfully');
    } catch (error) {
      this.logger.error('Error during queue service shutdown', { error });
      // Force close everything as last resort; async wrappers keep synchronous
      // throws inside allSettled.
      await Promise.allSettled([
        ...workers.map(async worker => worker.close(true)),
        (async () => queue.close())(),
        (async () => queueEvents?.close())(),
      ]);
      throw error;
    } finally {
      this.resetState();
    }
  }
}
/**
 * Module-level QueueService instance. Its constructor does no setup, so
 * `initialize()` must be awaited before jobs can be added.
 */
export const queueManager = new QueueService();