diff --git a/DEVELOPMENT-ROADMAP.md b/DEVELOPMENT-ROADMAP.md new file mode 100644 index 0000000..56cc19b --- /dev/null +++ b/DEVELOPMENT-ROADMAP.md @@ -0,0 +1,377 @@ +# ๐Ÿ“‹ Stock Bot Development Roadmap + +*Last Updated: June 2025* + +## ๐ŸŽฏ Overview + +This document outlines the development plan for the Stock Bot platform, focusing on building a robust data pipeline from market data providers through processing layers to trading execution. The plan emphasizes establishing solid foundational layers before adding advanced features. + +## ๐Ÿ—๏ธ Architecture Philosophy + +``` +Raw Data โ†’ Clean Data โ†’ Insights โ†’ Strategies โ†’ Execution โ†’ Monitoring +``` + +Our approach prioritizes: +- **Data Quality First**: Clean, validated data is the foundation +- **Incremental Complexity**: Start simple, add sophistication gradually +- **Monitoring Everything**: Observability at each layer +- **Fault Tolerance**: Graceful handling of failures and data gaps + +--- + +## ๐Ÿ“Š Phase 1: Data Foundation Layer (Current Focus) + +### 1.1 Data Service & Providers โœ… **In Progress** + +**Current Status**: Basic structure in place, needs enhancement + +**Core Components**: +- `apps/data-service` - Central data orchestration service +- Provider implementations: + - `providers/yahoo.provider.ts` โœ… Basic implementation + - `providers/quotemedia.provider.ts` โœ… Basic implementation + - `providers/proxy.provider.ts` โœ… Proxy/fallback logic + +**Immediate Tasks**: + +1. **Enhance Provider Reliability** + ```typescript + // libs/data-providers (NEW LIBRARY NEEDED) + interface DataProvider { + getName(): string; + getQuote(symbol: string): Promise; + getHistorical(symbol: string, period: TimePeriod): Promise; + isHealthy(): Promise; + getRateLimit(): RateLimitInfo; + } + ``` + +2. **Add Rate Limiting & Circuit Breakers** + - Implement in `libs/http` client + - Add provider-specific rate limits + - Circuit breaker pattern for failed providers + +3. 
**Data Validation Layer** + ```typescript + // libs/data-validation (NEW LIBRARY NEEDED) + - Price reasonableness checks + - Volume validation + - Timestamp validation + - Missing data detection + ``` + +4. **Provider Registry Enhancement** + - Dynamic provider switching + - Health-based routing + - Cost optimization (free โ†’ paid fallback) + +### 1.2 Raw Data Storage + +**Storage Strategy**: +- **QuestDB**: Real-time market data (OHLCV, quotes) +- **MongoDB**: Provider responses, metadata, configurations +- **PostgreSQL**: Processed/clean data, trading records + +**Schema Design**: +```sql +-- QuestDB Time-Series Tables +raw_quotes (timestamp, symbol, provider, bid, ask, last, volume) +raw_ohlcv (timestamp, symbol, provider, open, high, low, close, volume) +provider_health (timestamp, provider, latency, success_rate, error_rate) + +-- MongoDB Collections +provider_responses: { provider, symbol, timestamp, raw_response, status } +data_quality_metrics: { symbol, date, completeness, accuracy, issues[] } +``` + +**Immediate Implementation**: +1. Enhance `libs/questdb-client` with streaming inserts +2. Add data retention policies +3. Implement data compression strategies + +--- + +## ๐Ÿงน Phase 2: Data Processing & Quality Layer + +### 2.1 Data Cleaning Service โšก **Next Priority** + +**New Service**: `apps/processing-service` + +**Core Responsibilities**: +1. **Data Normalization** + - Standardize timestamps (UTC) + - Normalize price formats + - Handle split/dividend adjustments + +2. **Quality Checks** + - Outlier detection (price spikes, volume anomalies) + - Gap filling strategies + - Cross-provider validation + +3. 
**Data Enrichment** + - Calculate derived metrics (returns, volatility) + - Add technical indicators + - Market session classification + +**Library Enhancements Needed**: + +```typescript +// libs/data-frame (ENHANCE EXISTING) +class MarketDataFrame { + // Add time-series specific operations + fillGaps(strategy: GapFillStrategy): MarketDataFrame; + detectOutliers(method: OutlierMethod): OutlierReport; + normalize(): MarketDataFrame; + calculateReturns(period: number): MarketDataFrame; +} + +// libs/data-quality (NEW LIBRARY) +interface QualityMetrics { + completeness: number; + accuracy: number; + timeliness: number; + consistency: number; + issues: QualityIssue[]; +} +``` + +### 2.2 Technical Indicators Library + +**Enhance**: `libs/strategy-engine` or create `libs/technical-indicators` + +**Initial Indicators**: +- Moving averages (SMA, EMA, VWAP) +- Momentum (RSI, MACD, Stochastic) +- Volatility (Bollinger Bands, ATR) +- Volume (OBV, Volume Profile) + +```typescript +// Implementation approach +interface TechnicalIndicator { + name: string; + calculate(data: OHLCV[]): T[]; + getSignal(current: T, previous: T[]): Signal; +} +``` + +--- + +## ๐Ÿง  Phase 3: Analytics & Strategy Layer + +### 3.1 Strategy Engine Enhancement + +**Current**: Basic structure exists in `libs/strategy-engine` + +**Enhancements Needed**: + +1. **Strategy Framework** + ```typescript + abstract class TradingStrategy { + abstract analyze(data: MarketData): StrategySignal[]; + abstract getRiskParams(): RiskParameters; + backtest(historicalData: MarketData[]): BacktestResults; + } + ``` + +2. **Signal Generation** + - Entry/exit signals + - Position sizing recommendations + - Risk-adjusted scores + +3. 
**Strategy Types to Implement**: + - Mean reversion + - Momentum/trend following + - Statistical arbitrage + - Volume-based strategies + +### 3.2 Backtesting Engine + +**New Service**: Enhanced `apps/strategy-service` + +**Features**: +- Historical simulation +- Performance metrics calculation +- Risk analysis +- Strategy comparison + +--- + +## โšก Phase 4: Execution Layer + +### 4.1 Portfolio Management + +**Enhance**: `apps/portfolio-service` + +**Core Features**: +- Position tracking +- Risk monitoring +- P&L calculation +- Margin management + +### 4.2 Order Management + +**New Service**: `apps/order-service` + +**Responsibilities**: +- Order validation +- Execution routing +- Fill reporting +- Trade reconciliation + +### 4.3 Risk Management + +**New Library**: `libs/risk-engine` + +**Risk Controls**: +- Position limits +- Drawdown limits +- Correlation limits +- Volatility scaling + +--- + +## ๐Ÿ“š Library Improvements Roadmap + +### Immediate (Phase 1-2) + +1. **`libs/http`** โœ… **Current Priority** + - [ ] Rate limiting middleware + - [ ] Circuit breaker pattern + - [ ] Request/response caching + - [ ] Retry strategies with exponential backoff + +2. **`libs/questdb-client`** + - [ ] Streaming insert optimization + - [ ] Batch insert operations + - [ ] Connection pooling + - [ ] Query result caching + +3. **`libs/logger`** โœ… **Recently Updated** + - [x] Migrated to `getLogger()` pattern + - [ ] Performance metrics logging + - [ ] Structured trading event logging + +4. **`libs/data-frame`** + - [ ] Time-series operations + - [ ] Financial calculations + - [ ] Memory optimization for large datasets + +### Medium Term (Phase 3) + +5. **`libs/cache`** + - [ ] Market data caching strategies + - [ ] Cache warming for frequently accessed symbols + - [ ] Distributed caching support + +6. **`libs/config`** + - [ ] Strategy-specific configurations + - [ ] Dynamic configuration updates + - [ ] Environment-specific overrides + +### Long Term (Phase 4+) + +7. 
**`libs/vector-engine`** + - [ ] Market similarity analysis + - [ ] Pattern recognition + - [ ] Correlation analysis + +--- + +## ๐ŸŽฏ Immediate Next Steps (Next 2 Weeks) + +### Week 1: Data Provider Hardening +1. **Enhance HTTP Client** (`libs/http`) + - Implement rate limiting + - Add circuit breaker pattern + - Add comprehensive error handling + +2. **Provider Reliability** (`apps/data-service`) + - Add health checks for all providers + - Implement fallback logic + - Add provider performance monitoring + +3. **Data Validation** + - Create `libs/data-validation` + - Implement basic price/volume validation + - Add data quality metrics + +### Week 2: Processing Foundation +1. **Start Processing Service** (`apps/processing-service`) + - Basic data cleaning pipeline + - Outlier detection + - Gap filling strategies + +2. **QuestDB Optimization** (`libs/questdb-client`) + - Implement streaming inserts + - Add batch operations + - Optimize for time-series data + +3. **Technical Indicators** + - Start `libs/technical-indicators` + - Implement basic indicators (SMA, EMA, RSI) + +--- + +## ๐Ÿ“Š Success Metrics + +### Phase 1 Completion Criteria +- [ ] 99.9% data provider uptime +- [ ] <500ms average data latency +- [ ] Zero data quality issues for major symbols +- [ ] All providers monitored and health-checked + +### Phase 2 Completion Criteria +- [ ] Automated data quality scoring +- [ ] Gap-free historical data for 100+ symbols +- [ ] Real-time technical indicator calculation +- [ ] Processing latency <100ms + +### Phase 3 Completion Criteria +- [ ] 5+ implemented trading strategies +- [ ] Comprehensive backtesting framework +- [ ] Performance analytics dashboard + +--- + +## ๐Ÿšจ Risk Mitigation + +### Data Risks +- **Provider Failures**: Multi-provider fallback strategy +- **Data Quality**: Automated validation and alerting +- **Rate Limits**: Smart request distribution + +### Technical Risks +- **Scalability**: Horizontal scaling design +- **Latency**: Optimize 
critical paths early +- **Data Loss**: Comprehensive backup strategies + +### Operational Risks +- **Monitoring**: Full observability stack (Grafana, Loki, Prometheus) +- **Alerting**: Critical issue notifications +- **Documentation**: Keep architecture docs current + +--- + +## ๐Ÿ’ก Innovation Opportunities + +### Machine Learning Integration +- Predictive models for data quality +- Anomaly detection in market data +- Strategy parameter optimization + +### Real-time Processing +- Stream processing with Kafka/Pulsar +- Event-driven architecture +- WebSocket data feeds + +### Advanced Analytics +- Market microstructure analysis +- Alternative data integration +- Cross-asset correlation analysis + +--- + +*This roadmap is a living document that will evolve as we learn and adapt. Focus remains on building solid foundations before adding complexity.* + +**Next Review**: End of June 2025 diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index c4129d9..d154e5a 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -19,112 +19,205 @@ export const proxyProvider: ProviderConfig = { operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => { const { proxyService } = await import('./proxy.tasks'); + const { queueManager } = await import('../services/queue.service'); + + await queueManager.drainQueue(); + const proxies = await proxyService.fetchProxiesFromSources(); const proxiesCount = proxies.length; - // Get the actual proxies to create individual jobs - if (proxiesCount > 0) { - try { - const { queueManager } = await import('../services/queue.service'); - if (proxies && proxies.length > 0) { - // Calculate delay distribution over 24 hours - const totalDelayMs = 24 * 60 * 60 * 1000; // 24 hours in milliseconds - const delayPerProxy = Math.floor(totalDelayMs / proxies.length); + + if (proxiesCount === 0) { + logger.info('No 
proxies fetched, skipping job creation'); + return { proxiesFetched: 0, batchJobsCreated: 0 }; + } + + try { + // Optimized batch size for 800k proxies + const batchSize = 200; // Process 200 proxies per batch job + const totalBatches = Math.ceil(proxies.length / batchSize); + const totalDelayMs = 24 * 60 * 60 * 1000; // 24 hours + const delayPerBatch = Math.floor(totalDelayMs / totalBatches); + + logger.info('Creating proxy validation batch jobs', { + totalProxies: proxies.length, + batchSize, + totalBatches, + delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`, + estimatedCompletion: '24 hours' + }); + + // Process batches in chunks to avoid memory issues + const batchCreationChunkSize = 50; // Create 50 batch jobs at a time + let batchJobsCreated = 0; + + for (let chunkStart = 0; chunkStart < totalBatches; chunkStart += batchCreationChunkSize) { + const chunkEnd = Math.min(chunkStart + batchCreationChunkSize, totalBatches); + + // Create batch jobs in parallel for this chunk + const batchPromises = []; + + for (let i = chunkStart; i < chunkEnd; i++) { + const startIndex = i * batchSize; + const endIndex = Math.min(startIndex + batchSize, proxies.length); + const batchProxies = proxies.slice(startIndex, endIndex); + const delay = i * delayPerBatch; - logger.info('Creating individual proxy validation jobs', { - proxyCount: proxies.length, - distributionPeriod: '24 hours', - delayPerProxy: `${(delayPerProxy / 1000 / 60).toFixed(2)} minutes` + const batchPromise = queueManager.addJob({ + type: 'proxy-batch-validation', + service: 'proxy', + provider: 'proxy-service', + operation: 'process-proxy-batch', + payload: { + proxies: batchProxies, + batchIndex: i, + totalBatches, + source: 'fetch-and-check' + }, + priority: 3 + }, { + delay: delay, + jobId: `proxy-batch-${i}-${Date.now()}` }); - let queuedCount = 0; - - for (let i = 0; i < proxies.length; i++) { - const proxy = proxies[i]; - const delay = i * delayPerProxy; - - try { - await 
queueManager.addJob({ - type: 'proxy-validation', - service: 'proxy', - provider: 'proxy-service', - operation: 'check-proxy', - payload: { - proxy: proxy, - source: 'fetch-and-check', - autoTriggered: true, - batchIndex: i, - totalBatch: proxies.length - }, - priority: 3 - }, { - delay: delay - }); - - queuedCount++; - // Log progress every 100 jobs - if ((i + 1) % 100 === 0 || i === proxies.length - 1) { - logger.info('Proxy validation jobs queued progress', { - queued: i + 1, - total: proxies.length, - percentage: `${((i + 1) / proxies.length * 100).toFixed(1)}%` - }); - } - - } catch (error) { - logger.error('Failed to queue proxy validation job', { - proxy: `${proxy.host}:${proxy.port}`, - batchIndex: i, - error: error instanceof Error ? error.message : String(error) - }); - } } - - logger.info('Proxy validation jobs queuing completed', { - total: proxies.length, - successful: queuedCount, - failed: proxies.length - queuedCount, - totalDelay: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`, - avgDelayPerJob: `${(delayPerProxy / 1000 / 60).toFixed(2)} minutes` - }); - - return { - proxiesFetched: proxiesCount, - jobsQueued: queuedCount, - totalDelay: `${(totalDelayMs / 1000 / 60 / 60).toFixed(1)} hours`, - avgDelayPerJob: `${(delayPerProxy / 1000 / 60).toFixed(2)} minutes` - }; - } else { - logger.warn('No proxies found to create validation jobs', { - proxiesFetched: proxiesCount - }); - return { - proxiesFetched: proxiesCount, - jobsQueued: 0, - message: 'No cached proxies found' - }; + batchPromises.push(batchPromise); } - } catch (error) { - logger.error('Failed to create individual proxy validation jobs', { - proxiesCount, - error: error instanceof Error ? 
error.message : String(error) + // Wait for this chunk to complete + const results = await Promise.allSettled(batchPromises); + const successful = results.filter(r => r.status === 'fulfilled').length; + const failed = results.filter(r => r.status === 'rejected').length; + + batchJobsCreated += successful; + + logger.info('Batch chunk created', { + chunkStart: chunkStart + 1, + chunkEnd, + totalChunks: Math.ceil(totalBatches / batchCreationChunkSize), + successful, + failed, + totalCreated: batchJobsCreated, + progress: `${((chunkEnd / totalBatches) * 100).toFixed(1)}%` }); - return { - proxiesFetched: proxiesCount, - jobsQueued: 0, - error: error instanceof Error ? error.message : String(error) - }; } - } else { logger.info('No proxies fetched, skipping job creation'); + + // Small delay between chunks to prevent overwhelming Redis + if (chunkEnd < totalBatches) { + await new Promise(resolve => setTimeout(resolve, 100)); + } + } + + logger.info('All batch jobs creation completed', { + totalProxies: proxies.length, + batchJobsCreated, + totalBatches, + avgProxiesPerBatch: Math.floor(proxies.length / totalBatches), + estimatedDuration: '24 hours' + }); + return { - proxiesFetched: 0, - jobsQueued: 0, - message: 'No proxies fetched' + proxiesFetched: proxiesCount, + batchJobsCreated, + totalBatches, + avgProxiesPerBatch: Math.floor(proxies.length / totalBatches) + }; + + } catch (error) { + logger.error('Failed to create batch jobs', { + proxiesCount, + error: error instanceof Error ? 
error.message : String(error)
+        });
+        throw error;
+      }
+    },
+
+    'process-proxy-batch': async (payload: {
+      proxies: ProxyInfo[],
+      batchIndex: number,
+      totalBatches: number,
+      source: string
+    }) => {
+      const { queueManager } = await import('../services/queue.service');
+
+      logger.info('Processing proxy batch', {
+        batchIndex: payload.batchIndex,
+        batchSize: payload.proxies.length,
+        totalBatches: payload.totalBatches,
+        progress: `${((payload.batchIndex + 1) / payload.totalBatches * 100).toFixed(2)}%`
+      });
+
+      const batchDelayMs = 15 * 60 * 1000; // 15 minutes per batch
+      const delayPerProxy = Math.floor(batchDelayMs / payload.proxies.length);
+
+      logger.info('Batch timing calculated', {
+        batchIndex: payload.batchIndex,
+        proxiesInBatch: payload.proxies.length,
+        batchDurationMinutes: 15,
+        delayPerProxySeconds: Math.floor(delayPerProxy / 1000),
+        delayPerProxyMs: delayPerProxy
+      });
+
+      // Use BullMQ's addBulk for better performance
+      const jobsToCreate = payload.proxies.map((proxy, i) => ({
+        name: 'proxy-validation',
+        data: {
+          type: 'proxy-validation',
+          service: 'proxy',
+          provider: 'proxy-service',
+          operation: 'check-proxy',
+          payload: {
+            proxy: proxy,
+            source: payload.source,
+            batchIndex: payload.batchIndex,
+            proxyIndexInBatch: i,
+            totalBatch: payload.totalBatches
+          },
+          priority: 2
+        },
+        opts: {
+          delay: i * delayPerProxy,
+          jobId: `proxy-${proxy.host}-${proxy.port}-batch${payload.batchIndex}-${Date.now()}-${i}`,
+          removeOnComplete: 3,
+          removeOnFail: 5
+        }
+      }));
+
+      try {
+        const jobs = await queueManager.addBulk(jobsToCreate);
+
+        logger.info('Batch processing completed successfully', {
+          batchIndex: payload.batchIndex,
+          totalProxies: payload.proxies.length,
+          jobsCreated: jobs.length,
+          batchDelay: '15 minutes',
+          progress: `${((payload.batchIndex + 1) / payload.totalBatches * 100).toFixed(2)}%`
+        });
+
+        return {
+          batchIndex: payload.batchIndex,
+          totalProxies: payload.proxies.length,
+          jobsCreated: jobs.length,
+          jobsFailed: 0
+        };
+      } 
catch (error) { + logger.error('Failed to create validation jobs for batch', { + batchIndex: payload.batchIndex, + batchSize: payload.proxies.length, + error: error instanceof Error ? error.message : String(error) + }); + + return { + batchIndex: payload.batchIndex, + totalProxies: payload.proxies.length, + jobsCreated: 0, + jobsFailed: payload.proxies.length }; } }, - 'check-proxy': async (payload: { + + 'check-proxy': async (payload: { proxy: ProxyInfo, source?: string, batchIndex?: number, + proxyIndexInBatch?: number, totalBatch?: number }) => { const { checkProxy } = await import('./proxy.tasks'); @@ -132,7 +225,7 @@ export const proxyProvider: ProviderConfig = { logger.debug('Checking individual proxy', { proxy: `${payload.proxy.host}:${payload.proxy.port}`, batchIndex: payload.batchIndex, - totalBatch: payload.totalBatch, + proxyIndex: payload.proxyIndexInBatch, source: payload.source }); @@ -148,12 +241,13 @@ export const proxyProvider: ProviderConfig = { return { result: result, batchInfo: { - index: payload.batchIndex, + batchIndex: payload.batchIndex, + proxyIndex: payload.proxyIndexInBatch, total: payload.totalBatch, source: payload.source } }; - }, + } }, scheduledJobs: [ diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index 3edd4e0..911499a 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -47,7 +47,7 @@ export class QueueService { }; // Worker configuration - const workerCount = parseInt(process.env.WORKER_COUNT || '4'); + const workerCount = parseInt(process.env.WORKER_COUNT || '5'); const concurrencyPerWorker = parseInt(process.env.WORKER_CONCURRENCY || '20'); this.logger.info('Connecting to Redis/Dragonfly', connection); @@ -180,6 +180,10 @@ export class QueueService { throw error; } } + + async addBulk(jobs: any[]) : Promise { + return await this.queue.addBulk(jobs) + } private setupEventListeners() { 
this.queueEvents.on('completed', (job) => {
      this.logger.info('Job completed', { id: job.jobId });
@@ -396,6 +400,13 @@ export class QueueService {
       delayed: delayed.length
     };
   }
+
+  async drainQueue() {
+    if (this.isInitialized) {
+      await this.queue.drain()
+    }
+  }
+
   async getQueueStatus() {
     if (!this.isInitialized) {
       throw new Error('Queue service not initialized. Call initialize() first.');
@@ -412,12 +423,14 @@ export class QueueService {
       }
     };
   }
+
   getWorkerCount() {
     if (!this.isInitialized) {
       return 0;
     }
     return this.workers.length;
   }
+
   getRegisteredProviders() {
     return providerRegistry.getProviders().map(({ key, config }) => ({
       key,
diff --git a/libs/data-adjustments/package.json b/libs/data-adjustments/package.json
new file mode 100644
index 0000000..a41bdd9
--- /dev/null
+++ b/libs/data-adjustments/package.json
@@ -0,0 +1,24 @@
+{
+  "name": "@stock-bot/data-adjustments",
+  "version": "1.0.0",
+  "description": "Stock split and dividend adjustment utilities for market data",
+  "type": "module",
+  "main": "dist/index.js",
+  "types": "dist/index.d.ts",
+  "scripts": {
+    "build": "tsc",
+    "test": "bun test",
+    "test:watch": "bun test --watch"
+  },
+  "dependencies": {
+    "@stock-bot/types": "*",
+    "@stock-bot/logger": "*"
+  },
+  "devDependencies": {
+    "typescript": "^5.4.5",
+    "bun-types": "^1.1.12"
+  },
+  "peerDependencies": {
+    "typescript": "^5.0.0"
+  }
+}