queue work

This commit is contained in:
Boki 2025-06-19 08:22:00 -04:00
parent c05a7413dc
commit d3ef73ae00
9 changed files with 938 additions and 1086 deletions

View file

@ -1,48 +1,20 @@
import { CacheProvider, createCache } from '@stock-bot/cache';
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import type { Queue } from './queue-instance'; import { QueueManager } from './queue-manager';
import type { BatchJobData, BatchResult, JobData, ProcessOptions } from './types'; import type { BatchJobData, BatchResult, JobData, ProcessOptions } from './types';
const logger = getLogger('batch-processor'); const logger = getLogger('batch-processor');
const cacheProviders = new Map<string, CacheProvider>();
function getCache(queueName: string, redisConfig: any): CacheProvider {
if (!cacheProviders.has(queueName)) {
const cacheProvider = createCache({
redisConfig,
keyPrefix: `batch:${queueName}:`,
ttl: 86400, // 24 hours default
enableMetrics: true,
});
cacheProviders.set(queueName, cacheProvider);
}
return cacheProviders.get(queueName) as CacheProvider;
}
/**
* Initialize the batch cache before any batch operations
* This should be called during application startup
*/
export async function initializeBatchCache(queue: Queue): Promise<void> {
const queueName = queue.getName();
const redisConfig = queue.getRedisConfig();
logger.info('Initializing batch cache...', { queueName });
const cache = getCache(queueName, redisConfig);
await cache.waitForReady(10000);
logger.info('Batch cache initialized successfully', { queueName });
}
/** /**
* Main function - processes items either directly or in batches * Main function - processes items either directly or in batches
* Each item becomes payload: item (no processing needed) * Each item becomes payload: item (no processing needed)
*/ */
export async function processItems<T>( export async function processItems<T>(
items: T[], items: T[],
queue: Queue, queueName: string,
options: ProcessOptions options: ProcessOptions
): Promise<BatchResult> { ): Promise<BatchResult> {
const queueManager = QueueManager.getInstance();
const queue = queueManager.getQueue(queueName);
const startTime = Date.now(); const startTime = Date.now();
if (items.length === 0) { if (items.length === 0) {
@ -63,8 +35,8 @@ export async function processItems<T>(
try { try {
const result = options.useBatching const result = options.useBatching
? await processBatched(items, queue, options) ? await processBatched(items, queueName, options)
: await processDirect(items, queue, options); : await processDirect(items, queueName, options);
const duration = Date.now() - startTime; const duration = Date.now() - startTime;
@ -85,9 +57,11 @@ export async function processItems<T>(
*/ */
async function processDirect<T>( async function processDirect<T>(
items: T[], items: T[],
queue: Queue, queueName: string,
options: ProcessOptions options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> { ): Promise<Omit<BatchResult, 'duration'>> {
const queueManager = QueueManager.getInstance();
const queue = queueManager.getQueue(queueName);
const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds
const delayPerItem = totalDelayMs / items.length; const delayPerItem = totalDelayMs / items.length;
@ -114,7 +88,7 @@ async function processDirect<T>(
}, },
})); }));
const createdJobs = await addJobsInChunks(queue, jobs); const createdJobs = await addJobsInChunks(queueName, jobs);
return { return {
@ -129,9 +103,11 @@ async function processDirect<T>(
*/ */
async function processBatched<T>( async function processBatched<T>(
items: T[], items: T[],
queue: Queue, queueName: string,
options: ProcessOptions options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> { ): Promise<Omit<BatchResult, 'duration'>> {
const queueManager = QueueManager.getInstance();
const queue = queueManager.getQueue(queueName);
const batchSize = options.batchSize || 100; const batchSize = options.batchSize || 100;
const batches = createBatches(items, batchSize); const batches = createBatches(items, batchSize);
const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds
@ -147,7 +123,7 @@ async function processBatched<T>(
const batchJobs = await Promise.all( const batchJobs = await Promise.all(
batches.map(async (batch, batchIndex) => { batches.map(async (batch, batchIndex) => {
// Just store the items directly - no processing needed // Just store the items directly - no processing needed
const payloadKey = await storeItems(batch, queue, options); const payloadKey = await storeItems(batch, queueName, options);
return { return {
name: 'process-batch', name: 'process-batch',
@ -174,7 +150,7 @@ async function processBatched<T>(
}) })
); );
const createdJobs = await addJobsInChunks(queue, batchJobs); const createdJobs = await addJobsInChunks(queueName, batchJobs);
return { return {
totalItems: items.length, totalItems: items.length,
@ -189,8 +165,10 @@ async function processBatched<T>(
*/ */
export async function processBatchJob( export async function processBatchJob(
jobData: BatchJobData, jobData: BatchJobData,
queue: Queue queueName: string
): Promise<unknown> { ): Promise<unknown> {
const queueManager = QueueManager.getInstance();
const queue = queueManager.getQueue(queueName);
const { payloadKey, batchIndex, totalBatches, itemCount } = jobData; const { payloadKey, batchIndex, totalBatches, itemCount } = jobData;
logger.debug('Processing batch job', { logger.debug('Processing batch job', {
@ -200,7 +178,7 @@ export async function processBatchJob(
}); });
try { try {
const payload = await loadPayload(payloadKey, queue); const payload = await loadPayload(payloadKey, queueName);
if (!payload || !payload.items || !payload.options) { if (!payload || !payload.items || !payload.options) {
logger.error('Invalid payload data', { payloadKey, payload }); logger.error('Invalid payload data', { payloadKey, payload });
throw new Error(`Invalid payload data for key: ${payloadKey}`); throw new Error(`Invalid payload data for key: ${payloadKey}`);
@ -225,10 +203,10 @@ export async function processBatchJob(
}, },
})); }));
const createdJobs = await addJobsInChunks(queue, jobs); const createdJobs = await addJobsInChunks(queueName, jobs);
// Cleanup payload after successful processing // Cleanup payload after successful processing
await cleanupPayload(payloadKey, queue); await cleanupPayload(payloadKey, queueName);
return { return {
batchIndex, batchIndex,
@ -253,14 +231,11 @@ function createBatches<T>(items: T[], batchSize: number): T[][] {
async function storeItems<T>( async function storeItems<T>(
items: T[], items: T[],
queue: Queue, queueName: string,
options: ProcessOptions options: ProcessOptions
): Promise<string> { ): Promise<string> {
if (!queue) { const queueManager = QueueManager.getInstance();
throw new Error('Batch cache not initialized. Call initializeBatchCache() first.'); const cache = queueManager.getCache(queueName);
}
const cache = getCache(queue.getName(), queue.getRedisConfig());
const payloadKey = `payload:${Date.now()}:${Math.random().toString(36).substr(2, 9)}`; const payloadKey = `payload:${Date.now()}:${Math.random().toString(36).substr(2, 9)}`;
const payload = { const payload = {
@ -283,7 +258,7 @@ async function storeItems<T>(
async function loadPayload<T>( async function loadPayload<T>(
key: string, key: string,
queue: Queue queueName: string
): Promise<{ ): Promise<{
items: T[]; items: T[];
options: { options: {
@ -294,11 +269,8 @@ async function loadPayload<T>(
operation: string; operation: string;
}; };
} | null> { } | null> {
if (!queue) { const queueManager = QueueManager.getInstance();
throw new Error('Batch cache not initialized. Call initializeBatchCache() first.'); const cache = queueManager.getCache(queueName);
}
const cache = getCache(queue.getName(), queue.getRedisConfig());
return (await cache.get(key)) as { return (await cache.get(key)) as {
items: T[]; items: T[];
options: { options: {
@ -311,20 +283,19 @@ async function loadPayload<T>(
} | null; } | null;
} }
async function cleanupPayload(key: string, queue: Queue): Promise<void> { async function cleanupPayload(key: string, queueName: string): Promise<void> {
if (!queue) { const queueManager = QueueManager.getInstance();
throw new Error('Batch cache not initialized. Call initializeBatchCache() first.'); const cache = queueManager.getCache(queueName);
}
const cache = getCache(queue.getName(), queue.getRedisConfig());
await cache.del(key); await cache.del(key);
} }
async function addJobsInChunks( async function addJobsInChunks(
queue: Queue, queueName: string,
jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>, jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>,
chunkSize = 100 chunkSize = 100
): Promise<unknown[]> { ): Promise<unknown[]> {
const queueManager = QueueManager.getInstance();
const queue = queueManager.getQueue(queueName);
const allCreatedJobs = []; const allCreatedJobs = [];
for (let i = 0; i < jobs.length; i += chunkSize) { for (let i = 0; i < jobs.length; i += chunkSize) {

View file

@ -1,7 +1,6 @@
export * from './batch-processor'; export * from './batch-processor';
export * from './handler-registry'; export * from './handler-registry';
export * from './queue-manager'; export * from './queue-manager';
export * from './queue-instance';
export * from './queue-factory'; export * from './queue-factory';
export * from './types'; export * from './types';
export * from './dlq-handler'; export * from './dlq-handler';
@ -9,10 +8,10 @@ export * from './queue-metrics';
export * from './rate-limiter'; export * from './rate-limiter';
// Re-export commonly used functions // Re-export commonly used functions
export { initializeBatchCache, processBatchJob, processItems } from './batch-processor'; export { processBatchJob, processItems } from './batch-processor';
export { QueueManager } from './queue-manager'; export { QueueManager } from './queue-manager';
export { Queue } from './queue-instance'; export { Queue, type QueueConfig } from './queue';
export { handlerRegistry } from './handler-registry'; export { handlerRegistry } from './handler-registry';

View file

@ -1,18 +1,14 @@
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import { QueueManager } from './queue-manager'; import { QueueManager } from './queue-manager';
import { Queue } from './queue-instance'; import { Queue } from './queue';
import type { ProcessOptions, BatchResult } from './types'; import { processItems } from './batch-processor';
import type { ProcessOptions, BatchResult, QueueManagerConfig } from './types';
const logger = getLogger('queue-factory'); const logger = getLogger('queue-factory');
// Global queue manager (manages workers and handlers)
let queueManager: QueueManager | null = null;
// Registry of individual queues
const queues = new Map<string, Queue>();
let globalRedisConfig: any = null;
/** /**
* Initialize the queue system with global configuration * Initialize the queue system with global configuration
* This now uses the singleton QueueManager pattern
*/ */
export async function initializeQueueSystem(config: { export async function initializeQueueSystem(config: {
redis: any; redis: any;
@ -22,68 +18,55 @@ export async function initializeQueueSystem(config: {
}): Promise<void> { }): Promise<void> {
logger.info('Initializing global queue system...'); logger.info('Initializing global queue system...');
globalRedisConfig = config.redis; const queueManagerConfig: QueueManagerConfig = {
redis: config.redis,
// Initialize the global queue manager for worker management defaultQueueOptions: {
queueManager = new QueueManager({ defaultJobOptions: config.defaultJobOptions,
queueName: 'system-queue-manager', workers: config.workers || 5,
redis: globalRedisConfig, concurrency: config.concurrency || 20,
workers: config.workers || 5, },
concurrency: config.concurrency || 20, };
defaultJobOptions: config.defaultJobOptions,
handlers: [], // Will be set by individual services
});
await queueManager.initialize(); // Initialize the singleton QueueManager
QueueManager.initialize(queueManagerConfig);
logger.info('Queue system initialized'); logger.info('Queue system initialized with singleton QueueManager');
} }
/** /**
* Get or create a queue for the given queue name * Get or create a queue for the given queue name
* Now uses the singleton QueueManager
*/ */
export function getQueue(queueName: string): Queue { export function getQueue(queueName: string): Queue {
if (!globalRedisConfig) { const queueManager = QueueManager.getInstance();
throw new Error('Queue system not initialized. Call initializeQueueSystem() first.'); return queueManager.getQueue(queueName);
}
if (!queues.has(queueName)) {
logger.info(`Creating new queue: ${queueName}`);
const queue = new Queue(queueName, globalRedisConfig);
queues.set(queueName, queue);
}
return queues.get(queueName)!;
} }
/** /**
* Process items using the specified queue * Process items using the specified queue
* Now uses the batch processor directly
*/ */
export async function processItemsWithQueue<T>( export async function processItemsWithQueue<T>(
queueName: string, queueName: string,
items: T[], items: T[],
options: ProcessOptions options: ProcessOptions
): Promise<BatchResult> { ): Promise<BatchResult> {
const queue = getQueue(queueName); return processItems(items, queueName, options);
return queue.processItems(items, options);
} }
/** /**
* Get all active queue names * Get all active queue names
*/ */
export function getActiveQueueNames(): string[] { export function getActiveQueueNames(): string[] {
return Array.from(queues.keys()); const queueManager = QueueManager.getInstance();
return queueManager.getQueueNames();
} }
/** /**
* Get the global queue manager (for advanced operations) * Get the global queue manager (for advanced operations)
*/ */
export function getQueueManager(): QueueManager { export function getQueueManager(): QueueManager {
if (!queueManager) { return QueueManager.getInstance();
throw new Error('Queue system not initialized. Call initializeQueueSystem() first.');
}
return queueManager;
} }
/** /**
@ -92,21 +75,8 @@ export function getQueueManager(): QueueManager {
export async function shutdownAllQueues(): Promise<void> { export async function shutdownAllQueues(): Promise<void> {
logger.info('Shutting down all queues...'); logger.info('Shutting down all queues...');
// Shutdown individual queues // Reset the singleton QueueManager (handles all cleanup)
const queueShutdownPromises = Array.from(queues.values()).map(queue => await QueueManager.reset();
queue.shutdown().catch(error => {
logger.error('Error shutting down queue', { error });
})
);
await Promise.all(queueShutdownPromises);
queues.clear();
// Shutdown the global queue manager
if (queueManager) {
await queueManager.shutdown();
queueManager = null;
}
logger.info('All queues shut down'); logger.info('All queues shut down');
} }

View file

@ -1,273 +0,0 @@
import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { processItems, processBatchJob } from './batch-processor';
import { handlerRegistry } from './handler-registry';
import type { JobData, ProcessOptions, BatchResult, BatchJobData } from './types';
import { getRedisConnection } from './utils';
const logger = getLogger('queue-instance');
export class Queue {
private bullQueue: BullQueue;
private workers: Worker[] = [];
private queueEvents: QueueEvents;
private queueName: string;
private redisConfig: any;
private initialized = false;
constructor(queueName: string, redisConfig: any, options: { startWorker?: boolean } = {}) {
this.queueName = queueName;
this.redisConfig = redisConfig;
const connection = getRedisConnection(redisConfig);
// Initialize BullMQ queue
this.bullQueue = new BullQueue(`{${queueName}}`, {
connection,
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 5,
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
},
});
// Initialize queue events
this.queueEvents = new QueueEvents(`{${queueName}}`, { connection });
// Start a worker for this queue unless explicitly disabled
if (options.startWorker !== false) {
this.startWorker();
}
}
/**
* Get the queue name
*/
getName(): string {
return this.queueName;
}
/**
* Get the redis configuration
*/
getRedisConfig(): any {
return this.redisConfig;
}
/**
* Initialize batch cache for this queue
*/
async initialize(): Promise<void> {
if (this.initialized) {
return;
}
const { initializeBatchCache } = await import('./batch-processor');
await initializeBatchCache(this);
this.initialized = true;
logger.info(`Queue initialized: ${this.queueName}`);
}
/**
* Process items using this queue
*/
async processItems<T>(items: T[], options: ProcessOptions): Promise<BatchResult> {
// Ensure queue is initialized
if (!this.initialized) {
await this.initialize();
}
return processItems(items, this, options);
}
/**
* Add a single job to the queue
*/
async add(name: string, data: JobData, options: Record<string, unknown> = {}): Promise<Job> {
return await this.bullQueue.add(name, data, options);
}
/**
* Add multiple jobs to the queue in bulk
*/
async addBulk(
jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>
): Promise<Job[]> {
const createdJobs = await this.bullQueue.addBulk(jobs);
return createdJobs;
}
/**
* Get queue statistics
*/
async getStats(): Promise<{
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
}> {
const [waiting, active, completed, failed, delayed] = await Promise.all([
this.bullQueue.getWaiting(),
this.bullQueue.getActive(),
this.bullQueue.getCompleted(),
this.bullQueue.getFailed(),
this.bullQueue.getDelayed(),
]);
return {
waiting: waiting.length,
active: active.length,
completed: completed.length,
failed: failed.length,
delayed: delayed.length,
};
}
/**
* Pause the queue
*/
async pause(): Promise<void> {
await this.bullQueue.pause();
logger.info(`Queue paused: ${this.queueName}`);
}
/**
* Resume the queue
*/
async resume(): Promise<void> {
await this.bullQueue.resume();
logger.info(`Queue resumed: ${this.queueName}`);
}
/**
* Clean completed and failed jobs
*/
async clean(grace: number = 0, limit: number = 100): Promise<void> {
await Promise.all([
this.bullQueue.clean(grace, limit, 'completed'),
this.bullQueue.clean(grace, limit, 'failed'),
]);
logger.info(`Queue cleaned: ${this.queueName}`, { grace, limit });
}
/**
* Shutdown this queue
*/
async shutdown(): Promise<void> {
logger.info(`Shutting down queue: ${this.queueName}`);
try {
// Close workers
await Promise.all(this.workers.map(worker => worker.close()));
this.workers = [];
// Close queue events
await this.queueEvents.close();
// Close queue
await this.bullQueue.close();
logger.info(`Queue shutdown complete: ${this.queueName}`);
} catch (error) {
logger.error(`Error during queue shutdown: ${this.queueName}`, { error });
throw error;
}
}
/**
* Start a worker for this queue
*/
private startWorker(): void {
const connection = getRedisConnection(this.redisConfig);
const worker = new Worker(`{${this.queueName}}`, this.processJob.bind(this), {
connection,
concurrency: 20,
});
worker.on('completed', job => {
logger.debug('Job completed', {
id: job.id,
name: job.name,
queue: this.queueName,
});
});
worker.on('failed', (job, err) => {
logger.error('Job failed', {
id: job?.id,
name: job?.name,
queue: this.queueName,
error: err.message,
});
});
this.workers.push(worker);
logger.info(`Started worker for queue: ${this.queueName}`);
}
/**
* Process a job
*/
private async processJob(job: Job) {
const { handler, operation, payload }: JobData = job.data;
logger.info('Processing job', {
id: job.id,
handler,
operation,
queue: this.queueName,
payloadKeys: Object.keys(payload || {}),
});
try {
let result;
if (operation === 'process-batch-items') {
// Special handling for batch processing
result = await processBatchJob(payload as BatchJobData, this);
} else {
// Regular handler lookup
const jobHandler = handlerRegistry.getHandler(handler, operation);
if (!jobHandler) {
throw new Error(`No handler found for ${handler}:${operation}`);
}
result = await jobHandler(payload);
}
logger.info('Job completed successfully', {
id: job.id,
handler,
operation,
queue: this.queueName,
});
return result;
} catch (error) {
logger.error('Job processing failed', {
id: job.id,
handler,
operation,
queue: this.queueName,
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
/**
* Get the BullMQ queue instance (for advanced operations)
*/
getBullQueue(): BullQueue {
return this.bullQueue;
}
}

View file

@ -1,578 +1,394 @@
import { Queue, QueueEvents, Worker, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import { DeadLetterQueueHandler } from './dlq-handler'; import { QueueRateLimiter } from './rate-limiter';
import { handlerRegistry } from './handler-registry'; import { Queue, type QueueConfig } from './queue';
import { QueueMetricsCollector } from './queue-metrics'; import { CacheProvider, createCache } from '@stock-bot/cache';
import { QueueRateLimiter, type RateLimitRule } from './rate-limiter'; import type {
import type { HandlerConfig, HandlerInitializer, JobData, QueueConfig } from './types'; QueueManagerConfig,
QueueOptions,
GlobalStats,
QueueStats,
RateLimitRule
} from './types';
import { getRedisConnection } from './utils'; import { getRedisConnection } from './utils';
const logger = getLogger('queue-manager'); const logger = getLogger('queue-manager');
/**
* Singleton QueueManager that provides unified queue and cache management
* Main entry point for all queue operations with getQueue() method
*/
export class QueueManager { export class QueueManager {
private queue!: Queue; private static instance: QueueManager | null = null;
private workers: Worker[] = []; private queues = new Map<string, Queue>();
private queueEvents!: QueueEvents; private caches = new Map<string, CacheProvider>();
private config: Required<QueueConfig>;
private handlers: HandlerInitializer[];
private enableScheduledJobs: boolean;
private dlqHandler?: DeadLetterQueueHandler;
private metricsCollector?: QueueMetricsCollector;
private rateLimiter?: QueueRateLimiter; private rateLimiter?: QueueRateLimiter;
private redisConnection: any;
private isShuttingDown = false;
private isInitialized = false;
private get isInitialized() { private constructor(private config: QueueManagerConfig) {
return !!this.queue; this.redisConnection = getRedisConnection(config.redis);
// Initialize rate limiter if rules are provided
if (config.rateLimitRules && config.rateLimitRules.length > 0) {
this.rateLimiter = new QueueRateLimiter(this.redisConnection);
config.rateLimitRules.forEach(rule => {
this.rateLimiter!.addRule(rule);
});
}
this.isInitialized = true;
logger.info('QueueManager singleton initialized');
} }
/** /**
* Get the queue name * Get the singleton instance
*/ */
get queueName(): string { static getInstance(config?: QueueManagerConfig): QueueManager {
return this.config.queueName; if (!QueueManager.instance) {
} if (!config) {
throw new Error('QueueManager not initialized. Provide config on first call.');
constructor(config: QueueConfig = {}) { }
// Enhanced configuration QueueManager.instance = new QueueManager(config);
this.handlers = config.handlers || []; }
this.enableScheduledJobs = config.enableScheduledJobs ?? true; return QueueManager.instance;
// Set default configuration
this.config = {
workers: config.workers ?? 5,
concurrency: config.concurrency ?? 20,
redis: {
host: config.redis?.host || 'localhost',
port: config.redis?.port || 6379,
password: config.redis?.password || '',
db: config.redis?.db || 0,
},
queueName: config.queueName || 'default-queue',
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 5,
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
...config.defaultJobOptions,
},
handlers: this.handlers,
enableScheduledJobs: this.enableScheduledJobs,
enableRateLimit: config.enableRateLimit || false,
globalRateLimit: config.globalRateLimit,
enableDLQ: config.enableDLQ || false,
dlqConfig: config.dlqConfig,
enableMetrics: config.enableMetrics || false,
rateLimitRules: config.rateLimitRules || [],
};
} }
/** /**
* Initialize the queue manager with enhanced handler and scheduled job support * Initialize the singleton with config
*/ */
async initialize(): Promise<void> { static initialize(config: QueueManagerConfig): QueueManager {
if (this.isInitialized) { if (QueueManager.instance) {
logger.warn('Queue manager already initialized'); logger.warn('QueueManager already initialized, returning existing instance');
return; return QueueManager.instance;
}
QueueManager.instance = new QueueManager(config);
return QueueManager.instance;
}
/**
* Reset the singleton (mainly for testing)
*/
static async reset(): Promise<void> {
if (QueueManager.instance) {
await QueueManager.instance.shutdown();
QueueManager.instance = null;
}
}
/**
* Get or create a queue - unified method that handles both scenarios
* This is the main method for accessing queues
*/
getQueue(queueName: string, options: QueueOptions = {}): Queue {
// Return existing queue if it exists
if (this.queues.has(queueName)) {
return this.queues.get(queueName)!;
} }
logger.info('Initializing enhanced queue manager...', { // Create new queue with merged options
queueName: this.config.queueName, const mergedOptions = {
workers: this.config.workers, ...this.config.defaultQueueOptions,
concurrency: this.config.concurrency, ...options,
handlers: this.handlers.length, };
enableScheduledJobs: this.enableScheduledJobs,
// Prepare queue configuration
const queueConfig: QueueConfig = {
workers: mergedOptions.workers,
concurrency: mergedOptions.concurrency,
startWorker: mergedOptions.workers && mergedOptions.workers > 0,
};
const queue = new Queue(
queueName,
this.config.redis,
mergedOptions.defaultJobOptions || {},
queueConfig
);
// Store the queue
this.queues.set(queueName, queue);
// Automatically initialize batch cache for the queue
this.initializeBatchCacheSync(queueName);
// Add queue-specific rate limit rules
if (this.rateLimiter && mergedOptions.rateLimitRules) {
mergedOptions.rateLimitRules.forEach(rule => {
// Ensure queue name is set for queue-specific rules
const ruleWithQueue = { ...rule, queueName };
this.rateLimiter!.addRule(ruleWithQueue);
});
}
logger.info('Queue created with batch cache', {
queueName,
workers: mergedOptions.workers || 0,
concurrency: mergedOptions.concurrency || 1
}); });
try { return queue;
// Step 1: Register all handlers }
await this.registerHandlers();
// Step 2: Initialize core queue infrastructure /**
const connection = this.getConnection(); * Check if a queue exists
const queueName = `{${this.config.queueName}}`; */
hasQueue(queueName: string): boolean {
return this.queues.has(queueName);
}
// Initialize queue /**
this.queue = new Queue(queueName, { * Get all queue names
connection, */
defaultJobOptions: this.config.defaultJobOptions, getQueueNames(): string[] {
return Array.from(this.queues.keys());
}
/**
* Get or create a cache for a queue
*/
getCache(queueName: string): CacheProvider {
if (!this.caches.has(queueName)) {
const cacheProvider = createCache({
redisConfig: this.config.redis,
keyPrefix: `batch:${queueName}:`,
ttl: 86400, // 24 hours default
enableMetrics: true,
}); });
this.caches.set(queueName, cacheProvider);
// Initialize queue events logger.debug('Cache created for queue', { queueName });
this.queueEvents = new QueueEvents(queueName, { connection });
// Wait for queue to be ready
await this.queue.waitUntilReady();
// Step 3: Initialize DLQ handler if enabled
if (this.config.enableDLQ) {
this.dlqHandler = new DeadLetterQueueHandler(this.queue, connection, this.config.dlqConfig);
}
// Step 4: Initialize metrics collector if enabled
if (this.config.enableMetrics) {
this.metricsCollector = new QueueMetricsCollector(this.queue, this.queueEvents);
}
// Step 5: Initialize rate limiter if enabled
if (this.config.enableRateLimit && this.config.rateLimitRules) {
const redis = await this.getRedisClient();
this.rateLimiter = new QueueRateLimiter(redis);
// Add configured rate limit rules
for (const rule of this.config.rateLimitRules) {
this.rateLimiter.addRule(rule);
}
}
// Step 6: Start workers
await this.startWorkers();
// Step 7: Setup event listeners
this.setupEventListeners();
// Step 8: Batch cache will be initialized by individual Queue instances
// Step 9: Set up scheduled jobs
if (this.enableScheduledJobs) {
await this.setupScheduledJobs();
}
logger.info('Enhanced queue manager initialized successfully');
} catch (error) {
logger.error('Failed to initialize enhanced queue manager', { error });
throw error;
} }
return this.caches.get(queueName)!;
} }
/** /**
* Register all configured handlers * Initialize cache for a queue (ensures it's ready)
*/ */
private async registerHandlers(): Promise<void> { async initializeCache(queueName: string): Promise<void> {
logger.info('Registering queue handlers...', { count: this.handlers.length }); const cache = this.getCache(queueName);
await cache.waitForReady(10000);
// Initialize handlers using the configured handler initializers logger.info('Cache initialized for queue', { queueName });
for (const handlerInitializer of this.handlers) {
try {
await handlerInitializer();
} catch (error) {
logger.error('Failed to initialize handler', { error });
throw error;
}
}
// Now register all handlers from the registry with the queue manager
const allHandlers = handlerRegistry.getAllHandlers();
for (const [handlerName, config] of allHandlers) {
this.registerHandler(handlerName, config.operations);
logger.info(`Registered handler: ${handlerName}`);
}
// Log scheduled jobs
const scheduledJobs = handlerRegistry.getAllScheduledJobs();
logger.info(`Registered ${scheduledJobs.length} scheduled jobs across all handlers`);
for (const { handler, job } of scheduledJobs) {
logger.info(
`Scheduled job: ${handler}.${job.type} - ${job.description} (${job.cronPattern})`
);
}
logger.info('All handlers registered successfully');
} }
/** /**
* Set up scheduled jobs from handler registry * Initialize batch cache synchronously (for automatic initialization)
* The cache will be ready for use, but we don't wait for Redis connection
*/ */
private async setupScheduledJobs(): Promise<void> { private initializeBatchCacheSync(queueName: string): void {
const scheduledJobs = handlerRegistry.getAllScheduledJobs(); // Just create the cache - it will connect automatically when first used
this.getCache(queueName);
logger.debug('Batch cache initialized synchronously for queue', { queueName });
}
if (scheduledJobs.length === 0) { /**
logger.info('No scheduled jobs found'); * Get statistics for all queues
return; */
async getGlobalStats(): Promise<GlobalStats> {
const queueStats: Record<string, QueueStats> = {};
let totalJobs = 0;
let totalWorkers = 0;
for (const [queueName, queue] of this.queues) {
const stats = await queue.getStats();
queueStats[queueName] = stats;
totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed;
totalWorkers += stats.workers || 0;
} }
logger.info(`Setting up ${scheduledJobs.length} scheduled jobs...`);
for (const { handler, job } of scheduledJobs) {
try {
const jobData: JobData = {
type: job.type,
handler,
operation: job.operation,
payload: job.payload,
priority: job.priority,
};
await this.add(`recurring-${handler}-${job.operation}`, jobData, {
repeat: {
pattern: job.cronPattern,
tz: 'UTC',
immediately: job.immediately || false,
},
delay: job.delay || 0,
removeOnComplete: 1,
removeOnFail: 1,
attempts: 2,
backoff: {
type: 'fixed',
delay: 5000,
},
});
logger.info(`Scheduled job registered: ${handler}.${job.type} (${job.cronPattern})`);
} catch (error) {
logger.error(`Failed to register scheduled job: ${handler}.${job.type}`, { error });
}
}
logger.info('Scheduled jobs setup complete');
}
/**
* Register a handler with its operations
*/
registerHandler(handlerName: string, config: HandlerConfig): void {
handlerRegistry.register(handlerName, config);
}
/**
* Add a single job to the queue
*/
async add(name: string, data: JobData, options: Record<string, unknown> = {}): Promise<Job> {
this.ensureInitialized();
return await this.queue.add(name, data, options);
}
/**
* Add multiple jobs to the queue in bulk
*/
async addBulk(
jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>
): Promise<Job[]> {
this.ensureInitialized();
return await this.queue.addBulk(jobs);
}
/**
* Get queue statistics
*/
async getStats(): Promise<{
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
}> {
this.ensureInitialized();
const [waiting, active, completed, failed, delayed] = await Promise.all([
this.queue.getWaiting(),
this.queue.getActive(),
this.queue.getCompleted(),
this.queue.getFailed(),
this.queue.getDelayed(),
]);
return { return {
waiting: waiting.length, queues: queueStats,
active: active.length, totalJobs,
completed: completed.length, totalWorkers,
failed: failed.length, uptime: process.uptime(),
delayed: delayed.length,
}; };
} }
/** /**
* Pause the queue * Get statistics for a specific queue
*/ */
async pause(): Promise<void> { async getQueueStats(queueName: string): Promise<QueueStats | undefined> {
this.ensureInitialized(); const queue = this.queues.get(queueName);
await this.queue.pause(); if (!queue) {
logger.info('Queue paused'); return undefined;
}
/**
* Resume the queue
*/
async resume(): Promise<void> {
this.ensureInitialized();
await this.queue.resume();
logger.info('Queue resumed');
}
/**
* Clean completed and failed jobs
*/
async clean(grace: number = 0, limit: number = 100): Promise<void> {
this.ensureInitialized();
await Promise.all([
this.queue.clean(grace, limit, 'completed'),
this.queue.clean(grace, limit, 'failed'),
]);
logger.info('Queue cleaned', { grace, limit });
}
/**
* Get the queue name
*/
getQueueName(): string {
return this.config.queueName;
}
/**
* Get the redis configuration
*/
getRedisConfig(): any {
return this.config.redis;
}
/**
* Get queue metrics
*/
async getMetrics() {
if (!this.metricsCollector) {
throw new Error('Metrics not enabled. Set enableMetrics: true in config');
} }
return this.metricsCollector.collect(); return await queue.getStats();
} }
/** /**
* Get metrics report * Add a rate limit rule
*/
async getMetricsReport(): Promise<string> {
if (!this.metricsCollector) {
throw new Error('Metrics not enabled. Set enableMetrics: true in config');
}
return this.metricsCollector.getReport();
}
/**
* Get DLQ stats
*/
async getDLQStats() {
if (!this.dlqHandler) {
throw new Error('DLQ not enabled. Set enableDLQ: true in config');
}
return this.dlqHandler.getStats();
}
/**
* Retry jobs from DLQ
*/
async retryDLQJobs(limit = 10) {
if (!this.dlqHandler) {
throw new Error('DLQ not enabled. Set enableDLQ: true in config');
}
return this.dlqHandler.retryDLQJobs(limit);
}
/**
* Add rate limit rule
*/ */
addRateLimitRule(rule: RateLimitRule): void { addRateLimitRule(rule: RateLimitRule): void {
if (!this.rateLimiter) { if (!this.rateLimiter) {
throw new Error('Rate limiting not enabled. Set enableRateLimit: true in config'); this.rateLimiter = new QueueRateLimiter(this.redisConnection);
} }
this.rateLimiter.addRule(rule); this.rateLimiter.addRule(rule);
} }
/** /**
* Get rate limit status * Check rate limits for a job
*/ */
async getRateLimitStatus(handler: string, operation: string) { async checkRateLimit(queueName: string, handler: string, operation: string): Promise<{
allowed: boolean;
retryAfter?: number;
remainingPoints?: number;
appliedRule?: RateLimitRule;
}> {
if (!this.rateLimiter) { if (!this.rateLimiter) {
throw new Error('Rate limiting not enabled. Set enableRateLimit: true in config'); return { allowed: true };
} }
return this.rateLimiter.getStatus(handler, operation);
return await this.rateLimiter.checkLimit(queueName, handler, operation);
} }
/** /**
* Shutdown the queue manager * Get rate limit status
*/
async getRateLimitStatus(queueName: string, handler: string, operation: string) {
if (!this.rateLimiter) {
return {
queueName,
handler,
operation,
};
}
return await this.rateLimiter.getStatus(queueName, handler, operation);
}
/**
* Pause all queues
*/
async pauseAll(): Promise<void> {
const pausePromises = Array.from(this.queues.values()).map(queue => queue.pause());
await Promise.all(pausePromises);
logger.info('All queues paused');
}
/**
* Resume all queues
*/
async resumeAll(): Promise<void> {
const resumePromises = Array.from(this.queues.values()).map(queue => queue.resume());
await Promise.all(resumePromises);
logger.info('All queues resumed');
}
/**
* Pause a specific queue
*/
async pauseQueue(queueName: string): Promise<boolean> {
const queue = this.queues.get(queueName);
if (!queue) {
return false;
}
await queue.pause();
return true;
}
/**
* Resume a specific queue
*/
async resumeQueue(queueName: string): Promise<boolean> {
const queue = this.queues.get(queueName);
if (!queue) {
return false;
}
await queue.resume();
return true;
}
/**
* Drain all queues
*/
async drainAll(delayed = false): Promise<void> {
const drainPromises = Array.from(this.queues.values()).map(queue => queue.drain(delayed));
await Promise.all(drainPromises);
logger.info('All queues drained', { delayed });
}
/**
* Clean all queues
*/
async cleanAll(
grace: number = 0,
limit: number = 100,
type: 'completed' | 'failed' = 'completed'
): Promise<void> {
const cleanPromises = Array.from(this.queues.values()).map(queue =>
queue.clean(grace, limit, type)
);
await Promise.all(cleanPromises);
logger.info('All queues cleaned', { type, grace, limit });
}
/**
* Shutdown all queues and workers
*/ */
async shutdown(): Promise<void> { async shutdown(): Promise<void> {
logger.info('Shutting down queue manager...'); if (this.isShuttingDown) {
return;
const shutdownTasks: Promise<void>[] = [];
try {
// Shutdown DLQ handler
if (this.dlqHandler) {
shutdownTasks.push(
this.dlqHandler.shutdown().catch(err =>
logger.warn('Error shutting down DLQ handler', { error: err })
)
);
}
// Close workers
if (this.workers.length > 0) {
shutdownTasks.push(
Promise.all(
this.workers.map(worker =>
worker.close().catch(err =>
logger.warn('Error closing worker', { error: err })
)
)
).then(() => {
this.workers = [];
})
);
}
// Close queue events
if (this.queueEvents) {
shutdownTasks.push(
this.queueEvents.close().catch(err =>
logger.warn('Error closing queue events', { error: err })
)
);
}
// Close queue
if (this.queue) {
shutdownTasks.push(
this.queue.close().catch(err =>
logger.warn('Error closing queue', { error: err })
)
);
}
// Wait for all shutdown tasks with a timeout
await Promise.race([
Promise.all(shutdownTasks),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Shutdown timeout')), 5000)
)
]).catch(err => {
logger.warn('Some shutdown tasks did not complete cleanly', { error: err });
});
logger.info('Queue manager shutdown complete');
} catch (error) {
logger.error('Error during queue manager shutdown', { error });
// Don't throw in shutdown to avoid hanging tests
}
}
private getConnection() {
return getRedisConnection(this.config.redis);
}
private async getRedisClient() {
// Create a redis client for rate limiting
const Redis = require('ioredis');
return new Redis(this.getConnection());
}
private async startWorkers(): Promise<void> {
const connection = this.getConnection();
const queueName = `{${this.config.queueName}}`;
for (let i = 0; i < this.config.workers; i++) {
const worker = new Worker(queueName, this.processJob.bind(this), {
connection,
concurrency: this.config.concurrency,
});
worker.on('completed', job => {
logger.debug('Job completed', {
id: job.id,
name: job.name,
});
});
worker.on('failed', (job, err) => {
logger.error('Job failed', {
id: job?.id,
name: job?.name,
error: err.message,
});
});
this.workers.push(worker);
} }
logger.info(`Started ${this.config.workers} workers`); this.isShuttingDown = true;
} logger.info('Shutting down QueueManager...');
private async processJob(job: Job) {
const { handler, operation, payload }: JobData = job.data;
logger.info('Processing job', {
id: job.id,
handler,
operation,
payloadKeys: Object.keys(payload || {}),
});
try { try {
// Check rate limits if enabled // Close all queues (this now includes workers since they're managed by Queue class)
if (this.rateLimiter) { const queueShutdownPromises = Array.from(this.queues.values()).map(async (queue) => {
const rateLimit = await this.rateLimiter.checkLimit(handler, operation); try {
if (!rateLimit.allowed) { await queue.close();
// Reschedule job with delay } catch (error) {
const delay = rateLimit.retryAfter || 60000; logger.warn('Error closing queue', { error: error.message });
logger.warn('Job rate limited, rescheduling', {
id: job.id,
handler,
operation,
retryAfter: delay,
});
throw new Error(`Rate limited. Retry after ${delay}ms`);
} }
}
// Regular handler lookup
const jobHandler = handlerRegistry.getHandler(handler, operation);
if (!jobHandler) {
throw new Error(`No handler found for ${handler}:${operation}`);
}
const result = await jobHandler(payload);
logger.info('Job completed successfully', {
id: job.id,
handler,
operation,
}); });
return result; await Promise.all(queueShutdownPromises);
// Close all caches
const cacheShutdownPromises = Array.from(this.caches.values()).map(async (cache) => {
try {
// Try different disconnect methods as different cache providers may use different names
if (typeof cache.disconnect === 'function') {
await cache.disconnect();
} else if (typeof cache.close === 'function') {
await cache.close();
} else if (typeof cache.quit === 'function') {
await cache.quit();
}
} catch (error) {
logger.warn('Error closing cache', { error: error.message });
}
});
await Promise.all(cacheShutdownPromises);
// Clear collections
this.queues.clear();
this.caches.clear();
logger.info('QueueManager shutdown complete');
} catch (error) { } catch (error) {
logger.error('Job processing failed', { logger.error('Error during shutdown', { error: error.message });
id: job.id,
handler,
operation,
error: error instanceof Error ? error.message : String(error),
});
// Handle DLQ if enabled
if (this.dlqHandler && error instanceof Error) {
await this.dlqHandler.handleFailedJob(job, error);
}
throw error; throw error;
} }
} }
private setupEventListeners(): void { /**
this.queueEvents.on('completed', ({ jobId }) => { * Wait for all queues to be ready
logger.debug('Job completed event', { jobId }); */
}); async waitUntilReady(): Promise<void> {
const readyPromises = Array.from(this.queues.values()).map(queue => queue.waitUntilReady());
this.queueEvents.on('failed', ({ jobId, failedReason }) => { await Promise.all(readyPromises);
logger.warn('Job failed event', { jobId, failedReason });
});
this.queueEvents.on('stalled', ({ jobId }) => {
logger.warn('Job stalled event', { jobId });
});
} }
private ensureInitialized(): void { /**
if (!this.isInitialized) { * Get Redis configuration (for backward compatibility)
throw new Error('Queue manager not initialized. Call initialize() first.'); */
} getRedisConfig() {
return this.config.redis;
} }
} }

324
libs/queue/src/queue.ts Normal file
View file

@ -0,0 +1,324 @@
import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { handlerRegistry } from './handler-registry';
import type { JobData, JobOptions, QueueStats } from './types';
import { getRedisConnection } from './utils';
const logger = getLogger('queue');
export interface QueueConfig {
workers?: number;
concurrency?: number;
startWorker?: boolean;
}
/**
* Consolidated Queue class that handles both job operations and optional worker management
* Can be used as a simple job queue or with workers for automatic processing
*/
export class Queue {
private bullQueue: BullQueue;
private workers: Worker[] = [];
private queueEvents?: QueueEvents;
private queueName: string;
private redisConfig: any;
constructor(
queueName: string,
redisConfig: any,
defaultJobOptions: JobOptions = {},
config: QueueConfig = {}
) {
this.queueName = queueName;
this.redisConfig = redisConfig;
const connection = getRedisConnection(redisConfig);
// Initialize BullMQ queue
this.bullQueue = new BullQueue(`{${queueName}}`, {
connection,
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 5,
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
...defaultJobOptions,
},
});
// Initialize queue events if workers will be used
if (config.workers && config.workers > 0) {
this.queueEvents = new QueueEvents(`{${queueName}}`, { connection });
}
// Start workers if requested and not explicitly disabled
if (config.workers && config.workers > 0 && config.startWorker !== false) {
this.startWorkers(config.workers, config.concurrency || 1);
}
logger.debug('Queue created', {
queueName,
workers: config.workers || 0,
concurrency: config.concurrency || 1
});
}
/**
* Get the queue name
*/
getName(): string {
return this.queueName;
}
/**
* Add a single job to the queue
*/
async add(name: string, data: JobData, options: JobOptions = {}): Promise<Job> {
logger.debug('Adding job', { queueName: this.queueName, jobName: name });
return await this.bullQueue.add(name, data, options);
}
/**
* Add multiple jobs to the queue in bulk
*/
async addBulk(
jobs: Array<{ name: string; data: JobData; opts?: JobOptions }>
): Promise<Job[]> {
logger.debug('Adding bulk jobs', {
queueName: this.queueName,
jobCount: jobs.length
});
return await this.bullQueue.addBulk(jobs);
}
/**
* Get queue statistics
*/
async getStats(): Promise<QueueStats> {
const [waiting, active, completed, failed, delayed] = await Promise.all([
this.bullQueue.getWaiting(),
this.bullQueue.getActive(),
this.bullQueue.getCompleted(),
this.bullQueue.getFailed(),
this.bullQueue.getDelayed(),
]);
const isPaused = await this.bullQueue.isPaused();
return {
waiting: waiting.length,
active: active.length,
completed: completed.length,
failed: failed.length,
delayed: delayed.length,
paused: isPaused,
workers: this.workers.length,
};
}
/**
* Get a specific job by ID
*/
async getJob(jobId: string): Promise<Job | undefined> {
return await this.bullQueue.getJob(jobId);
}
/**
* Get jobs by state
*/
async getJobs(
states: Array<'waiting' | 'active' | 'completed' | 'failed' | 'delayed'>,
start = 0,
end = 100
): Promise<Job[]> {
return await this.bullQueue.getJobs(states, start, end);
}
/**
* Pause the queue (stops processing new jobs)
*/
async pause(): Promise<void> {
await this.bullQueue.pause();
logger.info('Queue paused', { queueName: this.queueName });
}
/**
* Resume the queue
*/
async resume(): Promise<void> {
await this.bullQueue.resume();
logger.info('Queue resumed', { queueName: this.queueName });
}
/**
* Drain the queue (remove all jobs)
*/
async drain(delayed = false): Promise<void> {
await this.bullQueue.drain(delayed);
logger.info('Queue drained', { queueName: this.queueName, delayed });
}
/**
* Clean completed and failed jobs
*/
async clean(
grace: number = 0,
limit: number = 100,
type: 'completed' | 'failed' = 'completed'
): Promise<void> {
await this.bullQueue.clean(grace, limit, type);
logger.debug('Queue cleaned', { queueName: this.queueName, type, grace, limit });
}
/**
* Wait until the queue is ready
*/
async waitUntilReady(): Promise<void> {
await this.bullQueue.waitUntilReady();
}
/**
* Close the queue (cleanup resources)
*/
async close(): Promise<void> {
try {
// Close workers first
if (this.workers.length > 0) {
await Promise.all(this.workers.map(worker => worker.close()));
this.workers = [];
logger.debug('Workers closed', { queueName: this.queueName });
}
// Close queue events
if (this.queueEvents) {
await this.queueEvents.close();
logger.debug('Queue events closed', { queueName: this.queueName });
}
// Close the queue itself
await this.bullQueue.close();
logger.info('Queue closed', { queueName: this.queueName });
} catch (error) {
logger.error('Error closing queue', { queueName: this.queueName, error });
throw error;
}
}
/**
* Start workers for this queue
*/
private startWorkers(workerCount: number, concurrency: number): void {
const connection = getRedisConnection(this.redisConfig);
for (let i = 0; i < workerCount; i++) {
const worker = new Worker(
`{${this.queueName}}`,
this.processJob.bind(this),
{
connection,
concurrency,
maxStalledCount: 3,
stalledInterval: 30000,
maxStalledTime: 60000,
}
);
// Setup worker event handlers
worker.on('completed', (job) => {
logger.debug('Job completed', {
queueName: this.queueName,
jobId: job.id,
handler: job.data?.handler,
operation: job.data?.operation,
});
});
worker.on('failed', (job, err) => {
logger.error('Job failed', {
queueName: this.queueName,
jobId: job?.id,
handler: job?.data?.handler,
operation: job?.data?.operation,
error: err.message,
});
});
worker.on('error', (error) => {
logger.error('Worker error', {
queueName: this.queueName,
workerId: i,
error: error.message,
});
});
this.workers.push(worker);
}
logger.info('Workers started', {
queueName: this.queueName,
workerCount,
concurrency,
});
}
/**
* Process a job using the handler registry
*/
private async processJob(job: Job): Promise<unknown> {
const { handler, operation, payload }: JobData = job.data;
logger.debug('Processing job', {
id: job.id,
handler,
operation,
queueName: this.queueName,
});
try {
// Look up handler in registry
const jobHandler = handlerRegistry.getHandler(handler, operation);
if (!jobHandler) {
throw new Error(`No handler found for ${handler}:${operation}`);
}
const result = await jobHandler(payload);
logger.debug('Job completed successfully', {
id: job.id,
handler,
operation,
queueName: this.queueName,
});
return result;
} catch (error) {
logger.error('Job processing failed', {
id: job.id,
handler,
operation,
queueName: this.queueName,
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
/**
* Get the number of active workers
*/
getWorkerCount(): number {
return this.workers.length;
}
/**
* Get the underlying BullMQ queue (for advanced operations)
* @deprecated Use direct methods instead
*/
getBullQueue(): BullQueue {
return this.bullQueue;
}
}

View file

@ -11,9 +11,10 @@ export interface RateLimitConfig {
} }
export interface RateLimitRule { export interface RateLimitRule {
level: 'global' | 'handler' | 'operation'; level: 'global' | 'queue' | 'handler' | 'operation';
handler?: string; queueName?: string; // For queue-level limits
operation?: string; handler?: string; // For handler-level limits
operation?: string; // For operation-level limits (most specific)
config: RateLimitConfig; config: RateLimitConfig;
} }
@ -29,7 +30,7 @@ export class QueueRateLimiter {
addRule(rule: RateLimitRule): void { addRule(rule: RateLimitRule): void {
this.rules.push(rule); this.rules.push(rule);
const key = this.getRuleKey(rule.level, rule.handler, rule.operation); const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation);
const limiter = new RateLimiterRedis({ const limiter = new RateLimiterRedis({
storeClient: this.redisClient, storeClient: this.redisClient,
keyPrefix: rule.config.keyPrefix || `rl:${key}`, keyPrefix: rule.config.keyPrefix || `rl:${key}`,
@ -42,6 +43,7 @@ export class QueueRateLimiter {
logger.info('Rate limit rule added', { logger.info('Rate limit rule added', {
level: rule.level, level: rule.level,
queueName: rule.queueName,
handler: rule.handler, handler: rule.handler,
operation: rule.operation, operation: rule.operation,
points: rule.config.points, points: rule.config.points,
@ -51,44 +53,77 @@ export class QueueRateLimiter {
/** /**
* Check if a job can be processed based on rate limits * Check if a job can be processed based on rate limits
* Uses hierarchical precedence: operation > handler > queue > global
* The most specific matching rule takes precedence
*/ */
async checkLimit(handler: string, operation: string): Promise<{ async checkLimit(queueName: string, handler: string, operation: string): Promise<{
allowed: boolean; allowed: boolean;
retryAfter?: number; retryAfter?: number;
remainingPoints?: number; remainingPoints?: number;
appliedRule?: RateLimitRule;
}> { }> {
const limiters = this.getApplicableLimiters(handler, operation); const applicableRule = this.getMostSpecificRule(queueName, handler, operation);
if (limiters.length === 0) { if (!applicableRule) {
return { allowed: true };
}
const key = this.getRuleKey(applicableRule.level, applicableRule.queueName, applicableRule.handler, applicableRule.operation);
const limiter = this.limiters.get(key);
if (!limiter) {
logger.warn('Rate limiter not found for rule', { key, rule: applicableRule });
return { allowed: true }; return { allowed: true };
} }
try { try {
// Check all applicable rate limiters const result = await this.consumePoint(limiter, this.getConsumerKey(queueName, handler, operation));
const results = await Promise.all(
limiters.map(({ limiter, key }) => this.consumePoint(limiter, key))
);
// All limiters must allow the request
const blocked = results.find(r => !r.allowed);
if (blocked) {
return blocked;
}
// Return the most restrictive remaining points
const minRemainingPoints = Math.min(...results.map(r => r.remainingPoints || Infinity));
return { return {
allowed: true, ...result,
remainingPoints: minRemainingPoints === Infinity ? undefined : minRemainingPoints, appliedRule: applicableRule,
}; };
} catch (error) { } catch (error) {
logger.error('Rate limit check failed', { handler, operation, error }); logger.error('Rate limit check failed', { queueName, handler, operation, error });
// On error, allow the request to proceed // On error, allow the request to proceed
return { allowed: true }; return { allowed: true };
} }
} }
/**
* Get the most specific rule that applies to this job
* Precedence: operation > handler > queue > global
*/
private getMostSpecificRule(queueName: string, handler: string, operation: string): RateLimitRule | undefined {
// 1. Check for operation-specific rule (most specific)
let rule = this.rules.find(r =>
r.level === 'operation' &&
r.queueName === queueName &&
r.handler === handler &&
r.operation === operation
);
if (rule) return rule;
// 2. Check for handler-specific rule
rule = this.rules.find(r =>
r.level === 'handler' &&
r.queueName === queueName &&
r.handler === handler
);
if (rule) return rule;
// 3. Check for queue-specific rule
rule = this.rules.find(r =>
r.level === 'queue' &&
r.queueName === queueName
);
if (rule) return rule;
// 4. Check for global rule (least specific)
rule = this.rules.find(r => r.level === 'global');
return rule;
}
/** /**
* Consume a point from the rate limiter * Consume a point from the rate limiter
*/ */
@ -120,148 +155,120 @@ export class QueueRateLimiter {
} }
/** /**
* Get applicable rate limiters for a handler/operation * Get rule key for storing rate limiter
*/ */
private getApplicableLimiters(handler: string, operation: string): Array<{ limiter: RateLimiterRedis; key: string }> { private getRuleKey(level: string, queueName?: string, handler?: string, operation?: string): string {
const applicable: Array<{ limiter: RateLimiterRedis; key: string }> = [];
for (const rule of this.rules) {
let applies = false;
let consumerKey = '';
switch (rule.level) {
case 'global':
// Global limit applies to all
applies = true;
consumerKey = 'global';
break;
case 'handler':
// Handler limit applies if handler matches
if (rule.handler === handler) {
applies = true;
consumerKey = handler;
}
break;
case 'operation':
// Operation limit applies if both handler and operation match
if (rule.handler === handler && rule.operation === operation) {
applies = true;
consumerKey = `${handler}:${operation}`;
}
break;
}
if (applies) {
const ruleKey = this.getRuleKey(rule.level, rule.handler, rule.operation);
const limiter = this.limiters.get(ruleKey);
if (limiter) {
applicable.push({ limiter, key: consumerKey });
}
}
}
return applicable;
}
/**
* Get rule key
*/
private getRuleKey(level: string, handler?: string, operation?: string): string {
switch (level) { switch (level) {
case 'global': case 'global':
return 'global'; return 'global';
case 'queue':
return `queue:${queueName}`;
case 'handler': case 'handler':
return `handler:${handler}`; return `handler:${queueName}:${handler}`;
case 'operation': case 'operation':
return `operation:${handler}:${operation}`; return `operation:${queueName}:${handler}:${operation}`;
default: default:
return level; return level;
} }
} }
/** /**
* Get current rate limit status for a handler/operation * Get consumer key for rate limiting (what gets counted)
*/ */
async getStatus(handler: string, operation: string): Promise<{ private getConsumerKey(queueName: string, handler: string, operation: string): string {
return `${queueName}:${handler}:${operation}`;
}
/**
* Get current rate limit status for a queue/handler/operation
*/
async getStatus(queueName: string, handler: string, operation: string): Promise<{
queueName: string;
handler: string; handler: string;
operation: string; operation: string;
limits: Array<{ appliedRule?: RateLimitRule;
limit?: {
level: string; level: string;
points: number; points: number;
duration: number; duration: number;
remaining: number; remaining: number;
resetIn: number; resetIn: number;
}>;
}> {
const applicable = this.getApplicableLimiters(handler, operation);
const limits = await Promise.all(
applicable.map(async ({ limiter, key }) => {
const rule = this.rules.find(r => {
const ruleKey = this.getRuleKey(r.level, r.handler, r.operation);
return this.limiters.get(ruleKey) === limiter;
});
try {
const result = await limiter.get(key);
if (!result) {
return {
level: rule?.level || 'unknown',
points: limiter.points,
duration: limiter.duration,
remaining: limiter.points,
resetIn: 0,
};
}
return {
level: rule?.level || 'unknown',
points: limiter.points,
duration: limiter.duration,
remaining: result.remainingPoints,
resetIn: result.msBeforeNext,
};
} catch (error) {
return {
level: rule?.level || 'unknown',
points: limiter.points,
duration: limiter.duration,
remaining: 0,
resetIn: 0,
};
}
})
);
return {
handler,
operation,
limits,
}; };
}> {
const applicableRule = this.getMostSpecificRule(queueName, handler, operation);
if (!applicableRule) {
return {
queueName,
handler,
operation,
};
}
const key = this.getRuleKey(applicableRule.level, applicableRule.queueName, applicableRule.handler, applicableRule.operation);
const limiter = this.limiters.get(key);
if (!limiter) {
return {
queueName,
handler,
operation,
appliedRule: applicableRule,
};
}
try {
const consumerKey = this.getConsumerKey(queueName, handler, operation);
const result = await limiter.get(consumerKey);
const limit = {
level: applicableRule.level,
points: limiter.points,
duration: limiter.duration,
remaining: result?.remainingPoints ?? limiter.points,
resetIn: result?.msBeforeNext ?? 0,
};
return {
queueName,
handler,
operation,
appliedRule,
limit,
};
} catch (error) {
logger.error('Failed to get rate limit status', { queueName, handler, operation, error });
return {
queueName,
handler,
operation,
appliedRule,
};
}
} }
/** /**
* Reset rate limits for a handler/operation * Reset rate limits for a specific consumer
*/ */
async reset(handler: string, operation?: string): Promise<void> { async reset(queueName: string, handler?: string, operation?: string): Promise<void> {
const applicable = operation if (handler && operation) {
? this.getApplicableLimiters(handler, operation) // Reset specific operation
: this.rules const consumerKey = this.getConsumerKey(queueName, handler, operation);
.filter(r => !handler || r.handler === handler) const rule = this.getMostSpecificRule(queueName, handler, operation);
.map(r => {
const key = this.getRuleKey(r.level, r.handler, r.operation); if (rule) {
const limiter = this.limiters.get(key); const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation);
return limiter ? { limiter, key: handler || 'global' } : null; const limiter = this.limiters.get(key);
}) if (limiter) {
.filter(Boolean) as Array<{ limiter: RateLimiterRedis; key: string }>; await limiter.delete(consumerKey);
}
}
} else {
// Reset broader scope - this is more complex with the new hierarchy
logger.warn('Broad reset not implemented yet', { queueName, handler, operation });
}
await Promise.all( logger.info('Rate limits reset', { queueName, handler, operation });
applicable.map(({ limiter, key }) => limiter.delete(key))
);
logger.info('Rate limits reset', { handler, operation });
} }
/** /**
@ -274,10 +281,11 @@ export class QueueRateLimiter {
/** /**
* Remove a rate limit rule * Remove a rate limit rule
*/ */
removeRule(level: string, handler?: string, operation?: string): boolean { removeRule(level: string, queueName?: string, handler?: string, operation?: string): boolean {
const key = this.getRuleKey(level, handler, operation); const key = this.getRuleKey(level, queueName, handler, operation);
const ruleIndex = this.rules.findIndex(r => const ruleIndex = this.rules.findIndex(r =>
r.level === level && r.level === level &&
(!queueName || r.queueName === queueName) &&
(!handler || r.handler === handler) && (!handler || r.handler === handler) &&
(!operation || r.operation === operation) (!operation || r.operation === operation)
); );
@ -286,7 +294,7 @@ export class QueueRateLimiter {
this.rules.splice(ruleIndex, 1); this.rules.splice(ruleIndex, 1);
this.limiters.delete(key); this.limiters.delete(key);
logger.info('Rate limit rule removed', { level, handler, operation }); logger.info('Rate limit rule removed', { level, queueName, handler, operation });
return true; return true;
} }

View file

@ -31,34 +31,69 @@ export interface BatchResult {
duration: number; duration: number;
} }
export interface QueueConfig { // New improved types for the refactored architecture
export interface RedisConfig {
host?: string;
port?: number;
password?: string;
db?: number;
}
export interface JobOptions {
priority?: number;
delay?: number;
attempts?: number;
removeOnComplete?: number;
removeOnFail?: number;
backoff?: {
type: 'exponential' | 'fixed';
delay: number;
};
}
export interface QueueOptions {
defaultJobOptions?: JobOptions;
workers?: number; workers?: number;
concurrency?: number; concurrency?: number;
redis?: { enableMetrics?: boolean;
host?: string;
port?: number;
password?: string;
db?: number;
};
queueName?: string;
defaultJobOptions?: {
removeOnComplete?: number;
removeOnFail?: number;
attempts?: number;
backoff?: {
type: string;
delay: number;
};
};
handlers?: HandlerInitializer[];
enableScheduledJobs?: boolean;
// Rate limiting
enableRateLimit?: boolean;
globalRateLimit?: RateLimitConfig;
enableDLQ?: boolean; enableDLQ?: boolean;
enableRateLimit?: boolean;
rateLimitRules?: RateLimitRule[]; // Queue-specific rate limit rules
}
export interface QueueManagerConfig {
redis: RedisConfig;
defaultQueueOptions?: QueueOptions;
enableScheduledJobs?: boolean;
globalRateLimit?: RateLimitConfig;
rateLimitRules?: RateLimitRule[]; // Global rate limit rules
}
export interface QueueStats {
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
paused: boolean;
workers?: number;
}
export interface GlobalStats {
queues: Record<string, QueueStats>;
totalJobs: number;
totalWorkers: number;
uptime: number;
}
// Legacy type for backward compatibility
export interface QueueConfig extends QueueManagerConfig {
queueName?: string;
workers?: number;
concurrency?: number;
handlers?: HandlerInitializer[];
dlqConfig?: DLQConfig; dlqConfig?: DLQConfig;
enableMetrics?: boolean; enableMetrics?: boolean;
rateLimitRules?: RateLimitRule[];
} }
export interface JobHandler { export interface JobHandler {
@ -108,9 +143,10 @@ export interface RateLimitConfig {
} }
export interface RateLimitRule { export interface RateLimitRule {
level: 'global' | 'handler' | 'operation'; level: 'global' | 'queue' | 'handler' | 'operation';
handler?: string; queueName?: string; // For queue-level limits
operation?: string; handler?: string; // For handler-level limits
operation?: string; // For operation-level limits (most specific)
config: RateLimitConfig; config: RateLimitConfig;
} }

View file

@ -1,5 +1,5 @@
import { describe, test, expect, beforeEach, afterEach } from 'bun:test'; import { describe, test, expect, beforeEach, afterEach } from 'bun:test';
import { QueueManager, Queue, handlerRegistry, processItems, initializeBatchCache } from '../src'; import { QueueManager, Queue, handlerRegistry, processItems } from '../src';
// Suppress Redis connection errors in tests // Suppress Redis connection errors in tests
process.on('unhandledRejection', (reason, promise) => { process.on('unhandledRejection', (reason, promise) => {
@ -16,6 +16,7 @@ process.on('unhandledRejection', (reason, promise) => {
describe('Batch Processor', () => { describe('Batch Processor', () => {
let queueManager: QueueManager; let queueManager: QueueManager;
let queue: Queue; let queue: Queue;
let queueName: string;
const redisConfig = { const redisConfig = {
host: 'localhost', host: 'localhost',
@ -44,21 +45,21 @@ describe('Batch Processor', () => {
}); });
// Use unique queue name per test to avoid conflicts // Use unique queue name per test to avoid conflicts
const uniqueQueueName = `batch-test-queue-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; queueName = `batch-test-queue-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
// Initialize queue manager with no workers to prevent immediate processing // Reset and initialize singleton QueueManager for tests
queueManager = new QueueManager({ await QueueManager.reset();
queueName: uniqueQueueName, queueManager = QueueManager.initialize({
redis: redisConfig, redis: redisConfig,
workers: 0, // No workers in tests defaultQueueOptions: {
concurrency: 5, workers: 0, // No workers in tests
concurrency: 5,
},
}); });
await queueManager.initialize(); // Get queue using the new getQueue() method (batch cache is now auto-initialized)
queue = queueManager.getQueue(queueName);
// Create Queue instance without worker to prevent immediate job processing // Note: Batch cache is now automatically initialized when getting the queue
queue = new Queue(queueManager.getQueueName(), queueManager.getRedisConfig(), { startWorker: false });
await initializeBatchCache(queue);
// Ensure completely clean state - wait for queue to be ready first // Ensure completely clean state - wait for queue to be ready first
await queue.getBullQueue().waitUntilReady(); await queue.getBullQueue().waitUntilReady();
@ -89,12 +90,12 @@ describe('Batch Processor', () => {
} catch (error) { } catch (error) {
// Ignore cleanup errors // Ignore cleanup errors
} }
await queue.shutdown(); await queue.close();
} }
if (queueManager) { if (queueManager) {
await Promise.race([ await Promise.race([
queueManager.shutdown(), QueueManager.reset(),
new Promise((_, reject) => new Promise((_, reject) =>
setTimeout(() => reject(new Error('Shutdown timeout')), 3000) setTimeout(() => reject(new Error('Shutdown timeout')), 3000)
) )
@ -112,7 +113,7 @@ describe('Batch Processor', () => {
test('should process items directly without batching', async () => { test('should process items directly without batching', async () => {
const items = ['item1', 'item2', 'item3', 'item4', 'item5']; const items = ['item1', 'item2', 'item3', 'item4', 'item5'];
const result = await processItems(items, queue, { const result = await processItems(items, queueName, {
totalDelayHours: 0.001, // 3.6 seconds total totalDelayHours: 0.001, // 3.6 seconds total
useBatching: false, useBatching: false,
handler: 'batch-test', handler: 'batch-test',
@ -158,7 +159,7 @@ describe('Batch Processor', () => {
{ id: 3, name: 'Product C', price: 300 }, { id: 3, name: 'Product C', price: 300 },
]; ];
const result = await processItems(items, queue, { const result = await processItems(items, queueName, {
totalDelayHours: 0.001, totalDelayHours: 0.001,
useBatching: false, useBatching: false,
handler: 'batch-test', handler: 'batch-test',
@ -182,7 +183,7 @@ describe('Batch Processor', () => {
test('should process items in batches', async () => { test('should process items in batches', async () => {
const items = Array.from({ length: 50 }, (_, i) => ({ id: i, value: `item-${i}` })); const items = Array.from({ length: 50 }, (_, i) => ({ id: i, value: `item-${i}` }));
const result = await processItems(items, queue, { const result = await processItems(items, queueName, {
totalDelayHours: 0.001, totalDelayHours: 0.001,
useBatching: true, useBatching: true,
batchSize: 10, batchSize: 10,
@ -204,7 +205,7 @@ describe('Batch Processor', () => {
test('should handle different batch sizes', async () => { test('should handle different batch sizes', async () => {
const items = Array.from({ length: 23 }, (_, i) => i); const items = Array.from({ length: 23 }, (_, i) => i);
const result = await processItems(items, queue, { const result = await processItems(items, queueName, {
totalDelayHours: 0.001, totalDelayHours: 0.001,
useBatching: true, useBatching: true,
batchSize: 7, batchSize: 7,
@ -222,7 +223,7 @@ describe('Batch Processor', () => {
{ type: 'B', data: 'test2' }, { type: 'B', data: 'test2' },
]; ];
const result = await processItems(items, queue, { const result = await processItems(items, queueName, {
totalDelayHours: 0.001, totalDelayHours: 0.001,
useBatching: true, useBatching: true,
batchSize: 2, batchSize: 2,
@ -245,7 +246,7 @@ describe('Batch Processor', () => {
describe('Empty and Edge Cases', () => { describe('Empty and Edge Cases', () => {
test('should handle empty item list', async () => { test('should handle empty item list', async () => {
const result = await processItems([], queue, { const result = await processItems([], queueName, {
totalDelayHours: 1, totalDelayHours: 1,
handler: 'batch-test', handler: 'batch-test',
operation: 'process-item', operation: 'process-item',
@ -257,7 +258,7 @@ describe('Batch Processor', () => {
}); });
test('should handle single item', async () => { test('should handle single item', async () => {
const result = await processItems(['single-item'], queue, { const result = await processItems(['single-item'], queueName, {
totalDelayHours: 0.001, totalDelayHours: 0.001,
handler: 'batch-test', handler: 'batch-test',
operation: 'process-item', operation: 'process-item',
@ -270,7 +271,7 @@ describe('Batch Processor', () => {
test('should handle large batch with delays', async () => { test('should handle large batch with delays', async () => {
const items = Array.from({ length: 100 }, (_, i) => ({ index: i })); const items = Array.from({ length: 100 }, (_, i) => ({ index: i }));
const result = await processItems(items, queue, { const result = await processItems(items, queueName, {
totalDelayHours: 0.01, // 36 seconds total totalDelayHours: 0.01, // 36 seconds total
useBatching: true, useBatching: true,
batchSize: 25, batchSize: 25,
@ -294,7 +295,7 @@ describe('Batch Processor', () => {
test('should respect custom job options', async () => { test('should respect custom job options', async () => {
const items = ['a', 'b', 'c']; const items = ['a', 'b', 'c'];
await processItems(items, queue, { await processItems(items, queueName, {
totalDelayHours: 0, totalDelayHours: 0,
handler: 'batch-test', handler: 'batch-test',
operation: 'process-item', operation: 'process-item',
@ -339,7 +340,7 @@ describe('Batch Processor', () => {
}, },
}); });
await processItems(['test'], queue, { await processItems(['test'], queueName, {
totalDelayHours: 0, totalDelayHours: 0,
handler: 'custom-handler', handler: 'custom-handler',
operation: 'custom-operation', operation: 'custom-operation',