// stock-bot/apps/dashboard/src/app/services/websocket.service.ts
//
// 218 lines
// 6.2 KiB
// TypeScript

import { Injectable, signal } from '@angular/core';
import { BehaviorSubject, Observable, Subject } from 'rxjs';
import { filter, map } from 'rxjs/operators';
/**
 * Envelope for a message received over one of the backend WebSockets.
 *
 * WebSocketService JSON-parses each incoming frame into this shape and routes
 * it to subscribers by comparing `type`.
 */
export interface WebSocketMessage {
/**
 * Routing discriminator. Values matched in this file: 'market_data_update',
 * 'risk_alert', 'strategy_update', 'strategy_signal', 'strategy_trade'.
 */
type: string;
/** Payload whose shape depends on `type`; it is not validated at parse time. */
data: any;
/**
 * Timestamp string carried with the message. The format is not established
 * in this file (presumably ISO-8601 — TODO confirm against the server).
 */
timestamp: string;
}
/**
 * Payload of a 'market_data_update' message, as emitted by
 * WebSocketService.getMarketDataUpdates().
 *
 * NOTE(review): units and sign conventions below are inferred from field
 * names only; nothing in this file establishes them — confirm with the
 * market-data service.
 */
export interface MarketDataUpdate {
/** Instrument identifier the update refers to. */
symbol: string;
/** Price reported for the symbol. */
price: number;
/** Price change (presumably absolute, versus a reference price — confirm). */
change: number;
/** Price change as a percentage (presumably of the same reference — confirm). */
changePercent: number;
/** Traded volume reported with the update. */
volume: number;
/** Timestamp string for the update; format not established in this file. */
timestamp: string;
}
/**
 * Payload of a 'risk_alert' message, as emitted by
 * WebSocketService.getRiskAlerts().
 */
export interface RiskAlert {
/** Identifier of the alert (uniqueness guarantees are not shown in this file). */
id: string;
/** Instrument the alert relates to. */
symbol: string;
/** Category of risk rule that produced the alert. */
alertType: 'POSITION_LIMIT' | 'DAILY_LOSS' | 'VOLATILITY' | 'PORTFOLIO_RISK';
/** Human-readable description of the alert. */
message: string;
/** Severity level of the alert. */
severity: 'LOW' | 'MEDIUM' | 'HIGH';
/** Timestamp string for the alert; format not established in this file. */
timestamp: string;
}
/**
 * Root-scoped service that keeps one WebSocket open per backend service
 * (market data, risk guardian, strategy orchestrator) and exposes the parsed
 * messages as RxJS observables.
 *
 * Lifecycle:
 * - All connections are opened from the constructor.
 * - A connection that closes is retried after `RECONNECT_DELAY_MS`.
 * - Each service has exactly one message subject for the lifetime of the
 *   service, so observables obtained from the getters keep receiving messages
 *   across reconnects.
 * - `disconnect()` is terminal: it cancels pending retries, closes every
 *   socket, completes every subject, and no further reconnects are attempted.
 */
@Injectable({
  providedIn: 'root'
})
export class WebSocketService {
  private readonly WS_ENDPOINTS = {
    marketData: 'ws://localhost:3001/ws',
    riskGuardian: 'ws://localhost:3002/ws',
    strategyOrchestrator: 'ws://localhost:3003/ws'
  };

  /** Delay, in milliseconds, before a closed connection is retried. */
  private readonly RECONNECT_DELAY_MS = 5000;

  /** Most recently created socket for each service name. */
  private connections = new Map<string, WebSocket>();

  /** One long-lived subject per service name; reused across reconnects. */
  private messageSubjects = new Map<string, Subject<WebSocketMessage>>();

  /** Pending reconnect timers, keyed by service name, so they can be cancelled. */
  private reconnectTimers = new Map<string, ReturnType<typeof setTimeout>>();

  /** Set by `disconnect()`; suppresses any further (re)connection attempts. */
  private closedByClient = false;

  // Connection status signals

  /** True while at least one of the service connections is open. */
  public isConnected = signal<boolean>(false);

  /** Per-service open/closed state, keyed by service name. */
  public connectionStatus = signal<{ [key: string]: boolean }>({
    marketData: false,
    riskGuardian: false,
    strategyOrchestrator: false
  });

  constructor() {
    this.initializeConnections();
  }

