From 9c072f91f19a0b4f794c3534c01507d35d12862a Mon Sep 17 00:00:00 2001 From: Boki Date: Tue, 10 Jun 2025 22:28:56 -0400 Subject: [PATCH] cleanup old init code on batcher --- apps/data-service/src/config/app.config.ts | 0 apps/data-service/src/index.ts | 6 +++ apps/data-service/src/utils/batch-helpers.ts | 54 +++++--------------- 3 files changed, 20 insertions(+), 40 deletions(-) create mode 100644 apps/data-service/src/config/app.config.ts diff --git a/apps/data-service/src/config/app.config.ts b/apps/data-service/src/config/app.config.ts new file mode 100644 index 0000000..e69de29 diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index dd3fde2..49541ae 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -6,6 +6,7 @@ import { loadEnvVariables } from '@stock-bot/config'; import { Hono } from 'hono'; import { onShutdown, setShutdownTimeout } from '@stock-bot/shutdown'; import { queueManager } from './services/queue.service'; +import { initializeBatchCache } from './utils/batch-helpers'; import { healthRoutes, queueRoutes, @@ -34,6 +35,11 @@ async function initializeServices() { logger.info('Initializing data service...'); try { + // Initialize batch cache FIRST - before queue service + logger.info('Starting batch cache initialization...'); + await initializeBatchCache(); + logger.info('Batch cache initialized'); + // Initialize queue service (Redis connections should be ready now) logger.info('Starting queue service initialization...'); await queueManager.initialize(); diff --git a/apps/data-service/src/utils/batch-helpers.ts b/apps/data-service/src/utils/batch-helpers.ts index 80f3302..e48aeb5 100644 --- a/apps/data-service/src/utils/batch-helpers.ts +++ b/apps/data-service/src/utils/batch-helpers.ts @@ -30,8 +30,6 @@ export interface BatchResult { // Cache instance for payload storage let cacheProvider: CacheProvider | null = null; -let cacheInitialized = false; -let cacheInitPromise: Promise | null = null; function getCache(): CacheProvider { if (!cacheProvider) { @@ -44,27 +42,15 @@ function getCache(): CacheProvider { return cacheProvider; } -async function ensureCacheReady(): Promise { - if (cacheInitialized) { - return; - } - - if (cacheInitPromise) { - return cacheInitPromise; - } - - cacheInitPromise = (async () => { - const cache = getCache(); - try { - await cache.waitForReady(10000); - cacheInitialized = true; - } catch (error) { - logger.warn('Cache initialization timeout, proceeding anyway', { error }); - // Don't throw - let operations continue with potential fallback - } - })(); - - return cacheInitPromise; +/** + * Initialize the batch cache before any batch operations + * This should be called during application startup + */ +export async function initializeBatchCache(): Promise { + logger.info('Initializing batch cache...'); + const cache = getCache(); + await cache.waitForReady(10000); + logger.info('Batch cache initialized successfully'); } /** @@ -238,11 +224,12 @@ export async function processBatchJob(jobData: any, queue: QueueService): Promis logger.error('Invalid payload data', { payloadKey, payload }); throw new Error(`Invalid payload data for key: ${payloadKey}`); } + const { items, processorStr, options } = payload; // Deserialize the processor function const processor = new Function('return ' + processorStr)(); - + const jobs = items.map((item: any, index: number) => ({ name: 'process-item', data: { @@ -292,9 +279,6 @@ async function storePayload( processor: (item: T, index: number) => any, options: ProcessOptions ): Promise { - // Ensure cache is ready using shared initialization - await ensureCacheReady(); - const cache = getCache(); const key = `payload_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; @@ -315,8 +299,7 @@ async function storePayload( logger.debug('Storing batch payload', { key, - itemCount: items.length, - cacheReady: cache.isReady() + itemCount: items.length }); await cache.set(key, payload, options.ttl || 86400); @@ -330,23 +313,14 @@ async function storePayload( } async function loadPayload(key: string): Promise { - // Ensure cache is ready using shared initialization - await ensureCacheReady(); - const cache = getCache(); - logger.debug('Loading batch payload', { - key, - cacheReady: cache.isReady() - }); + logger.debug('Loading batch payload', { key }); const data = await cache.get(key); if (!data) { - logger.error('Payload not found in cache', { - key, - cacheReady: cache.isReady() - }); + logger.error('Payload not found in cache', { key }); throw new Error(`Payload not found: ${key}`); }