libs working i think
This commit is contained in:
parent
dc4bd7b18e
commit
63baeaec70
16 changed files with 141 additions and 476 deletions
|
|
@ -1,8 +1,4 @@
|
|||
import { getLogger, type Logger } from '@stock-bot/logger';
|
||||
import { MongoDBClient, createMongoDBClient, type ConnectionEvents } from '@stock-bot/mongodb';
|
||||
import { PostgreSQLClient, createPostgreSQLClient } from '@stock-bot/postgres';
|
||||
import { createCache, type CacheProvider } from '@stock-bot/cache';
|
||||
import { QueueManager } from '@stock-bot/queue';
|
||||
import type {
|
||||
ConnectionFactory as IConnectionFactory,
|
||||
ConnectionPool,
|
||||
|
|
@ -22,9 +18,10 @@ export class ConnectionFactory implements IConnectionFactory {
|
|||
constructor(config: ConnectionFactoryConfig) {
|
||||
this.config = config;
|
||||
this.logger = getLogger(`connection-factory:${config.service}`);
|
||||
// Note: config is stored for future use and used in logger name
|
||||
}
|
||||
|
||||
async createMongoDB(poolConfig: MongoDBPoolConfig): Promise<ConnectionPool<MongoDBClient>> {
|
||||
async createMongoDB(poolConfig: MongoDBPoolConfig): Promise<ConnectionPool<any>> {
|
||||
const key = `mongodb:${poolConfig.name}`;
|
||||
|
||||
if (this.pools.has(key)) {
|
||||
|
|
@ -38,35 +35,30 @@ export class ConnectionFactory implements IConnectionFactory {
|
|||
});
|
||||
|
||||
try {
|
||||
const events: ConnectionEvents = {
|
||||
// Dynamic import to avoid circular dependency
|
||||
const { createMongoDBClient } = await import('@stock-bot/mongodb');
|
||||
|
||||
const events = {
|
||||
onConnect: () => {
|
||||
this.logger.debug('MongoDB connected', { pool: poolConfig.name });
|
||||
},
|
||||
onDisconnect: () => {
|
||||
this.logger.debug('MongoDB disconnected', { pool: poolConfig.name });
|
||||
},
|
||||
onError: (error) => {
|
||||
onError: (error: any) => {
|
||||
this.logger.error('MongoDB error', { pool: poolConfig.name, error });
|
||||
},
|
||||
};
|
||||
|
||||
const client = createMongoDBClient({
|
||||
...poolConfig.config,
|
||||
poolSettings: {
|
||||
maxPoolSize: poolConfig.maxConnections || poolConfig.poolSize || 10,
|
||||
minPoolSize: poolConfig.minConnections || 2,
|
||||
maxIdleTime: 30000,
|
||||
}
|
||||
}, events);
|
||||
const client = createMongoDBClient(poolConfig.config as any, events);
|
||||
|
||||
await client.connect();
|
||||
|
||||
// Warm up the pool
|
||||
if (poolConfig.minConnections) {
|
||||
await client.warmupPool();
|
||||
}
|
||||
|
||||
const pool: ConnectionPool<MongoDBClient> = {
|
||||
const pool: ConnectionPool<any> = {
|
||||
name: poolConfig.name,
|
||||
client,
|
||||
metrics: client.getPoolMetrics(),
|
||||
|
|
@ -92,7 +84,7 @@ export class ConnectionFactory implements IConnectionFactory {
|
|||
}
|
||||
}
|
||||
|
||||
async createPostgreSQL(poolConfig: PostgreSQLPoolConfig): Promise<ConnectionPool<PostgreSQLClient>> {
|
||||
async createPostgreSQL(poolConfig: PostgreSQLPoolConfig): Promise<ConnectionPool<any>> {
|
||||
const key = `postgres:${poolConfig.name}`;
|
||||
|
||||
if (this.pools.has(key)) {
|
||||
|
|
@ -106,35 +98,19 @@ export class ConnectionFactory implements IConnectionFactory {
|
|||
});
|
||||
|
||||
try {
|
||||
const events: ConnectionEvents = {
|
||||
onConnect: () => {
|
||||
this.logger.debug('PostgreSQL connected', { pool: poolConfig.name });
|
||||
},
|
||||
onDisconnect: () => {
|
||||
this.logger.debug('PostgreSQL disconnected', { pool: poolConfig.name });
|
||||
},
|
||||
onError: (error) => {
|
||||
this.logger.error('PostgreSQL error', { pool: poolConfig.name, error });
|
||||
},
|
||||
};
|
||||
|
||||
const client = createPostgreSQLClient({
|
||||
...poolConfig.config,
|
||||
poolSettings: {
|
||||
max: poolConfig.maxConnections || poolConfig.poolSize || 10,
|
||||
min: poolConfig.minConnections || 2,
|
||||
idleTimeoutMillis: poolConfig.idleTimeoutMillis || 30000,
|
||||
},
|
||||
}, undefined, events);
|
||||
// Dynamic import to avoid circular dependency
|
||||
const { createPostgreSQLClient } = await import('@stock-bot/postgres');
|
||||
|
||||
// Events will be handled by the client internally
|
||||
const client = createPostgreSQLClient(poolConfig.config as any);
|
||||
|
||||
await client.connect();
|
||||
|
||||
// Warm up the pool
|
||||
if (poolConfig.minConnections) {
|
||||
await client.warmupPool();
|
||||
}
|
||||
|
||||
const pool: ConnectionPool<PostgreSQLClient> = {
|
||||
const pool: ConnectionPool<any> = {
|
||||
name: poolConfig.name,
|
||||
client,
|
||||
metrics: client.getPoolMetrics(),
|
||||
|
|
@ -153,7 +129,7 @@ export class ConnectionFactory implements IConnectionFactory {
|
|||
}
|
||||
}
|
||||
|
||||
createCache(poolConfig: CachePoolConfig): ConnectionPool<CacheProvider> {
|
||||
createCache(poolConfig: CachePoolConfig): ConnectionPool<any> {
|
||||
const key = `cache:${poolConfig.name}`;
|
||||
|
||||
if (this.pools.has(key)) {
|
||||
|
|
@ -166,32 +142,16 @@ export class ConnectionFactory implements IConnectionFactory {
|
|||
});
|
||||
|
||||
try {
|
||||
const cache = createCache({
|
||||
...poolConfig.config,
|
||||
keyPrefix: `${this.config.service}:${poolConfig.name}:`,
|
||||
shared: false, // Each pool gets its own connection
|
||||
});
|
||||
|
||||
const pool: ConnectionPool<CacheProvider> = {
|
||||
name: poolConfig.name,
|
||||
client: cache,
|
||||
metrics: this.createInitialMetrics(),
|
||||
health: async () => cache.health(),
|
||||
dispose: async () => {
|
||||
// Cache disposal handled internally
|
||||
this.pools.delete(key);
|
||||
},
|
||||
};
|
||||
|
||||
this.pools.set(key, pool);
|
||||
return pool;
|
||||
// TODO: Implement cache creation with dynamic import
|
||||
throw new Error('Cache creation temporarily disabled');
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to create cache pool', { name: poolConfig.name, error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
createQueue(poolConfig: QueuePoolConfig): ConnectionPool<QueueManager> {
|
||||
createQueue(poolConfig: QueuePoolConfig): ConnectionPool<any> {
|
||||
const key = `queue:${poolConfig.name}`;
|
||||
|
||||
if (this.pools.has(key)) {
|
||||
|
|
@ -204,31 +164,9 @@ export class ConnectionFactory implements IConnectionFactory {
|
|||
});
|
||||
|
||||
try {
|
||||
// Initialize or get existing QueueManager instance
|
||||
const queueManager = QueueManager.getOrInitialize(poolConfig.config);
|
||||
|
||||
const pool: ConnectionPool<QueueManager> = {
|
||||
name: poolConfig.name,
|
||||
client: queueManager,
|
||||
metrics: this.createInitialMetrics(),
|
||||
health: async () => {
|
||||
try {
|
||||
// Check if QueueManager is initialized
|
||||
queueManager.getQueueNames();
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
},
|
||||
dispose: async () => {
|
||||
// QueueManager handles its own shutdown
|
||||
await queueManager.shutdown();
|
||||
this.pools.delete(key);
|
||||
},
|
||||
};
|
||||
|
||||
this.pools.set(key, pool);
|
||||
return pool;
|
||||
// TODO: Implement queue creation with dynamic import
|
||||
throw new Error('Queue creation temporarily disabled');
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to create queue manager', { name: poolConfig.name, error });
|
||||
throw error;
|
||||
|
|
@ -243,11 +181,11 @@ export class ConnectionFactory implements IConnectionFactory {
|
|||
listPools(): Array<{ type: string; name: string; metrics: PoolMetrics }> {
|
||||
const result: Array<{ type: string; name: string; metrics: PoolMetrics }> = [];
|
||||
|
||||
for (const [key, pool] of this.pools.entries()) {
|
||||
const [type, ...nameParts] = key.split(':');
|
||||
for (const [key, pool] of this.pools) {
|
||||
const [type] = key.split(':');
|
||||
result.push({
|
||||
type: type || 'unknown',
|
||||
name: nameParts.join(':'),
|
||||
name: pool.name,
|
||||
metrics: pool.metrics,
|
||||
});
|
||||
}
|
||||
|
|
@ -256,25 +194,10 @@ export class ConnectionFactory implements IConnectionFactory {
|
|||
}
|
||||
|
||||
async disposeAll(): Promise<void> {
|
||||
this.logger.info('Disposing all connection pools', { count: this.pools.size });
|
||||
|
||||
const disposePromises: Promise<void>[] = [];
|
||||
for (const pool of this.pools.values()) {
|
||||
disposePromises.push(pool.dispose());
|
||||
}
|
||||
this.logger.info('Disposing all connection pools', { service: this.config.service });
|
||||
|
||||
const disposePromises = Array.from(this.pools.values()).map(pool => pool.dispose());
|
||||
await Promise.all(disposePromises);
|
||||
this.pools.clear();
|
||||
}
|
||||
|
||||
private createInitialMetrics(): PoolMetrics {
|
||||
return {
|
||||
created: new Date(),
|
||||
totalConnections: 0,
|
||||
activeConnections: 0,
|
||||
idleConnections: 0,
|
||||
waitingRequests: 0,
|
||||
errors: 0,
|
||||
};
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue