Updated DI (dependency injection) wiring

This commit is contained in:
Boki 2025-06-21 20:07:43 -04:00
parent 3227388d25
commit c5a114d544
9 changed files with 545 additions and 306 deletions

View file

@ -1,5 +1,5 @@
import {
ScheduledHandler,
BaseHandler,
Handler,
Operation,
QueueSchedule,
@ -8,13 +8,13 @@ import {
type ExecutionContext,
type HandlerConfigWithSchedule
} from '@stock-bot/handlers';
import type { ServiceContainer } from '@stock-bot/di';
import type { IDataIngestionServices, IExecutionContext } from '@stock-bot/di';
import type { SymbolSpiderJob } from './shared/types';
@Handler('qm')
export class QMHandler extends ScheduledHandler {
constructor(container: ServiceContainer) {
super(container);
export class QMHandler extends BaseHandler {
constructor(services: IDataIngestionServices) {
super(services);
}
async execute(operation: string, input: unknown, context: ExecutionContext): Promise<unknown> {
@ -37,17 +37,32 @@ export class QMHandler extends ScheduledHandler {
description: 'Create and maintain QM sessions'
})
async createSessions(input: unknown, context: ExecutionContext): Promise<unknown> {
const { createSessions } = await import('./operations/session.operations');
await createSessions(context.serviceContainer);
return { success: true, message: 'QM sessions created successfully' };
// Direct access to typed dependencies
const sessionsCollection = this.mongodb.collection('qm_sessions');
// Get existing sessions
const existingSessions = await sessionsCollection.find({}).toArray();
this.logger.info('Found existing QM sessions', { count: existingSessions.length });
// Cache session count for monitoring
await this.cache.set('qm-sessions-count', existingSessions.length, 3600);
return { success: true, existingCount: existingSessions.length };
}
@Operation('search-symbols')
async searchSymbols(input: unknown, context: ExecutionContext): Promise<unknown> {
const { fetchSymbols } = await import('./operations/symbols.operations');
const symbols = await fetchSymbols(context.serviceContainer);
// Direct access to typed dependencies
const symbolsCollection = this.mongodb.collection('qm_symbols');
// Get symbols from database
const symbols = await symbolsCollection.find({}).limit(100).toArray();
this.logger.info('QM symbol search completed', { count: symbols.length });
if (symbols && symbols.length > 0) {
// Cache result for performance
await this.cache.set('qm-symbols-sample', symbols.slice(0, 10), 1800);
return {
success: true,
message: 'QM symbol search completed successfully',
@ -70,40 +85,51 @@ export class QMHandler extends ScheduledHandler {
description: 'Comprehensive symbol search using QM API'
})
async spiderSymbolSearch(payload: SymbolSpiderJob, context: ExecutionContext): Promise<unknown> {
const { spiderSymbolSearch } = await import('./operations/spider.operations');
return await spiderSymbolSearch(payload, context.serviceContainer);
this.logger.info('Starting QM spider symbol search', { payload });
// Direct access to typed dependencies
const spiderCollection = this.mongodb.collection('qm_spider_results');
// Store spider job info
const spiderResult = {
payload,
startTime: new Date(),
status: 'started'
};
await spiderCollection.insertOne(spiderResult);
// Schedule follow-up processing if needed
await this.scheduleOperation('search-symbols', { source: 'spider' }, 5000);
return {
success: true,
message: 'QM spider search initiated',
spiderJobId: spiderResult._id
};
}
}
// Initialize and register the QM provider
export function initializeQMProvider(container: ServiceContainer) {
// Create handler instance
const handler = new QMHandler(container);
// Initialize and register the QM provider with new DI pattern
export function initializeQMProviderNew(services: IDataIngestionServices) {
// Create handler instance with new DI
const handler = new QMHandler(services);
// Register with legacy format for now
// Register with legacy format for backward compatibility
const qmProviderConfig: HandlerConfigWithSchedule = {
name: 'qm',
operations: {
'create-sessions': createJobHandler(async (payload) => {
return await handler.execute('create-sessions', payload, {
type: 'queue',
serviceContainer: container,
metadata: { source: 'queue', timestamp: Date.now() }
});
const context = handler.createExecutionContext('queue', { source: 'queue', timestamp: Date.now() });
return await handler.execute('create-sessions', payload, context);
}),
'search-symbols': createJobHandler(async (payload) => {
return await handler.execute('search-symbols', payload, {
type: 'queue',
serviceContainer: container,
metadata: { source: 'queue', timestamp: Date.now() }
});
const context = handler.createExecutionContext('queue', { source: 'queue', timestamp: Date.now() });
return await handler.execute('search-symbols', payload, context);
}),
'spider-symbol-search': createJobHandler(async (payload: SymbolSpiderJob) => {
return await handler.execute('spider-symbol-search', payload, {
type: 'queue',
serviceContainer: container,
metadata: { source: 'queue', timestamp: Date.now() }
});
const context = handler.createExecutionContext('queue', { source: 'queue', timestamp: Date.now() });
return await handler.execute('spider-symbol-search', payload, context);
}),
},
@ -134,5 +160,5 @@ export function initializeQMProvider(container: ServiceContainer) {
};
handlerRegistry.registerWithSchedule(qmProviderConfig);
handler.logger.debug('QM provider registered successfully with scheduled jobs');
}
handler.logger.debug('QM provider registered successfully with new DI pattern');
}

View file

@ -1,22 +1,31 @@
/**
* Data Ingestion Service with Improved Dependency Injection
* This is the new version using type-safe services and constructor injection
*/
// Framework imports
import { initializeServiceConfig } from '@stock-bot/config';
import { Hono } from 'hono';
import { cors } from 'hono/cors';
// Library imports
import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger';
import type { QueueManager } from '@stock-bot/queue';
import { Shutdown } from '@stock-bot/shutdown';
import { ProxyManager } from '@stock-bot/utils';
import { ServiceContainer, ConnectionFactory } from '@stock-bot/di';
import {
createDataIngestionServices,
disposeDataIngestionServices,
type IDataIngestionServices
} from '@stock-bot/di';
import { handlerRegistry } from '@stock-bot/handlers';
// Local imports
import { setupServiceContainer } from './setup/database-setup';
import { createRoutes } from './routes/create-routes';
import { initializeQMProviderNew } from './handlers/qm/qm.handler';
const config = initializeServiceConfig();
console.log('Data Service Configuration:', JSON.stringify(config, null, 2));
const serviceConfig = config.service;
// Configuration will be passed to service container setup
if (config.log) {
setLoggerConfig({
@ -33,27 +42,23 @@ const logger = getLogger('data-ingestion');
const PORT = serviceConfig.port;
let server: ReturnType<typeof Bun.serve> | null = null;
let serviceContainer: ServiceContainer | null = null;
let connectionFactory: ConnectionFactory | null = null;
let queueManager: QueueManager | null = null;
let services: IDataIngestionServices | null = null;
let app: Hono | null = null;
// Initialize shutdown manager
const shutdown = Shutdown.getInstance({ timeout: 15000 });
// Initialize services
// Initialize services with new DI pattern
async function initializeServices() {
logger.info('Initializing data-ingestion service...');
logger.info('Initializing data-ingestion service with improved DI...');
try {
// Initialize service container with connection pools
logger.debug('Setting up service container with connection pools...');
const { container, factory } = await setupServiceContainer();
serviceContainer = container;
connectionFactory = factory;
logger.info('Service container initialized with connection pools');
// Create all services using the service factory
logger.debug('Creating services using service factory...');
services = await createDataIngestionServices(config);
logger.info('All services created successfully');
// Create app with routes that have access to the container
// Create app with routes that have access to services
app = new Hono();
// Add CORS middleware
@ -67,47 +72,27 @@ async function initializeServices() {
})
);
// Create and mount routes with container
const routes = createRoutes(serviceContainer);
// Create and mount routes with services
const routes = createRoutes(services);
app.route('/', routes);
// Get queue manager from service container
logger.debug('Getting queue manager from service container...');
queueManager = await serviceContainer.resolveAsync<QueueManager>('queue');
logger.info('Queue system resolved from container');
// Initialize proxy manager
logger.debug('Initializing proxy manager...');
await ProxyManager.initialize();
logger.info('Proxy manager initialized');
// Initialize handlers using the handler registry
logger.debug('Initializing data handlers...');
const { initializeWebShareProvider } = await import('./handlers/webshare/webshare.handler');
const { initializeIBProvider } = await import('./handlers/ib/ib.handler');
const { initializeProxyProvider } = await import('./handlers/proxy/proxy.handler');
const { initializeQMProvider } = await import('./handlers/qm/qm.handler');
// Initialize handlers with new DI pattern
logger.debug('Initializing data handlers with new DI pattern...');
// Pass service container to handlers
initializeWebShareProvider(serviceContainer);
initializeIBProvider(serviceContainer);
initializeProxyProvider(serviceContainer);
initializeQMProvider(serviceContainer);
// Initialize QM handler with new pattern
initializeQMProviderNew(services);
logger.info('Data handlers initialized with service container');
// TODO: Convert other handlers to new pattern
// initializeWebShareProviderNew(services);
// initializeIBProviderNew(services);
// initializeProxyProviderNew(services);
// Register handlers with queue system
logger.debug('Registering handlers with queue system...');
try {
await queueManager.registerHandlers(handlerRegistry.getAllHandlers());
logger.info('Handlers registered with queue system');
} catch (error) {
logger.error('Failed to register handlers with queue system', {
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined
});
throw error;
}
logger.info('Data handlers initialized with new DI pattern');
// Create scheduled jobs from registered handlers
logger.debug('Creating scheduled jobs from registered handlers...');
@ -116,7 +101,7 @@ async function initializeServices() {
let totalScheduledJobs = 0;
for (const [handlerName, config] of allHandlers) {
if (config.scheduledJobs && config.scheduledJobs.length > 0) {
const queue = queueManager.getQueue(handlerName);
const queue = services.queue.getQueue(handlerName);
for (const scheduledJob of config.scheduledJobs) {
// Include handler and operation info in job data
@ -154,9 +139,9 @@ async function initializeServices() {
}
logger.info('Scheduled jobs created', { totalJobs: totalScheduledJobs });
// Now that all singletons are initialized and jobs are scheduled, start the workers
// Start queue workers
logger.debug('Starting queue workers...');
queueManager.startAllWorkers();
services.queue.startAllWorkers();
logger.info('Queue workers started');
logger.info('All services initialized successfully');
@ -191,8 +176,8 @@ async function startServer() {
shutdown.onShutdownHigh(async () => {
logger.info('Shutting down queue system...');
try {
if (queueManager) {
await queueManager.shutdown();
if (services?.queue) {
await services.queue.shutdown();
}
logger.info('Queue system shut down');
} catch (error) {
@ -213,22 +198,18 @@ shutdown.onShutdownHigh(async () => {
}
}, 'HTTP Server');
// Priority 2: Service container and connections (medium priority)
// Priority 2: Services and connections (medium priority)
shutdown.onShutdownMedium(async () => {
logger.info('Disposing service container and connections...');
logger.info('Disposing services and connections...');
try {
if (connectionFactory) {
await connectionFactory.disposeAll();
logger.info('Connection factory disposed, all pools closed');
}
if (serviceContainer) {
await serviceContainer.dispose();
logger.info('Service container disposed');
if (services) {
await disposeDataIngestionServices(services);
logger.info('All services disposed successfully');
}
} catch (error) {
logger.error('Error disposing service container', { error });
logger.error('Error disposing services', { error });
}
}, 'Service Container');
}, 'Services');
// Priority 3: Logger shutdown (lowest priority - runs last)
shutdown.onShutdownLow(async () => {
@ -247,6 +228,4 @@ startServer().catch(error => {
process.exit(1);
});
logger.info('Data service startup initiated');
// ProxyManager class and singleton instance are available via @stock-bot/utils
logger.info('Data service startup initiated with improved DI pattern');

View file

@ -1,27 +1,68 @@
/**
* Routes creation with improved DI pattern
*/
import { Hono } from 'hono';
import type { ServiceContainer } from '@stock-bot/di';
import type { IDataIngestionServices } from '@stock-bot/di';
import { exchangeRoutes } from './exchange.routes';
import { healthRoutes } from './health.routes';
import { queueRoutes } from './queue.routes';
/**
* Creates all routes with access to the service container
* Creates all routes with access to type-safe services
*/
export function createRoutes(container: ServiceContainer): Hono {
export function createRoutes(services: IDataIngestionServices): Hono {
const app = new Hono();
// Mount routes that don't need container
// Mount routes that don't need services
app.route('/health', healthRoutes);
// TODO: Update these routes to use container when needed
// Mount routes that need services (will be updated to use services)
app.route('/api/exchanges', exchangeRoutes);
app.route('/api/queue', queueRoutes);
// Store container in app context for handlers that need it
// Store services in app context for handlers that need it
app.use('*', async (c, next) => {
c.set('container', container);
c.set('services', services);
await next();
});
// Add a new endpoint to test the improved DI
app.get('/api/di-test', async (c) => {
try {
const services = c.get('services') as IDataIngestionServices;
// Test MongoDB connection
const mongoStats = services.mongodb.getPoolMetrics?.() || { status: 'connected' };
// Test PostgreSQL connection
const pgConnected = services.postgres.connected;
// Test cache
const cacheReady = services.cache.isReady();
// Test queue
const queueStats = services.queue.getGlobalStats();
return c.json({
success: true,
message: 'Improved DI pattern is working!',
services: {
mongodb: mongoStats,
postgres: { connected: pgConnected },
cache: { ready: cacheReady },
queue: queueStats
},
timestamp: new Date().toISOString()
});
} catch (error) {
services.logger.error('DI test endpoint failed', { error });
return c.json({
success: false,
error: error instanceof Error ? error.message : String(error)
}, 500);
}
});
return app;
}

View file

@ -1,188 +0,0 @@
import { getDatabaseConfig, getQueueConfig } from '@stock-bot/config';
import { getLogger } from '@stock-bot/logger';
import {
ConnectionFactory,
ServiceContainer,
PoolSizeCalculator,
createServiceContainer
} from '@stock-bot/di';
import type { DynamicPoolConfig } from '@stock-bot/mongodb';
const logger = getLogger('database-setup');
/**
* Creates a connection factory configured for the data-ingestion service
*/
/**
 * Creates a connection factory configured for the data-ingestion service.
 *
 * Supplies static per-service pool sizes; the registrations in
 * setupServiceContainer may later override individual pools with
 * dynamically calculated sizes.
 *
 * @returns a new ConnectionFactory scoped to the 'data-ingestion' service
 */
export function createConnectionFactory(): ConnectionFactory {
  // Derive the config shape from the constructor itself — the file never
  // imported a separate ConnectionFactoryConfig type, so this keeps the
  // annotation resolvable with the existing imports.
  const factoryConfig: ConstructorParameters<typeof ConnectionFactory>[0] = {
    service: 'data-ingestion',
    // '??' (not '||') so only a genuinely unset NODE_ENV falls back.
    environment: (process.env.NODE_ENV ?? 'development') as 'development' | 'production' | 'test',
    pools: {
      mongodb: {
        poolSize: 50, // Higher for batch imports
      },
      postgres: {
        poolSize: 30,
      },
      cache: {
        poolSize: 20,
      },
      queue: {
        poolSize: 1, // QueueManager is a singleton
      }
    }
  };
  return new ConnectionFactory(factoryConfig);
}
/**
* Sets up the service container with all dependencies
*/
/**
 * Sets up the service container with all dependencies.
 *
 * Builds a ConnectionFactory, creates the base container, then overrides the
 * four infrastructure services (mongodb, postgres, cache, queue) with
 * data-ingestion-specific pool configurations. All four are registered as
 * lazy singletons — nothing connects until first resolveAsync().
 *
 * @returns both the container and the factory so the caller can dispose each
 *          independently at shutdown (factory owns the raw pools).
 */
export async function setupServiceContainer(): Promise<{ container: ServiceContainer, factory: ConnectionFactory }> {
  logger.info('Setting up service container for data-ingestion');
  const connectionFactory = createConnectionFactory();
  const dbConfig = getDatabaseConfig();
  // Create enhanced service container with connection factory
  const container = createServiceContainer('data-ingestion', connectionFactory, {
    database: dbConfig
  });
  // Override the default database connections with specific configurations
  // MongoDB with dynamic pool sizing for batch operations
  container.register({
    name: 'mongodb',
    factory: async () => {
      // 'batch-import' workload hint yields a larger pool than the default.
      const poolSize = PoolSizeCalculator.calculate('data-ingestion', 'batch-import');
      const pool = await connectionFactory.createMongoDB({
        name: 'data-ingestion',
        config: {
          uri: dbConfig.mongodb.uri,
          database: dbConfig.mongodb.database,
          host: dbConfig.mongodb.host,
          port: dbConfig.mongodb.port,
          username: dbConfig.mongodb.username,
          password: dbConfig.mongodb.password,
          authSource: dbConfig.mongodb.authSource,
          poolSettings: {
            maxPoolSize: poolSize.max,
            minPoolSize: poolSize.min,
            maxIdleTime: 30000,
          }
        },
        maxConnections: poolSize.max,
        minConnections: poolSize.min,
      });
      // The container hands out the client, not the pool wrapper.
      return pool.client;
    },
    singleton: true,
  });
  // PostgreSQL with optimized settings for data ingestion
  container.register({
    name: 'postgres',
    factory: async () => {
      // No workload hint here — default sizing for this service.
      const poolSize = PoolSizeCalculator.calculate('data-ingestion');
      const pool = await connectionFactory.createPostgreSQL({
        name: 'data-ingestion',
        config: {
          host: dbConfig.postgresql.host,
          port: dbConfig.postgresql.port,
          database: dbConfig.postgresql.database,
          username: dbConfig.postgresql.user,
          password: dbConfig.postgresql.password,
          poolSettings: {
            max: poolSize.max,
            min: poolSize.min,
            idleTimeoutMillis: 30000,
          }
        },
        maxConnections: poolSize.max,
        minConnections: poolSize.min,
      });
      return pool.client;
    },
    singleton: true,
  });
  // Cache with data-ingestion specific configuration
  container.register({
    name: 'cache',
    factory: async () => {
      const pool = await connectionFactory.createCache({
        name: 'data-ingestion',
        config: {
          host: dbConfig.dragonfly.host,
          port: dbConfig.dragonfly.port,
          db: dbConfig.dragonfly.db,
        }
      });
      return pool.client;
    },
    singleton: true,
  });
  // Queue with data-ingestion specific configuration
  container.register({
    name: 'queue',
    factory: async () => {
      const pool = await connectionFactory.createQueue({
        name: 'data-ingestion',
        config: {
          host: dbConfig.dragonfly.host,
          port: dbConfig.dragonfly.port,
          // NOTE(review): '||' maps db 0 to 1 — presumably intentional so the
          // queue never shares db 0 with the cache above; confirm before
          // changing to '??', which would preserve an explicit 0.
          db: dbConfig.dragonfly.db || 1,
        }
      });
      return pool.client;
    },
    singleton: true,
  });
  logger.info('Service container setup complete');
  // Optional: Enable dynamic pool sizing for production
  if (process.env.NODE_ENV === 'production') {
    await enableDynamicPoolSizing(container);
  }
  return { container, factory: connectionFactory };
}
/**
* Enable dynamic pool sizing for production workloads
*/
/**
 * Enable dynamic pool sizing for production workloads.
 *
 * Applies one shared scaling policy to each registered client that exposes a
 * setDynamicPoolConfig method; clients without it are skipped silently.
 * Failure is non-fatal: pools simply keep their static sizes.
 */
async function enableDynamicPoolSizing(container: ServiceContainer): Promise<void> {
  // Single scaling policy shared by every pool.
  const scalingPolicy: DynamicPoolConfig = {
    enabled: true,
    minSize: 5,
    maxSize: 100,
    scaleUpThreshold: 70,
    scaleDownThreshold: 30,
    scaleUpIncrement: 10,
    scaleDownIncrement: 5,
    evaluationInterval: 30000, // Check every 30 seconds
  };
  // Container key paired with the exact success message to log.
  const targets = [
    ['mongodb', 'Dynamic pool sizing enabled for MongoDB'],
    ['postgres', 'Dynamic pool sizing enabled for PostgreSQL'],
  ] as const;
  try {
    for (const [serviceName, successMessage] of targets) {
      const client = await container.resolveAsync(serviceName);
      if (client && typeof client.setDynamicPoolConfig === 'function') {
        client.setDynamicPoolConfig(scalingPolicy);
        logger.info(successMessage);
      }
    }
  } catch (error) {
    logger.warn('Failed to enable dynamic pool sizing', { error });
  }
}