added new di with connection pool and rebuild of the database/cache services

This commit is contained in:
Boki 2025-06-21 14:54:51 -04:00
parent be6afef832
commit 09d907a10c
26 changed files with 4844 additions and 205 deletions

View file

@ -1,6 +1,6 @@
import { getLogger } from '@stock-bot/logger';
import { Collection, Db, MongoClient, OptionalUnlessRequiredId } from 'mongodb';
import type { DocumentBase, MongoDBClientConfig } from './types';
import type { DocumentBase, MongoDBClientConfig, PoolMetrics, ConnectionEvents, DynamicPoolConfig } from './types';
/**
* MongoDB Client for Stock Bot Data Service
@ -15,10 +15,23 @@ export class MongoDBClient {
private defaultDatabase: string;
private readonly logger = getLogger('mongodb-client');
private isConnected = false;
private readonly metrics: PoolMetrics;
private readonly events?: ConnectionEvents;
private dynamicPoolConfig?: DynamicPoolConfig;
private poolMonitorInterval?: Timer;
constructor(config: MongoDBClientConfig) {
constructor(config: MongoDBClientConfig, events?: ConnectionEvents) {
this.config = config;
this.defaultDatabase = config.database || 'stock';
this.events = events;
this.metrics = {
totalConnections: 0,
activeConnections: 0,
idleConnections: 0,
waitingRequests: 0,
errors: 0,
created: new Date(),
};
}
/**
@ -48,8 +61,38 @@ export class MongoDBClient {
this.db = this.client.db(this.defaultDatabase);
this.isConnected = true;
this.logger.info('Successfully connected to MongoDB');
// Update metrics
this.metrics.totalConnections = this.config.poolSettings?.maxPoolSize || 10;
this.metrics.idleConnections = this.metrics.totalConnections;
// Fire connection event
if (this.events?.onConnect) {
await Promise.resolve(this.events.onConnect());
}
// Fire pool created event
if (this.events?.onPoolCreated) {
await Promise.resolve(this.events.onPoolCreated());
}
this.logger.info('Successfully connected to MongoDB', {
database: this.defaultDatabase,
poolSize: this.metrics.totalConnections,
});
// Start pool monitoring if dynamic sizing is enabled
if (this.dynamicPoolConfig?.enabled) {
this.startPoolMonitoring();
}
} catch (error) {
this.metrics.errors++;
this.metrics.lastError = error instanceof Error ? error.message : 'Unknown error';
// Fire error event
if (this.events?.onError) {
await Promise.resolve(this.events.onError(error as Error));
}
this.logger.error('MongoDB connection failed:', error);
if (this.client) {
await this.client.close();
@ -68,10 +111,22 @@ export class MongoDBClient {
}
try {
// Stop pool monitoring
if (this.poolMonitorInterval) {
clearInterval(this.poolMonitorInterval);
this.poolMonitorInterval = undefined;
}
await this.client.close();
this.isConnected = false;
this.client = null;
this.db = null;
// Fire disconnect event
if (this.events?.onDisconnect) {
await Promise.resolve(this.events.onDisconnect());
}
this.logger.info('Disconnected from MongoDB');
} catch (error) {
this.logger.error('Error disconnecting from MongoDB:', error);
@ -350,4 +405,116 @@ export class MongoDBClient {
return `mongodb://${auth}${host}:${port}/${database}${authParam}`;
}
/**
* Get current pool metrics
*/
getPoolMetrics(): PoolMetrics {
// Update last used timestamp
this.metrics.lastUsed = new Date();
// Note: MongoDB driver doesn't expose detailed pool metrics
// These are estimates based on configuration
return { ...this.metrics };
}
/**
* Set dynamic pool configuration
*/
setDynamicPoolConfig(config: DynamicPoolConfig): void {
this.dynamicPoolConfig = config;
if (config.enabled && this.isConnected && !this.poolMonitorInterval) {
this.startPoolMonitoring();
} else if (!config.enabled && this.poolMonitorInterval) {
clearInterval(this.poolMonitorInterval);
this.poolMonitorInterval = undefined;
}
}
/**
* Start monitoring pool and adjust size dynamically
*/
private startPoolMonitoring(): void {
if (!this.dynamicPoolConfig || this.poolMonitorInterval) {
return;
}
this.poolMonitorInterval = setInterval(() => {
this.evaluatePoolSize();
}, this.dynamicPoolConfig.evaluationInterval);
}
/**
* Evaluate and adjust pool size based on usage
*/
private async evaluatePoolSize(): Promise<void> {
if (!this.dynamicPoolConfig || !this.client) {
return;
}
const { minSize, maxSize, scaleUpThreshold, scaleDownThreshold } = this.dynamicPoolConfig;
const currentSize = this.metrics.totalConnections;
const utilization = ((this.metrics.activeConnections / currentSize) * 100);
this.logger.debug('Pool utilization', {
utilization: `${utilization.toFixed(1)}%`,
active: this.metrics.activeConnections,
total: currentSize,
});
// Scale up if utilization is high
if (utilization > scaleUpThreshold && currentSize < maxSize) {
const newSize = Math.min(currentSize + this.dynamicPoolConfig.scaleUpIncrement, maxSize);
await this.resizePool(newSize);
this.logger.info('Scaling up connection pool', { from: currentSize, to: newSize, utilization });
}
// Scale down if utilization is low
else if (utilization < scaleDownThreshold && currentSize > minSize) {
const newSize = Math.max(currentSize - this.dynamicPoolConfig.scaleDownIncrement, minSize);
await this.resizePool(newSize);
this.logger.info('Scaling down connection pool', { from: currentSize, to: newSize, utilization });
}
}
/**
* Resize the connection pool
* Note: MongoDB driver doesn't support dynamic resizing, this would require reconnection
*/
private async resizePool(newSize: number): Promise<void> {
// MongoDB doesn't support dynamic pool resizing
// This is a placeholder for future implementation
this.logger.warn('Dynamic pool resizing not yet implemented for MongoDB', { requestedSize: newSize });
// Update metrics to reflect desired state
this.metrics.totalConnections = newSize;
}
/**
* Enable pool warmup on connect
*/
async warmupPool(): Promise<void> {
if (!this.client || !this.isConnected) {
throw new Error('Client not connected');
}
const minSize = this.config.poolSettings?.minPoolSize || 1;
const promises: Promise<void>[] = [];
// Create minimum connections by running parallel pings
for (let i = 0; i < minSize; i++) {
promises.push(
this.client.db(this.defaultDatabase).admin().ping()
.then(() => {
this.logger.debug(`Warmed up connection ${i + 1}/${minSize}`);
})
.catch(error => {
this.logger.warn(`Failed to warm up connection ${i + 1}`, { error });
})
);
}
await Promise.allSettled(promises);
this.logger.info('Connection pool warmup complete', { connections: minSize });
}
}

