test
This commit is contained in:
parent
afa381e390
commit
c048e00d7f
4 changed files with 50 additions and 10 deletions
|
|
@ -243,9 +243,8 @@ shutdown.onShutdown(async () => {
|
||||||
shutdown.onShutdown(async () => {
|
shutdown.onShutdown(async () => {
|
||||||
try {
|
try {
|
||||||
await shutdownLoggers();
|
await shutdownLoggers();
|
||||||
// process.stdout.write('Data service loggers shut down\n');
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
process.stderr.write(`Error shutting down loggers: ${error}\n`);
|
// Silently ignore logger shutdown errors
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -395,7 +395,13 @@ export class QueueManager {
|
||||||
// Close all queues (this now includes workers since they're managed by Queue class)
|
// Close all queues (this now includes workers since they're managed by Queue class)
|
||||||
const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => {
|
const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => {
|
||||||
try {
|
try {
|
||||||
await queue.close();
|
// Add timeout to queue.close() to prevent hanging
|
||||||
|
const closePromise = queue.close();
|
||||||
|
const timeoutPromise = new Promise<never>((_, reject) =>
|
||||||
|
setTimeout(() => reject(new Error('Queue close timeout')), 100)
|
||||||
|
);
|
||||||
|
|
||||||
|
await Promise.race([closePromise, timeoutPromise]);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.warn('Error closing queue', { error: (error as Error).message });
|
logger.warn('Error closing queue', { error: (error as Error).message });
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -215,21 +215,56 @@ export class Queue {
|
||||||
*/
|
*/
|
||||||
async close(): Promise<void> {
|
async close(): Promise<void> {
|
||||||
try {
|
try {
|
||||||
// Close workers first
|
// Close workers first with timeout
|
||||||
if (this.workers.length > 0) {
|
if (this.workers.length > 0) {
|
||||||
await Promise.all(this.workers.map(worker => worker.close()));
|
const workerClosePromises = this.workers.map((worker) => {
|
||||||
|
return new Promise<void>((resolve) => {
|
||||||
|
const timeout = setTimeout(() => {
|
||||||
|
resolve();
|
||||||
|
}, 50);
|
||||||
|
|
||||||
|
worker.close().then(() => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
resolve();
|
||||||
|
}).catch(() => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
await Promise.all(workerClosePromises);
|
||||||
this.workers = [];
|
this.workers = [];
|
||||||
logger.debug('Workers closed', { queueName: this.queueName });
|
logger.debug('Workers closed', { queueName: this.queueName });
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close queue events
|
// Close queue events with timeout
|
||||||
if (this.queueEvents) {
|
if (this.queueEvents) {
|
||||||
await this.queueEvents.close();
|
const eventsClosePromise = this.queueEvents.close();
|
||||||
|
const eventsTimeoutPromise = new Promise<never>((_, reject) =>
|
||||||
|
setTimeout(() => reject(new Error('Queue events close timeout')), 50)
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await Promise.race([eventsClosePromise, eventsTimeoutPromise]);
|
||||||
|
} catch (error) {
|
||||||
|
// Silently ignore timeout
|
||||||
|
}
|
||||||
logger.debug('Queue events closed', { queueName: this.queueName });
|
logger.debug('Queue events closed', { queueName: this.queueName });
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close the queue itself
|
// Close the queue itself with timeout
|
||||||
await this.bullQueue.close();
|
const queueClosePromise = this.bullQueue.close();
|
||||||
|
const queueTimeoutPromise = new Promise<never>((_, reject) =>
|
||||||
|
setTimeout(() => reject(new Error('BullQueue close timeout')), 50)
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await Promise.race([queueClosePromise, queueTimeoutPromise]);
|
||||||
|
} catch (error) {
|
||||||
|
// Silently ignore timeout
|
||||||
|
}
|
||||||
|
|
||||||
logger.info('Queue closed', { queueName: this.queueName });
|
logger.info('Queue closed', { queueName: this.queueName });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('Error closing queue', { queueName: this.queueName, error });
|
logger.error('Error closing queue', { queueName: this.queueName, error });
|
||||||
|
|
|
||||||
|
|
@ -171,7 +171,7 @@ export class Shutdown {
|
||||||
process.platform === 'win32' ? ['SIGINT', 'SIGTERM'] : ['SIGTERM', 'SIGINT', 'SIGUSR2'];
|
process.platform === 'win32' ? ['SIGINT', 'SIGTERM'] : ['SIGTERM', 'SIGINT', 'SIGUSR2'];
|
||||||
|
|
||||||
signals.forEach(signal => {
|
signals.forEach(signal => {
|
||||||
process.on(signal, () => {
|
process.once(signal, () => { // Changed from 'on' to 'once' to prevent multiple handlers
|
||||||
this.shutdownAndExit(signal).catch(() => {
|
this.shutdownAndExit(signal).catch(() => {
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
});
|
});
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue