stock-bot/libs/data/mongodb/src/client.ts

528 lines
16 KiB
TypeScript

import { getLogger } from '@stock-bot/logger';
import { Collection, Db, MongoClient, OptionalUnlessRequiredId } from 'mongodb';
import type { DocumentBase, MongoDBClientConfig, PoolMetrics, ConnectionEvents, DynamicPoolConfig } from './types';
/**
 * MongoDB client for the Stock Bot data services.
 *
 * Wraps a single `MongoClient` and provides:
 * - connection lifecycle handling with optional lifecycle callbacks,
 * - chunked bulk upserts keyed on one or more unique fields,
 * - configuration-derived (estimated) pool metrics and an optional periodic
 *   pool-size evaluation loop.
 */
export class MongoDBClient {
  private client: MongoClient | null = null;
  private db: Db | null = null;
  private readonly config: MongoDBClientConfig;
  private defaultDatabase: string;
  private readonly logger = getLogger('mongodb-client');
  private isConnected = false;
  private readonly metrics: PoolMetrics;
  private readonly events?: ConnectionEvents;
  private dynamicPoolConfig?: DynamicPoolConfig;
  private poolMonitorInterval?: Timer;

  /**
   * @param config Connection settings. `config.database` falls back to `'stock'`.
   * @param events Optional lifecycle callbacks fired on connect, pool creation,
   *   connection errors and disconnect.
   */
  constructor(config: MongoDBClientConfig, events?: ConnectionEvents) {
    this.config = config;
    this.defaultDatabase = config.database || 'stock';
    this.events = events;
    this.metrics = {
      totalConnections: 0,
      activeConnections: 0,
      idleConnections: 0,
      waitingRequests: 0,
      errors: 0,
      created: new Date(),
    };
  }

  /**
   * Connect to MongoDB and verify the connection with a ping.
   *
   * Does nothing when already connected. On failure the error counter is
   * bumped, `onError` is notified, the half-open client is closed, all
   * connection state is reset and the original error is rethrown.
   *
   * @throws The error raised while connecting, pinging or running the
   *   `onConnect` / `onPoolCreated` callbacks.
   */
  async connect(): Promise<void> {
    if (this.isConnected && this.client) {
      return;
    }

    try {
      const uri = this.buildConnectionUri();
      this.logger.info('Connecting to MongoDB...');

      // `??` rather than `||`: an explicit 0 from the caller is forwarded to
      // the driver instead of being silently replaced by the default.
      const maxPoolSize = this.config.poolSettings?.maxPoolSize ?? 10;
      const client = new MongoClient(uri, {
        maxPoolSize,
        minPoolSize: this.config.poolSettings?.minPoolSize ?? 1,
        connectTimeoutMS: this.config.timeouts?.connectTimeout ?? 10000,
        socketTimeoutMS: this.config.timeouts?.socketTimeout ?? 30000,
        serverSelectionTimeoutMS: this.config.timeouts?.serverSelectionTimeout ?? 5000,
      });
      this.client = client;

      await client.connect();
      await client.db(this.defaultDatabase).admin().ping();

      // Set default database from config
      this.db = client.db(this.defaultDatabase);
      this.isConnected = true;

      // Metrics are estimates derived from configuration, not live pool state.
      this.metrics.totalConnections = maxPoolSize;
      this.metrics.idleConnections = maxPoolSize;

      if (this.events?.onConnect) {
        await this.events.onConnect();
      }
      if (this.events?.onPoolCreated) {
        await this.events.onPoolCreated();
      }

      this.logger.info('Successfully connected to MongoDB', {
        database: this.defaultDatabase,
        poolSize: this.metrics.totalConnections,
      });

      // Start pool monitoring if dynamic sizing is enabled
      if (this.dynamicPoolConfig?.enabled) {
        this.startPoolMonitoring();
      }
    } catch (error) {
      this.metrics.errors++;
      this.metrics.lastError = error instanceof Error ? error.message : 'Unknown error';

      if (this.events?.onError) {
        try {
          await this.events.onError(error instanceof Error ? error : new Error(String(error)));
        } catch (handlerError) {
          // A faulty handler must not hide the real failure or skip cleanup.
          this.logger.error('MongoDB onError handler failed:', handlerError);
        }
      }

      this.logger.error('MongoDB connection failed:', error);
      await this.discardClient();
      throw error;
    }
  }

  /**
   * Disconnect from MongoDB.
   *
   * Does nothing when there is no client. Connection state is reset even if
   * closing the client fails, so the instance never keeps reporting itself
   * as connected after a failed close.
   *
   * @throws The error raised by closing the client or by `onDisconnect`.
   */
  async disconnect(): Promise<void> {
    const client = this.client;
    if (!client) {
      return;
    }

    try {
      this.stopPoolMonitoring();
      try {
        await client.close();
      } finally {
        this.resetConnectionState();
      }

      if (this.events?.onDisconnect) {
        await this.events.onDisconnect();
      }
      this.logger.info('Disconnected from MongoDB');
    } catch (error) {
      this.logger.error('Error disconnecting from MongoDB:', error);
      throw error;
    }
  }

  /**
   * Set the default database for operations.
   *
   * The name is always recorded; the cached `Db` handle is only refreshed
   * when a client exists.
   */
  setDefaultDatabase(databaseName: string): void {
    this.defaultDatabase = databaseName;
    if (this.client) {
      this.db = this.client.db(databaseName);
      this.logger.debug(`Default database changed to: ${databaseName}`);
    }
  }

  /**
   * Get the current default database name.
   */
  getDefaultDatabase(): string {
    return this.defaultDatabase;
  }

  /**
   * Get a database instance by name.
   *
   * @param databaseName Database to open; defaults to the default database.
   * @throws Error when no client exists.
   */
  getDatabase(databaseName?: string): Db {
    if (!this.client) {
      throw new Error('MongoDB client not connected');
    }
    return this.client.db(databaseName || this.defaultDatabase);
  }

  /**
   * Upsert documents in chunks using unordered bulk writes.
   *
   * Each document is matched on `uniqueKeys`. `updated_at` is always set to
   * the current time; `created_at` is only written on insert (using the
   * document's own value when it has one).
   *
   * A failing chunk does not abort the run: its error is logged and collected
   * in `errors`, and later chunks are still processed. A document lacking one
   * of the unique keys fails its whole chunk before anything in that chunk is
   * sent.
   *
   * @param collectionName Target collection.
   * @param documents Documents to upsert.
   * @param uniqueKeys Field name(s) that identify a document.
   * @param options `chunkSize` (default 10000, must be > 0) and an optional
   *   `database` override.
   * @returns Upserted and modified totals plus the per-chunk errors.
   * @throws Error when not connected, when no unique key is given, or when
   *   `chunkSize` is not a positive number.
   */
  async batchUpsert<T extends DocumentBase>(
    collectionName: string,
    documents: Array<
      Omit<T, '_id' | 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>
    >,
    uniqueKeys: string | string[],
    options: {
      chunkSize?: number;
      database?: string; // Optional database override
    } = {}
  ): Promise<{ insertedCount: number; updatedCount: number; errors: unknown[] }> {
    if (!this.client) {
      throw new Error('MongoDB client not connected');
    }
    if (documents.length === 0) {
      return { insertedCount: 0, updatedCount: 0, errors: [] };
    }

    // Normalize uniqueKeys to array
    const keyFields = Array.isArray(uniqueKeys) ? uniqueKeys : [uniqueKeys];
    if (keyFields.length === 0) {
      throw new Error('At least one unique key must be provided');
    }

    const { chunkSize = 10000, database } = options;
    // A zero or negative step would never advance the loop below, and NaN
    // would silently process nothing.
    if (!(chunkSize > 0)) {
      throw new Error(`chunkSize must be a positive number, received: ${chunkSize}`);
    }

    const collection = this.getDatabase(database).collection<T>(collectionName);
    // Short random tag used only to correlate the log lines of one run.
    const operationId = Math.random().toString(36).slice(2, 10).padEnd(8, '0');
    const dbName = database || this.defaultDatabase;

    let totalInserted = 0;
    let totalUpdated = 0;
    const errors: unknown[] = [];

    this.logger.info(`Starting batch upsert operation [${collectionName}-${documents.length}][${operationId}]`, {
      database: dbName,
      collection: collectionName,
      totalDocuments: documents.length,
      uniqueKeys: keyFields,
      chunkSize,
    });

    // Process documents in chunks to avoid memory issues
    for (let i = 0; i < documents.length; i += chunkSize) {
      const chunk = documents.slice(i, i + chunkSize);
      const chunkNumber = Math.floor(i / chunkSize) + 1;

      try {
        const startTime = Date.now();
        const now = new Date();
        const bulkOps = chunk.map(doc =>
          MongoDBClient.toUpsertOperation(doc as Record<string, unknown>, keyFields, now)
        );

        // Type assertion sidesteps the driver's complex bulk-operation types.
        const result = await collection.bulkWrite(bulkOps as never, { ordered: false });

        const executionTime = Date.now() - startTime;
        const inserted = result.upsertedCount;
        const updated = result.modifiedCount;
        totalInserted += inserted;
        totalUpdated += updated;

        this.logger.debug(`Batch upsert chunk processed [${operationId}]`, {
          chunkNumber,
          chunkSize: chunk.length,
          inserted,
          updated,
          executionTime,
          database: dbName,
          collection: collectionName,
        });
      } catch (error) {
        // With unordered writes some operations of a failed chunk may still
        // have been applied; include them when the error reports counts.
        const partial = MongoDBClient.partialBulkCounts(error);
        totalInserted += partial.inserted;
        totalUpdated += partial.updated;

        this.logger.error(`Batch upsert failed on chunk [${operationId}]`, {
          error,
          database: dbName,
          collection: collectionName,
          chunkNumber,
          chunkStart: i,
          chunkSize: chunk.length,
          uniqueKeys: keyFields,
          inserted: partial.inserted,
          updated: partial.updated,
        });
        errors.push(error);
      }
    }

    this.logger.info(`Batch upsert completed [${operationId}]`, {
      database: dbName,
      collection: collectionName,
      totalRecords: documents.length,
      inserted: totalInserted,
      updated: totalUpdated,
      errors: errors.length,
      uniqueKeys: keyFields,
    });

    return { insertedCount: totalInserted, updatedCount: totalUpdated, errors };
  }

