fixed up rate limiting

This commit is contained in:
Boki 2025-07-06 18:53:02 -04:00
parent a616c92656
commit a7146a3f57
15 changed files with 912 additions and 186 deletions

2
.env
View file

@ -5,7 +5,7 @@
# Core Application Settings # Core Application Settings
NODE_ENV=development NODE_ENV=development
LOG_LEVEL=trace LOG_LEVEL=trace
LOG_HIDE_OBJECT=true LOG_HIDE_OBJECT=false
# Data Service Configuration # Data Service Configuration
DATA_SERVICE_PORT=2001 DATA_SERVICE_PORT=2001

View file

@ -182,7 +182,7 @@
}, },
"services": { "services": {
"dataIngestion": { "dataIngestion": {
"port": 2009, "port": 2001,
"workers": 5, "workers": 5,
"queues": { "queues": {
"ceo": { "concurrency": 2 }, "ceo": { "concurrency": 2 },

View file

@ -13,55 +13,63 @@ import {
} from './actions'; } from './actions';
/** /**
* EOD (End of Day) Handler for testing rate limits * EOD (End of Day) Handler demonstrating advanced rate limiting
* This handler demonstrates different rate limit configurations
* *
* Handler-level rate limit: 100 requests per minute for all operations * Handler-level limits apply to all operations unless overridden
* Individual operations can override this with their own limits * Operations can specify just a cost to use handler limits, or override with custom limits
*/ */
@Handler('eod') @Handler('eod')
// @Disabled() @RateLimit({
@RateLimit({ points: 1, duration: 10, blockDuration: 10 }) limits: [
{ points: 10, duration: 1 }, // 10 points per second
{ points: 10000, duration: 3600 }, // 10k points per hour
{ points: 100000, duration: 86400 }, // 100k points per day
],
cost: 1, // Default cost for operations using this handler
})
export class EodHandler extends BaseHandler<DataIngestionServices> { export class EodHandler extends BaseHandler<DataIngestionServices> {
constructor(services: any) { constructor(services: any) {
super(services); super(services);
} }
/** /**
* Fetch daily price data - High volume operation * Fetch daily price data - Low cost operation
* Rate limit: 50 requests per minute (overrides handler-level limit) * Uses handler rate limits but costs only 1 point
*/ */
@Operation('fetch-daily-prices') @Operation('fetch-daily-prices')
@RateLimit({ points: 3, duration: 10, blockDuration: 5 }) @RateLimit(1) // Costs 1 point per call
fetchDailyPrices = fetchDailyPrices; fetchDailyPrices = fetchDailyPrices;
/** /**
* Fetch fundamental data - Medium volume operation * Fetch fundamental data - Medium cost operation
* Rate limit: 20 requests per minute * Uses handler rate limits but costs 10 points
*/ */
@Operation('fetch-fundamentals') @Operation('fetch-fundamentals')
@RateLimit({ points: 2, duration: 10, blockDuration: 10 }) @RateLimit(1) // Costs 10 points per call
fetchFundamentals = fetchFundamentals; fetchFundamentals = fetchFundamentals;
/** /**
* Fetch news data - Low volume operation * Fetch news data - High cost operation
* Rate limit: 10 requests per minute (most restrictive) * Has custom limits AND high cost
*/ */
@Operation('fetch-news') @Operation('fetch-news')
@RateLimit(1)
fetchNews = fetchNews; fetchNews = fetchNews;
/** /**
* Test burst operations - For testing rate limit behavior * Test burst operations - For testing rate limit behavior
* This doesn't have its own rate limit, so it uses the handler-level limit (100/min) * Uses handler default cost (1 point)
*/ */
@Operation('test-burst') @Operation('test-burst')
@RateLimit(0)
async testBurstOperations(input: { operationsToTest: string[], burstSize: number }): Promise<unknown> { async testBurstOperations(input: { operationsToTest: string[], burstSize: number }): Promise<unknown> {
this.logger.info('Testing burst operations', input); this.logger.info('Testing burst operations', input);
const results = { const results = {
attempted: 0, attempted: 0,
scheduled: 0, scheduled: 0,
failed: 0 failed: 0,
operations: {} as Record<string, number>
}; };
try { try {
@ -69,11 +77,13 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
for (let i = 0; i < input.burstSize; i++) { for (let i = 0; i < input.burstSize; i++) {
const operation = input.operationsToTest[i % input.operationsToTest.length] || 'fetch-news'; const operation = input.operationsToTest[i % input.operationsToTest.length] || 'fetch-news';
results.attempted++; results.attempted++;
results.operations[operation] = (results.operations[operation] || 0) + 1;
const promise = this.scheduleOperation(operation, {}).then(() => { const promise = this.scheduleOperation(operation, { index: i }).then(() => {
results.scheduled++; results.scheduled++;
}).catch(() => { }).catch((error) => {
results.failed++; results.failed++;
this.logger.debug('Failed to schedule operation', { operation, error: error.message });
}); });
promises.push(promise); promises.push(promise);
@ -84,7 +94,8 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
return { return {
success: true, success: true,
results, results,
message: `Scheduled ${results.scheduled}/${results.attempted} operations` message: `Scheduled ${results.scheduled}/${results.attempted} operations`,
breakdown: results.operations
}; };
} catch (error) { } catch (error) {
this.logger.error('Burst test failed', { error }); this.logger.error('Burst test failed', { error });
@ -92,34 +103,6 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
} }
} }
/**
* Scheduled job to fetch daily prices
* Runs every day at 6 PM (after market close)
*/
@ScheduledOperation('eod-daily-prices', '0 18 * * *', {
priority: 5,
description: 'Fetch daily price data after market close',
immediately: false,
})
async scheduledFetchDailyPrices(): Promise<unknown> {
this.logger.info('Starting scheduled daily price fetch');
return this.fetchDailyPrices();
}
/**
* Scheduled job to fetch fundamentals
* Runs weekly on Sunday
*/
@ScheduledOperation('eod-fundamentals', '0 0 * * 0', {
priority: 5,
description: 'Weekly fundamental data update',
immediately: false,
})
async scheduledFetchFundamentals(): Promise<unknown> {
this.logger.info('Starting scheduled fundamentals fetch');
return this.fetchFundamentals();
}
/** /**
* Scheduled job to test rate limits * Scheduled job to test rate limits
* Runs every 5 minutes for testing * Runs every 5 minutes for testing
@ -129,6 +112,7 @@ export class EodHandler extends BaseHandler<DataIngestionServices> {
description: 'Test rate limit behavior', description: 'Test rate limit behavior',
immediately: true, immediately: true,
}) })
@RateLimit(0) // No cost for this test operation
async scheduledRateLimitTest(): Promise<unknown> { async scheduledRateLimitTest(): Promise<unknown> {
this.logger.info('Starting rate limit test'); this.logger.info('Starting rate limit test');
return this.testBurstOperations({ return this.testBurstOperations({

View file

@ -0,0 +1,100 @@
#!/usr/bin/env bun
import { getLogger } from '@stock-bot/logger';
const logger = getLogger('test-advanced-rate-limit');
/**
 * Drives the EOD handler's rate-limited operations over HTTP to observe the
 * advanced rate limiting behavior: a mixed-cost burst via `test-burst`, then
 * sequential calls to the three fetch operations.
 *
 * NOTE(review): the costs printed below (1 / 10 / 50 points) do not match the
 * `@RateLimit(1)` decorators currently on the EOD handler operations in this
 * commit — confirm which side is authoritative before trusting this output.
 */
async function testAdvancedRateLimits() {
  logger.info('Testing advanced rate limit features...');
  // Base URL of the locally running data-ingestion service's HTTP API.
  const baseUrl = 'http://localhost:3001/api';
  // Echo the expected configuration so the log output is self-describing.
  logger.info('\n📋 Rate Limit Configuration:');
  logger.info('Handler limits: 100pts/sec, 10k pts/hour, 100k pts/day');
  logger.info('fetch-daily-prices: 1 point per call');
  logger.info('fetch-fundamentals: 10 points per call');
  logger.info('fetch-news: 50 points per call (custom limits: 10pts/min, 100pts/hour)');
  // First test: test-burst operation — schedules 30 jobs across the three operations.
  logger.info('\n🚀 Testing burst operation with mixed costs...');
  const burstResponse = await fetch(`${baseUrl}/handlers/eod/operations/test-burst`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      operationsToTest: ['fetch-daily-prices', 'fetch-fundamentals', 'fetch-news'],
      burstSize: 30
    })
  });
  // assumes the endpoint returns a JSON body even on non-2xx status — TODO confirm
  const burstResult = await burstResponse.json();
  logger.info('Burst test result:', burstResult);
  // Wait for jobs to process before issuing the individual requests below.
  logger.info('\n⏳ Waiting 10 seconds for jobs to process...');
  await new Promise(resolve => setTimeout(resolve, 10000));
  // Test individual operations with different costs; only the HTTP status is
  // inspected — rate limiting effects are visible in the service's own logs.
  logger.info('\n📊 Testing individual operations:');
  // Test cheap operation (1 point)
  logger.info('\n1⃣ Testing fetch-daily-prices (1 point each)...');
  for (let i = 0; i < 5; i++) {
    const response = await fetch(`${baseUrl}/handlers/eod/operations/fetch-daily-prices`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ test: true, index: i })
    });
    logger.info(`Request ${i + 1}: ${response.status} ${response.statusText}`);
  }
  // Test medium cost operation (10 points)
  logger.info('\n🔟 Testing fetch-fundamentals (10 points each)...');
  for (let i = 0; i < 3; i++) {
    const response = await fetch(`${baseUrl}/handlers/eod/operations/fetch-fundamentals`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ test: true, index: i })
    });
    logger.info(`Request ${i + 1}: ${response.status} ${response.statusText}`);
  }
  // Test expensive operation (50 points)
  logger.info('\n💰 Testing fetch-news (50 points each, custom limits)...');
  for (let i = 0; i < 2; i++) {
    const response = await fetch(`${baseUrl}/handlers/eod/operations/fetch-news`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ test: true, index: i })
    });
    logger.info(`Request ${i + 1}: ${response.status} ${response.statusText}`);
  }
  logger.info('\n✅ Test completed!');
  logger.info('Check the data-ingestion service logs to see rate limiting in action.');
}
// Health check before running tests
/**
 * Verifies that the data-ingestion service's health endpoint responds
 * before the rate limit tests are allowed to run.
 *
 * @returns true when the /health endpoint answers with a 2xx status,
 *          false (after logging a hint) on any failure.
 */