  /** Opens a WebSocket connection for every configured endpoint. */
  private initializeConnections(): void {
    Object.entries(this.WS_ENDPOINTS).forEach(([service, url]) => {
      this.connect(service, url);
    });
  }

  /**
   * Returns the message subject for a service, creating it on first use.
   * The subject is deliberately independent of any single socket so that
   * subscribers survive reconnects and failed connection attempts.
   */
  private getOrCreateSubject(serviceName: string): Subject<WebSocketMessage> {
    const existing = this.messageSubjects.get(serviceName);
    if (existing) {
      return existing;
    }
    const created = new Subject<WebSocketMessage>();
    this.messageSubjects.set(serviceName, created);
    return created;
  }

  /**
   * Checks the only property the routing logic depends on: a string `type`.
   * `data` and `timestamp` are NOT verified here.
   */
  private isRoutableMessage(value: unknown): value is WebSocketMessage {
    return (
      typeof value === 'object' &&
      value !== null &&
      typeof (value as { type?: unknown }).type === 'string'
    );
  }

  /**
   * Opens a socket to `url` and wires its events to the service's subject and
   * status signals. Does nothing once `disconnect()` has been called.
   *
   * @param serviceName Key used for the connection, subject and status maps.
   * @param url WebSocket endpoint to connect to.
   */
  private connect(serviceName: string, url: string): void {
    if (this.closedByClient) {
      return;
    }

    // Registered before the socket is created so the getters work even if
    // construction of the socket throws.
    const messageSubject = this.getOrCreateSubject(serviceName);

    try {
      const ws = new WebSocket(url);

      ws.onopen = () => {
        console.log(`Connected to ${serviceName} WebSocket`);
        this.updateConnectionStatus(serviceName, true);
      };

      ws.onmessage = (event) => {
        try {
          const parsed: unknown = JSON.parse(event.data);
          if (!this.isRoutableMessage(parsed)) {
            // Forwarding a message without a string `type` would make
            // downstream filters throw and terminate subscribers' streams.
            console.warn(`Ignoring malformed WebSocket message from ${serviceName}:`, parsed);
            return;
          }
          messageSubject.next(parsed);
        } catch (error) {
          console.error(`Failed to parse WebSocket message from ${serviceName}:`, error);
        }
      };

      ws.onclose = () => {
        console.log(`Disconnected from ${serviceName} WebSocket`);
        this.updateConnectionStatus(serviceName, false);
        this.scheduleReconnect(serviceName, url);
      };

      ws.onerror = (error) => {
        console.error(`WebSocket error for ${serviceName}:`, error);
        this.updateConnectionStatus(serviceName, false);
      };

      this.connections.set(serviceName, ws);
    } catch (error) {
      // Construction failed synchronously (e.g. an invalid URL); this is not
      // retried because the same input would fail the same way.
      console.error(`Failed to connect to ${serviceName} WebSocket:`, error);
      this.updateConnectionStatus(serviceName, false);
    }
  }

  /**
   * Schedules a single reconnect attempt for a service after
   * `RECONNECT_DELAY_MS`, replacing any attempt already pending for it.
   */
  private scheduleReconnect(serviceName: string, url: string): void {
    if (this.closedByClient) {
      return;
    }
    const pending = this.reconnectTimers.get(serviceName);
    if (pending !== undefined) {
      clearTimeout(pending);
    }
    const timer = setTimeout(() => {
      this.reconnectTimers.delete(serviceName);
      this.connect(serviceName, url);
    }, this.RECONNECT_DELAY_MS);
    this.reconnectTimers.set(serviceName, timer);
  }

  /**
   * Records the state of one service and recomputes `isConnected`, which is
   * true when any service is connected (not only when all are).
   */
  private updateConnectionStatus(serviceName: string, isConnected: boolean): void {
    const newStatus = { ...this.connectionStatus(), [serviceName]: isConnected };
    this.connectionStatus.set(newStatus);
    this.isConnected.set(Object.values(newStatus).some(status => status));
  }

  /**
   * Looks up the message stream for a service.
   *
   * @param serviceName Key into the subject map.
   * @param label Human-readable service name used in the error message.
   * @throws Error if no subject exists for the service (e.g. after `disconnect()`).
   */
  private messagesFrom(serviceName: string, label: string): Observable<WebSocketMessage> {
    const subject = this.messageSubjects.get(serviceName);
    if (!subject) {
      throw new Error(`${label} WebSocket not initialized`);
    }
    return subject.asObservable();
  }

  /**
   * Payloads of strategy-orchestrator messages of one `type`, optionally
   * restricted to a single strategy.
   */
  private strategyPayloads(type: string, strategyId?: string): Observable<any> {
    return this.messagesFrom('strategyOrchestrator', 'Strategy Orchestrator').pipe(
      filter(message =>
        message.type === type &&
        // Optional chaining: a null/absent payload must not throw inside the
        // filter, which would error the subscriber's stream.
        (!strategyId || message.data?.strategyId === strategyId)
      ),
      map(message => message.data)
    );
  }

  // Market Data Updates
  /**
   * Stream of payloads from 'market_data_update' messages.
   * @throws Error if the market data stream is not available.
   */
  getMarketDataUpdates(): Observable<MarketDataUpdate> {
    return this.messagesFrom('marketData', 'Market data').pipe(
      filter(message => message.type === 'market_data_update'),
      map(message => message.data as MarketDataUpdate)
    );
  }

  // Risk Alerts
  /**
   * Stream of payloads from 'risk_alert' messages.
   * @throws Error if the risk guardian stream is not available.
   */
  getRiskAlerts(): Observable<RiskAlert> {
    return this.messagesFrom('riskGuardian', 'Risk Guardian').pipe(
      filter(message => message.type === 'risk_alert'),
      map(message => message.data as RiskAlert)
    );
  }

  // Strategy Updates
  /**
   * Stream of payloads from 'strategy_update' messages.
   * @throws Error if the strategy orchestrator stream is not available.
   */
  getStrategyUpdates(): Observable<any> {
    return this.strategyPayloads('strategy_update');
  }

  // Strategy Signals
  /**
   * Stream of payloads from 'strategy_signal' messages.
   * @param strategyId When given, only payloads whose `strategyId` equals it are emitted.
   * @throws Error if the strategy orchestrator stream is not available.
   */
  getStrategySignals(strategyId?: string): Observable<any> {
    return this.strategyPayloads('strategy_signal', strategyId);
  }

  // Strategy Trades
  /**
   * Stream of payloads from 'strategy_trade' messages.
   * @param strategyId When given, only payloads whose `strategyId` equals it are emitted.
   * @throws Error if the strategy orchestrator stream is not available.
   */
  getStrategyTrades(strategyId?: string): Observable<any> {
    return this.strategyPayloads('strategy_trade', strategyId);
  }

  // All strategy-related messages, useful for components that need all types
  /**
   * Stream of full message envelopes whose `type` starts with 'strategy_'.
   * @throws Error if the strategy orchestrator stream is not available.
   */
  getAllStrategyMessages(): Observable<WebSocketMessage> {
    return this.messagesFrom('strategyOrchestrator', 'Strategy Orchestrator').pipe(
      filter(message => message.type.startsWith('strategy_'))
    );
  }

  // Send messages
  /**
   * JSON-serializes `message` and sends it to the named service. If that
   * service's socket is not open, the message is dropped and a warning logged.
   *
   * @param serviceName Key of the target connection (e.g. 'marketData').
   * @param message Any JSON-serializable value.
   */
  sendMessage(serviceName: string, message: unknown): void {
    const ws = this.connections.get(serviceName);
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(message));
    } else {
      console.warn(`Cannot send message to ${serviceName}: WebSocket not connected`);
    }
  }

  // Cleanup
  /**
   * Permanently shuts the service down: cancels pending reconnects, closes
   * every socket (including ones still connecting), marks every service as
   * disconnected, and completes all message streams.
   */
  disconnect(): void {
    // Set first so nothing triggered below can schedule a new connection.
    this.closedByClient = true;

    this.reconnectTimers.forEach(timer => clearTimeout(timer));
    this.reconnectTimers.clear();

    this.connections.forEach((ws, serviceName) => {
      // Detach handlers so the close we initiate does not log, touch state,
      // or schedule a reconnect.
      ws.onopen = null;
      ws.onmessage = null;
      ws.onerror = null;
      ws.onclose = null;

      if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) {
        ws.close();
      }
      // onclose is detached, so the status signals are updated here instead.
      this.updateConnectionStatus(serviceName, false);
    });
    this.connections.clear();

    // Complete before clearing so existing subscribers are released.
    this.messageSubjects.forEach(subject => subject.complete());
    this.messageSubjects.clear();
  }
}