stock-bot/libs/core/handlers/src/base/BaseHandler.ts
2025-07-06 18:53:02 -04:00

403 lines
12 KiB
TypeScript

import { createNamespacedCache } from '@stock-bot/cache';
import { getLogger } from '@stock-bot/logger';
import type {
ExecutionContext,
HandlerConfigWithSchedule,
HandlerMetadata,
IHandler,
IServiceContainer,
JobHandler,
ServiceTypes,
} from '@stock-bot/types';
import { fetch } from '@stock-bot/utils';
import type { Collection } from 'mongodb';
// Handler registry is now injected, not imported
import { createJobHandler } from '../utils/create-job-handler';
/**
 * Options accepted when enqueuing a job.
 *
 * `BaseHandler.scheduleOperation` forwards this object unchanged as the third
 * argument of `queue.add`, so the exact meaning of each field is defined by the
 * underlying queue implementation (NOTE(review): looks like BullMQ-style job
 * options — confirm against the queue library in use).
 */
export interface JobScheduleOptions {
  /** Optional job ID for custom identification. */
  jobId?: string;

  /**
   * Delay before the job runs. `BaseHandler.scheduleIn` fills this with
   * `delaySeconds * 1000`, i.e. the value is expressed in milliseconds.
   */
  delay?: number;

  /** Job priority; ordering semantics are defined by the queue. */
  priority?: number;

  /** Number of attempts for the job (presumably the retry ceiling — TODO confirm). */
  attempts?: number;

  /** Retry backoff strategy and its base delay. */
  backoff?: {
    type: 'fixed' | 'exponential';
    delay: number;
  };

  /** Retention setting for completed jobs (presumably a count to keep — TODO confirm). */
  removeOnComplete?: number;

  /** Retention setting for failed jobs (presumably a count to keep — TODO confirm). */
  removeOnFail?: number;

  /** Repetition settings for recurring jobs. */
  repeat?: {
    /** Repeat pattern (presumably a cron expression — TODO confirm). */
    pattern?: string;
    /** Fixed repeat interval (units defined by the queue — TODO confirm). */
    every?: number;
    /** Upper bound on the number of repetitions. */
    limit?: number;
    /** Custom key identifying the repeatable job. */
    key?: string;
    /** Whether the first run should happen immediately. */
    immediately?: boolean;
  };
}
/**
 * Abstract base class for all handlers with improved DI.
 *
 * The constructor copies the injected services onto the instance so subclasses
 * can use `this.cache`, `this.mongodb`, etc. directly. Operation routing and
 * schedule metadata are read from static properties on the concrete class
 * (`__handlerName`, `__operations`, `__schedules`, `__description`), which are
 * expected to be populated by decorators defined outside this file.
 */
export abstract class BaseHandler<TServices extends ServiceTypes = ServiceTypes> implements IHandler {
  // Direct service properties - flattened for cleaner access with proper types
  readonly logger: TServices['logger'];
  readonly cache: TServices['cache'];
  readonly globalCache: TServices['globalCache'];
  readonly queueManager: TServices['queueManager'];
  // Specific queue for this handler. Only assigned when a queueManager is
  // injected; otherwise it stays undefined at runtime despite the `!`.
  readonly queue!: TServices['queue'];
  readonly proxy: TServices['proxy'];
  readonly browser: TServices['browser'];
  readonly mongodb: TServices['mongodb'];
  readonly postgres: TServices['postgres'];
  readonly questdb: TServices['questdb'];

  // Name used for queue lookup, cache key prefixes and job data.
  private handlerName: string;

  /**
   * @param services Service container whose members are copied onto the handler.
   * @param handlerName Fallback name, used only when the class carries no
   *   `__handlerName`; if both are missing the lower-cased class name is used.
   */
  constructor(services: TServices, handlerName?: string) {
    // Read handler name from decorator first, then fallback to parameter or class name
    const constructor = this.constructor as any;
    this.handlerName =
      constructor.__handlerName || handlerName || this.constructor.name.toLowerCase();

    // Flatten all services onto the handler instance.
    // The logger is created per class rather than taken from `services`.
    this.logger = getLogger(this.constructor.name) as TServices['logger'];
    this.cache = services.cache as TServices['cache'];
    this.globalCache = services.globalCache as TServices['globalCache'];
    this.queueManager = services.queueManager as TServices['queueManager'];
    this.proxy = services.proxy as TServices['proxy'];
    this.browser = services.browser as TServices['browser'];
    this.mongodb = services.mongodb as TServices['mongodb'];
    this.postgres = services.postgres as TServices['postgres'];
    this.questdb = services.questdb as TServices['questdb'];

    // Get the specific queue for this handler
    if (this.queueManager) {
      this.queue = this.queueManager.getQueue(this.handlerName) as TServices['queue'];
    }
  }

