reorganized web-app

This commit is contained in:
Boki 2025-06-23 16:55:29 -04:00
parent 5c87f068d6
commit b67fe48f72
31 changed files with 1781 additions and 431 deletions

View file

@ -179,5 +179,82 @@ export function createMonitoringRoutes(container: IServiceContainer) {
});
});
/**
* Get service status for all microservices
*/
monitoring.get('/services', async (c) => {
try {
const services = await monitoringService.getServiceStatus();
return c.json({ services });
} catch (error) {
return c.json({
error: 'Failed to retrieve service status',
message: error instanceof Error ? error.message : 'Unknown error',
}, 500);
}
});
/**
* Get proxy statistics
*/
monitoring.get('/proxies', async (c) => {
try {
const stats = await monitoringService.getProxyStats();
return c.json(stats || { enabled: false });
} catch (error) {
return c.json({
error: 'Failed to retrieve proxy statistics',
message: error instanceof Error ? error.message : 'Unknown error',
}, 500);
}
});
/**
* Get comprehensive system overview
*/
monitoring.get('/overview', async (c) => {
try {
const overview = await monitoringService.getSystemOverview();
return c.json(overview);
} catch (error) {
return c.json({
error: 'Failed to retrieve system overview',
message: error instanceof Error ? error.message : 'Unknown error',
}, 500);
}
});
/**
* Test direct BullMQ queue access
*/
monitoring.get('/test/queue/:name', async (c) => {
const queueName = c.req.param('name');
const { Queue } = await import('bullmq');
const connection = {
host: 'localhost',
port: 6379,
db: 1,
};
const queue = new Queue(`{${queueName}}`, { connection });
try {
const counts = await queue.getJobCounts();
await queue.close();
return c.json({
queueName,
bullmqName: `{${queueName}}`,
counts
});
} catch (error: any) {
await queue.close();
return c.json({
queueName,
error: error.message
}, 500);
}
});
return monitoring;
}

View file

