added new queue lib with batch processor and provider

This commit is contained in:
Boki 2025-06-14 15:02:10 -04:00
parent ddcf94a587
commit 6c548416d1
19 changed files with 1939 additions and 35 deletions

View file

@@ -0,0 +1,345 @@
import { CacheProvider, createCache } from '@stock-bot/cache';
import { getLogger } from '@stock-bot/logger';
import type { QueueManager } from './queue-manager';
import type { BatchJobData, BatchResult, JobData, ProcessOptions } from './types';
const logger = getLogger('batch-processor');
// One lazily-created cache provider per queue name, shared across all calls.
const cacheProviders = new Map<string, CacheProvider>();
/**
 * Get (or create on first use) the cache provider for a queue.
 * Payload keys are namespaced under `batch:<queueName>:`.
 */
function getCache(queueName: string): CacheProvider {
  // Single get() instead of has()+get() — one map lookup, and no unchecked cast.
  const existing = cacheProviders.get(queueName);
  if (existing) {
    return existing;
  }
  const cacheProvider = createCache({
    keyPrefix: `batch:${queueName}:`,
    ttl: 86400, // 24 hours default
    enableMetrics: true,
  });
  cacheProviders.set(queueName, cacheProvider);
  return cacheProvider;
}
/**
 * Initialize the batch cache before any batch operations.
 * Call this once during application startup; it waits (up to 10s) for the
 * queue's cache provider to become ready.
 */
export async function initializeBatchCache(queueManager: QueueManager): Promise<void> {
  const queueName = queueManager.getQueueName();
  logger.info('Initializing batch cache...', { queueName });
  await getCache(queueName).waitForReady(10000);
  logger.info('Batch cache initialized successfully', { queueName });
}
/**
 * Main entry point — processes items either directly (one job per item) or in
 * batches, depending on options.useBatching. Each item becomes the job's
 * payload as-is (no wrapper object).
 *
 * @param items   Items to enqueue; an empty array short-circuits to an empty result.
 * @param queue   Initialized QueueManager to enqueue into.
 * @param options Pacing / batching / retry options.
 * @returns BatchResult including total wall-clock duration in ms.
 */
export async function processItems<T>(
  items: T[],
  queue: QueueManager,
  options: ProcessOptions
): Promise<BatchResult> {
  const startTime = Date.now();
  // Empty input: nothing to enqueue — report an empty direct run.
  if (items.length === 0) {
    return {
      jobsCreated: 0,
      mode: 'direct',
      totalItems: 0,
      duration: 0,
    };
  }
  logger.info('Starting batch processing', {
    totalItems: items.length,
    mode: options.useBatching ? 'batch' : 'direct',
    batchSize: options.batchSize,
    totalDelayMs: options.totalDelayMs,
  });
  try {
    const result = options.useBatching
      ? await processBatched(items, queue, options)
      : await processDirect(items, queue, options);
    const duration = Date.now() - startTime;
    logger.info('Batch processing completed', {
      ...result,
      duration: `${(duration / 1000).toFixed(1)}s`,
    });
    return { ...result, duration };
  } catch (error) {
    // Structured context object — consistent with the other log calls in this module.
    logger.error('Batch processing failed', { error });
    throw error;
  }
}
/**
 * Process items directly — each item becomes a separate job, with delays
 * spread evenly so all jobs fit inside options.totalDelayMs.
 */
