From 2db7c0dc368e90d7b313b8b341cb8a1739d69319 Mon Sep 17 00:00:00 2001 From: Boki Date: Wed, 18 Jun 2025 21:17:32 -0400 Subject: [PATCH] trying to fix build --- libs/queue/src/queue-instance.ts | 96 +++++++++++++++++++++++++++++++- libs/queue/src/queue-manager.ts | 21 +++---- scripts/build-libs.sh | 32 ++++++----- 3 files changed, 119 insertions(+), 30 deletions(-) diff --git a/libs/queue/src/queue-instance.ts b/libs/queue/src/queue-instance.ts index 9cb68fe..817c10b 100644 --- a/libs/queue/src/queue-instance.ts +++ b/libs/queue/src/queue-instance.ts @@ -1,7 +1,8 @@ import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq'; import { getLogger } from '@stock-bot/logger'; -import { processItems } from './batch-processor'; -import type { JobData, ProcessOptions, BatchResult } from './types'; +import { processItems, processBatchJob } from './batch-processor'; +import { providerRegistry } from './provider-registry'; +import type { JobData, ProcessOptions, BatchResult, BatchJobData } from './types'; const logger = getLogger('queue-instance'); @@ -40,6 +41,9 @@ export class Queue { // Initialize queue events this.queueEvents = new QueueEvents(`{${queueName}}`, { connection }); + + // Start a worker for this queue + this.startWorker(); } /** @@ -177,6 +181,94 @@ export class Queue { } } + /** + * Start a worker for this queue + */ + private startWorker(): void { + const connection = { + host: this.redisConfig.host, + port: this.redisConfig.port, + password: this.redisConfig.password, + db: this.redisConfig.db, + }; + + const worker = new Worker(`{${this.queueName}}`, this.processJob.bind(this), { + connection, + concurrency: 20, + }); + + worker.on('completed', job => { + logger.debug('Job completed', { + id: job.id, + name: job.name, + queue: this.queueName, + }); + }); + + worker.on('failed', (job, err) => { + logger.error('Job failed', { + id: job?.id, + name: job?.name, + queue: this.queueName, + error: err.message, + }); + }); + + this.workers.push(worker); + logger.info(`Started worker for queue: ${this.queueName}`); + } + + /** + * Process a job + */ + private async processJob(job: Job) { + const { provider, operation, payload }: JobData = job.data; + + logger.info('Processing job', { + id: job.id, + provider, + operation, + queue: this.queueName, + payloadKeys: Object.keys(payload || {}), + }); + + try { + let result; + + if (operation === 'process-batch-items') { + // Special handling for batch processing + result = await processBatchJob(payload as BatchJobData, this); + } else { + // Regular handler lookup + const handler = providerRegistry.getHandler(provider, operation); + + if (!handler) { + throw new Error(`No handler found for ${provider}:${operation}`); + } + + result = await handler(payload); + } + + logger.info('Job completed successfully', { + id: job.id, + provider, + operation, + queue: this.queueName, + }); + + return result; + } catch (error) { + logger.error('Job processing failed', { + id: job.id, + provider, + operation, + queue: this.queueName, + error: error instanceof Error ? error.message : String(error), + }); + throw error; + } + } + /** * Get the BullMQ queue instance (for advanced operations) */ diff --git a/libs/queue/src/queue-manager.ts b/libs/queue/src/queue-manager.ts index 85db012..2e0a936 100644 --- a/libs/queue/src/queue-manager.ts +++ b/libs/queue/src/queue-manager.ts @@ -96,9 +96,7 @@ export class QueueManager { // Step 4: Setup event listeners this.setupEventListeners(); - // Step 5: Initialize batch cache - const { initializeBatchCache } = await import('./batch-processor'); - await initializeBatchCache(this); + // Step 5: Batch cache will be initialized by individual Queue instances // Step 6: Set up scheduled jobs if (this.enableScheduledJobs) { @@ -373,20 +371,15 @@ export class QueueManager { try { let result; - if (operation === 'process-batch-items') { - // Special handling for batch processing - requires queue manager instance - result = await processBatchJob(payload, this); - } else { - // Regular handler lookup - const handler = providerRegistry.getHandler(provider, operation); + // Regular handler lookup + const handler = providerRegistry.getHandler(provider, operation); - if (!handler) { - throw new Error(`No handler found for ${provider}:${operation}`); - } - - result = await handler(payload); + if (!handler) { + throw new Error(`No handler found for ${provider}:${operation}`); } + result = await handler(payload); + logger.info('Job completed successfully', { id: job.id, provider, diff --git a/scripts/build-libs.sh b/scripts/build-libs.sh index fb92491..8df631f 100755 --- a/scripts/build-libs.sh +++ b/scripts/build-libs.sh @@ -31,26 +31,28 @@ trap cleanup EXIT # Build order is important due to dependencies libs=( + # Core Libraries "types" # Base types - no dependencies "config" # Configuration - depends on types - "config" # Configuration - depends on types "logger" # Logging utilities - depends on types "utils" # Utilities - depends on types and config - # Database clients - "postgres-client" # PostgreSQL client - depends on types, config, logger - # "mongodb-client" # MongoDB client - depends on types, config, logger (temporarily disabled - needs zod->yup conversion) - "questdb-client" # QuestDB client - depends on types, config, logger + + # # Database clients + # "postgres-client" # PostgreSQL client - depends on types, config, logger + # "mongodb-client" # MongoDB client - depends on types, config, logger + # "questdb-client" # QuestDB client - depends on types, config, logger - # Service libraries - "cache" # Cache - depends on types and logger - "http" # HTTP client - depends on types, config, logger - "event-bus" # Event bus - depends on types, logger - "shutdown" # Shutdown - depends on types, logger + # # Service libraries + # "cache" # Cache - depends on types and logger + # "http" # HTTP client - depends on types, config, logger + # "event-bus" # Event bus - depends on types, logger + # # "queue" # Queue - depends on types, logger, cache + # "shutdown" # Shutdown - depends on types, logger - # Engine libraries - "data-frame" # Data frame - depends on types, utils - "vector-engine" # Vector engine - depends on types, utils, data-frame - "strategy-engine" # Strategy engine - depends on types, utils, event-bus + # # Engine libraries + # "data-frame" # Data frame - depends on types, utils + # "vector-engine" # Vector engine - depends on types, utils, data-frame + # "strategy-engine" # Strategy engine - depends on types, utils, event-bus ) # Build each library in order @@ -59,6 +61,8 @@ for lib in "${libs[@]}"; do echo -e "${GREEN}Building $lib...${NC}" cd "$lib_path" + # print the current working directory + rm -rf dist tsconfig.tsbuildinfo bun run build if [ $? -ne 0 ]; then