From ad5e353ec3249844804a3d05f5175f9fa76b223f Mon Sep 17 00:00:00 2001 From: Boki Date: Sat, 14 Jun 2025 15:42:47 -0400 Subject: [PATCH] fully cleaned things up, few more things to go. --- apps/data-service/src/index.ts | 11 +- apps/data-service/src/routes/health.routes.ts | 4 +- apps/data-service/src/routes/index.ts | 3 - .../src/routes/market-data.routes.ts | 8 +- apps/data-service/src/routes/proxy.routes.ts | 76 ------ apps/data-service/src/routes/queue.routes.ts | 2 +- apps/data-service/src/routes/test.routes.ts | 87 ------- .../src/services/queue-manager.service.ts | 219 ++++-------------- libs/queue/src/index.ts | 1 + libs/queue/src/queue-manager.ts | 125 +++++++++- libs/queue/src/types.ts | 6 + 11 files changed, 180 insertions(+), 362 deletions(-) delete mode 100644 apps/data-service/src/routes/proxy.routes.ts delete mode 100644 apps/data-service/src/routes/test.routes.ts diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index b08f31e..75b133e 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -9,8 +9,8 @@ import { connectMongoDB, disconnectMongoDB } from '@stock-bot/mongodb-client'; import { Shutdown } from '@stock-bot/shutdown'; import { initializeIBResources } from './providers/ib.tasks'; import { initializeProxyResources } from './providers/proxy.tasks'; -import { queueManager } from './services/queue-manager.service'; -import { healthRoutes, marketDataRoutes, proxyRoutes, queueRoutes, testRoutes } from './routes'; +import { initializeQueueManager, shutdownQueueManager } from './services/queue-manager.service'; +import { healthRoutes, queueRoutes } from './routes'; // Load environment variables loadEnvVariables(); @@ -26,9 +26,6 @@ const shutdown = Shutdown.getInstance({ timeout: 15000 }); // Register all routes app.route('', healthRoutes); app.route('', queueRoutes); -app.route('', marketDataRoutes); -app.route('', proxyRoutes); -app.route('', testRoutes); // Initialize services async function initializeServices() { @@ -57,7 +54,7 @@ async function initializeServices() { // Initialize queue manager (includes batch cache initialization) logger.info('Starting queue manager initialization...'); - await queueManager.initialize(); + await initializeQueueManager(); logger.info('Queue manager initialized'); logger.info('All services initialized successfully'); @@ -95,7 +92,7 @@ shutdown.onShutdown(async () => { shutdown.onShutdown(async () => { logger.info('Shutting down queue manager...'); try { - await queueManager.shutdown(); + await shutdownQueueManager(); logger.info('Queue manager shut down successfully'); } catch (error) { logger.error('Error shutting down queue manager', { error }); diff --git a/apps/data-service/src/routes/health.routes.ts b/apps/data-service/src/routes/health.routes.ts index 44d7e21..45d0f7f 100644 --- a/apps/data-service/src/routes/health.routes.ts +++ b/apps/data-service/src/routes/health.routes.ts @@ -2,7 +2,7 @@ * Health check routes */ import { Hono } from 'hono'; -import { queueManager } from '../services/queue.service'; +import { queueManager } from '../services/queue-manager.service'; export const healthRoutes = new Hono(); @@ -14,7 +14,7 @@ healthRoutes.get('/health', c => { timestamp: new Date().toISOString(), queue: { status: 'running', - workers: queueManager.getWorkerCount(), + name: queueManager.getQueueName(), }, }); }); diff --git a/apps/data-service/src/routes/index.ts b/apps/data-service/src/routes/index.ts index 9abc23e..9a1af7d 100644 --- a/apps/data-service/src/routes/index.ts +++ b/apps/data-service/src/routes/index.ts @@ -3,6 +3,3 @@ */ export { healthRoutes } from './health.routes'; export { queueRoutes } from './queue.routes'; -export { marketDataRoutes } from './market-data.routes'; -export { proxyRoutes } from './proxy.routes'; -export { testRoutes } from './test.routes'; diff --git a/apps/data-service/src/routes/market-data.routes.ts b/apps/data-service/src/routes/market-data.routes.ts index 2b34509..8800d15 100644 --- a/apps/data-service/src/routes/market-data.routes.ts +++ b/apps/data-service/src/routes/market-data.routes.ts @@ -3,7 +3,7 @@ */ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { queueManager } from '../services/queue-manager.service'; +import { processItems, queueManager } from '../services/queue-manager.service'; const logger = getLogger('market-data-routes'); @@ -16,7 +16,7 @@ marketDataRoutes.get('/api/live/:symbol', async c => { try { // Queue job for live data using Yahoo provider - const job = await queueManager.addJob('market-data-live', { + const job = await queueManager.add('market-data-live', { type: 'market-data-live', provider: 'yahoo-finance', operation: 'live-data', @@ -46,7 +46,7 @@ marketDataRoutes.get('/api/historical/:symbol', async c => { const toDate = to ? new Date(to) : new Date(); // Now // Queue job for historical data using Yahoo provider - const job = await queueManager.addJob('market-data-historical', { + const job = await queueManager.add('market-data-historical', { type: 'market-data-historical', provider: 'yahoo-finance', operation: 'historical-data', @@ -94,7 +94,7 @@ marketDataRoutes.post('/api/process-symbols', async c => { useBatching, }); - const result = await queueManager.processSymbols(symbols, { + const result = await processItems(symbols, queueManager, { totalDelayMs, useBatching, batchSize, diff --git a/apps/data-service/src/routes/proxy.routes.ts b/apps/data-service/src/routes/proxy.routes.ts deleted file mode 100644 index 47116e5..0000000 --- a/apps/data-service/src/routes/proxy.routes.ts +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Proxy management routes - */ -import { Hono } from 'hono'; -import { getLogger } from '@stock-bot/logger'; -import { queueManager } from '../services/queue.service'; - -const logger = getLogger('proxy-routes'); - -export const proxyRoutes = new Hono(); - -// Proxy management endpoints -proxyRoutes.post('/api/proxy/fetch', async c => { - try { - const job = await queueManager.addJob({ - type: 'proxy-fetch', - provider: 'proxy-provider', - operation: 'fetch-and-check', - payload: {}, - priority: 5, - }); - - return c.json({ - status: 'success', - jobId: job.id, - message: 'Proxy fetch job queued', - }); - } catch (error) { - logger.error('Failed to queue proxy fetch', { error }); - return c.json({ status: 'error', message: 'Failed to queue proxy fetch' }, 500); - } -}); - -proxyRoutes.post('/api/proxy/check', async c => { - try { - const { proxies } = await c.req.json(); - const job = await queueManager.addJob({ - type: 'proxy-check', - provider: 'proxy-provider', - operation: 'check-specific', - payload: { proxies }, - priority: 8, - }); - - return c.json({ - status: 'success', - jobId: job.id, - message: `Proxy check job queued for ${proxies.length} proxies`, - }); - } catch (error) { - logger.error('Failed to queue proxy check', { error }); - return c.json({ status: 'error', message: 'Failed to queue proxy check' }, 500); - } -}); - -// Get proxy stats via queue -proxyRoutes.get('/api/proxy/stats', async c => { - try { - const job = await queueManager.addJob({ - type: 'proxy-stats', - provider: 'proxy-provider', - operation: 'get-stats', - payload: {}, - priority: 3, - }); - - return c.json({ - status: 'success', - jobId: job.id, - message: 'Proxy stats job queued', - }); - } catch (error) { - logger.error('Failed to queue proxy stats', { error }); - return c.json({ status: 'error', message: 'Failed to queue proxy stats' }, 500); - } -}); diff --git a/apps/data-service/src/routes/queue.routes.ts b/apps/data-service/src/routes/queue.routes.ts index 3c0422e..5a19f8e 100644 --- a/apps/data-service/src/routes/queue.routes.ts +++ b/apps/data-service/src/routes/queue.routes.ts @@ -23,7 +23,7 @@ queueRoutes.get('/api/queue/status', async c => { queueRoutes.post('/api/queue/job', async c => { try { const { name, data, options } = await c.req.json(); - const job = await queueManager.addJob(name, data, options); + const job = await queueManager.add(name, data, options); return c.json({ status: 'success', jobId: job.id }); } catch (error) { logger.error('Failed to add job', { error }); diff --git a/apps/data-service/src/routes/test.routes.ts b/apps/data-service/src/routes/test.routes.ts deleted file mode 100644 index 5581a8f..0000000 --- a/apps/data-service/src/routes/test.routes.ts +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Test and development routes for batch processing - */ -import { Hono } from 'hono'; -import { getLogger } from '@stock-bot/logger'; -import { queueManager } from '../services/queue.service'; - -const logger = getLogger('test-routes'); - -export const testRoutes = new Hono(); - -// Test endpoint for new functional batch processing -testRoutes.post('/api/test/batch-symbols', async c => { - try { - const { symbols, useBatching = false, totalDelayHours = 1 } = await c.req.json(); - const { processItems } = await import('../utils/batch-helpers'); - - if (!symbols || !Array.isArray(symbols)) { - return c.json({ status: 'error', message: 'symbols array is required' }, 400); - } - - const result = await processItems( - symbols, - (symbol, index) => ({ - symbol, - index, - timestamp: new Date().toISOString(), - }), - queueManager, - { - totalDelayHours, - useBatching, - batchSize: 10, - priority: 1, - provider: 'test-provider', - operation: 'live-data', - } - ); - - return c.json({ - status: 'success', - message: 'Batch processing started', - result, - }); - } catch (error) { - logger.error('Failed to start batch symbol processing', { error }); - return c.json({ status: 'error', message: 'Failed to start batch processing' }, 500); - } -}); - -testRoutes.post('/api/test/batch-custom', async c => { - try { - const { items, useBatching = false, totalDelayHours = 0.5 } = await c.req.json(); - const { processItems } = await import('../utils/batch-helpers'); - - if (!items || !Array.isArray(items)) { - return c.json({ status: 'error', message: 'items array is required' }, 400); - } - - const result = await processItems( - items, - (item, index) => ({ - originalItem: item, - processIndex: index, - timestamp: new Date().toISOString(), - }), - queueManager, - { - totalDelayHours, - useBatching, - batchSize: 5, - priority: 1, - provider: 'test-provider', - operation: 'custom-test', - } - ); - - return c.json({ - status: 'success', - message: 'Custom batch processing started', - result, - }); - } catch (error) { - logger.error('Failed to start custom batch processing', { error }); - return c.json({ status: 'error', message: 'Failed to start custom batch processing' }, 500); - } -}); diff --git a/apps/data-service/src/services/queue-manager.service.ts b/apps/data-service/src/services/queue-manager.service.ts index 478c2f0..62deb6f 100644 --- a/apps/data-service/src/services/queue-manager.service.ts +++ b/apps/data-service/src/services/queue-manager.service.ts @@ -1,184 +1,57 @@ /** * Data Service Queue Manager - * Uses the new @stock-bot/queue library with provider registry + * Uses the enhanced @stock-bot/queue library with provider registry */ import { getLogger } from '@stock-bot/logger'; -import type { JobData } from '@stock-bot/queue'; -import { initializeBatchCache, providerRegistry, QueueManager } from '@stock-bot/queue'; +import type { QueueConfig } from '@stock-bot/queue'; +import { processItems, QueueManager } from '@stock-bot/queue'; const logger = getLogger('queue-manager-service'); -class DataServiceQueueManager { - private queueManager: QueueManager; - private isInitialized = false; - - constructor() { - this.queueManager = new QueueManager({ - queueName: 'data-service-queue', - workers: parseInt(process.env.WORKER_COUNT || '5'), - concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'), - redis: { - host: process.env.DRAGONFLY_HOST || 'localhost', - port: parseInt(process.env.DRAGONFLY_PORT || '6379'), +/** + * Create and configure the enhanced queue manager for data service + */ +function createDataServiceQueueManager(): QueueManager { + const config: QueueConfig = { + queueName: 'data-service-queue', + workers: parseInt(process.env.WORKER_COUNT || '5'), + concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'), + redis: { + host: process.env.DRAGONFLY_HOST || 'localhost', + port: parseInt(process.env.DRAGONFLY_PORT || '6379'), + }, + providers: [ + // Import and initialize providers lazily + async () => { + const { initializeIBProvider } = await import('../providers/ib.provider'); + return initializeIBProvider(); }, - }); - } + async () => { + const { initializeProxyProvider } = await import('../providers/proxy.provider'); + return initializeProxyProvider(); + }, + ], + enableScheduledJobs: true, + }; - async initialize() { - if (this.isInitialized) { - logger.warn('Queue manager already initialized'); - return; - } - - logger.info('Initializing data service queue manager...'); - - try { - // Register all providers - await this.registerProviders(); - - // Initialize the queue manager - await this.queueManager.initialize(); - - // Initialize batch cache - await initializeBatchCache(this.queueManager); - - // Set up scheduled jobs - await this.setupScheduledJobs(); - - this.isInitialized = true; - logger.info('Data service queue manager initialized successfully'); - } catch (error) { - logger.error('Failed to initialize queue manager', error); - throw error; - } - } - - private async registerProviders() { - logger.info('Registering queue providers...'); - - // Initialize providers using the new provider registry system - const { initializeIBProvider } = await import('../providers/ib.provider'); - const { initializeProxyProvider } = await import('../providers/proxy.provider'); - - // Register providers with scheduled jobs - initializeIBProvider(); - initializeProxyProvider(); - - // Now register all providers from the registry with the queue manager - const allProviders = providerRegistry.getAllProviders(); - for (const [providerName, config] of allProviders) { - this.queueManager.registerProvider(providerName, config.operations); - logger.info(`Registered provider: ${providerName}`); - } - - // Log scheduled jobs - const scheduledJobs = providerRegistry.getAllScheduledJobs(); - logger.info(`Registered ${scheduledJobs.length} scheduled jobs across all providers`); - for (const { provider, job } of scheduledJobs) { - logger.info( - `Scheduled job: ${provider}.${job.type} - ${job.description} (${job.cronPattern})` - ); - } - - logger.info('All providers registered successfully'); - } - - private async setupScheduledJobs() { - const scheduledJobs = providerRegistry.getAllScheduledJobs(); - - if (scheduledJobs.length === 0) { - logger.info('No scheduled jobs found'); - return; - } - - logger.info(`Setting up ${scheduledJobs.length} scheduled jobs...`); - - for (const { provider, job } of scheduledJobs) { - try { - const jobData: JobData = { - type: job.type, - provider, - operation: job.operation, - payload: job.payload, - priority: job.priority, - }; - - await this.queueManager.add(`recurring-${provider}-${job.operation}`, jobData, { - repeat: { - pattern: job.cronPattern, - tz: 'UTC', - immediately: job.immediately || false, - }, - removeOnComplete: 1, - removeOnFail: 1, - attempts: 2, - backoff: { - type: 'fixed', - delay: 5000, - }, - }); - - logger.info(`Scheduled job registered: ${provider}.${job.type} (${job.cronPattern})`); - } catch (error) { - logger.error(`Failed to register scheduled job: ${provider}.${job.type}`, { error }); - } - } - - logger.info('Scheduled jobs setup complete'); - } - - async addJob(name: string, data: JobData, options?: Record) { - if (!this.isInitialized) { - throw new Error('Queue manager not initialized'); - } - - return this.queueManager.add(name, data, options); - } - - async addBulk(jobs: Array<{ name: string; data: JobData; opts?: Record }>) { - if (!this.isInitialized) { - throw new Error('Queue manager not initialized'); - } - - return this.queueManager.addBulk(jobs); - } - - async getStats() { - if (!this.isInitialized) { - throw new Error('Queue manager not initialized'); - } - - return this.queueManager.getStats(); - } - - async clean(grace: number) { - if (!this.isInitialized) { - throw new Error('Queue manager not initialized'); - } - - return this.queueManager.clean(grace); - } - - async shutdown() { - if (!this.isInitialized) { - return; - } - - logger.info('Shutting down queue manager...'); - await this.queueManager.shutdown(); - this.isInitialized = false; - logger.info('Queue manager shutdown complete'); - } - - // Compatibility methods for existing code - getQueueName() { - return this.queueManager.getQueueName(); - } - - get queue() { - return this.queueManager; - } + return new QueueManager(config); } -// Export singleton instance -export const queueManager = new DataServiceQueueManager(); +// Create singleton instance +export const queueManager = createDataServiceQueueManager(); + +// Export convenience functions that use the queue manager +export async function initializeQueueManager() { + logger.info('Initializing data service queue manager...'); + await queueManager.initialize(); + logger.info('Data service queue manager initialized successfully'); +} + +export async function shutdownQueueManager() { + logger.info('Shutting down data service queue manager...'); + await queueManager.shutdown(); + logger.info('Data service queue manager shutdown complete'); +} + +// Export processItems for direct use +export { processItems }; diff --git a/libs/queue/src/index.ts b/libs/queue/src/index.ts index f10c791..fa4ea41 100644 --- a/libs/queue/src/index.ts +++ b/libs/queue/src/index.ts @@ -17,6 +17,7 @@ export type { ProcessOptions, ProviderConfig, ProviderConfigWithSchedule, + ProviderInitializer, QueueConfig, ScheduledJob, } from './types'; diff --git a/libs/queue/src/queue-manager.ts b/libs/queue/src/queue-manager.ts index 05974fc..6b8f26f 100644 --- a/libs/queue/src/queue-manager.ts +++ b/libs/queue/src/queue-manager.ts @@ -2,7 +2,7 @@ import { Queue, QueueEvents, Worker, type Job } from 'bullmq'; import { getLogger } from '@stock-bot/logger'; import { processBatchJob } from './batch-processor'; import { providerRegistry } from './provider-registry'; -import type { JobData, ProviderConfig, QueueConfig } from './types'; +import type { JobData, ProviderConfig, ProviderInitializer, QueueConfig } from './types'; const logger = getLogger('queue-manager'); @@ -11,6 +11,8 @@ export class QueueManager { private workers: Worker[] = []; private queueEvents!: QueueEvents; private config: Required; + private providers: ProviderInitializer[]; + private enableScheduledJobs: boolean; private get isInitialized() { return !!this.queue; @@ -24,6 +26,10 @@ export class QueueManager { } constructor(config: QueueConfig = {}) { + // Enhanced configuration + this.providers = config.providers || []; + this.enableScheduledJobs = config.enableScheduledJobs ?? true; + // Set default configuration this.config = { workers: config.workers || parseInt(process.env.WORKER_COUNT || '5'), @@ -45,11 +51,13 @@ export class QueueManager { }, ...config.defaultJobOptions, }, + providers: this.providers, + enableScheduledJobs: this.enableScheduledJobs, }; } /** - * Initialize the queue manager + * Initialize the queue manager with enhanced provider and scheduled job support */ async initialize(): Promise { if (this.isInitialized) { @@ -57,13 +65,19 @@ export class QueueManager { return; } - logger.info('Initializing queue manager...', { + logger.info('Initializing enhanced queue manager...', { queueName: this.config.queueName, workers: this.config.workers, concurrency: this.config.concurrency, + providers: this.providers.length, + enableScheduledJobs: this.enableScheduledJobs, }); try { + // Step 1: Register all providers + await this.registerProviders(); + + // Step 2: Initialize core queue infrastructure const connection = this.getConnection(); const queueName = `{${this.config.queueName}}`; @@ -76,19 +90,110 @@ export class QueueManager { // Initialize queue events this.queueEvents = new QueueEvents(queueName, { connection }); - // Start workers + // Step 3: Start workers await this.startWorkers(); - // Setup event listeners + // Step 4: Setup event listeners this.setupEventListeners(); - logger.info('Queue manager initialized successfully'); + // Step 5: Initialize batch cache + const { initializeBatchCache } = await import('./batch-processor'); + await initializeBatchCache(this); + + // Step 6: Set up scheduled jobs + if (this.enableScheduledJobs) { + await this.setupScheduledJobs(); + } + + logger.info('Enhanced queue manager initialized successfully'); } catch (error) { - logger.error('Failed to initialize queue manager', { error }); + logger.error('Failed to initialize enhanced queue manager', { error }); throw error; } } + /** + * Register all configured providers + */ + private async registerProviders(): Promise { + logger.info('Registering queue providers...', { count: this.providers.length }); + + // Initialize providers using the configured provider initializers + for (const providerInitializer of this.providers) { + try { + await providerInitializer(); + } catch (error) { + logger.error('Failed to initialize provider', { error }); + throw error; + } + } + + // Now register all providers from the registry with the queue manager + const allProviders = providerRegistry.getAllProviders(); + for (const [providerName, config] of allProviders) { + this.registerProvider(providerName, config.operations); + logger.info(`Registered provider: ${providerName}`); + } + + // Log scheduled jobs + const scheduledJobs = providerRegistry.getAllScheduledJobs(); + logger.info(`Registered ${scheduledJobs.length} scheduled jobs across all providers`); + for (const { provider, job } of scheduledJobs) { + logger.info( + `Scheduled job: ${provider}.${job.type} - ${job.description} (${job.cronPattern})` + ); + } + + logger.info('All providers registered successfully'); + } + + /** + * Set up scheduled jobs from provider registry + */ + private async setupScheduledJobs(): Promise { + const scheduledJobs = providerRegistry.getAllScheduledJobs(); + + if (scheduledJobs.length === 0) { + logger.info('No scheduled jobs found'); + return; + } + + logger.info(`Setting up ${scheduledJobs.length} scheduled jobs...`); + + for (const { provider, job } of scheduledJobs) { + try { + const jobData: JobData = { + type: job.type, + provider, + operation: job.operation, + payload: job.payload, + priority: job.priority, + }; + + await this.add(`recurring-${provider}-${job.operation}`, jobData, { + repeat: { + pattern: job.cronPattern, + tz: 'UTC', + immediately: job.immediately || false, + }, + removeOnComplete: 1, + removeOnFail: 1, + attempts: 2, + backoff: { + type: 'fixed', + delay: 5000, + }, + }); + + logger.info(`Scheduled job registered: ${provider}.${job.type} (${job.cronPattern})`); + } catch (error) { + logger.error(`Failed to register scheduled job: ${provider}.${job.type}`, { error }); + } + } + + logger.info('Scheduled jobs setup complete'); + } + /** * Register a provider with its operations */ @@ -99,7 +204,7 @@ export class QueueManager { /** * Add a single job to the queue */ - async add(name: string, data: JobData, options: any = {}): Promise { + async add(name: string, data: JobData, options: Record = {}): Promise { this.ensureInitialized(); return await this.queue.add(name, data, options); } @@ -107,7 +212,9 @@ export class QueueManager { /** * Add multiple jobs to the queue in bulk */ - async addBulk(jobs: Array<{ name: string; data: JobData; opts?: any }>): Promise { + async addBulk( + jobs: Array<{ name: string; data: JobData; opts?: Record }> + ): Promise { this.ensureInitialized(); return await this.queue.addBulk(jobs); } diff --git a/libs/queue/src/types.ts b/libs/queue/src/types.ts index a973381..7efa208 100644 --- a/libs/queue/src/types.ts +++ b/libs/queue/src/types.ts @@ -50,6 +50,8 @@ export interface QueueConfig { delay: number; }; }; + providers?: ProviderInitializer[]; + enableScheduledJobs?: boolean; } export interface JobHandler { @@ -82,3 +84,7 @@ export interface BatchJobData { totalBatches: number; itemCount: number; } + +export interface ProviderInitializer { + (): void | Promise; +}