stock-bot/libs/queue/src/queue-manager.ts
2025-06-19 08:31:21 -04:00

395 lines
11 KiB
TypeScript

import { getLogger } from '@stock-bot/logger';
import { QueueRateLimiter } from './rate-limiter';
import { Queue, type QueueWorkerConfig } from './queue';
import { CacheProvider, createCache } from '@stock-bot/cache';
import type {
QueueManagerConfig,
QueueOptions,
GlobalStats,
QueueStats,
RateLimitRule,
RedisConfig
} from './types';
import { getRedisConnection } from './utils';
// Module-scoped logger, tagged so all QueueManager output is attributable.
const logger = getLogger('queue-manager');
/**
* Singleton QueueManager that provides unified queue and cache management
* Main entry point for all queue operations with getQueue() method
*/
export class QueueManager {
  private static instance: QueueManager | null = null;

  /** Live queues, keyed by queue name. */
  private queues = new Map<string, Queue>();
  /** Batch caches, keyed by the queue they belong to. */
  private caches = new Map<string, CacheProvider>();
  /** Created lazily: either from config rules or on first addRateLimitRule(). */
  private rateLimiter?: QueueRateLimiter;
  private redisConnection: ReturnType<typeof getRedisConnection>;
  /** Guards shutdown() against concurrent/repeated invocation. */
  private isShuttingDown = false;
  // NOTE(review): set in the constructor but never read anywhere in this file —
  // kept for backward compatibility; confirm before removing.
  private isInitialized = false;

  private constructor(private config: QueueManagerConfig) {
    this.redisConnection = getRedisConnection(config.redis);
    // Initialize rate limiter only when rules are provided up front.
    if (config.rateLimitRules && config.rateLimitRules.length > 0) {
      // Build on a local so rules can be added without non-null assertions.
      const limiter = new QueueRateLimiter(this.redisConnection);
      for (const rule of config.rateLimitRules) {
        limiter.addRule(rule);
      }
      this.rateLimiter = limiter;
    }
    this.isInitialized = true;
    logger.info('QueueManager singleton initialized');
  }

  /**
   * Extract a readable message from an unknown thrown value.
   * Under `useUnknownInCatchVariables` the catch binding is `unknown`,
   * so `.message` must not be accessed without narrowing.
   */
  private static errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Get the singleton instance.
   * @throws Error if called before any config has been provided.
   */
  static getInstance(config?: QueueManagerConfig): QueueManager {
    if (!QueueManager.instance) {
      if (!config) {
        throw new Error('QueueManager not initialized. Provide config on first call.');
      }
      QueueManager.instance = new QueueManager(config);
    }
    // NOTE: a config passed after first initialization is silently ignored.
    return QueueManager.instance;
  }

  /**
   * Initialize the singleton with config.
   * Idempotent: a second call warns and returns the existing instance.
   */
  static initialize(config: QueueManagerConfig): QueueManager {
    if (QueueManager.instance) {
      logger.warn('QueueManager already initialized, returning existing instance');
      return QueueManager.instance;
    }
    QueueManager.instance = new QueueManager(config);
    return QueueManager.instance;
  }

  /**
   * Reset the singleton (mainly for testing).
   * Shuts the current instance down before discarding it.
   */
  static async reset(): Promise<void> {
    if (QueueManager.instance) {
      await QueueManager.instance.shutdown();
      QueueManager.instance = null;
    }
  }

  /**
   * Get or create a queue - unified method that handles both scenarios.
   * This is the main method for accessing queues.
   *
   * @param queueName - unique queue identifier
   * @param options - per-queue overrides, merged over config.defaultQueueOptions
   */
  getQueue(queueName: string, options: QueueOptions = {}): Queue {
    // Return existing queue if it exists (options are ignored in that case).
    const existing = this.queues.get(queueName);
    if (existing) {
      return existing;
    }
    // Create new queue with merged options; explicit options win over defaults.
    const mergedOptions = {
      ...this.config.defaultQueueOptions,
      ...options,
    };
    // Prepare queue configuration. `(workers ?? 0) > 0` yields a real boolean;
    // the previous `workers && workers > 0` could evaluate to 0 or undefined.
    const queueConfig: QueueWorkerConfig = {
      workers: mergedOptions.workers,
      concurrency: mergedOptions.concurrency,
      startWorker: (mergedOptions.workers ?? 0) > 0,
    };
    const queue = new Queue(
      queueName,
      this.config.redis,
      mergedOptions.defaultJobOptions || {},
      queueConfig
    );
    this.queues.set(queueName, queue);
    // Automatically initialize batch cache for the queue.
    this.initializeBatchCacheSync(queueName);
    // Add queue-specific rate limit rules.
    const limiter = this.rateLimiter;
    if (limiter && mergedOptions.rateLimitRules) {
      for (const rule of mergedOptions.rateLimitRules) {
        // Ensure queue name is set for queue-specific rules.
        limiter.addRule({ ...rule, queueName });
      }
    }
    logger.info('Queue created with batch cache', {
      queueName,
      workers: mergedOptions.workers || 0,
      concurrency: mergedOptions.concurrency || 1,
    });
    return queue;
  }

  /** Check if a queue exists. */
  hasQueue(queueName: string): boolean {
    return this.queues.has(queueName);
  }

  /** Get all queue names. */
  getQueueNames(): string[] {
    return Array.from(this.queues.keys());
  }

  /**
   * Get or create a cache for a queue.
   * The cache is namespaced under `batch:<queueName>:` with a 24h default TTL.
   */
  getCache(queueName: string): CacheProvider {
    let cache = this.caches.get(queueName);
    if (!cache) {
      cache = createCache({
        redisConfig: this.config.redis,
        keyPrefix: `batch:${queueName}:`,
        ttl: 86400, // 24 hours default
        enableMetrics: true,
      });
      this.caches.set(queueName, cache);
      logger.debug('Cache created for queue', { queueName });
    }
    return cache;
  }

  /**
   * Initialize cache for a queue (ensures it's ready).
   * Waits up to 10s for the cache's Redis connection.
   */
  async initializeCache(queueName: string): Promise<void> {
    const cache = this.getCache(queueName);
    await cache.waitForReady(10000);
    logger.info('Cache initialized for queue', { queueName });
  }

  /**
   * Initialize batch cache synchronously (for automatic initialization).
   * The cache will be ready for use, but we don't wait for Redis connection.
   */
  private initializeBatchCacheSync(queueName: string): void {
    // Just create the cache - it will connect automatically when first used.
    this.getCache(queueName);
    logger.debug('Batch cache initialized synchronously for queue', { queueName });
  }

  /**
   * Get statistics for all queues.
   * Queue stats are fetched in parallel; totals aggregate every job state.
   */
  async getGlobalStats(): Promise<GlobalStats> {
    const entries = Array.from(this.queues.entries());
    // Independent reads — fetch concurrently instead of one queue at a time.
    const allStats = await Promise.all(entries.map(([, queue]) => queue.getStats()));
    const queueStats: Record<string, QueueStats> = {};
    let totalJobs = 0;
    let totalWorkers = 0;
    entries.forEach(([queueName], i) => {
      const stats = allStats[i];
      queueStats[queueName] = stats;
      totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed;
      totalWorkers += stats.workers || 0;
    });
    return {
      queues: queueStats,
      totalJobs,
      totalWorkers,
      uptime: process.uptime(),
    };
  }

  /**
   * Get statistics for a specific queue.
   * @returns undefined when the queue does not exist.
   */
  async getQueueStats(queueName: string): Promise<QueueStats | undefined> {
    const queue = this.queues.get(queueName);
    if (!queue) {
      return undefined;
    }
    return await queue.getStats();
  }

  /** Add a rate limit rule, creating the rate limiter lazily if needed. */
  addRateLimitRule(rule: RateLimitRule): void {
    if (!this.rateLimiter) {
      this.rateLimiter = new QueueRateLimiter(this.redisConnection);
    }
    this.rateLimiter.addRule(rule);
  }

  /**
   * Check rate limits for a job.
   * Always allowed when no rate limiter has been configured.
   */
  async checkRateLimit(queueName: string, handler: string, operation: string): Promise<{
    allowed: boolean;
    retryAfter?: number;
    remainingPoints?: number;
    appliedRule?: RateLimitRule;
  }> {
    if (!this.rateLimiter) {
      return { allowed: true };
    }
    return await this.rateLimiter.checkLimit(queueName, handler, operation);
  }

