socket reruns

This commit is contained in:
Boki 2025-07-04 17:04:47 -04:00
parent a876f3c35b
commit 11c6c19628
29 changed files with 3921 additions and 233 deletions

View file

@ -1,13 +1,10 @@
/**
* Stock Bot Web API
* Simplified entry point using ServiceApplication framework
* Entry point with WebSocket support
*/
import { ServiceApplication } from '@stock-bot/di';
import { getLogger } from '@stock-bot/logger';
import { initializeStockConfig } from '@stock-bot/stock-config';
// Local imports
import { createRoutes } from './routes/create-routes';
// Initialize configuration with service-specific overrides
const config = initializeStockConfig('webApi');
@ -23,66 +20,10 @@ if (config.queue) {
const logger = getLogger('web-api');
logger.info('Service configuration:', config);
// Create service application
const app = new ServiceApplication(
config,
{
serviceName: 'web-api',
enableHandlers: false, // Web API doesn't use handlers
enableScheduledJobs: false, // Web API doesn't use scheduled jobs
corsConfig: {
origin: ['http://localhost:4200', 'http://localhost:3000', 'http://localhost:3002', 'http://localhost:5173', 'http://localhost:5174'],
allowMethods: ['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'],
allowHeaders: ['Content-Type', 'Authorization'],
credentials: true,
},
serviceMetadata: {
version: '1.0.0',
description: 'Stock Bot REST API',
endpoints: {
health: '/health',
exchanges: '/api/exchanges',
},
},
},
{
// Custom lifecycle hooks
onStarted: _port => {
const logger = getLogger('web-api');
logger.info('Web API service startup initiated with ServiceApplication framework');
},
}
);
// Container factory function
async function createContainer(config: any) {
const { ServiceContainerBuilder } = await import('@stock-bot/di');
const container = await new ServiceContainerBuilder()
.withConfig(config)
.withOptions({
enableQuestDB: false, // Disable QuestDB for now
enableMongoDB: true,
enablePostgres: true,
enableCache: true,
enableQueue: true, // Enable for pipeline operations
enableBrowser: false, // Web API doesn't need browser
enableProxy: false, // Web API doesn't need proxy
})
.build(); // This automatically initializes services
// Run database migrations
if (container.postgres) {
const { runMigrations } = await import('./migrations/migration-runner');
await runMigrations(container);
}
return container;
}
// Start the service
app.start(createContainer, createRoutes).catch(error => {
const logger = getLogger('web-api');
logger.fatal('Failed to start web API service', { error });
process.exit(1);
// Import and start WebSocket-enabled server
import('./server-with-websocket').then(({ startServerWithWebSocket }) => {
startServerWithWebSocket().catch(error => {
logger.fatal('Failed to start web API service with WebSocket', { error });
process.exit(1);
});
});

View file

@ -0,0 +1,178 @@
import { Router } from 'express';
import { BacktestServiceV2 } from '../services/backtest-v2.service';
import type { IServiceContainer } from '@stock-bot/handlers';
import { getLogger } from '@stock-bot/logger';
const logger = getLogger('backtests-v2-router');
export function createBacktestsV2Router(container: IServiceContainer): Router {
const router = Router();
const backtestService = new BacktestServiceV2(container);
// Backtest endpoints
router.get('/backtests', async (req, res) => {
try {
const limit = parseInt(req.query.limit as string) || 50;
const offset = parseInt(req.query.offset as string) || 0;
const backtests = await backtestService.listBacktests({ limit, offset });
res.json(backtests);
} catch (error) {
logger.error('Failed to list backtests', error);
res.status(500).json({ error: 'Failed to list backtests' });
}
});
router.get('/backtests/:id', async (req, res) => {
try {
const backtest = await backtestService.getBacktest(req.params.id);
if (!backtest) {
return res.status(404).json({ error: 'Backtest not found' });
}
res.json(backtest);
} catch (error) {
logger.error('Failed to get backtest', error);
res.status(500).json({ error: 'Failed to get backtest' });
}
});
router.post('/backtests', async (req, res) => {
try {
const backtest = await backtestService.createBacktest(req.body);
res.status(201).json(backtest);
} catch (error) {
logger.error('Failed to create backtest', error);
res.status(500).json({ error: 'Failed to create backtest' });
}
});
router.put('/backtests/:id', async (req, res) => {
try {
const backtest = await backtestService.updateBacktest(req.params.id, req.body);
if (!backtest) {
return res.status(404).json({ error: 'Backtest not found' });
}
res.json(backtest);
} catch (error) {
logger.error('Failed to update backtest', error);
res.status(500).json({ error: 'Failed to update backtest' });
}
});
router.delete('/backtests/:id', async (req, res) => {
try {
await backtestService.deleteBacktest(req.params.id);
res.status(204).send();
} catch (error) {
logger.error('Failed to delete backtest', error);
res.status(500).json({ error: 'Failed to delete backtest' });
}
});
// Run endpoints
router.get('/backtests/:backtestId/runs', async (req, res) => {
try {
const runs = await backtestService.listRuns(req.params.backtestId);
res.json(runs);
} catch (error) {
logger.error('Failed to list runs', error);
res.status(500).json({ error: 'Failed to list runs' });
}
});
router.post('/backtests/:backtestId/runs', async (req, res) => {
try {
const run = await backtestService.createRun({
backtestId: req.params.backtestId,
speedMultiplier: req.body.speedMultiplier
});
res.status(201).json(run);
} catch (error) {
logger.error('Failed to create run', error);
res.status(500).json({ error: 'Failed to create run' });
}
});
router.get('/runs/:id', async (req, res) => {
try {
const run = await backtestService.getRun(req.params.id);
if (!run) {
return res.status(404).json({ error: 'Run not found' });
}
res.json(run);
} catch (error) {
logger.error('Failed to get run', error);
res.status(500).json({ error: 'Failed to get run' });
}
});
router.get('/runs/:id/results', async (req, res) => {
try {
const results = await backtestService.getRunResults(req.params.id);
if (!results) {
return res.status(404).json({ error: 'Results not found' });
}
res.json(results);
} catch (error) {
logger.error('Failed to get run results', error);
res.status(500).json({ error: 'Failed to get run results' });
}
});
router.post('/runs/:id/pause', async (req, res) => {
try {
await backtestService.pauseRun(req.params.id);
res.json({ message: 'Run paused' });
} catch (error) {
logger.error('Failed to pause run', error);
res.status(500).json({ error: 'Failed to pause run' });
}
});
router.post('/runs/:id/resume', async (req, res) => {
try {
await backtestService.resumeRun(req.params.id);
res.json({ message: 'Run resumed' });
} catch (error) {
logger.error('Failed to resume run', error);
res.status(500).json({ error: 'Failed to resume run' });
}
});
router.post('/runs/:id/cancel', async (req, res) => {
try {
await backtestService.cancelRun(req.params.id);
res.json({ message: 'Run cancelled' });
} catch (error) {
logger.error('Failed to cancel run', error);
res.status(500).json({ error: 'Failed to cancel run' });
}
});
router.put('/runs/:id/speed', async (req, res) => {
try {
await backtestService.updateRunSpeed(req.params.id, req.body.speedMultiplier);
res.json({ message: 'Speed updated' });
} catch (error) {
logger.error('Failed to update run speed', error);
res.status(500).json({ error: 'Failed to update run speed' });
}
});
// WebSocket endpoint for real-time run updates
router.ws('/runs/:id/stream', (ws, req) => {
const runId = req.params.id;
logger.info('WebSocket connection established for run', { runId });
// TODO: Implement real-time updates
ws.on('message', (msg) => {
logger.debug('Received WebSocket message', { runId, msg });
});
ws.on('close', () => {
logger.info('WebSocket connection closed', { runId });
});
});
return router;
}

View file

@ -0,0 +1,196 @@
import { Hono } from 'hono';
import type { IServiceContainer } from '@stock-bot/handlers';
import { BacktestServiceV2 } from '../services/backtest-v2.service';
import { getLogger } from '@stock-bot/logger';
const logger = getLogger('backtests-v2-routes');
export function createBacktestV2Routes(container: IServiceContainer) {
const app = new Hono();
const backtestService = new BacktestServiceV2(container);
// Backtest endpoints
app.get('/api/v2/backtests', async (c) => {
try {
const limit = parseInt(c.req.query('limit') || '50');
const offset = parseInt(c.req.query('offset') || '0');
const backtests = await backtestService.listBacktests({ limit, offset });
return c.json(backtests);
} catch (error) {
logger.error('Failed to list backtests', error);
return c.json({ error: 'Failed to list backtests' }, 500);
}
});
app.get('/api/v2/backtests/:id', async (c) => {
try {
const backtest = await backtestService.getBacktest(c.req.param('id'));
if (!backtest) {
return c.json({ error: 'Backtest not found' }, 404);
}
return c.json(backtest);
} catch (error) {
logger.error('Failed to get backtest', error);
return c.json({ error: 'Failed to get backtest' }, 500);
}
});
app.post('/api/v2/backtests', async (c) => {
try {
const body = await c.req.json();
const backtest = await backtestService.createBacktest(body);
return c.json(backtest, 201);
} catch (error) {
logger.error('Failed to create backtest', error);
return c.json({ error: 'Failed to create backtest' }, 500);
}
});
app.put('/api/v2/backtests/:id', async (c) => {
try {
const body = await c.req.json();
const backtest = await backtestService.updateBacktest(c.req.param('id'), body);
if (!backtest) {
return c.json({ error: 'Backtest not found' }, 404);
}
return c.json(backtest);
} catch (error) {
logger.error('Failed to update backtest', error);
return c.json({ error: 'Failed to update backtest' }, 500);
}
});
app.delete('/api/v2/backtests/:id', async (c) => {
try {
await backtestService.deleteBacktest(c.req.param('id'));
return c.text('', 204);
} catch (error) {
logger.error('Failed to delete backtest', error);
return c.json({ error: 'Failed to delete backtest' }, 500);
}
});
// Run endpoints
app.get('/api/v2/backtests/:backtestId/runs', async (c) => {
try {
const runs = await backtestService.listRuns(c.req.param('backtestId'));
return c.json(runs);
} catch (error) {
logger.error('Failed to list runs', error);
return c.json({ error: 'Failed to list runs' }, 500);
}
});
app.post('/api/v2/backtests/:backtestId/runs', async (c) => {
try {
const body = await c.req.json();
const run = await backtestService.createRun({
backtestId: c.req.param('backtestId'),
speedMultiplier: body.speedMultiplier
});
return c.json(run, 201);
} catch (error) {
logger.error('Failed to create run', error);
return c.json({ error: 'Failed to create run' }, 500);
}
});
app.get('/api/v2/runs/:id', async (c) => {
try {
const run = await backtestService.getRun(c.req.param('id'));
if (!run) {
return c.json({ error: 'Run not found' }, 404);
}
return c.json(run);
} catch (error) {
logger.error('Failed to get run', error);
return c.json({ error: 'Failed to get run' }, 500);
}
});
app.get('/api/v2/runs/:id/results', async (c) => {
try {
const results = await backtestService.getRunResults(c.req.param('id'));
if (!results) {
return c.json({ error: 'Results not found' }, 404);
}
return c.json(results);
} catch (error) {
logger.error('Failed to get run results', error);
return c.json({ error: 'Failed to get run results' }, 500);
}
});
app.post('/api/v2/runs/:id/pause', async (c) => {
try {
await backtestService.pauseRun(c.req.param('id'));
return c.json({ message: 'Run paused' });
} catch (error) {
logger.error('Failed to pause run', error);
return c.json({ error: 'Failed to pause run' }, 500);
}
});
app.post('/api/v2/runs/:id/resume', async (c) => {
try {
await backtestService.resumeRun(c.req.param('id'));
return c.json({ message: 'Run resumed' });
} catch (error) {
logger.error('Failed to resume run', error);
return c.json({ error: 'Failed to resume run' }, 500);
}
});
app.post('/api/v2/runs/:id/cancel', async (c) => {
try {
await backtestService.cancelRun(c.req.param('id'));
return c.json({ message: 'Run cancelled' });
} catch (error) {
logger.error('Failed to cancel run', error);
return c.json({ error: 'Failed to cancel run' }, 500);
}
});
app.put('/api/v2/runs/:id/speed', async (c) => {
try {
const body = await c.req.json();
await backtestService.updateRunSpeed(c.req.param('id'), body.speedMultiplier);
return c.json({ message: 'Speed updated' });
} catch (error) {
logger.error('Failed to update run speed', error);
return c.json({ error: 'Failed to update run speed' }, 500);
}
});
// Progress update endpoint (called by orchestrator)
app.post('/api/runs/:id/progress', async (c) => {
try {
const runId = c.req.param('id');
const body = await c.req.json();
const { progress, currentDate } = body;
// Update run progress in database
await backtestService.updateRunStatus(runId, 'running', {
progress,
currentDate
});
// Broadcast via WebSocket if handler is available
const wsHandler = (global as any).wsHandler;
if (wsHandler) {
wsHandler.sendProgressUpdate(runId, progress, currentDate);
}
return c.json({ message: 'Progress updated' });
} catch (error) {
logger.error('Failed to update run progress', error);
return c.json({ error: 'Failed to update run progress' }, 500);
}
});
// TODO: WebSocket endpoint for real-time run updates
// This needs additional setup with Hono WebSocket support
return app;
}

View file

@ -4,15 +4,25 @@
*/
import { Hono } from 'hono';
import { cors } from 'hono/cors';
import type { IServiceContainer } from '@stock-bot/handlers';
import { createExchangeRoutes } from './exchange.routes';
import { createHealthRoutes } from './health.routes';
import { createMonitoringRoutes } from './monitoring.routes';
import { createPipelineRoutes } from './pipeline.routes';
import { createBacktestRoutes } from './backtest.routes';
import { createBacktestV2Routes } from './backtests-v2.routes';
export function createRoutes(container: IServiceContainer): Hono {
const app = new Hono();
// Add CORS middleware
app.use('*', cors({
origin: ['http://localhost:4200', 'http://localhost:3000', 'http://localhost:3002', 'http://localhost:5173', 'http://localhost:5174'],
allowMethods: ['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'],
allowHeaders: ['Content-Type', 'Authorization'],
credentials: true,
}));
// Create routes with container
const healthRoutes = createHealthRoutes(container);
@ -20,6 +30,7 @@ export function createRoutes(container: IServiceContainer): Hono {
const monitoringRoutes = createMonitoringRoutes(container);
const pipelineRoutes = createPipelineRoutes(container);
const backtestRoutes = createBacktestRoutes(container);
const backtestV2Routes = createBacktestV2Routes(container);
// Mount routes
app.route('/health', healthRoutes);
@ -27,6 +38,7 @@ export function createRoutes(container: IServiceContainer): Hono {
app.route('/api/system/monitoring', monitoringRoutes);
app.route('/api/pipeline', pipelineRoutes);
app.route('/', backtestRoutes); // Mounted at root since routes already have /api prefix
app.route('/', backtestV2Routes); // V2 routes also mounted at root
return app;
}

View file

@ -0,0 +1,47 @@
import type { IServiceContainer } from '@stock-bot/handlers';
import { createWebSocketHandler } from '../websocket/run-updates';
import { getLogger } from '@stock-bot/logger';
const logger = getLogger('websocket-routes');
export function createWebSocketRoute(container: IServiceContainer) {
const wsHandler = createWebSocketHandler(container);
// Make the handler available globally for other services
(global as any).wsHandler = wsHandler;
return (request: Request, server: any) => {
const url = new URL(request.url);
const runId = url.searchParams.get('runId');
if (!runId) {
return new Response('Missing runId parameter', { status: 400 });
}
logger.info(`WebSocket upgrade request for run: ${runId}`);
// Upgrade the connection to WebSocket
const success = server.upgrade(request, {
data: { runId }
});
if (success) {
logger.info(`WebSocket upgrade successful for run: ${runId}`);
// Return undefined to indicate successful upgrade
return;
} else {
logger.error(`WebSocket upgrade failed for run: ${runId}`);
return new Response('WebSocket upgrade failed', { status: 500 });
}
};
}
// WebSocket connection handler for Bun
export function handleWebSocketConnection(ws: any) {
const { runId } = ws.data;
const wsHandler = (global as any).wsHandler;
if (wsHandler && runId) {
wsHandler.handleConnection(ws, runId);
}
}

View file

@ -0,0 +1,131 @@
import { getLogger } from '@stock-bot/logger';
import { initializeStockConfig } from '@stock-bot/stock-config';
import { createRoutes } from './routes/create-routes';
import { createWebSocketRoute, handleWebSocketConnection } from './routes/websocket.routes';
import type { IServiceContainer } from '@stock-bot/handlers';
// Initialize configuration with service-specific overrides
const config = initializeStockConfig('webApi');
// Override queue settings for web-api (no workers needed)
if (config.queue) {
config.queue.workers = 0;
config.queue.concurrency = 0;
config.queue.enableScheduledJobs = false;
}
// Log the full configuration
const logger = getLogger('web-api');
logger.info('Service configuration:', config);
// Container factory function
async function createContainer(config: any) {
const { ServiceContainerBuilder } = await import('@stock-bot/di');
const container = await new ServiceContainerBuilder()
.withConfig(config)
.withOptions({
enableQuestDB: false,
enableMongoDB: true,
enablePostgres: true,
enableCache: true,
enableQueue: true,
enableBrowser: false,
enableProxy: false,
})
.build();
// Run database migrations
if (container.postgres) {
const { runMigrations } = await import('./migrations/migration-runner');
await runMigrations(container);
}
return container;
}
// Custom server start function with WebSocket support
export async function startServerWithWebSocket() {
let container: IServiceContainer | null = null;
try {
// Create container
const diContainer = await createContainer(config);
container = diContainer.resolve('serviceContainer');
// Create HTTP routes
const routes = createRoutes(container);
// Create WebSocket route handler
const wsRoute = createWebSocketRoute(container);
// Start server with WebSocket support
const port = config.service.port || 2003;
logger.info(`Starting server on port ${port}...`);
let server;
try {
server = Bun.serve({
port,
// Handle HTTP requests
async fetch(req, server) {
const url = new URL(req.url);
// Check if this is a WebSocket upgrade request
if (url.pathname === '/ws' && req.headers.get('upgrade') === 'websocket') {
return wsRoute(req, server);
}
// Otherwise handle as normal HTTP request
return routes.fetch(req);
},
// WebSocket handlers
websocket: {
open(ws) {
handleWebSocketConnection(ws);
},
message(ws, message) {
// Message handling is done in the WebSocket handler
// No need to echo messages back
},
close(ws, code, reason) {
logger.info('WebSocket closed', { code, reason });
},
error(ws, error) {
logger.error('WebSocket error', error);
}
},
development: config.environment === 'development',
});
} catch (error) {
if (error instanceof Error && error.message.includes('port')) {
logger.error(`Port ${port} is already in use. Please stop any other servers running on this port.`);
throw new Error(`Port ${port} is already in use`);
}
throw error;
}
logger.info(`Web API service with WebSocket support started on port ${server.port}`);
logger.info(`WebSocket endpoint available at ws://localhost:${server.port}/ws`);
// Handle shutdown
process.on('SIGINT', async () => {
logger.info('Shutting down server...');
server.stop();
process.exit(0);
});
} catch (error) {
logger.fatal('Failed to start web API service', { error });
process.exit(1);
}
}
// Export the function - don't start automatically

View file

@ -0,0 +1,569 @@
import { v4 as uuidv4 } from 'uuid';
import { getLogger } from '@stock-bot/logger';
import type { IServiceContainer } from '@stock-bot/handlers';
import type { WebSocketHandler } from '../websocket/run-updates';
const logger = getLogger('backtest-v2-service');
// Use environment variable or default
const ORCHESTRATOR_URL = process.env.ORCHESTRATOR_URL || 'http://localhost:2004';
export interface BacktestRequest {
name: string;
strategy: string;
symbols: string[];
startDate: string;
endDate: string;
initialCapital: number;
config?: Record<string, any>;
}
export interface Backtest {
id: string;
name: string;
strategy: string;
symbols: string[];
startDate: Date;
endDate: Date;
initialCapital: number;
config: Record<string, any>;
createdAt: Date;
updatedAt: Date;
}
export interface Run {
id: string;
backtestId: string;
runNumber: number;
status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' | 'paused';
speedMultiplier: number;
error?: string;
startedAt?: Date;
completedAt?: Date;
pausedAt?: Date;
progress: number;
currentDate?: Date;
createdAt: Date;
updatedAt: Date;
}
export interface RunRequest {
backtestId: string;
speedMultiplier?: number | null;
}
export class BacktestServiceV2 {
private container: IServiceContainer;
private activeRuns: Map<string, any> = new Map(); // Track active WebSocket connections
private wsHandler: WebSocketHandler | null = null;
constructor(container: IServiceContainer) {
this.container = container;
logger.info('BacktestServiceV2 initialized');
}
// Backtest CRUD operations
async createBacktest(request: BacktestRequest): Promise<Backtest> {
const backtestId = uuidv4();
const result = await this.container.postgres.query(
`INSERT INTO backtests
(id, name, strategy, symbols, start_date, end_date, initial_capital, config)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *`,
[
backtestId,
request.name,
request.strategy,
JSON.stringify(request.symbols),
request.startDate,
request.endDate,
request.initialCapital,
JSON.stringify(request.config || {})
]
);
const backtest = this.mapBacktest(result.rows[0]);
// Automatically create and start a run for the new backtest
try {
await this.createRun({
backtestId: backtest.id,
speedMultiplier: null // Max speed (instant)
});
} catch (error) {
logger.error('Failed to auto-create run for new backtest', { backtestId: backtest.id, error });
// Don't fail the backtest creation if run creation fails
}
return backtest;
}
async getBacktest(id: string): Promise<Backtest | null> {
const result = await this.container.postgres.query(
'SELECT * FROM backtests WHERE id = $1',
[id]
);
if (result.rows.length === 0) {
return null;
}
return this.mapBacktest(result.rows[0]);
}
async updateBacktest(id: string, request: Partial<BacktestRequest>): Promise<Backtest | null> {
const updates: string[] = [];
const values: any[] = [];
let paramCount = 1;
if (request.name !== undefined) {
updates.push(`name = $${paramCount++}`);
values.push(request.name);
}
if (request.strategy !== undefined) {
updates.push(`strategy = $${paramCount++}`);
values.push(request.strategy);
}
if (request.symbols !== undefined) {
updates.push(`symbols = $${paramCount++}`);
values.push(JSON.stringify(request.symbols));
}
if (request.startDate !== undefined) {
updates.push(`start_date = $${paramCount++}`);
values.push(request.startDate);
}
if (request.endDate !== undefined) {
updates.push(`end_date = $${paramCount++}`);
values.push(request.endDate);
}
if (request.initialCapital !== undefined) {
updates.push(`initial_capital = $${paramCount++}`);
values.push(request.initialCapital);
}
if (request.config !== undefined) {
updates.push(`config = $${paramCount++}`);
values.push(JSON.stringify(request.config));
}
if (updates.length === 0) {
return this.getBacktest(id);
}
values.push(id);
const result = await this.container.postgres.query(
`UPDATE backtests
SET ${updates.join(', ')}, updated_at = NOW()
WHERE id = $${paramCount}
RETURNING *`,
values
);
if (result.rows.length === 0) {
return null;
}
return this.mapBacktest(result.rows[0]);
}
async listBacktests(params: { limit: number; offset: number }): Promise<Backtest[]> {
const result = await this.container.postgres.query(
`SELECT * FROM backtests
ORDER BY created_at DESC
LIMIT $1 OFFSET $2`,
[params.limit, params.offset]
);
return result.rows.map(row => this.mapBacktest(row));
}
async deleteBacktest(id: string): Promise<void> {
await this.container.postgres.query(
'DELETE FROM backtests WHERE id = $1',
[id]
);
}
// Run operations
async createRun(request: RunRequest): Promise<Run> {
const runId = uuidv4();
// Get the next run number for this backtest
const runNumberResult = await this.container.postgres.query(
'SELECT COALESCE(MAX(run_number), 0) + 1 as next_run_number FROM runs WHERE backtest_id = $1',
[request.backtestId]
);
const runNumber = runNumberResult.rows[0].next_run_number;
const result = await this.container.postgres.query(
`INSERT INTO runs
(id, backtest_id, run_number, speed_multiplier, status)
VALUES ($1, $2, $3, $4, 'pending')
RETURNING *`,
[
runId,
request.backtestId,
runNumber,
request.speedMultiplier ?? 1.0
]
);
const run = this.mapRun(result.rows[0]);
// Start the run immediately
this.startRun(run);
return run;
}
async getRun(id: string): Promise<Run | null> {
const result = await this.container.postgres.query(
'SELECT * FROM runs WHERE id = $1',
[id]
);
if (result.rows.length === 0) {
return null;
}
return this.mapRun(result.rows[0]);
}
async listRuns(backtestId: string): Promise<Run[]> {
const result = await this.container.postgres.query(
`SELECT * FROM runs
WHERE backtest_id = $1
ORDER BY run_number DESC`,
[backtestId]
);
return result.rows.map(row => this.mapRun(row));
}
async updateRunStatus(
id: string,
status: Run['status'],
updates?: {
error?: string;
progress?: number;
currentDate?: Date;
speedMultiplier?: number;
}
): Promise<void> {
const setClauses = ['status = $2', 'updated_at = NOW()'];
const values: any[] = [id, status];
let paramCount = 3;
if (status === 'running' && !updates?.currentDate) {
setClauses.push(`started_at = NOW()`);
}
if (status === 'completed' || status === 'failed' || status === 'cancelled') {
setClauses.push(`completed_at = NOW()`);
}
if (status === 'paused') {
setClauses.push(`paused_at = NOW()`);
}
if (updates?.error !== undefined) {
setClauses.push(`error = $${paramCount++}`);
values.push(updates.error);
}
if (updates?.progress !== undefined) {
setClauses.push(`progress = $${paramCount++}`);
values.push(updates.progress);
}
if (updates?.currentDate !== undefined) {
setClauses.push(`current_simulation_date = $${paramCount++}`);
values.push(updates.currentDate);
}
if (updates?.speedMultiplier !== undefined) {
setClauses.push(`speed_multiplier = $${paramCount++}`);
values.push(updates.speedMultiplier);
}
await this.container.postgres.query(
`UPDATE runs SET ${setClauses.join(', ')} WHERE id = $1`,
values
);
// Send WebSocket update if handler is available
if (this.getWebSocketHandler()) {
this.getWebSocketHandler().broadcastRunUpdate(id, {
type: 'run_update',
runId: id,
data: {
status,
...updates,
timestamp: new Date().toISOString()
}
});
}
}
async pauseRun(id: string): Promise<void> {
await this.updateRunStatus(id, 'paused');
// Send pause command to orchestrator
try {
const response = await fetch(`${ORCHESTRATOR_URL}/api/backtest/pause`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' }
});
if (!response.ok) {
logger.warn('Failed to pause backtest in orchestrator');
}
} catch (error) {
logger.error('Error pausing backtest in orchestrator', error);
}
}
async resumeRun(id: string): Promise<void> {
const run = await this.getRun(id);
if (!run || run.status !== 'paused') {
throw new Error('Run is not paused');
}
await this.updateRunStatus(id, 'running');
// Send resume command to orchestrator
try {
const response = await fetch(`${ORCHESTRATOR_URL}/api/backtest/resume`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' }
});
if (!response.ok) {
logger.warn('Failed to resume backtest in orchestrator');
}
} catch (error) {
logger.error('Error resuming backtest in orchestrator', error);
}
}
async cancelRun(id: string): Promise<void> {
await this.updateRunStatus(id, 'cancelled');
// TODO: Send cancel command to orchestrator
}
async updateRunSpeed(id: string, speedMultiplier: number | null): Promise<void> {
await this.updateRunStatus(id, 'running', { speedMultiplier });
// Send speed update to orchestrator
try {
const response = await fetch(`${ORCHESTRATOR_URL}/api/backtest/speed`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ speed: speedMultiplier })
});
if (!response.ok) {
logger.warn('Failed to update backtest speed in orchestrator');
}
} catch (error) {
logger.error('Error updating backtest speed in orchestrator', error);
}
}
// Get run results
async getRunResults(runId: string): Promise<any> {
const result = await this.container.postgres.query(
`SELECT
br.*,
r.backtest_id,
b.strategy,
b.symbols,
b.start_date,
b.end_date,
b.initial_capital,
b.config as backtest_config
FROM backtest_results br
JOIN runs r ON r.id = br.run_id
JOIN backtests b ON b.id = r.backtest_id
WHERE br.run_id = $1`,
[runId]
);
if (result.rows.length === 0) {
return null;
}
const row = result.rows[0];
return {
runId: row.run_id,
backtestId: row.backtest_id,
status: 'completed',
completedAt: row.completed_at.toISOString(),
config: {
name: row.backtest_config?.name || 'Backtest',
strategy: row.strategy,
symbols: row.symbols,
startDate: row.start_date.toISOString(),
endDate: row.end_date.toISOString(),
initialCapital: parseFloat(row.initial_capital),
commission: row.backtest_config?.commission ?? 0.001,
slippage: row.backtest_config?.slippage ?? 0.0001,
dataFrequency: row.backtest_config?.dataFrequency || '1d',
},
metrics: row.metrics,
equity: row.equity_curve,
ohlcData: row.ohlc_data,
trades: row.trades,
positions: row.positions,
analytics: row.analytics,
executionTime: row.execution_time,
};
}
// Private methods
private async startRun(run: Run): Promise<void> {
const backtest = await this.getBacktest(run.backtestId);
if (!backtest) {
throw new Error('Backtest not found');
}
await this.updateRunStatus(run.id, 'running', { progress: 0 });
// Send initial WebSocket update
if (this.getWebSocketHandler()) {
this.getWebSocketHandler().sendProgressUpdate(run.id, 0);
}
try {
// Call orchestrator to run backtest
const response = await fetch(`${ORCHESTRATOR_URL}/api/backtest/run`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
runId: run.id,
mode: 'backtest',
startDate: new Date(backtest.startDate).toISOString(),
endDate: new Date(backtest.endDate).toISOString(),
symbols: backtest.symbols,
strategy: backtest.strategy,
initialCapital: backtest.initialCapital,
dataFrequency: backtest.config?.dataFrequency || '1d',
commission: backtest.config?.commission ?? 0.001,
slippage: backtest.config?.slippage ?? 0.0001,
speed: run.speedMultiplier === null ? 'max' :
run.speedMultiplier >= 10 ? '10x' :
run.speedMultiplier >= 5 ? '5x' :
run.speedMultiplier >= 2 ? '2x' : 'realtime',
fillModel: {
slippage: 'realistic',
marketImpact: true,
partialFills: true
}
}),
});
if (!response.ok) {
const errorText = await response.text();
logger.error('Orchestrator request failed', {
status: response.status,
statusText: response.statusText,
error: errorText,
request: {
runId: run.id,
strategy: backtest.strategy,
symbols: backtest.symbols,
startDate: new Date(backtest.startDate).toISOString(),
endDate: new Date(backtest.endDate).toISOString(),
}
});
throw new Error(`Orchestrator returned ${response.status}: ${errorText}`);
}
const result = await response.json();
if (result.status === 'completed') {
await this.updateRunStatus(run.id, 'completed', { progress: 100 });
await this.saveRunResults(run.id, result);
// Send completion via WebSocket
if (this.getWebSocketHandler()) {
this.getWebSocketHandler().sendCompletion(run.id, result);
}
}
} catch (error) {
logger.error('Failed to start run', { runId: run.id, error });
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
await this.updateRunStatus(run.id, 'failed', {
error: errorMessage
});
// Send error via WebSocket
if (this.getWebSocketHandler()) {
this.getWebSocketHandler().sendError(run.id, errorMessage);
}
}
}
private async saveRunResults(runId: string, result: any): Promise<void> {
try {
await this.container.postgres.query(
`INSERT INTO backtest_results
(run_id, completed_at, metrics, equity_curve, ohlc_data, trades, positions, analytics, execution_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
[
runId,
result.completedAt || new Date(),
JSON.stringify(result.metrics || {}),
JSON.stringify(result.equity || result.equityCurve || []),
JSON.stringify(result.ohlcData || {}),
JSON.stringify(result.trades || []),
JSON.stringify(result.positions || result.finalPositions || {}),
JSON.stringify(result.analytics || {}),
result.executionTime || 0
]
);
} catch (error) {
logger.error('Failed to save run results', { runId, error });
throw error;
}
}
private mapBacktest(row: any): Backtest {
return {
id: row.id,
name: row.name,
strategy: row.strategy,
symbols: row.symbols,
startDate: row.start_date,
endDate: row.end_date,
initialCapital: parseFloat(row.initial_capital),
config: row.config,
createdAt: row.created_at,
updatedAt: row.updated_at,
};
}
private getWebSocketHandler(): WebSocketHandler | null {
// Try to get from global if not already set
if (!this.wsHandler && (global as any).wsHandler) {
this.wsHandler = (global as any).wsHandler;
}
return this.wsHandler;
}
private mapRun(row: any): Run {
return {
id: row.id,
backtestId: row.backtest_id,
runNumber: row.run_number,
status: row.status,
speedMultiplier: parseFloat(row.speed_multiplier),
error: row.error,
startedAt: row.started_at,
completedAt: row.completed_at,
pausedAt: row.paused_at,
progress: parseFloat(row.progress),
currentDate: row.current_simulation_date,
createdAt: row.created_at,
updatedAt: row.updated_at,
};
}
}

View file

@ -0,0 +1,160 @@
import { getLogger } from '@stock-bot/logger';
import type { IServiceContainer } from '@stock-bot/handlers';
const logger = getLogger('websocket-run-updates');
// Store active WebSocket connections per run
// Using 'any' type for Bun WebSocket compatibility
const runConnections = new Map<string, Set<any>>();
export interface RunUpdateMessage {
type: 'run_update' | 'progress' | 'error' | 'completed';
runId: string;
data: any;
}
export function createWebSocketHandler(container: IServiceContainer) {
return {
// Handle new WebSocket connection (ws is Bun's WebSocket type)
handleConnection(ws: any, runId: string) {
logger.info(`WebSocket connected for run: ${runId}`);
// Add connection to the run's connection set
if (!runConnections.has(runId)) {
runConnections.set(runId, new Set());
}
runConnections.get(runId)!.add(ws);
// Send initial connection confirmation
ws.send(JSON.stringify({
type: 'connected',
runId,
timestamp: new Date().toISOString()
}));
// In Bun, WebSocket handlers are set directly as properties
ws.onmessage = (event: MessageEvent) => {
try {
const message = JSON.parse(event.data.toString());
logger.debug('Received WebSocket message:', message);
// Handle different message types
switch (message.type) {
case 'ping':
ws.send(JSON.stringify({ type: 'pong' }));
break;
case 'subscribe':
// Already subscribed to this run
break;
default:
logger.warn('Unknown message type:', message.type);
}
} catch (error) {
logger.error('Error handling WebSocket message:', error);
}
};
// Handle connection close
ws.onclose = (event: any) => {
logger.info(`WebSocket disconnected for run: ${runId}`, { code: event.code, reason: event.reason });
const connections = runConnections.get(runId);
if (connections) {
connections.delete(ws);
if (connections.size === 0) {
runConnections.delete(runId);
}
}
};
// Handle errors
ws.onerror = (error: Event) => {
logger.error(`WebSocket error for run ${runId}:`, error);
};
},
// Broadcast update to all connected clients for a run
broadcastRunUpdate(runId: string, update: Partial<RunUpdateMessage>) {
const connections = runConnections.get(runId);
if (!connections || connections.size === 0) {
return;
}
const message: RunUpdateMessage = {
type: 'run_update',
runId,
data: update.data || update,
...update
};
const messageStr = JSON.stringify(message);
connections.forEach(ws => {
try {
// Bun WebSocket - just try to send, it will throw if not open
ws.send(messageStr);
} catch (error) {
// Connection is closed, ignore
}
});
},
// Send progress update
sendProgressUpdate(runId: string, progress: number, currentDate?: string) {
this.broadcastRunUpdate(runId, {
type: 'progress',
data: {
progress,
currentDate,
timestamp: new Date().toISOString()
}
});
},
// Send error
sendError(runId: string, error: string) {
this.broadcastRunUpdate(runId, {
type: 'error',
data: {
error,
timestamp: new Date().toISOString()
}
});
},
// Send completion
sendCompletion(runId: string, results?: any) {
this.broadcastRunUpdate(runId, {
type: 'completed',
data: {
results,
timestamp: new Date().toISOString()
}
});
// Clean up connections after completion
setTimeout(() => {
const connections = runConnections.get(runId);
if (connections) {
connections.forEach(ws => ws.close());
runConnections.delete(runId);
}
}, 5000);
},
// Get active connections count
getConnectionsCount(runId: string): number {
return runConnections.get(runId)?.size || 0;
},
// Clean up all connections
cleanup() {
runConnections.forEach((connections, runId) => {
connections.forEach(ws => ws.close());
});
runConnections.clear();
}
};
}
// Export the WebSocket handler type
export type WebSocketHandler = ReturnType<typeof createWebSocketHandler>;