403 lines
12 KiB
TypeScript
403 lines
12 KiB
TypeScript
import { createNamespacedCache } from '@stock-bot/cache';
|
|
import { getLogger } from '@stock-bot/logger';
|
|
import type {
|
|
ExecutionContext,
|
|
HandlerConfigWithSchedule,
|
|
HandlerMetadata,
|
|
IHandler,
|
|
IServiceContainer,
|
|
JobHandler,
|
|
ServiceTypes,
|
|
} from '@stock-bot/types';
|
|
import { fetch } from '@stock-bot/utils';
|
|
import type { Collection } from 'mongodb';
|
|
// Handler registry is now injected, not imported
|
|
import { createJobHandler } from '../utils/create-job-handler';
|
|
|
|
/**
 * Options accepted when scheduling a job on a handler's queue.
 *
 * The object is forwarded unchanged as the third argument of `queue.add(...)`
 * by `BaseHandler.scheduleOperation`; the exact semantics of most fields are
 * defined by the queue implementation, which is not visible from this file.
 */
export interface JobScheduleOptions {
  /** Optional job ID for custom identification. */
  jobId?: string;

  /** Delay before the job runs, in milliseconds (`scheduleIn` converts seconds to ms for this field). */
  delay?: number;

  /** Job priority. NOTE(review): ordering direction is defined by the queue backend - confirm. */
  priority?: number;

  /** Presumably the maximum number of attempts for the job - confirm against the queue backend. */
  attempts?: number;

  /** Presumably a retention setting for completed jobs - confirm against the queue backend. */
  removeOnComplete?: number;

  /** Presumably a retention setting for failed jobs - confirm against the queue backend. */
  removeOnFail?: number;

  /** Retry backoff configuration. */
  backoff?: {
    /** Backoff strategy name. */
    type: 'exponential' | 'fixed';
    /** Backoff delay. NOTE(review): unit presumably milliseconds - confirm. */
    delay: number;
  };

  /** Repeat configuration for recurring jobs. */
  repeat?: {
    /** Repeat pattern. NOTE(review): presumably a cron expression - confirm. */
    pattern?: string;
    /** Custom key identifying the repeatable job. */
    key?: string;
    /** Presumably the maximum number of repetitions - confirm. */
    limit?: number;
    /** Presumably a fixed repeat interval - confirm unit with the queue backend. */
    every?: number;
    /** Presumably whether the first run happens right away - confirm. */
    immediately?: boolean;
  };
}
|
|
|
|
/**
 * Abstract base class for all handlers with improved DI.
 *
 * Provides common functionality and structure for queue/event operations:
 * - flattens the injected services onto the instance (`logger`, `cache`, ...),
 * - routes `execute(operation, ...)` calls to instance methods using the static
 *   `__operations` metadata found on the concrete class,
 * - offers small helpers for scheduling, caching, logging and HTTP access.
 *
 * The static fields `__handlerName`, `__operations`, `__schedules` and
 * `__description` are read from the concrete class; per the comments in this
 * file they are populated by decorators defined elsewhere.
 */
export abstract class BaseHandler<TServices extends ServiceTypes = ServiceTypes> implements IHandler {
  // Direct service properties - flattened for cleaner access with proper types
  readonly logger: TServices['logger'];
  readonly cache: TServices['cache'];
  readonly globalCache: TServices['globalCache'];
  readonly queueManager: TServices['queueManager'];
  // Specific queue for this handler - only assigned when a queueManager exists
  readonly queue!: TServices['queue'];
  readonly proxy: TServices['proxy'];
  readonly browser: TServices['browser'];
  readonly mongodb: TServices['mongodb'];
  readonly postgres: TServices['postgres'];
  readonly questdb: TServices['questdb'];

  private handlerName: string;

