moved handlers out of queue will be reused with event-bus

This commit is contained in:
Boki 2025-06-21 18:32:55 -04:00
parent 36cb84b343
commit dc4bd7b18e
16 changed files with 145 additions and 295 deletions

View file

@ -15,6 +15,7 @@
"@stock-bot/cache": "*",
"@stock-bot/config": "*",
"@stock-bot/di": "*",
"@stock-bot/handlers": "*",
"@stock-bot/logger": "*",
"@stock-bot/mongodb": "*",
"@stock-bot/postgres": "*",

View file

@ -1,48 +1,110 @@
import { getLogger } from '@stock-bot/logger';
import {
createJobHandler,
BaseHandler,
ScheduledHandler,
Handler,
Operation,
QueueSchedule,
handlerRegistry,
createJobHandler,
type ExecutionContext,
type HandlerConfigWithSchedule
} from '@stock-bot/queue';
import type { ServiceContainer } from '@stock-bot/connection-factory';
} from '@stock-bot/handlers';
import type { ServiceContainer } from '@stock-bot/di';
import type { SymbolSpiderJob } from './shared/types';
const handlerLogger = getLogger('qm-handler');
@Handler('qm')
export class QMHandler extends ScheduledHandler {
constructor(container: ServiceContainer) {
super(container);
}
async execute(operation: string, input: unknown, context: ExecutionContext): Promise<unknown> {
switch (operation) {
case 'create-sessions':
return await this.createSessions(input, context);
case 'search-symbols':
return await this.searchSymbols(input, context);
case 'spider-symbol-search':
return await this.spiderSymbolSearch(input as SymbolSpiderJob, context);
default:
throw new Error(`Unknown operation: ${operation}`);
}
}
@Operation('create-sessions')
@QueueSchedule('0 */15 * * *', {
priority: 7,
immediately: true,
description: 'Create and maintain QM sessions'
})
async createSessions(input: unknown, context: ExecutionContext): Promise<unknown> {
const { createSessions } = await import('./operations/session.operations');
await createSessions(context.serviceContainer);
return { success: true, message: 'QM sessions created successfully' };
}
@Operation('search-symbols')
async searchSymbols(input: unknown, context: ExecutionContext): Promise<unknown> {
const { fetchSymbols } = await import('./operations/symbols.operations');
const symbols = await fetchSymbols(context.serviceContainer);
if (symbols && symbols.length > 0) {
return {
success: true,
message: 'QM symbol search completed successfully',
count: symbols.length,
symbols: symbols.slice(0, 10), // Return first 10 symbols as sample
};
} else {
return {
success: false,
message: 'No symbols found',
count: 0,
};
}
}
@Operation('spider-symbol-search')
@QueueSchedule('0 0 * * 0', {
priority: 10,
immediately: true,
description: 'Comprehensive symbol search using QM API'
})
async spiderSymbolSearch(payload: SymbolSpiderJob, context: ExecutionContext): Promise<unknown> {
const { spiderSymbolSearch } = await import('./operations/spider.operations');
return await spiderSymbolSearch(payload, context.serviceContainer);
}
}
// Initialize and register the QM provider
export function initializeQMProvider(container: ServiceContainer) {
handlerLogger.debug('Registering QM provider with scheduled jobs...');
// Create handler instance
const handler = new QMHandler(container);
// Register with legacy format for now
const qmProviderConfig: HandlerConfigWithSchedule = {
name: 'qm',
operations: {
'create-sessions': createJobHandler(async () => {
const { createSessions } = await import('./operations/session.operations');
await createSessions(container);
return { success: true, message: 'QM sessions created successfully' };
'create-sessions': createJobHandler(async (payload) => {
return await handler.execute('create-sessions', payload, {
type: 'queue',
serviceContainer: container,
metadata: { source: 'queue', timestamp: Date.now() }
});
}),
'search-symbols': createJobHandler(async () => {
const { fetchSymbols } = await import('./operations/symbols.operations');
const symbols = await fetchSymbols(container);
if (symbols && symbols.length > 0) {
return {
success: true,
message: 'QM symbol search completed successfully',
count: symbols.length,
symbols: symbols.slice(0, 10), // Return first 10 symbols as sample
};
} else {
return {
success: false,
message: 'No symbols found',
count: 0,
};
}
'search-symbols': createJobHandler(async (payload) => {
return await handler.execute('search-symbols', payload, {
type: 'queue',
serviceContainer: container,
metadata: { source: 'queue', timestamp: Date.now() }
});
}),
'spider-symbol-search': createJobHandler(async (payload: SymbolSpiderJob) => {
const { spiderSymbolSearch } = await import('./operations/spider.operations');
return await spiderSymbolSearch(payload, container);
return await handler.execute('spider-symbol-search', payload, {
type: 'queue',
serviceContainer: container,
metadata: { source: 'queue', timestamp: Date.now() }
});
}),
},
@ -52,7 +114,7 @@ export function initializeQMProvider(container: ServiceContainer) {
operation: 'create-sessions',
cronPattern: '0 */15 * * *', // Every 15 minutes
priority: 7,
immediately: true, // Don't run on startup to avoid blocking
immediately: true,
description: 'Create and maintain QM sessions',
},
{
@ -66,12 +128,12 @@ export function initializeQMProvider(container: ServiceContainer) {
},
cronPattern: '0 0 * * 0', // Every Sunday at midnight
priority: 10,
immediately: true, // Don't run on startup - this is a heavy operation
immediately: true,
description: 'Comprehensive symbol search using QM API',
},
],
};
handlerRegistry.registerWithSchedule(qmProviderConfig);
handlerLogger.debug('QM provider registered successfully with scheduled jobs');
handler.logger.debug('QM provider registered successfully with scheduled jobs');
}

View file

@ -5,6 +5,7 @@
{ "path": "../../libs/core/config" },
{ "path": "../../libs/core/logger" },
{ "path": "../../libs/core/di" },
{ "path": "../../libs/core/handlers" },
{ "path": "../../libs/data/cache" },
{ "path": "../../libs/data/mongodb" },
{ "path": "../../libs/data/postgres" },

View file

@ -48,6 +48,7 @@
"@stock-bot/cache": "*",
"@stock-bot/config": "*",
"@stock-bot/di": "*",
"@stock-bot/handlers": "*",
"@stock-bot/logger": "*",
"@stock-bot/mongodb": "*",
"@stock-bot/postgres": "*",
@ -163,6 +164,20 @@
"@types/pg": "^8.10.7",
},
},
"libs/core/handlers": {
"name": "@stock-bot/handlers",
"version": "1.0.0",
"dependencies": {
"@stock-bot/config": "workspace:*",
"@stock-bot/di": "workspace:*",
"@stock-bot/logger": "workspace:*",
},
"devDependencies": {
"@types/node": "^20.11.0",
"bun-types": "^1.2.15",
"typescript": "^5.3.0",
},
},
"libs/core/logger": {
"name": "@stock-bot/logger",
"version": "1.0.0",
@ -309,6 +324,7 @@
"version": "1.0.0",
"dependencies": {
"@stock-bot/cache": "*",
"@stock-bot/handlers": "*",
"@stock-bot/logger": "*",
"@stock-bot/types": "*",
"bullmq": "^5.0.0",
@ -773,6 +789,8 @@
"@stock-bot/event-bus": ["@stock-bot/event-bus@workspace:libs/services/event-bus"],
"@stock-bot/handlers": ["@stock-bot/handlers@workspace:libs/core/handlers"],
"@stock-bot/http": ["@stock-bot/http@workspace:libs/services/http"],
"@stock-bot/logger": ["@stock-bot/logger@workspace:libs/core/logger"],
@ -2347,6 +2365,8 @@
"@stock-bot/event-bus/@types/node": ["@types/node@20.19.1", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-jJD50LtlD2dodAEO653i3YF04NWak6jN3ky+Ri3Em3mGR39/glWiboM/IePaRbgwSfqM1TpGXfAg8ohn/4dTgA=="],
"@stock-bot/handlers/@types/node": ["@types/node@20.19.1", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-jJD50LtlD2dodAEO653i3YF04NWak6jN3ky+Ri3Em3mGR39/glWiboM/IePaRbgwSfqM1TpGXfAg8ohn/4dTgA=="],
"@stock-bot/http/@types/node": ["@types/node@20.19.1", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-jJD50LtlD2dodAEO653i3YF04NWak6jN3ky+Ri3Em3mGR39/glWiboM/IePaRbgwSfqM1TpGXfAg8ohn/4dTgA=="],
"@stock-bot/http/@typescript-eslint/eslint-plugin": ["@typescript-eslint/eslint-plugin@6.21.0", "", { "dependencies": { "@eslint-community/regexpp": "^4.5.1", "@typescript-eslint/scope-manager": "6.21.0", "@typescript-eslint/type-utils": "6.21.0", "@typescript-eslint/utils": "6.21.0", "@typescript-eslint/visitor-keys": "6.21.0", "debug": "^4.3.4", "graphemer": "^1.4.0", "ignore": "^5.2.4", "natural-compare": "^1.4.0", "semver": "^7.5.4", "ts-api-utils": "^1.0.1" }, "peerDependencies": { "@typescript-eslint/parser": "^6.0.0 || ^6.0.0-alpha", "eslint": "^7.0.0 || ^8.0.0" } }, "sha512-oy9+hTPCUFpngkEZUSzbf9MxI65wbKFoQYsgPdILTfbUldp5ovUuphZVe4i30emU9M/kP+T64Di0mxl7dSw3MA=="],

View file

@ -11,8 +11,7 @@
},
"dependencies": {
"@stock-bot/config": "workspace:*",
"@stock-bot/logger": "workspace:*",
"@stock-bot/di": "workspace:*"
"@stock-bot/logger": "workspace:*"
},
"devDependencies": {
"@types/node": "^20.11.0",

View file

@ -1,4 +1,3 @@
import type { ServiceContainer } from '@stock-bot/di';
import { getLogger } from '@stock-bot/logger';
import type { IHandler, ExecutionContext } from '../types/types';
@ -9,7 +8,7 @@ import type { IHandler, ExecutionContext } from '../types/types';
export abstract class BaseHandler implements IHandler {
protected readonly logger;
constructor(protected readonly container: ServiceContainer) {
constructor(protected readonly container: any) {
this.logger = getLogger(this.constructor.name);
}
@ -23,7 +22,7 @@ export abstract class BaseHandler implements IHandler {
* Queue helper methods
*/
protected async scheduleOperation(operation: string, payload: unknown, delay?: number): Promise<void> {
const queue = await this.container.resolveAsync('queue');
const queue = await this.container.resolveAsync('queue') as any;
await queue.add(operation, payload, { delay });
}
@ -31,7 +30,7 @@ export abstract class BaseHandler implements IHandler {
* Get a service from the container
*/
protected async getService<T>(serviceName: string): Promise<T> {
return await this.container.resolveAsync<T>(serviceName);
return await this.container.resolveAsync(serviceName);
}
/**

View file

@ -18,7 +18,7 @@ export function Handler(name: string) {
* @param name Operation name
*/
export function Operation(name: string) {
return function (target: any, propertyName: string, descriptor: PropertyDescriptor) {
return function (target: any, propertyName: string, descriptor?: PropertyDescriptor) {
// Store operation metadata for future use
if (!target.constructor.__operations) {
target.constructor.__operations = [];
@ -44,7 +44,7 @@ export function QueueSchedule(
description?: string;
}
) {
return function (target: any, propertyName: string, descriptor: PropertyDescriptor) {
return function (target: any, propertyName: string, descriptor?: PropertyDescriptor) {
// Store schedule metadata for future use
if (!target.constructor.__schedules) {
target.constructor.__schedules = [];

View file

@ -1,9 +1,9 @@
import type { ServiceContainer } from '@stock-bot/di';
// import type { ServiceContainer } from '@stock-bot/di'; // Temporarily commented
// Simple execution context - mostly queue for now
export interface ExecutionContext {
type: 'queue'; // | 'event' - commented for future
serviceContainer: ServiceContainer;
serviceContainer: any; // ServiceContainer - temporarily any
metadata: {
source?: string;
jobId?: string;

View file

@ -8,7 +8,6 @@
"include": ["src/**/*"],
"references": [
{ "path": "../config" },
{ "path": "../logger" },
{ "path": "../di" }
{ "path": "../logger" }
]
}

View file

@ -14,6 +14,7 @@
"ioredis": "^5.3.0",
"rate-limiter-flexible": "^3.0.0",
"@stock-bot/cache": "*",
"@stock-bot/handlers": "*",
"@stock-bot/logger": "*",
"@stock-bot/types": "*"
},

View file

@ -1,191 +0,0 @@
import { getLogger } from '@stock-bot/logger';
import type { JobHandler, HandlerConfig, HandlerConfigWithSchedule, ScheduledJob } from './types';
const logger = getLogger('handler-registry');
class HandlerRegistry {
private handlers = new Map<string, HandlerConfig>();
private handlerSchedules = new Map<string, ScheduledJob[]>();
/**
* Register a handler with its operations (simple config)
*/
register(handlerName: string, config: HandlerConfig): void {
logger.info(`Registering handler: ${handlerName}`, {
operations: Object.keys(config),
});
this.handlers.set(handlerName, config);
}
/**
* Register a handler with operations and scheduled jobs (full config)
*/
registerWithSchedule(config: HandlerConfigWithSchedule): void {
logger.info(`Registering handler with schedule: ${config.name}`, {
operations: Object.keys(config.operations),
scheduledJobs: config.scheduledJobs?.length || 0,
});
this.handlers.set(config.name, config.operations);
if (config.scheduledJobs && config.scheduledJobs.length > 0) {
this.handlerSchedules.set(config.name, config.scheduledJobs);
}
}
/**
* Get a handler for a specific handler and operation
*/
getHandler(handler: string, operation: string): JobHandler | null {
const handlerConfig = this.handlers.get(handler);
if (!handlerConfig) {
logger.warn(`Handler not found: ${handler}`);
return null;
}
const jobHandler = handlerConfig[operation];
if (!jobHandler) {
logger.warn(`Operation not found: ${handler}:${operation}`, {
availableOperations: Object.keys(handlerConfig),
});
return null;
}
return jobHandler;
}
/**
* Get all scheduled jobs from all handlers
*/
getAllScheduledJobs(): Array<{ handler: string; job: ScheduledJob }> {
const allJobs: Array<{ handler: string; job: ScheduledJob }> = [];
for (const [handlerName, jobs] of this.handlerSchedules) {
for (const job of jobs) {
allJobs.push({
handler: handlerName,
job,
});
}
}
return allJobs;
}
/**
* Get scheduled jobs for a specific handler
*/
getScheduledJobs(handler: string): ScheduledJob[] {
return this.handlerSchedules.get(handler) || [];
}
/**
* Check if a handler has scheduled jobs
*/
hasScheduledJobs(handler: string): boolean {
return this.handlerSchedules.has(handler);
}
/**
* Get all registered handlers with their configurations
*/
getHandlerConfigs(): Array<{ name: string; operations: string[]; scheduledJobs: number }> {
return Array.from(this.handlers.keys()).map(name => ({
name,
operations: Object.keys(this.handlers.get(name) || {}),
scheduledJobs: this.handlerSchedules.get(name)?.length || 0,
}));
}
/**
* Get all handlers with their full configurations for queue manager registration
*/
getAllHandlers(): Map<string, { operations: HandlerConfig; scheduledJobs?: ScheduledJob[] }> {
const result = new Map<
string,
{ operations: HandlerConfig; scheduledJobs?: ScheduledJob[] }
>();
for (const [name, operations] of this.handlers) {
const scheduledJobs = this.handlerSchedules.get(name);
result.set(name, {
operations,
scheduledJobs,
});
}
return result;
}
/**
* Get all registered handlers
*/
getHandlers(): string[] {
return Array.from(this.handlers.keys());
}
/**
* Get operations for a specific handler
*/
getOperations(handler: string): string[] {
const handlerConfig = this.handlers.get(handler);
return handlerConfig ? Object.keys(handlerConfig) : [];
}
/**
* Check if a handler exists
*/
hasHandler(handler: string): boolean {
return this.handlers.has(handler);
}
/**
* Check if a handler has a specific operation
*/
hasOperation(handler: string, operation: string): boolean {
const handlerConfig = this.handlers.get(handler);
return handlerConfig ? operation in handlerConfig : false;
}
/**
* Remove a handler
*/
unregister(handler: string): boolean {
this.handlerSchedules.delete(handler);
return this.handlers.delete(handler);
}
/**
* Clear all handlers
*/
clear(): void {
this.handlers.clear();
this.handlerSchedules.clear();
}
/**
* Get registry statistics
*/
getStats(): { handlers: number; totalOperations: number; totalScheduledJobs: number } {
let totalOperations = 0;
let totalScheduledJobs = 0;
for (const config of this.handlers.values()) {
totalOperations += Object.keys(config).length;
}
for (const jobs of this.handlerSchedules.values()) {
totalScheduledJobs += jobs.length;
}
return {
handlers: this.handlers.size,
totalOperations,
totalScheduledJobs,
};
}
}
// Export singleton instance
export const handlerRegistry = new HandlerRegistry();

View file

@ -1,9 +1,11 @@
// Core exports
export { Queue, type QueueWorkerConfig } from './queue';
export { QueueManager } from './queue-manager';
export { handlerRegistry } from './handler-registry';
export { createJobHandler } from './types';
// Re-export handler registry from new location
export { handlerRegistry } from '@stock-bot/handlers';
// Batch processing
export { processBatchJob, processItems } from './batch-processor';
@ -37,9 +39,7 @@ export type {
JobHandler,
TypedJobHandler,
HandlerConfig,
TypedHandlerConfig,
HandlerConfigWithSchedule,
TypedHandlerConfigWithSchedule,
HandlerInitializer,
// Configuration types

View file

@ -1,6 +1,6 @@
import { Queue as BullQueue, QueueEvents, Worker, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { handlerRegistry } from './handler-registry';
import { handlerRegistry } from '@stock-bot/handlers';
import type { JobData, JobOptions, QueueStats, RedisConfig } from './types';
import { getRedisConnection } from './utils';

View file

@ -1,3 +1,12 @@
// Re-export handler types from new location
export type {
JobHandler,
TypedJobHandler,
HandlerConfig,
HandlerConfigWithSchedule,
ScheduledJob,
} from '@stock-bot/handlers';
// Types for queue operations
export interface JobData<T = unknown> {
handler: string;
@ -101,60 +110,8 @@ export interface QueueConfig extends QueueManagerConfig {
enableMetrics?: boolean;
}
export interface JobHandler<TPayload = unknown, TResult = unknown> {
(payload: TPayload): Promise<TResult>;
}
// Type-safe wrapper for creating job handlers
export type TypedJobHandler<TPayload, TResult = unknown> = (payload: TPayload) => Promise<TResult>;
// Helper to create type-safe job handlers
export function createJobHandler<TPayload = unknown, TResult = unknown>(
handler: TypedJobHandler<TPayload, TResult>
): JobHandler<unknown, TResult> {
return async (payload: unknown): Promise<TResult> => {
return handler(payload as TPayload);
};
}
export interface ScheduledJob<T = unknown> {
type: string;
operation: string;
payload?: T;
cronPattern: string;
priority?: number;
description?: string;
immediately?: boolean;
delay?: number;
}
export interface HandlerConfig {
[operation: string]: JobHandler;
}
// Type-safe handler configuration
export type TypedHandlerConfig<T extends Record<string, JobHandler> = Record<string, JobHandler>> = {
[K in keyof T]: T[K];
};
export interface HandlerConfigWithSchedule {
name: string;
operations: Record<string, JobHandler>;
scheduledJobs?: ScheduledJob[];
// Rate limiting
rateLimit?: RateLimitConfig;
operationLimits?: Record<string, RateLimitConfig>;
}
// Type-safe version of HandlerConfigWithSchedule
export interface TypedHandlerConfigWithSchedule<T extends Record<string, JobHandler> = Record<string, JobHandler>> {
name: string;
operations: T;
scheduledJobs?: ScheduledJob[];
// Rate limiting
rateLimit?: RateLimitConfig;
operationLimits?: Record<string, RateLimitConfig>;
}
// Re-export createJobHandler from handlers library
export { createJobHandler } from '@stock-bot/handlers';
export interface BatchJobData {
payloadKey: string;

View file

@ -8,6 +8,7 @@
"include": ["src/**/*"],
"references": [
{ "path": "../../data/cache" },
{ "path": "../../core/handlers" },
{ "path": "../../core/logger" },
{ "path": "../../core/types" }
]

View file

@ -35,6 +35,7 @@ libs=(
"core/types" # Base types - no dependencies
"core/config" # Configuration - depends on types
"core/logger" # Logging utilities - depends on types
"core/handlers" # Handler infrastructure - depends on core libs
"utils" # Utilities - depends on types and config
# Data access libraries