added env back and fixed up queue service

This commit is contained in:
Boki 2025-06-11 08:03:55 -04:00
parent b645b58102
commit 16599c86da
5 changed files with 258 additions and 33 deletions

View file

@ -292,7 +292,6 @@ export class QueueService {
delayed: delayed.length
};
}
async drainQueue() {
if (this.isInitialized) {
await this.queue.drain();
@ -309,24 +308,71 @@ export class QueueService {
workers: this.workers.length,
concurrency: this.getTotalConcurrency()
};
}
async shutdown() {
} async shutdown() {
if (!this.isInitialized) {
this.logger.warn('Queue service not initialized, nothing to shutdown');
return;
}
this.logger.info('Shutting down queue service');
this.logger.info('Shutting down queue service gracefully...');
// Close all workers
await Promise.all(this.workers.map((worker, index) => {
this.logger.debug(`Closing worker ${index + 1}`);
return worker.close();
}));
await this.queue.close();
await this.queueEvents.close();
this.logger.info('Queue service shutdown complete');
try {
// Step 1: Stop accepting new jobs and wait for current jobs to finish
this.logger.debug('Closing workers gracefully...');
const workerClosePromises = this.workers.map(async (worker, index) => {
this.logger.debug(`Closing worker ${index + 1}/${this.workers.length}`);
try {
// Wait for current jobs to finish, then close
await Promise.race([
worker.close(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error(`Worker ${index + 1} close timeout`)), 5000)
)
]);
this.logger.debug(`Worker ${index + 1} closed successfully`);
} catch (error) {
this.logger.error(`Failed to close worker ${index + 1}`, { error });
// Force close if graceful close fails
await worker.close(true);
}
});
await Promise.allSettled(workerClosePromises);
this.logger.debug('All workers closed');
// Step 2: Close queue and events with timeout protection
this.logger.debug('Closing queue and events...');
await Promise.allSettled([
Promise.race([
this.queue.close(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Queue close timeout')), 3000)
)
]).catch(error => this.logger.error('Queue close error', { error })),
Promise.race([
this.queueEvents.close(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('QueueEvents close timeout')), 3000)
)
]).catch(error => this.logger.error('QueueEvents close error', { error }))
]);
this.logger.info('Queue service shutdown completed successfully');
} catch (error) {
this.logger.error('Error during queue service shutdown', { error });
// Force close everything as last resort
try {
await Promise.allSettled([
...this.workers.map(worker => worker.close(true)),
this.queue.close(),
this.queueEvents.close()
]);
} catch (forceCloseError) {
this.logger.error('Force close also failed', { error: forceCloseError });
}
throw error;
}
}
}