import { Collection, Db, MongoClient, OptionalUnlessRequiredId } from 'mongodb';
import { getLogger } from '@stock-bot/logger';
import type { DocumentBase, MongoDBClientConfig } from './types';

/** Database name used whenever the configuration does not provide one. */
const FALLBACK_DATABASE = 'stock';

/**
 * MongoDB Client for Stock Bot Data Service
 *
 * MongoDB client focused on batch upsert operations
 * with minimal configuration and no health monitoring complexity.
 */
export class MongoDBClient {
  private client: MongoClient | null = null;
  private db: Db | null = null;
  private readonly config: MongoDBClientConfig;
  private defaultDatabase: string;
  private readonly logger = getLogger('mongodb-client');
  private isConnected = false;

  /**
   * @param config Connection settings. `config.database` becomes the default
   *   database; when it is missing or empty, `'stock'` is used.
   */
  constructor(config: MongoDBClientConfig) {
    this.config = config;
    this.defaultDatabase = config.database || FALLBACK_DATABASE;
  }

  /**
   * Connect to MongoDB with simple configuration.
   *
   * Returns immediately when a client is already connected. Otherwise builds
   * the URI, opens the client, pings the default database and stores the
   * default `Db` handle.
   *
   * @throws Rethrows the connection/ping error after releasing the client.
   */
  async connect(): Promise<void> {
    if (this.isConnected && this.client) {
      return;
    }

    try {
      const uri = this.buildConnectionUri();
      this.logger.info('Connecting to MongoDB...');

      // `??` (not `||`) so that an explicitly configured 0 is forwarded to the
      // driver instead of being replaced by the default.
      this.client = new MongoClient(uri, {
        maxPoolSize: this.config.poolSettings?.maxPoolSize ?? 10,
        minPoolSize: this.config.poolSettings?.minPoolSize ?? 1,
        connectTimeoutMS: this.config.timeouts?.connectTimeout ?? 10000,
        socketTimeoutMS: this.config.timeouts?.socketTimeout ?? 30000,
        serverSelectionTimeoutMS: this.config.timeouts?.serverSelectionTimeout ?? 5000,
      });

      await this.client.connect();
      await this.client.db(this.defaultDatabase).admin().ping();

      // Set default database from config
      this.db = this.client.db(this.defaultDatabase);
      this.isConnected = true;

      this.logger.info('Successfully connected to MongoDB');
    } catch (error) {
      this.logger.error('MongoDB connection failed:', error);

      const failedClient = this.client;
      this.client = null;
      this.db = null;
      this.isConnected = false;

      if (failedClient) {
        // A failure while closing must not hide the original connection error.
        try {
          await failedClient.close();
        } catch (closeError) {
          this.logger.error('Error closing MongoDB client after failed connection:', closeError);
        }
      }

      throw error;
    }
  }

  /**
   * Disconnect from MongoDB.
   *
   * No-op when there is no client. On success the client and default database
   * references are cleared.
   *
   * @throws Rethrows any error raised while closing the client (after logging it).
   */
  async disconnect(): Promise<void> {
    if (!this.client) {
      return;
    }

    try {
      await this.client.close();
      this.isConnected = false;
      this.client = null;
      this.db = null;
      this.logger.info('Disconnected from MongoDB');
    } catch (error) {
      this.logger.error('Error disconnecting from MongoDB:', error);
      throw error;
    }
  }

  /**
   * Set the default database for operations.
   *
   * The name is always recorded; the cached `Db` handle is only refreshed when
   * a client already exists.
   */
  setDefaultDatabase(databaseName: string): void {
    this.defaultDatabase = databaseName;
    if (this.client) {
      this.db = this.client.db(databaseName);
      this.logger.debug(`Default database changed to: ${databaseName}`);
    }
  }

  /**
   * Get the current default database name
   */
  getDefaultDatabase(): string {
    return this.defaultDatabase;
  }

  /**
   * Get a database instance by name.
   *
   * @param databaseName Optional database name; falls back to the default database.
   * @throws Error when no client exists.
   */
  getDatabase(databaseName?: string): Db {
    if (!this.client) {
      throw new Error('MongoDB client not connected');
    }

    const dbName = databaseName || this.defaultDatabase;
    return this.client.db(dbName);
  }