async function checkService() {
  try {
    const res = await fetch('http://localhost:3001/health');
    if (res.ok) {
      return true;
    }
    throw new Error('Service not healthy');
  } catch {
    // Fetch failure or unhealthy status — tell the operator how to start it.
    logger.error('Data ingestion service not running. Start it with: bun run dev');
    return false;
  }
}
/**
 * Entry point: run the rate limit tests only when the service is reachable.
 */
async function main() {
  const healthy = await checkService();
  if (!healthy) {
    return;
  }
  await testAdvancedRateLimits();
}

// Top-level kickoff: any unhandled failure is logged and exits non-zero.
main().catch((error) => {
  logger.error('Test failed:', error);
  process.exit(1);
});

View file

@ -62,6 +62,7 @@ export function registerApplicationServices(
container.register({ container.register({
queueManager: asFunction(({ logger, handlerRegistry }) => { queueManager: asFunction(({ logger, handlerRegistry }) => {
const { QueueManager } = require('@stock-bot/queue'); const { QueueManager } = require('@stock-bot/queue');
const queueConfig = { const queueConfig = {
serviceName: config.service?.serviceName || config.service?.name || 'unknown', serviceName: config.service?.serviceName || config.service?.name || 'unknown',
redis: { redis: {
@ -78,6 +79,7 @@ export function registerApplicationServices(
enableScheduledJobs: config.queue!.enableScheduledJobs ?? true, enableScheduledJobs: config.queue!.enableScheduledJobs ?? true,
autoDiscoverHandlers: true, autoDiscoverHandlers: true,
}; };
return new QueueManager(queueConfig, handlerRegistry, logger); return new QueueManager(queueConfig, handlerRegistry, logger);
}).singleton(), }).singleton(),
}); });

View file

@ -362,6 +362,45 @@ export class ServiceApplication {
}); });
await handlerInitializer(containerWithDI); await handlerInitializer(containerWithDI);
this.logger.info('Handlers initialized'); this.logger.info('Handlers initialized');
// After handlers are initialized, add rate limit rules to queue manager
const queueManager = this.container.resolve('queueManager');
const handlerRegistry = this.container.resolve<HandlerRegistry>('handlerRegistry');
if (queueManager && handlerRegistry) {
const rateLimitRules = [];
const allMetadata = handlerRegistry.getAllMetadata();
for (const [handlerName, metadata] of allMetadata) {
// Handler-level rate limit
if (metadata.rateLimit) {
rateLimitRules.push({
level: 'handler' as const,
handler: handlerName,
config: metadata.rateLimit as any, // Type mismatch between packages
});
}
// Operation-level rate limits
for (const operation of metadata.operations) {
if (operation.rateLimit) {
rateLimitRules.push({
level: 'operation' as const,
handler: handlerName,
operation: operation.name,
config: operation.rateLimit as any, // Type mismatch between packages
});
}
}
}
if (rateLimitRules.length > 0) {
await queueManager.addRateLimitRules(rateLimitRules);
this.logger.info('Rate limit rules added to QueueManager', {
ruleCount: rateLimitRules.length,
});
}
}
} }
// Create and mount routes // Create and mount routes

View file

