diff --git a/libs/queue/src/index.ts b/libs/queue/src/index.ts index 77af269..8786b37 100644 --- a/libs/queue/src/index.ts +++ b/libs/queue/src/index.ts @@ -7,14 +7,7 @@ export { handlerRegistry } from './handler-registry'; export { processBatchJob, processItems } from './batch-processor'; // Queue factory functions -export { - initializeQueueSystem, - getQueue, - processItemsWithQueue, - getActiveQueueNames, - getQueueManager, - shutdownAllQueues -} from './queue-factory'; +// QueueFactory removed - use QueueManager directly // DLQ handling export { DeadLetterQueueHandler, DeadLetterQueueHandler as DLQHandler } from './dlq-handler'; diff --git a/libs/queue/src/queue-factory.ts b/libs/queue/src/queue-factory.ts deleted file mode 100644 index d1f7c55..0000000 --- a/libs/queue/src/queue-factory.ts +++ /dev/null @@ -1,82 +0,0 @@ -import { getLogger } from '@stock-bot/logger'; -import { QueueManager } from './queue-manager'; -import { Queue } from './queue'; -import { processItems } from './batch-processor'; -import type { ProcessOptions, BatchResult, QueueManagerConfig, RedisConfig, JobOptions } from './types'; - -const logger = getLogger('queue-factory'); - -/** - * Initialize the queue system with global configuration - * This now uses the singleton QueueManager pattern - */ -export async function initializeQueueSystem(config: { - redis: RedisConfig; - defaultJobOptions?: JobOptions; - workers?: number; - concurrency?: number; -}): Promise { - logger.info('Initializing global queue system...'); - - const queueManagerConfig: QueueManagerConfig = { - redis: config.redis, - defaultQueueOptions: { - defaultJobOptions: config.defaultJobOptions, - workers: config.workers || 5, - concurrency: config.concurrency || 20, - }, - }; - - // Initialize the singleton QueueManager - QueueManager.initialize(queueManagerConfig); - - logger.info('Queue system initialized with singleton QueueManager'); -} - -/** - * Get or create a queue for the given queue name - * Now uses the singleton QueueManager - */ -export function getQueue(queueName: string): Queue { - const queueManager = QueueManager.getInstance(); - return queueManager.getQueue(queueName); -} - -/** - * Process items using the specified queue - * Now uses the batch processor directly - */ -export async function processItemsWithQueue( - queueName: string, - items: T[], - options: ProcessOptions -): Promise { - return processItems(items, queueName, options); -} - -/** - * Get all active queue names - */ -export function getActiveQueueNames(): string[] { - const queueManager = QueueManager.getInstance(); - return queueManager.getQueueNames(); -} - -/** - * Get the global queue manager (for advanced operations) - */ -export function getQueueManager(): QueueManager { - return QueueManager.getInstance(); -} - -/** - * Shutdown all queues and the queue manager - */ -export async function shutdownAllQueues(): Promise { - logger.info('Shutting down all queues...'); - - // Reset the singleton QueueManager (handles all cleanup) - await QueueManager.reset(); - - logger.info('All queues shut down'); -} \ No newline at end of file