moved most api stuff to web-api and built out a better monitoring solution for web-app
This commit is contained in:
parent
fbff428e90
commit
da1c52a841
45 changed files with 2986 additions and 312 deletions
|
|
@ -4,19 +4,23 @@
|
|||
*/
|
||||
|
||||
import { initializeStockConfig } from '@stock-bot/stock-config';
|
||||
import {
|
||||
ServiceApplication,
|
||||
createServiceContainerFromConfig,
|
||||
initializeServices as initializeAwilixServices,
|
||||
} from '@stock-bot/di';
|
||||
import { ServiceApplication } from '@stock-bot/di';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
|
||||
// Local imports
|
||||
import { createRoutes } from './routes/create-routes';
|
||||
import { setupServiceContainer } from './container-setup';
|
||||
|
||||
// Initialize configuration with service-specific overrides
|
||||
const config = initializeStockConfig('webApi');
|
||||
|
||||
// Override queue settings for web-api (no workers needed)
|
||||
if (config.queue) {
|
||||
config.queue.workers = 0;
|
||||
config.queue.concurrency = 0;
|
||||
config.queue.enableScheduledJobs = false;
|
||||
config.queue.delayWorkerStart = true;
|
||||
}
|
||||
|
||||
console.log('Web API Service Configuration:', JSON.stringify(config, null, 2));
|
||||
|
||||
// Create service application
|
||||
|
|
@ -44,9 +48,15 @@ const app = new ServiceApplication(
|
|||
{
|
||||
// Custom lifecycle hooks
|
||||
onContainerReady: (container) => {
|
||||
// Setup service-specific configuration
|
||||
const enhancedContainer = setupServiceContainer(config, container);
|
||||
return enhancedContainer;
|
||||
// Override queue configuration to disable workers
|
||||
const config = container.cradle.config;
|
||||
if (config.queue) {
|
||||
config.queue.workers = 0;
|
||||
config.queue.concurrency = 0;
|
||||
config.queue.enableScheduledJobs = false;
|
||||
config.queue.delayWorkerStart = true;
|
||||
}
|
||||
return container;
|
||||
},
|
||||
onStarted: (port) => {
|
||||
const logger = getLogger('web-api');
|
||||
|
|
@ -57,16 +67,21 @@ const app = new ServiceApplication(
|
|||
|
||||
// Container factory function
|
||||
async function createContainer(config: any) {
|
||||
const container = createServiceContainerFromConfig(config, {
|
||||
enableQuestDB: false, // Web API doesn't need QuestDB
|
||||
enableMongoDB: true,
|
||||
enablePostgres: true,
|
||||
enableCache: true,
|
||||
enableQueue: false, // Web API doesn't need queue processing
|
||||
enableBrowser: false, // Web API doesn't need browser
|
||||
enableProxy: false, // Web API doesn't need proxy
|
||||
});
|
||||
await initializeAwilixServices(container);
|
||||
const { ServiceContainerBuilder } = await import('@stock-bot/di');
|
||||
|
||||
const container = await new ServiceContainerBuilder()
|
||||
.withConfig(config)
|
||||
.withOptions({
|
||||
enableQuestDB: false, // Disable QuestDB for now
|
||||
enableMongoDB: true,
|
||||
enablePostgres: true,
|
||||
enableCache: true,
|
||||
enableQueue: true, // Enable for pipeline operations
|
||||
enableBrowser: false, // Web API doesn't need browser
|
||||
enableProxy: false, // Web API doesn't need proxy
|
||||
})
|
||||
.build(); // This automatically initializes services
|
||||
|
||||
return container;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,23 +1,29 @@
|
|||
/**
|
||||
* Route factory for web API service
|
||||
* Creates routes with access to the service container
|
||||
*/
|
||||
|
||||
import { Hono } from 'hono';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import { createHealthRoutes } from './health.routes';
|
||||
import { createExchangeRoutes } from './exchange.routes';
|
||||
|
||||
export function createRoutes(container: IServiceContainer): Hono {
|
||||
const app = new Hono();
|
||||
|
||||
// Create routes with container
|
||||
const healthRoutes = createHealthRoutes(container);
|
||||
const exchangeRoutes = createExchangeRoutes(container);
|
||||
|
||||
// Mount routes
|
||||
app.route('/health', healthRoutes);
|
||||
app.route('/api/exchanges', exchangeRoutes);
|
||||
|
||||
return app;
|
||||
/**
|
||||
* Route factory for web API service
|
||||
* Creates routes with access to the service container
|
||||
*/
|
||||
|
||||
import { Hono } from 'hono';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import { createHealthRoutes } from './health.routes';
|
||||
import { createExchangeRoutes } from './exchange.routes';
|
||||
import { createMonitoringRoutes } from './monitoring.routes';
|
||||
import { createPipelineRoutes } from './pipeline.routes';
|
||||
|
||||
export function createRoutes(container: IServiceContainer): Hono {
|
||||
const app = new Hono();
|
||||
|
||||
// Create routes with container
|
||||
const healthRoutes = createHealthRoutes(container);
|
||||
const exchangeRoutes = createExchangeRoutes(container);
|
||||
const monitoringRoutes = createMonitoringRoutes(container);
|
||||
const pipelineRoutes = createPipelineRoutes(container);
|
||||
|
||||
// Mount routes
|
||||
app.route('/health', healthRoutes);
|
||||
app.route('/api/exchanges', exchangeRoutes);
|
||||
app.route('/api/system/monitoring', monitoringRoutes);
|
||||
app.route('/api/pipeline', pipelineRoutes);
|
||||
|
||||
return app;
|
||||
}
|
||||
183
apps/stock/web-api/src/routes/monitoring.routes.ts
Normal file
183
apps/stock/web-api/src/routes/monitoring.routes.ts
Normal file
|
|
@ -0,0 +1,183 @@
|
|||
/**
|
||||
* Monitoring routes for system health and metrics
|
||||
*/
|
||||
|
||||
import { Hono } from 'hono';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import { MonitoringService } from '../services/monitoring.service';
|
||||
|
||||
export function createMonitoringRoutes(container: IServiceContainer) {
|
||||
const monitoring = new Hono();
|
||||
const monitoringService = new MonitoringService(container);
|
||||
|
||||
/**
|
||||
* Get overall system health
|
||||
*/
|
||||
monitoring.get('/', async (c) => {
|
||||
try {
|
||||
const health = await monitoringService.getSystemHealth();
|
||||
|
||||
// Set appropriate status code based on health
|
||||
const statusCode = health.status === 'healthy' ? 200 :
|
||||
health.status === 'degraded' ? 503 : 500;
|
||||
|
||||
return c.json(health, statusCode);
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
status: 'error',
|
||||
message: 'Failed to retrieve system health',
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Get cache/Dragonfly statistics
|
||||
*/
|
||||
monitoring.get('/cache', async (c) => {
|
||||
try {
|
||||
const stats = await monitoringService.getCacheStats();
|
||||
return c.json(stats);
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to retrieve cache statistics',
|
||||
message: error instanceof Error ? error.message : 'Unknown error',
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Get queue statistics
|
||||
*/
|
||||
monitoring.get('/queues', async (c) => {
|
||||
try {
|
||||
const stats = await monitoringService.getQueueStats();
|
||||
return c.json({ queues: stats });
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to retrieve queue statistics',
|
||||
message: error instanceof Error ? error.message : 'Unknown error',
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Get specific queue statistics
|
||||
*/
|
||||
monitoring.get('/queues/:name', async (c) => {
|
||||
try {
|
||||
const queueName = c.req.param('name');
|
||||
const stats = await monitoringService.getQueueStats();
|
||||
const queueStats = stats.find(q => q.name === queueName);
|
||||
|
||||
if (!queueStats) {
|
||||
return c.json({
|
||||
error: 'Queue not found',
|
||||
message: `Queue '${queueName}' does not exist`,
|
||||
}, 404);
|
||||
}
|
||||
|
||||
return c.json(queueStats);
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to retrieve queue statistics',
|
||||
message: error instanceof Error ? error.message : 'Unknown error',
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Get database statistics
|
||||
*/
|
||||
monitoring.get('/databases', async (c) => {
|
||||
try {
|
||||
const stats = await monitoringService.getDatabaseStats();
|
||||
return c.json({ databases: stats });
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to retrieve database statistics',
|
||||
message: error instanceof Error ? error.message : 'Unknown error',
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Get specific database statistics
|
||||
*/
|
||||
monitoring.get('/databases/:type', async (c) => {
|
||||
try {
|
||||
const dbType = c.req.param('type') as 'postgres' | 'mongodb' | 'questdb';
|
||||
const stats = await monitoringService.getDatabaseStats();
|
||||
const dbStats = stats.find(db => db.type === dbType);
|
||||
|
||||
if (!dbStats) {
|
||||
return c.json({
|
||||
error: 'Database not found',
|
||||
message: `Database type '${dbType}' not found or not enabled`,
|
||||
}, 404);
|
||||
}
|
||||
|
||||
return c.json(dbStats);
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to retrieve database statistics',
|
||||
message: error instanceof Error ? error.message : 'Unknown error',
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Get service metrics
|
||||
*/
|
||||
monitoring.get('/metrics', async (c) => {
|
||||
try {
|
||||
const metrics = await monitoringService.getServiceMetrics();
|
||||
return c.json(metrics);
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to retrieve service metrics',
|
||||
message: error instanceof Error ? error.message : 'Unknown error',
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Get detailed cache info (Redis INFO command output)
|
||||
*/
|
||||
monitoring.get('/cache/info', async (c) => {
|
||||
try {
|
||||
if (!container.cache) {
|
||||
return c.json({
|
||||
error: 'Cache not available',
|
||||
message: 'Cache service is not enabled',
|
||||
}, 503);
|
||||
}
|
||||
|
||||
const info = await container.cache.info();
|
||||
const stats = await monitoringService.getCacheStats();
|
||||
|
||||
return c.json({
|
||||
parsed: stats,
|
||||
raw: info,
|
||||
});
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to retrieve cache info',
|
||||
message: error instanceof Error ? error.message : 'Unknown error',
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Health check endpoint for monitoring
|
||||
*/
|
||||
monitoring.get('/ping', (c) => {
|
||||
return c.json({
|
||||
status: 'ok',
|
||||
timestamp: new Date().toISOString(),
|
||||
service: 'monitoring',
|
||||
});
|
||||
});
|
||||
|
||||
return monitoring;
|
||||
}
|
||||
135
apps/stock/web-api/src/routes/pipeline.routes.ts
Normal file
135
apps/stock/web-api/src/routes/pipeline.routes.ts
Normal file
|
|
@ -0,0 +1,135 @@
|
|||
/**
|
||||
* Pipeline Routes
|
||||
* API endpoints for data pipeline operations
|
||||
*/
|
||||
|
||||
import { Hono } from 'hono';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import { PipelineService } from '../services/pipeline.service';
|
||||
|
||||
const logger = getLogger('pipeline-routes');
|
||||
|
||||
export function createPipelineRoutes(container: IServiceContainer) {
|
||||
const pipeline = new Hono();
|
||||
const pipelineService = new PipelineService(container);
|
||||
|
||||
// Symbol sync endpoints
|
||||
pipeline.post('/symbols', async c => {
|
||||
try {
|
||||
const result = await pipelineService.syncQMSymbols();
|
||||
return c.json(result, result.success ? 200 : 503);
|
||||
} catch (error) {
|
||||
logger.error('Error in POST /symbols', { error });
|
||||
return c.json({ success: false, error: 'Internal server error' }, 500);
|
||||
}
|
||||
});
|
||||
|
||||
pipeline.post('/symbols/:provider', async c => {
|
||||
try {
|
||||
const provider = c.req.param('provider');
|
||||
const result = await pipelineService.syncProviderSymbols(provider);
|
||||
return c.json(result, result.success ? 200 : 503);
|
||||
} catch (error) {
|
||||
logger.error('Error in POST /symbols/:provider', { error });
|
||||
return c.json({ success: false, error: 'Internal server error' }, 500);
|
||||
}
|
||||
});
|
||||
|
||||
// Exchange sync endpoints
|
||||
pipeline.post('/exchanges', async c => {
|
||||
try {
|
||||
const result = await pipelineService.syncQMExchanges();
|
||||
return c.json(result, result.success ? 200 : 503);
|
||||
} catch (error) {
|
||||
logger.error('Error in POST /exchanges', { error });
|
||||
return c.json({ success: false, error: 'Internal server error' }, 500);
|
||||
}
|
||||
});
|
||||
|
||||
pipeline.post('/exchanges/all', async c => {
|
||||
try {
|
||||
const clearFirst = c.req.query('clear') === 'true';
|
||||
const result = await pipelineService.syncAllExchanges(clearFirst);
|
||||
return c.json(result, result.success ? 200 : 503);
|
||||
} catch (error) {
|
||||
logger.error('Error in POST /exchanges/all', { error });
|
||||
return c.json({ success: false, error: 'Internal server error' }, 500);
|
||||
}
|
||||
});
|
||||
|
||||
// Provider mapping sync endpoints
|
||||
pipeline.post('/provider-mappings/qm', async c => {
|
||||
try {
|
||||
const result = await pipelineService.syncQMProviderMappings();
|
||||
return c.json(result, result.success ? 200 : 503);
|
||||
} catch (error) {
|
||||
logger.error('Error in POST /provider-mappings/qm', { error });
|
||||
return c.json({ success: false, error: 'Internal server error' }, 500);
|
||||
}
|
||||
});
|
||||
|
||||
pipeline.post('/provider-mappings/ib', async c => {
|
||||
try {
|
||||
const result = await pipelineService.syncIBExchanges();
|
||||
return c.json(result, result.success ? 200 : 503);
|
||||
} catch (error) {
|
||||
logger.error('Error in POST /provider-mappings/ib', { error });
|
||||
return c.json({ success: false, error: 'Internal server error' }, 500);
|
||||
}
|
||||
});
|
||||
|
||||
// Status endpoint
|
||||
pipeline.get('/status', async c => {
|
||||
try {
|
||||
const result = await pipelineService.getSyncStatus();
|
||||
return c.json(result, result.success ? 200 : 503);
|
||||
} catch (error) {
|
||||
logger.error('Error in GET /status', { error });
|
||||
return c.json({ success: false, error: 'Internal server error' }, 500);
|
||||
}
|
||||
});
|
||||
|
||||
// Clear data endpoint
|
||||
pipeline.post('/clear/postgresql', async c => {
|
||||
try {
|
||||
const dataType = c.req.query('type') as 'exchanges' | 'provider_mappings' | 'all';
|
||||
const result = await pipelineService.clearPostgreSQLData(dataType || 'all');
|
||||
return c.json(result, result.success ? 200 : 503);
|
||||
} catch (error) {
|
||||
logger.error('Error in POST /clear/postgresql', { error });
|
||||
return c.json({ success: false, error: 'Internal server error' }, 500);
|
||||
}
|
||||
});
|
||||
|
||||
// Statistics endpoints
|
||||
pipeline.get('/stats/exchanges', async c => {
|
||||
try {
|
||||
const result = await pipelineService.getExchangeStats();
|
||||
if (result.success) {
|
||||
return c.json(result.data);
|
||||
} else {
|
||||
return c.json({ error: result.error }, 503);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Error in GET /stats/exchanges', { error });
|
||||
return c.json({ error: 'Internal server error' }, 500);
|
||||
}
|
||||
});
|
||||
|
||||
pipeline.get('/stats/provider-mappings', async c => {
|
||||
try {
|
||||
const result = await pipelineService.getProviderMappingStats();
|
||||
if (result.success) {
|
||||
return c.json(result.data);
|
||||
} else {
|
||||
return c.json({ error: result.error }, 503);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Error in GET /stats/provider-mappings', { error });
|
||||
return c.json({ error: 'Internal server error' }, 500);
|
||||
}
|
||||
});
|
||||
|
||||
return pipeline;
|
||||
}
|
||||
356
apps/stock/web-api/src/services/monitoring.service.ts
Normal file
356
apps/stock/web-api/src/services/monitoring.service.ts
Normal file
|
|
@ -0,0 +1,356 @@
|
|||
/**
|
||||
* Monitoring Service
|
||||
* Collects health and performance metrics from all system components
|
||||
*/
|
||||
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import type {
|
||||
CacheStats,
|
||||
QueueStats,
|
||||
DatabaseStats,
|
||||
SystemHealth,
|
||||
ServiceMetrics,
|
||||
MetricSnapshot
|
||||
} from '../types/monitoring.types';
|
||||
|
||||
export class MonitoringService {
|
||||
private readonly logger = getLogger('monitoring-service');
|
||||
private startTime = Date.now();
|
||||
|
||||
constructor(private readonly container: IServiceContainer) {}
|
||||
|
||||
/**
|
||||
* Get cache/Dragonfly statistics
|
||||
*/
|
||||
async getCacheStats(): Promise<CacheStats> {
|
||||
try {
|
||||
if (!this.container.cache) {
|
||||
return {
|
||||
provider: 'dragonfly',
|
||||
connected: false,
|
||||
};
|
||||
}
|
||||
|
||||
// Get Redis/Dragonfly info
|
||||
const info = await this.container.cache.info();
|
||||
const dbSize = await this.container.cache.dbsize();
|
||||
|
||||
// Parse memory stats from info
|
||||
const memoryUsed = this.parseInfoValue(info, 'used_memory');
|
||||
const memoryPeak = this.parseInfoValue(info, 'used_memory_peak');
|
||||
|
||||
// Parse stats
|
||||
const hits = this.parseInfoValue(info, 'keyspace_hits');
|
||||
const misses = this.parseInfoValue(info, 'keyspace_misses');
|
||||
const evictedKeys = this.parseInfoValue(info, 'evicted_keys');
|
||||
const expiredKeys = this.parseInfoValue(info, 'expired_keys');
|
||||
|
||||
return {
|
||||
provider: 'dragonfly',
|
||||
connected: true,
|
||||
uptime: this.parseInfoValue(info, 'uptime_in_seconds'),
|
||||
memoryUsage: {
|
||||
used: memoryUsed,
|
||||
peak: memoryPeak,
|
||||
},
|
||||
stats: {
|
||||
hits,
|
||||
misses,
|
||||
keys: dbSize,
|
||||
evictedKeys,
|
||||
expiredKeys,
|
||||
},
|
||||
info: this.parseRedisInfo(info),
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to get cache stats', { error });
|
||||
return {
|
||||
provider: 'dragonfly',
|
||||
connected: false,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get queue statistics
|
||||
*/
|
||||
async getQueueStats(): Promise<QueueStats[]> {
|
||||
const stats: QueueStats[] = [];
|
||||
|
||||
try {
|
||||
if (!this.container.queue) {
|
||||
return stats;
|
||||
}
|
||||
|
||||
// Get all queue names from the queue manager
|
||||
const queueManager = this.container.queue;
|
||||
const queueNames = ['default', 'proxy', 'qm', 'ib', 'ceo', 'webshare']; // Add your queue names
|
||||
|
||||
for (const queueName of queueNames) {
|
||||
try {
|
||||
const queue = queueManager.getQueue(queueName);
|
||||
if (!queue) continue;
|
||||
|
||||
const [waiting, active, completed, failed, delayed, paused] = await Promise.all([
|
||||
queue.getWaitingCount(),
|
||||
queue.getActiveCount(),
|
||||
queue.getCompletedCount(),
|
||||
queue.getFailedCount(),
|
||||
queue.getDelayedCount(),
|
||||
queue.getPausedCount(),
|
||||
]);
|
||||
|
||||
// Get worker info if available
|
||||
const workers = queueManager.getWorker(queueName);
|
||||
const workerInfo = workers ? {
|
||||
count: 1, // Assuming single worker per queue
|
||||
concurrency: workers.concurrency || 1,
|
||||
} : undefined;
|
||||
|
||||
stats.push({
|
||||
name: queueName,
|
||||
connected: true,
|
||||
jobs: {
|
||||
waiting,
|
||||
active,
|
||||
completed,
|
||||
failed,
|
||||
delayed,
|
||||
paused,
|
||||
},
|
||||
workers: workerInfo,
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.warn(`Failed to get stats for queue ${queueName}`, { error });
|
||||
stats.push({
|
||||
name: queueName,
|
||||
connected: false,
|
||||
jobs: {
|
||||
waiting: 0,
|
||||
active: 0,
|
||||
completed: 0,
|
||||
failed: 0,
|
||||
delayed: 0,
|
||||
paused: 0,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to get queue stats', { error });
|
||||
}
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get database statistics
|
||||
*/
|
||||
async getDatabaseStats(): Promise<DatabaseStats[]> {
|
||||
const stats: DatabaseStats[] = [];
|
||||
|
||||
// PostgreSQL stats
|
||||
if (this.container.postgres) {
|
||||
try {
|
||||
const startTime = Date.now();
|
||||
const result = await this.container.postgres.query('SELECT 1');
|
||||
const latency = Date.now() - startTime;
|
||||
|
||||
// Get pool stats
|
||||
const pool = (this.container.postgres as any).pool;
|
||||
const poolStats = pool ? {
|
||||
size: pool.totalCount || 0,
|
||||
active: pool.idleCount || 0,
|
||||
idle: pool.waitingCount || 0,
|
||||
max: pool.options?.max || 0,
|
||||
} : undefined;
|
||||
|
||||
stats.push({
|
||||
type: 'postgres',
|
||||
name: 'PostgreSQL',
|
||||
connected: true,
|
||||
latency,
|
||||
pool: poolStats,
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to get PostgreSQL stats', { error });
|
||||
stats.push({
|
||||
type: 'postgres',
|
||||
name: 'PostgreSQL',
|
||||
connected: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// MongoDB stats
|
||||
if (this.container.mongodb) {
|
||||
try {
|
||||
const startTime = Date.now();
|
||||
const db = this.container.mongodb.db();
|
||||
await db.admin().ping();
|
||||
const latency = Date.now() - startTime;
|
||||
|
||||
const serverStatus = await db.admin().serverStatus();
|
||||
|
||||
stats.push({
|
||||
type: 'mongodb',
|
||||
name: 'MongoDB',
|
||||
connected: true,
|
||||
latency,
|
||||
stats: {
|
||||
version: serverStatus.version,
|
||||
uptime: serverStatus.uptime,
|
||||
connections: serverStatus.connections,
|
||||
opcounters: serverStatus.opcounters,
|
||||
},
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to get MongoDB stats', { error });
|
||||
stats.push({
|
||||
type: 'mongodb',
|
||||
name: 'MongoDB',
|
||||
connected: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// QuestDB stats
|
||||
if (this.container.questdb) {
|
||||
try {
|
||||
const startTime = Date.now();
|
||||
// QuestDB health check
|
||||
const response = await fetch(`http://${process.env.QUESTDB_HOST || 'localhost'}:9000/exec?query=SELECT%201`);
|
||||
const latency = Date.now() - startTime;
|
||||
|
||||
stats.push({
|
||||
type: 'questdb',
|
||||
name: 'QuestDB',
|
||||
connected: response.ok,
|
||||
latency,
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to get QuestDB stats', { error });
|
||||
stats.push({
|
||||
type: 'questdb',
|
||||
name: 'QuestDB',
|
||||
connected: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get system health summary
|
||||
*/
|
||||
async getSystemHealth(): Promise<SystemHealth> {
|
||||
const [cacheStats, queueStats, databaseStats] = await Promise.all([
|
||||
this.getCacheStats(),
|
||||
this.getQueueStats(),
|
||||
this.getDatabaseStats(),
|
||||
]);
|
||||
|
||||
const memory = process.memoryUsage();
|
||||
const uptime = Date.now() - this.startTime;
|
||||
|
||||
// Determine overall health status
|
||||
const errors: string[] = [];
|
||||
|
||||
if (!cacheStats.connected) {
|
||||
errors.push('Cache service is disconnected');
|
||||
}
|
||||
|
||||
const disconnectedQueues = queueStats.filter(q => !q.connected);
|
||||
if (disconnectedQueues.length > 0) {
|
||||
errors.push(`${disconnectedQueues.length} queue(s) are disconnected`);
|
||||
}
|
||||
|
||||
const disconnectedDbs = databaseStats.filter(db => !db.connected);
|
||||
if (disconnectedDbs.length > 0) {
|
||||
errors.push(`${disconnectedDbs.length} database(s) are disconnected`);
|
||||
}
|
||||
|
||||
const status = errors.length === 0 ? 'healthy' :
|
||||
errors.length < 3 ? 'degraded' : 'unhealthy';
|
||||
|
||||
return {
|
||||
status,
|
||||
timestamp: new Date().toISOString(),
|
||||
uptime,
|
||||
memory: {
|
||||
used: memory.heapUsed,
|
||||
total: memory.heapTotal,
|
||||
percentage: (memory.heapUsed / memory.heapTotal) * 100,
|
||||
},
|
||||
services: {
|
||||
cache: cacheStats,
|
||||
queues: queueStats,
|
||||
databases: databaseStats,
|
||||
},
|
||||
errors: errors.length > 0 ? errors : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get service metrics (placeholder for future implementation)
|
||||
*/
|
||||
async getServiceMetrics(): Promise<ServiceMetrics> {
|
||||
const now = new Date().toISOString();
|
||||
|
||||
return {
|
||||
requestsPerSecond: {
|
||||
timestamp: now,
|
||||
value: 0,
|
||||
unit: 'req/s',
|
||||
},
|
||||
averageResponseTime: {
|
||||
timestamp: now,
|
||||
value: 0,
|
||||
unit: 'ms',
|
||||
},
|
||||
errorRate: {
|
||||
timestamp: now,
|
||||
value: 0,
|
||||
unit: '%',
|
||||
},
|
||||
activeConnections: {
|
||||
timestamp: now,
|
||||
value: 0,
|
||||
unit: 'connections',
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse value from Redis INFO output
|
||||
*/
|
||||
private parseInfoValue(info: string, key: string): number {
|
||||
const match = info.match(new RegExp(`${key}:(\\d+)`));
|
||||
return match ? parseInt(match[1], 10) : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse Redis INFO into structured object
|
||||
*/
|
||||
private parseRedisInfo(info: string): Record<string, any> {
|
||||
const result: Record<string, any> = {};
|
||||
const sections = info.split('\r\n\r\n');
|
||||
|
||||
for (const section of sections) {
|
||||
const lines = section.split('\r\n');
|
||||
const sectionName = lines[0]?.replace('# ', '') || 'general';
|
||||
result[sectionName] = {};
|
||||
|
||||
for (let i = 1; i < lines.length; i++) {
|
||||
const [key, value] = lines[i].split(':');
|
||||
if (key && value) {
|
||||
result[sectionName][key] = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
335
apps/stock/web-api/src/services/pipeline.service.ts
Normal file
335
apps/stock/web-api/src/services/pipeline.service.ts
Normal file
|
|
@ -0,0 +1,335 @@
|
|||
/**
|
||||
* Pipeline Service
|
||||
* Manages data pipeline operations by queuing jobs for the data-pipeline service
|
||||
*/
|
||||
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
|
||||
const logger = getLogger('pipeline-service');
|
||||
|
||||
export interface PipelineJobResult {
|
||||
success: boolean;
|
||||
jobId?: string;
|
||||
message?: string;
|
||||
error?: string;
|
||||
data?: any;
|
||||
}
|
||||
|
||||
export interface PipelineStatsResult {
|
||||
success: boolean;
|
||||
data?: any;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
export class PipelineService {
|
||||
constructor(private container: IServiceContainer) {}
|
||||
|
||||
/**
|
||||
* Queue a job to sync symbols from QuestionsAndMethods
|
||||
*/
|
||||
async syncQMSymbols(): Promise<PipelineJobResult> {
|
||||
try {
|
||||
const queueManager = this.container.queue;
|
||||
if (!queueManager) {
|
||||
return { success: false, error: 'Queue manager not available' };
|
||||
}
|
||||
|
||||
const symbolsQueue = queueManager.getQueue('symbols');
|
||||
const job = await symbolsQueue.addJob('sync-qm-symbols', {
|
||||
handler: 'symbols',
|
||||
operation: 'sync-qm-symbols',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
logger.info('QM symbols sync job queued', { jobId: job.id });
|
||||
return { success: true, jobId: job.id, message: 'QM symbols sync job queued' };
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue QM symbols sync job', { error });
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to queue sync job',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a job to sync exchanges from QuestionsAndMethods
|
||||
*/
|
||||
async syncQMExchanges(): Promise<PipelineJobResult> {
|
||||
try {
|
||||
const queueManager = this.container.queue;
|
||||
if (!queueManager) {
|
||||
return { success: false, error: 'Queue manager not available' };
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
const job = await exchangesQueue.addJob('sync-qm-exchanges', {
|
||||
handler: 'exchanges',
|
||||
operation: 'sync-qm-exchanges',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
logger.info('QM exchanges sync job queued', { jobId: job.id });
|
||||
return { success: true, jobId: job.id, message: 'QM exchanges sync job queued' };
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue QM exchanges sync job', { error });
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to queue sync job',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a job to sync symbols from a specific provider
|
||||
*/
|
||||
async syncProviderSymbols(provider: string): Promise<PipelineJobResult> {
|
||||
try {
|
||||
const queueManager = this.container.queue;
|
||||
if (!queueManager) {
|
||||
return { success: false, error: 'Queue manager not available' };
|
||||
}
|
||||
|
||||
const symbolsQueue = queueManager.getQueue('symbols');
|
||||
const job = await symbolsQueue.addJob('sync-symbols-from-provider', {
|
||||
handler: 'symbols',
|
||||
operation: 'sync-symbols-from-provider',
|
||||
payload: { provider },
|
||||
});
|
||||
|
||||
logger.info(`${provider} symbols sync job queued`, { jobId: job.id, provider });
|
||||
return {
|
||||
success: true,
|
||||
jobId: job.id,
|
||||
message: `${provider} symbols sync job queued`,
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue provider symbols sync job', { error, provider });
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to queue sync job',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a job to sync all exchanges
|
||||
*/
|
||||
async syncAllExchanges(clearFirst: boolean = false): Promise<PipelineJobResult> {
|
||||
try {
|
||||
const queueManager = this.container.queue;
|
||||
if (!queueManager) {
|
||||
return { success: false, error: 'Queue manager not available' };
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
const job = await exchangesQueue.addJob('sync-all-exchanges', {
|
||||
handler: 'exchanges',
|
||||
operation: 'sync-all-exchanges',
|
||||
payload: { clearFirst },
|
||||
});
|
||||
|
||||
logger.info('Enhanced exchanges sync job queued', { jobId: job.id, clearFirst });
|
||||
return {
|
||||
success: true,
|
||||
jobId: job.id,
|
||||
message: 'Enhanced exchanges sync job queued',
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue enhanced exchanges sync job', { error });
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to queue sync job',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a job to sync QM provider mappings
|
||||
*/
|
||||
async syncQMProviderMappings(): Promise<PipelineJobResult> {
|
||||
try {
|
||||
const queueManager = this.container.queue;
|
||||
if (!queueManager) {
|
||||
return { success: false, error: 'Queue manager not available' };
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
const job = await exchangesQueue.addJob('sync-qm-provider-mappings', {
|
||||
handler: 'exchanges',
|
||||
operation: 'sync-qm-provider-mappings',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
logger.info('QM provider mappings sync job queued', { jobId: job.id });
|
||||
return {
|
||||
success: true,
|
||||
jobId: job.id,
|
||||
message: 'QM provider mappings sync job queued',
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue QM provider mappings sync job', { error });
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to queue sync job',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a job to sync IB exchanges
|
||||
*/
|
||||
async syncIBExchanges(): Promise<PipelineJobResult> {
|
||||
try {
|
||||
const queueManager = this.container.queue;
|
||||
if (!queueManager) {
|
||||
return { success: false, error: 'Queue manager not available' };
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
const job = await exchangesQueue.addJob('sync-ib-exchanges', {
|
||||
handler: 'exchanges',
|
||||
operation: 'sync-ib-exchanges',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
logger.info('IB exchanges sync job queued', { jobId: job.id });
|
||||
return {
|
||||
success: true,
|
||||
jobId: job.id,
|
||||
message: 'IB exchanges sync job queued',
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue IB exchanges sync job', { error });
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to queue sync job',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get sync status
|
||||
*/
|
||||
async getSyncStatus(): Promise<PipelineJobResult> {
|
||||
try {
|
||||
const queueManager = this.container.queue;
|
||||
if (!queueManager) {
|
||||
return { success: false, error: 'Queue manager not available' };
|
||||
}
|
||||
|
||||
const symbolsQueue = queueManager.getQueue('symbols');
|
||||
const job = await symbolsQueue.addJob('sync-status', {
|
||||
handler: 'symbols',
|
||||
operation: 'sync-status',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
logger.info('Sync status job queued', { jobId: job.id });
|
||||
return {
|
||||
success: true,
|
||||
jobId: job.id,
|
||||
message: 'Sync status job queued',
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue sync status job', { error });
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to queue status job',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear PostgreSQL data
|
||||
*/
|
||||
async clearPostgreSQLData(
|
||||
dataType: 'exchanges' | 'provider_mappings' | 'all' = 'all'
|
||||
): Promise<PipelineJobResult> {
|
||||
try {
|
||||
const queueManager = this.container.queue;
|
||||
if (!queueManager) {
|
||||
return { success: false, error: 'Queue manager not available' };
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
const job = await exchangesQueue.addJob('clear-postgresql-data', {
|
||||
handler: 'exchanges',
|
||||
operation: 'clear-postgresql-data',
|
||||
payload: { dataType },
|
||||
});
|
||||
|
||||
logger.info('PostgreSQL data clear job queued', { jobId: job.id, dataType });
|
||||
return {
|
||||
success: true,
|
||||
jobId: job.id,
|
||||
message: 'PostgreSQL data clear job queued',
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue PostgreSQL clear job', { error });
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to queue clear job',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get exchange statistics (waits for result)
|
||||
*/
|
||||
async getExchangeStats(): Promise<PipelineStatsResult> {
|
||||
try {
|
||||
const queueManager = this.container.queue;
|
||||
if (!queueManager) {
|
||||
return { success: false, error: 'Queue manager not available' };
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
const job = await exchangesQueue.addJob('get-exchange-stats', {
|
||||
handler: 'exchanges',
|
||||
operation: 'get-exchange-stats',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
// Wait for job to complete and return result
|
||||
const result = await job.waitUntilFinished();
|
||||
return { success: true, data: result };
|
||||
} catch (error) {
|
||||
logger.error('Failed to get exchange stats', { error });
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to get stats',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get provider mapping statistics (waits for result)
|
||||
*/
|
||||
async getProviderMappingStats(): Promise<PipelineStatsResult> {
|
||||
try {
|
||||
const queueManager = this.container.queue;
|
||||
if (!queueManager) {
|
||||
return { success: false, error: 'Queue manager not available' };
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
const job = await exchangesQueue.addJob('get-provider-mapping-stats', {
|
||||
handler: 'exchanges',
|
||||
operation: 'get-provider-mapping-stats',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
// Wait for job to complete and return result
|
||||
const result = await job.waitUntilFinished();
|
||||
return { success: true, data: result };
|
||||
} catch (error) {
|
||||
logger.error('Failed to get provider mapping stats', { error });
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to get stats',
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
93
apps/stock/web-api/src/types/monitoring.types.ts
Normal file
93
apps/stock/web-api/src/types/monitoring.types.ts
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
/**
|
||||
* Monitoring types for system health and metrics
|
||||
*/
|
||||
|
||||
export interface CacheStats {
|
||||
provider: string;
|
||||
connected: boolean;
|
||||
uptime?: number;
|
||||
memoryUsage?: {
|
||||
used: number;
|
||||
peak: number;
|
||||
total?: number;
|
||||
};
|
||||
stats?: {
|
||||
hits: number;
|
||||
misses: number;
|
||||
keys: number;
|
||||
evictedKeys?: number;
|
||||
expiredKeys?: number;
|
||||
};
|
||||
info?: Record<string, any>;
|
||||
}
|
||||
|
||||
/**
 * Point-in-time statistics for one named job queue.
 */
export interface QueueStats {
  /** Queue name as registered with the queue manager. */
  name: string;
  /** Whether the underlying queue connection is up. */
  connected: boolean;
  /** Job counts broken down by state. */
  jobs: {
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    paused: number;
  };
  /** Worker pool attached to this queue, when workers are running. */
  workers?: {
    count: number;
    concurrency: number;
  };
  /** Aggregate processing figures; avgProcessingTime presumably in ms — confirm at producer. */
  throughput?: {
    processed: number;
    failed: number;
    avgProcessingTime?: number;
  };
}
|
||||
|
||||
export interface DatabaseStats {
|
||||
type: 'postgres' | 'mongodb' | 'questdb';
|
||||
name: string;
|
||||
connected: boolean;
|
||||
latency?: number;
|
||||
pool?: {
|
||||
size: number;
|
||||
active: number;
|
||||
idle: number;
|
||||
waiting?: number;
|
||||
max: number;
|
||||
};
|
||||
stats?: Record<string, any>;
|
||||
}
|
||||
|
||||
/**
 * Aggregated health report for the whole service: process-level resource
 * usage plus per-dependency (cache/queue/database) statistics.
 */
export interface SystemHealth {
  /** Overall rollup across all checked services. */
  status: 'healthy' | 'degraded' | 'unhealthy';
  /** When this snapshot was taken — presumably an ISO 8601 string; confirm at producer. */
  timestamp: string;
  /** Process uptime — units not fixed here; presumably seconds, confirm at producer. */
  uptime: number;
  /** Process memory usage. */
  memory: {
    used: number;
    total: number;
    percentage: number;
  };
  /** CPU usage, when available on the platform. */
  cpu?: {
    usage: number;
    loadAverage?: number[];
  };
  /** Per-dependency detail backing the rollup status. */
  services: {
    cache: CacheStats;
    queues: QueueStats[];
    databases: DatabaseStats[];
  };
  /** Collected error messages explaining a degraded/unhealthy status. */
  errors?: string[];
}
|
||||
|
||||
/**
 * A single sampled metric value.
 */
export interface MetricSnapshot {
  /** When the sample was taken (string timestamp). */
  timestamp: string;
  /** Sampled value. */
  value: number;
  /** Optional unit label (e.g. 'ms'). */
  unit?: string;
}
|
||||
|
||||
/**
 * Standard request-level metrics for the service, one snapshot each.
 */
export interface ServiceMetrics {
  /** Request throughput sample. */
  requestsPerSecond: MetricSnapshot;
  /** Mean response-time sample. */
  averageResponseTime: MetricSnapshot;
  /** Failed-request rate — fraction vs percentage not fixed here; confirm at producer. */
  errorRate: MetricSnapshot;
  /** Currently open connections sample. */
  activeConnections: MetricSnapshot;
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue