testing ratelimit
This commit is contained in:
parent
95b1381480
commit
a616c92656
14 changed files with 411 additions and 3 deletions
|
|
@ -182,7 +182,7 @@
|
|||
},
|
||||
"services": {
|
||||
"dataIngestion": {
|
||||
"port": 2001,
|
||||
"port": 2009,
|
||||
"workers": 5,
|
||||
"queues": {
|
||||
"ceo": { "concurrency": 2 },
|
||||
|
|
|
|||
|
|
@ -0,0 +1,62 @@
|
|||
import type { EodHandler } from '../eod.handler';
|
||||
import { EOD_CONFIG } from '../shared/config';
|
||||
|
||||
/**
|
||||
* Simulates fetching daily price data
|
||||
* This is a high-volume operation that should be rate limited
|
||||
*/
|
||||
export async function fetchDailyPrices(
|
||||
this: EodHandler
|
||||
): Promise<unknown> {
|
||||
const { logger, mongodb } = this;
|
||||
|
||||
// Generate fake data for testing
|
||||
const symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA'];
|
||||
const date = new Date().toISOString().split('T')[0];
|
||||
|
||||
logger.info('Fetching daily prices', {
|
||||
symbols: symbols.length,
|
||||
date,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
try {
|
||||
// Simulate API call delay
|
||||
await new Promise(resolve => setTimeout(resolve, 100 + Math.random() * 200));
|
||||
|
||||
// Simulate processing each symbol
|
||||
const results = [];
|
||||
for (const symbol of symbols) {
|
||||
const priceData = {
|
||||
symbol,
|
||||
date,
|
||||
open: 100 + Math.random() * 50,
|
||||
high: 120 + Math.random() * 50,
|
||||
low: 90 + Math.random() * 50,
|
||||
close: 110 + Math.random() * 50,
|
||||
volume: Math.floor(1000000 + Math.random() * 5000000),
|
||||
timestamp: new Date().toISOString()
|
||||
};
|
||||
|
||||
results.push(priceData);
|
||||
|
||||
logger.debug('Processed price data', { symbol, date });
|
||||
}
|
||||
|
||||
// Simulate saving to database
|
||||
if (mongodb) {
|
||||
await mongodb.batchUpsert('testPrices',results, ['symbol', 'date']);
|
||||
logger.info('Saved price data to MongoDB', { count: results.length });
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
processed: results.length,
|
||||
date,
|
||||
message: `Fetched daily prices for ${symbols.length} symbols`
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Failed to fetch daily prices', { error, symbols });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
import type { EodHandler } from '../eod.handler';
|
||||
|
||||
/**
|
||||
* Simulates fetching fundamental data
|
||||
* This is a medium-volume operation with moderate rate limits
|
||||
*/
|
||||
export async function fetchFundamentals(
|
||||
this: EodHandler
|
||||
): Promise<unknown> {
|
||||
const { logger, mongodb } = this;
|
||||
|
||||
// Generate fake data for testing
|
||||
const symbols = ['AAPL', 'GOOGL', 'MSFT'];
|
||||
const metrics = ['pe_ratio', 'market_cap', 'revenue', 'earnings'];
|
||||
|
||||
logger.info('Fetching fundamentals', {
|
||||
symbols: symbols.length,
|
||||
metrics,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
try {
|
||||
// Simulate API call delay (longer than prices)
|
||||
await new Promise(resolve => setTimeout(resolve, 200 + Math.random() * 300));
|
||||
|
||||
// Simulate processing each symbol
|
||||
const results = [];
|
||||
for (const symbol of symbols) {
|
||||
const fundamentalData = {
|
||||
symbol,
|
||||
pe_ratio: 15 + Math.random() * 20,
|
||||
market_cap: Math.floor(1000000000 + Math.random() * 500000000000),
|
||||
revenue: Math.floor(10000000 + Math.random() * 100000000000),
|
||||
earnings: Math.floor(1000000 + Math.random() * 10000000000),
|
||||
last_updated: new Date().toISOString()
|
||||
};
|
||||
|
||||
results.push(fundamentalData);
|
||||
|
||||
logger.debug('Processed fundamental data', { symbol });
|
||||
}
|
||||
|
||||
// Simulate saving to database
|
||||
if (mongodb) {
|
||||
const result = await mongodb.batchUpsert('testFundamentals', results, ['symbol']);
|
||||
logger.info('Saved fundamental data to MongoDB', {
|
||||
count: results.length
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
processed: results.length,
|
||||
message: `Fetched fundamentals for ${symbols.length} symbols`
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Failed to fetch fundamentals', { error, symbols });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,65 @@
|
|||
import type { EodHandler } from '../eod.handler';
|
||||
|
||||
/**
|
||||
* Simulates fetching news data
|
||||
* This is a low-volume operation with strict rate limits
|
||||
*/
|
||||
export async function fetchNews(
|
||||
this: EodHandler
|
||||
): Promise<unknown> {
|
||||
const { logger, mongodb } = this;
|
||||
|
||||
// Generate fake data for testing
|
||||
const symbols = ['AAPL', 'TSLA'];
|
||||
const keywords = ['earnings', 'market'];
|
||||
const limit = 10;
|
||||
|
||||
logger.info('Fetching news', {
|
||||
symbols: symbols.length,
|
||||
keywords: keywords.length,
|
||||
limit,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
try {
|
||||
// Simulate API call delay (longest delay for news)
|
||||
await new Promise(resolve => setTimeout(resolve, 300 + Math.random() * 500));
|
||||
|
||||
// Simulate fetching news articles
|
||||
const articles = [];
|
||||
const articleCount = Math.min(limit, 5 + Math.floor(Math.random() * 10));
|
||||
|
||||
for (let i = 0; i < articleCount; i++) {
|
||||
const article = {
|
||||
id: `news_${Date.now()}_${i}`,
|
||||
title: `Breaking: Market Update ${i + 1}`,
|
||||
summary: `Important market news regarding ${symbols.join(', ') || 'general market'}`,
|
||||
symbols: symbols,
|
||||
keywords: keywords,
|
||||
published_at: new Date(Date.now() - Math.random() * 86400000).toISOString(),
|
||||
source: ['Reuters', 'Bloomberg', 'CNBC', 'WSJ'][Math.floor(Math.random() * 4)],
|
||||
sentiment: ['positive', 'negative', 'neutral'][Math.floor(Math.random() * 3)],
|
||||
fetched_at: new Date().toISOString()
|
||||
};
|
||||
|
||||
articles.push(article);
|
||||
}
|
||||
|
||||
logger.debug('Fetched news articles', { count: articles.length });
|
||||
|
||||
// Simulate saving to database
|
||||
if (mongodb && articles.length > 0) {
|
||||
await mongodb.batchUpsert('testNews', articles, ['id']);
|
||||
logger.info('Saved news articles to MongoDB', { count: articles.length });
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
articles: articles.length,
|
||||
message: `Fetched ${articles.length} news articles`
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Failed to fetch news', { error, symbols, keywords });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
export * from './fetch-daily-prices.action';
|
||||
export * from './fetch-fundamentals.action';
|
||||
export * from './fetch-news.action';
|
||||
139
apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts
Normal file
139
apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
import {
|
||||
BaseHandler,
|
||||
Handler,
|
||||
Operation,
|
||||
RateLimit,
|
||||
ScheduledOperation
|
||||
} from '@stock-bot/handlers';
|
||||
import type { DataIngestionServices } from '../../types';
|
||||
import {
|
||||
fetchDailyPrices,
|
||||
fetchFundamentals,
|
||||
fetchNews,
|
||||
} from './actions';
|
||||
|
||||
/**
|
||||
* EOD (End of Day) Handler for testing rate limits
|
||||
* This handler demonstrates different rate limit configurations
|
||||
*
|
||||
* Handler-level rate limit: 100 requests per minute for all operations
|
||||
* Individual operations can override this with their own limits
|
||||
*/
|
||||
@Handler('eod')
|
||||
// @Disabled()
|
||||
@RateLimit({ points: 1, duration: 10, blockDuration: 10 })
|
||||
export class EodHandler extends BaseHandler<DataIngestionServices> {
|
||||
constructor(services: any) {
|
||||
super(services);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch daily price data - High volume operation
|
||||
* Rate limit: 50 requests per minute (overrides handler-level limit)
|
||||
*/
|
||||
@Operation('fetch-daily-prices')
|
||||
@RateLimit({ points: 3, duration: 10, blockDuration: 5 })
|
||||
fetchDailyPrices = fetchDailyPrices;
|
||||
|
||||
/**
|
||||
* Fetch fundamental data - Medium volume operation
|
||||
* Rate limit: 20 requests per minute
|
||||
*/
|
||||
@Operation('fetch-fundamentals')
|
||||
@RateLimit({ points: 2, duration: 10, blockDuration: 10 })
|
||||
fetchFundamentals = fetchFundamentals;
|
||||
|
||||
/**
|
||||
* Fetch news data - Low volume operation
|
||||
* Rate limit: 10 requests per minute (most restrictive)
|
||||
*/
|
||||
@Operation('fetch-news')
|
||||
fetchNews = fetchNews;
|
||||
|
||||
/**
|
||||
* Test burst operations - For testing rate limit behavior
|
||||
* This doesn't have its own rate limit, so it uses the handler-level limit (100/min)
|
||||
*/
|
||||
@Operation('test-burst')
|
||||
async testBurstOperations(input: { operationsToTest: string[], burstSize: number }): Promise<unknown> {
|
||||
this.logger.info('Testing burst operations', input);
|
||||
|
||||
const results = {
|
||||
attempted: 0,
|
||||
scheduled: 0,
|
||||
failed: 0
|
||||
};
|
||||
|
||||
try {
|
||||
const promises = [];
|
||||
for (let i = 0; i < input.burstSize; i++) {
|
||||
const operation = input.operationsToTest[i % input.operationsToTest.length] || 'fetch-news';
|
||||
results.attempted++;
|
||||
|
||||
const promise = this.scheduleOperation(operation, {}).then(() => {
|
||||
results.scheduled++;
|
||||
}).catch(() => {
|
||||
results.failed++;
|
||||
});
|
||||
|
||||
promises.push(promise);
|
||||
}
|
||||
|
||||
await Promise.allSettled(promises);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
results,
|
||||
message: `Scheduled ${results.scheduled}/${results.attempted} operations`
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.error('Burst test failed', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Scheduled job to fetch daily prices
|
||||
* Runs every day at 6 PM (after market close)
|
||||
*/
|
||||
@ScheduledOperation('eod-daily-prices', '0 18 * * *', {
|
||||
priority: 5,
|
||||
description: 'Fetch daily price data after market close',
|
||||
immediately: false,
|
||||
})
|
||||
async scheduledFetchDailyPrices(): Promise<unknown> {
|
||||
this.logger.info('Starting scheduled daily price fetch');
|
||||
return this.fetchDailyPrices();
|
||||
}
|
||||
|
||||
/**
|
||||
* Scheduled job to fetch fundamentals
|
||||
* Runs weekly on Sunday
|
||||
*/
|
||||
@ScheduledOperation('eod-fundamentals', '0 0 * * 0', {
|
||||
priority: 5,
|
||||
description: 'Weekly fundamental data update',
|
||||
immediately: false,
|
||||
})
|
||||
async scheduledFetchFundamentals(): Promise<unknown> {
|
||||
this.logger.info('Starting scheduled fundamentals fetch');
|
||||
return this.fetchFundamentals();
|
||||
}
|
||||
|
||||
/**
|
||||
* Scheduled job to test rate limits
|
||||
* Runs every 5 minutes for testing
|
||||
*/
|
||||
@ScheduledOperation('eod-rate-limit-test', '*/5 * * * *', {
|
||||
priority: 5,
|
||||
description: 'Test rate limit behavior',
|
||||
immediately: true,
|
||||
})
|
||||
async scheduledRateLimitTest(): Promise<unknown> {
|
||||
this.logger.info('Starting rate limit test');
|
||||
return this.testBurstOperations({
|
||||
operationsToTest: ['fetch-daily-prices', 'fetch-fundamentals', 'fetch-news'],
|
||||
burstSize: 200
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
export const EOD_CONFIG = {
|
||||
// API configuration
|
||||
API_BASE_URL: 'https://eodhd.com/api/',
|
||||
};
|
||||
|
|
@ -0,0 +1 @@
|
|||
export * from './config';
|
||||
|
|
@ -13,6 +13,7 @@ import { QMHandler } from './qm/qm.handler';
|
|||
import { WebShareHandler } from './webshare/webshare.handler';
|
||||
import { TradingViewHandler } from './tradingview/tradingview.handler';
|
||||
import { TeHandler } from './te/te.handler';
|
||||
import { EodHandler } from './eod/eod.handler';
|
||||
|
||||
// Add more handler imports as needed
|
||||
|
||||
|
|
@ -28,7 +29,7 @@ export async function initializeAllHandlers(serviceContainer: IServiceContainer)
|
|||
// The HandlerScanner in the DI container will handle the actual registration
|
||||
// We just need to ensure handlers are imported so their decorators run
|
||||
|
||||
const handlers = [CeoHandler, IbHandler, QMHandler, WebShareHandler, TradingViewHandler, TeHandler];
|
||||
const handlers = [CeoHandler, IbHandler, QMHandler, WebShareHandler, TradingViewHandler, TeHandler, EodHandler];
|
||||
|
||||
logger.info('Handler imports loaded', {
|
||||
count: handlers.length,
|
||||
|
|
|
|||
|
|
@ -101,6 +101,8 @@ export class HandlerScanner {
|
|||
const schedules = HandlerClass.__schedules || [];
|
||||
const isDisabled = HandlerClass.__disabled || false;
|
||||
const disabledOperations = HandlerClass.__disabledOperations || [];
|
||||
const handlerRateLimit = HandlerClass.__handlerRateLimit || null;
|
||||
const operationRateLimits = HandlerClass.__rateLimits || {};
|
||||
|
||||
if (isDisabled) {
|
||||
this.logger.debug('Skipping disabled handler', { handlerName });
|
||||
|
|
@ -131,13 +133,14 @@ export class HandlerScanner {
|
|||
return !isDisabled;
|
||||
});
|
||||
|
||||
// Build metadata
|
||||
// Build metadata with rate limit information
|
||||
const metadata: HandlerMetadata = {
|
||||
name: handlerName,
|
||||
service: this.options.serviceName,
|
||||
operations: enabledOperations.map((op: any) => ({
|
||||
name: op.name,
|
||||
method: op.method,
|
||||
rateLimit: operationRateLimits[op.method] || null,
|
||||
})),
|
||||
schedules: enabledSchedules.map((schedule: any) => ({
|
||||
operation: schedule.operation,
|
||||
|
|
@ -148,6 +151,7 @@ export class HandlerScanner {
|
|||
payload: schedule.payload,
|
||||
batch: schedule.batch,
|
||||
})),
|
||||
rateLimit: handlerRateLimit,
|
||||
};
|
||||
|
||||
// Build configuration with operation handlers
|
||||
|
|
|
|||
|
|
@ -5,6 +5,15 @@
|
|||
|
||||
import type { JobHandler, ScheduledJob } from '@stock-bot/types';
|
||||
|
||||
/**
|
||||
* Rate limit configuration
|
||||
*/
|
||||
export interface RateLimitConfig {
|
||||
points: number;
|
||||
duration: number;
|
||||
blockDuration?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Metadata for a single operation within a handler
|
||||
*/
|
||||
|
|
@ -12,6 +21,7 @@ export interface OperationMetadata {
|
|||
name: string;
|
||||
method: string;
|
||||
description?: string;
|
||||
rateLimit?: RateLimitConfig | null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -35,6 +45,7 @@ export interface HandlerMetadata {
|
|||
schedules?: ScheduleMetadata[];
|
||||
version?: string;
|
||||
description?: string;
|
||||
rateLimit?: RateLimitConfig | null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
56
libs/core/handlers/src/decorators/rate-limit.ts
Normal file
56
libs/core/handlers/src/decorators/rate-limit.ts
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
/**
|
||||
* Rate limit configuration for handlers and operations
|
||||
*/
|
||||
export interface RateLimitConfig {
|
||||
points: number; // Number of allowed requests
|
||||
duration: number; // Time window in seconds
|
||||
blockDuration?: number; // How long to block after limit is hit (seconds)
|
||||
}
|
||||
|
||||
/**
|
||||
* RateLimit decorator - configures rate limiting for handlers or operations
|
||||
*
|
||||
* Can be applied to:
|
||||
* - Classes: Sets default rate limit for all operations in the handler
|
||||
* - Methods: Sets specific rate limit for individual operations (overrides handler-level)
|
||||
*
|
||||
* @param config Rate limit configuration
|
||||
*
|
||||
* @example
|
||||
* // Handler-level rate limit
|
||||
* @Handler('myHandler')
|
||||
* @RateLimit({ points: 100, duration: 60 })
|
||||
* class MyHandler extends BaseHandler {
|
||||
* // All operations inherit the 100/minute limit
|
||||
* }
|
||||
*
|
||||
* @example
|
||||
* // Operation-specific rate limit
|
||||
* @Operation('fetch-data')
|
||||
* @RateLimit({ points: 10, duration: 60, blockDuration: 30 })
|
||||
* async fetchData() {
|
||||
* // This operation is limited to 10/minute
|
||||
* }
|
||||
*/
|
||||
export function RateLimit(config: RateLimitConfig) {
|
||||
return function (target: any, propertyKey?: string, descriptor?: PropertyDescriptor): any {
|
||||
if (propertyKey) {
|
||||
// Method decorator - operation-specific rate limit
|
||||
const constructor = target.constructor;
|
||||
|
||||
// Initialize rate limits array if not exists
|
||||
if (!constructor.__rateLimits) {
|
||||
constructor.__rateLimits = {};
|
||||
}
|
||||
|
||||
// Store rate limit config for this operation
|
||||
constructor.__rateLimits[propertyKey] = config;
|
||||
|
||||
return descriptor;
|
||||
} else {
|
||||
// Class decorator - handler-level rate limit
|
||||
(target as any).__handlerRateLimit = config;
|
||||
return target;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
@ -7,6 +7,8 @@ export {
|
|||
ScheduledOperation,
|
||||
Disabled,
|
||||
} from './decorators/decorators';
|
||||
export { RateLimit } from './decorators/rate-limit';
|
||||
export type { RateLimitConfig } from './decorators/rate-limit';
|
||||
export { createJobHandler } from './utils/create-job-handler';
|
||||
|
||||
// Re-export commonly used types from @stock-bot/types for convenience
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue