reworked queue lib

This commit is contained in:
Boki 2025-06-19 07:20:14 -04:00
parent 629ba2b8d4
commit c05a7413dc
34 changed files with 3887 additions and 861 deletions

View file

@ -1,8 +1,11 @@
import { Queue, QueueEvents, Worker, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { processBatchJob } from './batch-processor';
import { providerRegistry } from './provider-registry';
import type { JobData, ProviderConfig, ProviderInitializer, QueueConfig } from './types';
import { DeadLetterQueueHandler } from './dlq-handler';
import { handlerRegistry } from './handler-registry';
import { QueueMetricsCollector } from './queue-metrics';
import { QueueRateLimiter, type RateLimitRule } from './rate-limiter';
import type { HandlerConfig, HandlerInitializer, JobData, QueueConfig } from './types';
import { getRedisConnection } from './utils';
const logger = getLogger('queue-manager');
@ -11,8 +14,11 @@ export class QueueManager {
private workers: Worker[] = [];
private queueEvents!: QueueEvents;
private config: Required<QueueConfig>;
private providers: ProviderInitializer[];
private handlers: HandlerInitializer[];
private enableScheduledJobs: boolean;
private dlqHandler?: DeadLetterQueueHandler;
private metricsCollector?: QueueMetricsCollector;
private rateLimiter?: QueueRateLimiter;
private get isInitialized() {
return !!this.queue;
@ -27,18 +33,18 @@ export class QueueManager {
constructor(config: QueueConfig = {}) {
// Enhanced configuration
this.providers = config.providers || [];
this.handlers = config.handlers || [];
this.enableScheduledJobs = config.enableScheduledJobs ?? true;
// Set default configuration
this.config = {
workers: config.workers || parseInt(process.env.WORKER_COUNT || '5'),
concurrency: config.concurrency || parseInt(process.env.WORKER_CONCURRENCY || '20'),
workers: config.workers ?? 5,
concurrency: config.concurrency ?? 20,
redis: {
host: config.redis?.host || process.env.DRAGONFLY_HOST || 'localhost',
port: config.redis?.port || parseInt(process.env.DRAGONFLY_PORT || '6379'),
password: config.redis?.password || process.env.DRAGONFLY_PASSWORD,
db: config.redis?.db || parseInt(process.env.DRAGONFLY_DB || '0'),
host: config.redis?.host || 'localhost',
port: config.redis?.port || 6379,
password: config.redis?.password || '',
db: config.redis?.db || 0,
},
queueName: config.queueName || 'default-queue',
defaultJobOptions: {
@ -51,13 +57,19 @@ export class QueueManager {
},
...config.defaultJobOptions,
},
providers: this.providers,
handlers: this.handlers,
enableScheduledJobs: this.enableScheduledJobs,
enableRateLimit: config.enableRateLimit || false,
globalRateLimit: config.globalRateLimit,
enableDLQ: config.enableDLQ || false,
dlqConfig: config.dlqConfig,
enableMetrics: config.enableMetrics || false,
rateLimitRules: config.rateLimitRules || [],
};
}
/**
* Initialize the queue manager with enhanced provider and scheduled job support
* Initialize the queue manager with enhanced handler and scheduled job support
*/
async initialize(): Promise<void> {
if (this.isInitialized) {
@ -69,13 +81,13 @@ export class QueueManager {
queueName: this.config.queueName,
workers: this.config.workers,
concurrency: this.config.concurrency,
providers: this.providers.length,
handlers: this.handlers.length,
enableScheduledJobs: this.enableScheduledJobs,
});
try {
// Step 1: Register all providers
await this.registerProviders();
// Step 1: Register all handlers
await this.registerHandlers();
// Step 2: Initialize core queue infrastructure
const connection = this.getConnection();
@ -90,15 +102,39 @@ export class QueueManager {
// Initialize queue events
this.queueEvents = new QueueEvents(queueName, { connection });
// Step 3: Start workers
// Wait for queue to be ready
await this.queue.waitUntilReady();
// Step 3: Initialize DLQ handler if enabled
if (this.config.enableDLQ) {
this.dlqHandler = new DeadLetterQueueHandler(this.queue, connection, this.config.dlqConfig);
}
// Step 4: Initialize metrics collector if enabled
if (this.config.enableMetrics) {
this.metricsCollector = new QueueMetricsCollector(this.queue, this.queueEvents);
}
// Step 5: Initialize rate limiter if enabled
if (this.config.enableRateLimit && this.config.rateLimitRules) {
const redis = await this.getRedisClient();
this.rateLimiter = new QueueRateLimiter(redis);
// Add configured rate limit rules
for (const rule of this.config.rateLimitRules) {
this.rateLimiter.addRule(rule);
}
}
// Step 6: Start workers
await this.startWorkers();
// Step 4: Setup event listeners
// Step 7: Setup event listeners
this.setupEventListeners();
// Step 5: Batch cache will be initialized by individual Queue instances
// Step 8: Batch cache will be initialized by individual Queue instances
// Step 6: Set up scheduled jobs
// Step 9: Set up scheduled jobs
if (this.enableScheduledJobs) {
await this.setupScheduledJobs();
}
@ -111,45 +147,45 @@ export class QueueManager {
}
/**
* Register all configured providers
* Register all configured handlers
*/
private async registerProviders(): Promise<void> {
logger.info('Registering queue providers...', { count: this.providers.length });
private async registerHandlers(): Promise<void> {
logger.info('Registering queue handlers...', { count: this.handlers.length });
// Initialize providers using the configured provider initializers
for (const providerInitializer of this.providers) {
// Initialize handlers using the configured handler initializers
for (const handlerInitializer of this.handlers) {
try {
await providerInitializer();
await handlerInitializer();
} catch (error) {
logger.error('Failed to initialize provider', { error });
logger.error('Failed to initialize handler', { error });
throw error;
}
}
// Now register all providers from the registry with the queue manager
const allProviders = providerRegistry.getAllProviders();
for (const [providerName, config] of allProviders) {
this.registerProvider(providerName, config.operations);
logger.info(`Registered provider: ${providerName}`);
// Now register all handlers from the registry with the queue manager
const allHandlers = handlerRegistry.getAllHandlers();
for (const [handlerName, config] of allHandlers) {
this.registerHandler(handlerName, config.operations);
logger.info(`Registered handler: ${handlerName}`);
}
// Log scheduled jobs
const scheduledJobs = providerRegistry.getAllScheduledJobs();
logger.info(`Registered ${scheduledJobs.length} scheduled jobs across all providers`);
for (const { provider, job } of scheduledJobs) {
const scheduledJobs = handlerRegistry.getAllScheduledJobs();
logger.info(`Registered ${scheduledJobs.length} scheduled jobs across all handlers`);
for (const { handler, job } of scheduledJobs) {
logger.info(
`Scheduled job: ${provider}.${job.type} - ${job.description} (${job.cronPattern})`
`Scheduled job: ${handler}.${job.type} - ${job.description} (${job.cronPattern})`
);
}
logger.info('All providers registered successfully');
logger.info('All handlers registered successfully');
}
/**
* Set up scheduled jobs from provider registry
* Set up scheduled jobs from handler registry
*/
private async setupScheduledJobs(): Promise<void> {
const scheduledJobs = providerRegistry.getAllScheduledJobs();
const scheduledJobs = handlerRegistry.getAllScheduledJobs();
if (scheduledJobs.length === 0) {
logger.info('No scheduled jobs found');
@ -158,17 +194,17 @@ export class QueueManager {
logger.info(`Setting up ${scheduledJobs.length} scheduled jobs...`);
for (const { provider, job } of scheduledJobs) {
for (const { handler, job } of scheduledJobs) {
try {
const jobData: JobData = {
type: job.type,
provider,
handler,
operation: job.operation,
payload: job.payload,
priority: job.priority,
};
await this.add(`recurring-${provider}-${job.operation}`, jobData, {
await this.add(`recurring-${handler}-${job.operation}`, jobData, {
repeat: {
pattern: job.cronPattern,
tz: 'UTC',
@ -184,9 +220,9 @@ export class QueueManager {
},
});
logger.info(`Scheduled job registered: ${provider}.${job.type} (${job.cronPattern})`);
logger.info(`Scheduled job registered: ${handler}.${job.type} (${job.cronPattern})`);
} catch (error) {
logger.error(`Failed to register scheduled job: ${provider}.${job.type}`, { error });
logger.error(`Failed to register scheduled job: ${handler}.${job.type}`, { error });
}
}
@ -194,10 +230,10 @@ export class QueueManager {
}
/**
* Register a provider with its operations
* Register a handler with its operations
*/
registerProvider(providerName: string, config: ProviderConfig): void {
providerRegistry.register(providerName, config);
registerHandler(handlerName: string, config: HandlerConfig): void {
handlerRegistry.register(handlerName, config);
}
/**
@ -290,41 +326,142 @@ export class QueueManager {
return this.config.redis;
}
/**
* Get queue metrics
*/
async getMetrics() {
if (!this.metricsCollector) {
throw new Error('Metrics not enabled. Set enableMetrics: true in config');
}
return this.metricsCollector.collect();
}
/**
* Get metrics report
*/
async getMetricsReport(): Promise<string> {
if (!this.metricsCollector) {
throw new Error('Metrics not enabled. Set enableMetrics: true in config');
}
return this.metricsCollector.getReport();
}
/**
* Get DLQ stats
*/
async getDLQStats() {
if (!this.dlqHandler) {
throw new Error('DLQ not enabled. Set enableDLQ: true in config');
}
return this.dlqHandler.getStats();
}
/**
* Retry jobs from DLQ
*/
async retryDLQJobs(limit = 10) {
if (!this.dlqHandler) {
throw new Error('DLQ not enabled. Set enableDLQ: true in config');
}
return this.dlqHandler.retryDLQJobs(limit);
}
/**
* Add rate limit rule
*/
addRateLimitRule(rule: RateLimitRule): void {
if (!this.rateLimiter) {
throw new Error('Rate limiting not enabled. Set enableRateLimit: true in config');
}
this.rateLimiter.addRule(rule);
}
/**
* Get rate limit status
*/
async getRateLimitStatus(handler: string, operation: string) {
if (!this.rateLimiter) {
throw new Error('Rate limiting not enabled. Set enableRateLimit: true in config');
}
return this.rateLimiter.getStatus(handler, operation);
}
/**
* Shutdown the queue manager
*/
async shutdown(): Promise<void> {
logger.info('Shutting down queue manager...');
const shutdownTasks: Promise<void>[] = [];
try {
// Shutdown DLQ handler
if (this.dlqHandler) {
shutdownTasks.push(
this.dlqHandler.shutdown().catch(err =>
logger.warn('Error shutting down DLQ handler', { error: err })
)
);
}
// Close workers
await Promise.all(this.workers.map(worker => worker.close()));
this.workers = [];
if (this.workers.length > 0) {
shutdownTasks.push(
Promise.all(
this.workers.map(worker =>
worker.close().catch(err =>
logger.warn('Error closing worker', { error: err })
)
)
).then(() => {
this.workers = [];
})
);
}
// Close queue events
if (this.queueEvents) {
await this.queueEvents.close();
shutdownTasks.push(
this.queueEvents.close().catch(err =>
logger.warn('Error closing queue events', { error: err })
)
);
}
// Close queue
if (this.queue) {
await this.queue.close();
shutdownTasks.push(
this.queue.close().catch(err =>
logger.warn('Error closing queue', { error: err })
)
);
}
// Wait for all shutdown tasks with a timeout
await Promise.race([
Promise.all(shutdownTasks),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Shutdown timeout')), 5000)
)
]).catch(err => {
logger.warn('Some shutdown tasks did not complete cleanly', { error: err });
});
logger.info('Queue manager shutdown complete');
} catch (error) {
logger.error('Error during queue manager shutdown', { error });
throw error;
// Don't throw in shutdown to avoid hanging tests
}
}
private getConnection() {
return {
host: this.config.redis.host,
port: this.config.redis.port,
password: this.config.redis.password,
db: this.config.redis.db,
};
return getRedisConnection(this.config.redis);
}
private async getRedisClient() {
// Create a redis client for rate limiting
const Redis = require('ioredis');
return new Redis(this.getConnection());
}
private async startWorkers(): Promise<void> {
@ -359,30 +496,45 @@ export class QueueManager {
}
private async processJob(job: Job) {
const { provider, operation, payload }: JobData = job.data;
const { handler, operation, payload }: JobData = job.data;
logger.info('Processing job', {
id: job.id,
provider,
handler,
operation,
payloadKeys: Object.keys(payload || {}),
});
try {
let result;
// Check rate limits if enabled
if (this.rateLimiter) {
const rateLimit = await this.rateLimiter.checkLimit(handler, operation);
if (!rateLimit.allowed) {
// Reschedule job with delay
const delay = rateLimit.retryAfter || 60000;
logger.warn('Job rate limited, rescheduling', {
id: job.id,
handler,
operation,
retryAfter: delay,
});
// Regular handler lookup
const handler = providerRegistry.getHandler(provider, operation);
if (!handler) {
throw new Error(`No handler found for ${provider}:${operation}`);
throw new Error(`Rate limited. Retry after ${delay}ms`);
}
}
result = await handler(payload);
// Regular handler lookup
const jobHandler = handlerRegistry.getHandler(handler, operation);
if (!jobHandler) {
throw new Error(`No handler found for ${handler}:${operation}`);
}
const result = await jobHandler(payload);
logger.info('Job completed successfully', {
id: job.id,
provider,
handler,
operation,
});
@ -390,10 +542,16 @@ export class QueueManager {
} catch (error) {
logger.error('Job processing failed', {
id: job.id,
provider,
handler,
operation,
error: error instanceof Error ? error.message : String(error),
});
// Handle DLQ if enabled
if (this.dlqHandler && error instanceof Error) {
await this.dlqHandler.handleFailedJob(job, error);
}
throw error;
}
}