/**
 * stock-bot/libs/core/queue/src/queue-manager.ts
 * Snapshot metadata: 2025-06-25 11:38:23 -04:00 — 710 lines, 20 KiB, TypeScript
 */

import { Queue as BullQueue, type Job } from 'bullmq';
import type { CacheProvider } from '@stock-bot/cache';
import { createCache } from '@stock-bot/cache';
import type { HandlerRegistry } from '@stock-bot/handler-registry';
import { getLogger } from '@stock-bot/logger';
import { Queue, type QueueWorkerConfig } from './queue';
import { QueueRateLimiter } from './rate-limiter';
import { getFullQueueName, parseQueueName } from './service-utils';
import type {
GlobalStats,
JobData,
JobOptions,
QueueManagerConfig,
QueueOptions,
QueueRoute,
QueueStats,
RateLimitRule,
} from './types';
import { getRedisConnection } from './utils';
/**
 * Minimal structural logger contract used for type safety throughout this
 * module. Any object with these leveled methods (e.g. the one returned by
 * `getLogger`) satisfies it; `child` is optional since not every logger
 * implementation supports scoped children.
 */
interface Logger {
info(message: string, meta?: Record<string, unknown>): void;
error(message: string, meta?: Record<string, unknown>): void;
warn(message: string, meta?: Record<string, unknown>): void;
debug(message: string, meta?: Record<string, unknown>): void;
trace(message: string, meta?: Record<string, unknown>): void;
// Optional: create a named child logger carrying extra context.
child?(name: string, context?: Record<string, unknown>): Logger;
}
/**
 * QueueManager provides unified queue and cache management with service discovery.
 * Handles both local and cross-service queue operations.
 *
 * Responsibilities:
 * - Lazily creates and caches consumer `Queue` instances (with workers).
 * - Maintains producer-only BullMQ queues for sending jobs to other services.
 * - Provisions a per-queue batch cache automatically.
 * - Applies optional rate-limit rules via `QueueRateLimiter`.
 * - Discovers cross-service queue routes from an optional `HandlerRegistry`.
 */
export class QueueManager {
  /** Consumer-side queues keyed by full (possibly service-namespaced) queue name. */
  private queues = new Map<string, Queue>();
  /** Per-queue batch caches keyed by full queue name. */
  private caches = new Map<string, CacheProvider>();
  /** Lazily created when rate-limit rules are configured or added later. */
  private rateLimiter?: QueueRateLimiter;
  private redisConnection: ReturnType<typeof getRedisConnection>;
  private isShuttingDown = false;
  /** Non-null while a shutdown is in flight; shared by concurrent callers. */
  private shutdownPromise: Promise<void> | null = null;
  private config: QueueManagerConfig;
  private readonly logger: Logger;

  // Service discovery features
  private serviceName?: string;
  /** Routes discovered from the handler registry, keyed by handler name. */
  private queueRoutes = new Map<string, QueueRoute>();
  /** Producer-only queues for cross-service sending, keyed by full queue name. */
  private producerQueues = new Map<string, BullQueue>();
  private handlerRegistry?: HandlerRegistry;

  /**
   * @param config          Queue manager configuration. If `serviceName` is set,
   *                        the Redis DB is forced to 0 so all services share one
   *                        queue namespace.
   * @param handlerRegistry Optional registry used for worker dispatch and for
   *                        auto-discovering cross-service queue routes.
   * @param logger          Optional logger; defaults to `getLogger('QueueManager')`.
   */
  constructor(config: QueueManagerConfig, handlerRegistry?: HandlerRegistry, logger?: Logger) {
    // Always use DB 0 for queues if a service name is provided, so every
    // service can reach every other service's queues.
    if (config.serviceName) {
      config = {
        ...config,
        redis: {
          ...config.redis,
          db: 0, // All queues in DB 0 for cross-service communication
        },
      };
    }
    this.config = config;
    this.serviceName = config.serviceName;
    this.handlerRegistry = handlerRegistry;
    this.logger = logger || getLogger('QueueManager');
    this.redisConnection = getRedisConnection(config.redis);

    // Initialize the rate limiter if rules are provided up front.
    if (config.rateLimitRules && config.rateLimitRules.length > 0) {
      const rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger);
      this.rateLimiter = rateLimiter;
      config.rateLimitRules.forEach(rule => rateLimiter.addRule(rule));
    }

    // Auto-discover routes if enabled (default) and a registry was provided.
    if (config.serviceName && config.autoDiscoverHandlers !== false && handlerRegistry) {
      this.discoverQueueRoutes();
    }

