cleaned up stuff

This commit is contained in:
Boki 2025-07-06 19:23:11 -04:00
parent a7146a3f57
commit ac4c5078fa
9 changed files with 392 additions and 1204 deletions

View file

@ -123,10 +123,6 @@
"name": "yahoo", "name": "yahoo",
"enabled": true, "enabled": true,
"priority": 1, "priority": 1,
"rateLimit": {
"maxRequests": 5,
"windowMs": 60000
},
"timeout": 30000, "timeout": 30000,
"baseUrl": "https://query1.finance.yahoo.com" "baseUrl": "https://query1.finance.yahoo.com"
}, },
@ -190,10 +186,6 @@
"qm": { "concurrency": 5 }, "qm": { "concurrency": 5 },
"ib": { "concurrency": 1 }, "ib": { "concurrency": 1 },
"proxy": { "concurrency": 1 } "proxy": { "concurrency": 1 }
},
"rateLimit": {
"enabled": true,
"requestsPerSecond": 10
} }
}, },
"dataPipeline": { "dataPipeline": {
@ -213,7 +205,6 @@
}, },
"webApi": { "webApi": {
"port": 2003, "port": 2003,
"rateLimitPerMinute": 60,
"cache": { "cache": {
"ttl": 300, "ttl": 300,
"checkPeriod": 60 "checkPeriod": 60

View file

@ -1,67 +1,10 @@
import { z } from 'zod'; // Re-export provider schemas from core config
export {
// Base provider configuration providerConfigSchema as providersSchema,
export const baseProviderConfigSchema = z.object({ type ProviderName,
name: z.string(), providerSchemas,
enabled: z.boolean().default(true), eodProviderConfigSchema,
priority: z.number().default(0), ibProviderConfigSchema,
rateLimit: z qmProviderConfigSchema,
.object({ yahooProviderConfigSchema,
maxRequests: z.number().default(100), } from '@stock-bot/config';
windowMs: z.number().default(60000),
})
.optional(),
timeout: z.number().default(30000),
retries: z.number().default(3),
});
// EOD Historical Data provider
export const eodProviderConfigSchema = baseProviderConfigSchema.extend({
apiKey: z.string(),
baseUrl: z.string().default('https://eodhistoricaldata.com/api'),
tier: z.enum(['free', 'fundamentals', 'all-in-one']).default('free'),
});
// Interactive Brokers provider
export const ibProviderConfigSchema = baseProviderConfigSchema.extend({
gateway: z.object({
host: z.string().default('localhost'),
port: z.number().default(5000),
clientId: z.number().default(1),
}),
account: z.string().optional(),
marketDataType: z.enum(['live', 'delayed', 'frozen']).default('delayed'),
});
// QuoteMedia provider
export const qmProviderConfigSchema = baseProviderConfigSchema.extend({
username: z.string(),
password: z.string(),
baseUrl: z.string().default('https://app.quotemedia.com/quotetools'),
webmasterId: z.string(),
});
// Yahoo Finance provider
export const yahooProviderConfigSchema = baseProviderConfigSchema.extend({
baseUrl: z.string().default('https://query1.finance.yahoo.com'),
cookieJar: z.boolean().default(true),
crumb: z.string().optional(),
});
// Combined provider configuration
export const providersSchema = z.object({
eod: eodProviderConfigSchema.optional(),
ib: ibProviderConfigSchema.optional(),
qm: qmProviderConfigSchema.optional(),
yahoo: yahooProviderConfigSchema.optional(),
});
// Dynamic provider configuration type
export type ProviderName = 'eod' | 'ib' | 'qm' | 'yahoo';
export const providerSchemas = {
eod: eodProviderConfigSchema,
ib: ibProviderConfigSchema,
qm: qmProviderConfigSchema,
yahoo: yahooProviderConfigSchema,
} as const;

View file

@ -41,12 +41,6 @@ export const stockAppSchema = baseAppSchema.extend({
}) })
) )
.optional(), .optional(),
rateLimit: z
.object({
enabled: z.boolean().default(true),
requestsPerSecond: z.number().default(10),
})
.optional(),
}) })
.optional(), .optional(),
dataPipeline: z dataPipeline: z
@ -74,7 +68,6 @@ export const stockAppSchema = baseAppSchema.extend({
webApi: z webApi: z
.object({ .object({
port: z.number().default(2003), port: z.number().default(2003),
rateLimitPerMinute: z.number().default(60),
cache: z cache: z
.object({ .object({
ttl: z.number().default(300), ttl: z.number().default(300),

View file

@ -5,12 +5,6 @@ export const baseProviderConfigSchema = z.object({
name: z.string(), name: z.string(),
enabled: z.boolean().default(true), enabled: z.boolean().default(true),
priority: z.number().default(0), priority: z.number().default(0),
rateLimit: z
.object({
maxRequests: z.number().default(100),
windowMs: z.number().default(60000),
})
.optional(),
timeout: z.number().default(30000), timeout: z.number().default(30000),
retries: z.number().default(3), retries: z.number().default(3),
}); });

View file

@ -20,7 +20,7 @@ export { DeadLetterQueueHandler } from './dlq-handler';
export { QueueMetricsCollector } from './queue-metrics'; export { QueueMetricsCollector } from './queue-metrics';
// Rate limiting // Rate limiting
export { QueueRateLimiter } from './rate-limiter-new'; export { QueueRateLimiter } from './rate-limiter';
// Types // Types
export type { export type {

View file

@ -5,7 +5,7 @@ import type { HandlerRegistry } from '@stock-bot/handler-registry';
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import Redis from 'ioredis'; import Redis from 'ioredis';
import { Queue, type QueueWorkerConfig } from './queue'; import { Queue, type QueueWorkerConfig } from './queue';
import { QueueRateLimiter } from './rate-limiter-new'; import { QueueRateLimiter } from './rate-limiter';
import { getFullQueueName, parseQueueName } from './service-utils'; import { getFullQueueName, parseQueueName } from './service-utils';
import type { import type {
GlobalStats, GlobalStats,

View file

@ -1,380 +0,0 @@
import { RateLimiterRedis, type IRateLimiterRedisOptions } from 'rate-limiter-flexible';
import type Redis from 'ioredis';
import type { RateLimitWindow, RateLimitConfig, RateLimitRule } from './types';
// Logger interface for type safety.
// Structural contract satisfied by the project logger and (in Node) by `console`,
// which is why the class can fall back to `console` when no logger is injected.
interface Logger {
  info(message: string, meta?: Record<string, unknown>): void;
  error(message: string, meta?: Record<string, unknown>): void;
  warn(message: string, meta?: Record<string, unknown>): void;
  debug(message: string, meta?: Record<string, unknown>): void;
  trace(message: string, meta?: Record<string, unknown>): void;
}
// One registered rule: a RateLimiterRedis per configured time window, plus the
// point cost charged when an operation is checked against this rule.
interface RateLimiterEntry {
  // Every window must accept the consumption for a check to pass.
  windows: Array<{
    limiter: RateLimiterRedis;
    window: RateLimitWindow;
  }>;
  // Points consumed per checkLimit() call under this rule (0 = free).
  cost: number;
}
/**
 * Outcome of a rate-limit check.
 */
export interface RateLimitResult {
  /** True when every window accepted the consumption, or no rule matched. */
  allowed: boolean;
  /** Longest wait (ms) before retrying; set only when `allowed` is false. */
  retryDelayMs?: number;
  /** Points left in the rejecting window; set only when `allowed` is false. */
  remainingPoints?: number;
  /** Point cost that was applied for this check. */
  cost?: number;
}
/**
 * Enhanced rate limiter that supports:
 * - Multiple time windows per rule
 * - Variable costs per operation
 * - Hierarchical rules (operation > handler > queue > global)
 *
 * Points live in Redis via rate-limiter-flexible, so limits are shared by
 * every process pointed at the same Redis instance and key prefix.
 */
export class QueueRateLimiter {
  /** Rule key (see getRuleKey) -> configured windows and per-check cost. */
  private limiters = new Map<string, RateLimiterEntry>();
  private readonly logger: Logger;

  constructor(
    private redisClient: Redis,
    logger?: Logger
  ) {
    // `console` satisfies the Logger contract in Node, so it is a safe default.
    this.logger = logger || console;
    this.logger.debug('QueueRateLimiter initialized', {
      redisClientType: typeof this.redisClient,
      redisClientKeys: this.redisClient ? Object.keys(this.redisClient).slice(0, 10) : [],
      hasClient: !!this.redisClient,
      isIoRedis: this.redisClient && this.redisClient.constructor.name === 'Redis'
    });
  }

  /**
   * Register a rate limit rule, creating one RateLimiterRedis per time window
   * listed in `rule.config.limits`.
   *
   * Cost default: operation-level rules consume 1 point unless configured;
   * broader rules default to 0 (they only define the shared pool).
   *
   * @throws rethrows any error raised while constructing a RateLimiterRedis.
   */
  async addRule(rule: RateLimitRule): Promise<void> {
    const key = this.getRuleKey(rule);
    const windows: RateLimiterEntry['windows'] = [];
    const limits = rule.config.limits || [];

    this.logger.debug('Adding rate limit rule', {
      key,
      rule,
      windowCount: limits.length,
      cost: rule.config.cost
    });

    for (const window of limits) {
      const limiterOptions: IRateLimiterRedisOptions = {
        // Cast needed: rate-limiter-flexible's store typing does not line up
        // with this ioredis version — TODO confirm once typings converge.
        storeClient: this.redisClient as any,
        keyPrefix: `rate_limit:${key}:${window.duration}s`,
        points: window.points,
        duration: window.duration,
        blockDuration: window.blockDuration || 0,
      };

      this.logger.debug('Creating RateLimiterRedis', {
        keyPrefix: limiterOptions.keyPrefix,
        points: limiterOptions.points,
        duration: limiterOptions.duration,
        hasRedisClient: !!this.redisClient,
        redisClientType: typeof this.redisClient
      });

      try {
        windows.push({ limiter: new RateLimiterRedis(limiterOptions), window });
      } catch (error) {
        this.logger.error('Failed to create RateLimiterRedis', { error, limiterOptions });
        throw error;
      }
    }

    // Default cost: handler/queue/global rules define the pool (cost 0),
    // operations consume 1 point unless configured otherwise.
    const defaultCost = rule.level === 'operation' ? 1 : 0;
    const cost = rule.config.cost !== undefined ? rule.config.cost : defaultCost;

    this.limiters.set(key, { windows, cost });

    this.logger.info('Rate limit rule added', {
      key,
      level: rule.level,
      handler: rule.handler,
      operation: rule.operation,
      windows: windows.length,
      cost
    });
  }

  /**
   * Check whether an operation may proceed, consuming points from every
   * window of the most specific matching rule.
   *
   * Cost resolution and window resolution are independent: the cost comes
   * from the most specific rule that defines one, while the windows come
   * from the most specific rule that has any.
   */
  async checkLimit(handler: string, operation?: string): Promise<RateLimitResult> {
    const keysToCheck = this.getKeysToCheck(handler, operation);

    this.logger.debug('Checking rate limits', {
      handler,
      operation,
      keysToCheck,
      availableKeys: Array.from(this.limiters.keys())
    });

    // Cost: taken from the most specific rule that defines one.
    let cost = 1; // Default cost
    for (const key of keysToCheck) {
      const entry = this.limiters.get(key);
      if (entry && entry.cost !== undefined) {
        cost = entry.cost;
        this.logger.debug('Using cost from rule', { key, cost, handler, operation });
        break;
      }
    }

    // Windows: taken from the most specific rule that has any configured.
    let windowEntry: RateLimiterEntry | undefined;
    let appliedKey: string | undefined;
    for (const key of keysToCheck) {
      const entry = this.limiters.get(key);
      if (entry && entry.windows.length > 0) {
        windowEntry = entry;
        appliedKey = key;
        break;
      }
    }

    if (!windowEntry) {
      this.logger.debug('No rate limit rules found', { handler, operation });
      return { allowed: true };
    }

    this.logger.debug('Applying rate limit rule', {
      appliedKey,
      windowCount: windowEntry.windows.length,
      cost,
      handler,
      operation
    });

    // Zero-cost operations always pass without consuming points.
    if (cost === 0) {
      this.logger.debug('Zero-cost operation allowed without consuming points', {
        handler,
        operation
      });
      return { allowed: true, cost: 0 };
    }

    // The consumer key is invariant across windows, so compute it once.
    const consumerKey = this.getConsumerKey(appliedKey, handler, operation);
    let maxRetryDelayMs = 0;

    for (const { limiter, window } of windowEntry.windows) {
      try {
        const result = await limiter.consume(consumerKey, cost);
        this.logger.debug('Consumed points successfully', {
          handler,
          operation,
          consumerKey,
          window: `${window.points}pts/${window.duration}s`,
          cost,
          remainingPoints: result.remainingPoints,
          msBeforeNext: result.msBeforeNext
        });
      } catch (rejRes: any) {
        // rate-limiter-flexible rejects limit hits with a RateLimiterRes,
        // which is NOT an Error; genuine store failures (e.g. Redis down)
        // come through as Errors and must not be mistaken for a rejection.
        if (rejRes instanceof Error) {
          throw rejRes;
        }

        // Rate limit exceeded for this window.
        const retryDelayMs = rejRes.msBeforeNext || window.blockDuration || 1000;
        maxRetryDelayMs = Math.max(maxRetryDelayMs, retryDelayMs);

        this.logger.debug('Rate limit exceeded', {
          handler,
          operation,
          consumerKey,
          window: `${window.points}pts/${window.duration}s`,
          cost,
          consumedPoints: rejRes.consumedPoints || 0,
          remainingPoints: rejRes.remainingPoints || 0,
          retryDelayMs: maxRetryDelayMs
        });

        return {
          allowed: false,
          retryDelayMs: maxRetryDelayMs,
          remainingPoints: rejRes.remainingPoints || 0,
          cost
        };
      }
    }

    // All windows accepted the consumption.
    return { allowed: true, cost };
  }

  /**
   * Get rule key for storing rate limiter.
   */
  private getRuleKey(rule: RateLimitRule): string {
    switch (rule.level) {
      case 'global':
        return 'global';
      case 'queue':
        return `queue:${rule.queueName || 'default'}`;
      case 'handler':
        return `handler:${rule.handler}`;
      case 'operation':
        return `operation:${rule.handler}:${rule.operation}`;
      default:
        return rule.level;
    }
  }

  /**
   * Candidate rule keys for a handler/operation, most specific first:
   * operation > handler > queue > global.
   */
  private getKeysToCheck(handler: string, operation?: string): string[] {
    const keys: string[] = [];
    if (operation) {
      keys.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule));
    }
    keys.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule));
    keys.push(this.getRuleKey({ level: 'queue', config: {} } as RateLimitRule));
    keys.push(this.getRuleKey({ level: 'global', config: {} } as RateLimitRule));
    return keys;
  }

  /**
   * Consumer key under which points are consumed, read, and reset.
   * Handler-level rules pool all operations under the bare handler name;
   * any other level tracks each operation separately.
   */
  private getConsumerKey(appliedKey: string | undefined, handler: string, operation?: string): string {
    if (appliedKey?.startsWith('handler:')) {
      return handler;
    }
    return operation ? `${handler}:${operation}` : handler;
  }

  /**
   * Get current rate limit status without consuming any points.
   * Reads under the same consumer key checkLimit() consumes against, so the
   * reported remaining points match what the next check will see.
   */
  async getStatus(handler: string, operation?: string): Promise<{
    handler: string;
    operation?: string;
    hasRule: boolean;
    appliedKey?: string;
    cost?: number;
    windows?: Array<{ points: number; duration: number; remaining: number; resetIn: number }>;
  }> {
    const keysToCheck = this.getKeysToCheck(handler, operation);

    // Find the first (most specific) matching rule.
    let entry: RateLimiterEntry | undefined;
    let appliedKey: string | undefined;
    for (const key of keysToCheck) {
      entry = this.limiters.get(key);
      if (entry) {
        appliedKey = key;
        break;
      }
    }

    if (!entry) {
      return { handler, operation, hasRule: false };
    }

    const consumerKey = this.getConsumerKey(appliedKey, handler, operation);
    const windows = await Promise.all(
      entry.windows.map(async ({ limiter, window }) => {
        const result = await limiter.get(consumerKey);
        return {
          points: window.points,
          duration: window.duration,
          remaining: result?.remainingPoints ?? window.points,
          resetIn: result?.msBeforeNext ?? 0,
        };
      })
    );

    return {
      handler,
      operation,
      hasRule: true,
      appliedKey,
      cost: entry.cost,
      windows
    };
  }

  /**
   * Reset rate limits for a specific handler/operation.
   * Only operation- and handler-level entries are reset; queue/global pools
   * are shared across handlers and deliberately left untouched.
   */
  async reset(handler: string, operation?: string): Promise<void> {
    const keysToCheck = this.getKeysToCheck(handler, operation).filter(
      key => key.startsWith('operation:') || key.startsWith('handler:')
    );

    for (const key of keysToCheck) {
      const entry = this.limiters.get(key);
      if (entry) {
        // Delete under the same consumer key checkLimit() consumes against;
        // deleting the bare handler name would miss operation-scoped points.
        const consumerKey = this.getConsumerKey(key, handler, operation);
        await Promise.all(
          entry.windows.map(({ limiter }) => limiter.delete(consumerKey))
        );
      }
    }

    this.logger.info('Rate limits reset', { handler, operation });
  }
}

View file

@ -1,384 +1,380 @@
import { RateLimiterRedis, RateLimiterRes } from 'rate-limiter-flexible'; import type Redis from 'ioredis';
import type { RateLimitWindow, RateLimitConfig, RateLimitRule } from './types'; import { RateLimiterRedis, type IRateLimiterRedisOptions } from 'rate-limiter-flexible';
import type { RateLimitRule, RateLimitWindow } from './types';
// Logger interface for type safety
interface Logger { // Logger interface for type safety
info(message: string, meta?: Record<string, unknown>): void; interface Logger {
error(message: string, meta?: Record<string, unknown>): void; info(message: string, meta?: Record<string, unknown>): void;
warn(message: string, meta?: Record<string, unknown>): void; error(message: string, meta?: Record<string, unknown>): void;
debug(message: string, meta?: Record<string, unknown>): void; warn(message: string, meta?: Record<string, unknown>): void;
} debug(message: string, meta?: Record<string, unknown>): void;
trace(message: string, meta?: Record<string, unknown>): void;
interface RateLimiterEntry { }
windows: Array<{
limiter: RateLimiterRedis; interface RateLimiterEntry {
window: RateLimitWindow; windows: Array<{
}>; limiter: RateLimiterRedis;
cost: number; window: RateLimitWindow;
} }>;
cost: number;
export class QueueRateLimiter { }
private limiters = new Map<string, RateLimiterEntry>();
private rules: RateLimitRule[] = []; export interface RateLimitResult {
private readonly logger: Logger; allowed: boolean;
retryDelayMs?: number;
constructor( remainingPoints?: number;
private redisClient: ReturnType<typeof import('./utils').getRedisConnection>, cost?: number;
logger?: Logger }
) {
this.logger = logger || console; /**
} * Enhanced rate limiter that supports:
* - Multiple time windows per rule
/** * - Variable costs per operation
* Add a rate limit rule * - Hierarchical rules (operation > handler > queue > global)
*/ */
addRule(rule: RateLimitRule): void { export class QueueRateLimiter {
this.rules.push(rule); private limiters = new Map<string, RateLimiterEntry>();
private readonly logger: Logger;
const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation);
constructor(
// Extract limits and cost from config private redisClient: Redis,
const limits = rule.config.limits || []; logger?: Logger
const cost = rule.config.cost || 1; ) {
this.logger = logger || console;
// Create rate limiters for each window this.logger.debug('QueueRateLimiter initialized', {
const windows = limits.map((window, index) => { redisClientType: typeof this.redisClient,
const limiter = new RateLimiterRedis({ redisClientKeys: this.redisClient ? Object.keys(this.redisClient).slice(0, 10) : [],
storeClient: this.redisClient, hasClient: !!this.redisClient,
keyPrefix: `rl:${key}:${index}`, isIoRedis: this.redisClient && this.redisClient.constructor.name === 'Redis'
points: window.points, });
duration: window.duration, }
blockDuration: window.blockDuration || 0,
}); /**
* Add a rate limit rule
return { limiter, window }; */
}); async addRule(rule: RateLimitRule): Promise<void> {
const key = this.getRuleKey(rule);
this.limiters.set(key, { windows, cost });
// Create array of limiters for each window
this.logger.info('Rate limit rule added', { const windows: RateLimiterEntry['windows'] = [];
level: rule.level, const limits = rule.config.limits || [];
queueName: rule.queueName,
handler: rule.handler, this.logger.debug('Adding rate limit rule', {
operation: rule.operation, key,
windows: limits.length, rule,
cost, windowCount: limits.length,
}); cost: rule.config.cost
} });
/** for (const window of limits) {
* Check if a job can be processed based on rate limits const limiterOptions: IRateLimiterRedisOptions = {
* Uses hierarchical precedence: operation > handler > queue > global storeClient: this.redisClient as any,
* The most specific matching rule takes precedence keyPrefix: `rate_limit:${key}:${window.duration}s`,
* Returns the longest wait time if multiple windows are hit points: window.points,
*/ duration: window.duration,
async checkLimit( blockDuration: window.blockDuration || 0,
queueName: string, };
handler: string,
operation: string this.logger.debug('Creating RateLimiterRedis', {
): Promise<{ keyPrefix: limiterOptions.keyPrefix,
allowed: boolean; points: limiterOptions.points,
retryAfter?: number; duration: limiterOptions.duration,
remainingPoints?: number; hasRedisClient: !!this.redisClient,
appliedRule?: RateLimitRule; redisClientType: typeof this.redisClient
cost?: number; });
}> {
const applicableRule = this.getMostSpecificRule(queueName, handler, operation); try {
const limiter = new RateLimiterRedis(limiterOptions);
if (!applicableRule) { windows.push({ limiter, window });
return { allowed: true }; } catch (error) {
} this.logger.error('Failed to create RateLimiterRedis', { error, limiterOptions });
throw error;
const key = this.getRuleKey( }
applicableRule.level, }
applicableRule.queueName,
applicableRule.handler, // Default cost: handler defines the pool (no default cost), operations default to 1
applicableRule.operation const defaultCost = rule.level === 'operation' ? 1 : 0;
);
const limiterEntry = this.limiters.get(key); this.limiters.set(key, {
windows,
if (!limiterEntry || limiterEntry.windows.length === 0) { cost: rule.config.cost !== undefined ? rule.config.cost : defaultCost
this.logger.warn('Rate limiter not found for rule', { key, rule: applicableRule }); });
return { allowed: true };
} this.logger.debug('Rate limit rule added', {
key,
const consumerKey = this.getConsumerKey(queueName, handler, operation); level: rule.level,
const cost = limiterEntry.cost; handler: rule.handler,
operation: rule.operation,
// Check all windows and collect results windows: windows.length,
const windowResults = await Promise.all( cost: rule.config.cost !== undefined ? rule.config.cost : defaultCost
limiterEntry.windows.map(async ({ limiter, window }) => { });
try { }
// Try to consume points for this window
const result = await limiter.consume(consumerKey, cost); /**
return { * Check if an operation is allowed based on rate limits
allowed: true, */
remainingPoints: result.remainingPoints, async checkLimit(handler: string, operation?: string): Promise<RateLimitResult> {
retryAfter: 0, // Build keys to check from most specific to least specific
}; const keysToCheck: string[] = [];
} catch (rejRes) {
if (rejRes instanceof RateLimiterRes) { if (operation) {
return { keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule));
allowed: false, }
remainingPoints: rejRes.remainingPoints, keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule));
retryAfter: rejRes.msBeforeNext, keysToCheck.push(this.getRuleKey({ level: 'queue', config: {} } as RateLimitRule));
}; keysToCheck.push(this.getRuleKey({ level: 'global', config: {} } as RateLimitRule));
}
throw rejRes; this.logger.debug('Checking rate limits', {
} handler,
}) operation,
); keysToCheck,
availableKeys: Array.from(this.limiters.keys())
// Find if any window rejected the request });
const rejectedWindow = windowResults.find(r => !r.allowed);
// First, find the cost from the most specific rule
if (rejectedWindow) { let cost = 1; // Default cost
// Find the longest wait time among all rejected windows for (const key of keysToCheck) {
const maxRetryAfter = Math.max( const entry = this.limiters.get(key);
...windowResults.filter(r => !r.allowed).map(r => r.retryAfter || 0) if (entry && entry.cost !== undefined) {
); cost = entry.cost;
this.logger.debug(`[RateLimiter] Using cost ${cost} from rule: ${key}`);
this.logger.warn('Rate limit exceeded', { this.logger.debug('Using cost from rule', {
handler, key,
operation, cost,
cost, handler,
retryAfter: maxRetryAfter, operation
}); });
break;
return { }
allowed: false, }
retryAfter: maxRetryAfter,
remainingPoints: rejectedWindow.remainingPoints, // Then find the rate limit windows from the first matching rule that has windows
appliedRule: applicableRule, let windowEntry: RateLimiterEntry | undefined;
cost, let appliedKey: string | undefined;
};
} for (const key of keysToCheck) {
const entry = this.limiters.get(key);
// All windows allowed - return the minimum remaining points if (entry && entry.windows.length > 0) {
const minRemainingPoints = Math.min(...windowResults.map(r => r.remainingPoints || 0)); windowEntry = entry;
appliedKey = key;
return { break;
allowed: true, }
remainingPoints: minRemainingPoints, }
appliedRule: applicableRule,
cost, if (!windowEntry) {
}; this.logger.debug('No rate limit rules found', {
} handler,
operation
/** });
* Get the most specific rule that applies to this job return { allowed: true };
* Precedence: operation > handler > queue > global }
*/
private getMostSpecificRule( this.logger.debug(`[RateLimiter] Applying rate limit rule: ${appliedKey}`, {
queueName: string, appliedKey,
handler: string, windowCount: windowEntry.windows.length,
operation: string cost,
): RateLimitRule | undefined { handler,
// 1. Check for operation-specific rule (most specific) operation
let rule = this.rules.find( });
r =>
r.level === 'operation' && // Check all windows with the determined cost
r.queueName === queueName && let maxRetryDelayMs = 0;
r.handler === handler &&
r.operation === operation this.logger.debug('Checking rate limit windows', {
); handler,
if (rule) { operation,
return rule; cost,
} windowCount: windowEntry.windows.length
});
// 2. Check for handler-specific rule
rule = this.rules.find( // Special handling for 0-cost operations - they should always pass
r => r.level === 'handler' && r.queueName === queueName && r.handler === handler if (cost === 0) {
); this.logger.debug(`[RateLimiter] Zero-cost operation ${handler}:${operation || 'N/A'} - allowing without consuming points`);
if (rule) { return {
return rule; allowed: true,
} cost: 0
};
// 3. Check for queue-specific rule }
rule = this.rules.find(r => r.level === 'queue' && r.queueName === queueName);
if (rule) { for (const { limiter, window } of windowEntry.windows) {
return rule; // For handler-level rate limits, all operations share the same pool
} // Use the applied key to determine the consumer key
const consumerKey = appliedKey?.startsWith('handler:') ? handler :
// 4. Check for global rule (least specific) (operation ? `${handler}:${operation}` : handler);
rule = this.rules.find(r => r.level === 'global');
return rule; try {
} // First check current state without consuming
const currentState = await limiter.get(consumerKey);
const now = Date.now();
/** this.logger.debug(`[RateLimiter] Current state before consume:`, {
* Get rule key for storing rate limiter handler,
*/ operation,
private getRuleKey( consumerKey,
level: string, cost,
queueName?: string, window: `${window.points}pts/${window.duration}s`,
handler?: string, consumedPoints: currentState?.consumedPoints || 0,
operation?: string remainingPoints: currentState?.remainingPoints ?? window.points,
): string { msBeforeNext: currentState?.msBeforeNext || 0,
switch (level) { currentTime: new Date(now).toISOString(),
case 'global': resetTime: currentState?.msBeforeNext ? new Date(now + currentState.msBeforeNext).toISOString() : 'N/A',
return 'global'; hasState: !!currentState,
case 'queue': stateDetails: currentState ? JSON.stringify(currentState) : 'no state'
return `queue:${queueName}`; });
case 'handler':
return `handler:${queueName}:${handler}`; // Try to consume points
case 'operation': this.logger.debug(`[RateLimiter] Attempting to consume ${cost} points for ${consumerKey}`);
return `operation:${queueName}:${handler}:${operation}`; const result = await limiter.consume(consumerKey, cost);
default: this.logger.debug(`[RateLimiter] Successfully consumed ${cost} points, ${result.remainingPoints} remaining`);
return level; this.logger.debug('Consumed points successfully', {
} handler,
} operation,
consumerKey,
/** window: `${window.points}pts/${window.duration}s`,
* Get consumer key for rate limiting (what gets counted) cost,
*/ remainingPoints: result.remainingPoints,
private getConsumerKey(queueName: string, handler: string, operation: string): string { msBeforeNext: result.msBeforeNext
return `${queueName}:${handler}:${operation}`; });
} this.logger.trace('Rate limit window passed', {
handler,
/** operation,
* Get current rate limit status for a queue/handler/operation window: `${window.points}pts/${window.duration}s`,
*/ cost
async getStatus( });
queueName: string, } catch (rejRes: any) {
handler: string, // Rate limit exceeded for this window
operation: string const retryDelayMs = rejRes.msBeforeNext || window.blockDuration || 1000;
): Promise<{ maxRetryDelayMs = Math.max(maxRetryDelayMs, retryDelayMs);
queueName: string;
handler: string; this.logger.debug(`[RateLimiter] RATE LIMIT HIT: ${consumerKey} - ${cost} points rejected, retry in ${retryDelayMs}ms`, {
operation: string; consumedPoints: rejRes.consumedPoints || 0,
appliedRule?: RateLimitRule; remainingPoints: rejRes.remainingPoints || 0,
cost?: number; totalPoints: rejRes.totalPoints || window.points,
windows?: Array<{ isFirstInDuration: rejRes.isFirstInDuration || false,
points: number; msBeforeNext: rejRes.msBeforeNext || 0,
duration: number; retryDelayMs
remaining: number; });
resetIn: number; this.logger.trace('Rate limit exceeded', {
}>; handler,
}> { operation,
const applicableRule = this.getMostSpecificRule(queueName, handler, operation); consumerKey,
window: `${window.points}pts/${window.duration}s`,
if (!applicableRule) { cost,
return { remainingPoints: rejRes.remainingPoints || 0,
queueName, retryDelayMs: maxRetryDelayMs
handler, });
operation,
}; return {
} allowed: false,
retryDelayMs: maxRetryDelayMs,
const key = this.getRuleKey( remainingPoints: rejRes.remainingPoints || 0,
applicableRule.level, cost
applicableRule.queueName, };
applicableRule.handler, }
applicableRule.operation }
);
const limiterEntry = this.limiters.get(key); // All windows passed
return {
if (!limiterEntry || limiterEntry.windows.length === 0) { allowed: true,
return { cost
queueName, };
handler, }
operation,
appliedRule: applicableRule, /**
}; * Get rule key for storing rate limiter
} */
private getRuleKey(rule: RateLimitRule): string {
try { switch (rule.level) {
const consumerKey = this.getConsumerKey(queueName, handler, operation); case 'global':
return 'global';
// Get status for all windows case 'queue':
const windows = await Promise.all( return `queue:${rule.queueName || 'default'}`;
limiterEntry.windows.map(async ({ limiter, window }) => { case 'handler':
const result = await limiter.get(consumerKey); return `handler:${rule.handler}`;
return { case 'operation':
points: window.points, return `operation:${rule.handler}:${rule.operation}`;
duration: window.duration, default:
remaining: result?.remainingPoints ?? window.points, return rule.level;
resetIn: result?.msBeforeNext ?? 0, }
}; }
})
); /**
* Get current rate limit status
return { */
queueName, async getStatus(handler: string, operation?: string): Promise<any> {
handler, // Build keys to check from most specific to least specific
operation, const keysToCheck: string[] = [];
appliedRule: applicableRule,
cost: limiterEntry.cost, if (operation) {
windows, keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule));
}; }
} catch (error) { keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule));
this.logger.error('Failed to get rate limit status', { keysToCheck.push(this.getRuleKey({ level: 'queue', config: {} } as RateLimitRule));
queueName, keysToCheck.push(this.getRuleKey({ level: 'global', config: {} } as RateLimitRule));
handler,
operation, // Find the first matching rule
error, let entry: RateLimiterEntry | undefined;
}); let appliedKey: string | undefined;
return {
queueName, for (const key of keysToCheck) {
handler, entry = this.limiters.get(key);
operation, if (entry) {
appliedRule: applicableRule, appliedKey = key;
cost: limiterEntry.cost, break;
}; }
} }
}
if (!entry) {
/** return {
* Reset rate limits for a specific consumer handler,
*/ operation,
async reset(queueName: string, handler?: string, operation?: string): Promise<void> { hasRule: false
if (handler && operation) { };
// Reset specific operation }
const consumerKey = this.getConsumerKey(queueName, handler, operation);
const rule = this.getMostSpecificRule(queueName, handler, operation); // Get status for all windows
const consumerKey = operation ? `${handler}:${operation}` : handler;
if (rule) { const windows = await Promise.all(
const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation); entry.windows.map(async ({ limiter, window }) => {
const limiterEntry = this.limiters.get(key); const result = await limiter.get(consumerKey);
if (limiterEntry) { return {
// Reset all windows for this consumer points: window.points,
await Promise.all( duration: window.duration,
limiterEntry.windows.map(({ limiter }) => limiter.delete(consumerKey)) remaining: result?.remainingPoints ?? window.points,
); resetIn: result?.msBeforeNext ?? 0,
} };
} })
} else { );
// Reset broader scope - this is more complex with the new hierarchy
this.logger.warn('Broad reset not implemented yet', { queueName, handler, operation }); return {
} handler,
operation,
this.logger.info('Rate limits reset', { queueName, handler, operation }); hasRule: true,
} appliedKey,
cost: entry.cost,
/** windows
* Get all configured rate limit rules };
*/ }
getRules(): RateLimitRule[] {
return [...this.rules]; /**
} * Reset rate limits for a specific handler/operation
*/
/** async reset(handler: string, operation?: string): Promise<void> {
* Remove a rate limit rule const keysToCheck: string[] = [];
*/
removeRule(level: string, queueName?: string, handler?: string, operation?: string): boolean { if (operation) {
const key = this.getRuleKey(level, queueName, handler, operation); keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule));
const ruleIndex = this.rules.findIndex( }
r => keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule));
r.level === level &&
(!queueName || r.queueName === queueName) && for (const key of keysToCheck) {
(!handler || r.handler === handler) && const entry = this.limiters.get(key);
(!operation || r.operation === operation) if (entry) {
); await Promise.all(
entry.windows.map(({ limiter }) => limiter.delete(handler))
if (ruleIndex >= 0) { );
this.rules.splice(ruleIndex, 1); }
this.limiters.delete(key); }
this.logger.info('Rate limit rule removed', { level, queueName, handler, operation }); this.logger.debug('Rate limits reset', { handler, operation });
return true; }
} }
return false;
}
}

