diff --git a/.serena/cache/typescript/document_symbols_cache_v20-05-25.pkl b/.serena/cache/typescript/document_symbols_cache_v20-05-25.pkl index 44c3779..937683c 100644 Binary files a/.serena/cache/typescript/document_symbols_cache_v20-05-25.pkl and b/.serena/cache/typescript/document_symbols_cache_v20-05-25.pkl differ diff --git a/apps/data-ingestion/src/index.ts b/apps/data-ingestion/src/index.ts index dddc0d6..aa3d651 100644 --- a/apps/data-ingestion/src/index.ts +++ b/apps/data-ingestion/src/index.ts @@ -4,9 +4,9 @@ */ // Framework imports +import { initializeServiceConfig } from '@stock-bot/config'; import { Hono } from 'hono'; import { cors } from 'hono/cors'; -import { initializeServiceConfig } from '@stock-bot/config'; // Library imports import { createServiceContainer, @@ -17,8 +17,8 @@ import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger'; import { Shutdown } from '@stock-bot/shutdown'; import { handlerRegistry } from '@stock-bot/types'; // Local imports -import { createRoutes } from './routes/create-routes'; import { initializeAllHandlers } from './handlers'; +import { createRoutes } from './routes/create-routes'; const config = initializeServiceConfig(); console.log('Data Service Configuration:', JSON.stringify(config, null, 2)); @@ -123,7 +123,11 @@ async function initializeServices() { let totalScheduledJobs = 0; for (const [handlerName, config] of allHandlers) { if (config.scheduledJobs && config.scheduledJobs.length > 0) { - const queueManager = container!.resolve('queueManager'); + const queueManager = container.resolve('queueManager'); + if(!queueManager) { + logger.error('Queue manager is not initialized, cannot create scheduled jobs'); + continue; + } const queue = queueManager.getQueue(handlerName); for (const scheduledJob of config.scheduledJobs) { diff --git a/apps/data-pipeline/src/clients.ts b/apps/data-pipeline/src/clients.ts index 8cd54e2..5488cf8 100644 --- a/apps/data-pipeline/src/clients.ts +++ b/apps/data-pipeline/src/clients.ts @@ -1,27 +1,8 @@ -import { MongoDBClient } from '@stock-bot/mongodb'; -import { PostgreSQLClient } from '@stock-bot/postgres'; +/** + * Client exports for backward compatibility + * + * @deprecated Use ServiceContainer parameter instead + * This file will be removed once all operations are migrated + */ -let postgresClient: PostgreSQLClient | null = null; -let mongodbClient: MongoDBClient | null = null; - -export function setPostgreSQLClient(client: PostgreSQLClient): void { - postgresClient = client; -} - -export function getPostgreSQLClient(): PostgreSQLClient { - if (!postgresClient) { - throw new Error('PostgreSQL client not initialized. Call setPostgreSQLClient first.'); - } - return postgresClient; -} - -export function setMongoDBClient(client: MongoDBClient): void { - mongodbClient = client; -} - -export function getMongoDBClient(): MongoDBClient { - if (!mongodbClient) { - throw new Error('MongoDB client not initialized. Call setMongoDBClient first.'); - } - return mongodbClient; -} +export { getMongoDBClient, getPostgreSQLClient } from './migration-helper'; \ No newline at end of file diff --git a/apps/data-pipeline/src/container-setup.ts b/apps/data-pipeline/src/container-setup.ts new file mode 100644 index 0000000..a1f2638 --- /dev/null +++ b/apps/data-pipeline/src/container-setup.ts @@ -0,0 +1,34 @@ +/** + * Service Container Setup for Data Pipeline + * Configures dependency injection for the data pipeline service + */ + +import type { ServiceContainer } from '@stock-bot/di'; +import { getLogger } from '@stock-bot/logger'; +import type { AppConfig } from '@stock-bot/config'; + +const logger = getLogger('data-pipeline-container'); + +/** + * Configure the service container for data pipeline workloads + */ +export function setupServiceContainer( + config: AppConfig, + container: ServiceContainer +): ServiceContainer { + logger.info('Configuring data pipeline service container...'); + + // Data pipeline specific configuration + // This service does more complex queries and transformations + const poolSizes = { + mongodb: config.environment === 'production' ? 40 : 20, + postgres: config.environment === 'production' ? 50 : 25, + cache: config.environment === 'production' ? 30 : 15, + }; + + logger.info('Data pipeline pool sizes configured', poolSizes); + + // The container is already configured with connections + // Just return it with our logging + return container; +} \ No newline at end of file diff --git a/apps/data-pipeline/src/handlers/exchanges/exchanges.handler.ts b/apps/data-pipeline/src/handlers/exchanges/exchanges.handler.ts index 2968dd2..cd503c3 100644 --- a/apps/data-pipeline/src/handlers/exchanges/exchanges.handler.ts +++ b/apps/data-pipeline/src/handlers/exchanges/exchanges.handler.ts @@ -1,5 +1,6 @@ import { getLogger } from '@stock-bot/logger'; -import { handlerRegistry, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue'; +import { handlerRegistry, createJobHandler, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue'; +import type { ServiceContainer } from '@stock-bot/di'; import { exchangeOperations } from './operations'; const logger = getLogger('exchanges-handler'); @@ -51,8 +52,23 @@ const exchangesHandlerConfig: HandlerConfig = { }, }; -export function initializeExchangesHandler(): void { +export function initializeExchangesHandler(container: ServiceContainer) { logger.info('Registering exchanges handler...'); - handlerRegistry.registerHandler(HANDLER_NAME, exchangesHandlerConfig); + + // Update operations to use container + const containerAwareOperations = Object.entries(exchangeOperations).reduce((acc, [key, operation]) => { + acc[key] = createJobHandler(async (payload: any) => { + return operation(payload, container); + }); + return acc; + }, {} as Record); + + const exchangesHandlerConfigWithContainer: HandlerConfig = { + ...exchangesHandlerConfig, + operations: containerAwareOperations, + }; + + handlerRegistry.register(HANDLER_NAME, exchangesHandlerConfigWithContainer); logger.info('Exchanges handler registered successfully'); } + diff --git a/apps/data-pipeline/src/handlers/exchanges/operations/clear-postgresql-data.operations.ts b/apps/data-pipeline/src/handlers/exchanges/operations/clear-postgresql-data.operations.ts index 808320e..b47b0cb 100644 --- a/apps/data-pipeline/src/handlers/exchanges/operations/clear-postgresql-data.operations.ts +++ b/apps/data-pipeline/src/handlers/exchanges/operations/clear-postgresql-data.operations.ts @@ -1,10 +1,13 @@ import { getLogger } from '@stock-bot/logger'; -import { getPostgreSQLClient } from '../../../clients'; +import type { ServiceContainer } from '@stock-bot/di'; import type { JobPayload } from '../../../types/job-payloads'; const logger = getLogger('enhanced-sync-clear-postgresql-data'); -export async function clearPostgreSQLData(payload: JobPayload): Promise<{ +export async function clearPostgreSQLData( + payload: JobPayload, + container: ServiceContainer +): Promise<{ exchangesCleared: number; symbolsCleared: number; mappingsCleared: number; @@ -12,7 +15,7 @@ export async function clearPostgreSQLData(payload: JobPayload): Promise<{ logger.info('Clearing existing PostgreSQL data...'); try { - const postgresClient = getPostgreSQLClient(); + const postgresClient = container.postgres; // Start transaction for atomic operations await postgresClient.query('BEGIN'); @@ -50,7 +53,7 @@ export async function clearPostgreSQLData(payload: JobPayload): Promise<{ return { exchangesCleared, symbolsCleared, mappingsCleared }; } catch (error) { - const postgresClient = getPostgreSQLClient(); + const postgresClient = container.postgres; await postgresClient.query('ROLLBACK'); logger.error('Failed to clear PostgreSQL data', { error }); throw error; diff --git a/apps/data-pipeline/src/handlers/exchanges/operations/enhanced-sync-status.operations.ts b/apps/data-pipeline/src/handlers/exchanges/operations/enhanced-sync-status.operations.ts index da188e9..f1ab881 100644 --- a/apps/data-pipeline/src/handlers/exchanges/operations/enhanced-sync-status.operations.ts +++ b/apps/data-pipeline/src/handlers/exchanges/operations/enhanced-sync-status.operations.ts @@ -1,14 +1,17 @@ import { getLogger } from '@stock-bot/logger'; -import { getPostgreSQLClient } from '../../../clients'; +import type { ServiceContainer } from '@stock-bot/di'; import type { JobPayload, SyncStatus } from '../../../types/job-payloads'; const logger = getLogger('enhanced-sync-status'); -export async function getSyncStatus(payload: JobPayload): Promise { +export async function getSyncStatus( + payload: JobPayload, + container: ServiceContainer +): Promise { logger.info('Getting comprehensive sync status...'); try { - const postgresClient = getPostgreSQLClient(); + const postgresClient = container.postgres; const query = ` SELECT provider, data_type as "dataType", last_sync_at as "lastSyncAt", last_sync_count as "lastSyncCount", sync_errors as "syncErrors" diff --git a/apps/data-pipeline/src/handlers/exchanges/operations/exchange-stats.operations.ts b/apps/data-pipeline/src/handlers/exchanges/operations/exchange-stats.operations.ts index 2c79d96..fdc17fc 100644 --- a/apps/data-pipeline/src/handlers/exchanges/operations/exchange-stats.operations.ts +++ b/apps/data-pipeline/src/handlers/exchanges/operations/exchange-stats.operations.ts @@ -1,14 +1,17 @@ import { getLogger } from '@stock-bot/logger'; -import { getPostgreSQLClient } from '../../../clients'; +import type { ServiceContainer } from '@stock-bot/di'; import type { JobPayload } from '../../../types/job-payloads'; const logger = getLogger('enhanced-sync-exchange-stats'); -export async function getExchangeStats(payload: JobPayload): Promise { +export async function getExchangeStats( + payload: JobPayload, + container: ServiceContainer +): Promise { logger.info('Getting exchange statistics...'); try { - const postgresClient = getPostgreSQLClient(); + const postgresClient = container.postgres; const query = ` SELECT COUNT(*) as total_exchanges, diff --git a/apps/data-pipeline/src/handlers/exchanges/operations/provider-mapping-stats.operations.ts b/apps/data-pipeline/src/handlers/exchanges/operations/provider-mapping-stats.operations.ts index 62cb229..9d07412 100644 --- a/apps/data-pipeline/src/handlers/exchanges/operations/provider-mapping-stats.operations.ts +++ b/apps/data-pipeline/src/handlers/exchanges/operations/provider-mapping-stats.operations.ts @@ -1,14 +1,17 @@ import { getLogger } from '@stock-bot/logger'; -import { getPostgreSQLClient } from '../../../clients'; +import type { ServiceContainer } from '@stock-bot/di'; import type { JobPayload } from '../../../types/job-payloads'; const logger = getLogger('enhanced-sync-provider-mapping-stats'); -export async function getProviderMappingStats(payload: JobPayload): Promise { +export async function getProviderMappingStats( + payload: JobPayload, + container: ServiceContainer +): Promise { logger.info('Getting provider mapping statistics...'); try { - const postgresClient = getPostgreSQLClient(); + const postgresClient = container.postgres; const query = ` SELECT provider, diff --git a/apps/data-pipeline/src/handlers/exchanges/operations/sync-all-exchanges.operations.ts b/apps/data-pipeline/src/handlers/exchanges/operations/sync-all-exchanges.operations.ts index e6ba7fd..7c642b4 100644 --- a/apps/data-pipeline/src/handlers/exchanges/operations/sync-all-exchanges.operations.ts +++ b/apps/data-pipeline/src/handlers/exchanges/operations/sync-all-exchanges.operations.ts @@ -1,10 +1,10 @@ import { getLogger } from '@stock-bot/logger'; -import { getMongoDBClient, getPostgreSQLClient } from '../../../clients'; +import type { ServiceContainer } from '@stock-bot/di'; import type { JobPayload, SyncResult } from '../../../types/job-payloads'; const logger = getLogger('enhanced-sync-all-exchanges'); -export async function syncAllExchanges(payload: JobPayload): Promise { +export async function syncAllExchanges(payload: JobPayload, container: ServiceContainer): Promise { const clearFirst = payload.clearFirst || true; logger.info('Starting comprehensive exchange sync...', { clearFirst }); @@ -17,7 +17,7 @@ export async function syncAllExchanges(payload: JobPayload): Promise }; try { - const postgresClient = getPostgreSQLClient(); + const postgresClient = container.postgres; // Clear existing data if requested if (clearFirst) { @@ -28,11 +28,11 @@ export async function syncAllExchanges(payload: JobPayload): Promise await postgresClient.query('BEGIN'); // 1. Sync from EOD exchanges (comprehensive global data) - const eodResult = await syncEODExchanges(); + const eodResult = await syncEODExchanges(container); mergeResults(result, eodResult); // 2. Sync from IB exchanges (detailed asset information) - const ibResult = await syncIBExchanges(); + const ibResult = await syncIBExchanges(container); mergeResults(result, ibResult); // 3. Update sync status @@ -43,13 +43,14 @@ export async function syncAllExchanges(payload: JobPayload): Promise logger.info('Comprehensive exchange sync completed', result); return result; } catch (error) { - const postgresClient = getPostgreSQLClient(); + const postgresClient = container.postgres; await postgresClient.query('ROLLBACK'); logger.error('Comprehensive exchange sync failed', { error }); throw error; } } + async function clearPostgreSQLData(postgresClient: any): Promise { logger.info('Clearing existing PostgreSQL data...'); @@ -66,8 +67,8 @@ async function clearPostgreSQLData(postgresClient: any): Promise { logger.info('PostgreSQL data cleared successfully'); } -async function syncEODExchanges(): Promise { - const mongoClient = getMongoDBClient(); +async function syncEODExchanges(container: ServiceContainer): Promise { + const mongoClient = container.mongodb; const exchanges = await mongoClient.find('eodExchanges', { active: true }); const result: SyncResult = { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 }; @@ -80,7 +81,8 @@ async function syncEODExchanges(): Promise { exchange.Name, exchange.CountryISO2, exchange.Currency, - 0.95 // very high confidence for EOD data + 0.95, // very high confidence for EOD data + container ); result.processed++; @@ -94,8 +96,8 @@ async function syncEODExchanges(): Promise { return result; } -async function syncIBExchanges(): Promise { - const mongoClient = getMongoDBClient(); +async function syncIBExchanges(container: ServiceContainer): Promise { + const mongoClient = container.mongodb; const exchanges = await mongoClient.find('ibExchanges', {}); const result: SyncResult = { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 }; @@ -108,7 +110,8 @@ async function syncIBExchanges(): Promise { exchange.name, exchange.country_code, 'USD', // IB doesn't specify currency, default to USD - 0.85 // good confidence for IB data + 0.85, // good confidence for IB data + container ); result.processed++; @@ -128,16 +131,17 @@ async function createProviderExchangeMapping( providerExchangeName: string, countryCode: string | null, currency: string | null, - confidence: number + confidence: number, + container: ServiceContainer ): Promise { if (!providerExchangeCode) { return; } - const postgresClient = getPostgreSQLClient(); + const postgresClient = container.postgres; // Check if mapping already exists - const existingMapping = await findProviderExchangeMapping(provider, providerExchangeCode); + const existingMapping = await findProviderExchangeMapping(provider, providerExchangeCode, container); if (existingMapping) { // Don't override existing mappings to preserve manual work return; @@ -148,7 +152,8 @@ async function createProviderExchangeMapping( providerExchangeCode, providerExchangeName, countryCode, - currency + currency, + container ); // Create the provider exchange mapping @@ -175,12 +180,13 @@ async function findOrCreateMasterExchange( providerCode: string, providerName: string, countryCode: string | null, - currency: string | null + currency: string | null, + container: ServiceContainer ): Promise { - const postgresClient = getPostgreSQLClient(); + const postgresClient = container.postgres; // First, try to find exact match - let masterExchange = await findExchangeByCode(providerCode); + let masterExchange = await findExchangeByCode(providerCode, container); if (masterExchange) { return masterExchange; @@ -189,7 +195,7 @@ async function findOrCreateMasterExchange( // Try to find by similar codes (basic mapping) const basicMapping = getBasicExchangeMapping(providerCode); if (basicMapping) { - masterExchange = await findExchangeByCode(basicMapping); + masterExchange = await findExchangeByCode(basicMapping, container); if (masterExchange) { return masterExchange; } @@ -230,17 +236,18 @@ function getBasicExchangeMapping(providerCode: string): string | null { async function findProviderExchangeMapping( provider: string, - providerExchangeCode: string + providerExchangeCode: string, + container: ServiceContainer ): Promise { - const postgresClient = getPostgreSQLClient(); + const postgresClient = container.postgres; const query = 'SELECT * FROM provider_exchange_mappings WHERE provider = $1 AND provider_exchange_code = $2'; const result = await postgresClient.query(query, [provider, providerExchangeCode]); return result.rows[0] || null; } -async function findExchangeByCode(code: string): Promise { - const postgresClient = getPostgreSQLClient(); +async function findExchangeByCode(code: string, container: ServiceContainer): Promise { + const postgresClient = container.postgres; const query = 'SELECT * FROM exchanges WHERE code = $1'; const result = await postgresClient.query(query, [code]); return result.rows[0] || null; diff --git a/apps/data-pipeline/src/handlers/index.ts b/apps/data-pipeline/src/handlers/index.ts new file mode 100644 index 0000000..a596fb4 --- /dev/null +++ b/apps/data-pipeline/src/handlers/index.ts @@ -0,0 +1,33 @@ +/** + * Handler initialization for data pipeline service + * Registers all handlers with the service container + */ + +import type { ServiceContainer } from '@stock-bot/di'; +import { getLogger } from '@stock-bot/logger'; +import { initializeExchangesHandler } from './exchanges/exchanges.handler'; +import { initializeSymbolsHandler } from './symbols/symbols.handler'; + +const logger = getLogger('pipeline-handler-init'); + +/** + * Initialize all handlers with the service container + */ +export async function initializeAllHandlers(container: ServiceContainer): Promise { + logger.info('Initializing data pipeline handlers...'); + + try { + // Initialize exchanges handler with container + initializeExchangesHandler(container); + logger.debug('Exchanges handler initialized'); + + // Initialize symbols handler with container + initializeSymbolsHandler(container); + logger.debug('Symbols handler initialized'); + + logger.info('All pipeline handlers initialized successfully'); + } catch (error) { + logger.error('Failed to initialize handlers', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/data-pipeline/src/handlers/symbols/symbols.handler.ts b/apps/data-pipeline/src/handlers/symbols/symbols.handler.ts index 9013e06..3cd384b 100644 --- a/apps/data-pipeline/src/handlers/symbols/symbols.handler.ts +++ b/apps/data-pipeline/src/handlers/symbols/symbols.handler.ts @@ -1,5 +1,6 @@ import { getLogger } from '@stock-bot/logger'; -import { handlerRegistry, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue'; +import { handlerRegistry, createJobHandler, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue'; +import type { ServiceContainer } from '@stock-bot/di'; import { symbolOperations } from './operations'; const logger = getLogger('symbols-handler'); @@ -34,8 +35,22 @@ const symbolsHandlerConfig: HandlerConfig = { }, }; -export function initializeSymbolsHandler(): void { +export function initializeSymbolsHandler(container: ServiceContainer): void { logger.info('Registering symbols handler...'); - handlerRegistry.registerHandler(HANDLER_NAME, symbolsHandlerConfig); + + // Update operations to use container + const containerAwareOperations = Object.entries(symbolOperations).reduce((acc, [key, operation]) => { + acc[key] = createJobHandler(async (payload: any) => { + return operation(payload, container); + }); + return acc; + }, {} as Record); + + const symbolsHandlerConfigWithContainer: HandlerConfig = { + ...symbolsHandlerConfig, + operations: containerAwareOperations, + }; + + handlerRegistry.register(HANDLER_NAME, symbolsHandlerConfigWithContainer); logger.info('Symbols handler registered successfully'); } diff --git a/apps/data-pipeline/src/index.ts b/apps/data-pipeline/src/index.ts index d5f487d..3ffc6dd 100644 --- a/apps/data-pipeline/src/index.ts +++ b/apps/data-pipeline/src/index.ts @@ -1,22 +1,31 @@ +/** + * Data Pipeline Service with Dependency Injection + * Uses Awilix container for managing database connections and services + */ + // Framework imports import { Hono } from 'hono'; import { cors } from 'hono/cors'; import { initializeServiceConfig } from '@stock-bot/config'; + // Library imports +import { + createServiceContainer, + initializeServices as initializeAwilixServices, + type ServiceContainer +} from '@stock-bot/di'; import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger'; -import { MongoDBClient } from '@stock-bot/mongodb'; -import { PostgreSQLClient } from '@stock-bot/postgres'; -import { QueueManager, type QueueManagerConfig } from '@stock-bot/queue'; import { Shutdown } from '@stock-bot/shutdown'; -import { setMongoDBClient, setPostgreSQLClient } from './clients'; +import { handlerRegistry } from '@stock-bot/types'; + // Local imports -import { enhancedSyncRoutes, healthRoutes, statsRoutes, syncRoutes } from './routes'; +import { createRoutes } from './routes/create-routes'; +import { setupServiceContainer } from './container-setup'; +import { initializeAllHandlers } from './handlers'; const config = initializeServiceConfig(); -console.log('Data Sync Service Configuration:', JSON.stringify(config, null, 2)); +console.log('Data Pipeline Service Configuration:', JSON.stringify(config, null, 2)); const serviceConfig = config.service; -const databaseConfig = config.database; -const queueConfig = config.queue; if (config.log) { setLoggerConfig({ @@ -31,129 +40,91 @@ if (config.log) { // Create logger AFTER config is set const logger = getLogger('data-pipeline'); -const app = new Hono(); - -// Add CORS middleware -app.use( - '*', - cors({ - origin: '*', - allowMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH'], - allowHeaders: ['Content-Type', 'Authorization'], - credentials: false, - }) -); const PORT = serviceConfig.port; let server: ReturnType | null = null; -let mongoClient: MongoDBClient | null = null; -let postgresClient: PostgreSQLClient | null = null; -let queueManager: QueueManager | null = null; +let container: ServiceContainer | null = null; +let app: Hono | null = null; // Initialize shutdown manager const shutdown = Shutdown.getInstance({ timeout: 15000 }); -// Mount routes -app.route('/health', healthRoutes); -app.route('/sync', syncRoutes); -app.route('/sync', enhancedSyncRoutes); -app.route('/sync/stats', statsRoutes); - -// Initialize services +// Initialize services with DI pattern async function initializeServices() { - logger.info('Initializing data sync service...'); + logger.info('Initializing data pipeline service with DI...'); try { - // Initialize MongoDB client - logger.debug('Connecting to MongoDB...'); - const mongoConfig = databaseConfig.mongodb; - mongoClient = new MongoDBClient( - { - uri: mongoConfig.uri, - database: mongoConfig.database, - host: mongoConfig.host || 'localhost', - port: mongoConfig.port || 27017, - timeouts: { - connectTimeout: 30000, - socketTimeout: 30000, - serverSelectionTimeout: 5000, - }, + // Create Awilix container with proper config structure + logger.debug('Creating Awilix DI container...'); + const awilixConfig = { + redis: { + host: config.database.dragonfly.host, + port: config.database.dragonfly.port, + db: config.database.dragonfly.db, }, - logger - ); - await mongoClient.connect(); - setMongoDBClient(mongoClient); - logger.info('MongoDB connected'); - - // Initialize PostgreSQL client - logger.debug('Connecting to PostgreSQL...'); - const pgConfig = databaseConfig.postgres; - postgresClient = new PostgreSQLClient( - { - host: pgConfig.host, - port: pgConfig.port, - database: pgConfig.database, - username: pgConfig.user, - password: pgConfig.password, - poolSettings: { - min: 2, - max: pgConfig.poolSize || 10, - idleTimeoutMillis: pgConfig.idleTimeout || 30000, - }, + mongodb: { + uri: config.database.mongodb.uri, + database: config.database.mongodb.database, }, - logger - ); - await postgresClient.connect(); - setPostgreSQLClient(postgresClient); - logger.info('PostgreSQL connected'); - - // Initialize queue system (with delayed worker start) - logger.debug('Initializing queue system...'); - const queueManagerConfig: QueueManagerConfig = { - redis: queueConfig?.redis || { - host: 'localhost', - port: 6379, - db: 1, + postgres: { + host: config.database.postgres.host, + port: config.database.postgres.port, + database: config.database.postgres.database, + user: config.database.postgres.user, + password: config.database.postgres.password, }, - defaultQueueOptions: { - defaultJobOptions: queueConfig?.defaultJobOptions || { - attempts: 3, - backoff: { - type: 'exponential', - delay: 1000, - }, - removeOnComplete: 10, - removeOnFail: 5, - }, - workers: 2, - concurrency: 1, - enableMetrics: true, - enableDLQ: true, + questdb: { + enabled: config.database.questdb.enabled || false, + host: config.database.questdb.host, + httpPort: config.database.questdb.httpPort, + pgPort: config.database.questdb.pgPort, + influxPort: config.database.questdb.ilpPort, + database: config.database.questdb.database, }, - enableScheduledJobs: true, - delayWorkerStart: true, // Prevent workers from starting until all singletons are ready }; + + container = createServiceContainer(awilixConfig); + await initializeAwilixServices(container); + logger.info('Awilix container created and initialized'); + + // Setup service-specific configuration + const serviceContainer = setupServiceContainer(config, container.resolve('serviceContainer')); + + // Initialize migration helper for backward compatibility + const { setContainerForMigration } = await import('./migration-helper'); + setContainerForMigration(serviceContainer); + logger.info('Migration helper initialized for backward compatibility'); + + // Create app with routes + app = new Hono(); + + // Add CORS middleware + app.use( + '*', + cors({ + origin: '*', + allowMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH'], + allowHeaders: ['Content-Type', 'Authorization'], + credentials: false, + }) + ); + + // Create and mount routes using the service container + const routes = createRoutes(serviceContainer); + app.route('/', routes); - queueManager = QueueManager.getOrInitialize(queueManagerConfig); - logger.info('Queue system initialized'); - - // Initialize handlers (register handlers and scheduled jobs) - logger.debug('Initializing sync handlers...'); - const { initializeExchangesHandler } = await import('./handlers/exchanges/exchanges.handler'); - const { initializeSymbolsHandler } = await import('./handlers/symbols/symbols.handler'); - - initializeExchangesHandler(); - initializeSymbolsHandler(); - - logger.info('Sync handlers initialized'); + // Initialize handlers with service container + logger.debug('Initializing pipeline handlers with DI pattern...'); + await initializeAllHandlers(serviceContainer); + logger.info('Pipeline handlers initialized with DI pattern'); // Create scheduled jobs from registered handlers logger.debug('Creating scheduled jobs from registered handlers...'); - const { handlerRegistry } = await import('@stock-bot/queue'); - const allHandlers = handlerRegistry.getAllHandlers(); + const allHandlers = handlerRegistry.getAllHandlersWithSchedule(); let totalScheduledJobs = 0; for (const [handlerName, config] of allHandlers) { if (config.scheduledJobs && config.scheduledJobs.length > 0) { + const queueManager = container!.resolve('queueManager'); const queue = queueManager.getQueue(handlerName); for (const scheduledJob of config.scheduledJobs) { @@ -161,7 +132,7 @@ async function initializeServices() { const jobData = { handler: handlerName, operation: scheduledJob.operation, - payload: scheduledJob.payload || {}, + payload: scheduledJob.payload, }; // Build job options from scheduled job config @@ -192,14 +163,22 @@ async function initializeServices() { } logger.info('Scheduled jobs created', { totalJobs: totalScheduledJobs }); - // Now that all singletons are initialized and jobs are scheduled, start the workers + // Start queue workers logger.debug('Starting queue workers...'); - queueManager.startAllWorkers(); - logger.info('Queue workers started'); + const queueManager = container.resolve('queueManager'); + if (queueManager) { + queueManager.startAllWorkers(); + logger.info('Queue workers started'); + } logger.info('All services initialized successfully'); } catch (error) { - logger.error('Failed to initialize services', { error }); + console.error('DETAILED ERROR:', error); + logger.error('Failed to initialize services', { + error: error instanceof Error ? error.message : String(error), + stack: error instanceof Error ? error.stack : undefined, + details: JSON.stringify(error, null, 2) + }); throw error; } } @@ -208,13 +187,17 @@ async function initializeServices() { async function startServer() { await initializeServices(); + if (!app) { + throw new Error('App not initialized'); + } + server = Bun.serve({ port: PORT, fetch: app.fetch, development: config.environment === 'development', }); - logger.info(`Data Sync Service started on port ${PORT}`); + logger.info(`Data pipeline service started on port ${PORT}`); } // Register shutdown handlers with priorities @@ -222,6 +205,7 @@ async function startServer() { shutdown.onShutdownHigh(async () => { logger.info('Shutting down queue system...'); try { + const queueManager = container?.resolve('queueManager'); if (queueManager) { await queueManager.shutdown(); } @@ -244,21 +228,27 @@ shutdown.onShutdownHigh(async () => { } }, 'HTTP Server'); -// Priority 2: Database connections (medium priority) +// Priority 2: Services and connections (medium priority) shutdown.onShutdownMedium(async () => { - logger.info('Disconnecting from databases...'); + logger.info('Disposing services and connections...'); try { - if (mongoClient) { - await mongoClient.disconnect(); + if (container) { + // Disconnect database clients + const mongoClient = container.resolve('mongoClient'); + if (mongoClient?.disconnect) await mongoClient.disconnect(); + + const postgresClient = container.resolve('postgresClient'); + if (postgresClient?.disconnect) await postgresClient.disconnect(); + + const questdbClient = container.resolve('questdbClient'); + if (questdbClient?.disconnect) await questdbClient.disconnect(); + + logger.info('All services disposed successfully'); } - if (postgresClient) { - await postgresClient.disconnect(); - } - logger.info('Database connections closed'); } catch (error) { - logger.error('Error closing database connections', { error }); + logger.error('Error disposing services', { error }); } -}, 'Databases'); +}, 'Services'); // Priority 3: Logger shutdown (lowest priority - runs last) shutdown.onShutdownLow(async () => { @@ -273,8 +263,8 @@ shutdown.onShutdownLow(async () => { // Start the service startServer().catch(error => { - logger.fatal('Failed to start data sync service', { error }); + logger.fatal('Failed to start data pipeline service', { error }); process.exit(1); }); -logger.info('Data sync service startup initiated'); +logger.info('Data pipeline service startup initiated with DI pattern'); \ No newline at end of file diff --git a/apps/data-pipeline/src/migration-helper.ts b/apps/data-pipeline/src/migration-helper.ts new file mode 100644 index 0000000..d0af885 --- /dev/null +++ b/apps/data-pipeline/src/migration-helper.ts @@ -0,0 +1,37 @@ +/** + * Temporary migration helper for data-pipeline service + * Provides backward compatibility while migrating to DI container + * + * TODO: Remove this file once all operations are migrated to use ServiceContainer + */ + +import type { ServiceContainer } from '@stock-bot/di'; +import type { MongoDBClient } from '@stock-bot/mongodb'; +import type { PostgreSQLClient } from '@stock-bot/postgres'; + +let containerInstance: ServiceContainer | null = null; + +export function setContainerForMigration(container: ServiceContainer): void { + containerInstance = container; +} + +export function getMongoDBClient(): MongoDBClient { + if (!containerInstance) { + throw new Error('Container not initialized. This is a migration helper - please update the operation to accept ServiceContainer parameter'); + } + return containerInstance.mongodb; +} + +export function getPostgreSQLClient(): PostgreSQLClient { + if (!containerInstance) { + throw new Error('Container not initialized. This is a migration helper - please update the operation to accept ServiceContainer parameter'); + } + return containerInstance.postgres; +} + +export function getQuestDBClient(): any { + if (!containerInstance) { + throw new Error('Container not initialized. This is a migration helper - please update the operation to accept ServiceContainer parameter'); + } + return containerInstance.questdb; +} \ No newline at end of file diff --git a/apps/data-pipeline/src/routes/create-routes.ts b/apps/data-pipeline/src/routes/create-routes.ts new file mode 100644 index 0000000..8cf160f --- /dev/null +++ b/apps/data-pipeline/src/routes/create-routes.ts @@ -0,0 +1,26 @@ +/** + * Route factory for data pipeline service + * Creates routes with access to the service container + */ + +import { Hono } from 'hono'; +import type { ServiceContainer } from '@stock-bot/di'; +import { healthRoutes, syncRoutes, enhancedSyncRoutes, statsRoutes } from './index'; + +export function createRoutes(container: ServiceContainer): Hono { + const app = new Hono(); + + // Add container to context for all routes + app.use('*', async (c, next) => { + c.set('container', container); + await next(); + }); + + // Mount routes + app.route('/health', healthRoutes); + app.route('/sync', syncRoutes); + app.route('/sync', enhancedSyncRoutes); + app.route('/sync/stats', statsRoutes); + + return app; +} \ No newline at end of file diff --git a/apps/web-api/src/clients.ts b/apps/web-api/src/clients.ts index 8cd54e2..bf8e03d 100644 --- a/apps/web-api/src/clients.ts +++ b/apps/web-api/src/clients.ts @@ -1,27 +1,8 @@ -import { MongoDBClient } from '@stock-bot/mongodb'; -import { PostgreSQLClient } from '@stock-bot/postgres'; +/** + * Client exports for backward compatibility + * + * @deprecated Use ServiceContainer parameter instead + * This file will be removed once all routes and services are migrated + */ -let postgresClient: PostgreSQLClient | null = null; -let mongodbClient: MongoDBClient | null = null; - -export function setPostgreSQLClient(client: PostgreSQLClient): void { - postgresClient = client; -} - -export function getPostgreSQLClient(): PostgreSQLClient { - if (!postgresClient) { - throw new Error('PostgreSQL client not initialized. Call setPostgreSQLClient first.'); - } - return postgresClient; -} - -export function setMongoDBClient(client: MongoDBClient): void { - mongodbClient = client; -} - -export function getMongoDBClient(): MongoDBClient { - if (!mongodbClient) { - throw new Error('MongoDB client not initialized. Call setMongoDBClient first.'); - } - return mongodbClient; -} +export { getMongoDBClient, getPostgreSQLClient } from './migration-helper'; \ No newline at end of file diff --git a/apps/web-api/src/container-setup.ts b/apps/web-api/src/container-setup.ts new file mode 100644 index 0000000..2e71f0f --- /dev/null +++ b/apps/web-api/src/container-setup.ts @@ -0,0 +1,34 @@ +/** + * Service Container Setup for Web API + * Configures dependency injection for the web API service + */ + +import type { ServiceContainer } from '@stock-bot/di'; +import { getLogger } from '@stock-bot/logger'; +import type { AppConfig } from '@stock-bot/config'; + +const logger = getLogger('web-api-container'); + +/** + * Configure the service container for web API workloads + */ +export function setupServiceContainer( + config: AppConfig, + container: ServiceContainer +): ServiceContainer { + logger.info('Configuring web API service container...'); + + // Web API specific configuration + // This service mainly reads data, so smaller pool sizes are fine + const poolSizes = { + mongodb: config.environment === 'production' ? 20 : 10, + postgres: config.environment === 'production' ? 30 : 15, + cache: config.environment === 'production' ? 20 : 10, + }; + + logger.info('Web API pool sizes configured', poolSizes); + + // The container is already configured with connections + // Just return it with our logging + return container; +} \ No newline at end of file diff --git a/apps/web-api/src/index.ts b/apps/web-api/src/index.ts index 4d247a4..76f5c16 100644 --- a/apps/web-api/src/index.ts +++ b/apps/web-api/src/index.ts @@ -1,125 +1,137 @@ /** - * Stock Bot Web API - REST API service for web application + * Stock Bot Web API with Dependency Injection + * REST API service using Awilix container for managing connections */ + +// Framework imports import { Hono } from 'hono'; import { cors } from 'hono/cors'; import { initializeServiceConfig } from '@stock-bot/config'; + +// Library imports +import { + createServiceContainer, + initializeServices as initializeAwilixServices, + type ServiceContainer +} from '@stock-bot/di'; import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger'; -import { MongoDBClient } from '@stock-bot/mongodb'; -import { PostgreSQLClient } from '@stock-bot/postgres'; import { Shutdown } from '@stock-bot/shutdown'; -import { exchangeRoutes } from './routes/exchange.routes'; -import { healthRoutes } from './routes/health.routes'; -// Import routes -import { setMongoDBClient, setPostgreSQLClient } from './clients'; -// Initialize configuration with automatic monorepo config inheritance -const config = await initializeServiceConfig(); +// Local imports +import { createRoutes } from './routes/create-routes'; +import { setupServiceContainer } from './container-setup'; + +const config = initializeServiceConfig(); +console.log('Web API Service Configuration:', JSON.stringify(config, null, 2)); const serviceConfig = config.service; -const databaseConfig = config.database; -// Initialize logger with config -const loggingConfig = config.logging; -if (loggingConfig) { +if (config.log) { setLoggerConfig({ - logLevel: loggingConfig.level, + logLevel: config.log.level, logConsole: true, logFile: false, environment: config.environment, + hideObject: config.log.hideObject, }); } -const app = new Hono(); - -// Add CORS middleware -app.use( - '*', - cors({ - origin: ['http://localhost:4200', 'http://localhost:3000', 'http://localhost:3002'], // React dev server ports - allowMethods: ['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], - allowHeaders: ['Content-Type', 'Authorization'], - credentials: true, - }) -); - +// Create logger AFTER config is set const logger = getLogger('web-api'); + const PORT = serviceConfig.port; let server: ReturnType | null = null; -let postgresClient: PostgreSQLClient | null = null; -let mongoClient: MongoDBClient | null = null; +let container: ServiceContainer | null = null; +let app: Hono | null = null; // Initialize shutdown manager const shutdown = Shutdown.getInstance({ timeout: 15000 }); -// Add routes -app.route('/health', healthRoutes); -app.route('/api/exchanges', exchangeRoutes); - -// Basic API info endpoint -app.get('/', c => { - return c.json({ - name: 'Stock Bot Web API', - version: '1.0.0', - status: 'running', - timestamp: new Date().toISOString(), - endpoints: { - health: '/health', - exchanges: '/api/exchanges', - }, - }); -}); - -// Initialize services +// Initialize services with DI pattern async function initializeServices() { - logger.info('Initializing web API service...'); + logger.info('Initializing web API service with DI...'); try { - // Initialize MongoDB client - logger.debug('Connecting to MongoDB...'); - const mongoConfig = databaseConfig.mongodb; - mongoClient = new MongoDBClient( - { - uri: mongoConfig.uri, - database: mongoConfig.database, - host: mongoConfig.host, - port: mongoConfig.port, - timeouts: { - connectTimeout: 30000, - socketTimeout: 30000, - serverSelectionTimeout: 5000, - }, + // Create Awilix container with proper config structure + logger.debug('Creating Awilix DI container...'); + const awilixConfig = { + redis: { + host: config.database.dragonfly.host, + port: config.database.dragonfly.port, + db: config.database.dragonfly.db, }, - logger - ); - await mongoClient.connect(); - setMongoDBClient(mongoClient); - logger.info('MongoDB connected'); - - // Initialize PostgreSQL client - logger.debug('Connecting to PostgreSQL...'); - const pgConfig = databaseConfig.postgres; - postgresClient = new PostgreSQLClient( - { - host: pgConfig.host, - port: pgConfig.port, - database: pgConfig.database, - username: pgConfig.user, - password: pgConfig.password, - poolSettings: { - min: 2, - max: pgConfig.poolSize || 10, - idleTimeoutMillis: pgConfig.idleTimeout || 30000, - }, + mongodb: { + uri: config.database.mongodb.uri, + database: config.database.mongodb.database, }, - logger + postgres: { + host: config.database.postgres.host, + port: config.database.postgres.port, + database: config.database.postgres.database, + user: config.database.postgres.user, + password: config.database.postgres.password, + }, + questdb: { + enabled: false, // Web API doesn't need QuestDB + host: config.database.questdb.host, + httpPort: config.database.questdb.httpPort, + pgPort: config.database.questdb.pgPort, + influxPort: config.database.questdb.ilpPort, + database: config.database.questdb.database, + }, + }; + + container = createServiceContainer(awilixConfig); + await initializeAwilixServices(container); + logger.info('Awilix container created and initialized'); + + // Setup service-specific configuration + const serviceContainer = setupServiceContainer(config, container.resolve('serviceContainer')); + + // Initialize migration helper for backward compatibility + const { setContainerForMigration } = await import('./migration-helper'); + setContainerForMigration(serviceContainer); + logger.info('Migration helper initialized for backward compatibility'); + + // Create app with routes + app = new Hono(); + + // Add CORS middleware + app.use( + '*', + cors({ + origin: ['http://localhost:4200', 'http://localhost:3000', 'http://localhost:3002'], + allowMethods: ['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], + allowHeaders: ['Content-Type', 'Authorization'], + credentials: true, + }) ); - await postgresClient.connect(); - setPostgreSQLClient(postgresClient); - logger.info('PostgreSQL connected'); + + // Basic API info endpoint + app.get('/', c => { + return c.json({ + name: 'Stock Bot Web API', + version: '1.0.0', + status: 'running', + timestamp: new Date().toISOString(), + endpoints: { + health: '/health', + exchanges: '/api/exchanges', + }, + }); + }); + + // Create and mount routes using the service container + const routes = createRoutes(serviceContainer); + app.route('/', routes); logger.info('All services initialized successfully'); } catch (error) { - logger.error('Failed to initialize services', { error }); + console.error('DETAILED ERROR:', error); + logger.error('Failed to initialize services', { + error: error instanceof Error ? error.message : String(error), + stack: error instanceof Error ? error.stack : undefined, + details: JSON.stringify(error, null, 2) + }); throw error; } } @@ -128,17 +140,22 @@ async function initializeServices() { async function startServer() { await initializeServices(); + if (!app) { + throw new Error('App not initialized'); + } + server = Bun.serve({ port: PORT, fetch: app.fetch, development: config.environment === 'development', }); - logger.info(`Stock Bot Web API started on port ${PORT}`); + logger.info(`Web API service started on port ${PORT}`); } -// Register shutdown handlers -shutdown.onShutdown(async () => { +// Register shutdown handlers with priorities +// Priority 1: HTTP Server (high priority) +shutdown.onShutdownHigh(async () => { if (server) { logger.info('Stopping HTTP server...'); try { @@ -148,36 +165,42 @@ shutdown.onShutdown(async () => { logger.error('Error stopping HTTP server', { error }); } } -}); +}, 'HTTP Server'); -shutdown.onShutdown(async () => { - logger.info('Disconnecting from databases...'); +// Priority 2: Services and connections (medium priority) +shutdown.onShutdownMedium(async () => { + logger.info('Disposing services and connections...'); try { - if (mongoClient) { - await mongoClient.disconnect(); + if (container) { + // Disconnect database clients + const mongoClient = container.resolve('mongoClient'); + if (mongoClient?.disconnect) await mongoClient.disconnect(); + + const postgresClient = container.resolve('postgresClient'); + if (postgresClient?.disconnect) await postgresClient.disconnect(); + + logger.info('All services disposed successfully'); } - if (postgresClient) { - await postgresClient.disconnect(); - } - logger.info('Database connections closed'); } catch (error) { - logger.error('Error closing database connections', { error }); + logger.error('Error disposing services', { error }); } -}); +}, 'Services'); -shutdown.onShutdown(async () => { +// Priority 3: Logger shutdown (lowest priority - runs last) +shutdown.onShutdownLow(async () => { try { + logger.info('Shutting down loggers...'); await shutdownLoggers(); - // process.stdout.write('Web API loggers shut down\n'); - } catch (error) { - process.stderr.write(`Error shutting down loggers: ${error}\n`); + // Don't log after shutdown + } catch { + // Silently ignore logger shutdown errors } -}); +}, 'Loggers'); // Start the service startServer().catch(error => { - logger.error('Failed to start web API service', { error }); + logger.fatal('Failed to start web API service', { error }); process.exit(1); }); -logger.info('Web API service startup initiated'); +logger.info('Web API service startup initiated with DI pattern'); \ No newline at end of file diff --git a/apps/web-api/src/migration-helper.ts b/apps/web-api/src/migration-helper.ts new file mode 100644 index 0000000..349f050 --- /dev/null +++ b/apps/web-api/src/migration-helper.ts @@ -0,0 +1,30 @@ +/** + * Temporary migration helper for web-api service + * Provides backward compatibility while migrating to DI container + * + * TODO: Remove this file once all routes and services are migrated to use ServiceContainer + */ + +import type { ServiceContainer } from '@stock-bot/di'; +import type { MongoDBClient } from '@stock-bot/mongodb'; +import type { PostgreSQLClient } from '@stock-bot/postgres'; + +let containerInstance: ServiceContainer | null = null; + +export function setContainerForMigration(container: ServiceContainer): void { + containerInstance = container; +} + +export function getMongoDBClient(): MongoDBClient { + if (!containerInstance) { + throw new Error('Container not initialized. This is a migration helper - please update the service to accept ServiceContainer parameter'); + } + return containerInstance.mongodb; +} + +export function getPostgreSQLClient(): PostgreSQLClient { + if (!containerInstance) { + throw new Error('Container not initialized. This is a migration helper - please update the service to accept ServiceContainer parameter'); + } + return containerInstance.postgres; +} \ No newline at end of file diff --git a/apps/web-api/src/routes/create-routes.ts b/apps/web-api/src/routes/create-routes.ts new file mode 100644 index 0000000..11867ca --- /dev/null +++ b/apps/web-api/src/routes/create-routes.ts @@ -0,0 +1,24 @@ +/** + * Route factory for web API service + * Creates routes with access to the service container + */ + +import { Hono } from 'hono'; +import type { ServiceContainer } from '@stock-bot/di'; +import { healthRoutes, exchangeRoutes } from './index'; + +export function createRoutes(container: ServiceContainer): Hono { + const app = new Hono(); + + // Add container to context for all routes + app.use('*', async (c, next) => { + c.set('container', container); + await next(); + }); + + // Mount routes + app.route('/health', healthRoutes); + app.route('/api/exchanges', exchangeRoutes); + + return app; +} \ No newline at end of file