import type { Logger } from '@stock-bot/core/logger';
import { Collection, Db, MongoClient } from 'mongodb';
import type { OptionalUnlessRequiredId } from 'mongodb';
import type {
  ConnectionEvents,
  DocumentBase,
  DynamicPoolConfig,
  MongoDBClientConfig,
  PoolMetrics,
} from './types';

/**
 * MongoDB Client for Stock Bot Data Service
 *
 * MongoDB client focused on batch upsert operations
 * with minimal configuration and no health monitoring complexity.
 *
 * Pool metrics kept by this class are estimates derived from configuration;
 * they are not read back from the driver.
 */
export class MongoDBClient {
  private client: MongoClient | null = null;
  private db: Db | null = null;
  private readonly config: MongoDBClientConfig;
  private defaultDatabase: string;
  private readonly logger: Logger;
  private isConnected = false;
  private readonly metrics: PoolMetrics;
  private readonly events?: ConnectionEvents;
  private dynamicPoolConfig?: DynamicPoolConfig;
  // Portable across runtimes: resolves to whatever setInterval returns here.
  private poolMonitorInterval?: ReturnType<typeof setInterval>;

  /**
   * @param mongoConfig - Connection settings; `database` falls back to `'stock'` when unset.
   * @param logger - Logger used for all diagnostics emitted by this client.
   * @param events - Optional lifecycle callbacks (connect, pool created, error, disconnect).
   */
  constructor(mongoConfig: MongoDBClientConfig, logger: Logger, events?: ConnectionEvents) {
    this.config = mongoConfig;
    this.defaultDatabase = mongoConfig.database || 'stock';
    this.logger = logger;
    this.events = events;
    this.metrics = {
      totalConnections: 0,
      activeConnections: 0,
      idleConnections: 0,
      waitingRequests: 0,
      errors: 0,
      created: new Date(),
    };
  }

  /**
   * Connect to MongoDB with simple configuration.
   *
   * No-op when already connected. On success the `onConnect` and
   * `onPoolCreated` callbacks are awaited and pool monitoring is started if
   * dynamic sizing is enabled.
   *
   * @throws The connection error (or an error thrown by a lifecycle callback).
   *   Whatever fails, the client is closed and this instance is returned to
   *   the disconnected state before the error propagates.
   */
  async connect(): Promise<void> {
    if (this.isConnected && this.client) {
      return;
    }

    try {
      const uri = this.buildConnectionUri();
      this.logger.info('Connecting to MongoDB...');

      const client = new MongoClient(uri, {
        maxPoolSize: this.config.poolSettings?.maxPoolSize || 10,
        minPoolSize: this.config.poolSettings?.minPoolSize || 1,
        connectTimeoutMS: this.config.timeouts?.connectTimeout || 10000,
        socketTimeoutMS: this.config.timeouts?.socketTimeout || 30000,
        serverSelectionTimeoutMS: this.config.timeouts?.serverSelectionTimeout || 5000,
      });
      // Stored before connecting so the failure path below can close it.
      this.client = client;

      await client.connect();
      await client.db(this.defaultDatabase).admin().ping();

      // Set default database from config
      this.db = client.db(this.defaultDatabase);
      this.isConnected = true;

      // Update metrics (estimates: the configured maximum, all assumed idle)
      this.metrics.totalConnections = this.config.poolSettings?.maxPoolSize || 10;
      this.metrics.idleConnections = this.metrics.totalConnections;

      // Fire connection event
      if (this.events?.onConnect) {
        await Promise.resolve(this.events.onConnect());
      }

      // Fire pool created event
      if (this.events?.onPoolCreated) {
        await Promise.resolve(this.events.onPoolCreated());
      }

      this.logger.info('Successfully connected to MongoDB', {
        database: this.defaultDatabase,
        poolSize: this.metrics.totalConnections,
      });

      // Start pool monitoring if dynamic sizing is enabled
      if (this.dynamicPoolConfig?.enabled) {
        this.startPoolMonitoring();
      }
    } catch (error) {
      this.metrics.errors++;
      this.metrics.lastError = error instanceof Error ? error.message : 'Unknown error';

      try {
        // Fire error event
        if (this.events?.onError) {
          const reported = error instanceof Error ? error : new Error(String(error));
          await Promise.resolve(this.events.onError(reported));
        }
        this.logger.error('MongoDB connection failed:', error);
      } finally {
        // Runs even if the error callback throws, so the client never leaks
        // and `isConnected` / `db` never outlive a closed client.
        await this.releaseFailedConnection();
      }

      throw error;
    }
  }

  /**
   * Return this instance to the disconnected state after a failed connect.
   * A failure to close the client is logged rather than thrown so that it
   * cannot replace the error that caused the connection attempt to fail.
   */
  private async releaseFailedConnection(): Promise<void> {
    const client = this.client;
    this.client = null;
    this.db = null;
    this.isConnected = false;

    if (!client) {
      return;
    }
    try {
      await client.close();
    } catch (closeError) {
      this.logger.warn('Failed to close MongoDB client after connection error', {
        error: closeError,
      });
    }
  }

  /**
   * Disconnect from MongoDB.
   *
   * No-op when there is no client. Stops pool monitoring, closes the client,
   * clears connection state and then awaits the `onDisconnect` callback.
   *
   * @throws Any error raised while closing the client or by the callback
   *   (logged before being rethrown).
   */
  async disconnect(): Promise<void> {
    if (!this.client) {
      return;
    }

    try {
      // Stop pool monitoring
      if (this.poolMonitorInterval) {
        clearInterval(this.poolMonitorInterval);
        this.poolMonitorInterval = undefined;
      }

      await this.client.close();
      this.isConnected = false;
      this.client = null;
      this.db = null;

      // Fire disconnect event
      if (this.events?.onDisconnect) {
        await Promise.resolve(this.events.onDisconnect());
      }

      this.logger.info('Disconnected from MongoDB');
    } catch (error) {
      this.logger.error('Error disconnecting from MongoDB:', error);
      throw error;
    }
  }

  /**
   * Set the default database for operations.
   * When a client exists, the cached default `Db` handle is replaced as well.
   */
  setDefaultDatabase(databaseName: string): void {
    this.defaultDatabase = databaseName;
    if (this.client) {
      this.db = this.client.db(databaseName);
      this.logger.debug(`Default database changed to: ${databaseName}`);
    }
  }

  /**
   * Get the current default database name
   */
  getDefaultDatabase(): string {
    return this.defaultDatabase;
  }

  /**
   * Get a database instance by name.
   *
   * @param databaseName - Database to open; defaults to the current default database.
   * @throws Error when no client exists.
   */
  getDatabase(databaseName?: string): Db {
    if (!this.client) {
      throw new Error('MongoDB client not connected');
    }

    const dbName = databaseName || this.defaultDatabase;
    return this.client.db(dbName);
  }

  /**
   * Batch upsert documents for high-performance operations.
   * Supports single or multiple unique keys for matching.
   *
   * Documents are written in chunks with unordered bulk writes. A chunk that
   * fails (including one containing a document that lacks a unique key) is
   * logged, its error is collected, and processing continues with the next
   * chunk.
   *
   * @param collectionName - Target collection.
   * @param documents - Documents to upsert; `created_at` is kept when supplied
   *   and only written on insert, `updated_at` is always set to the current time.
   * @param uniqueKeys - Field name(s) whose values form the match filter.
   * @param options - `chunkSize` (default 10000) and optional `database` override.
   * @returns Totals of upserted and modified documents plus the per-chunk errors.
   * @throws Error when not connected, when no unique key is given, or when
   *   `chunkSize` is not a positive number.
   */
  async batchUpsert<T extends DocumentBase>(
    collectionName: string,
    documents: Array<
      Omit<T, '_id' | 'created_at' | 'updated_at'> &
        Partial<Pick<T, '_id' | 'created_at' | 'updated_at'>>
    >,
    uniqueKeys: string | string[],
    options: {
      chunkSize?: number;
      database?: string; // Optional database override
    } = {}
  ): Promise<{ insertedCount: number; updatedCount: number; errors: unknown[] }> {
    if (!this.client) {
      throw new Error('MongoDB client not connected');
    }

    if (documents.length === 0) {
      return { insertedCount: 0, updatedCount: 0, errors: [] };
    }

    // Normalize uniqueKeys to array
    const keyFields = Array.isArray(uniqueKeys) ? uniqueKeys : [uniqueKeys];
    if (keyFields.length === 0) {
      throw new Error('At least one unique key must be provided');
    }

    const { chunkSize = 10000, database } = options;
    // A zero or negative step would never advance the loop below, and NaN
    // would produce a single empty bulk write; reject both up front.
    if (!(chunkSize > 0)) {
      throw new Error('chunkSize must be a positive number');
    }

    const db = this.getDatabase(database);
    const collection = db.collection<T>(collectionName);
    const operationId = Math.random().toString(36).substring(7);
    const dbName = database || this.defaultDatabase;

    let totalInserted = 0;
    let totalUpdated = 0;
    const errors: unknown[] = [];

    this.logger.info(
      `Starting batch upsert operation [${collectionName}-${documents.length}][${operationId}]`,
      {
        database: dbName,
        collection: collectionName,
        totalDocuments: documents.length,
        uniqueKeys: keyFields,
        chunkSize,
      }
    );

    // Process documents in chunks to avoid memory issues
    for (let i = 0; i < documents.length; i += chunkSize) {
      const chunk = documents.slice(i, i + chunkSize);

      try {
        const startTime = Date.now();

        // Prepare bulk operations
        const bulkOps = chunk.map(doc => {
          const now = new Date();
          const docWithTimestamps = {
            ...doc,
            created_at: doc.created_at || now,
            updated_at: now,
          };

          // Create filter using multiple unique keys
          const filter: Record<string, unknown> = {};
          keyFields.forEach(key => {
            const value = (doc as Record<string, unknown>)[key];
            if (value === undefined || value === null) {
              throw new Error(`Document missing required unique key: ${key}`);
            }
            filter[key] = value;
          });

          // Remove created_at from $set to avoid conflict with $setOnInsert
          const { created_at, ...updateFields } = docWithTimestamps;

          return {
            updateOne: {
              filter,
              update: {
                $set: updateFields,
                $setOnInsert: { created_at },
              },
              upsert: true,
            },
          };
        });

        // Execute bulk operation with type assertion to handle complex MongoDB types
        const result = await collection.bulkWrite(bulkOps as never, { ordered: false });

        const executionTime = Date.now() - startTime;
        const inserted = result.upsertedCount;
        const updated = result.modifiedCount;

        totalInserted += inserted;
        totalUpdated += updated;

        this.logger.debug(`Batch upsert chunk processed [${operationId}]`, {
          chunkNumber: Math.floor(i / chunkSize) + 1,
          chunkSize: chunk.length,
          inserted,
          updated,
          executionTime,
          database: dbName,
          collection: collectionName,
        });
      } catch (error) {
        this.logger.error(`Batch upsert failed on chunk [${operationId}]`, {
          error,
          database: dbName,
          collection: collectionName,
          chunkNumber: Math.floor(i / chunkSize) + 1,
          chunkStart: i,
          chunkSize: chunk.length,
          uniqueKeys: keyFields,
        });
        errors.push(error);
      }
    }

    this.logger.info(`Batch upsert completed [${operationId}]`, {
      database: dbName,
      collection: collectionName,
      totalRecords: documents.length,
      inserted: totalInserted,
      updated: totalUpdated,
      errors: errors.length,
      uniqueKeys: keyFields,
    });

    return { insertedCount: totalInserted, updatedCount: totalUpdated, errors };
  }

  /**
   * Get a typed collection.
   *
   * @param name - Collection name.
   * @param database - Optional database override; defaults to the default database.
   * @throws Error when no client exists.
   */
  getCollection<T extends DocumentBase = DocumentBase>(
    name: string,
    database?: string
  ): Collection<T> {
    const db = this.getDatabase(database);
    return db.collection<T>(name);
  }

  /**
   * Get a collection (interface compatibility method)
   * This method provides compatibility with the IMongoDBClient interface
   */
  collection<T extends DocumentBase = DocumentBase>(
    name: string,
    database?: string
  ): Collection<T> {
    return this.getCollection<T>(name, database);
  }

  /**
   * Simple insert operation.
   *
   * `created_at` is kept when supplied, otherwise set to the current time;
   * `updated_at` is always set to the current time.
   *
   * @returns The inserted document including the `_id` reported by the driver.
   * @throws Error when no client exists, or whatever the driver raises on insert.
   */
  async insertOne<T extends DocumentBase>(
    collectionName: string,
    document: Omit<T, '_id' | 'created_at' | 'updated_at'> &
      Partial<Pick<T, '_id' | 'created_at' | 'updated_at'>>,
    database?: string
  ): Promise<T> {
    const collection = this.getCollection<T>(collectionName, database);

    const now = new Date();
    const docWithTimestamps = {
      ...document,
      created_at: document.created_at || now,
      updated_at: now,
    } as T;

    const result = await collection.insertOne(docWithTimestamps as OptionalUnlessRequiredId<T>);
    return { ...docWithTimestamps, _id: result.insertedId } as T;
  }

  /**
   * Check if client is connected
   */
  get connected(): boolean {
    return this.isConnected && !!this.client;
  }

  /**
   * Get the default database instance (`null` while disconnected)
   */
  get database(): Db | null {
    return this.db;
  }

  /**
   * Convenience methods for common databases
   */

  /** Stock database operations: `batchUpsert` pinned to the `stock` database. */
  async batchUpsertStock<T extends DocumentBase>(
    collectionName: string,
    documents: Array<
      Omit<T, '_id' | 'created_at' | 'updated_at'> &
        Partial<Pick<T, '_id' | 'created_at' | 'updated_at'>>
    >,
    uniqueKeys: string | string[],
    options: { chunkSize?: number } = {}
  ) {
    return this.batchUpsert<T>(collectionName, documents, uniqueKeys, {
      ...options,
      database: 'stock',
    });
  }

  /** Trading documents database operations: `batchUpsert` pinned to `trading_documents`. */
  async batchUpsertTrading<T extends DocumentBase>(
    collectionName: string,
    documents: Array<
      Omit<T, '_id' | 'created_at' | 'updated_at'> &
        Partial<Pick<T, '_id' | 'created_at' | 'updated_at'>>
    >,
    uniqueKeys: string | string[],
    options: { chunkSize?: number } = {}
  ) {
    return this.batchUpsert<T>(collectionName, documents, uniqueKeys, {
      ...options,
      database: 'trading_documents',
    });
  }

  /** Analytics database operations: `batchUpsert` pinned to the `analytics` database. */
  async batchUpsertAnalytics<T extends DocumentBase>(
    collectionName: string,
    documents: Array<
      Omit<T, '_id' | 'created_at' | 'updated_at'> &
        Partial<Pick<T, '_id' | 'created_at' | 'updated_at'>>
    >,
    uniqueKeys: string | string[],
    options: { chunkSize?: number } = {}
  ) {
    return this.batchUpsert<T>(collectionName, documents, uniqueKeys, {
      ...options,
      database: 'analytics',
    });
  }

  /**
   * Build the connection string.
   *
   * Returns `config.uri` verbatim when it is set. Otherwise a `mongodb://`
   * URI is assembled from the individual fields; credentials are included
   * only when both username and password are present, and `authSource` only
   * when a username is present.
   */
  private buildConnectionUri(): string {
    if (this.config.uri) {
      return this.config.uri;
    }

    const { host, port, username, password, database, authSource } = this.config;

    // Credentials are percent-encoded so characters such as '@', ':' or '/'
    // cannot be mistaken for URI delimiters.
    // NOTE(review): assumes the config holds raw (not pre-encoded) credentials.
    const auth =
      username && password
        ? `${encodeURIComponent(username)}:${encodeURIComponent(password)}@`
        : '';
    const authParam =
      authSource && username ? `?authSource=${encodeURIComponent(authSource)}` : '';

    // Fall back to the default database name so an unset `config.database`
    // does not end up as the literal path "/undefined".
    const dbPath = database || this.defaultDatabase;

    return `mongodb://${auth}${host}:${port}/${dbPath}${authParam}`;
  }

  /**
   * Get current pool metrics.
   *
   * Stamps `lastUsed` with the current time and returns a shallow copy.
   * Note: MongoDB driver doesn't expose detailed pool metrics;
   * these are estimates based on configuration.
   */
  getPoolMetrics(): PoolMetrics {
    // Update last used timestamp
    this.metrics.lastUsed = new Date();

    return { ...this.metrics };
  }