  /**
   * Batch upsert documents for high-performance operations.
   * Supports single or multiple unique keys for matching.
   *
   * Documents are written in chunks with an unordered `bulkWrite`. Each
   * document is stamped with `updated_at` (always "now") and `created_at`
   * (the document's own value, or "now"), where `created_at` is only applied
   * on insert via `$setOnInsert`.
   *
   * A failure inside a chunk (including a document that lacks one of the
   * unique keys, which aborts that chunk before it is written) is logged and
   * collected in `errors`; processing continues with the next chunk.
   *
   * @param collectionName Target collection.
   * @param documents Documents to upsert.
   * @param uniqueKeys Field name(s) whose values form the match filter.
   * @param options `chunkSize` (default 10000) and an optional `database` override.
   * @returns `insertedCount` (sum of `upsertedCount`), `updatedCount` (sum of
   *   `modifiedCount`) and the per-chunk errors.
   * @throws Error when not connected, when no unique key is given, or when
   *   `chunkSize` is not a positive number.
   */
  async batchUpsert<T extends DocumentBase>(
    collectionName: string,
    documents: Array<
      Omit<T, 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>
    >,
    uniqueKeys: string | string[],
    options: {
      chunkSize?: number;
      database?: string; // Optional database override
    } = {}
  ): Promise<{ insertedCount: number; updatedCount: number; errors: unknown[] }> {
    if (!this.client) {
      throw new Error('MongoDB client not connected');
    }

    if (documents.length === 0) {
      return { insertedCount: 0, updatedCount: 0, errors: [] };
    }

    // Normalize uniqueKeys to array
    const keyFields = Array.isArray(uniqueKeys) ? uniqueKeys : [uniqueKeys];
    if (keyFields.length === 0) {
      throw new Error('At least one unique key must be provided');
    }

    const { chunkSize = 10000, database } = options;

    // A zero/negative step would never advance the chunk loop below, and NaN
    // would silently process nothing, so reject those values up front.
    if (!(chunkSize > 0)) {
      throw new Error(`chunkSize must be a positive number, received: ${chunkSize}`);
    }

    const db = this.getDatabase(database);
    const collection = db.collection<T>(collectionName);

    // Short random tag correlating the log lines of this call. slice(2, 10)
    // skips the "0." prefix; the previous substring(7) could yield an empty id.
    const operationId = Math.random().toString(36).slice(2, 10);
    const dbName = database || this.defaultDatabase;

    let totalInserted = 0;
    let totalUpdated = 0;
    const errors: unknown[] = [];

    this.logger.info(`Starting batch upsert operation [${operationId}]`, {
      database: dbName,
      collection: collectionName,
      totalDocuments: documents.length,
      uniqueKeys: keyFields,
      chunkSize,
    });

    // Process documents in chunks to avoid memory issues
    for (let i = 0; i < documents.length; i += chunkSize) {
      const chunk = documents.slice(i, i + chunkSize);

      try {
        const startTime = Date.now();

        // Prepare bulk operations
        const bulkOps = chunk.map(doc => {
          const now = new Date();
          const docWithTimestamps = {
            ...doc,
            created_at: doc.created_at || now,
            updated_at: now,
          };

          // Create filter using multiple unique keys
          const filter: Record<string, unknown> = {};
          for (const key of keyFields) {
            const value = (doc as Record<string, unknown>)[key];
            if (value === undefined || value === null) {
              throw new Error(`Document missing required unique key: ${key}`);
            }
            filter[key] = value;
          }

          // Remove created_at from $set to avoid conflict with $setOnInsert
          const { created_at, ...updateFields } = docWithTimestamps;

          return {
            updateOne: {
              filter,
              update: {
                $set: updateFields,
                $setOnInsert: { created_at },
              },
              upsert: true,
            },
          };
        });

        // Execute bulk operation with type assertion to handle complex MongoDB types
        const result = await collection.bulkWrite(bulkOps as never, { ordered: false });

        const executionTime = Date.now() - startTime;
        const inserted = result.upsertedCount;
        const updated = result.modifiedCount;

        totalInserted += inserted;
        totalUpdated += updated;

        this.logger.debug(`Batch upsert chunk processed [${operationId}]`, {
          chunkNumber: Math.floor(i / chunkSize) + 1,
          chunkSize: chunk.length,
          inserted,
          updated,
          executionTime,
          database: dbName,
          collection: collectionName,
        });
      } catch (error) {
        this.logger.error(`Batch upsert failed on chunk [${operationId}]`, {
          error,
          database: dbName,
          collection: collectionName,
          chunkNumber: Math.floor(i / chunkSize) + 1,
          chunkStart: i,
          chunkSize: chunk.length,
          uniqueKeys: keyFields,
        });
        errors.push(error);
      }
    }

    this.logger.info(`Batch upsert completed [${operationId}]`, {
      database: dbName,
      collection: collectionName,
      totalRecords: documents.length,
      inserted: totalInserted,
      updated: totalUpdated,
      errors: errors.length,
      uniqueKeys: keyFields,
    });

    return { insertedCount: totalInserted, updatedCount: totalUpdated, errors };
  }

