removed config from queue, will be injected
This commit is contained in:
parent
0d8119e3de
commit
e924c8b6bc
2 changed files with 65 additions and 357 deletions
|
|
@ -1,347 +0,0 @@
|
||||||
import { getLogger } from '@stock-bot/logger';
|
|
||||||
import { QueueRateLimiter } from './rate-limiter';
|
|
||||||
import { Queue, type QueueWorkerConfig } from './queue';
|
|
||||||
import { CacheProvider, createCache } from '@stock-bot/cache';
|
|
||||||
import type {
|
|
||||||
QueueManagerConfig,
|
|
||||||
QueueOptions,
|
|
||||||
GlobalStats,
|
|
||||||
QueueStats,
|
|
||||||
RateLimitRule,
|
|
||||||
RedisConfig
|
|
||||||
} from './types';
|
|
||||||
import { getRedisConnection } from './utils';
|
|
||||||
|
|
||||||
const logger = getLogger('queue-manager');
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Improved Singleton QueueManager with better patterns
|
|
||||||
*/
|
|
||||||
export class QueueManager {
|
|
||||||
private static instance: QueueManager | null = null;
|
|
||||||
private static initializationPromise: Promise<QueueManager> | null = null;
|
|
||||||
private static config: QueueManagerConfig | null = null;
|
|
||||||
|
|
||||||
private queues = new Map<string, Queue>();
|
|
||||||
private caches = new Map<string, CacheProvider>();
|
|
||||||
private rateLimiter?: QueueRateLimiter;
|
|
||||||
private redisConnection: ReturnType<typeof getRedisConnection>;
|
|
||||||
private isShuttingDown = false;
|
|
||||||
private shutdownPromise: Promise<void> | null = null;
|
|
||||||
|
|
||||||
private constructor(config: QueueManagerConfig) {
|
|
||||||
this.redisConnection = getRedisConnection(config.redis);
|
|
||||||
|
|
||||||
// Initialize rate limiter if rules are provided
|
|
||||||
if (config.rateLimitRules && config.rateLimitRules.length > 0) {
|
|
||||||
this.rateLimiter = new QueueRateLimiter(this.redisConnection);
|
|
||||||
config.rateLimitRules.forEach(rule => {
|
|
||||||
this.rateLimiter!.addRule(rule);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info('QueueManager singleton initialized');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialize the singleton with config (thread-safe)
|
|
||||||
* Should be called once at application startup
|
|
||||||
*/
|
|
||||||
static async initialize(config: QueueManagerConfig): Promise<QueueManager> {
|
|
||||||
// If already initializing, return the same promise (prevents race conditions)
|
|
||||||
if (QueueManager.initializationPromise) {
|
|
||||||
logger.debug('QueueManager initialization already in progress');
|
|
||||||
return QueueManager.initializationPromise;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If already initialized, validate config matches
|
|
||||||
if (QueueManager.instance) {
|
|
||||||
if (!QueueManager.isConfigEqual(config, QueueManager.config!)) {
|
|
||||||
throw new Error('QueueManager already initialized with different config');
|
|
||||||
}
|
|
||||||
return QueueManager.instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start initialization
|
|
||||||
QueueManager.initializationPromise = (async () => {
|
|
||||||
try {
|
|
||||||
QueueManager.config = config;
|
|
||||||
QueueManager.instance = new QueueManager(config);
|
|
||||||
|
|
||||||
// Wait for connections to be ready
|
|
||||||
await QueueManager.instance.waitUntilReady();
|
|
||||||
|
|
||||||
return QueueManager.instance;
|
|
||||||
} catch (error) {
|
|
||||||
// Clean up on failure
|
|
||||||
QueueManager.instance = null;
|
|
||||||
QueueManager.config = null;
|
|
||||||
QueueManager.initializationPromise = null;
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
})();
|
|
||||||
|
|
||||||
return QueueManager.initializationPromise;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the singleton instance
|
|
||||||
* Throws if not initialized - forces explicit initialization
|
|
||||||
*/
|
|
||||||
static getInstance(): QueueManager {
|
|
||||||
if (!QueueManager.instance) {
|
|
||||||
throw new Error(
|
|
||||||
'QueueManager not initialized. Call QueueManager.initialize(config) first.'
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return QueueManager.instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if QueueManager is initialized
|
|
||||||
*/
|
|
||||||
static isInitialized(): boolean {
|
|
||||||
return QueueManager.instance !== null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get current configuration (readonly)
|
|
||||||
*/
|
|
||||||
static getConfig(): Readonly<QueueManagerConfig> | null {
|
|
||||||
return QueueManager.config ? { ...QueueManager.config } : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Reset the singleton (thread-safe, mainly for testing)
|
|
||||||
*/
|
|
||||||
static async reset(): Promise<void> {
|
|
||||||
// Wait for any ongoing initialization
|
|
||||||
if (QueueManager.initializationPromise) {
|
|
||||||
try {
|
|
||||||
await QueueManager.initializationPromise;
|
|
||||||
} catch {
|
|
||||||
// Ignore initialization errors during reset
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (QueueManager.instance) {
|
|
||||||
await QueueManager.instance.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
QueueManager.instance = null;
|
|
||||||
QueueManager.config = null;
|
|
||||||
QueueManager.initializationPromise = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Compare two configs for equality
|
|
||||||
*/
|
|
||||||
private static isConfigEqual(a: QueueManagerConfig, b: QueueManagerConfig): boolean {
|
|
||||||
return (
|
|
||||||
a.redis.host === b.redis.host &&
|
|
||||||
a.redis.port === b.redis.port &&
|
|
||||||
a.redis.password === b.redis.password &&
|
|
||||||
a.redis.db === b.redis.db
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wait until all connections are ready
|
|
||||||
*/
|
|
||||||
async waitUntilReady(timeout: number = 5000): Promise<void> {
|
|
||||||
const startTime = Date.now();
|
|
||||||
|
|
||||||
// Wait for all queues to be ready
|
|
||||||
const readyPromises = Array.from(this.queues.values()).map(queue =>
|
|
||||||
queue.waitUntilReady()
|
|
||||||
);
|
|
||||||
|
|
||||||
// Add timeout
|
|
||||||
await Promise.race([
|
|
||||||
Promise.all(readyPromises),
|
|
||||||
new Promise((_, reject) =>
|
|
||||||
setTimeout(() => reject(new Error('QueueManager ready timeout')), timeout)
|
|
||||||
)
|
|
||||||
]);
|
|
||||||
|
|
||||||
logger.debug('QueueManager ready', {
|
|
||||||
duration: Date.now() - startTime
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get or create a queue - unified method that handles both scenarios
|
|
||||||
*/
|
|
||||||
getQueue(queueName: string, options: QueueOptions = {}): Queue {
|
|
||||||
if (this.isShuttingDown) {
|
|
||||||
throw new Error('QueueManager is shutting down');
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return existing queue if it exists
|
|
||||||
if (this.queues.has(queueName)) {
|
|
||||||
return this.queues.get(queueName)!;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create new queue with merged options
|
|
||||||
const mergedOptions = {
|
|
||||||
...QueueManager.config!.defaultQueueOptions,
|
|
||||||
...options,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Prepare queue configuration
|
|
||||||
const queueConfig: QueueWorkerConfig = {
|
|
||||||
workers: mergedOptions.workers,
|
|
||||||
concurrency: mergedOptions.concurrency,
|
|
||||||
startWorker: mergedOptions.workers && mergedOptions.workers > 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
const queue = new Queue(
|
|
||||||
queueName,
|
|
||||||
QueueManager.config!.redis,
|
|
||||||
mergedOptions.defaultJobOptions || {},
|
|
||||||
queueConfig
|
|
||||||
);
|
|
||||||
|
|
||||||
// Store the queue
|
|
||||||
this.queues.set(queueName, queue);
|
|
||||||
|
|
||||||
// Automatically initialize batch cache for the queue
|
|
||||||
this.initializeBatchCacheSync(queueName);
|
|
||||||
|
|
||||||
// Add queue-specific rate limit rules
|
|
||||||
if (this.rateLimiter && mergedOptions.rateLimitRules) {
|
|
||||||
mergedOptions.rateLimitRules.forEach(rule => {
|
|
||||||
const ruleWithQueue = { ...rule, queueName };
|
|
||||||
this.rateLimiter!.addRule(ruleWithQueue);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info('Queue created with batch cache', {
|
|
||||||
queueName,
|
|
||||||
workers: mergedOptions.workers || 0,
|
|
||||||
concurrency: mergedOptions.concurrency || 1
|
|
||||||
});
|
|
||||||
|
|
||||||
return queue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialize batch cache synchronously
|
|
||||||
*/
|
|
||||||
private initializeBatchCacheSync(queueName: string): void {
|
|
||||||
this.getCache(queueName);
|
|
||||||
logger.debug('Batch cache initialized synchronously for queue', { queueName });
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get or create a cache for a queue
|
|
||||||
*/
|
|
||||||
getCache(queueName: string): CacheProvider {
|
|
||||||
if (!this.caches.has(queueName)) {
|
|
||||||
const cacheProvider = createCache({
|
|
||||||
redisConfig: QueueManager.config!.redis,
|
|
||||||
keyPrefix: `batch:${queueName}:`,
|
|
||||||
ttl: 86400, // 24 hours default
|
|
||||||
enableMetrics: true,
|
|
||||||
});
|
|
||||||
this.caches.set(queueName, cacheProvider);
|
|
||||||
logger.debug('Cache created for queue', { queueName });
|
|
||||||
}
|
|
||||||
return this.caches.get(queueName)!;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Shutdown all queues and workers (thread-safe)
|
|
||||||
*/
|
|
||||||
async shutdown(): Promise<void> {
|
|
||||||
// If already shutting down, return the existing promise
|
|
||||||
if (this.shutdownPromise) {
|
|
||||||
return this.shutdownPromise;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.isShuttingDown) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.isShuttingDown = true;
|
|
||||||
logger.info('Shutting down QueueManager...');
|
|
||||||
|
|
||||||
this.shutdownPromise = this.performShutdown();
|
|
||||||
return this.shutdownPromise;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Perform the actual shutdown
|
|
||||||
*/
|
|
||||||
private async performShutdown(): Promise<void> {
|
|
||||||
try {
|
|
||||||
// Close all queues (this now includes workers)
|
|
||||||
const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => {
|
|
||||||
try {
|
|
||||||
await queue.close();
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn('Error closing queue', { error: error.message });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
await Promise.all(queueShutdownPromises);
|
|
||||||
|
|
||||||
// Close all caches
|
|
||||||
const cacheShutdownPromises = Array.from(this.caches.values()).map(async (cache) => {
|
|
||||||
try {
|
|
||||||
if (typeof cache.disconnect === 'function') {
|
|
||||||
await cache.disconnect();
|
|
||||||
} else if (typeof cache.close === 'function') {
|
|
||||||
await cache.close();
|
|
||||||
} else if (typeof cache.quit === 'function') {
|
|
||||||
await cache.quit();
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn('Error closing cache', { error: error.message });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
await Promise.all(cacheShutdownPromises);
|
|
||||||
|
|
||||||
// Clear collections
|
|
||||||
this.queues.clear();
|
|
||||||
this.caches.clear();
|
|
||||||
|
|
||||||
logger.info('QueueManager shutdown complete');
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('Error during shutdown', { error: error.message });
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ... rest of the methods remain the same ...
|
|
||||||
|
|
||||||
hasQueue(queueName: string): boolean {
|
|
||||||
return this.queues.has(queueName);
|
|
||||||
}
|
|
||||||
|
|
||||||
getQueueNames(): string[] {
|
|
||||||
return Array.from(this.queues.keys());
|
|
||||||
}
|
|
||||||
|
|
||||||
async getGlobalStats(): Promise<GlobalStats> {
|
|
||||||
const queueStats: Record<string, QueueStats> = {};
|
|
||||||
let totalJobs = 0;
|
|
||||||
let totalWorkers = 0;
|
|
||||||
|
|
||||||
for (const [queueName, queue] of this.queues) {
|
|
||||||
const stats = await queue.getStats();
|
|
||||||
queueStats[queueName] = stats;
|
|
||||||
|
|
||||||
totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed;
|
|
||||||
totalWorkers += stats.workers || 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
queues: queueStats,
|
|
||||||
totalJobs,
|
|
||||||
totalWorkers,
|
|
||||||
uptime: process.uptime(),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -25,9 +25,11 @@ export class QueueManager {
|
||||||
private rateLimiter?: QueueRateLimiter;
|
private rateLimiter?: QueueRateLimiter;
|
||||||
private redisConnection: ReturnType<typeof getRedisConnection>;
|
private redisConnection: ReturnType<typeof getRedisConnection>;
|
||||||
private isShuttingDown = false;
|
private isShuttingDown = false;
|
||||||
private isInitialized = false;
|
private shutdownPromise: Promise<void> | null = null;
|
||||||
|
private config: QueueManagerConfig;
|
||||||
|
|
||||||
private constructor(private config: QueueManagerConfig) {
|
private constructor(config: QueueManagerConfig) {
|
||||||
|
this.config = config;
|
||||||
this.redisConnection = getRedisConnection(config.redis);
|
this.redisConnection = getRedisConnection(config.redis);
|
||||||
|
|
||||||
// Initialize rate limiter if rules are provided
|
// Initialize rate limiter if rules are provided
|
||||||
|
|
@ -38,25 +40,27 @@ export class QueueManager {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
this.isInitialized = true;
|
logger.info('QueueManager singleton initialized', {
|
||||||
logger.info('QueueManager singleton initialized');
|
redis: `${config.redis.host}:${config.redis.port}`,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the singleton instance
|
* Get the singleton instance
|
||||||
|
* @throws Error if not initialized - use initialize() first
|
||||||
*/
|
*/
|
||||||
static getInstance(config?: QueueManagerConfig): QueueManager {
|
static getInstance(): QueueManager {
|
||||||
if (!QueueManager.instance) {
|
if (!QueueManager.instance) {
|
||||||
if (!config) {
|
throw new Error(
|
||||||
throw new Error('QueueManager not initialized. Provide config on first call.');
|
'QueueManager not initialized. Call QueueManager.initialize(config) first.'
|
||||||
}
|
);
|
||||||
QueueManager.instance = new QueueManager(config);
|
|
||||||
}
|
}
|
||||||
return QueueManager.instance;
|
return QueueManager.instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the singleton with config
|
* Initialize the singleton with config
|
||||||
|
* Must be called before getInstance()
|
||||||
*/
|
*/
|
||||||
static initialize(config: QueueManagerConfig): QueueManager {
|
static initialize(config: QueueManagerConfig): QueueManager {
|
||||||
if (QueueManager.instance) {
|
if (QueueManager.instance) {
|
||||||
|
|
@ -67,6 +71,32 @@ export class QueueManager {
|
||||||
return QueueManager.instance;
|
return QueueManager.instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get or initialize the singleton
|
||||||
|
* Convenience method that combines initialize and getInstance
|
||||||
|
*/
|
||||||
|
static getOrInitialize(config?: QueueManagerConfig): QueueManager {
|
||||||
|
if (QueueManager.instance) {
|
||||||
|
return QueueManager.instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!config) {
|
||||||
|
throw new Error(
|
||||||
|
'QueueManager not initialized and no config provided. ' +
|
||||||
|
'Either call initialize(config) first or provide config to getOrInitialize(config).'
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return QueueManager.initialize(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the QueueManager is initialized
|
||||||
|
*/
|
||||||
|
static isInitialized(): boolean {
|
||||||
|
return QueueManager.instance !== null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reset the singleton (mainly for testing)
|
* Reset the singleton (mainly for testing)
|
||||||
*/
|
*/
|
||||||
|
|
@ -327,9 +357,14 @@ export class QueueManager {
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shutdown all queues and workers
|
* Shutdown all queues and workers (thread-safe)
|
||||||
*/
|
*/
|
||||||
async shutdown(): Promise<void> {
|
async shutdown(): Promise<void> {
|
||||||
|
// If already shutting down, return the existing promise
|
||||||
|
if (this.shutdownPromise) {
|
||||||
|
return this.shutdownPromise;
|
||||||
|
}
|
||||||
|
|
||||||
if (this.isShuttingDown) {
|
if (this.isShuttingDown) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -337,6 +372,15 @@ export class QueueManager {
|
||||||
this.isShuttingDown = true;
|
this.isShuttingDown = true;
|
||||||
logger.info('Shutting down QueueManager...');
|
logger.info('Shutting down QueueManager...');
|
||||||
|
|
||||||
|
// Create shutdown promise
|
||||||
|
this.shutdownPromise = this.performShutdown();
|
||||||
|
return this.shutdownPromise;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Perform the actual shutdown
|
||||||
|
*/
|
||||||
|
private async performShutdown(): Promise<void> {
|
||||||
try {
|
try {
|
||||||
// Close all queues (this now includes workers since they're managed by Queue class)
|
// Close all queues (this now includes workers since they're managed by Queue class)
|
||||||
const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => {
|
const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => {
|
||||||
|
|
@ -375,6 +419,10 @@ export class QueueManager {
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('Error during shutdown', { error: error.message });
|
logger.error('Error during shutdown', { error: error.message });
|
||||||
throw error;
|
throw error;
|
||||||
|
} finally {
|
||||||
|
// Reset shutdown state
|
||||||
|
this.shutdownPromise = null;
|
||||||
|
this.isShuttingDown = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -392,4 +440,11 @@ export class QueueManager {
|
||||||
getRedisConfig() {
|
getRedisConfig() {
|
||||||
return this.config.redis;
|
return this.config.redis;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current configuration
|
||||||
|
*/
|
||||||
|
getConfig(): Readonly<QueueManagerConfig> {
|
||||||
|
return { ...this.config };
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue