update mongo for multi db support
This commit is contained in:
parent
4942574b94
commit
cbef304045
7 changed files with 927 additions and 13 deletions
|
|
@ -13,6 +13,7 @@ export class MongoDBClient {
|
|||
private static instance: MongoDBClient | null = null;
|
||||
private client: MongoClient | null = null;
|
||||
private db: Db | null = null;
|
||||
private defaultDatabase: string = 'stock'; // Default database name
|
||||
private readonly logger = getLogger('mongodb-client-simple');
|
||||
private isConnected = false;
|
||||
|
||||
|
|
@ -51,7 +52,9 @@ export class MongoDBClient {
|
|||
await this.client.connect();
|
||||
await this.client.db(mongodbConfig.MONGODB_DATABASE).admin().ping();
|
||||
|
||||
this.db = this.client.db(mongodbConfig.MONGODB_DATABASE);
|
||||
// Set default database from config
|
||||
this.defaultDatabase = mongodbConfig.MONGODB_DATABASE;
|
||||
this.db = this.client.db(this.defaultDatabase);
|
||||
this.isConnected = true;
|
||||
|
||||
this.logger.info('Successfully connected to MongoDB');
|
||||
|
|
@ -85,6 +88,36 @@ export class MongoDBClient {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the default database for operations
|
||||
*/
|
||||
setDefaultDatabase(databaseName: string): void {
|
||||
this.defaultDatabase = databaseName;
|
||||
if (this.client) {
|
||||
this.db = this.client.db(databaseName);
|
||||
this.logger.info(`Default database changed to: ${databaseName}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current default database name
|
||||
*/
|
||||
getDefaultDatabase(): string {
|
||||
return this.defaultDatabase;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a database instance by name
|
||||
*/
|
||||
getDatabase(databaseName?: string): Db {
|
||||
if (!this.client) {
|
||||
throw new Error('MongoDB client not connected');
|
||||
}
|
||||
|
||||
const dbName = databaseName || this.defaultDatabase;
|
||||
return this.client.db(dbName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Batch upsert documents for high-performance operations
|
||||
* Supports single or multiple unique keys for matching
|
||||
|
|
@ -97,9 +130,10 @@ export class MongoDBClient {
|
|||
uniqueKeys: string | string[],
|
||||
options: {
|
||||
chunkSize?: number;
|
||||
database?: string; // Optional database override
|
||||
} = {}
|
||||
): Promise<{ insertedCount: number; updatedCount: number; errors: unknown[] }> {
|
||||
if (!this.db) {
|
||||
if (!this.client) {
|
||||
throw new Error('MongoDB client not connected');
|
||||
}
|
||||
|
||||
|
|
@ -114,15 +148,18 @@ export class MongoDBClient {
|
|||
throw new Error('At least one unique key must be provided');
|
||||
}
|
||||
|
||||
const { chunkSize = 10000 } = options;
|
||||
const collection = this.db.collection<T>(collectionName);
|
||||
const { chunkSize = 10000, database } = options;
|
||||
const db = this.getDatabase(database);
|
||||
const collection = db.collection<T>(collectionName);
|
||||
const operationId = Math.random().toString(36).substring(7);
|
||||
const dbName = database || this.defaultDatabase;
|
||||
|
||||
let totalInserted = 0;
|
||||
let totalUpdated = 0;
|
||||
const errors: unknown[] = [];
|
||||
|
||||
this.logger.info(`Starting batch upsert operation [${operationId}]`, {
|
||||
database: dbName,
|
||||
collection: collectionName,
|
||||
totalDocuments: documents.length,
|
||||
uniqueKeys: keyFields,
|
||||
|
|
@ -186,11 +223,13 @@ export class MongoDBClient {
|
|||
inserted,
|
||||
updated,
|
||||
executionTime,
|
||||
database: dbName,
|
||||
collection: collectionName,
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error(`Batch upsert failed on chunk [${operationId}]`, {
|
||||
error,
|
||||
database: dbName,
|
||||
collection: collectionName,
|
||||
chunkNumber: Math.floor(i / chunkSize) + 1,
|
||||
chunkStart: i,
|
||||
|
|
@ -202,6 +241,7 @@ export class MongoDBClient {
|
|||
}
|
||||
|
||||
this.logger.info(`Batch upsert completed [${operationId}]`, {
|
||||
database: dbName,
|
||||
collection: collectionName,
|
||||
totalRecords: documents.length,
|
||||
inserted: totalInserted,
|
||||
|
|
@ -216,11 +256,9 @@ export class MongoDBClient {
|
|||
/**
|
||||
* Get a typed collection
|
||||
*/
|
||||
getCollection<T extends DocumentBase>(name: string): Collection<T> {
|
||||
if (!this.db) {
|
||||
throw new Error('MongoDB client not connected');
|
||||
}
|
||||
return this.db.collection<T>(name);
|
||||
getCollection<T extends DocumentBase>(name: string, database?: string): Collection<T> {
|
||||
const db = this.getDatabase(database);
|
||||
return db.collection<T>(name);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -229,9 +267,10 @@ export class MongoDBClient {
|
|||
async insertOne<T extends DocumentBase>(
|
||||
collectionName: string,
|
||||
document: Omit<T, '_id' | 'created_at' | 'updated_at'> &
|
||||
Partial<Pick<T, 'created_at' | 'updated_at'>>
|
||||
Partial<Pick<T, 'created_at' | 'updated_at'>>,
|
||||
database?: string
|
||||
): Promise<T> {
|
||||
const collection = this.getCollection<T>(collectionName);
|
||||
const collection = this.getCollection<T>(collectionName, database);
|
||||
|
||||
const now = new Date();
|
||||
const docWithTimestamps = {
|
||||
|
|
@ -252,12 +291,61 @@ export class MongoDBClient {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the database instance
|
||||
* Get the default database instance
|
||||
*/
|
||||
get database(): Db | null {
|
||||
return this.db;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience methods for common databases
|
||||
*/
|
||||
|
||||
// Stock database operations
|
||||
async batchUpsertStock<T extends DocumentBase>(
|
||||
collectionName: string,
|
||||
documents: Array<
|
||||
Omit<T, '_id' | 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>
|
||||
>,
|
||||
uniqueKeys: string | string[],
|
||||
options: { chunkSize?: number } = {}
|
||||
) {
|
||||
return this.batchUpsert(collectionName, documents, uniqueKeys, {
|
||||
...options,
|
||||
database: 'stock',
|
||||
});
|
||||
}
|
||||
|
||||
// Trading documents database operations
|
||||
async batchUpsertTrading<T extends DocumentBase>(
|
||||
collectionName: string,
|
||||
documents: Array<
|
||||
Omit<T, '_id' | 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>
|
||||
>,
|
||||
uniqueKeys: string | string[],
|
||||
options: { chunkSize?: number } = {}
|
||||
) {
|
||||
return this.batchUpsert(collectionName, documents, uniqueKeys, {
|
||||
...options,
|
||||
database: 'trading_documents',
|
||||
});
|
||||
}
|
||||
|
||||
// Analytics database operations
|
||||
async batchUpsertAnalytics<T extends DocumentBase>(
|
||||
collectionName: string,
|
||||
documents: Array<
|
||||
Omit<T, '_id' | 'created_at' | 'updated_at'> & Partial<Pick<T, 'created_at' | 'updated_at'>>
|
||||
>,
|
||||
uniqueKeys: string | string[],
|
||||
options: { chunkSize?: number } = {}
|
||||
) {
|
||||
return this.batchUpsert(collectionName, documents, uniqueKeys, {
|
||||
...options,
|
||||
database: 'analytics',
|
||||
});
|
||||
}
|
||||
|
||||
private buildConnectionUri(): string {
|
||||
if (mongodbConfig.MONGODB_URI) {
|
||||
return mongodbConfig.MONGODB_URI;
|
||||
|
|
|
|||
|
|
@ -27,3 +27,27 @@ export async function disconnectMongoDB(): Promise<void> {
|
|||
await client.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the default database for all operations
|
||||
*/
|
||||
export function setDefaultDatabase(databaseName: string): void {
|
||||
const client = getMongoDBClient();
|
||||
client.setDefaultDatabase(databaseName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current default database name
|
||||
*/
|
||||
export function getCurrentDatabase(): string {
|
||||
const client = getMongoDBClient();
|
||||
return client.getDefaultDatabase();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a database instance by name
|
||||
*/
|
||||
export function getDatabase(databaseName?: string) {
|
||||
const client = getMongoDBClient();
|
||||
return client.getDatabase(databaseName);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,4 +19,11 @@ export type {
|
|||
} from './types';
|
||||
|
||||
// Factory functions
|
||||
export { connectMongoDB, disconnectMongoDB, getMongoDBClient } from './factory';
|
||||
export {
|
||||
connectMongoDB,
|
||||
disconnectMongoDB,
|
||||
getCurrentDatabase,
|
||||
getDatabase,
|
||||
getMongoDBClient,
|
||||
setDefaultDatabase,
|
||||
} from './factory';
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue