diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index debc750..eac5dbf 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -21,7 +21,7 @@ export function initializeProxyProvider() { logger.info('Processing fetch proxies from sources request'); const { fetchProxiesFromSources } = await import('./proxy.tasks'); const { processItems, queueManager } = await import('../index'); - + // Fetch all proxies from sources const proxies = await fetchProxiesFromSources(); logger.info('Fetched proxies from sources', { count: proxies.length }); @@ -32,36 +32,32 @@ export function initializeProxyProvider() { } // Batch process the proxies through check-proxy operation - const batchResult = await processItems( - proxies, - queueManager, - { - provider: 'proxy', - operation: 'check-proxy', - totalDelayHours: 0.083, // 5 minutes (5/60 hours) - batchSize: 50, // Process 50 proxies per batch - priority: 3, - useBatching: true, - retries: 1, - ttl: 30000, // 30 second timeout per proxy check - removeOnComplete: 5, - removeOnFail: 3, - } - ); + const batchResult = await processItems(proxies, queueManager, { + provider: 'proxy', + operation: 'check-proxy', + totalDelayHours: 0.083, // 5 minutes (5/60 hours) + batchSize: 50, // Process 50 proxies per batch + priority: 3, + useBatching: true, + retries: 1, + ttl: 30000, // 30 second timeout per proxy check + removeOnComplete: 5, + removeOnFail: 3, + }); logger.info('Batch proxy validation completed', { totalProxies: proxies.length, jobsCreated: batchResult.jobsCreated, mode: batchResult.mode, batchesCreated: batchResult.batchesCreated, - duration: `${batchResult.duration}ms` + duration: `${batchResult.duration}ms`, }); return { processed: proxies.length, jobsCreated: batchResult.jobsCreated, batchesCreated: batchResult.batchesCreated, - mode: batchResult.mode + mode: batchResult.mode, }; }, diff --git a/libs/queue/examples/basic-usage.ts b/libs/queue/examples/basic-usage.ts deleted file mode 100644 index 5fcbd70..0000000 --- a/libs/queue/examples/basic-usage.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { initializeBatchCache, processItems, QueueManager } from '@stock-bot/queue'; - -async function basicUsageExample() { - console.log('=== Basic Queue Usage Example ==='); - - // 1. Initialize queue manager - const queueManager = new QueueManager({ - queueName: 'example-queue', - workers: 3, - concurrency: 10, - redis: { - host: 'localhost', - port: 6379, - }, - }); - - // 2. Register providers - queueManager.registerProvider('market-data', { - 'fetch-price': async payload => { - // payload is now the raw symbol string - console.log(`Fetching price for ${payload}`); - // Simulate API call - await new Promise(resolve => setTimeout(resolve, 100)); - return { - symbol: payload, - price: Math.random() * 1000, - timestamp: new Date().toISOString(), - }; - }, - - 'update-cache': async payload => { - // payload is now the raw symbol string - console.log(`Updating cache for ${payload}`); - // Simulate cache update - await new Promise(resolve => setTimeout(resolve, 50)); - return { success: true, symbol: payload }; - }, - }); - - // 3. Initialize - await queueManager.initialize(); - await initializeBatchCache(queueManager); - - // 4. Add individual jobs - console.log('Adding individual jobs...'); - await queueManager.add('fetch-price', { - provider: 'market-data', - operation: 'fetch-price', - payload: 'AAPL', // Direct symbol instead of wrapped object - }); - - // 5. Process items in batch - console.log('Processing items in batch...'); - const symbols = ['GOOGL', 'MSFT', 'TSLA', 'AMZN']; - - const result = await processItems(symbols, queueManager, { - totalDelayHours: 0.0083, // 30 seconds - useBatching: true, - batchSize: 2, - priority: 1, - provider: 'market-data', - operation: 'fetch-price', - }); - - console.log('Batch processing result:', result); - - // 6. Get queue statistics - const stats = await queueManager.getStats(); - console.log('Queue stats:', stats); - - // 7. Clean up old jobs - await queueManager.clean(60000); // Clean jobs older than 1 minute - - // 8. Shutdown gracefully - setTimeout(async () => { - console.log('Shutting down...'); - await queueManager.shutdown(); - console.log('Shutdown complete'); - }, 35000); -} - -// Run the example -if (require.main === module) { - basicUsageExample().catch(console.error); -} - -export { basicUsageExample }; diff --git a/libs/queue/examples/batch-processing.ts b/libs/queue/examples/batch-processing.ts deleted file mode 100644 index 23315f0..0000000 --- a/libs/queue/examples/batch-processing.ts +++ /dev/null @@ -1,200 +0,0 @@ -import { initializeBatchCache, processItems, QueueManager } from '@stock-bot/queue'; - -async function batchProcessingExample() { - console.log('=== Batch Processing Example ==='); - - // Initialize queue manager - const queueManager = new QueueManager({ - queueName: 'batch-example-queue', - workers: 2, - concurrency: 5, - }); - - // Register data processing provider - queueManager.registerProvider('data-processor', { - 'process-item': async payload => { - console.log(`Processing item: ${JSON.stringify(payload)}`); - // Simulate processing time - await new Promise(resolve => setTimeout(resolve, 200)); - return { processed: true, originalData: payload }; - }, - - 'analyze-symbol': async payload => { - // payload is now the raw symbol string - console.log(`Analyzing symbol: ${payload}`); - // Simulate analysis - await new Promise(resolve => setTimeout(resolve, 150)); - return { - symbol: payload, - analysis: { - trend: Math.random() > 0.5 ? 'up' : 'down', - confidence: Math.random(), - timestamp: new Date().toISOString(), - }, - }; - }, - }); - - await queueManager.initialize(); - await initializeBatchCache(queueManager); - - // Example 1: Direct processing (each item = separate job) - console.log('\n--- Direct Processing Example ---'); - const directResult = await processItems( - [1, 2, 3, 4, 5], // Just pass the array directly! - queueManager, - { - totalDelayHours: 0.0042, // 15 seconds - useBatching: false, // Direct mode - priority: 2, - provider: 'data-processor', - operation: 'process-item', - } - ); - - console.log('Direct processing result:', directResult); - - // Example 2: Batch processing (groups of items) - console.log('\n--- Batch Processing Example ---'); - const batchData = Array.from({ length: 25 }, (_, i) => ({ - id: i + 1, - value: Math.random() * 100, - category: i % 3 === 0 ? 'A' : i % 3 === 1 ? 'B' : 'C', - })); - - const batchResult = await processItems(batchData, queueManager, { - totalDelayHours: 0.0056, // 20 seconds - useBatching: true, // Batch mode - batchSize: 5, // 5 items per batch - priority: 1, - provider: 'data-processor', - operation: 'process-item', - }); - - console.log('Batch processing result:', batchResult); - - // Example 3: Symbol processing (using processItems) - console.log('\n--- Symbol Processing Example ---'); - const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN', 'META', 'NFLX']; - - const symbolResult = await processItems(symbols, queueManager, { - operation: 'analyze-symbol', - provider: 'data-processor', - totalDelayHours: 0.0069, // 25 seconds - useBatching: true, - batchSize: 3, - priority: 1, - }); - - console.log('Symbol processing result:', symbolResult); - - // Example 4: Large dataset with optimal batching - console.log('\n--- Large Dataset Example ---'); - const largeDataset = Array.from({ length: 1000 }, (_, i) => ({ - id: i + 1, - data: `item-${i + 1}`, - random: Math.random(), - })); - - const largeResult = await processItems(largeDataset, queueManager, { - totalDelayHours: 0.0167, // 1 minute - useBatching: true, - batchSize: 50, // 50 items per batch - priority: 3, - provider: 'data-processor', - operation: 'process-item', - retries: 2, - removeOnComplete: 5, - removeOnFail: 10, - }); - - console.log('Large dataset result:', largeResult); - - // Monitor queue progress - console.log('\n--- Monitoring Queue ---'); - const monitorInterval = setInterval(async () => { - const stats = await queueManager.getStats(); - console.log('Queue stats:', { - waiting: stats.waiting, - active: stats.active, - completed: stats.completed, - failed: stats.failed, - }); - - // Stop monitoring when queue is mostly empty - if (stats.waiting === 0 && stats.active === 0) { - clearInterval(monitorInterval); - console.log('Queue processing complete!'); - - setTimeout(async () => { - await queueManager.shutdown(); - console.log('Shutdown complete'); - }, 2000); - } - }, 5000); -} - -// Utility function to compare processing modes -async function compareProcessingModes() { - console.log('\n=== Processing Mode Comparison ==='); - - const queueManager = new QueueManager({ - queueName: 'comparison-queue', - workers: 2, - concurrency: 10, - }); - - queueManager.registerProvider('test', { - process: async payload => { - await new Promise(resolve => setTimeout(resolve, 100)); - return { processed: true, originalData: payload }; - }, - }); - - await queueManager.initialize(); - await initializeBatchCache(queueManager); - - const testData = Array.from({ length: 20 }, (_, i) => ({ id: i + 1 })); - - // Test direct mode - console.log('Testing direct mode...'); - const directStart = Date.now(); - const directResult = await processItems(testData, queueManager, { - totalDelayHours: 0.0028, // 10 seconds - useBatching: false, - provider: 'test', - operation: 'process', - }); - console.log('Direct mode:', { - ...directResult, - actualDuration: Date.now() - directStart, - }); - - // Test batch mode - console.log('Testing batch mode...'); - const batchStart = Date.now(); - const batchResult = await processItems(testData, queueManager, { - totalDelayHours: 0.0028, // 10 seconds - useBatching: true, - batchSize: 5, - provider: 'test', - operation: 'process', - }); - console.log('Batch mode:', { - ...batchResult, - actualDuration: Date.now() - batchStart, - }); - - setTimeout(async () => { - await queueManager.shutdown(); - }, 15000); -} - -// Run examples -if (require.main === module) { - batchProcessingExample() - .then(() => compareProcessingModes()) - .catch(console.error); -} - -export { batchProcessingExample, compareProcessingModes }; diff --git a/libs/queue/examples/migration-example.ts b/libs/queue/examples/migration-example.ts deleted file mode 100644 index b524fd4..0000000 --- a/libs/queue/examples/migration-example.ts +++ /dev/null @@ -1,211 +0,0 @@ -// Migration example from existing QueueService to new QueueManager -// OLD WAY (using existing QueueService) -/* -import { QueueService } from '../services/queue.service'; -import { providerRegistry } from '../services/provider-registry.service'; -import { processItems, initializeBatchCache } from '../utils/batch-helpers'; - -class OldDataService { - private queueService: QueueService; - - constructor() { - this.queueService = new QueueService(); - } - - async initialize() { - // Register providers - providerRegistry.register('market-data', { - 'live-data': async (payload) => { - // Handle live data - }, - }); - - await this.queueService.initialize(); - } - - async processSymbols(symbols: string[]) { - return processSymbols(symbols, this.queueService, { - operation: 'live-data', - service: 'market-data', - provider: 'yahoo', - totalDelayMs: 300000, - }); - } -} -*/ - -// NEW WAY (using @stock-bot/queue) -import { initializeBatchCache, processItems, QueueManager } from '@stock-bot/queue'; - -class NewDataService { - private queueManager: QueueManager; - - constructor() { - this.queueManager = new QueueManager({ - queueName: 'data-service-queue', - workers: 5, - concurrency: 20, - }); - } - - async initialize() { - // Register providers using the new API - this.queueManager.registerProvider('market-data', { - 'live-data': async payload => { - // payload is now the raw symbol string - console.log('Processing live data for:', payload); - // Handle live data - same logic as before - return { - symbol: payload, - price: Math.random() * 1000, - timestamp: new Date().toISOString(), - }; - }, - - 'historical-data': async payload => { - // payload is now the raw symbol string - console.log('Processing historical data for:', payload); - // Handle historical data - return { - symbol: payload, - data: Array.from({ length: 100 }, (_, i) => ({ - date: new Date(Date.now() - i * 86400000).toISOString(), - price: Math.random() * 1000, - })), - }; - }, - }); - - this.queueManager.registerProvider('analytics', { - 'calculate-indicators': async payload => { - // payload is now the raw symbol string - console.log('Calculating indicators for:', payload); - // Calculate technical indicators - return { - symbol: payload, - indicators: { - sma20: Math.random() * 1000, - rsi: Math.random() * 100, - macd: Math.random() * 10, - }, - }; - }, - }); - - await this.queueManager.initialize(); - await initializeBatchCache(this.queueManager); - } - - // Method that works exactly like before - async processSymbols(symbols: string[]) { - return processItems(symbols, this.queueManager, { - operation: 'live-data', - provider: 'market-data', // Note: provider name in the new system - totalDelayMs: 300000, - useBatching: false, - priority: 1, - }); - } - - // New method showcasing batch processing - async processSymbolsBatch(symbols: string[]) { - return processItems(symbols, this.queueManager, { - totalDelayMs: 300000, - useBatching: true, - batchSize: 50, - priority: 1, - provider: 'market-data', - operation: 'live-data', - }); - } - - // Analytics processing - async processAnalytics(symbols: string[]) { - return processItems(symbols, this.queueManager, { - totalDelayMs: 180000, // 3 minutes - useBatching: true, - batchSize: 20, - priority: 2, - provider: 'analytics', - operation: 'calculate-indicators', - }); - } - - async getQueueStats() { - return this.queueManager.getStats(); - } - - async shutdown() { - await this.queueManager.shutdown(); - } -} - -// Example usage -async function migrationExample() { - console.log('=== Migration Example ==='); - - const dataService = new NewDataService(); - await dataService.initialize(); - - const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA']; - - // Test symbol processing (works like before) - console.log('Processing symbols (direct)...'); - const directResult = await dataService.processSymbols(symbols.slice(0, 2)); - console.log('Direct result:', directResult); - - // Test batch processing (new capability) - console.log('Processing symbols (batch)...'); - const batchResult = await dataService.processSymbolsBatch(symbols); - console.log('Batch result:', batchResult); - - // Test analytics processing - console.log('Processing analytics...'); - const analyticsResult = await dataService.processAnalytics(symbols); - console.log('Analytics result:', analyticsResult); - - // Monitor progress - setInterval(async () => { - const stats = await dataService.getQueueStats(); - console.log('Queue stats:', stats); - - if (stats.waiting === 0 && stats.active === 0) { - console.log('All jobs complete!'); - await dataService.shutdown(); - process.exit(0); - } - }, 3000); -} - -// Key Migration Steps: -/* -1. IMPORTS: - - Replace: import { QueueService } from '../services/queue.service' - - With: import { QueueManager } from '@stock-bot/queue' - -2. PROVIDER REGISTRATION: - - Replace: providerRegistry.register(...) - - With: queueManager.registerProvider(...) - -3. INITIALIZATION: - - Replace: await queueService.initialize() - - With: await queueManager.initialize() + await initializeBatchCache() - -4. BATCH HELPERS: - - Replace: import { processItems } from '../utils/batch-helpers' - - With: import { processItems } from '@stock-bot/queue' - -5. JOB PARAMETERS: - - totalDelayHours → totalDelayMs (convert hours to milliseconds) - - Ensure provider names match registered providers - -6. CONFIGURATION: - - Use QueueConfig interface for type safety - - Environment variables work the same way -*/ - -if (require.main === module) { - migrationExample().catch(console.error); -} - -export { migrationExample, NewDataService }; diff --git a/libs/queue/test-api-structure.ts b/libs/queue/test-api-structure.ts deleted file mode 100644 index 8605b5a..0000000 --- a/libs/queue/test-api-structure.ts +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env bun -// Simple test to verify the API is correctly structured -import { initializeBatchCache, processItems, QueueManager } from './src/index.js'; - -async function quickTest() { - console.log('🚀 Quick API structure test...'); - - try { - // Test 1: Check imports - console.log('✅ Imports successful'); - console.log('- QueueManager type:', typeof QueueManager); - console.log('- processItems type:', typeof processItems); - console.log('- initializeBatchCache type:', typeof initializeBatchCache); - - // Test 2: Check function signatures - const queueManager = new QueueManager({ - queueName: 'test-api-structure', - }); - - console.log('✅ QueueManager created'); - - // Verify the processItems function signature - const items = [1, 2, 3]; - const options = { - totalDelayHours: 0.0003, // ~1 second - useBatching: false, - provider: 'test', - operation: 'test', - }; - - // This should not throw a type error - console.log('✅ processItems signature is correct (no type errors)'); - console.log('- Items:', items); - console.log('- Options:', options); - - console.log('🎯 API structure test completed successfully!'); - console.log('📋 Summary:'); - console.log(' - Security vulnerability eliminated (no function serialization)'); - console.log(' - Redundant processSymbols function removed'); - console.log(' - API simplified to: processItems(items, queue, options)'); - console.log(' - Items are passed directly as payloads'); - console.log('🏆 Queue library is ready for production use!'); - } catch (error) { - console.error('❌ Test failed:', error); - } -} - -quickTest();