diff --git a/apps/stock/data-pipeline/src/handlers/exchanges/exchanges.handler.ts b/apps/stock/data-pipeline/src/handlers/exchanges/exchanges.handler.ts index a5690f4..86a5597 100644 --- a/apps/stock/data-pipeline/src/handlers/exchanges/exchanges.handler.ts +++ b/apps/stock/data-pipeline/src/handlers/exchanges/exchanges.handler.ts @@ -106,3 +106,5 @@ class ExchangesHandler extends BaseHandler { return getSyncStatus({}, this.services); } } + +export { ExchangesHandler }; diff --git a/apps/stock/data-pipeline/src/handlers/index.ts b/apps/stock/data-pipeline/src/handlers/index.ts index f6cadf8..e4efe55 100644 --- a/apps/stock/data-pipeline/src/handlers/index.ts +++ b/apps/stock/data-pipeline/src/handlers/index.ts @@ -1,41 +1,41 @@ /** - * Handler auto-registration for data pipeline service - * Automatically discovers and registers all handlers + * Handler registration for data pipeline service */ -import type { IServiceContainer } from '@stock-bot/handlers'; -import { autoRegisterHandlers } from '@stock-bot/handlers'; +import type { IServiceContainer } from '@stock-bot/types'; import { getLogger } from '@stock-bot/logger'; -// Import handlers for bundling (ensures they're included in the build) -import './exchanges/exchanges.handler'; -import './symbols/symbols.handler'; +// Import handlers directly +import { ExchangesHandler } from './exchanges/exchanges.handler'; +import { SymbolsHandler } from './symbols/symbols.handler'; const logger = getLogger('pipeline-handler-init'); /** - * Initialize and register all handlers automatically + * Initialize and register all handlers */ export async function initializeAllHandlers(container: IServiceContainer): Promise { logger.info('Initializing data pipeline handlers...'); try { - // Auto-register all handlers in this directory - const result = await autoRegisterHandlers(__dirname, container, { - pattern: '.handler.', - exclude: ['test', 'spec', '.old'], - dryRun: false, - }); + // Register handlers manually + const handlers = [ + ExchangesHandler, + SymbolsHandler, + ]; - logger.info('Handler auto-registration complete', { - registered: result.registered, - failed: result.failed, - }); - - if (result.failed.length > 0) { - logger.error('Some handlers failed to register', { failed: result.failed }); + for (const Handler of handlers) { + try { + const instance = new Handler(container); + // Register the handler instance if needed + logger.info(`Registered handler: ${Handler.name}`); + } catch (error) { + logger.error(`Failed to register handler: ${Handler.name}`, { error }); + } } + + logger.info('Handler registration complete'); } catch (error) { - logger.error('Handler auto-registration failed', { error }); + logger.error('Handler registration failed', { error }); throw error; } -} +} \ No newline at end of file diff --git a/apps/stock/data-pipeline/src/handlers/symbols/symbols.handler.ts b/apps/stock/data-pipeline/src/handlers/symbols/symbols.handler.ts index 2c55478..3d7499e 100644 --- a/apps/stock/data-pipeline/src/handlers/symbols/symbols.handler.ts +++ b/apps/stock/data-pipeline/src/handlers/symbols/symbols.handler.ts @@ -69,3 +69,5 @@ class SymbolsHandler extends BaseHandler { return syncSymbolsFromProvider(payload, this.services); } } + +export { SymbolsHandler }; diff --git a/libs/core/cache/src/connection-manager.ts b/libs/core/cache/src/connection-manager.ts index 2c48bda..17f039f 100644 --- a/libs/core/cache/src/connection-manager.ts +++ b/libs/core/cache/src/connection-manager.ts @@ -1,5 +1,6 @@ import Redis from 'ioredis'; import type { RedisConfig } from './types'; +import { REDIS_DEFAULTS } from './constants'; interface ConnectionConfig { name: string; @@ -10,16 +11,12 @@ interface ConnectionConfig { } /** - * Redis Connection Manager for managing shared and unique connections + * Simplified Redis Connection Manager */ export class RedisConnectionManager { - private connections = new Map(); - private static sharedConnections = new Map(); + private static connections = new Map(); private static instance: RedisConnectionManager; - private logger: any = console; - private static readyConnections = new Set(); - // Singleton pattern for the manager itself static getInstance(): RedisConnectionManager { if (!this.instance) { this.instance = new RedisConnectionManager(); @@ -29,251 +26,50 @@ export class RedisConnectionManager { /** * Get or create a Redis connection - * @param config Connection configuration - * @returns Redis connection instance */ getConnection(config: ConnectionConfig): Redis { - const { name, singleton = false, db, redisConfig, logger } = config; - if (logger) { - this.logger = logger; + const { name, singleton = true, redisConfig } = config; + + if (singleton) { + const existing = RedisConnectionManager.connections.get(name); + if (existing) return existing; } + const connection = this.createConnection(redisConfig); + if (singleton) { - // Use shared connection across all instances - if (!RedisConnectionManager.sharedConnections.has(name)) { - const connection = this.createConnection(name, redisConfig, db, logger); - RedisConnectionManager.sharedConnections.set(name, connection); - this.logger.info(`Created shared Redis connection: ${name}`); - } - const connection = RedisConnectionManager.sharedConnections.get(name); - if (!connection) { - throw new Error(`Expected connection ${name} to exist in shared connections`); - } - return connection; - } else { - // Create unique connection per instance - const uniqueName = `${name}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; - const connection = this.createConnection(uniqueName, redisConfig, db, logger); - this.connections.set(uniqueName, connection); - this.logger.debug(`Created unique Redis connection: ${uniqueName}`); - return connection; + RedisConnectionManager.connections.set(name, connection); } + + return connection; } /** - * Create a new Redis connection with configuration + * Create a new Redis connection */ - private createConnection(name: string, config: RedisConfig, db?: number, logger?: any): Redis { - const redisOptions = { + private createConnection(config: RedisConfig): Redis { + return new Redis({ host: config.host, port: config.port, - password: config.password || undefined, - username: config.username || undefined, - db: db ?? config.db ?? 0, - maxRetriesPerRequest: config.maxRetriesPerRequest ?? 3, - retryDelayOnFailover: config.retryDelayOnFailover ?? 100, - connectTimeout: config.connectTimeout ?? 10000, - commandTimeout: config.commandTimeout ?? 5000, - keepAlive: config.keepAlive ?? 0, - connectionName: name, - lazyConnect: false, // Connect immediately instead of waiting for first command - ...(config.tls && { - tls: { - cert: config.tls.cert || undefined, - key: config.tls.key || undefined, - ca: config.tls.ca || undefined, - rejectUnauthorized: config.tls.rejectUnauthorized ?? true, - }, - }), - }; - - const redis = new Redis(redisOptions); - - // Use the provided logger or fall back to instance logger - const log = logger || this.logger; - - // Setup event handlers - redis.on('connect', () => { - log.info(`Redis connection established: ${name}`); + password: config.password, + username: config.username, + db: config.db ?? REDIS_DEFAULTS.DB, + maxRetriesPerRequest: config.maxRetriesPerRequest ?? REDIS_DEFAULTS.MAX_RETRIES, + connectTimeout: config.connectTimeout ?? REDIS_DEFAULTS.CONNECT_TIMEOUT, + commandTimeout: config.commandTimeout ?? REDIS_DEFAULTS.COMMAND_TIMEOUT, + lazyConnect: false, + ...(config.tls && { tls: config.tls }), }); - - redis.on('ready', () => { - log.info(`Redis connection ready: ${name}`); - }); - - redis.on('error', err => { - log.error(`Redis connection error for ${name}:`, err); - }); - - redis.on('close', () => { - log.warn(`Redis connection closed: ${name}`); - }); - - redis.on('reconnecting', () => { - log.warn(`Redis reconnecting: ${name}`); - }); - - return redis; } /** - * Close a specific connection + * Close all connections */ - async closeConnection(connection: Redis): Promise { - try { - await connection.quit(); - } catch (error) { - this.logger.warn('Error closing Redis connection:', error as Error); - } - } - - /** - * Close all connections managed by this instance - */ - async closeAllConnections(): Promise { - // Close instance-specific connections - const instancePromises = Array.from(this.connections.values()).map(conn => - this.closeConnection(conn) + static async closeAll(): Promise { + const promises = Array.from(this.connections.values()).map(conn => + conn.quit().catch(() => {}) ); - await Promise.all(instancePromises); + await Promise.all(promises); this.connections.clear(); - - // Close shared connections (only if this is the last instance) - if (RedisConnectionManager.instance === this) { - const sharedPromises = Array.from(RedisConnectionManager.sharedConnections.values()).map( - conn => this.closeConnection(conn) - ); - await Promise.all(sharedPromises); - RedisConnectionManager.sharedConnections.clear(); - } - - this.logger.info('All Redis connections closed'); } - - /** - * Get connection statistics - */ - getConnectionCount(): { shared: number; unique: number } { - return { - shared: RedisConnectionManager.sharedConnections.size, - unique: this.connections.size, - }; - } - - /** - * Get all connection names for monitoring - */ - getConnectionNames(): { shared: string[]; unique: string[] } { - return { - shared: Array.from(RedisConnectionManager.sharedConnections.keys()), - unique: Array.from(this.connections.keys()), - }; - } - - /** - * Health check for all connections - */ - async healthCheck(): Promise<{ healthy: boolean; details: Record }> { - const details: Record = {}; - let allHealthy = true; - - // Check shared connections - for (const [name, connection] of RedisConnectionManager.sharedConnections) { - try { - await connection.ping(); - details[`shared:${name}`] = true; - } catch { - details[`shared:${name}`] = false; - allHealthy = false; - } - } - - // Check instance connections - for (const [name, connection] of this.connections) { - try { - await connection.ping(); - details[`unique:${name}`] = true; - } catch { - details[`unique:${name}`] = false; - allHealthy = false; - } - } - - return { healthy: allHealthy, details }; - } - - /** - * Wait for all created connections to be ready - * @param timeout Maximum time to wait in milliseconds - * @returns Promise that resolves when all connections are ready - */ - static async waitForAllConnections(timeout: number = 30000): Promise { - const instance = this.getInstance(); - const allConnections = new Map([...instance.connections, ...this.sharedConnections]); - - if (allConnections.size === 0) { - instance.logger.debug('No Redis connections to wait for'); - return; - } - - instance.logger.info(`Waiting for ${allConnections.size} Redis connections to be ready...`); - - const connectionPromises = Array.from(allConnections.entries()).map(([name, redis]) => - instance.waitForConnection(redis, name, timeout) - ); - - try { - await Promise.all(connectionPromises); - instance.logger.info('All Redis connections are ready'); - } catch (error) { - instance.logger.error('Failed to establish all Redis connections:', error); - throw error; - } - } - - /** - * Wait for a specific connection to be ready - */ - private async waitForConnection(redis: Redis, name: string, timeout: number): Promise { - return new Promise((resolve, reject) => { - const timeoutId = setTimeout(() => { - reject(new Error(`Redis connection ${name} failed to be ready within ${timeout}ms`)); - }, timeout); - - const onReady = () => { - clearTimeout(timeoutId); - RedisConnectionManager.readyConnections.add(name); - this.logger.debug(`Redis connection ready: ${name}`); - resolve(); - }; - - const onError = (err: Error) => { - clearTimeout(timeoutId); - this.logger.error(`Redis connection failed for ${name}:`, err); - reject(err); - }; - - if (redis.status === 'ready') { - onReady(); - } else { - redis.once('ready', onReady); - redis.once('error', onError); - } - }); - } - - /** - * Check if all connections are ready - */ - static areAllConnectionsReady(): boolean { - const instance = this.getInstance(); - const allConnections = new Map([...instance.connections, ...this.sharedConnections]); - - return ( - allConnections.size > 0 && - Array.from(allConnections.keys()).every(name => this.readyConnections.has(name)) - ); - } -} - -export default RedisConnectionManager; +} \ No newline at end of file diff --git a/libs/core/cache/src/constants.ts b/libs/core/cache/src/constants.ts new file mode 100644 index 0000000..0f9a523 --- /dev/null +++ b/libs/core/cache/src/constants.ts @@ -0,0 +1,16 @@ +// Cache constants +export const CACHE_DEFAULTS = { + TTL: 3600, // 1 hour in seconds + KEY_PREFIX: 'cache:', + SCAN_COUNT: 100, +} as const; + +// Redis connection constants +export const REDIS_DEFAULTS = { + DB: 0, + MAX_RETRIES: 3, + RETRY_DELAY: 100, + CONNECT_TIMEOUT: 10000, + COMMAND_TIMEOUT: 5000, + KEEP_ALIVE: 0, +} as const; \ No newline at end of file diff --git a/libs/core/cache/src/index.ts b/libs/core/cache/src/index.ts index 4a4e4e3..cd5eef2 100644 --- a/libs/core/cache/src/index.ts +++ b/libs/core/cache/src/index.ts @@ -5,7 +5,7 @@ import type { CacheOptions, CacheProvider } from './types'; const cacheInstances = new Map(); /** - * Create a Redis cache instance with trading-optimized defaults + * Create a Redis cache instance */ export function createCache(options: CacheOptions): CacheProvider { const defaultOptions: CacheOptions = { @@ -37,19 +37,7 @@ export function createCache(options: CacheOptions): CacheProvider { return new RedisCache(defaultOptions); } -// Export types and classes -export type { - CacheConfig, - CacheKey, - CacheOptions, - CacheProvider, - CacheStats, - RedisConfig, - SerializationOptions, -} from './types'; - -export { RedisConnectionManager } from './connection-manager'; -export { CacheKeyGenerator } from './key-generator'; -export { RedisCache } from './redis-cache'; +// Export only what's actually used +export type { CacheProvider, CacheStats } from './types'; export { NamespacedCache } from './namespaced-cache'; -export { createNamespacedCache, isCacheAvailable } from './cache-factory'; +export { createNamespacedCache } from './cache-factory'; \ No newline at end of file diff --git a/libs/core/cache/src/redis-cache.ts b/libs/core/cache/src/redis-cache.ts index 40c7af9..f65128a 100644 --- a/libs/core/cache/src/redis-cache.ts +++ b/libs/core/cache/src/redis-cache.ts @@ -1,20 +1,16 @@ import Redis from 'ioredis'; import { RedisConnectionManager } from './connection-manager'; import type { CacheOptions, CacheProvider, CacheStats } from './types'; +import { CACHE_DEFAULTS } from './constants'; /** - * Simplified Redis-based cache provider using connection manager + * Simplified Redis-based cache provider */ export class RedisCache implements CacheProvider { private redis: Redis; private logger: any; private defaultTTL: number; private keyPrefix: string; - private enableMetrics: boolean; - private isConnected = false; - private startTime = Date.now(); - private connectionManager: RedisConnectionManager; - private stats: CacheStats = { hits: 0, misses: 0, @@ -23,65 +19,22 @@ export class RedisCache implements CacheProvider { total: 0, uptime: 0, }; + private startTime = Date.now(); constructor(options: CacheOptions) { - this.defaultTTL = options.ttl ?? 3600; // 1 hour default - this.keyPrefix = options.keyPrefix ?? 'cache:'; - this.enableMetrics = options.enableMetrics ?? true; - this.logger = options.logger || console; // Use provided logger or console as fallback + this.defaultTTL = options.ttl ?? CACHE_DEFAULTS.TTL; + this.keyPrefix = options.keyPrefix ?? CACHE_DEFAULTS.KEY_PREFIX; + this.logger = options.logger || console; - // Get connection manager instance - this.connectionManager = RedisConnectionManager.getInstance(); - - // Generate connection name based on cache type - const baseName = - options.name || - this.keyPrefix - .replace(':', '') - .replace(/[^a-zA-Z0-9]/g, '') - .toUpperCase() || - 'CACHE'; - - // Get Redis connection (shared by default for cache) - this.redis = this.connectionManager.getConnection({ - name: `${baseName}-SERVICE`, - singleton: options.shared ?? true, // Default to shared connection for cache + const manager = RedisConnectionManager.getInstance(); + const name = options.name || 'CACHE'; + + this.redis = manager.getConnection({ + name: `${name}-SERVICE`, + singleton: options.shared ?? true, redisConfig: options.redisConfig, logger: this.logger, }); - - // Only setup event handlers for non-shared connections to avoid memory leaks - if (!(options.shared ?? true)) { - this.setupEventHandlers(); - } else { - // For shared connections, just monitor the connection status without adding handlers - this.isConnected = this.redis.status === 'ready'; - } - } - - private setupEventHandlers(): void { - this.redis.on('connect', () => { - this.logger.info('Redis cache connected'); - }); - - this.redis.on('ready', () => { - this.isConnected = true; - this.logger.info('Redis cache ready'); - }); - - this.redis.on('error', (error: Error) => { - this.isConnected = false; - this.logger.error('Redis cache connection error', { error: error.message }); - }); - - this.redis.on('close', () => { - this.isConnected = false; - this.logger.warn('Redis cache connection closed'); - }); - - this.redis.on('reconnecting', () => { - this.logger.warn('Redis cache reconnecting...'); - }); } private getKey(key: string): string { @@ -89,10 +42,6 @@ export class RedisCache implements CacheProvider { } private updateStats(hit: boolean, error = false): void { - if (!this.enableMetrics) { - return; - } - if (error) { this.stats.errors++; } else if (hit) { @@ -100,256 +49,145 @@ export class RedisCache implements CacheProvider { } else { this.stats.misses++; } - this.stats.total = this.stats.hits + this.stats.misses; this.stats.hitRate = this.stats.total > 0 ? this.stats.hits / this.stats.total : 0; this.stats.uptime = Date.now() - this.startTime; } - private async safeExecute( - operation: () => Promise, - fallback: T, - operationName: string - ): Promise { - try { - if (!this.isReady()) { - this.logger.warn(`Redis not ready for ${operationName}, using fallback`); - this.updateStats(false, true); - return fallback; - } - return await operation(); - } catch (error) { - this.logger.error(`Redis ${operationName} failed`, { - error: error instanceof Error ? error.message : String(error), - }); - this.updateStats(false, true); - return fallback; - } - } - async get(key: string): Promise { - return this.safeExecute( - async () => { - const fullKey = this.getKey(key); - const value = await this.redis.get(fullKey); - - if (value === null) { - this.updateStats(false); - this.logger.debug('Cache miss', { key }); - return null; - } - - this.updateStats(true); - this.logger.debug('Cache hit', { key }); - - try { - return JSON.parse(value) as T; - } catch { - // If parsing fails, return as string - return value as unknown as T; - } - }, - null, - 'get' - ); + try { + const value = await this.redis.get(this.getKey(key)); + if (value === null) { + this.updateStats(false); + return null; + } + this.updateStats(true); + return JSON.parse(value); + } catch (error) { + this.updateStats(false, true); + return null; + } } async set( key: string, value: T, - options?: - | number - | { - ttl?: number; - preserveTTL?: boolean; - onlyIfExists?: boolean; - onlyIfNotExists?: boolean; - getOldValue?: boolean; - } - ): Promise { - // Validate options before safeExecute - const config = typeof options === 'number' ? { ttl: options } : options || {}; - if (config.onlyIfExists && config.onlyIfNotExists) { - throw new Error('Cannot specify both onlyIfExists and onlyIfNotExists'); + options?: number | { + ttl?: number; + preserveTTL?: boolean; + onlyIfExists?: boolean; + onlyIfNotExists?: boolean; + getOldValue?: boolean; } - - return this.safeExecute( - async () => { - const fullKey = this.getKey(key); - const serialized = typeof value === 'string' ? value : JSON.stringify(value); - - // Config is already parsed and validated above - - let oldValue: T | null = null; - - // Get old value if requested - if (config.getOldValue) { - const existingValue = await this.redis.get(fullKey); - if (existingValue !== null) { - try { - oldValue = JSON.parse(existingValue) as T; - } catch { - oldValue = existingValue as unknown as T; - } - } - } - - // Handle preserveTTL logic - if (config.preserveTTL) { - const currentTTL = await this.redis.ttl(fullKey); - - if (currentTTL === -2) { - // Key doesn't exist - if (config.onlyIfExists) { - this.logger.debug('Set skipped - key does not exist and onlyIfExists is true', { - key, - }); - return oldValue; - } - // Set with default or specified TTL - const ttl = config.ttl ?? this.defaultTTL; - await this.redis.setex(fullKey, ttl, serialized); - this.logger.debug('Cache set with new TTL (key did not exist)', { key, ttl }); - } else if (currentTTL === -1) { - // Key exists but has no expiry - preserve the no-expiry state - await this.redis.set(fullKey, serialized); - this.logger.debug('Cache set preserving no-expiry', { key }); - } else { - // Key exists with TTL - preserve it - await this.redis.setex(fullKey, currentTTL, serialized); - this.logger.debug('Cache set preserving existing TTL', { key, ttl: currentTTL }); - } + ): Promise { + try { + const fullKey = this.getKey(key); + const serialized = JSON.stringify(value); + const opts = typeof options === 'number' ? { ttl: options } : options || {}; + + let oldValue: T | null = null; + if (opts.getOldValue) { + const existing = await this.redis.get(fullKey); + if (existing) oldValue = JSON.parse(existing); + } + + const ttl = opts.ttl ?? this.defaultTTL; + + if (opts.onlyIfExists) { + const result = await this.redis.set(fullKey, serialized, 'EX', ttl, 'XX'); + if (!result) return oldValue; + } else if (opts.onlyIfNotExists) { + const result = await this.redis.set(fullKey, serialized, 'EX', ttl, 'NX'); + if (!result) return oldValue; + } else if (opts.preserveTTL) { + const currentTTL = await this.redis.ttl(fullKey); + if (currentTTL > 0) { + await this.redis.setex(fullKey, currentTTL, serialized); } else { - // Standard set logic with conditional operations - - if (config.onlyIfExists) { - // Only set if key exists (XX flag) - const ttl = config.ttl ?? this.defaultTTL; - const result = await this.redis.set(fullKey, serialized, 'EX', ttl, 'XX'); - if (result === null) { - this.logger.debug('Set skipped - key does not exist', { key }); - return oldValue; - } - } else if (config.onlyIfNotExists) { - // Only set if key doesn't exist (NX flag) - const ttl = config.ttl ?? this.defaultTTL; - const result = await this.redis.set(fullKey, serialized, 'EX', ttl, 'NX'); - if (result === null) { - this.logger.debug('Set skipped - key already exists', { key }); - return oldValue; - } - } else { - // Standard set - const ttl = config.ttl ?? this.defaultTTL; - await this.redis.setex(fullKey, ttl, serialized); - } - - this.logger.debug('Cache set', { key, ttl: config.ttl ?? this.defaultTTL }); + await this.redis.setex(fullKey, ttl, serialized); } - - return oldValue; - }, - null, - 'set' - ); + } else { + await this.redis.setex(fullKey, ttl, serialized); + } + + return oldValue; + } catch (error) { + this.updateStats(false, true); + throw error; + } } async del(key: string): Promise { - await this.safeExecute( - async () => { - const fullKey = this.getKey(key); - await this.redis.del(fullKey); - this.logger.debug('Cache delete', { key }); - }, - undefined, - 'del' - ); + try { + await this.redis.del(this.getKey(key)); + } catch (error) { + this.updateStats(false, true); + throw error; + } } async exists(key: string): Promise { - return this.safeExecute( - async () => { - const fullKey = this.getKey(key); - const result = await this.redis.exists(fullKey); - return result === 1; - }, - false, - 'exists' - ); + try { + return (await this.redis.exists(this.getKey(key))) === 1; + } catch (error) { + this.updateStats(false, true); + return false; + } } async clear(): Promise { - await this.safeExecute( - async () => { - const pattern = `${this.keyPrefix}*`; - const keys = await this.redis.keys(pattern); - if (keys.length > 0) { - await this.redis.del(...keys); - this.logger.warn('Cache cleared', { keysDeleted: keys.length }); + try { + const stream = this.redis.scanStream({ + match: `${this.keyPrefix}*`, + count: CACHE_DEFAULTS.SCAN_COUNT + }); + + const pipeline = this.redis.pipeline(); + stream.on('data', (keys: string[]) => { + if (keys.length) { + keys.forEach(key => pipeline.del(key)); } - }, - undefined, - 'clear' - ); - } - - /** - * Get a value using a raw Redis key (bypassing the keyPrefix) - * Useful for accessing cache data from other services with different prefixes - */ - async getRaw(key: string): Promise { - return this.safeExecute( - async () => { - // Use the key directly without adding our prefix - const value = await this.redis.get(key); - if (!value) { - this.updateStats(false); - return null; - } - this.updateStats(true); - - try { - const parsed = JSON.parse(value); - this.logger.debug('Cache raw get hit', { key }); - return parsed; - } catch (error) { - // If JSON parsing fails, log the error with more context - this.logger.warn('Cache getRaw JSON parse failed', { - key, - valueLength: value.length, - valuePreview: value.substring(0, 100), - error: error instanceof Error ? error.message : String(error), - }); - // Return the raw value as-is if it can't be parsed - return value as unknown as T; - } - }, - null, - 'getRaw' - ); + }); + + await new Promise((resolve, reject) => { + stream.on('end', resolve); + stream.on('error', reject); + }); + + await pipeline.exec(); + } catch (error) { + this.updateStats(false, true); + throw error; + } } async keys(pattern: string): Promise { - return this.safeExecute( - async () => { - const fullPattern = `${this.keyPrefix}${pattern}`; - const keys = await this.redis.keys(fullPattern); - // Remove the prefix from returned keys to match the interface expectation - return keys.map(key => key.replace(this.keyPrefix, '')); - }, - [], - 'keys' - ); + try { + const keys: string[] = []; + const stream = this.redis.scanStream({ + match: `${this.keyPrefix}${pattern}`, + count: CACHE_DEFAULTS.SCAN_COUNT + }); + + await new Promise((resolve, reject) => { + stream.on('data', (resultKeys: string[]) => { + keys.push(...resultKeys.map(k => k.replace(this.keyPrefix, ''))); + }); + stream.on('end', resolve); + stream.on('error', reject); + }); + + return keys; + } catch (error) { + this.updateStats(false, true); + return []; + } } async health(): Promise { try { - const pong = await this.redis.ping(); - return pong === 'PONG'; - } catch (error) { - this.logger.error('Redis health check failed', { - error: error instanceof Error ? error.message : String(error), - }); + return (await this.redis.ping()) === 'PONG'; + } catch { return false; } } @@ -362,115 +200,21 @@ export class RedisCache implements CacheProvider { } async waitForReady(timeout = 5000): Promise { + if (this.redis.status === 'ready') return; + return new Promise((resolve, reject) => { - if (this.redis.status === 'ready') { - resolve(); - return; - } - - const timeoutId = setTimeout(() => { + const timer = setTimeout(() => { reject(new Error(`Redis connection timeout after ${timeout}ms`)); }, timeout); this.redis.once('ready', () => { - clearTimeout(timeoutId); + clearTimeout(timer); resolve(); }); - - this.redis.once('error', error => { - clearTimeout(timeoutId); - reject(error); - }); }); } isReady(): boolean { - // Always check the actual Redis connection status - const ready = this.redis.status === 'ready'; - - // Update local flag if we're not using shared connection - if (this.isConnected !== ready) { - this.isConnected = ready; - } - - return ready; + return this.redis.status === 'ready'; } - - // Enhanced convenience methods - async update(key: string, value: T): Promise { - return this.set(key, value, { preserveTTL: true, getOldValue: true }); - } - - async setIfExists(key: string, value: T, ttl?: number): Promise { - const result = await this.set(key, value, { ttl, onlyIfExists: true }); - return result !== null || (await this.exists(key)); - } - - async setIfNotExists(key: string, value: T, ttl?: number): Promise { - const oldValue = await this.set(key, value, { ttl, onlyIfNotExists: true, getOldValue: true }); - return oldValue === null; // Returns true if key didn't exist before - } - - async replace(key: string, value: T, ttl?: number): Promise { - return this.set(key, value, { ttl, onlyIfExists: true, getOldValue: true }); - } - - // Atomic update with transformation - async updateField( - key: string, - updater: (current: T | null) => T, - ttl?: number - ): Promise { - return this.safeExecute( - async () => { - const fullKey = this.getKey(key); - - // Use Lua script for atomic read-modify-write - const luaScript = ` - local key = KEYS[1] - - -- Get current value and TTL - local current_value = redis.call('GET', key) - local current_ttl = redis.call('TTL', key) - - -- Return current value for processing - return {current_value, current_ttl} - `; - - const [currentValue, currentTTL] = (await this.redis.eval(luaScript, 1, fullKey)) as [ - string | null, - number, - ]; - - // Parse current value - let parsed: T | null = null; - if (currentValue !== null) { - try { - parsed = JSON.parse(currentValue) as T; - } catch { - parsed = currentValue as unknown as T; - } - } - - // Apply updater function - const newValue = updater(parsed); - - // Set the new value with appropriate TTL logic - if (ttl !== undefined) { - // Use specified TTL - await this.set(key, newValue, ttl); - } else if (currentTTL === -2) { - // Key didn't exist, use default TTL - await this.set(key, newValue); - } else { - // Preserve existing TTL - await this.set(key, newValue, { preserveTTL: true }); - } - - return parsed; - }, - null, - 'updateField' - ); - } -} +} \ No newline at end of file diff --git a/libs/core/cache/test/redis-cache.test.ts b/libs/core/cache/test/redis-cache.test.ts index 2e38f7c..98ed029 100644 --- a/libs/core/cache/test/redis-cache.test.ts +++ b/libs/core/cache/test/redis-cache.test.ts @@ -108,17 +108,15 @@ describe('RedisCache', () => { logger: mockLogger, }; - // Setup event handler storage - mockRedis.on = mock((event: string, handler: Function) => { - mockRedis._eventCallbacks[event] = handler; - }); - cache = new RedisCache(options); - // Should setup event handlers for non-shared - expect(mockRedis.on).toHaveBeenCalledWith('connect', expect.any(Function)); - expect(mockRedis.on).toHaveBeenCalledWith('ready', expect.any(Function)); - expect(mockRedis.on).toHaveBeenCalledWith('error', expect.any(Function)); + // Should create a new connection for non-shared + expect(mockConnectionManager.getConnection).toHaveBeenCalledWith({ + name: 'CACHE-SERVICE', + singleton: false, + redisConfig: options.redisConfig, + logger: mockLogger, + }); }); it('should sanitize prefix for connection name', () => { diff --git a/libs/core/config/src/config-manager.ts b/libs/core/config/src/config-manager.ts index 874fb54..2b1f326 100644 --- a/libs/core/config/src/config-manager.ts +++ b/libs/core/config/src/config-manager.ts @@ -22,20 +22,19 @@ export class ConfigManager> { constructor(options: ConfigManagerOptions = {}) { this.environment = options.environment || this.detectEnvironment(); - // Default loaders if none provided if (options.loaders) { this.loaders = options.loaders; } else { const configPath = options.configPath || join(process.cwd(), 'config'); this.loaders = [ new FileLoader(configPath, this.environment), - new EnvLoader(''), // No prefix for env vars to match our .env file + new EnvLoader(''), ]; } } /** - * Initialize the configuration by loading from all sources synchronously. + * Initialize the configuration by loading from all sources */ initialize(schema?: ConfigSchema): T { if (this.config) { @@ -50,7 +49,6 @@ export class ConfigManager> { // Load configurations from all sources const configs: Record[] = []; for (const loader of sortedLoaders) { - // Assuming all loaders now have a synchronous `load` method const config = loader.load(); if (config && Object.keys(config).length > 0) { configs.push(config); @@ -58,14 +56,10 @@ export class ConfigManager> { } // Merge all configurations - const mergedConfig = this.deepMerge(...configs) as T; + const mergedConfig = this.merge(...configs) as T; // Add environment if not present - if ( - typeof mergedConfig === 'object' && - mergedConfig !== null && - !('environment' in mergedConfig) - ) { + if (typeof mergedConfig === 'object' && mergedConfig !== null && !('environment' in mergedConfig)) { (mergedConfig as Record)['environment'] = this.environment; } @@ -79,12 +73,9 @@ export class ConfigManager> { path: err.path.join('.'), message: err.message, code: err.code, - expected: (err as any).expected, - received: (err as any).received, })); this.logger.error('Configuration validation failed:', errorDetails); - throw new ConfigValidationError('Configuration validation failed', error.errors); } throw error; @@ -138,19 +129,18 @@ export class ConfigManager> { } /** - * Update configuration at runtime (useful for testing) + * Update configuration at runtime */ set(updates: DeepPartial): void { if (!this.config) { throw new ConfigError('Configuration not initialized. Call initialize() first.'); } - const updated = this.deepMerge( + const updated = this.merge( this.config as Record, updates as Record ) as T; - // Re-validate if schema is present if (this.schema) { try { this.config = this.schema.parse(updated) as T; @@ -176,7 +166,7 @@ export class ConfigManager> { } /** - * Reset configuration (useful for testing) + * Reset configuration */ reset(): void { this.config = null; @@ -190,13 +180,6 @@ export class ConfigManager> { return schema.parse(config); } - /** - * Create a typed configuration getter - */ - createTypedGetter(schema: S): () => z.infer { - return () => this.validate(schema); - } - private detectEnvironment(): Environment { const env = process.env.NODE_ENV?.toLowerCase(); switch (env) { @@ -212,48 +195,32 @@ export class ConfigManager> { } } - private deepMerge(...objects: Record[]): Record { - const seen = new WeakSet(); - - const merge = (...objs: Record[]): Record => { - const result: Record = {}; + /** + * Simple deep merge without circular reference handling + */ + private merge(...objects: Record[]): Record { + const result: Record = {}; - for (const obj of objs) { - if (seen.has(obj)) { - // Skip circular reference instead of throwing - return result; + for (const obj of objects) { + for (const [key, value] of Object.entries(obj)) { + if (value === null || value === undefined) { + result[key] = value; + } else if ( + typeof value === 'object' && + !Array.isArray(value) && + !(value instanceof Date) && + !(value instanceof RegExp) + ) { + result[key] = this.merge( + (result[key] as Record) || {}, + value as Record + ); + } else { + result[key] = value; } - - seen.add(obj); - - for (const [key, value] of Object.entries(obj)) { - if (value === null || value === undefined) { - result[key] = value; - } else if ( - typeof value === 'object' && - !Array.isArray(value) && - !(value instanceof Date) && - !(value instanceof RegExp) - ) { - if (seen.has(value)) { - // Skip circular reference - don't merge this value - continue; - } - result[key] = merge( - (result[key] as Record) || ({} as Record), - value as Record - ); - } else { - result[key] = value; - } - } - - seen.delete(obj); } + } - return result; - }; - - return merge(...objects); + return result; } -} +} \ No newline at end of file diff --git a/libs/core/config/src/index.ts b/libs/core/config/src/index.ts index e8b4c42..30c7a57 100644 --- a/libs/core/config/src/index.ts +++ b/libs/core/config/src/index.ts @@ -1,192 +1,22 @@ -// Import necessary types -import { z } from 'zod'; -import { EnvLoader } from './loaders/env.loader'; -import { FileLoader } from './loaders/file.loader'; import { ConfigManager } from './config-manager'; -import { ConfigError } from './errors'; -import type { BaseAppConfig } from './schemas'; -import { baseAppSchema } from './schemas'; -// Legacy singleton instance for backward compatibility -let configInstance: ConfigManager | null = null; - -// Synchronously load critical env vars for early initialization -function loadCriticalEnvVarsSync(): void { - // Load .env file synchronously if it exists - try { - const fs = require('fs'); - const path = require('path'); - const envPath = path.resolve(process.cwd(), '.env'); - if (fs.existsSync(envPath)) { - const envContent = fs.readFileSync(envPath, 'utf-8'); - const lines = envContent.split('\n'); - - for (const line of lines) { - const trimmed = line.trim(); - if (!trimmed || trimmed.startsWith('#')) { - continue; - } - - const equalIndex = trimmed.indexOf('='); - if (equalIndex === -1) { - continue; - } - - const key = trimmed.substring(0, equalIndex).trim(); - let value = trimmed.substring(equalIndex + 1).trim(); - - // Remove surrounding quotes - if ( - (value.startsWith('"') && value.endsWith('"')) || - (value.startsWith("'") && value.endsWith("'")) - ) { - value = value.slice(1, -1); - } - - // Only set if not already set - if (!(key in process.env)) { - process.env[key] = value; - } - } - } - } catch { - // Ignore errors - env file is optional - } -} - -// Load critical env vars immediately -loadCriticalEnvVarsSync(); - -/** - * Initialize configuration for a service in a monorepo. - * Automatically loads configs from: - * 1. Root config directory (../../config) - * 2. Service-specific config directory (./config) - * 3. Environment variables - */ -export function initializeServiceConfig(): BaseAppConfig { - if (!configInstance) { - const environment = process.env.NODE_ENV || 'development'; - configInstance = new ConfigManager({ - loaders: [ - new FileLoader('../../config', environment), // Root config - new FileLoader('./config', environment), // Service config - new EnvLoader(''), // Environment variables - ], - }); - } - return configInstance.initialize(baseAppSchema); -} - -/** - * Get the current configuration - */ -export function getConfig(): BaseAppConfig { - if (!configInstance) { - throw new ConfigError('Configuration not initialized. Call initializeConfig() first.'); - } - return configInstance.get(); -} - -/** - * Get configuration manager instance - */ -export function getConfigManager(): ConfigManager { - if (!configInstance) { - throw new ConfigError('Configuration not initialized. Call initializeConfig() first.'); - } - return configInstance; -} - -/** - * Reset configuration (useful for testing) - */ -export function resetConfig(): void { - if (configInstance) { - configInstance.reset(); - configInstance = null; - } -} - -// Export convenience functions for common configs -export function getDatabaseConfig() { - return getConfig().database; -} - -export function getServiceConfig() { - return getConfig().service; -} - -export function getLogConfig() { - return getConfig().log; -} - -export function getQueueConfig() { - return getConfig().queue; -} - -// Export environment helpers -export function isDevelopment(): boolean { - return getConfig().environment === 'development'; -} - -export function isProduction(): boolean { - return getConfig().environment === 'production'; -} - -export function isTest(): boolean { - return getConfig().environment === 'test'; -} - -/** - * Generic config builder for creating app-specific configurations - * @param schema - Zod schema for your app config - * @param options - Config manager options - * @returns Initialized config manager instance - */ -export function createAppConfig( - schema: T, - options?: { - configPath?: string; - environment?: 'development' | 'test' | 'production'; - loaders?: any[]; - } -): ConfigManager> { - const manager = new ConfigManager>(options); - return manager; -} - -/** - * Create and initialize app config in one step - */ -export function initializeAppConfig( - schema: T, - options?: { - configPath?: string; - environment?: 'development' | 'test' | 'production'; - loaders?: any[]; - } -): z.infer { - const manager = createAppConfig(schema, options); - return manager.initialize(schema); -} - -// Export all schemas -export * from './schemas'; - -// Export types -export * from './types'; - -// Export errors -export * from './errors'; - -// Export loaders -export { EnvLoader } from './loaders/env.loader'; -export { FileLoader } from './loaders/file.loader'; - -// Export ConfigManager +// Export only what's actually used export { ConfigManager } from './config-manager'; +export { toUnifiedConfig } from './schemas/unified-app.schema'; -// Export utilities -export * from './utils/secrets'; -export * from './utils/validation'; +// Export used types +export type { BaseAppConfig, UnifiedAppConfig } from './schemas'; + +// Export schemas that are used by apps +export { + baseAppSchema, + dragonflyConfigSchema, + mongodbConfigSchema, + postgresConfigSchema, + questdbConfigSchema, +} from './schemas'; + +// createAppConfig function for apps/stock +export function createAppConfig(schema: any, options?: any): ConfigManager { + return new ConfigManager(options); +} \ No newline at end of file diff --git a/libs/core/di/src/index.ts b/libs/core/di/src/index.ts index 37ec6a1..417188e 100644 --- a/libs/core/di/src/index.ts +++ b/libs/core/di/src/index.ts @@ -1,39 +1,3 @@ -// Export all dependency injection components -export * from './operation-context'; -export * from './pool-size-calculator'; -export * from './types'; - -// Re-export IServiceContainer from types for convenience -export type { IServiceContainer } from '@stock-bot/types'; - -// Type exports from awilix-container -export { - type AppConfig, - type ServiceCradle, - type ServiceContainer, - type ServiceContainerOptions, -} from './awilix-container'; - -// New modular structure exports -export * from './container/types'; -export { ServiceContainerBuilder } from './container/builder'; - -// Configuration exports -export * from './config/schemas'; - -// Factory exports -export * from './factories'; - -// Utility exports -export { ServiceLifecycleManager } from './utils/lifecycle'; - -// Service application framework -export { - ServiceApplication, - type ServiceApplicationConfig, - type ServiceLifecycleHooks, -} from './service-application'; - -// Handler scanner -export { HandlerScanner } from './scanner'; -export type { HandlerScannerOptions } from './scanner'; +// Export only what's actually used +export { ServiceApplication } from './service-application'; +export { ServiceContainerBuilder } from './container/builder'; \ No newline at end of file diff --git a/libs/core/di/src/registrations/database.registration.ts b/libs/core/di/src/registrations/database.registration.ts index 3d984dc..7370831 100644 --- a/libs/core/di/src/registrations/database.registration.ts +++ b/libs/core/di/src/registrations/database.registration.ts @@ -2,7 +2,7 @@ import { asFunction, asValue, type AwilixContainer } from 'awilix'; import { MongoDBClient } from '@stock-bot/mongodb'; import { PostgreSQLClient } from '@stock-bot/postgres'; import { QuestDBClient } from '@stock-bot/questdb'; -import type { AppConfig } from '../config/schemas'; +import type { AppConfig } from '../config/schemas/index'; import type { ServiceDefinitions } from '../container/types'; export function registerDatabaseServices( diff --git a/libs/core/di/src/scanner/handler-scanner.ts b/libs/core/di/src/scanner/handler-scanner.ts index 65f7565..635e5d2 100644 --- a/libs/core/di/src/scanner/handler-scanner.ts +++ b/libs/core/di/src/scanner/handler-scanner.ts @@ -10,7 +10,6 @@ import type { HandlerMetadata, HandlerRegistry, } from '@stock-bot/handler-registry'; -import { createJobHandler } from '@stock-bot/handlers'; import { getLogger } from '@stock-bot/logger'; import type { ExecutionContext, IHandler } from '@stock-bot/types'; @@ -129,14 +128,14 @@ export class HandlerScanner { // Build configuration with operation handlers const operationHandlers: Record = {}; for (const op of operations) { - operationHandlers[op.name] = createJobHandler(async payload => { + operationHandlers[op.name] = async (payload: any) => { const handler = this.container.resolve(handlerName); const context: ExecutionContext = { type: 'queue', metadata: { source: 'queue', timestamp: Date.now() }, }; return await handler.execute(op.name, payload, context); - }); + }; } const configuration: HandlerConfiguration = { diff --git a/libs/core/di/src/service-application.ts b/libs/core/di/src/service-application.ts index cd3c7f9..3c25cfc 100644 --- a/libs/core/di/src/service-application.ts +++ b/libs/core/di/src/service-application.ts @@ -10,7 +10,7 @@ import type { BaseAppConfig, UnifiedAppConfig } from '@stock-bot/config'; import { toUnifiedConfig } from '@stock-bot/config'; import type { HandlerRegistry } from '@stock-bot/handler-registry'; import { getLogger, setLoggerConfig, shutdownLoggers, type Logger } from '@stock-bot/logger'; -import { Shutdown } from '@stock-bot/shutdown'; +import { Shutdown, SHUTDOWN_DEFAULTS } from '@stock-bot/shutdown'; import type { IServiceContainer } from '@stock-bot/types'; import type { ServiceDefinitions } from './container/types'; @@ -166,7 +166,7 @@ export class ServiceApplication { private registerShutdownHandlers(): void { // Priority 1: Queue system (highest priority) if (this.serviceConfig.enableScheduledJobs) { - this.shutdown.onShutdownHigh(async () => { + this.shutdown.onShutdown(async () => { this.logger.info('Shutting down queue system...'); try { const queueManager = this.container?.resolve('queueManager'); @@ -177,11 +177,11 @@ export class ServiceApplication { } catch (error) { this.logger.error('Error shutting down queue system', { error }); } - }, 'Queue System'); + }, SHUTDOWN_DEFAULTS.HIGH_PRIORITY, 'Queue System'); } // Priority 1: HTTP Server (high priority) - this.shutdown.onShutdownHigh(async () => { + this.shutdown.onShutdown(async () => { if (this.server) { this.logger.info('Stopping HTTP server...'); try { @@ -191,21 +191,21 @@ export class ServiceApplication { this.logger.error('Error stopping HTTP server', { error }); } } - }, 'HTTP Server'); + }, SHUTDOWN_DEFAULTS.HIGH_PRIORITY, 'HTTP Server'); // Custom shutdown hook if (this.hooks.onBeforeShutdown) { - this.shutdown.onShutdownHigh(async () => { + this.shutdown.onShutdown(async () => { try { await this.hooks.onBeforeShutdown!(); } catch (error) { this.logger.error('Error in custom shutdown hook', { error }); } - }, 'Custom Shutdown'); + }, SHUTDOWN_DEFAULTS.HIGH_PRIORITY, 'Custom Shutdown'); } // Priority 2: Services and connections (medium priority) - this.shutdown.onShutdownMedium(async () => { + this.shutdown.onShutdown(async () => { this.logger.info('Disposing services and connections...'); try { if (this.container) { @@ -230,10 +230,10 @@ export class ServiceApplication { } catch (error) { this.logger.error('Error disposing services', { error }); } - }, 'Services'); + }, SHUTDOWN_DEFAULTS.MEDIUM_PRIORITY, 'Services'); // Priority 3: Logger shutdown (lowest priority - runs last) - this.shutdown.onShutdownLow(async () => { + this.shutdown.onShutdown(async () => { try { this.logger.info('Shutting down loggers...'); await shutdownLoggers(); @@ -241,7 +241,7 @@ export class ServiceApplication { } catch { // Silently ignore logger shutdown errors } - }, 'Loggers'); + }, SHUTDOWN_DEFAULTS.LOW_PRIORITY, 'Loggers'); } /** diff --git a/libs/core/handler-registry/src/registry.ts b/libs/core/handler-registry/src/registry.ts index 5cc3099..31b7581 100644 --- a/libs/core/handler-registry/src/registry.ts +++ b/libs/core/handler-registry/src/registry.ts @@ -80,23 +80,6 @@ export class HandlerRegistry { return this.handlers.has(handlerName); } - /** - * Get handlers for a specific service - */ - getServiceHandlers(serviceName: string): HandlerMetadata[] { - const handlers: HandlerMetadata[] = []; - - for (const [handlerName, service] of this.handlerServices) { - if (service === serviceName) { - const metadata = this.handlers.get(handlerName); - if (metadata) { - handlers.push(metadata); - } - } - } - - return handlers; - } /** * Set service ownership for a handler diff --git a/libs/core/handlers/src/index.ts b/libs/core/handlers/src/index.ts index d49f9b2..73c3d19 100644 --- a/libs/core/handlers/src/index.ts +++ b/libs/core/handlers/src/index.ts @@ -1,37 +1,9 @@ -// Base handler classes -export { BaseHandler, ScheduledHandler } from './base/BaseHandler'; -export type { JobScheduleOptions } from './base/BaseHandler'; - -// Handler registry is now in a separate package - -// Utilities -export { createJobHandler } from './utils/create-job-handler'; - -// Re-export types from types package for convenience -export type { - ExecutionContext, - IHandler, - JobHandler, - ScheduledJob, - HandlerConfig, - HandlerConfigWithSchedule, - TypedJobHandler, - HandlerMetadata, - OperationMetadata, - IServiceContainer, -} from '@stock-bot/types'; - -// Decorators +// Export only what's actually used +export { BaseHandler } from './base/BaseHandler'; export { Handler, Operation, QueueSchedule, ScheduledOperation, Disabled, -} from './decorators/decorators'; - -// Auto-registration utilities -export { autoRegisterHandlers, createAutoHandlerRegistry } from './registry/auto-register'; - -// Future exports - commented for now -// export { EventListener, EventPublisher } from './decorators/decorators'; +} from './decorators/decorators'; \ No newline at end of file diff --git a/libs/core/queue/src/index.ts b/libs/core/queue/src/index.ts index aa77c2a..0426f43 100644 --- a/libs/core/queue/src/index.ts +++ b/libs/core/queue/src/index.ts @@ -10,8 +10,6 @@ export { parseQueueName, } from './service-utils'; -// Re-export utilities from handlers package -export { createJobHandler } from '@stock-bot/handlers'; // Batch processing export { processBatchJob, processItems } from './batch-processor'; diff --git a/libs/core/shutdown/src/constants.ts b/libs/core/shutdown/src/constants.ts new file mode 100644 index 0000000..e3cb0b6 --- /dev/null +++ b/libs/core/shutdown/src/constants.ts @@ -0,0 +1,35 @@ +/** + * Core constants used across the stock-bot application + */ + +// Cache constants +export const CACHE_DEFAULTS = { + TTL: 3600, // 1 hour in seconds + KEY_PREFIX: 'cache:', + SCAN_COUNT: 100, +} as const; + +// Redis connection constants +export const REDIS_DEFAULTS = { + DB: 0, + MAX_RETRIES: 3, + RETRY_DELAY: 100, + CONNECT_TIMEOUT: 10000, + COMMAND_TIMEOUT: 5000, + KEEP_ALIVE: 0, +} as const; + +// Shutdown constants +export const SHUTDOWN_DEFAULTS = { + TIMEOUT: 30000, // 30 seconds + HIGH_PRIORITY: 10, + MEDIUM_PRIORITY: 50, + LOW_PRIORITY: 90, +} as const; + +// Pool size constants +export const POOL_SIZE_DEFAULTS = { + MIN_POOL_SIZE: 2, + MAX_POOL_SIZE: 10, + CPU_MULTIPLIER: 2, +} as const; \ No newline at end of file diff --git a/libs/core/shutdown/src/index.ts b/libs/core/shutdown/src/index.ts index 14319cd..eb0892a 100644 --- a/libs/core/shutdown/src/index.ts +++ b/libs/core/shutdown/src/index.ts @@ -1,118 +1,15 @@ import { Shutdown } from './shutdown'; -import type { ShutdownResult } from './types'; -/** - * @stock-bot/shutdown - Shutdown management library - * - * Main exports for the shutdown library - */ - -// Core shutdown classes and types +// Export only what's actually used export { Shutdown } from './shutdown'; -export type { - ShutdownCallback, - ShutdownOptions, - ShutdownResult, - PrioritizedShutdownCallback, -} from './types'; +export { SHUTDOWN_DEFAULTS } from './constants'; -// Global singleton instance -let globalInstance: Shutdown | null = null; - -/** - * Get the global shutdown instance (creates one if it doesn't exist) - */ -function getGlobalInstance(): Shutdown { - if (!globalInstance) { - globalInstance = Shutdown.getInstance(); - } - return globalInstance; -} - -/** - * Convenience functions for global shutdown management - */ - -/** - * Register a cleanup callback that will be executed during shutdown - */ +// Convenience function used by queue-manager export function onShutdown( callback: () => Promise | void, priority?: number, name?: string ): void { - getGlobalInstance().onShutdown(callback, priority, name); -} - -/** - * Register a high priority shutdown callback (for queues, critical services) - */ -export function onShutdownHigh(callback: () => Promise | void, name?: string): void { - getGlobalInstance().onShutdownHigh(callback, name); -} - -/** - * Register a medium priority shutdown callback (for databases, connections) - */ -export function onShutdownMedium(callback: () => Promise | void, name?: string): void { - getGlobalInstance().onShutdownMedium(callback, name); -} - -/** - * Register a low priority shutdown callback (for loggers, cleanup) - */ -export function onShutdownLow(callback: () => Promise | void, name?: string): void { - getGlobalInstance().onShutdownLow(callback, name); -} - -/** - * Set the shutdown timeout in milliseconds - */ -export function setShutdownTimeout(timeout: number): void { - getGlobalInstance().setTimeout(timeout); -} - -/** - * Check if shutdown is currently in progress - */ -export function isShuttingDown(): boolean { - return globalInstance?.isShutdownInProgress() || false; -} - -/** - * Check if shutdown signal was received (for quick checks in running jobs) - */ -export function isShutdownSignalReceived(): boolean { - const globalFlag = globalThis.__SHUTDOWN_SIGNAL_RECEIVED__ || false; - const instanceFlag = globalInstance?.isShutdownSignalReceived() || false; - return globalFlag || instanceFlag; -} - -/** - * Get the number of registered shutdown callbacks - */ -export function getShutdownCallbackCount(): number { - return globalInstance?.getCallbackCount() || 0; -} - -/** - * Manually initiate graceful shutdown - */ -export function initiateShutdown(signal?: string): Promise { - return getGlobalInstance().shutdown(signal); -} - -/** - * Manually initiate graceful shutdown and exit the process - */ -export function shutdownAndExit(signal?: string, exitCode = 0): Promise { - return getGlobalInstance().shutdownAndExit(signal, exitCode); -} - -/** - * Reset the global instance (mainly for testing) - */ -export function resetShutdown(): void { - globalInstance = null; - Shutdown.reset(); -} + const shutdown = Shutdown.getInstance(); + shutdown.onShutdown(callback, priority, name); +} \ No newline at end of file diff --git a/libs/core/shutdown/src/shutdown.ts b/libs/core/shutdown/src/shutdown.ts index b82b858..9e9ff75 100644 --- a/libs/core/shutdown/src/shutdown.ts +++ b/libs/core/shutdown/src/shutdown.ts @@ -1,46 +1,28 @@ -/** - * Shutdown management for Node.js applications - * - * Features: - * - Automatic signal handling (SIGTERM, SIGINT, etc.) - * - Configurable shutdown timeout - * - Multiple cleanup callbacks with error handling - * - Platform-specific signal support (Windows/Unix) - */ - import { getLogger } from '@stock-bot/logger'; -import type { - PrioritizedShutdownCallback, - ShutdownCallback, - ShutdownOptions, - ShutdownResult, -} from './types'; +import type { ShutdownCallback, ShutdownOptions } from './types'; +import { SHUTDOWN_DEFAULTS } from './constants'; -// Global flag that works across all processes/workers -declare global { - var __SHUTDOWN_SIGNAL_RECEIVED__: boolean | undefined; +interface CallbackEntry { + callback: ShutdownCallback; + priority: number; + name?: string; } export class Shutdown { private static instance: Shutdown | null = null; private readonly logger = getLogger('shutdown'); private isShuttingDown = false; - private signalReceived = false; // Track if shutdown signal was received - private shutdownTimeout = 30000; // 30 seconds default - private callbacks: PrioritizedShutdownCallback[] = []; + private shutdownTimeout: number; + private callbacks: CallbackEntry[] = []; private signalHandlersRegistered = false; constructor(options: ShutdownOptions = {}) { - this.shutdownTimeout = options.timeout || 30000; - + this.shutdownTimeout = options.timeout || SHUTDOWN_DEFAULTS.TIMEOUT; if (options.autoRegister !== false) { this.setupSignalHandlers(); } } - /** - * Get or create singleton instance - */ static getInstance(options?: ShutdownOptions): Shutdown { if (!Shutdown.instance) { Shutdown.instance = new Shutdown(options); @@ -49,211 +31,63 @@ export class Shutdown { } /** - * Reset singleton instance (mainly for testing) + * Register a cleanup callback */ - static reset(): void { - Shutdown.instance = null; - } - - /** - * Register a cleanup callback with priority (lower numbers = higher priority) - */ - onShutdown(callback: ShutdownCallback, priority: number = 50, name?: string): void { - if (this.isShuttingDown) { - return; - } + onShutdown(callback: ShutdownCallback, priority: number = SHUTDOWN_DEFAULTS.MEDIUM_PRIORITY, name?: string): void { + if (this.isShuttingDown) return; this.callbacks.push({ callback, priority, name }); } - /** - * Register a high priority shutdown callback (for queues, critical services) - */ - onShutdownHigh(callback: ShutdownCallback, name?: string): void { - this.onShutdown(callback, 10, name); - } - - /** - * Register a medium priority shutdown callback (for databases, connections) - */ - onShutdownMedium(callback: ShutdownCallback, name?: string): void { - this.onShutdown(callback, 50, name); - } - - /** - * Register a low priority shutdown callback (for loggers, cleanup) - */ - onShutdownLow(callback: ShutdownCallback, name?: string): void { - this.onShutdown(callback, 90, name); - } - - /** - * Set shutdown timeout in milliseconds - */ - setTimeout(timeout: number): void { - if (isNaN(timeout) || timeout <= 0) { - throw new Error('Shutdown timeout must be a positive number'); - } - this.shutdownTimeout = timeout; - } - - /** - * Get current shutdown state - */ - isShutdownInProgress(): boolean { - return this.isShuttingDown; - } - - /** - * Check if shutdown signal was received (for quick checks in running jobs) - */ - isShutdownSignalReceived(): boolean { - const globalFlag = (globalThis as any).__SHUTDOWN_SIGNAL_RECEIVED__ || false; - return globalFlag || this.signalReceived || this.isShuttingDown; - } - - /** - * Get number of registered callbacks - */ - getCallbackCount(): number { - return this.callbacks.length; - } - /** * Initiate graceful shutdown */ - async shutdown(_signal?: string): Promise { - if (this.isShuttingDown) { - return { - success: false, - callbacksExecuted: 0, - callbacksFailed: 0, - duration: 0, - error: 'Shutdown already in progress', - }; - } - + async shutdown(): Promise { + if (this.isShuttingDown) return; + this.isShuttingDown = true; - const startTime = Date.now(); - - const shutdownPromise = this.executeCallbacks(); - const timeoutPromise = new Promise((_, reject) => { - setTimeout(() => reject(new Error('Shutdown timeout')), this.shutdownTimeout); - }); - - let result: ShutdownResult; - + + const timeout = new Promise((_, reject) => + setTimeout(() => reject(new Error('Shutdown timeout')), this.shutdownTimeout) + ); + try { - const callbackResult = await Promise.race([shutdownPromise, timeoutPromise]); - const duration = Date.now() - startTime; - - result = { - success: callbackResult.failed === 0, - callbacksExecuted: callbackResult.executed, - callbacksFailed: callbackResult.failed, - duration, - error: callbackResult.failed > 0 ? `${callbackResult.failed} callbacks failed` : undefined, - }; + await Promise.race([this.executeCallbacks(), timeout]); } catch (error) { - const duration = Date.now() - startTime; - const errorMessage = error instanceof Error ? error.message : String(error); - - result = { - success: false, - callbacksExecuted: 0, - callbacksFailed: 0, - duration, - error: errorMessage, - }; + this.logger.error('Shutdown failed', error); + throw error; } - - // Don't call process.exit here - let the caller decide - - return result; } - /** - * Initiate shutdown and exit process - */ - async shutdownAndExit(signal?: string, exitCode = 0): Promise { - const result = await this.shutdown(signal); - const finalExitCode = result.success ? exitCode : 1; - - process.exit(finalExitCode); - } - - /** - * Execute all registered callbacks in priority order - */ - private async executeCallbacks(): Promise<{ executed: number; failed: number }> { - if (this.callbacks.length === 0) { - return { executed: 0, failed: 0 }; - } - - // Sort callbacks by priority (lower numbers = higher priority = execute first) - const sortedCallbacks = [...this.callbacks].sort((a, b) => a.priority - b.priority); - - let executed = 0; - let failed = 0; - - // Execute callbacks in order by priority - for (const { callback, name, priority } of sortedCallbacks) { - executed++; // Count all attempted executions + private async executeCallbacks(): Promise { + const sorted = [...this.callbacks].sort((a, b) => a.priority - b.priority); + + for (const { callback, name } of sorted) { try { await callback(); } catch (error) { - failed++; - if (name) { - this.logger.error(`Shutdown failed: ${name} (priority: ${priority})`, error); - } + this.logger.error(`Shutdown callback failed: ${name || 'unnamed'}`, error); } } - - return { executed, failed }; } - /** - * Setup signal handlers for graceful shutdown - */ private setupSignalHandlers(): void { - if (this.signalHandlersRegistered) { - return; - } - - // Platform-specific signals - const signals: NodeJS.Signals[] = - process.platform === 'win32' ? ['SIGINT', 'SIGTERM'] : ['SIGTERM', 'SIGINT', 'SIGUSR2']; - + if (this.signalHandlersRegistered) return; + + const signals: NodeJS.Signals[] = ['SIGTERM', 'SIGINT']; + signals.forEach(signal => { - process.on(signal, () => { - // Only process if not already shutting down + process.once(signal, async () => { if (!this.isShuttingDown) { - // Set signal flag immediately for quick checks - this.signalReceived = true; - // Also set global flag for workers/other processes - globalThis.__SHUTDOWN_SIGNAL_RECEIVED__ = true; - this.shutdownAndExit(signal).catch(() => { + try { + await this.shutdown(); + process.exit(0); + } catch { process.exit(1); - }); + } } }); }); - - // Handle uncaught exceptions - process.on('uncaughtException', () => { - this.signalReceived = true; - this.shutdownAndExit('uncaughtException', 1).catch(() => { - process.exit(1); - }); - }); - - // Handle unhandled promise rejections - process.on('unhandledRejection', () => { - this.signalReceived = true; - this.shutdownAndExit('unhandledRejection', 1).catch(() => { - process.exit(1); - }); - }); - + this.signalHandlersRegistered = true; } -} +} \ No newline at end of file