fixed queue and finished initial qm work

This commit is contained in:
Boki 2025-06-28 08:31:59 -04:00
parent 52436c69b2
commit 00fbd31364
4 changed files with 45 additions and 56 deletions

View file

@ -443,12 +443,12 @@ export class QueueManager {
const queueShutdownPromises = Array.from(this.queues.values()).map(async queue => {
try {
// Add timeout to queue.close() to prevent hanging
await queue.close();
// const timeoutPromise = new Promise<never>((_, reject) =>
// setTimeout(() => reject(new Error('Queue close timeout')), 100)
// );
const closePromise = queue.close();
const timeoutPromise = new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error('Queue close timeout')), 5000) // 5 second timeout per queue
);
// await Promise.race([closePromise, timeoutPromise]);
await Promise.race([closePromise, timeoutPromise]);
} catch (error) {
this.logger.warn('Error closing queue', { error: (error as Error).message });
}

View file

@ -264,26 +264,49 @@ export class Queue {
*/
async close(): Promise<void> {
try {
// Close the queue itself
await this.bullQueue.close();
this.logger.info('Queue closed', { queueName: this.queueName });
// Close queue events
if (this.queueEvents) {
await this.queueEvents.close();
this.logger.debug('Queue events closed', { queueName: this.queueName });
}
// Close workers first
// Close workers first with timeout
if (this.workers.length > 0) {
await Promise.all(
this.workers.map(async worker => {
return await worker.close();
})
);
const workerClosePromises = this.workers.map(async (worker, idx) => {
try {
const closePromise = worker.close();
const timeoutPromise = new Promise<void>((resolve) => {
setTimeout(() => {
this.logger.warn('Worker close timeout, forcing closure', {
queueName: this.queueName,
workerId: idx
});
resolve();
}, 2000); // 2 second timeout per worker
});
await Promise.race([closePromise, timeoutPromise]);
} catch (error) {
this.logger.warn('Error closing worker', {
queueName: this.queueName,
workerId: idx,
error
});
}
});
await Promise.all(workerClosePromises);
this.workers = [];
this.logger.debug('Workers closed', { queueName: this.queueName });
}
// Close queue events
if (this.queueEvents) {
try {
await this.queueEvents.close();
this.logger.debug('Queue events closed', { queueName: this.queueName });
} catch (error) {
this.logger.warn('Error closing queue events', { queueName: this.queueName, error });
}
}
// Close the queue itself
await this.bullQueue.close();
this.logger.info('Queue closed', { queueName: this.queueName });
} catch (error) {
this.logger.error('Error closing queue', { queueName: this.queueName, error });
throw error;