fixed priority shutdown

This commit is contained in:
Boki 2025-06-21 09:08:40 -04:00
parent 6d5d746f68
commit 5929612e36
6 changed files with 113 additions and 46 deletions

2
.env
View file

@ -5,7 +5,7 @@
# Core Application Settings # Core Application Settings
NODE_ENV=development NODE_ENV=development
LOG_LEVEL=debug LOG_LEVEL=debug
LOG_HIDE_OBJECT=false LOG_HIDE_OBJECT=true
# Data Service Configuration # Data Service Configuration
DATA_SERVICE_PORT=2001 DATA_SERVICE_PORT=2001

View file

@ -132,16 +132,16 @@ async function initializeServices() {
// Initialize providers (register handlers and scheduled jobs) // Initialize providers (register handlers and scheduled jobs)
logger.debug('Initializing data providers...'); logger.debug('Initializing data providers...');
const { initializeWebShareProvider } = await import('./providers/webshare.provider'); const { initializeWebShareProvider } = await import('./providers/webshare.provider');
// const { initializeExchangeSyncProvider } = await import('./providers/exchange-sync.provider'); const { initializeExchangeSyncProvider } = await import('./providers/exchange-sync.provider');
// const { initializeIBProvider } = await import('./providers/ib.provider'); const { initializeIBProvider } = await import('./providers/ib.provider');
// const { initializeProxyProvider } = await import('./providers/proxy.provider'); const { initializeProxyProvider } = await import('./providers/proxy.provider');
// const { initializeQMProvider } = await import('./providers/qm.provider'); const { initializeQMProvider } = await import('./providers/qm.provider');
initializeWebShareProvider(); initializeWebShareProvider();
// initializeExchangeSyncProvider(); initializeExchangeSyncProvider();
// initializeIBProvider(); initializeIBProvider();
// initializeProxyProvider(); initializeProxyProvider();
// initializeQMProvider(); initializeQMProvider();
logger.info('Data providers initialized'); logger.info('Data providers initialized');
@ -216,20 +216,9 @@ async function startServer() {
logger.info(`Data Service started on port ${PORT}`); logger.info(`Data Service started on port ${PORT}`);
} }
// Register shutdown handlers // Register shutdown handlers with priorities
shutdown.onShutdown(async () => { // Priority 1: Queue system (highest priority)
if (server) { shutdown.onShutdownHigh(async () => {
logger.info('Stopping HTTP server...');
try {
server.stop();
logger.info('HTTP server stopped');
} catch (error) {
logger.error('Error stopping HTTP server', { error });
}
}
});
shutdown.onShutdown(async () => {
logger.info('Shutting down queue system...'); logger.info('Shutting down queue system...');
try { try {
if (queueManager) { if (queueManager) {
@ -239,9 +228,23 @@ shutdown.onShutdown(async () => {
} catch (error) { } catch (error) {
logger.error('Error shutting down queue system', { error }); logger.error('Error shutting down queue system', { error });
} }
}); }, 'Queue System');
shutdown.onShutdown(async () => { // Priority 1: HTTP Server (high priority)
shutdown.onShutdownHigh(async () => {
if (server) {
logger.info('Stopping HTTP server...');
try {
server.stop();
logger.info('HTTP server stopped');
} catch (error) {
logger.error('Error stopping HTTP server', { error });
}
}
}, 'HTTP Server');
// Priority 2: Database connections (medium priority)
shutdown.onShutdownMedium(async () => {
logger.info('Disconnecting from databases...'); logger.info('Disconnecting from databases...');
try { try {
const { disconnectMongoDB } = await import('@stock-bot/mongodb-client'); const { disconnectMongoDB } = await import('@stock-bot/mongodb-client');
@ -253,10 +256,10 @@ shutdown.onShutdown(async () => {
} catch (error) { } catch (error) {
logger.error('Error closing database connections', { error }); logger.error('Error closing database connections', { error });
} }
}); }, 'Databases');
// Logger shutdown - registered last so it runs after all other shutdown operations // Priority 3: Logger shutdown (lowest priority - runs last)
shutdown.onShutdown(async () => { shutdown.onShutdownLow(async () => {
try { try {
logger.info('Shutting down loggers...'); logger.info('Shutting down loggers...');
await shutdownLoggers(); await shutdownLoggers();
@ -264,7 +267,7 @@ shutdown.onShutdown(async () => {
} catch { } catch {
// Silently ignore logger shutdown errors // Silently ignore logger shutdown errors
} }
}); }, 'Loggers');
// Start the service // Start the service
startServer().catch(error => { startServer().catch(error => {

View file

@ -58,7 +58,7 @@ function createDestination(
// Console: In-process pretty stream for dev (fast shutdown) // Console: In-process pretty stream for dev (fast shutdown)
if (config.logConsole && config.environment !== 'production') { if (config.logConsole && config.environment !== 'production') {
const prettyStream = pretty({ const prettyStream = pretty({
sync: false, // IMPORTANT: Make async to prevent blocking the event loop sync: true, // IMPORTANT: Make async to prevent blocking the event loop
colorize: true, colorize: true,
translateTime: 'yyyy-mm-dd HH:MM:ss.l', translateTime: 'yyyy-mm-dd HH:MM:ss.l',
messageFormat: '[{service}{childName}] {msg}', messageFormat: '[{service}{childName}] {msg}',

View file

@ -9,7 +9,7 @@ import type { ShutdownResult } from './types';
// Core shutdown classes and types // Core shutdown classes and types
export { Shutdown } from './shutdown'; export { Shutdown } from './shutdown';
export type { ShutdownCallback, ShutdownOptions, ShutdownResult } from './types'; export type { ShutdownCallback, ShutdownOptions, ShutdownResult, PrioritizedShutdownCallback } from './types';
// Global singleton instance // Global singleton instance
let globalInstance: Shutdown | null = null; let globalInstance: Shutdown | null = null;
@ -31,8 +31,29 @@ function getGlobalInstance(): Shutdown {
/** /**
* Register a cleanup callback that will be executed during shutdown * Register a cleanup callback that will be executed during shutdown
*/ */
export function onShutdown(callback: () => Promise<void> | void): void { export function onShutdown(callback: () => Promise<void> | void, priority?: number, name?: string): void {
getGlobalInstance().onShutdown(callback); getGlobalInstance().onShutdown(callback, priority, name);
}
/**
* Register a high priority shutdown callback (for queues, critical services)
*/
export function onShutdownHigh(callback: () => Promise<void> | void, name?: string): void {
getGlobalInstance().onShutdownHigh(callback, name);
}
/**
* Register a medium priority shutdown callback (for databases, connections)
*/
export function onShutdownMedium(callback: () => Promise<void> | void, name?: string): void {
getGlobalInstance().onShutdownMedium(callback, name);
}
/**
* Register a low priority shutdown callback (for loggers, cleanup)
*/
export function onShutdownLow(callback: () => Promise<void> | void, name?: string): void {
getGlobalInstance().onShutdownLow(callback, name);
} }
/** /**

View file

@ -8,13 +8,13 @@
* - Platform-specific signal support (Windows/Unix) * - Platform-specific signal support (Windows/Unix)
*/ */
import type { ShutdownCallback, ShutdownOptions, ShutdownResult } from './types'; import type { ShutdownCallback, ShutdownOptions, ShutdownResult, PrioritizedShutdownCallback } from './types';
export class Shutdown { export class Shutdown {
private static instance: Shutdown | null = null; private static instance: Shutdown | null = null;
private isShuttingDown = false; private isShuttingDown = false;
private shutdownTimeout = 30000; // 30 seconds default private shutdownTimeout = 30000; // 30 seconds default
private callbacks: ShutdownCallback[] = []; private callbacks: PrioritizedShutdownCallback[] = [];
private signalHandlersRegistered = false; private signalHandlersRegistered = false;
constructor(options: ShutdownOptions = {}) { constructor(options: ShutdownOptions = {}) {
@ -43,13 +43,34 @@ export class Shutdown {
} }
/** /**
* Register a cleanup callback * Register a cleanup callback with priority (lower numbers = higher priority)
*/ */
onShutdown(callback: ShutdownCallback): void { onShutdown(callback: ShutdownCallback, priority: number = 50, name?: string): void {
if (this.isShuttingDown) { if (this.isShuttingDown) {
return; return;
} }
this.callbacks.push(callback); this.callbacks.push({ callback, priority, name });
}
/**
* Register a high priority shutdown callback (for queues, critical services)
*/
onShutdownHigh(callback: ShutdownCallback, name?: string): void {
this.onShutdown(callback, 10, name);
}
/**
* Register a medium priority shutdown callback (for databases, connections)
*/
onShutdownMedium(callback: ShutdownCallback, name?: string): void {
this.onShutdown(callback, 50, name);
}
/**
* Register a low priority shutdown callback (for loggers, cleanup)
*/
onShutdownLow(callback: ShutdownCallback, name?: string): void {
this.onShutdown(callback, 90, name);
} }
/** /**
@ -140,21 +161,34 @@ export class Shutdown {
} }
/** /**
* Execute all registered callbacks * Execute all registered callbacks in priority order
*/ */
private async executeCallbacks(): Promise<{ executed: number; failed: number }> { private async executeCallbacks(): Promise<{ executed: number; failed: number }> {
if (this.callbacks.length === 0) { if (this.callbacks.length === 0) {
return { executed: 0, failed: 0 }; return { executed: 0, failed: 0 };
} }
const results = await Promise.allSettled( // Sort callbacks by priority (lower numbers = higher priority = execute first)
this.callbacks.map(async callback => { const sortedCallbacks = [...this.callbacks].sort((a, b) => a.priority - b.priority);
await callback();
})
);
const failed = results.filter(result => result.status === 'rejected').length; let executed = 0;
const executed = results.length; let failed = 0;
// Execute callbacks in order by priority
for (const { callback, name, priority } of sortedCallbacks) {
try {
await callback();
executed++;
if (name) {
console.log(`✓ Shutdown completed: ${name} (priority: ${priority})`);
}
} catch (error) {
failed++;
if (name) {
console.error(`✗ Shutdown failed: ${name} (priority: ${priority})`, error);
}
}
}
return { executed, failed }; return { executed, failed };
} }

View file

@ -7,6 +7,15 @@
*/ */
export type ShutdownCallback = () => Promise<void> | void; export type ShutdownCallback = () => Promise<void> | void;
/**
* Shutdown callback with priority information
*/
export interface PrioritizedShutdownCallback {
callback: ShutdownCallback;
priority: number;
name?: string;
}
/** /**
* Options for configuring shutdown behavior * Options for configuring shutdown behavior
*/ */