diff --git a/.serena/cache/typescript/document_symbols_cache_v20-05-25.pkl b/.serena/cache/typescript/document_symbols_cache_v20-05-25.pkl index 0f0affb..b5aa38e 100644 Binary files a/.serena/cache/typescript/document_symbols_cache_v20-05-25.pkl and b/.serena/cache/typescript/document_symbols_cache_v20-05-25.pkl differ diff --git a/apps/stock/data-ingestion/src/handlers/webshare/operations/fetch.operations.ts b/apps/stock/data-ingestion/src/handlers/webshare/operations/fetch.operations.ts deleted file mode 100644 index 6c14688..0000000 --- a/apps/stock/data-ingestion/src/handlers/webshare/operations/fetch.operations.ts +++ /dev/null @@ -1,102 +0,0 @@ -/** - * WebShare Fetch Operations - API integration - */ -import { OperationContext } from '@stock-bot/di'; -import type { ProxyInfo } from '@stock-bot/proxy'; -import { WEBSHARE_CONFIG } from '../shared/config'; - -/** - * Fetch proxies from WebShare API and convert to ProxyInfo format - */ -export async function fetchWebShareProxies(): Promise { - const ctx = OperationContext.create('webshare', 'fetch-proxies'); - - try { - // Get configuration from stock config system - ensure it's initialized - const { getStockConfig, initializeStockConfig } = await import('@stock-bot/stock-config'); - - // Try to get existing config, or initialize if needed - let config; - try { - config = getStockConfig(); - } catch (error) { - // Config not initialized yet, initialize it - config = initializeStockConfig('dataIngestion'); - } - - const apiKey = config.webshare?.apiKey; - const apiUrl = config.webshare?.apiUrl; - - ctx.logger.debug('WebShare config loaded', { - hasConfig: !!config, - hasWebshare: !!config.webshare, - webshareConfig: config.webshare, - apiKeyLength: apiKey?.length || 0, - apiUrl: apiUrl, - envApiKey: process.env.WEBSHARE_API_KEY ? 'SET' : 'NOT_SET', - }); - - if (!apiKey || !apiUrl) { - ctx.logger.error('Missing WebShare configuration', { - hasApiKey: !!apiKey, - hasApiUrl: !!apiUrl, - apiKeyValue: apiKey ? `${apiKey.substring(0, 5)}...` : 'NOT_SET', - }); - return []; - } - - ctx.logger.info('Fetching proxies from WebShare API', { apiUrl }); - - const response = await fetch( - `${apiUrl}proxy/list/?mode=${WEBSHARE_CONFIG.DEFAULT_MODE}&page=${WEBSHARE_CONFIG.DEFAULT_PAGE}&page_size=${WEBSHARE_CONFIG.DEFAULT_PAGE_SIZE}`, - { - method: 'GET', - headers: { - Authorization: `Token ${apiKey}`, - 'Content-Type': 'application/json', - }, - signal: AbortSignal.timeout(WEBSHARE_CONFIG.TIMEOUT), - } - ); - - if (!response.ok) { - ctx.logger.error('WebShare API request failed', { - status: response.status, - statusText: response.statusText, - }); - return []; - } - - const data = await response.json(); - - if (!data.results || !Array.isArray(data.results)) { - ctx.logger.error('Invalid response format from WebShare API', { data }); - return []; - } - - // Transform proxy data to ProxyInfo format - const proxies: ProxyInfo[] = data.results.map( - (proxy: { username: string; password: string; proxy_address: string; port: number }) => ({ - source: 'webshare', - protocol: 'http' as const, - host: proxy.proxy_address, - port: proxy.port, - username: proxy.username, - password: proxy.password, - isWorking: true, // WebShare provides working proxies - firstSeen: new Date(), - lastChecked: new Date(), - }) - ); - - ctx.logger.info('Successfully fetched proxies from WebShare', { - count: proxies.length, - total: data.count || proxies.length, - }); - - return proxies; - } catch (error) { - ctx.logger.error('Failed to fetch proxies from WebShare', { error }); - return []; - } -} diff --git a/apps/stock/data-ingestion/src/handlers/webshare/webshare.handler.ts b/apps/stock/data-ingestion/src/handlers/webshare/webshare.handler.ts index decd28d..19cf5c0 100644 --- a/apps/stock/data-ingestion/src/handlers/webshare/webshare.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/webshare/webshare.handler.ts @@ -14,59 +14,52 @@ export class WebShareHandler extends BaseHandler { } @Operation('fetch-proxies') - @QueueSchedule('0 */6 * * *', { // once a month + @QueueSchedule('0 */6 * * *', { // every 6 hours priority: 3, - immediately: true, - description: 'Fetch fresh proxies from WebShare API', + immediately: false, // Don't run immediately since ProxyManager fetches on startup + description: 'Refresh proxies from WebShare API', }) async fetchProxies(_input: unknown, _context: ExecutionContext): Promise { - this.logger.info('Fetching proxies from WebShare API'); + this.logger.info('Refreshing proxies from WebShare API'); try { - const { fetchWebShareProxies } = await import('./operations/fetch.operations'); - const proxies = await fetchWebShareProxies(); - - if (proxies.length > 0) { - // Update the centralized proxy manager using the injected service - if (!this.proxy) { - this.logger.warn('Proxy manager is not initialized, cannot update proxies'); - return { - success: false, - proxiesUpdated: 0, - error: 'Proxy manager not initialized', - }; - } - await this.proxy.updateProxies(proxies); - - this.logger.info('Updated proxy manager with WebShare proxies', { - count: proxies.length, - workingCount: proxies.filter(p => p.isWorking !== false).length, - }); - - // Cache proxy stats for monitoring using handler's cache methods - await this.cacheSet('proxy-count', proxies.length, 3600); - await this.cacheSet( - 'working-count', - proxies.filter(p => p.isWorking !== false).length, - 3600 - ); - await this.cacheSet('last-fetch', new Date().toISOString(), 1800); - - return { - success: true, - proxiesUpdated: proxies.length, - workingProxies: proxies.filter(p => p.isWorking !== false).length, - }; - } else { - this.logger.warn('No proxies fetched from WebShare API'); + // Check if proxy manager is available + if (!this.proxy) { + this.logger.warn('Proxy manager is not initialized, cannot refresh proxies'); return { success: false, - proxiesUpdated: 0, - error: 'No proxies returned from API', + error: 'Proxy manager not initialized', }; } + + // Use the proxy manager's refresh method + await this.proxy.refreshProxies(); + + // Get stats after refresh + const stats = this.proxy.getStats(); + const lastFetchTime = this.proxy.getLastFetchTime(); + + this.logger.info('Successfully refreshed proxies', { + total: stats.total, + working: stats.working, + failed: stats.failed, + lastFetchTime, + }); + + // Cache proxy stats for monitoring using handler's cache methods + await this.cacheSet('proxy-count', stats.total, 3600); + await this.cacheSet('working-count', stats.working, 3600); + await this.cacheSet('last-fetch', lastFetchTime?.toISOString() || 'unknown', 1800); + + return { + success: true, + proxiesUpdated: stats.total, + workingProxies: stats.working, + failedProxies: stats.failed, + lastFetchTime, + }; } catch (error) { - this.logger.error('Failed to fetch and update proxies', { error }); + this.logger.error('Failed to refresh proxies', { error }); throw error; } } diff --git a/libs/core/di/src/config/schemas/index.ts b/libs/core/di/src/config/schemas/index.ts index bfd8de6..a0559da 100644 --- a/libs/core/di/src/config/schemas/index.ts +++ b/libs/core/di/src/config/schemas/index.ts @@ -1,25 +1,28 @@ -import { z } from 'zod'; -import { redisConfigSchema } from './redis.schema'; -import { mongodbConfigSchema } from './mongodb.schema'; -import { postgresConfigSchema } from './postgres.schema'; -import { questdbConfigSchema } from './questdb.schema'; -import { proxyConfigSchema, browserConfigSchema, queueConfigSchema } from './service.schema'; - -export const appConfigSchema = z.object({ - redis: redisConfigSchema, - mongodb: mongodbConfigSchema, - postgres: postgresConfigSchema, - questdb: questdbConfigSchema.optional(), - proxy: proxyConfigSchema.optional(), - browser: browserConfigSchema.optional(), - queue: queueConfigSchema.optional(), -}); - -export type AppConfig = z.infer; - -// Re-export individual schemas and types -export * from './redis.schema'; -export * from './mongodb.schema'; -export * from './postgres.schema'; -export * from './questdb.schema'; +import { z } from 'zod'; +import { redisConfigSchema } from './redis.schema'; +import { mongodbConfigSchema } from './mongodb.schema'; +import { postgresConfigSchema } from './postgres.schema'; +import { questdbConfigSchema } from './questdb.schema'; +import { proxyConfigSchema, browserConfigSchema, queueConfigSchema } from './service.schema'; + +export const appConfigSchema = z.object({ + redis: redisConfigSchema, + mongodb: mongodbConfigSchema, + postgres: postgresConfigSchema, + questdb: questdbConfigSchema.optional(), + proxy: proxyConfigSchema.optional(), + browser: browserConfigSchema.optional(), + queue: queueConfigSchema.optional(), + service: z.object({ + name: z.string(), + }).optional(), +}); + +export type AppConfig = z.infer; + +// Re-export individual schemas and types +export * from './redis.schema'; +export * from './mongodb.schema'; +export * from './postgres.schema'; +export * from './questdb.schema'; export * from './service.schema'; \ No newline at end of file diff --git a/libs/core/di/src/config/schemas/service.schema.ts b/libs/core/di/src/config/schemas/service.schema.ts index 40f955f..47420af 100644 --- a/libs/core/di/src/config/schemas/service.schema.ts +++ b/libs/core/di/src/config/schemas/service.schema.ts @@ -1,33 +1,38 @@ -import { z } from 'zod'; - -export const proxyConfigSchema = z.object({ - cachePrefix: z.string().optional().default('proxy:'), - ttl: z.number().optional().default(3600), -}); - -export const browserConfigSchema = z.object({ - headless: z.boolean().optional().default(true), - timeout: z.number().optional().default(30000), -}); - -export const queueConfigSchema = z.object({ - enabled: z.boolean().optional().default(true), - workers: z.number().optional().default(1), - concurrency: z.number().optional().default(1), - enableScheduledJobs: z.boolean().optional().default(true), - delayWorkerStart: z.boolean().optional().default(false), - defaultJobOptions: z.object({ - attempts: z.number().default(3), - backoff: z.object({ - type: z.enum(['exponential', 'fixed']).default('exponential'), - delay: z.number().default(1000), - }).default({}), - removeOnComplete: z.number().default(100), - removeOnFail: z.number().default(50), - timeout: z.number().optional(), - }).optional().default({}), -}); - -export type ProxyConfig = z.infer; -export type BrowserConfig = z.infer; +import { z } from 'zod'; + +export const proxyConfigSchema = z.object({ + enabled: z.boolean().default(false), + cachePrefix: z.string().optional().default('proxy:'), + ttl: z.number().optional().default(3600), + webshare: z.object({ + apiKey: z.string(), + apiUrl: z.string().default('https://proxy.webshare.io/api/v2/'), + }).optional(), +}); + +export const browserConfigSchema = z.object({ + headless: z.boolean().optional().default(true), + timeout: z.number().optional().default(30000), +}); + +export const queueConfigSchema = z.object({ + enabled: z.boolean().optional().default(true), + workers: z.number().optional().default(1), + concurrency: z.number().optional().default(1), + enableScheduledJobs: z.boolean().optional().default(true), + delayWorkerStart: z.boolean().optional().default(false), + defaultJobOptions: z.object({ + attempts: z.number().default(3), + backoff: z.object({ + type: z.enum(['exponential', 'fixed']).default('exponential'), + delay: z.number().default(1000), + }).default({}), + removeOnComplete: z.number().default(100), + removeOnFail: z.number().default(50), + timeout: z.number().optional(), + }).optional().default({}), +}); + +export type ProxyConfig = z.infer; +export type BrowserConfig = z.infer; export type QueueConfig = z.infer; \ No newline at end of file diff --git a/libs/core/di/src/container/builder.ts b/libs/core/di/src/container/builder.ts index 42cd419..5c6138c 100644 --- a/libs/core/di/src/container/builder.ts +++ b/libs/core/di/src/container/builder.ts @@ -1,4 +1,4 @@ -import { createContainer, InjectionMode, type AwilixContainer } from 'awilix'; +import { createContainer, InjectionMode, asFunction, type AwilixContainer } from 'awilix'; import type { AppConfig as StockBotAppConfig } from '@stock-bot/config'; import { appConfigSchema, type AppConfig } from '../config/schemas'; import { @@ -100,7 +100,7 @@ export class ServiceContainerBuilder { influxPort: 9009, database: 'questdb', }) : undefined, - proxy: this.options.enableProxy ? (config.proxy || { cachePrefix: 'proxy:', ttl: 3600 }) : undefined, + proxy: this.options.enableProxy ? (config.proxy || { enabled: false, cachePrefix: 'proxy:', ttl: 3600 }) : undefined, browser: this.options.enableBrowser ? (config.browser || { headless: true, timeout: 30000 }) : undefined, queue: this.options.enableQueue ? (config.queue || { enabled: true, @@ -127,11 +127,12 @@ export class ServiceContainerBuilder { // Register service container aggregate container.register({ serviceContainer: asFunction(({ - config, logger, cache, proxyManager, browser, + config, logger, cache, globalCache, proxyManager, browser, queueManager, mongoClient, postgresClient, questdbClient }) => ({ logger, cache, + globalCache, proxy: proxyManager, // Map proxyManager to proxy browser, queue: queueManager, // Map queueManager to queue @@ -181,10 +182,14 @@ export class ServiceContainerBuilder { } : undefined, queue: stockBotConfig.queue, browser: stockBotConfig.browser, - proxy: stockBotConfig.proxy, + proxy: stockBotConfig.proxy ? { + ...{ + enabled: false, + cachePrefix: 'proxy:', + ttl: 3600, + }, + ...stockBotConfig.proxy + } : undefined, }; } -} - -// Add missing import -import { asFunction } from 'awilix'; \ No newline at end of file +} \ No newline at end of file diff --git a/libs/core/di/src/container/types.ts b/libs/core/di/src/container/types.ts index afe0593..8da34b0 100644 --- a/libs/core/di/src/container/types.ts +++ b/libs/core/di/src/container/types.ts @@ -1,13 +1,13 @@ +import type { Browser } from '@stock-bot/browser'; +import type { CacheProvider } from '@stock-bot/cache'; import type { IServiceContainer } from '@stock-bot/handlers'; import type { Logger } from '@stock-bot/logger'; -import type { AppConfig } from '../config/schemas'; -import type { CacheProvider } from '@stock-bot/cache'; -import type { ProxyManager } from '@stock-bot/proxy'; -import type { Browser } from '@stock-bot/browser'; -import type { QueueManager } from '@stock-bot/queue'; import type { MongoDBClient } from '@stock-bot/mongodb'; import type { PostgreSQLClient } from '@stock-bot/postgres'; +import type { ProxyManager } from '@stock-bot/proxy'; import type { QuestDBClient } from '@stock-bot/questdb'; +import type { SmartQueueManager } from '@stock-bot/queue'; +import type { AppConfig } from '../config/schemas'; export interface ServiceDefinitions { // Configuration @@ -16,9 +16,10 @@ export interface ServiceDefinitions { // Core services cache: CacheProvider | null; + globalCache: CacheProvider | null; proxyManager: ProxyManager | null; browser: Browser; - queueManager: QueueManager | null; + queueManager: SmartQueueManager | null; // Database clients mongoClient: MongoDBClient | null; diff --git a/libs/core/di/src/registrations/cache.registration.ts b/libs/core/di/src/registrations/cache.registration.ts index 1dddc9e..84993ee 100644 --- a/libs/core/di/src/registrations/cache.registration.ts +++ b/libs/core/di/src/registrations/cache.registration.ts @@ -10,18 +10,34 @@ export function registerCacheServices( if (config.redis.enabled) { container.register({ cache: asFunction(() => { - return createCache({ - redisConfig: { - host: config.redis.host, - port: config.redis.port, - password: config.redis.password, - }, + const { createServiceCache } = require('@stock-bot/queue'); + const serviceName = config.service?.name || 'unknown'; + + // Create service-specific cache that uses the service's Redis DB + return createServiceCache(serviceName, { + host: config.redis.host, + port: config.redis.port, + password: config.redis.password, + db: config.redis.db, // This will be overridden by ServiceCache }); }).singleton(), + + // Also provide global cache for shared data + globalCache: asFunction(() => { + const { createServiceCache } = require('@stock-bot/queue'); + const serviceName = config.service?.name || 'unknown'; + + return createServiceCache(serviceName, { + host: config.redis.host, + port: config.redis.port, + password: config.redis.password, + }, { global: true }); + }).singleton(), }); } else { container.register({ cache: asValue(null), + globalCache: asValue(null), }); } } \ No newline at end of file diff --git a/libs/core/di/src/registrations/service.registration.ts b/libs/core/di/src/registrations/service.registration.ts index a7ef747..357a963 100644 --- a/libs/core/di/src/registrations/service.registration.ts +++ b/libs/core/di/src/registrations/service.registration.ts @@ -32,9 +32,13 @@ export function registerApplicationServices( if (config.proxy && config.redis.enabled) { container.register({ proxyManager: asFunction(({ cache, logger }) => { - if (!cache) {return null;} + if (!cache) return null; + const proxyCache = new NamespacedCache(cache, 'proxy'); - return new ProxyManager(proxyCache, logger); + const proxyManager = new ProxyManager(proxyCache, config.proxy, logger); + + // Note: Initialization will be handled by the lifecycle manager + return proxyManager; }).singleton(), }); } else { @@ -47,8 +51,9 @@ export function registerApplicationServices( if (config.queue?.enabled && config.redis.enabled) { container.register({ queueManager: asFunction(({ logger }) => { - const { QueueManager } = require('@stock-bot/queue'); + const { SmartQueueManager } = require('@stock-bot/queue'); const queueConfig = { + serviceName: config.service?.name || 'unknown', redis: { host: config.redis.host, port: config.redis.port, @@ -62,8 +67,9 @@ export function registerApplicationServices( }, enableScheduledJobs: config.queue!.enableScheduledJobs ?? true, delayWorkerStart: config.queue!.delayWorkerStart ?? false, + autoDiscoverHandlers: true, }; - return new QueueManager(queueConfig, logger); + return new SmartQueueManager(queueConfig, logger); }).singleton(), }); } else { diff --git a/libs/core/di/src/utils/lifecycle.ts b/libs/core/di/src/utils/lifecycle.ts index 44ae23d..528bb49 100644 --- a/libs/core/di/src/utils/lifecycle.ts +++ b/libs/core/di/src/utils/lifecycle.ts @@ -15,6 +15,7 @@ export class ServiceLifecycleManager { { name: 'mongoClient', key: 'mongoClient' as const }, { name: 'postgresClient', key: 'postgresClient' as const }, { name: 'questdbClient', key: 'questdbClient' as const }, + { name: 'proxyManager', key: 'proxyManager' as const }, { name: 'queueManager', key: 'queueManager' as const }, ]; diff --git a/libs/core/handlers/src/base/BaseHandler.ts b/libs/core/handlers/src/base/BaseHandler.ts index 3ee72b2..c32f6c4 100644 --- a/libs/core/handlers/src/base/BaseHandler.ts +++ b/libs/core/handlers/src/base/BaseHandler.ts @@ -40,6 +40,7 @@ export abstract class BaseHandler implements IHandler { // Direct service properties - flattened for cleaner access readonly logger; readonly cache; + readonly globalCache; readonly queue; readonly proxy; readonly browser; @@ -53,6 +54,7 @@ export abstract class BaseHandler implements IHandler { // Flatten all services onto the handler instance this.logger = getLogger(this.constructor.name); this.cache = services.cache; + this.globalCache = services.globalCache; this.queue = services.queue; this.proxy = services.proxy; this.browser = services.browser; @@ -191,7 +193,36 @@ export abstract class BaseHandler implements IHandler { } return this.cache.del(`cache:${this.handlerName}:${key}`); } - + + /** + * Set global cache with key + */ + protected async globalCacheSet(key: string, value: any, ttl?: number): Promise { + if (!this.globalCache) { + return; + } + return this.globalCache.set(key, value, ttl); + } + + /** + * Get global cache with key + */ + protected async globalCacheGet(key: string): Promise { + if (!this.globalCache) { + return null; + } + return this.globalCache.get(key); + } + + /** + * Delete global cache with key + */ + protected async globalCacheDel(key: string): Promise { + if (!this.globalCache) { + return; + } + return this.globalCache.del(key); + } /** * Schedule operation with delay in seconds */ diff --git a/libs/core/handlers/src/types/service-container.ts b/libs/core/handlers/src/types/service-container.ts index 00c1aed..adac159 100644 --- a/libs/core/handlers/src/types/service-container.ts +++ b/libs/core/handlers/src/types/service-container.ts @@ -13,6 +13,7 @@ export interface IServiceContainer { // Core infrastructure readonly logger: any; // Logger instance readonly cache?: any; // Cache provider (Redis/Dragonfly) - optional + readonly globalCache?: any; // Global cache provider (shared across services) - optional readonly queue?: any; // Queue manager (BullMQ) - optional readonly proxy?: ProxyManager; // Proxy manager service - optional (depends on cache) readonly browser?: any; // Browser automation (Playwright) diff --git a/libs/services/proxy/src/proxy-manager.ts b/libs/services/proxy/src/proxy-manager.ts index 39c262b..b53fdb3 100644 --- a/libs/services/proxy/src/proxy-manager.ts +++ b/libs/services/proxy/src/proxy-manager.ts @@ -9,13 +9,15 @@ export class ProxyManager { private proxies: ProxyInfo[] = []; private proxyIndex: number = 0; private lastUpdate: Date | null = null; + private lastFetchTime: Date | null = null; private isInitialized = false; private logger: any; + private config: ProxyManagerConfig; - constructor(cache: CacheProvider, _config: ProxyManagerConfig = {}, logger?: any) { + constructor(cache: CacheProvider, config: ProxyManagerConfig = {}, logger?: any) { this.cache = cache; + this.config = config; this.logger = logger || console; - // Config can be used in the future for customization } /** @@ -271,15 +273,123 @@ export class ProxyManager { } } + /** + * Fetch proxies from WebShare API + */ + private async fetchWebShareProxies(): Promise { + if (!this.config.webshare) { + throw new Error('WebShare configuration not provided'); + } + + const { apiKey, apiUrl } = this.config.webshare; + + this.logger.info('Fetching proxies from WebShare API', { apiUrl }); + + try { + const response = await fetch( + `${apiUrl}proxy/list/?mode=direct&page=1&page_size=100`, + { + method: 'GET', + headers: { + Authorization: `Token ${apiKey}`, + 'Content-Type': 'application/json', + }, + signal: AbortSignal.timeout(10000), // 10 second timeout + } + ); + + if (!response.ok) { + throw new Error(`WebShare API request failed: ${response.status} ${response.statusText}`); + } + + const data = await response.json(); + + if (!data.results || !Array.isArray(data.results)) { + throw new Error('Invalid response format from WebShare API'); + } + + // Transform proxy data to ProxyInfo format + const proxies: ProxyInfo[] = data.results.map( + (proxy: { username: string; password: string; proxy_address: string; port: number }) => ({ + source: 'webshare', + protocol: 'http' as const, + host: proxy.proxy_address, + port: proxy.port, + username: proxy.username, + password: proxy.password, + isWorking: true, // WebShare provides working proxies + firstSeen: new Date(), + lastChecked: new Date(), + }) + ); + + this.logger.info('Successfully fetched proxies from WebShare', { + count: proxies.length, + total: data.count || proxies.length, + }); + + this.lastFetchTime = new Date(); + return proxies; + } catch (error) { + this.logger.error('Failed to fetch proxies from WebShare', { error }); + throw error; + } + } + + /** + * Refresh proxies from WebShare (public method for manual refresh) + */ + async refreshProxies(): Promise { + if (!this.config.enabled || !this.config.webshare) { + this.logger.warn('Proxy refresh called but WebShare is not configured'); + return; + } + + try { + const proxies = await this.fetchWebShareProxies(); + await this.updateProxies(proxies); + } catch (error) { + this.logger.error('Failed to refresh proxies', { error }); + throw error; + } + } + + /** + * Get the last time proxies were fetched from WebShare + */ + getLastFetchTime(): Date | null { + return this.lastFetchTime; + } + /** * Initialize the proxy manager */ async initialize(): Promise { await this.initializeInternal(); - // Note: Initial proxy sync should be handled by the container or application - // that creates ProxyManager instance - this.logger.info('ProxyManager initialized - proxy sync should be handled externally'); + // Fetch proxies on startup if enabled + if (this.config.enabled && this.config.webshare) { + this.logger.info('Proxy fetching is enabled, fetching proxies from WebShare...'); + + try { + const proxies = await this.fetchWebShareProxies(); + if (proxies.length === 0) { + throw new Error('No proxies fetched from WebShare API'); + } + + await this.updateProxies(proxies); + this.logger.info('ProxyManager initialized with fresh proxies', { + count: proxies.length, + lastFetchTime: this.lastFetchTime, + }); + } catch (error) { + // If proxy fetching is enabled but fails, the service should not start + this.logger.error('Failed to fetch proxies during initialization', { error }); + throw new Error(`ProxyManager initialization failed: ${error instanceof Error ? error.message : 'Unknown error'}`); + } + } else { + this.logger.info('ProxyManager initialized without fetching proxies (disabled or not configured)'); + } } } diff --git a/libs/services/proxy/src/types.ts b/libs/services/proxy/src/types.ts index 963310f..52e3339 100644 --- a/libs/services/proxy/src/types.ts +++ b/libs/services/proxy/src/types.ts @@ -24,9 +24,14 @@ export interface ProxyInfo { } export interface ProxyManagerConfig { + enabled?: boolean; cachePrefix?: string; ttl?: number; enableMetrics?: boolean; + webshare?: { + apiKey: string; + apiUrl: string; + }; } export interface ProxySyncConfig { diff --git a/libs/services/queue/src/index.ts b/libs/services/queue/src/index.ts index 6072409..e1e2ae7 100644 --- a/libs/services/queue/src/index.ts +++ b/libs/services/queue/src/index.ts @@ -1,7 +1,16 @@ // Core exports export { Queue, type QueueWorkerConfig } from './queue'; export { QueueManager } from './queue-manager'; +export { SmartQueueManager } from './smart-queue-manager'; export { createJobHandler } from './types'; +export { ServiceCache, createServiceCache } from './service-cache'; +export { + SERVICE_REGISTRY, + getServiceConfig, + findServiceForHandler, + getFullQueueName, + parseQueueName +} from './service-registry'; // Re-export handler registry from types package export { handlerRegistry } from '@stock-bot/types'; @@ -55,4 +64,12 @@ export type { // Scheduled job types ScheduledJob, ScheduleConfig, + + // Smart Queue types + SmartQueueConfig, + QueueRoute, + } from './types'; + +// Re-export service registry types +export type { ServiceConfig } from './service-registry'; diff --git a/libs/services/queue/src/service-cache.ts b/libs/services/queue/src/service-cache.ts new file mode 100644 index 0000000..f8ade29 --- /dev/null +++ b/libs/services/queue/src/service-cache.ts @@ -0,0 +1,167 @@ +import { createCache, type CacheProvider, type CacheStats } from '@stock-bot/cache'; +import type { RedisConfig } from './types'; +import { getServiceConfig, type ServiceConfig } from './service-registry'; + +/** + * Service-aware cache that uses the service's Redis DB + * Automatically prefixes keys with the service's cache namespace + */ +export class ServiceCache implements CacheProvider { + private cache: CacheProvider; + private prefix: string; + + constructor( + serviceName: string, + redisConfig: RedisConfig, + isGlobalCache: boolean = false + ) { + // Get service configuration + const serviceConfig = getServiceConfig(serviceName); + if (!serviceConfig && !isGlobalCache) { + throw new Error(`Unknown service: ${serviceName}`); + } + + // Determine Redis DB and prefix + let db: number; + let prefix: string; + + if (isGlobalCache) { + // Global cache uses db:0 + db = 0; + prefix = 'stock-bot:shared'; + } else { + // Service cache uses service's DB + db = serviceConfig!.db; + prefix = serviceConfig!.cachePrefix; + } + + // Create underlying cache with correct DB + const cacheConfig = { + redisConfig: { + ...redisConfig, + db, + }, + keyPrefix: prefix + ':', + }; + + this.cache = createCache(cacheConfig); + this.prefix = prefix; + } + + // Implement CacheProvider interface + async get(key: string): Promise { + return this.cache.get(key); + } + + async set( + key: string, + value: T, + options?: + | number + | { + ttl?: number; + preserveTTL?: boolean; + onlyIfExists?: boolean; + onlyIfNotExists?: boolean; + getOldValue?: boolean; + } + ): Promise { + return this.cache.set(key, value, options); + } + + async del(key: string): Promise { + return this.cache.del(key); + } + + async exists(key: string): Promise { + return this.cache.exists(key); + } + + async clear(): Promise { + return this.cache.clear(); + } + + async keys(pattern: string): Promise { + return this.cache.keys(pattern); + } + + getStats(): CacheStats { + return this.cache.getStats(); + } + + async health(): Promise { + return this.cache.health(); + } + + async waitForReady(timeout?: number): Promise { + return this.cache.waitForReady(timeout); + } + + isReady(): boolean { + return this.cache.isReady(); + } + + // Enhanced cache methods (delegate to underlying cache if available) + async update(key: string, value: T): Promise { + if (this.cache.update) { + return this.cache.update(key, value); + } + // Fallback implementation + return this.cache.set(key, value, { preserveTTL: true }); + } + + async setIfExists(key: string, value: T, ttl?: number): Promise { + if (this.cache.setIfExists) { + return this.cache.setIfExists(key, value, ttl); + } + // Fallback implementation + const result = await this.cache.set(key, value, { onlyIfExists: true, ttl }); + return result !== null; + } + + async setIfNotExists(key: string, value: T, ttl?: number): Promise { + if (this.cache.setIfNotExists) { + return this.cache.setIfNotExists(key, value, ttl); + } + // Fallback implementation + const result = await this.cache.set(key, value, { onlyIfNotExists: true, ttl }); + return result !== null; + } + + async replace(key: string, value: T, ttl?: number): Promise { + if (this.cache.replace) { + return this.cache.replace(key, value, ttl); + } + // Fallback implementation + return this.cache.set(key, value, ttl); + } + + async updateField(key: string, updater: (current: T | null) => T, ttl?: number): Promise { + if (this.cache.updateField) { + return this.cache.updateField(key, updater, ttl); + } + // Fallback implementation + const current = await this.cache.get(key); + const updated = updater(current); + return this.cache.set(key, updated, ttl); + } + + /** + * Get the actual Redis key with prefix + */ + getKey(key: string): string { + return `${this.prefix}:${key}`; + } +} + + +/** + * Factory function to create service cache + */ +export function createServiceCache( + serviceName: string, + redisConfig: RedisConfig, + options: { global?: boolean } = {} +): ServiceCache { + return new ServiceCache(serviceName, redisConfig, options.global); +} \ No newline at end of file diff --git a/libs/services/queue/src/service-registry.ts b/libs/services/queue/src/service-registry.ts new file mode 100644 index 0000000..6158793 --- /dev/null +++ b/libs/services/queue/src/service-registry.ts @@ -0,0 +1,89 @@ +/** + * Service Registry Configuration + * Maps services to their Redis databases and configurations + */ + +export interface ServiceConfig { + /** Redis database number for this service (used for both queues and cache) */ + db: number; + /** Prefix for queue keys (e.g., 'bull:di') */ + queuePrefix: string; + /** Prefix for cache keys (e.g., 'cache:di') */ + cachePrefix: string; + /** Whether this service only produces jobs (doesn't process them) */ + producerOnly?: boolean; + /** List of handlers this service owns (auto-discovered if not provided) */ + handlers?: string[]; +} + +/** + * Central registry of all services and their configurations + * Each service gets one Redis DB for both queues and cache + * + * Database assignments: + * - db:0 = Global shared cache + * - db:1 = data-ingestion (queues + cache) + * - db:2 = data-pipeline (queues + cache) + * - db:3 = web-api (cache only, producer-only for queues) + */ +export const SERVICE_REGISTRY: Record = { + 'data-ingestion': { + db: 1, + queuePrefix: 'bull:di', + cachePrefix: 'cache:di', + handlers: ['ceo', 'qm', 'webshare', 'ib', 'proxy'], + }, + 'data-pipeline': { + db: 2, + queuePrefix: 'bull:dp', + cachePrefix: 'cache:dp', + handlers: ['exchanges', 'symbols'], + }, + 'web-api': { + db: 3, + queuePrefix: 'bull:api', // Not used since producer-only + cachePrefix: 'cache:api', + producerOnly: true, + }, + // Add more services as needed +}; + +/** + * Get service configuration + */ +export function getServiceConfig(serviceName: string): ServiceConfig | undefined { + return SERVICE_REGISTRY[serviceName]; +} + +/** + * Find which service owns a handler + */ +export function findServiceForHandler(handlerName: string): string | undefined { + for (const [serviceName, config] of Object.entries(SERVICE_REGISTRY)) { + if (config.handlers?.includes(handlerName)) { + return serviceName; + } + } + return undefined; +} + +/** + * Get full queue name with service namespace + */ +export function getFullQueueName(serviceName: string, handlerName: string): string { + return `${serviceName}:${handlerName}`; +} + +/** + * Parse a full queue name into service and handler + */ +export function parseQueueName(fullQueueName: string): { service: string; handler: string } | null { + const parts = fullQueueName.split(':'); + if (parts.length !== 2 || !parts[0] || !parts[1]) { + return null; + } + return { + service: parts[0], + handler: parts[1], + }; +} \ No newline at end of file diff --git a/libs/services/queue/src/smart-queue-manager.ts b/libs/services/queue/src/smart-queue-manager.ts new file mode 100644 index 0000000..e4efeb5 --- /dev/null +++ b/libs/services/queue/src/smart-queue-manager.ts @@ -0,0 +1,301 @@ +import { Queue as BullQueue, type Job } from 'bullmq'; +import IoRedis from 'ioredis'; +import { handlerRegistry } from '@stock-bot/types'; +import { getLogger, type Logger } from '@stock-bot/logger'; +import { QueueManager } from './queue-manager'; +import { Queue } from './queue'; +import type { + SmartQueueConfig, + QueueRoute, + JobData, + JobOptions, + RedisConfig +} from './types'; +import { + SERVICE_REGISTRY, + getServiceConfig, + findServiceForHandler, + getFullQueueName, + parseQueueName, + type ServiceConfig +} from './service-registry'; +import { getRedisConnection } from './utils'; + +/** + * Smart Queue Manager with automatic service discovery and routing + * Handles cross-service communication seamlessly + */ +export class SmartQueueManager extends QueueManager { + private serviceName: string; + private serviceConfig: ServiceConfig; + private queueRoutes = new Map(); + private connections = new Map(); // Redis connections by DB + private producerQueues = new Map(); // For cross-service sending + private _logger: Logger; + + constructor(config: SmartQueueConfig, logger?: Logger) { + // Get service config + const serviceConfig = getServiceConfig(config.serviceName); + if (!serviceConfig) { + throw new Error(`Unknown service: ${config.serviceName}`); + } + + // Update Redis config to use service's DB + const modifiedConfig = { + ...config, + redis: { + ...config.redis, + db: serviceConfig.db, + }, + }; + + super(modifiedConfig, logger); + + this.serviceName = config.serviceName; + this.serviceConfig = serviceConfig; + this._logger = logger || getLogger('SmartQueueManager'); + + // Auto-discover routes if enabled + if (config.autoDiscoverHandlers !== false) { + this.discoverQueueRoutes(); + } + + this._logger.info('SmartQueueManager initialized', { + service: this.serviceName, + db: serviceConfig.db, + handlers: serviceConfig.handlers, + producerOnly: serviceConfig.producerOnly, + }); + } + + /** + * Discover all available queue routes from handler registry + */ + private discoverQueueRoutes(): void { + // Discover from handler registry if available + try { + const handlers = handlerRegistry.getAllHandlers(); + for (const [handlerName, handlerConfig] of handlers) { + // Find which service owns this handler + const ownerService = findServiceForHandler(handlerName); + if (ownerService) { + const ownerConfig = getServiceConfig(ownerService)!; + const fullName = getFullQueueName(ownerService, handlerName); + + this.queueRoutes.set(handlerName, { + fullName, + service: ownerService, + handler: handlerName, + db: ownerConfig.db, + operations: Object.keys(handlerConfig.operations || {}), + }); + + this._logger.trace('Discovered queue route', { + handler: handlerName, + service: ownerService, + db: ownerConfig.db, + }); + } + } + } catch (error) { + this._logger.warn('Handler registry not available, using static configuration', { error }); + } + + // Also add routes from static configuration + Object.entries(SERVICE_REGISTRY).forEach(([serviceName, config]) => { + if (config.handlers) { + config.handlers.forEach(handlerName => { + if (!this.queueRoutes.has(handlerName)) { + const fullName = getFullQueueName(serviceName, handlerName); + this.queueRoutes.set(handlerName, { + fullName, + service: serviceName, + handler: handlerName, + db: config.db, + }); + } + }); + } + }); + } + + /** + * Get or create a Redis connection for a specific DB + */ + private getConnection(db: number): any { + if (!this.connections.has(db)) { + const redisConfig: RedisConfig = { + ...this.getRedisConfig(), + db, + }; + const connection = getRedisConnection(redisConfig); + this.connections.set(db, connection); + this._logger.debug('Created Redis connection', { db }); + } + return this.connections.get(db); + } + + /** + * Get a queue for the current service (for processing) + * Overrides parent to use namespaced queue names + */ + override getQueue(queueName: string, options = {}): Queue { + // For local queues, use the service namespace + const fullQueueName = getFullQueueName(this.serviceName, queueName); + return super.getQueue(fullQueueName, options); + } + + /** + * Send a job to any queue (local or remote) + * This is the main method for cross-service communication + */ + async send( + targetQueue: string, + operation: string, + payload: unknown, + options: JobOptions = {} + ): Promise { + // Resolve the target queue + const route = this.resolveQueueRoute(targetQueue); + if (!route) { + throw new Error(`Unknown queue: ${targetQueue}`); + } + + // Validate operation if we have metadata + if (route.operations && !route.operations.includes(operation)) { + this._logger.warn('Operation not found in handler metadata', { + queue: targetQueue, + operation, + available: route.operations, + }); + } + + // Get or create producer queue for the target + const producerQueue = this.getProducerQueue(route); + + // Create job data + const jobData: JobData = { + handler: route.handler, + operation, + payload, + }; + + // Send the job + const job = await producerQueue.add(operation, jobData, options); + + this._logger.debug('Job sent to queue', { + from: this.serviceName, + to: route.service, + queue: route.handler, + operation, + jobId: job.id, + }); + + return job; + } + + /** + * Alias for send() with more explicit name + */ + async sendTo( + targetService: string, + handler: string, + operation: string, + payload: unknown, + options: JobOptions = {} + ): Promise { + const fullQueueName = `${targetService}:${handler}`; + return this.send(fullQueueName, operation, payload, options); + } + + /** + * Resolve a queue name to a route + */ + private resolveQueueRoute(queueName: string): QueueRoute | null { + // Check if it's a full name (service:handler) + const parsed = parseQueueName(queueName); + if (parsed) { + const config = getServiceConfig(parsed.service); + if (config) { + return { + fullName: queueName, + service: parsed.service, + handler: parsed.handler, + db: config.db, + }; + } + } + + // Check if it's just a handler name + const route = this.queueRoutes.get(queueName); + if (route) { + return route; + } + + // Try to find in static config + const ownerService = findServiceForHandler(queueName); + if (ownerService) { + const config = getServiceConfig(ownerService)!; + return { + fullName: getFullQueueName(ownerService, queueName), + service: ownerService, + handler: queueName, + db: config.db, + }; + } + + return null; + } + + /** + * Get or create a producer queue for cross-service communication + */ + private getProducerQueue(route: QueueRoute): BullQueue { + if (!this.producerQueues.has(route.fullName)) { + const connection = this.getConnection(route.db); + const queue = new BullQueue(route.fullName, { + connection, + defaultJobOptions: this.getConfig().defaultQueueOptions?.defaultJobOptions || {}, + }); + this.producerQueues.set(route.fullName, queue); + } + return this.producerQueues.get(route.fullName)!; + } + + /** + * Get statistics for all queues across all services + */ + async getAllStats(): Promise> { + const stats: Record = {}; + + // Get stats for local queues + stats[this.serviceName] = await this.getGlobalStats(); + + // Get stats for other services if we have access + // This would require additional implementation + + return stats; + } + + /** + * Graceful shutdown + */ + override async shutdown(): Promise { + // Close producer queues + for (const [name, queue] of this.producerQueues) { + await queue.close(); + this._logger.debug('Closed producer queue', { queue: name }); + } + + // Close additional connections + for (const [db, connection] of this.connections) { + if (db !== this.serviceConfig.db) { // Don't close our main connection + connection.disconnect(); + this._logger.debug('Closed Redis connection', { db }); + } + } + + // Call parent shutdown + await super.shutdown(); + } +} \ No newline at end of file diff --git a/libs/services/queue/src/types.ts b/libs/services/queue/src/types.ts index 33d4dd9..a2b6f15 100644 --- a/libs/services/queue/src/types.ts +++ b/libs/services/queue/src/types.ts @@ -163,3 +163,26 @@ export interface ScheduleConfig { data?: unknown; options?: JobOptions; } + +// Smart Queue Types +export interface SmartQueueConfig extends QueueManagerConfig { + /** Name of the current service */ + serviceName: string; + /** Whether to auto-discover handlers from registry */ + autoDiscoverHandlers?: boolean; + /** Custom service registry (defaults to built-in) */ + serviceRegistry?: Record; +} + +export interface QueueRoute { + /** Full queue name (e.g., 'data-ingestion:ceo') */ + fullName: string; + /** Service that owns this queue */ + service: string; + /** Handler name */ + handler: string; + /** Redis DB number */ + db: number; + /** Available operations */ + operations?: string[]; +}