This commit is contained in:
Boki 2025-06-19 08:34:16 -04:00
parent a2fa08de88
commit 0d8119e3de
2 changed files with 348 additions and 1 deletions

View file

@ -0,0 +1,347 @@
import { getLogger } from '@stock-bot/logger';
import { QueueRateLimiter } from './rate-limiter';
import { Queue, type QueueWorkerConfig } from './queue';
import { CacheProvider, createCache } from '@stock-bot/cache';
import type {
QueueManagerConfig,
QueueOptions,
GlobalStats,
QueueStats,
RateLimitRule,
RedisConfig
} from './types';
import { getRedisConnection } from './utils';
const logger = getLogger('queue-manager');
/**
* Improved Singleton QueueManager with better patterns
*/
export class QueueManager {
private static instance: QueueManager | null = null;
private static initializationPromise: Promise<QueueManager> | null = null;
private static config: QueueManagerConfig | null = null;
private queues = new Map<string, Queue>();
private caches = new Map<string, CacheProvider>();
private rateLimiter?: QueueRateLimiter;
private redisConnection: ReturnType<typeof getRedisConnection>;
private isShuttingDown = false;
private shutdownPromise: Promise<void> | null = null;
private constructor(config: QueueManagerConfig) {
this.redisConnection = getRedisConnection(config.redis);
// Initialize rate limiter if rules are provided
if (config.rateLimitRules && config.rateLimitRules.length > 0) {
this.rateLimiter = new QueueRateLimiter(this.redisConnection);
config.rateLimitRules.forEach(rule => {
this.rateLimiter!.addRule(rule);
});
}
logger.info('QueueManager singleton initialized');
}
/**
* Initialize the singleton with config (thread-safe)
* Should be called once at application startup
*/
static async initialize(config: QueueManagerConfig): Promise<QueueManager> {
// If already initializing, return the same promise (prevents race conditions)
if (QueueManager.initializationPromise) {
logger.debug('QueueManager initialization already in progress');
return QueueManager.initializationPromise;
}
// If already initialized, validate config matches
if (QueueManager.instance) {
if (!QueueManager.isConfigEqual(config, QueueManager.config!)) {
throw new Error('QueueManager already initialized with different config');
}
return QueueManager.instance;
}
// Start initialization
QueueManager.initializationPromise = (async () => {
try {
QueueManager.config = config;
QueueManager.instance = new QueueManager(config);
// Wait for connections to be ready
await QueueManager.instance.waitUntilReady();
return QueueManager.instance;
} catch (error) {
// Clean up on failure
QueueManager.instance = null;
QueueManager.config = null;
QueueManager.initializationPromise = null;
throw error;
}
})();
return QueueManager.initializationPromise;
}
/**
* Get the singleton instance
* Throws if not initialized - forces explicit initialization
*/
static getInstance(): QueueManager {
if (!QueueManager.instance) {
throw new Error(
'QueueManager not initialized. Call QueueManager.initialize(config) first.'
);
}
return QueueManager.instance;
}
/**
* Check if QueueManager is initialized
*/
static isInitialized(): boolean {
return QueueManager.instance !== null;
}
/**
* Get current configuration (readonly)
*/
static getConfig(): Readonly<QueueManagerConfig> | null {
return QueueManager.config ? { ...QueueManager.config } : null;
}
/**
* Reset the singleton (thread-safe, mainly for testing)
*/
static async reset(): Promise<void> {
// Wait for any ongoing initialization
if (QueueManager.initializationPromise) {
try {
await QueueManager.initializationPromise;
} catch {
// Ignore initialization errors during reset
}
}
if (QueueManager.instance) {
await QueueManager.instance.shutdown();
}
QueueManager.instance = null;
QueueManager.config = null;
QueueManager.initializationPromise = null;
}
/**
* Compare two configs for equality
*/
private static isConfigEqual(a: QueueManagerConfig, b: QueueManagerConfig): boolean {
return (
a.redis.host === b.redis.host &&
a.redis.port === b.redis.port &&
a.redis.password === b.redis.password &&
a.redis.db === b.redis.db
);
}
/**
* Wait until all connections are ready
*/
async waitUntilReady(timeout: number = 5000): Promise<void> {
const startTime = Date.now();
// Wait for all queues to be ready
const readyPromises = Array.from(this.queues.values()).map(queue =>
queue.waitUntilReady()
);
// Add timeout
await Promise.race([
Promise.all(readyPromises),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('QueueManager ready timeout')), timeout)
)
]);
logger.debug('QueueManager ready', {
duration: Date.now() - startTime
});
}
/**
* Get or create a queue - unified method that handles both scenarios
*/
getQueue(queueName: string, options: QueueOptions = {}): Queue {
if (this.isShuttingDown) {
throw new Error('QueueManager is shutting down');
}
// Return existing queue if it exists
if (this.queues.has(queueName)) {
return this.queues.get(queueName)!;
}
// Create new queue with merged options
const mergedOptions = {
...QueueManager.config!.defaultQueueOptions,
...options,
};
// Prepare queue configuration
const queueConfig: QueueWorkerConfig = {
workers: mergedOptions.workers,
concurrency: mergedOptions.concurrency,
startWorker: mergedOptions.workers && mergedOptions.workers > 0,
};
const queue = new Queue(
queueName,
QueueManager.config!.redis,
mergedOptions.defaultJobOptions || {},
queueConfig
);
// Store the queue
this.queues.set(queueName, queue);
// Automatically initialize batch cache for the queue
this.initializeBatchCacheSync(queueName);
// Add queue-specific rate limit rules
if (this.rateLimiter && mergedOptions.rateLimitRules) {
mergedOptions.rateLimitRules.forEach(rule => {
const ruleWithQueue = { ...rule, queueName };
this.rateLimiter!.addRule(ruleWithQueue);
});
}
logger.info('Queue created with batch cache', {
queueName,
workers: mergedOptions.workers || 0,
concurrency: mergedOptions.concurrency || 1
});
return queue;
}
/**
* Initialize batch cache synchronously
*/
private initializeBatchCacheSync(queueName: string): void {
this.getCache(queueName);
logger.debug('Batch cache initialized synchronously for queue', { queueName });
}
/**
* Get or create a cache for a queue
*/
getCache(queueName: string): CacheProvider {
if (!this.caches.has(queueName)) {
const cacheProvider = createCache({
redisConfig: QueueManager.config!.redis,
keyPrefix: `batch:${queueName}:`,
ttl: 86400, // 24 hours default
enableMetrics: true,
});
this.caches.set(queueName, cacheProvider);
logger.debug('Cache created for queue', { queueName });
}
return this.caches.get(queueName)!;
}
/**
* Shutdown all queues and workers (thread-safe)
*/
async shutdown(): Promise<void> {
// If already shutting down, return the existing promise
if (this.shutdownPromise) {
return this.shutdownPromise;
}
if (this.isShuttingDown) {
return;
}
this.isShuttingDown = true;
logger.info('Shutting down QueueManager...');
this.shutdownPromise = this.performShutdown();
return this.shutdownPromise;
}
/**
* Perform the actual shutdown
*/
private async performShutdown(): Promise<void> {
try {
// Close all queues (this now includes workers)
const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => {
try {
await queue.close();
} catch (error) {
logger.warn('Error closing queue', { error: error.message });
}
});
await Promise.all(queueShutdownPromises);
// Close all caches
const cacheShutdownPromises = Array.from(this.caches.values()).map(async (cache) => {
try {
if (typeof cache.disconnect === 'function') {
await cache.disconnect();
} else if (typeof cache.close === 'function') {
await cache.close();
} else if (typeof cache.quit === 'function') {
await cache.quit();
}
} catch (error) {
logger.warn('Error closing cache', { error: error.message });
}
});
await Promise.all(cacheShutdownPromises);
// Clear collections
this.queues.clear();
this.caches.clear();
logger.info('QueueManager shutdown complete');
} catch (error) {
logger.error('Error during shutdown', { error: error.message });
throw error;
}
}
// ... rest of the methods remain the same ...
hasQueue(queueName: string): boolean {
return this.queues.has(queueName);
}
getQueueNames(): string[] {
return Array.from(this.queues.keys());
}
async getGlobalStats(): Promise<GlobalStats> {
const queueStats: Record<string, QueueStats> = {};
let totalJobs = 0;
let totalWorkers = 0;
for (const [queueName, queue] of this.queues) {
const stats = await queue.getStats();
queueStats[queueName] = stats;
totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed;
totalWorkers += stats.workers || 0;
}
return {
queues: queueStats,
totalJobs,
totalWorkers,
uptime: process.uptime(),
};
}
}

View file

@ -17,7 +17,7 @@ export {
} from './queue-factory';
// DLQ handling
export { DLQHandler } from './dlq-handler';
export { DeadLetterQueueHandler, DeadLetterQueueHandler as DLQHandler } from './dlq-handler';
// Metrics
export { QueueMetricsCollector } from './queue-metrics';