From ac4c5078fa13349acb3a375e61c656b92a7fd7f2 Mon Sep 17 00:00:00 2001 From: Boki Date: Sun, 6 Jul 2025 19:23:11 -0400 Subject: [PATCH] cleaned up stuff --- apps/stock/config/config/default.json | 9 - .../config/src/schemas/providers.schema.ts | 77 +- .../config/src/schemas/stock-app.schema.ts | 7 - .../config/src/schemas/provider.schema.ts | 6 - libs/core/queue/src/index.ts | 2 +- libs/core/queue/src/queue-manager.ts | 2 +- libs/core/queue/src/rate-limiter-new.ts | 380 --------- libs/core/queue/src/rate-limiter.ts | 764 +++++++++--------- libs/core/queue/test/rate-limiter.test.ts | 349 -------- 9 files changed, 392 insertions(+), 1204 deletions(-) delete mode 100644 libs/core/queue/src/rate-limiter-new.ts delete mode 100644 libs/core/queue/test/rate-limiter.test.ts diff --git a/apps/stock/config/config/default.json b/apps/stock/config/config/default.json index e8e112d..e8cef5f 100644 --- a/apps/stock/config/config/default.json +++ b/apps/stock/config/config/default.json @@ -123,10 +123,6 @@ "name": "yahoo", "enabled": true, "priority": 1, - "rateLimit": { - "maxRequests": 5, - "windowMs": 60000 - }, "timeout": 30000, "baseUrl": "https://query1.finance.yahoo.com" }, @@ -190,10 +186,6 @@ "qm": { "concurrency": 5 }, "ib": { "concurrency": 1 }, "proxy": { "concurrency": 1 } - }, - "rateLimit": { - "enabled": true, - "requestsPerSecond": 10 } }, "dataPipeline": { @@ -213,7 +205,6 @@ }, "webApi": { "port": 2003, - "rateLimitPerMinute": 60, "cache": { "ttl": 300, "checkPeriod": 60 diff --git a/apps/stock/config/src/schemas/providers.schema.ts b/apps/stock/config/src/schemas/providers.schema.ts index 98f0c02..61bf1b1 100644 --- a/apps/stock/config/src/schemas/providers.schema.ts +++ b/apps/stock/config/src/schemas/providers.schema.ts @@ -1,67 +1,10 @@ -import { z } from 'zod'; - -// Base provider configuration -export const baseProviderConfigSchema = z.object({ - name: z.string(), - enabled: z.boolean().default(true), - priority: z.number().default(0), - rateLimit: z - .object({ - maxRequests: z.number().default(100), - windowMs: z.number().default(60000), - }) - .optional(), - timeout: z.number().default(30000), - retries: z.number().default(3), -}); - -// EOD Historical Data provider -export const eodProviderConfigSchema = baseProviderConfigSchema.extend({ - apiKey: z.string(), - baseUrl: z.string().default('https://eodhistoricaldata.com/api'), - tier: z.enum(['free', 'fundamentals', 'all-in-one']).default('free'), -}); - -// Interactive Brokers provider -export const ibProviderConfigSchema = baseProviderConfigSchema.extend({ - gateway: z.object({ - host: z.string().default('localhost'), - port: z.number().default(5000), - clientId: z.number().default(1), - }), - account: z.string().optional(), - marketDataType: z.enum(['live', 'delayed', 'frozen']).default('delayed'), -}); - -// QuoteMedia provider -export const qmProviderConfigSchema = baseProviderConfigSchema.extend({ - username: z.string(), - password: z.string(), - baseUrl: z.string().default('https://app.quotemedia.com/quotetools'), - webmasterId: z.string(), -}); - -// Yahoo Finance provider -export const yahooProviderConfigSchema = baseProviderConfigSchema.extend({ - baseUrl: z.string().default('https://query1.finance.yahoo.com'), - cookieJar: z.boolean().default(true), - crumb: z.string().optional(), -}); - -// Combined provider configuration -export const providersSchema = z.object({ - eod: eodProviderConfigSchema.optional(), - ib: ibProviderConfigSchema.optional(), - qm: qmProviderConfigSchema.optional(), - yahoo: yahooProviderConfigSchema.optional(), -}); - -// Dynamic provider configuration type -export type ProviderName = 'eod' | 'ib' | 'qm' | 'yahoo'; - -export const providerSchemas = { - eod: eodProviderConfigSchema, - ib: ibProviderConfigSchema, - qm: qmProviderConfigSchema, - yahoo: yahooProviderConfigSchema, -} as const; +// Re-export provider schemas from core config +export { + providerConfigSchema as providersSchema, + type ProviderName, + providerSchemas, + eodProviderConfigSchema, + ibProviderConfigSchema, + qmProviderConfigSchema, + yahooProviderConfigSchema, +} from '@stock-bot/config'; \ No newline at end of file diff --git a/apps/stock/config/src/schemas/stock-app.schema.ts b/apps/stock/config/src/schemas/stock-app.schema.ts index 9b7f6ad..fb425b6 100644 --- a/apps/stock/config/src/schemas/stock-app.schema.ts +++ b/apps/stock/config/src/schemas/stock-app.schema.ts @@ -41,12 +41,6 @@ export const stockAppSchema = baseAppSchema.extend({ }) ) .optional(), - rateLimit: z - .object({ - enabled: z.boolean().default(true), - requestsPerSecond: z.number().default(10), - }) - .optional(), }) .optional(), dataPipeline: z @@ -74,7 +68,6 @@ export const stockAppSchema = baseAppSchema.extend({ webApi: z .object({ port: z.number().default(2003), - rateLimitPerMinute: z.number().default(60), cache: z .object({ ttl: z.number().default(300), diff --git a/libs/core/config/src/schemas/provider.schema.ts b/libs/core/config/src/schemas/provider.schema.ts index 0f7e4cf..7a5f954 100644 --- a/libs/core/config/src/schemas/provider.schema.ts +++ b/libs/core/config/src/schemas/provider.schema.ts @@ -5,12 +5,6 @@ export const baseProviderConfigSchema = z.object({ name: z.string(), enabled: z.boolean().default(true), priority: z.number().default(0), - rateLimit: z - .object({ - maxRequests: z.number().default(100), - windowMs: z.number().default(60000), - }) - .optional(), timeout: z.number().default(30000), retries: z.number().default(3), }); diff --git a/libs/core/queue/src/index.ts b/libs/core/queue/src/index.ts index 66a2c87..67f6d19 100644 --- a/libs/core/queue/src/index.ts +++ b/libs/core/queue/src/index.ts @@ -20,7 +20,7 @@ export { DeadLetterQueueHandler } from './dlq-handler'; export { QueueMetricsCollector } from './queue-metrics'; // Rate limiting -export { QueueRateLimiter } from './rate-limiter-new'; +export { QueueRateLimiter } from './rate-limiter'; // Types export type { diff --git a/libs/core/queue/src/queue-manager.ts b/libs/core/queue/src/queue-manager.ts index acd8e41..4eacbbe 100644 --- a/libs/core/queue/src/queue-manager.ts +++ b/libs/core/queue/src/queue-manager.ts @@ -5,7 +5,7 @@ import type { HandlerRegistry } from '@stock-bot/handler-registry'; import { getLogger } from '@stock-bot/logger'; import Redis from 'ioredis'; import { Queue, type QueueWorkerConfig } from './queue'; -import { QueueRateLimiter } from './rate-limiter-new'; +import { QueueRateLimiter } from './rate-limiter'; import { getFullQueueName, parseQueueName } from './service-utils'; import type { GlobalStats, diff --git a/libs/core/queue/src/rate-limiter-new.ts b/libs/core/queue/src/rate-limiter-new.ts deleted file mode 100644 index 3014e75..0000000 --- a/libs/core/queue/src/rate-limiter-new.ts +++ /dev/null @@ -1,380 +0,0 @@ -import { RateLimiterRedis, type IRateLimiterRedisOptions } from 'rate-limiter-flexible'; -import type Redis from 'ioredis'; -import type { RateLimitWindow, RateLimitConfig, RateLimitRule } from './types'; - -// Logger interface for type safety -interface Logger { - info(message: string, meta?: Record): void; - error(message: string, meta?: Record): void; - warn(message: string, meta?: Record): void; - debug(message: string, meta?: Record): void; - trace(message: string, meta?: Record): void; -} - -interface RateLimiterEntry { - windows: Array<{ - limiter: RateLimiterRedis; - window: RateLimitWindow; - }>; - cost: number; -} - -export interface RateLimitResult { - allowed: boolean; - retryDelayMs?: number; - remainingPoints?: number; - cost?: number; -} - -/** - * Enhanced rate limiter that supports: - * - Multiple time windows per rule - * - Variable costs per operation - * - Hierarchical rules (operation > handler > queue > global) - */ -export class QueueRateLimiter { - private limiters = new Map(); - private readonly logger: Logger; - - constructor( - private redisClient: Redis, - logger?: Logger - ) { - this.logger = logger || console; - this.logger.debug('QueueRateLimiter initialized', { - redisClientType: typeof this.redisClient, - redisClientKeys: this.redisClient ? Object.keys(this.redisClient).slice(0, 10) : [], - hasClient: !!this.redisClient, - isIoRedis: this.redisClient && this.redisClient.constructor.name === 'Redis' - }); - } - - /** - * Add a rate limit rule - */ - async addRule(rule: RateLimitRule): Promise { - const key = this.getRuleKey(rule); - - // Create array of limiters for each window - const windows: RateLimiterEntry['windows'] = []; - const limits = rule.config.limits || []; - - this.logger.debug('Adding rate limit rule', { - key, - rule, - windowCount: limits.length, - cost: rule.config.cost - }); - - for (const window of limits) { - const limiterOptions: IRateLimiterRedisOptions = { - storeClient: this.redisClient as any, - keyPrefix: `rate_limit:${key}:${window.duration}s`, - points: window.points, - duration: window.duration, - blockDuration: window.blockDuration || 0, - }; - - this.logger.debug('Creating RateLimiterRedis', { - keyPrefix: limiterOptions.keyPrefix, - points: limiterOptions.points, - duration: limiterOptions.duration, - hasRedisClient: !!this.redisClient, - redisClientType: typeof this.redisClient - }); - - try { - const limiter = new RateLimiterRedis(limiterOptions); - windows.push({ limiter, window }); - } catch (error) { - this.logger.error('Failed to create RateLimiterRedis', { error, limiterOptions }); - throw error; - } - } - - // Default cost: handler defines the pool (no default cost), operations default to 1 - const defaultCost = rule.level === 'operation' ? 1 : 0; - - this.limiters.set(key, { - windows, - cost: rule.config.cost !== undefined ? rule.config.cost : defaultCost - }); - - this.logger.info('Rate limit rule added', { - key, - level: rule.level, - handler: rule.handler, - operation: rule.operation, - windows: windows.length, - cost: rule.config.cost !== undefined ? rule.config.cost : defaultCost - }); - } - - /** - * Check if an operation is allowed based on rate limits - */ - async checkLimit(handler: string, operation?: string): Promise { - // Build keys to check from most specific to least specific - const keysToCheck: string[] = []; - - if (operation) { - keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule)); - } - keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule)); - keysToCheck.push(this.getRuleKey({ level: 'queue', config: {} } as RateLimitRule)); - keysToCheck.push(this.getRuleKey({ level: 'global', config: {} } as RateLimitRule)); - - this.logger.debug('Checking rate limits', { - handler, - operation, - keysToCheck, - availableKeys: Array.from(this.limiters.keys()) - }); - - // First, find the cost from the most specific rule - let cost = 1; // Default cost - for (const key of keysToCheck) { - const entry = this.limiters.get(key); - if (entry && entry.cost !== undefined) { - cost = entry.cost; - this.logger.info(`[RateLimiter] Using cost ${cost} from rule: ${key}`); - this.logger.debug('Using cost from rule', { - key, - cost, - handler, - operation - }); - break; - } - } - - // Then find the rate limit windows from the first matching rule that has windows - let windowEntry: RateLimiterEntry | undefined; - let appliedKey: string | undefined; - - for (const key of keysToCheck) { - const entry = this.limiters.get(key); - if (entry && entry.windows.length > 0) { - windowEntry = entry; - appliedKey = key; - break; - } - } - - if (!windowEntry) { - this.logger.debug('No rate limit rules found', { - handler, - operation - }); - return { allowed: true }; - } - - this.logger.info(`[RateLimiter] Applying rate limit rule: ${appliedKey}`, { - appliedKey, - windowCount: windowEntry.windows.length, - cost, - handler, - operation - }); - - // Check all windows with the determined cost - let maxRetryDelayMs = 0; - - this.logger.debug('Checking rate limit windows', { - handler, - operation, - cost, - windowCount: windowEntry.windows.length - }); - - // Special handling for 0-cost operations - they should always pass - if (cost === 0) { - this.logger.info(`[RateLimiter] Zero-cost operation ${handler}:${operation || 'N/A'} - allowing without consuming points`); - return { - allowed: true, - cost: 0 - }; - } - - for (const { limiter, window } of windowEntry.windows) { - // For handler-level rate limits, all operations share the same pool - // Use the applied key to determine the consumer key - const consumerKey = appliedKey?.startsWith('handler:') ? handler : - (operation ? `${handler}:${operation}` : handler); - - try { - // First check current state without consuming - const currentState = await limiter.get(consumerKey); - const now = Date.now(); - this.logger.info(`[RateLimiter] Current state before consume:`, { - handler, - operation, - consumerKey, - cost, - window: `${window.points}pts/${window.duration}s`, - consumedPoints: currentState?.consumedPoints || 0, - remainingPoints: currentState?.remainingPoints ?? window.points, - msBeforeNext: currentState?.msBeforeNext || 0, - currentTime: new Date(now).toISOString(), - resetTime: currentState?.msBeforeNext ? new Date(now + currentState.msBeforeNext).toISOString() : 'N/A', - hasState: !!currentState, - stateDetails: currentState ? JSON.stringify(currentState) : 'no state' - }); - - // Try to consume points - this.logger.info(`[RateLimiter] Attempting to consume ${cost} points for ${consumerKey}`); - const result = await limiter.consume(consumerKey, cost); - this.logger.info(`[RateLimiter] Successfully consumed ${cost} points, ${result.remainingPoints} remaining`); - this.logger.debug('Consumed points successfully', { - handler, - operation, - consumerKey, - window: `${window.points}pts/${window.duration}s`, - cost, - remainingPoints: result.remainingPoints, - msBeforeNext: result.msBeforeNext - }); - this.logger.trace('Rate limit window passed', { - handler, - operation, - window: `${window.points}pts/${window.duration}s`, - cost - }); - } catch (rejRes: any) { - // Rate limit exceeded for this window - const retryDelayMs = rejRes.msBeforeNext || window.blockDuration || 1000; - maxRetryDelayMs = Math.max(maxRetryDelayMs, retryDelayMs); - - this.logger.info(`[RateLimiter] RATE LIMIT HIT: ${consumerKey} - ${cost} points rejected, retry in ${retryDelayMs}ms`, { - consumedPoints: rejRes.consumedPoints || 0, - remainingPoints: rejRes.remainingPoints || 0, - totalPoints: rejRes.totalPoints || window.points, - isFirstInDuration: rejRes.isFirstInDuration || false, - msBeforeNext: rejRes.msBeforeNext || 0, - retryDelayMs - }); - this.logger.trace('Rate limit exceeded', { - handler, - operation, - consumerKey, - window: `${window.points}pts/${window.duration}s`, - cost, - remainingPoints: rejRes.remainingPoints || 0, - retryDelayMs: maxRetryDelayMs - }); - - return { - allowed: false, - retryDelayMs: maxRetryDelayMs, - remainingPoints: rejRes.remainingPoints || 0, - cost - }; - } - } - - // All windows passed - return { - allowed: true, - cost - }; - } - - /** - * Get rule key for storing rate limiter - */ - private getRuleKey(rule: RateLimitRule): string { - switch (rule.level) { - case 'global': - return 'global'; - case 'queue': - return `queue:${rule.queueName || 'default'}`; - case 'handler': - return `handler:${rule.handler}`; - case 'operation': - return `operation:${rule.handler}:${rule.operation}`; - default: - return rule.level; - } - } - - /** - * Get current rate limit status - */ - async getStatus(handler: string, operation?: string): Promise { - // Build keys to check from most specific to least specific - const keysToCheck: string[] = []; - - if (operation) { - keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule)); - } - keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule)); - keysToCheck.push(this.getRuleKey({ level: 'queue', config: {} } as RateLimitRule)); - keysToCheck.push(this.getRuleKey({ level: 'global', config: {} } as RateLimitRule)); - - // Find the first matching rule - let entry: RateLimiterEntry | undefined; - let appliedKey: string | undefined; - - for (const key of keysToCheck) { - entry = this.limiters.get(key); - if (entry) { - appliedKey = key; - break; - } - } - - if (!entry) { - return { - handler, - operation, - hasRule: false - }; - } - - // Get status for all windows - const consumerKey = operation ? `${handler}:${operation}` : handler; - const windows = await Promise.all( - entry.windows.map(async ({ limiter, window }) => { - const result = await limiter.get(consumerKey); - return { - points: window.points, - duration: window.duration, - remaining: result?.remainingPoints ?? window.points, - resetIn: result?.msBeforeNext ?? 0, - }; - }) - ); - - return { - handler, - operation, - hasRule: true, - appliedKey, - cost: entry.cost, - windows - }; - } - - /** - * Reset rate limits for a specific handler/operation - */ - async reset(handler: string, operation?: string): Promise { - const keysToCheck: string[] = []; - - if (operation) { - keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule)); - } - keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule)); - - for (const key of keysToCheck) { - const entry = this.limiters.get(key); - if (entry) { - await Promise.all( - entry.windows.map(({ limiter }) => limiter.delete(handler)) - ); - } - } - - this.logger.info('Rate limits reset', { handler, operation }); - } -} \ No newline at end of file diff --git a/libs/core/queue/src/rate-limiter.ts b/libs/core/queue/src/rate-limiter.ts index d417aa5..9691276 100644 --- a/libs/core/queue/src/rate-limiter.ts +++ b/libs/core/queue/src/rate-limiter.ts @@ -1,384 +1,380 @@ -import { RateLimiterRedis, RateLimiterRes } from 'rate-limiter-flexible'; -import type { RateLimitWindow, RateLimitConfig, RateLimitRule } from './types'; - -// Logger interface for type safety -interface Logger { - info(message: string, meta?: Record): void; - error(message: string, meta?: Record): void; - warn(message: string, meta?: Record): void; - debug(message: string, meta?: Record): void; -} - -interface RateLimiterEntry { - windows: Array<{ - limiter: RateLimiterRedis; - window: RateLimitWindow; - }>; - cost: number; -} - -export class QueueRateLimiter { - private limiters = new Map(); - private rules: RateLimitRule[] = []; - private readonly logger: Logger; - - constructor( - private redisClient: ReturnType, - logger?: Logger - ) { - this.logger = logger || console; - } - - /** - * Add a rate limit rule - */ - addRule(rule: RateLimitRule): void { - this.rules.push(rule); - - const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation); - - // Extract limits and cost from config - const limits = rule.config.limits || []; - const cost = rule.config.cost || 1; - - // Create rate limiters for each window - const windows = limits.map((window, index) => { - const limiter = new RateLimiterRedis({ - storeClient: this.redisClient, - keyPrefix: `rl:${key}:${index}`, - points: window.points, - duration: window.duration, - blockDuration: window.blockDuration || 0, - }); - - return { limiter, window }; - }); - - this.limiters.set(key, { windows, cost }); - - this.logger.info('Rate limit rule added', { - level: rule.level, - queueName: rule.queueName, - handler: rule.handler, - operation: rule.operation, - windows: limits.length, - cost, - }); - } - - /** - * Check if a job can be processed based on rate limits - * Uses hierarchical precedence: operation > handler > queue > global - * The most specific matching rule takes precedence - * Returns the longest wait time if multiple windows are hit - */ - async checkLimit( - queueName: string, - handler: string, - operation: string - ): Promise<{ - allowed: boolean; - retryAfter?: number; - remainingPoints?: number; - appliedRule?: RateLimitRule; - cost?: number; - }> { - const applicableRule = this.getMostSpecificRule(queueName, handler, operation); - - if (!applicableRule) { - return { allowed: true }; - } - - const key = this.getRuleKey( - applicableRule.level, - applicableRule.queueName, - applicableRule.handler, - applicableRule.operation - ); - const limiterEntry = this.limiters.get(key); - - if (!limiterEntry || limiterEntry.windows.length === 0) { - this.logger.warn('Rate limiter not found for rule', { key, rule: applicableRule }); - return { allowed: true }; - } - - const consumerKey = this.getConsumerKey(queueName, handler, operation); - const cost = limiterEntry.cost; - - // Check all windows and collect results - const windowResults = await Promise.all( - limiterEntry.windows.map(async ({ limiter, window }) => { - try { - // Try to consume points for this window - const result = await limiter.consume(consumerKey, cost); - return { - allowed: true, - remainingPoints: result.remainingPoints, - retryAfter: 0, - }; - } catch (rejRes) { - if (rejRes instanceof RateLimiterRes) { - return { - allowed: false, - remainingPoints: rejRes.remainingPoints, - retryAfter: rejRes.msBeforeNext, - }; - } - throw rejRes; - } - }) - ); - - // Find if any window rejected the request - const rejectedWindow = windowResults.find(r => !r.allowed); - - if (rejectedWindow) { - // Find the longest wait time among all rejected windows - const maxRetryAfter = Math.max( - ...windowResults.filter(r => !r.allowed).map(r => r.retryAfter || 0) - ); - - this.logger.warn('Rate limit exceeded', { - handler, - operation, - cost, - retryAfter: maxRetryAfter, - }); - - return { - allowed: false, - retryAfter: maxRetryAfter, - remainingPoints: rejectedWindow.remainingPoints, - appliedRule: applicableRule, - cost, - }; - } - - // All windows allowed - return the minimum remaining points - const minRemainingPoints = Math.min(...windowResults.map(r => r.remainingPoints || 0)); - - return { - allowed: true, - remainingPoints: minRemainingPoints, - appliedRule: applicableRule, - cost, - }; - } - - /** - * Get the most specific rule that applies to this job - * Precedence: operation > handler > queue > global - */ - private getMostSpecificRule( - queueName: string, - handler: string, - operation: string - ): RateLimitRule | undefined { - // 1. Check for operation-specific rule (most specific) - let rule = this.rules.find( - r => - r.level === 'operation' && - r.queueName === queueName && - r.handler === handler && - r.operation === operation - ); - if (rule) { - return rule; - } - - // 2. Check for handler-specific rule - rule = this.rules.find( - r => r.level === 'handler' && r.queueName === queueName && r.handler === handler - ); - if (rule) { - return rule; - } - - // 3. Check for queue-specific rule - rule = this.rules.find(r => r.level === 'queue' && r.queueName === queueName); - if (rule) { - return rule; - } - - // 4. Check for global rule (least specific) - rule = this.rules.find(r => r.level === 'global'); - return rule; - } - - - /** - * Get rule key for storing rate limiter - */ - private getRuleKey( - level: string, - queueName?: string, - handler?: string, - operation?: string - ): string { - switch (level) { - case 'global': - return 'global'; - case 'queue': - return `queue:${queueName}`; - case 'handler': - return `handler:${queueName}:${handler}`; - case 'operation': - return `operation:${queueName}:${handler}:${operation}`; - default: - return level; - } - } - - /** - * Get consumer key for rate limiting (what gets counted) - */ - private getConsumerKey(queueName: string, handler: string, operation: string): string { - return `${queueName}:${handler}:${operation}`; - } - - /** - * Get current rate limit status for a queue/handler/operation - */ - async getStatus( - queueName: string, - handler: string, - operation: string - ): Promise<{ - queueName: string; - handler: string; - operation: string; - appliedRule?: RateLimitRule; - cost?: number; - windows?: Array<{ - points: number; - duration: number; - remaining: number; - resetIn: number; - }>; - }> { - const applicableRule = this.getMostSpecificRule(queueName, handler, operation); - - if (!applicableRule) { - return { - queueName, - handler, - operation, - }; - } - - const key = this.getRuleKey( - applicableRule.level, - applicableRule.queueName, - applicableRule.handler, - applicableRule.operation - ); - const limiterEntry = this.limiters.get(key); - - if (!limiterEntry || limiterEntry.windows.length === 0) { - return { - queueName, - handler, - operation, - appliedRule: applicableRule, - }; - } - - try { - const consumerKey = this.getConsumerKey(queueName, handler, operation); - - // Get status for all windows - const windows = await Promise.all( - limiterEntry.windows.map(async ({ limiter, window }) => { - const result = await limiter.get(consumerKey); - return { - points: window.points, - duration: window.duration, - remaining: result?.remainingPoints ?? window.points, - resetIn: result?.msBeforeNext ?? 0, - }; - }) - ); - - return { - queueName, - handler, - operation, - appliedRule: applicableRule, - cost: limiterEntry.cost, - windows, - }; - } catch (error) { - this.logger.error('Failed to get rate limit status', { - queueName, - handler, - operation, - error, - }); - return { - queueName, - handler, - operation, - appliedRule: applicableRule, - cost: limiterEntry.cost, - }; - } - } - - /** - * Reset rate limits for a specific consumer - */ - async reset(queueName: string, handler?: string, operation?: string): Promise { - if (handler && operation) { - // Reset specific operation - const consumerKey = this.getConsumerKey(queueName, handler, operation); - const rule = this.getMostSpecificRule(queueName, handler, operation); - - if (rule) { - const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation); - const limiterEntry = this.limiters.get(key); - if (limiterEntry) { - // Reset all windows for this consumer - await Promise.all( - limiterEntry.windows.map(({ limiter }) => limiter.delete(consumerKey)) - ); - } - } - } else { - // Reset broader scope - this is more complex with the new hierarchy - this.logger.warn('Broad reset not implemented yet', { queueName, handler, operation }); - } - - this.logger.info('Rate limits reset', { queueName, handler, operation }); - } - - /** - * Get all configured rate limit rules - */ - getRules(): RateLimitRule[] { - return [...this.rules]; - } - - /** - * Remove a rate limit rule - */ - removeRule(level: string, queueName?: string, handler?: string, operation?: string): boolean { - const key = this.getRuleKey(level, queueName, handler, operation); - const ruleIndex = this.rules.findIndex( - r => - r.level === level && - (!queueName || r.queueName === queueName) && - (!handler || r.handler === handler) && - (!operation || r.operation === operation) - ); - - if (ruleIndex >= 0) { - this.rules.splice(ruleIndex, 1); - this.limiters.delete(key); - - this.logger.info('Rate limit rule removed', { level, queueName, handler, operation }); - return true; - } - - return false; - } -} +import type Redis from 'ioredis'; +import { RateLimiterRedis, type IRateLimiterRedisOptions } from 'rate-limiter-flexible'; +import type { RateLimitRule, RateLimitWindow } from './types'; + +// Logger interface for type safety +interface Logger { + info(message: string, meta?: Record): void; + error(message: string, meta?: Record): void; + warn(message: string, meta?: Record): void; + debug(message: string, meta?: Record): void; + trace(message: string, meta?: Record): void; +} + +interface RateLimiterEntry { + windows: Array<{ + limiter: RateLimiterRedis; + window: RateLimitWindow; + }>; + cost: number; +} + +export interface RateLimitResult { + allowed: boolean; + retryDelayMs?: number; + remainingPoints?: number; + cost?: number; +} + +/** + * Enhanced rate limiter that supports: + * - Multiple time windows per rule + * - Variable costs per operation + * - Hierarchical rules (operation > handler > queue > global) + */ +export class QueueRateLimiter { + private limiters = new Map(); + private readonly logger: Logger; + + constructor( + private redisClient: Redis, + logger?: Logger + ) { + this.logger = logger || console; + this.logger.debug('QueueRateLimiter initialized', { + redisClientType: typeof this.redisClient, + redisClientKeys: this.redisClient ? Object.keys(this.redisClient).slice(0, 10) : [], + hasClient: !!this.redisClient, + isIoRedis: this.redisClient && this.redisClient.constructor.name === 'Redis' + }); + } + + /** + * Add a rate limit rule + */ + async addRule(rule: RateLimitRule): Promise { + const key = this.getRuleKey(rule); + + // Create array of limiters for each window + const windows: RateLimiterEntry['windows'] = []; + const limits = rule.config.limits || []; + + this.logger.debug('Adding rate limit rule', { + key, + rule, + windowCount: limits.length, + cost: rule.config.cost + }); + + for (const window of limits) { + const limiterOptions: IRateLimiterRedisOptions = { + storeClient: this.redisClient as any, + keyPrefix: `rate_limit:${key}:${window.duration}s`, + points: window.points, + duration: window.duration, + blockDuration: window.blockDuration || 0, + }; + + this.logger.debug('Creating RateLimiterRedis', { + keyPrefix: limiterOptions.keyPrefix, + points: limiterOptions.points, + duration: limiterOptions.duration, + hasRedisClient: !!this.redisClient, + redisClientType: typeof this.redisClient + }); + + try { + const limiter = new RateLimiterRedis(limiterOptions); + windows.push({ limiter, window }); + } catch (error) { + this.logger.error('Failed to create RateLimiterRedis', { error, limiterOptions }); + throw error; + } + } + + // Default cost: handler defines the pool (no default cost), operations default to 1 + const defaultCost = rule.level === 'operation' ? 1 : 0; + + this.limiters.set(key, { + windows, + cost: rule.config.cost !== undefined ? rule.config.cost : defaultCost + }); + + this.logger.debug('Rate limit rule added', { + key, + level: rule.level, + handler: rule.handler, + operation: rule.operation, + windows: windows.length, + cost: rule.config.cost !== undefined ? rule.config.cost : defaultCost + }); + } + + /** + * Check if an operation is allowed based on rate limits + */ + async checkLimit(handler: string, operation?: string): Promise { + // Build keys to check from most specific to least specific + const keysToCheck: string[] = []; + + if (operation) { + keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule)); + } + keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule)); + keysToCheck.push(this.getRuleKey({ level: 'queue', config: {} } as RateLimitRule)); + keysToCheck.push(this.getRuleKey({ level: 'global', config: {} } as RateLimitRule)); + + this.logger.debug('Checking rate limits', { + handler, + operation, + keysToCheck, + availableKeys: Array.from(this.limiters.keys()) + }); + + // First, find the cost from the most specific rule + let cost = 1; // Default cost + for (const key of keysToCheck) { + const entry = this.limiters.get(key); + if (entry && entry.cost !== undefined) { + cost = entry.cost; + this.logger.debug(`[RateLimiter] Using cost ${cost} from rule: ${key}`); + this.logger.debug('Using cost from rule', { + key, + cost, + handler, + operation + }); + break; + } + } + + // Then find the rate limit windows from the first matching rule that has windows + let windowEntry: RateLimiterEntry | undefined; + let appliedKey: string | undefined; + + for (const key of keysToCheck) { + const entry = this.limiters.get(key); + if (entry && entry.windows.length > 0) { + windowEntry = entry; + appliedKey = key; + break; + } + } + + if (!windowEntry) { + this.logger.debug('No rate limit rules found', { + handler, + operation + }); + return { allowed: true }; + } + + this.logger.debug(`[RateLimiter] Applying rate limit rule: ${appliedKey}`, { + appliedKey, + windowCount: windowEntry.windows.length, + cost, + handler, + operation + }); + + // Check all windows with the determined cost + let maxRetryDelayMs = 0; + + this.logger.debug('Checking rate limit windows', { + handler, + operation, + cost, + windowCount: windowEntry.windows.length + }); + + // Special handling for 0-cost operations - they should always pass + if (cost === 0) { + this.logger.debug(`[RateLimiter] Zero-cost operation ${handler}:${operation || 'N/A'} - allowing without consuming points`); + return { + allowed: true, + cost: 0 + }; + } + + for (const { limiter, window } of windowEntry.windows) { + // For handler-level rate limits, all operations share the same pool + // Use the applied key to determine the consumer key + const consumerKey = appliedKey?.startsWith('handler:') ? handler : + (operation ? `${handler}:${operation}` : handler); + + try { + // First check current state without consuming + const currentState = await limiter.get(consumerKey); + const now = Date.now(); + this.logger.debug(`[RateLimiter] Current state before consume:`, { + handler, + operation, + consumerKey, + cost, + window: `${window.points}pts/${window.duration}s`, + consumedPoints: currentState?.consumedPoints || 0, + remainingPoints: currentState?.remainingPoints ?? window.points, + msBeforeNext: currentState?.msBeforeNext || 0, + currentTime: new Date(now).toISOString(), + resetTime: currentState?.msBeforeNext ? new Date(now + currentState.msBeforeNext).toISOString() : 'N/A', + hasState: !!currentState, + stateDetails: currentState ? JSON.stringify(currentState) : 'no state' + }); + + // Try to consume points + this.logger.debug(`[RateLimiter] Attempting to consume ${cost} points for ${consumerKey}`); + const result = await limiter.consume(consumerKey, cost); + this.logger.debug(`[RateLimiter] Successfully consumed ${cost} points, ${result.remainingPoints} remaining`); + this.logger.debug('Consumed points successfully', { + handler, + operation, + consumerKey, + window: `${window.points}pts/${window.duration}s`, + cost, + remainingPoints: result.remainingPoints, + msBeforeNext: result.msBeforeNext + }); + this.logger.trace('Rate limit window passed', { + handler, + operation, + window: `${window.points}pts/${window.duration}s`, + cost + }); + } catch (rejRes: any) { + // Rate limit exceeded for this window + const retryDelayMs = rejRes.msBeforeNext || window.blockDuration || 1000; + maxRetryDelayMs = Math.max(maxRetryDelayMs, retryDelayMs); + + this.logger.debug(`[RateLimiter] RATE LIMIT HIT: ${consumerKey} - ${cost} points rejected, retry in ${retryDelayMs}ms`, { + consumedPoints: rejRes.consumedPoints || 0, + remainingPoints: rejRes.remainingPoints || 0, + totalPoints: rejRes.totalPoints || window.points, + isFirstInDuration: rejRes.isFirstInDuration || false, + msBeforeNext: rejRes.msBeforeNext || 0, + retryDelayMs + }); + this.logger.trace('Rate limit exceeded', { + handler, + operation, + consumerKey, + window: `${window.points}pts/${window.duration}s`, + cost, + remainingPoints: rejRes.remainingPoints || 0, + retryDelayMs: maxRetryDelayMs + }); + + return { + allowed: false, + retryDelayMs: maxRetryDelayMs, + remainingPoints: rejRes.remainingPoints || 0, + cost + }; + } + } + + // All windows passed + return { + allowed: true, + cost + }; + } + + /** + * Get rule key for storing rate limiter + */ + private getRuleKey(rule: RateLimitRule): string { + switch (rule.level) { + case 'global': + return 'global'; + case 'queue': + return `queue:${rule.queueName || 'default'}`; + case 'handler': + return `handler:${rule.handler}`; + case 'operation': + return `operation:${rule.handler}:${rule.operation}`; + default: + return rule.level; + } + } + + /** + * Get current rate limit status + */ + async getStatus(handler: string, operation?: string): Promise { + // Build keys to check from most specific to least specific + const keysToCheck: string[] = []; + + if (operation) { + keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule)); + } + keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule)); + keysToCheck.push(this.getRuleKey({ level: 'queue', config: {} } as RateLimitRule)); + keysToCheck.push(this.getRuleKey({ level: 'global', config: {} } as RateLimitRule)); + + // Find the first matching rule + let entry: RateLimiterEntry | undefined; + let appliedKey: string | undefined; + + for (const key of keysToCheck) { + entry = this.limiters.get(key); + if (entry) { + appliedKey = key; + break; + } + } + + if (!entry) { + return { + handler, + operation, + hasRule: false + }; + } + + // Get status for all windows + const consumerKey = operation ? `${handler}:${operation}` : handler; + const windows = await Promise.all( + entry.windows.map(async ({ limiter, window }) => { + const result = await limiter.get(consumerKey); + return { + points: window.points, + duration: window.duration, + remaining: result?.remainingPoints ?? window.points, + resetIn: result?.msBeforeNext ?? 0, + }; + }) + ); + + return { + handler, + operation, + hasRule: true, + appliedKey, + cost: entry.cost, + windows + }; + } + + /** + * Reset rate limits for a specific handler/operation + */ + async reset(handler: string, operation?: string): Promise { + const keysToCheck: string[] = []; + + if (operation) { + keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule)); + } + keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule)); + + for (const key of keysToCheck) { + const entry = this.limiters.get(key); + if (entry) { + await Promise.all( + entry.windows.map(({ limiter }) => limiter.delete(handler)) + ); + } + } + + this.logger.debug('Rate limits reset', { handler, operation }); + } +} \ No newline at end of file diff --git a/libs/core/queue/test/rate-limiter.test.ts b/libs/core/queue/test/rate-limiter.test.ts deleted file mode 100644 index 4f0b12d..0000000 --- a/libs/core/queue/test/rate-limiter.test.ts +++ /dev/null @@ -1,349 +0,0 @@ -import { beforeEach, describe, expect, it, mock } from 'bun:test'; -import { QueueRateLimiter } from '../src/rate-limiter'; -import type { RateLimitRule } from '../src/types'; - -describe('QueueRateLimiter', () => { - const mockRedisClient = { - host: 'localhost', - port: 6379, - }; - - const mockLogger = { - info: mock(() => {}), - error: mock(() => {}), - warn: mock(() => {}), - debug: mock(() => {}), - }; - - beforeEach(() => { - mockLogger.info = mock(() => {}); - mockLogger.error = mock(() => {}); - mockLogger.warn = mock(() => {}); - mockLogger.debug = mock(() => {}); - }); - - describe('constructor', () => { - it('should create rate limiter', () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - expect(limiter).toBeDefined(); - }); - }); - - describe('addRule', () => { - it('should add a rate limit rule', () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - const rule: RateLimitRule = { - level: 'queue', - queueName: 'test-queue', - config: { - points: 100, - duration: 60, - }, - }; - - limiter.addRule(rule); - - expect(mockLogger.info).toHaveBeenCalledWith( - 'Rate limit rule added', - expect.objectContaining({ - level: 'queue', - queueName: 'test-queue', - }) - ); - }); - - it('should add operation-level rule', () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - const rule: RateLimitRule = { - level: 'operation', - queueName: 'test-queue', - handler: 'user-service', - operation: 'process-user', - config: { - points: 10, - duration: 60, - blockDuration: 300, - }, - }; - - limiter.addRule(rule); - const rules = limiter.getRules(); - expect(rules).toHaveLength(1); - expect(rules[0]).toEqual(rule); - }); - }); - - describe('checkLimit', () => { - it('should allow when no rules apply', async () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - const result = await limiter.checkLimit('test-queue', 'handler', 'operation'); - expect(result.allowed).toBe(true); - expect(result.appliedRule).toBeUndefined(); - }); - - it('should check against global rule', async () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - const globalRule: RateLimitRule = { - level: 'global', - config: { points: 1000, duration: 60 }, - }; - - limiter.addRule(globalRule); - - const result = await limiter.checkLimit('any-queue', 'any-handler', 'any-op'); - // In test environment without real Redis, it returns allowed: true on error - expect(result.allowed).toBe(true); - // Check that error was logged - expect(mockLogger.error).toHaveBeenCalledWith( - 'Rate limit check failed', - expect.objectContaining({ - queueName: 'any-queue', - handler: 'any-handler', - operation: 'any-op', - }) - ); - }); - - it('should prefer more specific rules', async () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - // Add rules from least to most specific - const globalRule: RateLimitRule = { - level: 'global', - config: { points: 1000, duration: 60 }, - }; - - const queueRule: RateLimitRule = { - level: 'queue', - queueName: 'test-queue', - config: { points: 100, duration: 60 }, - }; - - const handlerRule: RateLimitRule = { - level: 'handler', - queueName: 'test-queue', - handler: 'test-handler', - config: { points: 50, duration: 60 }, - }; - - const operationRule: RateLimitRule = { - level: 'operation', - queueName: 'test-queue', - handler: 'test-handler', - operation: 'test-op', - config: { points: 10, duration: 60 }, - }; - - limiter.addRule(globalRule); - limiter.addRule(queueRule); - limiter.addRule(handlerRule); - limiter.addRule(operationRule); - - // Operation level should take precedence - const result = await limiter.checkLimit('test-queue', 'test-handler', 'test-op'); - expect(result.allowed).toBe(true); - // Check that the most specific rule was attempted (operation level) - expect(mockLogger.error).toHaveBeenCalledWith( - 'Rate limit check failed', - expect.objectContaining({ - queueName: 'test-queue', - handler: 'test-handler', - operation: 'test-op', - }) - ); - }); - }); - - describe('getStatus', () => { - it('should get rate limit status', async () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - const rule: RateLimitRule = { - level: 'queue', - queueName: 'test-queue', - config: { points: 100, duration: 60 }, - }; - - limiter.addRule(rule); - - const status = await limiter.getStatus('test-queue', 'handler', 'operation'); - - expect(status.queueName).toBe('test-queue'); - expect(status.handler).toBe('handler'); - expect(status.operation).toBe('operation'); - expect(status.appliedRule).toEqual(rule); - }); - - it('should return status without rule when none apply', async () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - const status = await limiter.getStatus('test-queue', 'handler', 'operation'); - - expect(status.queueName).toBe('test-queue'); - expect(status.appliedRule).toBeUndefined(); - expect(status.limit).toBeUndefined(); - }); - }); - - describe('reset', () => { - it('should reset rate limits for specific operation', async () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - const rule: RateLimitRule = { - level: 'operation', - queueName: 'test-queue', - handler: 'test-handler', - operation: 'test-op', - config: { points: 10, duration: 60 }, - }; - - limiter.addRule(rule); - - try { - await limiter.reset('test-queue', 'test-handler', 'test-op'); - } catch (error) { - // In test environment, limiter.delete will fail due to no Redis connection - // That's expected, just ensure the method can be called - } - - // The method should at least attempt to reset - expect(limiter.getRules()).toContain(rule); - }); - - it('should warn about broad reset', async () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - await limiter.reset('test-queue'); - - expect(mockLogger.warn).toHaveBeenCalledWith( - 'Broad reset not implemented yet', - expect.objectContaining({ queueName: 'test-queue' }) - ); - }); - }); - - describe('removeRule', () => { - it('should remove a rule', () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - const rule: RateLimitRule = { - level: 'queue', - queueName: 'test-queue', - config: { points: 100, duration: 60 }, - }; - - limiter.addRule(rule); - expect(limiter.getRules()).toHaveLength(1); - - const removed = limiter.removeRule('queue', 'test-queue'); - expect(removed).toBe(true); - expect(limiter.getRules()).toHaveLength(0); - }); - - it('should return false when rule not found', () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - const removed = limiter.removeRule('queue', 'non-existent'); - expect(removed).toBe(false); - }); - }); - - describe('getRules', () => { - it('should return all configured rules', () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - const rule1: RateLimitRule = { - level: 'global', - config: { points: 1000, duration: 60 }, - }; - - const rule2: RateLimitRule = { - level: 'queue', - queueName: 'test-queue', - config: { points: 100, duration: 60 }, - }; - - limiter.addRule(rule1); - limiter.addRule(rule2); - - const rules = limiter.getRules(); - expect(rules).toHaveLength(2); - expect(rules).toContain(rule1); - expect(rules).toContain(rule2); - }); - }); - - describe('error handling', () => { - it('should allow on rate limiter error', async () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - // Add a rule but don't set up the actual limiter to cause an error - const rule: RateLimitRule = { - level: 'queue', - queueName: 'test-queue', - config: { points: 100, duration: 60 }, - }; - - limiter.addRule(rule); - - // Force the limiter map to be empty to simulate error - (limiter as any).limiters.clear(); - - const result = await limiter.checkLimit('test-queue', 'handler', 'operation'); - - expect(result.allowed).toBe(true); // Should allow on error - expect(mockLogger.warn).toHaveBeenCalledWith( - 'Rate limiter not found for rule', - expect.any(Object) - ); - }); - }); - - describe('hierarchical rule precedence', () => { - it('should correctly apply rule hierarchy', () => { - const limiter = new QueueRateLimiter(mockRedisClient, mockLogger); - - // Add multiple rules at different levels - const rules: RateLimitRule[] = [ - { - level: 'global', - config: { points: 10000, duration: 60 }, - }, - { - level: 'queue', - queueName: 'email-queue', - config: { points: 1000, duration: 60 }, - }, - { - level: 'handler', - queueName: 'email-queue', - handler: 'email-service', - config: { points: 100, duration: 60 }, - }, - { - level: 'operation', - queueName: 'email-queue', - handler: 'email-service', - operation: 'send-bulk', - config: { points: 10, duration: 60 }, - }, - ]; - - rules.forEach(rule => limiter.addRule(rule)); - - // Test that getMostSpecificRule works correctly - const specificRule = (limiter as any).getMostSpecificRule( - 'email-queue', - 'email-service', - 'send-bulk' - ); - - expect(specificRule?.level).toBe('operation'); - expect(specificRule?.config.points).toBe(10); - }); - }); -});