From 8daaff27fd274fce025fb984bf10211c56887bfd Mon Sep 17 00:00:00 2001 From: Bojan Kucera Date: Sun, 8 Jun 2025 14:20:45 -0400 Subject: [PATCH] reorganized stuff --- apps/data-service/src/PROXY-SERVICE-README.md | 258 ------------------ apps/data-service/src/index.ts | 3 +- .../src/services/proxy-queue-integration.ts | 90 ------ .../src/services/proxy.service.ts | 5 +- .../src/services/queue-manager.service.ts | 186 ------------- .../src/services/queue.service.ts | 236 ++++++++-------- 6 files changed, 116 insertions(+), 662 deletions(-) delete mode 100644 apps/data-service/src/PROXY-SERVICE-README.md delete mode 100644 apps/data-service/src/services/proxy-queue-integration.ts delete mode 100644 apps/data-service/src/services/queue-manager.service.ts diff --git a/apps/data-service/src/PROXY-SERVICE-README.md b/apps/data-service/src/PROXY-SERVICE-README.md deleted file mode 100644 index 878f33f..0000000 --- a/apps/data-service/src/PROXY-SERVICE-README.md +++ /dev/null @@ -1,258 +0,0 @@ -# Proxy Service - -A comprehensive proxy management service for the Stock Bot platform that integrates with existing libraries (Redis cache, logger, http) to provide robust proxy scraping, validation, and management capabilities. - -## Features - -- **Automatic Proxy Scraping**: Scrapes free proxies from multiple public sources -- **Proxy Validation**: Tests proxy connectivity and response times -- **Redis Caching**: Stores proxy data with TTL and working status in Redis -- **Health Monitoring**: Periodic health checks for working proxies -- **Structured Logging**: Comprehensive logging with the platform's logger -- **HTTP Client Integration**: Seamless integration with the existing http library -- **Background Processing**: Non-blocking proxy validation and refresh jobs - -## Quick Start - -```typescript -import { proxyService } from './services/proxy.service.js'; - -// Start the proxy service with automatic refresh -await proxyService.queueRefreshProxies(30 * 60 * 1000); // Refresh every 30 minutes -await proxyService.startHealthChecks(15 * 60 * 1000); // Health check every 15 minutes - -// Get a working proxy -const proxy = await proxyService.getWorkingProxy(); - -// Use the proxy with HttpClient -import { HttpClient } from '@stock-bot/http'; -const client = new HttpClient({ proxy }); -const response = await client.get('https://api.example.com/data'); -``` - -## Core Methods - -### Proxy Management - -```typescript -// Scrape proxies from default sources -const count = await proxyService.scrapeProxies(); - -// Scrape from custom sources -const customSources = [ - { - url: 'https://example.com/proxy-list.txt', - type: 'free', - format: 'text', - parser: (content) => parseCustomFormat(content) - } -]; -await proxyService.scrapeProxies(customSources); - -// Test a specific proxy -const result = await proxyService.checkProxy(proxy, 'http://httpbin.org/ip'); -console.log(`Proxy working: ${result.isWorking}, Response time: ${result.responseTime}ms`); -``` - -### Proxy Retrieval - -```typescript -// Get a single working proxy -const proxy = await proxyService.getWorkingProxy(); - -// Get multiple working proxies -const proxies = await proxyService.getWorkingProxies(10); - -// Get all proxies (including non-working) -const allProxies = await proxyService.getAllProxies(); -``` - -### Statistics and Monitoring - -```typescript -// Get proxy statistics -const stats = await proxyService.getProxyStats(); -console.log(`Total: ${stats.total}, Working: ${stats.working}, Failed: ${stats.failed}`); -console.log(`Average response time: ${stats.avgResponseTime}ms`); -``` - -### Maintenance - -```typescript -// Clear all proxy data -await proxyService.clearProxies(); - -// Graceful shutdown -await proxyService.shutdown(); -``` - -## Configuration - -The service uses environment variables for Redis configuration: - -```bash -REDIS_HOST=localhost # Redis host (default: localhost) -REDIS_PORT=6379 # Redis port (default: 6379) -REDIS_DB=0 # Redis database (default: 0) -``` - -## Proxy Sources - -Default sources include: -- TheSpeedX/PROXY-List (HTTP proxies) -- clarketm/proxy-list (HTTP proxies) -- ShiftyTR/Proxy-List (HTTP proxies) -- monosans/proxy-list (HTTP proxies) - -### Custom Proxy Sources - -You can add custom proxy sources with different formats: - -```typescript -const customSource = { - url: 'https://api.example.com/proxies', - type: 'premium', - format: 'json', - parser: (content) => { - const data = JSON.parse(content); - return data.proxies.map(p => ({ - type: 'http', - host: p.ip, - port: p.port, - username: p.user, - password: p.pass - })); - } -}; -``` - -## Integration Examples - -### With Market Data Collection - -```typescript -import { proxyService } from './services/proxy.service.js'; -import { HttpClient } from '@stock-bot/http'; - -async function fetchMarketDataWithProxy(symbol: string) { - const proxy = await proxyService.getWorkingProxy(); - if (!proxy) { - throw new Error('No working proxies available'); - } - - const client = new HttpClient({ - proxy, - timeout: 10000, - retries: 2 - }); - - try { - return await client.get(`https://api.example.com/stock/${symbol}`); - } catch (error) { - // Mark proxy as potentially failed and try another - await proxyService.checkProxy(proxy); - throw error; - } -} -``` - -### Proxy Rotation Strategy - -```typescript -async function fetchWithProxyRotation(urls: string[]) { - const proxies = await proxyService.getWorkingProxies(urls.length); - - const promises = urls.map(async (url, index) => { - const proxy = proxies[index % proxies.length]; - const client = new HttpClient({ proxy }); - return client.get(url); - }); - - return Promise.allSettled(promises); -} -``` - -## Cache Structure - -The service stores data in Redis with the following structure: - -``` -proxy:{host}:{port} # Individual proxy data with status -proxy:working:{host}:{port} # Working proxy references -proxy:stats # Cached statistics -``` - -## Logging - -The service provides structured logging for all operations: - -- Proxy scraping progress and results -- Validation results and timing -- Cache operations and statistics -- Error conditions and recovery - -## Background Jobs - -### Refresh Job -- Scrapes proxies from all sources -- Removes duplicates -- Stores in cache with metadata -- Triggers background validation - -### Health Check Job -- Tests existing working proxies -- Updates status in cache -- Removes failed proxies from working set -- Maintains proxy pool health - -### Validation Job -- Tests newly scraped proxies -- Updates working status -- Measures response times -- Runs in background to avoid blocking - -## Error Handling - -The service includes comprehensive error handling: - -- Network failures during scraping -- Redis connection issues -- Proxy validation timeouts -- Invalid proxy formats -- Cache operation failures - -All errors are logged with context and don't crash the service. - -## Performance Considerations - -- **Concurrent Validation**: Processes proxies in chunks of 50 -- **Rate Limiting**: Includes delays between validation chunks -- **Cache Efficiency**: Uses TTL and working proxy sets -- **Memory Management**: Processes large proxy lists in batches -- **Background Processing**: Validation doesn't block main operations - -## Dependencies - -- `@stock-bot/cache`: Redis caching with TTL support -- `@stock-bot/logger`: Structured logging with Loki integration -- `@stock-bot/http`: HTTP client with built-in proxy support -- `ioredis`: Redis client (via cache library) -- `pino`: High-performance logging (via logger library) - -## Limitations - -Due to the current Redis cache provider interface: -- Key pattern matching not available -- Bulk operations limited -- Set operations (sadd, srem) not directly supported - -The service works around these limitations using individual key operations and maintains functionality while noting areas for future enhancement. - -## Future Enhancements - -- Premium proxy source integration -- Proxy performance analytics -- Geographic proxy distribution -- Protocol-specific proxy pools (HTTP, HTTPS, SOCKS) -- Enhanced caching with set operations -- Proxy authentication management diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index 0301cf9..b5599d6 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -5,7 +5,7 @@ import { getLogger } from '@stock-bot/logger'; import { loadEnvVariables } from '@stock-bot/config'; import { Hono } from 'hono'; import { serve } from '@hono/node-server'; -import { queueManager } from './services/queue-manager.service'; +import { queueManager } from './services/queue.service'; import { proxyService } from './services/proxy.service'; import { marketDataProvider } from './providers/market-data.provider'; @@ -124,6 +124,7 @@ async function initializeServices() { try { // Queue manager is initialized automatically when imported logger.info('Queue manager initialized'); + // Initialize providers logger.info('All services initialized successfully'); diff --git a/apps/data-service/src/services/proxy-queue-integration.ts b/apps/data-service/src/services/proxy-queue-integration.ts deleted file mode 100644 index b6f2a76..0000000 --- a/apps/data-service/src/services/proxy-queue-integration.ts +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Example: Proxy Service with BullMQ Integration - * This shows how to integrate the queue service with your existing proxy service - */ -import { getLogger } from '@stock-bot/logger'; -import { queueService } from './queue.service'; -import type { ProxyInfo } from '@stock-bot/http'; - -const logger = getLogger('proxy-queue-integration'); - -export class ProxyQueueIntegration { - - constructor() { - // Initialize recurring tasks when service starts - this.initializeScheduledTasks(); - } - - private async initializeScheduledTasks() { - try { - await queueService.scheduleRecurringTasks(); - logger.info('Proxy scheduling tasks initialized'); - } catch (error) { - logger.error('Failed to initialize scheduled tasks', { error }); - } - } - - /** - * Manually trigger proxy fetching and checking - */ - async triggerProxyFetch() { - try { - const job = await queueService.addManualProxyFetch(); - logger.info('Manual proxy fetch job added', { jobId: job.id }); - return job; - } catch (error) { - logger.error('Failed to trigger proxy fetch', { error }); - throw error; - } - } - - /** - * Check specific proxies immediately - */ - async checkSpecificProxies(proxies: ProxyInfo[]) { - try { - const job = await queueService.addImmediateProxyCheck(proxies); - logger.info('Specific proxy check job added', { - jobId: job.id, - proxiesCount: proxies.length - }); - return job; - } catch (error) { - logger.error('Failed to check specific proxies', { error }); - throw error; - } - } - - /** - * Get queue statistics - */ - async getStats() { - try { - return await queueService.getQueueStats(); - } catch (error) { - logger.error('Failed to get queue stats', { error }); - throw error; - } - } - - /** - * Get the queue instance for Bull Board monitoring - */ - async getQueue() { - return await queueService.getQueue(); - } - - /** - * Shutdown queue service gracefully - */ - async shutdown() { - try { - await queueService.shutdown(); - logger.info('Proxy queue integration shut down'); - } catch (error) { - logger.error('Error during shutdown', { error }); - } - } -} - -export const proxyQueueIntegration = new ProxyQueueIntegration(); diff --git a/apps/data-service/src/services/proxy.service.ts b/apps/data-service/src/services/proxy.service.ts index ae0edad..bebc411 100644 --- a/apps/data-service/src/services/proxy.service.ts +++ b/apps/data-service/src/services/proxy.service.ts @@ -2,7 +2,6 @@ import { Logger } from '@stock-bot/logger'; import createCache, { type CacheProvider } from '@stock-bot/cache'; import { HttpClient, ProxyInfo } from '@stock-bot/http'; import pLimit from 'p-limit'; -import { queueService } from './queue.service'; export class ProxyService { private logger = new Logger('proxy-service'); @@ -83,7 +82,7 @@ export class ProxyService { // Add queue integration methods async queueProxyFetch(): Promise { - const { queueManager } = await import('./queue-manager.service'); + const { queueManager } = await import('./queue.service'); const job = await queueManager.addJob({ type: 'proxy-fetch', service: 'proxy', @@ -99,7 +98,7 @@ export class ProxyService { } async queueProxyCheck(proxies: ProxyInfo[]): Promise { - const { queueManager } = await import('./queue-manager.service'); + const { queueManager } = await import('./queue.service'); const job = await queueManager.addJob({ type: 'proxy-check', service: 'proxy', diff --git a/apps/data-service/src/services/queue-manager.service.ts b/apps/data-service/src/services/queue-manager.service.ts deleted file mode 100644 index 6ee2f38..0000000 --- a/apps/data-service/src/services/queue-manager.service.ts +++ /dev/null @@ -1,186 +0,0 @@ -import { Queue, Worker, QueueEvents } from 'bullmq'; -import { Logger } from '@stock-bot/logger'; - -export interface JobData { - type: 'proxy-fetch' | 'proxy-check' | 'market-data' | 'historical-data'; - service: 'proxy' | 'market-data' | 'analytics'; - provider: string; - operation: string; - payload: any; - priority?: number; -} - -export class QueueManagerService { - private logger = new Logger('queue-manager'); - private queue: Queue; - private worker: Worker; - private queueEvents: QueueEvents; - - constructor() { - const connection = { - host: process.env.DRAGONFLY_HOST || 'localhost', - port: parseInt(process.env.DRAGONFLY_PORT || '6379'), - }; - - this.queue = new Queue('data-service-queue', { connection }); - this.worker = new Worker('data-service-queue', this.processJob.bind(this), { - connection, - concurrency: 10 - }); - this.queueEvents = new QueueEvents('data-service-queue', { connection }); - - this.setupEventListeners(); - this.setupScheduledTasks(); - } - - private async processJob(job: any) { - const { type, service, provider, operation, payload }: JobData = job.data; - - this.logger.info('Processing job', { id: job.id, type, service, provider, operation }); - - try { - switch (type) { - case 'proxy-fetch': - return await this.handleProxyFetch(payload); - case 'proxy-check': - return await this.handleProxyCheck(payload); - case 'market-data': - return await this.handleMarketData(payload); - case 'historical-data': - return await this.handleHistoricalData(payload); - default: - throw new Error(`Unknown job type: ${type}`); - } - } catch (error) { - this.logger.error('Job failed', { id: job.id, type, error }); - throw error; - } - } - - private async handleProxyFetch(payload: any) { - const { proxyService } = await import('./proxy.service'); - return await proxyService.fetchProxiesFromSources(); - } - - private async handleProxyCheck(payload: { proxies: any[] }) { - const { proxyService } = await import('./proxy.service'); - return await proxyService.checkProxies(payload.proxies); - } - private async handleMarketData(payload: { symbol: string }) { - const { marketDataProvider } = await import('../providers/market-data.provider.js'); - return await marketDataProvider.getLiveData(payload.symbol); - } - - private async handleHistoricalData(payload: { symbol: string; from: Date; to: Date; interval: string }) { - const { marketDataProvider } = await import('../providers/market-data.provider.js'); - return await marketDataProvider.getHistoricalData(payload.symbol, payload.from, payload.to, payload.interval); - } - - private setupEventListeners() { - this.queueEvents.on('completed', (job) => { - this.logger.info('Job completed', { id: job.jobId }); - }); - - this.queueEvents.on('failed', (job) => { - this.logger.error('Job failed', { id: job.jobId, error: job.failedReason }); - }); - - this.worker.on('progress', (job, progress) => { - this.logger.debug('Job progress', { id: job.id, progress }); - }); - } - - private setupScheduledTasks() { - // Market data refresh every minute - this.addRecurringJob({ - type: 'market-data', - service: 'market-data', - provider: 'unified-data', - operation: 'refresh-cache', - payload: { symbols: ['AAPL', 'GOOGL', 'MSFT'] } - }, '*/1 * * * *'); - - // Proxy check every 15 minutes - this.addRecurringJob({ - type: 'proxy-fetch', - service: 'proxy', - provider: 'proxy-service', - operation: 'fetch-and-check', - payload: {} - }, '*/15 * * * *'); - - this.logger.info('Scheduled tasks configured'); - } - - async addJob(jobData: JobData, options?: any) { - return this.queue.add(jobData.type, jobData, { - priority: jobData.priority || 0, - removeOnComplete: 10, - removeOnFail: 5, - ...options - }); - } - - async addRecurringJob(jobData: JobData, cronPattern: string) { - return this.queue.add( - `recurring-${jobData.type}`, - jobData, - { - repeat: { pattern: cronPattern }, - removeOnComplete: 1, - removeOnFail: 1, - jobId: `recurring-${jobData.service}-${jobData.provider}-${jobData.operation}` - } - ); - } - - async getJobStats() { - const [waiting, active, completed, failed, delayed] = await Promise.all([ - this.queue.getWaiting(), - this.queue.getActive(), - this.queue.getCompleted(), - this.queue.getFailed(), - this.queue.getDelayed() - ]); - - return { - waiting: waiting.length, - active: active.length, - completed: completed.length, - failed: failed.length, - delayed: delayed.length - }; - } - async getQueueStatus() { - const stats = await this.getJobStats(); - return { - ...stats, - workers: this.getWorkerCount(), - queue: this.queue.name, - connection: { - host: process.env.DRAGONFLY_HOST || 'localhost', - port: parseInt(process.env.DRAGONFLY_PORT || '6379') - } - }; - } - - getWorkerCount() { - return this.worker.opts.concurrency || 1; - } - - getRegisteredProviders() { - return [ - { name: 'proxy-service', type: 'proxy', operations: ['fetch-and-check', 'check-specific'] }, - { name: 'market-data-provider', type: 'market-data', operations: ['live-data', 'historical-data'] } - ]; - } - - async shutdown() { - this.logger.info('Shutting down queue manager'); - await this.worker.close(); - await this.queue.close(); - await this.queueEvents.close(); - } -} - -export const queueManager = new QueueManagerService(); diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index 20fe361..5582828 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -1,19 +1,17 @@ -/** - * BullMQ Queue Service - * Handles job scheduling and processing for the data service - */ import { Queue, Worker, QueueEvents } from 'bullmq'; -import { getLogger } from '@stock-bot/logger'; -import type { ProxyInfo } from '@stock-bot/http'; +import { Logger } from '@stock-bot/logger'; -const logger = getLogger('queue-service'); - -export interface ProxyJobData { - type: 'fetch-and-check' | 'check-specific' | 'clear-cache'; - proxies?: ProxyInfo[]; +export interface JobData { + type: 'proxy-fetch' | 'proxy-check' | 'market-data' | 'historical-data'; + service: 'proxy' | 'market-data' | 'analytics'; + provider: string; + operation: string; + payload: any; + priority?: number; } export class QueueService { + private logger = new Logger('queue-manager'); private queue: Queue; private worker: Worker; private queueEvents: QueueEvents; @@ -24,151 +22,125 @@ export class QueueService { port: parseInt(process.env.DRAGONFLY_PORT || '6379'), }; - // Create queue - this.queue = new Queue('proxy-tasks', { connection }); - - // Create worker - this.worker = new Worker('proxy-tasks', this.processJob.bind(this), { + this.queue = new Queue('data-service-queue', { connection }); + this.worker = new Worker('data-service-queue', this.processJob.bind(this), { connection, - concurrency: 3, + concurrency: 10 }); - - // Create queue events for monitoring - this.queueEvents = new QueueEvents('proxy-tasks', { connection }); + this.queueEvents = new QueueEvents('data-service-queue', { connection }); this.setupEventListeners(); - logger.info('Queue service initialized', { connection }); + this.setupScheduledTasks(); } private async processJob(job: any) { - const { type, proxies }: ProxyJobData = job.data; - logger.info('Processing job', { - id: job.id, - type, - proxiesCount: proxies?.length - }); + const { type, service, provider, operation, payload }: JobData = job.data; + + this.logger.info('Processing job', { id: job.id, type, service, provider, operation }); try { switch (type) { - case 'fetch-and-check': - // Import proxy service dynamically to avoid circular dependencies - const { proxyService } = await import('./proxy.service'); - return await proxyService.fetchProxiesFromSources(); - - case 'check-specific': - if (!proxies) throw new Error('Proxies required for check-specific job'); - const { proxyService: ps } = await import('./proxy.service'); - return await ps.checkProxies(proxies); - - case 'clear-cache': - // Clear proxy cache - const { proxyService: pcs } = await import('./proxy.service'); - // Assuming you have a clearCache method - // return await pcs.clearCache(); - logger.info('Cache clear job processed'); - return { cleared: true }; - + case 'proxy-fetch': + return await this.handleProxyFetch(payload); + case 'proxy-check': + return await this.handleProxyCheck(payload); + case 'market-data': + return await this.handleMarketData(payload); + case 'historical-data': + return await this.handleHistoricalData(payload); default: throw new Error(`Unknown job type: ${type}`); } } catch (error) { - logger.error('Job processing failed', { - id: job.id, - type, - error: error instanceof Error ? error.message : String(error) - }); + this.logger.error('Job failed', { id: job.id, type, error }); throw error; } } + private async handleProxyFetch(payload: any) { + const { proxyService } = await import('./proxy.service'); + return await proxyService.fetchProxiesFromSources(); + } + + private async handleProxyCheck(payload: { proxies: any[] }) { + const { proxyService } = await import('./proxy.service'); + return await proxyService.checkProxies(payload.proxies); + } + private async handleMarketData(payload: { symbol: string }) { + const { marketDataProvider } = await import('../providers/market-data.provider.js'); + return await marketDataProvider.getLiveData(payload.symbol); + } + + private async handleHistoricalData(payload: { symbol: string; from: Date; to: Date; interval: string }) { + const { marketDataProvider } = await import('../providers/market-data.provider.js'); + return await marketDataProvider.getHistoricalData(payload.symbol, payload.from, payload.to, payload.interval); + } + private setupEventListeners() { - this.worker.on('completed', (job) => { - logger.info('Job completed', { - id: job.id, - type: job.data.type, - result: job.returnvalue - }); + this.queueEvents.on('completed', (job) => { + this.logger.info('Job completed', { id: job.jobId }); }); - this.worker.on('failed', (job, err) => { - logger.error('Job failed', { - id: job?.id, - type: job?.data.type, - error: err.message - }); + this.queueEvents.on('failed', (job) => { + this.logger.error('Job failed', { id: job.jobId, error: job.failedReason }); }); this.worker.on('progress', (job, progress) => { - logger.debug('Job progress', { - id: job.id, - progress: `${progress}%` - }); - }); - - this.queueEvents.on('waiting', ({ jobId }) => { - logger.debug('Job waiting', { jobId }); - }); - - this.queueEvents.on('active', ({ jobId }) => { - logger.debug('Job active', { jobId }); + this.logger.debug('Job progress', { id: job.id, progress }); }); } - async scheduleRecurringTasks() { - // Fetch and check proxies every 15 minutes - await this.queue.add('fetch-and-check', - { type: 'fetch-and-check' }, - { - repeat: { pattern: '*/15 * * * *' }, - removeOnComplete: 10, - removeOnFail: 5, - jobId: 'recurring-proxy-fetch', // Use consistent ID to prevent duplicates - } - ); + private setupScheduledTasks() { + // Market data refresh every minute + this.addRecurringJob({ + type: 'market-data', + service: 'market-data', + provider: 'unified-data', + operation: 'refresh-cache', + payload: { symbols: ['AAPL', 'GOOGL', 'MSFT'] } + }, '*/1 * * * *'); - // Clear cache daily at midnight - await this.queue.add('clear-cache', - { type: 'clear-cache' }, + // Proxy check every 15 minutes + this.addRecurringJob({ + type: 'proxy-fetch', + service: 'proxy', + provider: 'proxy-service', + operation: 'fetch-and-check', + payload: {} + }, '*/15 * * * *'); + + this.logger.info('Scheduled tasks configured'); + } + + async addJob(jobData: JobData, options?: any) { + return this.queue.add(jobData.type, jobData, { + priority: jobData.priority || 0, + removeOnComplete: 10, + removeOnFail: 5, + ...options + }); + } + + async addRecurringJob(jobData: JobData, cronPattern: string) { + return this.queue.add( + `recurring-${jobData.type}`, + jobData, { - repeat: { pattern: '0 0 * * *' }, + repeat: { pattern: cronPattern }, removeOnComplete: 1, removeOnFail: 1, - jobId: 'daily-cache-clear', - } - ); - - logger.info('Recurring tasks scheduled'); - } - - async addImmediateProxyCheck(proxies: ProxyInfo[]) { - return await this.queue.add('check-specific', - { type: 'check-specific', proxies }, - { - priority: 10, - removeOnComplete: 5, - removeOnFail: 3, + jobId: `recurring-${jobData.service}-${jobData.provider}-${jobData.operation}` } ); } - async addManualProxyFetch() { - return await this.queue.add('fetch-and-check', - { type: 'fetch-and-check' }, - { - priority: 5, - removeOnComplete: 5, - removeOnFail: 3, - } - ); - } - - async getQueueStats() { + async getJobStats() { const [waiting, active, completed, failed, delayed] = await Promise.all([ this.queue.getWaiting(), this.queue.getActive(), this.queue.getCompleted(), this.queue.getFailed(), - this.queue.getDelayed(), + this.queue.getDelayed() ]); return { @@ -176,23 +148,39 @@ export class QueueService { active: active.length, completed: completed.length, failed: failed.length, - delayed: delayed.length, + delayed: delayed.length + }; + } + async getQueueStatus() { + const stats = await this.getJobStats(); + return { + ...stats, + workers: this.getWorkerCount(), + queue: this.queue.name, + connection: { + host: process.env.DRAGONFLY_HOST || 'localhost', + port: parseInt(process.env.DRAGONFLY_PORT || '6379') + } }; } - async getQueue() { - return this.queue; + getWorkerCount() { + return this.worker.opts.concurrency || 1; + } + + getRegisteredProviders() { + return [ + { name: 'proxy-service', type: 'proxy', operations: ['fetch-and-check', 'check-specific'] }, + { name: 'market-data-provider', type: 'market-data', operations: ['live-data', 'historical-data'] } + ]; } async shutdown() { - logger.info('Shutting down queue service...'); - + this.logger.info('Shutting down queue manager'); await this.worker.close(); await this.queue.close(); await this.queueEvents.close(); - - logger.info('Queue service shut down'); } } -export const queueService = new QueueService(); +export const queueManager = new QueueService();