diff --git a/apps/data-ingestion/AWILIX-MIGRATION.md b/apps/data-ingestion/AWILIX-MIGRATION.md new file mode 100644 index 0000000..e23d3d1 --- /dev/null +++ b/apps/data-ingestion/AWILIX-MIGRATION.md @@ -0,0 +1,85 @@ +# Awilix DI Container Migration Guide + +This guide explains how to use the new Awilix dependency injection container in the data-ingestion service. + +## Overview + +The Awilix container provides proper dependency injection for decoupled libraries, allowing them to be reused in other projects without stock-bot specific dependencies. + +## Current Implementation + +The data-ingestion service now uses a hybrid approach: +1. Awilix container for ProxyManager and other decoupled services +2. Legacy service factory for backward compatibility + +## Usage Example + +```typescript +// Create Awilix container +const awilixConfig = { + redis: { + host: config.database.dragonfly.host, + port: config.database.dragonfly.port, + db: config.database.dragonfly.db, + }, + mongodb: { + uri: config.database.mongodb.uri, + database: config.database.mongodb.database, + }, + postgres: { + host: config.database.postgres.host, + port: config.database.postgres.port, + database: config.database.postgres.database, + user: config.database.postgres.user, + password: config.database.postgres.password, + }, + proxy: { + cachePrefix: 'proxy:', + ttl: 3600, + }, +}; + +const container = createServiceContainer(awilixConfig); +await initializeServices(container); + +// Access services from container +const proxyManager = container.resolve('proxyManager'); +const cache = container.resolve('cache'); +``` + +## Handler Integration + +Handlers receive services through the enhanced service container: + +```typescript +// Create service adapter with proxy from Awilix +const serviceContainerWithProxy = createServiceAdapter(services); +Object.defineProperty(serviceContainerWithProxy, 'proxy', { + get: () => container.resolve('proxyManager'), + enumerable: true, + configurable: true +}); + +// Handlers can now access proxy service +class MyHandler extends BaseHandler { + async myOperation() { + const proxy = this.proxy.getRandomProxy(); + // Use proxy... + } +} +``` + +## Benefits + +1. **Decoupled Libraries**: Libraries no longer depend on @stock-bot/config +2. **Reusability**: Libraries can be used in other projects +3. **Testability**: Easy to mock dependencies for testing +4. **Type Safety**: Full TypeScript support with Awilix + +## Next Steps + +To fully migrate to Awilix: +1. Update HTTP library to accept dependencies via constructor +2. Update Queue library to accept Redis config via constructor +3. Create actual MongoDB, PostgreSQL, and QuestDB clients in the container +4. Remove legacy service factory once all services are migrated \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/index.ts b/apps/data-ingestion/src/handlers/index.ts index c65cf9b..c6c193c 100644 --- a/apps/data-ingestion/src/handlers/index.ts +++ b/apps/data-ingestion/src/handlers/index.ts @@ -3,8 +3,7 @@ * Automatically discovers and registers all handlers */ -import type { IDataIngestionServices } from '@stock-bot/di'; -import { createServiceAdapter } from '@stock-bot/di'; +import type { IServiceContainer } from '@stock-bot/handlers'; import { autoRegisterHandlers } from '@stock-bot/handlers'; import { getLogger } from '@stock-bot/logger'; import { join } from 'path'; @@ -19,10 +18,7 @@ const logger = getLogger('handler-init'); /** * Initialize and register all handlers automatically */ -export async function initializeAllHandlers(services: IDataIngestionServices): Promise { - // Create generic service container adapter - const serviceContainer = createServiceAdapter(services); - +export async function initializeAllHandlers(serviceContainer: IServiceContainer): Promise { try { // Auto-register all handlers in this directory const result = await autoRegisterHandlers( diff --git a/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts b/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts index 3516126..7dd94f5 100644 --- a/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts +++ b/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts @@ -6,7 +6,6 @@ import { type ExecutionContext, type IServiceContainer } from '@stock-bot/handlers'; -import { updateProxies } from '@stock-bot/utils'; @Handler('webshare') export class WebShareHandler extends BaseHandler { @@ -28,8 +27,8 @@ export class WebShareHandler extends BaseHandler { const proxies = await fetchWebShareProxies(); if (proxies.length > 0) { - // Update the centralized proxy manager - await updateProxies(proxies); + // Update the centralized proxy manager using the injected service + await this.proxy.updateProxies(proxies); this.logger.info('Updated proxy manager with WebShare proxies', { count: proxies.length, diff --git a/apps/data-ingestion/src/index.ts b/apps/data-ingestion/src/index.ts index 6265342..655e19c 100644 --- a/apps/data-ingestion/src/index.ts +++ b/apps/data-ingestion/src/index.ts @@ -10,14 +10,17 @@ import { cors } from 'hono/cors'; // Library imports import { + createServiceContainer, + initializeServices as initializeAwilixServices, + createServiceAdapter, createDataIngestionServices, disposeDataIngestionServices, - type IDataIngestionServices + type IDataIngestionServices, + type ServiceContainer } from '@stock-bot/di'; import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger'; import { Shutdown } from '@stock-bot/shutdown'; import { handlerRegistry } from '@stock-bot/types'; -import { ProxyManager } from '@stock-bot/utils'; // Local imports import { createRoutes } from './routes/create-routes'; @@ -43,6 +46,7 @@ const logger = getLogger('data-ingestion'); const PORT = serviceConfig.port; let server: ReturnType | null = null; let services: IDataIngestionServices | null = null; +let container: ServiceContainer | null = null; let app: Hono | null = null; // Initialize shutdown manager @@ -53,7 +57,36 @@ async function initializeServices() { logger.info('Initializing data-ingestion service with improved DI...'); try { - // Create all services using the service factory + // Create Awilix container with proper config structure + logger.debug('Creating Awilix DI container...'); + const awilixConfig = { + redis: { + host: config.database.dragonfly.host, + port: config.database.dragonfly.port, + db: config.database.dragonfly.db, + }, + mongodb: { + uri: config.database.mongodb.uri, + database: config.database.mongodb.database, + }, + postgres: { + host: config.database.postgres.host, + port: config.database.postgres.port, + database: config.database.postgres.database, + user: config.database.postgres.user, + password: config.database.postgres.password, + }, + proxy: { + cachePrefix: 'proxy:', + ttl: 3600, + }, + }; + + container = createServiceContainer(awilixConfig); + await initializeAwilixServices(container); + logger.info('Awilix container created and initialized'); + + // Create all services using the service factory (for backward compatibility) logger.debug('Creating services using service factory...'); services = await createDataIngestionServices(config); logger.info('All services created successfully'); @@ -76,16 +109,20 @@ async function initializeServices() { const routes = createRoutes(services); app.route('/', routes); - // Initialize proxy manager - logger.debug('Initializing proxy manager...'); - await ProxyManager.initialize(); - logger.info('Proxy manager initialized'); - - // Initialize handlers with new DI pattern - logger.debug('Initializing data handlers with new DI pattern...'); + // Initialize handlers with Awilix service container + logger.debug('Initializing data handlers with Awilix DI pattern...'); - // Auto-register all handlers - await initializeAllHandlers(services); + // Create service adapter that includes proxy from Awilix container + const serviceContainerWithProxy = createServiceAdapter(services); + // Override the proxy service with the one from Awilix + Object.defineProperty(serviceContainerWithProxy, 'proxy', { + get: () => container!.resolve('proxyManager'), + enumerable: true, + configurable: true + }); + + // Auto-register all handlers with the enhanced service container + await initializeAllHandlers(serviceContainerWithProxy); logger.info('Data handlers initialized with new DI pattern'); diff --git a/bun.lock b/bun.lock index 0ca1be6..e32daf0 100644 --- a/bun.lock +++ b/bun.lock @@ -7,6 +7,7 @@ "@primeng/themes": "^19.1.3", "@tanstack/table-core": "^8.21.3", "@types/pg": "^8.15.4", + "awilix": "^12.0.5", "bullmq": "^5.53.2", "ioredis": "^5.6.1", "pg": "^8.16.0", @@ -986,6 +987,8 @@ "available-typed-arrays": ["available-typed-arrays@1.0.7", "", { "dependencies": { "possible-typed-array-names": "^1.0.0" } }, "sha512-wvUjBtSGN7+7SjNpq/9M2Tg350UZD3q62IFZLbRAR1bSMlCo1ZaeW+BJ+D090e4hIIZLBcTDWe4Mh4jvUDajzQ=="], + "awilix": ["awilix@12.0.5", "", { "dependencies": { "camel-case": "^4.1.2", "fast-glob": "^3.3.3" } }, "sha512-Qf/V/hRo6DK0FoBKJ9QiObasRxHAhcNi0mV6kW2JMawxS3zq6Un+VsZmVAZDUfvB+MjTEiJ2tUJUl4cr0JiUAw=="], + "aws4": ["aws4@1.13.2", "", {}, "sha512-lHe62zvbTB5eEABUVi/AwVh0ZKY9rMMDhmm+eeyuuUQbQ3+J+fONVQOZyj+DdrvD4BY33uYniyRJ4UJIaSKAfw=="], "axios": ["axios@1.10.0", "", { "dependencies": { "follow-redirects": "^1.15.6", "form-data": "^4.0.0", "proxy-from-env": "^1.1.0" } }, "sha512-/1xYAC4MP/HEG+3duIhFr4ZQXR4sQXOIe+o6sdqzeykGLx6Upp/1p8MHqhINOvGeP7xyNHe7tsiJByc4SSVUxw=="], @@ -1060,6 +1063,8 @@ "callsites": ["callsites@3.1.0", "", {}, "sha512-P8BjAsXvZS+VIDUI11hHCQEv74YT67YUi5JJFNWIqL235sBmjX4+qx9Muvls5ivyNENctx46xQLQ3aTuE7ssaQ=="], + "camel-case": ["camel-case@4.1.2", "", { "dependencies": { "pascal-case": "^3.1.2", "tslib": "^2.0.3" } }, "sha512-gxGWBrTT1JuMx6R+o5PTXMmUnhnVzLQ9SNutD4YqKtI6ap897t3tKECYla6gCWEkplXnlNybEkZg9GEGxKFCgw=="], + "camelcase": ["camelcase@6.3.0", "", {}, "sha512-Gmy6FhYlCY7uOElZUSbxo2UCDH8owEk996gkbrpsgGtrJLM3J7jGxl9Ic7Qwwj4ivOE5AWZWRMecDdF7hqGjFA=="], "camelcase-css": ["camelcase-css@2.0.1", "", {}, "sha512-QOSvevhslijgYwRx6Rv7zKdMF8lbRmx+uQGx2+vDc+KI/eBnsy9kit5aj23AgGu3pa4t9AgwbnXWqS+iOY+2aA=="], @@ -1596,6 +1601,8 @@ "loose-envify": ["loose-envify@1.4.0", "", { "dependencies": { "js-tokens": "^3.0.0 || ^4.0.0" }, "bin": { "loose-envify": "cli.js" } }, "sha512-lyuxPGr/Wfhrlem2CL/UcnUc1zcqKAImBDzukY7Y5F/yQiNdko6+fRLevlw1HgMySw7f611UIY408EtxRSoK3Q=="], + "lower-case": ["lower-case@2.0.2", "", { "dependencies": { "tslib": "^2.0.3" } }, "sha512-7fm3l3NAF9WfN6W3JOmf5drwpVqX78JtoGJ3A6W0a6ZnldM41w2fV5D490psKFTpMds8TJse/eHLFFsNHHjHgg=="], + "lowercase-keys": ["lowercase-keys@3.0.0", "", {}, "sha512-ozCC6gdQ+glXOQsveKD0YsDy8DSQFjDTz4zyzEHNV5+JP5D62LmfDZ6o1cycFx9ouG940M5dE8C8CTewdj2YWQ=="], "lru-cache": ["lru-cache@11.1.0", "", {}, "sha512-QIXZUBJUx+2zHUdQujWejBkcD9+cs94tLn0+YL8UrCh+D5sCXZ4c7LaEH48pNwRY3MLDgqUFyhlCyjJPf1WP0A=="], @@ -1694,6 +1701,8 @@ "new-find-package-json": ["new-find-package-json@2.0.0", "", { "dependencies": { "debug": "^4.3.4" } }, "sha512-lDcBsjBSMlj3LXH2v/FW3txlh2pYTjmbOXPYJD93HI5EwuLzI11tdHSIpUMmfq/IOsldj4Ps8M8flhm+pCK4Ew=="], + "no-case": ["no-case@3.0.4", "", { "dependencies": { "lower-case": "^2.0.2", "tslib": "^2.0.3" } }, "sha512-fgAN3jGAh+RoxUGZHTSOLJIqUc2wmoBwGR4tbpNAKmmovFoWq0OdRkb0VkldReO2a2iBT/OEulG9XSUc10r3zg=="], + "node-abi": ["node-abi@3.75.0", "", { "dependencies": { "semver": "^7.3.5" } }, "sha512-OhYaY5sDsIka7H7AtijtI9jwGYLyl29eQn/W623DiN/MIv5sUqc4g7BIDThX+gb7di9f6xK02nkp8sdfFWZLTg=="], "node-abort-controller": ["node-abort-controller@3.1.1", "", {}, "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ=="], @@ -1782,6 +1791,8 @@ "parseurl": ["parseurl@1.3.3", "", {}, "sha512-CiyeOxFT/JZyN5m0z9PfXw4SCBJ6Sygz1Dpl0wqjlhDEGGBP1GnsUVEL0p63hoG1fcj3fHynXi9NYO4nWOL+qQ=="], + "pascal-case": ["pascal-case@3.1.2", "", { "dependencies": { "no-case": "^3.0.4", "tslib": "^2.0.3" } }, "sha512-uWlGT3YSnK9x3BQJaOdcZwrnV6hPpd8jFH1/ucpiLRPh/2zCVJKS19E4GvYHvaCcACn3foXZ0cLB9Wrx1KGe5g=="], + "path-exists": ["path-exists@4.0.0", "", {}, "sha512-ak9Qy5Q7jYb2Wwcey5Fpvg2KoAc/ZIhLSLOSBmRmygPsGwkVVt0fZa0qrtMz+m6tJTAHfZQ8FnmB4MG4LWy7/w=="], "path-is-absolute": ["path-is-absolute@1.0.1", "", {}, "sha512-AVbw3UJ2e9bq64vSaS9Am0fje1Pa8pbGqTTsmXfaIiMpnr5DlDhfJOuLj9Sf95ZPVDAUerDfEk88MPmPe7UCQg=="], diff --git a/libs/core/di/src/adapters/service-adapter.ts b/libs/core/di/src/adapters/service-adapter.ts index fe87f7f..69695af 100644 --- a/libs/core/di/src/adapters/service-adapter.ts +++ b/libs/core/di/src/adapters/service-adapter.ts @@ -5,7 +5,6 @@ import type { IServiceContainer } from '@stock-bot/handlers'; import type { IDataIngestionServices } from '../service-interfaces'; -import { ProxyManager } from '@stock-bot/proxy'; /** * Adapter that converts IDataIngestionServices to IServiceContainer @@ -23,9 +22,10 @@ export class DataIngestionServiceAdapter implements IServiceContainer { // HTTP client not in current data services - will be added when needed return null; } - get proxy() { - // Return singleton proxy manager instance - return ProxyManager.getInstance(); + get proxy(): any { + // Proxy manager should be injected via Awilix container + // This adapter is for legacy compatibility + throw new Error('ProxyManager must be provided through Awilix container'); } // Database clients diff --git a/libs/core/di/src/awilix-container.ts b/libs/core/di/src/awilix-container.ts new file mode 100644 index 0000000..b5251ec --- /dev/null +++ b/libs/core/di/src/awilix-container.ts @@ -0,0 +1,174 @@ +/** + * Awilix DI Container Setup + * Creates a decoupled, reusable dependency injection container + */ + +import { createContainer, asFunction, asValue, InjectionMode, type AwilixContainer } from 'awilix'; +import { createCache, type CacheProvider } from '@stock-bot/cache'; +import { ProxyManager } from '@stock-bot/proxy'; +import { getLogger } from '@stock-bot/logger'; +import type { IServiceContainer } from '@stock-bot/handlers'; + +// Configuration types +export interface AppConfig { + redis: { + host: string; + port: number; + password?: string; + username?: string; + db?: number; + }; + mongodb: { + uri: string; + database: string; + }; + postgres: { + host: string; + port: number; + database: string; + user: string; + password: string; + }; + questdb?: { + host: string; + port: number; + }; + proxy?: { + cachePrefix?: string; + ttl?: number; + }; +} + +/** + * Create and configure the DI container + */ +export function createServiceContainer(config: AppConfig): AwilixContainer { + const container = createContainer({ + injectionMode: InjectionMode.PROXY, + }); + + // Register configuration values + container.register({ + // Configuration + config: asValue(config), + redisConfig: asValue(config.redis), + mongoConfig: asValue(config.mongodb), + postgresConfig: asValue(config.postgres), + questdbConfig: asValue(config.questdb || { host: 'localhost', port: 9009 }), + + // Core services with dependency injection + logger: asFunction(() => getLogger('app')).singleton(), + + // Cache with injected config and logger + cache: asFunction(({ redisConfig, logger }) => + createCache({ + redisConfig, + logger, + keyPrefix: 'cache:', + ttl: 3600, + enableMetrics: true, + }) + ).singleton(), + + // Proxy manager with injected cache and logger + proxyManager: asFunction(({ cache, config, logger }) => { + const manager = new ProxyManager( + cache, + config.proxy || {}, + logger + ); + // Note: initialization happens in initializeServices function + return manager; + }).singleton(), + + // HTTP client can be added here when decoupled + httpClient: asFunction(() => { + // TODO: Import and create HTTP client when decoupled + return null; + }).singleton(), + + // Database clients - placeholders for now + mongoClient: asFunction(() => { + // TODO: Create MongoDB client + return null; + }).singleton(), + + postgresClient: asFunction(() => { + // TODO: Create PostgreSQL client + return null; + }).singleton(), + + questdbClient: asFunction(() => { + // TODO: Create QuestDB client + return null; + }).singleton(), + + // Queue manager - placeholder + queueManager: asFunction(() => { + // TODO: Create queue manager when decoupled + return null; + }).singleton(), + + // Build the IServiceContainer for handlers + serviceContainer: asFunction((cradle) => ({ + logger: cradle.logger, + cache: cradle.cache, + proxy: cradle.proxyManager, + http: cradle.httpClient, + mongodb: cradle.mongoClient, + postgres: cradle.postgresClient, + questdb: cradle.questdbClient, + queue: cradle.queueManager, + } as IServiceContainer)).singleton(), + }); + + return container; +} + +/** + * Initialize async services after container creation + */ +export async function initializeServices(container: AwilixContainer): Promise { + const logger = container.resolve('logger'); + + try { + // Wait for cache to be ready first + const cache = container.resolve('cache'); + if (cache && typeof cache.waitForReady === 'function') { + await cache.waitForReady(10000); + logger.info('Cache is ready'); + } + + // Initialize proxy manager + const proxyManager = container.resolve('proxyManager'); + if (proxyManager && typeof proxyManager.initialize === 'function') { + await proxyManager.initialize(); + logger.info('Proxy manager initialized'); + } + + // Initialize other async services as needed + // ... + + logger.info('All services initialized successfully'); + } catch (error) { + logger.error('Failed to initialize services', { error }); + throw error; + } +} + +// Type definitions for container resolution +export interface ServiceCradle { + config: AppConfig; + logger: any; + cache: CacheProvider; + proxyManager: ProxyManager; + httpClient: any; + mongoClient: any; + postgresClient: any; + questdbClient: any; + queueManager: any; + serviceContainer: IServiceContainer; +} + +// Export typed container +export type ServiceContainer = AwilixContainer; \ No newline at end of file diff --git a/libs/core/di/src/index.ts b/libs/core/di/src/index.ts index 24698b8..977d23a 100644 --- a/libs/core/di/src/index.ts +++ b/libs/core/di/src/index.ts @@ -6,4 +6,13 @@ export * from './pool-size-calculator'; export * from './types'; export * from './service-interfaces'; export * from './service-factory'; -export * from './adapters/service-adapter'; \ No newline at end of file +export * from './adapters/service-adapter'; + +// Awilix container exports +export { + createServiceContainer, + initializeServices, + type AppConfig, + type ServiceCradle, + type ServiceContainer +} from './awilix-container'; \ No newline at end of file diff --git a/libs/core/di/src/service-factory.ts b/libs/core/di/src/service-factory.ts index 0150539..c3ac16a 100644 --- a/libs/core/di/src/service-factory.ts +++ b/libs/core/di/src/service-factory.ts @@ -5,7 +5,6 @@ import { getLogger } from '@stock-bot/logger'; import { ConnectionFactory } from './connection-factory'; import { PoolSizeCalculator } from './pool-size-calculator'; -import { ProxyManager } from '@stock-bot/proxy'; import type { IDataIngestionServices, IServiceFactory, @@ -45,9 +44,7 @@ export class DataIngestionServiceFactory implements IServiceFactory { this.createQueueConnection(connectionFactory, config) ]); - // Initialize proxy manager - logger.info('Initializing proxy manager...'); - await ProxyManager.initialize(); + // Note: Proxy manager initialization moved to Awilix container const services: IDataIngestionServices = { mongodb: mongoPool.client, diff --git a/libs/data/cache/src/connection-manager.ts b/libs/data/cache/src/connection-manager.ts index d0d361e..6da09b3 100644 --- a/libs/data/cache/src/connection-manager.ts +++ b/libs/data/cache/src/connection-manager.ts @@ -1,5 +1,4 @@ import Redis from 'ioredis'; -import { getLogger } from '@stock-bot/logger'; import type { RedisConfig } from './types'; interface ConnectionConfig { @@ -7,6 +6,7 @@ interface ConnectionConfig { singleton?: boolean; db?: number; redisConfig: RedisConfig; + logger?: any; } /** @@ -16,7 +16,7 @@ export class RedisConnectionManager { private connections = new Map(); private static sharedConnections = new Map(); private static instance: RedisConnectionManager; - private logger = getLogger('redis-connection-manager'); + private logger: any = console; private static readyConnections = new Set(); // Singleton pattern for the manager itself @@ -33,7 +33,10 @@ export class RedisConnectionManager { * @returns Redis connection instance */ getConnection(config: ConnectionConfig): Redis { - const { name, singleton = false, db, redisConfig } = config; + const { name, singleton = false, db, redisConfig, logger } = config; + if (logger) { + this.logger = logger; + } if (singleton) { // Use shared connection across all instances diff --git a/libs/data/cache/src/redis-cache.ts b/libs/data/cache/src/redis-cache.ts index 7fcab9e..050c48d 100644 --- a/libs/data/cache/src/redis-cache.ts +++ b/libs/data/cache/src/redis-cache.ts @@ -1,5 +1,4 @@ import Redis from 'ioredis'; -import { getLogger } from '@stock-bot/logger'; import { RedisConnectionManager } from './connection-manager'; import { CacheOptions, CacheProvider, CacheStats } from './types'; @@ -8,7 +7,7 @@ import { CacheOptions, CacheProvider, CacheStats } from './types'; */ export class RedisCache implements CacheProvider { private redis: Redis; - private logger = getLogger('redis-cache'); + private logger: any; private defaultTTL: number; private keyPrefix: string; private enableMetrics: boolean; @@ -29,6 +28,7 @@ export class RedisCache implements CacheProvider { this.defaultTTL = options.ttl ?? 3600; // 1 hour default this.keyPrefix = options.keyPrefix ?? 'cache:'; this.enableMetrics = options.enableMetrics ?? true; + this.logger = options.logger || console; // Use provided logger or console as fallback // Get connection manager instance this.connectionManager = RedisConnectionManager.getInstance(); @@ -47,6 +47,7 @@ export class RedisCache implements CacheProvider { name: `${baseName}-SERVICE`, singleton: options.shared ?? true, // Default to shared connection for cache redisConfig: options.redisConfig, + logger: this.logger, }); // Only setup event handlers for non-shared connections to avoid memory leaks diff --git a/libs/data/cache/src/types.ts b/libs/data/cache/src/types.ts index cdaaca2..6678f94 100644 --- a/libs/data/cache/src/types.ts +++ b/libs/data/cache/src/types.ts @@ -85,6 +85,7 @@ export interface CacheOptions { name?: string; // Name for connection identification shared?: boolean; // Whether to use shared connection redisConfig: RedisConfig; + logger?: any; // Optional logger instance } export interface CacheStats { diff --git a/libs/services/proxy/src/index.ts b/libs/services/proxy/src/index.ts index f84e821..17ad05f 100644 --- a/libs/services/proxy/src/index.ts +++ b/libs/services/proxy/src/index.ts @@ -5,32 +5,15 @@ // Main classes export { ProxyManager } from './proxy-manager'; -export { ProxySyncService } from './proxy-sync'; // Types -export type { - ProxyInfo, - ProxyManagerConfig, - ProxySyncConfig, - ProxyStats +export type { + ProxyInfo, + ProxyManagerConfig, ProxyStats, ProxySyncConfig } from './types'; -// Convenience functions -export { - getProxy, - getRandomProxy, - getAllProxies, - getWorkingProxies, - updateProxies, - getProxyStats -} from './proxy-manager'; - -export { - getProxySyncService, - startProxySync, - stopProxySync, - syncProxiesOnce -} from './proxy-sync'; +// Note: Convenience functions removed as ProxyManager is no longer a singleton +// Create an instance and use its methods directly // Default export -export { ProxyManager as default } from './proxy-manager'; \ No newline at end of file +export { ProxyManager as default } from './proxy-manager'; diff --git a/libs/services/proxy/src/proxy-manager.ts b/libs/services/proxy/src/proxy-manager.ts index 94787de..fc5fd74 100644 --- a/libs/services/proxy/src/proxy-manager.ts +++ b/libs/services/proxy/src/proxy-manager.ts @@ -1,37 +1,21 @@ /** * Centralized Proxy Manager - Handles proxy storage, retrieval, and caching */ -import { createCache, type CacheProvider } from '@stock-bot/cache'; -import { getDatabaseConfig } from '@stock-bot/config'; -import { getLogger } from '@stock-bot/logger'; +import type { CacheProvider } from '@stock-bot/cache'; import type { ProxyInfo, ProxyManagerConfig, ProxyStats } from './types'; -const logger = getLogger('proxy-manager'); - export class ProxyManager { - private static instance: ProxyManager | null = null; private cache: CacheProvider; private proxies: ProxyInfo[] = []; private proxyIndex: number = 0; private lastUpdate: Date | null = null; private isInitialized = false; - private config: ProxyManagerConfig; + private logger: any; - private constructor(config: ProxyManagerConfig = {}) { - this.config = { - cachePrefix: 'proxies:', - ttl: 86400, // 24 hours - enableMetrics: true, - ...config - }; - - const databaseConfig = getDatabaseConfig(); - this.cache = createCache({ - redisConfig: databaseConfig.dragonfly, - keyPrefix: this.config.cachePrefix, - ttl: this.config.ttl, - enableMetrics: this.config.enableMetrics, - }); + constructor(cache: CacheProvider, _config: ProxyManagerConfig = {}, logger?: any) { + this.cache = cache; + this.logger = logger || console; + // Config can be used in the future for customization } /** @@ -43,27 +27,27 @@ export class ProxyManager { } try { - logger.info('Initializing proxy manager...'); + this.logger.info('Initializing proxy manager...'); // Wait for cache to be ready await this.cache.waitForReady(10000); // Wait up to 10 seconds - logger.debug('Cache is ready'); + this.logger.debug('Cache is ready'); await this.loadFromCache(); this.isInitialized = true; - logger.info('Proxy manager initialized', { + this.logger.info('Proxy manager initialized', { proxiesLoaded: this.proxies.length, lastUpdate: this.lastUpdate, }); } catch (error) { - logger.error('Failed to initialize proxy manager', { error }); + this.logger.error('Failed to initialize proxy manager', { error }); this.isInitialized = true; // Set to true anyway to avoid infinite retries } } getProxy(): string | null { if (this.proxies.length === 0) { - logger.warn('No proxies available in memory'); + this.logger.warn('No proxies available in memory'); return null; } @@ -97,7 +81,7 @@ export class ProxyManager { // Return null if no proxies available if (this.proxies.length === 0) { - logger.warn('No proxies available in memory'); + this.logger.warn('No proxies available in memory'); return null; } @@ -105,7 +89,7 @@ export class ProxyManager { const workingProxies = this.proxies.filter(proxy => proxy.isWorking !== false); if (workingProxies.length === 0) { - logger.warn('No working proxies available'); + this.logger.warn('No working proxies available'); return null; } @@ -122,11 +106,11 @@ export class ProxyManager { const selectedProxy = topProxies[Math.floor(Math.random() * topProxies.length)]; if (!selectedProxy) { - logger.warn('No proxy selected from available pool'); + this.logger.warn('No proxy selected from available pool'); return null; } - logger.debug('Selected proxy', { + this.logger.debug('Selected proxy', { host: selectedProxy.host, port: selectedProxy.port, successRate: selectedProxy.successRate, @@ -178,8 +162,13 @@ export class ProxyManager { * Update the proxy pool with new proxies */ async updateProxies(proxies: ProxyInfo[]): Promise { + // Ensure manager is initialized before updating + if (!this.isInitialized) { + await this.initializeInternal(); + } + try { - logger.info('Updating proxy pool', { newCount: proxies.length, existingCount: this.proxies.length }); + this.logger.info('Updating proxy pool', { newCount: proxies.length, existingCount: this.proxies.length }); this.proxies = proxies; this.lastUpdate = new Date(); @@ -189,13 +178,13 @@ export class ProxyManager { await this.cache.set('last-update', this.lastUpdate.toISOString()); const workingCount = proxies.filter(p => p.isWorking !== false).length; - logger.info('Proxy pool updated successfully', { + this.logger.info('Proxy pool updated successfully', { totalProxies: proxies.length, workingProxies: workingCount, lastUpdate: this.lastUpdate, }); } catch (error) { - logger.error('Failed to update proxy pool', { error }); + this.logger.error('Failed to update proxy pool', { error }); throw error; } } @@ -210,10 +199,10 @@ export class ProxyManager { if (existingIndex >= 0) { this.proxies[existingIndex] = { ...this.proxies[existingIndex], ...proxy }; - logger.debug('Updated existing proxy', { host: proxy.host, port: proxy.port }); + this.logger.debug('Updated existing proxy', { host: proxy.host, port: proxy.port }); } else { this.proxies.push(proxy); - logger.debug('Added new proxy', { host: proxy.host, port: proxy.port }); + this.logger.debug('Added new proxy', { host: proxy.host, port: proxy.port }); } // Update cache @@ -231,7 +220,7 @@ export class ProxyManager { if (this.proxies.length < initialLength) { await this.updateProxies(this.proxies); - logger.debug('Removed proxy', { host, port, protocol }); + this.logger.debug('Removed proxy', { host, port, protocol }); } } @@ -245,7 +234,7 @@ export class ProxyManager { await this.cache.del('active-proxies'); await this.cache.del('last-update'); - logger.info('Cleared all proxies'); + this.logger.info('Cleared all proxies'); } /** @@ -267,79 +256,29 @@ export class ProxyManager { this.proxies = cachedProxies; this.lastUpdate = lastUpdateStr ? new Date(lastUpdateStr) : null; - logger.debug('Loaded proxies from cache', { + this.logger.debug('Loaded proxies from cache', { count: this.proxies.length, lastUpdate: this.lastUpdate, }); } else { - logger.debug('No cached proxies found'); + this.logger.debug('No cached proxies found'); } } catch (error) { - logger.error('Failed to load proxies from cache', { error }); + this.logger.error('Failed to load proxies from cache', { error }); } } /** - * Initialize the singleton instance + * Initialize the proxy manager */ - static async initialize(config?: ProxyManagerConfig): Promise { - if (!ProxyManager.instance) { - ProxyManager.instance = new ProxyManager(config); - await ProxyManager.instance.initializeInternal(); - - // Perform initial sync with proxy:active:* storage - try { - const { syncProxiesOnce } = await import('./proxy-sync'); - await syncProxiesOnce(); - logger.info('Initial proxy sync completed'); - } catch (error) { - logger.error('Failed to perform initial proxy sync', { error }); - } - } - } - - /** - * Get the singleton instance (must be initialized first) - */ - static getInstance(): ProxyManager { - if (!ProxyManager.instance) { - throw new Error('ProxyManager not initialized. Call ProxyManager.initialize() first.'); - } - return ProxyManager.instance; - } - - /** - * Reset the singleton instance (for testing) - */ - static reset(): void { - ProxyManager.instance = null; + async initialize(): Promise { + await this.initializeInternal(); + + // Note: Initial proxy sync should be handled by the container or application + // that creates ProxyManager instance + this.logger.info('ProxyManager initialized - proxy sync should be handled externally'); } } // Export the class as default -export default ProxyManager; - -// Convenience functions for easier imports -export function getProxy(): string | null { - return ProxyManager.getInstance().getProxy(); -} - -export function getRandomProxy(): ProxyInfo | null { - return ProxyManager.getInstance().getRandomProxy(); -} - -export function getAllProxies(): ProxyInfo[] { - return ProxyManager.getInstance().getAllProxies(); -} - -export function getWorkingProxies(): ProxyInfo[] { - return ProxyManager.getInstance().getWorkingProxies(); -} - -export async function updateProxies(proxies: ProxyInfo[]): Promise { - return ProxyManager.getInstance().updateProxies(proxies); -} - -export function getProxyStats(): ProxyStats { - return ProxyManager.getInstance().getStats(); -} \ No newline at end of file +export default ProxyManager; \ No newline at end of file diff --git a/libs/services/proxy/src/proxy-sync.ts b/libs/services/proxy/src/proxy-sync.ts deleted file mode 100644 index 80d8ff4..0000000 --- a/libs/services/proxy/src/proxy-sync.ts +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Proxy Storage Synchronization Service - * - * This service bridges the gap between two proxy storage systems: - * 1. proxy:active:* keys (used by proxy tasks for individual proxy storage) - * 2. proxies:active-proxies (used by ProxyManager for centralized storage) - */ - -import { createCache, type CacheProvider } from '@stock-bot/cache'; -import { getDatabaseConfig } from '@stock-bot/config'; -import { getLogger } from '@stock-bot/logger'; -import type { ProxyInfo, ProxySyncConfig } from './types'; -import { ProxyManager } from './proxy-manager'; - -const logger = getLogger('proxy-sync'); - -export class ProxySyncService { - private cache: CacheProvider; - private syncInterval: Timer | null = null; - private isRunning = false; - private config: ProxySyncConfig; - - constructor(config: ProxySyncConfig = {}) { - this.config = { - intervalMs: 300000, // 5 minutes - enableAutoSync: true, - ...config - }; - - const databaseConfig = getDatabaseConfig(); - this.cache = createCache({ - redisConfig: databaseConfig.dragonfly, - keyPrefix: '', // No prefix to access all keys - ttl: 86400, - }); - } - - /** - * Start the synchronization service - * @param intervalMs - Sync interval in milliseconds (default: 5 minutes) - */ - async start(intervalMs?: number): Promise { - const interval = intervalMs || this.config.intervalMs!; - - if (this.isRunning) { - logger.warn('Proxy sync service is already running'); - return; - } - - this.isRunning = true; - logger.info('Starting proxy sync service', { intervalMs: interval }); - - // Wait for cache to be ready before initial sync - await this.cache.waitForReady(10000); - - // Initial sync - await this.syncProxies(); - - // Set up periodic sync if enabled - if (this.config.enableAutoSync) { - this.syncInterval = setInterval(async () => { - try { - await this.syncProxies(); - } catch (error) { - logger.error('Error during periodic sync', { error }); - } - }, interval); - } - } - - /** - * Stop the synchronization service - */ - stop(): void { - if (this.syncInterval) { - clearInterval(this.syncInterval); - this.syncInterval = null; - } - this.isRunning = false; - logger.info('Stopped proxy sync service'); - } - - /** - * Perform a one-time synchronization - */ - async syncProxies(): Promise { - try { - logger.debug('Starting proxy synchronization'); - - // Wait for cache to be ready - await this.cache.waitForReady(5000); - - // Collect all proxies from proxy:active:* storage - const proxyKeys = await this.cache.keys('proxy:active:*'); - - if (proxyKeys.length === 0) { - logger.debug('No proxies found in proxy:active:* storage'); - return; - } - - const allProxies: ProxyInfo[] = []; - - // Fetch all proxies in parallel for better performance - const proxyPromises = proxyKeys.map(key => this.cache.get(key)); - const proxyResults = await Promise.all(proxyPromises); - - for (const proxy of proxyResults) { - if (proxy) { - allProxies.push(proxy); - } - } - - const workingCount = allProxies.filter(p => p.isWorking).length; - - logger.info('Collected proxies from storage', { - total: allProxies.length, - working: workingCount, - }); - - // Update ProxyManager with all proxies - const manager = ProxyManager.getInstance(); - await manager.updateProxies(allProxies); - - logger.info('Proxy synchronization completed', { - synchronized: allProxies.length, - working: workingCount, - }); - } catch (error) { - logger.error('Failed to sync proxies', { error }); - throw error; - } - } - - /** - * Get synchronization status - */ - getStatus(): { isRunning: boolean; config: ProxySyncConfig } { - return { - isRunning: this.isRunning, - config: this.config - }; - } -} - -// Export singleton instance -let syncServiceInstance: ProxySyncService | null = null; - -export function getProxySyncService(config?: ProxySyncConfig): ProxySyncService { - if (!syncServiceInstance) { - syncServiceInstance = new ProxySyncService(config); - } - return syncServiceInstance; -} - -// Convenience functions -export async function startProxySync(intervalMs?: number, config?: ProxySyncConfig): Promise { - const service = getProxySyncService(config); - await service.start(intervalMs); -} - -export function stopProxySync(): void { - if (syncServiceInstance) { - syncServiceInstance.stop(); - } -} - -export async function syncProxiesOnce(): Promise { - const service = getProxySyncService(); - await service.syncProxies(); -} \ No newline at end of file diff --git a/libs/utils/src/index.ts b/libs/utils/src/index.ts index 430bafa..d6b297e 100644 --- a/libs/utils/src/index.ts +++ b/libs/utils/src/index.ts @@ -2,4 +2,3 @@ export * from './calculations/index'; export * from './common'; export * from './dateUtils'; export * from './generic-functions'; -export * from './proxy'; diff --git a/libs/utils/src/proxy/index.ts b/libs/utils/src/proxy/index.ts deleted file mode 100644 index fe21f79..0000000 --- a/libs/utils/src/proxy/index.ts +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Proxy management utilities - */ -export { - default as ProxyManager, - getProxy, - getRandomProxy, - getAllProxies, - getWorkingProxies, - updateProxies -} from './proxy-manager'; - -export { - ProxySyncService, - getProxySyncService, - startProxySync, - stopProxySync, - syncProxiesOnce -} from './proxy-sync'; - -export type { ProxyInfo } from '@stock-bot/http'; // Re-export for convenience \ No newline at end of file diff --git a/libs/utils/src/proxy/proxy-manager.ts b/libs/utils/src/proxy/proxy-manager.ts deleted file mode 100644 index f7000d2..0000000 --- a/libs/utils/src/proxy/proxy-manager.ts +++ /dev/null @@ -1,291 +0,0 @@ -/** - * Centralized Proxy Manager - Handles proxy storage, retrieval, and caching - */ -import { createCache, type CacheProvider } from '@stock-bot/cache'; -import { getDatabaseConfig } from '@stock-bot/config'; -import { getLogger } from '@stock-bot/logger'; -import type { ProxyInfo } from '@stock-bot/http'; - -const logger = getLogger('proxy-manager'); - -export class ProxyManager { - private static instance: ProxyManager | null = null; - private cache: CacheProvider; - private proxies: ProxyInfo[] = []; - private lastUpdate: Date | null = null; - private isInitialized = false; - - private constructor() { - const databaseConfig = getDatabaseConfig(); - this.cache = createCache({ - redisConfig: databaseConfig.dragonfly, - keyPrefix: 'proxies:', - ttl: 86400, // 24 hours - enableMetrics: true, - }); - } - - /** - * Internal initialization - loads existing proxies from cache - */ - private async initializeInternal(): Promise { - if (this.isInitialized) { - return; - } - - try { - logger.info('Initializing proxy manager...'); - - // Wait for cache to be ready - await this.cache.waitForReady(10000); // Wait up to 10 seconds - logger.debug('Cache is ready'); - - await this.loadFromCache(); - this.isInitialized = true; - logger.info('Proxy manager initialized', { - proxiesLoaded: this.proxies.length, - lastUpdate: this.lastUpdate, - }); - } catch (error) { - logger.error('Failed to initialize proxy manager', { error }); - this.isInitialized = true; // Set to true anyway to avoid infinite retries - } - } - - /** - * Get a random working proxy from the available pool (synchronous) - */ - getRandomProxy(): ProxyInfo | null { - // Ensure initialized - if (!this.isInitialized) { - throw new Error('ProxyManager not initialized'); - } - - // Return null if no proxies available - if (this.proxies.length === 0) { - logger.warn('No proxies available in memory'); - return null; - } - - // Filter for working proxies (not explicitly marked as non-working) - const workingProxies = this.proxies.filter(proxy => proxy.isWorking !== false); - - if (workingProxies.length === 0) { - logger.warn('No working proxies available'); - return null; - } - - // Return random proxy with preference for recently successful ones - const sortedProxies = workingProxies.sort((a, b) => { - // Prefer proxies with better success rates - const aRate = a.successRate || 0; - const bRate = b.successRate || 0; - return bRate - aRate; - }); - - // Take from top 50% of best performing proxies - const topProxies = sortedProxies.slice(0, Math.max(1, Math.floor(sortedProxies.length * 0.5))); - const selectedProxy = topProxies[Math.floor(Math.random() * topProxies.length)]; - - if (!selectedProxy) { - logger.warn('No proxy selected from available pool'); - return null; - } - - logger.debug('Selected proxy', { - host: selectedProxy.host, - port: selectedProxy.port, - successRate: selectedProxy.successRate, - totalAvailable: workingProxies.length, - }); - - return selectedProxy; - } - - /** - * Get all working proxies (synchronous) - */ - getWorkingProxies(): ProxyInfo[] { - if (!this.isInitialized) { - throw new Error('ProxyManager not initialized'); - } - - return this.proxies.filter(proxy => proxy.isWorking !== false); - } - - /** - * Get all proxies (working and non-working) - */ - getAllProxies(): ProxyInfo[] { - if (!this.isInitialized) { - throw new Error('ProxyManager not initialized'); - } - - return [...this.proxies]; - } - - /** - * Update the proxy pool with new proxies - */ - async updateProxies(proxies: ProxyInfo[]): Promise { - try { - logger.info('Updating proxy pool', { newCount: proxies.length, existingCount: this.proxies.length }); - - this.proxies = proxies; - this.lastUpdate = new Date(); - - // Store to cache - await this.cache.set('active-proxies', proxies); - await this.cache.set('last-update', this.lastUpdate.toISOString()); - - const workingCount = proxies.filter(p => p.isWorking !== false).length; - logger.info('Proxy pool updated successfully', { - totalProxies: proxies.length, - workingProxies: workingCount, - lastUpdate: this.lastUpdate, - }); - } catch (error) { - logger.error('Failed to update proxy pool', { error }); - throw error; - } - } - - /** - * Add or update a single proxy in the pool - */ - async updateProxy(proxy: ProxyInfo): Promise { - const existingIndex = this.proxies.findIndex( - p => p.host === proxy.host && p.port === proxy.port && p.protocol === proxy.protocol - ); - - if (existingIndex >= 0) { - this.proxies[existingIndex] = { ...this.proxies[existingIndex], ...proxy }; - logger.debug('Updated existing proxy', { host: proxy.host, port: proxy.port }); - } else { - this.proxies.push(proxy); - logger.debug('Added new proxy', { host: proxy.host, port: proxy.port }); - } - - // Update cache - await this.updateProxies(this.proxies); - } - - /** - * Remove a proxy from the pool - */ - async removeProxy(host: string, port: number, protocol: string): Promise { - const initialLength = this.proxies.length; - this.proxies = this.proxies.filter( - p => !(p.host === host && p.port === port && p.protocol === protocol) - ); - - if (this.proxies.length < initialLength) { - await this.updateProxies(this.proxies); - logger.debug('Removed proxy', { host, port, protocol }); - } - } - - /** - * Clear all proxies from memory and cache - */ - async clearProxies(): Promise { - this.proxies = []; - this.lastUpdate = null; - - await this.cache.del('active-proxies'); - await this.cache.del('last-update'); - - logger.info('Cleared all proxies'); - } - - /** - * Check if proxy manager is ready - */ - isReady(): boolean { - return this.isInitialized; - } - - /** - * Load proxies from cache storage - */ - private async loadFromCache(): Promise { - try { - const cachedProxies = await this.cache.get('active-proxies'); - const lastUpdateStr = await this.cache.get('last-update'); - - if (cachedProxies && Array.isArray(cachedProxies)) { - this.proxies = cachedProxies; - this.lastUpdate = lastUpdateStr ? new Date(lastUpdateStr) : null; - - logger.debug('Loaded proxies from cache', { - count: this.proxies.length, - lastUpdate: this.lastUpdate, - }); - } else { - logger.debug('No cached proxies found'); - } - } catch (error) { - logger.error('Failed to load proxies from cache', { error }); - } - } - - /** - * Initialize the singleton instance - */ - static async initialize(): Promise { - if (!ProxyManager.instance) { - ProxyManager.instance = new ProxyManager(); - await ProxyManager.instance.initializeInternal(); - - // Perform initial sync with proxy:active:* storage - try { - const { syncProxiesOnce } = await import('./proxy-sync'); - await syncProxiesOnce(); - logger.info('Initial proxy sync completed'); - } catch (error) { - logger.error('Failed to perform initial proxy sync', { error }); - } - } - } - - /** - * Get the singleton instance (must be initialized first) - */ - static getInstance(): ProxyManager { - if (!ProxyManager.instance) { - throw new Error('ProxyManager not initialized. Call ProxyManager.initialize() first.'); - } - return ProxyManager.instance; - } - - /** - * Reset the singleton instance (for testing) - */ - static reset(): void { - ProxyManager.instance = null; - } -} - -// Export the class as default -export default ProxyManager; - -// Convenience functions for easier imports -export function getProxy(): ProxyInfo | null { - return ProxyManager.getInstance().getRandomProxy(); -} - -export function getRandomProxy(): ProxyInfo | null { - return ProxyManager.getInstance().getRandomProxy(); -} - -export function getAllProxies(): ProxyInfo[] { - return ProxyManager.getInstance().getAllProxies(); -} - -export function getWorkingProxies(): ProxyInfo[] { - return ProxyManager.getInstance().getWorkingProxies(); -} - -export async function updateProxies(proxies: ProxyInfo[]): Promise { - return ProxyManager.getInstance().updateProxies(proxies); -} \ No newline at end of file diff --git a/libs/utils/src/proxy/proxy-sync.ts b/libs/utils/src/proxy/proxy-sync.ts deleted file mode 100644 index cb99e46..0000000 --- a/libs/utils/src/proxy/proxy-sync.ts +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Proxy Storage Synchronization Service - * - * This service bridges the gap between two proxy storage systems: - * 1. proxy:active:* keys (used by proxy tasks for individual proxy storage) - * 2. proxies:active-proxies (used by ProxyManager for centralized storage) - */ - -import { createCache, type CacheProvider } from '@stock-bot/cache'; -import { getDatabaseConfig } from '@stock-bot/config'; -import { getLogger } from '@stock-bot/logger'; -import type { ProxyInfo } from '@stock-bot/http'; -import { ProxyManager } from './proxy-manager'; - -const logger = getLogger('proxy-sync'); - -export class ProxySyncService { - private cache: CacheProvider; - private syncInterval: Timer | null = null; - private isRunning = false; - - constructor() { - const databaseConfig = getDatabaseConfig(); - this.cache = createCache({ - redisConfig: databaseConfig.dragonfly, - keyPrefix: '', // No prefix to access all keys - ttl: 86400, - }); - } - - /** - * Start the synchronization service - * @param intervalMs - Sync interval in milliseconds (default: 5 minutes) - */ - async start(intervalMs: number = 300000): Promise { - if (this.isRunning) { - logger.warn('Proxy sync service is already running'); - return; - } - - this.isRunning = true; - logger.info('Starting proxy sync service', { intervalMs }); - - // Wait for cache to be ready before initial sync - await this.cache.waitForReady(10000); - - // Initial sync - await this.syncProxies(); - - // Set up periodic sync - this.syncInterval = setInterval(async () => { - try { - await this.syncProxies(); - } catch (error) { - logger.error('Error during periodic sync', { error }); - } - }, intervalMs); - } - - /** - * Stop the synchronization service - */ - stop(): void { - if (this.syncInterval) { - clearInterval(this.syncInterval); - this.syncInterval = null; - } - this.isRunning = false; - logger.info('Stopped proxy sync service'); - } - - /** - * Perform a one-time synchronization - */ - async syncProxies(): Promise { - try { - logger.debug('Starting proxy synchronization'); - - // Wait for cache to be ready - await this.cache.waitForReady(5000); - - // Collect all proxies from proxy:active:* storage - const proxyKeys = await this.cache.keys('proxy:active:*'); - - if (proxyKeys.length === 0) { - logger.debug('No proxies found in proxy:active:* storage'); - return; - } - - const allProxies: ProxyInfo[] = []; - - // Fetch all proxies in parallel for better performance - const proxyPromises = proxyKeys.map(key => this.cache.get(key)); - const proxyResults = await Promise.all(proxyPromises); - - for (const proxy of proxyResults) { - if (proxy) { - allProxies.push(proxy); - } - } - - const workingCount = allProxies.filter(p => p.isWorking).length; - - logger.info('Collected proxies from storage', { - total: allProxies.length, - working: workingCount, - }); - - // Update ProxyManager with all proxies - const manager = ProxyManager.getInstance(); - await manager.updateProxies(allProxies); - - logger.info('Proxy synchronization completed', { - synchronized: allProxies.length, - working: workingCount, - }); - } catch (error) { - logger.error('Failed to sync proxies', { error }); - throw error; - } - } - - /** - * Get synchronization status - */ - getStatus(): { isRunning: boolean; lastSync?: Date } { - return { - isRunning: this.isRunning, - }; - } -} - -// Export singleton instance -let syncServiceInstance: ProxySyncService | null = null; - -export function getProxySyncService(): ProxySyncService { - if (!syncServiceInstance) { - syncServiceInstance = new ProxySyncService(); - } - return syncServiceInstance; -} - -// Convenience functions -export async function startProxySync(intervalMs?: number): Promise { - const service = getProxySyncService(); - await service.start(intervalMs); -} - -export function stopProxySync(): void { - const service = getProxySyncService(); - service.stop(); -} - -export async function syncProxiesOnce(): Promise { - const service = getProxySyncService(); - await service.syncProxies(); -} \ No newline at end of file diff --git a/package.json b/package.json index c535e5e..d647e05 100644 --- a/package.json +++ b/package.json @@ -57,7 +57,7 @@ "workspaces": [ "libs/*", "libs/core/*", - "libs/data/*", + "libs/data/*", "libs/services/*", "apps/*" ], @@ -96,6 +96,7 @@ "@primeng/themes": "^19.1.3", "@tanstack/table-core": "^8.21.3", "@types/pg": "^8.15.4", + "awilix": "^12.0.5", "bullmq": "^5.53.2", "ioredis": "^5.6.1", "pg": "^8.16.0",