diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index 7e9da31..af4651b 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -251,5 +251,4 @@ startServer().catch(error => { logger.info('Data service startup initiated'); -// Export queue manager for providers -export { queueManager }; +// Queue manager is available via QueueManager.getInstance() singleton pattern diff --git a/apps/data-service/src/providers/ib.tasks.ts b/apps/data-service/src/providers/ib.tasks.ts index 00481a6..5fa6a9d 100644 --- a/apps/data-service/src/providers/ib.tasks.ts +++ b/apps/data-service/src/providers/ib.tasks.ts @@ -108,7 +108,7 @@ export async function fetchSession(): Promise | undefined } } -export async function fetchExchanges(sessionHeaders: Record): Promise { +export async function fetchExchanges(sessionHeaders: Record): Promise { try { logger.info('🔍 Fetching exchanges with session headers...'); @@ -170,7 +170,7 @@ export async function fetchExchanges(sessionHeaders: Record): Pr } // Fetch symbols from IB using the session headers -export async function fetchSymbols(sessionHeaders: Record): Promise { +export async function fetchSymbols(sessionHeaders: Record): Promise { try { logger.info('🔍 Fetching symbols with session headers...'); // Configure the proxy @@ -275,6 +275,8 @@ export async function fetchSymbols(sessionHeaders: Record): Prom logger.info('Saved IB symbols to DB', { totalSymbols: symbols.length, }); + + return symbols; // logger.info('📤 Making request to exchange API...', { // url: exchangeUrl, // headerCount: Object.keys(requestHeaders).length, diff --git a/apps/data-service/src/providers/proxy.tasks.ts b/apps/data-service/src/providers/proxy.tasks.ts index e78063f..f468e34 100644 --- a/apps/data-service/src/providers/proxy.tasks.ts +++ b/apps/data-service/src/providers/proxy.tasks.ts @@ -1,6 +1,8 @@ import { createCache, type CacheProvider } from '@stock-bot/cache'; +import { getDatabaseConfig } from '@stock-bot/config'; import { HttpClient, ProxyInfo } from '@stock-bot/http'; import { getLogger } from '@stock-bot/logger'; +import { QueueManager } from '@stock-bot/queue'; // Type definitions export interface ProxySource { @@ -177,7 +179,9 @@ export async function initializeProxyResources(waitForCache = false): Promise { - const { queueManager } = await import('../index'); - if (!queueManager) { - throw new Error('Queue manager not initialized'); - } + const queueManager = QueueManager.getInstance(); const queue = queueManager.getQueue('proxy'); const job = await queue.add('proxy-fetch', { handler: 'proxy', @@ -327,10 +328,7 @@ export async function queueProxyFetch(): Promise { } export async function queueProxyCheck(proxies: ProxyInfo[]): Promise { - const { queueManager } = await import('../index'); - if (!queueManager) { - throw new Error('Queue manager not initialized'); - } + const queueManager = QueueManager.getInstance(); const queue = queueManager.getQueue('proxy'); const job = await queue.add('proxy-check', { handler: 'proxy', diff --git a/apps/data-service/src/providers/qm.tasks.ts b/apps/data-service/src/providers/qm.tasks.ts index 36f67fd..326d3d2 100644 --- a/apps/data-service/src/providers/qm.tasks.ts +++ b/apps/data-service/src/providers/qm.tasks.ts @@ -1,6 +1,7 @@ import { getRandomUserAgent } from '@stock-bot/http'; import { getLogger } from '@stock-bot/logger'; import { getMongoDBClient } from '@stock-bot/mongodb-client'; +import { QueueManager } from '@stock-bot/queue'; import { getProxy } from './webshare.provider'; // Shared instances (module-scoped, not global) @@ -171,10 +172,7 @@ async function createAlphabetJobs( maxDepth: number ): Promise<{ success: boolean; symbolsFound: number; jobsCreated: number }> { try { - const { queueManager } = await import('../index'); - if (!queueManager) { - throw new Error('Queue manager not initialized'); - } + const queueManager = QueueManager.getInstance(); const queue = queueManager.getQueue('qm'); let jobsCreated = 0; @@ -242,10 +240,7 @@ async function searchAndSpawnJobs( // If we have 50+ symbols and haven't reached max depth, spawn sub-jobs if (symbolCount >= 50 && depth < maxDepth) { - const { queueManager } = await import('../index'); - if (!queueManager) { - throw new Error('Queue manager not initialized'); - } + const queueManager = QueueManager.getInstance(); const queue = queueManager.getQueue('qm'); logger.info(`Spawning sub-jobs for prefix "${prefix}" (${symbolCount} >= 50 symbols)`); diff --git a/apps/data-service/src/providers/webshare.provider.ts b/apps/data-service/src/providers/webshare.provider.ts index add2fde..a4e0ea6 100644 --- a/apps/data-service/src/providers/webshare.provider.ts +++ b/apps/data-service/src/providers/webshare.provider.ts @@ -2,7 +2,11 @@ * WebShare Provider for proxy management */ import { getLogger } from '@stock-bot/logger'; -import { handlerRegistry, createJobHandler, type HandlerConfigWithSchedule } from '@stock-bot/queue'; +import { + createJobHandler, + handlerRegistry, + type HandlerConfigWithSchedule, +} from '@stock-bot/queue'; const logger = getLogger('webshare-provider'); @@ -18,7 +22,7 @@ export function getProxy(): string | null { const proxy = proxies[currentProxyIndex]; currentProxyIndex = (currentProxyIndex + 1) % proxies.length; - return proxy; + return proxy ?? null; } // Initialize and register the WebShare provider @@ -97,7 +101,7 @@ async function fetchProxiesFromWebShare(): Promise { // Get configuration from config system const { getConfig } = await import('@stock-bot/config'); const config = getConfig(); - + // Get configuration from config system const apiKey = config.webshare?.apiKey; const apiUrl = config.webshare?.apiUrl; diff --git a/apps/data-service/src/routes/market-data.routes.ts b/apps/data-service/src/routes/market-data.routes.ts index 8233eab..62bd74e 100644 --- a/apps/data-service/src/routes/market-data.routes.ts +++ b/apps/data-service/src/routes/market-data.routes.ts @@ -3,7 +3,7 @@ */ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { processItems, queueManager } from '../index'; +import { processItems, QueueManager } from '@stock-bot/queue'; const logger = getLogger('market-data-routes'); @@ -16,9 +16,10 @@ marketDataRoutes.get('/api/live/:symbol', async c => { try { // Queue job for live data using Yahoo provider - const job = await queueManager.add('market-data-live', { - type: 'market-data-live', - provider: 'yahoo-finance', + const queueManager = QueueManager.getInstance(); + const queue = queueManager.getQueue('yahoo-finance'); + const job = await queue.add('live-data', { + handler: 'yahoo-finance', operation: 'live-data', payload: { symbol }, }); @@ -46,9 +47,10 @@ marketDataRoutes.get('/api/historical/:symbol', async c => { const toDate = to ? new Date(to) : new Date(); // Now // Queue job for historical data using Yahoo provider - const job = await queueManager.add('market-data-historical', { - type: 'market-data-historical', - provider: 'yahoo-finance', + const queueManager = QueueManager.getInstance(); + const queue = queueManager.getQueue('yahoo-finance'); + const job = await queue.add('historical-data', { + handler: 'yahoo-finance', operation: 'historical-data', payload: { symbol, @@ -94,13 +96,13 @@ marketDataRoutes.post('/api/process-symbols', async c => { useBatching, }); - const result = await processItems(symbols, queueManager, { + const result = await processItems(symbols, provider, { + handler: provider, + operation, totalDelayHours, useBatching, batchSize, priority: 2, - provider, - operation, retries: 2, removeOnComplete: 5, removeOnFail: 10,