diff --git a/apps/stock/config/config/default.json b/apps/stock/config/config/default.json index e8e112d..aa414e9 100644 --- a/apps/stock/config/config/default.json +++ b/apps/stock/config/config/default.json @@ -182,7 +182,7 @@ }, "services": { "dataIngestion": { - "port": 2001, + "port": 2009, "workers": 5, "queues": { "ceo": { "concurrency": 2 }, diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-daily-prices.action.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-daily-prices.action.ts new file mode 100644 index 0000000..38a651a --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-daily-prices.action.ts @@ -0,0 +1,62 @@ +import type { EodHandler } from '../eod.handler'; +import { EOD_CONFIG } from '../shared/config'; + +/** + * Simulates fetching daily price data + * This is a high-volume operation that should be rate limited + */ +export async function fetchDailyPrices( + this: EodHandler +): Promise { + const { logger, mongodb } = this; + + // Generate fake data for testing + const symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']; + const date = new Date().toISOString().split('T')[0]; + + logger.info('Fetching daily prices', { + symbols: symbols.length, + date, + timestamp: new Date().toISOString() + }); + + try { + // Simulate API call delay + await new Promise(resolve => setTimeout(resolve, 100 + Math.random() * 200)); + + // Simulate processing each symbol + const results = []; + for (const symbol of symbols) { + const priceData = { + symbol, + date, + open: 100 + Math.random() * 50, + high: 120 + Math.random() * 50, + low: 90 + Math.random() * 50, + close: 110 + Math.random() * 50, + volume: Math.floor(1000000 + Math.random() * 5000000), + timestamp: new Date().toISOString() + }; + + results.push(priceData); + + logger.debug('Processed price data', { symbol, date }); + } + + // Simulate saving to database + if (mongodb) { + await mongodb.batchUpsert('testPrices',results, ['symbol', 'date']); + logger.info('Saved price data to MongoDB', { count: results.length }); + } + + return { + success: true, + processed: results.length, + date, + message: `Fetched daily prices for ${symbols.length} symbols` + }; + } catch (error) { + logger.error('Failed to fetch daily prices', { error, symbols }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-fundamentals.action.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-fundamentals.action.ts new file mode 100644 index 0000000..4b5a408 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-fundamentals.action.ts @@ -0,0 +1,60 @@ +import type { EodHandler } from '../eod.handler'; + +/** + * Simulates fetching fundamental data + * This is a medium-volume operation with moderate rate limits + */ +export async function fetchFundamentals( + this: EodHandler +): Promise { + const { logger, mongodb } = this; + + // Generate fake data for testing + const symbols = ['AAPL', 'GOOGL', 'MSFT']; + const metrics = ['pe_ratio', 'market_cap', 'revenue', 'earnings']; + + logger.info('Fetching fundamentals', { + symbols: symbols.length, + metrics, + timestamp: new Date().toISOString() + }); + + try { + // Simulate API call delay (longer than prices) + await new Promise(resolve => setTimeout(resolve, 200 + Math.random() * 300)); + + // Simulate processing each symbol + const results = []; + for (const symbol of symbols) { + const fundamentalData = { + symbol, + pe_ratio: 15 + Math.random() * 20, + market_cap: Math.floor(1000000000 + Math.random() * 500000000000), + revenue: Math.floor(10000000 + Math.random() * 100000000000), + earnings: Math.floor(1000000 + Math.random() * 10000000000), + last_updated: new Date().toISOString() + }; + + results.push(fundamentalData); + + logger.debug('Processed fundamental data', { symbol }); + } + + // Simulate saving to database + if (mongodb) { + const result = await mongodb.batchUpsert('testFundamentals', results, ['symbol']); + logger.info('Saved fundamental data to MongoDB', { + count: results.length + }); + } + + return { + success: true, + processed: results.length, + message: `Fetched fundamentals for ${symbols.length} symbols` + }; + } catch (error) { + logger.error('Failed to fetch fundamentals', { error, symbols }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-news.action.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-news.action.ts new file mode 100644 index 0000000..07ade84 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/fetch-news.action.ts @@ -0,0 +1,65 @@ +import type { EodHandler } from '../eod.handler'; + +/** + * Simulates fetching news data + * This is a low-volume operation with strict rate limits + */ +export async function fetchNews( + this: EodHandler +): Promise { + const { logger, mongodb } = this; + + // Generate fake data for testing + const symbols = ['AAPL', 'TSLA']; + const keywords = ['earnings', 'market']; + const limit = 10; + + logger.info('Fetching news', { + symbols: symbols.length, + keywords: keywords.length, + limit, + timestamp: new Date().toISOString() + }); + + try { + // Simulate API call delay (longest delay for news) + await new Promise(resolve => setTimeout(resolve, 300 + Math.random() * 500)); + + // Simulate fetching news articles + const articles = []; + const articleCount = Math.min(limit, 5 + Math.floor(Math.random() * 10)); + + for (let i = 0; i < articleCount; i++) { + const article = { + id: `news_${Date.now()}_${i}`, + title: `Breaking: Market Update ${i + 1}`, + summary: `Important market news regarding ${symbols.join(', ') || 'general market'}`, + symbols: symbols, + keywords: keywords, + published_at: new Date(Date.now() - Math.random() * 86400000).toISOString(), + source: ['Reuters', 'Bloomberg', 'CNBC', 'WSJ'][Math.floor(Math.random() * 4)], + sentiment: ['positive', 'negative', 'neutral'][Math.floor(Math.random() * 3)], + fetched_at: new Date().toISOString() + }; + + articles.push(article); + } + + logger.debug('Fetched news articles', { count: articles.length }); + + // Simulate saving to database + if (mongodb && articles.length > 0) { + await mongodb.batchUpsert('testNews', articles, ['id']); + logger.info('Saved news articles to MongoDB', { count: articles.length }); + } + + return { + success: true, + articles: articles.length, + message: `Fetched ${articles.length} news articles` + }; + } catch (error) { + logger.error('Failed to fetch news', { error, symbols, keywords }); + throw error; + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts b/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts new file mode 100644 index 0000000..b1501dc --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/actions/index.ts @@ -0,0 +1,3 @@ +export * from './fetch-daily-prices.action'; +export * from './fetch-fundamentals.action'; +export * from './fetch-news.action'; diff --git a/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts new file mode 100644 index 0000000..e9c9569 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/eod.handler.ts @@ -0,0 +1,139 @@ +import { + BaseHandler, + Handler, + Operation, + RateLimit, + ScheduledOperation +} from '@stock-bot/handlers'; +import type { DataIngestionServices } from '../../types'; +import { + fetchDailyPrices, + fetchFundamentals, + fetchNews, +} from './actions'; + +/** + * EOD (End of Day) Handler for testing rate limits + * This handler demonstrates different rate limit configurations + * + * Handler-level rate limit: 100 requests per minute for all operations + * Individual operations can override this with their own limits + */ +@Handler('eod') +// @Disabled() +@RateLimit({ points: 1, duration: 10, blockDuration: 10 }) +export class EodHandler extends BaseHandler { + constructor(services: any) { + super(services); + } + + /** + * Fetch daily price data - High volume operation + * Rate limit: 50 requests per minute (overrides handler-level limit) + */ + @Operation('fetch-daily-prices') + @RateLimit({ points: 3, duration: 10, blockDuration: 5 }) + fetchDailyPrices = fetchDailyPrices; + + /** + * Fetch fundamental data - Medium volume operation + * Rate limit: 20 requests per minute + */ + @Operation('fetch-fundamentals') + @RateLimit({ points: 2, duration: 10, blockDuration: 10 }) + fetchFundamentals = fetchFundamentals; + + /** + * Fetch news data - Low volume operation + * Rate limit: 10 requests per minute (most restrictive) + */ + @Operation('fetch-news') + fetchNews = fetchNews; + + /** + * Test burst operations - For testing rate limit behavior + * This doesn't have its own rate limit, so it uses the handler-level limit (100/min) + */ + @Operation('test-burst') + async testBurstOperations(input: { operationsToTest: string[], burstSize: number }): Promise { + this.logger.info('Testing burst operations', input); + + const results = { + attempted: 0, + scheduled: 0, + failed: 0 + }; + + try { + const promises = []; + for (let i = 0; i < input.burstSize; i++) { + const operation = input.operationsToTest[i % input.operationsToTest.length] || 'fetch-news'; + results.attempted++; + + const promise = this.scheduleOperation(operation, {}).then(() => { + results.scheduled++; + }).catch(() => { + results.failed++; + }); + + promises.push(promise); + } + + await Promise.allSettled(promises); + + return { + success: true, + results, + message: `Scheduled ${results.scheduled}/${results.attempted} operations` + }; + } catch (error) { + this.logger.error('Burst test failed', { error }); + throw error; + } + } + + /** + * Scheduled job to fetch daily prices + * Runs every day at 6 PM (after market close) + */ + @ScheduledOperation('eod-daily-prices', '0 18 * * *', { + priority: 5, + description: 'Fetch daily price data after market close', + immediately: false, + }) + async scheduledFetchDailyPrices(): Promise { + this.logger.info('Starting scheduled daily price fetch'); + return this.fetchDailyPrices(); + } + + /** + * Scheduled job to fetch fundamentals + * Runs weekly on Sunday + */ + @ScheduledOperation('eod-fundamentals', '0 0 * * 0', { + priority: 5, + description: 'Weekly fundamental data update', + immediately: false, + }) + async scheduledFetchFundamentals(): Promise { + this.logger.info('Starting scheduled fundamentals fetch'); + return this.fetchFundamentals(); + } + + /** + * Scheduled job to test rate limits + * Runs every 5 minutes for testing + */ + @ScheduledOperation('eod-rate-limit-test', '*/5 * * * *', { + priority: 5, + description: 'Test rate limit behavior', + immediately: true, + }) + async scheduledRateLimitTest(): Promise { + this.logger.info('Starting rate limit test'); + return this.testBurstOperations({ + operationsToTest: ['fetch-daily-prices', 'fetch-fundamentals', 'fetch-news'], + burstSize: 200 + }); + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/shared/config.ts b/apps/stock/data-ingestion/src/handlers/eod/shared/config.ts new file mode 100644 index 0000000..4355f15 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/shared/config.ts @@ -0,0 +1,4 @@ +export const EOD_CONFIG = { + // API configuration + API_BASE_URL: 'https://eodhd.com/api/', +}; \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/eod/shared/index.ts b/apps/stock/data-ingestion/src/handlers/eod/shared/index.ts new file mode 100644 index 0000000..4226c05 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/eod/shared/index.ts @@ -0,0 +1 @@ +export * from './config'; diff --git a/apps/stock/data-ingestion/src/handlers/eod/shared/types.ts b/apps/stock/data-ingestion/src/handlers/eod/shared/types.ts new file mode 100644 index 0000000..e69de29 diff --git a/apps/stock/data-ingestion/src/handlers/index.ts b/apps/stock/data-ingestion/src/handlers/index.ts index c1d4189..a64d177 100644 --- a/apps/stock/data-ingestion/src/handlers/index.ts +++ b/apps/stock/data-ingestion/src/handlers/index.ts @@ -13,6 +13,7 @@ import { QMHandler } from './qm/qm.handler'; import { WebShareHandler } from './webshare/webshare.handler'; import { TradingViewHandler } from './tradingview/tradingview.handler'; import { TeHandler } from './te/te.handler'; +import { EodHandler } from './eod/eod.handler'; // Add more handler imports as needed @@ -28,7 +29,7 @@ export async function initializeAllHandlers(serviceContainer: IServiceContainer) // The HandlerScanner in the DI container will handle the actual registration // We just need to ensure handlers are imported so their decorators run - const handlers = [CeoHandler, IbHandler, QMHandler, WebShareHandler, TradingViewHandler, TeHandler]; + const handlers = [CeoHandler, IbHandler, QMHandler, WebShareHandler, TradingViewHandler, TeHandler, EodHandler]; logger.info('Handler imports loaded', { count: handlers.length, diff --git a/libs/core/di/src/scanner/handler-scanner.ts b/libs/core/di/src/scanner/handler-scanner.ts index a39c0b1..ce97f4c 100644 --- a/libs/core/di/src/scanner/handler-scanner.ts +++ b/libs/core/di/src/scanner/handler-scanner.ts @@ -101,6 +101,8 @@ export class HandlerScanner { const schedules = HandlerClass.__schedules || []; const isDisabled = HandlerClass.__disabled || false; const disabledOperations = HandlerClass.__disabledOperations || []; + const handlerRateLimit = HandlerClass.__handlerRateLimit || null; + const operationRateLimits = HandlerClass.__rateLimits || {}; if (isDisabled) { this.logger.debug('Skipping disabled handler', { handlerName }); @@ -131,13 +133,14 @@ export class HandlerScanner { return !isDisabled; }); - // Build metadata + // Build metadata with rate limit information const metadata: HandlerMetadata = { name: handlerName, service: this.options.serviceName, operations: enabledOperations.map((op: any) => ({ name: op.name, method: op.method, + rateLimit: operationRateLimits[op.method] || null, })), schedules: enabledSchedules.map((schedule: any) => ({ operation: schedule.operation, @@ -148,6 +151,7 @@ export class HandlerScanner { payload: schedule.payload, batch: schedule.batch, })), + rateLimit: handlerRateLimit, }; // Build configuration with operation handlers diff --git a/libs/core/handler-registry/src/types.ts b/libs/core/handler-registry/src/types.ts index 24544de..a1be999 100644 --- a/libs/core/handler-registry/src/types.ts +++ b/libs/core/handler-registry/src/types.ts @@ -5,6 +5,15 @@ import type { JobHandler, ScheduledJob } from '@stock-bot/types'; +/** + * Rate limit configuration + */ +export interface RateLimitConfig { + points: number; + duration: number; + blockDuration?: number; +} + /** * Metadata for a single operation within a handler */ @@ -12,6 +21,7 @@ export interface OperationMetadata { name: string; method: string; description?: string; + rateLimit?: RateLimitConfig | null; } /** @@ -35,6 +45,7 @@ export interface HandlerMetadata { schedules?: ScheduleMetadata[]; version?: string; description?: string; + rateLimit?: RateLimitConfig | null; } /** diff --git a/libs/core/handlers/src/decorators/rate-limit.ts b/libs/core/handlers/src/decorators/rate-limit.ts new file mode 100644 index 0000000..427f61e --- /dev/null +++ b/libs/core/handlers/src/decorators/rate-limit.ts @@ -0,0 +1,56 @@ +/** + * Rate limit configuration for handlers and operations + */ +export interface RateLimitConfig { + points: number; // Number of allowed requests + duration: number; // Time window in seconds + blockDuration?: number; // How long to block after limit is hit (seconds) +} + +/** + * RateLimit decorator - configures rate limiting for handlers or operations + * + * Can be applied to: + * - Classes: Sets default rate limit for all operations in the handler + * - Methods: Sets specific rate limit for individual operations (overrides handler-level) + * + * @param config Rate limit configuration + * + * @example + * // Handler-level rate limit + * @Handler('myHandler') + * @RateLimit({ points: 100, duration: 60 }) + * class MyHandler extends BaseHandler { + * // All operations inherit the 100/minute limit + * } + * + * @example + * // Operation-specific rate limit + * @Operation('fetch-data') + * @RateLimit({ points: 10, duration: 60, blockDuration: 30 }) + * async fetchData() { + * // This operation is limited to 10/minute + * } + */ +export function RateLimit(config: RateLimitConfig) { + return function (target: any, propertyKey?: string, descriptor?: PropertyDescriptor): any { + if (propertyKey) { + // Method decorator - operation-specific rate limit + const constructor = target.constructor; + + // Initialize rate limits array if not exists + if (!constructor.__rateLimits) { + constructor.__rateLimits = {}; + } + + // Store rate limit config for this operation + constructor.__rateLimits[propertyKey] = config; + + return descriptor; + } else { + // Class decorator - handler-level rate limit + (target as any).__handlerRateLimit = config; + return target; + } + }; +} \ No newline at end of file diff --git a/libs/core/handlers/src/index.ts b/libs/core/handlers/src/index.ts index 939cd03..5ab415e 100644 --- a/libs/core/handlers/src/index.ts +++ b/libs/core/handlers/src/index.ts @@ -7,6 +7,8 @@ export { ScheduledOperation, Disabled, } from './decorators/decorators'; +export { RateLimit } from './decorators/rate-limit'; +export type { RateLimitConfig } from './decorators/rate-limit'; export { createJobHandler } from './utils/create-job-handler'; // Re-export commonly used types from @stock-bot/types for convenience