  /**
   * Main execution method - routes an operation name to the instance method
   * registered for it in the class's `__operations` metadata.
   *
   * @param operation Operation name to look up (matched against `op.name`).
   * @param input Payload handed to the operation method as its first argument.
   * @param context Execution context handed to the method as its second argument.
   * @returns Whatever the operation method resolves to.
   * @throws Error when no metadata entry matches `operation`, or when the
   *   registered method name does not resolve to a function on this instance.
   */
  async execute(operation: string, input: unknown, context: ExecutionContext): Promise<unknown> {
    const constructor = this.constructor as any;
    const operations = constructor.__operations || [];

    // Debug logging
    this.logger.debug('Handler execute called', {
      handler: this.handlerName,
      operation,
      availableOperations: operations.map((op: any) => ({ name: op.name, method: op.method })),
    });

    // Find the operation metadata
    const operationMeta = operations.find((op: any) => op.name === operation);
    if (!operationMeta) {
      this.logger.error('Operation not found', {
        requestedOperation: operation,
        availableOperations: operations.map((op: any) => op.name),
      });
      throw new Error(`Unknown operation: ${operation}`);
    }

    // Get the method from the instance and call it
    const method = (this as any)[operationMeta.method];
    if (typeof method !== 'function') {
      throw new Error(`Operation method '${operationMeta.method}' not found on handler`);
    }

    this.logger.debug('Executing operation method', {
      operation,
      method: operationMeta.method,
    });

    // Bind `this` explicitly because the method was read off the instance as a plain value.
    return await method.call(this, input, context);
  }

  /**
   * Enqueue a job for one of this handler's operations.
   *
   * The job is added under the name `operation` with data
   * `{ handler, operation, payload }`.
   *
   * @param operation Operation name, also used as the job name.
   * @param payload Data stored under `payload` in the job data.
   * @param options Queue options; an empty object is passed when omitted.
   * @throws Error when the handler has no queue.
   */
  async scheduleOperation(
    operation: string,
    payload: unknown,
    options?: JobScheduleOptions
  ): Promise<void> {
    if (!this.queue) {
      throw new Error('Queue service is not available for this handler');
    }
    const jobData = {
      handler: this.handlerName,
      operation,
      payload,
    };
    await this.queue.add(operation, jobData, options || {});
  }

  /**
   * Create execution context for operations.
   *
   * @param type Origin of the execution.
   * @param metadata Extra metadata; `timestamp` and `traceId` are written after
   *   it and therefore override same-named keys supplied by the caller.
   * @returns Context whose `traceId` is `<ClassName>-<epoch ms>-<random base-36 suffix>`.
   */
  protected createExecutionContext(
    type: 'http' | 'queue' | 'scheduled',
    metadata: Record<string, any> = {}
  ): ExecutionContext {
    // slice(2, 11) drops the leading "0." and keeps at most 9 characters
    // (same result as the deprecated substr(2, 9)).
    const randomSuffix = Math.random().toString(36).slice(2, 11);
    return {
      type,
      metadata: {
        ...metadata,
        timestamp: Date.now(),
        traceId: `${this.constructor.name}-${Date.now()}-${randomSuffix}`,
      },
    };
  }

  /**
   * Helper methods for common operations
   */

  /**
   * Get a MongoDB collection with type safety.
   *
   * @param name Collection name.
   * @throws Error when no MongoDB service was injected.
   */
  protected collection<T extends {} = any>(name: string): Collection<T> {
    if (!this.mongodb) {
      throw new Error('MongoDB service is not available');
    }
    return this.mongodb.collection(name);
  }

