import { createNamespacedCache } from '@stock-bot/cache'; import { getLogger } from '@stock-bot/logger'; import type { ExecutionContext, HandlerConfigWithSchedule, HandlerMetadata, IHandler, IServiceContainer, JobHandler, ServiceTypes, } from '@stock-bot/types'; import { fetch } from '@stock-bot/utils'; import type { Collection } from 'mongodb'; // Handler registry is now injected, not imported import { createJobHandler } from '../utils/create-job-handler'; /** * Job scheduling options */ export interface JobScheduleOptions { jobId?: string; // Optional job ID for custom identification delay?: number; priority?: number; attempts?: number; removeOnComplete?: number; removeOnFail?: number; backoff?: { type: 'exponential' | 'fixed'; delay: number; }; repeat?: { pattern?: string; key?: string; limit?: number; every?: number; immediately?: boolean; }; } /** * Abstract base class for all handlers with improved DI * Provides common functionality and structure for queue/event operations */ export abstract class BaseHandler implements IHandler { // Direct service properties - flattened for cleaner access with proper types readonly logger: TServices['logger']; readonly cache: TServices['cache']; readonly globalCache: TServices['globalCache']; readonly queueManager: TServices['queueManager']; readonly queue!: TServices['queue']; // Specific queue for this handler - initialized if queueManager exists readonly proxy: TServices['proxy']; readonly browser: TServices['browser']; readonly mongodb: TServices['mongodb']; readonly postgres: TServices['postgres']; readonly questdb: TServices['questdb']; private handlerName: string; constructor(services: TServices, handlerName?: string) { // Read handler name from decorator first, then fallback to parameter or class name const constructor = this.constructor as any; this.handlerName = constructor.__handlerName || handlerName || this.constructor.name.toLowerCase(); // Flatten all services onto the handler instance this.logger = getLogger(this.constructor.name) as TServices['logger']; this.cache = services.cache as TServices['cache']; this.globalCache = services.globalCache as TServices['globalCache']; this.queueManager = services.queueManager as TServices['queueManager']; this.proxy = services.proxy as TServices['proxy']; this.browser = services.browser as TServices['browser']; this.mongodb = services.mongodb as TServices['mongodb']; this.postgres = services.postgres as TServices['postgres']; this.questdb = services.questdb as TServices['questdb']; // Get the specific queue for this handler if (this.queueManager) { this.queue = this.queueManager.getQueue(this.handlerName) as TServices['queue']; } } /** * Main execution method - automatically routes to decorated methods * Works with queue (events commented for future) */ async execute(operation: string, input: unknown, context: ExecutionContext): Promise { const constructor = this.constructor as any; const operations = constructor.__operations || []; // Debug logging this.logger.debug('Handler execute called', { handler: this.handlerName, operation, availableOperations: operations.map((op: any) => ({ name: op.name, method: op.method })), }); // Find the operation metadata const operationMeta = operations.find((op: any) => op.name === operation); if (!operationMeta) { this.logger.error('Operation not found', { requestedOperation: operation, availableOperations: operations.map((op: any) => op.name), }); throw new Error(`Unknown operation: ${operation}`); } // Get the method from the instance and call it const method = (this as any)[operationMeta.method]; if (typeof method !== 'function') { throw new Error(`Operation method '${operationMeta.method}' not found on handler`); } this.logger.debug('Executing operation method', { operation, method: operationMeta.method, }); return await method.call(this, input, context); } async scheduleOperation( operation: string, payload: unknown, options?: JobScheduleOptions ): Promise { if (!this.queue) { throw new Error('Queue service is not available for this handler'); } const jobData = { handler: this.handlerName, operation, payload, }; await this.queue.add(operation, jobData, options || {}); } /** * Create execution context for operations */ protected createExecutionContext( type: 'http' | 'queue' | 'scheduled', metadata: Record = {} ): ExecutionContext { return { type, metadata: { ...metadata, timestamp: Date.now(), traceId: `${this.constructor.name}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, }, }; } /** * Helper methods for common operations */ /** * Get a MongoDB collection with type safety */ protected collection(name: string): Collection { if (!this.mongodb) { throw new Error('MongoDB service is not available'); } return this.mongodb.collection(name); } /** * Create a sub-namespaced cache for specific operations * Example: handler 'webshare' creates namespace 'webshare:api' -> keys will be 'cache:data-ingestion:webshare:api:*' */ protected createNamespacedCache(subNamespace: string) { return createNamespacedCache(this.cache || null, `${this.handlerName}:${subNamespace}`); } /** * Set cache with handler-prefixed key */ protected async cacheSet(key: string, value: any, ttl?: number): Promise { if (!this.cache) { return; } // Don't add 'cache:' prefix since the cache already has its own prefix return this.cache.set(`${this.handlerName}:${key}`, value, ttl); } /** * Get cache with handler-prefixed key */ protected async cacheGet(key: string): Promise { if (!this.cache) { return null; } // Don't add 'cache:' prefix since the cache already has its own prefix return this.cache.get(`${this.handlerName}:${key}`); } /** * Delete cache with handler-prefixed key */ protected async cacheDel(key: string): Promise { if (!this.cache) { return; } // Don't add 'cache:' prefix since the cache already has its own prefix return this.cache.del(`${this.handlerName}:${key}`); } /** * Set global cache with key */ protected async globalCacheSet(key: string, value: any, ttl?: number): Promise { if (!this.globalCache) { return; } return this.globalCache.set(key, value, ttl); } /** * Get global cache with key */ protected async globalCacheGet(key: string): Promise { if (!this.globalCache) { return null; } return this.globalCache.get(key); } /** * Delete global cache with key */ protected async globalCacheDel(key: string): Promise { if (!this.globalCache) { return; } return this.globalCache.del(key); } /** * Schedule operation with delay in seconds */ protected async scheduleIn( operation: string, payload: unknown, delaySeconds: number, additionalOptions?: Omit ): Promise { return this.scheduleOperation(operation, payload, { delay: delaySeconds * 1000, ...additionalOptions, }); } /** * Log with handler context */ protected log(level: 'info' | 'warn' | 'error' | 'debug', message: string, meta?: any): void { this.logger[level](message, { handler: this.handlerName, ...meta }); } /** * HTTP client helper using fetch from utils */ protected get http() { return { get: (url: string, options?: any) => fetch(url, { ...options, method: 'GET', logger: this.logger }), post: (url: string, data?: any, options?: any) => fetch(url, { ...options, method: 'POST', body: JSON.stringify(data), headers: { 'Content-Type': 'application/json', ...options?.headers }, logger: this.logger, }), put: (url: string, data?: any, options?: any) => fetch(url, { ...options, method: 'PUT', body: JSON.stringify(data), headers: { 'Content-Type': 'application/json', ...options?.headers }, logger: this.logger, }), delete: (url: string, options?: any) => fetch(url, { ...options, method: 'DELETE', logger: this.logger }), }; } /** * Check if a service is available */ protected hasService(name: keyof IServiceContainer): boolean { const service = this[name as keyof this]; return service !== null; } /** * Event methods - commented for future */ // protected async publishEvent(eventName: string, payload: unknown): Promise { // const eventBus = await this.container.resolveAsync('eventBus'); // await eventBus.publish(eventName, payload); // } /** * Create handler configuration with job handlers * This is used by the scanner to create the actual handler configuration */ createHandlerConfig(): HandlerConfigWithSchedule { const metadata = (this.constructor as typeof BaseHandler).extractMetadata(); if (!metadata) { throw new Error('Handler metadata not found'); } const operationHandlers: Record = {}; for (const opName of metadata.operations) { operationHandlers[opName] = createJobHandler(async (payload: any) => { const context: ExecutionContext = { type: 'queue', metadata: { source: 'queue', timestamp: Date.now() }, }; return await this.execute(opName, payload, context); }); } return { name: metadata.name, operations: operationHandlers, scheduledJobs: metadata.scheduledJobs, }; } /** * Extract handler metadata from decorators * This returns metadata only - actual handler instances are created by the scanner */ static extractMetadata(): HandlerMetadata | null { const constructor = this as any; const handlerName = constructor.__handlerName; if (!handlerName) { return null; } const operations = constructor.__operations || []; const schedules = constructor.__schedules || []; // Create scheduled jobs from decorator metadata const scheduledJobs = schedules.map((schedule: any) => { // Find the operation name from the method name const operation = operations.find((op: any) => op.method === schedule.operation); return { type: `${handlerName}-${schedule.operation}`, operation: operation?.name || schedule.operation, cronPattern: schedule.cronPattern, priority: schedule.priority || 5, immediately: schedule.immediately || false, description: schedule.description || `${handlerName} ${schedule.operation}`, payload: schedule.payload, // Include payload from decorator batch: schedule.batch, // Include batch config from decorator }; }); return { name: handlerName, operations: operations.map((op: any) => op.name), scheduledJobs, description: constructor.__description, }; } /** * Override this method to provide payloads for scheduled jobs * @param operation The operation name that needs a payload * @returns The payload for the scheduled job, or undefined */ protected getScheduledJobPayload?(operation: string): any; /** * Lifecycle hooks - can be overridden by subclasses */ async onInit?(): Promise; async onStart?(): Promise; async onStop?(): Promise; async onDispose?(): Promise; } /** * Specialized handler for operations that have scheduled jobs */ export abstract class ScheduledHandler extends BaseHandler { /** * Get scheduled job configurations for this handler * Override in subclasses to define schedules */ getScheduledJobs?(): Array<{ operation: string; cronPattern: string; priority?: number; immediately?: boolean; description?: string; }>; }