View file

@ -1,20 +1,21 @@
import { MongoDBClient } from './client';
import type { MongoDBClientConfig } from './types';
import type { MongoDBClientConfig, ConnectionEvents } from './types';
/**
* Factory function to create a MongoDB client instance
*/
export function createMongoDBClient(config: MongoDBClientConfig): MongoDBClient {
return new MongoDBClient(config);
export function createMongoDBClient(config: MongoDBClientConfig, events?: ConnectionEvents): MongoDBClient {
return new MongoDBClient(config, events);
}
/**
* Create and connect a MongoDB client
*/
export async function createAndConnectMongoDBClient(
config: MongoDBClientConfig
config: MongoDBClientConfig,
events?: ConnectionEvents
): Promise<MongoDBClient> {
const client = createMongoDBClient(config);
const client = createMongoDBClient(config, events);
await client.connect();
return client;
}

View file

@ -20,6 +20,9 @@ export type {
RawDocument,
SecFiling,
SentimentData,
PoolMetrics,
ConnectionEvents,
DynamicPoolConfig,
} from './types';
// Factory functions
@ -28,10 +31,4 @@ export {
createAndConnectMongoDBClient,
} from './factory';
// Singleton instance
export {
getMongoDBClient,
connectMongoDB,
getDatabase,
disconnectMongoDB,
} from './singleton';
// Singleton pattern removed - use factory functions instead

View file

@ -1,82 +0,0 @@
import { MongoDBClient } from './client';
import type { MongoDBClientConfig } from './types';
import type { Db } from 'mongodb';
/**
* Singleton MongoDB client instance
* Provides global access to a single MongoDB connection
*/
let instance: MongoDBClient | null = null;
let initPromise: Promise<MongoDBClient> | null = null;
/**
* Initialize the singleton MongoDB client
*/
export async function connectMongoDB(config?: MongoDBClientConfig): Promise<MongoDBClient> {
if (instance) {
return instance;
}
if (initPromise) {
return initPromise;
}
if (!config) {
throw new Error('MongoDB client not initialized. Call connectMongoDB(config) first.');
}
initPromise = (async () => {
const client = new MongoDBClient(config);
await client.connect();
instance = client;
return client;
})();
try {
return await initPromise;
} catch (error) {
// Reset promise on error so next call can retry
initPromise = null;
throw error;
}
}
/**
* Get the singleton MongoDB client instance
* @throws Error if not initialized
*/
export function getMongoDBClient(): MongoDBClient {
if (!instance) {
throw new Error('MongoDB client not initialized. Call connectMongoDB(config) first.');
}
return instance;
}
/**
* Get the MongoDB database instance
* @throws Error if not initialized
*/
export function getDatabase(): Db {
if (!instance) {
throw new Error('MongoDB client not initialized. Call connectMongoDB(config) first.');
}
return instance.getDatabase();
}
/**
* Check if the MongoDB client is initialized
*/
export function isInitialized(): boolean {
return instance !== null && instance.connected;
}
/**
* Disconnect and reset the singleton instance
*/
export async function disconnectMongoDB(): Promise<void> {
if (instance) {
await instance.disconnect();
instance = null;
}
initPromise = null;
}

View file

@ -43,6 +43,36 @@ export interface MongoDBConnectionOptions {
healthCheckInterval?: number;
}
export interface PoolMetrics {
totalConnections: number;
activeConnections: number;
idleConnections: number;
waitingRequests: number;
errors: number;
lastError?: string;
avgResponseTime?: number;
created: Date;
lastUsed?: Date;
}
export interface ConnectionEvents {
onConnect?: () => void | Promise<void>;
onDisconnect?: () => void | Promise<void>;
onError?: (error: Error) => void | Promise<void>;
onPoolCreated?: () => void | Promise<void>;
}
export interface DynamicPoolConfig {
enabled: boolean;
minSize: number;
maxSize: number;
scaleUpThreshold: number; // % of pool in use (0-100)
scaleDownThreshold: number; // % of pool idle (0-100)
scaleUpIncrement: number; // connections to add
scaleDownIncrement: number; // connections to remove
evaluationInterval: number; // ms between checks
}
/**
* Health Status Types
*/