  /**
   * @param services Service container whose members are copied onto this instance.
   * @param handlerName Fallback name, used only when the class has no static `__handlerName`.
   */
  constructor(services: TServices, handlerName?: string) {
    // Name resolution order: static __handlerName, explicit parameter, lower-cased class name
    const ctor = this.constructor as any;
    this.handlerName = ctor.__handlerName || handlerName || this.constructor.name.toLowerCase();

    // The logger is created per class rather than taken from `services`
    this.logger = getLogger(this.constructor.name) as TServices['logger'];

    // Flatten all remaining services onto the handler instance
    this.cache = services.cache as TServices['cache'];
    this.globalCache = services.globalCache as TServices['globalCache'];
    this.queueManager = services.queueManager as TServices['queueManager'];
    this.proxy = services.proxy as TServices['proxy'];
    this.browser = services.browser as TServices['browser'];
    this.mongodb = services.mongodb as TServices['mongodb'];
    this.postgres = services.postgres as TServices['postgres'];
    this.questdb = services.questdb as TServices['questdb'];

    // Get the specific queue for this handler
    if (this.queueManager) {
      this.queue = this.queueManager.getQueue(this.handlerName) as TServices['queue'];
    }
  }

  /**
   * Main execution method - routes an operation name to the matching instance method.
   * Works with queue (events commented for future).
   *
   * @param operation Operation name, matched against `name` in the static `__operations` list.
   * @param input Payload passed as the first argument of the target method.
   * @param context Execution context passed as the second argument of the target method.
   * @returns Whatever the target method resolves to.
   * @throws Error when the operation is unknown or its method does not exist on the instance.
   */
  async execute(operation: string, input: unknown, context: ExecutionContext): Promise<unknown> {
    const ctor = this.constructor as any;
    const operations = ctor.__operations || [];

    this.logger.debug('Handler execute called', {
      handler: this.handlerName,
      operation,
      availableOperations: operations.map((op: any) => ({ name: op.name, method: op.method })),
    });

    // Find the operation metadata
    const operationMeta = operations.find((op: any) => op.name === operation);
    if (!operationMeta) {
      this.logger.error('Operation not found', {
        requestedOperation: operation,
        availableOperations: operations.map((op: any) => op.name),
      });
      throw new Error(`Unknown operation: ${operation}`);
    }

    // Get the method from the instance and call it
    const method = (this as any)[operationMeta.method];
    if (typeof method !== 'function') {
      throw new Error(`Operation method '${operationMeta.method}' not found on handler`);
    }

    this.logger.debug('Executing operation method', {
      operation,
      method: operationMeta.method,
    });

    return await method.call(this, input, context);
  }

  /**
   * Enqueue an operation of this handler on its queue.
   *
   * @param operation Operation name; also used as the job name passed to `queue.add`.
   * @param payload Job payload, wrapped as `{ handler, operation, payload }`.
   * @param options Scheduling options forwarded to `queue.add` (defaults to `{}`).
   * @throws Error when this handler has no queue.
   */
  async scheduleOperation(
    operation: string,
    payload: unknown,
    options?: JobScheduleOptions
  ): Promise<void> {
    if (!this.queue) {
      throw new Error('Queue service is not available for this handler');
    }

    const jobData = {
      handler: this.handlerName,
      operation,
      payload,
    };

    await this.queue.add(operation, jobData, options ?? {});
  }

  /**
   * Create execution context for operations.
   *
   * Caller-supplied metadata is spread first, so the generated `timestamp` and
   * `traceId` always win over same-named keys in `metadata`.
   *
   * @param type Origin of the execution.
   * @param metadata Extra metadata merged into the context.
   */
  protected createExecutionContext(
    type: 'http' | 'queue' | 'scheduled',
    metadata: Record<string, any> = {}
  ): ExecutionContext {
    // Single clock read so `timestamp` and the trace id always agree
    const now = Date.now();
    // slice(2, 11) keeps up to 9 base-36 chars after the "0." prefix (same as the deprecated substr(2, 9))
    const randomSuffix = Math.random().toString(36).slice(2, 11);

    return {
      type,
      metadata: {
        ...metadata,
        timestamp: now,
        traceId: `${this.constructor.name}-${now}-${randomSuffix}`,
      },
    };
  }

  /**
   * Helper methods for common operations
   */

  /**
   * Get a MongoDB collection with type safety.
   *
   * @throws Error when no MongoDB service was injected.
   */
  protected collection<T extends {} = any>(name: string): Collection<T> {
    if (!this.mongodb) {
      throw new Error('MongoDB service is not available');
    }
    return this.mongodb.collection(name);
  }

