From e8c90532d5b15c2a65c448264b24500bf4de050d Mon Sep 17 00:00:00 2001 From: Boki Date: Sat, 14 Jun 2025 18:22:28 -0400 Subject: [PATCH] thing im pretty much done with extracting the queue and making it reususable, maybe just a few more change to be able to making the batch names a bit more specific --- apps/data-service/src/index.ts | 41 ++++++++++++- .../src/providers/proxy.provider.ts | 58 ++++++++++++++++--- .../data-service/src/providers/proxy.tasks.ts | 10 ++-- apps/data-service/src/routes/health.routes.ts | 2 +- .../src/routes/market-data.routes.ts | 6 +- apps/data-service/src/routes/queue.routes.ts | 2 +- .../src/services/queue-manager.service.ts | 57 ------------------ libs/queue/debug-batch-cleanup.ts | 2 +- libs/queue/examples/basic-usage.ts | 2 +- libs/queue/examples/batch-processing.ts | 12 ++-- libs/queue/src/batch-processor.ts | 8 ++- libs/queue/src/types.ts | 2 +- libs/queue/test-api-structure.ts | 2 +- libs/queue/test-simplified-api.ts | 6 +- 14 files changed, 117 insertions(+), 93 deletions(-) delete mode 100644 apps/data-service/src/services/queue-manager.service.ts diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index 75b133e..a4ec6ed 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -6,10 +6,10 @@ import { Browser } from '@stock-bot/browser'; import { loadEnvVariables } from '@stock-bot/config'; import { getLogger, shutdownLoggers } from '@stock-bot/logger'; import { connectMongoDB, disconnectMongoDB } from '@stock-bot/mongodb-client'; +import { processItems, QueueManager, type QueueConfig } from '@stock-bot/queue'; import { Shutdown } from '@stock-bot/shutdown'; import { initializeIBResources } from './providers/ib.tasks'; import { initializeProxyResources } from './providers/proxy.tasks'; -import { initializeQueueManager, shutdownQueueManager } from './services/queue-manager.service'; import { healthRoutes, queueRoutes } from './routes'; // Load environment variables @@ -23,6 +23,41 @@ let server: ReturnType | null = null; // Initialize shutdown manager with 15 second timeout const shutdown = Shutdown.getInstance({ timeout: 15000 }); +/** + * Create and configure the enhanced queue manager for data service + */ +function createDataServiceQueueManager(): QueueManager { + const config: QueueConfig = { + queueName: 'data-service-queue', + workers: parseInt(process.env.WORKER_COUNT || '5'), + concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'), + redis: { + host: process.env.DRAGONFLY_HOST || 'localhost', + port: parseInt(process.env.DRAGONFLY_PORT || '6379'), + }, + providers: [ + // Import and initialize providers lazily + async () => { + const { initializeIBProvider } = await import('./providers/ib.provider'); + return initializeIBProvider(); + }, + async () => { + const { initializeProxyProvider } = await import('./providers/proxy.provider'); + return initializeProxyProvider(); + }, + ], + enableScheduledJobs: true, + }; + + return new QueueManager(config); +} + +// Create singleton instance +export const queueManager = createDataServiceQueueManager(); + +// Export processItems for direct use +export { processItems }; + // Register all routes app.route('', healthRoutes); app.route('', queueRoutes); @@ -54,7 +89,7 @@ async function initializeServices() { // Initialize queue manager (includes batch cache initialization) logger.info('Starting queue manager initialization...'); - await initializeQueueManager(); + await queueManager.initialize(); logger.info('Queue manager initialized'); logger.info('All services initialized successfully'); @@ -92,7 +127,7 @@ shutdown.onShutdown(async () => { shutdown.onShutdown(async () => { logger.info('Shutting down queue manager...'); try { - await shutdownQueueManager(); + await queueManager.shutdown(); logger.info('Queue manager shut down successfully'); } catch (error) { logger.error('Error shutting down queue manager', { error }); diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index 3432b1c..debc750 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -14,7 +14,57 @@ export function initializeProxyProvider() { const proxyProviderConfig: ProviderConfigWithSchedule = { name: 'proxy', + operations: { + 'fetch-from-sources': async _payload => { + // Fetch proxies from all configured sources + logger.info('Processing fetch proxies from sources request'); + const { fetchProxiesFromSources } = await import('./proxy.tasks'); + const { processItems, queueManager } = await import('../index'); + + // Fetch all proxies from sources + const proxies = await fetchProxiesFromSources(); + logger.info('Fetched proxies from sources', { count: proxies.length }); + + if (proxies.length === 0) { + logger.warn('No proxies fetched from sources'); + return { processed: 0, successful: 0 }; + } + + // Batch process the proxies through check-proxy operation + const batchResult = await processItems( + proxies, + queueManager, + { + provider: 'proxy', + operation: 'check-proxy', + totalDelayHours: 0.083, // 5 minutes (5/60 hours) + batchSize: 50, // Process 50 proxies per batch + priority: 3, + useBatching: true, + retries: 1, + ttl: 30000, // 30 second timeout per proxy check + removeOnComplete: 5, + removeOnFail: 3, + } + ); + + logger.info('Batch proxy validation completed', { + totalProxies: proxies.length, + jobsCreated: batchResult.jobsCreated, + mode: batchResult.mode, + batchesCreated: batchResult.batchesCreated, + duration: `${batchResult.duration}ms` + }); + + return { + processed: proxies.length, + jobsCreated: batchResult.jobsCreated, + batchesCreated: batchResult.batchesCreated, + mode: batchResult.mode + }; + }, + 'check-proxy': async (payload: ProxyInfo) => { // payload is now the raw proxy info object logger.debug('Processing proxy check request', { @@ -23,12 +73,6 @@ export function initializeProxyProvider() { const { checkProxy } = await import('./proxy.tasks'); return checkProxy(payload); }, - 'fetch-from-sources': async _payload => { - // Fetch proxies from all configured sources - logger.info('Processing fetch proxies from sources request'); - const { fetchProxiesFromSources } = await import('./proxy.tasks'); - return fetchProxiesFromSources(); - }, }, scheduledJobs: [ { @@ -38,7 +82,7 @@ export function initializeProxyProvider() { cronPattern: '0 */2 * * *', // Every 2 hours priority: 5, description: 'Fetch and validate proxy list from sources', - // immediately: true, // Don't run immediately during startup to avoid conflicts + immediately: true, // Don't run immediately during startup to avoid conflicts }, ], }; diff --git a/apps/data-service/src/providers/proxy.tasks.ts b/apps/data-service/src/providers/proxy.tasks.ts index a1294a5..7ed6f15 100644 --- a/apps/data-service/src/providers/proxy.tasks.ts +++ b/apps/data-service/src/providers/proxy.tasks.ts @@ -244,7 +244,7 @@ async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean): Promise const cacheKey = `${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`; try { - const existing: any = await cache.get(cacheKey); + const existing: ProxyInfo | null = await cache.get(cacheKey); // For failed proxies, only update if they already exist if (!isWorking && !existing) { @@ -309,8 +309,8 @@ async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean): Promise // Individual task functions export async function queueProxyFetch(): Promise { - const { queueManager } = await import('../services/queue.service'); - const job = await queueManager.addJob({ + const { queueManager } = await import('../index'); + const job = await queueManager.add('proxy-fetch', { type: 'proxy-fetch', provider: 'proxy-service', operation: 'fetch-and-check', @@ -324,8 +324,8 @@ export async function queueProxyFetch(): Promise { } export async function queueProxyCheck(proxies: ProxyInfo[]): Promise { - const { queueManager } = await import('../services/queue.service'); - const job = await queueManager.addJob({ + const { queueManager } = await import('../index'); + const job = await queueManager.add('proxy-check', { type: 'proxy-check', provider: 'proxy-service', operation: 'check-specific', diff --git a/apps/data-service/src/routes/health.routes.ts b/apps/data-service/src/routes/health.routes.ts index 45d0f7f..2d0a4f6 100644 --- a/apps/data-service/src/routes/health.routes.ts +++ b/apps/data-service/src/routes/health.routes.ts @@ -2,7 +2,7 @@ * Health check routes */ import { Hono } from 'hono'; -import { queueManager } from '../services/queue-manager.service'; +import { queueManager } from '../index'; export const healthRoutes = new Hono(); diff --git a/apps/data-service/src/routes/market-data.routes.ts b/apps/data-service/src/routes/market-data.routes.ts index 8800d15..8233eab 100644 --- a/apps/data-service/src/routes/market-data.routes.ts +++ b/apps/data-service/src/routes/market-data.routes.ts @@ -3,7 +3,7 @@ */ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { processItems, queueManager } from '../services/queue-manager.service'; +import { processItems, queueManager } from '../index'; const logger = getLogger('market-data-routes'); @@ -79,7 +79,7 @@ marketDataRoutes.post('/api/process-symbols', async c => { provider = 'ib', operation = 'fetch-session', useBatching = true, - totalDelayMs = 30000, + totalDelayHours = 0.0083, // ~30 seconds (30/3600 hours) batchSize = 10, } = await c.req.json(); @@ -95,7 +95,7 @@ marketDataRoutes.post('/api/process-symbols', async c => { }); const result = await processItems(symbols, queueManager, { - totalDelayMs, + totalDelayHours, useBatching, batchSize, priority: 2, diff --git a/apps/data-service/src/routes/queue.routes.ts b/apps/data-service/src/routes/queue.routes.ts index 5a19f8e..8795be4 100644 --- a/apps/data-service/src/routes/queue.routes.ts +++ b/apps/data-service/src/routes/queue.routes.ts @@ -3,7 +3,7 @@ */ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { queueManager } from '../services/queue-manager.service'; +import { queueManager } from '../index'; const logger = getLogger('queue-routes'); diff --git a/apps/data-service/src/services/queue-manager.service.ts b/apps/data-service/src/services/queue-manager.service.ts deleted file mode 100644 index 62deb6f..0000000 --- a/apps/data-service/src/services/queue-manager.service.ts +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Data Service Queue Manager - * Uses the enhanced @stock-bot/queue library with provider registry - */ -import { getLogger } from '@stock-bot/logger'; -import type { QueueConfig } from '@stock-bot/queue'; -import { processItems, QueueManager } from '@stock-bot/queue'; - -const logger = getLogger('queue-manager-service'); - -/** - * Create and configure the enhanced queue manager for data service - */ -function createDataServiceQueueManager(): QueueManager { - const config: QueueConfig = { - queueName: 'data-service-queue', - workers: parseInt(process.env.WORKER_COUNT || '5'), - concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'), - redis: { - host: process.env.DRAGONFLY_HOST || 'localhost', - port: parseInt(process.env.DRAGONFLY_PORT || '6379'), - }, - providers: [ - // Import and initialize providers lazily - async () => { - const { initializeIBProvider } = await import('../providers/ib.provider'); - return initializeIBProvider(); - }, - async () => { - const { initializeProxyProvider } = await import('../providers/proxy.provider'); - return initializeProxyProvider(); - }, - ], - enableScheduledJobs: true, - }; - - return new QueueManager(config); -} - -// Create singleton instance -export const queueManager = createDataServiceQueueManager(); - -// Export convenience functions that use the queue manager -export async function initializeQueueManager() { - logger.info('Initializing data service queue manager...'); - await queueManager.initialize(); - logger.info('Data service queue manager initialized successfully'); -} - -export async function shutdownQueueManager() { - logger.info('Shutting down data service queue manager...'); - await queueManager.shutdown(); - logger.info('Data service queue manager shutdown complete'); -} - -// Export processItems for direct use -export { processItems }; diff --git a/libs/queue/debug-batch-cleanup.ts b/libs/queue/debug-batch-cleanup.ts index 7e174f8..34346ac 100644 --- a/libs/queue/debug-batch-cleanup.ts +++ b/libs/queue/debug-batch-cleanup.ts @@ -35,7 +35,7 @@ async function debugBatchCleanup() { // Process in batches const result = await processItems(items, queueManager, { - totalDelayMs: 10000, // 10 seconds total + totalDelayHours: 0.0028, // 10 seconds useBatching: true, batchSize: 3, // This will create 3 batches: [3,3,1] priority: 1, diff --git a/libs/queue/examples/basic-usage.ts b/libs/queue/examples/basic-usage.ts index 997dc95..5fcbd70 100644 --- a/libs/queue/examples/basic-usage.ts +++ b/libs/queue/examples/basic-usage.ts @@ -54,7 +54,7 @@ async function basicUsageExample() { const symbols = ['GOOGL', 'MSFT', 'TSLA', 'AMZN']; const result = await processItems(symbols, queueManager, { - totalDelayMs: 30000, // 30 seconds total + totalDelayHours: 0.0083, // 30 seconds useBatching: true, batchSize: 2, priority: 1, diff --git a/libs/queue/examples/batch-processing.ts b/libs/queue/examples/batch-processing.ts index f24aa8b..23315f0 100644 --- a/libs/queue/examples/batch-processing.ts +++ b/libs/queue/examples/batch-processing.ts @@ -44,7 +44,7 @@ async function batchProcessingExample() { [1, 2, 3, 4, 5], // Just pass the array directly! queueManager, { - totalDelayMs: 15000, // 15 seconds total + totalDelayHours: 0.0042, // 15 seconds useBatching: false, // Direct mode priority: 2, provider: 'data-processor', @@ -63,7 +63,7 @@ async function batchProcessingExample() { })); const batchResult = await processItems(batchData, queueManager, { - totalDelayMs: 20000, // 20 seconds total + totalDelayHours: 0.0056, // 20 seconds useBatching: true, // Batch mode batchSize: 5, // 5 items per batch priority: 1, @@ -80,7 +80,7 @@ async function batchProcessingExample() { const symbolResult = await processItems(symbols, queueManager, { operation: 'analyze-symbol', provider: 'data-processor', - totalDelayMs: 25000, // 25 seconds total + totalDelayHours: 0.0069, // 25 seconds useBatching: true, batchSize: 3, priority: 1, @@ -97,7 +97,7 @@ async function batchProcessingExample() { })); const largeResult = await processItems(largeDataset, queueManager, { - totalDelayMs: 60000, // 1 minute total + totalDelayHours: 0.0167, // 1 minute useBatching: true, batchSize: 50, // 50 items per batch priority: 3, @@ -160,7 +160,7 @@ async function compareProcessingModes() { console.log('Testing direct mode...'); const directStart = Date.now(); const directResult = await processItems(testData, queueManager, { - totalDelayMs: 10000, + totalDelayHours: 0.0028, // 10 seconds useBatching: false, provider: 'test', operation: 'process', @@ -174,7 +174,7 @@ async function compareProcessingModes() { console.log('Testing batch mode...'); const batchStart = Date.now(); const batchResult = await processItems(testData, queueManager, { - totalDelayMs: 10000, + totalDelayHours: 0.0028, // 10 seconds useBatching: true, batchSize: 5, provider: 'test', diff --git a/libs/queue/src/batch-processor.ts b/libs/queue/src/batch-processor.ts index fba6aae..92895be 100644 --- a/libs/queue/src/batch-processor.ts +++ b/libs/queue/src/batch-processor.ts @@ -56,7 +56,7 @@ export async function processItems( totalItems: items.length, mode: options.useBatching ? 'batch' : 'direct', batchSize: options.batchSize, - totalDelayMs: options.totalDelayMs, + totalDelayHours: options.totalDelayHours, }); try { @@ -86,7 +86,8 @@ async function processDirect( queue: QueueManager, options: ProcessOptions ): Promise> { - const delayPerItem = options.totalDelayMs / items.length; + const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds + const delayPerItem = totalDelayMs / items.length; logger.info('Creating direct jobs', { totalItems: items.length, @@ -130,7 +131,8 @@ async function processBatched( ): Promise> { const batchSize = options.batchSize || 100; const batches = createBatches(items, batchSize); - const delayPerBatch = options.totalDelayMs / batches.length; + const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds + const delayPerBatch = totalDelayMs / batches.length; logger.info('Creating batch jobs', { totalItems: items.length, diff --git a/libs/queue/src/types.ts b/libs/queue/src/types.ts index 7efa208..f27b961 100644 --- a/libs/queue/src/types.ts +++ b/libs/queue/src/types.ts @@ -8,7 +8,7 @@ export interface JobData { } export interface ProcessOptions { - totalDelayMs: number; + totalDelayHours: number; batchSize?: number; priority?: number; useBatching?: boolean; diff --git a/libs/queue/test-api-structure.ts b/libs/queue/test-api-structure.ts index 267abee..8605b5a 100644 --- a/libs/queue/test-api-structure.ts +++ b/libs/queue/test-api-structure.ts @@ -22,7 +22,7 @@ async function quickTest() { // Verify the processItems function signature const items = [1, 2, 3]; const options = { - totalDelayMs: 1000, + totalDelayHours: 0.0003, // ~1 second useBatching: false, provider: 'test', operation: 'test', diff --git a/libs/queue/test-simplified-api.ts b/libs/queue/test-simplified-api.ts index 56260db..d114279 100644 --- a/libs/queue/test-simplified-api.ts +++ b/libs/queue/test-simplified-api.ts @@ -30,7 +30,7 @@ async function testSimplifiedAPI() { // Test 1: Simple array of numbers const numbers = [1, 2, 3, 4, 5]; const result1 = await processItems(numbers, queueManager, { - totalDelayMs: 5000, + totalDelayHours: 0.0014, // ~5 seconds (5/3600 hours) useBatching: false, provider: 'test-provider', operation: 'process-item', @@ -46,7 +46,7 @@ async function testSimplifiedAPI() { ]; const result2 = await processItems(objects, queueManager, { - totalDelayMs: 5000, + totalDelayHours: 0.0014, // ~5 seconds useBatching: true, batchSize: 2, provider: 'test-provider', @@ -59,7 +59,7 @@ async function testSimplifiedAPI() { const symbols = Array.from({ length: 1000 }, (_, i) => `Symbol-${i + 1}`); console.log('📋 Testing with symbols...'); const result3 = await processItems(symbols, queueManager, { - totalDelayMs: 3000, + totalDelayHours: 0.0008, // ~3 seconds useBatching: true, batchSize: 1, provider: 'test-provider',