From 830b9e94a1b94d9a2ee76c3b7a6cd3521bfcb91a Mon Sep 17 00:00:00 2001 From: Boki Date: Fri, 20 Jun 2025 23:11:28 -0400 Subject: [PATCH] fixes --- .env | 2 +- libs/cache/src/index.ts | 54 +-------- libs/utils/src/proxy/index.ts | 9 ++ libs/utils/src/proxy/proxy-manager.ts | 14 +++ libs/utils/src/proxy/proxy-sync.ts | 157 ++++++++++++++++++++++++++ 5 files changed, 184 insertions(+), 52 deletions(-) create mode 100644 libs/utils/src/proxy/proxy-sync.ts diff --git a/.env b/.env index cf5abf7..942b1f9 100644 --- a/.env +++ b/.env @@ -4,7 +4,7 @@ # Core Application Settings NODE_ENV=development -LOG_LEVEL=error +LOG_LEVEL=info # Data Service Configuration DATA_SERVICE_PORT=2001 diff --git a/libs/cache/src/index.ts b/libs/cache/src/index.ts index d11247b..2594101 100644 --- a/libs/cache/src/index.ts +++ b/libs/cache/src/index.ts @@ -1,5 +1,5 @@ import { RedisCache } from './redis-cache'; -import type { CacheOptions, CacheProvider, RedisConfig } from './types'; +import type { CacheOptions, CacheProvider } from './types'; // Cache instances registry to prevent multiple instances with same prefix const cacheInstances = new Map(); @@ -37,62 +37,14 @@ export function createCache(options: CacheOptions): CacheProvider { return new RedisCache(defaultOptions); } -/** - * Create a cache instance for trading data - */ -export function createTradingCache(redisConfig: RedisConfig, options?: Partial>): CacheProvider { - return createCache({ - keyPrefix: 'trading:', - ttl: 3600, // 1 hour default - enableMetrics: true, - shared: true, - ...options, - redisConfig, - }); -} - -/** - * Create a cache for market data with shorter TTL - */ -export function createMarketDataCache(redisConfig: RedisConfig, options?: Partial>): CacheProvider { - return createCache({ - keyPrefix: 'market:', - ttl: 300, // 5 minutes for market data - enableMetrics: true, - shared: true, - ...options, - redisConfig, - }); -} - -/** - * Create a cache for indicators with longer TTL - */ -export function createIndicatorCache(redisConfig: RedisConfig, options?: Partial>): CacheProvider { - return createCache({ - keyPrefix: 'indicators:', - ttl: 1800, // 30 minutes for indicators - enableMetrics: true, - shared: true, - ...options, - redisConfig, - }); -} - // Export types and classes export type { - CacheProvider, - CacheOptions, - CacheConfig, - CacheStats, - CacheKey, - SerializationOptions, - RedisConfig, + CacheConfig, CacheKey, CacheOptions, CacheProvider, CacheStats, RedisConfig, SerializationOptions } from './types'; -export { RedisCache } from './redis-cache'; export { RedisConnectionManager } from './connection-manager'; export { CacheKeyGenerator } from './key-generator'; +export { RedisCache } from './redis-cache'; // Default export for convenience export default createCache; diff --git a/libs/utils/src/proxy/index.ts b/libs/utils/src/proxy/index.ts index 91e1b41..fe21f79 100644 --- a/libs/utils/src/proxy/index.ts +++ b/libs/utils/src/proxy/index.ts @@ -9,4 +9,13 @@ export { getWorkingProxies, updateProxies } from './proxy-manager'; + +export { + ProxySyncService, + getProxySyncService, + startProxySync, + stopProxySync, + syncProxiesOnce +} from './proxy-sync'; + export type { ProxyInfo } from '@stock-bot/http'; // Re-export for convenience \ No newline at end of file diff --git a/libs/utils/src/proxy/proxy-manager.ts b/libs/utils/src/proxy/proxy-manager.ts index 9a306a6..f7000d2 100644 --- a/libs/utils/src/proxy/proxy-manager.ts +++ b/libs/utils/src/proxy/proxy-manager.ts @@ -35,6 +35,11 @@ export class ProxyManager { try { logger.info('Initializing proxy manager...'); + + // Wait for cache to be ready + await this.cache.waitForReady(10000); // Wait up to 10 seconds + logger.debug('Cache is ready'); + await this.loadFromCache(); this.isInitialized = true; logger.info('Proxy manager initialized', { @@ -231,6 +236,15 @@ export class ProxyManager { if (!ProxyManager.instance) { ProxyManager.instance = new ProxyManager(); await ProxyManager.instance.initializeInternal(); + + // Perform initial sync with proxy:active:* storage + try { + const { syncProxiesOnce } = await import('./proxy-sync'); + await syncProxiesOnce(); + logger.info('Initial proxy sync completed'); + } catch (error) { + logger.error('Failed to perform initial proxy sync', { error }); + } } } diff --git a/libs/utils/src/proxy/proxy-sync.ts b/libs/utils/src/proxy/proxy-sync.ts new file mode 100644 index 0000000..cb99e46 --- /dev/null +++ b/libs/utils/src/proxy/proxy-sync.ts @@ -0,0 +1,157 @@ +/** + * Proxy Storage Synchronization Service + * + * This service bridges the gap between two proxy storage systems: + * 1. proxy:active:* keys (used by proxy tasks for individual proxy storage) + * 2. proxies:active-proxies (used by ProxyManager for centralized storage) + */ + +import { createCache, type CacheProvider } from '@stock-bot/cache'; +import { getDatabaseConfig } from '@stock-bot/config'; +import { getLogger } from '@stock-bot/logger'; +import type { ProxyInfo } from '@stock-bot/http'; +import { ProxyManager } from './proxy-manager'; + +const logger = getLogger('proxy-sync'); + +export class ProxySyncService { + private cache: CacheProvider; + private syncInterval: Timer | null = null; + private isRunning = false; + + constructor() { + const databaseConfig = getDatabaseConfig(); + this.cache = createCache({ + redisConfig: databaseConfig.dragonfly, + keyPrefix: '', // No prefix to access all keys + ttl: 86400, + }); + } + + /** + * Start the synchronization service + * @param intervalMs - Sync interval in milliseconds (default: 5 minutes) + */ + async start(intervalMs: number = 300000): Promise { + if (this.isRunning) { + logger.warn('Proxy sync service is already running'); + return; + } + + this.isRunning = true; + logger.info('Starting proxy sync service', { intervalMs }); + + // Wait for cache to be ready before initial sync + await this.cache.waitForReady(10000); + + // Initial sync + await this.syncProxies(); + + // Set up periodic sync + this.syncInterval = setInterval(async () => { + try { + await this.syncProxies(); + } catch (error) { + logger.error('Error during periodic sync', { error }); + } + }, intervalMs); + } + + /** + * Stop the synchronization service + */ + stop(): void { + if (this.syncInterval) { + clearInterval(this.syncInterval); + this.syncInterval = null; + } + this.isRunning = false; + logger.info('Stopped proxy sync service'); + } + + /** + * Perform a one-time synchronization + */ + async syncProxies(): Promise { + try { + logger.debug('Starting proxy synchronization'); + + // Wait for cache to be ready + await this.cache.waitForReady(5000); + + // Collect all proxies from proxy:active:* storage + const proxyKeys = await this.cache.keys('proxy:active:*'); + + if (proxyKeys.length === 0) { + logger.debug('No proxies found in proxy:active:* storage'); + return; + } + + const allProxies: ProxyInfo[] = []; + + // Fetch all proxies in parallel for better performance + const proxyPromises = proxyKeys.map(key => this.cache.get(key)); + const proxyResults = await Promise.all(proxyPromises); + + for (const proxy of proxyResults) { + if (proxy) { + allProxies.push(proxy); + } + } + + const workingCount = allProxies.filter(p => p.isWorking).length; + + logger.info('Collected proxies from storage', { + total: allProxies.length, + working: workingCount, + }); + + // Update ProxyManager with all proxies + const manager = ProxyManager.getInstance(); + await manager.updateProxies(allProxies); + + logger.info('Proxy synchronization completed', { + synchronized: allProxies.length, + working: workingCount, + }); + } catch (error) { + logger.error('Failed to sync proxies', { error }); + throw error; + } + } + + /** + * Get synchronization status + */ + getStatus(): { isRunning: boolean; lastSync?: Date } { + return { + isRunning: this.isRunning, + }; + } +} + +// Export singleton instance +let syncServiceInstance: ProxySyncService | null = null; + +export function getProxySyncService(): ProxySyncService { + if (!syncServiceInstance) { + syncServiceInstance = new ProxySyncService(); + } + return syncServiceInstance; +} + +// Convenience functions +export async function startProxySync(intervalMs?: number): Promise { + const service = getProxySyncService(); + await service.start(intervalMs); +} + +export function stopProxySync(): void { + const service = getProxySyncService(); + service.stop(); +} + +export async function syncProxiesOnce(): Promise { + const service = getProxySyncService(); + await service.syncProxies(); +} \ No newline at end of file