  /**
   * Create a sub-namespaced cache for specific operations.
   * Example: handler 'webshare' with sub-namespace 'api' uses namespace 'webshare:api'.
   * `null` is passed as the base cache when the handler has none.
   */
  protected createNamespacedCache(subNamespace: string) {
    return createNamespacedCache(this.cache || null, `${this.handlerName}:${subNamespace}`);
  }

  /**
   * Set cache with handler-prefixed key (`<handlerName>:<key>`).
   * Silently does nothing when no cache is available.
   */
  protected async cacheSet(key: string, value: any, ttl?: number): Promise<void> {
    if (!this.cache) {
      return;
    }
    // Don't add 'cache:' prefix since the cache already has its own prefix
    return this.cache.set(`${this.handlerName}:${key}`, value, ttl);
  }

  /**
   * Get cache with handler-prefixed key (`<handlerName>:<key>`).
   * Resolves to `null` when no cache is available.
   */
  protected async cacheGet<T = any>(key: string): Promise<T | null> {
    if (!this.cache) {
      return null;
    }
    // Don't add 'cache:' prefix since the cache already has its own prefix
    return this.cache.get(`${this.handlerName}:${key}`);
  }

  /**
   * Delete cache with handler-prefixed key (`<handlerName>:<key>`).
   * Silently does nothing when no cache is available.
   */
  protected async cacheDel(key: string): Promise<void> {
    if (!this.cache) {
      return;
    }
    // Don't add 'cache:' prefix since the cache already has its own prefix
    return this.cache.del(`${this.handlerName}:${key}`);
  }

  /**
   * Set global cache with key (no handler prefix is added).
   * Silently does nothing when no global cache is available.
   */
  protected async globalCacheSet(key: string, value: any, ttl?: number): Promise<void> {
    if (!this.globalCache) {
      return;
    }
    return this.globalCache.set(key, value, ttl);
  }

  /**
   * Get global cache with key (no handler prefix is added).
   * Resolves to `null` when no global cache is available.
   */
  protected async globalCacheGet<T = any>(key: string): Promise<T | null> {
    if (!this.globalCache) {
      return null;
    }
    return this.globalCache.get(key);
  }

  /**
   * Delete global cache with key (no handler prefix is added).
   * Silently does nothing when no global cache is available.
   */
  protected async globalCacheDel(key: string): Promise<void> {
    if (!this.globalCache) {
      return;
    }
    return this.globalCache.del(key);
  }

  /**
   * Schedule operation with delay in seconds.
   *
   * @param operation Operation name.
   * @param payload Job payload.
   * @param delaySeconds Delay in seconds; converted to milliseconds for the queue.
   * @param additionalOptions Any other scheduling options.
   * @throws Error when the handler has no queue (via `scheduleOperation`).
   */
  protected async scheduleIn(
    operation: string,
    payload: unknown,
    delaySeconds: number,
    additionalOptions?: Omit<JobScheduleOptions, 'delay'>
  ): Promise<void> {
    return this.scheduleOperation(operation, payload, {
      delay: delaySeconds * 1000,
      ...additionalOptions,
    });
  }

  /**
   * Log with handler context: `handler` is added to the metadata, and keys in
   * `meta` are spread after it.
   */
  protected log(level: 'info' | 'warn' | 'error' | 'debug', message: string, meta?: any): void {
    this.logger[level](message, { handler: this.handlerName, ...meta });
  }

  /**
   * HTTP client helper using fetch from utils.
   *
   * Every call forwards the handler's logger. `post` and `put` JSON-encode
   * `data` and default the `Content-Type` header to `application/json`;
   * headers supplied in `options.headers` take precedence. A new helper object
   * is built on each property access.
   */
  protected get http() {
    return {
      get: (url: string, options?: any) =>
        fetch(url, { ...options, method: 'GET', logger: this.logger }),
      post: (url: string, data?: any, options?: any) =>
        fetch(url, {
          ...options,
          method: 'POST',
          body: JSON.stringify(data),
          headers: { 'Content-Type': 'application/json', ...options?.headers },
          logger: this.logger,
        }),
      put: (url: string, data?: any, options?: any) =>
        fetch(url, {
          ...options,
          method: 'PUT',
          body: JSON.stringify(data),
          headers: { 'Content-Type': 'application/json', ...options?.headers },
          logger: this.logger,
        }),
      delete: (url: string, options?: any) =>
        fetch(url, { ...options, method: 'DELETE', logger: this.logger }),
    };
  }