  /**
   * Set dynamic pool configuration.
   *
   * Starts monitoring immediately when enabled on a connected client, and
   * stops a running monitor when disabled.
   */
  setDynamicPoolConfig(config: DynamicPoolConfig): void {
    this.dynamicPoolConfig = config;

    if (config.enabled && this.isConnected && !this.poolMonitorInterval) {
      this.startPoolMonitoring();
    } else if (!config.enabled && this.poolMonitorInterval) {
      clearInterval(this.poolMonitorInterval);
      this.poolMonitorInterval = undefined;
    }
  }

  /**
   * Start monitoring pool and adjust size dynamically.
   * Does nothing without a dynamic pool config or when a monitor already runs.
   */
  private startPoolMonitoring(): void {
    if (!this.dynamicPoolConfig || this.poolMonitorInterval) {
      return;
    }

    this.poolMonitorInterval = setInterval(() => {
      // The timer cannot await; catch here so a failed evaluation never
      // becomes an unhandled promise rejection.
      this.evaluatePoolSize().catch((error: unknown) => {
        this.logger.warn('Pool size evaluation failed', { error });
      });
    }, this.dynamicPoolConfig.evaluationInterval);
  }

  /**
   * Evaluate and adjust pool size based on usage.
   *
   * Utilization is `activeConnections / totalConnections` as a percentage,
   * computed from this class's own metric estimates.
   */
  private async evaluatePoolSize(): Promise<void> {
    if (!this.dynamicPoolConfig || !this.client) {
      return;
    }

    const { minSize, maxSize, scaleUpThreshold, scaleDownThreshold } = this.dynamicPoolConfig;
    const currentSize = this.metrics.totalConnections;
    if (currentSize <= 0) {
      // Nothing to measure against; dividing would yield NaN or Infinity.
      return;
    }
    const utilization = (this.metrics.activeConnections / currentSize) * 100;

    this.logger.debug('Pool utilization', {
      utilization: `${utilization.toFixed(1)}%`,
      active: this.metrics.activeConnections,
      total: currentSize,
    });

    if (utilization > scaleUpThreshold && currentSize < maxSize) {
      // Scale up if utilization is high
      const newSize = Math.min(currentSize + this.dynamicPoolConfig.scaleUpIncrement, maxSize);
      await this.resizePool(newSize);
      this.logger.info('Scaling up connection pool', {
        from: currentSize,
        to: newSize,
        utilization,
      });
    } else if (utilization < scaleDownThreshold && currentSize > minSize) {
      // Scale down if utilization is low
      const newSize = Math.max(currentSize - this.dynamicPoolConfig.scaleDownIncrement, minSize);
      await this.resizePool(newSize);
      this.logger.info('Scaling down connection pool', {
        from: currentSize,
        to: newSize,
        utilization,
      });
    }
  }

  /**
   * Resize the connection pool.
   * Note: MongoDB driver doesn't support dynamic resizing, this would require reconnection.
   * Currently a placeholder: it logs a warning and records the requested size
   * in the metrics only.
   */
  private async resizePool(newSize: number): Promise<void> {
    // MongoDB doesn't support dynamic pool resizing
    // This is a placeholder for future implementation
    this.logger.warn('Dynamic pool resizing not yet implemented for MongoDB', {
      requestedSize: newSize,
    });

    // Update metrics to reflect desired state
    this.metrics.totalConnections = newSize;
  }

  /**
   * Warm up the connection pool.
   *
   * Issues `minPoolSize` (default 1) pings in parallel in an attempt to open
   * that many connections. Individual ping failures are logged as warnings
   * and do not reject.
   *
   * @throws Error when the client is not connected.
   */
  async warmupPool(): Promise<void> {
    if (!this.client || !this.isConnected) {
      throw new Error('Client not connected');
    }

    const minSize = this.config.poolSettings?.minPoolSize || 1;
    const promises: Promise<void>[] = [];

    // Create minimum connections by running parallel pings
    for (let i = 0; i < minSize; i++) {
      promises.push(
        this.client
          .db(this.defaultDatabase)
          .admin()
          .ping()
          .then(() => {
            this.logger.debug(`Warmed up connection ${i + 1}/${minSize}`);
          })
          .catch(error => {
            this.logger.warn(`Failed to warm up connection ${i + 1}`, { error });
          })
      );
    }

    await Promise.allSettled(promises);
    this.logger.info('Connection pool warmup complete', { connections: minSize });
  }
}