moved folders around

This commit is contained in:
Boki 2025-06-21 18:27:00 -04:00
parent 4f89affc2b
commit 36cb84b343
202 changed files with 1160 additions and 660 deletions

View file

@ -0,0 +1,328 @@
import { getLogger } from '@stock-bot/logger';
import { QueueManager } from './queue-manager';
import type { BatchJobData, BatchResult, JobData, ProcessOptions } from './types';
const logger = getLogger('batch-processor');
/**
* Main function - processes items either directly or in batches
* Each item becomes payload: item (no processing needed)
*/
export async function processItems<T>(
items: T[],
queueName: string,
options: ProcessOptions
): Promise<BatchResult> {
const queueManager = QueueManager.getInstance();
queueManager.getQueue(queueName);
const startTime = Date.now();
if (items.length === 0) {
return {
jobsCreated: 0,
mode: 'direct',
totalItems: 0,
duration: 0,
};
}
logger.info('Starting batch processing', {
totalItems: items.length,
mode: options.useBatching ? 'batch' : 'direct',
batchSize: options.batchSize,
totalDelayHours: options.totalDelayHours,
});
try {
const result = options.useBatching
? await processBatched(items, queueName, options)
: await processDirect(items, queueName, options);
const duration = Date.now() - startTime;
logger.info('Batch processing completed', {
...result,
duration: `${(duration / 1000).toFixed(1)}s`,
});
return { ...result, duration };
} catch (error) {
logger.error('Batch processing failed', error);
throw error;
}
}
/**
* Process items directly - each item becomes a separate job
*/
async function processDirect<T>(
items: T[],
queueName: string,
options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> {
const queueManager = QueueManager.getInstance();
queueManager.getQueue(queueName);
const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds
const delayPerItem = totalDelayMs / items.length;
logger.info('Creating direct jobs', {
totalItems: items.length,
delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`,
});
const jobs = items.map((item, index) => ({
name: 'process-item',
data: {
handler: options.handler || 'generic',
operation: options.operation || 'process-item',
payload: item, // Just the item directly - no wrapper!
priority: options.priority || undefined,
},
opts: {
delay: index * delayPerItem,
priority: options.priority || undefined,
attempts: options.retries || 3,
removeOnComplete: options.removeOnComplete || 10,
removeOnFail: options.removeOnFail || 5,
},
}));
const createdJobs = await addJobsInChunks(queueName, jobs);
return {
totalItems: items.length,
jobsCreated: createdJobs.length,
mode: 'direct',
};
}
/**
* Process items in batches - store items directly
*/
async function processBatched<T>(
items: T[],
queueName: string,
options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> {
const queueManager = QueueManager.getInstance();
queueManager.getQueue(queueName);
const batchSize = options.batchSize || 100;
const batches = createBatches(items, batchSize);
const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds
const delayPerBatch = totalDelayMs / batches.length;
logger.info('Creating batch jobs', {
totalItems: items.length,
batchSize,
totalBatches: batches.length,
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
});
const batchJobs = await Promise.all(
batches.map(async (batch, batchIndex) => {
// Just store the items directly - no processing needed
const payloadKey = await storeItems(batch, queueName, options);
return {
name: 'process-batch',
data: {
handler: options.handler || 'generic',
operation: 'process-batch-items',
payload: {
payloadKey,
batchIndex,
totalBatches: batches.length,
itemCount: batch.length,
totalDelayHours: options.totalDelayHours,
} as BatchJobData,
priority: options.priority || undefined,
},
opts: {
delay: batchIndex * delayPerBatch,
priority: options.priority || undefined,
attempts: options.retries || 3,
removeOnComplete: options.removeOnComplete || 10,
removeOnFail: options.removeOnFail || 5,
},
};
})
);
const createdJobs = await addJobsInChunks(queueName, batchJobs);
return {
totalItems: items.length,
jobsCreated: createdJobs.length,
batchesCreated: batches.length,
mode: 'batch',
};
}
/**
* Process a batch job - loads items and creates individual jobs
*/
export async function processBatchJob(jobData: BatchJobData, queueName: string): Promise<unknown> {
const queueManager = QueueManager.getInstance();
queueManager.getQueue(queueName);
const { payloadKey, batchIndex, totalBatches, itemCount, totalDelayHours } = jobData;
logger.trace('Processing batch job', {
batchIndex,
totalBatches,
itemCount,
totalDelayHours,
});
try {
const payload = await loadPayload(payloadKey, queueName);
if (!payload || !payload.items || !payload.options) {
logger.error('Invalid payload data', { payloadKey, payload });
throw new Error(`Invalid payload data for key: ${payloadKey}`);
}
const { items, options } = payload;
// Calculate the time window for this batch
const totalDelayMs = totalDelayHours * 60 * 60 * 1000; // Convert hours to ms
const delayPerBatch = totalDelayMs / totalBatches; // Time allocated for each batch
const delayPerItem = delayPerBatch / items.length; // Distribute items evenly within batch window
logger.trace('Calculating job delays', {
batchIndex,
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
delayPerItem: `${(delayPerItem / 1000).toFixed(2)} seconds`,
itemsInBatch: items.length,
});
// Create jobs directly from items - each item becomes payload: item
const jobs = items.map((item: unknown, index: number) => ({
name: 'process-item',
data: {
handler: options.handler || 'generic',
operation: options.operation || 'generic',
payload: item, // Just the item directly!
priority: options.priority || undefined,
},
opts: {
delay: index * delayPerItem, // Distribute evenly within batch window
priority: options.priority || undefined,
attempts: options.retries || 3,
},
}));
const createdJobs = await addJobsInChunks(queueName, jobs);
// Cleanup payload after successful processing
await cleanupPayload(payloadKey, queueName);
return {
batchIndex,
itemsProcessed: items.length,
jobsCreated: createdJobs.length,
};
} catch (error) {
logger.error('Batch job processing failed', { batchIndex, error });
throw error;
}
}
// Helper functions
function createBatches<T>(items: T[], batchSize: number): T[][] {
const batches: T[][] = [];
for (let i = 0; i < items.length; i += batchSize) {
batches.push(items.slice(i, i + batchSize));
}
return batches;
}
async function storeItems<T>(
items: T[],
queueName: string,
options: ProcessOptions
): Promise<string> {
const queueManager = QueueManager.getInstance();
const cache = queueManager.getCache(queueName);
const payloadKey = `payload:${Date.now()}:${Math.random().toString(36).substr(2, 9)}`;
const payload = {
items, // Just store the items directly
options: {
delayPerItem: 1000,
priority: options.priority || undefined,
retries: options.retries || 3,
handler: options.handler || 'generic',
operation: options.operation || 'generic',
},
createdAt: new Date().toISOString(),
};
const ttlSeconds = options.ttl || 86400; // 24 hours default
await cache.set(payloadKey, payload, ttlSeconds);
return payloadKey;
}
async function loadPayload<T>(
key: string,
queueName: string
): Promise<{
items: T[];
options: {
delayPerItem: number;
priority?: number;
retries: number;
handler: string;
operation: string;
};
} | null> {
const queueManager = QueueManager.getInstance();
const cache = queueManager.getCache(queueName);
return (await cache.get(key)) as {
items: T[];
options: {
delayPerItem: number;
priority?: number;
retries: number;
handler: string;
operation: string;
};
} | null;
}
async function cleanupPayload(key: string, queueName: string): Promise<void> {
const queueManager = QueueManager.getInstance();
const cache = queueManager.getCache(queueName);
await cache.del(key);
}
async function addJobsInChunks(
queueName: string,
jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>,
chunkSize = 100
): Promise<unknown[]> {
const queueManager = QueueManager.getInstance();
const queue = queueManager.getQueue(queueName);
const allCreatedJobs = [];
for (let i = 0; i < jobs.length; i += chunkSize) {
const chunk = jobs.slice(i, i + chunkSize);
try {
const createdJobs = await queue.addBulk(chunk);
allCreatedJobs.push(...createdJobs);
// Small delay between chunks to avoid overwhelming Redis
if (i + chunkSize < jobs.length) {
await new Promise(resolve => setTimeout(resolve, 100));
}
} catch (error) {
logger.error('Failed to add job chunk', {
startIndex: i,
chunkSize: chunk.length,
error,
});
}
}
return allCreatedJobs;
}

View file

@ -0,0 +1,251 @@
import { Queue, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import type { DLQConfig, RedisConfig } from './types';
import { getRedisConnection } from './utils';
const logger = getLogger('dlq-handler');
export class DeadLetterQueueHandler {
private dlq: Queue;
private config: Required<DLQConfig>;
private failureCount = new Map<string, number>();
constructor(
private mainQueue: Queue,
connection: RedisConfig,
config: DLQConfig = {}
) {
this.config = {
maxRetries: config.maxRetries ?? 3,
retryDelay: config.retryDelay ?? 60000, // 1 minute
alertThreshold: config.alertThreshold ?? 100,
cleanupAge: config.cleanupAge ?? 168, // 7 days
};
// Create DLQ with same name but -dlq suffix
const dlqName = `${mainQueue.name}-dlq`;
this.dlq = new Queue(dlqName, { connection: getRedisConnection(connection) });
}
/**
* Process a failed job - either retry or move to DLQ
*/
async handleFailedJob(job: Job, error: Error): Promise<void> {
const jobKey = `${job.name}:${job.id}`;
const currentFailures = (this.failureCount.get(jobKey) || 0) + 1;
this.failureCount.set(jobKey, currentFailures);
logger.warn('Job failed', {
jobId: job.id,
jobName: job.name,
attempt: job.attemptsMade,
maxAttempts: job.opts.attempts,
error: error.message,
failureCount: currentFailures,
});
// Check if job should be moved to DLQ
if (job.attemptsMade >= (job.opts.attempts || this.config.maxRetries)) {
await this.moveToDeadLetterQueue(job, error);
this.failureCount.delete(jobKey);
}
}
/**
* Move job to dead letter queue
*/
private async moveToDeadLetterQueue(job: Job, error: Error): Promise<void> {
try {
const dlqData = {
originalJob: {
id: job.id,
name: job.name,
data: job.data,
opts: job.opts,
attemptsMade: job.attemptsMade,
failedReason: job.failedReason,
processedOn: job.processedOn,
timestamp: job.timestamp,
},
error: {
message: error.message,
stack: error.stack,
name: error.name,
},
movedToDLQAt: new Date().toISOString(),
};
await this.dlq.add('failed-job', dlqData, {
removeOnComplete: false,
removeOnFail: false,
});
logger.error('Job moved to DLQ', {
jobId: job.id,
jobName: job.name,
error: error.message,
});
// Check if we need to alert
await this.checkAlertThreshold();
} catch (dlqError) {
logger.error('Failed to move job to DLQ', {
jobId: job.id,
error: dlqError,
});
}
}
/**
* Retry jobs from DLQ
*/
async retryDLQJobs(limit = 10): Promise<number> {
const jobs = await this.dlq.getCompleted(0, limit);
let retriedCount = 0;
for (const dlqJob of jobs) {
try {
const { originalJob } = dlqJob.data;
// Re-add to main queue with delay
await this.mainQueue.add(
originalJob.name,
originalJob.data,
{
...originalJob.opts,
delay: this.config.retryDelay,
attempts: this.config.maxRetries,
}
);
// Remove from DLQ
await dlqJob.remove();
retriedCount++;
logger.info('Job retried from DLQ', {
originalJobId: originalJob.id,
jobName: originalJob.name,
});
} catch (error) {
logger.error('Failed to retry DLQ job', {
dlqJobId: dlqJob.id,
error,
});
}
}
return retriedCount;
}
/**
* Get DLQ statistics
*/
async getStats(): Promise<{
total: number;
recent: number;
byJobName: Record<string, number>;
oldestJob: Date | null;
}> {
const [completed, failed, waiting] = await Promise.all([
this.dlq.getCompleted(),
this.dlq.getFailed(),
this.dlq.getWaiting(),
]);
const allJobs = [...completed, ...failed, ...waiting];
const byJobName: Record<string, number> = {};
let oldestTimestamp: number | null = null;
for (const job of allJobs) {
const jobName = job.data.originalJob?.name || 'unknown';
byJobName[jobName] = (byJobName[jobName] || 0) + 1;
if (!oldestTimestamp || job.timestamp < oldestTimestamp) {
oldestTimestamp = job.timestamp;
}
}
// Count recent jobs (last 24 hours)
const oneDayAgo = Date.now() - 24 * 60 * 60 * 1000;
const recent = allJobs.filter(job => job.timestamp > oneDayAgo).length;
return {
total: allJobs.length,
recent,
byJobName,
oldestJob: oldestTimestamp ? new Date(oldestTimestamp) : null,
};
}
/**
* Clean up old DLQ entries
*/
async cleanup(): Promise<number> {
const ageInMs = this.config.cleanupAge * 60 * 60 * 1000;
const cutoffTime = Date.now() - ageInMs;
const jobs = await this.dlq.getCompleted();
let removedCount = 0;
for (const job of jobs) {
if (job.timestamp < cutoffTime) {
await job.remove();
removedCount++;
}
}
logger.info('DLQ cleanup completed', {
removedCount,
cleanupAge: `${this.config.cleanupAge} hours`,
});
return removedCount;
}
/**
* Check if alert threshold is exceeded
*/
private async checkAlertThreshold(): Promise<void> {
const stats = await this.getStats();
if (stats.total >= this.config.alertThreshold) {
logger.error('DLQ alert threshold exceeded', {
threshold: this.config.alertThreshold,
currentCount: stats.total,
byJobName: stats.byJobName,
});
// In a real implementation, this would trigger alerts
}
}
/**
* Get failed jobs for inspection
*/
async inspectFailedJobs(limit = 10): Promise<Array<{
id: string;
name: string;
data: unknown;
error: unknown;
failedAt: string;
attempts: number;
}>> {
const jobs = await this.dlq.getCompleted(0, limit);
return jobs.map(job => ({
id: job.data.originalJob.id,
name: job.data.originalJob.name,
data: job.data.originalJob.data,
error: job.data.error,
failedAt: job.data.movedToDLQAt,
attempts: job.data.originalJob.attemptsMade,
}));
}
/**
* Shutdown DLQ handler
*/
async shutdown(): Promise<void> {
await this.dlq.close();
this.failureCount.clear();
}
}

View file

@ -0,0 +1,191 @@
import { getLogger } from '@stock-bot/logger';
import type { JobHandler, HandlerConfig, HandlerConfigWithSchedule, ScheduledJob } from './types';
const logger = getLogger('handler-registry');
class HandlerRegistry {
private handlers = new Map<string, HandlerConfig>();
private handlerSchedules = new Map<string, ScheduledJob[]>();
/**
* Register a handler with its operations (simple config)
*/
register(handlerName: string, config: HandlerConfig): void {
logger.info(`Registering handler: ${handlerName}`, {
operations: Object.keys(config),
});
this.handlers.set(handlerName, config);
}
/**
* Register a handler with operations and scheduled jobs (full config)
*/
registerWithSchedule(config: HandlerConfigWithSchedule): void {
logger.info(`Registering handler with schedule: ${config.name}`, {
operations: Object.keys(config.operations),
scheduledJobs: config.scheduledJobs?.length || 0,
});
this.handlers.set(config.name, config.operations);
if (config.scheduledJobs && config.scheduledJobs.length > 0) {
this.handlerSchedules.set(config.name, config.scheduledJobs);
}
}
/**
* Get a handler for a specific handler and operation
*/
getHandler(handler: string, operation: string): JobHandler | null {
const handlerConfig = this.handlers.get(handler);
if (!handlerConfig) {
logger.warn(`Handler not found: ${handler}`);
return null;
}
const jobHandler = handlerConfig[operation];
if (!jobHandler) {
logger.warn(`Operation not found: ${handler}:${operation}`, {
availableOperations: Object.keys(handlerConfig),
});
return null;
}
return jobHandler;
}
/**
* Get all scheduled jobs from all handlers
*/
getAllScheduledJobs(): Array<{ handler: string; job: ScheduledJob }> {
const allJobs: Array<{ handler: string; job: ScheduledJob }> = [];
for (const [handlerName, jobs] of this.handlerSchedules) {
for (const job of jobs) {
allJobs.push({
handler: handlerName,
job,
});
}
}
return allJobs;
}
/**
* Get scheduled jobs for a specific handler
*/
getScheduledJobs(handler: string): ScheduledJob[] {
return this.handlerSchedules.get(handler) || [];
}
/**
* Check if a handler has scheduled jobs
*/
hasScheduledJobs(handler: string): boolean {
return this.handlerSchedules.has(handler);
}
/**
* Get all registered handlers with their configurations
*/
getHandlerConfigs(): Array<{ name: string; operations: string[]; scheduledJobs: number }> {
return Array.from(this.handlers.keys()).map(name => ({
name,
operations: Object.keys(this.handlers.get(name) || {}),
scheduledJobs: this.handlerSchedules.get(name)?.length || 0,
}));
}
/**
* Get all handlers with their full configurations for queue manager registration
*/
getAllHandlers(): Map<string, { operations: HandlerConfig; scheduledJobs?: ScheduledJob[] }> {
const result = new Map<
string,
{ operations: HandlerConfig; scheduledJobs?: ScheduledJob[] }
>();
for (const [name, operations] of this.handlers) {
const scheduledJobs = this.handlerSchedules.get(name);
result.set(name, {
operations,
scheduledJobs,
});
}
return result;
}
/**
* Get all registered handlers
*/
getHandlers(): string[] {
return Array.from(this.handlers.keys());
}
/**
* Get operations for a specific handler
*/
getOperations(handler: string): string[] {
const handlerConfig = this.handlers.get(handler);
return handlerConfig ? Object.keys(handlerConfig) : [];
}
/**
* Check if a handler exists
*/
hasHandler(handler: string): boolean {
return this.handlers.has(handler);
}
/**
* Check if a handler has a specific operation
*/
hasOperation(handler: string, operation: string): boolean {
const handlerConfig = this.handlers.get(handler);
return handlerConfig ? operation in handlerConfig : false;
}
/**
* Remove a handler
*/
unregister(handler: string): boolean {
this.handlerSchedules.delete(handler);
return this.handlers.delete(handler);
}
/**
* Clear all handlers
*/
clear(): void {
this.handlers.clear();
this.handlerSchedules.clear();
}
/**
* Get registry statistics
*/
getStats(): { handlers: number; totalOperations: number; totalScheduledJobs: number } {
let totalOperations = 0;
let totalScheduledJobs = 0;
for (const config of this.handlers.values()) {
totalOperations += Object.keys(config).length;
}
for (const jobs of this.handlerSchedules.values()) {
totalScheduledJobs += jobs.length;
}
return {
handlers: this.handlers.size,
totalOperations,
totalScheduledJobs,
};
}
}
// Export singleton instance
export const handlerRegistry = new HandlerRegistry();

View file

@ -0,0 +1,62 @@
// Core exports
export { Queue, type QueueWorkerConfig } from './queue';
export { QueueManager } from './queue-manager';
export { handlerRegistry } from './handler-registry';
export { createJobHandler } from './types';
// Batch processing
export { processBatchJob, processItems } from './batch-processor';
// Queue factory functions
// QueueFactory removed - use QueueManager directly
// DLQ handling
export { DeadLetterQueueHandler, DeadLetterQueueHandler as DLQHandler } from './dlq-handler';
// Metrics
export { QueueMetricsCollector } from './queue-metrics';
// Rate limiting
export { QueueRateLimiter } from './rate-limiter';
// Types
export type {
// Core types
JobData,
JobOptions,
QueueOptions,
QueueStats,
GlobalStats,
// Batch processing types
BatchResult,
ProcessOptions,
BatchJobData,
// Handler types
JobHandler,
TypedJobHandler,
HandlerConfig,
TypedHandlerConfig,
HandlerConfigWithSchedule,
TypedHandlerConfigWithSchedule,
HandlerInitializer,
// Configuration types
RedisConfig,
QueueConfig,
QueueManagerConfig,
// Rate limiting types
RateLimitConfig,
RateLimitRule,
// DLQ types
DLQConfig,
DLQJobInfo,
// Scheduled job types
ScheduledJob,
ScheduleConfig,
} from './types';

View file

@ -0,0 +1,488 @@
import { CacheProvider, createCache } from '@stock-bot/cache';
import { getLogger } from '@stock-bot/logger';
import { Queue, type QueueWorkerConfig } from './queue';
import { QueueRateLimiter } from './rate-limiter';
import type {
GlobalStats,
QueueManagerConfig,
QueueOptions,
QueueStats,
RateLimitRule,
} from './types';
import { getRedisConnection } from './utils';
const logger = getLogger('queue-manager');
/**
* Singleton QueueManager that provides unified queue and cache management
* Main entry point for all queue operations with getQueue() method
*/
export class QueueManager {
private static instance: QueueManager | null = null;
private queues = new Map<string, Queue>();
private caches = new Map<string, CacheProvider>();
private rateLimiter?: QueueRateLimiter;
private redisConnection: ReturnType<typeof getRedisConnection>;
private isShuttingDown = false;
private shutdownPromise: Promise<void> | null = null;
private config: QueueManagerConfig;
private constructor(config: QueueManagerConfig) {
this.config = config;
this.redisConnection = getRedisConnection(config.redis);
// Initialize rate limiter if rules are provided
if (config.rateLimitRules && config.rateLimitRules.length > 0) {
this.rateLimiter = new QueueRateLimiter(this.redisConnection);
config.rateLimitRules.forEach(rule => {
if (this.rateLimiter) {
this.rateLimiter.addRule(rule);
}
});
}
logger.info('QueueManager singleton initialized', {
redis: `${config.redis.host}:${config.redis.port}`,
});
}
/**
* Get the singleton instance
* @throws Error if not initialized - use initialize() first
*/
static getInstance(): QueueManager {
if (!QueueManager.instance) {
throw new Error('QueueManager not initialized. Call QueueManager.initialize(config) first.');
}
return QueueManager.instance;
}
/**
* Initialize the singleton with config
* Must be called before getInstance()
*/
static initialize(config: QueueManagerConfig): QueueManager {
if (QueueManager.instance) {
logger.warn('QueueManager already initialized, returning existing instance');
return QueueManager.instance;
}
QueueManager.instance = new QueueManager(config);
return QueueManager.instance;
}
/**
* Get or initialize the singleton
* Convenience method that combines initialize and getInstance
*/
static getOrInitialize(config?: QueueManagerConfig): QueueManager {
if (QueueManager.instance) {
return QueueManager.instance;
}
if (!config) {
throw new Error(
'QueueManager not initialized and no config provided. ' +
'Either call initialize(config) first or provide config to getOrInitialize(config).'
);
}
return QueueManager.initialize(config);
}
/**
* Check if the QueueManager is initialized
*/
static isInitialized(): boolean {
return QueueManager.instance !== null;
}
/**
* Reset the singleton (mainly for testing)
*/
static async reset(): Promise<void> {
if (QueueManager.instance) {
await QueueManager.instance.shutdown();
QueueManager.instance = null;
}
}
/**
* Get or create a queue - unified method that handles both scenarios
* This is the main method for accessing queues
*/
getQueue(queueName: string, options: QueueOptions = {}): Queue {
// Return existing queue if it exists
if (this.queues.has(queueName)) {
const existingQueue = this.queues.get(queueName);
if (existingQueue) {
return existingQueue;
}
}
// Create new queue with merged options
const mergedOptions = {
...this.config.defaultQueueOptions,
...options,
};
// Prepare queue configuration
const queueConfig: QueueWorkerConfig = {
workers: mergedOptions.workers,
concurrency: mergedOptions.concurrency,
startWorker: !!mergedOptions.workers && mergedOptions.workers > 0 && !this.config.delayWorkerStart,
};
const queue = new Queue(
queueName,
this.config.redis,
mergedOptions.defaultJobOptions || {},
queueConfig
);
// Store the queue
this.queues.set(queueName, queue);
// Automatically initialize batch cache for the queue
this.initializeBatchCacheSync(queueName);
// Add queue-specific rate limit rules
if (this.rateLimiter && mergedOptions.rateLimitRules) {
mergedOptions.rateLimitRules.forEach(rule => {
// Ensure queue name is set for queue-specific rules
const ruleWithQueue = { ...rule, queueName };
if (this.rateLimiter) {
this.rateLimiter.addRule(ruleWithQueue);
}
});
}
logger.info('Queue created with batch cache', {
queueName,
workers: mergedOptions.workers || 0,
concurrency: mergedOptions.concurrency || 1,
});
return queue;
}
/**
* Check if a queue exists
*/
hasQueue(queueName: string): boolean {
return this.queues.has(queueName);
}
/**
* Get all queue names
*/
getQueueNames(): string[] {
return Array.from(this.queues.keys());
}
/**
* Get or create a cache for a queue
*/
getCache(queueName: string): CacheProvider {
if (!this.caches.has(queueName)) {
const cacheProvider = createCache({
redisConfig: this.config.redis,
keyPrefix: `batch:${queueName}:`,
ttl: 86400, // 24 hours default
enableMetrics: true,
});
this.caches.set(queueName, cacheProvider);
logger.trace('Cache created for queue', { queueName });
}
const cache = this.caches.get(queueName);
if (!cache) {
throw new Error(`Expected cache for queue ${queueName} to exist`);
}
return cache;
}
/**
* Initialize cache for a queue (ensures it's ready)
*/
async initializeCache(queueName: string): Promise<void> {
const cache = this.getCache(queueName);
await cache.waitForReady(10000);
logger.info('Cache initialized for queue', { queueName });
}
/**
* Initialize batch cache synchronously (for automatic initialization)
* The cache will be ready for use, but we don't wait for Redis connection
*/
private initializeBatchCacheSync(queueName: string): void {
// Just create the cache - it will connect automatically when first used
this.getCache(queueName);
logger.trace('Batch cache initialized synchronously for queue', { queueName });
}
/**
* Get statistics for all queues
*/
async getGlobalStats(): Promise<GlobalStats> {
const queueStats: Record<string, QueueStats> = {};
let totalJobs = 0;
let totalWorkers = 0;
for (const [queueName, queue] of this.queues) {
const stats = await queue.getStats();
queueStats[queueName] = stats;
totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed;
totalWorkers += stats.workers || 0;
}
return {
queues: queueStats,
totalJobs,
totalWorkers,
uptime: process.uptime(),
};
}
/**
* Get statistics for a specific queue
*/
async getQueueStats(queueName: string): Promise<QueueStats | undefined> {
const queue = this.queues.get(queueName);
if (!queue) {
return undefined;
}
return await queue.getStats();
}
/**
* Add a rate limit rule
*/
addRateLimitRule(rule: RateLimitRule): void {
if (!this.rateLimiter) {
this.rateLimiter = new QueueRateLimiter(this.redisConnection);
}
this.rateLimiter.addRule(rule);
}
/**
* Check rate limits for a job
*/
async checkRateLimit(
queueName: string,
handler: string,
operation: string
): Promise<{
allowed: boolean;
retryAfter?: number;
remainingPoints?: number;
appliedRule?: RateLimitRule;
}> {
if (!this.rateLimiter) {
return { allowed: true };
}
return await this.rateLimiter.checkLimit(queueName, handler, operation);
}
/**
* Get rate limit status
*/
async getRateLimitStatus(queueName: string, handler: string, operation: string) {
if (!this.rateLimiter) {
return {
queueName,
handler,
operation,
};
}
return await this.rateLimiter.getStatus(queueName, handler, operation);
}
/**
* Pause all queues
*/
async pauseAll(): Promise<void> {
const pausePromises = Array.from(this.queues.values()).map(queue => queue.pause());
await Promise.all(pausePromises);
logger.info('All queues paused');
}
/**
* Resume all queues
*/
async resumeAll(): Promise<void> {
const resumePromises = Array.from(this.queues.values()).map(queue => queue.resume());
await Promise.all(resumePromises);
logger.info('All queues resumed');
}
/**
* Pause a specific queue
*/
async pauseQueue(queueName: string): Promise<boolean> {
const queue = this.queues.get(queueName);
if (!queue) {
return false;
}
await queue.pause();
return true;
}
/**
* Resume a specific queue
*/
async resumeQueue(queueName: string): Promise<boolean> {
const queue = this.queues.get(queueName);
if (!queue) {
return false;
}
await queue.resume();
return true;
}
/**
* Drain all queues
*/
async drainAll(delayed = false): Promise<void> {
const drainPromises = Array.from(this.queues.values()).map(queue => queue.drain(delayed));
await Promise.all(drainPromises);
logger.info('All queues drained', { delayed });
}
/**
* Clean all queues
*/
async cleanAll(
grace: number = 0,
limit: number = 100,
type: 'completed' | 'failed' = 'completed'
): Promise<void> {
const cleanPromises = Array.from(this.queues.values()).map(queue =>
queue.clean(grace, limit, type)
);
await Promise.all(cleanPromises);
logger.info('All queues cleaned', { type, grace, limit });
}
/**
* Shutdown all queues and workers (thread-safe)
*/
async shutdown(): Promise<void> {
// If already shutting down, return the existing promise
if (this.shutdownPromise) {
return this.shutdownPromise;
}
if (this.isShuttingDown) {
return;
}
this.isShuttingDown = true;
logger.info('Shutting down QueueManager...');
// Create shutdown promise
this.shutdownPromise = this.performShutdown();
return this.shutdownPromise;
}
/**
* Perform the actual shutdown
*/
private async performShutdown(): Promise<void> {
try {
// Close all queues (this now includes workers since they're managed by Queue class)
const queueShutdownPromises = Array.from(this.queues.values()).map(async queue => {
try {
// Add timeout to queue.close() to prevent hanging
await queue.close();
// const timeoutPromise = new Promise<never>((_, reject) =>
// setTimeout(() => reject(new Error('Queue close timeout')), 100)
// );
// await Promise.race([closePromise, timeoutPromise]);
} catch (error) {
logger.warn('Error closing queue', { error: (error as Error).message });
}
});
await Promise.all(queueShutdownPromises);
// Close all caches
const cacheShutdownPromises = Array.from(this.caches.values()).map(async cache => {
try {
// Clear cache before shutdown
await cache.clear();
} catch (error) {
logger.warn('Error clearing cache', { error: (error as Error).message });
}
});
await Promise.all(cacheShutdownPromises);
// Clear collections
this.queues.clear();
this.caches.clear();
logger.info('QueueManager shutdown complete');
} catch (error) {
logger.error('Error during shutdown', { error: (error as Error).message });
throw error;
} finally {
// Reset shutdown state
this.shutdownPromise = null;
this.isShuttingDown = false;
}
}
/**
* Start workers for all queues (used when delayWorkerStart is enabled)
*/
startAllWorkers(): void {
if (!this.config.delayWorkerStart) {
logger.warn('startAllWorkers() called but delayWorkerStart is not enabled');
return;
}
let workersStarted = 0;
for (const queue of this.queues.values()) {
const workerCount = this.config.defaultQueueOptions?.workers || 1;
const concurrency = this.config.defaultQueueOptions?.concurrency || 1;
if (workerCount > 0) {
queue.startWorkersManually(workerCount, concurrency);
workersStarted++;
}
}
logger.info('All workers started', {
totalQueues: this.queues.size,
queuesWithWorkers: workersStarted,
delayWorkerStart: this.config.delayWorkerStart
});
}
/**
* Wait for all queues to be ready
*/
async waitUntilReady(): Promise<void> {
const readyPromises = Array.from(this.queues.values()).map(queue => queue.waitUntilReady());
await Promise.all(readyPromises);
}
/**
* Get Redis configuration (for backward compatibility)
*/
getRedisConfig() {
return this.config.redis;
}
/**
* Get the current configuration
*/
getConfig(): Readonly<QueueManagerConfig> {
return { ...this.config };
}
}

View file

@ -0,0 +1,314 @@
import { Queue, QueueEvents } from 'bullmq';
// import { getLogger } from '@stock-bot/logger';
// const logger = getLogger('queue-metrics');
export interface QueueMetrics {
// Job counts
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
paused?: number;
// Performance metrics
processingTime: {
avg: number;
min: number;
max: number;
p95: number;
p99: number;
};
// Throughput
throughput: {
completedPerMinute: number;
failedPerMinute: number;
totalPerMinute: number;
};
// Job age
oldestWaitingJob: Date | null;
// Health
isHealthy: boolean;
healthIssues: string[];
}
export class QueueMetricsCollector {
private processingTimes: number[] = [];
private completedTimestamps: number[] = [];
private failedTimestamps: number[] = [];
private jobStartTimes = new Map<string, number>();
private readonly maxSamples = 1000;
private readonly metricsInterval = 60000; // 1 minute
constructor(
private queue: Queue,
private queueEvents: QueueEvents
) {
this.setupEventListeners();
}
/**
* Setup event listeners for metrics collection
*/
private setupEventListeners(): void {
this.queueEvents.on('completed', () => {
// Record completion
this.completedTimestamps.push(Date.now());
this.cleanupOldTimestamps();
});
this.queueEvents.on('failed', () => {
// Record failure
this.failedTimestamps.push(Date.now());
this.cleanupOldTimestamps();
});
// Track processing times
this.queueEvents.on('active', ({ jobId }) => {
this.jobStartTimes.set(jobId, Date.now());
});
this.queueEvents.on('completed', ({ jobId }) => {
const startTime = this.jobStartTimes.get(jobId);
if (startTime) {
const processingTime = Date.now() - startTime;
this.recordProcessingTime(processingTime);
this.jobStartTimes.delete(jobId);
}
});
}
/**
* Record processing time
*/
private recordProcessingTime(time: number): void {
this.processingTimes.push(time);
// Keep only recent samples
if (this.processingTimes.length > this.maxSamples) {
this.processingTimes = this.processingTimes.slice(-this.maxSamples);
}
}
/**
* Clean up old timestamps
*/
private cleanupOldTimestamps(): void {
const cutoff = Date.now() - this.metricsInterval;
this.completedTimestamps = this.completedTimestamps.filter(ts => ts > cutoff);
this.failedTimestamps = this.failedTimestamps.filter(ts => ts > cutoff);
}
/**
* Collect current metrics
*/
async collect(): Promise<QueueMetrics> {
// Get job counts
const [waiting, active, completed, failed, delayed] = await Promise.all([
this.queue.getWaitingCount(),
this.queue.getActiveCount(),
this.queue.getCompletedCount(),
this.queue.getFailedCount(),
this.queue.getDelayedCount(),
]);
// BullMQ doesn't have getPausedCount, check if queue is paused
const paused = await this.queue.isPaused() ? waiting : 0;
// Calculate processing time metrics
const processingTime = this.calculateProcessingTimeMetrics();
// Calculate throughput
const throughput = this.calculateThroughput();
// Get oldest waiting job
const oldestWaitingJob = await this.getOldestWaitingJob();
// Check health
const { isHealthy, healthIssues } = this.checkHealth({
waiting,
active,
failed,
processingTime,
});
return {
waiting,
active,
completed,
failed,
delayed,
paused,
processingTime,
throughput,
oldestWaitingJob,
isHealthy,
healthIssues,
};
}
/**
* Calculate processing time metrics
*/
private calculateProcessingTimeMetrics(): QueueMetrics['processingTime'] {
if (this.processingTimes.length === 0) {
return { avg: 0, min: 0, max: 0, p95: 0, p99: 0 };
}
const sorted = [...this.processingTimes].sort((a, b) => a - b);
const sum = sorted.reduce((acc, val) => acc + val, 0);
return {
avg: sorted.length > 0 ? Math.round(sum / sorted.length) : 0,
min: sorted[0] || 0,
max: sorted[sorted.length - 1] || 0,
p95: sorted[Math.floor(sorted.length * 0.95)] || 0,
p99: sorted[Math.floor(sorted.length * 0.99)] || 0,
};
}
/**
* Calculate throughput metrics
*/
private calculateThroughput(): QueueMetrics['throughput'] {
const now = Date.now();
const oneMinuteAgo = now - 60000;
const completedPerMinute = this.completedTimestamps.filter(ts => ts > oneMinuteAgo).length;
const failedPerMinute = this.failedTimestamps.filter(ts => ts > oneMinuteAgo).length;
return {
completedPerMinute,
failedPerMinute,
totalPerMinute: completedPerMinute + failedPerMinute,
};
}
/**
* Get oldest waiting job
*/
private async getOldestWaitingJob(): Promise<Date | null> {
const waitingJobs = await this.queue.getWaiting(0, 1);
if (waitingJobs.length > 0) {
return new Date(waitingJobs[0].timestamp);
}
return null;
}
/**
* Check queue health
*/
private checkHealth(metrics: {
waiting: number;
active: number;
failed: number;
processingTime: QueueMetrics['processingTime'];
}): { isHealthy: boolean; healthIssues: string[] } {
const issues: string[] = [];
// Check for high failure rate
const failureRate = metrics.failed / (metrics.failed + this.completedTimestamps.length);
if (failureRate > 0.1) {
issues.push(`High failure rate: ${(failureRate * 100).toFixed(1)}%`);
}
// Check for queue backlog
if (metrics.waiting > 1000) {
issues.push(`Large queue backlog: ${metrics.waiting} jobs waiting`);
}
// Check for slow processing
if (metrics.processingTime.avg > 30000) { // 30 seconds
issues.push(`Slow average processing time: ${(metrics.processingTime.avg / 1000).toFixed(1)}s`);
}
// Check for stalled active jobs
if (metrics.active > 100) {
issues.push(`High number of active jobs: ${metrics.active}`);
}
return {
isHealthy: issues.length === 0,
healthIssues: issues,
};
}
/**
* Get formatted metrics report
*/
async getReport(): Promise<string> {
const metrics = await this.collect();
return `
Queue Metrics Report
===================
Status: ${metrics.isHealthy ? '✅ Healthy' : '⚠️ Issues Detected'}
Job Counts:
- Waiting: ${metrics.waiting}
- Active: ${metrics.active}
- Completed: ${metrics.completed}
- Failed: ${metrics.failed}
- Delayed: ${metrics.delayed}
- Paused: ${metrics.paused}
Performance:
- Avg Processing Time: ${(metrics.processingTime.avg / 1000).toFixed(2)}s
- Min/Max: ${(metrics.processingTime.min / 1000).toFixed(2)}s / ${(metrics.processingTime.max / 1000).toFixed(2)}s
- P95/P99: ${(metrics.processingTime.p95 / 1000).toFixed(2)}s / ${(metrics.processingTime.p99 / 1000).toFixed(2)}s
Throughput:
- Completed/min: ${metrics.throughput.completedPerMinute}
- Failed/min: ${metrics.throughput.failedPerMinute}
- Total/min: ${metrics.throughput.totalPerMinute}
${metrics.oldestWaitingJob ? `Oldest Waiting Job: ${metrics.oldestWaitingJob.toISOString()}` : 'No waiting jobs'}
${metrics.healthIssues.length > 0 ? `\nHealth Issues:\n${metrics.healthIssues.map(issue => `- ${issue}`).join('\n')}` : ''}
`.trim();
}
/**
* Export metrics in Prometheus format
*/
async getPrometheusMetrics(): Promise<string> {
const metrics = await this.collect();
const queueName = this.queue.name;
return `
# HELP queue_jobs_total Total number of jobs by status
# TYPE queue_jobs_total gauge
queue_jobs_total{queue="${queueName}",status="waiting"} ${metrics.waiting}
queue_jobs_total{queue="${queueName}",status="active"} ${metrics.active}
queue_jobs_total{queue="${queueName}",status="completed"} ${metrics.completed}
queue_jobs_total{queue="${queueName}",status="failed"} ${metrics.failed}
queue_jobs_total{queue="${queueName}",status="delayed"} ${metrics.delayed}
queue_jobs_total{queue="${queueName}",status="paused"} ${metrics.paused}
# HELP queue_processing_time_seconds Job processing time in seconds
# TYPE queue_processing_time_seconds summary
queue_processing_time_seconds{queue="${queueName}",quantile="0.5"} ${(metrics.processingTime.avg / 1000).toFixed(3)}
queue_processing_time_seconds{queue="${queueName}",quantile="0.95"} ${(metrics.processingTime.p95 / 1000).toFixed(3)}
queue_processing_time_seconds{queue="${queueName}",quantile="0.99"} ${(metrics.processingTime.p99 / 1000).toFixed(3)}
queue_processing_time_seconds_sum{queue="${queueName}"} ${(metrics.processingTime.avg * this.processingTimes.length / 1000).toFixed(3)}
queue_processing_time_seconds_count{queue="${queueName}"} ${this.processingTimes.length}
# HELP queue_throughput_per_minute Jobs processed per minute
# TYPE queue_throughput_per_minute gauge
queue_throughput_per_minute{queue="${queueName}",status="completed"} ${metrics.throughput.completedPerMinute}
queue_throughput_per_minute{queue="${queueName}",status="failed"} ${metrics.throughput.failedPerMinute}
queue_throughput_per_minute{queue="${queueName}",status="total"} ${metrics.throughput.totalPerMinute}
# HELP queue_health Queue health status
# TYPE queue_health gauge
queue_health{queue="${queueName}"} ${metrics.isHealthy ? 1 : 0}
`.trim();
}
}

View file

@ -0,0 +1,372 @@
import { Queue as BullQueue, QueueEvents, Worker, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { handlerRegistry } from './handler-registry';
import type { JobData, JobOptions, QueueStats, RedisConfig } from './types';
import { getRedisConnection } from './utils';
const logger = getLogger('queue');
export interface QueueWorkerConfig {
workers?: number;
concurrency?: number;
startWorker?: boolean;
}
/**
* Consolidated Queue class that handles both job operations and optional worker management
* Can be used as a simple job queue or with workers for automatic processing
*/
export class Queue {
private bullQueue: BullQueue;
private workers: Worker[] = [];
private queueEvents?: QueueEvents;
private queueName: string;
private redisConfig: RedisConfig;
constructor(
queueName: string,
redisConfig: RedisConfig,
defaultJobOptions: JobOptions = {},
config: QueueWorkerConfig = {}
) {
this.queueName = queueName;
this.redisConfig = redisConfig;
const connection = getRedisConnection(redisConfig);
// Initialize BullMQ queue
this.bullQueue = new BullQueue(`{${queueName}}`, {
connection,
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 5,
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
...defaultJobOptions,
},
});
// Initialize queue events if workers will be used
if (config.workers && config.workers > 0) {
this.queueEvents = new QueueEvents(`{${queueName}}`, { connection });
}
// Start workers if requested and not explicitly disabled
if (config.workers && config.workers > 0 && config.startWorker !== false) {
this.startWorkers(config.workers, config.concurrency || 1);
}
logger.trace('Queue created', {
queueName,
workers: config.workers || 0,
concurrency: config.concurrency || 1,
});
}
/**
* Get the queue name
*/
getName(): string {
return this.queueName;
}
/**
* Add a single job to the queue
*/
async add(name: string, data: JobData, options: JobOptions = {}): Promise<Job> {
logger.trace('Adding job', { queueName: this.queueName, jobName: name });
return await this.bullQueue.add(name, data, options);
}
/**
* Add multiple jobs to the queue in bulk
*/
async addBulk(jobs: Array<{ name: string; data: JobData; opts?: JobOptions }>): Promise<Job[]> {
logger.trace('Adding bulk jobs', {
queueName: this.queueName,
jobCount: jobs.length,
});
return await this.bullQueue.addBulk(jobs);
}
/**
* Add a scheduled job with cron-like pattern
*/
async addScheduledJob(
name: string,
data: JobData,
cronPattern: string,
options: JobOptions = {}
): Promise<Job> {
const scheduledOptions: JobOptions = {
...options,
repeat: {
pattern: cronPattern,
// Use job name as repeat key to prevent duplicates
key: `${this.queueName}:${name}`,
...options.repeat,
},
};
logger.info('Adding scheduled job', {
queueName: this.queueName,
jobName: name,
cronPattern,
repeatKey: scheduledOptions.repeat?.key,
immediately: scheduledOptions.repeat?.immediately,
});
return await this.bullQueue.add(name, data, scheduledOptions);
}
/**
* Get queue statistics
*/
async getStats(): Promise<QueueStats> {
const [waiting, active, completed, failed, delayed] = await Promise.all([
this.bullQueue.getWaiting(),
this.bullQueue.getActive(),
this.bullQueue.getCompleted(),
this.bullQueue.getFailed(),
this.bullQueue.getDelayed(),
]);
const isPaused = await this.bullQueue.isPaused();
return {
waiting: waiting.length,
active: active.length,
completed: completed.length,
failed: failed.length,
delayed: delayed.length,
paused: isPaused,
workers: this.workers.length,
};
}
/**
* Get a specific job by ID
*/
async getJob(jobId: string): Promise<Job | undefined> {
return await this.bullQueue.getJob(jobId);
}
/**
* Get jobs by state
*/
async getJobs(
states: Array<'waiting' | 'active' | 'completed' | 'failed' | 'delayed'>,
start = 0,
end = 100
): Promise<Job[]> {
return await this.bullQueue.getJobs(states, start, end);
}
/**
* Pause the queue (stops processing new jobs)
*/
async pause(): Promise<void> {
await this.bullQueue.pause();
logger.info('Queue paused', { queueName: this.queueName });
}
/**
* Resume the queue
*/
async resume(): Promise<void> {
await this.bullQueue.resume();
logger.info('Queue resumed', { queueName: this.queueName });
}
/**
* Drain the queue (remove all jobs)
*/
async drain(delayed = false): Promise<void> {
await this.bullQueue.drain(delayed);
logger.info('Queue drained', { queueName: this.queueName, delayed });
}
/**
* Clean completed and failed jobs
*/
async clean(
grace: number = 0,
limit: number = 100,
type: 'completed' | 'failed' = 'completed'
): Promise<void> {
await this.bullQueue.clean(grace, limit, type);
logger.debug('Queue cleaned', { queueName: this.queueName, type, grace, limit });
}
/**
* Wait until the queue is ready
*/
async waitUntilReady(): Promise<void> {
await this.bullQueue.waitUntilReady();
}
/**
* Close the queue (cleanup resources)
*/
/**
* Close the queue (cleanup resources)
*/
async close(): Promise<void> {
try {
// Close the queue itself
await this.bullQueue.close();
logger.info('Queue closed', { queueName: this.queueName });
// Close queue events
if (this.queueEvents) {
await this.queueEvents.close();
logger.debug('Queue events closed', { queueName: this.queueName });
}
// Close workers first
if (this.workers.length > 0) {
await Promise.all(
this.workers.map(async worker => {
return await worker.close();
})
);
this.workers = [];
logger.debug('Workers closed', { queueName: this.queueName });
}
} catch (error) {
logger.error('Error closing queue', { queueName: this.queueName, error });
throw error;
}
}
/**
* Start workers for this queue
*/
private startWorkers(workerCount: number, concurrency: number): void {
const connection = getRedisConnection(this.redisConfig);
for (let i = 0; i < workerCount; i++) {
const worker = new Worker(`{${this.queueName}}`, this.processJob.bind(this), {
connection,
concurrency,
maxStalledCount: 3,
stalledInterval: 30000,
});
// Setup worker event handlers
worker.on('completed', job => {
logger.trace('Job completed', {
queueName: this.queueName,
jobId: job.id,
handler: job.data?.handler,
operation: job.data?.operation,
});
});
worker.on('failed', (job, err) => {
logger.error('Job failed', {
queueName: this.queueName,
jobId: job?.id,
handler: job?.data?.handler,
operation: job?.data?.operation,
error: err.message,
});
});
worker.on('error', error => {
logger.error('Worker error', {
queueName: this.queueName,
workerId: i,
error: error.message,
});
});
this.workers.push(worker);
}
logger.info('Workers started', {
queueName: this.queueName,
workerCount,
concurrency,
});
}
/**
* Process a job using the handler registry
*/
private async processJob(job: Job): Promise<unknown> {
const { handler, operation, payload }: JobData = job.data;
logger.trace('Processing job', {
id: job.id,
handler,
operation,
queueName: this.queueName,
});
try {
// Look up handler in registry
const jobHandler = handlerRegistry.getHandler(handler, operation);
if (!jobHandler) {
throw new Error(`No handler found for ${handler}:${operation}`);
}
const result = await jobHandler(payload);
logger.trace('Job completed successfully', {
id: job.id,
handler,
operation,
queueName: this.queueName,
});
return result;
} catch (error) {
logger.error('Job processing failed', {
id: job.id,
handler,
operation,
queueName: this.queueName,
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
/**
* Start workers manually (for delayed initialization)
*/
startWorkersManually(workerCount: number, concurrency: number = 1): void {
if (this.workers.length > 0) {
logger.warn('Workers already started for queue', { queueName: this.queueName });
return;
}
// Initialize queue events if not already done
if (!this.queueEvents) {
const connection = getRedisConnection(this.redisConfig);
this.queueEvents = new QueueEvents(`{${this.queueName}}`, { connection });
}
this.startWorkers(workerCount, concurrency);
}
/**
* Get the number of active workers
*/
getWorkerCount(): number {
return this.workers.length;
}
/**
* Get the underlying BullMQ queue (for advanced operations)
* @deprecated Use direct methods instead
*/
getBullQueue(): BullQueue {
return this.bullQueue;
}
}

View file

@ -0,0 +1,294 @@
import { RateLimiterRedis, RateLimiterRes } from 'rate-limiter-flexible';
import { getLogger } from '@stock-bot/logger';
import type { RateLimitConfig as BaseRateLimitConfig, RateLimitRule } from './types';
const logger = getLogger('rate-limiter');
// Extend the base config to add rate-limiter specific fields
export interface RateLimitConfig extends BaseRateLimitConfig {
keyPrefix?: string;
}
export class QueueRateLimiter {
private limiters = new Map<string, RateLimiterRedis>();
private rules: RateLimitRule[] = [];
constructor(private redisClient: ReturnType<typeof import('./utils').getRedisConnection>) {}
/**
* Add a rate limit rule
*/
addRule(rule: RateLimitRule): void {
this.rules.push(rule);
const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation);
const limiter = new RateLimiterRedis({
storeClient: this.redisClient,
keyPrefix: `rl:${key}`,
points: rule.config.points,
duration: rule.config.duration,
blockDuration: rule.config.blockDuration || 0,
});
this.limiters.set(key, limiter);
logger.info('Rate limit rule added', {
level: rule.level,
queueName: rule.queueName,
handler: rule.handler,
operation: rule.operation,
points: rule.config.points,
duration: rule.config.duration,
});
}
/**
* Check if a job can be processed based on rate limits
* Uses hierarchical precedence: operation > handler > queue > global
* The most specific matching rule takes precedence
*/
async checkLimit(queueName: string, handler: string, operation: string): Promise<{
allowed: boolean;
retryAfter?: number;
remainingPoints?: number;
appliedRule?: RateLimitRule;
}> {
const applicableRule = this.getMostSpecificRule(queueName, handler, operation);
if (!applicableRule) {
return { allowed: true };
}
const key = this.getRuleKey(applicableRule.level, applicableRule.queueName, applicableRule.handler, applicableRule.operation);
const limiter = this.limiters.get(key);
if (!limiter) {
logger.warn('Rate limiter not found for rule', { key, rule: applicableRule });
return { allowed: true };
}
try {
const result = await this.consumePoint(limiter, this.getConsumerKey(queueName, handler, operation));
return {
...result,
appliedRule: applicableRule,
};
} catch (error) {
logger.error('Rate limit check failed', { queueName, handler, operation, error });
// On error, allow the request to proceed
return { allowed: true };
}
}
/**
* Get the most specific rule that applies to this job
* Precedence: operation > handler > queue > global
*/
private getMostSpecificRule(queueName: string, handler: string, operation: string): RateLimitRule | undefined {
// 1. Check for operation-specific rule (most specific)
let rule = this.rules.find(r =>
r.level === 'operation' &&
r.queueName === queueName &&
r.handler === handler &&
r.operation === operation
);
if (rule) {return rule;}
// 2. Check for handler-specific rule
rule = this.rules.find(r =>
r.level === 'handler' &&
r.queueName === queueName &&
r.handler === handler
);
if (rule) {return rule;}
// 3. Check for queue-specific rule
rule = this.rules.find(r =>
r.level === 'queue' &&
r.queueName === queueName
);
if (rule) {return rule;}
// 4. Check for global rule (least specific)
rule = this.rules.find(r => r.level === 'global');
return rule;
}
/**
* Consume a point from the rate limiter
*/
private async consumePoint(
limiter: RateLimiterRedis,
key: string
): Promise<{ allowed: boolean; retryAfter?: number; remainingPoints?: number }> {
try {
const result = await limiter.consume(key);
return {
allowed: true,
remainingPoints: result.remainingPoints,
};
} catch (rejRes) {
if (rejRes instanceof RateLimiterRes) {
logger.warn('Rate limit exceeded', {
key,
retryAfter: rejRes.msBeforeNext,
});
return {
allowed: false,
retryAfter: rejRes.msBeforeNext,
remainingPoints: rejRes.remainingPoints,
};
}
throw rejRes;
}
}
/**
* Get rule key for storing rate limiter
*/
private getRuleKey(level: string, queueName?: string, handler?: string, operation?: string): string {
switch (level) {
case 'global':
return 'global';
case 'queue':
return `queue:${queueName}`;
case 'handler':
return `handler:${queueName}:${handler}`;
case 'operation':
return `operation:${queueName}:${handler}:${operation}`;
default:
return level;
}
}
/**
* Get consumer key for rate limiting (what gets counted)
*/
private getConsumerKey(queueName: string, handler: string, operation: string): string {
return `${queueName}:${handler}:${operation}`;
}
/**
* Get current rate limit status for a queue/handler/operation
*/
async getStatus(queueName: string, handler: string, operation: string): Promise<{
queueName: string;
handler: string;
operation: string;
appliedRule?: RateLimitRule;
limit?: {
level: string;
points: number;
duration: number;
remaining: number;
resetIn: number;
};
}> {
const applicableRule = this.getMostSpecificRule(queueName, handler, operation);
if (!applicableRule) {
return {
queueName,
handler,
operation,
};
}
const key = this.getRuleKey(applicableRule.level, applicableRule.queueName, applicableRule.handler, applicableRule.operation);
const limiter = this.limiters.get(key);
if (!limiter) {
return {
queueName,
handler,
operation,
appliedRule: applicableRule,
};
}
try {
const consumerKey = this.getConsumerKey(queueName, handler, operation);
const result = await limiter.get(consumerKey);
const limit = {
level: applicableRule.level,
points: limiter.points,
duration: limiter.duration,
remaining: result?.remainingPoints ?? limiter.points,
resetIn: result?.msBeforeNext ?? 0,
};
return {
queueName,
handler,
operation,
appliedRule: applicableRule,
limit,
};
} catch (error) {
logger.error('Failed to get rate limit status', { queueName, handler, operation, error });
return {
queueName,
handler,
operation,
appliedRule: applicableRule,
};
}
}
/**
* Reset rate limits for a specific consumer
*/
async reset(queueName: string, handler?: string, operation?: string): Promise<void> {
if (handler && operation) {
// Reset specific operation
const consumerKey = this.getConsumerKey(queueName, handler, operation);
const rule = this.getMostSpecificRule(queueName, handler, operation);
if (rule) {
const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation);
const limiter = this.limiters.get(key);
if (limiter) {
await limiter.delete(consumerKey);
}
}
} else {
// Reset broader scope - this is more complex with the new hierarchy
logger.warn('Broad reset not implemented yet', { queueName, handler, operation });
}
logger.info('Rate limits reset', { queueName, handler, operation });
}
/**
* Get all configured rate limit rules
*/
getRules(): RateLimitRule[] {
return [...this.rules];
}
/**
* Remove a rate limit rule
*/
removeRule(level: string, queueName?: string, handler?: string, operation?: string): boolean {
const key = this.getRuleKey(level, queueName, handler, operation);
const ruleIndex = this.rules.findIndex(r =>
r.level === level &&
(!queueName || r.queueName === queueName) &&
(!handler || r.handler === handler) &&
(!operation || r.operation === operation)
);
if (ruleIndex >= 0) {
this.rules.splice(ruleIndex, 1);
this.limiters.delete(key);
logger.info('Rate limit rule removed', { level, queueName, handler, operation });
return true;
}
return false;
}
}

View file

@ -0,0 +1,208 @@
// Types for queue operations
export interface JobData<T = unknown> {
handler: string;
operation: string;
payload: T;
priority?: number;
}
export interface ProcessOptions {
totalDelayHours: number;
batchSize?: number;
priority?: number;
useBatching?: boolean;
retries?: number;
ttl?: number;
removeOnComplete?: number;
removeOnFail?: number;
// Job routing information
handler?: string;
operation?: string;
}
export interface BatchResult {
jobsCreated: number;
mode: 'direct' | 'batch';
totalItems: number;
batchesCreated?: number;
duration: number;
}
// New improved types for the refactored architecture
export interface RedisConfig {
host: string;
port: number;
password?: string;
db?: number;
}
export interface JobOptions {
priority?: number;
delay?: number;
attempts?: number;
removeOnComplete?: number;
removeOnFail?: number;
backoff?: {
type: 'exponential' | 'fixed';
delay: number;
};
repeat?: {
pattern?: string;
key?: string;
limit?: number;
every?: number;
immediately?: boolean;
};
}
export interface QueueOptions {
defaultJobOptions?: JobOptions;
workers?: number;
concurrency?: number;
enableMetrics?: boolean;
enableDLQ?: boolean;
enableRateLimit?: boolean;
rateLimitRules?: RateLimitRule[]; // Queue-specific rate limit rules
}
export interface QueueManagerConfig {
redis: RedisConfig;
defaultQueueOptions?: QueueOptions;
enableScheduledJobs?: boolean;
globalRateLimit?: RateLimitConfig;
rateLimitRules?: RateLimitRule[]; // Global rate limit rules
delayWorkerStart?: boolean; // If true, workers won't start automatically
}
export interface QueueStats {
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
paused: boolean;
workers?: number;
}
export interface GlobalStats {
queues: Record<string, QueueStats>;
totalJobs: number;
totalWorkers: number;
uptime: number;
}
// Legacy type for backward compatibility
export interface QueueConfig extends QueueManagerConfig {
queueName?: string;
workers?: number;
concurrency?: number;
handlers?: HandlerInitializer[];
dlqConfig?: DLQConfig;
enableMetrics?: boolean;
}
export interface JobHandler<TPayload = unknown, TResult = unknown> {
(payload: TPayload): Promise<TResult>;
}
// Type-safe wrapper for creating job handlers
export type TypedJobHandler<TPayload, TResult = unknown> = (payload: TPayload) => Promise<TResult>;
// Helper to create type-safe job handlers
export function createJobHandler<TPayload = unknown, TResult = unknown>(
handler: TypedJobHandler<TPayload, TResult>
): JobHandler<unknown, TResult> {
return async (payload: unknown): Promise<TResult> => {
return handler(payload as TPayload);
};
}
export interface ScheduledJob<T = unknown> {
type: string;
operation: string;
payload?: T;
cronPattern: string;
priority?: number;
description?: string;
immediately?: boolean;
delay?: number;
}
export interface HandlerConfig {
[operation: string]: JobHandler;
}
// Type-safe handler configuration
export type TypedHandlerConfig<T extends Record<string, JobHandler> = Record<string, JobHandler>> = {
[K in keyof T]: T[K];
};
export interface HandlerConfigWithSchedule {
name: string;
operations: Record<string, JobHandler>;
scheduledJobs?: ScheduledJob[];
// Rate limiting
rateLimit?: RateLimitConfig;
operationLimits?: Record<string, RateLimitConfig>;
}
// Type-safe version of HandlerConfigWithSchedule
export interface TypedHandlerConfigWithSchedule<T extends Record<string, JobHandler> = Record<string, JobHandler>> {
name: string;
operations: T;
scheduledJobs?: ScheduledJob[];
// Rate limiting
rateLimit?: RateLimitConfig;
operationLimits?: Record<string, RateLimitConfig>;
}
export interface BatchJobData {
payloadKey: string;
batchIndex: number;
totalBatches: number;
itemCount: number;
totalDelayHours: number; // Total time to distribute all batches
}
export interface HandlerInitializer {
(): void | Promise<void>;
}
// Rate limiting types
export interface RateLimitConfig {
points: number;
duration: number;
blockDuration?: number;
}
export interface RateLimitRule {
level: 'global' | 'queue' | 'handler' | 'operation';
queueName?: string; // For queue-level limits
handler?: string; // For handler-level limits
operation?: string; // For operation-level limits (most specific)
config: RateLimitConfig;
}
// DLQ types
export interface DLQConfig {
maxRetries?: number;
retryDelay?: number;
alertThreshold?: number;
cleanupAge?: number;
}
export interface DLQJobInfo {
id: string;
name: string;
failedReason: string;
attemptsMade: number;
timestamp: number;
data: unknown;
}
export interface ScheduleConfig {
pattern: string;
jobName: string;
data?: unknown;
options?: JobOptions;
}

View file

@ -0,0 +1,28 @@
import type { RedisConfig } from './types';
/**
* Get Redis connection configuration with retry settings
*/
export function getRedisConnection(config: RedisConfig) {
const isTest = process.env.NODE_ENV === 'test' || process.env['BUNIT'] === '1';
return {
host: config.host,
port: config.port,
password: config.password,
db: config.db,
maxRetriesPerRequest: null, // Required by BullMQ
enableReadyCheck: false,
connectTimeout: isTest ? 1000 : 3000,
lazyConnect: false, // Changed from true to ensure connection is established immediately
keepAlive: true, // Changed from false to maintain persistent connections
retryStrategy: (times: number) => {
const maxRetries = isTest ? 1 : 3;
if (times > maxRetries) {
return null; // Stop retrying
}
const delay = isTest ? 100 : Math.min(times * 100, 3000);
return delay;
},
};
}