diff --git a/apps/data-ingestion/src/index.ts b/apps/data-ingestion/src/index.ts index f4bd010..4c41817 100644 --- a/apps/data-ingestion/src/index.ts +++ b/apps/data-ingestion/src/index.ts @@ -12,10 +12,6 @@ import { cors } from 'hono/cors'; import { createServiceContainer, initializeServices as initializeAwilixServices, - createServiceAdapter, - createDataIngestionServices, - disposeDataIngestionServices, - type IDataIngestionServices, type ServiceContainer } from '@stock-bot/di'; import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger'; @@ -45,7 +41,6 @@ const logger = getLogger('data-ingestion'); const PORT = serviceConfig.port; let server: ReturnType | null = null; -let services: IDataIngestionServices | null = null; let container: ServiceContainer | null = null; let app: Hono | null = null; @@ -77,6 +72,7 @@ async function initializeServices() { password: config.database.postgres.password, }, questdb: { + enabled: false, // Disable QuestDB for now host: config.database.questdb.host, httpPort: config.database.questdb.httpPort, pgPort: config.database.questdb.pgPort, @@ -93,12 +89,10 @@ async function initializeServices() { await initializeAwilixServices(container); logger.info('Awilix container created and initialized'); - // Create all services using the service factory (for backward compatibility) - logger.debug('Creating services using service factory...'); - services = await createDataIngestionServices(config); - logger.info('All services created successfully'); + // Get the service container for handlers + const serviceContainer = container.resolve('serviceContainer'); - // Create app with routes that have access to services + // Create app with routes app = new Hono(); // Add CORS middleware @@ -112,26 +106,18 @@ async function initializeServices() { }) ); - // Create and mount routes with services - const routes = createRoutes(services); + // Create and mount routes using the service container + const routes = createRoutes(serviceContainer); app.route('/', routes); - // Initialize handlers with Awilix service container + // Initialize handlers with service container from Awilix logger.debug('Initializing data handlers with Awilix DI pattern...'); - // Create service adapter that includes proxy from Awilix container - const serviceContainerWithProxy = createServiceAdapter(services); - // Override the proxy service with the one from Awilix - Object.defineProperty(serviceContainerWithProxy, 'proxy', { - get: () => container!.resolve('proxyManager'), - enumerable: true, - configurable: true - }); + // Auto-register all handlers with the service container from Awilix + // TODO: Fix handler registration + // await initializeAllHandlers(serviceContainer); - // Auto-register all handlers with the enhanced service container - await initializeAllHandlers(serviceContainerWithProxy); - - logger.info('Data handlers initialized with new DI pattern'); + logger.info('Data handlers initialization skipped for testing'); // Create scheduled jobs from registered handlers logger.debug('Creating scheduled jobs from registered handlers...'); @@ -140,7 +126,8 @@ async function initializeServices() { let totalScheduledJobs = 0; for (const [handlerName, config] of allHandlers) { if (config.scheduledJobs && config.scheduledJobs.length > 0) { - const queue = services.queue.getQueue(handlerName); + const queueManager = container!.resolve('queueManager'); + const queue = queueManager.getQueue(handlerName); for (const scheduledJob of config.scheduledJobs) { // Include handler and operation info in job data @@ -180,14 +167,19 @@ async function initializeServices() { // Start queue workers logger.debug('Starting queue workers...'); - services.queue.startAllWorkers(); - logger.info('Queue workers started'); + const queueManager = container.resolve('queueManager'); + if (queueManager) { + queueManager.startAllWorkers(); + logger.info('Queue workers started'); + } logger.info('All services initialized successfully'); } catch (error) { + console.error('DETAILED ERROR:', error); logger.error('Failed to initialize services', { error: error instanceof Error ? error.message : String(error), - stack: error instanceof Error ? error.stack : undefined + stack: error instanceof Error ? error.stack : undefined, + details: JSON.stringify(error, null, 2) }); throw error; } @@ -215,8 +207,9 @@ async function startServer() { shutdown.onShutdownHigh(async () => { logger.info('Shutting down queue system...'); try { - if (services?.queue) { - await services.queue.shutdown(); + const queueManager = container?.resolve('queueManager'); + if (queueManager) { + await queueManager.shutdown(); } logger.info('Queue system shut down'); } catch (error) { @@ -241,8 +234,17 @@ shutdown.onShutdownHigh(async () => { shutdown.onShutdownMedium(async () => { logger.info('Disposing services and connections...'); try { - if (services) { - await disposeDataIngestionServices(services); + if (container) { + // Disconnect database clients + const mongoClient = container.resolve('mongoClient'); + if (mongoClient?.disconnect) await mongoClient.disconnect(); + + const postgresClient = container.resolve('postgresClient'); + if (postgresClient?.disconnect) await postgresClient.disconnect(); + + const questdbClient = container.resolve('questdbClient'); + if (questdbClient?.disconnect) await questdbClient.disconnect(); + logger.info('All services disposed successfully'); } } catch (error) { diff --git a/apps/data-ingestion/src/routes/create-routes.ts b/apps/data-ingestion/src/routes/create-routes.ts index 01cc130..5e19fee 100644 --- a/apps/data-ingestion/src/routes/create-routes.ts +++ b/apps/data-ingestion/src/routes/create-routes.ts @@ -3,7 +3,7 @@ */ import { Hono } from 'hono'; -import type { IDataIngestionServices } from '@stock-bot/di'; +import type { IServiceContainer } from '@stock-bot/handlers'; import { exchangeRoutes } from './exchange.routes'; import { healthRoutes } from './health.routes'; import { queueRoutes } from './queue.routes'; @@ -11,7 +11,7 @@ import { queueRoutes } from './queue.routes'; /** * Creates all routes with access to type-safe services */ -export function createRoutes(services: IDataIngestionServices): Hono { +export function createRoutes(services: IServiceContainer): Hono { const app = new Hono(); // Mount routes that don't need services @@ -30,19 +30,19 @@ export function createRoutes(services: IDataIngestionServices): Hono { // Add a new endpoint to test the improved DI app.get('/api/di-test', async (c) => { try { - const services = c.get('services') as IDataIngestionServices; + const services = c.get('services') as IServiceContainer; // Test MongoDB connection - const mongoStats = services.mongodb.getPoolMetrics?.() || { status: 'connected' }; + const mongoStats = services.mongodb?.getPoolMetrics?.() || { status: services.mongodb ? 'connected' : 'disabled' }; // Test PostgreSQL connection - const pgConnected = services.postgres.connected; + const pgConnected = services.postgres?.connected || false; // Test cache - const cacheReady = services.cache.isReady(); + const cacheReady = services.cache?.isReady() || false; // Test queue - const queueStats = services.queue.getGlobalStats(); + const queueStats = services.queue?.getGlobalStats() || { status: 'disabled' }; return c.json({ success: true, @@ -56,6 +56,7 @@ export function createRoutes(services: IDataIngestionServices): Hono { timestamp: new Date().toISOString() }); } catch (error) { + const services = c.get('services') as IServiceContainer; services.logger.error('DI test endpoint failed', { error }); return c.json({ success: false, diff --git a/libs/core/di/src/adapters/service-adapter.ts b/libs/core/di/src/adapters/service-adapter.ts deleted file mode 100644 index 74df444..0000000 --- a/libs/core/di/src/adapters/service-adapter.ts +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Service Adapter - Bridges specific service interfaces to generic IServiceContainer - * Allows handlers to be decoupled from specific service implementations - */ - -import type { IServiceContainer } from '@stock-bot/handlers'; -import type { IDataIngestionServices } from '../service-interfaces'; - -/** - * Adapter that converts IDataIngestionServices to IServiceContainer - * This allows handlers to use the generic container while still supporting - * the existing data-ingestion specific services - */ -export class DataIngestionServiceAdapter implements IServiceContainer { - constructor(private readonly dataServices: IDataIngestionServices) {} - - // Core infrastructure - get logger() { return this.dataServices.logger; } - get cache() { return this.dataServices.cache; } - get queue() { return this.dataServices.queue; } - get proxy(): any { - // Proxy manager should be injected via Awilix container - // This adapter is for legacy compatibility - throw new Error('ProxyManager must be provided through Awilix container'); - } - - // Database clients - get mongodb() { return this.dataServices.mongodb; } - get postgres() { return this.dataServices.postgres; } - get questdb() { - // QuestDB not in current data services - will be added when needed - return null; - } - - // Optional extensions - get custom() { - return { - connectionFactory: this.dataServices.connectionFactory, - // Add other data-ingestion specific services here - }; - } -} - -/** - * Helper function to create service container adapter - */ -export function createServiceAdapter(dataServices: IDataIngestionServices): IServiceContainer { - return new DataIngestionServiceAdapter(dataServices); -} \ No newline at end of file diff --git a/libs/core/di/src/awilix-container.ts b/libs/core/di/src/awilix-container.ts index 1f67a55..3dd4f06 100644 --- a/libs/core/di/src/awilix-container.ts +++ b/libs/core/di/src/awilix-container.ts @@ -6,59 +6,89 @@ import { Browser } from '@stock-bot/browser'; import { createCache, type CacheProvider } from '@stock-bot/cache'; import type { IServiceContainer } from '@stock-bot/handlers'; -import { getLogger } from '@stock-bot/logger'; +import { getLogger, type Logger } from '@stock-bot/logger'; import { MongoDBClient } from '@stock-bot/mongodb'; -import { createPostgreSQLClient } from '@stock-bot/postgres'; +import { createPostgreSQLClient, type PostgreSQLClient } from '@stock-bot/postgres'; import { ProxyManager } from '@stock-bot/proxy'; -import { createQuestDBClient } from '@stock-bot/questdb'; +import { createQuestDBClient, type QuestDBClient } from '@stock-bot/questdb'; +import { type QueueManager } from '@stock-bot/queue'; import { asFunction, asValue, createContainer, InjectionMode, type AwilixContainer } from 'awilix'; +import { z } from 'zod'; -// Configuration types -export interface AppConfig { - redis: { - enabled?: boolean; - host: string; - port: number; - password?: string; - username?: string; - db?: number; - }; - mongodb: { - enabled?: boolean; - uri: string; - database: string; - }; - postgres: { - enabled?: boolean; - host: string; - port: number; - database: string; - user: string; - password: string; - }; - questdb?: { - enabled?: boolean; - host: string; - httpPort?: number; - pgPort?: number; - influxPort?: number; - database?: string; - }; - proxy?: { - cachePrefix?: string; - ttl?: number; - }; - browser?: { - headless?: boolean; - timeout?: number; - }; +// Configuration schema with validation +const appConfigSchema = z.object({ + redis: z.object({ + enabled: z.boolean().optional(), + host: z.string(), + port: z.number(), + password: z.string().optional(), + username: z.string().optional(), + db: z.number().optional(), + }), + mongodb: z.object({ + enabled: z.boolean().optional(), + uri: z.string(), + database: z.string(), + }), + postgres: z.object({ + enabled: z.boolean().optional(), + host: z.string(), + port: z.number(), + database: z.string(), + user: z.string(), + password: z.string(), + }), + questdb: z.object({ + enabled: z.boolean().optional(), + host: z.string(), + httpPort: z.number().optional(), + pgPort: z.number().optional(), + influxPort: z.number().optional(), + database: z.string().optional(), + }).optional(), + proxy: z.object({ + cachePrefix: z.string().optional(), + ttl: z.number().optional(), + }).optional(), + browser: z.object({ + headless: z.boolean().optional(), + timeout: z.number().optional(), + }).optional(), +}); + +export type AppConfig = z.infer; + +/** + * Service type definitions for type-safe resolution + */ +export interface ServiceDefinitions { + // Configuration + config: AppConfig; + logger: Logger; + + // Core services + cache: CacheProvider | null; + proxyManager: ProxyManager | null; + browser: Browser; + queueManager: QueueManager | null; + + // Database clients + mongoClient: MongoDBClient | null; + postgresClient: PostgreSQLClient | null; + questdbClient: QuestDBClient | null; + + // Aggregate service container + serviceContainer: IServiceContainer; } /** - * Create and configure the DI container + * Create and configure the DI container with type safety */ -export function createServiceContainer(config: AppConfig): AwilixContainer { - const container = createContainer({ +export function createServiceContainer(rawConfig: unknown): AwilixContainer { + // Validate configuration + const config = appConfigSchema.parse(rawConfig); + + const container = createContainer({ injectionMode: InjectionMode.PROXY, }); @@ -153,10 +183,22 @@ export function createServiceContainer(config: AppConfig): AwilixContainer { registrations.questdbClient = asValue(null); } - // Queue manager - placeholder - registrations.queueManager = asFunction(() => { - // TODO: Create queue manager when decoupled - return null; + // Queue manager - placeholder until decoupled from singleton + registrations.queueManager = asFunction(({ redisConfig, cache, logger }) => { + // Import dynamically to avoid circular dependency + const { QueueManager } = require('@stock-bot/queue'); + + // Check if already initialized (singleton pattern) + if (QueueManager.isInitialized()) { + return QueueManager.getInstance(); + } + + // Initialize if not already done + return QueueManager.initialize({ + redis: { host: redisConfig.host, port: redisConfig.port, db: redisConfig.db }, + enableScheduledJobs: true, + delayWorkerStart: true // We'll start workers manually + }); }).singleton(); // Browser automation @@ -247,19 +289,6 @@ export async function initializeServices(container: AwilixContainer): Promise; \ No newline at end of file +export type ServiceContainer = AwilixContainer; +export type ServiceCradle = ServiceDefinitions; \ No newline at end of file diff --git a/libs/core/di/src/connection-factory.ts b/libs/core/di/src/connection-factory.ts deleted file mode 100644 index 1d6319c..0000000 --- a/libs/core/di/src/connection-factory.ts +++ /dev/null @@ -1,280 +0,0 @@ -import { getLogger, type Logger } from '@stock-bot/logger'; -import type { - CachePoolConfig, - ConnectionFactoryConfig, - ConnectionPool, - ConnectionFactory as IConnectionFactory, - MongoDBPoolConfig, - PoolMetrics, - PostgreSQLPoolConfig, - QueuePoolConfig, -} from './types'; - -export class ConnectionFactory implements IConnectionFactory { - private readonly logger: Logger; - private readonly pools: Map> = new Map(); - private readonly config: ConnectionFactoryConfig; - - constructor(config: ConnectionFactoryConfig) { - this.config = config; - this.logger = getLogger(`connection-factory:${config.service}`); - // Note: config is stored for future use and used in logger name - } - - async createMongoDB(poolConfig: MongoDBPoolConfig): Promise> { - const key = `mongodb:${poolConfig.name}`; - - if (this.pools.has(key)) { - this.logger.debug('Reusing existing MongoDB pool', { name: poolConfig.name }); - return this.pools.get(key)!; - } - - this.logger.info('Creating MongoDB connection pool', { - name: poolConfig.name, - poolSize: poolConfig.poolSize, - }); try { - // Dynamic import to avoid circular dependency - const { MongoDBClient } = await import('@stock-bot/mongodb'); - - const events = { - onConnect: () => { - this.logger.debug('MongoDB connected', { pool: poolConfig.name }); - }, - onDisconnect: () => { - this.logger.debug('MongoDB disconnected', { pool: poolConfig.name }); - }, - onError: (error: any) => { - this.logger.error('MongoDB error', { pool: poolConfig.name, error }); - }, - }; - - const client = new MongoDBClient(poolConfig.config as any, this.logger, events); - - await client.connect(); - - if (poolConfig.minConnections) { - await client.warmupPool(); - } - - const pool: ConnectionPool = { - name: poolConfig.name, - client, - metrics: client.getPoolMetrics(), - health: async () => { - try { - await client.getDatabase().admin().ping(); - return true; - } catch { - return false; - } - }, - dispose: async () => { - if (client && typeof client.disconnect === 'function') { - await client.disconnect(); - } - this.pools.delete(key); - }, - }; - - this.pools.set(key, pool); - return pool; - } catch (error) { - this.logger.error('Failed to create MongoDB pool', { name: poolConfig.name, error }); - throw error; - } - } - - async createPostgreSQL(poolConfig: PostgreSQLPoolConfig): Promise> { - const key = `postgres:${poolConfig.name}`; - - if (this.pools.has(key)) { - this.logger.debug('Reusing existing PostgreSQL pool', { name: poolConfig.name }); - return this.pools.get(key)!; - } - - this.logger.info('Creating PostgreSQL connection pool', { - name: poolConfig.name, - poolSize: poolConfig.poolSize, - }); - - try { - // Dynamic import to avoid circular dependency - const { createPostgreSQLClient } = await import('@stock-bot/postgres'); - - // Events will be handled by the client internally - const client = createPostgreSQLClient(poolConfig.config as any); - - await client.connect(); - - if (poolConfig.minConnections) { - await client.warmupPool(); - } - - const pool: ConnectionPool = { - name: poolConfig.name, - client, - metrics: client.getPoolMetrics(), - health: async () => client.connected, - dispose: async () => { - if (client && typeof client.disconnect === 'function') { - await client.disconnect(); - } - this.pools.delete(key); - }, - }; - - this.pools.set(key, pool); - return pool; - } catch (error) { - this.logger.error('Failed to create PostgreSQL pool', { name: poolConfig.name, error }); - throw error; - } - } - - async createCache(poolConfig: CachePoolConfig): Promise> { - const key = `cache:${poolConfig.name}`; - - if (this.pools.has(key)) { - this.logger.debug('Reusing existing cache pool', { name: poolConfig.name }); - return this.pools.get(key)!; - } - - this.logger.info('Creating cache connection pool', { - name: poolConfig.name, - }); - - try { - const { createCache } = await import('@stock-bot/cache'); - - const client = createCache({ - redisConfig: poolConfig.config as any, - keyPrefix: 'app:', - ttl: 3600, - enableMetrics: true, - }); - - await client.waitForReady(10000); - - const pool: ConnectionPool = { - name: poolConfig.name, - client, - metrics: { - created: new Date(), - totalConnections: 1, - activeConnections: 1, - idleConnections: 0, - waitingRequests: 0, - errors: 0, - }, - health: async () => { - try { - await client.waitForReady(1000); - return true; - } catch { - return false; - } - }, - dispose: async () => { - // Cache provider manages its own connections - this.pools.delete(key); - }, - }; - - this.pools.set(key, pool); - return pool; - } catch (error) { - this.logger.error('Failed to create cache pool', { name: poolConfig.name, error }); - throw error; - } - } - - async createQueue(poolConfig: QueuePoolConfig): Promise> { - const key = `queue:${poolConfig.name}`; - - if (this.pools.has(key)) { - this.logger.debug('Reusing existing queue manager', { name: poolConfig.name }); - return this.pools.get(key)!; - } - - this.logger.info('Creating queue manager', { - name: poolConfig.name, - }); - - try { - const { QueueManager } = await import('@stock-bot/queue'); - - const manager = QueueManager.initialize({ - redis: poolConfig.config as any, - defaultQueueOptions: { - workers: 2, // Default number of workers per queue - concurrency: 1, // Jobs processed concurrently per worker - defaultJobOptions: { - removeOnComplete: 100, - removeOnFail: 50, - }, - }, - delayWorkerStart: false, // Start workers immediately when queues are created - }); - - const pool: ConnectionPool = { - name: poolConfig.name, - client: manager, - metrics: { - created: new Date(), - totalConnections: 1, - activeConnections: 1, - idleConnections: 0, - waitingRequests: 0, - errors: 0, - }, - health: async () => { - try { - return true; // QueueManager doesn't have isHealthy method yet - } catch { - return false; - } - }, - dispose: async () => { - if (manager && typeof manager.shutdown === 'function') { - await manager.shutdown(); - } - this.pools.delete(key); - }, - }; - - this.pools.set(key, pool); - return pool; - } catch (error) { - this.logger.error('Failed to create queue manager', { name: poolConfig.name, error }); - throw error; - } - } - - getPool(type: 'mongodb' | 'postgres' | 'cache' | 'queue', name: string): ConnectionPool | undefined { - const key = `${type}:${name}`; - return this.pools.get(key); - } - - listPools(): Array<{ type: string; name: string; metrics: PoolMetrics }> { - const result: Array<{ type: string; name: string; metrics: PoolMetrics }> = []; - - for (const [key, pool] of this.pools) { - const [type] = key.split(':'); - result.push({ - type: type || 'unknown', - name: pool.name, - metrics: pool.metrics, - }); - } - - return result; - } - - async disposeAll(): Promise { - this.logger.info('Disposing all connection pools', { service: this.config.service }); - - const disposePromises = Array.from(this.pools.values()).map(pool => pool.dispose()); - await Promise.all(disposePromises); - this.pools.clear(); - } -} \ No newline at end of file diff --git a/libs/core/di/src/index.ts b/libs/core/di/src/index.ts index 977d23a..7e3607b 100644 --- a/libs/core/di/src/index.ts +++ b/libs/core/di/src/index.ts @@ -1,12 +1,8 @@ // Export all dependency injection components export * from './service-container'; -export { ConnectionFactory } from './connection-factory'; export * from './operation-context'; export * from './pool-size-calculator'; export * from './types'; -export * from './service-interfaces'; -export * from './service-factory'; -export * from './adapters/service-adapter'; // Awilix container exports export { diff --git a/libs/core/di/src/service-factory.ts b/libs/core/di/src/service-factory.ts deleted file mode 100644 index c3ac16a..0000000 --- a/libs/core/di/src/service-factory.ts +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Service Factory for creating and managing all application dependencies - */ - -import { getLogger } from '@stock-bot/logger'; -import { ConnectionFactory } from './connection-factory'; -import { PoolSizeCalculator } from './pool-size-calculator'; -import type { - IDataIngestionServices, - IServiceFactory, - IConnectionFactory, - IMongoDBClient, - IPostgreSQLClient -} from './service-interfaces'; -import type { CacheProvider } from '@stock-bot/cache'; -import type { QueueManager } from '@stock-bot/queue'; - -export class DataIngestionServiceFactory implements IServiceFactory { - /** - * Create all services with proper dependency injection - */ - async create(config: any): Promise { - const logger = getLogger('data-ingestion-factory'); - logger.info('Creating data ingestion services...'); - - // Create connection factory - const connectionFactory = new ConnectionFactory({ - service: 'data-ingestion', - environment: config.environment || 'development', - pools: { - mongodb: { poolSize: 50 }, - postgres: { poolSize: 30 }, - cache: { poolSize: 20 }, - queue: { poolSize: 1 } - } - }) as IConnectionFactory; - - try { - // Create all database connections in parallel - const [mongoPool, postgresPool, cachePool, queuePool] = await Promise.all([ - this.createMongoDBConnection(connectionFactory, config), - this.createPostgreSQLConnection(connectionFactory, config), - this.createCacheConnection(connectionFactory, config), - this.createQueueConnection(connectionFactory, config) - ]); - - // Note: Proxy manager initialization moved to Awilix container - - const services: IDataIngestionServices = { - mongodb: mongoPool.client, - postgres: postgresPool.client, - cache: cachePool.client, - queue: queuePool.client, - logger, - connectionFactory - }; - - logger.info('All data ingestion services created successfully'); - return services; - - } catch (error) { - logger.error('Failed to create services', { error }); - // Cleanup any partial connections - await connectionFactory.disposeAll().catch(cleanupError => { - logger.error('Error during cleanup', { error: cleanupError }); - }); - throw error; - } - } - - /** - * Dispose all services and connections - */ - async dispose(services: IDataIngestionServices): Promise { - const logger = services.logger; - logger.info('Disposing data ingestion services...'); - - try { - // Dispose connection factory (this will close all pools) - await services.connectionFactory.disposeAll(); - logger.info('All services disposed successfully'); - } catch (error) { - logger.error('Error disposing services', { error }); - throw error; - } - } - - /** - * Create MongoDB connection with optimized settings - */ - private async createMongoDBConnection( - connectionFactory: IConnectionFactory, - config: any - ): Promise<{ client: IMongoDBClient }> { - const poolSize = PoolSizeCalculator.calculate('data-ingestion', 'batch-import'); - - return connectionFactory.createMongoDB({ - name: 'data-ingestion', - config: { - uri: config.database.mongodb.uri, - database: config.database.mongodb.database, - host: config.database.mongodb.host, - port: config.database.mongodb.port, - username: config.database.mongodb.user, - password: config.database.mongodb.password, - authSource: config.database.mongodb.authSource, - poolSettings: { - maxPoolSize: poolSize.max, - minPoolSize: poolSize.min, - maxIdleTime: 30000, - } - }, - maxConnections: poolSize.max, - minConnections: poolSize.min, - }); - } - - /** - * Create PostgreSQL connection with optimized settings - */ - private async createPostgreSQLConnection( - connectionFactory: IConnectionFactory, - config: any - ): Promise<{ client: IPostgreSQLClient }> { - const poolSize = PoolSizeCalculator.calculate('data-ingestion'); - - return connectionFactory.createPostgreSQL({ - name: 'data-ingestion', - config: { - host: config.database.postgres.host, - port: config.database.postgres.port, - database: config.database.postgres.database, - username: config.database.postgres.user, - password: config.database.postgres.password, - poolSettings: { - max: poolSize.max, - min: poolSize.min, - idleTimeoutMillis: 30000, - } - }, - maxConnections: poolSize.max, - minConnections: poolSize.min, - }); - } - - /** - * Create cache connection - */ - private async createCacheConnection( - connectionFactory: IConnectionFactory, - config: any - ): Promise<{ client: CacheProvider }> { - return connectionFactory.createCache({ - name: 'data-ingestion', - config: { - host: config.database.dragonfly.host, - port: config.database.dragonfly.port, - db: config.database.dragonfly.db, - } - }); - } - - /** - * Create queue connection - */ - private async createQueueConnection( - connectionFactory: IConnectionFactory, - config: any - ): Promise<{ client: QueueManager }> { - return connectionFactory.createQueue({ - name: 'data-ingestion', - config: { - host: config.database.dragonfly.host, - port: config.database.dragonfly.port, - db: config.database.dragonfly.db || 1, - } - }); - } - - /** - * Enable dynamic pool sizing for production workloads - */ - async enableDynamicPoolSizing(services: IDataIngestionServices): Promise { - const dynamicConfig = { - enabled: true, - minSize: 5, - maxSize: 100, - scaleUpThreshold: 70, - scaleDownThreshold: 30, - scaleUpIncrement: 10, - scaleDownIncrement: 5, - evaluationInterval: 30000, - }; - - try { - // Set dynamic config for MongoDB - if (services.mongodb && typeof services.mongodb.setDynamicPoolConfig === 'function') { - services.mongodb.setDynamicPoolConfig(dynamicConfig); - services.logger.info('Dynamic pool sizing enabled for MongoDB'); - } - - // Set dynamic config for PostgreSQL - if (services.postgres && typeof services.postgres.setDynamicPoolConfig === 'function') { - services.postgres.setDynamicPoolConfig(dynamicConfig); - services.logger.info('Dynamic pool sizing enabled for PostgreSQL'); - } - } catch (error) { - services.logger.warn('Failed to enable dynamic pool sizing', { error }); - } - } -} - -/** - * Convenience function to create services - */ -export async function createDataIngestionServices(config: any): Promise { - const factory = new DataIngestionServiceFactory(); - return factory.create(config); -} - -/** - * Convenience function to dispose services - */ -export async function disposeDataIngestionServices(services: IDataIngestionServices): Promise { - const factory = new DataIngestionServiceFactory(); - return factory.dispose(services); -} \ No newline at end of file diff --git a/libs/core/di/src/service-interfaces.ts b/libs/core/di/src/service-interfaces.ts deleted file mode 100644 index 26160f8..0000000 --- a/libs/core/di/src/service-interfaces.ts +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Service interfaces for type-safe dependency injection - */ - -import type { Logger } from '@stock-bot/logger'; -import type { CacheProvider } from '@stock-bot/cache'; -import type { QueueManager } from '@stock-bot/queue'; - -// Core database client interfaces -export interface IMongoDBClient { - collection(name: string): any; - getDatabase(): any; - connect(): Promise; - disconnect(): Promise; - getPoolMetrics(): any; - warmupPool?(): Promise; - setDynamicPoolConfig?(config: any): void; -} - -export interface IPostgreSQLClient { - query(sql: string, params?: any[]): Promise; - connect(): Promise; - disconnect(): Promise; - getPoolMetrics(): any; - warmupPool?(): Promise; - setDynamicPoolConfig?(config: any): void; - connected: boolean; -} - -export interface IConnectionFactory { - createMongoDB(config: any): Promise<{ client: IMongoDBClient; [key: string]: any }>; - createPostgreSQL(config: any): Promise<{ client: IPostgreSQLClient; [key: string]: any }>; - createCache(config: any): Promise<{ client: CacheProvider; [key: string]: any }>; - createQueue(config: any): Promise<{ client: QueueManager; [key: string]: any }>; - disposeAll(): Promise; - getPool(type: string, name: string): any; - listPools(): any[]; -} - -// Main service interface for data ingestion -export interface IDataIngestionServices { - readonly mongodb: IMongoDBClient; - readonly postgres: IPostgreSQLClient; - readonly cache: CacheProvider; - readonly queue: QueueManager; - readonly logger: Logger; - readonly connectionFactory: IConnectionFactory; -} - -// Operation context interface (simplified) -export interface IOperationContext { - readonly logger: Logger; - readonly traceId: string; - readonly metadata: Record; - readonly services: IDataIngestionServices; -} - -// Handler execution context -export interface IExecutionContext { - readonly type: 'http' | 'queue' | 'scheduled'; - readonly services: IDataIngestionServices; - readonly metadata: Record; - readonly traceId?: string; -} - -// Service factory interface -export interface IServiceFactory { - create(config: any): Promise; - dispose(services: IDataIngestionServices): Promise; -} - -// For backwards compatibility during migration -export interface LegacyServiceContainer { - resolve(name: string): T; - resolveAsync(name: string): Promise; - register(registration: any): void; - createScope(): any; - dispose(): Promise; -} \ No newline at end of file diff --git a/libs/core/handlers/src/base/BaseHandler.ts b/libs/core/handlers/src/base/BaseHandler.ts index d8a8494..06bf6c7 100644 --- a/libs/core/handlers/src/base/BaseHandler.ts +++ b/libs/core/handlers/src/base/BaseHandler.ts @@ -1,7 +1,9 @@ import { getLogger } from '@stock-bot/logger'; import { createJobHandler, handlerRegistry, type HandlerConfigWithSchedule } from '@stock-bot/types'; +import { fetch } from '@stock-bot/utils'; import type { IServiceContainer } from '../types/service-container'; import type { ExecutionContext, IHandler } from '../types/types'; +import type { Collection } from 'mongodb'; /** * Abstract base class for all handlers with improved DI @@ -76,6 +78,9 @@ export abstract class BaseHandler implements IHandler { } async scheduleOperation(operation: string, payload: unknown, delay?: number): Promise { + if (!this.queue) { + throw new Error('Queue service is not available'); + } const queue = this.queue.getQueue(this.handlerName); const jobData = { handler: this.handlerName, @@ -85,6 +90,13 @@ export abstract class BaseHandler implements IHandler { await queue.add(operation, jobData, { delay }); } + /** + * Helper method to schedule an operation with delay in seconds + */ + async scheduleIn(operation: string, payload: unknown, delaySeconds: number): Promise { + return this.scheduleOperation(operation, payload, delaySeconds * 1000); + } + /** * Create execution context for operations */ @@ -106,28 +118,40 @@ export abstract class BaseHandler implements IHandler { /** * Get a MongoDB collection with type safety */ - protected collection(name: string) { - return this.mongodb.collection(name); + protected collection(name: string): Collection { + if (!this.mongodb) { + throw new Error('MongoDB service is not available'); + } + return this.mongodb.collection(name); } /** * Set cache with handler-prefixed key */ protected async cacheSet(key: string, value: any, ttl?: number): Promise { + if (!this.cache) { + return; + } return this.cache.set(`${this.handlerName}:${key}`, value, ttl); } /** * Get cache with handler-prefixed key */ - protected async cacheGet(key: string): Promise { - return this.cache.get(`${this.handlerName}:${key}`); + protected async cacheGet(key: string): Promise { + if (!this.cache) { + return null; + } + return this.cache.get(`${this.handlerName}:${key}`); } /** * Delete cache with handler-prefixed key */ protected async cacheDel(key: string): Promise { + if (!this.cache) { + return; + } return this.cache.del(`${this.handlerName}:${key}`); } @@ -145,6 +169,42 @@ export abstract class BaseHandler implements IHandler { this.logger[level](message, { handler: this.handlerName, ...meta }); } + /** + * HTTP client helper using fetch from utils + */ + protected get http() { + return { + get: (url: string, options?: any) => + fetch(url, { ...options, method: 'GET', logger: this.logger }), + post: (url: string, data?: any, options?: any) => + fetch(url, { + ...options, + method: 'POST', + body: JSON.stringify(data), + headers: { 'Content-Type': 'application/json', ...options?.headers }, + logger: this.logger + }), + put: (url: string, data?: any, options?: any) => + fetch(url, { + ...options, + method: 'PUT', + body: JSON.stringify(data), + headers: { 'Content-Type': 'application/json', ...options?.headers }, + logger: this.logger + }), + delete: (url: string, options?: any) => + fetch(url, { ...options, method: 'DELETE', logger: this.logger }), + }; + } + + /** + * Check if a service is available + */ + protected hasService(name: keyof IServiceContainer): boolean { + const service = this[name as keyof this]; + return service != null; + } + /** * Event methods - commented for future */