  /**
   * Create a sub-namespaced cache for specific operations.
   * The namespace passed on is `<handlerName>:<subNamespace>`.
   * Example (from the original note): handler 'webshare' creates namespace 'webshare:api'
   * -> keys will be 'cache:data-ingestion:webshare:api:*'
   */
  protected createNamespacedCache(subNamespace: string) {
    return createNamespacedCache(this.cache || null, `${this.handlerName}:${subNamespace}`);
  }

  /**
   * Set cache with handler-prefixed key. No-op when no cache is available.
   */
  protected async cacheSet(key: string, value: any, ttl?: number): Promise<void> {
    if (!this.cache) {
      return;
    }
    // Don't add 'cache:' prefix since the cache already has its own prefix
    return this.cache.set(`${this.handlerName}:${key}`, value, ttl);
  }

  /**
   * Get cache with handler-prefixed key. Resolves to `null` when no cache is available.
   */
  protected async cacheGet<T = any>(key: string): Promise<T | null> {
    if (!this.cache) {
      return null;
    }
    // Don't add 'cache:' prefix since the cache already has its own prefix
    return this.cache.get(`${this.handlerName}:${key}`);
  }

  /**
   * Delete cache with handler-prefixed key. No-op when no cache is available.
   */
  protected async cacheDel(key: string): Promise<void> {
    if (!this.cache) {
      return;
    }
    // Don't add 'cache:' prefix since the cache already has its own prefix
    return this.cache.del(`${this.handlerName}:${key}`);
  }

  /**
   * Set global cache with key (no handler prefix). No-op when no global cache is available.
   */
  protected async globalCacheSet(key: string, value: any, ttl?: number): Promise<void> {
    if (!this.globalCache) {
      return;
    }
    return this.globalCache.set(key, value, ttl);
  }

  /**
   * Get global cache with key (no handler prefix). Resolves to `null` when no global cache is available.
   */
  protected async globalCacheGet<T = any>(key: string): Promise<T | null> {
    if (!this.globalCache) {
      return null;
    }
    return this.globalCache.get(key);
  }

  /**
   * Delete global cache with key (no handler prefix). No-op when no global cache is available.
   */
  protected async globalCacheDel(key: string): Promise<void> {
    if (!this.globalCache) {
      return;
    }
    return this.globalCache.del(key);
  }

  /**
   * Schedule operation with delay in seconds.
   *
   * @param operation Operation name to enqueue.
   * @param payload Job payload.
   * @param delaySeconds Delay in seconds; converted to milliseconds for the queue.
   * @param additionalOptions Any other scheduling options.
   */
  protected async scheduleIn(
    operation: string,
    payload: unknown,
    delaySeconds: number,
    additionalOptions?: Omit<JobScheduleOptions, 'delay'>
  ): Promise<void> {
    return this.scheduleOperation(operation, payload, {
      ...additionalOptions,
      // Applied last so a stray `delay` from an untyped caller cannot override `delaySeconds`
      delay: delaySeconds * 1000,
    });
  }

  /**
   * Log with handler context.
   * `handler` is added to the metadata; a `handler` key inside `meta` takes precedence.
   */
  protected log(level: 'info' | 'warn' | 'error' | 'debug', message: string, meta?: any): void {
    this.logger[level](message, { handler: this.handlerName, ...meta });
  }

  /**
   * HTTP client helper using fetch from utils.
   *
   * Every request passes this handler's logger to `fetch`. `post` and `put`
   * JSON-encode `data` and default the `Content-Type` header to
   * `application/json`; headers supplied in `options.headers` override it.
   */
  protected get http() {
    const logger = this.logger;

    // Shared implementation for the two verbs that carry a JSON body
    const sendJson = (method: 'POST' | 'PUT', url: string, data?: any, options?: any) =>
      fetch(url, {
        ...options,
        method,
        body: JSON.stringify(data),
        headers: { 'Content-Type': 'application/json', ...options?.headers },
        logger,
      });

    return {
      get: (url: string, options?: any) => fetch(url, { ...options, method: 'GET', logger }),
      post: (url: string, data?: any, options?: any) => sendJson('POST', url, data, options),
      put: (url: string, data?: any, options?: any) => sendJson('PUT', url, data, options),
      delete: (url: string, options?: any) => fetch(url, { ...options, method: 'DELETE', logger }),
    };
  }