  /**
   * Get a typed collection.
   *
   * @param name Collection name.
   * @param database Optional database name; falls back to the default database.
   * @throws Error when no client exists.
   */
  getCollection<T extends DocumentBase>(name: string, database?: string): Collection<T> {
    const db = this.getDatabase(database);
    return db.collection<T>(name);
  }

  /**
   * Simple insert operation.
   *
   * Stamps the document with `created_at` (its own value, or "now") and
   * `updated_at` ("now"), inserts it, and returns the stamped document together
   * with the `_id` reported by the insert.
   *
   * @throws Error when no client exists; insert errors propagate to the caller.
   */
  async insertOne<T extends DocumentBase>(
    collectionName: string,
    document: Omit<T, 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>,
    database?: string
  ): Promise<T> {
    const collection = this.getCollection<T>(collectionName, database);

    const now = new Date();
    const docWithTimestamps = {
      ...document,
      created_at: document.created_at || now,
      updated_at: now,
    } as T;

    const result = await collection.insertOne(docWithTimestamps as OptionalUnlessRequiredId<T>);
    return { ...docWithTimestamps, _id: result.insertedId } as T;
  }

  /**
   * Check if client is connected
   */
  get connected(): boolean {
    return this.isConnected && !!this.client;
  }

  /**
   * Get the default database instance (`null` until one has been set)
   */
  get database(): Db | null {
    return this.db;
  }

  /**
   * Convenience methods for common databases
   */

  /** Batch upsert into the `stock` database; see {@link MongoDBClient.batchUpsert}. */
  async batchUpsertStock<T extends DocumentBase>(
    collectionName: string,
    documents: Array<
      Omit<T, 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>
    >,
    uniqueKeys: string | string[],
    options: { chunkSize?: number } = {}
  ) {
    return this.batchUpsert<T>(collectionName, documents, uniqueKeys, {
      ...options,
      database: 'stock',
    });
  }

  /** Batch upsert into the `trading_documents` database; see {@link MongoDBClient.batchUpsert}. */
  async batchUpsertTrading<T extends DocumentBase>(
    collectionName: string,
    documents: Array<
      Omit<T, 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>
    >,
    uniqueKeys: string | string[],
    options: { chunkSize?: number } = {}
  ) {
    return this.batchUpsert<T>(collectionName, documents, uniqueKeys, {
      ...options,
      database: 'trading_documents',
    });
  }

  /** Batch upsert into the `analytics` database; see {@link MongoDBClient.batchUpsert}. */
  async batchUpsertAnalytics<T extends DocumentBase>(
    collectionName: string,
    documents: Array<
      Omit<T, 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>
    >,
    uniqueKeys: string | string[],
    options: { chunkSize?: number } = {}
  ) {
    return this.batchUpsert<T>(collectionName, documents, uniqueKeys, {
      ...options,
      database: 'analytics',
    });
  }

  /**
   * Build the connection string.
   *
   * `config.uri` is returned verbatim when present. Otherwise a `mongodb://`
   * URI is assembled from the individual fields: credentials are included only
   * when both username and password are set, and `authSource` only when a
   * username is set.
   */
  private buildConnectionUri(): string {
    if (this.config.uri) {
      return this.config.uri;
    }

    const { host, port, username, password, database, authSource } = this.config;

    // Percent-encode user-supplied parts so characters such as '@', ':' or '/'
    // cannot break the URI structure.
    // NOTE(review): assumes the config holds raw (not pre-encoded) credentials — confirm with callers.
    const auth =
      username && password
        ? `${encodeURIComponent(username)}:${encodeURIComponent(password)}@`
        : '';
    const authParam =
      authSource && username ? `?authSource=${encodeURIComponent(authSource)}` : '';

    // Same fallback as the constructor, so an unset database no longer yields
    // the literal path "/undefined".
    const dbPath = database || FALLBACK_DATABASE;

    return `mongodb://${auth}${host}:${port}/${dbPath}${authParam}`;
  }
}