From 0c774495845fbef46a5a78171905f237881f09f7 Mon Sep 17 00:00:00 2001 From: Boki Date: Sat, 21 Jun 2025 22:30:19 -0400 Subject: [PATCH] work on new di system --- apps/data-ingestion/src/handlers/index.ts | 10 ++-- .../qm/operations/session.operations.ts | 6 +-- .../src/handlers/qm/qm.handler.ts | 49 +++++++++++++------ .../src/handlers/webshare/webshare.handler.ts | 6 +-- apps/data-ingestion/src/index.ts | 2 +- libs/core/di/src/adapters/service-adapter.ts | 48 ++++++++++++++++++ libs/core/di/src/index.ts | 3 +- libs/core/handlers/src/base/BaseHandler.ts | 42 ++++++++++++---- libs/core/handlers/src/index.ts | 2 + .../handlers/src/types/service-container.ts | 24 +++++++++ libs/core/types/src/handlers.ts | 8 +-- 11 files changed, 161 insertions(+), 39 deletions(-) create mode 100644 libs/core/di/src/adapters/service-adapter.ts create mode 100644 libs/core/handlers/src/types/service-container.ts diff --git a/apps/data-ingestion/src/handlers/index.ts b/apps/data-ingestion/src/handlers/index.ts index 9708f9c..3766052 100644 --- a/apps/data-ingestion/src/handlers/index.ts +++ b/apps/data-ingestion/src/handlers/index.ts @@ -4,6 +4,7 @@ */ import type { IDataIngestionServices } from '@stock-bot/di'; +import { createServiceAdapter } from '@stock-bot/di'; import { QMHandler } from './qm/qm.handler'; import { WebShareHandler } from './webshare/webshare.handler'; @@ -11,15 +12,18 @@ import { WebShareHandler } from './webshare/webshare.handler'; * Initialize and register all handlers */ export function initializeAllHandlers(services: IDataIngestionServices): void { + // Create generic service container adapter + const serviceContainer = createServiceAdapter(services); + // QM Handler - const qmHandler = new QMHandler(services); + const qmHandler = new QMHandler(serviceContainer); qmHandler.register(); // WebShare Handler - const webShareHandler = new WebShareHandler(services); + const webShareHandler = new WebShareHandler(serviceContainer); webShareHandler.register(); // TODO: Add other handlers here as they're converted - // const ibHandler = new IBHandler(services); + // const ibHandler = new IBHandler(serviceContainer); // ibHandler.register(); } \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/qm/operations/session.operations.ts b/apps/data-ingestion/src/handlers/qm/operations/session.operations.ts index b5d85c5..eb9a6b7 100644 --- a/apps/data-ingestion/src/handlers/qm/operations/session.operations.ts +++ b/apps/data-ingestion/src/handlers/qm/operations/session.operations.ts @@ -2,17 +2,17 @@ * QM Session Operations - Session creation and management */ +import type { ServiceContainer } from '@stock-bot/di'; import { OperationContext } from '@stock-bot/di'; import { isShutdownSignalReceived } from '@stock-bot/shutdown'; import { getRandomProxy } from '@stock-bot/utils'; -import type { ServiceContainer } from '@stock-bot/di'; +import { QM_CONFIG, QM_SESSION_IDS, SESSION_CONFIG, getQmHeaders } from '../shared/config'; import { QMSessionManager } from '../shared/session-manager'; -import { QM_SESSION_IDS, QM_CONFIG, SESSION_CONFIG, getQmHeaders } from '../shared/config'; import type { QMSession } from '../shared/types'; export async function createSessions(container: ServiceContainer): Promise { - const ctx = new OperationContext('qm-handler', 'create-sessions', container); + const ctx = OperationContext.create('qm-handler', 'create-sessions', {container}); try { ctx.logger.info('Creating QM sessions...'); diff --git a/apps/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/data-ingestion/src/handlers/qm/qm.handler.ts index 3c076da..e1384a0 100644 --- a/apps/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/data-ingestion/src/handlers/qm/qm.handler.ts @@ -1,16 +1,16 @@ -import type { IDataIngestionServices } from '@stock-bot/di'; import { BaseHandler, Handler, Operation, QueueSchedule, - type ExecutionContext + type ExecutionContext, + type IServiceContainer } from '@stock-bot/handlers'; import type { SymbolSpiderJob } from './shared/types'; @Handler('qm') export class QMHandler extends BaseHandler { - constructor(services: IDataIngestionServices) { + constructor(services: IServiceContainer) { super(services); // Handler name read from @Handler decorator } @@ -21,28 +21,47 @@ export class QMHandler extends BaseHandler { description: 'Create and maintain QM sessions' }) async createSessions(input: unknown, context: ExecutionContext): Promise { - this.logger.info('Creating QM sessions with new DI pattern...'); + this.logger.info('Creating QM sessions...'); try { - // Check existing sessions in MongoDB - const sessionsCollection = this.mongodb.collection('qm_sessions'); - const existingSessions = await sessionsCollection.find({}).toArray(); + // Check existing sessions in cache + const sessionKey = 'qm:sessions:active'; + const existingSessions = await this.cache.get(sessionKey) || []; this.logger.info('Current QM sessions', { existing: existingSessions.length, action: 'creating_new_sessions' }); - // Cache session stats for monitoring - await this.cache.set('qm-sessions-count', existingSessions.length, 3600); - await this.cache.set('last-session-check', new Date().toISOString(), 1800); + // Create new session + const newSession = { + id: `qm-session-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, + createdAt: new Date().toISOString(), + status: 'active', + provider: 'quotemedia', + // Add other session properties as needed + }; + + // Add to existing sessions + const updatedSessions = [...existingSessions, newSession]; + + // Store sessions in cache with 24 hour TTL (sessions are temporary) + await this.cache.set(sessionKey, updatedSessions, 86400); // 24 hours + + // Store session stats for monitoring + await this.cache.set('qm:sessions:count', updatedSessions.length, 3600); + await this.cache.set('qm:sessions:last-created', new Date().toISOString(), 1800); + + this.logger.info('QM session created', { + sessionId: newSession.id, + totalSessions: updatedSessions.length + }); - // For now, just return the current state - // TODO: Implement actual session creation logic using new DI pattern return { success: true, - existingSessions: existingSessions.length, - message: 'QM session check completed' + sessionId: newSession.id, + totalSessions: updatedSessions.length, + message: 'QM session created successfully' }; } catch (error) { @@ -90,7 +109,7 @@ export class QMHandler extends BaseHandler { @Operation('spider-symbol-search') @QueueSchedule('0 0 * * 0', { priority: 10, - immediately: true, + immediately: false, description: 'Comprehensive symbol search using QM API' }) async spiderSymbolSearch(payload: SymbolSpiderJob | undefined, context: ExecutionContext): Promise { diff --git a/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts b/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts index 983de20..3516126 100644 --- a/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts +++ b/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts @@ -1,16 +1,16 @@ -import type { IDataIngestionServices } from '@stock-bot/di'; import { BaseHandler, Handler, Operation, QueueSchedule, - type ExecutionContext + type ExecutionContext, + type IServiceContainer } from '@stock-bot/handlers'; import { updateProxies } from '@stock-bot/utils'; @Handler('webshare') export class WebShareHandler extends BaseHandler { - constructor(services: IDataIngestionServices) { + constructor(services: IServiceContainer) { super(services); } diff --git a/apps/data-ingestion/src/index.ts b/apps/data-ingestion/src/index.ts index 8ec3c15..0dc0f06 100644 --- a/apps/data-ingestion/src/index.ts +++ b/apps/data-ingestion/src/index.ts @@ -103,7 +103,7 @@ async function initializeServices() { const jobData = { handler: handlerName, operation: scheduledJob.operation, - payload: scheduledJob.payload || {}, + payload: scheduledJob.payload, // Don't default to {} - let it be undefined }; // Build job options from scheduled job config diff --git a/libs/core/di/src/adapters/service-adapter.ts b/libs/core/di/src/adapters/service-adapter.ts new file mode 100644 index 0000000..3c446e2 --- /dev/null +++ b/libs/core/di/src/adapters/service-adapter.ts @@ -0,0 +1,48 @@ +/** + * Service Adapter - Bridges specific service interfaces to generic IServiceContainer + * Allows handlers to be decoupled from specific service implementations + */ + +import type { IServiceContainer } from '@stock-bot/handlers'; +import type { IDataIngestionServices } from '../service-interfaces'; + +/** + * Adapter that converts IDataIngestionServices to IServiceContainer + * This allows handlers to use the generic container while still supporting + * the existing data-ingestion specific services + */ +export class DataIngestionServiceAdapter implements IServiceContainer { + constructor(private readonly dataServices: IDataIngestionServices) {} + + // Core infrastructure + get logger() { return this.dataServices.logger; } + get cache() { return this.dataServices.cache; } + get queue() { return this.dataServices.queue; } + get http() { + // HTTP client not in current data services - will be added when needed + return null; + } + + // Database clients + get mongodb() { return this.dataServices.mongodb; } + get postgres() { return this.dataServices.postgres; } + get questdb() { + // QuestDB not in current data services - will be added when needed + return null; + } + + // Optional extensions + get custom() { + return { + connectionFactory: this.dataServices.connectionFactory, + // Add other data-ingestion specific services here + }; + } +} + +/** + * Helper function to create service container adapter + */ +export function createServiceAdapter(dataServices: IDataIngestionServices): IServiceContainer { + return new DataIngestionServiceAdapter(dataServices); +} \ No newline at end of file diff --git a/libs/core/di/src/index.ts b/libs/core/di/src/index.ts index 102bcbd..24698b8 100644 --- a/libs/core/di/src/index.ts +++ b/libs/core/di/src/index.ts @@ -5,4 +5,5 @@ export * from './operation-context'; export * from './pool-size-calculator'; export * from './types'; export * from './service-interfaces'; -export * from './service-factory'; \ No newline at end of file +export * from './service-factory'; +export * from './adapters/service-adapter'; \ No newline at end of file diff --git a/libs/core/handlers/src/base/BaseHandler.ts b/libs/core/handlers/src/base/BaseHandler.ts index 779e195..9afae79 100644 --- a/libs/core/handlers/src/base/BaseHandler.ts +++ b/libs/core/handlers/src/base/BaseHandler.ts @@ -1,6 +1,6 @@ import { getLogger } from '@stock-bot/logger'; -import type { IDataIngestionServices, IExecutionContext } from '@stock-bot/di'; import type { IHandler, ExecutionContext } from '../types/types'; +import type { IServiceContainer } from '../types/service-container'; import { handlerRegistry, createJobHandler, type HandlerConfigWithSchedule } from '@stock-bot/types'; /** @@ -11,7 +11,7 @@ export abstract class BaseHandler implements IHandler { protected readonly logger; private handlerName: string; - constructor(protected readonly services: IDataIngestionServices, handlerName?: string) { + constructor(protected readonly services: IServiceContainer, handlerName?: string) { this.logger = getLogger(this.constructor.name); // Read handler name from decorator first, then fallback to parameter or class name const constructor = this.constructor as any; @@ -21,8 +21,10 @@ export abstract class BaseHandler implements IHandler { // Convenience getters for common services protected get mongodb() { return this.services.mongodb; } protected get postgres() { return this.services.postgres; } + protected get questdb() { return this.services.questdb; } protected get cache() { return this.services.cache; } protected get queue() { return this.services.queue; } + protected get http() { return this.services.http; } /** * Main execution method - automatically routes to decorated methods @@ -32,9 +34,20 @@ export abstract class BaseHandler implements IHandler { const constructor = this.constructor as any; const operations = constructor.__operations || []; + // Debug logging + this.logger.debug('Handler execute called', { + handler: this.handlerName, + operation, + availableOperations: operations.map((op: any) => ({ name: op.name, method: op.method })) + }); + // Find the operation metadata const operationMeta = operations.find((op: any) => op.name === operation); if (!operationMeta) { + this.logger.error('Operation not found', { + requestedOperation: operation, + availableOperations: operations.map((op: any) => op.name) + }); throw new Error(`Unknown operation: ${operation}`); } @@ -44,6 +57,11 @@ export abstract class BaseHandler implements IHandler { throw new Error(`Operation method '${operationMeta.method}' not found on handler`); } + this.logger.debug('Executing operation method', { + operation, + method: operationMeta.method + }); + return await method.call(this, input, context); } @@ -63,12 +81,14 @@ export abstract class BaseHandler implements IHandler { /** * Create execution context for operations */ - protected createExecutionContext(type: 'http' | 'queue' | 'scheduled', metadata: Record = {}): IExecutionContext { + protected createExecutionContext(type: 'http' | 'queue' | 'scheduled', metadata: Record = {}): ExecutionContext { return { type, - services: this.services, - metadata, - traceId: `${this.constructor.name}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}` + metadata: { + ...metadata, + timestamp: Date.now(), + traceId: `${this.constructor.name}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}` + } }; } @@ -124,10 +144,14 @@ export abstract class BaseHandler implements IHandler { }; handlerRegistry.registerWithSchedule(config); - this.logger.debug('Handler registered using decorator metadata', { + this.logger.info('Handler registered using decorator metadata', { handlerName, - operations: operations.length, - schedules: schedules.length + operations: operations.map((op: any) => ({ name: op.name, method: op.method })), + scheduledJobs: scheduledJobs.map((job: any) => ({ + operation: job.operation, + cronPattern: job.cronPattern, + immediately: job.immediately + })) }); } diff --git a/libs/core/handlers/src/index.ts b/libs/core/handlers/src/index.ts index cd63e6b..e39bc4d 100644 --- a/libs/core/handlers/src/index.ts +++ b/libs/core/handlers/src/index.ts @@ -17,6 +17,8 @@ export type { OperationMetadata, } from './types/types'; +export type { IServiceContainer } from './types/service-container'; + export { createJobHandler } from './types/types'; // Decorators diff --git a/libs/core/handlers/src/types/service-container.ts b/libs/core/handlers/src/types/service-container.ts new file mode 100644 index 0000000..86facab --- /dev/null +++ b/libs/core/handlers/src/types/service-container.ts @@ -0,0 +1,24 @@ +/** + * Universal Service Container for Handlers + * Simple, comprehensive container with all services available + */ + +/** + * Universal service container with all common services + * Designed to work across different service contexts (data-ingestion, processing, etc.) + */ +export interface IServiceContainer { + // Core infrastructure + readonly logger: any; // Logger instance + readonly cache: any; // Cache provider (Redis/Dragonfly) + readonly queue: any; // Queue manager (BullMQ) + readonly http: any; // HTTP client with proxy support + + // Database clients + readonly mongodb: any; // MongoDB client + readonly postgres: any; // PostgreSQL client + readonly questdb: any; // QuestDB client (time-series) + + // Optional extensions for future use + readonly custom?: Record; +} \ No newline at end of file diff --git a/libs/core/types/src/handlers.ts b/libs/core/types/src/handlers.ts index f524b99..b0430ba 100644 --- a/libs/core/types/src/handlers.ts +++ b/libs/core/types/src/handlers.ts @@ -3,15 +3,15 @@ * Shared types for handler system and queue operations */ -// Simple execution context - mostly queue for now +// Generic execution context - decoupled from service implementations export interface ExecutionContext { - type: 'queue'; // | 'event' - commented for future - serviceContainer?: any; // Will be typed properly when needed + type: 'http' | 'queue' | 'scheduled' | 'event'; metadata: { source?: string; jobId?: string; attempts?: number; - timestamp: number; + timestamp?: number; + traceId?: string; [key: string]: unknown; }; }