View file

@ -1,349 +0,0 @@
import { beforeEach, describe, expect, it, mock } from 'bun:test';
import { QueueRateLimiter } from '../src/rate-limiter';
import type { RateLimitRule } from '../src/types';
describe('QueueRateLimiter', () => {
const mockRedisClient = {
host: 'localhost',
port: 6379,
};
const mockLogger = {
info: mock(() => {}),
error: mock(() => {}),
warn: mock(() => {}),
debug: mock(() => {}),
};
beforeEach(() => {
mockLogger.info = mock(() => {});
mockLogger.error = mock(() => {});
mockLogger.warn = mock(() => {});
mockLogger.debug = mock(() => {});
});
describe('constructor', () => {
it('should create rate limiter', () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
expect(limiter).toBeDefined();
});
});
describe('addRule', () => {
it('should add a rate limit rule', () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
const rule: RateLimitRule = {
level: 'queue',
queueName: 'test-queue',
config: {
points: 100,
duration: 60,
},
};
limiter.addRule(rule);
expect(mockLogger.info).toHaveBeenCalledWith(
'Rate limit rule added',
expect.objectContaining({
level: 'queue',
queueName: 'test-queue',
})
);
});
it('should add operation-level rule', () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
const rule: RateLimitRule = {
level: 'operation',
queueName: 'test-queue',
handler: 'user-service',
operation: 'process-user',
config: {
points: 10,
duration: 60,
blockDuration: 300,
},
};
limiter.addRule(rule);
const rules = limiter.getRules();
expect(rules).toHaveLength(1);
expect(rules[0]).toEqual(rule);
});
});
describe('checkLimit', () => {
it('should allow when no rules apply', async () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
const result = await limiter.checkLimit('test-queue', 'handler', 'operation');
expect(result.allowed).toBe(true);
expect(result.appliedRule).toBeUndefined();
});
it('should check against global rule', async () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
const globalRule: RateLimitRule = {
level: 'global',
config: { points: 1000, duration: 60 },
};
limiter.addRule(globalRule);
const result = await limiter.checkLimit('any-queue', 'any-handler', 'any-op');
// In test environment without real Redis, it returns allowed: true on error
expect(result.allowed).toBe(true);
// Check that error was logged
expect(mockLogger.error).toHaveBeenCalledWith(
'Rate limit check failed',
expect.objectContaining({
queueName: 'any-queue',
handler: 'any-handler',
operation: 'any-op',
})
);
});
it('should prefer more specific rules', async () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
// Add rules from least to most specific
const globalRule: RateLimitRule = {
level: 'global',
config: { points: 1000, duration: 60 },
};
const queueRule: RateLimitRule = {
level: 'queue',
queueName: 'test-queue',
config: { points: 100, duration: 60 },
};
const handlerRule: RateLimitRule = {
level: 'handler',
queueName: 'test-queue',
handler: 'test-handler',
config: { points: 50, duration: 60 },
};
const operationRule: RateLimitRule = {
level: 'operation',
queueName: 'test-queue',
handler: 'test-handler',
operation: 'test-op',
config: { points: 10, duration: 60 },
};
limiter.addRule(globalRule);
limiter.addRule(queueRule);
limiter.addRule(handlerRule);
limiter.addRule(operationRule);
// Operation level should take precedence
const result = await limiter.checkLimit('test-queue', 'test-handler', 'test-op');
expect(result.allowed).toBe(true);
// Check that the most specific rule was attempted (operation level)
expect(mockLogger.error).toHaveBeenCalledWith(
'Rate limit check failed',
expect.objectContaining({
queueName: 'test-queue',
handler: 'test-handler',
operation: 'test-op',
})
);
});
});
describe('getStatus', () => {
it('should get rate limit status', async () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
const rule: RateLimitRule = {
level: 'queue',
queueName: 'test-queue',
config: { points: 100, duration: 60 },
};
limiter.addRule(rule);
const status = await limiter.getStatus('test-queue', 'handler', 'operation');
expect(status.queueName).toBe('test-queue');
expect(status.handler).toBe('handler');
expect(status.operation).toBe('operation');
expect(status.appliedRule).toEqual(rule);
});
it('should return status without rule when none apply', async () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
const status = await limiter.getStatus('test-queue', 'handler', 'operation');
expect(status.queueName).toBe('test-queue');
expect(status.appliedRule).toBeUndefined();
expect(status.limit).toBeUndefined();
});
});
describe('reset', () => {
it('should reset rate limits for specific operation', async () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
const rule: RateLimitRule = {
level: 'operation',
queueName: 'test-queue',
handler: 'test-handler',
operation: 'test-op',
config: { points: 10, duration: 60 },
};
limiter.addRule(rule);
try {
await limiter.reset('test-queue', 'test-handler', 'test-op');
} catch (error) {
// In test environment, limiter.delete will fail due to no Redis connection
// That's expected, just ensure the method can be called
}
// The method should at least attempt to reset
expect(limiter.getRules()).toContain(rule);
});
it('should warn about broad reset', async () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
await limiter.reset('test-queue');
expect(mockLogger.warn).toHaveBeenCalledWith(
'Broad reset not implemented yet',
expect.objectContaining({ queueName: 'test-queue' })
);
});
});
describe('removeRule', () => {
it('should remove a rule', () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
const rule: RateLimitRule = {
level: 'queue',
queueName: 'test-queue',
config: { points: 100, duration: 60 },
};
limiter.addRule(rule);
expect(limiter.getRules()).toHaveLength(1);
const removed = limiter.removeRule('queue', 'test-queue');
expect(removed).toBe(true);
expect(limiter.getRules()).toHaveLength(0);
});
it('should return false when rule not found', () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
const removed = limiter.removeRule('queue', 'non-existent');
expect(removed).toBe(false);
});
});
describe('getRules', () => {
it('should return all configured rules', () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
const rule1: RateLimitRule = {
level: 'global',
config: { points: 1000, duration: 60 },
};
const rule2: RateLimitRule = {
level: 'queue',
queueName: 'test-queue',
config: { points: 100, duration: 60 },
};
limiter.addRule(rule1);
limiter.addRule(rule2);
const rules = limiter.getRules();
expect(rules).toHaveLength(2);
expect(rules).toContain(rule1);
expect(rules).toContain(rule2);
});
});
describe('error handling', () => {
it('should allow on rate limiter error', async () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
// Add a rule but don't set up the actual limiter to cause an error
const rule: RateLimitRule = {
level: 'queue',
queueName: 'test-queue',
config: { points: 100, duration: 60 },
};
limiter.addRule(rule);
// Force the limiter map to be empty to simulate error
(limiter as any).limiters.clear();
const result = await limiter.checkLimit('test-queue', 'handler', 'operation');
expect(result.allowed).toBe(true); // Should allow on error
expect(mockLogger.warn).toHaveBeenCalledWith(
'Rate limiter not found for rule',
expect.any(Object)
);
});
});
describe('hierarchical rule precedence', () => {
it('should correctly apply rule hierarchy', () => {
const limiter = new QueueRateLimiter(mockRedisClient, mockLogger);
// Add multiple rules at different levels
const rules: RateLimitRule[] = [
{
level: 'global',
config: { points: 10000, duration: 60 },
},
{
level: 'queue',
queueName: 'email-queue',
config: { points: 1000, duration: 60 },
},
{
level: 'handler',
queueName: 'email-queue',
handler: 'email-service',
config: { points: 100, duration: 60 },
},
{
level: 'operation',
queueName: 'email-queue',
handler: 'email-service',
operation: 'send-bulk',
config: { points: 10, duration: 60 },
},
];
rules.forEach(rule => limiter.addRule(rule));
// Test that getMostSpecificRule works correctly
const specificRule = (limiter as any).getMostSpecificRule(
'email-queue',
'email-service',
'send-bulk'
);
expect(specificRule?.level).toBe('operation');
expect(specificRule?.config.points).toBe(10);
});
});
});