This commit is contained in:
Boki 2025-06-22 17:55:51 -04:00
parent d858222af7
commit 7d9044ab29
202 changed files with 10755 additions and 10972 deletions

View file

@ -1,297 +1,307 @@
import { getLogger } from '@stock-bot/logger';
import { createJobHandler, handlerRegistry, type HandlerConfigWithSchedule } from '@stock-bot/types';
import { fetch } from '@stock-bot/utils';
import type { Collection } from 'mongodb';
import type { IServiceContainer } from '../types/service-container';
import type { ExecutionContext, IHandler } from '../types/types';
/**
* Abstract base class for all handlers with improved DI
* Provides common functionality and structure for queue/event operations
*/
export abstract class BaseHandler implements IHandler {
// Direct service properties - flattened for cleaner access
readonly logger;
readonly cache;
readonly queue;
readonly proxy;
readonly browser;
readonly mongodb;
readonly postgres;
readonly questdb;
private handlerName: string;
constructor(services: IServiceContainer, handlerName?: string) {
// Flatten all services onto the handler instance
this.logger = getLogger(this.constructor.name);
this.cache = services.cache;
this.queue = services.queue;
this.proxy = services.proxy;
this.browser = services.browser;
this.mongodb = services.mongodb;
this.postgres = services.postgres;
this.questdb = services.questdb;
// Read handler name from decorator first, then fallback to parameter or class name
const constructor = this.constructor as any;
this.handlerName = constructor.__handlerName || handlerName || this.constructor.name.toLowerCase();
}
/**
* Main execution method - automatically routes to decorated methods
* Works with queue (events commented for future)
*/
async execute(operation: string, input: unknown, context: ExecutionContext): Promise<unknown> {
const constructor = this.constructor as any;
const operations = constructor.__operations || [];
// Debug logging
this.logger.debug('Handler execute called', {
handler: this.handlerName,
operation,
availableOperations: operations.map((op: any) => ({ name: op.name, method: op.method }))
});
// Find the operation metadata
const operationMeta = operations.find((op: any) => op.name === operation);
if (!operationMeta) {
this.logger.error('Operation not found', {
requestedOperation: operation,
availableOperations: operations.map((op: any) => op.name)
});
throw new Error(`Unknown operation: ${operation}`);
}
// Get the method from the instance and call it
const method = (this as any)[operationMeta.method];
if (typeof method !== 'function') {
throw new Error(`Operation method '${operationMeta.method}' not found on handler`);
}
this.logger.debug('Executing operation method', {
operation,
method: operationMeta.method
});
return await method.call(this, input, context);
}
async scheduleOperation(operation: string, payload: unknown, delay?: number): Promise<void> {
if (!this.queue) {
throw new Error('Queue service is not available');
}
const queue = this.queue.getQueue(this.handlerName);
const jobData = {
handler: this.handlerName,
operation,
payload
};
await queue.add(operation, jobData, { delay });
}
/**
* Create execution context for operations
*/
protected createExecutionContext(type: 'http' | 'queue' | 'scheduled', metadata: Record<string, any> = {}): ExecutionContext {
return {
type,
metadata: {
...metadata,
timestamp: Date.now(),
traceId: `${this.constructor.name}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`
}
};
}
/**
* Helper methods for common operations
*/
/**
* Get a MongoDB collection with type safety
*/
protected collection<T extends {} = any>(name: string): Collection<T> {
if (!this.mongodb) {
throw new Error('MongoDB service is not available');
}
return this.mongodb.collection(name);
}
/**
* Set cache with handler-prefixed key
*/
protected async cacheSet(key: string, value: any, ttl?: number): Promise<void> {
if (!this.cache) {
return;
}
return this.cache.set(`${this.handlerName}:${key}`, value, ttl);
}
/**
* Get cache with handler-prefixed key
*/
protected async cacheGet<T = any>(key: string): Promise<T | null> {
if (!this.cache) {
return null;
}
return this.cache.get(`${this.handlerName}:${key}`);
}
/**
* Delete cache with handler-prefixed key
*/
protected async cacheDel(key: string): Promise<void> {
if (!this.cache) {
return;
}
return this.cache.del(`${this.handlerName}:${key}`);
}
/**
* Schedule operation with delay in seconds
*/
protected async scheduleIn(operation: string, payload: unknown, delaySeconds: number): Promise<void> {
return this.scheduleOperation(operation, payload, delaySeconds * 1000);
}
/**
* Log with handler context
*/
protected log(level: 'info' | 'warn' | 'error' | 'debug', message: string, meta?: any): void {
this.logger[level](message, { handler: this.handlerName, ...meta });
}
/**
* HTTP client helper using fetch from utils
*/
protected get http() {
return {
get: (url: string, options?: any) =>
fetch(url, { ...options, method: 'GET', logger: this.logger }),
post: (url: string, data?: any, options?: any) =>
fetch(url, {
...options,
method: 'POST',
body: JSON.stringify(data),
headers: { 'Content-Type': 'application/json', ...options?.headers },
logger: this.logger
}),
put: (url: string, data?: any, options?: any) =>
fetch(url, {
...options,
method: 'PUT',
body: JSON.stringify(data),
headers: { 'Content-Type': 'application/json', ...options?.headers },
logger: this.logger
}),
delete: (url: string, options?: any) =>
fetch(url, { ...options, method: 'DELETE', logger: this.logger }),
};
}
/**
* Check if a service is available
*/
protected hasService(name: keyof IServiceContainer): boolean {
const service = this[name as keyof this];
return service !== null;
}
/**
* Event methods - commented for future
*/
// protected async publishEvent(eventName: string, payload: unknown): Promise<void> {
// const eventBus = await this.container.resolveAsync('eventBus');
// await eventBus.publish(eventName, payload);
// }
/**
* Register this handler using decorator metadata
* Automatically reads @Handler, @Operation, and @QueueSchedule decorators
*/
register(): void {
const constructor = this.constructor as any;
const handlerName = constructor.__handlerName || this.handlerName;
const operations = constructor.__operations || [];
const schedules = constructor.__schedules || [];
// Create operation handlers from decorator metadata
const operationHandlers: Record<string, any> = {};
for (const op of operations) {
operationHandlers[op.name] = createJobHandler(async (payload) => {
const context: ExecutionContext = {
type: 'queue',
metadata: { source: 'queue', timestamp: Date.now() }
};
return await this.execute(op.name, payload, context);
});
}
// Create scheduled jobs from decorator metadata
const scheduledJobs = schedules.map((schedule: any) => {
// Find the operation name from the method name
const operation = operations.find((op: any) => op.method === schedule.operation);
return {
type: `${handlerName}-${schedule.operation}`,
operation: operation?.name || schedule.operation,
cronPattern: schedule.cronPattern,
priority: schedule.priority || 5,
immediately: schedule.immediately || false,
description: schedule.description || `${handlerName} ${schedule.operation}`,
payload: this.getScheduledJobPayload?.(schedule.operation),
};
});
const config: HandlerConfigWithSchedule = {
name: handlerName,
operations: operationHandlers,
scheduledJobs,
};
handlerRegistry.registerWithSchedule(config);
this.logger.info('Handler registered using decorator metadata', {
handlerName,
operations: operations.map((op: any) => ({ name: op.name, method: op.method })),
scheduledJobs: scheduledJobs.map((job: any) => ({
operation: job.operation,
cronPattern: job.cronPattern,
immediately: job.immediately
}))
});
}
/**
* Override this method to provide payloads for scheduled jobs
* @param operation The operation name that needs a payload
* @returns The payload for the scheduled job, or undefined
*/
protected getScheduledJobPayload?(operation: string): any;
/**
* Lifecycle hooks - can be overridden by subclasses
*/
async onInit?(): Promise<void>;
async onStart?(): Promise<void>;
async onStop?(): Promise<void>;
async onDispose?(): Promise<void>;
}
/**
* Specialized handler for operations that have scheduled jobs
*/
export abstract class ScheduledHandler extends BaseHandler {
/**
* Get scheduled job configurations for this handler
* Override in subclasses to define schedules
*/
getScheduledJobs?(): Array<{
operation: string;
cronPattern: string;
priority?: number;
immediately?: boolean;
description?: string;
}>;
}
import type { Collection } from 'mongodb';
import { getLogger } from '@stock-bot/logger';
import {
createJobHandler,
handlerRegistry,
type HandlerConfigWithSchedule,
} from '@stock-bot/types';
import { fetch } from '@stock-bot/utils';
import type { IServiceContainer } from '../types/service-container';
import type { ExecutionContext, IHandler } from '../types/types';
/**
* Abstract base class for all handlers with improved DI
* Provides common functionality and structure for queue/event operations
*/
export abstract class BaseHandler implements IHandler {
// Direct service properties - flattened for cleaner access
readonly logger;
readonly cache;
readonly queue;
readonly proxy;
readonly browser;
readonly mongodb;
readonly postgres;
readonly questdb;
private handlerName: string;
constructor(services: IServiceContainer, handlerName?: string) {
// Flatten all services onto the handler instance
this.logger = getLogger(this.constructor.name);
this.cache = services.cache;
this.queue = services.queue;
this.proxy = services.proxy;
this.browser = services.browser;
this.mongodb = services.mongodb;
this.postgres = services.postgres;
this.questdb = services.questdb;
// Read handler name from decorator first, then fallback to parameter or class name
const constructor = this.constructor as any;
this.handlerName =
constructor.__handlerName || handlerName || this.constructor.name.toLowerCase();
}
/**
* Main execution method - automatically routes to decorated methods
* Works with queue (events commented for future)
*/
async execute(operation: string, input: unknown, context: ExecutionContext): Promise<unknown> {
const constructor = this.constructor as any;
const operations = constructor.__operations || [];
// Debug logging
this.logger.debug('Handler execute called', {
handler: this.handlerName,
operation,
availableOperations: operations.map((op: any) => ({ name: op.name, method: op.method })),
});
// Find the operation metadata
const operationMeta = operations.find((op: any) => op.name === operation);
if (!operationMeta) {
this.logger.error('Operation not found', {
requestedOperation: operation,
availableOperations: operations.map((op: any) => op.name),
});
throw new Error(`Unknown operation: ${operation}`);
}
// Get the method from the instance and call it
const method = (this as any)[operationMeta.method];
if (typeof method !== 'function') {
throw new Error(`Operation method '${operationMeta.method}' not found on handler`);
}
this.logger.debug('Executing operation method', {
operation,
method: operationMeta.method,
});
return await method.call(this, input, context);
}
async scheduleOperation(operation: string, payload: unknown, delay?: number): Promise<void> {
if (!this.queue) {
throw new Error('Queue service is not available');
}
const queue = this.queue.getQueue(this.handlerName);
const jobData = {
handler: this.handlerName,
operation,
payload,
};
await queue.add(operation, jobData, { delay });
}
/**
* Create execution context for operations
*/
protected createExecutionContext(
type: 'http' | 'queue' | 'scheduled',
metadata: Record<string, any> = {}
): ExecutionContext {
return {
type,
metadata: {
...metadata,
timestamp: Date.now(),
traceId: `${this.constructor.name}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
},
};
}
/**
* Helper methods for common operations
*/
/**
* Get a MongoDB collection with type safety
*/
protected collection<T extends {} = any>(name: string): Collection<T> {
if (!this.mongodb) {
throw new Error('MongoDB service is not available');
}
return this.mongodb.collection(name);
}
/**
* Set cache with handler-prefixed key
*/
protected async cacheSet(key: string, value: any, ttl?: number): Promise<void> {
if (!this.cache) {
return;
}
return this.cache.set(`${this.handlerName}:${key}`, value, ttl);
}
/**
* Get cache with handler-prefixed key
*/
protected async cacheGet<T = any>(key: string): Promise<T | null> {
if (!this.cache) {
return null;
}
return this.cache.get(`${this.handlerName}:${key}`);
}
/**
* Delete cache with handler-prefixed key
*/
protected async cacheDel(key: string): Promise<void> {
if (!this.cache) {
return;
}
return this.cache.del(`${this.handlerName}:${key}`);
}
/**
* Schedule operation with delay in seconds
*/
protected async scheduleIn(
operation: string,
payload: unknown,
delaySeconds: number
): Promise<void> {
return this.scheduleOperation(operation, payload, delaySeconds * 1000);
}
/**
* Log with handler context
*/
protected log(level: 'info' | 'warn' | 'error' | 'debug', message: string, meta?: any): void {
this.logger[level](message, { handler: this.handlerName, ...meta });
}
/**
* HTTP client helper using fetch from utils
*/
protected get http() {
return {
get: (url: string, options?: any) =>
fetch(url, { ...options, method: 'GET', logger: this.logger }),
post: (url: string, data?: any, options?: any) =>
fetch(url, {
...options,
method: 'POST',
body: JSON.stringify(data),
headers: { 'Content-Type': 'application/json', ...options?.headers },
logger: this.logger,
}),
put: (url: string, data?: any, options?: any) =>
fetch(url, {
...options,
method: 'PUT',
body: JSON.stringify(data),
headers: { 'Content-Type': 'application/json', ...options?.headers },
logger: this.logger,
}),
delete: (url: string, options?: any) =>
fetch(url, { ...options, method: 'DELETE', logger: this.logger }),
};
}
/**
* Check if a service is available
*/
protected hasService(name: keyof IServiceContainer): boolean {
const service = this[name as keyof this];
return service !== null;
}
/**
* Event methods - commented for future
*/
// protected async publishEvent(eventName: string, payload: unknown): Promise<void> {
// const eventBus = await this.container.resolveAsync('eventBus');
// await eventBus.publish(eventName, payload);
// }
/**
* Register this handler using decorator metadata
* Automatically reads @Handler, @Operation, and @QueueSchedule decorators
*/
register(): void {
const constructor = this.constructor as any;
const handlerName = constructor.__handlerName || this.handlerName;
const operations = constructor.__operations || [];
const schedules = constructor.__schedules || [];
// Create operation handlers from decorator metadata
const operationHandlers: Record<string, any> = {};
for (const op of operations) {
operationHandlers[op.name] = createJobHandler(async payload => {
const context: ExecutionContext = {
type: 'queue',
metadata: { source: 'queue', timestamp: Date.now() },
};
return await this.execute(op.name, payload, context);
});
}
// Create scheduled jobs from decorator metadata
const scheduledJobs = schedules.map((schedule: any) => {
// Find the operation name from the method name
const operation = operations.find((op: any) => op.method === schedule.operation);
return {
type: `${handlerName}-${schedule.operation}`,
operation: operation?.name || schedule.operation,
cronPattern: schedule.cronPattern,
priority: schedule.priority || 5,
immediately: schedule.immediately || false,
description: schedule.description || `${handlerName} ${schedule.operation}`,
payload: this.getScheduledJobPayload?.(schedule.operation),
};
});
const config: HandlerConfigWithSchedule = {
name: handlerName,
operations: operationHandlers,
scheduledJobs,
};
handlerRegistry.registerWithSchedule(config);
this.logger.info('Handler registered using decorator metadata', {
handlerName,
operations: operations.map((op: any) => ({ name: op.name, method: op.method })),
scheduledJobs: scheduledJobs.map((job: any) => ({
operation: job.operation,
cronPattern: job.cronPattern,
immediately: job.immediately,
})),
});
}
/**
* Override this method to provide payloads for scheduled jobs
* @param operation The operation name that needs a payload
* @returns The payload for the scheduled job, or undefined
*/
protected getScheduledJobPayload?(operation: string): any;
/**
* Lifecycle hooks - can be overridden by subclasses
*/
async onInit?(): Promise<void>;
async onStart?(): Promise<void>;
async onStop?(): Promise<void>;
async onDispose?(): Promise<void>;
}
/**
* Specialized handler for operations that have scheduled jobs
*/
export abstract class ScheduledHandler extends BaseHandler {
/**
* Get scheduled job configurations for this handler
* Override in subclasses to define schedules
*/
getScheduledJobs?(): Array<{
operation: string;
cronPattern: string;
priority?: number;
immediately?: boolean;
description?: string;
}>;
}