  /**
   * Check if a service is available.
   *
   * A service counts as unavailable when the matching instance property is
   * either `null` or `undefined`. The `undefined` case matters: a service
   * omitted from the container, or `queue` when no queueManager was injected,
   * is never assigned and would otherwise be reported as present.
   *
   * @param name Service name; read as a property of this handler.
   */
  protected hasService(name: keyof IServiceContainer): boolean {
    const service = this[name as keyof this];
    return service !== null && service !== undefined;
  }

  /**
   * Event methods - commented for future
   */
  // protected async publishEvent(eventName: string, payload: unknown): Promise<void> {
  // const eventBus = await this.container.resolveAsync('eventBus');
  // await eventBus.publish(eventName, payload);
  // }

  /**
   * Create handler configuration with job handlers.
   * This is used by the scanner to create the actual handler configuration.
   *
   * Each operation listed in the class metadata gets a job handler that calls
   * `execute` with a `queue`-type context.
   *
   * @throws Error when the class has no handler metadata (no `__handlerName`).
   */
  createHandlerConfig(): HandlerConfigWithSchedule {
    const metadata = (this.constructor as typeof BaseHandler).extractMetadata();
    if (!metadata) {
      throw new Error('Handler metadata not found');
    }

    const operationHandlers: Record<string, JobHandler> = {};
    for (const opName of metadata.operations) {
      operationHandlers[opName] = createJobHandler(async (payload: any) => {
        const context: ExecutionContext = {
          type: 'queue',
          metadata: { source: 'queue', timestamp: Date.now() },
        };
        return await this.execute(opName, payload, context);
      });
    }

    return {
      name: metadata.name,
      operations: operationHandlers,
      scheduledJobs: metadata.scheduledJobs,
    };
  }

  /**
   * Extract handler metadata from decorators.
   * This returns metadata only - actual handler instances are created by the scanner.
   *
   * Must be invoked on the concrete class, because it reads the static
   * properties of `this`.
   *
   * @returns The metadata, or `null` when the class has no `__handlerName`.
   */
  static extractMetadata(): HandlerMetadata | null {
    const constructor = this as any;
    const handlerName = constructor.__handlerName;
    if (!handlerName) {
      return null;
    }

    const operations = constructor.__operations || [];
    const schedules = constructor.__schedules || [];

    // Create scheduled jobs from decorator metadata
    const scheduledJobs = schedules.map((schedule: any) => {
      // `schedule.operation` holds a method name; map it back to the public
      // operation name, falling back to the method name if nothing matches.
      const operation = operations.find((op: any) => op.method === schedule.operation);
      return {
        type: `${handlerName}-${schedule.operation}`,
        operation: operation?.name || schedule.operation,
        cronPattern: schedule.cronPattern,
        priority: schedule.priority || 5,
        immediately: schedule.immediately || false,
        description: schedule.description || `${handlerName} ${schedule.operation}`,
        payload: schedule.payload, // Include payload from decorator
        batch: schedule.batch, // Include batch config from decorator
      };
    });

    return {
      name: handlerName,
      operations: operations.map((op: any) => op.name),
      scheduledJobs,
      description: constructor.__description,
    };
  }

  /**
   * Override this method to provide payloads for scheduled jobs
   * @param operation The operation name that needs a payload
   * @returns The payload for the scheduled job, or undefined
   */
  protected getScheduledJobPayload?(operation: string): any;

  /**
   * Lifecycle hooks - can be overridden by subclasses.
   * All are optional and nothing in this class invokes them.
   */
  async onInit?(): Promise<void>;
  async onStart?(): Promise<void>;
  async onStop?(): Promise<void>;
  async onDispose?(): Promise<void>;
}
/**
 * Handler base for classes whose operations run on a schedule.
 * Adds one optional hook on top of BaseHandler; it has no behavior of its own.
 */
export abstract class ScheduledHandler extends BaseHandler {
  /**
   * Optional hook listing this handler's scheduled jobs.
   * Subclasses implement it to declare their schedules; each entry names the
   * operation and its cron pattern, plus optional priority, immediate-run flag
   * and description.
   */
  getScheduledJobs?(): {
    operation: string;
    cronPattern: string;
    priority?: number;
    immediately?: boolean;
    description?: string;
  }[];
}