From 3227388d250d0f658b0666a708036f4bf0162668 Mon Sep 17 00:00:00 2001 From: Boki Date: Sat, 21 Jun 2025 19:42:20 -0400 Subject: [PATCH] integrated data-ingestion --- apps/data-ingestion/config/default.json | 66 ++++++++++++++++- .../src/handlers/example-handler.ts | 64 ++++++++++------- .../proxy/operations/check.operations.ts | 33 ++++----- .../proxy/operations/fetch.operations.ts | 9 +-- .../src/handlers/proxy/proxy.handler.ts | 4 +- .../qm/operations/session.operations.ts | 70 +++++++++++-------- .../src/handlers/qm/qm.handler.ts | 1 - .../src/handlers/webshare/webshare.handler.ts | 2 +- apps/data-ingestion/src/index.ts | 38 +++++++--- .../src/routes/create-routes.ts | 2 +- .../src/setup/database-setup.ts | 52 +++++--------- config/default.json | 4 +- libs/core/config/config/default.json | 4 +- libs/core/config/config/test.json | 4 +- libs/services/queue/src/dlq-handler.ts | 6 +- 15 files changed, 226 insertions(+), 133 deletions(-) diff --git a/apps/data-ingestion/config/default.json b/apps/data-ingestion/config/default.json index 1c6d8b5..c8d21e3 100644 --- a/apps/data-ingestion/config/default.json +++ b/apps/data-ingestion/config/default.json @@ -12,6 +12,57 @@ "credentials": false } }, + "log": { + "level": "info", + "format": "json", + "hideObject": false, + "loki": { + "enabled": false, + "host": "localhost", + "port": 3100, + "labels": {} + } + }, + "database": { + "postgres": { + "host": "localhost", + "port": 5432, + "database": "trading_bot", + "user": "trading_user", + "password": "trading_pass_dev", + "ssl": false, + "poolSize": 20, + "connectionTimeout": 30000, + "idleTimeout": 10000 + }, + "questdb": { + "host": "localhost", + "ilpPort": 9009, + "httpPort": 9000, + "pgPort": 8812, + "database": "questdb", + "user": "admin", + "password": "quest", + "bufferSize": 65536, + "flushInterval": 1000 + }, + "mongodb": { + "host": "localhost", + "port": 27017, + "database": "stock", + "user": "trading_admin", + "password": "trading_mongo_dev", + "authSource": "admin", + "poolSize": 20 + }, + "dragonfly": { + "host": "localhost", + "port": 6379, + "db": 0, + "maxRetries": 3, + "retryDelay": 100 + } + }, "queue": { "redis": { "host": "localhost", @@ -24,12 +75,23 @@ "type": "exponential", "delay": 1000 }, - "removeOnComplete": true, - "removeOnFail": false + "removeOnComplete": 100, + "removeOnFail": 50 } }, "webshare": { "apiKey": "", "apiUrl": "https://proxy.webshare.io/api/v2/" + }, + "http": { + "timeout": 30000, + "retries": 3, + "retryDelay": 1000, + "userAgent": "StockBot/1.0", + "rateLimit": { + "enabled": false, + "requestsPerSecond": 10, + "burstSize": 20 + } } } \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/example-handler.ts b/apps/data-ingestion/src/handlers/example-handler.ts index cfbd697..58682a1 100644 --- a/apps/data-ingestion/src/handlers/example-handler.ts +++ b/apps/data-ingestion/src/handlers/example-handler.ts @@ -1,5 +1,5 @@ -import { OperationContext } from '@stock-bot/utils'; -import type { ServiceContainer } from '@stock-bot/connection-factory'; +import { OperationContext } from '@stock-bot/di'; +import type { ServiceContainer } from '@stock-bot/di'; /** * Example handler showing how to use the new connection pooling pattern @@ -12,26 +12,32 @@ export class ExampleHandler { */ async performOperation(data: any): Promise { // Create operation context with container - const context = OperationContext.create('example', 'perform-operation', { - container: this.container - }); + const context = new OperationContext( + 'example-handler', + 'perform-operation', + this.container, + { data } + ); try { // Log operation start context.logger.info('Starting operation', { data }); - // Use MongoDB through context (no more singleton!) - const result = await context.mongodb.collection('test').insertOne(data); + // Use MongoDB through service resolution + const mongodb = context.resolve('mongodb'); + const result = await mongodb.collection('test').insertOne(data); context.logger.debug('MongoDB insert complete', { insertedId: result.insertedId }); - // Use PostgreSQL through context - await context.postgres.query( + // Use PostgreSQL through service resolution + const postgres = context.resolve('postgres'); + await postgres.query( 'INSERT INTO operations (id, status) VALUES ($1, $2)', [result.insertedId, 'completed'] ); - // Use cache through context - await context.cache.set(`operation:${result.insertedId}`, { + // Use cache through service resolution + const cache = context.resolve('cache'); + await cache.set(`operation:${result.insertedId}`, { status: 'completed', timestamp: new Date() }); @@ -40,9 +46,6 @@ export class ExampleHandler { } catch (error) { context.logger.error('Operation failed', { error }); throw error; - } finally { - // Clean up resources - await context.dispose(); } } @@ -53,23 +56,35 @@ export class ExampleHandler { // Create a scoped container for this batch operation const scopedContainer = this.container.createScope(); - const context = OperationContext.create('example', 'batch-operation', { - container: scopedContainer - }); + const context = new OperationContext( + 'example-handler', + 'batch-operation', + scopedContainer, + { itemCount: items.length } + ); try { context.logger.info('Starting batch operation', { itemCount: items.length }); - // Process items in parallel with isolated connections + // Get services once for the batch + const mongodb = context.resolve('mongodb'); + const cache = context.resolve('cache'); + + // Process items in parallel const promises = items.map(async (item, index) => { - // Each sub-operation gets its own context - const subContext = context.createChild(`item-${index}`); + const itemContext = new OperationContext( + 'example-handler', + `batch-item-${index}`, + scopedContainer, + { item } + ); try { - await subContext.mongodb.collection('batch').insertOne(item); - await subContext.cache.set(`batch:${item.id}`, item); - } finally { - await subContext.dispose(); + await mongodb.collection('batch').insertOne(item); + await cache.set(`batch:${item.id}`, item); + } catch (error) { + itemContext.logger.error('Batch item failed', { error, itemIndex: index }); + throw error; } }); @@ -78,7 +93,6 @@ export class ExampleHandler { } finally { // Clean up scoped resources - await context.dispose(); await scopedContainer.dispose(); } } diff --git a/apps/data-ingestion/src/handlers/proxy/operations/check.operations.ts b/apps/data-ingestion/src/handlers/proxy/operations/check.operations.ts index 3d92903..c73200f 100644 --- a/apps/data-ingestion/src/handlers/proxy/operations/check.operations.ts +++ b/apps/data-ingestion/src/handlers/proxy/operations/check.operations.ts @@ -3,9 +3,9 @@ */ import { HttpClient, ProxyInfo } from '@stock-bot/http'; import { OperationContext } from '@stock-bot/di'; +import { getLogger } from '@stock-bot/logger'; import { PROXY_CONFIG } from '../shared/config'; -import { ProxyStatsManager } from '../shared/proxy-manager'; // Shared HTTP client let httpClient: HttpClient; @@ -21,7 +21,12 @@ function getHttpClient(ctx: OperationContext): HttpClient { * Check if a proxy is working */ export async function checkProxy(proxy: ProxyInfo): Promise { - const ctx = OperationContext.create('proxy', 'check'); + const ctx = { + logger: getLogger('proxy-check'), + resolve: (_name: string) => { + throw new Error(`Service container not available for proxy operations`); + } + } as any; let success = false; ctx.logger.debug(`Checking Proxy:`, { @@ -94,10 +99,12 @@ export async function checkProxy(proxy: ProxyInfo): Promise { * Update proxy data in cache with working/total stats and average response time */ async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean, ctx: OperationContext): Promise { - const cacheKey = `${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`; + const _cacheKey = `${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`; try { - const existing: ProxyInfo | null = await ctx.cache.get(cacheKey); + // For now, skip cache operations without service container + // TODO: Pass service container to operations + const existing: ProxyInfo | null = null; // For failed proxies, only update if they already exist if (!isWorking && !existing) { @@ -140,8 +147,9 @@ async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean, ctx: Ope updated.successRate = updated.total > 0 ? (updated.working / updated.total) * 100 : 0; // Save to cache: reset TTL for working proxies, keep existing TTL for failed ones - const cacheOptions = isWorking ? { ttl: PROXY_CONFIG.CACHE_TTL } : undefined; - await ctx.cache.set(cacheKey, updated, cacheOptions); + const _cacheOptions = isWorking ? { ttl: PROXY_CONFIG.CACHE_TTL } : undefined; + // Skip cache operations without service container + // TODO: Pass service container to operations ctx.logger.debug(`Updated ${isWorking ? 'working' : 'failed'} proxy in cache`, { proxy: `${proxy.host}:${proxy.port}`, @@ -161,15 +169,8 @@ async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean, ctx: Ope } function updateProxyStats(sourceId: string, success: boolean, ctx: OperationContext) { - const statsManager = ProxyStatsManager.getInstance(); - const source = statsManager.updateSourceStats(sourceId, success); + // Stats are now handled by the global ProxyManager + ctx.logger.debug('Proxy check result', { sourceId, success }); - if (!source) { - ctx.logger.warn(`Unknown proxy source: ${sourceId}`); - return; - } - - // Cache the updated stats - ctx.cache.set(`${PROXY_CONFIG.CACHE_STATS_KEY}:${source.id}`, source, { ttl: PROXY_CONFIG.CACHE_TTL }) - .catch(error => ctx.logger.debug('Failed to cache proxy stats', { error })); + // TODO: Integrate with global ProxyManager stats if needed } \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/proxy/operations/fetch.operations.ts b/apps/data-ingestion/src/handlers/proxy/operations/fetch.operations.ts index 5cabb8a..2305bee 100644 --- a/apps/data-ingestion/src/handlers/proxy/operations/fetch.operations.ts +++ b/apps/data-ingestion/src/handlers/proxy/operations/fetch.operations.ts @@ -3,9 +3,9 @@ */ import { HttpClient, ProxyInfo } from '@stock-bot/http'; import { OperationContext } from '@stock-bot/di'; +import { getLogger } from '@stock-bot/logger'; import { PROXY_CONFIG } from '../shared/config'; -import { ProxyStatsManager } from '../shared/proxy-manager'; import type { ProxySource } from '../shared/types'; // Shared HTTP client @@ -19,10 +19,11 @@ function getHttpClient(ctx: OperationContext): HttpClient { } export async function fetchProxiesFromSources(): Promise { - const ctx = OperationContext.create('proxy', 'fetch-sources'); + const ctx = { + logger: getLogger('proxy-fetch') + } as any; - const statsManager = ProxyStatsManager.getInstance(); - statsManager.resetStats(); + ctx.logger.info('Starting proxy fetch from sources'); const fetchPromises = PROXY_CONFIG.PROXY_SOURCES.map(source => fetchProxiesFromSource(source, ctx)); const results = await Promise.all(fetchPromises); diff --git a/apps/data-ingestion/src/handlers/proxy/proxy.handler.ts b/apps/data-ingestion/src/handlers/proxy/proxy.handler.ts index 44985a2..5ab4902 100644 --- a/apps/data-ingestion/src/handlers/proxy/proxy.handler.ts +++ b/apps/data-ingestion/src/handlers/proxy/proxy.handler.ts @@ -4,12 +4,12 @@ import { ProxyInfo } from '@stock-bot/http'; import { getLogger } from '@stock-bot/logger'; import { handlerRegistry, createJobHandler, type HandlerConfigWithSchedule } from '@stock-bot/queue'; -import type { ServiceContainer } from '@stock-bot/connection-factory'; +import type { ServiceContainer } from '@stock-bot/di'; const handlerLogger = getLogger('proxy-handler'); // Initialize and register the Proxy provider -export function initializeProxyProvider(container: ServiceContainer) { +export function initializeProxyProvider(_container: ServiceContainer) { handlerLogger.debug('Registering proxy provider with scheduled jobs...'); const proxyProviderConfig: HandlerConfigWithSchedule = { diff --git a/apps/data-ingestion/src/handlers/qm/operations/session.operations.ts b/apps/data-ingestion/src/handlers/qm/operations/session.operations.ts index 0f6e6c6..b5d85c5 100644 --- a/apps/data-ingestion/src/handlers/qm/operations/session.operations.ts +++ b/apps/data-ingestion/src/handlers/qm/operations/session.operations.ts @@ -4,15 +4,15 @@ import { OperationContext } from '@stock-bot/di'; import { isShutdownSignalReceived } from '@stock-bot/shutdown'; -import { getRandomProxy } from '@stock-bot/di'; -import type { ServiceContainer } from '@stock-bot/connection-factory'; +import { getRandomProxy } from '@stock-bot/utils'; +import type { ServiceContainer } from '@stock-bot/di'; import { QMSessionManager } from '../shared/session-manager'; import { QM_SESSION_IDS, QM_CONFIG, SESSION_CONFIG, getQmHeaders } from '../shared/config'; import type { QMSession } from '../shared/types'; export async function createSessions(container: ServiceContainer): Promise { - const ctx = OperationContext.create('qm', 'session', { container }); + const ctx = new OperationContext('qm-handler', 'create-sessions', container); try { ctx.logger.info('Creating QM sessions...'); @@ -33,7 +33,8 @@ export async function createSessions(container: ServiceContainer): Promise // Cache session creation stats const initialStats = sessionManager.getStats(); - await ctx.cache.set('pre-creation-stats', initialStats, { ttl: 300 }); + const cache = ctx.resolve('cache'); + await cache.set('pre-creation-stats', initialStats, { ttl: 300 }); // Create sessions for each session ID that needs them for (const [sessionKey, sessionId] of Object.entries(QM_SESSION_IDS)) { @@ -56,9 +57,9 @@ export async function createSessions(container: ServiceContainer): Promise const finalStats = sessionManager.getStats(); const totalSessions = sessionManager.getSessionCount(); - await ctx.cache.set('post-creation-stats', finalStats, { ttl: 3600 }); - await ctx.cache.set('session-count', totalSessions, { ttl: 900 }); - await ctx.cache.set('last-session-creation', new Date().toISOString()); + await cache.set('post-creation-stats', finalStats, { ttl: 3600 }); + await cache.set('session-count', totalSessions, { ttl: 900 }); + await cache.set('last-session-creation', new Date().toISOString()); ctx.logger.info('QM session creation completed', { totalSessions, @@ -68,8 +69,6 @@ export async function createSessions(container: ServiceContainer): Promise } catch (error) { ctx.logger.error('Failed to create QM sessions', { error }); throw error; - } finally { - await ctx.dispose(); } } @@ -134,7 +133,8 @@ async function createSingleSession( sessionManager.addSession(sessionId, newSession); // Cache successful session creation - await ctx.cache.set( + const cacheService = ctx.resolve('cache'); + await cacheService.set( `successful-session:${sessionKey}:${Date.now()}`, { sessionId, proxy, tokenExists: !!sessionData.token }, { ttl: 300 } @@ -156,7 +156,8 @@ async function createSingleSession( } // Cache failed session attempt for debugging - await ctx.cache.set( + const cacheService = ctx.resolve('cache'); + await cacheService.set( `failed-session:${sessionKey}:${Date.now()}`, { sessionId, proxy, error: error.message }, { ttl: 300 } @@ -165,25 +166,34 @@ async function createSingleSession( } export async function initializeQMResources(container?: ServiceContainer): Promise { - const ctx = OperationContext.create('qm', 'init', container ? { container } : undefined); - - // Check if already initialized - const alreadyInitialized = await ctx.cache.get('initialized'); - if (alreadyInitialized) { - ctx.logger.debug('QM resources already initialized'); - return; + if (!container) { + throw new Error('Service container is required for QM resource initialization'); } + + const ctx = new OperationContext('qm-handler', 'initialize-resources', container); + + try { + const cache = ctx.resolve('cache'); + + // Check if already initialized + const alreadyInitialized = await cache.get('initialized'); + if (alreadyInitialized) { + ctx.logger.debug('QM resources already initialized'); + return; + } - ctx.logger.debug('Initializing QM resources...'); - - // Mark as initialized in cache and session manager - await ctx.cache.set('initialized', true, { ttl: 3600 }); - await ctx.cache.set('initialization-time', new Date().toISOString()); - - const sessionManager = QMSessionManager.getInstance(); - sessionManager.setInitialized(true); - - ctx.logger.info('QM resources initialized successfully'); - - await ctx.dispose(); + ctx.logger.debug('Initializing QM resources...'); + + // Mark as initialized in cache and session manager + await cache.set('initialized', true, { ttl: 3600 }); + await cache.set('initialization-time', new Date().toISOString()); + + const sessionManager = QMSessionManager.getInstance(); + sessionManager.setInitialized(true); + + ctx.logger.info('QM resources initialized successfully'); + } catch (error) { + ctx.logger.error('Failed to initialize QM resources', { error }); + throw error; + } } \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/data-ingestion/src/handlers/qm/qm.handler.ts index df9a979..3c91a44 100644 --- a/apps/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/data-ingestion/src/handlers/qm/qm.handler.ts @@ -1,5 +1,4 @@ import { - BaseHandler, ScheduledHandler, Handler, Operation, diff --git a/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts b/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts index 59882ed..0c36f1c 100644 --- a/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts +++ b/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts @@ -13,7 +13,7 @@ import type { ServiceContainer } from '@stock-bot/di'; const logger = getLogger('webshare-provider'); // Initialize and register the WebShare provider -export function initializeWebShareProvider(container: ServiceContainer) { +export function initializeWebShareProvider(_container: ServiceContainer) { logger.debug('Registering WebShare provider with scheduled jobs...'); const webShareProviderConfig: HandlerConfigWithSchedule = { diff --git a/apps/data-ingestion/src/index.ts b/apps/data-ingestion/src/index.ts index 45b7099..7e00b6b 100644 --- a/apps/data-ingestion/src/index.ts +++ b/apps/data-ingestion/src/index.ts @@ -7,7 +7,8 @@ import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger'; import type { QueueManager } from '@stock-bot/queue'; import { Shutdown } from '@stock-bot/shutdown'; import { ProxyManager } from '@stock-bot/utils'; -import type { ServiceContainer } from '@stock-bot/di'; +import { ServiceContainer, ConnectionFactory } from '@stock-bot/di'; +import { handlerRegistry } from '@stock-bot/handlers'; // Local imports import { setupServiceContainer } from './setup/database-setup'; import { createRoutes } from './routes/create-routes'; @@ -15,8 +16,7 @@ import { createRoutes } from './routes/create-routes'; const config = initializeServiceConfig(); console.log('Data Service Configuration:', JSON.stringify(config, null, 2)); const serviceConfig = config.service; -const databaseConfig = config.database; -const queueConfig = config.queue; +// Configuration will be passed to service container setup if (config.log) { setLoggerConfig({ @@ -34,6 +34,7 @@ const logger = getLogger('data-ingestion'); const PORT = serviceConfig.port; let server: ReturnType | null = null; let serviceContainer: ServiceContainer | null = null; +let connectionFactory: ConnectionFactory | null = null; let queueManager: QueueManager | null = null; let app: Hono | null = null; @@ -47,7 +48,9 @@ async function initializeServices() { try { // Initialize service container with connection pools logger.debug('Setting up service container with connection pools...'); - serviceContainer = await setupServiceContainer(); + const { container, factory } = await setupServiceContainer(); + serviceContainer = container; + connectionFactory = factory; logger.info('Service container initialized with connection pools'); // Create app with routes that have access to the container @@ -78,7 +81,7 @@ async function initializeServices() { await ProxyManager.initialize(); logger.info('Proxy manager initialized'); - // Initialize handlers (register handlers and scheduled jobs) + // Initialize handlers using the handler registry logger.debug('Initializing data handlers...'); const { initializeWebShareProvider } = await import('./handlers/webshare/webshare.handler'); const { initializeIBProvider } = await import('./handlers/ib/ib.handler'); @@ -92,10 +95,22 @@ async function initializeServices() { initializeQMProvider(serviceContainer); logger.info('Data handlers initialized with service container'); + + // Register handlers with queue system + logger.debug('Registering handlers with queue system...'); + try { + await queueManager.registerHandlers(handlerRegistry.getAllHandlers()); + logger.info('Handlers registered with queue system'); + } catch (error) { + logger.error('Failed to register handlers with queue system', { + error: error instanceof Error ? error.message : String(error), + stack: error instanceof Error ? error.stack : undefined + }); + throw error; + } // Create scheduled jobs from registered handlers logger.debug('Creating scheduled jobs from registered handlers...'); - const { handlerRegistry } = await import('@stock-bot/queue'); const allHandlers = handlerRegistry.getAllHandlers(); let totalScheduledJobs = 0; @@ -146,7 +161,10 @@ async function initializeServices() { logger.info('All services initialized successfully'); } catch (error) { - logger.error('Failed to initialize services', { error }); + logger.error('Failed to initialize services', { + error: error instanceof Error ? error.message : String(error), + stack: error instanceof Error ? error.stack : undefined + }); throw error; } } @@ -199,9 +217,13 @@ shutdown.onShutdownHigh(async () => { shutdown.onShutdownMedium(async () => { logger.info('Disposing service container and connections...'); try { + if (connectionFactory) { + await connectionFactory.disposeAll(); + logger.info('Connection factory disposed, all pools closed'); + } if (serviceContainer) { await serviceContainer.dispose(); - logger.info('Service container disposed, all connections closed'); + logger.info('Service container disposed'); } } catch (error) { logger.error('Error disposing service container', { error }); diff --git a/apps/data-ingestion/src/routes/create-routes.ts b/apps/data-ingestion/src/routes/create-routes.ts index 8c0e5c6..661ab04 100644 --- a/apps/data-ingestion/src/routes/create-routes.ts +++ b/apps/data-ingestion/src/routes/create-routes.ts @@ -1,5 +1,5 @@ import { Hono } from 'hono'; -import type { ServiceContainer } from '@stock-bot/connection-factory'; +import type { ServiceContainer } from '@stock-bot/di'; import { exchangeRoutes } from './exchange.routes'; import { healthRoutes } from './health.routes'; import { queueRoutes } from './queue.routes'; diff --git a/apps/data-ingestion/src/setup/database-setup.ts b/apps/data-ingestion/src/setup/database-setup.ts index fd02b99..3ea4d65 100644 --- a/apps/data-ingestion/src/setup/database-setup.ts +++ b/apps/data-ingestion/src/setup/database-setup.ts @@ -3,9 +3,10 @@ import { getLogger } from '@stock-bot/logger'; import { ConnectionFactory, ServiceContainer, - PoolSizeCalculator + PoolSizeCalculator, + createServiceContainer } from '@stock-bot/di'; -import type { ConnectionFactoryConfig, DynamicPoolConfig } from '@stock-bot/mongodb'; +import type { DynamicPoolConfig } from '@stock-bot/mongodb'; const logger = getLogger('database-setup'); @@ -40,23 +41,25 @@ export function createConnectionFactory(): ConnectionFactory { /** * Sets up the service container with all dependencies */ -export async function setupServiceContainer(): Promise { +export async function setupServiceContainer(): Promise<{ container: ServiceContainer, factory: ConnectionFactory }> { logger.info('Setting up service container for data-ingestion'); const connectionFactory = createConnectionFactory(); const dbConfig = getDatabaseConfig(); - const queueConfig = getQueueConfig(); - // Create base container - const container = new ServiceContainer('data-ingestion'); + // Create enhanced service container with connection factory + const container = createServiceContainer('data-ingestion', connectionFactory, { + database: dbConfig + }); - // Register MongoDB with dynamic pool sizing + // Override the default database connections with specific configurations + // MongoDB with dynamic pool sizing for batch operations container.register({ name: 'mongodb', factory: async () => { const poolSize = PoolSizeCalculator.calculate('data-ingestion', 'batch-import'); const pool = await connectionFactory.createMongoDB({ - name: 'default', + name: 'data-ingestion', config: { uri: dbConfig.mongodb.uri, database: dbConfig.mongodb.database, @@ -77,18 +80,15 @@ export async function setupServiceContainer(): Promise { return pool.client; }, singleton: true, - dispose: async (client) => { - await client.disconnect(); - } }); - // Register PostgreSQL + // PostgreSQL with optimized settings for data ingestion container.register({ name: 'postgres', factory: async () => { const poolSize = PoolSizeCalculator.calculate('data-ingestion'); const pool = await connectionFactory.createPostgreSQL({ - name: 'default', + name: 'data-ingestion', config: { host: dbConfig.postgresql.host, port: dbConfig.postgresql.port, @@ -107,17 +107,14 @@ export async function setupServiceContainer(): Promise { return pool.client; }, singleton: true, - dispose: async (client) => { - await client.disconnect(); - } }); - // Register Cache + // Cache with data-ingestion specific configuration container.register({ name: 'cache', factory: async () => { const pool = await connectionFactory.createCache({ - name: 'default', + name: 'data-ingestion', config: { host: dbConfig.dragonfly.host, port: dbConfig.dragonfly.port, @@ -129,12 +126,12 @@ export async function setupServiceContainer(): Promise { singleton: true, }); - // Register QueueManager + // Queue with data-ingestion specific configuration container.register({ name: 'queue', factory: async () => { const pool = await connectionFactory.createQueue({ - name: 'default', + name: 'data-ingestion', config: { host: dbConfig.dragonfly.host, port: dbConfig.dragonfly.port, @@ -144,19 +141,6 @@ export async function setupServiceContainer(): Promise { return pool.client; }, singleton: true, - dispose: async (queueManager) => { - await queueManager.shutdown(); - } - }); - - // Register the connection factory itself for pool management - container.register({ - name: 'connectionFactory', - factory: () => connectionFactory, - singleton: true, - dispose: async (factory) => { - await factory.disposeAll(); - } }); logger.info('Service container setup complete'); @@ -166,7 +150,7 @@ export async function setupServiceContainer(): Promise { await enableDynamicPoolSizing(container); } - return container; + return { container, factory: connectionFactory }; } /** diff --git a/config/default.json b/config/default.json index 3f715ad..1e26b67 100644 --- a/config/default.json +++ b/config/default.json @@ -85,8 +85,8 @@ "type": "exponential", "delay": 1000 }, - "removeOnComplete": true, - "removeOnFail": false + "removeOnComplete": 100, + "removeOnFail": 50 } }, "features": { diff --git a/libs/core/config/config/default.json b/libs/core/config/config/default.json index 1b7310d..6314b96 100644 --- a/libs/core/config/config/default.json +++ b/libs/core/config/config/default.json @@ -75,8 +75,8 @@ "type": "exponential", "delay": 1000 }, - "removeOnComplete": true, - "removeOnFail": false + "removeOnComplete": 100, + "removeOnFail": 50 } }, "http": { diff --git a/libs/core/config/config/test.json b/libs/core/config/config/test.json index f362037..0cf6dcb 100644 --- a/libs/core/config/config/test.json +++ b/libs/core/config/config/test.json @@ -31,8 +31,8 @@ }, "defaultJobOptions": { "attempts": 1, - "removeOnComplete": false, - "removeOnFail": false + "removeOnComplete": 100, + "removeOnFail": 50 } }, "http": { diff --git a/libs/services/queue/src/dlq-handler.ts b/libs/services/queue/src/dlq-handler.ts index 76b2a2d..640f69b 100644 --- a/libs/services/queue/src/dlq-handler.ts +++ b/libs/services/queue/src/dlq-handler.ts @@ -1,5 +1,5 @@ -import { Queue, type Job } from 'bullmq'; import { getLogger } from '@stock-bot/logger'; +import { Queue, type Job } from 'bullmq'; import type { DLQConfig, RedisConfig } from './types'; import { getRedisConnection } from './utils'; @@ -76,8 +76,8 @@ export class DeadLetterQueueHandler { }; await this.dlq.add('failed-job', dlqData, { - removeOnComplete: false, - removeOnFail: false, + removeOnComplete: 100, + removeOnFail: 50, }); logger.error('Job moved to DLQ', {