stock-bot/libs/queue/src/queue.ts

323 lines
No EOL
8.4 KiB
TypeScript

import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { handlerRegistry } from './handler-registry';
import type { JobData, JobOptions, QueueStats, RedisConfig } from './types';
import { getRedisConnection } from './utils';
// Module-level logger, obtained from getLogger with the name 'queue'; every log call in this file goes through it.
const logger = getLogger('queue');
/**
 * Optional worker settings accepted by the `Queue` constructor.
 * Every field may be omitted; an empty object yields a queue without workers.
 */
export interface QueueWorkerConfig {
  /**
   * How many BullMQ `Worker` instances to create for the queue.
   * Workers (and the `QueueEvents` listener) are only set up when this is greater than 0.
   */
  workers?: number;

  /**
   * Value handed to each worker as its `concurrency` option.
   * The constructor substitutes 1 when this is omitted or falsy.
   */
  concurrency?: number;

  /**
   * Set to `false` to keep the constructor from starting workers even when
   * `workers` is greater than 0. Any other value (including omission) starts them.
   */
  startWorker?: boolean;
}
/**
 * Consolidated Queue class that handles both job operations and optional worker management.
 *
 * It can be used as a plain job queue (producer only) or, when `config.workers` is
 * greater than 0, together with in-process BullMQ workers that dispatch each job to
 * the handler registry.
 */
export class Queue {
  private readonly bullQueue: BullQueue;
  private workers: Worker[] = [];
  private queueEvents?: QueueEvents;
  private readonly queueName: string;
  private readonly redisConfig: RedisConfig;

  /**
   * Create the underlying BullMQ queue and, optionally, its workers.
   *
   * @param queueName - Logical queue name. It is wrapped in braces (`{name}`) before
   *   being passed to BullMQ — presumably as a Redis Cluster hash tag; confirm with the
   *   deployment setup.
   * @param redisConfig - Redis settings, converted to a connection via `getRedisConnection`.
   * @param defaultJobOptions - Overrides for the built-in job defaults
   *   (`removeOnComplete: 10`, `removeOnFail: 5`, `attempts: 3`, exponential backoff
   *   with `delay: 1000`). The merge is shallow, so a supplied `backoff` replaces the
   *   default object as a whole.
   * @param config - Worker settings; see {@link QueueWorkerConfig}.
   */
  constructor(
    queueName: string,
    redisConfig: RedisConfig,
    defaultJobOptions: JobOptions = {},
    config: QueueWorkerConfig = {}
  ) {
    this.queueName = queueName;
    this.redisConfig = redisConfig;

    const connection = getRedisConnection(redisConfig);

    // Initialize BullMQ queue; caller-supplied options are spread last so they win.
    this.bullQueue = new BullQueue(`{${queueName}}`, {
      connection,
      defaultJobOptions: {
        removeOnComplete: 10,
        removeOnFail: 5,
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
        ...defaultJobOptions,
      },
    });

    const wantsWorkers = Boolean(config.workers && config.workers > 0);

    // Initialize queue events if workers will be used
    if (wantsWorkers) {
      this.queueEvents = new QueueEvents(`{${queueName}}`, { connection });
    }

    // Start workers if requested and not explicitly disabled.
    // `|| 1` is intentional: it also maps a concurrency of 0 to 1.
    if (wantsWorkers && config.startWorker !== false) {
      this.startWorkers(config.workers as number, config.concurrency || 1);
    }

    logger.debug('Queue created', {
      queueName,
      workers: config.workers || 0,
      concurrency: config.concurrency || 1,
    });
  }

  /**
   * Get the queue name (the logical name, without the surrounding braces).
   */
  getName(): string {
    return this.queueName;
  }

  /**
   * Add a single job to the queue.
   *
   * @param name - Job name passed to BullMQ.
   * @param data - Job payload.
   * @param options - Per-job options; defaults to an empty object.
   * @returns The job created by BullMQ.
   */
  async add(name: string, data: JobData, options: JobOptions = {}): Promise<Job> {
    logger.debug('Adding job', { queueName: this.queueName, jobName: name });
    return await this.bullQueue.add(name, data, options);
  }

  /**
   * Add multiple jobs to the queue in bulk.
   *
   * @param jobs - Job descriptors (`name`, `data`, optional `opts`).
   * @returns The jobs created by BullMQ.
   */
  async addBulk(
    jobs: Array<{ name: string; data: JobData; opts?: JobOptions }>
  ): Promise<Job[]> {
    logger.debug('Adding bulk jobs', {
      queueName: this.queueName,
      jobCount: jobs.length,
    });
    return await this.bullQueue.addBulk(jobs);
  }

  /**
   * Get queue statistics.
   *
   * Uses BullMQ's count getters rather than fetching the job lists, so no job
   * payloads are loaded just to be counted. All lookups, including the paused
   * flag, are issued together.
   *
   * @returns Per-state job counts, the paused flag and the number of workers
   *   held by this instance.
   */
  async getStats(): Promise<QueueStats> {
    const [waiting, active, completed, failed, delayed, paused] = await Promise.all([
      this.bullQueue.getWaitingCount(),
      this.bullQueue.getActiveCount(),
      this.bullQueue.getCompletedCount(),
      this.bullQueue.getFailedCount(),
      this.bullQueue.getDelayedCount(),
      this.bullQueue.isPaused(),
    ]);

    return {
      waiting,
      active,
      completed,
      failed,
      delayed,
      paused,
      workers: this.workers.length,
    };
  }

  /**
   * Get a specific job by ID.
   *
   * @returns The job, or `undefined` when BullMQ does not return one for the ID.
   */
  async getJob(jobId: string): Promise<Job | undefined> {
    return await this.bullQueue.getJob(jobId);
  }

  /**
   * Get jobs by state.
   *
   * @param states - States to include.
   * @param start - Start index forwarded to BullMQ (default 0).
   * @param end - End index forwarded to BullMQ (default 100).
   */
  async getJobs(
    states: Array<'waiting' | 'active' | 'completed' | 'failed' | 'delayed'>,
    start = 0,
    end = 100
  ): Promise<Job[]> {
    return await this.bullQueue.getJobs(states, start, end);
  }

  /**
   * Pause the queue (stops processing new jobs).
   */
  async pause(): Promise<void> {
    await this.bullQueue.pause();
    logger.info('Queue paused', { queueName: this.queueName });
  }

  /**
   * Resume the queue.
   */
  async resume(): Promise<void> {
    await this.bullQueue.resume();
    logger.info('Queue resumed', { queueName: this.queueName });
  }

  /**
   * Drain the queue (remove all jobs).
   *
   * @param delayed - Forwarded to BullMQ's `drain`; defaults to `false`.
   */
  async drain(delayed = false): Promise<void> {
    await this.bullQueue.drain(delayed);
    logger.info('Queue drained', { queueName: this.queueName, delayed });
  }

  /**
   * Clean completed or failed jobs.
   *
   * @param grace - Grace period forwarded to BullMQ's `clean` (default 0).
   * @param limit - Maximum number of jobs to clean (default 100).
   * @param type - Which job state to clean (default `'completed'`).
   */
  async clean(
    grace: number = 0,
    limit: number = 100,
    type: 'completed' | 'failed' = 'completed'
  ): Promise<void> {
    await this.bullQueue.clean(grace, limit, type);
    logger.debug('Queue cleaned', { queueName: this.queueName, type, grace, limit });
  }

  /**
   * Wait until the queue is ready.
   */
  async waitUntilReady(): Promise<void> {
    await this.bullQueue.waitUntilReady();
  }

