stock-bot/libs/core/di/src/service-factory.ts
2025-06-22 08:27:54 -04:00

227 lines
No EOL
7.1 KiB
TypeScript

/**
* Service Factory for creating and managing all application dependencies
*/
import { getLogger } from '@stock-bot/logger';
import { ConnectionFactory } from './connection-factory';
import { PoolSizeCalculator } from './pool-size-calculator';
import type {
IDataIngestionServices,
IServiceFactory,
IConnectionFactory,
IMongoDBClient,
IPostgreSQLClient
} from './service-interfaces';
import type { CacheProvider } from '@stock-bot/cache';
import type { QueueManager } from '@stock-bot/queue';
/**
 * Factory that wires up the connections used by the data-ingestion service
 * (MongoDB, PostgreSQL, cache and queue) through a single ConnectionFactory,
 * and tears them down again.
 */
export class DataIngestionServiceFactory implements IServiceFactory {
  /**
   * Create all services with proper dependency injection.
   *
   * The four connections are opened in parallel. If any of them fails, the
   * method waits for the remaining attempts to settle, disposes everything the
   * connection factory created, and re-throws the original error.
   *
   * @param config - Application configuration. Reads `config.environment` and
   *   the `config.database.{mongodb,postgres,dragonfly}` sections.
   * @returns The created clients together with the logger and the connection
   *   factory that owns them.
   * @throws Whatever error caused the first connection attempt to fail.
   */
  async create(config: any): Promise<IDataIngestionServices> {
    const logger = getLogger('data-ingestion-factory');
    logger.info('Creating data ingestion services...');

    // Create connection factory
    const connectionFactory = new ConnectionFactory({
      service: 'data-ingestion',
      environment: config.environment || 'development',
      pools: {
        mongodb: { poolSize: 50 },
        postgres: { poolSize: 30 },
        cache: { poolSize: 20 },
        queue: { poolSize: 1 }
      }
    }) as IConnectionFactory;

    // Start every connection attempt up front and keep the promises so the
    // failure path can wait for stragglers. The helpers are `async`, so even a
    // synchronous error (e.g. a missing config section) surfaces as a rejection.
    const pending = [
      this.createMongoDBConnection(connectionFactory, config),
      this.createPostgreSQLConnection(connectionFactory, config),
      this.createCacheConnection(connectionFactory, config),
      this.createQueueConnection(connectionFactory, config)
    ] as const;

    try {
      // Create all database connections in parallel
      const [mongoPool, postgresPool, cachePool, queuePool] = await Promise.all(pending);

      // Note: Proxy manager initialization moved to Awilix container
      const services: IDataIngestionServices = {
        mongodb: mongoPool.client,
        postgres: postgresPool.client,
        cache: cachePool.client,
        queue: queuePool.client,
        logger,
        connectionFactory
      };

      logger.info('All data ingestion services created successfully');
      return services;
    } catch (error) {
      logger.error('Failed to create services', { error });

      // Promise.all rejects on the FIRST failure while the other attempts may
      // still be running. Wait for all of them to settle so that a connection
      // finishing late cannot slip past the cleanup below and leak.
      await Promise.allSettled(pending);

      // Cleanup any partial connections; a cleanup failure is logged but must
      // not mask the original error.
      await connectionFactory.disposeAll().catch(cleanupError => {
        logger.error('Error during cleanup', { error: cleanupError });
      });
      throw error;
    }
  }

  /**
   * Dispose all services and connections.
   *
   * @param services - The service bundle previously returned by `create`.
   * @throws Re-throws any error raised by `connectionFactory.disposeAll()`
   *   after logging it.
   */
  async dispose(services: IDataIngestionServices): Promise<void> {
    const logger = services.logger;
    logger.info('Disposing data ingestion services...');
    try {
      // Dispose connection factory (expected to close every pool it created)
      await services.connectionFactory.disposeAll();
      logger.info('All services disposed successfully');
    } catch (error) {
      logger.error('Error disposing services', { error });
      throw error;
    }
  }

  /**
   * Create the MongoDB connection.
   *
   * Pool bounds come from `PoolSizeCalculator.calculate('data-ingestion', 'batch-import')`
   * and are passed both as driver pool settings and as min/max connections.
   *
   * @param connectionFactory - Factory used to open the connection.
   * @param config - Application configuration; reads `config.database.mongodb`.
   */
  private async createMongoDBConnection(
    connectionFactory: IConnectionFactory,
    config: any
  ): Promise<{ client: IMongoDBClient }> {
    const poolSize = PoolSizeCalculator.calculate('data-ingestion', 'batch-import');
    return connectionFactory.createMongoDB({
      name: 'data-ingestion',
      config: {
        uri: config.database.mongodb.uri,
        database: config.database.mongodb.database,
        host: config.database.mongodb.host,
        port: config.database.mongodb.port,
        username: config.database.mongodb.user,
        password: config.database.mongodb.password,
        authSource: config.database.mongodb.authSource,
        poolSettings: {
          maxPoolSize: poolSize.max,
          minPoolSize: poolSize.min,
          maxIdleTime: 30000,
        }
      },
      maxConnections: poolSize.max,
      minConnections: poolSize.min,
    });
  }