@ -11,8 +11,12 @@ import type {
DatabaseStats,
SystemHealth,
ServiceMetrics,
MetricSnapshot
MetricSnapshot,
ServiceStatus,
ProxyStats,
SystemOverview
} from '../types/monitoring.types';
import * as os from 'os';
export class MonitoringService {
private readonly logger = getLogger('monitoring-service');
@ -32,36 +36,30 @@ export class MonitoringService {
};
}
// Get Redis/Dragonfly info
const info = await this.container.cache.info();
const dbSize = await this.container.cache.dbsize();
// Parse memory stats from info
const memoryUsed = this.parseInfoValue(info, 'used_memory');
const memoryPeak = this.parseInfoValue(info, 'used_memory_peak');
// Parse stats
const hits = this.parseInfoValue(info, 'keyspace_hits');
const misses = this.parseInfoValue(info, 'keyspace_misses');
const evictedKeys = this.parseInfoValue(info, 'evicted_keys');
const expiredKeys = this.parseInfoValue(info, 'expired_keys');
// Check if cache is connected using the isReady method
const isConnected = this.container.cache.isReady();
if (!isConnected) {
return {
provider: 'dragonfly',
connected: false,
};
}
// Get cache stats from the provider
const cacheStats = this.container.cache.getStats();
// Since we can't access Redis info directly, we'll use what's available
return {
provider: 'dragonfly',
connected: true,
uptime: this.parseInfoValue(info, 'uptime_in_seconds'),
memoryUsage: {
used: memoryUsed,
peak: memoryPeak,
},
uptime: cacheStats.uptime,
stats: {
hits,
misses,
keys: dbSize,
evictedKeys,
expiredKeys,
hits: cacheStats.hits,
misses: cacheStats.misses,
keys: 0, // We can't get total keys without direct Redis access
evictedKeys: 0,
expiredKeys: 0,
},
info: this.parseRedisInfo(info),
};
} catch (error) {
this.logger.error('Failed to get cache stats', { error });
@ -80,47 +78,52 @@ export class MonitoringService {
try {
if (!this.container.queue) {
this.logger.warn('No queue manager available');
return stats;
}
// Get all queue names from the queue manager
const queueManager = this.container.queue;
const queueNames = ['default', 'proxy', 'qm', 'ib', 'ceo', 'webshare']; // Add your queue names
// Get all queue names from the SmartQueueManager
const queueManager = this.container.queue as any;
this.logger.debug('Queue manager type:', {
type: queueManager.constructor.name,
hasGetAllQueues: typeof queueManager.getAllQueues === 'function',
hasQueues: !!queueManager.queues,
hasGetQueue: typeof queueManager.getQueue === 'function'
});
// Always use the known queue names since web-api doesn't create worker queues
const queueNames = ['proxy', 'qm', 'ib', 'ceo', 'webshare', 'exchanges', 'symbols'];
this.logger.debug('Using known queue names', { count: queueNames.length, names: queueNames });
// Create BullMQ queues directly with the correct format
for (const queueName of queueNames) {
try {
const queue = queueManager.getQueue(queueName);
if (!queue) continue;
const [waiting, active, completed, failed, delayed, paused] = await Promise.all([
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getCompletedCount(),
queue.getFailedCount(),
queue.getDelayedCount(),
queue.getPausedCount(),
]);
// Get worker info if available
const workers = queueManager.getWorker(queueName);
const workerInfo = workers ? {
count: 1, // Assuming single worker per queue
concurrency: workers.concurrency || 1,
} : undefined;
// Import BullMQ directly to create queue instances
const { Queue: BullMQQueue } = await import('bullmq');
const connection = {
host: 'localhost',
port: 6379,
db: 1, // Queue DB
};
// Create BullMQ queue with the correct format
const bullQueue = new BullMQQueue(`{${queueName}}`, { connection });
// Get stats directly from BullMQ
const queueStats = await this.getQueueStatsForBullQueue(bullQueue, queueName);
stats.push({
name: queueName,
connected: true,
jobs: {
waiting,
active,
completed,
failed,
delayed,
paused,
jobs: queueStats,
workers: {
count: 0,
concurrency: 1,
},
workers: workerInfo,
});
// Close the queue connection after getting stats
await bullQueue.close();
} catch (error) {
this.logger.warn(`Failed to get stats for queue ${queueName}`, { error });
stats.push({
@ -144,6 +147,162 @@ export class MonitoringService {
return stats;
}
/**
* Get stats for a BullMQ queue directly
*/
private async getQueueStatsForBullQueue(bullQueue: any, queueName: string) {
try {
// BullMQ provides getJobCounts which returns all counts at once
const counts = await bullQueue.getJobCounts();
return {
waiting: counts.waiting || 0,
active: counts.active || 0,
completed: counts.completed || 0,
failed: counts.failed || 0,
delayed: counts.delayed || 0,
paused: counts.paused || 0,
prioritized: counts.prioritized || 0,
'waiting-children': counts['waiting-children'] || 0,
};
} catch (error) {
this.logger.error(`Failed to get stats for BullMQ queue ${queueName}`, { error });
// Fallback to individual methods
try {
const [waiting, active, completed, failed, delayed, paused] = await Promise.all([
bullQueue.getWaitingCount(),
bullQueue.getActiveCount(),
bullQueue.getCompletedCount(),
bullQueue.getFailedCount(),
bullQueue.getDelayedCount(),
bullQueue.getPausedCount ? bullQueue.getPausedCount() : 0
]);
return {
waiting,
active,
completed,
failed,
delayed,
paused,
};
} catch (fallbackError) {
this.logger.error(`Fallback also failed for queue ${queueName}`, { fallbackError });
return this.getQueueStatsForQueue(bullQueue, queueName);
}
}
}
/**
* Get stats for a specific queue
*/
private async getQueueStatsForQueue(queue: any, queueName: string) {
// Check if it has the getStats method
if (queue.getStats && typeof queue.getStats === 'function') {
const stats = await queue.getStats();
return {
waiting: stats.waiting || 0,
active: stats.active || 0,
completed: stats.completed || 0,
failed: stats.failed || 0,
delayed: stats.delayed || 0,
paused: stats.paused || 0,
};
}
// Try individual count methods
const [waiting, active, completed, failed, delayed] = await Promise.all([
this.safeGetCount(queue, 'getWaitingCount', 'getWaiting'),
this.safeGetCount(queue, 'getActiveCount', 'getActive'),
this.safeGetCount(queue, 'getCompletedCount', 'getCompleted'),
this.safeGetCount(queue, 'getFailedCount', 'getFailed'),
this.safeGetCount(queue, 'getDelayedCount', 'getDelayed'),
]);
const paused = await this.safeGetPausedStatus(queue);
return {
waiting,
active,
completed,
failed,
delayed,
paused,
};
}
/**
* Safely get count from queue
*/
private async safeGetCount(queue: any, ...methodNames: string[]): Promise<number> {
for (const methodName of methodNames) {
if (queue[methodName] && typeof queue[methodName] === 'function') {
try {
const result = await queue[methodName]();
return Array.isArray(result) ? result.length : (result || 0);
} catch (e) {
// Continue to next method
}
}
}
return 0;
}
/**
* Get paused status
*/
private async safeGetPausedStatus(queue: any): Promise<number> {
try {
if (queue.isPaused && typeof queue.isPaused === 'function') {
const isPaused = await queue.isPaused();
return isPaused ? 1 : 0;
}
if (queue.getPausedCount && typeof queue.getPausedCount === 'function') {
return await queue.getPausedCount();
}
} catch (e) {
// Ignore
}
return 0;
}
/**
* Get worker info for a queue
*/
private getWorkerInfo(queue: any, queueManager: any, queueName: string) {
try {
// Check queue itself for worker info
if (queue.workers && Array.isArray(queue.workers)) {
return {
count: queue.workers.length,
concurrency: queue.workers[0]?.concurrency || 1,
};
}
// Check queue manager for worker config
if (queueManager.config?.defaultQueueOptions) {
const options = queueManager.config.defaultQueueOptions;
return {
count: options.workers || 1,
concurrency: options.concurrency || 1,
};
}
// Check for getWorkerCount method
if (queue.getWorkerCount && typeof queue.getWorkerCount === 'function') {
const count = queue.getWorkerCount();
return {
count,
concurrency: 1,
};
}
} catch (e) {
// Ignore
}
return undefined;
}
/**
* Get database statistics
*/
@ -187,7 +346,8 @@ export class MonitoringService {
if (this.container.mongodb) {
try {
const startTime = Date.now();
const db = this.container.mongodb.db();
const mongoClient = this.container.mongodb as any; // This is MongoDBClient
const db = mongoClient.getDatabase();
await db.admin().ping();
const latency = Date.now() - startTime;
@ -252,8 +412,10 @@ export class MonitoringService {
this.getDatabaseStats(),
]);
const memory = process.memoryUsage();
const processMemory = process.memoryUsage();
const systemMemory = this.getSystemMemory();
const uptime = Date.now() - this.startTime;
const cpuInfo = this.getCpuUsage();
// Determine overall health status
const errors: string[] = [];
@ -280,10 +442,15 @@ export class MonitoringService {
timestamp: new Date().toISOString(),
uptime,
memory: {
used: memory.heapUsed,
total: memory.heapTotal,
percentage: (memory.heapUsed / memory.heapTotal) * 100,
used: systemMemory.used,
total: systemMemory.total,
percentage: systemMemory.percentage,
heap: {
used: processMemory.heapUsed,
total: processMemory.heapTotal,
},
},
cpu: cpuInfo,
services: {
cache: cacheStats,
queues: queueStats,
@ -353,4 +520,217 @@ export class MonitoringService {
return result;
}
/**
* Get service status for all microservices
*/
async getServiceStatus(): Promise<ServiceStatus[]> {
const services: ServiceStatus[] = [];
// Define service endpoints
const serviceEndpoints = [
{ name: 'data-ingestion', port: 2001, path: '/health' },
{ name: 'data-pipeline', port: 2002, path: '/health' },
{ name: 'web-api', port: 2003, path: '/health' },
];
for (const service of serviceEndpoints) {
try {
const startTime = Date.now();
const response = await fetch(`http://localhost:${service.port}${service.path}`, {
signal: AbortSignal.timeout(5000), // 5 second timeout
});
const latency = Date.now() - startTime;
if (response.ok) {
const data = await response.json();
services.push({
name: service.name,
version: data.version || '1.0.0',
status: 'running',
port: service.port,
uptime: data.uptime || 0,
lastCheck: new Date().toISOString(),
healthy: true,
});
} else {
services.push({
name: service.name,
version: 'unknown',
status: 'error',
port: service.port,
uptime: 0,
lastCheck: new Date().toISOString(),
healthy: false,
error: `HTTP ${response.status}`,
});
}
} catch (error) {
services.push({
name: service.name,
version: 'unknown',
status: 'stopped',
port: service.port,
uptime: 0,
lastCheck: new Date().toISOString(),
healthy: false,
error: error instanceof Error ? error.message : 'Connection failed',
});
}
}
// Add current service (web-api)
services.push({
name: 'web-api',
version: '1.0.0',
status: 'running',
port: process.env.PORT ? parseInt(process.env.PORT) : 2003,
uptime: Date.now() - this.startTime,
lastCheck: new Date().toISOString(),
healthy: true,
});
return services;
}
/**
* Get proxy statistics
*/
async getProxyStats(): Promise<ProxyStats | null> {
try {
if (!this.container.proxy) {
return {
enabled: false,
totalProxies: 0,
workingProxies: 0,
failedProxies: 0,
};
}
const proxyManager = this.container.proxy as any;
// Check if proxy manager is ready
if (!proxyManager.isReady || !proxyManager.isReady()) {
return {
enabled: true,
totalProxies: 0,
workingProxies: 0,
failedProxies: 0,
};
}
const stats = proxyManager.getStats ? proxyManager.getStats() : null;
const lastFetchTime = proxyManager.getLastFetchTime ? proxyManager.getLastFetchTime() : null;
return {
enabled: true,
totalProxies: stats?.total || 0,
workingProxies: stats?.working || 0,
failedProxies: stats?.failed || 0,
lastUpdate: stats?.lastUpdate ? new Date(stats.lastUpdate).toISOString() : undefined,
lastFetchTime: lastFetchTime ? new Date(lastFetchTime).toISOString() : undefined,
};
} catch (error) {
this.logger.error('Failed to get proxy stats', { error });
return null;
}
}
/**
* Get comprehensive system overview
*/
async getSystemOverview(): Promise<SystemOverview> {
const [services, health, proxies] = await Promise.all([
this.getServiceStatus(),
this.getSystemHealth(),
this.getProxyStats(),
]);
return {
services,
health,
proxies: proxies || undefined,
environment: {
nodeVersion: process.version,
platform: os.platform(),
architecture: os.arch(),
hostname: os.hostname(),
},
};
}
/**
* Get detailed CPU usage
*/
private getCpuUsage() {
const cpus = os.cpus();
let totalIdle = 0;
let totalTick = 0;
cpus.forEach(cpu => {
for (const type in cpu.times) {
totalTick += cpu.times[type as keyof typeof cpu.times];
}
totalIdle += cpu.times.idle;
});
const idle = totalIdle / cpus.length;
const total = totalTick / cpus.length;
const usage = 100 - ~~(100 * idle / total);
return {
usage,
loadAverage: os.loadavg(),
cores: cpus.length,
};
}
/**
* Get system memory info
*/
private getSystemMemory() {
const totalMem = os.totalmem();
const freeMem = os.freemem();
// On Linux, freeMem includes buffers/cache, but we want "available" memory
// which better represents memory that can be used by applications
let availableMem = freeMem;
// Try to read from /proc/meminfo for more accurate memory stats on Linux
if (os.platform() === 'linux') {
try {
const fs = require('fs');
const meminfo = fs.readFileSync('/proc/meminfo', 'utf8');
const lines = meminfo.split('\n');
let memAvailable = 0;
let memTotal = 0;
for (const line of lines) {
if (line.startsWith('MemAvailable:')) {
memAvailable = parseInt(line.split(/\s+/)[1], 10) * 1024; // Convert from KB to bytes
} else if (line.startsWith('MemTotal:')) {
memTotal = parseInt(line.split(/\s+/)[1], 10) * 1024;
}
}
if (memAvailable > 0) {
availableMem = memAvailable;
}
} catch (error) {
// Fallback to os.freemem() if we can't read /proc/meminfo
this.logger.debug('Could not read /proc/meminfo', { error });
}
}
const usedMem = totalMem - availableMem;
return {
total: totalMem,
used: usedMem,
free: freeMem,
available: availableMem,
percentage: (usedMem / totalMem) * 100,
};
}
}

View file

@ -31,6 +31,8 @@ export interface QueueStats {
failed: number;
delayed: number;
paused: number;
prioritized?: number;
'waiting-children'?: number;
};
workers?: {
count: number;
@ -90,4 +92,36 @@ export interface ServiceMetrics {
averageResponseTime: MetricSnapshot;
errorRate: MetricSnapshot;
activeConnections: MetricSnapshot;
}
export interface ServiceStatus {
name: string;
version: string;
status: 'running' | 'stopped' | 'error';
port?: number;
uptime: number;
lastCheck: string;
healthy: boolean;
error?: string;
}
export interface ProxyStats {
enabled: boolean;
totalProxies: number;
workingProxies: number;
failedProxies: number;
lastUpdate?: string;
lastFetchTime?: string;
}
export interface SystemOverview {
services: ServiceStatus[];
health: SystemHealth;
proxies?: ProxyStats;
environment: {
nodeVersion: string;
platform: string;
architecture: string;
hostname: string;
};
}