diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-short.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-short.action.ts new file mode 100644 index 0000000..d5257c4 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-short.action.ts @@ -0,0 +1,55 @@ + +import { getRandomUserAgent } from '@stock-bot/utils'; +import type { CeoHandler } from '../ceo.handler'; + +export async function processIndividualSymbol( + this: CeoHandler, + payload: any, + _context: any +): Promise { + const { ceoId, symbol, timestamp } = payload; + const proxy = this.proxy?.getProxy(); + if (!proxy) { + this.logger.warn('No proxy available for processing individual CEO symbol'); + return; + } + + this.logger.debug(`Processing individual CEO symbol for ${symbol}`, { + ceoId, + timestamp, + }); + try { + // Update Shorts + const response = await this.http.get( + `https://api.ceo.ca/api/short_positions/one?symbol=${symbol}`, + { + proxy: proxy, + headers: { + 'User-Agent': getRandomUserAgent(), + }, + } + ); + let shortCount = 0; + + if (response.ok) { + const shortData = await response.json(); + if (shortData && shortData.positions) { + shortCount = shortData.positions.length; + await this.mongodb.batchUpsert('ceoShorts', shortData.positions, ['id']); + } + } + + this.logger.info( + `Successfully processed CEO symbol ${symbol} shorts and found ${shortCount} positions`, + ); + + return { ceoId, shortCount, timestamp }; + } catch (error) { + this.logger.error(`Failed to process individual symbol ${symbol}`, { + error, + ceoId, + timestamp, + }); + throw error; + } +} diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts index e8e57da..8e44c17 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts @@ -39,6 +39,7 @@ export async function processIndividualSymbol( const spielCount = data.spiels.length; if (spielCount === 0) { this.logger.warn(`No spiels found for ceoId ${ceoId}`); + await this.mongodb.updateMany('ceoChannels', { ceoId }, { $set: { lastSpielTime: timestamp, finished: true } }); return null; // No data to process } const latestSpielTime = data.spiels[0]?.timestamp; @@ -76,35 +77,18 @@ export async function processIndividualSymbol( })); await this.mongodb.batchUpsert('ceoPosts', posts, ['spielId']); + await this.mongodb.updateMany('ceoChannels', { ceoId }, { $set: { lastSpielTime: latestSpielTime } }); this.logger.info(`Fetched ${spielCount} spiels for ceoId ${ceoId}`); - // Update Shorts - const shortRes = await this.http.get( - `https://api.ceo.ca/api/short_positions/one?symbol=${symbol}`, + await this.scheduleOperation( + 'process-individual-symbol', { - proxy: proxy, - headers: { - 'User-Agent': getRandomUserAgent(), - }, - } + ceoId: ceoId, + timestamp: latestSpielTime, + }, + { priority: 0 } ); - if (shortRes.ok) { - const shortData = await shortRes.json(); - if (shortData && shortData.positions) { - await this.mongodb.batchUpsert('ceoShorts', shortData.positions, ['id']); - } - - await this.scheduleOperation( - 'process-individual-symbol', - { - ceoId: ceoId, - timestamp: latestSpielTime, - }, - { priority: 0 } - ); - } - this.logger.info( `Successfully processed channel ${ceoId} and added channel ${ceoId} at timestamp ${latestSpielTime}` ); diff --git a/apps/stock/web-api/src/services/monitoring.service.ts b/apps/stock/web-api/src/services/monitoring.service.ts index b1f7e4c..6bcda1e 100644 --- a/apps/stock/web-api/src/services/monitoring.service.ts +++ b/apps/stock/web-api/src/services/monitoring.service.ts @@ -100,6 +100,16 @@ export class MonitoringService { symbols: 'data-pipeline', }; + // Worker configuration per queue (from service configs) + const workerConfig: Record = { + qm: { count: 1, concurrency: 2 }, + ib: { count: 1, concurrency: 1 }, + ceo: { count: 1, concurrency: 2 }, + webshare: { count: 1, concurrency: 1 }, + exchanges: { count: 1, concurrency: 1 }, + symbols: { count: 1, concurrency: 2 }, + }; + const queueNames = Object.keys(handlerMapping); this.logger.debug('Using known queue names', { count: queueNames.length, names: queueNames }); @@ -124,6 +134,10 @@ export class MonitoringService { // Get stats directly from BullMQ const queueStats = await this.getQueueStatsForBullQueue(bullQueue, handlerName); + // Get actual worker count from BullMQ + const actualWorkerCount = await this.getActiveWorkerCountFromBullMQ(bullQueue); + const configuredWorkers = workerConfig[handlerName] || { count: 0, concurrency: 1 }; + stats.push({ name: handlerName, service: serviceName, @@ -131,8 +145,8 @@ export class MonitoringService { connected: true, jobs: queueStats, workers: { - count: 0, - concurrency: 1, + count: actualWorkerCount, + concurrency: configuredWorkers.concurrency, }, }); @@ -790,4 +804,30 @@ export class MonitoringService { percentage: (usedMem / totalMem) * 100, }; } + + /** + * Get active worker count from Redis + */ + /** + * Get active worker count using BullMQ's built-in tracking + */ + private async getActiveWorkerCountFromBullMQ(bullQueue: any): Promise { + try { + // Use BullMQ's built-in getWorkers method + if (bullQueue.getWorkers && typeof bullQueue.getWorkers === 'function') { + const workers = await bullQueue.getWorkers(); + return workers.length; + } + + // Fallback to getWorkersCount if available + if (bullQueue.getWorkersCount && typeof bullQueue.getWorkersCount === 'function') { + return await bullQueue.getWorkersCount(); + } + + return 0; + } catch (error) { + this.logger.debug('Failed to get active worker count from BullMQ', { error }); + return 0; + } + } } diff --git a/libs/core/queue/src/queue-manager.ts b/libs/core/queue/src/queue-manager.ts index 104b95a..8b4a603 100644 --- a/libs/core/queue/src/queue-manager.ts +++ b/libs/core/queue/src/queue-manager.ts @@ -152,6 +152,7 @@ export class QueueManager { concurrency, startWorker: workers > 0 && !this.config.delayWorkerStart, handlerRegistry: options.handlerRegistry || this.handlerRegistry, + serviceName: this.config.serviceName, }; const queue = new Queue( diff --git a/libs/core/queue/src/queue.ts b/libs/core/queue/src/queue.ts index ebdb572..4bebe06 100644 --- a/libs/core/queue/src/queue.ts +++ b/libs/core/queue/src/queue.ts @@ -19,6 +19,7 @@ export interface QueueWorkerConfig { concurrency?: number; startWorker?: boolean; handlerRegistry?: HandlerRegistry; + serviceName?: string; } /** @@ -33,6 +34,7 @@ export class Queue { private redisConfig: RedisConfig; private readonly logger: Logger; private readonly handlerRegistry?: HandlerRegistry; + private serviceName?: string; constructor( queueName: string, @@ -45,9 +47,11 @@ export class Queue { this.redisConfig = redisConfig; this.logger = logger || console; this.handlerRegistry = config.handlerRegistry; + this.serviceName = config.serviceName; this.logger.debug('Queue constructor called', { queueName, + serviceName: this.serviceName, hasHandlerRegistry: !!config.handlerRegistry, handlerRegistryType: config.handlerRegistry ? typeof config.handlerRegistry : 'undefined', configKeys: Object.keys(config), @@ -176,6 +180,9 @@ export class Queue { ]); const isPaused = await this.bullQueue.isPaused(); + + // Get actual active worker count from BullMQ + const activeWorkerCount = await this.getActiveWorkerCount(); return { waiting: waiting.length, @@ -184,7 +191,7 @@ export class Queue { failed: failed.length, delayed: delayed.length, paused: isPaused, - workers: this.workers.length, + workers: activeWorkerCount, // Use BullMQ's tracked count instead of local array }; } @@ -307,11 +314,13 @@ export class Queue { concurrency, maxStalledCount: 3, stalledInterval: 30000, + // Add a name to identify the worker + name: `${this.serviceName || 'unknown'}_worker_${i}`, }); this.logger.info(`Starting worker ${i + 1}/${workerCount} for queue`, { queueName: this.queueName, - workerId: i, + workerName: worker.name, concurrency, }); @@ -438,4 +447,69 @@ export class Queue { getWorkerCount(): number { return this.workers.length; } + + /** + * Get active workers from BullMQ + * This uses BullMQ's built-in worker tracking + */ + async getActiveWorkers(): Promise { + try { + const workers = await this.bullQueue.getWorkers(); + return workers; + } catch (error) { + this.logger.error('Failed to get active workers', { + queueName: this.queueName, + error + }); + return []; + } + } + + /** + * Get count of active workers from BullMQ + */ + async getActiveWorkerCount(): Promise { + try { + const workers = await this.bullQueue.getWorkers(); + return workers.length; + } catch (error) { + this.logger.error('Failed to get active worker count', { + queueName: this.queueName, + error + }); + return 0; + } + } + + /** + * Get detailed worker information + */ + async getWorkerDetails(): Promise> { + try { + const workers = await this.bullQueue.getWorkers(); + return workers.map(worker => ({ + id: worker.id || 'unknown', + name: worker.name, + addr: worker.addr, + age: typeof worker.age === 'string' ? parseInt(worker.age) : worker.age, + idle: typeof worker.idle === 'string' ? parseInt(worker.idle) : worker.idle, + started: typeof worker.started === 'string' ? parseInt(worker.started) : worker.started, + })); + } catch (error) { + this.logger.error('Failed to get worker details', { + queueName: this.queueName, + error + }); + return []; + } + } + + }