From d9404c2bda14a67cbfee1f91d9c9f5a2ebd0a262 Mon Sep 17 00:00:00 2001 From: Boki Date: Wed, 11 Jun 2025 08:03:55 -0400 Subject: [PATCH] added env back and fixed up queue service --- .env | 162 ++++++++++++++++++ .gitignore | 7 - apps/data-service/src/index.ts | 33 ++-- apps/data-service/src/routes/queue.routes.ts | 17 +- .../src/services/queue.service.ts | 72 ++++++-- 5 files changed, 258 insertions(+), 33 deletions(-) create mode 100644 .env diff --git a/.env b/.env new file mode 100644 index 0000000..b1ae1aa --- /dev/null +++ b/.env @@ -0,0 +1,162 @@ +# =========================================== +# STOCK BOT PLATFORM - ENVIRONMENT VARIABLES +# =========================================== + +# Core Application Settings +NODE_ENV=development +LOG_LEVEL=info + +# Data Service Configuration +DATA_SERVICE_PORT=2001 + +# Queue and Worker Configuration +WORKER_COUNT=5 +WORKER_CONCURRENCY=20 + +# =========================================== +# DATABASE CONFIGURATIONS +# =========================================== + +# Dragonfly/Redis Configuration +DRAGONFLY_HOST=localhost +DRAGONFLY_PORT=6379 +DRAGONFLY_PASSWORD= + +# PostgreSQL Configuration +POSTGRES_HOST=localhost +POSTGRES_PORT=5432 +POSTGRES_DB=stockbot +POSTGRES_USER=postgres +POSTGRES_PASSWORD=postgres +POSTGRES_SSL=false + +# QuestDB Configuration +QUESTDB_HOST=localhost +QUESTDB_PORT=9000 +QUESTDB_DB=qdb +QUESTDB_USER=admin +QUESTDB_PASSWORD=quest + +# MongoDB Configuration +MONGODB_HOST=localhost +MONGODB_PORT=27017 +MONGODB_DB=stockbot +MONGODB_USER= +MONGODB_PASSWORD= +MONGODB_URI=mongodb://localhost:27017/stockbot + +# =========================================== +# DATA PROVIDER CONFIGURATIONS +# =========================================== + +# Proxy Configuration +PROXY_VALIDATION_HOURS=24 +PROXY_BATCH_SIZE=100 +PROXY_DIRECT_MODE=false + +# Yahoo Finance (if using API keys) +YAHOO_API_KEY= +YAHOO_API_SECRET= + +# QuoteMedia Configuration +QUOTEMEDIA_API_KEY= +QUOTEMEDIA_BASE_URL=https://api.quotemedia.com + +# =========================================== +# TRADING PLATFORM INTEGRATIONS +# =========================================== + +# Alpaca Trading +ALPACA_API_KEY= +ALPACA_SECRET_KEY= +ALPACA_BASE_URL=https://paper-api.alpaca.markets +ALPACA_PAPER_TRADING=true + +# Polygon.io +POLYGON_API_KEY= +POLYGON_BASE_URL=https://api.polygon.io + +# =========================================== +# RISK MANAGEMENT +# =========================================== + +# Risk Management Settings +MAX_POSITION_SIZE=10000 +MAX_DAILY_LOSS=1000 +MAX_PORTFOLIO_EXPOSURE=0.8 +STOP_LOSS_PERCENTAGE=0.02 +TAKE_PROFIT_PERCENTAGE=0.05 + +# =========================================== +# MONITORING AND OBSERVABILITY +# =========================================== + +# Prometheus Configuration +PROMETHEUS_HOST=localhost +PROMETHEUS_PORT=9090 +PROMETHEUS_METRICS_PORT=9091 +PROMETHEUS_PUSHGATEWAY_URL=http://localhost:9091 + +# Grafana Configuration +GRAFANA_HOST=localhost +GRAFANA_PORT=3000 +GRAFANA_ADMIN_USER=admin +GRAFANA_ADMIN_PASSWORD=admin + +# Loki Logging +LOKI_HOST=localhost +LOKI_PORT=3100 +LOKI_URL=http://localhost:3100 + +# =========================================== +# CACHE CONFIGURATION +# =========================================== + +# Cache Settings +CACHE_TTL=300 +CACHE_MAX_ITEMS=10000 +CACHE_ENABLED=true + +# =========================================== +# SECURITY SETTINGS +# =========================================== + +# JWT Configuration +JWT_SECRET=your-super-secret-jwt-key-change-this-in-production +JWT_EXPIRES_IN=24h + +# API Rate Limiting +RATE_LIMIT_WINDOW=15 +RATE_LIMIT_MAX_REQUESTS=100 + +# =========================================== +# DEVELOPMENT SETTINGS +# =========================================== + +# Debug Settings +DEBUG_MODE=false +VERBOSE_LOGGING=false + +# Development Tools +HOT_RELOAD=true +SOURCE_MAPS=true + +# =========================================== +# DOCKER CONFIGURATION +# =========================================== + +# Docker-specific settings (used in docker-compose) +COMPOSE_PROJECT_NAME=stock-bot +DOCKER_BUILDKIT=1 + +# =========================================== +# MISCELLANEOUS +# =========================================== + +# Timezone +TZ=UTC + +# Application Metadata +APP_NAME=Stock Bot Platform +APP_VERSION=1.0.0 +APP_DESCRIPTION=Advanced Stock Trading and Analysis Platform diff --git a/.gitignore b/.gitignore index 6079a5f..60fc550 100644 --- a/.gitignore +++ b/.gitignore @@ -11,13 +11,6 @@ build/ *.d.ts -# Environment variables -.env -.env.local -.env.development.local -.env.test.local -.env.production.local - # Logs npm-debug.log* yarn-debug.log* diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index 47133df..0d1e025 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -4,7 +4,7 @@ import { getLogger } from '@stock-bot/logger'; import { loadEnvVariables } from '@stock-bot/config'; import { Hono } from 'hono'; -import { onShutdown, setShutdownTimeout } from '@stock-bot/shutdown'; +import { Shutdown } from '@stock-bot/shutdown'; import { queueManager } from './services/queue.service'; import { initializeBatchCache } from './utils/batch-helpers'; import { initializeProxyCache } from './providers/proxy.tasks'; @@ -24,6 +24,9 @@ const logger = getLogger('data-service'); const PORT = parseInt(process.env.DATA_SERVICE_PORT || '3002'); let server: any = null; +// Initialize shutdown manager with 15 second timeout +const shutdown = Shutdown.getInstance({ timeout: 15000 }); + // Register all routes app.route('', healthRoutes); app.route('', queueRoutes); @@ -70,26 +73,34 @@ async function startServer() { logger.info(`Data Service started on port ${PORT}`); } -// Setup shutdown handling -setShutdownTimeout(15000); - -// Register cleanup for HTTP server -onShutdown(async () => { +// Register shutdown handlers +shutdown.onShutdown(async () => { if (server) { logger.info('Stopping HTTP server...'); - server.stop(); + try { + server.stop(); + logger.info('HTTP server stopped successfully'); + } catch (error) { + logger.error('Error stopping HTTP server', { error }); + } } }); -// Register cleanup for queue manager -onShutdown(async () => { +shutdown.onShutdown(async () => { logger.info('Shutting down queue manager...'); - await queueManager.shutdown(); + try { + await queueManager.shutdown(); + logger.info('Queue manager shut down successfully'); + } catch (error) { + logger.error('Error shutting down queue manager', { error }); + throw error; // Re-throw to mark shutdown as failed + } }); +// Start the application startServer().catch(error => { logger.error('Failed to start server', { error }); process.exit(1); }); -logger.info('Shutdown handlers registered'); \ No newline at end of file +logger.info('Data service startup initiated with graceful shutdown handlers'); \ No newline at end of file diff --git a/apps/data-service/src/routes/queue.routes.ts b/apps/data-service/src/routes/queue.routes.ts index cef4317..994335d 100644 --- a/apps/data-service/src/routes/queue.routes.ts +++ b/apps/data-service/src/routes/queue.routes.ts @@ -34,7 +34,8 @@ queueRoutes.post('/api/queue/job', async (c) => { // Provider registry endpoints queueRoutes.get('/api/providers', async (c) => { try { - const providers = queueManager.getRegisteredProviders(); + const { providerRegistry } = await import('../services/provider-registry.service'); + const providers = providerRegistry.getProviders(); return c.json({ status: 'success', providers }); } catch (error) { logger.error('Failed to get providers', { error }); @@ -45,7 +46,8 @@ queueRoutes.get('/api/providers', async (c) => { // Add new endpoint to see scheduled jobs queueRoutes.get('/api/scheduled-jobs', async (c) => { try { - const jobs = queueManager.getScheduledJobsInfo(); + const { providerRegistry } = await import('../services/provider-registry.service'); + const jobs = providerRegistry.getAllScheduledJobs(); return c.json({ status: 'success', count: jobs.length, @@ -56,3 +58,14 @@ queueRoutes.get('/api/scheduled-jobs', async (c) => { return c.json({ status: 'error', message: 'Failed to get scheduled jobs' }, 500); } }); + +queueRoutes.post('/api/queue/drain', async (c) => { + try { + await queueManager.drainQueue(); + const status = await queueManager.getQueueStatus(); + return c.json({ status: 'success', message: 'Queue drained', queueStatus: status }); + } catch (error) { + logger.error('Failed to drain queue', { error }); + return c.json({ status: 'error', message: 'Failed to drain queue' }, 500); + } +}); diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index 29b4f3e..cce2548 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -292,7 +292,6 @@ export class QueueService { delayed: delayed.length }; } - async drainQueue() { if (this.isInitialized) { await this.queue.drain(); @@ -309,24 +308,71 @@ export class QueueService { workers: this.workers.length, concurrency: this.getTotalConcurrency() }; - } - async shutdown() { + } async shutdown() { if (!this.isInitialized) { this.logger.warn('Queue service not initialized, nothing to shutdown'); return; } - this.logger.info('Shutting down queue service'); + this.logger.info('Shutting down queue service gracefully...'); - // Close all workers - await Promise.all(this.workers.map((worker, index) => { - this.logger.debug(`Closing worker ${index + 1}`); - return worker.close(); - })); - - await this.queue.close(); - await this.queueEvents.close(); - this.logger.info('Queue service shutdown complete'); + try { + // Step 1: Stop accepting new jobs and wait for current jobs to finish + this.logger.debug('Closing workers gracefully...'); + const workerClosePromises = this.workers.map(async (worker, index) => { + this.logger.debug(`Closing worker ${index + 1}/${this.workers.length}`); + try { + // Wait for current jobs to finish, then close + await Promise.race([ + worker.close(), + new Promise((_, reject) => + setTimeout(() => reject(new Error(`Worker ${index + 1} close timeout`)), 5000) + ) + ]); + this.logger.debug(`Worker ${index + 1} closed successfully`); + } catch (error) { + this.logger.error(`Failed to close worker ${index + 1}`, { error }); + // Force close if graceful close fails + await worker.close(true); + } + }); + + await Promise.allSettled(workerClosePromises); + this.logger.debug('All workers closed'); + + // Step 2: Close queue and events with timeout protection + this.logger.debug('Closing queue and events...'); + await Promise.allSettled([ + Promise.race([ + this.queue.close(), + new Promise((_, reject) => + setTimeout(() => reject(new Error('Queue close timeout')), 3000) + ) + ]).catch(error => this.logger.error('Queue close error', { error })), + + Promise.race([ + this.queueEvents.close(), + new Promise((_, reject) => + setTimeout(() => reject(new Error('QueueEvents close timeout')), 3000) + ) + ]).catch(error => this.logger.error('QueueEvents close error', { error })) + ]); + + this.logger.info('Queue service shutdown completed successfully'); + } catch (error) { + this.logger.error('Error during queue service shutdown', { error }); + // Force close everything as last resort + try { + await Promise.allSettled([ + ...this.workers.map(worker => worker.close(true)), + this.queue.close(), + this.queueEvents.close() + ]); + } catch (forceCloseError) { + this.logger.error('Force close also failed', { error: forceCloseError }); + } + throw error; + } } }