restructured libs to be more aligned with core components
This commit is contained in:
parent
947b1d748d
commit
0d1be9e3cb
50 changed files with 73 additions and 67 deletions
345
libs/core/queue/src/batch-processor.ts
Normal file
345
libs/core/queue/src/batch-processor.ts
Normal file
|
|
@ -0,0 +1,345 @@
|
|||
import { QueueManager } from './queue-manager';
|
||||
import type { BatchJobData, BatchResult, JobData, ProcessOptions } from './types';
|
||||
|
||||
/**
|
||||
* Main function - processes items either directly or in batches
|
||||
* Each item becomes payload: item (no processing needed)
|
||||
*/
|
||||
export async function processItems<T>(
|
||||
items: T[],
|
||||
queueName: string,
|
||||
options: ProcessOptions,
|
||||
queueManager: QueueManager
|
||||
): Promise<BatchResult> {
|
||||
const queue = queueManager.getQueue(queueName);
|
||||
const logger = queue.createChildLogger('batch-processor', {
|
||||
queueName,
|
||||
totalItems: items.length,
|
||||
mode: options.useBatching ? 'batch' : 'direct',
|
||||
});
|
||||
const startTime = Date.now();
|
||||
|
||||
if (items.length === 0) {
|
||||
return {
|
||||
jobsCreated: 0,
|
||||
mode: 'direct',
|
||||
totalItems: 0,
|
||||
duration: 0,
|
||||
};
|
||||
}
|
||||
|
||||
logger.info('Starting batch processing', {
|
||||
totalItems: items.length,
|
||||
mode: options.useBatching ? 'batch' : 'direct',
|
||||
batchSize: options.batchSize,
|
||||
totalDelayHours: options.totalDelayHours,
|
||||
});
|
||||
|
||||
try {
|
||||
const result = options.useBatching
|
||||
? await processBatched(items, queueName, options, queueManager)
|
||||
: await processDirect(items, queueName, options, queueManager);
|
||||
|
||||
const duration = Date.now() - startTime;
|
||||
|
||||
logger.info('Batch processing completed', {
|
||||
...result,
|
||||
duration: `${(duration / 1000).toFixed(1)}s`,
|
||||
});
|
||||
|
||||
return { ...result, duration };
|
||||
} catch (error) {
|
||||
logger.error('Batch processing failed', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process items directly - each item becomes a separate job
|
||||
*/
|
||||
async function processDirect<T>(
|
||||
items: T[],
|
||||
queueName: string,
|
||||
options: ProcessOptions,
|
||||
queueManager: QueueManager
|
||||
): Promise<Omit<BatchResult, 'duration'>> {
|
||||
const queue = queueManager.getQueue(queueName);
|
||||
const logger = queue.createChildLogger('batch-direct', {
|
||||
queueName,
|
||||
totalItems: items.length,
|
||||
});
|
||||
const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds
|
||||
const delayPerItem = totalDelayMs / items.length;
|
||||
|
||||
logger.info('Creating direct jobs', {
|
||||
totalItems: items.length,
|
||||
delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`,
|
||||
});
|
||||
|
||||
const jobs = items.map((item, index) => ({
|
||||
name: 'process-item',
|
||||
data: {
|
||||
handler: options.handler || 'generic',
|
||||
operation: options.operation || 'process-item',
|
||||
payload: item, // Just the item directly - no wrapper!
|
||||
priority: options.priority || undefined,
|
||||
},
|
||||
opts: {
|
||||
delay: index * delayPerItem,
|
||||
priority: options.priority || undefined,
|
||||
attempts: options.retries || 3,
|
||||
removeOnComplete: options.removeOnComplete || 10,
|
||||
removeOnFail: options.removeOnFail || 5,
|
||||
},
|
||||
}));
|
||||
|
||||
const createdJobs = await addJobsInChunks(queueName, jobs, queueManager);
|
||||
|
||||
return {
|
||||
totalItems: items.length,
|
||||
jobsCreated: createdJobs.length,
|
||||
mode: 'direct',
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Process items in batches - store items directly
|
||||
*/
|
||||
async function processBatched<T>(
|
||||
items: T[],
|
||||
queueName: string,
|
||||
options: ProcessOptions,
|
||||
queueManager: QueueManager
|
||||
): Promise<Omit<BatchResult, 'duration'>> {
|
||||
const queue = queueManager.getQueue(queueName);
|
||||
const logger = queue.createChildLogger('batch-batched', {
|
||||
queueName,
|
||||
totalItems: items.length,
|
||||
});
|
||||
const batchSize = options.batchSize || 100;
|
||||
const batches = createBatches(items, batchSize);
|
||||
const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds
|
||||
const delayPerBatch = totalDelayMs / batches.length;
|
||||
|
||||
logger.info('Creating batch jobs', {
|
||||
totalItems: items.length,
|
||||
batchSize,
|
||||
totalBatches: batches.length,
|
||||
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
|
||||
});
|
||||
|
||||
const batchJobs = await Promise.all(
|
||||
batches.map(async (batch, batchIndex) => {
|
||||
// Just store the items directly - no processing needed
|
||||
const payloadKey = await storeItems(batch, queueName, options, queueManager);
|
||||
|
||||
return {
|
||||
name: 'process-batch',
|
||||
data: {
|
||||
handler: options.handler || 'generic',
|
||||
operation: 'process-batch-items',
|
||||
payload: {
|
||||
payloadKey,
|
||||
batchIndex,
|
||||
totalBatches: batches.length,
|
||||
itemCount: batch.length,
|
||||
totalDelayHours: options.totalDelayHours,
|
||||
} as BatchJobData,
|
||||
priority: options.priority || undefined,
|
||||
},
|
||||
opts: {
|
||||
delay: batchIndex * delayPerBatch,
|
||||
priority: options.priority || undefined,
|
||||
attempts: options.retries || 3,
|
||||
removeOnComplete: options.removeOnComplete || 10,
|
||||
removeOnFail: options.removeOnFail || 5,
|
||||
},
|
||||
};
|
||||
})
|
||||
);
|
||||
|
||||
const createdJobs = await addJobsInChunks(queueName, batchJobs, queueManager);
|
||||
|
||||
return {
|
||||
totalItems: items.length,
|
||||
jobsCreated: createdJobs.length,
|
||||
batchesCreated: batches.length,
|
||||
mode: 'batch',
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a batch job - loads items and creates individual jobs
|
||||
*/
|
||||
export async function processBatchJob(jobData: BatchJobData, queueName: string, queueManager: QueueManager): Promise<unknown> {
|
||||
const queue = queueManager.getQueue(queueName);
|
||||
const logger = queue.createChildLogger('batch-job', {
|
||||
queueName,
|
||||
batchIndex: jobData.batchIndex,
|
||||
payloadKey: jobData.payloadKey,
|
||||
});
|
||||
const { payloadKey, batchIndex, totalBatches, itemCount, totalDelayHours } = jobData;
|
||||
|
||||
logger.debug('Processing batch job', {
|
||||
batchIndex,
|
||||
totalBatches,
|
||||
itemCount,
|
||||
totalDelayHours,
|
||||
});
|
||||
|
||||
try {
|
||||
const payload = await loadPayload(payloadKey, queueName, queueManager);
|
||||
if (!payload || !payload.items || !payload.options) {
|
||||
logger.error('Invalid payload data', { payloadKey, payload });
|
||||
throw new Error(`Invalid payload data for key: ${payloadKey}`);
|
||||
}
|
||||
|
||||
const { items, options } = payload;
|
||||
|
||||
// Calculate the time window for this batch
|
||||
const totalDelayMs = totalDelayHours * 60 * 60 * 1000; // Convert hours to ms
|
||||
const delayPerBatch = totalDelayMs / totalBatches; // Time allocated for each batch
|
||||
const delayPerItem = delayPerBatch / items.length; // Distribute items evenly within batch window
|
||||
|
||||
logger.debug('Calculating job delays', {
|
||||
batchIndex,
|
||||
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
|
||||
delayPerItem: `${(delayPerItem / 1000).toFixed(2)} seconds`,
|
||||
itemsInBatch: items.length,
|
||||
});
|
||||
|
||||
// Create jobs directly from items - each item becomes payload: item
|
||||
const jobs = items.map((item: unknown, index: number) => ({
|
||||
name: 'process-item',
|
||||
data: {
|
||||
handler: options.handler || 'generic',
|
||||
operation: options.operation || 'generic',
|
||||
payload: item, // Just the item directly!
|
||||
priority: options.priority || undefined,
|
||||
},
|
||||
opts: {
|
||||
delay: index * delayPerItem, // Distribute evenly within batch window
|
||||
priority: options.priority || undefined,
|
||||
attempts: options.retries || 3,
|
||||
},
|
||||
}));
|
||||
|
||||
const createdJobs = await addJobsInChunks(queueName, jobs, queueManager);
|
||||
|
||||
// Cleanup payload after successful processing
|
||||
await cleanupPayload(payloadKey, queueName, queueManager);
|
||||
|
||||
return {
|
||||
batchIndex,
|
||||
itemsProcessed: items.length,
|
||||
jobsCreated: createdJobs.length,
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Batch job processing failed', { batchIndex, error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// Helper functions
|
||||
|
||||
function createBatches<T>(items: T[], batchSize: number): T[][] {
|
||||
const batches: T[][] = [];
|
||||
for (let i = 0; i < items.length; i += batchSize) {
|
||||
batches.push(items.slice(i, i + batchSize));
|
||||
}
|
||||
return batches;
|
||||
}
|
||||
|
||||
async function storeItems<T>(
|
||||
items: T[],
|
||||
queueName: string,
|
||||
options: ProcessOptions,
|
||||
queueManager: QueueManager
|
||||
): Promise<string> {
|
||||
const cache = queueManager.getCache(queueName);
|
||||
const payloadKey = `payload:${Date.now()}:${Math.random().toString(36).substr(2, 9)}`;
|
||||
|
||||
const payload = {
|
||||
items, // Just store the items directly
|
||||
options: {
|
||||
delayPerItem: 1000,
|
||||
priority: options.priority || undefined,
|
||||
retries: options.retries || 3,
|
||||
handler: options.handler || 'generic',
|
||||
operation: options.operation || 'generic',
|
||||
},
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
const ttlSeconds = options.ttl || 86400; // 24 hours default
|
||||
await cache.set(payloadKey, payload, ttlSeconds);
|
||||
|
||||
return payloadKey;
|
||||
}
|
||||
|
||||
async function loadPayload<T>(
|
||||
key: string,
|
||||
queueName: string,
|
||||
queueManager: QueueManager
|
||||
): Promise<{
|
||||
items: T[];
|
||||
options: {
|
||||
delayPerItem: number;
|
||||
priority?: number;
|
||||
retries: number;
|
||||
handler: string;
|
||||
operation: string;
|
||||
};
|
||||
} | null> {
|
||||
const cache = queueManager.getCache(queueName);
|
||||
return (await cache.get(key)) as {
|
||||
items: T[];
|
||||
options: {
|
||||
delayPerItem: number;
|
||||
priority?: number;
|
||||
retries: number;
|
||||
handler: string;
|
||||
operation: string;
|
||||
};
|
||||
} | null;
|
||||
}
|
||||
|
||||
async function cleanupPayload(key: string, queueName: string, queueManager: QueueManager): Promise<void> {
|
||||
const cache = queueManager.getCache(queueName);
|
||||
await cache.del(key);
|
||||
}
|
||||
|
||||
async function addJobsInChunks(
|
||||
queueName: string,
|
||||
jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>,
|
||||
queueManager: QueueManager,
|
||||
chunkSize = 100
|
||||
): Promise<unknown[]> {
|
||||
const queue = queueManager.getQueue(queueName);
|
||||
const logger = queue.createChildLogger('batch-chunk', {
|
||||
queueName,
|
||||
totalJobs: jobs.length,
|
||||
});
|
||||
const allCreatedJobs = [];
|
||||
|
||||
for (let i = 0; i < jobs.length; i += chunkSize) {
|
||||
const chunk = jobs.slice(i, i + chunkSize);
|
||||
try {
|
||||
const createdJobs = await queue.addBulk(chunk);
|
||||
allCreatedJobs.push(...createdJobs);
|
||||
|
||||
// Small delay between chunks to avoid overwhelming Redis
|
||||
if (i + chunkSize < jobs.length) {
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to add job chunk', {
|
||||
startIndex: i,
|
||||
chunkSize: chunk.length,
|
||||
error,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return allCreatedJobs;
|
||||
}
|
||||
257
libs/core/queue/src/dlq-handler.ts
Normal file
257
libs/core/queue/src/dlq-handler.ts
Normal file
|
|
@ -0,0 +1,257 @@
|
|||
import { Queue, type Job } from 'bullmq';
|
||||
import type { DLQConfig, RedisConfig } from './types';
|
||||
import { getRedisConnection } from './utils';
|
||||
|
||||
// Logger interface for type safety
|
||||
interface Logger {
|
||||
info(message: string, meta?: Record<string, unknown>): void;
|
||||
error(message: string, meta?: Record<string, unknown>): void;
|
||||
warn(message: string, meta?: Record<string, unknown>): void;
|
||||
debug(message: string, meta?: Record<string, unknown>): void;
|
||||
}
|
||||
|
||||
export class DeadLetterQueueHandler {
|
||||
private dlq: Queue;
|
||||
private config: Required<DLQConfig>;
|
||||
private failureCount = new Map<string, number>();
|
||||
private readonly logger: Logger;
|
||||
|
||||
constructor(
|
||||
private mainQueue: Queue,
|
||||
connection: RedisConfig,
|
||||
config: DLQConfig = {},
|
||||
logger?: Logger
|
||||
) {
|
||||
this.logger = logger || console;
|
||||
this.config = {
|
||||
maxRetries: config.maxRetries ?? 3,
|
||||
retryDelay: config.retryDelay ?? 60000, // 1 minute
|
||||
alertThreshold: config.alertThreshold ?? 100,
|
||||
cleanupAge: config.cleanupAge ?? 168, // 7 days
|
||||
};
|
||||
|
||||
// Create DLQ with same name but -dlq suffix
|
||||
const dlqName = `${mainQueue.name}-dlq`;
|
||||
this.dlq = new Queue(dlqName, { connection: getRedisConnection(connection) });
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a failed job - either retry or move to DLQ
|
||||
*/
|
||||
async handleFailedJob(job: Job, error: Error): Promise<void> {
|
||||
const jobKey = `${job.name}:${job.id}`;
|
||||
const currentFailures = (this.failureCount.get(jobKey) || 0) + 1;
|
||||
this.failureCount.set(jobKey, currentFailures);
|
||||
|
||||
this.logger.warn('Job failed', {
|
||||
jobId: job.id,
|
||||
jobName: job.name,
|
||||
attempt: job.attemptsMade,
|
||||
maxAttempts: job.opts.attempts,
|
||||
error: error.message,
|
||||
failureCount: currentFailures,
|
||||
});
|
||||
|
||||
// Check if job should be moved to DLQ
|
||||
if (job.attemptsMade >= (job.opts.attempts || this.config.maxRetries)) {
|
||||
await this.moveToDeadLetterQueue(job, error);
|
||||
this.failureCount.delete(jobKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Move job to dead letter queue
|
||||
*/
|
||||
private async moveToDeadLetterQueue(job: Job, error: Error): Promise<void> {
|
||||
try {
|
||||
const dlqData = {
|
||||
originalJob: {
|
||||
id: job.id,
|
||||
name: job.name,
|
||||
data: job.data,
|
||||
opts: job.opts,
|
||||
attemptsMade: job.attemptsMade,
|
||||
failedReason: job.failedReason,
|
||||
processedOn: job.processedOn,
|
||||
timestamp: job.timestamp,
|
||||
},
|
||||
error: {
|
||||
message: error.message,
|
||||
stack: error.stack,
|
||||
name: error.name,
|
||||
},
|
||||
movedToDLQAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await this.dlq.add('failed-job', dlqData, {
|
||||
removeOnComplete: 100,
|
||||
removeOnFail: 50,
|
||||
});
|
||||
|
||||
this.logger.error('Job moved to DLQ', {
|
||||
jobId: job.id,
|
||||
jobName: job.name,
|
||||
error: error.message,
|
||||
});
|
||||
|
||||
// Check if we need to alert
|
||||
await this.checkAlertThreshold();
|
||||
} catch (dlqError) {
|
||||
this.logger.error('Failed to move job to DLQ', {
|
||||
jobId: job.id,
|
||||
error: dlqError,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry jobs from DLQ
|
||||
*/
|
||||
async retryDLQJobs(limit = 10): Promise<number> {
|
||||
const jobs = await this.dlq.getCompleted(0, limit);
|
||||
let retriedCount = 0;
|
||||
|
||||
for (const dlqJob of jobs) {
|
||||
try {
|
||||
const { originalJob } = dlqJob.data;
|
||||
|
||||
// Re-add to main queue with delay
|
||||
await this.mainQueue.add(originalJob.name, originalJob.data, {
|
||||
...originalJob.opts,
|
||||
delay: this.config.retryDelay,
|
||||
attempts: this.config.maxRetries,
|
||||
});
|
||||
|
||||
// Remove from DLQ
|
||||
await dlqJob.remove();
|
||||
retriedCount++;
|
||||
|
||||
this.logger.info('Job retried from DLQ', {
|
||||
originalJobId: originalJob.id,
|
||||
jobName: originalJob.name,
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to retry DLQ job', {
|
||||
dlqJobId: dlqJob.id,
|
||||
error,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return retriedCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get DLQ statistics
|
||||
*/
|
||||
async getStats(): Promise<{
|
||||
total: number;
|
||||
recent: number;
|
||||
byJobName: Record<string, number>;
|
||||
oldestJob: Date | null;
|
||||
}> {
|
||||
const [completed, failed, waiting] = await Promise.all([
|
||||
this.dlq.getCompleted(),
|
||||
this.dlq.getFailed(),
|
||||
this.dlq.getWaiting(),
|
||||
]);
|
||||
|
||||
const allJobs = [...completed, ...failed, ...waiting];
|
||||
const byJobName: Record<string, number> = {};
|
||||
let oldestTimestamp: number | null = null;
|
||||
|
||||
for (const job of allJobs) {
|
||||
const jobName = job.data.originalJob?.name || 'unknown';
|
||||
byJobName[jobName] = (byJobName[jobName] || 0) + 1;
|
||||
|
||||
if (!oldestTimestamp || job.timestamp < oldestTimestamp) {
|
||||
oldestTimestamp = job.timestamp;
|
||||
}
|
||||
}
|
||||
|
||||
// Count recent jobs (last 24 hours)
|
||||
const oneDayAgo = Date.now() - 24 * 60 * 60 * 1000;
|
||||
const recent = allJobs.filter(job => job.timestamp > oneDayAgo).length;
|
||||
|
||||
return {
|
||||
total: allJobs.length,
|
||||
recent,
|
||||
byJobName,
|
||||
oldestJob: oldestTimestamp ? new Date(oldestTimestamp) : null,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up old DLQ entries
|
||||
*/
|
||||
async cleanup(): Promise<number> {
|
||||
const ageInMs = this.config.cleanupAge * 60 * 60 * 1000;
|
||||
const cutoffTime = Date.now() - ageInMs;
|
||||
|
||||
const jobs = await this.dlq.getCompleted();
|
||||
let removedCount = 0;
|
||||
|
||||
for (const job of jobs) {
|
||||
if (job.timestamp < cutoffTime) {
|
||||
await job.remove();
|
||||
removedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.info('DLQ cleanup completed', {
|
||||
removedCount,
|
||||
cleanupAge: `${this.config.cleanupAge} hours`,
|
||||
});
|
||||
|
||||
return removedCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if alert threshold is exceeded
|
||||
*/
|
||||
private async checkAlertThreshold(): Promise<void> {
|
||||
const stats = await this.getStats();
|
||||
|
||||
if (stats.total >= this.config.alertThreshold) {
|
||||
this.logger.error('DLQ alert threshold exceeded', {
|
||||
threshold: this.config.alertThreshold,
|
||||
currentCount: stats.total,
|
||||
byJobName: stats.byJobName,
|
||||
});
|
||||
// In a real implementation, this would trigger alerts
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get failed jobs for inspection
|
||||
*/
|
||||
async inspectFailedJobs(limit = 10): Promise<
|
||||
Array<{
|
||||
id: string;
|
||||
name: string;
|
||||
data: unknown;
|
||||
error: unknown;
|
||||
failedAt: string;
|
||||
attempts: number;
|
||||
}>
|
||||
> {
|
||||
const jobs = await this.dlq.getCompleted(0, limit);
|
||||
|
||||
return jobs.map(job => ({
|
||||
id: job.data.originalJob.id,
|
||||
name: job.data.originalJob.name,
|
||||
data: job.data.originalJob.data,
|
||||
error: job.data.error,
|
||||
failedAt: job.data.movedToDLQAt,
|
||||
attempts: job.data.originalJob.attemptsMade,
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown DLQ handler
|
||||
*/
|
||||
async shutdown(): Promise<void> {
|
||||
await this.dlq.close();
|
||||
this.failureCount.clear();
|
||||
}
|
||||
}
|
||||
75
libs/core/queue/src/index.ts
Normal file
75
libs/core/queue/src/index.ts
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
// Core exports
|
||||
export { Queue } from './queue';
|
||||
export { QueueManager } from './queue-manager';
|
||||
export { SmartQueueManager } from './smart-queue-manager';
|
||||
export { ServiceCache, createServiceCache } from './service-cache';
|
||||
export {
|
||||
SERVICE_REGISTRY,
|
||||
getServiceConfig,
|
||||
findServiceForHandler,
|
||||
getFullQueueName,
|
||||
parseQueueName
|
||||
} from './service-registry';
|
||||
|
||||
// Re-export handler registry and utilities from handlers package
|
||||
export { handlerRegistry, createJobHandler } from '@stock-bot/handlers';
|
||||
|
||||
// Batch processing
|
||||
export { processBatchJob, processItems } from './batch-processor';
|
||||
|
||||
// DLQ handling
|
||||
export { DeadLetterQueueHandler } from './dlq-handler';
|
||||
|
||||
// Metrics
|
||||
export { QueueMetricsCollector } from './queue-metrics';
|
||||
|
||||
// Rate limiting
|
||||
export { QueueRateLimiter } from './rate-limiter';
|
||||
|
||||
// Types
|
||||
export type {
|
||||
// Core types
|
||||
JobData,
|
||||
JobOptions,
|
||||
QueueOptions,
|
||||
GlobalStats,
|
||||
|
||||
// Batch processing types
|
||||
BatchResult,
|
||||
ProcessOptions,
|
||||
BatchJobData,
|
||||
|
||||
// Handler types
|
||||
JobHandler,
|
||||
TypedJobHandler,
|
||||
HandlerConfig,
|
||||
HandlerConfigWithSchedule,
|
||||
HandlerInitializer,
|
||||
QueueStats,
|
||||
QueueWorkerConfig,
|
||||
|
||||
// Configuration types
|
||||
RedisConfig,
|
||||
QueueConfig,
|
||||
QueueManagerConfig,
|
||||
|
||||
// Rate limiting types
|
||||
RateLimitConfig,
|
||||
RateLimitRule,
|
||||
|
||||
// DLQ types
|
||||
DLQConfig,
|
||||
DLQJobInfo,
|
||||
|
||||
// Scheduled job types
|
||||
ScheduledJob,
|
||||
ScheduleConfig,
|
||||
|
||||
// Smart Queue types
|
||||
SmartQueueConfig,
|
||||
QueueRoute,
|
||||
|
||||
} from './types';
|
||||
|
||||
// Re-export service registry types
|
||||
export type { ServiceConfig } from './service-registry';
|
||||
444
libs/core/queue/src/queue-manager.ts
Normal file
444
libs/core/queue/src/queue-manager.ts
Normal file
|
|
@ -0,0 +1,444 @@
|
|||
import { createCache } from '@stock-bot/cache';
|
||||
import type { CacheProvider } from '@stock-bot/cache';
|
||||
import { Queue, type QueueWorkerConfig } from './queue';
|
||||
import { QueueRateLimiter } from './rate-limiter';
|
||||
import type {
|
||||
GlobalStats,
|
||||
QueueManagerConfig,
|
||||
QueueOptions,
|
||||
QueueStats,
|
||||
RateLimitRule,
|
||||
} from './types';
|
||||
import { getRedisConnection } from './utils';
|
||||
|
||||
// Logger interface for type safety
|
||||
interface Logger {
|
||||
info(message: string, meta?: Record<string, unknown>): void;
|
||||
error(message: string, meta?: Record<string, unknown>): void;
|
||||
warn(message: string, meta?: Record<string, unknown>): void;
|
||||
debug(message: string, meta?: Record<string, unknown>): void;
|
||||
trace(message: string, meta?: Record<string, unknown>): void;
|
||||
child?(name: string, context?: Record<string, unknown>): Logger;
|
||||
}
|
||||
|
||||
/**
|
||||
* QueueManager provides unified queue and cache management
|
||||
* Main entry point for all queue operations with getQueue() method
|
||||
*/
|
||||
export class QueueManager {
|
||||
private queues = new Map<string, Queue>();
|
||||
private caches = new Map<string, CacheProvider>();
|
||||
private rateLimiter?: QueueRateLimiter;
|
||||
private redisConnection: ReturnType<typeof getRedisConnection>;
|
||||
private isShuttingDown = false;
|
||||
private shutdownPromise: Promise<void> | null = null;
|
||||
private config: QueueManagerConfig;
|
||||
private readonly logger: Logger;
|
||||
|
||||
constructor(config: QueueManagerConfig, logger?: Logger) {
|
||||
this.config = config;
|
||||
this.logger = logger || console;
|
||||
this.redisConnection = getRedisConnection(config.redis);
|
||||
|
||||
// Initialize rate limiter if rules are provided
|
||||
if (config.rateLimitRules && config.rateLimitRules.length > 0) {
|
||||
this.rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger);
|
||||
config.rateLimitRules.forEach(rule => {
|
||||
if (this.rateLimiter) {
|
||||
this.rateLimiter.addRule(rule);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
this.logger.info('QueueManager initialized', {
|
||||
redis: `${config.redis.host}:${config.redis.port}`,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create a queue - unified method that handles both scenarios
|
||||
* This is the main method for accessing queues
|
||||
*/
|
||||
getQueue(queueName: string, options: QueueOptions = {}): Queue {
|
||||
// Return existing queue if it exists
|
||||
if (this.queues.has(queueName)) {
|
||||
const existingQueue = this.queues.get(queueName);
|
||||
if (existingQueue) {
|
||||
return existingQueue;
|
||||
}
|
||||
}
|
||||
|
||||
// Create new queue with merged options
|
||||
const mergedOptions = {
|
||||
...this.config.defaultQueueOptions,
|
||||
...options,
|
||||
};
|
||||
|
||||
// Prepare queue configuration
|
||||
const workers = mergedOptions.workers ?? this.config.defaultQueueOptions?.workers ?? 1;
|
||||
const concurrency = mergedOptions.concurrency ?? this.config.defaultQueueOptions?.concurrency ?? 1;
|
||||
|
||||
const queueConfig: QueueWorkerConfig = {
|
||||
workers,
|
||||
concurrency,
|
||||
startWorker: workers > 0 && !this.config.delayWorkerStart,
|
||||
};
|
||||
|
||||
const queue = new Queue(
|
||||
queueName,
|
||||
this.config.redis,
|
||||
mergedOptions.defaultJobOptions || {},
|
||||
queueConfig,
|
||||
this.logger
|
||||
);
|
||||
|
||||
// Store the queue
|
||||
this.queues.set(queueName, queue);
|
||||
|
||||
// Automatically initialize batch cache for the queue
|
||||
this.initializeBatchCacheSync(queueName);
|
||||
|
||||
// Add queue-specific rate limit rules
|
||||
if (this.rateLimiter && mergedOptions.rateLimitRules) {
|
||||
mergedOptions.rateLimitRules.forEach(rule => {
|
||||
// Ensure queue name is set for queue-specific rules
|
||||
const ruleWithQueue = { ...rule, queueName };
|
||||
if (this.rateLimiter) {
|
||||
this.rateLimiter.addRule(ruleWithQueue);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
this.logger.info('Queue created with batch cache', {
|
||||
queueName,
|
||||
workers: workers,
|
||||
concurrency: concurrency,
|
||||
});
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a queue exists
|
||||
*/
|
||||
hasQueue(queueName: string): boolean {
|
||||
return this.queues.has(queueName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all queue names
|
||||
*/
|
||||
getQueueNames(): string[] {
|
||||
return Array.from(this.queues.keys());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create a cache for a queue
|
||||
*/
|
||||
getCache(queueName: string): CacheProvider {
|
||||
if (!this.caches.has(queueName)) {
|
||||
const cacheProvider = createCache({
|
||||
redisConfig: this.config.redis,
|
||||
keyPrefix: `batch:${queueName}:`,
|
||||
ttl: 86400, // 24 hours default
|
||||
enableMetrics: true,
|
||||
logger: this.logger,
|
||||
});
|
||||
this.caches.set(queueName, cacheProvider);
|
||||
this.logger.trace('Cache created for queue', { queueName });
|
||||
}
|
||||
const cache = this.caches.get(queueName);
|
||||
if (!cache) {
|
||||
throw new Error(`Expected cache for queue ${queueName} to exist`);
|
||||
}
|
||||
return cache;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize cache for a queue (ensures it's ready)
|
||||
*/
|
||||
async initializeCache(queueName: string): Promise<void> {
|
||||
const cache = this.getCache(queueName);
|
||||
await cache.waitForReady(10000);
|
||||
this.logger.info('Cache initialized for queue', { queueName });
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize batch cache synchronously (for automatic initialization)
|
||||
* The cache will be ready for use, but we don't wait for Redis connection
|
||||
*/
|
||||
private initializeBatchCacheSync(queueName: string): void {
|
||||
// Just create the cache - it will connect automatically when first used
|
||||
this.getCache(queueName);
|
||||
this.logger.trace('Batch cache initialized synchronously for queue', { queueName });
|
||||
}
|
||||
|
||||
/**
|
||||
* Get statistics for all queues
|
||||
*/
|
||||
async getGlobalStats(): Promise<GlobalStats> {
|
||||
const queueStats: Record<string, QueueStats> = {};
|
||||
let totalJobs = 0;
|
||||
let totalWorkers = 0;
|
||||
|
||||
for (const [queueName, queue] of this.queues) {
|
||||
const stats = await queue.getStats();
|
||||
queueStats[queueName] = stats;
|
||||
|
||||
totalJobs += stats.waiting + stats.active + stats.completed + stats.failed + stats.delayed;
|
||||
totalWorkers += stats.workers || 0;
|
||||
}
|
||||
|
||||
return {
|
||||
queues: queueStats,
|
||||
totalJobs,
|
||||
totalWorkers,
|
||||
uptime: process.uptime(),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get statistics for a specific queue
|
||||
*/
|
||||
async getQueueStats(queueName: string): Promise<QueueStats | undefined> {
|
||||
const queue = this.queues.get(queueName);
|
||||
if (!queue) {
|
||||
return undefined;
|
||||
}
|
||||
return await queue.getStats();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a rate limit rule
|
||||
*/
|
||||
addRateLimitRule(rule: RateLimitRule): void {
|
||||
if (!this.rateLimiter) {
|
||||
this.rateLimiter = new QueueRateLimiter(this.redisConnection, this.logger);
|
||||
}
|
||||
this.rateLimiter.addRule(rule);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check rate limits for a job
|
||||
*/
|
||||
async checkRateLimit(
|
||||
queueName: string,
|
||||
handler: string,
|
||||
operation: string
|
||||
): Promise<{
|
||||
allowed: boolean;
|
||||
retryAfter?: number;
|
||||
remainingPoints?: number;
|
||||
appliedRule?: RateLimitRule;
|
||||
}> {
|
||||
if (!this.rateLimiter) {
|
||||
return { allowed: true };
|
||||
}
|
||||
|
||||
return await this.rateLimiter.checkLimit(queueName, handler, operation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get rate limit status
|
||||
*/
|
||||
async getRateLimitStatus(queueName: string, handler: string, operation: string) {
|
||||
if (!this.rateLimiter) {
|
||||
return {
|
||||
queueName,
|
||||
handler,
|
||||
operation,
|
||||
};
|
||||
}
|
||||
|
||||
return await this.rateLimiter.getStatus(queueName, handler, operation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Pause all queues
|
||||
*/
|
||||
async pauseAll(): Promise<void> {
|
||||
const pausePromises = Array.from(this.queues.values()).map(queue => queue.pause());
|
||||
await Promise.all(pausePromises);
|
||||
this.logger.info('All queues paused');
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume all queues
|
||||
*/
|
||||
async resumeAll(): Promise<void> {
|
||||
const resumePromises = Array.from(this.queues.values()).map(queue => queue.resume());
|
||||
await Promise.all(resumePromises);
|
||||
this.logger.info('All queues resumed');
|
||||
}
|
||||
|
||||
/**
|
||||
* Pause a specific queue
|
||||
*/
|
||||
async pauseQueue(queueName: string): Promise<boolean> {
|
||||
const queue = this.queues.get(queueName);
|
||||
if (!queue) {
|
||||
return false;
|
||||
}
|
||||
|
||||
await queue.pause();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume a specific queue
|
||||
*/
|
||||
async resumeQueue(queueName: string): Promise<boolean> {
|
||||
const queue = this.queues.get(queueName);
|
||||
if (!queue) {
|
||||
return false;
|
||||
}
|
||||
|
||||
await queue.resume();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Drain all queues
|
||||
*/
|
||||
async drainAll(delayed = false): Promise<void> {
|
||||
const drainPromises = Array.from(this.queues.values()).map(queue => queue.drain(delayed));
|
||||
await Promise.all(drainPromises);
|
||||
this.logger.info('All queues drained', { delayed });
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean all queues
|
||||
*/
|
||||
async cleanAll(
|
||||
grace: number = 0,
|
||||
limit: number = 100,
|
||||
type: 'completed' | 'failed' = 'completed'
|
||||
): Promise<void> {
|
||||
const cleanPromises = Array.from(this.queues.values()).map(queue =>
|
||||
queue.clean(grace, limit, type)
|
||||
);
|
||||
await Promise.all(cleanPromises);
|
||||
this.logger.info('All queues cleaned', { type, grace, limit });
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown all queues and workers (thread-safe)
|
||||
*/
|
||||
async shutdown(): Promise<void> {
|
||||
// If already shutting down, return the existing promise
|
||||
if (this.shutdownPromise) {
|
||||
return this.shutdownPromise;
|
||||
}
|
||||
|
||||
if (this.isShuttingDown) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.isShuttingDown = true;
|
||||
this.logger.info('Shutting down QueueManager...');
|
||||
|
||||
// Create shutdown promise
|
||||
this.shutdownPromise = this.performShutdown();
|
||||
return this.shutdownPromise;
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the actual shutdown
|
||||
*/
|
||||
private async performShutdown(): Promise<void> {
|
||||
try {
|
||||
// Close all queues (this now includes workers since they're managed by Queue class)
|
||||
const queueShutdownPromises = Array.from(this.queues.values()).map(async queue => {
|
||||
try {
|
||||
// Add timeout to queue.close() to prevent hanging
|
||||
await queue.close();
|
||||
// const timeoutPromise = new Promise<never>((_, reject) =>
|
||||
// setTimeout(() => reject(new Error('Queue close timeout')), 100)
|
||||
// );
|
||||
|
||||
// await Promise.race([closePromise, timeoutPromise]);
|
||||
} catch (error) {
|
||||
this.logger.warn('Error closing queue', { error: (error as Error).message });
|
||||
}
|
||||
});
|
||||
|
||||
await Promise.all(queueShutdownPromises);
|
||||
|
||||
// Close all caches
|
||||
const cacheShutdownPromises = Array.from(this.caches.values()).map(async cache => {
|
||||
try {
|
||||
// Clear cache before shutdown
|
||||
await cache.clear();
|
||||
} catch (error) {
|
||||
this.logger.warn('Error clearing cache', { error: (error as Error).message });
|
||||
}
|
||||
});
|
||||
|
||||
await Promise.all(cacheShutdownPromises);
|
||||
|
||||
// Clear collections
|
||||
this.queues.clear();
|
||||
this.caches.clear();
|
||||
|
||||
this.logger.info('QueueManager shutdown complete');
|
||||
} catch (error) {
|
||||
this.logger.error('Error during shutdown', { error: (error as Error).message });
|
||||
throw error;
|
||||
} finally {
|
||||
// Reset shutdown state
|
||||
this.shutdownPromise = null;
|
||||
this.isShuttingDown = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start workers for all queues (used when delayWorkerStart is enabled)
|
||||
*/
|
||||
startAllWorkers(): void {
|
||||
if (!this.config.delayWorkerStart) {
|
||||
this.logger.info(
|
||||
'startAllWorkers() called but workers already started automatically (delayWorkerStart is false)'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let workersStarted = 0;
|
||||
for (const queue of this.queues.values()) {
|
||||
const workerCount = this.config.defaultQueueOptions?.workers || 1;
|
||||
const concurrency = this.config.defaultQueueOptions?.concurrency || 1;
|
||||
|
||||
if (workerCount > 0) {
|
||||
queue.startWorkersManually(workerCount, concurrency);
|
||||
workersStarted++;
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.info('All workers started', {
|
||||
totalQueues: this.queues.size,
|
||||
queuesWithWorkers: workersStarted,
|
||||
delayWorkerStart: this.config.delayWorkerStart,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for all queues to be ready
|
||||
*/
|
||||
async waitUntilReady(): Promise<void> {
|
||||
const readyPromises = Array.from(this.queues.values()).map(queue => queue.waitUntilReady());
|
||||
await Promise.all(readyPromises);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Redis configuration (for backward compatibility)
|
||||
*/
|
||||
getRedisConfig() {
|
||||
return this.config.redis;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current configuration
|
||||
*/
|
||||
getConfig(): Readonly<QueueManagerConfig> {
|
||||
return { ...this.config };
|
||||
}
|
||||
}
|
||||
318
libs/core/queue/src/queue-metrics.ts
Normal file
318
libs/core/queue/src/queue-metrics.ts
Normal file
|
|
@ -0,0 +1,318 @@
|
|||
import { Queue, QueueEvents } from 'bullmq';
|
||||
|
||||
// import { getLogger } from '@stock-bot/logger';
|
||||
|
||||
// const logger = getLogger('queue-metrics');
|
||||
|
||||
export interface QueueMetrics {
|
||||
// Job counts
|
||||
waiting: number;
|
||||
active: number;
|
||||
completed: number;
|
||||
failed: number;
|
||||
delayed: number;
|
||||
paused?: number;
|
||||
|
||||
// Performance metrics
|
||||
processingTime: {
|
||||
avg: number;
|
||||
min: number;
|
||||
max: number;
|
||||
p95: number;
|
||||
p99: number;
|
||||
};
|
||||
|
||||
// Throughput
|
||||
throughput: {
|
||||
completedPerMinute: number;
|
||||
failedPerMinute: number;
|
||||
totalPerMinute: number;
|
||||
};
|
||||
|
||||
// Job age
|
||||
oldestWaitingJob: Date | null;
|
||||
|
||||
// Health
|
||||
isHealthy: boolean;
|
||||
healthIssues: string[];
|
||||
}
|
||||
|
||||
export class QueueMetricsCollector {
|
||||
private processingTimes: number[] = [];
|
||||
private completedTimestamps: number[] = [];
|
||||
private failedTimestamps: number[] = [];
|
||||
private jobStartTimes = new Map<string, number>();
|
||||
private readonly maxSamples = 1000;
|
||||
private readonly metricsInterval = 60000; // 1 minute
|
||||
|
||||
constructor(
|
||||
private queue: Queue,
|
||||
private queueEvents: QueueEvents
|
||||
) {
|
||||
this.setupEventListeners();
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup event listeners for metrics collection
|
||||
*/
|
||||
private setupEventListeners(): void {
|
||||
this.queueEvents.on('completed', () => {
|
||||
// Record completion
|
||||
this.completedTimestamps.push(Date.now());
|
||||
this.cleanupOldTimestamps();
|
||||
});
|
||||
|
||||
this.queueEvents.on('failed', () => {
|
||||
// Record failure
|
||||
this.failedTimestamps.push(Date.now());
|
||||
this.cleanupOldTimestamps();
|
||||
});
|
||||
|
||||
// Track processing times
|
||||
this.queueEvents.on('active', ({ jobId }) => {
|
||||
this.jobStartTimes.set(jobId, Date.now());
|
||||
});
|
||||
|
||||
this.queueEvents.on('completed', ({ jobId }) => {
|
||||
const startTime = this.jobStartTimes.get(jobId);
|
||||
if (startTime) {
|
||||
const processingTime = Date.now() - startTime;
|
||||
this.recordProcessingTime(processingTime);
|
||||
this.jobStartTimes.delete(jobId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Record processing time
|
||||
*/
|
||||
private recordProcessingTime(time: number): void {
|
||||
this.processingTimes.push(time);
|
||||
|
||||
// Keep only recent samples
|
||||
if (this.processingTimes.length > this.maxSamples) {
|
||||
this.processingTimes = this.processingTimes.slice(-this.maxSamples);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up old timestamps
|
||||
*/
|
||||
private cleanupOldTimestamps(): void {
|
||||
const cutoff = Date.now() - this.metricsInterval;
|
||||
|
||||
this.completedTimestamps = this.completedTimestamps.filter(ts => ts > cutoff);
|
||||
this.failedTimestamps = this.failedTimestamps.filter(ts => ts > cutoff);
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect current metrics
|
||||
*/
|
||||
async collect(): Promise<QueueMetrics> {
|
||||
// Get job counts
|
||||
const [waiting, active, completed, failed, delayed] = await Promise.all([
|
||||
this.queue.getWaitingCount(),
|
||||
this.queue.getActiveCount(),
|
||||
this.queue.getCompletedCount(),
|
||||
this.queue.getFailedCount(),
|
||||
this.queue.getDelayedCount(),
|
||||
]);
|
||||
|
||||
// BullMQ doesn't have getPausedCount, check if queue is paused
|
||||
const paused = (await this.queue.isPaused()) ? waiting : 0;
|
||||
|
||||
// Calculate processing time metrics
|
||||
const processingTime = this.calculateProcessingTimeMetrics();
|
||||
|
||||
// Calculate throughput
|
||||
const throughput = this.calculateThroughput();
|
||||
|
||||
// Get oldest waiting job
|
||||
const oldestWaitingJob = await this.getOldestWaitingJob();
|
||||
|
||||
// Check health
|
||||
const { isHealthy, healthIssues } = this.checkHealth({
|
||||
waiting,
|
||||
active,
|
||||
failed,
|
||||
processingTime,
|
||||
});
|
||||
|
||||
return {
|
||||
waiting,
|
||||
active,
|
||||
completed,
|
||||
failed,
|
||||
delayed,
|
||||
paused,
|
||||
processingTime,
|
||||
throughput,
|
||||
oldestWaitingJob,
|
||||
isHealthy,
|
||||
healthIssues,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate processing time metrics
|
||||
*/
|
||||
private calculateProcessingTimeMetrics(): QueueMetrics['processingTime'] {
|
||||
if (this.processingTimes.length === 0) {
|
||||
return { avg: 0, min: 0, max: 0, p95: 0, p99: 0 };
|
||||
}
|
||||
|
||||
const sorted = [...this.processingTimes].sort((a, b) => a - b);
|
||||
const sum = sorted.reduce((acc, val) => acc + val, 0);
|
||||
|
||||
return {
|
||||
avg: sorted.length > 0 ? Math.round(sum / sorted.length) : 0,
|
||||
min: sorted[0] || 0,
|
||||
max: sorted[sorted.length - 1] || 0,
|
||||
p95: sorted[Math.floor(sorted.length * 0.95)] || 0,
|
||||
p99: sorted[Math.floor(sorted.length * 0.99)] || 0,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate throughput metrics
|
||||
*/
|
||||
private calculateThroughput(): QueueMetrics['throughput'] {
|
||||
const now = Date.now();
|
||||
const oneMinuteAgo = now - 60000;
|
||||
|
||||
const completedPerMinute = this.completedTimestamps.filter(ts => ts > oneMinuteAgo).length;
|
||||
const failedPerMinute = this.failedTimestamps.filter(ts => ts > oneMinuteAgo).length;
|
||||
|
||||
return {
|
||||
completedPerMinute,
|
||||
failedPerMinute,
|
||||
totalPerMinute: completedPerMinute + failedPerMinute,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get oldest waiting job
|
||||
*/
|
||||
private async getOldestWaitingJob(): Promise<Date | null> {
|
||||
const waitingJobs = await this.queue.getWaiting(0, 1);
|
||||
|
||||
if (waitingJobs.length > 0) {
|
||||
return new Date(waitingJobs[0].timestamp);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check queue health
|
||||
*/
|
||||
private checkHealth(metrics: {
|
||||
waiting: number;
|
||||
active: number;
|
||||
failed: number;
|
||||
processingTime: QueueMetrics['processingTime'];
|
||||
}): { isHealthy: boolean; healthIssues: string[] } {
|
||||
const issues: string[] = [];
|
||||
|
||||
// Check for high failure rate
|
||||
const failureRate = metrics.failed / (metrics.failed + this.completedTimestamps.length);
|
||||
if (failureRate > 0.1) {
|
||||
issues.push(`High failure rate: ${(failureRate * 100).toFixed(1)}%`);
|
||||
}
|
||||
|
||||
// Check for queue backlog
|
||||
if (metrics.waiting > 1000) {
|
||||
issues.push(`Large queue backlog: ${metrics.waiting} jobs waiting`);
|
||||
}
|
||||
|
||||
// Check for slow processing
|
||||
if (metrics.processingTime.avg > 30000) {
|
||||
// 30 seconds
|
||||
issues.push(
|
||||
`Slow average processing time: ${(metrics.processingTime.avg / 1000).toFixed(1)}s`
|
||||
);
|
||||
}
|
||||
|
||||
// Check for stalled active jobs
|
||||
if (metrics.active > 100) {
|
||||
issues.push(`High number of active jobs: ${metrics.active}`);
|
||||
}
|
||||
|
||||
return {
|
||||
isHealthy: issues.length === 0,
|
||||
healthIssues: issues,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get formatted metrics report
|
||||
*/
|
||||
async getReport(): Promise<string> {
|
||||
const metrics = await this.collect();
|
||||
|
||||
return `
|
||||
Queue Metrics Report
|
||||
===================
|
||||
Status: ${metrics.isHealthy ? '✅ Healthy' : '⚠️ Issues Detected'}
|
||||
|
||||
Job Counts:
|
||||
- Waiting: ${metrics.waiting}
|
||||
- Active: ${metrics.active}
|
||||
- Completed: ${metrics.completed}
|
||||
- Failed: ${metrics.failed}
|
||||
- Delayed: ${metrics.delayed}
|
||||
- Paused: ${metrics.paused}
|
||||
|
||||
Performance:
|
||||
- Avg Processing Time: ${(metrics.processingTime.avg / 1000).toFixed(2)}s
|
||||
- Min/Max: ${(metrics.processingTime.min / 1000).toFixed(2)}s / ${(metrics.processingTime.max / 1000).toFixed(2)}s
|
||||
- P95/P99: ${(metrics.processingTime.p95 / 1000).toFixed(2)}s / ${(metrics.processingTime.p99 / 1000).toFixed(2)}s
|
||||
|
||||
Throughput:
|
||||
- Completed/min: ${metrics.throughput.completedPerMinute}
|
||||
- Failed/min: ${metrics.throughput.failedPerMinute}
|
||||
- Total/min: ${metrics.throughput.totalPerMinute}
|
||||
|
||||
${metrics.oldestWaitingJob ? `Oldest Waiting Job: ${metrics.oldestWaitingJob.toISOString()}` : 'No waiting jobs'}
|
||||
|
||||
${metrics.healthIssues.length > 0 ? `\nHealth Issues:\n${metrics.healthIssues.map(issue => `- ${issue}`).join('\n')}` : ''}
|
||||
`.trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* Export metrics in Prometheus format
|
||||
*/
|
||||
async getPrometheusMetrics(): Promise<string> {
|
||||
const metrics = await this.collect();
|
||||
const queueName = this.queue.name;
|
||||
|
||||
return `
|
||||
# HELP queue_jobs_total Total number of jobs by status
|
||||
# TYPE queue_jobs_total gauge
|
||||
queue_jobs_total{queue="${queueName}",status="waiting"} ${metrics.waiting}
|
||||
queue_jobs_total{queue="${queueName}",status="active"} ${metrics.active}
|
||||
queue_jobs_total{queue="${queueName}",status="completed"} ${metrics.completed}
|
||||
queue_jobs_total{queue="${queueName}",status="failed"} ${metrics.failed}
|
||||
queue_jobs_total{queue="${queueName}",status="delayed"} ${metrics.delayed}
|
||||
queue_jobs_total{queue="${queueName}",status="paused"} ${metrics.paused}
|
||||
|
||||
# HELP queue_processing_time_seconds Job processing time in seconds
|
||||
# TYPE queue_processing_time_seconds summary
|
||||
queue_processing_time_seconds{queue="${queueName}",quantile="0.5"} ${(metrics.processingTime.avg / 1000).toFixed(3)}
|
||||
queue_processing_time_seconds{queue="${queueName}",quantile="0.95"} ${(metrics.processingTime.p95 / 1000).toFixed(3)}
|
||||
queue_processing_time_seconds{queue="${queueName}",quantile="0.99"} ${(metrics.processingTime.p99 / 1000).toFixed(3)}
|
||||
queue_processing_time_seconds_sum{queue="${queueName}"} ${((metrics.processingTime.avg * this.processingTimes.length) / 1000).toFixed(3)}
|
||||
queue_processing_time_seconds_count{queue="${queueName}"} ${this.processingTimes.length}
|
||||
|
||||
# HELP queue_throughput_per_minute Jobs processed per minute
|
||||
# TYPE queue_throughput_per_minute gauge
|
||||
queue_throughput_per_minute{queue="${queueName}",status="completed"} ${metrics.throughput.completedPerMinute}
|
||||
queue_throughput_per_minute{queue="${queueName}",status="failed"} ${metrics.throughput.failedPerMinute}
|
||||
queue_throughput_per_minute{queue="${queueName}",status="total"} ${metrics.throughput.totalPerMinute}
|
||||
|
||||
# HELP queue_health Queue health status
|
||||
# TYPE queue_health gauge
|
||||
queue_health{queue="${queueName}"} ${metrics.isHealthy ? 1 : 0}
|
||||
`.trim();
|
||||
}
|
||||
}
|
||||
394
libs/core/queue/src/queue.ts
Normal file
394
libs/core/queue/src/queue.ts
Normal file
|
|
@ -0,0 +1,394 @@
|
|||
import { Queue as BullQueue, QueueEvents, Worker, type Job } from 'bullmq';
|
||||
import { handlerRegistry } from '@stock-bot/handlers';
|
||||
import type { JobData, JobOptions, ExtendedJobOptions, QueueStats, RedisConfig } from './types';
|
||||
import { getRedisConnection } from './utils';
|
||||
|
||||
// Logger interface for type safety
|
||||
interface Logger {
|
||||
info(message: string, meta?: Record<string, unknown>): void;
|
||||
error(message: string, meta?: Record<string, unknown>): void;
|
||||
warn(message: string, meta?: Record<string, unknown>): void;
|
||||
debug(message: string, meta?: Record<string, unknown>): void;
|
||||
trace(message: string, meta?: Record<string, unknown>): void;
|
||||
child?(name: string, context?: Record<string, unknown>): Logger;
|
||||
}
|
||||
|
||||
export interface QueueWorkerConfig {
|
||||
workers?: number;
|
||||
concurrency?: number;
|
||||
startWorker?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Consolidated Queue class that handles both job operations and optional worker management
|
||||
* Can be used as a simple job queue or with workers for automatic processing
|
||||
*/
|
||||
export class Queue {
|
||||
private bullQueue: BullQueue;
|
||||
private workers: Worker[] = [];
|
||||
private queueEvents?: QueueEvents;
|
||||
private queueName: string;
|
||||
private redisConfig: RedisConfig;
|
||||
private readonly logger: Logger;
|
||||
|
||||
constructor(
|
||||
queueName: string,
|
||||
redisConfig: RedisConfig,
|
||||
defaultJobOptions: JobOptions = {},
|
||||
config: QueueWorkerConfig = {},
|
||||
logger?: Logger
|
||||
) {
|
||||
this.queueName = queueName;
|
||||
this.redisConfig = redisConfig;
|
||||
this.logger = logger || console;
|
||||
|
||||
const connection = getRedisConnection(redisConfig);
|
||||
|
||||
// Initialize BullMQ queue
|
||||
this.bullQueue = new BullQueue(`{${queueName}}`, {
|
||||
connection,
|
||||
defaultJobOptions: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 5,
|
||||
attempts: 3,
|
||||
backoff: {
|
||||
type: 'exponential',
|
||||
delay: 1000,
|
||||
},
|
||||
...defaultJobOptions,
|
||||
},
|
||||
});
|
||||
|
||||
// Initialize queue events if workers will be used
|
||||
if (config.workers && config.workers > 0) {
|
||||
this.queueEvents = new QueueEvents(`{${queueName}}`, { connection });
|
||||
}
|
||||
|
||||
// Start workers if requested and not explicitly disabled
|
||||
if (config.workers && config.workers > 0 && config.startWorker !== false) {
|
||||
this.startWorkers(config.workers, config.concurrency || 1);
|
||||
}
|
||||
|
||||
this.logger.trace('Queue created', {
|
||||
queueName,
|
||||
workers: config.workers || 0,
|
||||
concurrency: config.concurrency || 1,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the queue name
|
||||
*/
|
||||
getName(): string {
|
||||
return this.queueName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the underlying BullMQ queue instance (for monitoring/admin purposes)
|
||||
*/
|
||||
getBullQueue(): BullQueue {
|
||||
return this.bullQueue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a single job to the queue
|
||||
*/
|
||||
async add(name: string, data: JobData, options: JobOptions = {}): Promise<Job> {
|
||||
this.logger.trace('Adding job', { queueName: this.queueName, jobName: name });
|
||||
return await this.bullQueue.add(name, data, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add multiple jobs to the queue in bulk
|
||||
*/
|
||||
async addBulk(jobs: Array<{ name: string; data: JobData; opts?: JobOptions }>): Promise<Job[]> {
|
||||
this.logger.trace('Adding bulk jobs', {
|
||||
queueName: this.queueName,
|
||||
jobCount: jobs.length,
|
||||
});
|
||||
return await this.bullQueue.addBulk(jobs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a scheduled job with cron-like pattern
|
||||
*/
|
||||
async addScheduledJob(
|
||||
name: string,
|
||||
data: JobData,
|
||||
cronPattern: string,
|
||||
options: ExtendedJobOptions = {}
|
||||
): Promise<Job> {
|
||||
const scheduledOptions: ExtendedJobOptions = {
|
||||
...options,
|
||||
repeat: {
|
||||
pattern: cronPattern,
|
||||
// Use job name as repeat key to prevent duplicates
|
||||
key: `${this.queueName}:${name}`,
|
||||
...options.repeat,
|
||||
},
|
||||
};
|
||||
|
||||
this.logger.info('Adding scheduled job', {
|
||||
queueName: this.queueName,
|
||||
jobName: name,
|
||||
cronPattern,
|
||||
repeatKey: scheduledOptions.repeat?.key,
|
||||
immediately: scheduledOptions.repeat?.immediately,
|
||||
});
|
||||
|
||||
return await this.bullQueue.add(name, data, scheduledOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get queue statistics
|
||||
*/
|
||||
async getStats(): Promise<QueueStats> {
|
||||
const [waiting, active, completed, failed, delayed] = await Promise.all([
|
||||
this.bullQueue.getWaiting(),
|
||||
this.bullQueue.getActive(),
|
||||
this.bullQueue.getCompleted(),
|
||||
this.bullQueue.getFailed(),
|
||||
this.bullQueue.getDelayed(),
|
||||
]);
|
||||
|
||||
const isPaused = await this.bullQueue.isPaused();
|
||||
|
||||
return {
|
||||
waiting: waiting.length,
|
||||
active: active.length,
|
||||
completed: completed.length,
|
||||
failed: failed.length,
|
||||
delayed: delayed.length,
|
||||
paused: isPaused,
|
||||
workers: this.workers.length,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a specific job by ID
|
||||
*/
|
||||
async getJob(jobId: string): Promise<Job | undefined> {
|
||||
return await this.bullQueue.getJob(jobId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get jobs by state
|
||||
*/
|
||||
async getJobs(
|
||||
states: Array<'waiting' | 'active' | 'completed' | 'failed' | 'delayed'>,
|
||||
start = 0,
|
||||
end = 100
|
||||
): Promise<Job[]> {
|
||||
return await this.bullQueue.getJobs(states, start, end);
|
||||
}
|
||||
|
||||
/**
|
||||
* Pause the queue (stops processing new jobs)
|
||||
*/
|
||||
async pause(): Promise<void> {
|
||||
await this.bullQueue.pause();
|
||||
this.logger.info('Queue paused', { queueName: this.queueName });
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume the queue
|
||||
*/
|
||||
async resume(): Promise<void> {
|
||||
await this.bullQueue.resume();
|
||||
this.logger.info('Queue resumed', { queueName: this.queueName });
|
||||
}
|
||||
|
||||
/**
|
||||
* Drain the queue (remove all jobs)
|
||||
*/
|
||||
async drain(delayed = false): Promise<void> {
|
||||
await this.bullQueue.drain(delayed);
|
||||
this.logger.info('Queue drained', { queueName: this.queueName, delayed });
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean completed and failed jobs
|
||||
*/
|
||||
async clean(
|
||||
grace: number = 0,
|
||||
limit: number = 100,
|
||||
type: 'completed' | 'failed' = 'completed'
|
||||
): Promise<void> {
|
||||
await this.bullQueue.clean(grace, limit, type);
|
||||
this.logger.debug('Queue cleaned', { queueName: this.queueName, type, grace, limit });
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until the queue is ready
|
||||
*/
|
||||
async waitUntilReady(): Promise<void> {
|
||||
await this.bullQueue.waitUntilReady();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the queue (cleanup resources)
|
||||
*/
|
||||
/**
|
||||
* Close the queue (cleanup resources)
|
||||
*/
|
||||
async close(): Promise<void> {
|
||||
try {
|
||||
// Close the queue itself
|
||||
await this.bullQueue.close();
|
||||
this.logger.info('Queue closed', { queueName: this.queueName });
|
||||
|
||||
// Close queue events
|
||||
if (this.queueEvents) {
|
||||
await this.queueEvents.close();
|
||||
this.logger.debug('Queue events closed', { queueName: this.queueName });
|
||||
}
|
||||
|
||||
// Close workers first
|
||||
if (this.workers.length > 0) {
|
||||
await Promise.all(
|
||||
this.workers.map(async worker => {
|
||||
return await worker.close();
|
||||
})
|
||||
);
|
||||
this.workers = [];
|
||||
this.logger.debug('Workers closed', { queueName: this.queueName });
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Error closing queue', { queueName: this.queueName, error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a child logger with additional context
|
||||
* Useful for batch processing and other queue operations
|
||||
*/
|
||||
createChildLogger(name: string, context?: Record<string, unknown>) {
|
||||
if (this.logger && typeof this.logger.child === 'function') {
|
||||
return this.logger.child(name, context);
|
||||
}
|
||||
// Fallback to main logger if child not supported (e.g., console)
|
||||
return this.logger;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start workers for this queue
|
||||
*/
|
||||
private startWorkers(workerCount: number, concurrency: number): void {
|
||||
const connection = getRedisConnection(this.redisConfig);
|
||||
|
||||
for (let i = 0; i < workerCount; i++) {
|
||||
const worker = new Worker(`{${this.queueName}}`, this.processJob.bind(this), {
|
||||
connection,
|
||||
concurrency,
|
||||
maxStalledCount: 3,
|
||||
stalledInterval: 30000,
|
||||
});
|
||||
|
||||
// Setup worker event handlers
|
||||
worker.on('completed', job => {
|
||||
this.logger.trace('Job completed', {
|
||||
queueName: this.queueName,
|
||||
jobId: job.id,
|
||||
handler: job.data?.handler,
|
||||
operation: job.data?.operation,
|
||||
});
|
||||
});
|
||||
|
||||
worker.on('failed', (job, err) => {
|
||||
this.logger.error('Job failed', {
|
||||
queueName: this.queueName,
|
||||
jobId: job?.id,
|
||||
handler: job?.data?.handler,
|
||||
operation: job?.data?.operation,
|
||||
error: err.message,
|
||||
});
|
||||
});
|
||||
|
||||
worker.on('error', error => {
|
||||
this.logger.error('Worker error', {
|
||||
queueName: this.queueName,
|
||||
workerId: i,
|
||||
error: error.message,
|
||||
});
|
||||
});
|
||||
|
||||
this.workers.push(worker);
|
||||
}
|
||||
|
||||
this.logger.info('Workers started', {
|
||||
queueName: this.queueName,
|
||||
workerCount,
|
||||
concurrency,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a job using the handler registry
|
||||
*/
|
||||
private async processJob(job: Job): Promise<unknown> {
|
||||
const { handler, operation, payload }: JobData = job.data;
|
||||
|
||||
this.logger.trace('Processing job', {
|
||||
id: job.id,
|
||||
handler,
|
||||
operation,
|
||||
queueName: this.queueName,
|
||||
});
|
||||
|
||||
try {
|
||||
// Look up handler in registry
|
||||
const jobHandler = handlerRegistry.getOperation(handler, operation);
|
||||
|
||||
if (!jobHandler) {
|
||||
throw new Error(`No handler found for ${handler}:${operation}`);
|
||||
}
|
||||
|
||||
const result = await jobHandler(payload);
|
||||
|
||||
this.logger.trace('Job completed successfully', {
|
||||
id: job.id,
|
||||
handler,
|
||||
operation,
|
||||
queueName: this.queueName,
|
||||
});
|
||||
|
||||
return result;
|
||||
} catch (error) {
|
||||
this.logger.error('Job processing failed', {
|
||||
id: job.id,
|
||||
handler,
|
||||
operation,
|
||||
queueName: this.queueName,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start workers manually (for delayed initialization)
|
||||
*/
|
||||
startWorkersManually(workerCount: number, concurrency: number = 1): void {
|
||||
if (this.workers.length > 0) {
|
||||
this.logger.warn('Workers already started for queue', { queueName: this.queueName });
|
||||
return;
|
||||
}
|
||||
|
||||
// Initialize queue events if not already done
|
||||
if (!this.queueEvents) {
|
||||
const connection = getRedisConnection(this.redisConfig);
|
||||
this.queueEvents = new QueueEvents(`{${this.queueName}}`, { connection });
|
||||
}
|
||||
|
||||
this.startWorkers(workerCount, concurrency);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of active workers
|
||||
*/
|
||||
getWorkerCount(): number {
|
||||
return this.workers.length;
|
||||
}
|
||||
|
||||
}
|
||||
338
libs/core/queue/src/rate-limiter.ts
Normal file
338
libs/core/queue/src/rate-limiter.ts
Normal file
|
|
@ -0,0 +1,338 @@
|
|||
import { RateLimiterRedis, RateLimiterRes } from 'rate-limiter-flexible';
|
||||
import type { RateLimitConfig as BaseRateLimitConfig, RateLimitRule } from './types';
|
||||
|
||||
// Logger interface for type safety
|
||||
interface Logger {
|
||||
info(message: string, meta?: Record<string, unknown>): void;
|
||||
error(message: string, meta?: Record<string, unknown>): void;
|
||||
warn(message: string, meta?: Record<string, unknown>): void;
|
||||
debug(message: string, meta?: Record<string, unknown>): void;
|
||||
}
|
||||
|
||||
// Extend the base config to add rate-limiter specific fields
|
||||
export interface RateLimitConfig extends BaseRateLimitConfig {
|
||||
keyPrefix?: string;
|
||||
}
|
||||
|
||||
export class QueueRateLimiter {
|
||||
private limiters = new Map<string, RateLimiterRedis>();
|
||||
private rules: RateLimitRule[] = [];
|
||||
private readonly logger: Logger;
|
||||
|
||||
constructor(
|
||||
private redisClient: ReturnType<typeof import('./utils').getRedisConnection>,
|
||||
logger?: Logger
|
||||
) {
|
||||
this.logger = logger || console;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a rate limit rule
|
||||
*/
|
||||
addRule(rule: RateLimitRule): void {
|
||||
this.rules.push(rule);
|
||||
|
||||
const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation);
|
||||
const limiter = new RateLimiterRedis({
|
||||
storeClient: this.redisClient,
|
||||
keyPrefix: `rl:${key}`,
|
||||
points: rule.config.points,
|
||||
duration: rule.config.duration,
|
||||
blockDuration: rule.config.blockDuration || 0,
|
||||
});
|
||||
|
||||
this.limiters.set(key, limiter);
|
||||
|
||||
this.logger.info('Rate limit rule added', {
|
||||
level: rule.level,
|
||||
queueName: rule.queueName,
|
||||
handler: rule.handler,
|
||||
operation: rule.operation,
|
||||
points: rule.config.points,
|
||||
duration: rule.config.duration,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a job can be processed based on rate limits
|
||||
* Uses hierarchical precedence: operation > handler > queue > global
|
||||
* The most specific matching rule takes precedence
|
||||
*/
|
||||
async checkLimit(
|
||||
queueName: string,
|
||||
handler: string,
|
||||
operation: string
|
||||
): Promise<{
|
||||
allowed: boolean;
|
||||
retryAfter?: number;
|
||||
remainingPoints?: number;
|
||||
appliedRule?: RateLimitRule;
|
||||
}> {
|
||||
const applicableRule = this.getMostSpecificRule(queueName, handler, operation);
|
||||
|
||||
if (!applicableRule) {
|
||||
return { allowed: true };
|
||||
}
|
||||
|
||||
const key = this.getRuleKey(
|
||||
applicableRule.level,
|
||||
applicableRule.queueName,
|
||||
applicableRule.handler,
|
||||
applicableRule.operation
|
||||
);
|
||||
const limiter = this.limiters.get(key);
|
||||
|
||||
if (!limiter) {
|
||||
this.logger.warn('Rate limiter not found for rule', { key, rule: applicableRule });
|
||||
return { allowed: true };
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await this.consumePoint(
|
||||
limiter,
|
||||
this.getConsumerKey(queueName, handler, operation)
|
||||
);
|
||||
|
||||
return {
|
||||
...result,
|
||||
appliedRule: applicableRule,
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.error('Rate limit check failed', { queueName, handler, operation, error });
|
||||
// On error, allow the request to proceed
|
||||
return { allowed: true };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the most specific rule that applies to this job
|
||||
* Precedence: operation > handler > queue > global
|
||||
*/
|
||||
private getMostSpecificRule(
|
||||
queueName: string,
|
||||
handler: string,
|
||||
operation: string
|
||||
): RateLimitRule | undefined {
|
||||
// 1. Check for operation-specific rule (most specific)
|
||||
let rule = this.rules.find(
|
||||
r =>
|
||||
r.level === 'operation' &&
|
||||
r.queueName === queueName &&
|
||||
r.handler === handler &&
|
||||
r.operation === operation
|
||||
);
|
||||
if (rule) {
|
||||
return rule;
|
||||
}
|
||||
|
||||
// 2. Check for handler-specific rule
|
||||
rule = this.rules.find(
|
||||
r => r.level === 'handler' && r.queueName === queueName && r.handler === handler
|
||||
);
|
||||
if (rule) {
|
||||
return rule;
|
||||
}
|
||||
|
||||
// 3. Check for queue-specific rule
|
||||
rule = this.rules.find(r => r.level === 'queue' && r.queueName === queueName);
|
||||
if (rule) {
|
||||
return rule;
|
||||
}
|
||||
|
||||
// 4. Check for global rule (least specific)
|
||||
rule = this.rules.find(r => r.level === 'global');
|
||||
return rule;
|
||||
}
|
||||
|
||||
/**
|
||||
* Consume a point from the rate limiter
|
||||
*/
|
||||
private async consumePoint(
|
||||
limiter: RateLimiterRedis,
|
||||
key: string
|
||||
): Promise<{ allowed: boolean; retryAfter?: number; remainingPoints?: number }> {
|
||||
try {
|
||||
const result = await limiter.consume(key);
|
||||
return {
|
||||
allowed: true,
|
||||
remainingPoints: result.remainingPoints,
|
||||
};
|
||||
} catch (rejRes) {
|
||||
if (rejRes instanceof RateLimiterRes) {
|
||||
this.logger.warn('Rate limit exceeded', {
|
||||
key,
|
||||
retryAfter: rejRes.msBeforeNext,
|
||||
});
|
||||
|
||||
return {
|
||||
allowed: false,
|
||||
retryAfter: rejRes.msBeforeNext,
|
||||
remainingPoints: rejRes.remainingPoints,
|
||||
};
|
||||
}
|
||||
throw rejRes;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get rule key for storing rate limiter
|
||||
*/
|
||||
private getRuleKey(
|
||||
level: string,
|
||||
queueName?: string,
|
||||
handler?: string,
|
||||
operation?: string
|
||||
): string {
|
||||
switch (level) {
|
||||
case 'global':
|
||||
return 'global';
|
||||
case 'queue':
|
||||
return `queue:${queueName}`;
|
||||
case 'handler':
|
||||
return `handler:${queueName}:${handler}`;
|
||||
case 'operation':
|
||||
return `operation:${queueName}:${handler}:${operation}`;
|
||||
default:
|
||||
return level;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get consumer key for rate limiting (what gets counted)
|
||||
*/
|
||||
private getConsumerKey(queueName: string, handler: string, operation: string): string {
|
||||
return `${queueName}:${handler}:${operation}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current rate limit status for a queue/handler/operation
|
||||
*/
|
||||
async getStatus(
|
||||
queueName: string,
|
||||
handler: string,
|
||||
operation: string
|
||||
): Promise<{
|
||||
queueName: string;
|
||||
handler: string;
|
||||
operation: string;
|
||||
appliedRule?: RateLimitRule;
|
||||
limit?: {
|
||||
level: string;
|
||||
points: number;
|
||||
duration: number;
|
||||
remaining: number;
|
||||
resetIn: number;
|
||||
};
|
||||
}> {
|
||||
const applicableRule = this.getMostSpecificRule(queueName, handler, operation);
|
||||
|
||||
if (!applicableRule) {
|
||||
return {
|
||||
queueName,
|
||||
handler,
|
||||
operation,
|
||||
};
|
||||
}
|
||||
|
||||
const key = this.getRuleKey(
|
||||
applicableRule.level,
|
||||
applicableRule.queueName,
|
||||
applicableRule.handler,
|
||||
applicableRule.operation
|
||||
);
|
||||
const limiter = this.limiters.get(key);
|
||||
|
||||
if (!limiter) {
|
||||
return {
|
||||
queueName,
|
||||
handler,
|
||||
operation,
|
||||
appliedRule: applicableRule,
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
const consumerKey = this.getConsumerKey(queueName, handler, operation);
|
||||
const result = await limiter.get(consumerKey);
|
||||
|
||||
const limit = {
|
||||
level: applicableRule.level,
|
||||
points: limiter.points,
|
||||
duration: limiter.duration,
|
||||
remaining: result?.remainingPoints ?? limiter.points,
|
||||
resetIn: result?.msBeforeNext ?? 0,
|
||||
};
|
||||
|
||||
return {
|
||||
queueName,
|
||||
handler,
|
||||
operation,
|
||||
appliedRule: applicableRule,
|
||||
limit,
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to get rate limit status', { queueName, handler, operation, error });
|
||||
return {
|
||||
queueName,
|
||||
handler,
|
||||
operation,
|
||||
appliedRule: applicableRule,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset rate limits for a specific consumer
|
||||
*/
|
||||
async reset(queueName: string, handler?: string, operation?: string): Promise<void> {
|
||||
if (handler && operation) {
|
||||
// Reset specific operation
|
||||
const consumerKey = this.getConsumerKey(queueName, handler, operation);
|
||||
const rule = this.getMostSpecificRule(queueName, handler, operation);
|
||||
|
||||
if (rule) {
|
||||
const key = this.getRuleKey(rule.level, rule.queueName, rule.handler, rule.operation);
|
||||
const limiter = this.limiters.get(key);
|
||||
if (limiter) {
|
||||
await limiter.delete(consumerKey);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Reset broader scope - this is more complex with the new hierarchy
|
||||
this.logger.warn('Broad reset not implemented yet', { queueName, handler, operation });
|
||||
}
|
||||
|
||||
this.logger.info('Rate limits reset', { queueName, handler, operation });
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all configured rate limit rules
|
||||
*/
|
||||
getRules(): RateLimitRule[] {
|
||||
return [...this.rules];
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a rate limit rule
|
||||
*/
|
||||
removeRule(level: string, queueName?: string, handler?: string, operation?: string): boolean {
|
||||
const key = this.getRuleKey(level, queueName, handler, operation);
|
||||
const ruleIndex = this.rules.findIndex(
|
||||
r =>
|
||||
r.level === level &&
|
||||
(!queueName || r.queueName === queueName) &&
|
||||
(!handler || r.handler === handler) &&
|
||||
(!operation || r.operation === operation)
|
||||
);
|
||||
|
||||
if (ruleIndex >= 0) {
|
||||
this.rules.splice(ruleIndex, 1);
|
||||
this.limiters.delete(key);
|
||||
|
||||
this.logger.info('Rate limit rule removed', { level, queueName, handler, operation });
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
169
libs/core/queue/src/service-cache.ts
Normal file
169
libs/core/queue/src/service-cache.ts
Normal file
|
|
@ -0,0 +1,169 @@
|
|||
import { createCache, type CacheProvider, type CacheStats } from '@stock-bot/cache';
|
||||
import type { RedisConfig } from './types';
|
||||
import { getServiceConfig } from './service-registry';
|
||||
|
||||
/**
|
||||
* Service-aware cache that uses the service's Redis DB
|
||||
* Automatically prefixes keys with the service's cache namespace
|
||||
*/
|
||||
export class ServiceCache implements CacheProvider {
|
||||
private cache: CacheProvider;
|
||||
private prefix: string;
|
||||
|
||||
constructor(
|
||||
serviceName: string,
|
||||
redisConfig: RedisConfig,
|
||||
isGlobalCache: boolean = false,
|
||||
logger?: any
|
||||
) {
|
||||
// Get service configuration
|
||||
const serviceConfig = getServiceConfig(serviceName);
|
||||
if (!serviceConfig && !isGlobalCache) {
|
||||
throw new Error(`Unknown service: ${serviceName}`);
|
||||
}
|
||||
|
||||
// Determine Redis DB and prefix
|
||||
let db: number;
|
||||
let prefix: string;
|
||||
|
||||
if (isGlobalCache) {
|
||||
// Global cache uses db:0
|
||||
db = 0;
|
||||
prefix = 'stock-bot:shared';
|
||||
} else {
|
||||
// Service cache uses service's DB
|
||||
db = serviceConfig!.db;
|
||||
prefix = serviceConfig!.cachePrefix;
|
||||
}
|
||||
|
||||
// Create underlying cache with correct DB
|
||||
const cacheConfig = {
|
||||
redisConfig: {
|
||||
...redisConfig,
|
||||
db,
|
||||
},
|
||||
keyPrefix: prefix + ':',
|
||||
logger,
|
||||
};
|
||||
|
||||
this.cache = createCache(cacheConfig);
|
||||
this.prefix = prefix;
|
||||
}
|
||||
|
||||
// Implement CacheProvider interface
|
||||
async get<T = any>(key: string): Promise<T | null> {
|
||||
return this.cache.get<T>(key);
|
||||
}
|
||||
|
||||
async set<T = any>(
|
||||
key: string,
|
||||
value: T,
|
||||
options?:
|
||||
| number
|
||||
| {
|
||||
ttl?: number;
|
||||
preserveTTL?: boolean;
|
||||
onlyIfExists?: boolean;
|
||||
onlyIfNotExists?: boolean;
|
||||
getOldValue?: boolean;
|
||||
}
|
||||
): Promise<T | null> {
|
||||
return this.cache.set(key, value, options);
|
||||
}
|
||||
|
||||
async del(key: string): Promise<void> {
|
||||
return this.cache.del(key);
|
||||
}
|
||||
|
||||
async exists(key: string): Promise<boolean> {
|
||||
return this.cache.exists(key);
|
||||
}
|
||||
|
||||
async clear(): Promise<void> {
|
||||
return this.cache.clear();
|
||||
}
|
||||
|
||||
async keys(pattern: string): Promise<string[]> {
|
||||
return this.cache.keys(pattern);
|
||||
}
|
||||
|
||||
getStats(): CacheStats {
|
||||
return this.cache.getStats();
|
||||
}
|
||||
|
||||
async health(): Promise<boolean> {
|
||||
return this.cache.health();
|
||||
}
|
||||
|
||||
async waitForReady(timeout?: number): Promise<void> {
|
||||
return this.cache.waitForReady(timeout);
|
||||
}
|
||||
|
||||
isReady(): boolean {
|
||||
return this.cache.isReady();
|
||||
}
|
||||
|
||||
// Enhanced cache methods (delegate to underlying cache if available)
|
||||
async update<T = any>(key: string, value: T): Promise<T | null> {
|
||||
if (this.cache.update) {
|
||||
return this.cache.update(key, value);
|
||||
}
|
||||
// Fallback implementation
|
||||
return this.cache.set(key, value, { preserveTTL: true });
|
||||
}
|
||||
|
||||
async setIfExists<T = any>(key: string, value: T, ttl?: number): Promise<boolean> {
|
||||
if (this.cache.setIfExists) {
|
||||
return this.cache.setIfExists(key, value, ttl);
|
||||
}
|
||||
// Fallback implementation
|
||||
const result = await this.cache.set(key, value, { onlyIfExists: true, ttl });
|
||||
return result !== null;
|
||||
}
|
||||
|
||||
async setIfNotExists<T = any>(key: string, value: T, ttl?: number): Promise<boolean> {
|
||||
if (this.cache.setIfNotExists) {
|
||||
return this.cache.setIfNotExists(key, value, ttl);
|
||||
}
|
||||
// Fallback implementation
|
||||
const result = await this.cache.set(key, value, { onlyIfNotExists: true, ttl });
|
||||
return result !== null;
|
||||
}
|
||||
|
||||
async replace<T = any>(key: string, value: T, ttl?: number): Promise<T | null> {
|
||||
if (this.cache.replace) {
|
||||
return this.cache.replace(key, value, ttl);
|
||||
}
|
||||
// Fallback implementation
|
||||
return this.cache.set(key, value, ttl);
|
||||
}
|
||||
|
||||
async updateField<T = any>(key: string, updater: (current: T | null) => T, ttl?: number): Promise<T | null> {
|
||||
if (this.cache.updateField) {
|
||||
return this.cache.updateField(key, updater, ttl);
|
||||
}
|
||||
// Fallback implementation
|
||||
const current = await this.cache.get<T>(key);
|
||||
const updated = updater(current);
|
||||
return this.cache.set(key, updated, ttl);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the actual Redis key with prefix
|
||||
*/
|
||||
getKey(key: string): string {
|
||||
return `${this.prefix}:${key}`;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Factory function to create service cache
|
||||
*/
|
||||
export function createServiceCache(
|
||||
serviceName: string,
|
||||
redisConfig: RedisConfig,
|
||||
options: { global?: boolean; logger?: any } = {}
|
||||
): ServiceCache {
|
||||
return new ServiceCache(serviceName, redisConfig, options.global, options.logger);
|
||||
}
|
||||
115
libs/core/queue/src/service-registry.ts
Normal file
115
libs/core/queue/src/service-registry.ts
Normal file
|
|
@ -0,0 +1,115 @@
|
|||
/**
|
||||
* Service Registry Configuration
|
||||
* Maps services to their Redis databases and configurations
|
||||
*/
|
||||
|
||||
export interface ServiceConfig {
|
||||
/** Redis database number for this service (used for both queues and cache) */
|
||||
db: number;
|
||||
/** Prefix for queue keys (e.g., 'bull:di') */
|
||||
queuePrefix: string;
|
||||
/** Prefix for cache keys (e.g., 'cache:di') */
|
||||
cachePrefix: string;
|
||||
/** Whether this service only produces jobs (doesn't process them) */
|
||||
producerOnly?: boolean;
|
||||
/** List of handlers this service owns (auto-discovered if not provided) */
|
||||
handlers?: string[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Central registry of all services and their configurations
|
||||
* Each service gets one Redis DB for both queues and cache
|
||||
*
|
||||
* Database assignments:
|
||||
* - db:0 = Global shared cache
|
||||
* - db:1 = data-ingestion (queues + cache)
|
||||
* - db:2 = data-pipeline (queues + cache)
|
||||
* - db:3 = web-api (cache only, producer-only for queues)
|
||||
*/
|
||||
export const SERVICE_REGISTRY: Record<string, ServiceConfig> = {
|
||||
'data-ingestion': {
|
||||
db: 1,
|
||||
queuePrefix: 'bull:di',
|
||||
cachePrefix: 'cache:di',
|
||||
handlers: ['ceo', 'qm', 'webshare', 'ib', 'proxy'],
|
||||
},
|
||||
'data-pipeline': {
|
||||
db: 2,
|
||||
queuePrefix: 'bull:dp',
|
||||
cachePrefix: 'cache:dp',
|
||||
handlers: ['exchanges', 'symbols'],
|
||||
},
|
||||
'web-api': {
|
||||
db: 3,
|
||||
queuePrefix: 'bull:api', // Not used since producer-only
|
||||
cachePrefix: 'cache:api',
|
||||
producerOnly: true,
|
||||
},
|
||||
// Add aliases for services with different naming conventions
|
||||
'webApi': {
|
||||
db: 3,
|
||||
queuePrefix: 'bull:api',
|
||||
cachePrefix: 'cache:api',
|
||||
producerOnly: true,
|
||||
},
|
||||
'dataIngestion': {
|
||||
db: 1,
|
||||
queuePrefix: 'bull:di',
|
||||
cachePrefix: 'cache:di',
|
||||
handlers: ['ceo', 'qm', 'webshare', 'ib', 'proxy'],
|
||||
},
|
||||
'dataPipeline': {
|
||||
db: 2,
|
||||
queuePrefix: 'bull:dp',
|
||||
cachePrefix: 'cache:dp',
|
||||
handlers: ['exchanges', 'symbols'],
|
||||
},
|
||||
};
|
||||
|
||||
/**
|
||||
* Get service configuration
|
||||
*/
|
||||
export function getServiceConfig(serviceName: string): ServiceConfig | undefined {
|
||||
return SERVICE_REGISTRY[serviceName];
|
||||
}
|
||||
|
||||
/**
|
||||
* Find which service owns a handler
|
||||
*/
|
||||
export function findServiceForHandler(handlerName: string): string | undefined {
|
||||
for (const [serviceName, config] of Object.entries(SERVICE_REGISTRY)) {
|
||||
if (config.handlers?.includes(handlerName)) {
|
||||
return serviceName;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get full queue name - just the handler name since each service has its own Redis DB
|
||||
*/
|
||||
export function getFullQueueName(serviceName: string, handlerName: string): string {
|
||||
// Just return the handler name since DB isolation provides namespace separation
|
||||
return handlerName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a full queue name into service and handler
|
||||
* Since queue names are just handler names now, we need to find the service from the handler
|
||||
*/
|
||||
export function parseQueueName(fullQueueName: string): { service: string; handler: string } | null {
|
||||
// Queue name is just the handler name now
|
||||
const handlerName = fullQueueName;
|
||||
|
||||
// Find which service owns this handler
|
||||
const serviceName = findServiceForHandler(handlerName);
|
||||
|
||||
if (!serviceName) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
service: serviceName,
|
||||
handler: handlerName,
|
||||
};
|
||||
}
|
||||
349
libs/core/queue/src/smart-queue-manager.ts
Normal file
349
libs/core/queue/src/smart-queue-manager.ts
Normal file
|
|
@ -0,0 +1,349 @@
|
|||
import { Queue as BullQueue, type Job } from 'bullmq';
|
||||
import { handlerRegistry } from '@stock-bot/handlers';
|
||||
import { getLogger, type Logger } from '@stock-bot/logger';
|
||||
import { QueueManager } from './queue-manager';
|
||||
import { Queue } from './queue';
|
||||
import type {
|
||||
SmartQueueConfig,
|
||||
QueueRoute,
|
||||
JobData,
|
||||
JobOptions,
|
||||
RedisConfig
|
||||
} from './types';
|
||||
import {
|
||||
SERVICE_REGISTRY,
|
||||
getServiceConfig,
|
||||
findServiceForHandler,
|
||||
getFullQueueName,
|
||||
parseQueueName,
|
||||
type ServiceConfig
|
||||
} from './service-registry';
|
||||
import { getRedisConnection } from './utils';
|
||||
|
||||
/**
|
||||
* Smart Queue Manager with automatic service discovery and routing
|
||||
* Handles cross-service communication seamlessly
|
||||
*/
|
||||
export class SmartQueueManager extends QueueManager {
|
||||
private serviceName: string;
|
||||
private serviceConfig: ServiceConfig;
|
||||
private queueRoutes = new Map<string, QueueRoute>();
|
||||
private connections = new Map<number, any>(); // Redis connections by DB
|
||||
private producerQueues = new Map<string, BullQueue>(); // For cross-service sending
|
||||
private _logger: Logger;
|
||||
|
||||
constructor(config: SmartQueueConfig, logger?: Logger) {
|
||||
// Get service config
|
||||
const serviceConfig = getServiceConfig(config.serviceName);
|
||||
if (!serviceConfig) {
|
||||
throw new Error(`Unknown service: ${config.serviceName}`);
|
||||
}
|
||||
|
||||
// Update Redis config to use service's DB
|
||||
const modifiedConfig = {
|
||||
...config,
|
||||
redis: {
|
||||
...config.redis,
|
||||
db: serviceConfig.db,
|
||||
},
|
||||
};
|
||||
|
||||
super(modifiedConfig, logger);
|
||||
|
||||
this.serviceName = config.serviceName;
|
||||
this.serviceConfig = serviceConfig;
|
||||
this._logger = logger || getLogger('SmartQueueManager');
|
||||
|
||||
// Auto-discover routes if enabled
|
||||
if (config.autoDiscoverHandlers !== false) {
|
||||
this.discoverQueueRoutes();
|
||||
}
|
||||
|
||||
this._logger.info('SmartQueueManager initialized', {
|
||||
service: this.serviceName,
|
||||
db: serviceConfig.db,
|
||||
handlers: serviceConfig.handlers,
|
||||
producerOnly: serviceConfig.producerOnly,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Discover all available queue routes from handler registry
|
||||
*/
|
||||
private discoverQueueRoutes(): void {
|
||||
// Discover from handler registry if available
|
||||
try {
|
||||
const handlers = handlerRegistry.getAllHandlers();
|
||||
for (const [handlerName, handlerConfig] of handlers) {
|
||||
// Find which service owns this handler
|
||||
const ownerService = findServiceForHandler(handlerName);
|
||||
if (ownerService) {
|
||||
const ownerConfig = getServiceConfig(ownerService)!;
|
||||
const fullName = getFullQueueName(ownerService, handlerName);
|
||||
|
||||
this.queueRoutes.set(handlerName, {
|
||||
fullName,
|
||||
service: ownerService,
|
||||
handler: handlerName,
|
||||
db: ownerConfig.db,
|
||||
operations: Object.keys(handlerConfig.operations || {}),
|
||||
});
|
||||
|
||||
this._logger.trace('Discovered queue route', {
|
||||
handler: handlerName,
|
||||
service: ownerService,
|
||||
db: ownerConfig.db,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this._logger.warn('Handler registry not available, using static configuration', { error });
|
||||
}
|
||||
|
||||
// Also add routes from static configuration
|
||||
Object.entries(SERVICE_REGISTRY).forEach(([serviceName, config]) => {
|
||||
if (config.handlers) {
|
||||
config.handlers.forEach(handlerName => {
|
||||
if (!this.queueRoutes.has(handlerName)) {
|
||||
const fullName = getFullQueueName(serviceName, handlerName);
|
||||
this.queueRoutes.set(handlerName, {
|
||||
fullName,
|
||||
service: serviceName,
|
||||
handler: handlerName,
|
||||
db: config.db,
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create a Redis connection for a specific DB
|
||||
*/
|
||||
private getConnection(db: number): any {
|
||||
if (!this.connections.has(db)) {
|
||||
const redisConfig: RedisConfig = {
|
||||
...this.getRedisConfig(),
|
||||
db,
|
||||
};
|
||||
const connection = getRedisConnection(redisConfig);
|
||||
this.connections.set(db, connection);
|
||||
this._logger.debug('Created Redis connection', { db });
|
||||
}
|
||||
return this.connections.get(db);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a queue for the current service (for processing)
|
||||
* Overrides parent to use namespaced queue names
|
||||
*/
|
||||
override getQueue(queueName: string, options = {}): Queue {
|
||||
// For local queues, use the service namespace
|
||||
const fullQueueName = getFullQueueName(this.serviceName, queueName);
|
||||
return super.getQueue(fullQueueName, options);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Send a job to any queue (local or remote)
|
||||
* This is the main method for cross-service communication
|
||||
*/
|
||||
async send(
|
||||
targetQueue: string,
|
||||
operation: string,
|
||||
payload: unknown,
|
||||
options: JobOptions = {}
|
||||
): Promise<Job> {
|
||||
// Resolve the target queue
|
||||
const route = this.resolveQueueRoute(targetQueue);
|
||||
if (!route) {
|
||||
throw new Error(`Unknown queue: ${targetQueue}`);
|
||||
}
|
||||
|
||||
// Validate operation if we have metadata
|
||||
if (route.operations && !route.operations.includes(operation)) {
|
||||
this._logger.warn('Operation not found in handler metadata', {
|
||||
queue: targetQueue,
|
||||
operation,
|
||||
available: route.operations,
|
||||
});
|
||||
}
|
||||
|
||||
// Get or create producer queue for the target
|
||||
const producerQueue = this.getProducerQueue(route);
|
||||
|
||||
// Create job data
|
||||
const jobData: JobData = {
|
||||
handler: route.handler,
|
||||
operation,
|
||||
payload,
|
||||
};
|
||||
|
||||
// Send the job
|
||||
const job = await producerQueue.add(operation, jobData, options);
|
||||
|
||||
this._logger.debug('Job sent to queue', {
|
||||
from: this.serviceName,
|
||||
to: route.service,
|
||||
queue: route.handler,
|
||||
operation,
|
||||
jobId: job.id,
|
||||
});
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
/**
|
||||
* Alias for send() with more explicit name
|
||||
*/
|
||||
async sendTo(
|
||||
targetService: string,
|
||||
handler: string,
|
||||
operation: string,
|
||||
payload: unknown,
|
||||
options: JobOptions = {}
|
||||
): Promise<Job> {
|
||||
const fullQueueName = `${targetService}:${handler}`;
|
||||
return this.send(fullQueueName, operation, payload, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve a queue name to a route
|
||||
*/
|
||||
private resolveQueueRoute(queueName: string): QueueRoute | null {
|
||||
// Check if it's a handler name (which is now the full queue name)
|
||||
const parsed = parseQueueName(queueName);
|
||||
if (parsed) {
|
||||
const config = getServiceConfig(parsed.service);
|
||||
if (config) {
|
||||
return {
|
||||
fullName: queueName,
|
||||
service: parsed.service,
|
||||
handler: parsed.handler,
|
||||
db: config.db,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Check if it's just a handler name
|
||||
const route = this.queueRoutes.get(queueName);
|
||||
if (route) {
|
||||
return route;
|
||||
}
|
||||
|
||||
// Try to find in static config
|
||||
const ownerService = findServiceForHandler(queueName);
|
||||
if (ownerService) {
|
||||
const config = getServiceConfig(ownerService)!;
|
||||
return {
|
||||
fullName: getFullQueueName(ownerService, queueName),
|
||||
service: ownerService,
|
||||
handler: queueName,
|
||||
db: config.db,
|
||||
};
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create a producer queue for cross-service communication
|
||||
*/
|
||||
private getProducerQueue(route: QueueRoute): BullQueue {
|
||||
if (!this.producerQueues.has(route.fullName)) {
|
||||
const connection = this.getConnection(route.db);
|
||||
// Match the queue name format used by workers: {queueName}
|
||||
const queue = new BullQueue(`{${route.fullName}}`, {
|
||||
connection,
|
||||
defaultJobOptions: this.getConfig().defaultQueueOptions?.defaultJobOptions || {},
|
||||
});
|
||||
this.producerQueues.set(route.fullName, queue);
|
||||
}
|
||||
return this.producerQueues.get(route.fullName)!;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all queues (for monitoring purposes)
|
||||
*/
|
||||
getAllQueues(): Record<string, BullQueue> {
|
||||
const allQueues: Record<string, BullQueue> = {};
|
||||
|
||||
// Get all worker queues using public API
|
||||
const workerQueueNames = this.getQueueNames();
|
||||
for (const name of workerQueueNames) {
|
||||
const queue = this.getQueue(name);
|
||||
if (queue && typeof queue.getBullQueue === 'function') {
|
||||
// Extract the underlying BullMQ queue using the public getter
|
||||
// Use the simple handler name without service prefix for display
|
||||
const parts = name.split(':');
|
||||
const simpleName = parts.length > 1 ? parts[1] : name;
|
||||
if (simpleName) {
|
||||
allQueues[simpleName] = queue.getBullQueue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add producer queues
|
||||
for (const [name, queue] of this.producerQueues) {
|
||||
// Use the simple handler name without service prefix for display
|
||||
const parts = name.split(':');
|
||||
const simpleName = parts.length > 1 ? parts[1] : name;
|
||||
if (simpleName && !allQueues[simpleName]) {
|
||||
allQueues[simpleName] = queue;
|
||||
}
|
||||
}
|
||||
|
||||
// If no queues found, return all registered handlers as BullMQ queues
|
||||
if (Object.keys(allQueues).length === 0) {
|
||||
// Create BullMQ queue instances for known handlers
|
||||
const handlers = ['proxy', 'qm', 'ib', 'ceo', 'webshare', 'exchanges', 'symbols'];
|
||||
for (const handler of handlers) {
|
||||
const connection = this.getConnection(1); // Use default DB
|
||||
allQueues[handler] = new BullQueue(`{${handler}}`, {
|
||||
connection,
|
||||
defaultJobOptions: this.getConfig().defaultQueueOptions?.defaultJobOptions || {},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return allQueues;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get statistics for all queues across all services
|
||||
*/
|
||||
async getAllStats(): Promise<Record<string, any>> {
|
||||
const stats: Record<string, any> = {};
|
||||
|
||||
// Get stats for local queues
|
||||
stats[this.serviceName] = await this.getGlobalStats();
|
||||
|
||||
// Get stats for other services if we have access
|
||||
// This would require additional implementation
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
/**
|
||||
* Graceful shutdown
|
||||
*/
|
||||
override async shutdown(): Promise<void> {
|
||||
// Close producer queues
|
||||
for (const [name, queue] of this.producerQueues) {
|
||||
await queue.close();
|
||||
this._logger.debug('Closed producer queue', { queue: name });
|
||||
}
|
||||
|
||||
// Close additional connections
|
||||
for (const [db, connection] of this.connections) {
|
||||
if (db !== this.serviceConfig.db) { // Don't close our main connection
|
||||
connection.disconnect();
|
||||
this._logger.debug('Closed Redis connection', { db });
|
||||
}
|
||||
}
|
||||
|
||||
// Call parent shutdown
|
||||
await super.shutdown();
|
||||
}
|
||||
}
|
||||
169
libs/core/queue/src/types.ts
Normal file
169
libs/core/queue/src/types.ts
Normal file
|
|
@ -0,0 +1,169 @@
|
|||
// Import types we need to extend
|
||||
import type { JobOptions, QueueStats } from '@stock-bot/types';
|
||||
|
||||
// Re-export handler and queue types from shared types package
|
||||
export type {
|
||||
HandlerConfig,
|
||||
HandlerConfigWithSchedule,
|
||||
JobHandler,
|
||||
ScheduledJob,
|
||||
TypedJobHandler,
|
||||
JobData,
|
||||
JobOptions,
|
||||
QueueWorkerConfig,
|
||||
QueueStats
|
||||
} from '@stock-bot/types';
|
||||
|
||||
export interface ProcessOptions {
|
||||
totalDelayHours: number;
|
||||
batchSize?: number;
|
||||
priority?: number;
|
||||
useBatching?: boolean;
|
||||
retries?: number;
|
||||
ttl?: number;
|
||||
removeOnComplete?: number;
|
||||
removeOnFail?: number;
|
||||
// Job routing information
|
||||
handler?: string;
|
||||
operation?: string;
|
||||
}
|
||||
|
||||
export interface BatchResult {
|
||||
jobsCreated: number;
|
||||
mode: 'direct' | 'batch';
|
||||
totalItems: number;
|
||||
batchesCreated?: number;
|
||||
duration: number;
|
||||
}
|
||||
|
||||
// New improved types for the refactored architecture
|
||||
export interface RedisConfig {
|
||||
host: string;
|
||||
port: number;
|
||||
password?: string;
|
||||
db?: number;
|
||||
}
|
||||
|
||||
// Extended job options specific to this queue implementation
|
||||
export interface ExtendedJobOptions extends JobOptions {
|
||||
repeat?: {
|
||||
pattern?: string;
|
||||
key?: string;
|
||||
limit?: number;
|
||||
every?: number;
|
||||
immediately?: boolean;
|
||||
};
|
||||
}
|
||||
|
||||
export interface QueueOptions {
|
||||
defaultJobOptions?: ExtendedJobOptions;
|
||||
workers?: number;
|
||||
concurrency?: number;
|
||||
enableMetrics?: boolean;
|
||||
enableDLQ?: boolean;
|
||||
enableRateLimit?: boolean;
|
||||
rateLimitRules?: RateLimitRule[]; // Queue-specific rate limit rules
|
||||
}
|
||||
|
||||
export interface QueueManagerConfig {
|
||||
redis: RedisConfig;
|
||||
defaultQueueOptions?: QueueOptions;
|
||||
enableScheduledJobs?: boolean;
|
||||
globalRateLimit?: RateLimitConfig;
|
||||
rateLimitRules?: RateLimitRule[]; // Global rate limit rules
|
||||
delayWorkerStart?: boolean; // If true, workers won't start automatically
|
||||
}
|
||||
|
||||
// Queue-specific stats that extend the base types
|
||||
export interface GlobalStats {
|
||||
queues: Record<string, QueueStats>;
|
||||
totalJobs: number;
|
||||
totalWorkers: number;
|
||||
uptime: number;
|
||||
}
|
||||
|
||||
// Legacy type for backward compatibility
|
||||
export interface QueueConfig extends QueueManagerConfig {
|
||||
queueName?: string;
|
||||
workers?: number;
|
||||
concurrency?: number;
|
||||
handlers?: HandlerInitializer[];
|
||||
dlqConfig?: DLQConfig;
|
||||
enableMetrics?: boolean;
|
||||
}
|
||||
|
||||
|
||||
// Extended batch job data for queue implementation
|
||||
export interface BatchJobData {
|
||||
payloadKey: string;
|
||||
batchIndex: number;
|
||||
totalBatches: number;
|
||||
itemCount: number;
|
||||
totalDelayHours: number; // Total time to distribute all batches
|
||||
}
|
||||
|
||||
export interface HandlerInitializer {
|
||||
(): void | Promise<void>;
|
||||
}
|
||||
|
||||
// Rate limiting types
|
||||
export interface RateLimitConfig {
|
||||
points: number;
|
||||
duration: number;
|
||||
blockDuration?: number;
|
||||
}
|
||||
|
||||
export interface RateLimitRule {
|
||||
level: 'global' | 'queue' | 'handler' | 'operation';
|
||||
queueName?: string; // For queue-level limits
|
||||
handler?: string; // For handler-level limits
|
||||
operation?: string; // For operation-level limits (most specific)
|
||||
config: RateLimitConfig;
|
||||
}
|
||||
|
||||
// DLQ types
|
||||
export interface DLQConfig {
|
||||
maxRetries?: number;
|
||||
retryDelay?: number;
|
||||
alertThreshold?: number;
|
||||
cleanupAge?: number;
|
||||
}
|
||||
|
||||
export interface DLQJobInfo {
|
||||
id: string;
|
||||
name: string;
|
||||
failedReason: string;
|
||||
attemptsMade: number;
|
||||
timestamp: number;
|
||||
data: unknown;
|
||||
}
|
||||
|
||||
export interface ScheduleConfig {
|
||||
pattern: string;
|
||||
jobName: string;
|
||||
data?: unknown;
|
||||
options?: ExtendedJobOptions;
|
||||
}
|
||||
|
||||
// Smart Queue Types
|
||||
export interface SmartQueueConfig extends QueueManagerConfig {
|
||||
/** Name of the current service */
|
||||
serviceName: string;
|
||||
/** Whether to auto-discover handlers from registry */
|
||||
autoDiscoverHandlers?: boolean;
|
||||
/** Custom service registry (defaults to built-in) */
|
||||
serviceRegistry?: Record<string, any>;
|
||||
}
|
||||
|
||||
export interface QueueRoute {
|
||||
/** Full queue name (now just the handler name, e.g., 'ceo') */
|
||||
fullName: string;
|
||||
/** Service that owns this queue */
|
||||
service: string;
|
||||
/** Handler name */
|
||||
handler: string;
|
||||
/** Redis DB number */
|
||||
db: number;
|
||||
/** Available operations */
|
||||
operations?: string[];
|
||||
}
|
||||
28
libs/core/queue/src/utils.ts
Normal file
28
libs/core/queue/src/utils.ts
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
import type { RedisConfig } from './types';
|
||||
|
||||
/**
|
||||
* Get Redis connection configuration with retry settings
|
||||
*/
|
||||
export function getRedisConnection(config: RedisConfig) {
|
||||
const isTest = process.env.NODE_ENV === 'test' || process.env['BUNIT'] === '1';
|
||||
|
||||
return {
|
||||
host: config.host,
|
||||
port: config.port,
|
||||
password: config.password,
|
||||
db: config.db,
|
||||
maxRetriesPerRequest: null, // Required by BullMQ
|
||||
enableReadyCheck: false,
|
||||
connectTimeout: isTest ? 1000 : 3000,
|
||||
lazyConnect: false, // Changed from true to ensure connection is established immediately
|
||||
keepAlive: true, // Changed from false to maintain persistent connections
|
||||
retryStrategy: (times: number) => {
|
||||
const maxRetries = isTest ? 1 : 3;
|
||||
if (times > maxRetries) {
|
||||
return null; // Stop retrying
|
||||
}
|
||||
const delay = isTest ? 100 : Math.min(times * 100, 3000);
|
||||
return delay;
|
||||
},
|
||||
};
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue