From 5c87f068d6e28d1a26816aaea0cc31f69af36e03 Mon Sep 17 00:00:00 2001 From: Boki Date: Mon, 23 Jun 2025 14:36:47 -0400 Subject: [PATCH] added logging --- apps/stock/config/config/default.json | 11 +++++++--- .../process-individual-symbol.action.ts | 2 +- .../actions/update-unique-symbols.action.ts | 2 +- apps/stock/data-ingestion/src/index.ts | 3 +++ apps/stock/data-pipeline/src/index.ts | 3 +++ apps/stock/web-api/src/index.ts | 4 +++- docker-compose.yml | 2 +- libs/services/queue/src/service-registry.ts | 20 +++++++++++++------ .../services/queue/src/smart-queue-manager.ts | 5 +++-- libs/services/queue/src/types.ts | 2 +- 10 files changed, 38 insertions(+), 16 deletions(-) diff --git a/apps/stock/config/config/default.json b/apps/stock/config/config/default.json index ecc76ef..902d26b 100644 --- a/apps/stock/config/config/default.json +++ b/apps/stock/config/config/default.json @@ -77,8 +77,8 @@ "port": 6379, "db": 1 }, - "workers": 2, - "concurrency": 2, + "workers": 1, + "concurrency": 1, "enableScheduledJobs": true, "delayWorkerStart": false, "defaultJobOptions": { @@ -111,8 +111,13 @@ "timeout": 30000 }, "proxy": { + "enabled": true, "cachePrefix": "proxy:", - "ttl": 3600 + "ttl": 3600, + "webshare": { + "apiKey": "y8ay534rcbybdkk3evnzmt640xxfhy7252ce2t98", + "apiUrl": "https://proxy.webshare.io/api/v2/" + } }, "providers": { "yahoo": { diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts index f721b4c..58096c6 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts @@ -98,7 +98,7 @@ export async function processIndividualSymbol( await this.scheduleOperation('process-individual-symbol', { ceoId: ceoId, timestamp: latestSpielTime, - }, {priority: 1}); + }, {priority: 0}); } this.logger.info( diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts index 5f8bb59..024e104 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts @@ -34,7 +34,7 @@ export async function updateUniqueSymbols( await this.scheduleOperation('process-individual-symbol', { ceoId: symbol.ceoId, symbol: symbol.symbol, - }); + }, {priority: 10 }); scheduledJobs++; // Add small delay to avoid overwhelming the queue diff --git a/apps/stock/data-ingestion/src/index.ts b/apps/stock/data-ingestion/src/index.ts index aa9920b..2084612 100644 --- a/apps/stock/data-ingestion/src/index.ts +++ b/apps/stock/data-ingestion/src/index.ts @@ -16,6 +16,9 @@ import { createRoutes } from './routes/create-routes'; // Initialize configuration with service-specific overrides const config = initializeStockConfig('dataIngestion'); +// Log the full configuration +const logger = getLogger('data-ingestion'); +logger.info('Service configuration:', config); // Create service application const app = new ServiceApplication( diff --git a/apps/stock/data-pipeline/src/index.ts b/apps/stock/data-pipeline/src/index.ts index 4ba3ed1..cd567ca 100644 --- a/apps/stock/data-pipeline/src/index.ts +++ b/apps/stock/data-pipeline/src/index.ts @@ -15,6 +15,9 @@ import { setupServiceContainer } from './container-setup'; // Initialize configuration with service-specific overrides const config = initializeStockConfig('dataPipeline'); +// Log the full configuration +const logger = getLogger('data-pipeline'); +logger.info('Service configuration:', config); // Create service application const app = new ServiceApplication( diff --git a/apps/stock/web-api/src/index.ts b/apps/stock/web-api/src/index.ts index 4058aab..02192b8 100644 --- a/apps/stock/web-api/src/index.ts +++ b/apps/stock/web-api/src/index.ts @@ -21,7 +21,9 @@ if (config.queue) { config.queue.delayWorkerStart = true; } - +// Log the full configuration +const logger = getLogger('web-api'); +logger.info('Service configuration:', config); // Create service application const app = new ServiceApplication( diff --git a/docker-compose.yml b/docker-compose.yml index 506cb42..c89a242 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -213,7 +213,7 @@ services: # Dragonfly - Redis replacement for caching and events - REDIS_HOST=dragonfly - REDIS_PORT=6379 - REDIS_PASSWORD= - - REDIS_DB=0 + - REDIS_DB=1 - REDIS_URL=redis://dragonfly:6379 depends_on: - dragonfly diff --git a/libs/services/queue/src/service-registry.ts b/libs/services/queue/src/service-registry.ts index 5d14a3c..4891392 100644 --- a/libs/services/queue/src/service-registry.ts +++ b/libs/services/queue/src/service-registry.ts @@ -86,22 +86,30 @@ export function findServiceForHandler(handlerName: string): string | undefined { } /** - * Get full queue name with service namespace + * Get full queue name - just the handler name since each service has its own Redis DB */ export function getFullQueueName(serviceName: string, handlerName: string): string { - return `${serviceName}_${handlerName}`; + // Just return the handler name since DB isolation provides namespace separation + return handlerName; } /** * Parse a full queue name into service and handler + * Since queue names are just handler names now, we need to find the service from the handler */ export function parseQueueName(fullQueueName: string): { service: string; handler: string } | null { - const parts = fullQueueName.split('_'); - if (parts.length !== 2 || !parts[0] || !parts[1]) { + // Queue name is just the handler name now + const handlerName = fullQueueName; + + // Find which service owns this handler + const serviceName = findServiceForHandler(handlerName); + + if (!serviceName) { return null; } + return { - service: parts[0], - handler: parts[1], + service: serviceName, + handler: handlerName, }; } \ No newline at end of file diff --git a/libs/services/queue/src/smart-queue-manager.ts b/libs/services/queue/src/smart-queue-manager.ts index cb80a78..9714132 100644 --- a/libs/services/queue/src/smart-queue-manager.ts +++ b/libs/services/queue/src/smart-queue-manager.ts @@ -212,7 +212,7 @@ export class SmartQueueManager extends QueueManager { * Resolve a queue name to a route */ private resolveQueueRoute(queueName: string): QueueRoute | null { - // Check if it's a full name (service_handler) + // Check if it's a handler name (which is now the full queue name) const parsed = parseQueueName(queueName); if (parsed) { const config = getServiceConfig(parsed.service); @@ -253,7 +253,8 @@ export class SmartQueueManager extends QueueManager { private getProducerQueue(route: QueueRoute): BullQueue { if (!this.producerQueues.has(route.fullName)) { const connection = this.getConnection(route.db); - const queue = new BullQueue(route.fullName, { + // Match the queue name format used by workers: {queueName} + const queue = new BullQueue(`{${route.fullName}}`, { connection, defaultJobOptions: this.getConfig().defaultQueueOptions?.defaultJobOptions || {}, }); diff --git a/libs/services/queue/src/types.ts b/libs/services/queue/src/types.ts index ae600c7..0ca8312 100644 --- a/libs/services/queue/src/types.ts +++ b/libs/services/queue/src/types.ts @@ -175,7 +175,7 @@ export interface SmartQueueConfig extends QueueManagerConfig { } export interface QueueRoute { - /** Full queue name (e.g., 'data-ingestion_ceo') */ + /** Full queue name (now just the handler name, e.g., 'ceo') */ fullName: string; /** Service that owns this queue */ service: string;