  /**
   * Check if a service is available.
   *
   * A service counts as available only when the corresponding instance
   * property is neither `null` nor `undefined` (a service that was never
   * provided is `undefined`, not `null`).
   */
  protected hasService(name: keyof IServiceContainer): boolean {
    const service = this[name as keyof this];
    return service !== null && service !== undefined;
  }

  /**
   * Event methods - commented for future
   */
  // protected async publishEvent(eventName: string, payload: unknown): Promise<void> {
  //   const eventBus = await this.container.resolveAsync('eventBus');
  //   await eventBus.publish(eventName, payload);
  // }

  /**
   * Create handler configuration with job handlers.
   * This is used by the scanner to create the actual handler configuration.
   *
   * Each operation listed in the class metadata gets a job handler that calls
   * `execute` with a fresh `queue` execution context.
   *
   * @throws Error when the class carries no handler metadata.
   */
  createHandlerConfig(): HandlerConfigWithSchedule {
    const metadata = (this.constructor as typeof BaseHandler).extractMetadata();
    if (!metadata) {
      throw new Error('Handler metadata not found');
    }

    const operationHandlers: Record<string, JobHandler> = {};
    for (const opName of metadata.operations) {
      operationHandlers[opName] = createJobHandler(async (payload: any) => {
        // Built through the shared helper so queue jobs also carry a traceId
        const context = this.createExecutionContext('queue', { source: 'queue' });
        return await this.execute(opName, payload, context);
      });
    }

    return {
      name: metadata.name,
      operations: operationHandlers,
      scheduledJobs: metadata.scheduledJobs,
    };
  }

  /**
   * Extract handler metadata from decorators.
   * This returns metadata only - actual handler instances are created by the scanner.
   *
   * @returns The metadata, or `null` when the class has no static `__handlerName`.
   */
  static extractMetadata(): HandlerMetadata | null {
    const ctor = this as any;
    const handlerName = ctor.__handlerName;
    if (!handlerName) {
      return null;
    }

    const operations = ctor.__operations || [];
    const schedules = ctor.__schedules || [];

    // Create scheduled jobs from decorator metadata
    const scheduledJobs = schedules.map((schedule: any) => {
      // `schedule.operation` holds a method name; translate it to the operation name when known
      const operation = operations.find((op: any) => op.method === schedule.operation);
      return {
        type: `${handlerName}-${schedule.operation}`,
        operation: operation?.name || schedule.operation,
        cronPattern: schedule.cronPattern,
        // `??` keeps an explicit priority of 0 instead of replacing it with the default
        priority: schedule.priority ?? 5,
        immediately: schedule.immediately || false,
        description: schedule.description || `${handlerName} ${schedule.operation}`,
        payload: schedule.payload, // Include payload from decorator
        batch: schedule.batch, // Include batch config from decorator
      };
    });

    return {
      name: handlerName,
      operations: operations.map((op: any) => op.name),
      scheduledJobs,
      description: ctor.__description,
    };
  }

  /**
   * Override this method to provide payloads for scheduled jobs
   * @param operation The operation name that needs a payload
   * @returns The payload for the scheduled job, or undefined
   */
  protected getScheduledJobPayload?(operation: string): any;

  /**
   * Lifecycle hooks - can be overridden by subclasses
   */
  async onInit?(): Promise<void>;
  async onStart?(): Promise<void>;
  async onStop?(): Promise<void>;
  async onDispose?(): Promise<void>;
}
|
|
|
|
/**
 * Specialized handler for operations that have scheduled jobs.
 *
 * Adds no behavior of its own; it only declares an optional hook through
 * which subclasses can describe their schedules.
 */
export abstract class ScheduledHandler extends BaseHandler {
  /**
   * Get scheduled job configurations for this handler.
   * Override in subclasses to define schedules.
   *
   * Each entry names the operation to run and the cron pattern to run it on;
   * `priority`, `immediately` and `description` are optional.
   */
  getScheduledJobs?(): {
    operation: string;
    cronPattern: string;
    priority?: number;
    immediately?: boolean;
    description?: string;
  }[];
}
|