work on ib and cleanup

This commit is contained in:
Boki 2025-06-14 09:17:48 -04:00
parent a20a11c1aa
commit d686a72591
41 changed files with 601 additions and 2793 deletions

View file

@ -1,110 +1,68 @@
import {
Collection,
Db,
Document,
MongoClient,
MongoClientOptions,
OptionalUnlessRequiredId,
WithId,
} from 'mongodb';
import * as yup from 'yup';
import { Collection, Db, MongoClient, OptionalUnlessRequiredId } from 'mongodb';
import { mongodbConfig } from '@stock-bot/config';
import { getLogger } from '@stock-bot/logger';
import { MongoDBHealthMonitor } from './health';
import { schemaMap } from './schemas';
import type {
AnalystReport,
CollectionNames,
DocumentBase,
EarningsTranscript,
MongoDBClientConfig,
MongoDBConnectionOptions,
NewsArticle,
RawDocument,
SecFiling,
SentimentData,
} from './types';
import type { DocumentBase } from './types';
/**
* MongoDB Client for Stock Bot
* Simplified MongoDB Client for Stock Bot Data Service
*
* Provides type-safe access to MongoDB collections with built-in
* health monitoring, connection pooling, and schema validation.
* A singleton MongoDB client focused solely on batch upsert operations
* with minimal configuration and no health monitoring complexity.
*/
export class MongoDBClient {
private static instance: MongoDBClient | null = null;
private client: MongoClient | null = null;
private db: Db | null = null;
private readonly config: MongoDBClientConfig;
private readonly options: MongoDBConnectionOptions;
private readonly logger: ReturnType<typeof getLogger>;
private readonly healthMonitor: MongoDBHealthMonitor;
private readonly logger = getLogger('mongodb-client-simple');
private isConnected = false;
constructor(config?: Partial<MongoDBClientConfig>, options?: MongoDBConnectionOptions) {
this.config = this.buildConfig(config);
this.options = {
retryAttempts: 3,
retryDelay: 1000,
healthCheckInterval: 30000,
...options,
};
private constructor() {}
this.logger = getLogger('mongodb-client');
this.healthMonitor = new MongoDBHealthMonitor(this);
/**
* Get singleton instance
*/
static getInstance(): MongoDBClient {
if (!MongoDBClient.instance) {
MongoDBClient.instance = new MongoDBClient();
}
return MongoDBClient.instance;
}
/**
* Connect to MongoDB
* Connect to MongoDB with simple configuration
*/
async connect(): Promise<void> {
if (this.isConnected && this.client) {
return;
}
const uri = this.buildConnectionUri();
const clientOptions = this.buildClientOptions();
try {
const uri = this.buildConnectionUri();
this.logger.info('Connecting to MongoDB...');
let lastError: Error | null = null;
this.client = new MongoClient(uri, {
maxPoolSize: 10,
minPoolSize: 1,
connectTimeoutMS: 10000,
socketTimeoutMS: 30000,
serverSelectionTimeoutMS: 5000,
});
for (let attempt = 1; attempt <= this.options.retryAttempts!; attempt++) {
try {
this.logger.info(
`Connecting to MongoDB (attempt ${attempt}/${this.options.retryAttempts})...`
);
await this.client.connect();
await this.client.db(mongodbConfig.MONGODB_DATABASE).admin().ping();
this.client = new MongoClient(uri, clientOptions);
await this.client.connect();
this.db = this.client.db(mongodbConfig.MONGODB_DATABASE);
this.isConnected = true;
// Test the connection
await this.client.db(this.config.database).admin().ping();
this.db = this.client.db(this.config.database);
this.isConnected = true;
this.logger.info('Successfully connected to MongoDB');
// Start health monitoring
this.healthMonitor.start();
return;
} catch (error) {
lastError = error as Error;
this.logger.error(`MongoDB connection attempt ${attempt} failed:`, error);
if (this.client) {
await this.client.close();
this.client = null;
}
if (attempt < this.options.retryAttempts!) {
await this.delay(this.options.retryDelay! * attempt);
}
this.logger.info('Successfully connected to MongoDB');
} catch (error) {
this.logger.error('MongoDB connection failed:', error);
if (this.client) {
await this.client.close();
this.client = null;
}
throw error;
}
throw new Error(
`Failed to connect to MongoDB after ${this.options.retryAttempts} attempts: ${lastError?.message}`
);
}
/**
@ -116,7 +74,6 @@ export class MongoDBClient {
}
try {
this.healthMonitor.stop();
await this.client.close();
this.isConnected = false;
this.client = null;
@ -128,10 +85,138 @@ export class MongoDBClient {
}
}
/**
* Batch upsert documents for high-performance operations
* Supports single or multiple unique keys for matching
*/
async batchUpsert<T extends DocumentBase>(
collectionName: string,
documents: Array<
Omit<T, '_id' | 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>
>,
uniqueKeys: string | string[],
options: {
chunkSize?: number;
} = {}
): Promise<{ insertedCount: number; updatedCount: number; errors: unknown[] }> {
if (!this.db) {
throw new Error('MongoDB client not connected');
}
if (documents.length === 0) {
return { insertedCount: 0, updatedCount: 0, errors: [] };
}
// Normalize uniqueKeys to array
const keyFields = Array.isArray(uniqueKeys) ? uniqueKeys : [uniqueKeys];
if (keyFields.length === 0) {
throw new Error('At least one unique key must be provided');
}
const { chunkSize = 10000 } = options;
const collection = this.db.collection<T>(collectionName);
const operationId = Math.random().toString(36).substring(7);
let totalInserted = 0;
let totalUpdated = 0;
const errors: unknown[] = [];
this.logger.info(`Starting batch upsert operation [${operationId}]`, {
collection: collectionName,
totalDocuments: documents.length,
uniqueKeys: keyFields,
chunkSize
});
// Process documents in chunks to avoid memory issues
for (let i = 0; i < documents.length; i += chunkSize) {
const chunk = documents.slice(i, i + chunkSize);
try {
const startTime = Date.now();
// Prepare bulk operations
const bulkOps = chunk.map(doc => {
const now = new Date();
const docWithTimestamps = {
...doc,
created_at: doc.created_at || now,
updated_at: now,
};
// Create filter using multiple unique keys
const filter: Record<string, unknown> = {};
keyFields.forEach(key => {
const value = (doc as Record<string, unknown>)[key];
if (value === undefined || value === null) {
throw new Error(`Document missing required unique key: ${key}`);
}
filter[key] = value;
});
// Remove created_at from $set to avoid conflict with $setOnInsert
const { created_at, ...updateFields } = docWithTimestamps;
return {
updateOne: {
filter,
update: {
$set: updateFields,
$setOnInsert: { created_at },
},
upsert: true,
},
};
});
// Execute bulk operation with type assertion to handle complex MongoDB types
const result = await collection.bulkWrite(bulkOps as never, { ordered: false });
const executionTime = Date.now() - startTime;
const inserted = result.upsertedCount;
const updated = result.modifiedCount;
totalInserted += inserted;
totalUpdated += updated;
this.logger.debug(`Batch upsert chunk processed [${operationId}]`, {
chunkNumber: Math.floor(i / chunkSize) + 1,
chunkSize: chunk.length,
inserted,
updated,
executionTime,
collection: collectionName,
});
} catch (error) {
this.logger.error(`Batch upsert failed on chunk [${operationId}]`, {
error,
collection: collectionName,
chunkNumber: Math.floor(i / chunkSize) + 1,
chunkStart: i,
chunkSize: chunk.length,
uniqueKeys: keyFields,
});
errors.push(error);
}
}
this.logger.info(`Batch upsert completed [${operationId}]`, {
collection: collectionName,
totalRecords: documents.length,
inserted: totalInserted,
updated: totalUpdated,
errors: errors.length,
uniqueKeys: keyFields,
});
return { insertedCount: totalInserted, updatedCount: totalUpdated, errors };
}
/**
* Get a typed collection
*/
getCollection<T extends DocumentBase>(name: CollectionNames): Collection<T> {
getCollection<T extends DocumentBase>(name: string): Collection<T> {
if (!this.db) {
throw new Error('MongoDB client not connected');
}
@ -139,162 +224,26 @@ export class MongoDBClient {
}
/**
* Insert a document with validation
* Simple insert operation
*/
async insertOne<T extends DocumentBase>(
collectionName: CollectionNames,
collectionName: string,
document: Omit<T, '_id' | 'created_at' | 'updated_at'> &
Partial<Pick<T, 'created_at' | 'updated_at'>>
): Promise<T> {
const collection = this.getCollection<T>(collectionName);
// Add timestamps
const now = new Date();
const docWithTimestamps = {
...document,
created_at: document.created_at || now,
updated_at: now,
} as T; // Validate document if schema exists
if (collectionName in schemaMap) {
try {
(schemaMap as any)[collectionName].validateSync(docWithTimestamps);
} catch (error) {
if (error instanceof yup.ValidationError) {
this.logger.error(`Document validation failed for ${collectionName}:`, error.errors);
throw new Error(`Document validation failed: ${error.errors?.map(e => e).join(', ')}`);
}
throw error;
}
}
} as T;
const result = await collection.insertOne(docWithTimestamps as OptionalUnlessRequiredId<T>);
return { ...docWithTimestamps, _id: result.insertedId } as T;
}
/**
* Update a document with validation
*/
async updateOne<T extends DocumentBase>(
collectionName: CollectionNames,
filter: any,
update: Partial<T>
): Promise<boolean> {
const collection = this.getCollection<T>(collectionName);
// Add updated timestamp
const updateWithTimestamp = {
...update,
updated_at: new Date(),
};
const result = await collection.updateOne(filter, { $set: updateWithTimestamp });
return result.modifiedCount > 0;
}
/**
* Find documents with optional validation
*/
async find<T extends DocumentBase>(
collectionName: CollectionNames,
filter: any = {},
options: any = {}
): Promise<T[]> {
const collection = this.getCollection<T>(collectionName);
return (await collection.find(filter, options).toArray()) as T[];
}
/**
* Find one document
*/
async findOne<T extends DocumentBase>(
collectionName: CollectionNames,
filter: any
): Promise<T | null> {
const collection = this.getCollection<T>(collectionName);
return (await collection.findOne(filter)) as T | null;
}
/**
* Aggregate with type safety
*/
async aggregate<T extends DocumentBase>(
collectionName: CollectionNames,
pipeline: any[]
): Promise<T[]> {
const collection = this.getCollection<T>(collectionName);
return await collection.aggregate<T>(pipeline).toArray();
}
/**
* Count documents
*/
async countDocuments(collectionName: CollectionNames, filter: any = {}): Promise<number> {
const collection = this.getCollection(collectionName);
return await collection.countDocuments(filter);
}
/**
* Create indexes for better performance
*/
async createIndexes(): Promise<void> {
if (!this.db) {
throw new Error('MongoDB client not connected');
}
try {
// Sentiment data indexes
await this.db
.collection('sentiment_data')
.createIndexes([
{ key: { symbol: 1, timestamp: -1 } },
{ key: { sentiment_label: 1 } },
{ key: { source_type: 1 } },
{ key: { created_at: -1 } },
]);
// News articles indexes
await this.db
.collection('news_articles')
.createIndexes([
{ key: { symbols: 1, published_date: -1 } },
{ key: { publication: 1 } },
{ key: { categories: 1 } },
{ key: { created_at: -1 } },
]);
// SEC filings indexes
await this.db
.collection('sec_filings')
.createIndexes([
{ key: { symbols: 1, filing_date: -1 } },
{ key: { filing_type: 1 } },
{ key: { cik: 1 } },
{ key: { created_at: -1 } },
]); // Raw documents indexes
await this.db.collection('raw_documents').createIndex({ content_hash: 1 }, { unique: true });
await this.db
.collection('raw_documents')
.createIndexes([
{ key: { processing_status: 1 } },
{ key: { document_type: 1 } },
{ key: { created_at: -1 } },
]);
this.logger.info('MongoDB indexes created successfully');
} catch (error) {
this.logger.error('Error creating MongoDB indexes:', error);
throw error;
}
}
/**
* Get database statistics
*/
async getStats(): Promise<any> {
if (!this.db) {
throw new Error('MongoDB client not connected');
}
return await this.db.stats();
}
/**
* Check if client is connected
*/
@ -302,13 +251,6 @@ export class MongoDBClient {
return this.isConnected && !!this.client;
}
/**
* Get the underlying MongoDB client
*/
get mongoClient(): MongoClient | null {
return this.client;
}
/**
* Get the database instance
*/
@ -316,81 +258,24 @@ export class MongoDBClient {
return this.db;
}
private buildConfig(config?: Partial<MongoDBClientConfig>): MongoDBClientConfig {
return {
host: config?.host || mongodbConfig.MONGODB_HOST,
port: config?.port || mongodbConfig.MONGODB_PORT,
database: config?.database || mongodbConfig.MONGODB_DATABASE,
username: config?.username || mongodbConfig.MONGODB_USERNAME,
password: config?.password || mongodbConfig.MONGODB_PASSWORD,
authSource: config?.authSource || mongodbConfig.MONGODB_AUTH_SOURCE,
uri: config?.uri || mongodbConfig.MONGODB_URI,
poolSettings: {
maxPoolSize: mongodbConfig.MONGODB_MAX_POOL_SIZE,
minPoolSize: mongodbConfig.MONGODB_MIN_POOL_SIZE,
maxIdleTime: mongodbConfig.MONGODB_MAX_IDLE_TIME,
...config?.poolSettings,
},
timeouts: {
connectTimeout: mongodbConfig.MONGODB_CONNECT_TIMEOUT,
socketTimeout: mongodbConfig.MONGODB_SOCKET_TIMEOUT,
serverSelectionTimeout: mongodbConfig.MONGODB_SERVER_SELECTION_TIMEOUT,
...config?.timeouts,
},
tls: {
enabled: mongodbConfig.MONGODB_TLS,
insecure: mongodbConfig.MONGODB_TLS_INSECURE,
caFile: mongodbConfig.MONGODB_TLS_CA_FILE,
...config?.tls,
},
options: {
retryWrites: mongodbConfig.MONGODB_RETRY_WRITES,
journal: mongodbConfig.MONGODB_JOURNAL,
readPreference: mongodbConfig.MONGODB_READ_PREFERENCE as any,
writeConcern: mongodbConfig.MONGODB_WRITE_CONCERN,
...config?.options,
},
};
}
private buildConnectionUri(): string {
if (this.config.uri) {
return this.config.uri;
if (mongodbConfig.MONGODB_URI) {
return mongodbConfig.MONGODB_URI;
}
const { host, port, username, password, database, authSource } = this.config;
const {
MONGODB_HOST: host,
MONGODB_PORT: port,
MONGODB_USERNAME: username,
MONGODB_PASSWORD: password,
MONGODB_DATABASE: database,
MONGODB_AUTH_SOURCE: authSource,
} = mongodbConfig;
// Build URI components
const auth = username && password ? `${username}:${password}@` : '';
const authDb = authSource ? `?authSource=${authSource}` : '';
const authParam = authSource && username ? `?authSource=${authSource}` : '';
return `mongodb://${auth}${host}:${port}/${database}${authDb}`;
}
private buildClientOptions(): MongoClientOptions {
return {
maxPoolSize: this.config.poolSettings?.maxPoolSize,
minPoolSize: this.config.poolSettings?.minPoolSize,
maxIdleTimeMS: this.config.poolSettings?.maxIdleTime,
connectTimeoutMS: this.config.timeouts?.connectTimeout,
socketTimeoutMS: this.config.timeouts?.socketTimeout,
serverSelectionTimeoutMS: this.config.timeouts?.serverSelectionTimeout,
retryWrites: this.config.options?.retryWrites,
journal: this.config.options?.journal,
readPreference: this.config.options?.readPreference,
writeConcern: this.config.options?.writeConcern
? {
w:
this.config.options.writeConcern === 'majority'
? ('majority' as const)
: parseInt(this.config.options.writeConcern, 10) || 1,
}
: undefined,
tls: this.config.tls?.enabled,
tlsInsecure: this.config.tls?.insecure,
tlsCAFile: this.config.tls?.caFile,
};
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
return `mongodb://${auth}${host}:${port}/${database}${authParam}`;
}
}