From 9b8a7bdd4bf0b3bb56cea0e8d320a84f2cbbc3c5 Mon Sep 17 00:00:00 2001 From: Bojan Kucera Date: Sun, 8 Jun 2025 18:56:52 -0400 Subject: [PATCH] work on queue --- .../src/providers/proxy.provider.ts | 18 +- .../data-service/src/providers/proxy.tasks.ts | 523 +++++++++--------- .../src/providers/quotemedia.provider.ts | 48 +- .../src/providers/yahoo.provider.ts | 48 +- .../src/services/provider-registry.service.ts | 1 + .../src/services/queue.service.ts | 270 +++++---- 6 files changed, 488 insertions(+), 420 deletions(-) diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index d42f94f..f82190a 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -1,5 +1,13 @@ import { ProviderConfig } from '../services/provider-registry.service'; +// This will run at the same time each day as when the app started +const getEvery24HourCron = (): string => { + const now = new Date(); + const hours = now.getHours(); + const minutes = now.getMinutes(); + return `${minutes} ${hours} * * *`; // Every day at startup time +}; + export const proxyProvider: ProviderConfig = { name: 'proxy-service', service: 'proxy', @@ -13,11 +21,6 @@ export const proxyProvider: ProviderConfig = { const { proxyService } = await import('./proxy.tasks'); return await proxyService.checkProxies(payload.proxies); }, - - 'get-working-proxy': async (payload: { protocol?: string; country?: string; timeout?: number }) => { - const { proxyService } = await import('./proxy.tasks'); - return await proxyService.getWorkingProxy(); - } }, scheduledJobs: [ @@ -25,9 +28,12 @@ export const proxyProvider: ProviderConfig = { type: 'proxy-maintenance', operation: 'fetch-and-check', payload: {}, - cronPattern: '*/15 * * * *', // Every 15 minutes + cronPattern: getEvery24HourCron(), // Every 15 minutes priority: 5, + immediately: true, description: 'Fetch and validate proxy list from sources' } ] }; + + diff --git a/apps/data-service/src/providers/proxy.tasks.ts b/apps/data-service/src/providers/proxy.tasks.ts index fc2ef77..936833f 100644 --- a/apps/data-service/src/providers/proxy.tasks.ts +++ b/apps/data-service/src/providers/proxy.tasks.ts @@ -3,17 +3,15 @@ import createCache, { type CacheProvider } from '@stock-bot/cache'; import { HttpClient, ProxyInfo } from '@stock-bot/http'; import pLimit from 'p-limit'; -export class ProxyService { - private logger = new Logger('proxy-service'); - private cache: CacheProvider = createCache('hybrid'); - private httpClient: HttpClient; - private readonly concurrencyLimit = pLimit(100); - private readonly CACHE_KEY = 'proxy'; - private readonly CACHE_TTL = 86400; // 24 hours - private readonly CHECK_TIMEOUT = 7000; - private readonly CHECK_IP = '99.246.102.205' - private readonly CHECK_URL = 'https://proxy-detection.stare.gg/?api_key=bd406bf53ddc6abe1d9de5907830a955'; - private readonly PROXY_SOURCES = [ +// Shared configuration and utilities +const PROXY_CONFIG = { + CACHE_KEY: 'proxy', + CACHE_TTL: 86400, // 24 hours + CHECK_TIMEOUT: 7000, + CHECK_IP: '99.246.102.205', + CHECK_URL: 'https://proxy-detection.stare.gg/?api_key=bd406bf53ddc6abe1d9de5907830a955', + CONCURRENCY_LIMIT: 100, + PROXY_SOURCES: [ {url: 'https://raw.githubusercontent.com/prxchk/proxy-list/main/http.txt',protocol: 'http', }, {url: 'https://raw.githubusercontent.com/casals-ar/proxy-list/main/http',protocol: 'http', }, {url: 'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/http.txt',protocol: 'http', }, @@ -60,267 +58,252 @@ export class ProxyService { // {url: 'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/socks5.txt',protocol: 'socks5', }, // {url: 'https://raw.githubusercontent.com/BreakingTechFr/Proxy_Free/refs/heads/main/proxies/socks5.txt',protocol: 'socks5', }, ] +}; - constructor() { - this.httpClient = new HttpClient({ - timeout: 10000, - }, this.logger); +// Shared instances (module-scoped, not global) +let logger: Logger; +let cache: CacheProvider; +let httpClient: HttpClient; +let concurrencyLimit: ReturnType; - - this.logger.info('ProxyService initialized'); - } - - // Add queue integration methods - async queueProxyFetch(): Promise { - const { queueManager } = await import('../services/queue.service'); - const job = await queueManager.addJob({ - type: 'proxy-fetch', - service: 'proxy', - provider: 'proxy-service', - operation: 'fetch-and-check', - payload: {}, - priority: 5 - }); - - const jobId = job.id || 'unknown'; - this.logger.info('Proxy fetch job queued', { jobId }); - return jobId; - } - - async queueProxyCheck(proxies: ProxyInfo[]): Promise { - const { queueManager } = await import('../services/queue.service'); - const job = await queueManager.addJob({ - type: 'proxy-check', - service: 'proxy', - provider: 'proxy-service', - operation: 'check-specific', - payload: { proxies }, - priority: 3 - }); - - const jobId = job.id || 'unknown'; - this.logger.info('Proxy check job queued', { jobId, count: proxies.length }); - return jobId; - } - - - async fetchProxiesFromSources() : Promise { - const sources = this.PROXY_SOURCES.map(source => - this.concurrencyLimit(() => this.fetchProxiesFromSource(source)) - ) - const result = await Promise.all(sources); - let allProxies: ProxyInfo[] = result.flat(); - allProxies = this.removeDuplicateProxies(allProxies) - await this.checkProxies(allProxies) - return allProxies.length - } - - private removeDuplicateProxies(proxies: ProxyInfo[]): ProxyInfo[] { - const seen = new Set(); - const unique: ProxyInfo[] = []; - - for (const proxy of proxies) { - const key = `${proxy.protocol}://${proxy.host}:${proxy.port}`; - if (!seen.has(key)) { - seen.add(key); - unique.push(proxy); - } - } - - return unique; - } - - - async fetchProxiesFromSource(source: { url: string; protocol: string }): Promise { - const allProxies: ProxyInfo[] = []; - - try { - this.logger.info(`Fetching proxies from ${source.url}`); - - const response = await this.httpClient.get(source.url, { - timeout: 10000 - }); - - if (response.status !== 200) { - this.logger.warn(`Failed to fetch from ${source.url}: ${response.status}`); - return [] - } - - const text = response.data; - const lines = text.split('\n').filter((line: string) => line.trim()); - - - for (const line of lines) { - let trimmed = line.trim(); - trimmed = this.cleanProxyUrl(trimmed); - if (!trimmed || trimmed.startsWith('#')) continue; - - // Parse formats like "host:port" or "host:port:user:pass" - const parts = trimmed.split(':'); - if (parts.length >= 2) { - const proxy: ProxyInfo = { - protocol: source.protocol as 'http' | 'https' | 'socks4' | 'socks5', - host: parts[0], - port: parseInt(parts[1]) - }; - - if (!isNaN(proxy.port) && proxy.host) { - allProxies.push(proxy); - } - } - } - - this.logger.info(`Parsed ${allProxies.length} proxies from ${source.url}`); - - } catch (error) { - this.logger.error(`Error fetching proxies from ${source.url}`, error); - return []; - } - // this.logger.info(`Total proxies fetched: ${allProxies.length}`); - return allProxies; - } - - private cleanProxyUrl(url: string): string { - // Remove http:// or https:// and any leading zeros from the host part - return url - .replace(/^https?:\/\//, '') // Remove protocol - .replace(/^0+/, '') // Remove leading zeros at start - .replace(/:0+(\d)/g, ':$1'); // Remove leading zeros from port numbers - } - - /** - * Check if a proxy is working - */ - async checkProxy(proxy: ProxyInfo): Promise { - let success = false; - this.logger.debug(`Checking Proxy with ${this.concurrencyLimit.pendingCount } pending: `, { - protocol: proxy.protocol, - host: proxy.host, - port: proxy.port, - }); - // console.log('Checking proxy:', `${proxy.protocol}://${proxy.host}:${proxy.port}`, this.concurrencyLimit.activeCount, this.concurrencyLimit.pendingCount); - try { - - - // Test the proxy - const response = await this.httpClient.get(this.CHECK_URL, { - proxy, - timeout: this.CHECK_TIMEOUT - }); - - const isWorking = response.status >= 200 && response.status < 300; - - const result: ProxyInfo = { - ...proxy, - isWorking, - checkedAt: new Date(), - responseTime: response.responseTime, - }; - // console.log('Proxy check result:', proxy); - if (isWorking && !JSON.stringify(response.data).includes(this.CHECK_IP)) { - success = true - await this.cache.set(`${this.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`, result, this.CACHE_TTL); - } else { - await this.cache.del(`${this.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`); - } - - this.logger.debug('Proxy check completed', { - host: proxy.host, - port: proxy.port, - isWorking, - }); - - return result; - - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); - - const result: ProxyInfo = { - ...proxy, - isWorking: false, - error: errorMessage, - checkedAt: new Date() - }; - - // Cache failed result for shorter time - // await this.cache.set(cacheKey, result, 300); // 5 minutes - if(!success) // If the proxy check failed, remove it from cache - success is here cause i think abort signal fails sometimes - await this.cache.del(`${this.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`); - - this.logger.debug('Proxy check failed', { - host: proxy.host, - port: proxy.port, - error: errorMessage - }); - - return result; - } - } - - /** - * Check multiple proxies concurrently - */ - async checkProxies(proxies: ProxyInfo[]): Promise { - this.logger.info('Checking proxies', { count: proxies.length }); - - const checkPromises = proxies.map(proxy => - this.concurrencyLimit(() => this.checkProxy(proxy)) - ); - - const results = await Promise.all(checkPromises); - const workingCount = results.filter(r => r.isWorking).length; - - this.logger.info('Proxy check completed', { - total: proxies.length, - working: workingCount, - failed: proxies.length - workingCount - }); - - return results; - } - - /** - * Get a random working proxy from cache - */ - async getWorkingProxy(): Promise { - try { - // Note: This is a simplified implementation - // In production, you'd want to maintain a working proxies list - this.logger.warn('getWorkingProxy not fully implemented - requires proxy list management'); - return null; - } catch (error) { - this.logger.error('Error getting working proxy', error); - return null; - } - } - - /** - * Add proxies to check and cache - */ - async addProxies(proxies: ProxyInfo[]): Promise { - this.logger.info('Adding proxies for validation', { count: proxies.length }); - - // Start background validation - this.checkProxies(proxies).catch(error => { - this.logger.error('Error in background proxy validation', error); - }); - } - - /** - * Clear proxy cache - */ - async clearCache(): Promise { - this.logger.info('Clearing proxy cache'); - // Note: Cache provider limitations - would need proper key tracking - } - - /** - * Shutdown service - */ - async shutdown(): Promise { - this.logger.info('Shutting down ProxyService'); +// Initialize shared resources +function initializeSharedResources() { + if (!logger) { + logger = new Logger('proxy-tasks'); + cache = createCache('hybrid'); + httpClient = new HttpClient({ timeout: 10000 }, logger); + concurrencyLimit = pLimit(PROXY_CONFIG.CONCURRENCY_LIMIT); + logger.info('Proxy tasks initialized'); } } -// Export singleton instance -export const proxyService = new ProxyService(); \ No newline at end of file +// Individual task functions +export async function queueProxyFetch(): Promise { + initializeSharedResources(); + + const { queueManager } = await import('../services/queue.service'); + const job = await queueManager.addJob({ + type: 'proxy-fetch', + service: 'proxy', + provider: 'proxy-service', + operation: 'fetch-and-check', + payload: {}, + priority: 5 + }); + + const jobId = job.id || 'unknown'; + logger.info('Proxy fetch job queued', { jobId }); + return jobId; +} + +export async function queueProxyCheck(proxies: ProxyInfo[]): Promise { + initializeSharedResources(); + + const { queueManager } = await import('../services/queue.service'); + const job = await queueManager.addJob({ + type: 'proxy-check', + service: 'proxy', + provider: 'proxy-service', + operation: 'check-specific', + payload: { proxies }, + priority: 3 + }); + + const jobId = job.id || 'unknown'; + logger.info('Proxy check job queued', { jobId, count: proxies.length }); + return jobId; +} + +export async function fetchProxiesFromSources(): Promise { + initializeSharedResources(); + + const sources = PROXY_CONFIG.PROXY_SOURCES.map(source => + concurrencyLimit(() => fetchProxiesFromSource(source)) + ); + const result = await Promise.all(sources); + let allProxies: ProxyInfo[] = result.flat(); + allProxies = removeDuplicateProxies(allProxies); + // await checkProxies(allProxies); + return allProxies.length; +} + +export async function fetchProxiesFromSource(source: { url: string; protocol: string }): Promise { + initializeSharedResources(); + + const allProxies: ProxyInfo[] = []; + + try { + logger.info(`Fetching proxies from ${source.url}`); + + const response = await httpClient.get(source.url, { + timeout: 10000 + }); + + if (response.status !== 200) { + logger.warn(`Failed to fetch from ${source.url}: ${response.status}`); + return []; + } + + const text = response.data; + const lines = text.split('\n').filter((line: string) => line.trim()); + + for (const line of lines) { + let trimmed = line.trim(); + trimmed = cleanProxyUrl(trimmed); + if (!trimmed || trimmed.startsWith('#')) continue; + + // Parse formats like "host:port" or "host:port:user:pass" + const parts = trimmed.split(':'); + if (parts.length >= 2) { + const proxy: ProxyInfo = { + protocol: source.protocol as 'http' | 'https' | 'socks4' | 'socks5', + host: parts[0], + port: parseInt(parts[1]) + }; + + if (!isNaN(proxy.port) && proxy.host) { + allProxies.push(proxy); + } + } + } + + logger.info(`Parsed ${allProxies.length} proxies from ${source.url}`); + + } catch (error) { + logger.error(`Error fetching proxies from ${source.url}`, error); + return []; + } + + return allProxies; +} + +/** + * Check multiple proxies concurrently + */ +export async function checkProxies(proxies: ProxyInfo[]): Promise { + initializeSharedResources(); + + logger.info('Checking proxies', { count: proxies.length }); + + const checkPromises = proxies.map(proxy => + concurrencyLimit(() => checkProxy(proxy)) + ); + const results = await Promise.all(checkPromises); + const workingCount = results.filter((r: ProxyInfo) => r.isWorking).length; + + logger.info('Proxy check completed', { + total: proxies.length, + working: workingCount, + failed: proxies.length - workingCount + }); + + return results; +} + +/** + * Check if a proxy is working + */ +export async function checkProxy(proxy: ProxyInfo): Promise { + initializeSharedResources(); + + let success = false; + logger.debug(`Checking Proxy with ${concurrencyLimit.pendingCount} pending:`, { + protocol: proxy.protocol, + host: proxy.host, + port: proxy.port, + }); + + try { + // Test the proxy + const response = await httpClient.get(PROXY_CONFIG.CHECK_URL, { + proxy, + timeout: PROXY_CONFIG.CHECK_TIMEOUT + }); + + const isWorking = response.status >= 200 && response.status < 300; + + const result: ProxyInfo = { + ...proxy, + isWorking, + checkedAt: new Date(), + responseTime: response.responseTime, + }; + + if (isWorking && !JSON.stringify(response.data).includes(PROXY_CONFIG.CHECK_IP)) { + success = true; + await cache.set(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`, result, PROXY_CONFIG.CACHE_TTL); + } else { + await cache.del(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`); + } + + logger.debug('Proxy check completed', { + host: proxy.host, + port: proxy.port, + isWorking, + }); + + return result; + + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + + const result: ProxyInfo = { + ...proxy, + isWorking: false, + error: errorMessage, + checkedAt: new Date() + }; + + // If the proxy check failed, remove it from cache - success is here cause i think abort signal fails sometimes + if (!success) { + await cache.del(`${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`); + } + + logger.debug('Proxy check failed', { + host: proxy.host, + port: proxy.port, + error: errorMessage + }); + + return result; + } +} + + + +// Utility functions +function cleanProxyUrl(url: string): string { + return url + .replace(/^https?:\/\//, '') + .replace(/^0+/, '') + .replace(/:0+(\d)/g, ':$1'); +} + +function removeDuplicateProxies(proxies: ProxyInfo[]): ProxyInfo[] { + const seen = new Set(); + const unique: ProxyInfo[] = []; + + for (const proxy of proxies) { + const key = `${proxy.protocol}://${proxy.host}:${proxy.port}`; + if (!seen.has(key)) { + seen.add(key); + unique.push(proxy); + } + } + + return unique; +} + +// Optional: Export a convenience object that groups related tasks +export const proxyTasks = { + queueProxyFetch, + queueProxyCheck, + fetchProxiesFromSources, + fetchProxiesFromSource, + checkProxy, + checkProxies, +}; + +// Export singleton instance for backward compatibility (optional) +// Remove this if you want to fully move to the task-based approach +export const proxyService = proxyTasks; \ No newline at end of file diff --git a/apps/data-service/src/providers/quotemedia.provider.ts b/apps/data-service/src/providers/quotemedia.provider.ts index 79afbe0..23a2c04 100644 --- a/apps/data-service/src/providers/quotemedia.provider.ts +++ b/apps/data-service/src/providers/quotemedia.provider.ts @@ -166,29 +166,29 @@ export const quotemediaProvider: ProviderConfig = { }, scheduledJobs: [ - { - type: 'quotemedia-premium-refresh', - operation: 'batch-quotes', - payload: { symbols: ['AAPL', 'GOOGL', 'MSFT'] }, - cronPattern: '*/2 * * * *', // Every 2 minutes - priority: 7, - description: 'Refresh premium quotes with detailed market data' - }, - { - type: 'quotemedia-options-update', - operation: 'options-chain', - payload: { symbol: 'SPY' }, - cronPattern: '*/10 * * * *', // Every 10 minutes - priority: 5, - description: 'Update options chain data for SPY ETF' - }, - { - type: 'quotemedia-profiles', - operation: 'company-profile', - payload: { symbol: 'AAPL' }, - cronPattern: '0 9 * * 1-5', // Weekdays at 9 AM - priority: 3, - description: 'Update company profile data' - } + // { + // type: 'quotemedia-premium-refresh', + // operation: 'batch-quotes', + // payload: { symbols: ['AAPL', 'GOOGL', 'MSFT'] }, + // cronPattern: '*/2 * * * *', // Every 2 minutes + // priority: 7, + // description: 'Refresh premium quotes with detailed market data' + // }, + // { + // type: 'quotemedia-options-update', + // operation: 'options-chain', + // payload: { symbol: 'SPY' }, + // cronPattern: '*/10 * * * *', // Every 10 minutes + // priority: 5, + // description: 'Update options chain data for SPY ETF' + // }, + // { + // type: 'quotemedia-profiles', + // operation: 'company-profile', + // payload: { symbol: 'AAPL' }, + // cronPattern: '0 9 * * 1-5', // Weekdays at 9 AM + // priority: 3, + // description: 'Update company profile data' + // } ] }; diff --git a/apps/data-service/src/providers/yahoo.provider.ts b/apps/data-service/src/providers/yahoo.provider.ts index 402269d..b4dd25f 100644 --- a/apps/data-service/src/providers/yahoo.provider.ts +++ b/apps/data-service/src/providers/yahoo.provider.ts @@ -227,29 +227,29 @@ export const yahooProvider: ProviderConfig = { }, scheduledJobs: [ - { - type: 'yahoo-market-refresh', - operation: 'live-data', - payload: { symbol: 'AAPL' }, - cronPattern: '*/1 * * * *', // Every minute - priority: 8, - description: 'Refresh Apple stock price from Yahoo Finance' - }, - { - type: 'yahoo-sp500-update', - operation: 'live-data', - payload: { symbol: 'SPY' }, - cronPattern: '*/2 * * * *', // Every 2 minutes - priority: 9, - description: 'Update S&P 500 ETF price' - }, - { - type: 'yahoo-earnings-check', - operation: 'earnings', - payload: { symbol: 'AAPL' }, - cronPattern: '0 16 * * 1-5', // Weekdays at 4 PM (market close) - priority: 6, - description: 'Check earnings data for Apple' - } + // { + // type: 'yahoo-market-refresh', + // operation: 'live-data', + // payload: { symbol: 'AAPL' }, + // cronPattern: '*/1 * * * *', // Every minute + // priority: 8, + // description: 'Refresh Apple stock price from Yahoo Finance' + // }, + // { + // type: 'yahoo-sp500-update', + // operation: 'live-data', + // payload: { symbol: 'SPY' }, + // cronPattern: '*/2 * * * *', // Every 2 minutes + // priority: 9, + // description: 'Update S&P 500 ETF price' + // }, + // { + // type: 'yahoo-earnings-check', + // operation: 'earnings', + // payload: { symbol: 'AAPL' }, + // cronPattern: '0 16 * * 1-5', // Weekdays at 4 PM (market close) + // priority: 6, + // description: 'Check earnings data for Apple' + // } ] }; diff --git a/apps/data-service/src/services/provider-registry.service.ts b/apps/data-service/src/services/provider-registry.service.ts index d7e0f59..90ebb43 100644 --- a/apps/data-service/src/services/provider-registry.service.ts +++ b/apps/data-service/src/services/provider-registry.service.ts @@ -11,6 +11,7 @@ export interface ScheduledJob { cronPattern: string; priority?: number; description?: string; + immediately?: boolean; } export interface ProviderConfig { diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index 6e8b4e8..d9c272f 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -9,12 +9,13 @@ export interface JobData { operation: string; payload: any; priority?: number; + immediately?: boolean; } export class QueueService { private logger = new Logger('queue-service'); private queue!: Queue; - private worker!: Worker; + private workers: Worker[] = []; private queueEvents!: QueueEvents; private isInitialized = false; @@ -45,6 +46,10 @@ export class QueueService { enableOfflineQueue: false }; + // Worker configuration + const workerCount = parseInt(process.env.WORKER_COUNT || '4'); + const concurrencyPerWorker = parseInt(process.env.WORKER_CONCURRENCY || '20'); + this.logger.info('Connecting to Redis/Dragonfly', connection); try { @@ -60,13 +65,34 @@ export class QueueService { } } }); - this.worker = new Worker('{data-service-queue}', this.processJob.bind(this), { - connection, - concurrency: 5, // Reduce concurrency to avoid overwhelming Redis - }); + // Create multiple workers + for (let i = 0; i < workerCount; i++) { + const worker = new Worker( + '{data-service-queue}', + this.processJob.bind(this), + { + connection: { ...connection }, // Each worker gets its own connection + concurrency: concurrencyPerWorker, + maxStalledCount: 1, + stalledInterval: 30000, + } + ); + // Add worker-specific logging + worker.on('ready', () => { + this.logger.info(`Worker ${i + 1} ready`, { workerId: i + 1 }); + }); + + worker.on('error', (error) => { + this.logger.error(`Worker ${i + 1} error`, { workerId: i + 1, error }); + }); + + this.workers.push(worker); + } this.queueEvents = new QueueEvents('{data-service-queue}', { connection }); // Test connection + + // Wait for all workers to be ready await this.queue.waitUntilReady(); - await this.worker.waitUntilReady(); + await Promise.all(this.workers.map(worker => worker.waitUntilReady())); await this.queueEvents.waitUntilReady(); this.setupEventListeners(); @@ -81,6 +107,16 @@ export class QueueService { } } + // Update getTotalConcurrency method + getTotalConcurrency() { + if (!this.isInitialized) { + return 0; + } + return this.workers.reduce((total, worker) => { + return total + (worker.opts.concurrency || 1); + }, 0); + } + private async registerProviders() { this.logger.info('Registering providers...'); @@ -159,79 +195,104 @@ export class QueueService { }); } private async setupScheduledTasks() { - try { - this.logger.info('Setting up scheduled tasks from providers...'); - - // Clear any existing repeatable jobs first - const repeatableJobs = await this.queue.getRepeatableJobs(); - this.logger.info(`Found ${repeatableJobs.length} existing repeatable jobs`); - - for (const job of repeatableJobs) { - try { - await this.queue.removeJobScheduler(job.name); - this.logger.debug('Removed existing repeatable job', { name: job.name }); - } catch (error) { - this.logger.warn('Failed to remove existing repeatable job', { - name: job.name, - error: error instanceof Error ? error.message : String(error) - }); - } - } - - // Get all scheduled jobs from all providers - const allScheduledJobs = providerRegistry.getAllScheduledJobs(); - - if (allScheduledJobs.length === 0) { - this.logger.warn('No scheduled jobs found in providers'); - return; - } - - let successCount = 0; - let failureCount = 0; - - // Register each scheduled job with delay between registrations - for (const { service, provider, job } of allScheduledJobs) { - try { - // Add a small delay between job registrations to avoid overwhelming Redis - await new Promise(resolve => setTimeout(resolve, 100)); - - await this.addRecurringJob({ - type: job.type, - service: service, - provider: provider, - operation: job.operation, - payload: job.payload, - priority: job.priority - }, job.cronPattern); - - this.logger.info('Scheduled job registered', { - type: job.type, - service, - provider, - operation: job.operation, - cronPattern: job.cronPattern, - description: job.description - }); - - successCount++; - - } catch (error) { - this.logger.error('Failed to register scheduled job', { - type: job.type, - service, - provider, - error: error instanceof Error ? error.message : String(error) - }); - failureCount++; - } - } - - this.logger.info(`Scheduled tasks setup complete: ${successCount} successful, ${failureCount} failed`); - - } catch (error) { - this.logger.error('Failed to setup scheduled tasks', error); + try { + this.logger.info('Setting up scheduled tasks from providers...'); + + // Get all scheduled jobs from all providers + const allScheduledJobs = providerRegistry.getAllScheduledJobs(); + + if (allScheduledJobs.length === 0) { + this.logger.warn('No scheduled jobs found in providers'); + return; } + + // Get existing repeatable jobs for comparison + const existingJobs = await this.queue.getRepeatableJobs(); + this.logger.info(`Found ${existingJobs.length} existing repeatable jobs`); + + let successCount = 0; + let failureCount = 0; + let updatedCount = 0; + let newCount = 0; + + // Process each scheduled job + for (const { service, provider, job } of allScheduledJobs) { + try { + const jobKey = `${service}-${provider}-${job.operation}`; + + // Check if this job already exists + const existingJob = existingJobs.find(existing => + existing.key?.includes(jobKey) || existing.name === job.type + ); + + if (existingJob) { + // Check if the job needs updating (different cron pattern or config) + const needsUpdate = existingJob.pattern !== job.cronPattern; + + if (needsUpdate) { + this.logger.info('Job configuration changed, updating', { + jobKey, + oldPattern: existingJob.pattern, + newPattern: job.cronPattern + }); + updatedCount++; + } else { + this.logger.debug('Job unchanged, skipping', { jobKey }); + successCount++; + continue; + } + } else { + newCount++; + } + + // Add delay between job registrations + await new Promise(resolve => setTimeout(resolve, 100)); + + await this.addRecurringJob({ + type: job.type, + service: service, + provider: provider, + operation: job.operation, + payload: job.payload, + priority: job.priority, + immediately: job.immediately || false + }, job.cronPattern); + + this.logger.info('Scheduled job registered', { + type: job.type, + service, + provider, + operation: job.operation, + cronPattern: job.cronPattern, + description: job.description, + immediately: job.immediately || false + }); + + successCount++; + + } catch (error) { + this.logger.error('Failed to register scheduled job', { + type: job.type, + service, + provider, + error: error instanceof Error ? error.message : String(error) + }); + failureCount++; + } + } + + this.logger.info(`Scheduled tasks setup complete`, { + total: allScheduledJobs.length, + successful: successCount, + failed: failureCount, + updated: updatedCount, + new: newCount + }); + + } catch (error) { + this.logger.error('Failed to setup scheduled tasks', error); } +} async addJob(jobData: JobData, options?: any) { if (!this.isInitialized) { @@ -251,28 +312,43 @@ export class QueueService { } try { - // Create a unique job ID to avoid Redis key conflicts - const jobId = `recurring-${jobData.service}-${jobData.provider}-${jobData.operation}`; + // Create a unique job key for this specific job + const jobKey = `${jobData.service}-${jobData.provider}-${jobData.operation}`; - // First, try to remove any existing recurring job with the same ID - try { - await this.queue.removeRepeatable(jobData.type, { - pattern: cronPattern, - jobId: jobId + // Get all existing repeatable jobs + const existingJobs = await this.queue.getRepeatableJobs(); + + // Find and remove the existing job with the same key if it exists + const existingJob = existingJobs.find(job => { + // Check if this is the same job by comparing key components + return job.key?.includes(jobKey) || job.name === jobData.type; + }); + + if (existingJob) { + this.logger.info('Updating existing recurring job', { + jobKey, + existingPattern: existingJob.pattern, + newPattern: cronPattern }); - } catch (removeError) { - // Ignore errors when removing non-existent jobs - this.logger.debug('No existing recurring job to remove', { jobId }); + + // Remove the existing job + await this.queue.removeRepeatableByKey(existingJob.key); + + // Small delay to ensure cleanup is complete + await new Promise(resolve => setTimeout(resolve, 100)); + } else { + this.logger.info('Creating new recurring job', { jobKey, cronPattern }); } - // Add the new recurring job with proper options + // Add the new/updated recurring job const job = await this.queue.add(jobData.type, jobData, { repeat: { pattern: cronPattern, - // Use UTC timezone to avoid timezone issues - tz: 'UTC' + tz: 'UTC', + immediately: jobData.immediately || false, }, - jobId: jobId, + // Use a consistent jobId for this specific recurring job + jobId: `recurring-${jobKey}`, removeOnComplete: 1, removeOnFail: 1, attempts: 2, @@ -283,16 +359,17 @@ export class QueueService { ...options }); - this.logger.info('Recurring job added successfully', { - jobId: jobId, + this.logger.info('Recurring job added/updated successfully', { + jobKey, type: jobData.type, - cronPattern: cronPattern + cronPattern, + immediately: jobData.immediately || false }); return job; } catch (error) { - this.logger.error('Failed to add recurring job', { + this.logger.error('Failed to add/update recurring job', { jobData, cronPattern, error: error instanceof Error ? error.message : String(error) @@ -363,7 +440,8 @@ export class QueueService { operation: job.operation, cronPattern: job.cronPattern, priority: job.priority, - description: job.description + description: job.description, + immediately: job.immediately || false })); }