From a7146a3f57f7af02c97f3d53d099cc765cf32b63 Mon Sep 17 00:00:00 2001 From: Boki Date: Sun, 6 Jul 2025 18:53:02 -0400 Subject: [PATCH] fixed up ratelimiting --- .env | 2 +- apps/stock/config/config/default.json | 2 +- .../src/handlers/eod/eod.handler.ts | 78 ++-- .../test/test-advanced-rate-limit.ts | 100 +++++ .../src/registrations/service.registration.ts | 2 + libs/core/di/src/service-application.ts | 39 ++ libs/core/handlers/src/base/BaseHandler.ts | 1 + .../handlers/src/decorators/rate-limit.ts | 82 +++- libs/core/handlers/src/index.ts | 6 +- libs/core/queue/src/index.ts | 2 +- libs/core/queue/src/queue-manager.ts | 113 ++++-- libs/core/queue/src/queue.ts | 93 ++++- libs/core/queue/src/rate-limiter-new.ts | 380 ++++++++++++++++++ libs/core/queue/src/rate-limiter.ts | 187 +++++---- libs/core/queue/src/types.ts | 11 +- 15 files changed, 912 insertions(+), 186 deletions(-) create mode 100755 apps/stock/data-ingestion/test/test-advanced-rate-limit.ts create mode 100644 libs/core/queue/src/rate-limiter-new.ts diff --git a/.env b/.env index a029ae7..8923e13 100644 --- a/.env +++ b/.env @@ -5,7 +5,7 @@ # Core Application Settings NODE_ENV=development LOG_LEVEL=trace -LOG_HIDE_OBJECT=true +LOG_HIDE_OBJECT=false # Data Service Configuration DATA_SERVICE_PORT=2001 diff --git a/apps/stock/config/config/default.json b/apps/stock/config/config/default.json index aa414e9..e8e112d 100644 --- a/apps/stock/config/config/default.json +++ b/apps/stock/config/config/default.json @@ -182,7 +182,7 @@ }, "services": { "dataIngestion": { - "port": 2009, + "port": 2001, "workers": 5, "queues": { "ceo": { "concurrency": 2 }, diff --git a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts index e9c9569..9b96c4e 100644 --- a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts @@ -13,55 +13,63 @@ import { } from './actions'; /** - * EOD (End of Day) 
Handler for testing rate limits - * This handler demonstrates different rate limit configurations + * EOD (End of Day) Handler demonstrating advanced rate limiting * - * Handler-level rate limit: 100 requests per minute for all operations - * Individual operations can override this with their own limits + * Handler-level limits apply to all operations unless overridden + * Operations can specify just a cost to use handler limits, or override with custom limits */ @Handler('eod') -// @Disabled() -@RateLimit({ points: 1, duration: 10, blockDuration: 10 }) +@RateLimit({ + limits: [ + { points: 100, duration: 1 }, // 100 points per second + { points: 10000, duration: 3600 }, // 10k points per hour + { points: 100000, duration: 86400 }, // 100k points per day + ], + cost: 1, // Default cost for operations using this handler +}) export class EodHandler extends BaseHandler { constructor(services: any) { super(services); } /** - * Fetch daily price data - High volume operation - * Rate limit: 50 requests per minute (overrides handler-level limit) + * Fetch daily price data - Low cost operation + * Uses handler rate limits but costs only 1 point */ @Operation('fetch-daily-prices') - @RateLimit({ points: 3, duration: 10, blockDuration: 5 }) + @RateLimit(1) // Costs 1 point per call fetchDailyPrices = fetchDailyPrices; /** - * Fetch fundamental data - Medium volume operation - * Rate limit: 20 requests per minute + * Fetch fundamental data - Medium cost operation + * Uses handler rate limits but costs 10 points */ @Operation('fetch-fundamentals') - @RateLimit({ points: 2, duration: 10, blockDuration: 10 }) + @RateLimit(10) // Costs 10 points per call fetchFundamentals = fetchFundamentals; /** - * Fetch news data - Low volume operation - * Rate limit: 10 requests per minute (most restrictive) + * Fetch news data - High cost operation + * Uses handler rate limits but costs 50 points */ @Operation('fetch-news') + @RateLimit(50) // Costs 50 points per call fetchNews = fetchNews; /** * Test burst operations - For testing 
rate limit behavior - * This doesn't have its own rate limit, so it uses the handler-level limit (100/min) + * Uses handler default cost (1 point) */ @Operation('test-burst') + @RateLimit(0) async testBurstOperations(input: { operationsToTest: string[], burstSize: number }): Promise { this.logger.info('Testing burst operations', input); const results = { attempted: 0, scheduled: 0, - failed: 0 + failed: 0, + operations: {} as Record }; try { @@ -69,11 +77,13 @@ export class EodHandler extends BaseHandler { for (let i = 0; i < input.burstSize; i++) { const operation = input.operationsToTest[i % input.operationsToTest.length] || 'fetch-news'; results.attempted++; + results.operations[operation] = (results.operations[operation] || 0) + 1; - const promise = this.scheduleOperation(operation, {}).then(() => { + const promise = this.scheduleOperation(operation, { index: i }).then(() => { results.scheduled++; - }).catch(() => { + }).catch((error) => { results.failed++; + this.logger.debug('Failed to schedule operation', { operation, error: error.message }); }); promises.push(promise); @@ -84,7 +94,8 @@ export class EodHandler extends BaseHandler { return { success: true, results, - message: `Scheduled ${results.scheduled}/${results.attempted} operations` + message: `Scheduled ${results.scheduled}/${results.attempted} operations`, + breakdown: results.operations }; } catch (error) { this.logger.error('Burst test failed', { error }); @@ -92,34 +103,6 @@ export class EodHandler extends BaseHandler { } } - /** - * Scheduled job to fetch daily prices - * Runs every day at 6 PM (after market close) - */ - @ScheduledOperation('eod-daily-prices', '0 18 * * *', { - priority: 5, - description: 'Fetch daily price data after market close', - immediately: false, - }) - async scheduledFetchDailyPrices(): Promise { - this.logger.info('Starting scheduled daily price fetch'); - return this.fetchDailyPrices(); - } - - /** - * Scheduled job to fetch fundamentals - * Runs weekly on Sunday - 
*/ - @ScheduledOperation('eod-fundamentals', '0 0 * * 0', { - priority: 5, - description: 'Weekly fundamental data update', - immediately: false, - }) - async scheduledFetchFundamentals(): Promise { - this.logger.info('Starting scheduled fundamentals fetch'); - return this.fetchFundamentals(); - } - /** * Scheduled job to test rate limits * Runs every 5 minutes for testing @@ -129,6 +112,7 @@ export class EodHandler extends BaseHandler { description: 'Test rate limit behavior', immediately: true, }) + @RateLimit(0) // No cost for this test operation async scheduledRateLimitTest(): Promise { this.logger.info('Starting rate limit test'); return this.testBurstOperations({ diff --git a/apps/stock/data-ingestion/test/test-advanced-rate-limit.ts b/apps/stock/data-ingestion/test/test-advanced-rate-limit.ts new file mode 100755 index 0000000..4c895f7 --- /dev/null +++ b/apps/stock/data-ingestion/test/test-advanced-rate-limit.ts @@ -0,0 +1,100 @@ +#!/usr/bin/env bun + +import { getLogger } from '@stock-bot/logger'; + +const logger = getLogger('test-advanced-rate-limit'); + +async function testAdvancedRateLimits() { + logger.info('Testing advanced rate limit features...'); + + const baseUrl = 'http://localhost:3001/api'; + + logger.info('\nšŸ“‹ Rate Limit Configuration:'); + logger.info('Handler limits: 100pts/sec, 10k pts/hour, 100k pts/day'); + logger.info('fetch-daily-prices: 1 point per call'); + logger.info('fetch-fundamentals: 10 points per call'); + logger.info('fetch-news: 50 points per call (custom limits: 10pts/min, 100pts/hour)'); + + // First test: test-burst operation + logger.info('\nšŸš€ Testing burst operation with mixed costs...'); + + const burstResponse = await fetch(`${baseUrl}/handlers/eod/operations/test-burst`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + operationsToTest: ['fetch-daily-prices', 'fetch-fundamentals', 'fetch-news'], + burstSize: 30 + }) + }); + + const burstResult = await 
burstResponse.json(); + logger.info('Burst test result:', burstResult); + + // Wait for jobs to process + logger.info('\nā³ Waiting 10 seconds for jobs to process...'); + await new Promise(resolve => setTimeout(resolve, 10000)); + + // Test individual operations with different costs + logger.info('\nšŸ“Š Testing individual operations:'); + + // Test cheap operation (1 point) + logger.info('\n1ļøāƒ£ Testing fetch-daily-prices (1 point each)...'); + for (let i = 0; i < 5; i++) { + const response = await fetch(`${baseUrl}/handlers/eod/operations/fetch-daily-prices`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ test: true, index: i }) + }); + logger.info(`Request ${i + 1}: ${response.status} ${response.statusText}`); + } + + // Test medium cost operation (10 points) + logger.info('\nšŸ”Ÿ Testing fetch-fundamentals (10 points each)...'); + for (let i = 0; i < 3; i++) { + const response = await fetch(`${baseUrl}/handlers/eod/operations/fetch-fundamentals`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ test: true, index: i }) + }); + logger.info(`Request ${i + 1}: ${response.status} ${response.statusText}`); + } + + // Test expensive operation (50 points) + logger.info('\nšŸ’° Testing fetch-news (50 points each, custom limits)...'); + for (let i = 0; i < 2; i++) { + const response = await fetch(`${baseUrl}/handlers/eod/operations/fetch-news`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ test: true, index: i }) + }); + logger.info(`Request ${i + 1}: ${response.status} ${response.statusText}`); + } + + logger.info('\nāœ… Test completed!'); + logger.info('Check the data-ingestion service logs to see rate limiting in action.'); +} + +// Health check before running tests +async function checkService() { + try { + const response = await fetch('http://localhost:3001/health'); + if (!response.ok) { + throw new Error('Service not 
healthy'); + } + return true; + } catch (error) { + logger.error('Data ingestion service not running. Start it with: bun run dev'); + return false; + } +} + +async function main() { + if (await checkService()) { + await testAdvancedRateLimits(); + } +} + +main().catch(error => { + logger.error('Test failed:', error); + process.exit(1); +}); \ No newline at end of file diff --git a/libs/core/di/src/registrations/service.registration.ts b/libs/core/di/src/registrations/service.registration.ts index d87856e..0801371 100644 --- a/libs/core/di/src/registrations/service.registration.ts +++ b/libs/core/di/src/registrations/service.registration.ts @@ -62,6 +62,7 @@ export function registerApplicationServices( container.register({ queueManager: asFunction(({ logger, handlerRegistry }) => { const { QueueManager } = require('@stock-bot/queue'); + const queueConfig = { serviceName: config.service?.serviceName || config.service?.name || 'unknown', redis: { @@ -78,6 +79,7 @@ export function registerApplicationServices( enableScheduledJobs: config.queue!.enableScheduledJobs ?? 
true, autoDiscoverHandlers: true, }; + return new QueueManager(queueConfig, handlerRegistry, logger); }).singleton(), }); diff --git a/libs/core/di/src/service-application.ts b/libs/core/di/src/service-application.ts index ef42bac..f0df482 100644 --- a/libs/core/di/src/service-application.ts +++ b/libs/core/di/src/service-application.ts @@ -362,6 +362,45 @@ export class ServiceApplication { }); await handlerInitializer(containerWithDI); this.logger.info('Handlers initialized'); + + // After handlers are initialized, add rate limit rules to queue manager + const queueManager = this.container.resolve('queueManager'); + const handlerRegistry = this.container.resolve('handlerRegistry'); + + if (queueManager && handlerRegistry) { + const rateLimitRules = []; + const allMetadata = handlerRegistry.getAllMetadata(); + + for (const [handlerName, metadata] of allMetadata) { + // Handler-level rate limit + if (metadata.rateLimit) { + rateLimitRules.push({ + level: 'handler' as const, + handler: handlerName, + config: metadata.rateLimit as any, // Type mismatch between packages + }); + } + + // Operation-level rate limits + for (const operation of metadata.operations) { + if (operation.rateLimit) { + rateLimitRules.push({ + level: 'operation' as const, + handler: handlerName, + operation: operation.name, + config: operation.rateLimit as any, // Type mismatch between packages + }); + } + } + } + + if (rateLimitRules.length > 0) { + await queueManager.addRateLimitRules(rateLimitRules); + this.logger.info('Rate limit rules added to QueueManager', { + ruleCount: rateLimitRules.length, + }); + } + } } // Create and mount routes diff --git a/libs/core/handlers/src/base/BaseHandler.ts b/libs/core/handlers/src/base/BaseHandler.ts index 0dba5c7..c63d7d8 100644 --- a/libs/core/handlers/src/base/BaseHandler.ts +++ b/libs/core/handlers/src/base/BaseHandler.ts @@ -126,6 +126,7 @@ export abstract class BaseHandler if (!this.queue) { throw new Error('Queue service is not available for this 
handler'); } + const jobData = { handler: this.handlerName, operation, diff --git a/libs/core/handlers/src/decorators/rate-limit.ts b/libs/core/handlers/src/decorators/rate-limit.ts index 427f61e..3fc1da9 100644 --- a/libs/core/handlers/src/decorators/rate-limit.ts +++ b/libs/core/handlers/src/decorators/rate-limit.ts @@ -1,12 +1,30 @@ +/** + * Individual rate limit window configuration + */ +export interface RateLimitWindow { + points: number; // Number of allowed points in the time window + duration: number; // Time window in seconds + blockDuration?: number; // How long to block after limit is hit (seconds) +} + /** * Rate limit configuration for handlers and operations */ export interface RateLimitConfig { - points: number; // Number of allowed requests - duration: number; // Time window in seconds - blockDuration?: number; // How long to block after limit is hit (seconds) + limits?: RateLimitWindow[]; // Array of rate limit windows + cost?: number; // How many points this operation costs (default: 1) } +/** + * Flexible rate limit configuration + * Can be: + * - A full config object with limits and cost + * - Just a cost number (for operations inheriting handler limits) + * - An array of limit windows (legacy support) + * - A single limit window (legacy support) + */ +export type RateLimitOptions = RateLimitConfig | number | RateLimitWindow[] | RateLimitWindow; + /** * RateLimit decorator - configures rate limiting for handlers or operations * @@ -14,26 +32,62 @@ export interface RateLimitConfig { * - Classes: Sets default rate limit for all operations in the handler * - Methods: Sets specific rate limit for individual operations (overrides handler-level) * - * @param config Rate limit configuration + * @param options Rate limit configuration - flexible format * * @example - * // Handler-level rate limit + * // Handler-level rate limit with multiple windows * @Handler('myHandler') - * @RateLimit({ points: 100, duration: 60 }) + * @RateLimit({ + * limits: [ + * 
{ points: 100, duration: 1 }, // 100 points/second + * { points: 10000, duration: 3600 } // 10k points/hour + * ], + * cost: 1 // Default cost for all operations + * }) * class MyHandler extends BaseHandler { - * // All operations inherit the 100/minute limit + * // All operations inherit these limits * } * * @example - * // Operation-specific rate limit - * @Operation('fetch-data') - * @RateLimit({ points: 10, duration: 60, blockDuration: 30 }) - * async fetchData() { - * // This operation is limited to 10/minute - * } + * // Operation-specific cost only (inherits handler limits) + * @Operation('fetch-simple') + * @RateLimit(1) // Costs 1 point + * async fetchSimple() {} + * + * @Operation('fetch-complex') + * @RateLimit({ cost: 10 }) // Costs 10 points + * async fetchComplex() {} + * + * @example + * // Operation with custom limits and cost + * @Operation('special-operation') + * @RateLimit({ + * limits: [ + * { points: 10, duration: 60 } // More restrictive: 10/minute + * ], + * cost: 5 + * }) + * async specialOperation() {} */ -export function RateLimit(config: RateLimitConfig) { +export function RateLimit(options: RateLimitOptions) { return function (target: any, propertyKey?: string, descriptor?: PropertyDescriptor): any { + // Normalize options to RateLimitConfig format + let config: RateLimitConfig; + + if (typeof options === 'number') { + // Just a cost number + config = { cost: options }; + } else if (Array.isArray(options)) { + // Array of limit windows (legacy support) + config = { limits: options }; + } else if ('points' in options && 'duration' in options) { + // Single limit window (legacy support) + config = { limits: [options as RateLimitWindow] }; + } else { + // Already a RateLimitConfig + config = options as RateLimitConfig; + } + if (propertyKey) { // Method decorator - operation-specific rate limit const constructor = target.constructor; diff --git a/libs/core/handlers/src/index.ts b/libs/core/handlers/src/index.ts index 5ab415e..7ce2373 
100644 --- a/libs/core/handlers/src/index.ts +++ b/libs/core/handlers/src/index.ts @@ -8,7 +8,11 @@ export { Disabled, } from './decorators/decorators'; export { RateLimit } from './decorators/rate-limit'; -export type { RateLimitConfig } from './decorators/rate-limit'; +export type { + RateLimitConfig, + RateLimitWindow, + RateLimitOptions +} from './decorators/rate-limit'; export { createJobHandler } from './utils/create-job-handler'; // Re-export commonly used types from @stock-bot/types for convenience diff --git a/libs/core/queue/src/index.ts b/libs/core/queue/src/index.ts index 67f6d19..66a2c87 100644 --- a/libs/core/queue/src/index.ts +++ b/libs/core/queue/src/index.ts @@ -20,7 +20,7 @@ export { DeadLetterQueueHandler } from './dlq-handler'; export { QueueMetricsCollector } from './queue-metrics'; // Rate limiting -export { QueueRateLimiter } from './rate-limiter'; +export { QueueRateLimiter } from './rate-limiter-new'; // Types export type { diff --git a/libs/core/queue/src/queue-manager.ts b/libs/core/queue/src/queue-manager.ts index 5df43fa..acd8e41 100644 --- a/libs/core/queue/src/queue-manager.ts +++ b/libs/core/queue/src/queue-manager.ts @@ -3,8 +3,9 @@ import type { CacheProvider } from '@stock-bot/cache'; import { createCache } from '@stock-bot/cache'; import type { HandlerRegistry } from '@stock-bot/handler-registry'; import { getLogger } from '@stock-bot/logger'; +import Redis from 'ioredis'; import { Queue, type QueueWorkerConfig } from './queue'; -import { QueueRateLimiter } from './rate-limiter'; +import { QueueRateLimiter } from './rate-limiter-new'; import { getFullQueueName, parseQueueName } from './service-utils'; import type { GlobalStats, @@ -37,6 +38,7 @@ export class QueueManager { private caches = new Map(); private rateLimiter?: QueueRateLimiter; private redisConnection: ReturnType; + private ioredisConnection?: Redis; private isShuttingDown = false; private shutdownPromise: Promise | null = null; private config: QueueManagerConfig; @@ 
-65,16 +67,19 @@ export class QueueManager { this.handlerRegistry = handlerRegistry; this.logger = logger || getLogger('QueueManager'); this.redisConnection = getRedisConnection(config.redis); + + // Create ioredis connection for rate limiter + this.ioredisConnection = new Redis({ + host: config.redis.host, + port: config.redis.port, + password: config.redis.password, + db: config.redis.db || 0, + maxRetriesPerRequest: 3, + enableReadyCheck: true, + }); - // Initialize rate limiter if rules are provided - if (config.rateLimitRules && config.rateLimitRules.length > 0) { - this.rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger); - config.rateLimitRules.forEach(rule => { - if (this.rateLimiter) { - this.rateLimiter.addRule(rule); - } - }); - } + // Rate limiter will be initialized when rules are added dynamically + // No longer initialized from config // Auto-discover routes if enabled and registry provided if (config.serviceName && config.autoDiscoverHandlers !== false && handlerRegistry) { @@ -153,6 +158,7 @@ export class QueueManager { startWorker: workers > 0, handlerRegistry: options.handlerRegistry || this.handlerRegistry, serviceName: this.config.serviceName, + queueManager: this, }; const queue = new Queue( @@ -169,16 +175,7 @@ export class QueueManager { // Automatically initialize batch cache for the queue this.initializeBatchCacheSync(queueName); - // Add queue-specific rate limit rules - if (this.rateLimiter && mergedOptions.rateLimitRules) { - mergedOptions.rateLimitRules.forEach(rule => { - // Ensure queue name is set for queue-specific rules - const ruleWithQueue = { ...rule, queueName }; - if (this.rateLimiter) { - this.rateLimiter.addRule(ruleWithQueue); - } - }); - } + // Rate limit rules are now added via addRateLimitRules after handlers are initialized this.logger.info('Queue created with batch cache', { queueName, @@ -303,23 +300,44 @@ export class QueueManager { /** * Add a rate limit rule */ - addRateLimitRule(rule: 
RateLimitRule): void { + async addRateLimitRule(rule: RateLimitRule): Promise { if (!this.rateLimiter) { - this.rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger); + if (!this.ioredisConnection) { + throw new Error('IoRedis connection not initialized'); + } + this.rateLimiter = new QueueRateLimiter(this.ioredisConnection, this.logger); } - this.rateLimiter.addRule(rule); + await this.rateLimiter.addRule(rule); } /** - * Check rate limits for a job + * Add multiple rate limit rules at once + */ + async addRateLimitRules(rules: RateLimitRule[]): Promise { + if (!this.rateLimiter) { + if (!this.ioredisConnection) { + throw new Error('IoRedis connection not initialized'); + } + this.rateLimiter = new QueueRateLimiter(this.ioredisConnection, this.logger); + } + await Promise.all(rules.map(rule => this.rateLimiter!.addRule(rule))); + + this.logger.info('Added rate limit rules to QueueManager', { + ruleCount: rules.length, + rules, + }); + } + + /** + * Check rate limits for a job - overloaded to support both interfaces */ async checkRateLimit( - queueName: string, - handler: string, - operation: string + handlerOrQueueName: string, + operationOrHandler?: string, + operationIfThreeParams?: string ): Promise<{ allowed: boolean; - retryAfter?: number; + retryDelayMs?: number; remainingPoints?: number; appliedRule?: RateLimitRule; }> { @@ -327,7 +345,27 @@ export class QueueManager { return { allowed: true }; } - return await this.rateLimiter.checkLimit(queueName, handler, operation); + // Support both 2-parameter and 3-parameter calls + let handler: string; + let operation: string | undefined; + + if (operationIfThreeParams !== undefined) { + // 3-parameter call: checkRateLimit(queueName, handler, operation) + handler = operationOrHandler!; + operation = operationIfThreeParams; + } else { + // 2-parameter call: checkRateLimit(handler, operation) + handler = handlerOrQueueName; + operation = operationOrHandler; + } + + // The new rate limiter only needs 
handler and operation + const result = await this.rateLimiter.checkLimit(handler, operation); + return { + allowed: result.allowed, + retryDelayMs: result.retryDelayMs, + remainingPoints: result.remainingPoints, + }; } /** @@ -342,7 +380,12 @@ export class QueueManager { }; } - return await this.rateLimiter.getStatus(queueName, handler, operation); + // The new rate limiter only needs handler and operation + const status = await this.rateLimiter.getStatus(handler, operation); + return { + queueName, + ...status + }; } /** @@ -467,6 +510,16 @@ export class QueueManager { }); await Promise.all(cacheShutdownPromises); + + // Close ioredis connection + if (this.ioredisConnection) { + try { + await this.ioredisConnection.quit(); + this.logger.debug('IoRedis connection closed'); + } catch (error) { + this.logger.warn('Error closing ioredis connection', { error: (error as Error).message }); + } + } // Clear collections this.queues.clear(); diff --git a/libs/core/queue/src/queue.ts b/libs/core/queue/src/queue.ts index 3af41a3..b079beb 100644 --- a/libs/core/queue/src/queue.ts +++ b/libs/core/queue/src/queue.ts @@ -20,6 +20,7 @@ export interface QueueWorkerConfig { startWorker?: boolean; handlerRegistry?: HandlerRegistry; serviceName?: string; + queueManager?: any; // Reference to queue manager for rate limiting } /** @@ -35,6 +36,7 @@ export class Queue { private readonly logger: Logger; private readonly handlerRegistry?: HandlerRegistry; private serviceName?: string; + private queueManager?: any; constructor( queueName: string, @@ -48,6 +50,7 @@ export class Queue { this.logger = logger || console; this.handlerRegistry = config.handlerRegistry; this.serviceName = config.serviceName; + this.queueManager = config.queueManager; this.logger.debug('Queue constructor called', { queueName, @@ -332,13 +335,18 @@ export class Queue { const connection = getRedisConnection(this.redisConfig); for (let i = 0; i < workerCount; i++) { - const worker = new Worker(this.queueName, 
this.processJob.bind(this), { + const worker: Worker = new Worker(this.queueName, async (job: Job) => this.processJob(job, worker), { connection, concurrency, maxStalledCount: 3, stalledInterval: 30000, // Add a name to identify the worker name: `${this.serviceName || 'unknown'}_worker_${i}`, + // Enable manual rate limiting (BullMQ limiter duration is in milliseconds) + limiter: { + max: 100, + duration: 1000, + }, }); this.logger.info(`Starting worker ${i + 1}/${workerCount} for queue`, { @@ -348,7 +356,7 @@ }); // Setup worker event handlers - worker.on('completed', job => { + worker.on('completed', (job: Job) => { this.logger.trace('Job completed', { queueName: this.queueName, jobId: job.id, @@ -357,17 +365,27 @@ }); }); - worker.on('failed', (job, err) => { - this.logger.error('Job failed', { - queueName: this.queueName, - jobId: job?.id, - handler: job?.data?.handler, - operation: job?.data?.operation, - error: err.message, - }); + worker.on('failed', (job: Job | undefined, err: Error) => { + // Only log as error if it's not a rate limit error + if (err.name !== 'RateLimitError') { + this.logger.error('Job failed', { + queueName: this.queueName, + jobId: job?.id, + handler: job?.data?.handler, + operation: job?.data?.operation, + error: err.message, + }); + } else { + this.logger.debug('Job rate limited', { + queueName: this.queueName, + jobId: job?.id, + handler: job?.data?.handler, + operation: job?.data?.operation, + }); + } }); - worker.on('error', error => { + worker.on('error', (error: Error) => { this.logger.error('Worker error', { queueName: this.queueName, workerId: i, @@ -386,9 +404,16 @@ } /** - * Process a job using the handler registry + * Process a job using the handler registry with rate limiting */ - private async processJob(job: Job): Promise { + private async processJob(job: Job, worker: Worker): Promise { + this.logger.debug('Processing job', { + jobId: job.id, + jobName: job.name, + queueName: this.queueName, + 
hasQueueManager: !!this.queueManager, + jobData: job.data + }); const { handler, operation, payload }: JobData = job.data; this.logger.trace('Processing job', { @@ -404,6 +429,42 @@ export class Queue { throw new Error('Handler registry not configured for worker processing'); } + // Check rate limits if available + if (this.queueManager && this.queueManager.checkRateLimit) { + this.logger.debug('Checking rate limits', { + handler, + operation, + queueManager: this.queueManager.constructor.name + }); + + const rateLimitCheck = await this.queueManager.checkRateLimit( + handler, + operation + ); + + this.logger.debug('Rate limit check result', { + allowed: rateLimitCheck.allowed, + retryDelayMs: rateLimitCheck.retryDelayMs, + handler, + operation + }); + + if (!rateLimitCheck.allowed) { + this.logger.trace('Rate limit exceeded, throwing Worker.RateLimitError', { + id: job.id, + handler, + operation, + retryDelayMs: rateLimitCheck.retryDelayMs + }); + + // Use BullMQ's manual rate limiting + // First inform the worker about the rate limit delay + await worker.rateLimit(rateLimitCheck.retryDelayMs || 1000); + // Then throw the RateLimitError + throw Worker.RateLimitError(); + } + } + this.logger.debug('Looking up handler in registry', { handler, operation, @@ -419,6 +480,7 @@ export class Queue { const result = await jobHandler(payload); + this.logger.info(`[Queue] Job ${job.id} - ${handler}:${operation} completed successfully`); this.logger.trace('Job completed successfully', { id: job.id, handler, @@ -428,6 +490,11 @@ export class Queue { return result; } catch (error) { + // Re-throw RateLimitError without logging as error + if (error === Worker.RateLimitError() || (error as any).name === 'RateLimitError') { + throw error; + } + this.logger.error('Job processing failed', { id: job.id, handler, diff --git a/libs/core/queue/src/rate-limiter-new.ts b/libs/core/queue/src/rate-limiter-new.ts new file mode 100644 index 0000000..3014e75 --- /dev/null +++ 
b/libs/core/queue/src/rate-limiter-new.ts @@ -0,0 +1,380 @@ +import { RateLimiterRedis, type IRateLimiterRedisOptions } from 'rate-limiter-flexible'; +import type Redis from 'ioredis'; +import type { RateLimitWindow, RateLimitConfig, RateLimitRule } from './types'; + +// Logger interface for type safety +interface Logger { + info(message: string, meta?: Record): void; + error(message: string, meta?: Record): void; + warn(message: string, meta?: Record): void; + debug(message: string, meta?: Record): void; + trace(message: string, meta?: Record): void; +} + +interface RateLimiterEntry { + windows: Array<{ + limiter: RateLimiterRedis; + window: RateLimitWindow; + }>; + cost: number; +} + +export interface RateLimitResult { + allowed: boolean; + retryDelayMs?: number; + remainingPoints?: number; + cost?: number; +} + +/** + * Enhanced rate limiter that supports: + * - Multiple time windows per rule + * - Variable costs per operation + * - Hierarchical rules (operation > handler > queue > global) + */ +export class QueueRateLimiter { + private limiters = new Map(); + private readonly logger: Logger; + + constructor( + private redisClient: Redis, + logger?: Logger + ) { + this.logger = logger || console; + this.logger.debug('QueueRateLimiter initialized', { + redisClientType: typeof this.redisClient, + redisClientKeys: this.redisClient ? 
Object.keys(this.redisClient).slice(0, 10) : [], + hasClient: !!this.redisClient, + isIoRedis: this.redisClient && this.redisClient.constructor.name === 'Redis' + }); + } + + /** + * Add a rate limit rule + */ + async addRule(rule: RateLimitRule): Promise { + const key = this.getRuleKey(rule); + + // Create array of limiters for each window + const windows: RateLimiterEntry['windows'] = []; + const limits = rule.config.limits || []; + + this.logger.debug('Adding rate limit rule', { + key, + rule, + windowCount: limits.length, + cost: rule.config.cost + }); + + for (const window of limits) { + const limiterOptions: IRateLimiterRedisOptions = { + storeClient: this.redisClient as any, + keyPrefix: `rate_limit:${key}:${window.duration}s`, + points: window.points, + duration: window.duration, + blockDuration: window.blockDuration || 0, + }; + + this.logger.debug('Creating RateLimiterRedis', { + keyPrefix: limiterOptions.keyPrefix, + points: limiterOptions.points, + duration: limiterOptions.duration, + hasRedisClient: !!this.redisClient, + redisClientType: typeof this.redisClient + }); + + try { + const limiter = new RateLimiterRedis(limiterOptions); + windows.push({ limiter, window }); + } catch (error) { + this.logger.error('Failed to create RateLimiterRedis', { error, limiterOptions }); + throw error; + } + } + + // Default cost: handler defines the pool (no default cost), operations default to 1 + const defaultCost = rule.level === 'operation' ? 1 : 0; + + this.limiters.set(key, { + windows, + cost: rule.config.cost !== undefined ? rule.config.cost : defaultCost + }); + + this.logger.info('Rate limit rule added', { + key, + level: rule.level, + handler: rule.handler, + operation: rule.operation, + windows: windows.length, + cost: rule.config.cost !== undefined ? 
rule.config.cost : defaultCost + }); + } + + /** + * Check if an operation is allowed based on rate limits + */ + async checkLimit(handler: string, operation?: string): Promise { + // Build keys to check from most specific to least specific + const keysToCheck: string[] = []; + + if (operation) { + keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule)); + } + keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule)); + keysToCheck.push(this.getRuleKey({ level: 'queue', config: {} } as RateLimitRule)); + keysToCheck.push(this.getRuleKey({ level: 'global', config: {} } as RateLimitRule)); + + this.logger.debug('Checking rate limits', { + handler, + operation, + keysToCheck, + availableKeys: Array.from(this.limiters.keys()) + }); + + // First, find the cost from the most specific rule + let cost = 1; // Default cost + for (const key of keysToCheck) { + const entry = this.limiters.get(key); + if (entry && entry.cost !== undefined) { + cost = entry.cost; + this.logger.info(`[RateLimiter] Using cost ${cost} from rule: ${key}`); + this.logger.debug('Using cost from rule', { + key, + cost, + handler, + operation + }); + break; + } + } + + // Then find the rate limit windows from the first matching rule that has windows + let windowEntry: RateLimiterEntry | undefined; + let appliedKey: string | undefined; + + for (const key of keysToCheck) { + const entry = this.limiters.get(key); + if (entry && entry.windows.length > 0) { + windowEntry = entry; + appliedKey = key; + break; + } + } + + if (!windowEntry) { + this.logger.debug('No rate limit rules found', { + handler, + operation + }); + return { allowed: true }; + } + + this.logger.info(`[RateLimiter] Applying rate limit rule: ${appliedKey}`, { + appliedKey, + windowCount: windowEntry.windows.length, + cost, + handler, + operation + }); + + // Check all windows with the determined cost + let maxRetryDelayMs = 0; + + 
this.logger.debug('Checking rate limit windows', { + handler, + operation, + cost, + windowCount: windowEntry.windows.length + }); + + // Special handling for 0-cost operations - they should always pass + if (cost === 0) { + this.logger.info(`[RateLimiter] Zero-cost operation ${handler}:${operation || 'N/A'} - allowing without consuming points`); + return { + allowed: true, + cost: 0 + }; + } + + for (const { limiter, window } of windowEntry.windows) { + // For handler-level rate limits, all operations share the same pool + // Use the applied key to determine the consumer key + const consumerKey = appliedKey?.startsWith('handler:') ? handler : + (operation ? `${handler}:${operation}` : handler); + + try { + // First check current state without consuming + const currentState = await limiter.get(consumerKey); + const now = Date.now(); + this.logger.info(`[RateLimiter] Current state before consume:`, { + handler, + operation, + consumerKey, + cost, + window: `${window.points}pts/${window.duration}s`, + consumedPoints: currentState?.consumedPoints || 0, + remainingPoints: currentState?.remainingPoints ?? window.points, + msBeforeNext: currentState?.msBeforeNext || 0, + currentTime: new Date(now).toISOString(), + resetTime: currentState?.msBeforeNext ? new Date(now + currentState.msBeforeNext).toISOString() : 'N/A', + hasState: !!currentState, + stateDetails: currentState ? 
JSON.stringify(currentState) : 'no state' + }); + + // Try to consume points + this.logger.info(`[RateLimiter] Attempting to consume ${cost} points for ${consumerKey}`); + const result = await limiter.consume(consumerKey, cost); + this.logger.info(`[RateLimiter] Successfully consumed ${cost} points, ${result.remainingPoints} remaining`); + this.logger.debug('Consumed points successfully', { + handler, + operation, + consumerKey, + window: `${window.points}pts/${window.duration}s`, + cost, + remainingPoints: result.remainingPoints, + msBeforeNext: result.msBeforeNext + }); + this.logger.trace('Rate limit window passed', { + handler, + operation, + window: `${window.points}pts/${window.duration}s`, + cost + }); + } catch (rejRes: any) { + // Rate limit exceeded for this window + const retryDelayMs = rejRes.msBeforeNext || window.blockDuration || 1000; + maxRetryDelayMs = Math.max(maxRetryDelayMs, retryDelayMs); + + this.logger.info(`[RateLimiter] RATE LIMIT HIT: ${consumerKey} - ${cost} points rejected, retry in ${retryDelayMs}ms`, { + consumedPoints: rejRes.consumedPoints || 0, + remainingPoints: rejRes.remainingPoints || 0, + totalPoints: rejRes.totalPoints || window.points, + isFirstInDuration: rejRes.isFirstInDuration || false, + msBeforeNext: rejRes.msBeforeNext || 0, + retryDelayMs + }); + this.logger.trace('Rate limit exceeded', { + handler, + operation, + consumerKey, + window: `${window.points}pts/${window.duration}s`, + cost, + remainingPoints: rejRes.remainingPoints || 0, + retryDelayMs: maxRetryDelayMs + }); + + return { + allowed: false, + retryDelayMs: maxRetryDelayMs, + remainingPoints: rejRes.remainingPoints || 0, + cost + }; + } + } + + // All windows passed + return { + allowed: true, + cost + }; + } + + /** + * Get rule key for storing rate limiter + */ + private getRuleKey(rule: RateLimitRule): string { + switch (rule.level) { + case 'global': + return 'global'; + case 'queue': + return `queue:${rule.queueName || 'default'}`; + case 'handler': + 
return `handler:${rule.handler}`; + case 'operation': + return `operation:${rule.handler}:${rule.operation}`; + default: + return rule.level; + } + } + + /** + * Get current rate limit status + */ + async getStatus(handler: string, operation?: string): Promise { + // Build keys to check from most specific to least specific + const keysToCheck: string[] = []; + + if (operation) { + keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule)); + } + keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule)); + keysToCheck.push(this.getRuleKey({ level: 'queue', config: {} } as RateLimitRule)); + keysToCheck.push(this.getRuleKey({ level: 'global', config: {} } as RateLimitRule)); + + // Find the first matching rule + let entry: RateLimiterEntry | undefined; + let appliedKey: string | undefined; + + for (const key of keysToCheck) { + entry = this.limiters.get(key); + if (entry) { + appliedKey = key; + break; + } + } + + if (!entry) { + return { + handler, + operation, + hasRule: false + }; + } + + // Get status for all windows + const consumerKey = operation ? `${handler}:${operation}` : handler; + const windows = await Promise.all( + entry.windows.map(async ({ limiter, window }) => { + const result = await limiter.get(consumerKey); + return { + points: window.points, + duration: window.duration, + remaining: result?.remainingPoints ?? window.points, + resetIn: result?.msBeforeNext ?? 
0, + }; + }) + ); + + return { + handler, + operation, + hasRule: true, + appliedKey, + cost: entry.cost, + windows + }; + } + + /** + * Reset rate limits for a specific handler/operation + */ + async reset(handler: string, operation?: string): Promise { + const keysToCheck: string[] = []; + + if (operation) { + keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule)); + } + keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule)); + + for (const key of keysToCheck) { + const entry = this.limiters.get(key); + if (entry) { + await Promise.all( + entry.windows.map(({ limiter }) => limiter.delete(handler)) + ); + } + } + + this.logger.info('Rate limits reset', { handler, operation }); + } +} \ No newline at end of file diff --git a/libs/core/queue/src/rate-limiter.ts b/libs/core/queue/src/rate-limiter.ts index 413a62f..d417aa5 100644 --- a/libs/core/queue/src/rate-limiter.ts +++ b/libs/core/queue/src/rate-limiter.ts @@ -1,5 +1,5 @@ import { RateLimiterRedis, RateLimiterRes } from 'rate-limiter-flexible'; -import type { RateLimitConfig as BaseRateLimitConfig, RateLimitRule } from './types'; +import type { RateLimitWindow, RateLimitConfig, RateLimitRule } from './types'; // Logger interface for type safety interface Logger { @@ -9,13 +9,16 @@ interface Logger { debug(message: string, meta?: Record): void; } -// Extend the base config to add rate-limiter specific fields -export interface RateLimitConfig extends BaseRateLimitConfig { - keyPrefix?: string; +interface RateLimiterEntry { + windows: Array<{ + limiter: RateLimiterRedis; + window: RateLimitWindow; + }>; + cost: number; } export class QueueRateLimiter { - private limiters = new Map(); + private limiters = new Map(); private rules: RateLimitRule[] = []; private readonly logger: Logger; @@ -33,23 +36,33 @@ export class QueueRateLimiter { this.rules.push(rule); const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, 
rule.operation); - const limiter = new RateLimiterRedis({ - storeClient: this.redisClient, - keyPrefix: `rl:${key}`, - points: rule.config.points, - duration: rule.config.duration, - blockDuration: rule.config.blockDuration || 0, + + // Extract limits and cost from config + const limits = rule.config.limits || []; + const cost = rule.config.cost || 1; + + // Create rate limiters for each window + const windows = limits.map((window, index) => { + const limiter = new RateLimiterRedis({ + storeClient: this.redisClient, + keyPrefix: `rl:${key}:${index}`, + points: window.points, + duration: window.duration, + blockDuration: window.blockDuration || 0, + }); + + return { limiter, window }; }); - this.limiters.set(key, limiter); + this.limiters.set(key, { windows, cost }); this.logger.info('Rate limit rule added', { level: rule.level, queueName: rule.queueName, handler: rule.handler, operation: rule.operation, - points: rule.config.points, - duration: rule.config.duration, + windows: limits.length, + cost, }); } @@ -57,6 +70,7 @@ export class QueueRateLimiter { * Check if a job can be processed based on rate limits * Uses hierarchical precedence: operation > handler > queue > global * The most specific matching rule takes precedence + * Returns the longest wait time if multiple windows are hit */ async checkLimit( queueName: string, @@ -67,6 +81,7 @@ export class QueueRateLimiter { retryAfter?: number; remainingPoints?: number; appliedRule?: RateLimitRule; + cost?: number; }> { const applicableRule = this.getMostSpecificRule(queueName, handler, operation); @@ -80,28 +95,74 @@ export class QueueRateLimiter { applicableRule.handler, applicableRule.operation ); - const limiter = this.limiters.get(key); + const limiterEntry = this.limiters.get(key); - if (!limiter) { + if (!limiterEntry || limiterEntry.windows.length === 0) { this.logger.warn('Rate limiter not found for rule', { key, rule: applicableRule }); return { allowed: true }; } - try { - const result = await 
this.consumePoint( - limiter, - this.getConsumerKey(queueName, handler, operation) - ); + const consumerKey = this.getConsumerKey(queueName, handler, operation); + const cost = limiterEntry.cost; + + // Check all windows and collect results + const windowResults = await Promise.all( + limiterEntry.windows.map(async ({ limiter, window }) => { + try { + // Try to consume points for this window + const result = await limiter.consume(consumerKey, cost); + return { + allowed: true, + remainingPoints: result.remainingPoints, + retryAfter: 0, + }; + } catch (rejRes) { + if (rejRes instanceof RateLimiterRes) { + return { + allowed: false, + remainingPoints: rejRes.remainingPoints, + retryAfter: rejRes.msBeforeNext, + }; + } + throw rejRes; + } + }) + ); + // Find if any window rejected the request + const rejectedWindow = windowResults.find(r => !r.allowed); + + if (rejectedWindow) { + // Find the longest wait time among all rejected windows + const maxRetryAfter = Math.max( + ...windowResults.filter(r => !r.allowed).map(r => r.retryAfter || 0) + ); + + this.logger.warn('Rate limit exceeded', { + handler, + operation, + cost, + retryAfter: maxRetryAfter, + }); + return { - ...result, + allowed: false, + retryAfter: maxRetryAfter, + remainingPoints: rejectedWindow.remainingPoints, appliedRule: applicableRule, + cost, }; - } catch (error) { - this.logger.error('Rate limit check failed', { queueName, handler, operation, error }); - // On error, allow the request to proceed - return { allowed: true }; } + + // All windows allowed - return the minimum remaining points + const minRemainingPoints = Math.min(...windowResults.map(r => r.remainingPoints || 0)); + + return { + allowed: true, + remainingPoints: minRemainingPoints, + appliedRule: applicableRule, + cost, + }; } /** @@ -144,35 +205,6 @@ export class QueueRateLimiter { return rule; } - /** - * Consume a point from the rate limiter - */ - private async consumePoint( - limiter: RateLimiterRedis, - key: string - ): Promise<{ 
allowed: boolean; retryAfter?: number; remainingPoints?: number }> { - try { - const result = await limiter.consume(key); - return { - allowed: true, - remainingPoints: result.remainingPoints, - }; - } catch (rejRes) { - if (rejRes instanceof RateLimiterRes) { - this.logger.warn('Rate limit exceeded', { - key, - retryAfter: rejRes.msBeforeNext, - }); - - return { - allowed: false, - retryAfter: rejRes.msBeforeNext, - remainingPoints: rejRes.remainingPoints, - }; - } - throw rejRes; - } - } /** * Get rule key for storing rate limiter @@ -216,13 +248,13 @@ export class QueueRateLimiter { handler: string; operation: string; appliedRule?: RateLimitRule; - limit?: { - level: string; + cost?: number; + windows?: Array<{ points: number; duration: number; remaining: number; resetIn: number; - }; + }>; }> { const applicableRule = this.getMostSpecificRule(queueName, handler, operation); @@ -240,9 +272,9 @@ export class QueueRateLimiter { applicableRule.handler, applicableRule.operation ); - const limiter = this.limiters.get(key); + const limiterEntry = this.limiters.get(key); - if (!limiter) { + if (!limiterEntry || limiterEntry.windows.length === 0) { return { queueName, handler, @@ -253,22 +285,27 @@ export class QueueRateLimiter { try { const consumerKey = this.getConsumerKey(queueName, handler, operation); - const result = await limiter.get(consumerKey); - - const limit = { - level: applicableRule.level, - points: limiter.points, - duration: limiter.duration, - remaining: result?.remainingPoints ?? limiter.points, - resetIn: result?.msBeforeNext ?? 0, - }; + + // Get status for all windows + const windows = await Promise.all( + limiterEntry.windows.map(async ({ limiter, window }) => { + const result = await limiter.get(consumerKey); + return { + points: window.points, + duration: window.duration, + remaining: result?.remainingPoints ?? window.points, + resetIn: result?.msBeforeNext ?? 
0, + }; + }) + ); return { queueName, handler, operation, appliedRule: applicableRule, - limit, + cost: limiterEntry.cost, + windows, }; } catch (error) { this.logger.error('Failed to get rate limit status', { @@ -282,6 +319,7 @@ export class QueueRateLimiter { handler, operation, appliedRule: applicableRule, + cost: limiterEntry.cost, }; } } @@ -297,9 +335,12 @@ export class QueueRateLimiter { if (rule) { const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation); - const limiter = this.limiters.get(key); - if (limiter) { - await limiter.delete(consumerKey); + const limiterEntry = this.limiters.get(key); + if (limiterEntry) { + // Reset all windows for this consumer + await Promise.all( + limiterEntry.windows.map(({ limiter }) => limiter.delete(consumerKey)) + ); } } } else { diff --git a/libs/core/queue/src/types.ts b/libs/core/queue/src/types.ts index bc2e53b..9abdbdd 100644 --- a/libs/core/queue/src/types.ts +++ b/libs/core/queue/src/types.ts @@ -61,8 +61,6 @@ export interface QueueOptions { concurrency?: number; enableMetrics?: boolean; enableDLQ?: boolean; - enableRateLimit?: boolean; - rateLimitRules?: RateLimitRule[]; // Queue-specific rate limit rules handlerRegistry?: any; // HandlerRegistry from @stock-bot/handler-registry } @@ -70,8 +68,6 @@ export interface QueueManagerConfig { redis: RedisConfig; defaultQueueOptions?: QueueOptions; enableScheduledJobs?: boolean; - globalRateLimit?: RateLimitConfig; - rateLimitRules?: RateLimitRule[]; // Global rate limit rules serviceName?: string; // For service discovery and namespacing autoDiscoverHandlers?: boolean; // Auto-discover queue routes from handler registry } @@ -108,12 +104,17 @@ export interface HandlerInitializer { } // Rate limiting types -export interface RateLimitConfig { +export interface RateLimitWindow { points: number; duration: number; blockDuration?: number; } +export interface RateLimitConfig { + limits?: RateLimitWindow[]; + cost?: number; +} + export interface 
RateLimitRule { level: 'global' | 'queue' | 'handler' | 'operation'; queueName?: string; // For queue-level limits