diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index d367615..ec14a64 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -6,8 +6,6 @@ import { loadEnvVariables } from '@stock-bot/config'; import { Hono } from 'hono'; import { serve } from '@hono/node-server'; import { queueManager } from './services/queue.service'; -import { proxyService } from './services/proxy.service'; -import { marketDataProvider } from './providers/market-data.provider'; // Load environment variables loadEnvVariables(); @@ -56,12 +54,23 @@ app.get('/api/live/:symbol', async (c) => { const symbol = c.req.param('symbol'); logger.info('Live data request', { symbol }); - try { - const data = await marketDataProvider.getLiveData(symbol); - return c.json({ status: 'success', symbol, data }); + try { // Queue job for live data using Yahoo provider + const job = await queueManager.addJob({ + type: 'market-data-live', + service: 'market-data', + provider: 'yahoo-finance', + operation: 'live-data', + payload: { symbol } + }); + return c.json({ + status: 'success', + message: 'Live data job queued', + jobId: job.id, + symbol + }); } catch (error) { - logger.error('Failed to get live data', { symbol, error }); - return c.json({ status: 'error', message: 'Failed to get live data' }, 500); + logger.error('Failed to queue live data job', { symbol, error }); + return c.json({ status: 'error', message: 'Failed to queue live data job' }, 500); } }); @@ -75,20 +84,47 @@ app.get('/api/historical/:symbol', async (c) => { try { const fromDate = from ? new Date(from) : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // 30 days ago const toDate = to ? new Date(to) : new Date(); // Now - - const data = await marketDataProvider.getHistoricalData(symbol, fromDate, toDate); - return c.json({ status: 'success', symbol, from, to, data }); + // Queue job for historical data using Yahoo provider + const job = await queueManager.addJob({ + type: 'market-data-historical', + service: 'market-data', + provider: 'yahoo-finance', + operation: 'historical-data', + payload: { + symbol, + from: fromDate.toISOString(), + to: toDate.toISOString() + } + }); return c.json({ + status: 'success', + message: 'Historical data job queued', + jobId: job.id, + symbol, + from: fromDate, + to: toDate + }); } catch (error) { - logger.error('Failed to get historical data', { symbol, from, to, error }); - return c.json({ status: 'error', message: 'Failed to get historical data' }, 500); - } + logger.error('Failed to queue historical data job', { symbol, from, to, error }); + return c.json({ status: 'error', message: 'Failed to queue historical data job' }, 500); } }); // Proxy management endpoints app.post('/api/proxy/fetch', async (c) => { try { - const jobId = await proxyService.queueProxyFetch(); - return c.json({ status: 'success', jobId, message: 'Proxy fetch job queued' }); + const job = await queueManager.addJob({ + type: 'proxy-fetch', + service: 'proxy', + provider: 'proxy-service', + operation: 'fetch-and-check', + payload: {}, + priority: 5 + }); + + return c.json({ + status: 'success', + jobId: job.id, + message: 'Proxy fetch job queued' + }); } catch (error) { logger.error('Failed to queue proxy fetch', { error }); return c.json({ status: 'error', message: 'Failed to queue proxy fetch' }, 500); @@ -98,14 +134,49 @@ app.post('/api/proxy/fetch', async (c) => { app.post('/api/proxy/check', async (c) => { try { const { proxies } = await c.req.json(); - const jobId = await proxyService.queueProxyCheck(proxies); - return c.json({ status: 'success', jobId, message: 'Proxy check job queued' }); + const job = await queueManager.addJob({ + type: 'proxy-check', + service: 'proxy', + provider: 'proxy-service', + operation: 'check-specific', + payload: { proxies }, + priority: 8 + }); + + return c.json({ + status: 'success', + jobId: job.id, + message: `Proxy check job queued for ${proxies.length} proxies` + }); } catch (error) { logger.error('Failed to queue proxy check', { error }); return c.json({ status: 'error', message: 'Failed to queue proxy check' }, 500); } }); +// Get proxy stats via queue +app.get('/api/proxy/stats', async (c) => { + try { + const job = await queueManager.addJob({ + type: 'proxy-stats', + service: 'proxy', + provider: 'proxy-service', + operation: 'get-stats', + payload: {}, + priority: 3 + }); + + return c.json({ + status: 'success', + jobId: job.id, + message: 'Proxy stats job queued' + }); + } catch (error) { + logger.error('Failed to queue proxy stats', { error }); + return c.json({ status: 'error', message: 'Failed to queue proxy stats' }, 500); + } +}); + // Provider registry endpoints app.get('/api/providers', async (c) => { try { diff --git a/apps/data-service/src/providers/market-data.provider.ts b/apps/data-service/src/providers/market-data.provider.ts deleted file mode 100644 index 6c0a2d0..0000000 --- a/apps/data-service/src/providers/market-data.provider.ts +++ /dev/null @@ -1,179 +0,0 @@ -import { Logger } from '@stock-bot/logger'; -import { HttpClient } from '@stock-bot/http'; -import createCache, { type CacheProvider } from '@stock-bot/cache'; - -export interface MarketDataResponse { - symbol: string; - price: number; - timestamp: Date; - volume?: number; - change?: number; - open?: number; - high?: number; - low?: number; - close?: number; -} - -export class MarketDataProvider { - private logger = new Logger('market-data-provider'); - private httpClient: HttpClient; - private cache: CacheProvider = createCache('hybrid'); - private readonly CACHE_TTL = 60; // 1 minute - - constructor() { - this.httpClient = new HttpClient({ - timeout: 10000, - }, this.logger); - } - async getLiveData(symbol: string): Promise { - const cacheKey = `market-data:${symbol}`; - - try { - // Check cache first - const cached = await this.cache.get(cacheKey) as MarketDataResponse | null; - if (cached) { - this.logger.debug('Returning cached market data', { symbol }); - return cached; - } - - // Generate simulated data for demo - const data = this.generateSimulatedData(symbol); - - // Cache the result - await this.cache.set(cacheKey, data, this.CACHE_TTL); - - this.logger.info('Generated live market data', { symbol, price: data.price }); - return data; - } catch (error) { - this.logger.error('Error fetching market data', { symbol, error }); - throw error; - } - } - async getHistoricalData(symbol: string, from: Date, to: Date, interval: string = '1m'): Promise { - const cacheKey = `historical:${symbol}:${from.toISOString()}:${to.toISOString()}:${interval}`; - - try { - const cached = await this.cache.get(cacheKey) as MarketDataResponse[] | null; - if (cached) { - this.logger.debug('Returning cached historical data', { symbol, from, to }); - return cached; - } - - // Generate simulated historical data - const data = this.generateHistoricalData(symbol, from, to, interval); - - // Cache for longer time (1 hour) - await this.cache.set(cacheKey, data, 3600); - - this.logger.info('Generated historical market data', { symbol, from, to, count: data.length }); - return data; - } catch (error) { - this.logger.error('Error fetching historical data', { symbol, error }); - throw error; - } - } - - private generateSimulatedData(symbol: string): MarketDataResponse { - // Base prices for different symbols - const basePrices: { [key: string]: number } = { - 'AAPL': 150, - 'GOOGL': 2500, - 'MSFT': 300, - 'TSLA': 200, - 'AMZN': 3000, - 'NVDA': 400, - 'META': 250, - 'NFLX': 400 - }; - - const basePrice = basePrices[symbol] || 100; - - // Add some randomness (+/- 2%) - const variation = (Math.random() - 0.5) * 0.04; // ±2% - const price = basePrice * (1 + variation); - const change = variation * basePrice; - - return { - symbol, - price: Math.round(price * 100) / 100, - timestamp: new Date(), - volume: Math.floor(Math.random() * 1000000) + 500000, - change: Math.round(change * 100) / 100, - open: Math.round((price - change) * 100) / 100, - high: Math.round((price + Math.abs(change * 0.5)) * 100) / 100, - low: Math.round((price - Math.abs(change * 0.5)) * 100) / 100, - close: Math.round(price * 100) / 100 - }; - } - - private generateHistoricalData(symbol: string, from: Date, to: Date, interval: string): MarketDataResponse[] { - const data: MarketDataResponse[] = []; - const intervalMs = this.parseInterval(interval); - - let currentTime = new Date(from); - const endTime = new Date(to); - - // Base price for the symbol - const basePrices: { [key: string]: number } = { - 'AAPL': 150, - 'GOOGL': 2500, - 'MSFT': 300, - 'TSLA': 200, - 'AMZN': 3000, - 'NVDA': 400, - 'META': 250, - 'NFLX': 400 - }; - - let basePrice = basePrices[symbol] || 100; - - while (currentTime <= endTime) { - // Add some trend and randomness - const trend = (Math.random() - 0.5) * 0.01; // Small trend - const variation = (Math.random() - 0.5) * 0.02; // Random variation - - basePrice = basePrice * (1 + trend + variation); - const change = basePrice * variation; - - data.push({ - symbol, - price: Math.round(basePrice * 100) / 100, - timestamp: new Date(currentTime), - volume: Math.floor(Math.random() * 1000000) + 500000, - change: Math.round(change * 100) / 100, - open: Math.round((basePrice - change) * 100) / 100, - high: Math.round((basePrice + Math.abs(change * 0.5)) * 100) / 100, - low: Math.round((basePrice - Math.abs(change * 0.5)) * 100) / 100, - close: Math.round(basePrice * 100) / 100 - }); - - currentTime = new Date(currentTime.getTime() + intervalMs); - } - - return data; - } - - private parseInterval(interval: string): number { - const value = parseInt(interval.slice(0, -1)); - const unit = interval.slice(-1).toLowerCase(); - - switch (unit) { - case 's': return value * 1000; - case 'm': return value * 60 * 1000; - case 'h': return value * 60 * 60 * 1000; - case 'd': return value * 24 * 60 * 60 * 1000; - default: return 60 * 1000; // Default to 1 minute - } - } - - async clearCache(): Promise { - this.logger.info('Clearing market data cache'); - // Note: Cache provider limitations - would need proper key tracking - } - - async shutdown(): Promise { - this.logger.info('Shutting down MarketDataProvider'); - } -} - -export const marketDataProvider = new MarketDataProvider(); diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts new file mode 100644 index 0000000..0e4d5fe --- /dev/null +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -0,0 +1,42 @@ +import { ProviderConfig } from '../services/provider-registry.service'; + +export const proxyProvider: ProviderConfig = { + name: 'proxy-service', + service: 'proxy', + operations: { + 'fetch-and-check': async (payload: { sources?: string[] }) => { + const { proxyService } = await import('../services/proxy.service'); + return await proxyService.fetchProxiesFromSources(); + }, + + // 'check-specific': async (payload: { proxies: any[] }) => { + // const { proxyService } = await import('../services/proxy.service'); + // return await proxyService.checkProxies(payload.proxies); + // }, + + // 'get-stats': async (payload: { includeDetails?: boolean }) => { + // const { proxyService } = await import('../services/proxy.service'); + // return await proxyService.getProxyStats(payload.includeDetails); + // }, + + // 'cleanup-old-data': async (payload: { daysToKeep?: number }) => { + // const { proxyService } = await import('../services/proxy.service'); + // return await proxyService.cleanupOldData(payload.daysToKeep || 7); + // }, + + // 'get-working-proxy': async (payload: { protocol?: string; country?: string; timeout?: number }) => { + // const { proxyService } = await import('../services/proxy.service'); + // return await proxyService.getWorkingProxy(payload); + // }, + + // 'validate-proxy': async (payload: { proxy: any; testUrl?: string }) => { + // const { proxyService } = await import('../services/proxy.service'); + // return await proxyService.validateProxy(payload.proxy, payload.testUrl); + // }, + + // 'rotate-proxies': async (payload: { count?: number }) => { + // const { proxyService } = await import('../services/proxy.service'); + // return await proxyService.rotateProxies(payload.count || 5); + // } + } +}; diff --git a/apps/data-service/src/providers/quotemedia.provider.ts b/apps/data-service/src/providers/quotemedia.provider.ts new file mode 100644 index 0000000..e892b5e --- /dev/null +++ b/apps/data-service/src/providers/quotemedia.provider.ts @@ -0,0 +1,168 @@ +import { ProviderConfig } from '../services/provider-registry.service'; + +export const quotemediaProvider: ProviderConfig = { + name: 'quotemedia', + service: 'market-data', + operations: { + 'live-data': async (payload: { symbol: string; fields?: string[] }) => { + const { Logger } = await import('@stock-bot/logger'); + const logger = new Logger('quotemedia-provider'); + + logger.info('Fetching live data from QuoteMedia', { symbol: payload.symbol }); + + // Simulate QuoteMedia API call + const mockData = { + symbol: payload.symbol, + price: Math.random() * 1000 + 100, + volume: Math.floor(Math.random() * 1000000), + change: (Math.random() - 0.5) * 20, + changePercent: (Math.random() - 0.5) * 5, + timestamp: new Date().toISOString(), + source: 'quotemedia', + fields: payload.fields || ['price', 'volume', 'change'] + }; + + // Simulate network delay + await new Promise(resolve => setTimeout(resolve, 100 + Math.random() * 200)); + + return mockData; + }, + + 'historical-data': async (payload: { + symbol: string; + from: Date; + to: Date; + interval?: string; + fields?: string[]; + }) => { + const { Logger } = await import('@stock-bot/logger'); + const logger = new Logger('quotemedia-provider'); + + logger.info('Fetching historical data from QuoteMedia', { + symbol: payload.symbol, + from: payload.from, + to: payload.to, + interval: payload.interval || '1d' + }); + + // Generate mock historical data + const days = Math.ceil((payload.to.getTime() - payload.from.getTime()) / (1000 * 60 * 60 * 24)); + const data = []; + + for (let i = 0; i < Math.min(days, 100); i++) { + const date = new Date(payload.from.getTime() + i * 24 * 60 * 60 * 1000); + data.push({ + date: date.toISOString().split('T')[0], + open: Math.random() * 1000 + 100, + high: Math.random() * 1000 + 100, + low: Math.random() * 1000 + 100, + close: Math.random() * 1000 + 100, + volume: Math.floor(Math.random() * 1000000), + source: 'quotemedia' + }); + } + + // Simulate network delay + await new Promise(resolve => setTimeout(resolve, 200 + Math.random() * 300)); + + return { + symbol: payload.symbol, + interval: payload.interval || '1d', + data, + source: 'quotemedia', + totalRecords: data.length + }; + }, + + 'batch-quotes': async (payload: { symbols: string[]; fields?: string[] }) => { + const { Logger } = await import('@stock-bot/logger'); + const logger = new Logger('quotemedia-provider'); + + logger.info('Fetching batch quotes from QuoteMedia', { + symbols: payload.symbols, + count: payload.symbols.length + }); + + const quotes = payload.symbols.map(symbol => ({ + symbol, + price: Math.random() * 1000 + 100, + volume: Math.floor(Math.random() * 1000000), + change: (Math.random() - 0.5) * 20, + timestamp: new Date().toISOString(), + source: 'quotemedia' + })); + + // Simulate network delay + await new Promise(resolve => setTimeout(resolve, 300 + Math.random() * 200)); + + return { + quotes, + source: 'quotemedia', + timestamp: new Date().toISOString(), + totalSymbols: payload.symbols.length + }; + }, + + 'company-profile': async (payload: { symbol: string }) => { + const { Logger } = await import('@stock-bot/logger'); + const logger = new Logger('quotemedia-provider'); + + logger.info('Fetching company profile from QuoteMedia', { symbol: payload.symbol }); + + // Simulate company profile data + const profile = { + symbol: payload.symbol, + companyName: `${payload.symbol} Corporation`, + sector: 'Technology', + industry: 'Software', + description: `${payload.symbol} is a leading technology company.`, + marketCap: Math.floor(Math.random() * 1000000000000), + employees: Math.floor(Math.random() * 100000), + website: `https://www.${payload.symbol.toLowerCase()}.com`, + source: 'quotemedia' + }; + + await new Promise(resolve => setTimeout(resolve, 150 + Math.random() * 100)); + + return profile; + }, + + 'options-chain': async (payload: { symbol: string; expiration?: string }) => { + const { Logger } = await import('@stock-bot/logger'); + const logger = new Logger('quotemedia-provider'); + + logger.info('Fetching options chain from QuoteMedia', { + symbol: payload.symbol, + expiration: payload.expiration + }); + + // Generate mock options data + const strikes = Array.from({ length: 20 }, (_, i) => 100 + i * 5); + const calls = strikes.map(strike => ({ + strike, + bid: Math.random() * 10, + ask: Math.random() * 10 + 0.5, + volume: Math.floor(Math.random() * 1000), + openInterest: Math.floor(Math.random() * 5000) + })); + + const puts = strikes.map(strike => ({ + strike, + bid: Math.random() * 10, + ask: Math.random() * 10 + 0.5, + volume: Math.floor(Math.random() * 1000), + openInterest: Math.floor(Math.random() * 5000) + })); + + await new Promise(resolve => setTimeout(resolve, 400 + Math.random() * 300)); + + return { + symbol: payload.symbol, + expiration: payload.expiration || new Date(Date.now() + 30 * 24 * 60 * 60 * 1000).toISOString().split('T')[0], + calls, + puts, + source: 'quotemedia' + }; + } + } +}; diff --git a/apps/data-service/src/providers/unified.ts b/apps/data-service/src/providers/unified.ts deleted file mode 100644 index ce74edb..0000000 --- a/apps/data-service/src/providers/unified.ts +++ /dev/null @@ -1,364 +0,0 @@ -/** - * Unified data interface for live and historical data - */ -import { getLogger } from '@stock-bot/logger'; -import { QuestDBClient } from '@stock-bot/questdb-client'; -import { EventBus } from '@stock-bot/event-bus'; - -const logger = getLogger('unified-data-provider'); - -export interface MarketData { - symbol: string; - timestamp: Date; - open: number; - high: number; - low: number; - close: number; - volume: number; - source?: string; -} - -export interface DataProviderConfig { - questdb?: { - host: string; - port: number; - }; - enableCache?: boolean; - cacheSize?: number; -} - -export interface DataProvider { - getLiveData(symbol: string): Promise; - getHistoricalData(symbol: string, from: Date, to: Date, interval?: string): Promise; - subscribeLiveData(symbol: string, callback: (data: MarketData) => void): Promise; - unsubscribeLiveData(symbol: string): Promise; -} - -export class UnifiedDataProvider implements DataProvider { - private questdb?: QuestDBClient; - private eventBus: EventBus; - private cache: Map = new Map(); - private liveSubscriptions: Map void)[]> = new Map(); - private config: DataProviderConfig; - - constructor(eventBus: EventBus, config: DataProviderConfig = {}) { - this.eventBus = eventBus; - this.config = { - enableCache: true, - cacheSize: 10000, - ...config - }; - - // Initialize QuestDB client - if (config.questdb) { - this.questdb = new QuestDBClient(); - } - - this.initializeEventSubscriptions(); - } - - private initializeEventSubscriptions(): void { - // Subscribe to live market data events - this.eventBus.subscribe('market.data.live', (message) => { - const data = message.data as MarketData; - this.handleLiveData(data); - }); - - // Subscribe to historical data requests - this.eventBus.subscribe('market.data.request', async (message) => { - const { symbol, from, to, interval, requestId } = message.data; - try { - const data = await this.getHistoricalData(symbol, new Date(from), new Date(to), interval); - await this.eventBus.publish('market.data.response', { - requestId, - symbol, - data, - status: 'success' - }); - } catch (error) { - await this.eventBus.publish('market.data.response', { - requestId, - symbol, - error: (error as Error).message, - status: 'error' - }); - } - }); - } - - async getLiveData(symbol: string): Promise { - logger.info('Fetching live data', { symbol }); - - try { - // First check cache for recent data - if (this.config.enableCache) { - const cached = this.getFromCache(symbol); - if (cached && this.isRecentData(cached)) { - return cached; - } - } - - // For demo purposes, generate simulated live data - // In production, this would integrate with real market data providers - const liveData = this.generateSimulatedData(symbol); - - // Store in cache - if (this.config.enableCache) { - this.addToCache(symbol, liveData); - } - - // Publish live data event - await this.eventBus.publish('marketData', liveData); - - return liveData; - - } catch (error) { - logger.error('Failed to fetch live data', { symbol, error }); - throw error; - } - } - - async getHistoricalData(symbol: string, from: Date, to: Date, interval: string = '1m'): Promise { - logger.info('Fetching historical data', { symbol, from, to, interval }); - - try { - // Check cache first - const cacheKey = `${symbol}_${from.getTime()}_${to.getTime()}_${interval}`; - if (this.config.enableCache && this.cache.has(cacheKey)) { - logger.debug('Returning cached historical data', { symbol, cacheKey }); - return this.cache.get(cacheKey)!; - } - - // Try to fetch from QuestDB - let data: MarketData[] = []; - - if (this.questdb) { - try { - data = await this.fetchFromQuestDB(symbol, from, to, interval); - } catch (error) { - logger.warn('Failed to fetch from QuestDB, generating simulated data', error); - data = this.generateHistoricalData(symbol, from, to, interval); - } - } else { - // Generate simulated data if no QuestDB connection - data = this.generateHistoricalData(symbol, from, to, interval); - } - - // Store in cache - if (this.config.enableCache) { - this.cache.set(cacheKey, data); - this.trimCache(); - } - - return data; - - } catch (error) { - logger.error('Failed to fetch historical data', { symbol, from, to, error }); - throw error; - } - } - - async subscribeLiveData(symbol: string, callback: (data: MarketData) => void): Promise { - logger.info('Subscribing to live data', { symbol }); - - if (!this.liveSubscriptions.has(symbol)) { - this.liveSubscriptions.set(symbol, []); - } - - this.liveSubscriptions.get(symbol)!.push(callback); - - // Start live data simulation for this symbol - this.startLiveDataSimulation(symbol); - } - - async unsubscribeLiveData(symbol: string): Promise { - logger.info('Unsubscribing from live data', { symbol }); - this.liveSubscriptions.delete(symbol); - } - private async fetchFromQuestDB(symbol: string, from: Date, to: Date, interval: string): Promise { - if (!this.questdb) { - throw new Error('QuestDB client not initialized'); - } - - const query = ` - SELECT symbol, timestamp, open, high, low, close, volume - FROM market_data - WHERE symbol = '${symbol}' - AND timestamp >= '${from.toISOString()}' - AND timestamp <= '${to.toISOString()}' - ORDER BY timestamp ASC - `; - - const result = await this.questdb.query(query); - - return result.rows.map(row => ({ - symbol: row.symbol, - timestamp: new Date(row.timestamp), - open: parseFloat(row.open), - high: parseFloat(row.high), - low: parseFloat(row.low), - close: parseFloat(row.close), - volume: parseInt(row.volume), - source: 'questdb' - })); - } - - private generateHistoricalData(symbol: string, from: Date, to: Date, interval: string): MarketData[] { - const data: MarketData[] = []; - const intervalMs = this.parseInterval(interval); - const current = new Date(from); - - let basePrice = 100; // Starting price - - while (current <= to) { - const timestamp = new Date(current); - - // Generate realistic OHLCV data with some randomness - const volatility = 0.02; - const change = (Math.random() - 0.5) * volatility; - - const open = basePrice; - const close = open * (1 + change); - const high = Math.max(open, close) * (1 + Math.random() * 0.01); - const low = Math.min(open, close) * (1 - Math.random() * 0.01); - const volume = Math.floor(Math.random() * 100000) + 10000; - - data.push({ - symbol, - timestamp, - open, - high, - low, - close, - volume, - source: 'simulated' - }); - - basePrice = close; - current.setTime(current.getTime() + intervalMs); - } - - return data; - } - - private generateSimulatedData(symbol: string): MarketData { - const now = new Date(); - const basePrice = 100 + Math.sin(now.getTime() / 1000000) * 10; - const volatility = 0.001; - - const open = basePrice + (Math.random() - 0.5) * volatility * basePrice; - const close = open + (Math.random() - 0.5) * volatility * basePrice; - const high = Math.max(open, close) + Math.random() * volatility * basePrice; - const low = Math.min(open, close) - Math.random() * volatility * basePrice; - const volume = Math.floor(Math.random() * 10000) + 1000; - - return { - symbol, - timestamp: now, - open, - high, - low, - close, - volume, - source: 'simulated' - }; - } - - private parseInterval(interval: string): number { - const value = parseInt(interval.slice(0, -1)); - const unit = interval.slice(-1).toLowerCase(); - - switch (unit) { - case 's': return value * 1000; - case 'm': return value * 60 * 1000; - case 'h': return value * 60 * 60 * 1000; - case 'd': return value * 24 * 60 * 60 * 1000; - default: return 60 * 1000; // Default to 1 minute - } - } - - private handleLiveData(data: MarketData): void { - const callbacks = this.liveSubscriptions.get(data.symbol); - if (callbacks) { - callbacks.forEach(callback => { - try { - callback(data); - } catch (error) { - logger.error('Error in live data callback', { symbol: data.symbol, error }); - } - }); - } - } - - private startLiveDataSimulation(symbol: string): void { - // Simulate live data updates every second - const interval = setInterval(async () => { - if (!this.liveSubscriptions.has(symbol)) { - clearInterval(interval); - return; - } - - try { - const liveData = this.generateSimulatedData(symbol); - this.handleLiveData(liveData); - await this.eventBus.publish('marketData', liveData); - } catch (error) { - logger.error('Error in live data simulation', { symbol, error }); - } - }, 1000); - } - - private getFromCache(symbol: string): MarketData | null { - // Get most recent cached data for symbol - for (const [key, data] of this.cache.entries()) { - if (key.startsWith(symbol) && data.length > 0) { - return data[data.length - 1]; - } - } - return null; - } - - private isRecentData(data: MarketData): boolean { - const now = Date.now(); - const dataTime = data.timestamp.getTime(); - return (now - dataTime) < 60000; // Data is recent if less than 1 minute old - } - - private addToCache(symbol: string, data: MarketData): void { - const key = `${symbol}_live`; - if (!this.cache.has(key)) { - this.cache.set(key, []); - } - - const cached = this.cache.get(key)!; - cached.push(data); - - // Keep only last 1000 data points for live data - if (cached.length > 1000) { - cached.splice(0, cached.length - 1000); - } - } - - private trimCache(): void { - if (this.cache.size > this.config.cacheSize!) { - // Remove oldest entries - const entries = Array.from(this.cache.entries()); - const toRemove = entries.slice(0, entries.length - this.config.cacheSize!); - toRemove.forEach(([key]) => this.cache.delete(key)); - } - } - - async close(): Promise { - if (this.questdb) { - await this.questdb.disconnect(); - } - this.cache.clear(); - this.liveSubscriptions.clear(); - logger.info('Unified data provider closed'); - } -} - -// Factory function -export function createUnifiedDataProvider(eventBus: EventBus, config?: DataProviderConfig): UnifiedDataProvider { - return new UnifiedDataProvider(eventBus, config); -} diff --git a/apps/data-service/src/providers/yahoo.provider.ts b/apps/data-service/src/providers/yahoo.provider.ts new file mode 100644 index 0000000..560ff57 --- /dev/null +++ b/apps/data-service/src/providers/yahoo.provider.ts @@ -0,0 +1,229 @@ +import { ProviderConfig } from '../services/provider-registry.service'; + +export const yahooProvider: ProviderConfig = { + name: 'yahoo-finance', + service: 'market-data', + operations: { + 'live-data': async (payload: { symbol: string; modules?: string[] }) => { + const { Logger } = await import('@stock-bot/logger'); + const logger = new Logger('yahoo-provider'); + + logger.info('Fetching live data from Yahoo Finance', { symbol: payload.symbol }); + + // Simulate Yahoo Finance API call + const mockData = { + symbol: payload.symbol, + regularMarketPrice: Math.random() * 1000 + 100, + regularMarketVolume: Math.floor(Math.random() * 1000000), + regularMarketChange: (Math.random() - 0.5) * 20, + regularMarketChangePercent: (Math.random() - 0.5) * 5, + preMarketPrice: Math.random() * 1000 + 100, + postMarketPrice: Math.random() * 1000 + 100, + marketCap: Math.floor(Math.random() * 1000000000000), + peRatio: Math.random() * 50 + 5, + dividendYield: Math.random() * 0.1, + fiftyTwoWeekHigh: Math.random() * 1200 + 100, + fiftyTwoWeekLow: Math.random() * 800 + 50, + timestamp: Date.now() / 1000, + source: 'yahoo-finance', + modules: payload.modules || ['price', 'summaryDetail'] + }; + + // Simulate network delay + await new Promise(resolve => setTimeout(resolve, 150 + Math.random() * 250)); + + return mockData; + }, + + 'historical-data': async (payload: { + symbol: string; + period1: number; + period2: number; + interval?: string; + events?: string; + }) => { + const { Logger } = await import('@stock-bot/logger'); + const logger = new Logger('yahoo-provider'); + + logger.info('Fetching historical data from Yahoo Finance', { + symbol: payload.symbol, + period1: payload.period1, + period2: payload.period2, + interval: payload.interval || '1d' + }); + + // Generate mock historical data + const days = Math.ceil((payload.period2 - payload.period1) / (24 * 60 * 60)); + const data = []; + + for (let i = 0; i < Math.min(days, 100); i++) { + const timestamp = payload.period1 + i * 24 * 60 * 60; + data.push({ + timestamp, + date: new Date(timestamp * 1000).toISOString().split('T')[0], + open: Math.random() * 1000 + 100, + high: Math.random() * 1000 + 100, + low: Math.random() * 1000 + 100, + close: Math.random() * 1000 + 100, + adjClose: Math.random() * 1000 + 100, + volume: Math.floor(Math.random() * 1000000), + source: 'yahoo-finance' + }); + } + + // Simulate network delay + await new Promise(resolve => setTimeout(resolve, 250 + Math.random() * 350)); + + return { + symbol: payload.symbol, + interval: payload.interval || '1d', + timestamps: data.map(d => d.timestamp), + indicators: { + quote: [{ + open: data.map(d => d.open), + high: data.map(d => d.high), + low: data.map(d => d.low), + close: data.map(d => d.close), + volume: data.map(d => d.volume) + }], + adjclose: [{ + adjclose: data.map(d => d.adjClose) + }] + }, + source: 'yahoo-finance', + totalRecords: data.length + }; + }, + + 'search': async (payload: { query: string; quotesCount?: number; newsCount?: number }) => { + const { Logger } = await import('@stock-bot/logger'); + const logger = new Logger('yahoo-provider'); + + logger.info('Searching Yahoo Finance', { query: payload.query }); + + // Generate mock search results + const quotes = Array.from({ length: payload.quotesCount || 5 }, (_, i) => ({ + symbol: `${payload.query.toUpperCase()}${i}`, + shortname: `${payload.query} Company ${i}`, + longname: `${payload.query} Corporation ${i}`, + exchDisp: 'NASDAQ', + typeDisp: 'Equity', + source: 'yahoo-finance' + })); + + const news = Array.from({ length: payload.newsCount || 3 }, (_, i) => ({ + uuid: `news-${i}-${Date.now()}`, + title: `${payload.query} News Article ${i}`, + publisher: 'Financial News', + providerPublishTime: Date.now() - i * 3600000, + type: 'STORY', + source: 'yahoo-finance' + })); + + await new Promise(resolve => setTimeout(resolve, 200 + Math.random() * 200)); + + return { + quotes, + news, + totalQuotes: quotes.length, + totalNews: news.length, + source: 'yahoo-finance' + }; + }, + + 'financials': async (payload: { symbol: string; type?: 'income' | 'balance' | 'cash' }) => { + const { Logger } = await import('@stock-bot/logger'); + const logger = new Logger('yahoo-provider'); + + logger.info('Fetching financials from Yahoo Finance', { + symbol: payload.symbol, + type: payload.type || 'income' + }); + + // Generate mock financial data + const financials = { + symbol: payload.symbol, + type: payload.type || 'income', + currency: 'USD', + annual: Array.from({ length: 4 }, (_, i) => ({ + fiscalYear: 2024 - i, + revenue: Math.floor(Math.random() * 100000000000), + netIncome: Math.floor(Math.random() * 10000000000), + totalAssets: Math.floor(Math.random() * 500000000000), + totalDebt: Math.floor(Math.random() * 50000000000) + })), + quarterly: Array.from({ length: 4 }, (_, i) => ({ + fiscalQuarter: `Q${4-i} 2024`, + revenue: Math.floor(Math.random() * 25000000000), + netIncome: Math.floor(Math.random() * 2500000000) + })), + source: 'yahoo-finance' + }; + + await new Promise(resolve => setTimeout(resolve, 300 + Math.random() * 200)); + + return financials; + }, + + 'earnings': async (payload: { symbol: string; period?: 'annual' | 'quarterly' }) => { + const { Logger } = await import('@stock-bot/logger'); + const logger = new Logger('yahoo-provider'); + + logger.info('Fetching earnings from Yahoo Finance', { + symbol: payload.symbol, + period: payload.period || 'quarterly' + }); + + // Generate mock earnings data + const earnings = { + symbol: payload.symbol, + period: payload.period || 'quarterly', + earnings: Array.from({ length: 8 }, (_, i) => ({ + quarter: `Q${(i % 4) + 1} ${2024 - Math.floor(i/4)}`, + epsEstimate: Math.random() * 5, + epsActual: Math.random() * 5, + revenueEstimate: Math.floor(Math.random() * 50000000000), + revenueActual: Math.floor(Math.random() * 50000000000), + surprise: (Math.random() - 0.5) * 2 + })), + source: 'yahoo-finance' + }; + + await new Promise(resolve => setTimeout(resolve, 250 + Math.random() * 150)); + + return earnings; + }, + + 'recommendations': async (payload: { symbol: string }) => { + const { Logger } = await import('@stock-bot/logger'); + const logger = new Logger('yahoo-provider'); + + logger.info('Fetching recommendations from Yahoo Finance', { symbol: payload.symbol }); + + // Generate mock recommendations + const recommendations = { + symbol: payload.symbol, + current: { + strongBuy: Math.floor(Math.random() * 10), + buy: Math.floor(Math.random() * 15), + hold: Math.floor(Math.random() * 20), + sell: Math.floor(Math.random() * 5), + strongSell: Math.floor(Math.random() * 3) + }, + trend: Array.from({ length: 4 }, (_, i) => ({ + period: `${i}m`, + strongBuy: Math.floor(Math.random() * 10), + buy: Math.floor(Math.random() * 15), + hold: Math.floor(Math.random() * 20), + sell: Math.floor(Math.random() * 5), + strongSell: Math.floor(Math.random() * 3) + })), + source: 'yahoo-finance' + }; + + await new Promise(resolve => setTimeout(resolve, 180 + Math.random() * 120)); + + return recommendations; + } + } +}; diff --git a/apps/data-service/src/services/provider-registry.service.ts b/apps/data-service/src/services/provider-registry.service.ts new file mode 100644 index 0000000..73b0d37 --- /dev/null +++ b/apps/data-service/src/services/provider-registry.service.ts @@ -0,0 +1,82 @@ +import { Logger } from '@stock-bot/logger'; + +export interface JobHandler { + (payload: any): Promise; +} + +export interface ProviderConfig { + name: string; + service: string; + operations: Record; +} + +export class ProviderRegistry { + private logger = new Logger('provider-registry'); + private providers = new Map(); + + /** + * Register a provider with its operations + */ + registerProvider(config: ProviderConfig): void { + const key = `${config.service}:${config.name}`; + this.providers.set(key, config); + this.logger.info(`Registered provider: ${key}`, { + operations: Object.keys(config.operations) + }); + } + + /** + * Get a job handler for a specific provider and operation + */ + getHandler(service: string, provider: string, operation: string): JobHandler | null { + const key = `${service}:${provider}`; + const providerConfig = this.providers.get(key); + + if (!providerConfig) { + this.logger.warn(`Provider not found: ${key}`); + return null; + } + + const handler = providerConfig.operations[operation]; + if (!handler) { + this.logger.warn(`Operation not found: ${operation} in provider ${key}`); + return null; + } + + return handler; + } + + /** + * Get all registered providers + */ + getProviders(): Array<{ key: string; config: ProviderConfig }> { + return Array.from(this.providers.entries()).map(([key, config]) => ({ + key, + config + })); + } + + /** + * Check if a provider exists + */ + hasProvider(service: string, provider: string): boolean { + return this.providers.has(`${service}:${provider}`); + } + + /** + * Get providers by service type + */ + getProvidersByService(service: string): ProviderConfig[] { + return Array.from(this.providers.values()).filter(provider => provider.service === service); + } + + /** + * Clear all providers (useful for testing) + */ + clear(): void { + this.providers.clear(); + this.logger.info('All providers cleared'); + } +} + +export const providerRegistry = new ProviderRegistry(); diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index 0022f57..5dff892 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -1,9 +1,10 @@ import { Queue, Worker, QueueEvents } from 'bullmq'; import { Logger } from '@stock-bot/logger'; +import { providerRegistry } from './provider-registry.service'; export interface JobData { - type: 'proxy-fetch' | 'proxy-check' | 'market-data' | 'historical-data'; - service: 'proxy' | 'market-data' | 'analytics'; + type: string; + service: string; provider: string; operation: string; payload: any; @@ -29,6 +30,9 @@ export class QueueService { this.logger.info('Initializing queue service...'); + // Register all providers first + await this.registerProviders(); + const connection = { host: process.env.DRAGONFLY_HOST || 'localhost', port: parseInt(process.env.DRAGONFLY_PORT || '6379'), @@ -42,65 +46,85 @@ export class QueueService { connection, concurrency: 10 }); - this.queueEvents = new QueueEvents('data-service-queue', { connection }); - - // Test connection + this.queueEvents = new QueueEvents('data-service-queue', { connection }); // Test connection await this.queue.waitUntilReady(); await this.worker.waitUntilReady(); await this.queueEvents.waitUntilReady(); this.setupEventListeners(); - this.setupScheduledTasks(); - + this.isInitialized = true; this.logger.info('Queue service initialized successfully'); + + await this.setupScheduledTasks(); } catch (error) { this.logger.error('Failed to initialize queue service', { error }); throw error; } } - private async processJob(job: any) { - const { type, service, provider, operation, payload }: JobData = job.data; + private async registerProviders() { + this.logger.info('Registering providers...'); - this.logger.info('Processing job', { id: job.id, type, service, provider, operation }); - try { - switch (type) { - case 'proxy-fetch': - return await this.handleProxyFetch(payload); - case 'proxy-check': - return await this.handleProxyCheck(payload); - case 'market-data': - return await this.handleMarketData(payload); - case 'historical-data': - return await this.handleHistoricalData(payload); - default: - throw new Error(`Unknown job type: ${type}`); - } + // Import and register all providers + const { proxyProvider } = await import('../providers/proxy.provider'); + const { quotemediaProvider } = await import('../providers/quotemedia.provider'); + const { yahooProvider } = await import('../providers/yahoo.provider'); + + providerRegistry.registerProvider(proxyProvider); + providerRegistry.registerProvider(quotemediaProvider); + providerRegistry.registerProvider(yahooProvider); + + this.logger.info('All providers registered successfully'); } catch (error) { - this.logger.error('Job failed', { id: job.id, type, error }); + this.logger.error('Failed to register providers', { error }); throw error; } } - private async handleProxyFetch(payload: any) { - const { proxyService } = await import('./proxy.service'); - return await proxyService.fetchProxiesFromSources(); - } + private async processJob(job: any) { + const { service, provider, operation, payload }: JobData = job.data; + + this.logger.info('Processing job', { + id: job.id, + service, + provider, + operation, + payloadKeys: Object.keys(payload || {}) + }); - private async handleProxyCheck(payload: { proxies: any[] }) { - const { proxyService } = await import('./proxy.service'); - return await proxyService.checkProxies(payload.proxies); - } - private async handleMarketData(payload: { symbol: string }) { - const { marketDataProvider } = await import('../providers/market-data.provider.js'); - return await marketDataProvider.getLiveData(payload.symbol); - } + try { + // Get handler from registry + const handler = providerRegistry.getHandler(service, provider, operation); + + if (!handler) { + throw new Error(`No handler found for ${service}:${provider}:${operation}`); + } - private async handleHistoricalData(payload: { symbol: string; from: Date; to: Date; interval: string }) { - const { marketDataProvider } = await import('../providers/market-data.provider.js'); - return await marketDataProvider.getHistoricalData(payload.symbol, payload.from, payload.to, payload.interval); + // Execute the handler + const result = await handler(payload); + + this.logger.info('Job completed successfully', { + id: job.id, + service, + provider, + operation + }); + + return result; + + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error('Job failed', { + id: job.id, + service, + provider, + operation, + error: errorMessage + }); + throw error; + } } private setupEventListeners() { @@ -117,27 +141,50 @@ export class QueueService { }); } - private setupScheduledTasks() { - // Market data refresh every minute - this.addRecurringJob({ - type: 'market-data', - service: 'market-data', - provider: 'unified-data', - operation: 'refresh-cache', - payload: { symbols: ['AAPL', 'GOOGL', 'MSFT'] } - }, '*/1 * * * *'); + private async setupScheduledTasks() { + try { + // Market data refresh every minute using Yahoo Finance + await this.addRecurringJob({ + type: 'market-data-refresh', + service: 'market-data', + provider: 'yahoo-finance', + operation: 'live-data', + payload: { symbol: 'AAPL' } + }, '*/1 * * * *'); - // Proxy check every 15 minutes - this.addRecurringJob({ - type: 'proxy-fetch', - service: 'proxy', - provider: 'proxy-service', - operation: 'fetch-and-check', - payload: {} - }, '*/15 * * * *'); + // Market data refresh using QuoteMedia every 2 minutes + await this.addRecurringJob({ + type: 'market-data-quotemedia', + service: 'market-data', + provider: 'quotemedia', + operation: 'batch-quotes', + payload: { symbols: ['GOOGL', 'MSFT', 'TSLA'] } + }, '*/2 * * * *'); - this.logger.info('Scheduled tasks configured'); + // Proxy fetch every 15 minutes + await this.addRecurringJob({ + type: 'proxy-maintenance', + service: 'proxy', + provider: 'proxy-service', + operation: 'fetch-and-check', + payload: {} + }, '*/15 * * * *'); + + // Proxy cleanup daily at 2 AM + await this.addRecurringJob({ + type: 'proxy-cleanup', + service: 'proxy', + provider: 'proxy-service', + operation: 'cleanup-old-data', + payload: { daysToKeep: 7 } + }, '0 2 * * *'); + + this.logger.info('Scheduled tasks configured'); + } catch (error) { + this.logger.error('Failed to setup scheduled tasks', error); + } } + async addJob(jobData: JobData, options?: any) { if (!this.isInitialized) { throw new Error('Queue service not initialized. Call initialize() first.'); @@ -185,7 +232,9 @@ export class QueueService { failed: failed.length, delayed: delayed.length }; - } async getQueueStatus() { + } + + async getQueueStatus() { if (!this.isInitialized) { throw new Error('Queue service not initialized. Call initialize() first.'); } @@ -209,11 +258,14 @@ export class QueueService { } getRegisteredProviders() { - return [ - { name: 'proxy-service', type: 'proxy', operations: ['fetch-and-check', 'check-specific'] }, - { name: 'market-data-provider', type: 'market-data', operations: ['live-data', 'historical-data'] } - ]; + return providerRegistry.getProviders().map(({ key, config }) => ({ + key, + name: config.name, + service: config.service, + operations: Object.keys(config.operations) + })); } + async shutdown() { if (!this.isInitialized) { this.logger.warn('Queue service not initialized, nothing to shutdown');