From c048e00d7f65e7e4f490259e6d1075c26bf636f5 Mon Sep 17 00:00:00 2001 From: Boki Date: Fri, 20 Jun 2025 17:09:49 -0400 Subject: [PATCH] test --- apps/data-service/src/index.ts | 3 +-- libs/queue/src/queue-manager.ts | 8 +++++- libs/queue/src/queue.ts | 47 ++++++++++++++++++++++++++++----- libs/shutdown/src/shutdown.ts | 2 +- 4 files changed, 50 insertions(+), 10 deletions(-) diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index 34def46..17167ff 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -243,9 +243,8 @@ shutdown.onShutdown(async () => { shutdown.onShutdown(async () => { try { await shutdownLoggers(); - // process.stdout.write('Data service loggers shut down\n'); } catch (error) { - process.stderr.write(`Error shutting down loggers: ${error}\n`); + // Silently ignore logger shutdown errors } }); diff --git a/libs/queue/src/queue-manager.ts b/libs/queue/src/queue-manager.ts index 2b812ed..ab63d01 100644 --- a/libs/queue/src/queue-manager.ts +++ b/libs/queue/src/queue-manager.ts @@ -395,7 +395,13 @@ export class QueueManager { // Close all queues (this now includes workers since they're managed by Queue class) const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => { try { - await queue.close(); + // Add timeout to queue.close() to prevent hanging + const closePromise = queue.close(); + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Queue close timeout')), 100) + ); + + await Promise.race([closePromise, timeoutPromise]); } catch (error) { logger.warn('Error closing queue', { error: (error as Error).message }); } diff --git a/libs/queue/src/queue.ts b/libs/queue/src/queue.ts index 23302a8..734e7d4 100644 --- a/libs/queue/src/queue.ts +++ b/libs/queue/src/queue.ts @@ -215,21 +215,56 @@ export class Queue { */ async close(): Promise { try { - // Close workers first + // Close workers first with timeout if (this.workers.length > 0) { - await Promise.all(this.workers.map(worker => worker.close())); + const workerClosePromises = this.workers.map((worker) => { + return new Promise((resolve) => { + const timeout = setTimeout(() => { + resolve(); + }, 50); + + worker.close().then(() => { + clearTimeout(timeout); + resolve(); + }).catch(() => { + clearTimeout(timeout); + resolve(); + }); + }); + }); + + await Promise.all(workerClosePromises); this.workers = []; logger.debug('Workers closed', { queueName: this.queueName }); } - // Close queue events + // Close queue events with timeout if (this.queueEvents) { - await this.queueEvents.close(); + const eventsClosePromise = this.queueEvents.close(); + const eventsTimeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Queue events close timeout')), 50) + ); + + try { + await Promise.race([eventsClosePromise, eventsTimeoutPromise]); + } catch (error) { + // Silently ignore timeout + } logger.debug('Queue events closed', { queueName: this.queueName }); } - // Close the queue itself - await this.bullQueue.close(); + // Close the queue itself with timeout + const queueClosePromise = this.bullQueue.close(); + const queueTimeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('BullQueue close timeout')), 50) + ); + + try { + await Promise.race([queueClosePromise, queueTimeoutPromise]); + } catch (error) { + // Silently ignore timeout + } + logger.info('Queue closed', { queueName: this.queueName }); } catch (error) { logger.error('Error closing queue', { queueName: this.queueName, error }); diff --git a/libs/shutdown/src/shutdown.ts b/libs/shutdown/src/shutdown.ts index e8ffdbc..2223efa 100644 --- a/libs/shutdown/src/shutdown.ts +++ b/libs/shutdown/src/shutdown.ts @@ -171,7 +171,7 @@ export class Shutdown { process.platform === 'win32' ? ['SIGINT', 'SIGTERM'] : ['SIGTERM', 'SIGINT', 'SIGUSR2']; signals.forEach(signal => { - process.on(signal, () => { + process.once(signal, () => { // Changed from 'on' to 'once' to prevent multiple handlers this.shutdownAndExit(signal).catch(() => { process.exit(1); });