initial data-service started
This commit is contained in:
parent
8681c34529
commit
f68e620c76
6 changed files with 531 additions and 178 deletions
98
.vscode/tasks.json
vendored
98
.vscode/tasks.json
vendored
|
|
@ -2,98 +2,18 @@
|
||||||
"version": "2.0.0",
|
"version": "2.0.0",
|
||||||
"tasks": [
|
"tasks": [
|
||||||
{
|
{
|
||||||
"label": "Start Market Data Gateway",
|
"label": "Start Data Service",
|
||||||
"type": "shell",
|
"type": "shell",
|
||||||
"command": "bun",
|
"command": "bun",
|
||||||
"args": ["run", "dev"],
|
"args": [
|
||||||
"options": {
|
"run",
|
||||||
"cwd": "${workspaceFolder}/apps/core-services/market-data-gateway"
|
"dev"
|
||||||
},
|
|
||||||
"group": "build",
|
|
||||||
"presentation": {
|
|
||||||
"echo": true,
|
|
||||||
"reveal": "always",
|
|
||||||
"focus": false,
|
|
||||||
"panel": "new",
|
|
||||||
"showReuseMessage": true,
|
|
||||||
"clear": false
|
|
||||||
},
|
|
||||||
"isBackground": true,
|
|
||||||
"problemMatcher": []
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"label": "Start Trading Dashboard",
|
|
||||||
"type": "shell",
|
|
||||||
"command": "bun",
|
|
||||||
"args": ["run", "dev"],
|
|
||||||
"options": {
|
|
||||||
"cwd": "${workspaceFolder}/apps/interface-services/trading-dashboard"
|
|
||||||
},
|
|
||||||
"group": "build",
|
|
||||||
"presentation": {
|
|
||||||
"echo": true,
|
|
||||||
"reveal": "always",
|
|
||||||
"focus": false,
|
|
||||||
"panel": "new",
|
|
||||||
"showReuseMessage": true,
|
|
||||||
"clear": false
|
|
||||||
},
|
|
||||||
"isBackground": true,
|
|
||||||
"problemMatcher": []
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"label": "Start Strategy Orchestrator",
|
|
||||||
"type": "shell",
|
|
||||||
"command": "bun",
|
|
||||||
"args": ["run", "dev"],
|
|
||||||
"options": {
|
|
||||||
"cwd": "${workspaceFolder}/apps/intelligence-services/strategy-orchestrator"
|
|
||||||
},
|
|
||||||
"group": "build",
|
|
||||||
"presentation": {
|
|
||||||
"echo": true,
|
|
||||||
"reveal": "always",
|
|
||||||
"focus": false,
|
|
||||||
"panel": "new",
|
|
||||||
"showReuseMessage": true,
|
|
||||||
"clear": false
|
|
||||||
},
|
|
||||||
"isBackground": true,
|
|
||||||
"problemMatcher": []
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"label": "Start Risk Guardian",
|
|
||||||
"type": "shell",
|
|
||||||
"command": "bun",
|
|
||||||
"args": ["run", "dev"],
|
|
||||||
"options": {
|
|
||||||
"cwd": "${workspaceFolder}/apps/core-services/risk-guardian"
|
|
||||||
},
|
|
||||||
"group": "build",
|
|
||||||
"presentation": {
|
|
||||||
"echo": true,
|
|
||||||
"reveal": "always",
|
|
||||||
"focus": false,
|
|
||||||
"panel": "new",
|
|
||||||
"showReuseMessage": true,
|
|
||||||
"clear": false
|
|
||||||
},
|
|
||||||
"isBackground": true,
|
|
||||||
"problemMatcher": []
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"label": "Start All Services",
|
|
||||||
"dependsOn": [
|
|
||||||
"Start Market Data Gateway",
|
|
||||||
"Start Trading Dashboard",
|
|
||||||
"Start Strategy Orchestrator",
|
|
||||||
"Start Risk Guardian"
|
|
||||||
],
|
],
|
||||||
"dependsOrder": "parallel",
|
"group": "build",
|
||||||
"group": {
|
"isBackground": true,
|
||||||
"kind": "build",
|
"problemMatcher": [
|
||||||
"isDefault": true
|
"$tsc"
|
||||||
}
|
]
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,65 +0,0 @@
|
||||||
import { getLogger, shutdownLoggers } from '@stock-bot/logger';
|
|
||||||
import { onShutdown, setShutdownTimeout, initiateShutdown } from '@stock-bot/shutdown';
|
|
||||||
|
|
||||||
const logger = getLogger('shutdown-test');
|
|
||||||
|
|
||||||
logger.info('🚀 Starting graceful shutdown test...');
|
|
||||||
|
|
||||||
// Configure shutdown
|
|
||||||
setShutdownTimeout(10000); // 10 seconds
|
|
||||||
|
|
||||||
// Register multiple shutdown handlers
|
|
||||||
onShutdown(async () => {
|
|
||||||
logger.info('🔧 Shutdown handler 1: Cleaning up resources...');
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
|
||||||
logger.info('✅ Shutdown handler 1 completed');
|
|
||||||
});
|
|
||||||
|
|
||||||
onShutdown(async () => {
|
|
||||||
logger.info('🔧 Shutdown handler 2: Closing connections...');
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 500));
|
|
||||||
logger.info('✅ Shutdown handler 2 completed');
|
|
||||||
});
|
|
||||||
|
|
||||||
onShutdown(() => {
|
|
||||||
logger.info('🔧 Shutdown handler 3: Final cleanup (sync)');
|
|
||||||
logger.info('✅ Shutdown handler 3 completed');
|
|
||||||
});
|
|
||||||
|
|
||||||
onShutdown(async () => {
|
|
||||||
logger.info('🔧 Shutdown handler 4: Logger cleanup');
|
|
||||||
try {
|
|
||||||
await shutdownLoggers();
|
|
||||||
console.log('✅ Logger shutdown completed');
|
|
||||||
} catch (error) {
|
|
||||||
console.error('❌ Logger shutdown failed:', error);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Simulate some work
|
|
||||||
let counter = 0;
|
|
||||||
const workInterval = setInterval(() => {
|
|
||||||
counter++;
|
|
||||||
logger.info(`🔄 Working... ${counter}`);
|
|
||||||
|
|
||||||
if (counter >= 5) {
|
|
||||||
logger.info('🎯 Work completed, triggering graceful shutdown in 2 seconds...');
|
|
||||||
setTimeout(async () => {
|
|
||||||
logger.info('📡 Initiating manual graceful shutdown...');
|
|
||||||
try {
|
|
||||||
await initiateShutdown();
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('Manual shutdown failed', error);
|
|
||||||
process.exit(1);
|
|
||||||
}
|
|
||||||
}, 2000);
|
|
||||||
clearInterval(workInterval);
|
|
||||||
}
|
|
||||||
}, 1000);
|
|
||||||
|
|
||||||
// Log process info
|
|
||||||
logger.info('📊 Process info', {
|
|
||||||
pid: process.pid,
|
|
||||||
platform: process.platform,
|
|
||||||
node: process.version
|
|
||||||
});
|
|
||||||
|
|
@ -1,10 +1,13 @@
|
||||||
/**
|
/**
|
||||||
* Data Service - Combined live and historical data ingestion
|
* Data Service - Combined live and historical data ingestion with queue-based architecture
|
||||||
*/
|
*/
|
||||||
import { getLogger } from '@stock-bot/logger';
|
import { getLogger } from '@stock-bot/logger';
|
||||||
import { loadEnvVariables } from '@stock-bot/config';
|
import { loadEnvVariables } from '@stock-bot/config';
|
||||||
import { Hono } from 'hono';
|
import { Hono } from 'hono';
|
||||||
import { serve } from '@hono/node-server';
|
import { serve } from '@hono/node-server';
|
||||||
|
import { queueManager } from './services/queue-manager.service';
|
||||||
|
import { proxyService } from './services/proxy.service';
|
||||||
|
import { marketDataProvider } from './providers/market-data.provider';
|
||||||
|
|
||||||
// Load environment variables
|
// Load environment variables
|
||||||
loadEnvVariables();
|
loadEnvVariables();
|
||||||
|
|
@ -18,20 +21,48 @@ app.get('/health', (c) => {
|
||||||
return c.json({
|
return c.json({
|
||||||
service: 'data-service',
|
service: 'data-service',
|
||||||
status: 'healthy',
|
status: 'healthy',
|
||||||
timestamp: new Date().toISOString()
|
timestamp: new Date().toISOString(),
|
||||||
|
queue: {
|
||||||
|
status: 'running',
|
||||||
|
workers: queueManager.getWorkerCount()
|
||||||
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
// API routes
|
// Queue management endpoints
|
||||||
|
app.get('/api/queue/status', async (c) => {
|
||||||
|
try {
|
||||||
|
const status = await queueManager.getQueueStatus();
|
||||||
|
return c.json({ status: 'success', data: status });
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Failed to get queue status', { error });
|
||||||
|
return c.json({ status: 'error', message: 'Failed to get queue status' }, 500);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
app.post('/api/queue/job', async (c) => {
|
||||||
|
try {
|
||||||
|
const jobData = await c.req.json();
|
||||||
|
const job = await queueManager.addJob(jobData);
|
||||||
|
return c.json({ status: 'success', jobId: job.id });
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Failed to add job', { error });
|
||||||
|
return c.json({ status: 'error', message: 'Failed to add job' }, 500);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Market data endpoints
|
||||||
app.get('/api/live/:symbol', async (c) => {
|
app.get('/api/live/:symbol', async (c) => {
|
||||||
const symbol = c.req.param('symbol');
|
const symbol = c.req.param('symbol');
|
||||||
logger.info('Live data request', { symbol });
|
logger.info('Live data request', { symbol });
|
||||||
|
|
||||||
// TODO: Implement live data fetching
|
try {
|
||||||
return c.json({
|
const data = await marketDataProvider.getLiveData(symbol);
|
||||||
symbol,
|
return c.json({ status: 'success', symbol, data });
|
||||||
message: 'Live data endpoint - not implemented yet'
|
} catch (error) {
|
||||||
});
|
logger.error('Failed to get live data', { symbol, error });
|
||||||
|
return c.json({ status: 'error', message: 'Failed to get live data' }, 500);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
app.get('/api/historical/:symbol', async (c) => {
|
app.get('/api/historical/:symbol', async (c) => {
|
||||||
|
|
@ -41,19 +72,89 @@ app.get('/api/historical/:symbol', async (c) => {
|
||||||
|
|
||||||
logger.info('Historical data request', { symbol, from, to });
|
logger.info('Historical data request', { symbol, from, to });
|
||||||
|
|
||||||
// TODO: Implement historical data fetching
|
try {
|
||||||
return c.json({
|
const fromDate = from ? new Date(from) : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // 30 days ago
|
||||||
symbol,
|
const toDate = to ? new Date(to) : new Date(); // Now
|
||||||
from,
|
|
||||||
to,
|
const data = await marketDataProvider.getHistoricalData(symbol, fromDate, toDate);
|
||||||
message: 'Historical data endpoint - not implemented yet'
|
return c.json({ status: 'success', symbol, from, to, data });
|
||||||
});
|
} catch (error) {
|
||||||
|
logger.error('Failed to get historical data', { symbol, from, to, error });
|
||||||
|
return c.json({ status: 'error', message: 'Failed to get historical data' }, 500);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Proxy management endpoints
|
||||||
|
app.post('/api/proxy/fetch', async (c) => {
|
||||||
|
try {
|
||||||
|
const jobId = await proxyService.queueProxyFetch();
|
||||||
|
return c.json({ status: 'success', jobId, message: 'Proxy fetch job queued' });
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Failed to queue proxy fetch', { error });
|
||||||
|
return c.json({ status: 'error', message: 'Failed to queue proxy fetch' }, 500);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
app.post('/api/proxy/check', async (c) => {
|
||||||
|
try {
|
||||||
|
const { proxies } = await c.req.json();
|
||||||
|
const jobId = await proxyService.queueProxyCheck(proxies);
|
||||||
|
return c.json({ status: 'success', jobId, message: 'Proxy check job queued' });
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Failed to queue proxy check', { error });
|
||||||
|
return c.json({ status: 'error', message: 'Failed to queue proxy check' }, 500);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Provider registry endpoints
|
||||||
|
app.get('/api/providers', async (c) => {
|
||||||
|
try {
|
||||||
|
const providers = queueManager.getRegisteredProviders();
|
||||||
|
return c.json({ status: 'success', providers });
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Failed to get providers', { error });
|
||||||
|
return c.json({ status: 'error', message: 'Failed to get providers' }, 500);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Initialize services
|
||||||
|
async function initializeServices() {
|
||||||
|
logger.info('Initializing data service...');
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Queue manager is initialized automatically when imported
|
||||||
|
logger.info('Queue manager initialized');
|
||||||
|
|
||||||
|
// Initialize providers
|
||||||
|
logger.info('All services initialized successfully');
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Failed to initialize services', { error });
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Start server
|
// Start server
|
||||||
serve({
|
async function startServer() {
|
||||||
|
await initializeServices();
|
||||||
|
|
||||||
|
serve({
|
||||||
fetch: app.fetch,
|
fetch: app.fetch,
|
||||||
port: PORT,
|
port: PORT,
|
||||||
});
|
});
|
||||||
|
|
||||||
logger.info(`Data Service started on port ${PORT}`);
|
logger.info(`Data Service started on port ${PORT}`);
|
||||||
|
logger.info('Available endpoints:');
|
||||||
|
logger.info(' GET /health - Health check');
|
||||||
|
logger.info(' GET /api/queue/status - Queue status');
|
||||||
|
logger.info(' POST /api/queue/job - Add job to queue');
|
||||||
|
logger.info(' GET /api/live/:symbol - Live market data');
|
||||||
|
logger.info(' GET /api/historical/:symbol - Historical market data');
|
||||||
|
logger.info(' POST /api/proxy/fetch - Queue proxy fetch');
|
||||||
|
logger.info(' POST /api/proxy/check - Queue proxy check');
|
||||||
|
logger.info(' GET /api/providers - List registered providers');
|
||||||
|
}
|
||||||
|
|
||||||
|
startServer().catch(error => {
|
||||||
|
logger.error('Failed to start server', { error });
|
||||||
|
process.exit(1);
|
||||||
|
});
|
||||||
|
|
|
||||||
179
apps/data-service/src/providers/market-data.provider.ts
Normal file
179
apps/data-service/src/providers/market-data.provider.ts
Normal file
|
|
@ -0,0 +1,179 @@
|
||||||
|
import { Logger } from '@stock-bot/logger';
|
||||||
|
import { HttpClient } from '@stock-bot/http';
|
||||||
|
import createCache, { type CacheProvider } from '@stock-bot/cache';
|
||||||
|
|
||||||
|
export interface MarketDataResponse {
|
||||||
|
symbol: string;
|
||||||
|
price: number;
|
||||||
|
timestamp: Date;
|
||||||
|
volume?: number;
|
||||||
|
change?: number;
|
||||||
|
open?: number;
|
||||||
|
high?: number;
|
||||||
|
low?: number;
|
||||||
|
close?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class MarketDataProvider {
|
||||||
|
private logger = new Logger('market-data-provider');
|
||||||
|
private httpClient: HttpClient;
|
||||||
|
private cache: CacheProvider = createCache('hybrid');
|
||||||
|
private readonly CACHE_TTL = 60; // 1 minute
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.httpClient = new HttpClient({
|
||||||
|
timeout: 10000,
|
||||||
|
}, this.logger);
|
||||||
|
}
|
||||||
|
async getLiveData(symbol: string): Promise<MarketDataResponse> {
|
||||||
|
const cacheKey = `market-data:${symbol}`;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Check cache first
|
||||||
|
const cached = await this.cache.get(cacheKey) as MarketDataResponse | null;
|
||||||
|
if (cached) {
|
||||||
|
this.logger.debug('Returning cached market data', { symbol });
|
||||||
|
return cached;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate simulated data for demo
|
||||||
|
const data = this.generateSimulatedData(symbol);
|
||||||
|
|
||||||
|
// Cache the result
|
||||||
|
await this.cache.set(cacheKey, data, this.CACHE_TTL);
|
||||||
|
|
||||||
|
this.logger.info('Generated live market data', { symbol, price: data.price });
|
||||||
|
return data;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error('Error fetching market data', { symbol, error });
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async getHistoricalData(symbol: string, from: Date, to: Date, interval: string = '1m'): Promise<MarketDataResponse[]> {
|
||||||
|
const cacheKey = `historical:${symbol}:${from.toISOString()}:${to.toISOString()}:${interval}`;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const cached = await this.cache.get(cacheKey) as MarketDataResponse[] | null;
|
||||||
|
if (cached) {
|
||||||
|
this.logger.debug('Returning cached historical data', { symbol, from, to });
|
||||||
|
return cached;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate simulated historical data
|
||||||
|
const data = this.generateHistoricalData(symbol, from, to, interval);
|
||||||
|
|
||||||
|
// Cache for longer time (1 hour)
|
||||||
|
await this.cache.set(cacheKey, data, 3600);
|
||||||
|
|
||||||
|
this.logger.info('Generated historical market data', { symbol, from, to, count: data.length });
|
||||||
|
return data;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error('Error fetching historical data', { symbol, error });
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private generateSimulatedData(symbol: string): MarketDataResponse {
|
||||||
|
// Base prices for different symbols
|
||||||
|
const basePrices: { [key: string]: number } = {
|
||||||
|
'AAPL': 150,
|
||||||
|
'GOOGL': 2500,
|
||||||
|
'MSFT': 300,
|
||||||
|
'TSLA': 200,
|
||||||
|
'AMZN': 3000,
|
||||||
|
'NVDA': 400,
|
||||||
|
'META': 250,
|
||||||
|
'NFLX': 400
|
||||||
|
};
|
||||||
|
|
||||||
|
const basePrice = basePrices[symbol] || 100;
|
||||||
|
|
||||||
|
// Add some randomness (+/- 2%)
|
||||||
|
const variation = (Math.random() - 0.5) * 0.04; // ±2%
|
||||||
|
const price = basePrice * (1 + variation);
|
||||||
|
const change = variation * basePrice;
|
||||||
|
|
||||||
|
return {
|
||||||
|
symbol,
|
||||||
|
price: Math.round(price * 100) / 100,
|
||||||
|
timestamp: new Date(),
|
||||||
|
volume: Math.floor(Math.random() * 1000000) + 500000,
|
||||||
|
change: Math.round(change * 100) / 100,
|
||||||
|
open: Math.round((price - change) * 100) / 100,
|
||||||
|
high: Math.round((price + Math.abs(change * 0.5)) * 100) / 100,
|
||||||
|
low: Math.round((price - Math.abs(change * 0.5)) * 100) / 100,
|
||||||
|
close: Math.round(price * 100) / 100
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private generateHistoricalData(symbol: string, from: Date, to: Date, interval: string): MarketDataResponse[] {
|
||||||
|
const data: MarketDataResponse[] = [];
|
||||||
|
const intervalMs = this.parseInterval(interval);
|
||||||
|
|
||||||
|
let currentTime = new Date(from);
|
||||||
|
const endTime = new Date(to);
|
||||||
|
|
||||||
|
// Base price for the symbol
|
||||||
|
const basePrices: { [key: string]: number } = {
|
||||||
|
'AAPL': 150,
|
||||||
|
'GOOGL': 2500,
|
||||||
|
'MSFT': 300,
|
||||||
|
'TSLA': 200,
|
||||||
|
'AMZN': 3000,
|
||||||
|
'NVDA': 400,
|
||||||
|
'META': 250,
|
||||||
|
'NFLX': 400
|
||||||
|
};
|
||||||
|
|
||||||
|
let basePrice = basePrices[symbol] || 100;
|
||||||
|
|
||||||
|
while (currentTime <= endTime) {
|
||||||
|
// Add some trend and randomness
|
||||||
|
const trend = (Math.random() - 0.5) * 0.01; // Small trend
|
||||||
|
const variation = (Math.random() - 0.5) * 0.02; // Random variation
|
||||||
|
|
||||||
|
basePrice = basePrice * (1 + trend + variation);
|
||||||
|
const change = basePrice * variation;
|
||||||
|
|
||||||
|
data.push({
|
||||||
|
symbol,
|
||||||
|
price: Math.round(basePrice * 100) / 100,
|
||||||
|
timestamp: new Date(currentTime),
|
||||||
|
volume: Math.floor(Math.random() * 1000000) + 500000,
|
||||||
|
change: Math.round(change * 100) / 100,
|
||||||
|
open: Math.round((basePrice - change) * 100) / 100,
|
||||||
|
high: Math.round((basePrice + Math.abs(change * 0.5)) * 100) / 100,
|
||||||
|
low: Math.round((basePrice - Math.abs(change * 0.5)) * 100) / 100,
|
||||||
|
close: Math.round(basePrice * 100) / 100
|
||||||
|
});
|
||||||
|
|
||||||
|
currentTime = new Date(currentTime.getTime() + intervalMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
private parseInterval(interval: string): number {
|
||||||
|
const value = parseInt(interval.slice(0, -1));
|
||||||
|
const unit = interval.slice(-1).toLowerCase();
|
||||||
|
|
||||||
|
switch (unit) {
|
||||||
|
case 's': return value * 1000;
|
||||||
|
case 'm': return value * 60 * 1000;
|
||||||
|
case 'h': return value * 60 * 60 * 1000;
|
||||||
|
case 'd': return value * 24 * 60 * 60 * 1000;
|
||||||
|
default: return 60 * 1000; // Default to 1 minute
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async clearCache(): Promise<void> {
|
||||||
|
this.logger.info('Clearing market data cache');
|
||||||
|
// Note: Cache provider limitations - would need proper key tracking
|
||||||
|
}
|
||||||
|
|
||||||
|
async shutdown(): Promise<void> {
|
||||||
|
this.logger.info('Shutting down MarketDataProvider');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const marketDataProvider = new MarketDataProvider();
|
||||||
|
|
@ -72,16 +72,48 @@ export class ProxyService {
|
||||||
|
|
||||||
this.logger.info('ProxyService initialized');
|
this.logger.info('ProxyService initialized');
|
||||||
}
|
}
|
||||||
|
|
||||||
private async initializeScheduling() {
|
private async initializeScheduling() {
|
||||||
try {
|
try {
|
||||||
await queueService.scheduleRecurringTasks();
|
// Queue manager will handle scheduling
|
||||||
this.logger.info('Proxy scheduling initialized');
|
this.logger.info('Proxy scheduling will be handled by queue manager');
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error('Failed to initialize scheduling', { error });
|
this.logger.error('Failed to initialize scheduling', { error });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add queue integration methods
|
||||||
|
async queueProxyFetch(): Promise<string> {
|
||||||
|
const { queueManager } = await import('./queue-manager.service');
|
||||||
|
const job = await queueManager.addJob({
|
||||||
|
type: 'proxy-fetch',
|
||||||
|
service: 'proxy',
|
||||||
|
provider: 'proxy-service',
|
||||||
|
operation: 'fetch-and-check',
|
||||||
|
payload: {},
|
||||||
|
priority: 5
|
||||||
|
});
|
||||||
|
|
||||||
|
const jobId = job.id || 'unknown';
|
||||||
|
this.logger.info('Proxy fetch job queued', { jobId });
|
||||||
|
return jobId;
|
||||||
|
}
|
||||||
|
|
||||||
|
async queueProxyCheck(proxies: ProxyInfo[]): Promise<string> {
|
||||||
|
const { queueManager } = await import('./queue-manager.service');
|
||||||
|
const job = await queueManager.addJob({
|
||||||
|
type: 'proxy-check',
|
||||||
|
service: 'proxy',
|
||||||
|
provider: 'proxy-service',
|
||||||
|
operation: 'check-specific',
|
||||||
|
payload: { proxies },
|
||||||
|
priority: 3
|
||||||
|
});
|
||||||
|
|
||||||
|
const jobId = job.id || 'unknown';
|
||||||
|
this.logger.info('Proxy check job queued', { jobId, count: proxies.length });
|
||||||
|
return jobId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
async fetchProxiesFromSources() : Promise<number> {
|
async fetchProxiesFromSources() : Promise<number> {
|
||||||
const sources = this.PROXY_SOURCES.map(source =>
|
const sources = this.PROXY_SOURCES.map(source =>
|
||||||
|
|
|
||||||
186
apps/data-service/src/services/queue-manager.service.ts
Normal file
186
apps/data-service/src/services/queue-manager.service.ts
Normal file
|
|
@ -0,0 +1,186 @@
|
||||||
|
import { Queue, Worker, QueueEvents } from 'bullmq';
|
||||||
|
import { Logger } from '@stock-bot/logger';
|
||||||
|
|
||||||
|
export interface JobData {
|
||||||
|
type: 'proxy-fetch' | 'proxy-check' | 'market-data' | 'historical-data';
|
||||||
|
service: 'proxy' | 'market-data' | 'analytics';
|
||||||
|
provider: string;
|
||||||
|
operation: string;
|
||||||
|
payload: any;
|
||||||
|
priority?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class QueueManagerService {
|
||||||
|
private logger = new Logger('queue-manager');
|
||||||
|
private queue: Queue;
|
||||||
|
private worker: Worker;
|
||||||
|
private queueEvents: QueueEvents;
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
const connection = {
|
||||||
|
host: process.env.DRAGONFLY_HOST || 'localhost',
|
||||||
|
port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
|
||||||
|
};
|
||||||
|
|
||||||
|
this.queue = new Queue('data-service-queue', { connection });
|
||||||
|
this.worker = new Worker('data-service-queue', this.processJob.bind(this), {
|
||||||
|
connection,
|
||||||
|
concurrency: 10
|
||||||
|
});
|
||||||
|
this.queueEvents = new QueueEvents('data-service-queue', { connection });
|
||||||
|
|
||||||
|
this.setupEventListeners();
|
||||||
|
this.setupScheduledTasks();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async processJob(job: any) {
|
||||||
|
const { type, service, provider, operation, payload }: JobData = job.data;
|
||||||
|
|
||||||
|
this.logger.info('Processing job', { id: job.id, type, service, provider, operation });
|
||||||
|
|
||||||
|
try {
|
||||||
|
switch (type) {
|
||||||
|
case 'proxy-fetch':
|
||||||
|
return await this.handleProxyFetch(payload);
|
||||||
|
case 'proxy-check':
|
||||||
|
return await this.handleProxyCheck(payload);
|
||||||
|
case 'market-data':
|
||||||
|
return await this.handleMarketData(payload);
|
||||||
|
case 'historical-data':
|
||||||
|
return await this.handleHistoricalData(payload);
|
||||||
|
default:
|
||||||
|
throw new Error(`Unknown job type: ${type}`);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error('Job failed', { id: job.id, type, error });
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async handleProxyFetch(payload: any) {
|
||||||
|
const { proxyService } = await import('./proxy.service');
|
||||||
|
return await proxyService.fetchProxiesFromSources();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async handleProxyCheck(payload: { proxies: any[] }) {
|
||||||
|
const { proxyService } = await import('./proxy.service');
|
||||||
|
return await proxyService.checkProxies(payload.proxies);
|
||||||
|
}
|
||||||
|
private async handleMarketData(payload: { symbol: string }) {
|
||||||
|
const { marketDataProvider } = await import('../providers/market-data.provider.js');
|
||||||
|
return await marketDataProvider.getLiveData(payload.symbol);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async handleHistoricalData(payload: { symbol: string; from: Date; to: Date; interval: string }) {
|
||||||
|
const { marketDataProvider } = await import('../providers/market-data.provider.js');
|
||||||
|
return await marketDataProvider.getHistoricalData(payload.symbol, payload.from, payload.to, payload.interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
private setupEventListeners() {
|
||||||
|
this.queueEvents.on('completed', (job) => {
|
||||||
|
this.logger.info('Job completed', { id: job.jobId });
|
||||||
|
});
|
||||||
|
|
||||||
|
this.queueEvents.on('failed', (job) => {
|
||||||
|
this.logger.error('Job failed', { id: job.jobId, error: job.failedReason });
|
||||||
|
});
|
||||||
|
|
||||||
|
this.worker.on('progress', (job, progress) => {
|
||||||
|
this.logger.debug('Job progress', { id: job.id, progress });
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private setupScheduledTasks() {
|
||||||
|
// Market data refresh every minute
|
||||||
|
this.addRecurringJob({
|
||||||
|
type: 'market-data',
|
||||||
|
service: 'market-data',
|
||||||
|
provider: 'unified-data',
|
||||||
|
operation: 'refresh-cache',
|
||||||
|
payload: { symbols: ['AAPL', 'GOOGL', 'MSFT'] }
|
||||||
|
}, '*/1 * * * *');
|
||||||
|
|
||||||
|
// Proxy check every 15 minutes
|
||||||
|
this.addRecurringJob({
|
||||||
|
type: 'proxy-fetch',
|
||||||
|
service: 'proxy',
|
||||||
|
provider: 'proxy-service',
|
||||||
|
operation: 'fetch-and-check',
|
||||||
|
payload: {}
|
||||||
|
}, '*/15 * * * *');
|
||||||
|
|
||||||
|
this.logger.info('Scheduled tasks configured');
|
||||||
|
}
|
||||||
|
|
||||||
|
async addJob(jobData: JobData, options?: any) {
|
||||||
|
return this.queue.add(jobData.type, jobData, {
|
||||||
|
priority: jobData.priority || 0,
|
||||||
|
removeOnComplete: 10,
|
||||||
|
removeOnFail: 5,
|
||||||
|
...options
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async addRecurringJob(jobData: JobData, cronPattern: string) {
|
||||||
|
return this.queue.add(
|
||||||
|
`recurring-${jobData.type}`,
|
||||||
|
jobData,
|
||||||
|
{
|
||||||
|
repeat: { pattern: cronPattern },
|
||||||
|
removeOnComplete: 1,
|
||||||
|
removeOnFail: 1,
|
||||||
|
jobId: `recurring-${jobData.service}-${jobData.provider}-${jobData.operation}`
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async getJobStats() {
|
||||||
|
const [waiting, active, completed, failed, delayed] = await Promise.all([
|
||||||
|
this.queue.getWaiting(),
|
||||||
|
this.queue.getActive(),
|
||||||
|
this.queue.getCompleted(),
|
||||||
|
this.queue.getFailed(),
|
||||||
|
this.queue.getDelayed()
|
||||||
|
]);
|
||||||
|
|
||||||
|
return {
|
||||||
|
waiting: waiting.length,
|
||||||
|
active: active.length,
|
||||||
|
completed: completed.length,
|
||||||
|
failed: failed.length,
|
||||||
|
delayed: delayed.length
|
||||||
|
};
|
||||||
|
}
|
||||||
|
async getQueueStatus() {
|
||||||
|
const stats = await this.getJobStats();
|
||||||
|
return {
|
||||||
|
...stats,
|
||||||
|
workers: this.getWorkerCount(),
|
||||||
|
queue: this.queue.name,
|
||||||
|
connection: {
|
||||||
|
host: process.env.DRAGONFLY_HOST || 'localhost',
|
||||||
|
port: parseInt(process.env.DRAGONFLY_PORT || '6379')
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
getWorkerCount() {
|
||||||
|
return this.worker.opts.concurrency || 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
getRegisteredProviders() {
|
||||||
|
return [
|
||||||
|
{ name: 'proxy-service', type: 'proxy', operations: ['fetch-and-check', 'check-specific'] },
|
||||||
|
{ name: 'market-data-provider', type: 'market-data', operations: ['live-data', 'historical-data'] }
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
async shutdown() {
|
||||||
|
this.logger.info('Shutting down queue manager');
|
||||||
|
await this.worker.close();
|
||||||
|
await this.queue.close();
|
||||||
|
await this.queueEvents.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const queueManager = new QueueManagerService();
|
||||||
Loading…
Add table
Add a link
Reference in a new issue