  /**
   * Close the queue (cleanup resources).
   *
   * Workers are closed first, then the queue-events listener, then the queue
   * itself. Every step is attempted even if an earlier one fails, so a single
   * failing worker cannot leave the remaining connections open.
   *
   * @throws The first error encountered while closing, after all steps have run.
   */
  async close(): Promise<void> {
    const failures: unknown[] = [];

    // Close workers first
    if (this.workers.length > 0) {
      const outcomes = await Promise.allSettled(this.workers.map(worker => worker.close()));
      this.workers = [];

      const workerFailures = outcomes
        .filter((outcome): outcome is PromiseRejectedResult => outcome.status === 'rejected')
        .map(outcome => outcome.reason as unknown);

      if (workerFailures.length === 0) {
        logger.debug('Workers closed', { queueName: this.queueName });
      } else {
        failures.push(...workerFailures);
      }
    }

    // Close queue events
    if (this.queueEvents) {
      try {
        await this.queueEvents.close();
        logger.debug('Queue events closed', { queueName: this.queueName });
      } catch (error) {
        failures.push(error);
      }
      // Drop the reference so a repeated close() does not touch it again.
      this.queueEvents = undefined;
    }

    // Close the queue itself
    try {
      await this.bullQueue.close();
    } catch (error) {
      failures.push(error);
    }

    if (failures.length > 0) {
      for (const error of failures) {
        logger.error('Error closing queue', { queueName: this.queueName, error });
      }
      throw failures[0];
    }

    logger.info('Queue closed', { queueName: this.queueName });
  }

  /**
   * Start workers for this queue.
   *
   * Creates `workerCount` BullMQ workers bound to `processJob`, attaches logging
   * listeners to each and records them in `this.workers`.
   *
   * @param workerCount - Number of `Worker` instances to create.
   * @param concurrency - `concurrency` option given to every worker.
   */
  private startWorkers(workerCount: number, concurrency: number): void {
    const connection = getRedisConnection(this.redisConfig);

    for (let i = 0; i < workerCount; i++) {
      const worker = new Worker(
        `{${this.queueName}}`,
        this.processJob.bind(this),
        {
          connection,
          concurrency,
          maxStalledCount: 3,
          stalledInterval: 30000, // presumably milliseconds — confirm against BullMQ docs
        }
      );

      // Setup worker event handlers
      worker.on('completed', (job) => {
        logger.debug('Job completed', {
          queueName: this.queueName,
          jobId: job.id,
          handler: job.data?.handler,
          operation: job.data?.operation,
        });
      });

      // `job` is accessed defensively here because it may be absent on failure events.
      worker.on('failed', (job, err) => {
        logger.error('Job failed', {
          queueName: this.queueName,
          jobId: job?.id,
          handler: job?.data?.handler,
          operation: job?.data?.operation,
          error: err.message,
        });
      });

      worker.on('error', (error) => {
        logger.error('Worker error', {
          queueName: this.queueName,
          workerId: i,
          error: error.message,
        });
      });

      this.workers.push(worker);
    }

    logger.info('Workers started', {
      queueName: this.queueName,
      workerCount,
      concurrency,
    });
  }

  /**
   * Process a job using the handler registry.
   *
   * Reads `handler`, `operation` and `payload` from the job data, resolves the
   * matching function from `handlerRegistry` and invokes it with the payload.
   *
   * @returns Whatever the resolved handler returns.
   * @throws If no handler is registered for `handler:operation`, or if the
   *   handler itself throws; the error is logged and rethrown.
   */
  private async processJob(job: Job): Promise<unknown> {
    const { handler, operation, payload }: JobData = job.data;

    logger.debug('Processing job', {
      id: job.id,
      handler,
      operation,
      queueName: this.queueName,
    });

    try {
      // Look up handler in registry
      const jobHandler = handlerRegistry.getHandler(handler, operation);
      if (!jobHandler) {
        throw new Error(`No handler found for ${handler}:${operation}`);
      }

      const result = await jobHandler(payload);

      logger.debug('Job completed successfully', {
        id: job.id,
        handler,
        operation,
        queueName: this.queueName,
      });

      return result;
    } catch (error) {
      logger.error('Job processing failed', {
        id: job.id,
        handler,
        operation,
        queueName: this.queueName,
        error: error instanceof Error ? error.message : String(error),
      });
      // Rethrow so the failure propagates to the worker that invoked this processor.
      throw error;
    }
  }

  /**
   * Get the number of workers currently held by this instance.
   */
  getWorkerCount(): number {
    return this.workers.length;
  }

  /**
   * Get the underlying BullMQ queue (for advanced operations).
   * @deprecated Use direct methods instead
   */
  getBullQueue(): BullQueue {
    return this.bullQueue;
  }
}