diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/exchanges.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/exchanges.action.ts deleted file mode 100644 index 6f85b12..0000000 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/exchanges.action.ts +++ /dev/null @@ -1,34 +0,0 @@ -/** - * QM Exchanges Operations - Simple exchange data fetching - */ - -import type { IServiceContainer } from '@stock-bot/handlers'; - -interface QMExchange { - _id?: string; - code: string; - name: string; - country: string; - currency: string; - timezone?: string; -} - -export async function fetchExchanges(services: IServiceContainer): Promise { - // Get exchanges from MongoDB - const exchanges = await services.mongodb - .collection('qm_exchanges') - .find({}) - .toArray(); - - return exchanges; -} - -export async function getExchangeByCode( - services: IServiceContainer, - code: string -): Promise { - // Get specific exchange by code - const exchange = await services.mongodb.collection('qm_exchanges').findOne({ code }); - - return exchange; -} diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts index f24cd65..e920814 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts @@ -43,7 +43,7 @@ export async function checkSessions( const neededSessions = SESSION_CONFIG.MAX_SESSIONS - currentCount; // Queue up to 10 at a time to avoid overwhelming the system - const toQueue = Math.min(neededSessions, 10); + const toQueue = Math.min(neededSessions, 20); for (let i = 0; i < toQueue; i++) { await this.scheduleOperation('create-session', { sessionId, sessionType }, { diff --git a/libs/core/queue/src/queue-manager.ts b/libs/core/queue/src/queue-manager.ts index 16bd92e..5df43fa 100644 --- a/libs/core/queue/src/queue-manager.ts +++ b/libs/core/queue/src/queue-manager.ts @@ -443,12 +443,12 @@ export class QueueManager { const queueShutdownPromises = Array.from(this.queues.values()).map(async queue => { try { // Add timeout to queue.close() to prevent hanging - await queue.close(); - // const timeoutPromise = new Promise((_, reject) => - // setTimeout(() => reject(new Error('Queue close timeout')), 100) - // ); + const closePromise = queue.close(); + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Queue close timeout')), 5000) // 5 second timeout per queue + ); - // await Promise.race([closePromise, timeoutPromise]); + await Promise.race([closePromise, timeoutPromise]); } catch (error) { this.logger.warn('Error closing queue', { error: (error as Error).message }); } diff --git a/libs/core/queue/src/queue.ts b/libs/core/queue/src/queue.ts index 811d098..3af41a3 100644 --- a/libs/core/queue/src/queue.ts +++ b/libs/core/queue/src/queue.ts @@ -264,26 +264,49 @@ export class Queue { */ async close(): Promise { try { - // Close the queue itself - await this.bullQueue.close(); - this.logger.info('Queue closed', { queueName: this.queueName }); - - // Close queue events - if (this.queueEvents) { - await this.queueEvents.close(); - this.logger.debug('Queue events closed', { queueName: this.queueName }); - } - - // Close workers first + // Close workers first with timeout if (this.workers.length > 0) { - await Promise.all( - this.workers.map(async worker => { - return await worker.close(); - }) - ); + const workerClosePromises = this.workers.map(async (worker, idx) => { + try { + const closePromise = worker.close(); + const timeoutPromise = new Promise((resolve) => { + setTimeout(() => { + this.logger.warn('Worker close timeout, forcing closure', { + queueName: this.queueName, + workerId: idx + }); + resolve(); + }, 2000); // 2 second timeout per worker + }); + + await Promise.race([closePromise, timeoutPromise]); + } catch (error) { + this.logger.warn('Error closing worker', { + queueName: this.queueName, + workerId: idx, + error + }); + } + }); + + await Promise.all(workerClosePromises); this.workers = []; this.logger.debug('Workers closed', { queueName: this.queueName }); } + + // Close queue events + if (this.queueEvents) { + try { + await this.queueEvents.close(); + this.logger.debug('Queue events closed', { queueName: this.queueName }); + } catch (error) { + this.logger.warn('Error closing queue events', { queueName: this.queueName, error }); + } + } + + // Close the queue itself + await this.bullQueue.close(); + this.logger.info('Queue closed', { queueName: this.queueName }); } catch (error) { this.logger.error('Error closing queue', { queueName: this.queueName, error }); throw error;