async function processDirect<T>(
  items: T[],
  queue: QueueManager,
  options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> {
  // items is non-empty here (processItems returns early on []).
  const delayPerItem = options.totalDelayMs / items.length;
  logger.info('Creating direct jobs', {
    totalItems: items.length,
    delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`,
  });
  const jobs = items.map((item, index) => ({
    name: 'process-item',
    data: {
      type: 'process-item',
      provider: options.provider ?? 'generic',
      operation: options.operation ?? 'process-item',
      payload: item, // the item itself is the payload — no wrapper
      priority: options.priority,
    },
    opts: {
      delay: index * delayPerItem,
      priority: options.priority,
      // ?? keeps explicit zeros; the previous || silently replaced 0 with the default
      attempts: options.retries ?? 3,
      removeOnComplete: options.removeOnComplete ?? 10,
      removeOnFail: options.removeOnFail ?? 5,
    },
  }));
  const createdJobs = await addJobsInChunks(queue, jobs);
  return {
    totalItems: items.length,
    jobsCreated: createdJobs.length,
    mode: 'direct',
  };
}
/**
 * Process items in batches — each batch's items are stored in the cache and a
 * single 'process-batch' job (carrying only a small pointer payload) is
 * enqueued per batch, spread evenly over options.totalDelayMs.
 */
async function processBatched<T>(
  items: T[],
  queue: QueueManager,
  options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> {
  // || is deliberate here: a batchSize of 0 is invalid and must fall back.
  const batchSize = options.batchSize || 100;
  const batches = createBatches(items, batchSize);
  const delayPerBatch = options.totalDelayMs / batches.length;
  logger.info('Creating batch jobs', {
    totalItems: items.length,
    batchSize,
    totalBatches: batches.length,
    delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
  });
  const batchJobs = await Promise.all(
    batches.map(async (batch, batchIndex) => {
      // Persist the batch's items; the job only carries a pointer (payloadKey).
      const payloadKey = await storeItems(batch, queue, options);
      return {
        name: 'process-batch',
        data: {
          type: 'process-batch',
          provider: options.provider ?? 'generic',
          operation: 'process-batch-items',
          payload: {
            payloadKey,
            batchIndex,
            totalBatches: batches.length,
            itemCount: batch.length,
          } as BatchJobData,
          priority: options.priority,
        },
        opts: {
          delay: batchIndex * delayPerBatch,
          priority: options.priority,
          // ?? keeps explicit zeros; || silently replaced 0 with the default
          attempts: options.retries ?? 3,
          removeOnComplete: options.removeOnComplete ?? 10,
          removeOnFail: options.removeOnFail ?? 5,
        },
      };
    })
  );
  const createdJobs = await addJobsInChunks(queue, batchJobs);
  return {
    totalItems: items.length,
    jobsCreated: createdJobs.length,
    batchesCreated: batches.length,
    mode: 'batch',
  };
}
/**
 * Process a batch job — loads the stored items from the cache and fans them
 * out into individual 'process-item' jobs, then deletes the stored payload.
 *
 * @throws Error when the cached payload is missing or malformed.
 */
export async function processBatchJob(
  jobData: BatchJobData,
  queue: QueueManager
): Promise<unknown> {
  const { payloadKey, batchIndex, totalBatches, itemCount } = jobData;
  logger.debug('Processing batch job', {
    batchIndex,
    totalBatches,
    itemCount,
  });
  try {
    const payload = await loadPayload(payloadKey, queue);
    if (!payload || !payload.items || !payload.options) {
      logger.error('Invalid payload data', { payloadKey, payload });
      throw new Error(`Invalid payload data for key: ${payloadKey}`);
    }
    const { items, options } = payload;
    // Each stored item becomes its own job with payload: item (no wrapper).
    const jobs = items.map((item: unknown, index: number) => ({
      name: 'process-item',
      data: {
        type: 'process-item',
        provider: options.provider ?? 'generic',
        operation: options.operation ?? 'generic',
        payload: item,
        priority: options.priority,
      },
      opts: {
        // ?? so an explicit delayPerItem of 0 disables staggering; the
        // previous || treated 0 as "unset" and forced 1000ms.
        delay: index * (options.delayPerItem ?? 1000),
        priority: options.priority,
        attempts: options.retries ?? 3,
      },
    }));
    const createdJobs = await addJobsInChunks(queue, jobs);
    // Free cache space once all item jobs were enqueued.
    await cleanupPayload(payloadKey, queue);
    return {
      batchIndex,
      itemsProcessed: items.length,
      jobsCreated: createdJobs.length,
    };
  } catch (error) {
    logger.error('Batch job processing failed', { batchIndex, error });
    throw error;
  }
}
// Helper functions

/**
 * Split an array into consecutive chunks of at most batchSize items.
 * The final batch may be smaller; an empty input yields [].
 *
 * @throws RangeError when batchSize is not a positive integer — the previous
 *         implementation looped forever for batchSize <= 0.
 */
function createBatches<T>(items: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError(`batchSize must be a positive integer, got: ${batchSize}`);
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}
/**
 * Store a batch's items (plus the per-item job options) in the queue's cache
 * and return the generated payload key the batch job will carry.
 */
async function storeItems<T>(
  items: T[],
  queue: QueueManager,
  options: ProcessOptions
): Promise<string> {
  if (!queue) {
    throw new Error('Batch cache not initialized. Call initializeBatchCache() first.');
  }
  const cache = getCache(queue.getQueueName());
  // Timestamp + random suffix; slice() replaces the deprecated substr().
  const payloadKey = `payload:${Date.now()}:${Math.random().toString(36).slice(2, 11)}`;
  const payload = {
    items, // Just store the items directly
    options: {
      delayPerItem: 1000,
      priority: options.priority,
      // ?? keeps an explicit retries: 0; || replaced it with the default
      retries: options.retries ?? 3,
      provider: options.provider ?? 'generic',
      operation: options.operation ?? 'generic',
    },
    createdAt: new Date().toISOString(),
  };
  const ttlSeconds = options.ttl ?? 86400; // 24 hours default
  await cache.set(payloadKey, payload, ttlSeconds);
  return payloadKey;
}
/**
 * Load a stored batch payload from the queue's cache.
 * Returns null when the key is missing or has expired.
 */
async function loadPayload<T>(
  key: string,
  queue: QueueManager
): Promise<{
  items: T[];
  options: {
    delayPerItem: number;
    priority?: number;
    retries: number;
    provider: string;
    operation: string;
  };
} | null> {
  if (!queue) {
    throw new Error('Batch cache not initialized. Call initializeBatchCache() first.');
  }
  const stored = await getCache(queue.getQueueName()).get(key);
  // Cache returns an untyped value; narrow it to the shape storeItems wrote.
  return stored as {
    items: T[];
    options: {
      delayPerItem: number;
      priority?: number;
      retries: number;
      provider: string;
      operation: string;
    };
  } | null;
}
/**
 * Delete a stored batch payload once all of its item jobs have been enqueued.
 */
async function cleanupPayload(key: string, queue: QueueManager): Promise<void> {
  if (!queue) {
    throw new Error('Batch cache not initialized. Call initializeBatchCache() first.');
  }
  await getCache(queue.getQueueName()).del(key);
}
/**
 * Enqueue jobs in fixed-size chunks with a short pause between chunks so one
 * huge addBulk call does not overwhelm Redis. A failed chunk is logged and
 * skipped; the remaining chunks are still attempted.
 */
async function addJobsInChunks(
  queue: QueueManager,
  jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>,
  chunkSize = 100
): Promise<unknown[]> {
  const created: unknown[] = [];
  for (let start = 0; start < jobs.length; start += chunkSize) {
    const chunk = jobs.slice(start, start + chunkSize);
    try {
      created.push(...(await queue.addBulk(chunk)));
      // Brief pause before the next chunk (skipped after the last one).
      if (start + chunkSize < jobs.length) {
        await new Promise(resolve => setTimeout(resolve, 100));
      }
    } catch (error) {
      logger.error('Failed to add job chunk', {
        startIndex: start,
        chunkSize: chunk.length,
        error,
      });
    }
  }
  return created;
}

11
libs/queue/src/index.ts Normal file
View file

@@ -0,0 +1,11 @@
// Public surface of the queue library. The star exports below already expose
// every named symbol from each module, so no additional named re-exports are
// needed (the previous explicit re-exports were redundant duplicates).
export * from './batch-processor';
export * from './provider-registry';
export * from './queue-manager';
export * from './types';

View file

@@ -0,0 +1,102 @@
import { getLogger } from '@stock-bot/logger';
import type { JobHandler, ProviderConfig } from './types';
const logger = getLogger('provider-registry');
/**
 * In-memory registry mapping provider names to their operation handlers.
 * Workers use it to route a job's { provider, operation } pair to a handler.
 */
class ProviderRegistry {
  private readonly providers = new Map<string, ProviderConfig>();

  /** Register a provider together with all of its operation handlers. */
  register(providerName: string, config: ProviderConfig): void {
    logger.info(`Registering provider: ${providerName}`, {
      operations: Object.keys(config),
    });
    this.providers.set(providerName, config);
  }

  /**
   * Look up the handler for a provider/operation pair.
   * Returns null (after logging a warning) when either is unknown.
   */
  getHandler(provider: string, operation: string): JobHandler | null {
    const config = this.providers.get(provider);
    if (!config) {
      logger.warn(`Provider not found: ${provider}`);
      return null;
    }
    const handler = config[operation];
    if (handler) {
      return handler;
    }
    logger.warn(`Operation not found: ${provider}:${operation}`, {
      availableOperations: Object.keys(config),
    });
    return null;
  }

  /** Names of all registered providers. */
  getProviders(): string[] {
    return [...this.providers.keys()];
  }

  /** Operation names for one provider ([] when the provider is unknown). */
  getOperations(provider: string): string[] {
    const config = this.providers.get(provider);
    return config ? Object.keys(config) : [];
  }

  /** Whether a provider is registered. */
  hasProvider(provider: string): boolean {
    return this.providers.has(provider);
  }

  /** Whether a provider exposes the given operation. */
  hasOperation(provider: string, operation: string): boolean {
    const config = this.providers.get(provider);
    return config ? operation in config : false;
  }

  /** Remove a provider; returns true when it existed. */
  unregister(provider: string): boolean {
    return this.providers.delete(provider);
  }

  /** Remove every registered provider. */
  clear(): void {
    this.providers.clear();
  }

  /** Counts of registered providers and their operations. */
  getStats(): { providers: number; totalOperations: number } {
    const totalOperations = [...this.providers.values()].reduce(
      (sum, config) => sum + Object.keys(config).length,
      0
    );
    return {
      providers: this.providers.size,
      totalOperations,
    };
  }
}
// Export singleton instance
export const providerRegistry = new ProviderRegistry();

View file

@@ -0,0 +1,312 @@
import { Queue, QueueEvents, Worker, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { processBatchJob } from './batch-processor';
import { providerRegistry } from './provider-registry';
import type { JobData, ProviderConfig, QueueConfig } from './types';
const logger = getLogger('queue-manager');
/**
 * QueueManager owns a BullMQ queue, its worker pool and its event stream.
 * Jobs carry { provider, operation, payload }; workers route them through the
 * providerRegistry, except the special 'process-batch-items' operation, which
 * is handed to processBatchJob together with this manager instance.
 *
 * Lifecycle: new QueueManager(cfg) → initialize() → add()/addBulk() → shutdown().
 */
export class QueueManager {
  private queue!: Queue;
  private workers: Worker[] = [];
  private queueEvents!: QueueEvents;
  private config: Required<QueueConfig>;

  // True once initialize() has created the underlying queue.
  private get isInitialized() {
    return !!this.queue;
  }

  /**
   * Get the queue name
   */
  get queueName(): string {
    return this.config.queueName;
  }

  constructor(config: QueueConfig = {}) {
    // Set default configuration. parseInt gets an explicit radix; redis.db
    // uses ?? so an explicit db 0 (a valid Redis database) is respected —
    // the previous || always fell through to the env default for 0. The
    // remaining || fallbacks are deliberate: 0/'' are invalid for those.
    this.config = {
      workers: config.workers || parseInt(process.env.WORKER_COUNT || '5', 10),
      concurrency: config.concurrency || parseInt(process.env.WORKER_CONCURRENCY || '20', 10),
      redis: {
        host: config.redis?.host || process.env.DRAGONFLY_HOST || 'localhost',
        port: config.redis?.port || parseInt(process.env.DRAGONFLY_PORT || '6379', 10),
        password: config.redis?.password || process.env.DRAGONFLY_PASSWORD,
        db: config.redis?.db ?? parseInt(process.env.DRAGONFLY_DB || '0', 10),
      },
      queueName: config.queueName || 'default-queue',
      defaultJobOptions: {
        removeOnComplete: 10,
        removeOnFail: 5,
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
        ...config.defaultJobOptions,
      },
    };
  }

  // Queue name wrapped in curly braces, shared by the queue, its events and
  // all workers. NOTE(review): presumably a Redis Cluster hash tag so the
  // queue's keys land in one slot — confirm if cluster mode is used.
  private get bracketedQueueName(): string {
    return `{${this.config.queueName}}`;
  }

  /**
   * Initialize the queue manager: create the queue and event stream, start
   * the worker pool and wire event listeners. Idempotent — a second call
   * logs a warning and returns.
   */
  async initialize(): Promise<void> {
    if (this.isInitialized) {
      logger.warn('Queue manager already initialized');
      return;
    }
    logger.info('Initializing queue manager...', {
      queueName: this.config.queueName,
      workers: this.config.workers,
      concurrency: this.config.concurrency,
    });
    try {
      const connection = this.getConnection();
      // Initialize queue
      this.queue = new Queue(this.bracketedQueueName, {
        connection,
        defaultJobOptions: this.config.defaultJobOptions,
      });
      // Initialize queue events
      this.queueEvents = new QueueEvents(this.bracketedQueueName, { connection });
      // Start workers
      await this.startWorkers();
      // Setup event listeners
      this.setupEventListeners();
      logger.info('Queue manager initialized successfully');
    } catch (error) {
      logger.error('Failed to initialize queue manager', { error });
      throw error;
    }
  }

  /**
   * Register a provider with its operations (delegates to the shared registry).
   */
  registerProvider(providerName: string, config: ProviderConfig): void {
    providerRegistry.register(providerName, config);
  }

  /**
   * Add a single job to the queue.
   * @throws Error when called before initialize().
   */
  async add(name: string, data: JobData, options: any = {}): Promise<Job> {
    this.ensureInitialized();
    return await this.queue.add(name, data, options);
  }

  /**
   * Add multiple jobs to the queue in one bulk call.
   * @throws Error when called before initialize().
   */
  async addBulk(jobs: Array<{ name: string; data: JobData; opts?: any }>): Promise<Job[]> {
    this.ensureInitialized();
    return await this.queue.addBulk(jobs);
  }

  /**
   * Get queue statistics — job counts per state.
   */
  async getStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
  }> {
    this.ensureInitialized();
    const [waiting, active, completed, failed, delayed] = await Promise.all([
      this.queue.getWaiting(),
      this.queue.getActive(),
      this.queue.getCompleted(),
      this.queue.getFailed(),
      this.queue.getDelayed(),
    ]);
    return {
      waiting: waiting.length,
      active: active.length,
      completed: completed.length,
      failed: failed.length,
      delayed: delayed.length,
    };
  }

  /**
   * Pause the queue
   */
  async pause(): Promise<void> {
    this.ensureInitialized();
    await this.queue.pause();
    logger.info('Queue paused');
  }

  /**
   * Resume the queue
   */
  async resume(): Promise<void> {
    this.ensureInitialized();
    await this.queue.resume();
    logger.info('Queue resumed');
  }

  /**
   * Clean completed and failed jobs older than `grace` ms, up to `limit` of each.
   */
  async clean(grace: number = 0, limit: number = 100): Promise<void> {
    this.ensureInitialized();
    await Promise.all([
      this.queue.clean(grace, limit, 'completed'),
      this.queue.clean(grace, limit, 'failed'),
    ]);
    logger.info('Queue cleaned', { grace, limit });
  }

  /**
   * Get the queue name
   */
  getQueueName(): string {
    return this.config.queueName;
  }

  /**
   * Shutdown the queue manager: close workers first (stop consuming), then
   * the event stream, then the queue itself.
   */
  async shutdown(): Promise<void> {
    logger.info('Shutting down queue manager...');
    try {
      // Close workers
      await Promise.all(this.workers.map(worker => worker.close()));
      this.workers = [];
      // Close queue events
      if (this.queueEvents) {
        await this.queueEvents.close();
      }
      // Close queue
      if (this.queue) {
        await this.queue.close();
      }
      logger.info('Queue manager shutdown complete');
    } catch (error) {
      logger.error('Error during queue manager shutdown', { error });
      throw error;
    }
  }

  // Connection options shared by the queue, its events and all workers.
  private getConnection() {
    return {
      host: this.config.redis.host,
      port: this.config.redis.port,
      password: this.config.redis.password,
      db: this.config.redis.db,
    };
  }

  // Spin up the configured number of workers, each with the configured
  // per-worker concurrency, and attach per-worker logging hooks.
  private async startWorkers(): Promise<void> {
    const connection = this.getConnection();
    for (let i = 0; i < this.config.workers; i++) {
      const worker = new Worker(this.bracketedQueueName, this.processJob.bind(this), {
        connection,
        concurrency: this.config.concurrency,
      });
      worker.on('completed', job => {
        logger.debug('Job completed', {
          id: job.id,
          name: job.name,
        });
      });
      worker.on('failed', (job, err) => {
        logger.error('Job failed', {
          id: job?.id,
          name: job?.name,
          error: err.message,
        });
      });
      this.workers.push(worker);
    }
    logger.info(`Started ${this.config.workers} workers`);
  }

  // Route a job to its handler: batch fan-out jobs need this manager
  // instance, everything else goes through the provider registry.
  private async processJob(job: Job) {
    const { provider, operation, payload }: JobData = job.data;
    logger.info('Processing job', {
      id: job.id,
      provider,
      operation,
      payloadKeys: Object.keys(payload || {}),
    });
    try {
      let result;
      if (operation === 'process-batch-items') {
        // Special handling for batch processing - requires queue manager instance
        result = await processBatchJob(payload, this);
      } else {
        // Regular handler lookup
        const handler = providerRegistry.getHandler(provider, operation);
        if (!handler) {
          throw new Error(`No handler found for ${provider}:${operation}`);
        }
        result = await handler(payload);
      }
      logger.info('Job completed successfully', {
        id: job.id,
        provider,
        operation,
      });
      return result;
    } catch (error) {
      logger.error('Job processing failed', {
        id: job.id,
        provider,
        operation,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  // Queue-level observability hooks (logging only, no behavior).
  private setupEventListeners(): void {
    this.queueEvents.on('completed', ({ jobId }) => {
      logger.debug('Job completed event', { jobId });
    });
    this.queueEvents.on('failed', ({ jobId, failedReason }) => {
      logger.warn('Job failed event', { jobId, failedReason });
    });
    this.queueEvents.on('stalled', ({ jobId }) => {
      logger.warn('Job stalled event', { jobId });
    });
  }

  // Guard for methods that require initialize() to have completed.
  private ensureInitialized(): void {
    if (!this.isInitialized) {
      throw new Error('Queue manager not initialized. Call initialize() first.');
    }
  }
}

68
libs/queue/src/types.ts Normal file
View file

@@ -0,0 +1,68 @@
// Types for queue operations

/**
 * Envelope stored on every queued job.
 */
export interface JobData {
  // Job category tag, e.g. 'process-item' or 'process-batch'
  type?: string;
  // Name of the registered provider whose handler should run this job
  provider: string;
  // Operation of that provider to invoke ('process-batch-items' is special-cased)
  operation: string;
  // Handler-specific payload: the raw item for item jobs, a BatchJobData for batch jobs
  // NOTE(review): consider `unknown` + narrowing instead of `any`
  payload: any;
  // Optional priority; the batch processor also copies it into the job's queue options
  priority?: number;
}
/**
 * Options accepted by processItems() controlling pacing, batching and retries.
 */
export interface ProcessOptions {
  // Total time window (ms) over which all jobs/batches are spread
  totalDelayMs: number;
  // Items per batch when useBatching is true (defaults to 100)
  batchSize?: number;
  priority?: number;
  // true → store items in cache, one job per batch; false → one job per item
  useBatching?: boolean;
  // Job attempts (defaults to 3)
  retries?: number;
  // Cache TTL in seconds for stored batch payloads (defaults to 86400)
  ttl?: number;
  removeOnComplete?: number;
  removeOnFail?: number;
  // Job routing information
  provider?: string;
  operation?: string;
  // Optional queue for overloaded function signatures
  queue?: any; // QueueManager reference
}
/**
 * Summary returned by processItems().
 */
export interface BatchResult {
  jobsCreated: number;
  // 'direct' = one job per item; 'batch' = one job per stored batch
  mode: 'direct' | 'batch';
  totalItems: number;
  // Only set in batch mode
  batchesCreated?: number;
  // Wall-clock duration of processItems() in ms
  duration: number;
}
/**
 * Construction options for QueueManager; unset fields fall back to
 * environment variables and built-in defaults.
 */
export interface QueueConfig {
  workers?: number;
  concurrency?: number;
  redis?: {
    host?: string;
    port?: number;
    password?: string;
    db?: number;
  };
  queueName?: string;
  defaultJobOptions?: {
    removeOnComplete?: number;
    removeOnFail?: number;
    attempts?: number;
    backoff?: {
      type: string;
      delay: number;
    };
  };
}
/**
 * Async handler invoked by workers with a job's payload.
 */
export interface JobHandler {
  (payload: any): Promise<any>;
}
/**
 * Maps operation name → handler for a single provider.
 */
export interface ProviderConfig {
  [operation: string]: JobHandler;
}
/**
 * Small pointer payload carried by a 'process-batch' job; the actual items
 * live in the cache under payloadKey.
 */
export interface BatchJobData {
  payloadKey: string;
  batchIndex: number;
  totalBatches: number;
  itemCount: number;
}