fixed queue close

This commit is contained in:
Boki 2025-06-20 17:13:10 -04:00
parent c048e00d7f
commit dbfa80b2a2
2 changed files with 84 additions and 121 deletions

View file

@ -1,13 +1,13 @@
import { getLogger } from '@stock-bot/logger';
import { QueueRateLimiter } from './rate-limiter';
import { Queue, type QueueWorkerConfig } from './queue';
import { CacheProvider, createCache } from '@stock-bot/cache'; import { CacheProvider, createCache } from '@stock-bot/cache';
import type { import { getLogger } from '@stock-bot/logger';
QueueManagerConfig, import { Queue, type QueueWorkerConfig } from './queue';
QueueOptions, import { QueueRateLimiter } from './rate-limiter';
GlobalStats, import type {
GlobalStats,
QueueManagerConfig,
QueueOptions,
QueueStats, QueueStats,
RateLimitRule RateLimitRule,
} from './types'; } from './types';
import { getRedisConnection } from './utils'; import { getRedisConnection } from './utils';
@ -30,7 +30,7 @@ export class QueueManager {
private constructor(config: QueueManagerConfig) { private constructor(config: QueueManagerConfig) {
this.config = config; this.config = config;
this.redisConnection = getRedisConnection(config.redis); this.redisConnection = getRedisConnection(config.redis);
// Initialize rate limiter if rules are provided // Initialize rate limiter if rules are provided
if (config.rateLimitRules && config.rateLimitRules.length > 0) { if (config.rateLimitRules && config.rateLimitRules.length > 0) {
this.rateLimiter = new QueueRateLimiter(this.redisConnection); this.rateLimiter = new QueueRateLimiter(this.redisConnection);
@ -40,7 +40,7 @@ export class QueueManager {
} }
}); });
} }
logger.info('QueueManager singleton initialized', { logger.info('QueueManager singleton initialized', {
redis: `${config.redis.host}:${config.redis.port}`, redis: `${config.redis.host}:${config.redis.port}`,
}); });
@ -52,9 +52,7 @@ export class QueueManager {
*/ */
static getInstance(): QueueManager { static getInstance(): QueueManager {
if (!QueueManager.instance) { if (!QueueManager.instance) {
throw new Error( throw new Error('QueueManager not initialized. Call QueueManager.initialize(config) first.');
'QueueManager not initialized. Call QueueManager.initialize(config) first.'
);
} }
return QueueManager.instance; return QueueManager.instance;
} }
@ -80,14 +78,14 @@ export class QueueManager {
if (QueueManager.instance) { if (QueueManager.instance) {
return QueueManager.instance; return QueueManager.instance;
} }
if (!config) { if (!config) {
throw new Error( throw new Error(
'QueueManager not initialized and no config provided. ' + 'QueueManager not initialized and no config provided. ' +
'Either call initialize(config) first or provide config to getOrInitialize(config).' 'Either call initialize(config) first or provide config to getOrInitialize(config).'
); );
} }
return QueueManager.initialize(config); return QueueManager.initialize(config);
} }
@ -135,8 +133,8 @@ export class QueueManager {
}; };
const queue = new Queue( const queue = new Queue(
queueName, queueName,
this.config.redis, this.config.redis,
mergedOptions.defaultJobOptions || {}, mergedOptions.defaultJobOptions || {},
queueConfig queueConfig
); );
@ -158,10 +156,10 @@ export class QueueManager {
}); });
} }
logger.info('Queue created with batch cache', { logger.info('Queue created with batch cache', {
queueName, queueName,
workers: mergedOptions.workers || 0, workers: mergedOptions.workers || 0,
concurrency: mergedOptions.concurrency || 1 concurrency: mergedOptions.concurrency || 1,
}); });
return queue; return queue;
@ -232,7 +230,7 @@ export class QueueManager {
for (const [queueName, queue] of this.queues) { for (const [queueName, queue] of this.queues) {
const stats = await queue.getStats(); const stats = await queue.getStats();
queueStats[queueName] = stats; queueStats[queueName] = stats;
totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed; totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed;
totalWorkers += stats.workers || 0; totalWorkers += stats.workers || 0;
} }
@ -269,7 +267,11 @@ export class QueueManager {
/** /**
* Check rate limits for a job * Check rate limits for a job
*/ */
async checkRateLimit(queueName: string, handler: string, operation: string): Promise<{ async checkRateLimit(
queueName: string,
handler: string,
operation: string
): Promise<{
allowed: boolean; allowed: boolean;
retryAfter?: number; retryAfter?: number;
remainingPoints?: number; remainingPoints?: number;
@ -278,7 +280,7 @@ export class QueueManager {
if (!this.rateLimiter) { if (!this.rateLimiter) {
return { allowed: true }; return { allowed: true };
} }
return await this.rateLimiter.checkLimit(queueName, handler, operation); return await this.rateLimiter.checkLimit(queueName, handler, operation);
} }
@ -293,7 +295,7 @@ export class QueueManager {
operation, operation,
}; };
} }
return await this.rateLimiter.getStatus(queueName, handler, operation); return await this.rateLimiter.getStatus(queueName, handler, operation);
} }
@ -323,7 +325,7 @@ export class QueueManager {
if (!queue) { if (!queue) {
return false; return false;
} }
await queue.pause(); await queue.pause();
return true; return true;
} }
@ -336,7 +338,7 @@ export class QueueManager {
if (!queue) { if (!queue) {
return false; return false;
} }
await queue.resume(); await queue.resume();
return true; return true;
} }
@ -354,18 +356,17 @@ export class QueueManager {
* Clean all queues * Clean all queues
*/ */
async cleanAll( async cleanAll(
grace: number = 0, grace: number = 0,
limit: number = 100, limit: number = 100,
type: 'completed' | 'failed' = 'completed' type: 'completed' | 'failed' = 'completed'
): Promise<void> { ): Promise<void> {
const cleanPromises = Array.from(this.queues.values()).map(queue => const cleanPromises = Array.from(this.queues.values()).map(queue =>
queue.clean(grace, limit, type) queue.clean(grace, limit, type)
); );
await Promise.all(cleanPromises); await Promise.all(cleanPromises);
logger.info('All queues cleaned', { type, grace, limit }); logger.info('All queues cleaned', { type, grace, limit });
} }
/** /**
* Shutdown all queues and workers (thread-safe) * Shutdown all queues and workers (thread-safe)
*/ */
@ -393,15 +394,15 @@ export class QueueManager {
private async performShutdown(): Promise<void> { private async performShutdown(): Promise<void> {
try { try {
// Close all queues (this now includes workers since they're managed by Queue class) // Close all queues (this now includes workers since they're managed by Queue class)
const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => { const queueShutdownPromises = Array.from(this.queues.values()).map(async queue => {
try { try {
// Add timeout to queue.close() to prevent hanging // Add timeout to queue.close() to prevent hanging
const closePromise = queue.close(); await queue.close();
const timeoutPromise = new Promise<never>((_, reject) => // const timeoutPromise = new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error('Queue close timeout')), 100) // setTimeout(() => reject(new Error('Queue close timeout')), 100)
); // );
await Promise.race([closePromise, timeoutPromise]); // await Promise.race([closePromise, timeoutPromise]);
} catch (error) { } catch (error) {
logger.warn('Error closing queue', { error: (error as Error).message }); logger.warn('Error closing queue', { error: (error as Error).message });
} }
@ -410,7 +411,7 @@ export class QueueManager {
await Promise.all(queueShutdownPromises); await Promise.all(queueShutdownPromises);
// Close all caches // Close all caches
const cacheShutdownPromises = Array.from(this.caches.values()).map(async (cache) => { const cacheShutdownPromises = Array.from(this.caches.values()).map(async cache => {
try { try {
// Clear cache before shutdown // Clear cache before shutdown
await cache.clear(); await cache.clear();

View file

@ -1,4 +1,4 @@
import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq'; import { Queue as BullQueue, QueueEvents, Worker, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import { handlerRegistry } from './handler-registry'; import { handlerRegistry } from './handler-registry';
import type { JobData, JobOptions, QueueStats, RedisConfig } from './types'; import type { JobData, JobOptions, QueueStats, RedisConfig } from './types';
@ -24,14 +24,14 @@ export class Queue {
private redisConfig: RedisConfig; private redisConfig: RedisConfig;
constructor( constructor(
queueName: string, queueName: string,
redisConfig: RedisConfig, redisConfig: RedisConfig,
defaultJobOptions: JobOptions = {}, defaultJobOptions: JobOptions = {},
config: QueueWorkerConfig = {} config: QueueWorkerConfig = {}
) { ) {
this.queueName = queueName; this.queueName = queueName;
this.redisConfig = redisConfig; this.redisConfig = redisConfig;
const connection = getRedisConnection(redisConfig); const connection = getRedisConnection(redisConfig);
// Initialize BullMQ queue // Initialize BullMQ queue
@ -59,10 +59,10 @@ export class Queue {
this.startWorkers(config.workers, config.concurrency || 1); this.startWorkers(config.workers, config.concurrency || 1);
} }
logger.trace('Queue created', { logger.trace('Queue created', {
queueName, queueName,
workers: config.workers || 0, workers: config.workers || 0,
concurrency: config.concurrency || 1 concurrency: config.concurrency || 1,
}); });
} }
@ -84,12 +84,10 @@ export class Queue {
/** /**
* Add multiple jobs to the queue in bulk * Add multiple jobs to the queue in bulk
*/ */
async addBulk( async addBulk(jobs: Array<{ name: string; data: JobData; opts?: JobOptions }>): Promise<Job[]> {
jobs: Array<{ name: string; data: JobData; opts?: JobOptions }> logger.trace('Adding bulk jobs', {
): Promise<Job[]> { queueName: this.queueName,
logger.trace('Adding bulk jobs', { jobCount: jobs.length,
queueName: this.queueName,
jobCount: jobs.length
}); });
return await this.bullQueue.addBulk(jobs); return await this.bullQueue.addBulk(jobs);
} }
@ -98,9 +96,9 @@ export class Queue {
* Add a scheduled job with cron-like pattern * Add a scheduled job with cron-like pattern
*/ */
async addScheduledJob( async addScheduledJob(
name: string, name: string,
data: JobData, data: JobData,
cronPattern: string, cronPattern: string,
options: JobOptions = {} options: JobOptions = {}
): Promise<Job> { ): Promise<Job> {
const scheduledOptions: JobOptions = { const scheduledOptions: JobOptions = {
@ -112,15 +110,15 @@ export class Queue {
...options.repeat, ...options.repeat,
}, },
}; };
logger.info('Adding scheduled job', { logger.info('Adding scheduled job', {
queueName: this.queueName, queueName: this.queueName,
jobName: name, jobName: name,
cronPattern, cronPattern,
repeatKey: scheduledOptions.repeat?.key, repeatKey: scheduledOptions.repeat?.key,
immediately: scheduledOptions.repeat?.immediately immediately: scheduledOptions.repeat?.immediately,
}); });
return await this.bullQueue.add(name, data, scheduledOptions); return await this.bullQueue.add(name, data, scheduledOptions);
} }
@ -195,8 +193,8 @@ export class Queue {
* Clean completed and failed jobs * Clean completed and failed jobs
*/ */
async clean( async clean(
grace: number = 0, grace: number = 0,
limit: number = 100, limit: number = 100,
type: 'completed' | 'failed' = 'completed' type: 'completed' | 'failed' = 'completed'
): Promise<void> { ): Promise<void> {
await this.bullQueue.clean(grace, limit, type); await this.bullQueue.clean(grace, limit, type);
@ -210,62 +208,30 @@ export class Queue {
await this.bullQueue.waitUntilReady(); await this.bullQueue.waitUntilReady();
} }
/**
* Close the queue (cleanup resources)
*/
/** /**
* Close the queue (cleanup resources) * Close the queue (cleanup resources)
*/ */
async close(): Promise<void> { async close(): Promise<void> {
try { try {
// Close workers first with timeout // Close the queue itself
if (this.workers.length > 0) { await this.bullQueue.close();
const workerClosePromises = this.workers.map((worker) => { logger.info('Queue closed', { queueName: this.queueName });
return new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
resolve();
}, 50);
worker.close().then(() => {
clearTimeout(timeout);
resolve();
}).catch(() => {
clearTimeout(timeout);
resolve();
});
});
});
await Promise.all(workerClosePromises);
this.workers = [];
logger.debug('Workers closed', { queueName: this.queueName });
}
// Close queue events with timeout // Close queue events
if (this.queueEvents) { if (this.queueEvents) {
const eventsClosePromise = this.queueEvents.close(); await this.queueEvents.close();
const eventsTimeoutPromise = new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error('Queue events close timeout')), 50)
);
try {
await Promise.race([eventsClosePromise, eventsTimeoutPromise]);
} catch (error) {
// Silently ignore timeout
}
logger.debug('Queue events closed', { queueName: this.queueName }); logger.debug('Queue events closed', { queueName: this.queueName });
} }
// Close the queue itself with timeout // Close workers first
const queueClosePromise = this.bullQueue.close(); if (this.workers.length > 0) {
const queueTimeoutPromise = new Promise<never>((_, reject) => await Promise.all(this.workers.map(worker => worker.close()));
setTimeout(() => reject(new Error('BullQueue close timeout')), 50) this.workers = [];
); logger.debug('Workers closed', { queueName: this.queueName });
try {
await Promise.race([queueClosePromise, queueTimeoutPromise]);
} catch (error) {
// Silently ignore timeout
} }
logger.info('Queue closed', { queueName: this.queueName });
} catch (error) { } catch (error) {
logger.error('Error closing queue', { queueName: this.queueName, error }); logger.error('Error closing queue', { queueName: this.queueName, error });
throw error; throw error;
@ -279,19 +245,15 @@ export class Queue {
const connection = getRedisConnection(this.redisConfig); const connection = getRedisConnection(this.redisConfig);
for (let i = 0; i < workerCount; i++) { for (let i = 0; i < workerCount; i++) {
const worker = new Worker( const worker = new Worker(`{${this.queueName}}`, this.processJob.bind(this), {
`{${this.queueName}}`, connection,
this.processJob.bind(this), concurrency,
{ maxStalledCount: 3,
connection, stalledInterval: 30000,
concurrency, });
maxStalledCount: 3,
stalledInterval: 30000,
}
);
// Setup worker event handlers // Setup worker event handlers
worker.on('completed', (job) => { worker.on('completed', job => {
logger.trace('Job completed', { logger.trace('Job completed', {
queueName: this.queueName, queueName: this.queueName,
jobId: job.id, jobId: job.id,
@ -310,7 +272,7 @@ export class Queue {
}); });
}); });
worker.on('error', (error) => { worker.on('error', error => {
logger.error('Worker error', { logger.error('Worker error', {
queueName: this.queueName, queueName: this.queueName,
workerId: i, workerId: i,
@ -385,4 +347,4 @@ export class Queue {
getBullQueue(): BullQueue { getBullQueue(): BullQueue {
return this.bullQueue; return this.bullQueue;
} }
} }