added intial processing service

This commit is contained in:
Boki 2025-06-14 13:02:37 -04:00
parent cbef304045
commit ddcf94a587
8 changed files with 325 additions and 116 deletions

View file

@ -5,7 +5,7 @@
"main": "dist/index.js",
"type": "module",
"scripts": {
"devvvvv": "bun --watch src/index.ts",
"dev": "bun --watch src/index.ts",
"build": "bun build src/index.ts --outdir dist --target node",
"start": "bun dist/index.js",
"test": "bun test",
@ -14,6 +14,7 @@
"dependencies": {
"@stock-bot/config": "*",
"@stock-bot/logger": "*",
"@stock-bot/shutdown": "*",
"@stock-bot/types": "*",
"@stock-bot/utils": "*",
"@stock-bot/event-bus": "*",

View file

@ -1,10 +1,12 @@
/**
* Processing Service - Technical indicators and data processing
* Processing Service - Data processing and technical indicators service
*/
import { serve } from '@hono/node-server';
import { Hono } from 'hono';
import { loadEnvVariables } from '@stock-bot/config';
import { getLogger } from '@stock-bot/logger';
import { getLogger, shutdownLoggers } from '@stock-bot/logger';
import { Shutdown } from '@stock-bot/shutdown';
import { healthRoutes, processingRoutes } from './routes';
import { processingServiceManager } from './services';
// Load environment variables
loadEnvVariables();
@ -12,43 +14,109 @@ loadEnvVariables();
const app = new Hono();
const logger = getLogger('processing-service');
const PORT = parseInt(process.env.PROCESSING_SERVICE_PORT || '3003');
let server: ReturnType<typeof Bun.serve> | null = null;
// Health check endpoint
app.get('/health', c => {
return c.json({
service: 'processing-service',
status: 'healthy',
timestamp: new Date().toISOString(),
});
});
// Initialize shutdown manager with 15 second timeout
const shutdown = Shutdown.getInstance({ timeout: 15000 });
// Technical indicators endpoint
app.post('/api/indicators', async c => {
const body = await c.req.json();
logger.info('Technical indicators request', { indicators: body.indicators });
// Register all routes
app.route('', healthRoutes);
app.route('/api/processing', processingRoutes);
// TODO: Implement technical indicators processing
return c.json({
message: 'Technical indicators endpoint - not implemented yet',
requestedIndicators: body.indicators,
});
});
// Initialize services
async function initializeServices() {
logger.info('Initializing processing service...');
// Vectorized processing endpoint
app.post('/api/vectorized/process', async c => {
const body = await c.req.json();
logger.info('Vectorized processing request', { dataPoints: body.data?.length });
try {
// Initialize processing service manager
logger.info('Starting processing service manager initialization...');
await processingServiceManager.initialize();
logger.info('Processing service manager initialized');
// TODO: Implement vectorized processing
return c.json({
message: 'Vectorized processing endpoint - not implemented yet',
});
});
// TODO: Add other service initializations here as needed
// - MongoDB client for reading/writing processed data
// - Event bus for listening to data events
// - Technical indicators engines
// - Vector engines for similarity calculations
logger.info('All services initialized successfully');
} catch (error) {
logger.error('Failed to initialize services', { error });
throw error;
}
}
// Start server
serve({
fetch: app.fetch,
port: PORT,
async function startServer() {
await initializeServices();
// Start the HTTP server using Bun's native serve
server = Bun.serve({
port: PORT,
fetch: app.fetch,
development: process.env.NODE_ENV === 'development',
});
logger.info(`Processing Service started on port ${PORT}`);
}
// Register shutdown handlers
shutdown.onShutdown(async () => {
if (server) {
logger.info('Stopping HTTP server...');
try {
server.stop();
logger.info('HTTP server stopped successfully');
} catch (error) {
logger.error('Error stopping HTTP server', { error });
}
}
});
logger.info(`Processing Service started on port ${PORT}`);
shutdown.onShutdown(async () => {
logger.info('Shutting down processing service manager...');
try {
await processingServiceManager.shutdown();
logger.info('Processing service manager shut down successfully');
} catch (error) {
logger.error('Error shutting down processing service manager', { error });
}
});
shutdown.onShutdown(async () => {
logger.info('Shutting down loggers...');
try {
await shutdownLoggers();
logger.info('Loggers shut down successfully');
} catch (error) {
logger.error('Error shutting down loggers', { error });
}
});
// Handle uncaught exceptions and unhandled rejections
process.on('uncaughtException', (error) => {
logger.error('Uncaught exception', { error });
shutdown.shutdownAndExit('uncaughtException', 1);
});
process.on('unhandledRejection', (reason, promise) => {
logger.error('Unhandled rejection', { reason, promise });
shutdown.shutdownAndExit('unhandledRejection', 1);
});
// Handle shutdown signals
process.on('SIGINT', () => {
logger.info('Received SIGINT signal');
shutdown.shutdownAndExit('SIGINT', 0);
});
process.on('SIGTERM', () => {
logger.info('Received SIGTERM signal');
shutdown.shutdownAndExit('SIGTERM', 0);
});
// Start the service
startServer().catch((error) => {
logger.error('Failed to start processing service', { error });
process.exit(1);
});

View file

@ -1,81 +0,0 @@
/**
* Technical Indicators Service
* Leverages @stock-bot/utils for calculations
*/
import { getLogger } from '@stock-bot/logger';
import { ema, macd, rsi, sma } from '@stock-bot/utils';
const logger = getLogger('indicators-service');
export interface IndicatorRequest {
symbol: string;
data: number[];
indicators: string[];
parameters?: Record<string, any>;
}
export interface IndicatorResult {
symbol: string;
timestamp: Date;
indicators: Record<string, number[]>;
}
export class IndicatorsService {
async calculateIndicators(request: IndicatorRequest): Promise<IndicatorResult> {
logger.info('Calculating indicators', {
symbol: request.symbol,
indicators: request.indicators,
dataPoints: request.data.length,
});
const results: Record<string, number[]> = {};
for (const indicator of request.indicators) {
try {
switch (indicator.toLowerCase()) {
case 'sma': {
const smaPeriod = request.parameters?.smaPeriod || 20;
results.sma = sma(request.data, smaPeriod);
break;
}
case 'ema': {
const emaPeriod = request.parameters?.emaPeriod || 20;
results.ema = ema(request.data, emaPeriod);
break;
}
case 'rsi': {
const rsiPeriod = request.parameters?.rsiPeriod || 14;
results.rsi = rsi(request.data, rsiPeriod);
break;
}
case 'macd': {
const fast = request.parameters?.macdFast || 12;
const slow = request.parameters?.macdSlow || 26;
const signal = request.parameters?.macdSignal || 9;
results.macd = macd(request.data, fast, slow, signal).macd;
break;
}
case 'stochastic':
// TODO: Implement stochastic oscillator
logger.warn('Stochastic oscillator not implemented yet');
break;
default:
logger.warn('Unknown indicator requested', { indicator });
}
} catch (error) {
logger.error('Error calculating indicator', { indicator, error });
}
}
return {
symbol: request.symbol,
timestamp: new Date(),
indicators: results,
};
}
}

View file

@ -0,0 +1,44 @@
/**
* Health and status routes for processing service
*/
import { Hono } from 'hono';
const healthRoutes = new Hono();
// Health check endpoint
healthRoutes.get('/health', (c) => {
return c.json({
status: 'healthy',
service: 'processing-service',
timestamp: new Date().toISOString(),
version: '1.0.0'
});
});
// Detailed status endpoint
healthRoutes.get('/status', (c) => {
return c.json({
service: 'processing-service',
status: 'running',
uptime: process.uptime(),
memory: process.memoryUsage(),
timestamp: new Date().toISOString(),
environment: process.env.NODE_ENV || 'development'
});
});
// Ready check endpoint
healthRoutes.get('/ready', (c) => {
// TODO: Add checks for service dependencies
// - Database connections
// - Event bus connections
// - Required resources
return c.json({
status: 'ready',
service: 'processing-service',
timestamp: new Date().toISOString()
});
});
export { healthRoutes };

View file

@ -0,0 +1,5 @@
/**
* Route exports for processing service
*/
export { healthRoutes } from './health.routes';
export { processingRoutes } from './processing.routes';

View file

@ -0,0 +1,55 @@
/**
* Processing routes for data processing operations
*/
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { processingServiceManager } from '../services';
const processingRoutes = new Hono();
const logger = getLogger('processing-routes');
// Process data endpoint
processingRoutes.post('/process', async (c) => {
try {
const body = await c.req.json();
logger.info('Processing request received', {
dataType: body.type,
recordCount: body.data?.length || 0
});
// Use processing service manager to handle the request
const result = await processingServiceManager.processData(
body.type || 'unknown',
body.data || []
);
return c.json({
status: 'success',
message: 'Data processing completed',
result,
timestamp: new Date().toISOString()
});
} catch (error) {
logger.error('Processing error', { error });
return c.json({
status: 'error',
message: 'Processing failed',
error: error instanceof Error ? error.message : 'Unknown error'
}, 500);
}
});
// Get processing status
processingRoutes.get('/status', (c) => {
const status = processingServiceManager.getStatus();
return c.json({
...status,
activeJobs: 0, // TODO: Implement job tracking
queueLength: 0, // TODO: Implement queue monitoring
lastProcessed: null, // TODO: Track last processing time
});
});
export { processingRoutes };

View file

@ -0,0 +1,4 @@
/**
* Services exports for processing service
*/
export { ProcessingServiceManager, processingServiceManager } from './processing.service';

View file

@ -0,0 +1,113 @@
/**
* Processing Service Manager
*
* Manages the core processing operations for the processing service
*/
import { getLogger } from '@stock-bot/logger';
const logger = getLogger('processing-service-manager');
export class ProcessingServiceManager {
private isInitialized = false;
async initialize(): Promise<void> {
if (this.isInitialized) {
logger.warn('Processing service manager already initialized');
return;
}
logger.info('Initializing processing service manager...');
try {
// TODO: Initialize processing components
// - Technical indicators engine
// - Data transformation pipeline
// - Event listeners for data events
// - Job queues for processing tasks
this.isInitialized = true;
logger.info('Processing service manager initialized successfully');
} catch (error) {
logger.error('Failed to initialize processing service manager', { error });
throw error;
}
}
async shutdown(): Promise<void> {
if (!this.isInitialized) {
logger.warn('Processing service manager not initialized, nothing to shutdown');
return;
}
logger.info('Shutting down processing service manager...');
try {
// TODO: Cleanup processing components
// - Stop job processing
// - Close database connections
// - Cleanup event listeners
this.isInitialized = false;
logger.info('Processing service manager shut down successfully');
} catch (error) {
logger.error('Error shutting down processing service manager', { error });
throw error;
}
}
/**
* Process data with technical indicators
*/
async processData(dataType: string, data: unknown[]): Promise<{
status: string;
dataType: string;
inputCount: number;
outputCount: number;
processedAt: Date;
processingTime: number;
}> {
if (!this.isInitialized) {
throw new Error('Processing service manager not initialized');
}
logger.info(`Processing ${data.length} records of type: ${dataType}`);
try {
// TODO: Implement actual processing logic
// - Apply technical indicators
// - Calculate signals
// - Transform data format
// - Save processed results
const result = {
status: 'success',
dataType,
inputCount: data.length,
outputCount: data.length, // Placeholder
processedAt: new Date(),
processingTime: 0 // Placeholder
};
logger.info('Data processing completed', result);
return result;
} catch (error) {
logger.error('Data processing failed', { error, dataType, inputCount: data.length });
throw error;
}
}
/**
* Get processing service status
*/
getStatus() {
return {
initialized: this.isInitialized,
status: this.isInitialized ? 'ready' : 'not_initialized',
timestamp: new Date().toISOString()
};
}
}
// Export singleton instance
export const processingServiceManager = new ProcessingServiceManager();