  /**
   * Get a typed collection.
   *
   * @throws Error when no client exists.
   */
  getCollection<T extends DocumentBase>(name: string, database?: string): Collection<T> {
    return this.getDatabase(database).collection<T>(name);
  }

  /**
   * Get a collection (interface compatibility method).
   * This method provides compatibility with the IMongoDBClient interface.
   */
  collection(name: string, database?: string): Collection<any> {
    return this.getCollection(name, database);
  }

  /**
   * Insert a single document, stamping `created_at` (kept when supplied) and
   * `updated_at` (always the current time).
   *
   * @returns The stored document including the generated `_id`.
   */
  async insertOne<T extends DocumentBase>(
    collectionName: string,
    document: Omit<T, '_id' | 'created_at' | 'updated_at'> &
      Partial<Pick<T, 'created_at' | 'updated_at'>>,
    database?: string
  ): Promise<T> {
    const collection = this.getCollection<T>(collectionName, database);
    const now = new Date();
    const docWithTimestamps = {
      ...document,
      created_at: document.created_at || now,
      updated_at: now,
    } as T;

    const result = await collection.insertOne(docWithTimestamps as OptionalUnlessRequiredId<T>);
    return { ...docWithTimestamps, _id: result.insertedId } as T;
  }

  /**
   * Whether a client exists and the last connect succeeded.
   */
  get connected(): boolean {
    return this.isConnected && !!this.client;
  }

  /**
   * The cached default database handle, or `null` when not connected.
   */
  get database(): Db | null {
    return this.db;
  }

  /**
   * Convenience wrappers around `batchUpsert` for well-known databases.
   */

  /** Batch upsert into the `stock` database. */
  async batchUpsertStock<T extends DocumentBase>(
    collectionName: string,
    documents: Array<
      Omit<T, '_id' | 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>
    >,
    uniqueKeys: string | string[],
    options: { chunkSize?: number } = {}
  ): Promise<{ insertedCount: number; updatedCount: number; errors: unknown[] }> {
    return this.batchUpsert<T>(collectionName, documents, uniqueKeys, {
      ...options,
      database: 'stock',
    });
  }

  /** Batch upsert into the `trading_documents` database. */
  async batchUpsertTrading<T extends DocumentBase>(
    collectionName: string,
    documents: Array<
      Omit<T, '_id' | 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>
    >,
    uniqueKeys: string | string[],
    options: { chunkSize?: number } = {}
  ): Promise<{ insertedCount: number; updatedCount: number; errors: unknown[] }> {
    return this.batchUpsert<T>(collectionName, documents, uniqueKeys, {
      ...options,
      database: 'trading_documents',
    });
  }

  /** Batch upsert into the `analytics` database. */
  async batchUpsertAnalytics<T extends DocumentBase>(
    collectionName: string,
    documents: Array<
      Omit<T, '_id' | 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>
    >,
    uniqueKeys: string | string[],
    options: { chunkSize?: number } = {}
  ): Promise<{ insertedCount: number; updatedCount: number; errors: unknown[] }> {
    return this.batchUpsert<T>(collectionName, documents, uniqueKeys, {
      ...options,
      database: 'analytics',
    });
  }

  /**
   * Build the connection string.
   *
   * An explicit `config.uri` is returned untouched. Otherwise the URI is
   * assembled from the individual settings, with credentials percent-encoded
   * so characters such as `@`, `:` or `/` cannot corrupt it. The database
   * segment falls back to the default database when `config.database` is unset.
   */
  private buildConnectionUri(): string {
    if (this.config.uri) {
      return this.config.uri;
    }

    const { host, port, username, password, database, authSource } = this.config;
    const credentials =
      username && password
        ? `${MongoDBClient.encodeCredential(username)}:${MongoDBClient.encodeCredential(password)}@`
        : '';
    const authParam = authSource && username ? `?authSource=${authSource}` : '';

    return `mongodb://${credentials}${host}:${port}/${database || this.defaultDatabase}${authParam}`;
  }

  /**
   * Percent-encode a user name or password for use in a connection string.
   *
   * The value is decoded first so credentials that callers already
   * percent-encoded keep working unchanged; a value with a malformed escape
   * (for example a bare `%`) is treated as raw text.
   */
  private static encodeCredential(value: string): string {
    let raw = value;
    try {
      raw = decodeURIComponent(value);
    } catch {
      // Not valid percent-encoding: keep the raw text and encode it below.
    }
    return encodeURIComponent(raw);
  }

  /**
   * Build one upsert operation for `bulkWrite`.
   *
   * `created_at` is kept out of `$set` to avoid a conflict with `$setOnInsert`.
   *
   * @throws Error when the document has no value for one of `keyFields`.
   */
  private static toUpsertOperation(
    doc: Record<string, unknown>,
    keyFields: readonly string[],
    now: Date
  ) {
    const filter: Record<string, unknown> = {};
    for (const key of keyFields) {
      const value = doc[key];
      if (value === undefined || value === null) {
        throw new Error(`Document missing required unique key: ${key}`);
      }
      filter[key] = value;
    }

    const { created_at, ...fields } = doc;
    return {
      updateOne: {
        filter,
        update: {
          $set: { ...fields, updated_at: now },
          $setOnInsert: { created_at: created_at || now },
        },
        upsert: true,
      },
    };
  }

  /**
   * Read the upserted/modified counters from a thrown error, if it has them.
   * Errors without numeric counters contribute zero.
   */
  private static partialBulkCounts(error: unknown): { inserted: number; updated: number } {
    if (typeof error !== 'object' || error === null) {
      return { inserted: 0, updated: 0 };
    }
    const { upsertedCount, modifiedCount } = error as {
      upsertedCount?: unknown;
      modifiedCount?: unknown;
    };
    return {
      inserted: typeof upsertedCount === 'number' ? upsertedCount : 0,
      updated: typeof modifiedCount === 'number' ? modifiedCount : 0,
    };
  }

  /**
   * Forget the client and mark the instance as disconnected.
   */
  private resetConnectionState(): void {
    this.isConnected = false;
    this.client = null;
    this.db = null;
    // The estimated pool no longer exists.
    this.metrics.totalConnections = 0;
    this.metrics.idleConnections = 0;
  }

  /**
   * Tear down after a failed connect: reset state, then close the client on a
   * best-effort basis so a close failure cannot replace the original error.
   */
  private async discardClient(): Promise<void> {
    const client = this.client;
    this.stopPoolMonitoring();
    this.resetConnectionState();
    if (!client) {
      return;
    }
    try {
      await client.close();
    } catch (closeError) {
      this.logger.warn('Failed to close MongoDB client after connection error', {
        error: closeError,
      });
    }
  }