  /**
   * Create the PostgreSQL connection.
   *
   * Pool bounds come from `PoolSizeCalculator.calculate('data-ingestion')`.
   *
   * @param connectionFactory - Factory used to open the connection.
   * @param config - Application configuration; reads `config.database.postgres`.
   */
  private async createPostgreSQLConnection(
    connectionFactory: IConnectionFactory,
    config: any
  ): Promise<{ client: IPostgreSQLClient }> {
    const poolSize = PoolSizeCalculator.calculate('data-ingestion');
    return connectionFactory.createPostgreSQL({
      name: 'data-ingestion',
      config: {
        host: config.database.postgres.host,
        port: config.database.postgres.port,
        database: config.database.postgres.database,
        username: config.database.postgres.user,
        password: config.database.postgres.password,
        poolSettings: {
          max: poolSize.max,
          min: poolSize.min,
          idleTimeoutMillis: 30000,
        }
      },
      maxConnections: poolSize.max,
      minConnections: poolSize.min,
    });
  }

  /**
   * Create the cache connection from the `config.database.dragonfly` section.
   *
   * @param connectionFactory - Factory used to open the connection.
   * @param config - Application configuration; reads `config.database.dragonfly`.
   */
  private async createCacheConnection(
    connectionFactory: IConnectionFactory,
    config: any
  ): Promise<{ client: CacheProvider }> {
    return connectionFactory.createCache({
      name: 'data-ingestion',
      config: {
        host: config.database.dragonfly.host,
        port: config.database.dragonfly.port,
        db: config.database.dragonfly.db,
      }
    });
  }

  /**
   * Create the queue connection from the `config.database.dragonfly` section.
   *
   * @param connectionFactory - Factory used to open the connection.
   * @param config - Application configuration; reads `config.database.dragonfly`.
   */
  private async createQueueConnection(
    connectionFactory: IConnectionFactory,
    config: any
  ): Promise<{ client: QueueManager }> {
    return connectionFactory.createQueue({
      name: 'data-ingestion',
      config: {
        host: config.database.dragonfly.host,
        port: config.database.dragonfly.port,
        // NOTE(review): `||` makes a configured db of 0 fall back to 1 as well,
        // so the queue never shares db 0 with the cache. Presumably intended;
        // confirm before switching to `??`.
        db: config.database.dragonfly.db || 1,
      }
    });
  }

  /**
   * Enable dynamic pool sizing for production workloads.
   *
   * Applies one fixed sizing policy to the MongoDB and PostgreSQL clients, but
   * only to clients that expose a `setDynamicPoolConfig` function. Failures
   * are logged as warnings and never propagated (best-effort).
   *
   * @param services - The service bundle previously returned by `create`.
   */
  async enableDynamicPoolSizing(services: IDataIngestionServices): Promise<void> {
    // Thresholds are presumably utilisation percentages and the interval
    // milliseconds; confirm against the client implementations.
    const dynamicConfig = {
      enabled: true,
      minSize: 5,
      maxSize: 100,
      scaleUpThreshold: 70,
      scaleDownThreshold: 30,
      scaleUpIncrement: 10,
      scaleDownIncrement: 5,
      evaluationInterval: 30000,
    };

    try {
      // Set dynamic config for MongoDB
      if (services.mongodb && typeof services.mongodb.setDynamicPoolConfig === 'function') {
        services.mongodb.setDynamicPoolConfig(dynamicConfig);
        services.logger.info('Dynamic pool sizing enabled for MongoDB');
      }

      // Set dynamic config for PostgreSQL
      if (services.postgres && typeof services.postgres.setDynamicPoolConfig === 'function') {
        services.postgres.setDynamicPoolConfig(dynamicConfig);
        services.logger.info('Dynamic pool sizing enabled for PostgreSQL');
      }
    } catch (error) {
      services.logger.warn('Failed to enable dynamic pool sizing', { error });
    }
  }
}
/**
 * Shorthand for building the data-ingestion service bundle without
 * instantiating the factory by hand.
 *
 * @param config - Application configuration, forwarded unchanged to
 *   `DataIngestionServiceFactory.create`.
 * @returns The services produced by the factory.
 */
export async function createDataIngestionServices(config: any): Promise<IDataIngestionServices> {
  return new DataIngestionServiceFactory().create(config);
}
/**
 * Shorthand for tearing down a data-ingestion service bundle without
 * instantiating the factory by hand.
 *
 * @param services - The bundle to dispose, forwarded unchanged to
 *   `DataIngestionServiceFactory.dispose`.
 */
export async function disposeDataIngestionServices(services: IDataIngestionServices): Promise<void> {
  return new DataIngestionServiceFactory().dispose(services);
}