  /**
   * Get rate limit status.
   * Without a rate limiter, echoes back the identifying fields only.
   */
  async getRateLimitStatus(queueName: string, handler: string, operation: string) {
    if (!this.rateLimiter) {
      return {
        queueName,
        handler,
        operation,
      };
    }
    return await this.rateLimiter.getStatus(queueName, handler, operation);
  }

  /** Pause all queues concurrently. */
  async pauseAll(): Promise<void> {
    await Promise.all(Array.from(this.queues.values()).map(queue => queue.pause()));
    logger.info('All queues paused');
  }

  /** Resume all queues concurrently. */
  async resumeAll(): Promise<void> {
    await Promise.all(Array.from(this.queues.values()).map(queue => queue.resume()));
    logger.info('All queues resumed');
  }

  /**
   * Pause a specific queue.
   * @returns false when the queue does not exist.
   */
  async pauseQueue(queueName: string): Promise<boolean> {
    const queue = this.queues.get(queueName);
    if (!queue) {
      return false;
    }
    await queue.pause();
    return true;
  }

  /**
   * Resume a specific queue.
   * @returns false when the queue does not exist.
   */
  async resumeQueue(queueName: string): Promise<boolean> {
    const queue = this.queues.get(queueName);
    if (!queue) {
      return false;
    }
    await queue.resume();
    return true;
  }

  /**
   * Drain all queues.
   * @param delayed - when true, delayed jobs are removed as well
   */
  async drainAll(delayed = false): Promise<void> {
    await Promise.all(Array.from(this.queues.values()).map(queue => queue.drain(delayed)));
    logger.info('All queues drained', { delayed });
  }

  /**
   * Clean all queues.
   * @param grace - minimum job age in ms before removal
   * @param limit - max jobs to remove per queue
   * @param type - which finished-job state to clean
   */
  async cleanAll(
    grace: number = 0,
    limit: number = 100,
    type: 'completed' | 'failed' = 'completed'
  ): Promise<void> {
    await Promise.all(
      Array.from(this.queues.values()).map(queue => queue.clean(grace, limit, type))
    );
    logger.info('All queues cleaned', { type, grace, limit });
  }

  /**
   * Shutdown all queues and workers.
   * Per-item failures are logged and swallowed (best-effort cleanup);
   * an unexpected failure outside the per-item handlers is rethrown.
   */
  async shutdown(): Promise<void> {
    if (this.isShuttingDown) {
      return;
    }
    this.isShuttingDown = true;
    logger.info('Shutting down QueueManager...');
    try {
      // Close all queues (includes workers, which are managed by the Queue class).
      await Promise.all(
        Array.from(this.queues.values()).map(async queue => {
          try {
            await queue.close();
          } catch (error: unknown) {
            logger.warn('Error closing queue', { error: QueueManager.errorMessage(error) });
          }
        })
      );
      // Close all caches. Different providers expose different teardown
      // methods, so duck-type via an explicit structural view.
      await Promise.all(
        Array.from(this.caches.values()).map(async cache => {
          try {
            const closer = cache as unknown as Partial<{
              disconnect: () => Promise<void> | void;
              close: () => Promise<void> | void;
              quit: () => Promise<void> | void;
            }>;
            if (typeof closer.disconnect === 'function') {
              await closer.disconnect();
            } else if (typeof closer.close === 'function') {
              await closer.close();
            } else if (typeof closer.quit === 'function') {
              await closer.quit();
            }
          } catch (error: unknown) {
            logger.warn('Error closing cache', { error: QueueManager.errorMessage(error) });
          }
        })
      );
      // Clear collections.
      this.queues.clear();
      this.caches.clear();
      logger.info('QueueManager shutdown complete');
    } catch (error: unknown) {
      logger.error('Error during shutdown', { error: QueueManager.errorMessage(error) });
      throw error;
    }
  }

  /** Wait for all queues to be ready. */
  async waitUntilReady(): Promise<void> {
    await Promise.all(Array.from(this.queues.values()).map(queue => queue.waitUntilReady()));
  }

  /** Get Redis configuration (for backward compatibility). */
  getRedisConfig() {
    return this.config.redis;
  }
}