  /**
   * Get a snapshot of the pool metrics.
   *
   * Note: these values are estimates based on configuration, not live driver
   * state. Calling this also refreshes `lastUsed`.
   */
  getPoolMetrics(): PoolMetrics {
    this.metrics.lastUsed = new Date();
    return { ...this.metrics };
  }

  /**
   * Set the dynamic pool configuration.
   *
   * Any running monitor is stopped first so a changed `evaluationInterval`
   * takes effect; monitoring restarts only when enabled and connected.
   */
  setDynamicPoolConfig(config: DynamicPoolConfig): void {
    this.dynamicPoolConfig = config;
    this.stopPoolMonitoring();
    if (config.enabled && this.connected) {
      this.startPoolMonitoring();
    }
  }

  /**
   * Start the periodic pool evaluation; no-op without a configuration or
   * when a monitor is already running.
   */
  private startPoolMonitoring(): void {
    if (!this.dynamicPoolConfig || this.poolMonitorInterval) {
      return;
    }
    this.poolMonitorInterval = setInterval(() => {
      // Handle rejections here: nothing awaits a timer callback.
      this.evaluatePoolSize().catch(error => {
        this.logger.error('Pool size evaluation failed', { error });
      });
    }, this.dynamicPoolConfig.evaluationInterval);
  }

  /**
   * Stop the periodic pool evaluation if it is running.
   */
  private stopPoolMonitoring(): void {
    if (this.poolMonitorInterval) {
      clearInterval(this.poolMonitorInterval);
      this.poolMonitorInterval = undefined;
    }
  }

  /**
   * Compare estimated utilisation against the configured thresholds and
   * request a larger or smaller pool when one is crossed.
   */
  private async evaluatePoolSize(): Promise<void> {
    if (!this.dynamicPoolConfig || !this.client) {
      return;
    }

    const {
      minSize,
      maxSize,
      scaleUpThreshold,
      scaleDownThreshold,
      scaleUpIncrement,
      scaleDownIncrement,
    } = this.dynamicPoolConfig;
    const currentSize = this.metrics.totalConnections;
    // Guard the division: a pool size of 0 would otherwise yield NaN.
    const utilization =
      currentSize > 0 ? (this.metrics.activeConnections / currentSize) * 100 : 0;

    this.logger.debug('Pool utilization', {
      utilization: `${utilization.toFixed(1)}%`,
      active: this.metrics.activeConnections,
      total: currentSize,
    });

    if (utilization > scaleUpThreshold && currentSize < maxSize) {
      const newSize = Math.min(currentSize + scaleUpIncrement, maxSize);
      await this.resizePool(newSize);
      this.logger.info('Scaling up connection pool', { from: currentSize, to: newSize, utilization });
    } else if (utilization < scaleDownThreshold && currentSize > minSize) {
      const newSize = Math.max(currentSize - scaleDownIncrement, minSize);
      await this.resizePool(newSize);
      this.logger.info('Scaling down connection pool', { from: currentSize, to: newSize, utilization });
    }
  }

  /**
   * Record a requested pool size.
   *
   * Placeholder: no actual resizing is performed; only a warning is logged
   * and the estimated `totalConnections` metric is updated.
   */
  private async resizePool(newSize: number): Promise<void> {
    this.logger.warn('Dynamic pool resizing not yet implemented for MongoDB', { requestedSize: newSize });
    // Update metrics to reflect desired state
    this.metrics.totalConnections = newSize;
  }

  /**
   * Warm up the pool by issuing `minPoolSize` pings in parallel.
   *
   * Individual ping failures are logged and do not fail the warmup.
   *
   * @throws Error when the client is not connected.
   */
  async warmupPool(): Promise<void> {
    if (!this.client || !this.isConnected) {
      throw new Error('Client not connected');
    }

    const client = this.client;
    const minSize = this.config.poolSettings?.minPoolSize ?? 1;

    const warmOne = async (index: number): Promise<void> => {
      try {
        await client.db(this.defaultDatabase).admin().ping();
        this.logger.debug(`Warmed up connection ${index + 1}/${minSize}`);
      } catch (error) {
        this.logger.warn(`Failed to warm up connection ${index + 1}`, { error });
      }
    };

    const pending: Promise<void>[] = [];
    for (let i = 0; i < minSize; i++) {
      pending.push(warmOne(i));
    }
    await Promise.allSettled(pending);

    this.logger.info('Connection pool warmup complete', { connections: minSize });
  }
}