diff --git a/libs/queue/src/dlq-handler.ts b/libs/queue/src/dlq-handler.ts index f74404b..27dba28 100644 --- a/libs/queue/src/dlq-handler.ts +++ b/libs/queue/src/dlq-handler.ts @@ -1,17 +1,10 @@ import { Queue, type Job } from 'bullmq'; import { getLogger } from '@stock-bot/logger'; -import type { JobData } from './types'; +import type { JobData, DLQConfig, RedisConfig } from './types'; import { getRedisConnection } from './utils'; const logger = getLogger('dlq-handler'); -export interface DLQConfig { - maxRetries?: number; - retryDelay?: number; - alertThreshold?: number; - cleanupAge?: number; // hours -} - export class DeadLetterQueueHandler { private dlq: Queue; private config: Required; @@ -19,7 +12,7 @@ export class DeadLetterQueueHandler { constructor( private mainQueue: Queue, - private connection: any, + private connection: RedisConfig, config: DLQConfig = {} ) { this.config = { diff --git a/libs/queue/src/index.ts b/libs/queue/src/index.ts index 04eb8a7..0f66ada 100644 --- a/libs/queue/src/index.ts +++ b/libs/queue/src/index.ts @@ -1,21 +1,12 @@ -export * from './batch-processor'; -export * from './handler-registry'; -export * from './queue-manager'; -export * from './queue-factory'; -export * from './types'; -export * from './dlq-handler'; -export * from './queue-metrics'; -export * from './rate-limiter'; - -// Re-export commonly used functions -export { processBatchJob, processItems } from './batch-processor'; - +// Core exports +export { Queue, type QueueWorkerConfig } from './queue'; export { QueueManager } from './queue-manager'; -export { Queue, type QueueConfig } from './queue'; - export { handlerRegistry } from './handler-registry'; -// Re-export queue factory functions +// Batch processing +export { processBatchJob, processItems } from './batch-processor'; + +// Queue factory functions export { initializeQueueSystem, getQueue, @@ -25,17 +16,50 @@ export { shutdownAllQueues } from './queue-factory'; -// Re-export types for convenience +// DLQ handling +export { DLQHandler } from './dlq-handler'; + +// Metrics +export { QueueMetricsCollector } from './queue-metrics'; + +// Rate limiting +export { QueueRateLimiter } from './rate-limiter'; + +// Types export type { + // Core types + JobData, + JobOptions, + QueueOptions, + QueueStats, + GlobalStats, + + // Batch processing types BatchResult, - JobHandler, ProcessOptions, + BatchJobData, + + // Handler types + JobHandler, HandlerConfig, HandlerConfigWithSchedule, HandlerInitializer, + + // Configuration types + RedisConfig, QueueConfig, - ScheduledJob, + QueueManagerConfig, + + // Rate limiting types RateLimitConfig, RateLimitRule, + + // DLQ types DLQConfig, + DLQJobInfo, + + // Scheduled job types + ScheduledJob, + ScheduleConfig, } from './types'; + diff --git a/libs/queue/src/queue-factory.ts b/libs/queue/src/queue-factory.ts index 315d654..d1f7c55 100644 --- a/libs/queue/src/queue-factory.ts +++ b/libs/queue/src/queue-factory.ts @@ -2,7 +2,7 @@ import { getLogger } from '@stock-bot/logger'; import { QueueManager } from './queue-manager'; import { Queue } from './queue'; import { processItems } from './batch-processor'; -import type { ProcessOptions, BatchResult, QueueManagerConfig } from './types'; +import type { ProcessOptions, BatchResult, QueueManagerConfig, RedisConfig, JobOptions } from './types'; const logger = getLogger('queue-factory'); @@ -11,8 +11,8 @@ const logger = getLogger('queue-factory'); * This now uses the singleton QueueManager pattern */ export async function initializeQueueSystem(config: { - redis: any; - defaultJobOptions?: any; + redis: RedisConfig; + defaultJobOptions?: JobOptions; workers?: number; concurrency?: number; }): Promise { diff --git a/libs/queue/src/queue-manager.ts b/libs/queue/src/queue-manager.ts index 2fe33cb..88bcdfe 100644 --- a/libs/queue/src/queue-manager.ts +++ b/libs/queue/src/queue-manager.ts @@ -1,13 +1,14 @@ import { getLogger } from '@stock-bot/logger'; import { QueueRateLimiter } from './rate-limiter'; -import { Queue, type QueueConfig } from './queue'; +import { Queue, type QueueWorkerConfig } from './queue'; import { CacheProvider, createCache } from '@stock-bot/cache'; import type { QueueManagerConfig, QueueOptions, GlobalStats, QueueStats, - RateLimitRule + RateLimitRule, + RedisConfig } from './types'; import { getRedisConnection } from './utils'; @@ -22,7 +23,7 @@ export class QueueManager { private queues = new Map(); private caches = new Map(); private rateLimiter?: QueueRateLimiter; - private redisConnection: any; + private redisConnection: ReturnType; private isShuttingDown = false; private isInitialized = false; @@ -93,7 +94,7 @@ export class QueueManager { }; // Prepare queue configuration - const queueConfig: QueueConfig = { + const queueConfig: QueueWorkerConfig = { workers: mergedOptions.workers, concurrency: mergedOptions.concurrency, startWorker: mergedOptions.workers && mergedOptions.workers > 0, diff --git a/libs/queue/src/queue.ts b/libs/queue/src/queue.ts index 2aec8cc..22602c0 100644 --- a/libs/queue/src/queue.ts +++ b/libs/queue/src/queue.ts @@ -1,12 +1,12 @@ import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq'; import { getLogger } from '@stock-bot/logger'; import { handlerRegistry } from './handler-registry'; -import type { JobData, JobOptions, QueueStats } from './types'; +import type { JobData, JobOptions, QueueStats, RedisConfig } from './types'; import { getRedisConnection } from './utils'; const logger = getLogger('queue'); -export interface QueueConfig { +export interface QueueWorkerConfig { workers?: number; concurrency?: number; startWorker?: boolean; @@ -21,13 +21,13 @@ export class Queue { private workers: Worker[] = []; private queueEvents?: QueueEvents; private queueName: string; - private redisConfig: any; + private redisConfig: RedisConfig; constructor( queueName: string, - redisConfig: any, + redisConfig: RedisConfig, defaultJobOptions: JobOptions = {}, - config: QueueConfig = {} + config: QueueWorkerConfig = {} ) { this.queueName = queueName; this.redisConfig = redisConfig; diff --git a/libs/queue/src/rate-limiter.ts b/libs/queue/src/rate-limiter.ts index 2e4d123..2293a1c 100644 --- a/libs/queue/src/rate-limiter.ts +++ b/libs/queue/src/rate-limiter.ts @@ -1,28 +1,19 @@ import { RateLimiterRedis, RateLimiterRes } from 'rate-limiter-flexible'; import { getLogger } from '@stock-bot/logger'; +import type { RateLimitConfig as BaseRateLimitConfig, RateLimitRule } from './types'; const logger = getLogger('rate-limiter'); -export interface RateLimitConfig { - points: number; // Number of requests - duration: number; // Per duration in seconds - blockDuration?: number; // Block duration in seconds +// Extend the base config to add rate-limiter specific fields +export interface RateLimitConfig extends BaseRateLimitConfig { keyPrefix?: string; } -export interface RateLimitRule { - level: 'global' | 'queue' | 'handler' | 'operation'; - queueName?: string; // For queue-level limits - handler?: string; // For handler-level limits - operation?: string; // For operation-level limits (most specific) - config: RateLimitConfig; -} - export class QueueRateLimiter { private limiters = new Map(); private rules: RateLimitRule[] = []; - constructor(private redisClient: any) {} + constructor(private redisClient: ReturnType) {} /** * Add a rate limit rule diff --git a/libs/queue/src/types.ts b/libs/queue/src/types.ts index ba2219f..d11a05e 100644 --- a/libs/queue/src/types.ts +++ b/libs/queue/src/types.ts @@ -1,9 +1,9 @@ // Types for queue operations -export interface JobData { +export interface JobData { type?: string; handler: string; operation: string; - payload: any; + payload: T; priority?: number; } @@ -19,8 +19,6 @@ export interface ProcessOptions { // Job routing information handler?: string; operation?: string; - // Optional queue for overloaded function signatures - queue?: any; // QueueManager reference } export interface BatchResult { @@ -33,8 +31,8 @@ export interface BatchResult { // New improved types for the refactored architecture export interface RedisConfig { - host?: string; - port?: number; + host: string; + port: number; password?: string; db?: number; } @@ -96,14 +94,14 @@ export interface QueueConfig extends QueueManagerConfig { enableMetrics?: boolean; } -export interface JobHandler { - (payload: any): Promise; +export interface JobHandler { + (payload: TPayload): Promise; } -export interface ScheduledJob { +export interface ScheduledJob { type: string; operation: string; - payload: any; + payload: T; cronPattern: string; priority?: number; description?: string; diff --git a/libs/queue/src/utils.ts b/libs/queue/src/utils.ts index 7db4bcb..df851bb 100644 --- a/libs/queue/src/utils.ts +++ b/libs/queue/src/utils.ts @@ -1,12 +1,9 @@ +import type { RedisConfig } from './types'; + /** * Get Redis connection configuration with retry settings */ -export function getRedisConnection(config: { - host: string; - port: number; - password?: string; - db?: number; -}) { +export function getRedisConnection(config: RedisConfig) { const isTest = process.env.NODE_ENV === 'test' || process.env.BUNIT === '1'; return {