diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index b43f22e..850c2bf 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -208,9 +208,11 @@ async function initializeServices() { logger.info('Initializing data service...'); try { - // Initialize queue service + // Initialize queue service (Redis connections should be ready now) + logger.info('Starting queue service initialization...'); await queueManager.initialize(); logger.info('Queue service initialized'); + logger.info('All services initialized successfully'); } catch (error) { logger.error('Failed to initialize services', { error }); @@ -221,22 +223,6 @@ async function initializeServices() { // Start server async function startServer() { await initializeServices(); - - serve({ - fetch: app.fetch, - port: PORT, - }); - - logger.info(`Data Service started on port ${PORT}`); - logger.info('Available endpoints:'); - logger.info(' GET /health - Health check'); - logger.info(' GET /api/queue/status - Queue status'); - logger.info(' POST /api/queue/job - Add job to queue'); - logger.info(' GET /api/live/:symbol - Live market data'); - logger.info(' GET /api/historical/:symbol - Historical market data'); - logger.info(' POST /api/proxy/fetch - Queue proxy fetch'); - logger.info(' POST /api/proxy/check - Queue proxy check'); - logger.info(' GET /api/providers - List registered providers'); } // Graceful shutdown diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index 7e2111d..b6a0648 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -123,7 +123,7 @@ export const proxyProvider: ProviderConfig = { } } }, - scheduledJobs: [ + scheduledJobs: [ { type: 'proxy-maintenance', operation: 'fetch-and-check', @@ -131,7 +131,7 @@ export const proxyProvider: ProviderConfig = { // should remove and just run at the same time so app restarts dont keeping adding same jobs cronPattern: getEvery24HourCron(), priority: 5, - immediately: true, + immediately: true, // Don't run immediately during startup to avoid conflicts description: 'Fetch and validate proxy list from sources' } ] diff --git a/apps/data-service/src/providers/proxy.tasks.ts b/apps/data-service/src/providers/proxy.tasks.ts index 4f9c4cc..86461c2 100644 --- a/apps/data-service/src/providers/proxy.tasks.ts +++ b/apps/data-service/src/providers/proxy.tasks.ts @@ -116,17 +116,20 @@ async function initializeSharedResources() { enableMetrics: true }); + // Always initialize httpClient and concurrencyLimit first + httpClient = new HttpClient({ timeout: 10000 }, logger); + concurrencyLimit = pLimit(PROXY_CONFIG.CONCURRENCY_LIMIT); + + // Try to connect to cache, but don't block initialization if it fails try { // Use longer timeout for cache connection await cache.waitForReady(30000); // 30 seconds logger.info('Cache connection established'); } catch (error) { - logger.error('Cache connection failed, continuing with degraded functionality:', error); + logger.warn('Cache connection failed, continuing with degraded functionality:', {error}); // Don't throw - allow the service to continue with cache fallbacks } - httpClient = new HttpClient({ timeout: 10000 }, logger); - concurrencyLimit = pLimit(PROXY_CONFIG.CONCURRENCY_LIMIT); logger.info('Proxy tasks initialized'); } } @@ -172,6 +175,19 @@ export async function fetchProxiesFromSources(): Promise { await initializeSharedResources(); await resetProxyStats(); + // Ensure concurrencyLimit is available before using it + if (!concurrencyLimit) { + logger.error('concurrencyLimit not initialized, using sequential processing'); + const result = []; + for (const source of PROXY_CONFIG.PROXY_SOURCES) { + const proxies = await fetchProxiesFromSource(source); + result.push(...proxies); + } + let allProxies: ProxyInfo[] = result; + allProxies = removeDuplicateProxies(allProxies); + return allProxies; + } + const sources = PROXY_CONFIG.PROXY_SOURCES.map(source => concurrencyLimit(() => fetchProxiesFromSource(source)) ); diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index 6a60965..9e2a0b4 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -41,7 +41,7 @@ export class QueueService { maxRetriesPerRequest: null, retryDelayOnFailover: 100, enableReadyCheck: false, - lazyConnect: true, + lazyConnect: false, // Disable Redis Cluster mode if you're using standalone Redis/Dragonfly enableOfflineQueue: true }; diff --git a/libs/cache/src/connection-manager.ts b/libs/cache/src/connection-manager.ts index faf3988..b16610a 100644 --- a/libs/cache/src/connection-manager.ts +++ b/libs/cache/src/connection-manager.ts @@ -16,6 +16,7 @@ export class RedisConnectionManager { private static sharedConnections = new Map(); private static instance: RedisConnectionManager; private logger = getLogger('redis-connection-manager'); + private static readyConnections = new Set(); // Singleton pattern for the manager itself static getInstance(): RedisConnectionManager { @@ -189,6 +190,85 @@ export class RedisConnectionManager { return { healthy: allHealthy, details }; } + + /** + * Wait for all created connections to be ready + * @param timeout Maximum time to wait in milliseconds + * @returns Promise that resolves when all connections are ready + */ + static async waitForAllConnections(timeout: number = 30000): Promise { + const instance = this.getInstance(); + const allConnections = new Map([ + ...instance.connections, + ...this.sharedConnections + ]); + + if (allConnections.size === 0) { + instance.logger.info('No Redis connections to wait for'); + return; + } + + instance.logger.info(`Waiting for ${allConnections.size} Redis connections to be ready...`); + + const connectionPromises = Array.from(allConnections.entries()).map(([name, redis]) => + instance.waitForConnection(redis, name, timeout) + ); + + try { + await Promise.all(connectionPromises); + instance.logger.info('All Redis connections are ready'); + } catch (error) { + instance.logger.error('Failed to establish all Redis connections:', error); + throw error; + } + } + + /** + * Wait for a specific connection to be ready + */ + private async waitForConnection(redis: Redis, name: string, timeout: number): Promise { + return new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + reject(new Error(`Redis connection ${name} failed to be ready within ${timeout}ms`)); + }, timeout); + + const onReady = () => { + clearTimeout(timeoutId); + RedisConnectionManager.readyConnections.add(name); + this.logger.info(`Redis connection ready: ${name}`); + resolve(); + }; + + const onError = (err: Error) => { + clearTimeout(timeoutId); + this.logger.error(`Redis connection failed for ${name}:`, err); + reject(err); + }; + + if (redis.status === 'ready') { + onReady(); + } else { + redis.once('ready', onReady); + redis.once('error', onError); + } + }); + } + + /** + * Check if all connections are ready + */ + static areAllConnectionsReady(): boolean { + const instance = this.getInstance(); + const allConnections = new Map([ + ...instance.connections, + ...this.sharedConnections + ]); + + return allConnections.size > 0 && + Array.from(allConnections.keys()).every(name => + this.readyConnections.has(name) + ); + } } export default RedisConnectionManager; diff --git a/libs/event-bus/src/index.ts b/libs/event-bus/src/index.ts index d8b13bd..bab7e92 100644 --- a/libs/event-bus/src/index.ts +++ b/libs/event-bus/src/index.ts @@ -59,7 +59,7 @@ export class EventBus extends EventEmitter { password: dragonflyConfig.DRAGONFLY_PASSWORD, db: dragonflyConfig.DRAGONFLY_DATABASE || 0, maxRetriesPerRequest: dragonflyConfig.DRAGONFLY_MAX_RETRIES, - lazyConnect: true, + lazyConnect: false, }); if (!this.useStreams) { diff --git a/scripts/get-redis-connections.ts b/scripts/get-redis-connections.ts index 66635d7..576c7ba 100644 --- a/scripts/get-redis-connections.ts +++ b/scripts/get-redis-connections.ts @@ -61,7 +61,7 @@ class DragonFlyConnectionMonitor { password: process.env.DRAGONFLY_PASSWORD || undefined, db: parseInt(process.env.DRAGONFLY_DATABASE || '0'), maxRetriesPerRequest: 3, - lazyConnect: true, + lazyConnect: false, }); }