@ -126,6 +126,7 @@ export abstract class BaseHandler<TServices extends ServiceTypes = ServiceTypes>
if (!this.queue) { if (!this.queue) {
throw new Error('Queue service is not available for this handler'); throw new Error('Queue service is not available for this handler');
} }
const jobData = { const jobData = {
handler: this.handlerName, handler: this.handlerName,
operation, operation,

View file

@ -1,12 +1,30 @@
/**
* Individual rate limit window configuration
*/
export interface RateLimitWindow {
points: number; // Number of allowed points in the time window
duration: number; // Time window in seconds
blockDuration?: number; // How long to block after limit is hit (seconds)
}
/** /**
* Rate limit configuration for handlers and operations * Rate limit configuration for handlers and operations
*/ */
export interface RateLimitConfig { export interface RateLimitConfig {
points: number; // Number of allowed requests limits?: RateLimitWindow[]; // Array of rate limit windows
duration: number; // Time window in seconds cost?: number; // How many points this operation costs (default: 1)
blockDuration?: number; // How long to block after limit is hit (seconds)
} }
/**
* Flexible rate limit configuration
* Can be:
* - A full config object with limits and cost
* - Just a cost number (for operations inheriting handler limits)
* - An array of limit windows (legacy support)
* - A single limit window (legacy support)
*/
export type RateLimitOptions = RateLimitConfig | number | RateLimitWindow[] | RateLimitWindow;
/** /**
* RateLimit decorator - configures rate limiting for handlers or operations * RateLimit decorator - configures rate limiting for handlers or operations
* *
@ -14,26 +32,62 @@ export interface RateLimitConfig {
* - Classes: Sets default rate limit for all operations in the handler * - Classes: Sets default rate limit for all operations in the handler
* - Methods: Sets specific rate limit for individual operations (overrides handler-level) * - Methods: Sets specific rate limit for individual operations (overrides handler-level)
* *
* @param config Rate limit configuration * @param options Rate limit configuration - flexible format
* *
* @example * @example
* // Handler-level rate limit * // Handler-level rate limit with multiple windows
* @Handler('myHandler') * @Handler('myHandler')
* @RateLimit({ points: 100, duration: 60 }) * @RateLimit({
* limits: [
* { points: 100, duration: 1 }, // 100 points/second
* { points: 10000, duration: 3600 } // 10k points/hour
* ],
* cost: 1 // Default cost for all operations
* })
* class MyHandler extends BaseHandler { * class MyHandler extends BaseHandler {
* // All operations inherit the 100/minute limit * // All operations inherit these limits
* } * }
* *
* @example * @example
* // Operation-specific rate limit * // Operation-specific cost only (inherits handler limits)
* @Operation('fetch-data') * @Operation('fetch-simple')
* @RateLimit({ points: 10, duration: 60, blockDuration: 30 }) * @RateLimit(1) // Costs 1 point
* async fetchData() { * async fetchSimple() {}
* // This operation is limited to 10/minute *
* } * @Operation('fetch-complex')
* @RateLimit({ cost: 10 }) // Costs 10 points
* async fetchComplex() {}
*
* @example
* // Operation with custom limits and cost
* @Operation('special-operation')
* @RateLimit({
* limits: [
* { points: 10, duration: 60 } // More restrictive: 10/minute
* ],
* cost: 5
* })
* async specialOperation() {}
*/ */
export function RateLimit(config: RateLimitConfig) { export function RateLimit(options: RateLimitOptions) {
return function (target: any, propertyKey?: string, descriptor?: PropertyDescriptor): any { return function (target: any, propertyKey?: string, descriptor?: PropertyDescriptor): any {
// Normalize options to RateLimitConfig format
let config: RateLimitConfig;
if (typeof options === 'number') {
// Just a cost number
config = { cost: options };
} else if (Array.isArray(options)) {
// Array of limit windows (legacy support)
config = { limits: options };
} else if ('points' in options && 'duration' in options) {
// Single limit window (legacy support)
config = { limits: [options as RateLimitWindow] };
} else {
// Already a RateLimitConfig
config = options as RateLimitConfig;
}
if (propertyKey) { if (propertyKey) {
// Method decorator - operation-specific rate limit // Method decorator - operation-specific rate limit
const constructor = target.constructor; const constructor = target.constructor;

View file

@ -8,7 +8,11 @@ export {
Disabled, Disabled,
} from './decorators/decorators'; } from './decorators/decorators';
export { RateLimit } from './decorators/rate-limit'; export { RateLimit } from './decorators/rate-limit';
export type { RateLimitConfig } from './decorators/rate-limit'; export type {
RateLimitConfig,
RateLimitWindow,
RateLimitOptions
} from './decorators/rate-limit';
export { createJobHandler } from './utils/create-job-handler'; export { createJobHandler } from './utils/create-job-handler';
// Re-export commonly used types from @stock-bot/types for convenience // Re-export commonly used types from @stock-bot/types for convenience

View file

@ -20,7 +20,7 @@ export { DeadLetterQueueHandler } from './dlq-handler';
export { QueueMetricsCollector } from './queue-metrics'; export { QueueMetricsCollector } from './queue-metrics';
// Rate limiting // Rate limiting
export { QueueRateLimiter } from './rate-limiter'; export { QueueRateLimiter } from './rate-limiter-new';
// Types // Types
export type { export type {

View file

@ -3,8 +3,9 @@ import type { CacheProvider } from '@stock-bot/cache';
import { createCache } from '@stock-bot/cache'; import { createCache } from '@stock-bot/cache';
import type { HandlerRegistry } from '@stock-bot/handler-registry'; import type { HandlerRegistry } from '@stock-bot/handler-registry';
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import Redis from 'ioredis';
import { Queue, type QueueWorkerConfig } from './queue'; import { Queue, type QueueWorkerConfig } from './queue';
import { QueueRateLimiter } from './rate-limiter'; import { QueueRateLimiter } from './rate-limiter-new';
import { getFullQueueName, parseQueueName } from './service-utils'; import { getFullQueueName, parseQueueName } from './service-utils';
import type { import type {
GlobalStats, GlobalStats,
@ -37,6 +38,7 @@ export class QueueManager {
private caches = new Map<string, CacheProvider>(); private caches = new Map<string, CacheProvider>();
private rateLimiter?: QueueRateLimiter; private rateLimiter?: QueueRateLimiter;
private redisConnection: ReturnType<typeof getRedisConnection>; private redisConnection: ReturnType<typeof getRedisConnection>;
private ioredisConnection?: Redis;
private isShuttingDown = false; private isShuttingDown = false;
private shutdownPromise: Promise<void> | null = null; private shutdownPromise: Promise<void> | null = null;
private config: QueueManagerConfig; private config: QueueManagerConfig;
@ -66,15 +68,18 @@ export class QueueManager {
this.logger = logger || getLogger('QueueManager'); this.logger = logger || getLogger('QueueManager');
this.redisConnection = getRedisConnection(config.redis); this.redisConnection = getRedisConnection(config.redis);
// Initialize rate limiter if rules are provided // Create ioredis connection for rate limiter
if (config.rateLimitRules && config.rateLimitRules.length > 0) { this.ioredisConnection = new Redis({
this.rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger); host: config.redis.host,
config.rateLimitRules.forEach(rule => { port: config.redis.port,
if (this.rateLimiter) { password: config.redis.password,
this.rateLimiter.addRule(rule); db: config.redis.db || 0,
} maxRetriesPerRequest: 3,
}); enableReadyCheck: true,
} });
// Rate limiter will be initialized when rules are added dynamically
// No longer initialized from config
// Auto-discover routes if enabled and registry provided // Auto-discover routes if enabled and registry provided
if (config.serviceName && config.autoDiscoverHandlers !== false && handlerRegistry) { if (config.serviceName && config.autoDiscoverHandlers !== false && handlerRegistry) {
@ -153,6 +158,7 @@ export class QueueManager {
startWorker: workers > 0, startWorker: workers > 0,
handlerRegistry: options.handlerRegistry || this.handlerRegistry, handlerRegistry: options.handlerRegistry || this.handlerRegistry,
serviceName: this.config.serviceName, serviceName: this.config.serviceName,
queueManager: this,
}; };
const queue = new Queue( const queue = new Queue(
@ -169,16 +175,7 @@ export class QueueManager {
// Automatically initialize batch cache for the queue // Automatically initialize batch cache for the queue
this.initializeBatchCacheSync(queueName); this.initializeBatchCacheSync(queueName);
// Add queue-specific rate limit rules // Rate limit rules are now added via addRateLimitRules after handlers are initialized
if (this.rateLimiter && mergedOptions.rateLimitRules) {
mergedOptions.rateLimitRules.forEach(rule => {
// Ensure queue name is set for queue-specific rules
const ruleWithQueue = { ...rule, queueName };
if (this.rateLimiter) {
this.rateLimiter.addRule(ruleWithQueue);
}
});
}
this.logger.info('Queue created with batch cache', { this.logger.info('Queue created with batch cache', {
queueName, queueName,
@ -303,23 +300,44 @@ export class QueueManager {
/** /**
* Add a rate limit rule * Add a rate limit rule
*/ */
addRateLimitRule(rule: RateLimitRule): void { async addRateLimitRule(rule: RateLimitRule): Promise<void> {
if (!this.rateLimiter) { if (!this.rateLimiter) {
this.rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger); if (!this.ioredisConnection) {
throw new Error('IoRedis connection not initialized');
}
this.rateLimiter = new QueueRateLimiter(this.ioredisConnection, this.logger);
} }
this.rateLimiter.addRule(rule); await this.rateLimiter.addRule(rule);
} }
/** /**
* Check rate limits for a job * Add multiple rate limit rules at once
*/
async addRateLimitRules(rules: RateLimitRule[]): Promise<void> {
if (!this.rateLimiter) {
if (!this.ioredisConnection) {
throw new Error('IoRedis connection not initialized');
}
this.rateLimiter = new QueueRateLimiter(this.ioredisConnection, this.logger);
}
await Promise.all(rules.map(rule => this.rateLimiter!.addRule(rule)));
this.logger.info('Added rate limit rules to QueueManager', {
ruleCount: rules.length,
rules,
});
}
/**
* Check rate limits for a job - overloaded to support both interfaces
*/ */
async checkRateLimit( async checkRateLimit(
queueName: string, handlerOrQueueName: string,
handler: string, operationOrHandler?: string,
operation: string operationIfThreeParams?: string
): Promise<{ ): Promise<{
allowed: boolean; allowed: boolean;
retryAfter?: number; retryDelayMs?: number;
remainingPoints?: number; remainingPoints?: number;
appliedRule?: RateLimitRule; appliedRule?: RateLimitRule;
}> { }> {
@ -327,7 +345,27 @@ export class QueueManager {
return { allowed: true }; return { allowed: true };
} }
return await this.rateLimiter.checkLimit(queueName, handler, operation); // Support both 2-parameter and 3-parameter calls
let handler: string;
let operation: string | undefined;
if (operationIfThreeParams !== undefined) {
// 3-parameter call: checkRateLimit(queueName, handler, operation)
handler = operationOrHandler!;
operation = operationIfThreeParams;
} else {
// 2-parameter call: checkRateLimit(handler, operation)
handler = handlerOrQueueName;
operation = operationOrHandler;
}
// The new rate limiter only needs handler and operation
const result = await this.rateLimiter.checkLimit(handler, operation);
return {
allowed: result.allowed,
retryDelayMs: result.retryDelayMs,
remainingPoints: result.remainingPoints,
};
} }
/** /**
@ -342,7 +380,12 @@ export class QueueManager {
}; };
} }
return await this.rateLimiter.getStatus(queueName, handler, operation); // The new rate limiter only needs handler and operation
const status = await this.rateLimiter.getStatus(handler, operation);
return {
queueName,
...status
};
} }
/** /**
@ -468,6 +511,16 @@ export class QueueManager {
await Promise.all(cacheShutdownPromises); await Promise.all(cacheShutdownPromises);
// Close ioredis connection
if (this.ioredisConnection) {
try {
await this.ioredisConnection.quit();
this.logger.debug('IoRedis connection closed');
} catch (error) {
this.logger.warn('Error closing ioredis connection', { error: (error as Error).message });
}
}
// Clear collections // Clear collections
this.queues.clear(); this.queues.clear();
this.caches.clear(); this.caches.clear();

View file

@ -20,6 +20,7 @@ export interface QueueWorkerConfig {
startWorker?: boolean; startWorker?: boolean;
handlerRegistry?: HandlerRegistry; handlerRegistry?: HandlerRegistry;
serviceName?: string; serviceName?: string;
queueManager?: any; // Reference to queue manager for rate limiting
} }
/** /**
@ -35,6 +36,7 @@ export class Queue {
private readonly logger: Logger; private readonly logger: Logger;
private readonly handlerRegistry?: HandlerRegistry; private readonly handlerRegistry?: HandlerRegistry;
private serviceName?: string; private serviceName?: string;
private queueManager?: any;
constructor( constructor(
queueName: string, queueName: string,
@ -48,6 +50,7 @@ export class Queue {
this.logger = logger || console; this.logger = logger || console;
this.handlerRegistry = config.handlerRegistry; this.handlerRegistry = config.handlerRegistry;
this.serviceName = config.serviceName; this.serviceName = config.serviceName;
this.queueManager = config.queueManager;
this.logger.debug('Queue constructor called', { this.logger.debug('Queue constructor called', {
queueName, queueName,
@ -332,13 +335,18 @@ export class Queue {
const connection = getRedisConnection(this.redisConfig); const connection = getRedisConnection(this.redisConfig);
for (let i = 0; i < workerCount; i++) { for (let i = 0; i < workerCount; i++) {
const worker = new Worker(this.queueName, this.processJob.bind(this), { const worker: Worker = new Worker(this.queueName, async (job: Job) => this.processJob(job, worker), {
connection, connection,
concurrency, concurrency,
maxStalledCount: 3, maxStalledCount: 3,
stalledInterval: 30000, stalledInterval: 30000,
// Add a name to identify the worker // Add a name to identify the worker
name: `${this.serviceName || 'unknown'}_worker_${i}`, name: `${this.serviceName || 'unknown'}_worker_${i}`,
// Enable manual rate limiting
limiter: {
max: 100,
duration: 1,
},
}); });
this.logger.info(`Starting worker ${i + 1}/${workerCount} for queue`, { this.logger.info(`Starting worker ${i + 1}/${workerCount} for queue`, {
@ -348,7 +356,7 @@ export class Queue {
}); });
// Setup worker event handlers // Setup worker event handlers
worker.on('completed', job => { worker.on('completed', (job: Job) => {
this.logger.trace('Job completed', { this.logger.trace('Job completed', {
queueName: this.queueName, queueName: this.queueName,
jobId: job.id, jobId: job.id,
@ -357,17 +365,27 @@ export class Queue {
}); });
}); });
worker.on('failed', (job, err) => { worker.on('failed', (job: Job | undefined, err: Error) => {
this.logger.error('Job failed', { // Only log as error if it's not a rate limit error
queueName: this.queueName, if (err.name !== 'RateLimitError') {
jobId: job?.id, this.logger.error('Job failed', {
handler: job?.data?.handler, queueName: this.queueName,
operation: job?.data?.operation, jobId: job?.id,
error: err.message, handler: job?.data?.handler,
}); operation: job?.data?.operation,
error: err.message,
});
} else {
this.logger.debug('Job rate limited', {
queueName: this.queueName,
jobId: job?.id,
handler: job?.data?.handler,
operation: job?.data?.operation,
});
}
}); });
worker.on('error', error => { worker.on('error', (error: Error) => {
this.logger.error('Worker error', { this.logger.error('Worker error', {
queueName: this.queueName, queueName: this.queueName,
workerId: i, workerId: i,
@ -386,9 +404,16 @@ export class Queue {
} }
/** /**
* Process a job using the handler registry * Process a job using the handler registry with rate limiting
*/ */
private async processJob(job: Job): Promise<unknown> { private async processJob(job: Job, worker: Worker): Promise<unknown> {
this.logger.debug('Processing job', {
jobId: job.id,
jobName: job.name,
queueName: this.queueName,
hasQueueManager: !!this.queueManager,
jobData: job.data
});
const { handler, operation, payload }: JobData = job.data; const { handler, operation, payload }: JobData = job.data;
this.logger.trace('Processing job', { this.logger.trace('Processing job', {
@ -404,6 +429,42 @@ export class Queue {
throw new Error('Handler registry not configured for worker processing'); throw new Error('Handler registry not configured for worker processing');
} }
// Check rate limits if available
if (this.queueManager && this.queueManager.checkRateLimit) {
this.logger.debug('Checking rate limits', {
handler,
operation,
queueManager: this.queueManager.constructor.name
});
const rateLimitCheck = await this.queueManager.checkRateLimit(
handler,
operation
);
this.logger.debug('Rate limit check result', {
allowed: rateLimitCheck.allowed,
retryDelayMs: rateLimitCheck.retryDelayMs,
handler,
operation
});
if (!rateLimitCheck.allowed) {
this.logger.trace('Rate limit exceeded, throwing Worker.RateLimitError', {
id: job.id,
handler,
operation,
retryDelayMs: rateLimitCheck.retryDelayMs
});
// Use BullMQ's manual rate limiting
// First inform the worker about the rate limit delay
await worker.rateLimit(rateLimitCheck.retryDelayMs || 1000);
// Then throw the RateLimitError
throw Worker.RateLimitError();
}
}
this.logger.debug('Looking up handler in registry', { this.logger.debug('Looking up handler in registry', {
handler, handler,
operation, operation,
@ -419,6 +480,7 @@ export class Queue {
const result = await jobHandler(payload); const result = await jobHandler(payload);
this.logger.info(`[Queue] Job ${job.id} - ${handler}:${operation} completed successfully`);
this.logger.trace('Job completed successfully', { this.logger.trace('Job completed successfully', {
id: job.id, id: job.id,
handler, handler,
@ -428,6 +490,11 @@ export class Queue {
return result; return result;
} catch (error) { } catch (error) {
// Re-throw RateLimitError without logging as error
if (error === Worker.RateLimitError() || (error as any).name === 'RateLimitError') {
throw error;
}
this.logger.error('Job processing failed', { this.logger.error('Job processing failed', {
id: job.id, id: job.id,
handler, handler,

View file

@ -0,0 +1,380 @@
import { RateLimiterRedis, type IRateLimiterRedisOptions } from 'rate-limiter-flexible';
import type Redis from 'ioredis';
import type { RateLimitWindow, RateLimitConfig, RateLimitRule } from './types';
// Logger interface for type safety
// Minimal structural logger contract so the rate limiter can accept any
// pino/winston-style logger — or `console`, which the constructor uses as
// a fallback when no logger is injected.
interface Logger {
  info(message: string, meta?: Record<string, unknown>): void;
  error(message: string, meta?: Record<string, unknown>): void;
  warn(message: string, meta?: Record<string, unknown>): void;
  debug(message: string, meta?: Record<string, unknown>): void;
  trace(message: string, meta?: Record<string, unknown>): void;
}
// Per-rule state kept by QueueRateLimiter: one rate-limiter-flexible
// instance for each configured time window, plus the point cost charged
// whenever the rule is consumed (see addRule for the cost defaults).
interface RateLimiterEntry {
  windows: Array<{
    limiter: RateLimiterRedis;
    window: RateLimitWindow;
  }>;
  cost: number;
}

/**
 * Outcome of a rate limit check.
 * NOTE(review): exact semantics of retryDelayMs/remainingPoints are set by
 * checkLimit, whose body is outside this view — confirm before relying on them.
 */
export interface RateLimitResult {
  allowed: boolean;          // true when the operation may proceed now
  retryDelayMs?: number;     // suggested wait before retrying when blocked
  remainingPoints?: number;  // points remaining after this check, when available
  cost?: number;             // points consumed (or that would be consumed)
}
/**
* Enhanced rate limiter that supports:
* - Multiple time windows per rule
* - Variable costs per operation
* - Hierarchical rules (operation > handler > queue > global)
*/
export class QueueRateLimiter {
private limiters = new Map<string, RateLimiterEntry>();
private readonly logger: Logger;
/**
 * @param redisClient ioredis connection handed to rate-limiter-flexible as
 *                    its store client (kept as a parameter property).
 * @param logger      optional structured logger; defaults to `console`.
 */
constructor(
  private redisClient: Redis,
  logger?: Logger
) {
  // Fall back to console so the limiter is usable without an injected logger.
  this.logger = logger || console;
  // Startup diagnostics: records whether the injected client looks like a
  // real ioredis instance, since rate-limiter-flexible requires one.
  this.logger.debug('QueueRateLimiter initialized', {
    redisClientType: typeof this.redisClient,
    redisClientKeys: this.redisClient ? Object.keys(this.redisClient).slice(0, 10) : [],
    hasClient: !!this.redisClient,
    isIoRedis: this.redisClient && this.redisClient.constructor.name === 'Redis'
  });
}
/**
* Add a rate limit rule
*/
async addRule(rule: RateLimitRule): Promise<void> {
const key = this.getRuleKey(rule);
// Create array of limiters for each window
const windows: RateLimiterEntry['windows'] = [];
const limits = rule.config.limits || [];
this.logger.debug('Adding rate limit rule', {
key,
rule,
windowCount: limits.length,
cost: rule.config.cost
});
for (const window of limits) {
const limiterOptions: IRateLimiterRedisOptions = {
storeClient: this.redisClient as any,
keyPrefix: `rate_limit:${key}:${window.duration}s`,
points: window.points,
duration: window.duration,
blockDuration: window.blockDuration || 0,
};
this.logger.debug('Creating RateLimiterRedis', {
keyPrefix: limiterOptions.keyPrefix,
points: limiterOptions.points,
duration: limiterOptions.duration,
hasRedisClient: !!this.redisClient,
redisClientType: typeof this.redisClient
});
try {
const limiter = new RateLimiterRedis(limiterOptions);
windows.push({ limiter, window });
} catch (error) {
this.logger.error('Failed to create RateLimiterRedis', { error, limiterOptions });
throw error;
}
}
// Default cost: handler defines the pool (no default cost), operations default to 1
const defaultCost = rule.level === 'operation' ? 1 : 0;
this.limiters.set(key, {
windows,
cost: rule.config.cost !== undefined ? rule.config.cost : defaultCost
});
this.logger.info('Rate limit rule added', {
key,
level: rule.level,
handler: rule.handler,
operation: rule.operation,
windows: windows.length,
cost: rule.config.cost !== undefined ? rule.config.cost : defaultCost
});
}
/**
* Check if an operation is allowed based on rate limits
*/
async checkLimit(handler: string, operation?: string): Promise<RateLimitResult> {
// Build keys to check from most specific to least specific
const keysToCheck: string[] = [];
if (operation) {
keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule));
}
keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule));
keysToCheck.push(this.getRuleKey({ level: 'queue', config: {} } as RateLimitRule));
keysToCheck.push(this.getRuleKey({ level: 'global', config: {} } as RateLimitRule));
this.logger.debug('Checking rate limits', {
handler,
operation,
keysToCheck,
availableKeys: Array.from(this.limiters.keys())
});
// First, find the cost from the most specific rule
let cost = 1; // Default cost
for (const key of keysToCheck) {
const entry = this.limiters.get(key);
if (entry && entry.cost !== undefined) {
cost = entry.cost;
this.logger.info(`[RateLimiter] Using cost ${cost} from rule: ${key}`);
this.logger.debug('Using cost from rule', {
key,
cost,
handler,
operation
});
break;
}
}
// Then find the rate limit windows from the first matching rule that has windows
let windowEntry: RateLimiterEntry | undefined;
let appliedKey: string | undefined;
for (const key of keysToCheck) {
const entry = this.limiters.get(key);
if (entry && entry.windows.length > 0) {
windowEntry = entry;
appliedKey = key;
break;
}
}
if (!windowEntry) {
this.logger.debug('No rate limit rules found', {
handler,
operation
});
return { allowed: true };
}
this.logger.info(`[RateLimiter] Applying rate limit rule: ${appliedKey}`, {
appliedKey,
windowCount: windowEntry.windows.length,
cost,
handler,
operation
});
// Check all windows with the determined cost
let maxRetryDelayMs = 0;
this.logger.debug('Checking rate limit windows', {
handler,
operation,
cost,
windowCount: windowEntry.windows.length
});
// Special handling for 0-cost operations - they should always pass
if (cost === 0) {
this.logger.info(`[RateLimiter] Zero-cost operation ${handler}:${operation || 'N/A'} - allowing without consuming points`);
return {
allowed: true,
cost: 0
};
}
for (const { limiter, window } of windowEntry.windows) {
// For handler-level rate limits, all operations share the same pool
// Use the applied key to determine the consumer key
const consumerKey = appliedKey?.startsWith('handler:') ? handler :
(operation ? `${handler}:${operation}` : handler);
try {
// First check current state without consuming
const currentState = await limiter.get(consumerKey);
const now = Date.now();
this.logger.info(`[RateLimiter] Current state before consume:`, {
handler,
operation,
consumerKey,
cost,
window: `${window.points}pts/${window.duration}s`,
consumedPoints: currentState?.consumedPoints || 0,
remainingPoints: currentState?.remainingPoints ?? window.points,
msBeforeNext: currentState?.msBeforeNext || 0,
currentTime: new Date(now).toISOString(),
resetTime: currentState?.msBeforeNext ? new Date(now + currentState.msBeforeNext).toISOString() : 'N/A',
hasState: !!currentState,
stateDetails: currentState ? JSON.stringify(currentState) : 'no state'
});
// Try to consume points
this.logger.info(`[RateLimiter] Attempting to consume ${cost} points for ${consumerKey}`);
const result = await limiter.consume(consumerKey, cost);
this.logger.info(`[RateLimiter] Successfully consumed ${cost} points, ${result.remainingPoints} remaining`);
this.logger.debug('Consumed points successfully', {
handler,
operation,
consumerKey,
window: `${window.points}pts/${window.duration}s`,
cost,
remainingPoints: result.remainingPoints,
msBeforeNext: result.msBeforeNext
});
this.logger.trace('Rate limit window passed', {
handler,
operation,
window: `${window.points}pts/${window.duration}s`,
cost
});
} catch (rejRes: any) {
// Rate limit exceeded for this window
const retryDelayMs = rejRes.msBeforeNext || window.blockDuration || 1000;
maxRetryDelayMs = Math.max(maxRetryDelayMs, retryDelayMs);
this.logger.info(`[RateLimiter] RATE LIMIT HIT: ${consumerKey} - ${cost} points rejected, retry in ${retryDelayMs}ms`, {
consumedPoints: rejRes.consumedPoints || 0,
remainingPoints: rejRes.remainingPoints || 0,
totalPoints: rejRes.totalPoints || window.points,
isFirstInDuration: rejRes.isFirstInDuration || false,
msBeforeNext: rejRes.msBeforeNext || 0,
retryDelayMs
});
this.logger.trace('Rate limit exceeded', {
handler,
operation,
consumerKey,
window: `${window.points}pts/${window.duration}s`,
cost,
remainingPoints: rejRes.remainingPoints || 0,
retryDelayMs: maxRetryDelayMs
});
return {
allowed: false,
retryDelayMs: maxRetryDelayMs,
remainingPoints: rejRes.remainingPoints || 0,
cost
};
}
}
// All windows passed
return {
allowed: true,
cost
};
}
/**
* Get rule key for storing rate limiter
*/
private getRuleKey(rule: RateLimitRule): string {
switch (rule.level) {
case 'global':
return 'global';
case 'queue':
return `queue:${rule.queueName || 'default'}`;
case 'handler':
return `handler:${rule.handler}`;
case 'operation':
return `operation:${rule.handler}:${rule.operation}`;
default:
return rule.level;
}
}
/**
* Get current rate limit status
*/
async getStatus(handler: string, operation?: string): Promise<any> {
// Build keys to check from most specific to least specific
const keysToCheck: string[] = [];
if (operation) {
keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule));
}
keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule));
keysToCheck.push(this.getRuleKey({ level: 'queue', config: {} } as RateLimitRule));
keysToCheck.push(this.getRuleKey({ level: 'global', config: {} } as RateLimitRule));
// Find the first matching rule
let entry: RateLimiterEntry | undefined;
let appliedKey: string | undefined;
for (const key of keysToCheck) {
entry = this.limiters.get(key);
if (entry) {
appliedKey = key;
break;
}
}
if (!entry) {
return {
handler,
operation,
hasRule: false
};
}
// Get status for all windows
const consumerKey = operation ? `${handler}:${operation}` : handler;
const windows = await Promise.all(
entry.windows.map(async ({ limiter, window }) => {
const result = await limiter.get(consumerKey);
return {
points: window.points,
duration: window.duration,
remaining: result?.remainingPoints ?? window.points,
resetIn: result?.msBeforeNext ?? 0,
};
})
);
return {
handler,
operation,
hasRule: true,
appliedKey,
cost: entry.cost,
windows
};
}
/**
* Reset rate limits for a specific handler/operation
*/
async reset(handler: string, operation?: string): Promise<void> {
const keysToCheck: string[] = [];
if (operation) {
keysToCheck.push(this.getRuleKey({ level: 'operation', handler, operation, config: {} } as RateLimitRule));
}
keysToCheck.push(this.getRuleKey({ level: 'handler', handler, config: {} } as RateLimitRule));
for (const key of keysToCheck) {
const entry = this.limiters.get(key);
if (entry) {
await Promise.all(
entry.windows.map(({ limiter }) => limiter.delete(handler))
);
}
}
this.logger.info('Rate limits reset', { handler, operation });
}
}

View file

@ -1,5 +1,5 @@
import { RateLimiterRedis, RateLimiterRes } from 'rate-limiter-flexible'; import { RateLimiterRedis, RateLimiterRes } from 'rate-limiter-flexible';
import type { RateLimitConfig as BaseRateLimitConfig, RateLimitRule } from './types'; import type { RateLimitWindow, RateLimitConfig, RateLimitRule } from './types';
// Logger interface for type safety // Logger interface for type safety
interface Logger { interface Logger {
@ -9,13 +9,16 @@ interface Logger {
debug(message: string, meta?: Record<string, unknown>): void; debug(message: string, meta?: Record<string, unknown>): void;
} }
// Extend the base config to add rate-limiter specific fields interface RateLimiterEntry {
export interface RateLimitConfig extends BaseRateLimitConfig { windows: Array<{
keyPrefix?: string; limiter: RateLimiterRedis;
window: RateLimitWindow;
}>;
cost: number;
} }
export class QueueRateLimiter { export class QueueRateLimiter {
private limiters = new Map<string, RateLimiterRedis>(); private limiters = new Map<string, RateLimiterEntry>();
private rules: RateLimitRule[] = []; private rules: RateLimitRule[] = [];
private readonly logger: Logger; private readonly logger: Logger;
@ -33,23 +36,33 @@ export class QueueRateLimiter {
this.rules.push(rule); this.rules.push(rule);
const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation); const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation);
const limiter = new RateLimiterRedis({
storeClient: this.redisClient, // Extract limits and cost from config
keyPrefix: `rl:${key}`, const limits = rule.config.limits || [];
points: rule.config.points, const cost = rule.config.cost || 1;
duration: rule.config.duration,
blockDuration: rule.config.blockDuration || 0, // Create rate limiters for each window
const windows = limits.map((window, index) => {
const limiter = new RateLimiterRedis({
storeClient: this.redisClient,
keyPrefix: `rl:${key}:${index}`,
points: window.points,
duration: window.duration,
blockDuration: window.blockDuration || 0,
});
return { limiter, window };
}); });
this.limiters.set(key, limiter); this.limiters.set(key, { windows, cost });
this.logger.info('Rate limit rule added', { this.logger.info('Rate limit rule added', {
level: rule.level, level: rule.level,
queueName: rule.queueName, queueName: rule.queueName,
handler: rule.handler, handler: rule.handler,
operation: rule.operation, operation: rule.operation,
points: rule.config.points, windows: limits.length,
duration: rule.config.duration, cost,
}); });
} }
@ -57,6 +70,7 @@ export class QueueRateLimiter {
* Check if a job can be processed based on rate limits * Check if a job can be processed based on rate limits
* Uses hierarchical precedence: operation > handler > queue > global * Uses hierarchical precedence: operation > handler > queue > global
* The most specific matching rule takes precedence * The most specific matching rule takes precedence
* Returns the longest wait time if multiple windows are hit
*/ */
async checkLimit( async checkLimit(
queueName: string, queueName: string,
@ -67,6 +81,7 @@ export class QueueRateLimiter {
retryAfter?: number; retryAfter?: number;
remainingPoints?: number; remainingPoints?: number;
appliedRule?: RateLimitRule; appliedRule?: RateLimitRule;
cost?: number;
}> { }> {
const applicableRule = this.getMostSpecificRule(queueName, handler, operation); const applicableRule = this.getMostSpecificRule(queueName, handler, operation);
@ -80,28 +95,74 @@ export class QueueRateLimiter {
applicableRule.handler, applicableRule.handler,
applicableRule.operation applicableRule.operation
); );
const limiter = this.limiters.get(key); const limiterEntry = this.limiters.get(key);
if (!limiter) { if (!limiterEntry || limiterEntry.windows.length === 0) {
this.logger.warn('Rate limiter not found for rule', { key, rule: applicableRule }); this.logger.warn('Rate limiter not found for rule', { key, rule: applicableRule });
return { allowed: true }; return { allowed: true };
} }
try { const consumerKey = this.getConsumerKey(queueName, handler, operation);
const result = await this.consumePoint( const cost = limiterEntry.cost;
limiter,
this.getConsumerKey(queueName, handler, operation) // Check all windows and collect results
const windowResults = await Promise.all(
limiterEntry.windows.map(async ({ limiter, window }) => {
try {
// Try to consume points for this window
const result = await limiter.consume(consumerKey, cost);
return {
allowed: true,
remainingPoints: result.remainingPoints,
retryAfter: 0,
};
} catch (rejRes) {
if (rejRes instanceof RateLimiterRes) {
return {
allowed: false,
remainingPoints: rejRes.remainingPoints,
retryAfter: rejRes.msBeforeNext,
};
}
throw rejRes;
}
})
);
// Find if any window rejected the request
const rejectedWindow = windowResults.find(r => !r.allowed);
if (rejectedWindow) {
// Find the longest wait time among all rejected windows
const maxRetryAfter = Math.max(
...windowResults.filter(r => !r.allowed).map(r => r.retryAfter || 0)
); );
this.logger.warn('Rate limit exceeded', {
handler,
operation,
cost,
retryAfter: maxRetryAfter,
});
return { return {
...result, allowed: false,
retryAfter: maxRetryAfter,
remainingPoints: rejectedWindow.remainingPoints,
appliedRule: applicableRule, appliedRule: applicableRule,
cost,
}; };
} catch (error) {
this.logger.error('Rate limit check failed', { queueName, handler, operation, error });
// On error, allow the request to proceed
return { allowed: true };
} }
// All windows allowed - return the minimum remaining points
const minRemainingPoints = Math.min(...windowResults.map(r => r.remainingPoints || 0));
return {
allowed: true,
remainingPoints: minRemainingPoints,
appliedRule: applicableRule,
cost,
};
} }
/** /**
@ -144,35 +205,6 @@ export class QueueRateLimiter {
return rule; return rule;
} }
/**
* Consume a point from the rate limiter
*/
private async consumePoint(
limiter: RateLimiterRedis,
key: string
): Promise<{ allowed: boolean; retryAfter?: number; remainingPoints?: number }> {
try {
const result = await limiter.consume(key);
return {
allowed: true,
remainingPoints: result.remainingPoints,
};
} catch (rejRes) {
if (rejRes instanceof RateLimiterRes) {
this.logger.warn('Rate limit exceeded', {
key,
retryAfter: rejRes.msBeforeNext,
});
return {
allowed: false,
retryAfter: rejRes.msBeforeNext,
remainingPoints: rejRes.remainingPoints,
};
}
throw rejRes;
}
}
/** /**
* Get rule key for storing rate limiter * Get rule key for storing rate limiter
@ -216,13 +248,13 @@ export class QueueRateLimiter {
handler: string; handler: string;
operation: string; operation: string;
appliedRule?: RateLimitRule; appliedRule?: RateLimitRule;
limit?: { cost?: number;
level: string; windows?: Array<{
points: number; points: number;
duration: number; duration: number;
remaining: number; remaining: number;
resetIn: number; resetIn: number;
}; }>;
}> { }> {
const applicableRule = this.getMostSpecificRule(queueName, handler, operation); const applicableRule = this.getMostSpecificRule(queueName, handler, operation);
@ -240,9 +272,9 @@ export class QueueRateLimiter {
applicableRule.handler, applicableRule.handler,
applicableRule.operation applicableRule.operation
); );
const limiter = this.limiters.get(key); const limiterEntry = this.limiters.get(key);
if (!limiter) { if (!limiterEntry || limiterEntry.windows.length === 0) {
return { return {
queueName, queueName,
handler, handler,
@ -253,22 +285,27 @@ export class QueueRateLimiter {
try { try {
const consumerKey = this.getConsumerKey(queueName, handler, operation); const consumerKey = this.getConsumerKey(queueName, handler, operation);
const result = await limiter.get(consumerKey);
const limit = { // Get status for all windows
level: applicableRule.level, const windows = await Promise.all(
points: limiter.points, limiterEntry.windows.map(async ({ limiter, window }) => {
duration: limiter.duration, const result = await limiter.get(consumerKey);
remaining: result?.remainingPoints ?? limiter.points, return {
resetIn: result?.msBeforeNext ?? 0, points: window.points,
}; duration: window.duration,
remaining: result?.remainingPoints ?? window.points,
resetIn: result?.msBeforeNext ?? 0,
};
})
);
return { return {
queueName, queueName,
handler, handler,
operation, operation,
appliedRule: applicableRule, appliedRule: applicableRule,
limit, cost: limiterEntry.cost,
windows,
}; };
} catch (error) { } catch (error) {
this.logger.error('Failed to get rate limit status', { this.logger.error('Failed to get rate limit status', {
@ -282,6 +319,7 @@ export class QueueRateLimiter {
handler, handler,
operation, operation,
appliedRule: applicableRule, appliedRule: applicableRule,
cost: limiterEntry.cost,
}; };
} }
} }
@ -297,9 +335,12 @@ export class QueueRateLimiter {
if (rule) { if (rule) {
const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation); const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation);
const limiter = this.limiters.get(key); const limiterEntry = this.limiters.get(key);
if (limiter) { if (limiterEntry) {
await limiter.delete(consumerKey); // Reset all windows for this consumer
await Promise.all(
limiterEntry.windows.map(({ limiter }) => limiter.delete(consumerKey))
);
} }
} }
} else { } else {

View file

@ -61,8 +61,6 @@ export interface QueueOptions {
concurrency?: number; concurrency?: number;
enableMetrics?: boolean; enableMetrics?: boolean;
enableDLQ?: boolean; enableDLQ?: boolean;
enableRateLimit?: boolean;
rateLimitRules?: RateLimitRule[]; // Queue-specific rate limit rules
handlerRegistry?: any; // HandlerRegistry from @stock-bot/handler-registry handlerRegistry?: any; // HandlerRegistry from @stock-bot/handler-registry
} }
@ -70,8 +68,6 @@ export interface QueueManagerConfig {
redis: RedisConfig; redis: RedisConfig;
defaultQueueOptions?: QueueOptions; defaultQueueOptions?: QueueOptions;
enableScheduledJobs?: boolean; enableScheduledJobs?: boolean;
globalRateLimit?: RateLimitConfig;
rateLimitRules?: RateLimitRule[]; // Global rate limit rules
serviceName?: string; // For service discovery and namespacing serviceName?: string; // For service discovery and namespacing
autoDiscoverHandlers?: boolean; // Auto-discover queue routes from handler registry autoDiscoverHandlers?: boolean; // Auto-discover queue routes from handler registry
} }
@ -108,12 +104,17 @@ export interface HandlerInitializer {
} }
// Rate limiting types // Rate limiting types
export interface RateLimitConfig { export interface RateLimitWindow {
points: number; points: number;
duration: number; duration: number;
blockDuration?: number; blockDuration?: number;
} }
export interface RateLimitConfig {
limits?: RateLimitWindow[];
cost?: number;
}
export interface RateLimitRule { export interface RateLimitRule {
level: 'global' | 'queue' | 'handler' | 'operation'; level: 'global' | 'queue' | 'handler' | 'operation';
queueName?: string; // For queue-level limits queueName?: string; // For queue-level limits