From ddcf94a587e33d884578024e8aabda30651342b0 Mon Sep 17 00:00:00 2001 From: Boki Date: Sat, 14 Jun 2025 13:02:37 -0400 Subject: [PATCH] added intial processing service --- apps/processing-service/package.json | 3 +- apps/processing-service/src/index.ts | 136 +++++++++++++----- .../src/indicators/indicators.ts | 81 ----------- .../src/routes/health.routes.ts | 44 ++++++ apps/processing-service/src/routes/index.ts | 5 + .../src/routes/processing.routes.ts | 55 +++++++ apps/processing-service/src/services/index.ts | 4 + .../src/services/processing.service.ts | 113 +++++++++++++++ 8 files changed, 325 insertions(+), 116 deletions(-) delete mode 100644 apps/processing-service/src/indicators/indicators.ts create mode 100644 apps/processing-service/src/routes/health.routes.ts create mode 100644 apps/processing-service/src/routes/index.ts create mode 100644 apps/processing-service/src/routes/processing.routes.ts create mode 100644 apps/processing-service/src/services/index.ts create mode 100644 apps/processing-service/src/services/processing.service.ts diff --git a/apps/processing-service/package.json b/apps/processing-service/package.json index 1bfe342..0d45f89 100644 --- a/apps/processing-service/package.json +++ b/apps/processing-service/package.json @@ -5,7 +5,7 @@ "main": "dist/index.js", "type": "module", "scripts": { - "devvvvv": "bun --watch src/index.ts", + "dev": "bun --watch src/index.ts", "build": "bun build src/index.ts --outdir dist --target node", "start": "bun dist/index.js", "test": "bun test", @@ -14,6 +14,7 @@ "dependencies": { "@stock-bot/config": "*", "@stock-bot/logger": "*", + "@stock-bot/shutdown": "*", "@stock-bot/types": "*", "@stock-bot/utils": "*", "@stock-bot/event-bus": "*", diff --git a/apps/processing-service/src/index.ts b/apps/processing-service/src/index.ts index 62d953e..e1680d5 100644 --- a/apps/processing-service/src/index.ts +++ b/apps/processing-service/src/index.ts @@ -1,10 +1,12 @@ /** - * Processing Service - Technical indicators and data processing + * Processing Service - Data processing and technical indicators service */ -import { serve } from '@hono/node-server'; import { Hono } from 'hono'; import { loadEnvVariables } from '@stock-bot/config'; -import { getLogger } from '@stock-bot/logger'; +import { getLogger, shutdownLoggers } from '@stock-bot/logger'; +import { Shutdown } from '@stock-bot/shutdown'; +import { healthRoutes, processingRoutes } from './routes'; +import { processingServiceManager } from './services'; // Load environment variables loadEnvVariables(); @@ -12,43 +14,109 @@ loadEnvVariables(); const app = new Hono(); const logger = getLogger('processing-service'); const PORT = parseInt(process.env.PROCESSING_SERVICE_PORT || '3003'); +let server: ReturnType | null = null; -// Health check endpoint -app.get('/health', c => { - return c.json({ - service: 'processing-service', - status: 'healthy', - timestamp: new Date().toISOString(), - }); -}); +// Initialize shutdown manager with 15 second timeout +const shutdown = Shutdown.getInstance({ timeout: 15000 }); -// Technical indicators endpoint -app.post('/api/indicators', async c => { - const body = await c.req.json(); - logger.info('Technical indicators request', { indicators: body.indicators }); +// Register all routes +app.route('', healthRoutes); +app.route('/api/processing', processingRoutes); - // TODO: Implement technical indicators processing - return c.json({ - message: 'Technical indicators endpoint - not implemented yet', - requestedIndicators: body.indicators, - }); -}); +// Initialize services +async function initializeServices() { + logger.info('Initializing processing service...'); -// Vectorized processing endpoint -app.post('/api/vectorized/process', async c => { - const body = await c.req.json(); - logger.info('Vectorized processing request', { dataPoints: body.data?.length }); + try { + // Initialize processing service manager + logger.info('Starting processing service manager initialization...'); + await processingServiceManager.initialize(); + logger.info('Processing service manager initialized'); - // TODO: Implement vectorized processing - return c.json({ - message: 'Vectorized processing endpoint - not implemented yet', - }); -}); + // TODO: Add other service initializations here as needed + // - MongoDB client for reading/writing processed data + // - Event bus for listening to data events + // - Technical indicators engines + // - Vector engines for similarity calculations + + logger.info('All services initialized successfully'); + } catch (error) { + logger.error('Failed to initialize services', { error }); + throw error; + } +} // Start server -serve({ - fetch: app.fetch, - port: PORT, +async function startServer() { + await initializeServices(); + + // Start the HTTP server using Bun's native serve + server = Bun.serve({ + port: PORT, + fetch: app.fetch, + development: process.env.NODE_ENV === 'development', + }); + + logger.info(`Processing Service started on port ${PORT}`); +} + +// Register shutdown handlers +shutdown.onShutdown(async () => { + if (server) { + logger.info('Stopping HTTP server...'); + try { + server.stop(); + logger.info('HTTP server stopped successfully'); + } catch (error) { + logger.error('Error stopping HTTP server', { error }); + } + } }); -logger.info(`Processing Service started on port ${PORT}`); +shutdown.onShutdown(async () => { + logger.info('Shutting down processing service manager...'); + try { + await processingServiceManager.shutdown(); + logger.info('Processing service manager shut down successfully'); + } catch (error) { + logger.error('Error shutting down processing service manager', { error }); + } +}); + +shutdown.onShutdown(async () => { + logger.info('Shutting down loggers...'); + try { + await shutdownLoggers(); + logger.info('Loggers shut down successfully'); + } catch (error) { + logger.error('Error shutting down loggers', { error }); + } +}); + +// Handle uncaught exceptions and unhandled rejections +process.on('uncaughtException', (error) => { + logger.error('Uncaught exception', { error }); + shutdown.shutdownAndExit('uncaughtException', 1); +}); + +process.on('unhandledRejection', (reason, promise) => { + logger.error('Unhandled rejection', { reason, promise }); + shutdown.shutdownAndExit('unhandledRejection', 1); +}); + +// Handle shutdown signals +process.on('SIGINT', () => { + logger.info('Received SIGINT signal'); + shutdown.shutdownAndExit('SIGINT', 0); +}); + +process.on('SIGTERM', () => { + logger.info('Received SIGTERM signal'); + shutdown.shutdownAndExit('SIGTERM', 0); +}); + +// Start the service +startServer().catch((error) => { + logger.error('Failed to start processing service', { error }); + process.exit(1); +}); diff --git a/apps/processing-service/src/indicators/indicators.ts b/apps/processing-service/src/indicators/indicators.ts deleted file mode 100644 index 60b05c1..0000000 --- a/apps/processing-service/src/indicators/indicators.ts +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Technical Indicators Service - * Leverages @stock-bot/utils for calculations - */ -import { getLogger } from '@stock-bot/logger'; -import { ema, macd, rsi, sma } from '@stock-bot/utils'; - -const logger = getLogger('indicators-service'); - -export interface IndicatorRequest { - symbol: string; - data: number[]; - indicators: string[]; - parameters?: Record; -} - -export interface IndicatorResult { - symbol: string; - timestamp: Date; - indicators: Record; -} - -export class IndicatorsService { - async calculateIndicators(request: IndicatorRequest): Promise { - logger.info('Calculating indicators', { - symbol: request.symbol, - indicators: request.indicators, - dataPoints: request.data.length, - }); - - const results: Record = {}; - - for (const indicator of request.indicators) { - try { - switch (indicator.toLowerCase()) { - case 'sma': { - const smaPeriod = request.parameters?.smaPeriod || 20; - results.sma = sma(request.data, smaPeriod); - break; - } - - case 'ema': { - const emaPeriod = request.parameters?.emaPeriod || 20; - results.ema = ema(request.data, emaPeriod); - break; - } - - case 'rsi': { - const rsiPeriod = request.parameters?.rsiPeriod || 14; - results.rsi = rsi(request.data, rsiPeriod); - break; - } - - case 'macd': { - const fast = request.parameters?.macdFast || 12; - const slow = request.parameters?.macdSlow || 26; - const signal = request.parameters?.macdSignal || 9; - results.macd = macd(request.data, fast, slow, signal).macd; - break; - } - - case 'stochastic': - // TODO: Implement stochastic oscillator - logger.warn('Stochastic oscillator not implemented yet'); - break; - - default: - logger.warn('Unknown indicator requested', { indicator }); - } - } catch (error) { - logger.error('Error calculating indicator', { indicator, error }); - } - } - - return { - symbol: request.symbol, - timestamp: new Date(), - indicators: results, - }; - } -} diff --git a/apps/processing-service/src/routes/health.routes.ts b/apps/processing-service/src/routes/health.routes.ts new file mode 100644 index 0000000..cd9f8aa --- /dev/null +++ b/apps/processing-service/src/routes/health.routes.ts @@ -0,0 +1,44 @@ +/** + * Health and status routes for processing service + */ +import { Hono } from 'hono'; + +const healthRoutes = new Hono(); + +// Health check endpoint +healthRoutes.get('/health', (c) => { + return c.json({ + status: 'healthy', + service: 'processing-service', + timestamp: new Date().toISOString(), + version: '1.0.0' + }); +}); + +// Detailed status endpoint +healthRoutes.get('/status', (c) => { + return c.json({ + service: 'processing-service', + status: 'running', + uptime: process.uptime(), + memory: process.memoryUsage(), + timestamp: new Date().toISOString(), + environment: process.env.NODE_ENV || 'development' + }); +}); + +// Ready check endpoint +healthRoutes.get('/ready', (c) => { + // TODO: Add checks for service dependencies + // - Database connections + // - Event bus connections + // - Required resources + + return c.json({ + status: 'ready', + service: 'processing-service', + timestamp: new Date().toISOString() + }); +}); + +export { healthRoutes }; diff --git a/apps/processing-service/src/routes/index.ts b/apps/processing-service/src/routes/index.ts new file mode 100644 index 0000000..f271d84 --- /dev/null +++ b/apps/processing-service/src/routes/index.ts @@ -0,0 +1,5 @@ +/** + * Route exports for processing service + */ +export { healthRoutes } from './health.routes'; +export { processingRoutes } from './processing.routes'; diff --git a/apps/processing-service/src/routes/processing.routes.ts b/apps/processing-service/src/routes/processing.routes.ts new file mode 100644 index 0000000..24807ef --- /dev/null +++ b/apps/processing-service/src/routes/processing.routes.ts @@ -0,0 +1,55 @@ +/** + * Processing routes for data processing operations + */ +import { Hono } from 'hono'; +import { getLogger } from '@stock-bot/logger'; +import { processingServiceManager } from '../services'; + +const processingRoutes = new Hono(); +const logger = getLogger('processing-routes'); + +// Process data endpoint +processingRoutes.post('/process', async (c) => { + try { + const body = await c.req.json(); + + logger.info('Processing request received', { + dataType: body.type, + recordCount: body.data?.length || 0 + }); + + // Use processing service manager to handle the request + const result = await processingServiceManager.processData( + body.type || 'unknown', + body.data || [] + ); + + return c.json({ + status: 'success', + message: 'Data processing completed', + result, + timestamp: new Date().toISOString() + }); + + } catch (error) { + logger.error('Processing error', { error }); + return c.json({ + status: 'error', + message: 'Processing failed', + error: error instanceof Error ? error.message : 'Unknown error' + }, 500); + } +}); + +// Get processing status +processingRoutes.get('/status', (c) => { + const status = processingServiceManager.getStatus(); + return c.json({ + ...status, + activeJobs: 0, // TODO: Implement job tracking + queueLength: 0, // TODO: Implement queue monitoring + lastProcessed: null, // TODO: Track last processing time + }); +}); + +export { processingRoutes }; diff --git a/apps/processing-service/src/services/index.ts b/apps/processing-service/src/services/index.ts new file mode 100644 index 0000000..a57545c --- /dev/null +++ b/apps/processing-service/src/services/index.ts @@ -0,0 +1,4 @@ +/** + * Services exports for processing service + */ +export { ProcessingServiceManager, processingServiceManager } from './processing.service'; diff --git a/apps/processing-service/src/services/processing.service.ts b/apps/processing-service/src/services/processing.service.ts new file mode 100644 index 0000000..ba4ecbb --- /dev/null +++ b/apps/processing-service/src/services/processing.service.ts @@ -0,0 +1,113 @@ +/** + * Processing Service Manager + * + * Manages the core processing operations for the processing service + */ +import { getLogger } from '@stock-bot/logger'; + +const logger = getLogger('processing-service-manager'); + +export class ProcessingServiceManager { + private isInitialized = false; + + async initialize(): Promise { + if (this.isInitialized) { + logger.warn('Processing service manager already initialized'); + return; + } + + logger.info('Initializing processing service manager...'); + + try { + // TODO: Initialize processing components + // - Technical indicators engine + // - Data transformation pipeline + // - Event listeners for data events + // - Job queues for processing tasks + + this.isInitialized = true; + logger.info('Processing service manager initialized successfully'); + } catch (error) { + logger.error('Failed to initialize processing service manager', { error }); + throw error; + } + } + + async shutdown(): Promise { + if (!this.isInitialized) { + logger.warn('Processing service manager not initialized, nothing to shutdown'); + return; + } + + logger.info('Shutting down processing service manager...'); + + try { + // TODO: Cleanup processing components + // - Stop job processing + // - Close database connections + // - Cleanup event listeners + + this.isInitialized = false; + logger.info('Processing service manager shut down successfully'); + } catch (error) { + logger.error('Error shutting down processing service manager', { error }); + throw error; + } + } + + /** + * Process data with technical indicators + */ + async processData(dataType: string, data: unknown[]): Promise<{ + status: string; + dataType: string; + inputCount: number; + outputCount: number; + processedAt: Date; + processingTime: number; + }> { + if (!this.isInitialized) { + throw new Error('Processing service manager not initialized'); + } + + logger.info(`Processing ${data.length} records of type: ${dataType}`); + + try { + // TODO: Implement actual processing logic + // - Apply technical indicators + // - Calculate signals + // - Transform data format + // - Save processed results + + const result = { + status: 'success', + dataType, + inputCount: data.length, + outputCount: data.length, // Placeholder + processedAt: new Date(), + processingTime: 0 // Placeholder + }; + + logger.info('Data processing completed', result); + return result; + + } catch (error) { + logger.error('Data processing failed', { error, dataType, inputCount: data.length }); + throw error; + } + } + + /** + * Get processing service status + */ + getStatus() { + return { + initialized: this.isInitialized, + status: this.isInitialized ? 'ready' : 'not_initialized', + timestamp: new Date().toISOString() + }; + } +} + +// Export singleton instance +export const processingServiceManager = new ProcessingServiceManager();