This commit is contained in:
Boki 2025-06-23 13:55:22 -04:00
parent 8a1a28b26e
commit d76f0ff5ff
3 changed files with 22 additions and 16 deletions

View file

@ -98,7 +98,7 @@ export async function processIndividualSymbol(
await this.scheduleOperation('process-individual-symbol', {
ceoId: ceoId,
timestamp: latestSpielTime,
});
}, {priority: 1});
}
this.logger.info(

View file

@ -41,7 +41,7 @@ export class RedisConnectionManager {
if (singleton) {
// Use shared connection across all instances
if (!RedisConnectionManager.sharedConnections.has(name)) {
const connection = this.createConnection(name, redisConfig, db);
const connection = this.createConnection(name, redisConfig, db, logger);
RedisConnectionManager.sharedConnections.set(name, connection);
this.logger.info(`Created shared Redis connection: ${name}`);
}
@ -53,7 +53,7 @@ export class RedisConnectionManager {
} else {
// Create unique connection per instance
const uniqueName = `${name}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
const connection = this.createConnection(uniqueName, redisConfig, db);
const connection = this.createConnection(uniqueName, redisConfig, db, logger);
this.connections.set(uniqueName, connection);
this.logger.debug(`Created unique Redis connection: ${uniqueName}`);
return connection;
@ -63,7 +63,7 @@ export class RedisConnectionManager {
/**
* Create a new Redis connection with configuration
*/
private createConnection(name: string, config: RedisConfig, db?: number): Redis {
private createConnection(name: string, config: RedisConfig, db?: number, logger?: any): Redis {
const redisOptions = {
host: config.host,
port: config.port,
@ -88,26 +88,29 @@ export class RedisConnectionManager {
};
const redis = new Redis(redisOptions);
// Use the provided logger or fall back to instance logger
const log = logger || this.logger;
// Setup event handlers
redis.on('connect', () => {
this.logger.info(`Redis connection established: ${name}`);
log.info(`Redis connection established: ${name}`);
});
redis.on('ready', () => {
this.logger.info(`Redis connection ready: ${name}`);
log.info(`Redis connection ready: ${name}`);
});
redis.on('error', err => {
this.logger.error(`Redis connection error for ${name}:`, err);
log.error(`Redis connection error for ${name}:`, err);
});
redis.on('close', () => {
this.logger.warn(`Redis connection closed: ${name}`);
log.warn(`Redis connection closed: ${name}`);
});
redis.on('reconnecting', () => {
this.logger.warn(`Redis reconnecting: ${name}`);
log.warn(`Redis reconnecting: ${name}`);
});
return redis;

View file

@ -75,11 +75,13 @@ export class QueueManager {
};
// Prepare queue configuration
const workers = mergedOptions.workers ?? this.config.defaultQueueOptions?.workers ?? 1;
const concurrency = mergedOptions.concurrency ?? this.config.defaultQueueOptions?.concurrency ?? 1;
const queueConfig: QueueWorkerConfig = {
workers: mergedOptions.workers,
concurrency: mergedOptions.concurrency,
startWorker:
!!mergedOptions.workers && mergedOptions.workers > 0 && !this.config.delayWorkerStart,
workers,
concurrency,
startWorker: workers > 0 && !this.config.delayWorkerStart,
};
const queue = new Queue(
@ -109,8 +111,8 @@ export class QueueManager {
this.logger.info('Queue created with batch cache', {
queueName,
workers: mergedOptions.workers || 0,
concurrency: mergedOptions.concurrency || 1,
workers: workers,
concurrency: concurrency,
});
return queue;
@ -140,6 +142,7 @@ export class QueueManager {
keyPrefix: `batch:${queueName}:`,
ttl: 86400, // 24 hours default
enableMetrics: true,
logger: this.logger,
});
this.caches.set(queueName, cacheProvider);
this.logger.trace('Cache created for queue', { queueName });
@ -210,7 +213,7 @@ export class QueueManager {
*/
addRateLimitRule(rule: RateLimitRule): void {
if (!this.rateLimiter) {
this.rateLimiter = new QueueRateLimiter(this.redisConnection);
this.rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger);
}
this.rateLimiter.addRule(rule);
}