fixed cache keys

This commit is contained in:
Boki 2025-06-22 20:34:35 -04:00
parent db3aa9c330
commit 19dfda2392
13 changed files with 286 additions and 221 deletions

View file

@ -1,9 +1,6 @@
import { getLogger } from '@stock-bot/logger';
import { QueueManager } from './queue-manager';
import type { BatchJobData, BatchResult, JobData, ProcessOptions } from './types';
const logger = getLogger('batch-processor');
/**
* Main function - processes items either directly or in batches
* Each item becomes payload: item (no processing needed)
@ -14,7 +11,12 @@ export async function processItems<T>(
options: ProcessOptions,
queueManager: QueueManager
): Promise<BatchResult> {
queueManager.getQueue(queueName);
const queue = queueManager.getQueue(queueName);
const logger = queue.createChildLogger('batch-processor', {
queueName,
totalItems: items.length,
mode: options.useBatching ? 'batch' : 'direct',
});
const startTime = Date.now();
if (items.length === 0) {
@ -61,7 +63,11 @@ async function processDirect<T>(
options: ProcessOptions,
queueManager: QueueManager
): Promise<Omit<BatchResult, 'duration'>> {
queueManager.getQueue(queueName);
const queue = queueManager.getQueue(queueName);
const logger = queue.createChildLogger('batch-direct', {
queueName,
totalItems: items.length,
});
const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds
const delayPerItem = totalDelayMs / items.length;
@ -105,7 +111,11 @@ async function processBatched<T>(
options: ProcessOptions,
queueManager: QueueManager
): Promise<Omit<BatchResult, 'duration'>> {
queueManager.getQueue(queueName);
const queue = queueManager.getQueue(queueName);
const logger = queue.createChildLogger('batch-batched', {
queueName,
totalItems: items.length,
});
const batchSize = options.batchSize || 100;
const batches = createBatches(items, batchSize);
const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds
@ -162,10 +172,15 @@ async function processBatched<T>(
* Process a batch job - loads items and creates individual jobs
*/
export async function processBatchJob(jobData: BatchJobData, queueName: string, queueManager: QueueManager): Promise<unknown> {
queueManager.getQueue(queueName);
const queue = queueManager.getQueue(queueName);
const logger = queue.createChildLogger('batch-job', {
queueName,
batchIndex: jobData.batchIndex,
payloadKey: jobData.payloadKey,
});
const { payloadKey, batchIndex, totalBatches, itemCount, totalDelayHours } = jobData;
logger.trace('Processing batch job', {
logger.debug('Processing batch job', {
batchIndex,
totalBatches,
itemCount,
@ -186,7 +201,7 @@ export async function processBatchJob(jobData: BatchJobData, queueName: string,
const delayPerBatch = totalDelayMs / totalBatches; // Time allocated for each batch
const delayPerItem = delayPerBatch / items.length; // Distribute items evenly within batch window
logger.trace('Calculating job delays', {
logger.debug('Calculating job delays', {
batchIndex,
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
delayPerItem: `${(delayPerItem / 1000).toFixed(2)} seconds`,
@ -301,6 +316,10 @@ async function addJobsInChunks(
chunkSize = 100
): Promise<unknown[]> {
const queue = queueManager.getQueue(queueName);
const logger = queue.createChildLogger('batch-chunk', {
queueName,
totalJobs: jobs.length,
});
const allCreatedJobs = [];
for (let i = 0; i < jobs.length; i += chunkSize) {

View file

@ -1,20 +1,20 @@
import { Queue, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import type { DLQConfig, RedisConfig } from './types';
import { getRedisConnection } from './utils';
const logger = getLogger('dlq-handler');
export class DeadLetterQueueHandler {
private dlq: Queue;
private config: Required<DLQConfig>;
private failureCount = new Map<string, number>();
private readonly logger: any;
constructor(
private mainQueue: Queue,
connection: RedisConfig,
config: DLQConfig = {}
config: DLQConfig = {},
logger?: any
) {
this.logger = logger || console;
this.config = {
maxRetries: config.maxRetries ?? 3,
retryDelay: config.retryDelay ?? 60000, // 1 minute
@ -35,7 +35,7 @@ export class DeadLetterQueueHandler {
const currentFailures = (this.failureCount.get(jobKey) || 0) + 1;
this.failureCount.set(jobKey, currentFailures);
logger.warn('Job failed', {
this.logger.warn('Job failed', {
jobId: job.id,
jobName: job.name,
attempt: job.attemptsMade,
@ -80,7 +80,7 @@ export class DeadLetterQueueHandler {
removeOnFail: 50,
});
logger.error('Job moved to DLQ', {
this.logger.error('Job moved to DLQ', {
jobId: job.id,
jobName: job.name,
error: error.message,
@ -89,7 +89,7 @@ export class DeadLetterQueueHandler {
// Check if we need to alert
await this.checkAlertThreshold();
} catch (dlqError) {
logger.error('Failed to move job to DLQ', {
this.logger.error('Failed to move job to DLQ', {
jobId: job.id,
error: dlqError,
});
@ -118,12 +118,12 @@ export class DeadLetterQueueHandler {
await dlqJob.remove();
retriedCount++;
logger.info('Job retried from DLQ', {
this.logger.info('Job retried from DLQ', {
originalJobId: originalJob.id,
jobName: originalJob.name,
});
} catch (error) {
logger.error('Failed to retry DLQ job', {
this.logger.error('Failed to retry DLQ job', {
dlqJobId: dlqJob.id,
error,
});
@ -190,7 +190,7 @@ export class DeadLetterQueueHandler {
}
}
logger.info('DLQ cleanup completed', {
this.logger.info('DLQ cleanup completed', {
removedCount,
cleanupAge: `${this.config.cleanupAge} hours`,
});
@ -205,7 +205,7 @@ export class DeadLetterQueueHandler {
const stats = await this.getStats();
if (stats.total >= this.config.alertThreshold) {
logger.error('DLQ alert threshold exceeded', {
this.logger.error('DLQ alert threshold exceeded', {
threshold: this.config.alertThreshold,
currentCount: stats.total,
byJobName: stats.byJobName,

View file

@ -1,6 +1,5 @@
import { createCache } from '@stock-bot/cache';
import type { CacheProvider } from '@stock-bot/cache';
import { getLogger } from '@stock-bot/logger';
import { Queue, type QueueWorkerConfig } from './queue';
import { QueueRateLimiter } from './rate-limiter';
import type {
@ -12,8 +11,6 @@ import type {
} from './types';
import { getRedisConnection } from './utils';
const logger = getLogger('queue-manager');
/**
* QueueManager provides unified queue and cache management
* Main entry point for all queue operations with getQueue() method
@ -27,14 +24,16 @@ export class QueueManager {
private isShuttingDown = false;
private shutdownPromise: Promise<void> | null = null;
private config: QueueManagerConfig;
private readonly logger: any;
constructor(config: QueueManagerConfig) {
constructor(config: QueueManagerConfig, logger?: any) {
this.config = config;
this.logger = logger || console;
this.redisConnection = getRedisConnection(config.redis);
// Initialize rate limiter if rules are provided
if (config.rateLimitRules && config.rateLimitRules.length > 0) {
this.rateLimiter = new QueueRateLimiter(this.redisConnection);
this.rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger);
config.rateLimitRules.forEach(rule => {
if (this.rateLimiter) {
this.rateLimiter.addRule(rule);
@ -42,7 +41,7 @@ export class QueueManager {
});
}
logger.info('QueueManager initialized', {
this.logger.info('QueueManager initialized', {
redis: `${config.redis.host}:${config.redis.port}`,
});
}
@ -53,7 +52,7 @@ export class QueueManager {
* @throws Error if not initialized - use initialize() first
*/
static getInstance(): QueueManager {
logger.warn(
console.warn(
'QueueManager.getInstance() is deprecated. Please use dependency injection instead.'
);
if (!QueueManager.instance) {
@ -68,11 +67,11 @@ export class QueueManager {
* Must be called before getInstance()
*/
static initialize(config: QueueManagerConfig): QueueManager {
logger.warn(
console.warn(
'QueueManager.initialize() is deprecated. Please use dependency injection instead.'
);
if (QueueManager.instance) {
logger.warn('QueueManager already initialized, returning existing instance');
console.warn('QueueManager already initialized, returning existing instance');
return QueueManager.instance;
}
QueueManager.instance = new QueueManager(config);
@ -85,7 +84,7 @@ export class QueueManager {
* Convenience method that combines initialize and getInstance
*/
static getOrInitialize(config?: QueueManagerConfig): QueueManager {
logger.warn(
console.warn(
'QueueManager.getOrInitialize() is deprecated. Please use dependency injection instead.'
);
if (QueueManager.instance) {
@ -152,7 +151,8 @@ export class QueueManager {
queueName,
this.config.redis,
mergedOptions.defaultJobOptions || {},
queueConfig
queueConfig,
this.logger
);
// Store the queue
@ -172,7 +172,7 @@ export class QueueManager {
});
}
logger.info('Queue created with batch cache', {
this.logger.info('Queue created with batch cache', {
queueName,
workers: mergedOptions.workers || 0,
concurrency: mergedOptions.concurrency || 1,
@ -207,7 +207,7 @@ export class QueueManager {
enableMetrics: true,
});
this.caches.set(queueName, cacheProvider);
logger.trace('Cache created for queue', { queueName });
this.logger.trace('Cache created for queue', { queueName });
}
const cache = this.caches.get(queueName);
if (!cache) {
@ -222,7 +222,7 @@ export class QueueManager {
async initializeCache(queueName: string): Promise<void> {
const cache = this.getCache(queueName);
await cache.waitForReady(10000);
logger.info('Cache initialized for queue', { queueName });
this.logger.info('Cache initialized for queue', { queueName });
}
/**
@ -232,7 +232,7 @@ export class QueueManager {
private initializeBatchCacheSync(queueName: string): void {
// Just create the cache - it will connect automatically when first used
this.getCache(queueName);
logger.trace('Batch cache initialized synchronously for queue', { queueName });
this.logger.trace('Batch cache initialized synchronously for queue', { queueName });
}
/**
@ -321,7 +321,7 @@ export class QueueManager {
async pauseAll(): Promise<void> {
const pausePromises = Array.from(this.queues.values()).map(queue => queue.pause());
await Promise.all(pausePromises);
logger.info('All queues paused');
this.logger.info('All queues paused');
}
/**
@ -330,7 +330,7 @@ export class QueueManager {
async resumeAll(): Promise<void> {
const resumePromises = Array.from(this.queues.values()).map(queue => queue.resume());
await Promise.all(resumePromises);
logger.info('All queues resumed');
this.logger.info('All queues resumed');
}
/**
@ -365,7 +365,7 @@ export class QueueManager {
async drainAll(delayed = false): Promise<void> {
const drainPromises = Array.from(this.queues.values()).map(queue => queue.drain(delayed));
await Promise.all(drainPromises);
logger.info('All queues drained', { delayed });
this.logger.info('All queues drained', { delayed });
}
/**
@ -380,7 +380,7 @@ export class QueueManager {
queue.clean(grace, limit, type)
);
await Promise.all(cleanPromises);
logger.info('All queues cleaned', { type, grace, limit });
this.logger.info('All queues cleaned', { type, grace, limit });
}
/**
@ -397,7 +397,7 @@ export class QueueManager {
}
this.isShuttingDown = true;
logger.info('Shutting down QueueManager...');
this.logger.info('Shutting down QueueManager...');
// Create shutdown promise
this.shutdownPromise = this.performShutdown();
@ -420,7 +420,7 @@ export class QueueManager {
// await Promise.race([closePromise, timeoutPromise]);
} catch (error) {
logger.warn('Error closing queue', { error: (error as Error).message });
this.logger.warn('Error closing queue', { error: (error as Error).message });
}
});
@ -432,7 +432,7 @@ export class QueueManager {
// Clear cache before shutdown
await cache.clear();
} catch (error) {
logger.warn('Error clearing cache', { error: (error as Error).message });
this.logger.warn('Error clearing cache', { error: (error as Error).message });
}
});
@ -442,9 +442,9 @@ export class QueueManager {
this.queues.clear();
this.caches.clear();
logger.info('QueueManager shutdown complete');
this.logger.info('QueueManager shutdown complete');
} catch (error) {
logger.error('Error during shutdown', { error: (error as Error).message });
this.logger.error('Error during shutdown', { error: (error as Error).message });
throw error;
} finally {
// Reset shutdown state
@ -458,7 +458,7 @@ export class QueueManager {
*/
startAllWorkers(): void {
if (!this.config.delayWorkerStart) {
logger.info(
this.logger.info(
'startAllWorkers() called but workers already started automatically (delayWorkerStart is false)'
);
return;
@ -475,7 +475,7 @@ export class QueueManager {
}
}
logger.info('All workers started', {
this.logger.info('All workers started', {
totalQueues: this.queues.size,
queuesWithWorkers: workersStarted,
delayWorkerStart: this.config.delayWorkerStart,

View file

@ -1,11 +1,8 @@
import { Queue as BullQueue, QueueEvents, Worker, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { handlerRegistry } from '@stock-bot/types';
import type { JobData, JobOptions, QueueStats, RedisConfig } from './types';
import { getRedisConnection } from './utils';
const logger = getLogger('queue');
export interface QueueWorkerConfig {
workers?: number;
concurrency?: number;
@ -22,15 +19,18 @@ export class Queue {
private queueEvents?: QueueEvents;
private queueName: string;
private redisConfig: RedisConfig;
private readonly logger: any;
constructor(
queueName: string,
redisConfig: RedisConfig,
defaultJobOptions: JobOptions = {},
config: QueueWorkerConfig = {}
config: QueueWorkerConfig = {},
logger?: any
) {
this.queueName = queueName;
this.redisConfig = redisConfig;
this.logger = logger || console;
const connection = getRedisConnection(redisConfig);
@ -59,7 +59,7 @@ export class Queue {
this.startWorkers(config.workers, config.concurrency || 1);
}
logger.trace('Queue created', {
this.logger.trace('Queue created', {
queueName,
workers: config.workers || 0,
concurrency: config.concurrency || 1,
@ -77,7 +77,7 @@ export class Queue {
* Add a single job to the queue
*/
async add(name: string, data: JobData, options: JobOptions = {}): Promise<Job> {
logger.trace('Adding job', { queueName: this.queueName, jobName: name });
this.logger.trace('Adding job', { queueName: this.queueName, jobName: name });
return await this.bullQueue.add(name, data, options);
}
@ -85,7 +85,7 @@ export class Queue {
* Add multiple jobs to the queue in bulk
*/
async addBulk(jobs: Array<{ name: string; data: JobData; opts?: JobOptions }>): Promise<Job[]> {
logger.trace('Adding bulk jobs', {
this.logger.trace('Adding bulk jobs', {
queueName: this.queueName,
jobCount: jobs.length,
});
@ -111,7 +111,7 @@ export class Queue {
},
};
logger.info('Adding scheduled job', {
this.logger.info('Adding scheduled job', {
queueName: this.queueName,
jobName: name,
cronPattern,
@ -170,7 +170,7 @@ export class Queue {
*/
async pause(): Promise<void> {
await this.bullQueue.pause();
logger.info('Queue paused', { queueName: this.queueName });
this.logger.info('Queue paused', { queueName: this.queueName });
}
/**
@ -178,7 +178,7 @@ export class Queue {
*/
async resume(): Promise<void> {
await this.bullQueue.resume();
logger.info('Queue resumed', { queueName: this.queueName });
this.logger.info('Queue resumed', { queueName: this.queueName });
}
/**
@ -186,7 +186,7 @@ export class Queue {
*/
async drain(delayed = false): Promise<void> {
await this.bullQueue.drain(delayed);
logger.info('Queue drained', { queueName: this.queueName, delayed });
this.logger.info('Queue drained', { queueName: this.queueName, delayed });
}
/**
@ -198,7 +198,7 @@ export class Queue {
type: 'completed' | 'failed' = 'completed'
): Promise<void> {
await this.bullQueue.clean(grace, limit, type);
logger.debug('Queue cleaned', { queueName: this.queueName, type, grace, limit });
this.logger.debug('Queue cleaned', { queueName: this.queueName, type, grace, limit });
}
/**
@ -218,12 +218,12 @@ export class Queue {
try {
// Close the queue itself
await this.bullQueue.close();
logger.info('Queue closed', { queueName: this.queueName });
this.logger.info('Queue closed', { queueName: this.queueName });
// Close queue events
if (this.queueEvents) {
await this.queueEvents.close();
logger.debug('Queue events closed', { queueName: this.queueName });
this.logger.debug('Queue events closed', { queueName: this.queueName });
}
// Close workers first
@ -234,14 +234,26 @@ export class Queue {
})
);
this.workers = [];
logger.debug('Workers closed', { queueName: this.queueName });
this.logger.debug('Workers closed', { queueName: this.queueName });
}
} catch (error) {
logger.error('Error closing queue', { queueName: this.queueName, error });
this.logger.error('Error closing queue', { queueName: this.queueName, error });
throw error;
}
}
/**
 * Build a logger scoped to a named operation, carrying extra context fields.
 * Intended for batch processing and other per-operation queue work.
 */
createChildLogger(name: string, context?: any) {
  const base = this.logger;
  // Loggers without child() support (e.g. a plain console) are returned as-is,
  // so callers always get something with the usual log methods.
  if (!base || typeof base.child !== 'function') {
    return base;
  }
  return base.child(name, context);
}
/**
* Start workers for this queue
*/
@ -258,7 +270,7 @@ export class Queue {
// Setup worker event handlers
worker.on('completed', job => {
logger.trace('Job completed', {
this.logger.trace('Job completed', {
queueName: this.queueName,
jobId: job.id,
handler: job.data?.handler,
@ -267,7 +279,7 @@ export class Queue {
});
worker.on('failed', (job, err) => {
logger.error('Job failed', {
this.logger.error('Job failed', {
queueName: this.queueName,
jobId: job?.id,
handler: job?.data?.handler,
@ -277,7 +289,7 @@ export class Queue {
});
worker.on('error', error => {
logger.error('Worker error', {
this.logger.error('Worker error', {
queueName: this.queueName,
workerId: i,
error: error.message,
@ -287,7 +299,7 @@ export class Queue {
this.workers.push(worker);
}
logger.info('Workers started', {
this.logger.info('Workers started', {
queueName: this.queueName,
workerCount,
concurrency,
@ -300,7 +312,7 @@ export class Queue {
private async processJob(job: Job): Promise<unknown> {
const { handler, operation, payload }: JobData = job.data;
logger.trace('Processing job', {
this.logger.trace('Processing job', {
id: job.id,
handler,
operation,
@ -317,7 +329,7 @@ export class Queue {
const result = await jobHandler(payload);
logger.trace('Job completed successfully', {
this.logger.trace('Job completed successfully', {
id: job.id,
handler,
operation,
@ -326,7 +338,7 @@ export class Queue {
return result;
} catch (error) {
logger.error('Job processing failed', {
this.logger.error('Job processing failed', {
id: job.id,
handler,
operation,
@ -342,7 +354,7 @@ export class Queue {
*/
startWorkersManually(workerCount: number, concurrency: number = 1): void {
if (this.workers.length > 0) {
logger.warn('Workers already started for queue', { queueName: this.queueName });
this.logger.warn('Workers already started for queue', { queueName: this.queueName });
return;
}

View file

@ -1,9 +1,6 @@
import { RateLimiterRedis, RateLimiterRes } from 'rate-limiter-flexible';
import { getLogger } from '@stock-bot/logger';
import type { RateLimitConfig as BaseRateLimitConfig, RateLimitRule } from './types';
const logger = getLogger('rate-limiter');
// Extend the base config to add rate-limiter specific fields
export interface RateLimitConfig extends BaseRateLimitConfig {
keyPrefix?: string;
@ -12,8 +9,14 @@ export interface RateLimitConfig extends BaseRateLimitConfig {
export class QueueRateLimiter {
private limiters = new Map<string, RateLimiterRedis>();
private rules: RateLimitRule[] = [];
private readonly logger: any;
constructor(private redisClient: ReturnType<typeof import('./utils').getRedisConnection>) {}
constructor(
// redisClient is stored as a private parameter property; typed via the
// utils module's getRedisConnection return type to stay in sync with it.
private redisClient: ReturnType<typeof import('./utils').getRedisConnection>,
// Optional injected logger; falls back to console so log calls never throw.
logger?: any
) {
this.logger = logger || console;
}
/**
* Add a rate limit rule
@ -32,7 +35,7 @@ export class QueueRateLimiter {
this.limiters.set(key, limiter);
logger.info('Rate limit rule added', {
this.logger.info('Rate limit rule added', {
level: rule.level,
queueName: rule.queueName,
handler: rule.handler,
@ -72,7 +75,7 @@ export class QueueRateLimiter {
const limiter = this.limiters.get(key);
if (!limiter) {
logger.warn('Rate limiter not found for rule', { key, rule: applicableRule });
this.logger.warn('Rate limiter not found for rule', { key, rule: applicableRule });
return { allowed: true };
}
@ -87,7 +90,7 @@ export class QueueRateLimiter {
appliedRule: applicableRule,
};
} catch (error) {
logger.error('Rate limit check failed', { queueName, handler, operation, error });
this.logger.error('Rate limit check failed', { queueName, handler, operation, error });
// On error, allow the request to proceed
return { allowed: true };
}
@ -148,7 +151,7 @@ export class QueueRateLimiter {
};
} catch (rejRes) {
if (rejRes instanceof RateLimiterRes) {
logger.warn('Rate limit exceeded', {
this.logger.warn('Rate limit exceeded', {
key,
retryAfter: rejRes.msBeforeNext,
});
@ -260,7 +263,7 @@ export class QueueRateLimiter {
limit,
};
} catch (error) {
logger.error('Failed to get rate limit status', { queueName, handler, operation, error });
this.logger.error('Failed to get rate limit status', { queueName, handler, operation, error });
return {
queueName,
handler,
@ -288,10 +291,10 @@ export class QueueRateLimiter {
}
} else {
// Reset broader scope - this is more complex with the new hierarchy
logger.warn('Broad reset not implemented yet', { queueName, handler, operation });
this.logger.warn('Broad reset not implemented yet', { queueName, handler, operation });
}
logger.info('Rate limits reset', { queueName, handler, operation });
this.logger.info('Rate limits reset', { queueName, handler, operation });
}
/**
@ -318,7 +321,7 @@ export class QueueRateLimiter {
this.rules.splice(ruleIndex, 1);
this.limiters.delete(key);
logger.info('Rate limit rule removed', { level, queueName, handler, operation });
this.logger.info('Rate limit rule removed', { level, queueName, handler, operation });
return true;
}