    this.logger.info('QueueManager initialized', {
      redis: `${config.redis.host}:${config.redis.port}`,
      service: this.serviceName,
      discoveredRoutes: this.queueRoutes.size,
      hasRegistry: !!handlerRegistry,
    });
  }

  /**
   * Get or create a queue - unified method that handles both scenarios.
   * This is the main method for accessing queues.
   * If `serviceName` is configured, automatically handles namespacing:
   * a bare handler name is namespaced to the current service, and queues
   * belonging to other services are created producer-only (no workers).
   */
  getQueue(queueName: string, options: QueueOptions = {}): Queue {
    let fullQueueName = queueName;
    let isOwnQueue = true;

    // Handle service namespacing if a service name is configured.
    if (this.serviceName) {
      const parsed = parseQueueName(queueName);
      if (parsed) {
        // Already in service:handler format.
        fullQueueName = queueName;
        isOwnQueue = parsed.service === this.serviceName;
      } else {
        // Just a handler name; assume it belongs to the current service.
        fullQueueName = getFullQueueName(this.serviceName, queueName);
        isOwnQueue = true;
      }
      if (!isOwnQueue) {
        // Cross-service queues are producer-only: never run workers for them.
        options = { ...options, workers: 0 };
      } else {
        // Own-service queues get the handler registry so workers can dispatch.
        options = { ...options, handlerRegistry: this.handlerRegistry };
      }
      queueName = fullQueueName;
    }

    // Return the existing queue if one was already created.
    const existingQueue = this.queues.get(queueName);
    if (existingQueue) {
      return existingQueue;
    }

    // Create a new queue with merged options (explicit options win over defaults).
    const mergedOptions = {
      ...this.config.defaultQueueOptions,
      ...options,
    };
    const workers = mergedOptions.workers ?? this.config.defaultQueueOptions?.workers ?? 1;
    const concurrency =
      mergedOptions.concurrency ?? this.config.defaultQueueOptions?.concurrency ?? 1;
    const queueConfig: QueueWorkerConfig = {
      workers,
      concurrency,
      startWorker: workers > 0,
      handlerRegistry: options.handlerRegistry || this.handlerRegistry,
      serviceName: this.config.serviceName,
    };
    const queue = new Queue(
      queueName,
      this.config.redis,
      mergedOptions.defaultJobOptions || {},
      queueConfig,
      this.logger
    );
    this.queues.set(queueName, queue);

    // Automatically provision the batch cache for the queue.
    this.initializeBatchCacheSync(queueName);

    // Register queue-specific rate limit rules, forcing the queue name so a
    // misconfigured rule cannot leak onto another queue.
    if (this.rateLimiter && mergedOptions.rateLimitRules) {
      const rateLimiter = this.rateLimiter;
      mergedOptions.rateLimitRules.forEach(rule => {
        rateLimiter.addRule({ ...rule, queueName });
      });
    }

    this.logger.info('Queue created with batch cache', {
      queueName,
      // Was previously logged under the misleading key `originalQueueName`.
      hasExplicitHandlerRegistry: !!options.handlerRegistry,
      workers,
      concurrency,
      handlerRegistryProvided: !!this.handlerRegistry,
      willStartWorkers: workers > 0,
      isOwnQueue,
      serviceName: this.serviceName,
    });
    return queue;
  }

  /**
   * Check if a queue exists (by its full name).
   */
  hasQueue(queueName: string): boolean {
    return this.queues.has(queueName);
  }

  /**
   * Get all registered queue names.
   */
  getQueueNames(): string[] {
    return Array.from(this.queues.keys());
  }

  /**
   * Get or create the batch cache for a queue. Keys are prefixed with
   * `batch:<queueName>:` and entries default to a 24h TTL.
   */
  getCache(queueName: string): CacheProvider {
    const existing = this.caches.get(queueName);
    if (existing) {
      return existing;
    }
    const cacheProvider = createCache({
      redisConfig: this.config.redis,
      keyPrefix: `batch:${queueName}:`,
      ttl: 86400, // 24 hours default
      enableMetrics: true,
      logger: this.logger,
    });
    this.caches.set(queueName, cacheProvider);
    this.logger.trace('Cache created for queue', { queueName });
    return cacheProvider;
  }

  /**
   * Initialize the cache for a queue and wait (up to 10s) until it is ready.
   */
  async initializeCache(queueName: string): Promise<void> {
    const cache = this.getCache(queueName);
    await cache.waitForReady(10000);
    this.logger.info('Cache initialized for queue', { queueName });
  }

  /**
   * Initialize the batch cache synchronously (for automatic initialization).
   * The cache object is created immediately; the Redis connection is
   * established lazily on first use.
   */
  private initializeBatchCacheSync(queueName: string): void {
    this.getCache(queueName);
    this.logger.trace('Batch cache initialized synchronously for queue', { queueName });
  }

  /**
   * Get the queues map (for subclasses).
   */
  protected getQueues(): Map<string, Queue> {
    return this.queues;
  }

  /**
   * Get aggregated statistics for all queues.
   */
  async getGlobalStats(): Promise<GlobalStats> {
    const queueStats: Record<string, QueueStats> = {};
    let totalJobs = 0;
    let totalWorkers = 0;
    for (const [queueName, queue] of this.queues) {
      const stats = await queue.getStats();
      queueStats[queueName] = stats;
      totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed;
      totalWorkers += stats.workers || 0;
    }
    return {
      queues: queueStats,
      totalJobs,
      totalWorkers,
      uptime: process.uptime(),
    };
  }

  /**
   * Get statistics for a specific queue, or `undefined` if it does not exist.
   */
  async getQueueStats(queueName: string): Promise<QueueStats | undefined> {
    const queue = this.queues.get(queueName);
    if (!queue) {
      return undefined;
    }
    return await queue.getStats();
  }

  /**
   * Add a rate limit rule, creating the rate limiter on first use.
   */
  addRateLimitRule(rule: RateLimitRule): void {
    if (!this.rateLimiter) {
      this.rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger);
    }
    this.rateLimiter.addRule(rule);
  }

  /**
   * Check rate limits for a job. Always allowed when no limiter is configured.
   */
  async checkRateLimit(
    queueName: string,
    handler: string,
    operation: string
  ): Promise<{
    allowed: boolean;
    retryAfter?: number;
    remainingPoints?: number;
    appliedRule?: RateLimitRule;
  }> {
    if (!this.rateLimiter) {
      return { allowed: true };
    }
    return await this.rateLimiter.checkLimit(queueName, handler, operation);
  }

  /**
   * Get rate limit status. When no limiter is configured, echoes back the
   * identifiers with no limit information.
   */
  async getRateLimitStatus(queueName: string, handler: string, operation: string) {
    if (!this.rateLimiter) {
      return {
        queueName,
        handler,
        operation,
      };
    }
    return await this.rateLimiter.getStatus(queueName, handler, operation);
  }

  /**
   * Pause all queues.
   */
  async pauseAll(): Promise<void> {
    await Promise.all(Array.from(this.queues.values()).map(queue => queue.pause()));
    this.logger.info('All queues paused');
  }

  /**
   * Resume all queues.
   */
  async resumeAll(): Promise<void> {
    await Promise.all(Array.from(this.queues.values()).map(queue => queue.resume()));
    this.logger.info('All queues resumed');
  }

  /**
   * Pause a specific queue.
   * @returns false when the queue does not exist.
   */
  async pauseQueue(queueName: string): Promise<boolean> {
    const queue = this.queues.get(queueName);
    if (!queue) {
      return false;
    }
    await queue.pause();
    return true;
  }

  /**
   * Resume a specific queue.
   * @returns false when the queue does not exist.
   */
  async resumeQueue(queueName: string): Promise<boolean> {
    const queue = this.queues.get(queueName);
    if (!queue) {
      return false;
    }
    await queue.resume();
    return true;
  }

  /**
   * Drain all queues (remove waiting jobs; `delayed` also removes delayed jobs).
   */
  async drainAll(delayed = false): Promise<void> {
    await Promise.all(Array.from(this.queues.values()).map(queue => queue.drain(delayed)));
    this.logger.info('All queues drained', { delayed });
  }

  /**
   * Clean finished jobs from all queues.
   * @param grace Minimum job age in ms before it is eligible for cleaning.
   * @param limit Maximum number of jobs to clean per queue.
   * @param type  Which terminal state to clean.
   */
  async cleanAll(
    grace: number = 0,
    limit: number = 100,
    type: 'completed' | 'failed' = 'completed'
  ): Promise<void> {
    await Promise.all(
      Array.from(this.queues.values()).map(queue => queue.clean(grace, limit, type))
    );
    this.logger.info('All queues cleaned', { type, grace, limit });
  }

  /**
   * Shutdown all queues, producer queues, and caches (idempotent and
   * concurrency-safe: concurrent callers share one shutdown promise).
   */
  async shutdown(): Promise<void> {
    // If already shutting down, return the existing promise.
    if (this.shutdownPromise) {
      return this.shutdownPromise;
    }
    if (this.isShuttingDown) {
      return;
    }
    this.isShuttingDown = true;
    this.logger.info('Shutting down QueueManager...');
    this.shutdownPromise = this.performShutdown();
    return this.shutdownPromise;
  }

  /**
   * Perform the actual shutdown. Errors on individual resources are logged
   * and do not abort the rest of the teardown.
   */
  private async performShutdown(): Promise<void> {
    try {
      // Close all consumer queues (workers are owned by the Queue class and
      // are stopped as part of queue.close()).
      await Promise.all(
        Array.from(this.queues.values()).map(async queue => {
          try {
            await queue.close();
          } catch (error) {
            this.logger.warn('Error closing queue', { error: (error as Error).message });
          }
        })
      );

      // Close producer-only queues used for cross-service sending. These were
      // previously leaked: BullMQ Queue instances hold Redis connections open
      // until close() is called.
      await Promise.all(
        Array.from(this.producerQueues.values()).map(async queue => {
          try {
            await queue.close();
          } catch (error) {
            this.logger.warn('Error closing producer queue', { error: (error as Error).message });
          }
        })
      );

      // Clear all batch caches before shutdown.
      await Promise.all(
        Array.from(this.caches.values()).map(async cache => {
          try {
            await cache.clear();
          } catch (error) {
            this.logger.warn('Error clearing cache', { error: (error as Error).message });
          }
        })
      );

      // Clear collections so the manager holds no stale references.
      this.queues.clear();
      this.caches.clear();
      this.producerQueues.clear();
      this.logger.info('QueueManager shutdown complete');
    } catch (error) {
      this.logger.error('Error during shutdown', { error: (error as Error).message });
      throw error;
    } finally {
      // Reset shutdown state so a new manager lifecycle (or retry) is possible.
      this.shutdownPromise = null;
      this.isShuttingDown = false;
    }
  }

  /**
   * Start workers for all queues (used when delayWorkerStart is enabled).
   * Cross-service (producer-only) queues are skipped.
   */
  startAllWorkers(): void {
    let workersStarted = 0;
    const queues = this.queues;
    this.logger.info(
      `Starting workers for ${queues.size} queues: ${Array.from(queues.keys()).join(', ')} (service: ${this.serviceName})`
    );
    for (const [queueName, queue] of queues) {
      // If we have a service name, only start workers for our own queues.
      if (this.serviceName) {
        const parsed = parseQueueName(queueName);
        if (parsed && parsed.service !== this.serviceName) {
          this.logger.trace('Skipping workers for cross-service queue', {
            queueName,
            ownerService: parsed.service,
            currentService: this.serviceName,
          });
          continue;
        }
      }
      // `??` (not `||`) so an explicit `workers: 0` / `concurrency: 0` in the
      // config is honored rather than silently coerced to 1; this also makes
      // the `workerCount > 0` guard below reachable.
      const workerCount = this.config.defaultQueueOptions?.workers ?? 1;
      const concurrency = this.config.defaultQueueOptions?.concurrency ?? 1;
      if (workerCount > 0) {
        queue.startWorkersManually(workerCount, concurrency);
        workersStarted++;
        this.logger.debug('Started workers for queue', {
          queueName,
          workers: workerCount,
          concurrency,
        });
      }
    }
    this.logger.info('Service workers started', {
      service: this.serviceName || 'default',
      totalQueues: queues.size,
      queuesWithWorkers: workersStarted,
    });
  }

  /**
   * Wait for all queues to be ready.
   */
  async waitUntilReady(): Promise<void> {
    await Promise.all(Array.from(this.queues.values()).map(queue => queue.waitUntilReady()));
  }

  /**
   * Get Redis configuration (for backward compatibility).
   */
  getRedisConfig() {
    return this.config.redis;
  }

  /**
   * Get a shallow copy of the current configuration.
   * NOTE(review): nested objects (e.g. `redis`) are still shared references.
   */
  getConfig(): Readonly<QueueManagerConfig> {
    return { ...this.config };
  }

  /**
   * Send a job to any queue (local or remote).
   * This is the main method for cross-service communication.
   *
   * @param targetQueue Handler name or full `service:handler` queue name.
   * @param operation   Operation name within the handler.
   * @param payload     Job payload (opaque to the manager).
   * @param options     BullMQ job options.
   * @throws Error when service discovery cannot resolve `targetQueue`.
   */
  async send(
    targetQueue: string,
    operation: string,
    payload: unknown,
    options: JobOptions = {}
  ): Promise<Job> {
    if (!this.serviceName) {
      // No service discovery configured: enqueue directly on a local queue.
      const queue = this.getQueue(targetQueue);
      return queue.add(operation, { handler: targetQueue, operation, payload }, options);
    }

    // Resolve the target queue via discovered routes / registry.
    const route = this.resolveQueueRoute(targetQueue);
    if (!route) {
      throw new Error(`Unknown queue: ${targetQueue}`);
    }

    // Warn (but do not reject) when the operation is missing from metadata —
    // the metadata may simply be stale.
    if (route.operations && !route.operations.includes(operation)) {
      this.logger.warn('Operation not found in handler metadata', {
        handler: route.handler,
        operation,
        availableOperations: route.operations,
      });
    }

    // Use a producer queue for cross-service sending.
    const producerQueue = this.getProducerQueue(route.fullName);
    const jobData: JobData = {
      handler: route.handler,
      operation,
      payload,
    };
    this.logger.trace('Sending job to queue', {
      targetQueue: route.fullName,
      handler: route.handler,
      operation,
      fromService: this.serviceName,
      toService: route.service,
    });
    return producerQueue.add(operation, jobData, options);
  }

  /**
   * Resolve a queue name to a route. Resolution order:
   * 1. full `service:handler` names (discovered route, else synthesized),
   * 2. bare handler names in discovered routes,
   * 3. bare handler names looked up in the handler registry.
   * Returns null when nothing matches.
   */
  private resolveQueueRoute(queueName: string): QueueRoute | null {
    const parsed = parseQueueName(queueName);
    if (parsed) {
      // Prefer a discovered route that matches both handler and service.
      const route = this.queueRoutes.get(parsed.handler);
      if (route && route.service === parsed.service) {
        return route;
      }
      // Otherwise synthesize a route on the fly.
      return {
        fullName: queueName,
        service: parsed.service,
        handler: parsed.handler,
        db: 0, // All queues in DB 0
      };
    }

    const route = this.queueRoutes.get(queueName);
    if (route) {
      return route;
    }

    const ownerService = this.handlerRegistry?.getHandlerService(queueName);
    if (ownerService) {
      return {
        fullName: getFullQueueName(ownerService, queueName),
        service: ownerService,
        handler: queueName,
        db: 0, // All queues in DB 0
      };
    }
    return null;
  }

  /**
   * Get (or lazily create) a producer queue for sending to other services.
   * Producer queues are closed and cleared during shutdown.
   */
  private getProducerQueue(queueName: string): BullQueue {
    const existing = this.producerQueues.get(queueName);
    if (existing) {
      return existing;
    }
    const queue = new BullQueue(queueName, {
      connection: this.redisConnection,
      defaultJobOptions: this.config.defaultQueueOptions?.defaultJobOptions,
    });
    this.producerQueues.set(queueName, queue);
    return queue;
  }

  /**
   * Discover all available queue routes from the handler registry: every
   * handler with a known owner service, plus the current service's own
   * handlers. Failures are logged, never thrown.
   */
  private discoverQueueRoutes(): void {
    if (!this.handlerRegistry) {
      this.logger.warn('No handler registry provided, skipping route discovery');
      return;
    }
    try {
      const handlers = this.handlerRegistry.getAllMetadata();
      for (const [handlerName, metadata] of handlers) {
        const ownerService = metadata.service;
        if (ownerService) {
          const fullName = getFullQueueName(ownerService, handlerName);
          this.queueRoutes.set(handlerName, {
            fullName,
            service: ownerService,
            handler: handlerName,
            db: 0, // All queues in DB 0
            operations: metadata.operations.map((op: { name: string }) => op.name),
          });
          this.logger.trace('Discovered queue route', {
            handler: handlerName,
            service: ownerService,
            operations: metadata.operations.length,
          });
        }
      }

      // Also discover handlers registered by the current service.
      if (this.serviceName) {
        const myHandlers = this.handlerRegistry.getServiceHandlers(this.serviceName);
        for (const metadata of myHandlers) {
          const handlerName = metadata.name;
          if (!this.queueRoutes.has(handlerName)) {
            this.queueRoutes.set(handlerName, {
              fullName: getFullQueueName(this.serviceName, handlerName),
              service: this.serviceName,
              handler: handlerName,
              db: 0, // All queues in DB 0
            });
          }
        }
      }

      this.logger.info('Queue routes discovered', {
        totalRoutes: this.queueRoutes.size,
        routes: Array.from(this.queueRoutes.keys()),
      });
    } catch (error) {
      this.logger.error('Failed to discover queue routes', { error });
    }
  }
}