From db3aa9c330f8e8d2b00e11928530e80b4cd75026 Mon Sep 17 00:00:00 2001 From: Boki Date: Sun, 22 Jun 2025 19:16:25 -0400 Subject: [PATCH] removed singletop pattern from queue manager --- .../proxy/operations/queue.operations.ts | 20 +- .../src/handlers/proxy/proxy.handler.ts | 6 +- .../src/routes/create-routes.ts | 6 +- .../src/routes/market-data.routes.ts | 233 ++++++++++-------- .../data-ingestion/src/routes/queue.routes.ts | 46 ++-- .../data-pipeline/src/routes/create-routes.ts | 15 +- .../src/routes/enhanced-sync.routes.ts | 220 ++++++++++------- apps/data-pipeline/src/routes/stats.routes.ts | 90 ++++--- apps/data-pipeline/src/routes/sync.routes.ts | 159 ++++++------ libs/core/di/src/awilix-container.ts | 23 +- libs/services/queue/src/batch-processor.ts | 44 ++-- libs/services/queue/src/queue-manager.ts | 22 +- 12 files changed, 504 insertions(+), 380 deletions(-) diff --git a/apps/data-ingestion/src/handlers/proxy/operations/queue.operations.ts b/apps/data-ingestion/src/handlers/proxy/operations/queue.operations.ts index 4b34072..54114b5 100644 --- a/apps/data-ingestion/src/handlers/proxy/operations/queue.operations.ts +++ b/apps/data-ingestion/src/handlers/proxy/operations/queue.operations.ts @@ -3,12 +3,16 @@ */ import { OperationContext } from '@stock-bot/di'; import type { ProxyInfo } from '@stock-bot/proxy'; -import { QueueManager } from '@stock-bot/queue'; +import type { IServiceContainer } from '@stock-bot/handlers'; -export async function queueProxyFetch(): Promise { +export async function queueProxyFetch(container: IServiceContainer): Promise { const ctx = OperationContext.create('proxy', 'queue-fetch'); - const queueManager = QueueManager.getInstance(); + const queueManager = container.queue; + if (!queueManager) { + throw new Error('Queue manager not available'); + } + const queue = queueManager.getQueue('proxy'); const job = await queue.add('proxy-fetch', { handler: 'proxy', @@ -22,10 +26,14 @@ export async function queueProxyFetch(): Promise { return jobId; } -export async function queueProxyCheck(proxies: ProxyInfo[]): Promise { +export async function queueProxyCheck(proxies: ProxyInfo[], container: IServiceContainer): Promise { const ctx = OperationContext.create('proxy', 'queue-check'); - const queueManager = QueueManager.getInstance(); + const queueManager = container.queue; + if (!queueManager) { + throw new Error('Queue manager not available'); + } + const queue = queueManager.getQueue('proxy'); const job = await queue.add('proxy-check', { handler: 'proxy', @@ -37,4 +45,4 @@ export async function queueProxyCheck(proxies: ProxyInfo[]): Promise { const jobId = job.id || 'unknown'; ctx.logger.info('Proxy check job queued', { jobId, count: proxies.length }); return jobId; -} +} \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/proxy/proxy.handler.ts b/apps/data-ingestion/src/handlers/proxy/proxy.handler.ts index c98070c..6261728 100644 --- a/apps/data-ingestion/src/handlers/proxy/proxy.handler.ts +++ b/apps/data-ingestion/src/handlers/proxy/proxy.handler.ts @@ -35,6 +35,10 @@ export function initializeProxyProvider(_container: ServiceContainer) { return { processed: 0, successful: 0 }; } + // Get QueueManager instance - we have to use getInstance for now until handlers get container access + const { QueueManager } = await import('@stock-bot/queue'); + const queueManager = QueueManager.getInstance(); + // Batch process the proxies through check-proxy operation const batchResult = await processItems(proxies, 'proxy', { handler: 'proxy', @@ -47,7 +51,7 @@ export function initializeProxyProvider(_container: ServiceContainer) { ttl: 30000, // 30 second timeout per proxy check removeOnComplete: 5, removeOnFail: 3, - }); + }, queueManager); handlerLogger.info('Batch proxy validation completed', { totalProxies: proxies.length, diff --git a/apps/data-ingestion/src/routes/create-routes.ts b/apps/data-ingestion/src/routes/create-routes.ts index 7c4a341..f6c29cd 100644 --- a/apps/data-ingestion/src/routes/create-routes.ts +++ b/apps/data-ingestion/src/routes/create-routes.ts @@ -6,7 +6,7 @@ import { Hono } from 'hono'; import type { IServiceContainer } from '@stock-bot/handlers'; import { exchangeRoutes } from './exchange.routes'; import { healthRoutes } from './health.routes'; -import { queueRoutes } from './queue.routes'; +import { createQueueRoutes } from './queue.routes'; /** * Creates all routes with access to type-safe services @@ -17,9 +17,9 @@ export function createRoutes(services: IServiceContainer): Hono { // Mount routes that don't need services app.route('/health', healthRoutes); - // Mount routes that need services (will be updated to use services) + // Mount routes that need services app.route('/api/exchanges', exchangeRoutes); - app.route('/api/queue', queueRoutes); + app.route('/api/queue', createQueueRoutes(services)); // Store services in app context for handlers that need it app.use('*', async (c, next) => { diff --git a/apps/data-ingestion/src/routes/market-data.routes.ts b/apps/data-ingestion/src/routes/market-data.routes.ts index 62bd74e..562ccbe 100644 --- a/apps/data-ingestion/src/routes/market-data.routes.ts +++ b/apps/data-ingestion/src/routes/market-data.routes.ts @@ -3,119 +3,140 @@ */ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { processItems, QueueManager } from '@stock-bot/queue'; +import { processItems } from '@stock-bot/queue'; +import type { IServiceContainer } from '@stock-bot/handlers'; const logger = getLogger('market-data-routes'); -export const marketDataRoutes = new Hono(); +export function createMarketDataRoutes(container: IServiceContainer) { + const marketDataRoutes = new Hono(); -// Market data endpoints -marketDataRoutes.get('/api/live/:symbol', async c => { - const symbol = c.req.param('symbol'); - logger.info('Live data request', { symbol }); + // Market data endpoints + marketDataRoutes.get('/api/live/:symbol', async c => { + const symbol = c.req.param('symbol'); + logger.info('Live data request', { symbol }); - try { - // Queue job for live data using Yahoo provider - const queueManager = QueueManager.getInstance(); - const queue = queueManager.getQueue('yahoo-finance'); - const job = await queue.add('live-data', { - handler: 'yahoo-finance', - operation: 'live-data', - payload: { symbol }, - }); - return c.json({ - status: 'success', - message: 'Live data job queued', - jobId: job.id, - symbol, - }); - } catch (error) { - logger.error('Failed to queue live data job', { symbol, error }); - return c.json({ status: 'error', message: 'Failed to queue live data job' }, 500); - } -}); - -marketDataRoutes.get('/api/historical/:symbol', async c => { - const symbol = c.req.param('symbol'); - const from = c.req.query('from'); - const to = c.req.query('to'); - - logger.info('Historical data request', { symbol, from, to }); - - try { - const fromDate = from ? new Date(from) : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // 30 days ago - const toDate = to ? new Date(to) : new Date(); // Now - - // Queue job for historical data using Yahoo provider - const queueManager = QueueManager.getInstance(); - const queue = queueManager.getQueue('yahoo-finance'); - const job = await queue.add('historical-data', { - handler: 'yahoo-finance', - operation: 'historical-data', - payload: { + try { + // Queue job for live data using Yahoo provider + const queueManager = container.queue; + if (!queueManager) { + return c.json({ status: 'error', message: 'Queue manager not available' }, 503); + } + + const queue = queueManager.getQueue('yahoo-finance'); + const job = await queue.add('live-data', { + handler: 'yahoo-finance', + operation: 'live-data', + payload: { symbol }, + }); + return c.json({ + status: 'success', + message: 'Live data job queued', + jobId: job.id, symbol, - from: fromDate.toISOString(), - to: toDate.toISOString(), - }, - }); - - return c.json({ - status: 'success', - message: 'Historical data job queued', - jobId: job.id, - symbol, - from: fromDate, - to: toDate, - }); - } catch (error) { - logger.error('Failed to queue historical data job', { symbol, from, to, error }); - return c.json({ status: 'error', message: 'Failed to queue historical data job' }, 500); - } -}); - -// Batch processing endpoint using new queue system -marketDataRoutes.post('/api/process-symbols', async c => { - try { - const { - symbols, - provider = 'ib', - operation = 'fetch-session', - useBatching = true, - totalDelayHours = 0.0083, // ~30 seconds (30/3600 hours) - batchSize = 10, - } = await c.req.json(); - - if (!symbols || !Array.isArray(symbols) || symbols.length === 0) { - return c.json({ status: 'error', message: 'Invalid symbols array' }, 400); + }); + } catch (error) { + logger.error('Failed to queue live data job', { symbol, error }); + return c.json({ status: 'error', message: 'Failed to queue live data job' }, 500); } + }); - logger.info('Batch processing symbols', { - count: symbols.length, - provider, - operation, - useBatching, - }); + marketDataRoutes.get('/api/historical/:symbol', async c => { + const symbol = c.req.param('symbol'); + const from = c.req.query('from'); + const to = c.req.query('to'); - const result = await processItems(symbols, provider, { - handler: provider, - operation, - totalDelayHours, - useBatching, - batchSize, - priority: 2, - retries: 2, - removeOnComplete: 5, - removeOnFail: 10, - }); + logger.info('Historical data request', { symbol, from, to }); - return c.json({ - status: 'success', - message: 'Batch processing initiated', - result, - symbols: symbols.length, - }); - } catch (error) { - logger.error('Failed to process symbols batch', { error }); - return c.json({ status: 'error', message: 'Failed to process symbols batch' }, 500); - } -}); + try { + const fromDate = from ? new Date(from) : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // 30 days ago + const toDate = to ? new Date(to) : new Date(); // Now + + // Queue job for historical data using Yahoo provider + const queueManager = container.queue; + if (!queueManager) { + return c.json({ status: 'error', message: 'Queue manager not available' }, 503); + } + + const queue = queueManager.getQueue('yahoo-finance'); + const job = await queue.add('historical-data', { + handler: 'yahoo-finance', + operation: 'historical-data', + payload: { + symbol, + from: fromDate.toISOString(), + to: toDate.toISOString(), + }, + }); + + return c.json({ + status: 'success', + message: 'Historical data job queued', + jobId: job.id, + symbol, + from: fromDate, + to: toDate, + }); + } catch (error) { + logger.error('Failed to queue historical data job', { symbol, from, to, error }); + return c.json({ status: 'error', message: 'Failed to queue historical data job' }, 500); + } + }); + + // Batch processing endpoint using new queue system + marketDataRoutes.post('/api/process-symbols', async c => { + try { + const { + symbols, + provider = 'ib', + operation = 'fetch-session', + useBatching = true, + totalDelayHours = 0.0083, // ~30 seconds (30/3600 hours) + batchSize = 10, + } = await c.req.json(); + + if (!symbols || !Array.isArray(symbols) || symbols.length === 0) { + return c.json({ status: 'error', message: 'Invalid symbols array' }, 400); + } + + logger.info('Batch processing symbols', { + count: symbols.length, + provider, + operation, + useBatching, + }); + + const queueManager = container.queue; + if (!queueManager) { + return c.json({ status: 'error', message: 'Queue manager not available' }, 503); + } + + const result = await processItems(symbols, provider, { + handler: provider, + operation, + totalDelayHours, + useBatching, + batchSize, + priority: 2, + retries: 2, + removeOnComplete: 5, + removeOnFail: 10, + }, queueManager); + + return c.json({ + status: 'success', + message: 'Batch processing initiated', + result, + symbols: symbols.length, + }); + } catch (error) { + logger.error('Failed to process symbols batch', { error }); + return c.json({ status: 'error', message: 'Failed to process symbols batch' }, 500); + } + }); + + return marketDataRoutes; +} + +// Legacy export for backward compatibility +export const marketDataRoutes = createMarketDataRoutes({} as IServiceContainer); \ No newline at end of file diff --git a/apps/data-ingestion/src/routes/queue.routes.ts b/apps/data-ingestion/src/routes/queue.routes.ts index 8dd8edc..d3cd595 100644 --- a/apps/data-ingestion/src/routes/queue.routes.ts +++ b/apps/data-ingestion/src/routes/queue.routes.ts @@ -1,25 +1,35 @@ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { QueueManager } from '@stock-bot/queue'; +import type { IServiceContainer } from '@stock-bot/handlers'; const logger = getLogger('queue-routes'); -const queue = new Hono(); -// Queue status endpoint -queue.get('/status', async c => { - try { - const queueManager = QueueManager.getInstance(); - const globalStats = await queueManager.getGlobalStats(); +export function createQueueRoutes(container: IServiceContainer) { + const queue = new Hono(); - return c.json({ - status: 'success', - data: globalStats, - message: 'Queue status retrieved successfully', - }); - } catch (error) { - logger.error('Failed to get queue status', { error }); - return c.json({ status: 'error', message: 'Failed to get queue status' }, 500); - } -}); + // Queue status endpoint + queue.get('/status', async c => { + try { + const queueManager = container.queue; + if (!queueManager) { + return c.json({ status: 'error', message: 'Queue manager not available' }, 503); + } + + const globalStats = await queueManager.getGlobalStats(); -export { queue as queueRoutes }; + return c.json({ + status: 'success', + data: globalStats, + message: 'Queue status retrieved successfully', + }); + } catch (error) { + logger.error('Failed to get queue status', { error }); + return c.json({ status: 'error', message: 'Failed to get queue status' }, 500); + } + }); + + return queue; +} + +// Legacy export for backward compatibility +export const queueRoutes = createQueueRoutes({} as IServiceContainer); \ No newline at end of file diff --git a/apps/data-pipeline/src/routes/create-routes.ts b/apps/data-pipeline/src/routes/create-routes.ts index 8cf160f..13bf479 100644 --- a/apps/data-pipeline/src/routes/create-routes.ts +++ b/apps/data-pipeline/src/routes/create-routes.ts @@ -4,10 +4,13 @@ */ import { Hono } from 'hono'; -import type { ServiceContainer } from '@stock-bot/di'; -import { healthRoutes, syncRoutes, enhancedSyncRoutes, statsRoutes } from './index'; +import type { IServiceContainer } from '@stock-bot/handlers'; +import { healthRoutes } from './health.routes'; +import { createSyncRoutes } from './sync.routes'; +import { createEnhancedSyncRoutes } from './enhanced-sync.routes'; +import { createStatsRoutes } from './stats.routes'; -export function createRoutes(container: ServiceContainer): Hono { +export function createRoutes(container: IServiceContainer): Hono { const app = new Hono(); // Add container to context for all routes @@ -18,9 +21,9 @@ export function createRoutes(container: ServiceContainer): Hono { // Mount routes app.route('/health', healthRoutes); - app.route('/sync', syncRoutes); - app.route('/sync', enhancedSyncRoutes); - app.route('/sync/stats', statsRoutes); + app.route('/sync', createSyncRoutes(container)); + app.route('/sync', createEnhancedSyncRoutes(container)); + app.route('/sync/stats', createStatsRoutes(container)); return app; } \ No newline at end of file diff --git a/apps/data-pipeline/src/routes/enhanced-sync.routes.ts b/apps/data-pipeline/src/routes/enhanced-sync.routes.ts index 474e550..a70a126 100644 --- a/apps/data-pipeline/src/routes/enhanced-sync.routes.ts +++ b/apps/data-pipeline/src/routes/enhanced-sync.routes.ts @@ -1,100 +1,154 @@ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { QueueManager } from '@stock-bot/queue'; +import type { IServiceContainer } from '@stock-bot/handlers'; const logger = getLogger('enhanced-sync-routes'); -const enhancedSync = new Hono(); -// Enhanced sync endpoints -enhancedSync.post('/exchanges/all', async c => { - try { - const clearFirst = c.req.query('clear') === 'true'; - const queueManager = QueueManager.getInstance(); - const exchangesQueue = queueManager.getQueue('exchanges'); +export function createEnhancedSyncRoutes(container: IServiceContainer) { + const enhancedSync = new Hono(); - const job = await exchangesQueue.addJob('sync-all-exchanges', { - handler: 'exchanges', - operation: 'sync-all-exchanges', - payload: { clearFirst }, - }); + // Enhanced sync endpoints + enhancedSync.post('/exchanges/all', async c => { + try { + const clearFirst = c.req.query('clear') === 'true'; + const queueManager = container.queue; + if (!queueManager) { + return c.json({ success: false, error: 'Queue manager not available' }, 503); + } + + const exchangesQueue = queueManager.getQueue('exchanges'); - return c.json({ success: true, jobId: job.id, message: 'Enhanced exchange sync job queued' }); - } catch (error) { - logger.error('Failed to queue enhanced exchange sync job', { error }); - return c.json( - { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, - 500 - ); - } -}); + const job = await exchangesQueue.addJob('sync-all-exchanges', { + handler: 'exchanges', + operation: 'sync-all-exchanges', + payload: { clearFirst }, + }); -enhancedSync.post('/provider-mappings/qm', async c => { - try { - const queueManager = QueueManager.getInstance(); - const exchangesQueue = queueManager.getQueue('exchanges'); + return c.json({ success: true, jobId: job.id, message: 'Enhanced exchange sync job queued' }); + } catch (error) { + logger.error('Failed to queue enhanced exchange sync job', { error }); + return c.json( + { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, + 500 + ); + } + }); - const job = await exchangesQueue.addJob('sync-qm-provider-mappings', { - handler: 'exchanges', - operation: 'sync-qm-provider-mappings', - payload: {}, - }); + enhancedSync.post('/provider-mappings/qm', async c => { + try { + const queueManager = container.queue; + if (!queueManager) { + return c.json({ success: false, error: 'Queue manager not available' }, 503); + } + + const exchangesQueue = queueManager.getQueue('exchanges'); - return c.json({ - success: true, - jobId: job.id, - message: 'QM provider mappings sync job queued', - }); - } catch (error) { - logger.error('Failed to queue QM provider mappings sync job', { error }); - return c.json( - { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, - 500 - ); - } -}); + const job = await exchangesQueue.addJob('sync-qm-provider-mappings', { + handler: 'exchanges', + operation: 'sync-qm-provider-mappings', + payload: {}, + }); -enhancedSync.post('/symbols/:provider', async c => { - try { - const provider = c.req.param('provider'); - const clearFirst = c.req.query('clear') === 'true'; - const queueManager = QueueManager.getInstance(); - const symbolsQueue = queueManager.getQueue('symbols'); + return c.json({ + success: true, + jobId: job.id, + message: 'QM provider mappings sync job queued', + }); + } catch (error) { + logger.error('Failed to queue QM provider mappings sync job', { error }); + return c.json( + { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, + 500 + ); + } + }); - const job = await symbolsQueue.addJob(`sync-symbols-${provider}`, { - handler: 'symbols', - operation: `sync-symbols-${provider}`, - payload: { provider, clearFirst }, - }); + enhancedSync.post('/provider-mappings/ib', async c => { + try { + const queueManager = container.queue; + if (!queueManager) { + return c.json({ success: false, error: 'Queue manager not available' }, 503); + } + + const exchangesQueue = queueManager.getQueue('exchanges'); - return c.json({ success: true, jobId: job.id, message: `${provider} symbols sync job queued` }); - } catch (error) { - logger.error('Failed to queue enhanced symbol sync job', { error }); - return c.json( - { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, - 500 - ); - } -}); + const job = await exchangesQueue.addJob('sync-ib-exchanges', { + handler: 'exchanges', + operation: 'sync-ib-exchanges', + payload: {}, + }); -// Enhanced status endpoints -enhancedSync.get('/status/enhanced', async c => { - try { - const queueManager = QueueManager.getInstance(); - const exchangesQueue = queueManager.getQueue('exchanges'); + return c.json({ + success: true, + jobId: job.id, + message: 'IB exchanges sync job queued', + }); + } catch (error) { + logger.error('Failed to queue IB exchanges sync job', { error }); + return c.json( + { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, + 500 + ); + } + }); - const job = await exchangesQueue.addJob('enhanced-sync-status', { - handler: 'exchanges', - operation: 'enhanced-sync-status', - payload: {}, - }); + enhancedSync.get('/status', async c => { + try { + const queueManager = container.queue; + if (!queueManager) { + return c.json({ success: false, error: 'Queue manager not available' }, 503); + } + + const symbolsQueue = queueManager.getQueue('symbols'); - // Wait for job to complete and return result - const result = await job.waitUntilFinished(); - return c.json(result); - } catch (error) { - logger.error('Failed to get enhanced sync status', { error }); - return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500); - } -}); + const job = await symbolsQueue.addJob('sync-status', { + handler: 'symbols', + operation: 'sync-status', + payload: {}, + }); -export { enhancedSync as enhancedSyncRoutes }; + return c.json({ success: true, jobId: job.id, message: 'Sync status job queued' }); + } catch (error) { + logger.error('Failed to queue sync status job', { error }); + return c.json( + { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, + 500 + ); + } + }); + + enhancedSync.post('/clear/postgresql', async c => { + try { + const dataType = c.req.query('type') as 'exchanges' | 'provider_mappings' | 'all'; + const queueManager = container.queue; + if (!queueManager) { + return c.json({ success: false, error: 'Queue manager not available' }, 503); + } + + const exchangesQueue = queueManager.getQueue('exchanges'); + + const job = await exchangesQueue.addJob('clear-postgresql-data', { + handler: 'exchanges', + operation: 'clear-postgresql-data', + payload: { dataType: dataType || 'all' }, + }); + + return c.json({ + success: true, + jobId: job.id, + message: 'PostgreSQL data clear job queued', + }); + } catch (error) { + logger.error('Failed to queue PostgreSQL clear job', { error }); + return c.json( + { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, + 500 + ); + } + }); + + return enhancedSync; +} + +// Legacy export for backward compatibility +export const enhancedSyncRoutes = createEnhancedSyncRoutes({} as IServiceContainer); \ No newline at end of file diff --git a/apps/data-pipeline/src/routes/stats.routes.ts b/apps/data-pipeline/src/routes/stats.routes.ts index 9c8c488..d11ae55 100644 --- a/apps/data-pipeline/src/routes/stats.routes.ts +++ b/apps/data-pipeline/src/routes/stats.routes.ts @@ -1,49 +1,63 @@ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { QueueManager } from '@stock-bot/queue'; +import type { IServiceContainer } from '@stock-bot/handlers'; const logger = getLogger('stats-routes'); -const stats = new Hono(); -// Statistics endpoints -stats.get('/exchanges', async c => { - try { - const queueManager = QueueManager.getInstance(); - const exchangesQueue = queueManager.getQueue('exchanges'); +export function createStatsRoutes(container: IServiceContainer) { + const stats = new Hono(); - const job = await exchangesQueue.addJob('get-exchange-stats', { - handler: 'exchanges', - operation: 'get-exchange-stats', - payload: {}, - }); + // Statistics endpoints + stats.get('/exchanges', async c => { + try { + const queueManager = container.queue; + if (!queueManager) { + return c.json({ error: 'Queue manager not available' }, 503); + } + + const exchangesQueue = queueManager.getQueue('exchanges'); - // Wait for job to complete and return result - const result = await job.waitUntilFinished(); - return c.json(result); - } catch (error) { - logger.error('Failed to get exchange stats', { error }); - return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500); - } -}); + const job = await exchangesQueue.addJob('get-exchange-stats', { + handler: 'exchanges', + operation: 'get-exchange-stats', + payload: {}, + }); -stats.get('/provider-mappings', async c => { - try { - const queueManager = QueueManager.getInstance(); - const exchangesQueue = queueManager.getQueue('exchanges'); + // Wait for job to complete and return result + const result = await job.waitUntilFinished(); + return c.json(result); + } catch (error) { + logger.error('Failed to get exchange stats', { error }); + return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500); + } + }); - const job = await exchangesQueue.addJob('get-provider-mapping-stats', { - handler: 'exchanges', - operation: 'get-provider-mapping-stats', - payload: {}, - }); + stats.get('/provider-mappings', async c => { + try { + const queueManager = container.queue; + if (!queueManager) { + return c.json({ error: 'Queue manager not available' }, 503); + } + + const exchangesQueue = queueManager.getQueue('exchanges'); - // Wait for job to complete and return result - const result = await job.waitUntilFinished(); - return c.json(result); - } catch (error) { - logger.error('Failed to get provider mapping stats', { error }); - return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500); - } -}); + const job = await exchangesQueue.addJob('get-provider-mapping-stats', { + handler: 'exchanges', + operation: 'get-provider-mapping-stats', + payload: {}, + }); -export { stats as statsRoutes }; + // Wait for job to complete and return result + const result = await job.waitUntilFinished(); + return c.json(result); + } catch (error) { + logger.error('Failed to get provider mapping stats', { error }); + return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500); + } + }); + + return stats; +} + +// Legacy export for backward compatibility +export const statsRoutes = createStatsRoutes({} as IServiceContainer); \ No newline at end of file diff --git a/apps/data-pipeline/src/routes/sync.routes.ts b/apps/data-pipeline/src/routes/sync.routes.ts index 8bf40b7..7d753ca 100644 --- a/apps/data-pipeline/src/routes/sync.routes.ts +++ b/apps/data-pipeline/src/routes/sync.routes.ts @@ -1,96 +1,95 @@ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { QueueManager } from '@stock-bot/queue'; +import type { IServiceContainer } from '@stock-bot/handlers'; const logger = getLogger('sync-routes'); -const sync = new Hono(); -// Manual sync trigger endpoints -sync.post('/symbols', async c => { - try { - const queueManager = QueueManager.getInstance(); - const symbolsQueue = queueManager.getQueue('symbols'); +export function createSyncRoutes(container: IServiceContainer) { + const sync = new Hono(); - const job = await symbolsQueue.addJob('sync-qm-symbols', { - handler: 'symbols', - operation: 'sync-qm-symbols', - payload: {}, - }); + // Manual sync trigger endpoints + sync.post('/symbols', async c => { + try { + const queueManager = container.queue; + if (!queueManager) { + return c.json({ success: false, error: 'Queue manager not available' }, 503); + } + + const symbolsQueue = queueManager.getQueue('symbols'); - return c.json({ success: true, jobId: job.id, message: 'QM symbols sync job queued' }); - } catch (error) { - logger.error('Failed to queue symbol sync job', { error }); - return c.json( - { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, - 500 - ); - } -}); + const job = await symbolsQueue.addJob('sync-qm-symbols', { + handler: 'symbols', + operation: 'sync-qm-symbols', + payload: {}, + }); -sync.post('/exchanges', async c => { - try { - const queueManager = QueueManager.getInstance(); - const exchangesQueue = queueManager.getQueue('exchanges'); + return c.json({ success: true, jobId: job.id, message: 'QM symbols sync job queued' }); + } catch (error) { + logger.error('Failed to queue symbol sync job', { error }); + return c.json( + { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, + 500 + ); + } + }); - const job = await exchangesQueue.addJob('sync-qm-exchanges', { - handler: 'exchanges', - operation: 'sync-qm-exchanges', - payload: {}, - }); + sync.post('/exchanges', async c => { + try { + const queueManager = container.queue; + if (!queueManager) { + return c.json({ success: false, error: 'Queue manager not available' }, 503); + } + + const exchangesQueue = queueManager.getQueue('exchanges'); - return c.json({ success: true, jobId: job.id, message: 'QM exchanges sync job queued' }); - } catch (error) { - logger.error('Failed to queue exchange sync job', { error }); - return c.json( - { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, - 500 - ); - } -}); + const job = await exchangesQueue.addJob('sync-qm-exchanges', { + handler: 'exchanges', + operation: 'sync-qm-exchanges', + payload: {}, + }); -// Get sync status -sync.get('/status', async c => { - try { - const queueManager = QueueManager.getInstance(); - const symbolsQueue = queueManager.getQueue('symbols'); + return c.json({ success: true, jobId: job.id, message: 'QM exchanges sync job queued' }); + } catch (error) { + logger.error('Failed to queue exchange sync job', { error }); + return c.json( + { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, + 500 + ); + } + }); - const job = await symbolsQueue.addJob('sync-status', { - handler: 'symbols', - operation: 'sync-status', - payload: {}, - }); + sync.post('/symbols/:provider', async c => { + try { + const provider = c.req.param('provider'); + const queueManager = container.queue; + if (!queueManager) { + return c.json({ success: false, error: 'Queue manager not available' }, 503); + } + + const symbolsQueue = queueManager.getQueue('symbols'); - // Wait for job to complete and return result - const result = await job.waitUntilFinished(); - return c.json(result); - } catch (error) { - logger.error('Failed to get sync status', { error }); - return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500); - } -}); + const job = await symbolsQueue.addJob('sync-symbols-from-provider', { + handler: 'symbols', + operation: 'sync-symbols-from-provider', + payload: { provider }, + }); -// Clear data endpoint -sync.post('/clear', async c => { - try { - const queueManager = QueueManager.getInstance(); - const exchangesQueue = queueManager.getQueue('exchanges'); + return c.json({ + success: true, + jobId: job.id, + message: `${provider} symbols sync job queued`, + }); + } catch (error) { + logger.error('Failed to queue provider symbol sync job', { error }); + return c.json( + { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, + 500 + ); + } + }); - const job = await exchangesQueue.addJob('clear-postgresql-data', { - handler: 'exchanges', - operation: 'clear-postgresql-data', - payload: {}, - }); + return sync; +} - // Wait for job to complete and return result - const result = await job.waitUntilFinished(); - return c.json({ success: true, result }); - } catch (error) { - logger.error('Failed to clear PostgreSQL data', { error }); - return c.json( - { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, - 500 - ); - } -}); - -export { sync as syncRoutes }; +// Legacy export for backward compatibility +export const syncRoutes = createSyncRoutes({} as IServiceContainer); \ No newline at end of file diff --git a/libs/core/di/src/awilix-container.ts b/libs/core/di/src/awilix-container.ts index 715678a..58b3d4a 100644 --- a/libs/core/di/src/awilix-container.ts +++ b/libs/core/di/src/awilix-container.ts @@ -188,19 +188,18 @@ export function createServiceContainer(rawConfig: unknown): AwilixContainer { - // Import dynamically to avoid circular dependency + // Queue manager - properly instantiated with DI + registrations.queueManager = asFunction(({ redisConfig, logger }) => { const { QueueManager } = require('@stock-bot/queue'); - - // Check if already initialized (singleton pattern) - if (QueueManager.isInitialized()) { - return QueueManager.getInstance(); - } - - // Initialize if not already done - return QueueManager.initialize({ - redis: { host: redisConfig.host, port: redisConfig.port, db: redisConfig.db }, + + return new QueueManager({ + redis: { + host: redisConfig.host, + port: redisConfig.port, + db: redisConfig.db, + password: redisConfig.password, + username: redisConfig.username, + }, enableScheduledJobs: true, delayWorkerStart: true, // We'll start workers manually }); diff --git a/libs/services/queue/src/batch-processor.ts b/libs/services/queue/src/batch-processor.ts index 8d822a4..fbf7f8e 100644 --- a/libs/services/queue/src/batch-processor.ts +++ b/libs/services/queue/src/batch-processor.ts @@ -11,9 +11,9 @@ const logger = getLogger('batch-processor'); export async function processItems( items: T[], queueName: string, - options: ProcessOptions + options: ProcessOptions, + queueManager: QueueManager ): Promise { - const queueManager = QueueManager.getInstance(); queueManager.getQueue(queueName); const startTime = Date.now(); @@ -35,8 +35,8 @@ export async function processItems( try { const result = options.useBatching - ? await processBatched(items, queueName, options) - : await processDirect(items, queueName, options); + ? await processBatched(items, queueName, options, queueManager) + : await processDirect(items, queueName, options, queueManager); const duration = Date.now() - startTime; @@ -58,9 +58,9 @@ export async function processItems( async function processDirect( items: T[], queueName: string, - options: ProcessOptions + options: ProcessOptions, + queueManager: QueueManager ): Promise> { - const queueManager = QueueManager.getInstance(); queueManager.getQueue(queueName); const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds const delayPerItem = totalDelayMs / items.length; @@ -87,7 +87,7 @@ async function processDirect( }, })); - const createdJobs = await addJobsInChunks(queueName, jobs); + const createdJobs = await addJobsInChunks(queueName, jobs, queueManager); return { totalItems: items.length, @@ -102,9 +102,9 @@ async function processDirect( async function processBatched( items: T[], queueName: string, - options: ProcessOptions + options: ProcessOptions, + queueManager: QueueManager ): Promise> { - const queueManager = QueueManager.getInstance(); queueManager.getQueue(queueName); const batchSize = options.batchSize || 100; const batches = createBatches(items, batchSize); @@ -121,7 +121,7 @@ async function processBatched( const batchJobs = await Promise.all( batches.map(async (batch, batchIndex) => { // Just store the items directly - no processing needed - const payloadKey = await storeItems(batch, queueName, options); + const payloadKey = await storeItems(batch, queueName, options, queueManager); return { name: 'process-batch', @@ -148,7 +148,7 @@ async function processBatched( }) ); - const createdJobs = await addJobsInChunks(queueName, batchJobs); + const createdJobs = await addJobsInChunks(queueName, batchJobs, queueManager); return { totalItems: items.length, @@ -161,8 +161,7 @@ async function processBatched( /** * Process a batch job - loads items and creates individual jobs */ -export async function processBatchJob(jobData: BatchJobData, queueName: string): Promise { - const queueManager = QueueManager.getInstance(); +export async function processBatchJob(jobData: BatchJobData, queueName: string, queueManager: QueueManager): Promise { queueManager.getQueue(queueName); const { payloadKey, batchIndex, totalBatches, itemCount, totalDelayHours } = jobData; @@ -174,7 +173,7 @@ export async function processBatchJob(jobData: BatchJobData, queueName: string): }); try { - const payload = await loadPayload(payloadKey, queueName); + const payload = await loadPayload(payloadKey, queueName, queueManager); if (!payload || !payload.items || !payload.options) { logger.error('Invalid payload data', { payloadKey, payload }); throw new Error(`Invalid payload data for key: ${payloadKey}`); @@ -210,10 +209,10 @@ export async function processBatchJob(jobData: BatchJobData, queueName: string): }, })); - const createdJobs = await addJobsInChunks(queueName, jobs); + const createdJobs = await addJobsInChunks(queueName, jobs, queueManager); // Cleanup payload after successful processing - await cleanupPayload(payloadKey, queueName); + await cleanupPayload(payloadKey, queueName, queueManager); return { batchIndex, @@ -239,9 +238,9 @@ function createBatches(items: T[], batchSize: number): T[][] { async function storeItems( items: T[], queueName: string, - options: ProcessOptions + options: ProcessOptions, + queueManager: QueueManager ): Promise { - const queueManager = QueueManager.getInstance(); const cache = queueManager.getCache(queueName); const payloadKey = `payload:${Date.now()}:${Math.random().toString(36).substr(2, 9)}`; @@ -265,7 +264,8 @@ async function storeItems( async function loadPayload( key: string, - queueName: string + queueName: string, + queueManager: QueueManager ): Promise<{ items: T[]; options: { @@ -276,7 +276,6 @@ async function loadPayload( operation: string; }; } | null> { - const queueManager = QueueManager.getInstance(); const cache = queueManager.getCache(queueName); return (await cache.get(key)) as { items: T[]; @@ -290,8 +289,7 @@ async function loadPayload( } | null; } -async function cleanupPayload(key: string, queueName: string): Promise { - const queueManager = QueueManager.getInstance(); +async function cleanupPayload(key: string, queueName: string, queueManager: QueueManager): Promise { const cache = queueManager.getCache(queueName); await cache.del(key); } @@ -299,9 +297,9 @@ async function cleanupPayload(key: string, queueName: string): Promise { async function addJobsInChunks( queueName: string, jobs: Array<{ name: string; data: JobData; opts?: Record }>, + queueManager: QueueManager, chunkSize = 100 ): Promise { - const queueManager = QueueManager.getInstance(); const queue = queueManager.getQueue(queueName); const allCreatedJobs = []; diff --git a/libs/services/queue/src/queue-manager.ts b/libs/services/queue/src/queue-manager.ts index 620f93b..b2626e4 100644 --- a/libs/services/queue/src/queue-manager.ts +++ b/libs/services/queue/src/queue-manager.ts @@ -15,7 +15,7 @@ import { getRedisConnection } from './utils'; const logger = getLogger('queue-manager'); /** - * Singleton QueueManager that provides unified queue and cache management + * QueueManager provides unified queue and cache management * Main entry point for all queue operations with getQueue() method */ export class QueueManager { @@ -28,7 +28,7 @@ export class QueueManager { private shutdownPromise: Promise | null = null; private config: QueueManagerConfig; - private constructor(config: QueueManagerConfig) { + constructor(config: QueueManagerConfig) { this.config = config; this.redisConnection = getRedisConnection(config.redis); @@ -42,16 +42,20 @@ export class QueueManager { }); } - logger.info('QueueManager singleton initialized', { + logger.info('QueueManager initialized', { redis: `${config.redis.host}:${config.redis.port}`, }); } /** + * @deprecated Use dependency injection instead. This method will be removed in a future version. * Get the singleton instance * @throws Error if not initialized - use initialize() first */ static getInstance(): QueueManager { + logger.warn( + 'QueueManager.getInstance() is deprecated. Please use dependency injection instead.' + ); if (!QueueManager.instance) { throw new Error('QueueManager not initialized. Call QueueManager.initialize(config) first.'); } @@ -59,10 +63,14 @@ export class QueueManager { } /** + * @deprecated Use dependency injection instead. This method will be removed in a future version. * Initialize the singleton with config * Must be called before getInstance() */ static initialize(config: QueueManagerConfig): QueueManager { + logger.warn( + 'QueueManager.initialize() is deprecated. Please use dependency injection instead.' + ); if (QueueManager.instance) { logger.warn('QueueManager already initialized, returning existing instance'); return QueueManager.instance; @@ -72,10 +80,14 @@ export class QueueManager { } /** + * @deprecated Use dependency injection instead. This method will be removed in a future version. * Get or initialize the singleton * Convenience method that combines initialize and getInstance */ static getOrInitialize(config?: QueueManagerConfig): QueueManager { + logger.warn( + 'QueueManager.getOrInitialize() is deprecated. Please use dependency injection instead.' + ); if (QueueManager.instance) { return QueueManager.instance; } @@ -91,6 +103,7 @@ export class QueueManager { } /** + * @deprecated Use dependency injection instead. This method will be removed in a future version. * Check if the QueueManager is initialized */ static isInitialized(): boolean { @@ -98,6 +111,7 @@ export class QueueManager { } /** + * @deprecated Use dependency injection instead. This method will be removed in a future version. * Reset the singleton (mainly for testing) */ static async reset(): Promise { @@ -489,4 +503,4 @@ export class QueueManager { getConfig(): Readonly { return { ...this.config }; } -} +} \ No newline at end of file