From 19dfda23925a72961d7a3da46cecc5315bd0ffb4 Mon Sep 17 00:00:00 2001 From: Boki Date: Sun, 22 Jun 2025 20:34:35 -0400 Subject: [PATCH] fixed cache keys --- .../src/handlers/example-handler.ts | 107 ------------------ .../src/handlers/webshare/webshare.handler.ts | 10 +- libs/core/di/src/awilix-container.ts | 55 +++++---- libs/core/handlers/src/base/BaseHandler.ts | 15 ++- libs/data/cache/src/cache-factory.ts | 23 ++++ libs/data/cache/src/index.ts | 2 + libs/data/cache/src/namespaced-cache.ts | 89 +++++++++++++++ libs/services/proxy/src/proxy-manager.ts | 8 +- libs/services/queue/src/batch-processor.ts | 37 ++++-- libs/services/queue/src/dlq-handler.ts | 22 ++-- libs/services/queue/src/queue-manager.ts | 52 ++++----- libs/services/queue/src/queue.ts | 60 ++++++---- libs/services/queue/src/rate-limiter.ts | 27 +++-- 13 files changed, 286 insertions(+), 221 deletions(-) delete mode 100644 apps/data-ingestion/src/handlers/example-handler.ts create mode 100644 libs/data/cache/src/cache-factory.ts create mode 100644 libs/data/cache/src/namespaced-cache.ts diff --git a/apps/data-ingestion/src/handlers/example-handler.ts b/apps/data-ingestion/src/handlers/example-handler.ts deleted file mode 100644 index f2c55eb..0000000 --- a/apps/data-ingestion/src/handlers/example-handler.ts +++ /dev/null @@ -1,107 +0,0 @@ -import { OperationContext } from '@stock-bot/di'; -import type { ServiceContainer } from '@stock-bot/di'; - -/** - * Example handler showing how to use the new connection pooling pattern - */ -export class ExampleHandler { - constructor(private readonly container: ServiceContainer) {} - - /** - * Example operation using the enhanced OperationContext - */ - async performOperation(data: any): Promise { - // Create operation context with container - const context = new OperationContext('example-handler', 'perform-operation', this.container, { - data, - }); - - try { - // Log operation start - context.logger.info('Starting operation', { data }); - - // Use MongoDB through service resolution - const mongodb = context.resolve('mongodb'); - const result = await mongodb.collection('test').insertOne(data); - context.logger.debug('MongoDB insert complete', { insertedId: result.insertedId }); - - // Use PostgreSQL through service resolution - const postgres = context.resolve('postgres'); - await postgres.query('INSERT INTO operations (id, status) VALUES ($1, $2)', [ - result.insertedId, - 'completed', - ]); - - // Use cache through service resolution - const cache = context.resolve('cache'); - await cache.set(`operation:${result.insertedId}`, { - status: 'completed', - timestamp: new Date(), - }); - - context.logger.info('Operation completed successfully'); - } catch (error) { - context.logger.error('Operation failed', { error }); - throw error; - } - } - - /** - * Example of batch operation with isolated connection pool - */ - async performBatchOperation(items: any[]): Promise { - // Create a scoped container for this batch operation - const scopedContainer = this.container.createScope(); - - const context = new OperationContext('example-handler', 'batch-operation', scopedContainer, { - itemCount: items.length, - }); - - try { - context.logger.info('Starting batch operation', { itemCount: items.length }); - - // Get services once for the batch - const mongodb = context.resolve('mongodb'); - const cache = context.resolve('cache'); - - // Process items in parallel - const promises = items.map(async (item, index) => { - const itemContext = new OperationContext( - 'example-handler', - `batch-item-${index}`, - scopedContainer, - { item } - ); - - try { - await mongodb.collection('batch').insertOne(item); - await cache.set(`batch:${item.id}`, item); - } catch (error) { - itemContext.logger.error('Batch item failed', { error, itemIndex: index }); - throw error; - } - }); - - await Promise.all(promises); - context.logger.info('Batch operation completed'); - } finally { - // Clean up scoped resources - await scopedContainer.dispose(); - } - } -} - -/** - * Example of how to use in a job handler - */ -export async function createExampleJobHandler(container: ServiceContainer) { - return async (job: any) => { - const handler = new ExampleHandler(container); - - if (job.data.type === 'batch') { - await handler.performBatchOperation(job.data.items); - } else { - await handler.performOperation(job.data); - } - }; -} diff --git a/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts b/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts index 20ae89f..a933aeb 100644 --- a/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts +++ b/apps/data-ingestion/src/handlers/webshare/webshare.handler.ts @@ -43,14 +43,14 @@ export class WebShareHandler extends BaseHandler { workingCount: proxies.filter(p => p.isWorking !== false).length, }); - // Cache proxy stats for monitoring - await this.cache.set('webshare-proxy-count', proxies.length, 3600); - await this.cache.set( - 'webshare-working-count', + // Cache proxy stats for monitoring using handler's cache methods + await this.cacheSet('proxy-count', proxies.length, 3600); + await this.cacheSet( + 'working-count', proxies.filter(p => p.isWorking !== false).length, 3600 ); - await this.cache.set('last-webshare-fetch', new Date().toISOString(), 1800); + await this.cacheSet('last-fetch', new Date().toISOString(), 1800); return { success: true, diff --git a/libs/core/di/src/awilix-container.ts b/libs/core/di/src/awilix-container.ts index 58b3d4a..4c30f68 100644 --- a/libs/core/di/src/awilix-container.ts +++ b/libs/core/di/src/awilix-container.ts @@ -61,6 +61,11 @@ const appConfigSchema = z.object({ timeout: z.number().optional(), }) .optional(), + queue: z + .object({ + enabled: z.boolean().optional(), + }) + .optional(), }); export type AppConfig = z.infer; @@ -114,13 +119,14 @@ export function createServiceContainer(rawConfig: unknown): AwilixContainer getLogger('app')).singleton(), }; - // Conditionally register cache/dragonfly + // Conditionally register cache/dragonfly instances if (config.redis?.enabled !== false) { + // Main cache instance registrations.cache = asFunction(({ redisConfig, logger }) => createCache({ redisConfig, logger, - keyPrefix: 'cache:', + keyPrefix: '', // No prefix at this level, namespaces will handle it ttl: 3600, enableMetrics: true, }) @@ -129,13 +135,15 @@ export function createServiceContainer(rawConfig: unknown): AwilixContainer { if (!cache) { logger.warn('Cache is disabled, ProxyManager will have limited functionality'); return null; } - const manager = new ProxyManager(cache, config.proxy || {}, logger); + const { NamespacedCache } = require('@stock-bot/cache'); + const proxyCache = new NamespacedCache(cache, 'proxy'); + const manager = new ProxyManager(proxyCache, config.proxy || {}, logger); return manager; }).singleton(); @@ -188,22 +196,26 @@ export function createServiceContainer(rawConfig: unknown): AwilixContainer { - const { QueueManager } = require('@stock-bot/queue'); - - return new QueueManager({ - redis: { - host: redisConfig.host, - port: redisConfig.port, - db: redisConfig.db, - password: redisConfig.password, - username: redisConfig.username, - }, - enableScheduledJobs: true, - delayWorkerStart: true, // We'll start workers manually - }); - }).singleton(); + // Queue manager - conditionally registered with logger injection + if (config.redis?.enabled !== false && config.queue?.enabled !== false) { + registrations.queueManager = asFunction(({ redisConfig, logger }) => { + const { QueueManager } = require('@stock-bot/queue'); + + return new QueueManager({ + redis: { + host: redisConfig.host, + port: redisConfig.port, + db: redisConfig.db, + password: redisConfig.password, + username: redisConfig.username, + }, + enableScheduledJobs: true, + delayWorkerStart: true, // We'll start workers manually + }, logger); // Pass logger to QueueManager + }).singleton(); + } else { + registrations.queueManager = asValue(null); + } // Browser automation registrations.browser = asFunction(({ config, logger }) => { @@ -377,6 +389,9 @@ export function createServiceContainerFromConfig( headless: true, timeout: 30000, } : undefined, + queue: { + enabled: enableQueue && enableCache, // Queue depends on Redis/cache + }, }; return createServiceContainer(containerConfig); diff --git a/libs/core/handlers/src/base/BaseHandler.ts b/libs/core/handlers/src/base/BaseHandler.ts index fe96461..09d8a58 100644 --- a/libs/core/handlers/src/base/BaseHandler.ts +++ b/libs/core/handlers/src/base/BaseHandler.ts @@ -6,6 +6,7 @@ import { type HandlerConfigWithSchedule, } from '@stock-bot/types'; import { fetch } from '@stock-bot/utils'; +import { createNamespacedCache } from '@stock-bot/cache'; import type { IServiceContainer } from '../types/service-container'; import type { ExecutionContext, IHandler } from '../types/types'; @@ -126,6 +127,14 @@ export abstract class BaseHandler implements IHandler { return this.mongodb.collection(name); } + /** + * Create a sub-namespaced cache for specific operations + * Example: handler 'webshare' creates namespace 'webshare:api' -> keys will be 'cache:webshare:api:*' + */ + protected createNamespacedCache(subNamespace: string) { + return createNamespacedCache(this.cache, `${this.handlerName}:${subNamespace}`); + } + /** * Set cache with handler-prefixed key */ @@ -133,7 +142,7 @@ export abstract class BaseHandler implements IHandler { if (!this.cache) { return; } - return this.cache.set(`${this.handlerName}:${key}`, value, ttl); + return this.cache.set(`cache:${this.handlerName}:${key}`, value, ttl); } /** @@ -143,7 +152,7 @@ export abstract class BaseHandler implements IHandler { if (!this.cache) { return null; } - return this.cache.get(`${this.handlerName}:${key}`); + return this.cache.get(`cache:${this.handlerName}:${key}`); } /** @@ -153,7 +162,7 @@ export abstract class BaseHandler implements IHandler { if (!this.cache) { return; } - return this.cache.del(`${this.handlerName}:${key}`); + return this.cache.del(`cache:${this.handlerName}:${key}`); } /** diff --git a/libs/data/cache/src/cache-factory.ts b/libs/data/cache/src/cache-factory.ts new file mode 100644 index 0000000..f778c0e --- /dev/null +++ b/libs/data/cache/src/cache-factory.ts @@ -0,0 +1,23 @@ +import { NamespacedCache } from './namespaced-cache'; +import type { CacheProvider } from './types'; + +/** + * Factory function to create namespaced caches + * Provides a clean API for services to get their own namespaced cache + */ +export function createNamespacedCache( + cache: CacheProvider | null | undefined, + namespace: string +): CacheProvider | null { + if (!cache) { + return null; + } + return new NamespacedCache(cache, namespace); +} + +/** + * Type guard to check if cache is available + */ +export function isCacheAvailable(cache: any): cache is CacheProvider { + return cache !== null && cache !== undefined && typeof cache.get === 'function'; +} \ No newline at end of file diff --git a/libs/data/cache/src/index.ts b/libs/data/cache/src/index.ts index 56f476f..4a4e4e3 100644 --- a/libs/data/cache/src/index.ts +++ b/libs/data/cache/src/index.ts @@ -51,3 +51,5 @@ export type { export { RedisConnectionManager } from './connection-manager'; export { CacheKeyGenerator } from './key-generator'; export { RedisCache } from './redis-cache'; +export { NamespacedCache } from './namespaced-cache'; +export { createNamespacedCache, isCacheAvailable } from './cache-factory'; diff --git a/libs/data/cache/src/namespaced-cache.ts b/libs/data/cache/src/namespaced-cache.ts new file mode 100644 index 0000000..2ecf832 --- /dev/null +++ b/libs/data/cache/src/namespaced-cache.ts @@ -0,0 +1,89 @@ +import type { CacheProvider } from './types'; + +/** + * A cache wrapper that automatically prefixes all keys with a namespace + * Used to provide isolated cache spaces for different services + */ +export class NamespacedCache implements CacheProvider { + private readonly prefix: string; + + constructor( + private readonly cache: CacheProvider, + private readonly namespace: string + ) { + this.prefix = `cache:${namespace}:`; + } + + async get(key: string): Promise { + return this.cache.get(`${this.prefix}${key}`); + } + + async set( + key: string, + value: T, + options?: + | number + | { + ttl?: number; + preserveTTL?: boolean; + onlyIfExists?: boolean; + onlyIfNotExists?: boolean; + getOldValue?: boolean; + } + ): Promise { + return this.cache.set(`${this.prefix}${key}`, value, options); + } + + async del(key: string): Promise { + return this.cache.del(`${this.prefix}${key}`); + } + + async exists(key: string): Promise { + return this.cache.exists(`${this.prefix}${key}`); + } + + async keys(pattern: string = '*'): Promise { + const fullPattern = `${this.prefix}${pattern}`; + const keys = await this.cache.keys(fullPattern); + // Remove the prefix from returned keys for cleaner API + return keys.map(k => k.substring(this.prefix.length)); + } + + async clear(): Promise { + // Clear only keys with this namespace prefix + const keys = await this.cache.keys(`${this.prefix}*`); + if (keys.length > 0) { + await Promise.all(keys.map(key => this.cache.del(key))); + } + } + + + getStats() { + return this.cache.getStats(); + } + + async health(): Promise { + return this.cache.health(); + } + + isReady(): boolean { + return this.cache.isReady(); + } + + async waitForReady(timeout?: number): Promise { + return this.cache.waitForReady(timeout); + } + + async close(): Promise { + // Namespaced cache doesn't own the connection, so we don't close it + // The underlying cache instance should be closed by its owner + } + + getNamespace(): string { + return this.namespace; + } + + getFullPrefix(): string { + return this.prefix; + } +} \ No newline at end of file diff --git a/libs/services/proxy/src/proxy-manager.ts b/libs/services/proxy/src/proxy-manager.ts index f1019a2..39c262b 100644 --- a/libs/services/proxy/src/proxy-manager.ts +++ b/libs/services/proxy/src/proxy-manager.ts @@ -176,8 +176,8 @@ export class ProxyManager { this.proxies = proxies; this.lastUpdate = new Date(); - // Store to cache - await this.cache.set('active-proxies', proxies); + // Store to cache (keys will be prefixed with cache:proxy: automatically) + await this.cache.set('active', proxies); await this.cache.set('last-update', this.lastUpdate.toISOString()); const workingCount = proxies.filter(p => p.isWorking !== false).length; @@ -234,7 +234,7 @@ export class ProxyManager { this.proxies = []; this.lastUpdate = null; - await this.cache.del('active-proxies'); + await this.cache.del('active'); await this.cache.del('last-update'); this.logger.info('Cleared all proxies'); @@ -252,7 +252,7 @@ export class ProxyManager { */ private async loadFromCache(): Promise { try { - const cachedProxies = await this.cache.get('active-proxies'); + const cachedProxies = await this.cache.get('active'); const lastUpdateStr = await this.cache.get('last-update'); if (cachedProxies && Array.isArray(cachedProxies)) { diff --git a/libs/services/queue/src/batch-processor.ts b/libs/services/queue/src/batch-processor.ts index fbf7f8e..e1a2e5f 100644 --- a/libs/services/queue/src/batch-processor.ts +++ b/libs/services/queue/src/batch-processor.ts @@ -1,9 +1,6 @@ -import { getLogger } from '@stock-bot/logger'; import { QueueManager } from './queue-manager'; import type { BatchJobData, BatchResult, JobData, ProcessOptions } from './types'; -const logger = getLogger('batch-processor'); - /** * Main function - processes items either directly or in batches * Each item becomes payload: item (no processing needed) @@ -14,7 +11,12 @@ export async function processItems( options: ProcessOptions, queueManager: QueueManager ): Promise { - queueManager.getQueue(queueName); + const queue = queueManager.getQueue(queueName); + const logger = queue.createChildLogger('batch-processor', { + queueName, + totalItems: items.length, + mode: options.useBatching ? 'batch' : 'direct', + }); const startTime = Date.now(); if (items.length === 0) { @@ -61,7 +63,11 @@ async function processDirect( options: ProcessOptions, queueManager: QueueManager ): Promise> { - queueManager.getQueue(queueName); + const queue = queueManager.getQueue(queueName); + const logger = queue.createChildLogger('batch-direct', { + queueName, + totalItems: items.length, + }); const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds const delayPerItem = totalDelayMs / items.length; @@ -105,7 +111,11 @@ async function processBatched( options: ProcessOptions, queueManager: QueueManager ): Promise> { - queueManager.getQueue(queueName); + const queue = queueManager.getQueue(queueName); + const logger = queue.createChildLogger('batch-batched', { + queueName, + totalItems: items.length, + }); const batchSize = options.batchSize || 100; const batches = createBatches(items, batchSize); const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds @@ -162,10 +172,15 @@ async function processBatched( * Process a batch job - loads items and creates individual jobs */ export async function processBatchJob(jobData: BatchJobData, queueName: string, queueManager: QueueManager): Promise { - queueManager.getQueue(queueName); + const queue = queueManager.getQueue(queueName); + const logger = queue.createChildLogger('batch-job', { + queueName, + batchIndex: jobData.batchIndex, + payloadKey: jobData.payloadKey, + }); const { payloadKey, batchIndex, totalBatches, itemCount, totalDelayHours } = jobData; - logger.trace('Processing batch job', { + logger.debug('Processing batch job', { batchIndex, totalBatches, itemCount, @@ -186,7 +201,7 @@ export async function processBatchJob(jobData: BatchJobData, queueName: string, const delayPerBatch = totalDelayMs / totalBatches; // Time allocated for each batch const delayPerItem = delayPerBatch / items.length; // Distribute items evenly within batch window - logger.trace('Calculating job delays', { + logger.debug('Calculating job delays', { batchIndex, delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`, delayPerItem: `${(delayPerItem / 1000).toFixed(2)} seconds`, @@ -301,6 +316,10 @@ async function addJobsInChunks( chunkSize = 100 ): Promise { const queue = queueManager.getQueue(queueName); + const logger = queue.createChildLogger('batch-chunk', { + queueName, + totalJobs: jobs.length, + }); const allCreatedJobs = []; for (let i = 0; i < jobs.length; i += chunkSize) { diff --git a/libs/services/queue/src/dlq-handler.ts b/libs/services/queue/src/dlq-handler.ts index 1e8abc1..cb4061c 100644 --- a/libs/services/queue/src/dlq-handler.ts +++ b/libs/services/queue/src/dlq-handler.ts @@ -1,20 +1,20 @@ import { Queue, type Job } from 'bullmq'; -import { getLogger } from '@stock-bot/logger'; import type { DLQConfig, RedisConfig } from './types'; import { getRedisConnection } from './utils'; -const logger = getLogger('dlq-handler'); - export class DeadLetterQueueHandler { private dlq: Queue; private config: Required; private failureCount = new Map(); + private readonly logger: any; constructor( private mainQueue: Queue, connection: RedisConfig, - config: DLQConfig = {} + config: DLQConfig = {}, + logger?: any ) { + this.logger = logger || console; this.config = { maxRetries: config.maxRetries ?? 3, retryDelay: config.retryDelay ?? 60000, // 1 minute @@ -35,7 +35,7 @@ export class DeadLetterQueueHandler { const currentFailures = (this.failureCount.get(jobKey) || 0) + 1; this.failureCount.set(jobKey, currentFailures); - logger.warn('Job failed', { + this.logger.warn('Job failed', { jobId: job.id, jobName: job.name, attempt: job.attemptsMade, @@ -80,7 +80,7 @@ export class DeadLetterQueueHandler { removeOnFail: 50, }); - logger.error('Job moved to DLQ', { + this.logger.error('Job moved to DLQ', { jobId: job.id, jobName: job.name, error: error.message, @@ -89,7 +89,7 @@ export class DeadLetterQueueHandler { // Check if we need to alert await this.checkAlertThreshold(); } catch (dlqError) { - logger.error('Failed to move job to DLQ', { + this.logger.error('Failed to move job to DLQ', { jobId: job.id, error: dlqError, }); @@ -118,12 +118,12 @@ export class DeadLetterQueueHandler { await dlqJob.remove(); retriedCount++; - logger.info('Job retried from DLQ', { + this.logger.info('Job retried from DLQ', { originalJobId: originalJob.id, jobName: originalJob.name, }); } catch (error) { - logger.error('Failed to retry DLQ job', { + this.logger.error('Failed to retry DLQ job', { dlqJobId: dlqJob.id, error, }); @@ -190,7 +190,7 @@ export class DeadLetterQueueHandler { } } - logger.info('DLQ cleanup completed', { + this.logger.info('DLQ cleanup completed', { removedCount, cleanupAge: `${this.config.cleanupAge} hours`, }); @@ -205,7 +205,7 @@ export class DeadLetterQueueHandler { const stats = await this.getStats(); if (stats.total >= this.config.alertThreshold) { - logger.error('DLQ alert threshold exceeded', { + this.logger.error('DLQ alert threshold exceeded', { threshold: this.config.alertThreshold, currentCount: stats.total, byJobName: stats.byJobName, diff --git a/libs/services/queue/src/queue-manager.ts b/libs/services/queue/src/queue-manager.ts index b2626e4..410b474 100644 --- a/libs/services/queue/src/queue-manager.ts +++ b/libs/services/queue/src/queue-manager.ts @@ -1,6 +1,5 @@ import { createCache } from '@stock-bot/cache'; import type { CacheProvider } from '@stock-bot/cache'; -import { getLogger } from '@stock-bot/logger'; import { Queue, type QueueWorkerConfig } from './queue'; import { QueueRateLimiter } from './rate-limiter'; import type { @@ -12,8 +11,6 @@ import type { } from './types'; import { getRedisConnection } from './utils'; -const logger = getLogger('queue-manager'); - /** * QueueManager provides unified queue and cache management * Main entry point for all queue operations with getQueue() method @@ -27,14 +24,16 @@ export class QueueManager { private isShuttingDown = false; private shutdownPromise: Promise | null = null; private config: QueueManagerConfig; + private readonly logger: any; - constructor(config: QueueManagerConfig) { + constructor(config: QueueManagerConfig, logger?: any) { this.config = config; + this.logger = logger || console; this.redisConnection = getRedisConnection(config.redis); // Initialize rate limiter if rules are provided if (config.rateLimitRules && config.rateLimitRules.length > 0) { - this.rateLimiter = new QueueRateLimiter(this.redisConnection); + this.rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger); config.rateLimitRules.forEach(rule => { if (this.rateLimiter) { this.rateLimiter.addRule(rule); @@ -42,7 +41,7 @@ export class QueueManager { }); } - logger.info('QueueManager initialized', { + this.logger.info('QueueManager initialized', { redis: `${config.redis.host}:${config.redis.port}`, }); } @@ -53,7 +52,7 @@ export class QueueManager { * @throws Error if not initialized - use initialize() first */ static getInstance(): QueueManager { - logger.warn( + console.warn( 'QueueManager.getInstance() is deprecated. Please use dependency injection instead.' ); if (!QueueManager.instance) { @@ -68,11 +67,11 @@ export class QueueManager { * Must be called before getInstance() */ static initialize(config: QueueManagerConfig): QueueManager { - logger.warn( + console.warn( 'QueueManager.initialize() is deprecated. Please use dependency injection instead.' ); if (QueueManager.instance) { - logger.warn('QueueManager already initialized, returning existing instance'); + console.warn('QueueManager already initialized, returning existing instance'); return QueueManager.instance; } QueueManager.instance = new QueueManager(config); @@ -85,7 +84,7 @@ export class QueueManager { * Convenience method that combines initialize and getInstance */ static getOrInitialize(config?: QueueManagerConfig): QueueManager { - logger.warn( + console.warn( 'QueueManager.getOrInitialize() is deprecated. Please use dependency injection instead.' ); if (QueueManager.instance) { @@ -152,7 +151,8 @@ export class QueueManager { queueName, this.config.redis, mergedOptions.defaultJobOptions || {}, - queueConfig + queueConfig, + this.logger ); // Store the queue @@ -172,7 +172,7 @@ export class QueueManager { }); } - logger.info('Queue created with batch cache', { + this.logger.info('Queue created with batch cache', { queueName, workers: mergedOptions.workers || 0, concurrency: mergedOptions.concurrency || 1, @@ -207,7 +207,7 @@ export class QueueManager { enableMetrics: true, }); this.caches.set(queueName, cacheProvider); - logger.trace('Cache created for queue', { queueName }); + this.logger.trace('Cache created for queue', { queueName }); } const cache = this.caches.get(queueName); if (!cache) { @@ -222,7 +222,7 @@ export class QueueManager { async initializeCache(queueName: string): Promise { const cache = this.getCache(queueName); await cache.waitForReady(10000); - logger.info('Cache initialized for queue', { queueName }); + this.logger.info('Cache initialized for queue', { queueName }); } /** @@ -232,7 +232,7 @@ export class QueueManager { private initializeBatchCacheSync(queueName: string): void { // Just create the cache - it will connect automatically when first used this.getCache(queueName); - logger.trace('Batch cache initialized synchronously for queue', { queueName }); + this.logger.trace('Batch cache initialized synchronously for queue', { queueName }); } /** @@ -321,7 +321,7 @@ export class QueueManager { async pauseAll(): Promise { const pausePromises = Array.from(this.queues.values()).map(queue => queue.pause()); await Promise.all(pausePromises); - logger.info('All queues paused'); + this.logger.info('All queues paused'); } /** @@ -330,7 +330,7 @@ export class QueueManager { async resumeAll(): Promise { const resumePromises = Array.from(this.queues.values()).map(queue => queue.resume()); await Promise.all(resumePromises); - logger.info('All queues resumed'); + this.logger.info('All queues resumed'); } /** @@ -365,7 +365,7 @@ export class QueueManager { async drainAll(delayed = false): Promise { const drainPromises = Array.from(this.queues.values()).map(queue => queue.drain(delayed)); await Promise.all(drainPromises); - logger.info('All queues drained', { delayed }); + this.logger.info('All queues drained', { delayed }); } /** @@ -380,7 +380,7 @@ export class QueueManager { queue.clean(grace, limit, type) ); await Promise.all(cleanPromises); - logger.info('All queues cleaned', { type, grace, limit }); + this.logger.info('All queues cleaned', { type, grace, limit }); } /** @@ -397,7 +397,7 @@ export class QueueManager { } this.isShuttingDown = true; - logger.info('Shutting down QueueManager...'); + this.logger.info('Shutting down QueueManager...'); // Create shutdown promise this.shutdownPromise = this.performShutdown(); @@ -420,7 +420,7 @@ export class QueueManager { // await Promise.race([closePromise, timeoutPromise]); } catch (error) { - logger.warn('Error closing queue', { error: (error as Error).message }); + this.logger.warn('Error closing queue', { error: (error as Error).message }); } }); @@ -432,7 +432,7 @@ export class QueueManager { // Clear cache before shutdown await cache.clear(); } catch (error) { - logger.warn('Error clearing cache', { error: (error as Error).message }); + this.logger.warn('Error clearing cache', { error: (error as Error).message }); } }); @@ -442,9 +442,9 @@ export class QueueManager { this.queues.clear(); this.caches.clear(); - logger.info('QueueManager shutdown complete'); + this.logger.info('QueueManager shutdown complete'); } catch (error) { - logger.error('Error during shutdown', { error: (error as Error).message }); + this.logger.error('Error during shutdown', { error: (error as Error).message }); throw error; } finally { // Reset shutdown state @@ -458,7 +458,7 @@ export class QueueManager { */ startAllWorkers(): void { if (!this.config.delayWorkerStart) { - logger.info( + this.logger.info( 'startAllWorkers() called but workers already started automatically (delayWorkerStart is false)' ); return; @@ -475,7 +475,7 @@ export class QueueManager { } } - logger.info('All workers started', { + this.logger.info('All workers started', { totalQueues: this.queues.size, queuesWithWorkers: workersStarted, delayWorkerStart: this.config.delayWorkerStart, diff --git a/libs/services/queue/src/queue.ts b/libs/services/queue/src/queue.ts index 2b1d884..93d3926 100644 --- a/libs/services/queue/src/queue.ts +++ b/libs/services/queue/src/queue.ts @@ -1,11 +1,8 @@ import { Queue as BullQueue, QueueEvents, Worker, type Job } from 'bullmq'; -import { getLogger } from '@stock-bot/logger'; import { handlerRegistry } from '@stock-bot/types'; import type { JobData, JobOptions, QueueStats, RedisConfig } from './types'; import { getRedisConnection } from './utils'; -const logger = getLogger('queue'); - export interface QueueWorkerConfig { workers?: number; concurrency?: number; @@ -22,15 +19,18 @@ export class Queue { private queueEvents?: QueueEvents; private queueName: string; private redisConfig: RedisConfig; + private readonly logger: any; constructor( queueName: string, redisConfig: RedisConfig, defaultJobOptions: JobOptions = {}, - config: QueueWorkerConfig = {} + config: QueueWorkerConfig = {}, + logger?: any ) { this.queueName = queueName; this.redisConfig = redisConfig; + this.logger = logger || console; const connection = getRedisConnection(redisConfig); @@ -59,7 +59,7 @@ export class Queue { this.startWorkers(config.workers, config.concurrency || 1); } - logger.trace('Queue created', { + this.logger.trace('Queue created', { queueName, workers: config.workers || 0, concurrency: config.concurrency || 1, @@ -77,7 +77,7 @@ export class Queue { * Add a single job to the queue */ async add(name: string, data: JobData, options: JobOptions = {}): Promise { - logger.trace('Adding job', { queueName: this.queueName, jobName: name }); + this.logger.trace('Adding job', { queueName: this.queueName, jobName: name }); return await this.bullQueue.add(name, data, options); } @@ -85,7 +85,7 @@ export class Queue { * Add multiple jobs to the queue in bulk */ async addBulk(jobs: Array<{ name: string; data: JobData; opts?: JobOptions }>): Promise { - logger.trace('Adding bulk jobs', { + this.logger.trace('Adding bulk jobs', { queueName: this.queueName, jobCount: jobs.length, }); @@ -111,7 +111,7 @@ export class Queue { }, }; - logger.info('Adding scheduled job', { + this.logger.info('Adding scheduled job', { queueName: this.queueName, jobName: name, cronPattern, @@ -170,7 +170,7 @@ export class Queue { */ async pause(): Promise { await this.bullQueue.pause(); - logger.info('Queue paused', { queueName: this.queueName }); + this.logger.info('Queue paused', { queueName: this.queueName }); } /** @@ -178,7 +178,7 @@ export class Queue { */ async resume(): Promise { await this.bullQueue.resume(); - logger.info('Queue resumed', { queueName: this.queueName }); + this.logger.info('Queue resumed', { queueName: this.queueName }); } /** @@ -186,7 +186,7 @@ export class Queue { */ async drain(delayed = false): Promise { await this.bullQueue.drain(delayed); - logger.info('Queue drained', { queueName: this.queueName, delayed }); + this.logger.info('Queue drained', { queueName: this.queueName, delayed }); } /** @@ -198,7 +198,7 @@ export class Queue { type: 'completed' | 'failed' = 'completed' ): Promise { await this.bullQueue.clean(grace, limit, type); - logger.debug('Queue cleaned', { queueName: this.queueName, type, grace, limit }); + this.logger.debug('Queue cleaned', { queueName: this.queueName, type, grace, limit }); } /** @@ -218,12 +218,12 @@ export class Queue { try { // Close the queue itself await this.bullQueue.close(); - logger.info('Queue closed', { queueName: this.queueName }); + this.logger.info('Queue closed', { queueName: this.queueName }); // Close queue events if (this.queueEvents) { await this.queueEvents.close(); - logger.debug('Queue events closed', { queueName: this.queueName }); + this.logger.debug('Queue events closed', { queueName: this.queueName }); } // Close workers first @@ -234,14 +234,26 @@ export class Queue { }) ); this.workers = []; - logger.debug('Workers closed', { queueName: this.queueName }); + this.logger.debug('Workers closed', { queueName: this.queueName }); } } catch (error) { - logger.error('Error closing queue', { queueName: this.queueName, error }); + this.logger.error('Error closing queue', { queueName: this.queueName, error }); throw error; } } + /** + * Create a child logger with additional context + * Useful for batch processing and other queue operations + */ + createChildLogger(name: string, context?: any) { + if (this.logger && typeof this.logger.child === 'function') { + return this.logger.child(name, context); + } + // Fallback to main logger if child not supported (e.g., console) + return this.logger; + } + /** * Start workers for this queue */ @@ -258,7 +270,7 @@ export class Queue { // Setup worker event handlers worker.on('completed', job => { - logger.trace('Job completed', { + this.logger.trace('Job completed', { queueName: this.queueName, jobId: job.id, handler: job.data?.handler, @@ -267,7 +279,7 @@ export class Queue { }); worker.on('failed', (job, err) => { - logger.error('Job failed', { + this.logger.error('Job failed', { queueName: this.queueName, jobId: job?.id, handler: job?.data?.handler, @@ -277,7 +289,7 @@ export class Queue { }); worker.on('error', error => { - logger.error('Worker error', { + this.logger.error('Worker error', { queueName: this.queueName, workerId: i, error: error.message, @@ -287,7 +299,7 @@ export class Queue { this.workers.push(worker); } - logger.info('Workers started', { + this.logger.info('Workers started', { queueName: this.queueName, workerCount, concurrency, @@ -300,7 +312,7 @@ export class Queue { private async processJob(job: Job): Promise { const { handler, operation, payload }: JobData = job.data; - logger.trace('Processing job', { + this.logger.trace('Processing job', { id: job.id, handler, operation, @@ -317,7 +329,7 @@ export class Queue { const result = await jobHandler(payload); - logger.trace('Job completed successfully', { + this.logger.trace('Job completed successfully', { id: job.id, handler, operation, @@ -326,7 +338,7 @@ export class Queue { return result; } catch (error) { - logger.error('Job processing failed', { + this.logger.error('Job processing failed', { id: job.id, handler, operation, @@ -342,7 +354,7 @@ export class Queue { */ startWorkersManually(workerCount: number, concurrency: number = 1): void { if (this.workers.length > 0) { - logger.warn('Workers already started for queue', { queueName: this.queueName }); + this.logger.warn('Workers already started for queue', { queueName: this.queueName }); return; } diff --git a/libs/services/queue/src/rate-limiter.ts b/libs/services/queue/src/rate-limiter.ts index ecb9d52..fa44840 100644 --- a/libs/services/queue/src/rate-limiter.ts +++ b/libs/services/queue/src/rate-limiter.ts @@ -1,9 +1,6 @@ import { RateLimiterRedis, RateLimiterRes } from 'rate-limiter-flexible'; -import { getLogger } from '@stock-bot/logger'; import type { RateLimitConfig as BaseRateLimitConfig, RateLimitRule } from './types'; -const logger = getLogger('rate-limiter'); - // Extend the base config to add rate-limiter specific fields export interface RateLimitConfig extends BaseRateLimitConfig { keyPrefix?: string; @@ -12,8 +9,14 @@ export interface RateLimitConfig extends BaseRateLimitConfig { export class QueueRateLimiter { private limiters = new Map(); private rules: RateLimitRule[] = []; + private readonly logger: any; - constructor(private redisClient: ReturnType) {} + constructor( + private redisClient: ReturnType, + logger?: any + ) { + this.logger = logger || console; + } /** * Add a rate limit rule @@ -32,7 +35,7 @@ export class QueueRateLimiter { this.limiters.set(key, limiter); - logger.info('Rate limit rule added', { + this.logger.info('Rate limit rule added', { level: rule.level, queueName: rule.queueName, handler: rule.handler, @@ -72,7 +75,7 @@ export class QueueRateLimiter { const limiter = this.limiters.get(key); if (!limiter) { - logger.warn('Rate limiter not found for rule', { key, rule: applicableRule }); + this.logger.warn('Rate limiter not found for rule', { key, rule: applicableRule }); return { allowed: true }; } @@ -87,7 +90,7 @@ export class QueueRateLimiter { appliedRule: applicableRule, }; } catch (error) { - logger.error('Rate limit check failed', { queueName, handler, operation, error }); + this.logger.error('Rate limit check failed', { queueName, handler, operation, error }); // On error, allow the request to proceed return { allowed: true }; } @@ -148,7 +151,7 @@ export class QueueRateLimiter { }; } catch (rejRes) { if (rejRes instanceof RateLimiterRes) { - logger.warn('Rate limit exceeded', { + this.logger.warn('Rate limit exceeded', { key, retryAfter: rejRes.msBeforeNext, }); @@ -260,7 +263,7 @@ export class QueueRateLimiter { limit, }; } catch (error) { - logger.error('Failed to get rate limit status', { queueName, handler, operation, error }); + this.logger.error('Failed to get rate limit status', { queueName, handler, operation, error }); return { queueName, handler, @@ -288,10 +291,10 @@ export class QueueRateLimiter { } } else { // Reset broader scope - this is more complex with the new hierarchy - logger.warn('Broad reset not implemented yet', { queueName, handler, operation }); + this.logger.warn('Broad reset not implemented yet', { queueName, handler, operation }); } - logger.info('Rate limits reset', { queueName, handler, operation }); + this.logger.info('Rate limits reset', { queueName, handler, operation }); } /** @@ -318,7 +321,7 @@ export class QueueRateLimiter { this.rules.splice(ruleIndex, 1); this.limiters.delete(key); - logger.info('Rate limit rule removed', { level, queueName, handler, operation }); + this.logger.info('Rate limit rule removed', { level, queueName, handler, operation }); return true; }