cleanup old init code on batcher

This commit is contained in:
Boki 2025-06-10 22:28:56 -04:00
parent 47ff92b567
commit ed326c025e
3 changed files with 20 additions and 40 deletions

View file

@ -6,6 +6,7 @@ import { loadEnvVariables } from '@stock-bot/config';
import { Hono } from 'hono'; import { Hono } from 'hono';
import { onShutdown, setShutdownTimeout } from '@stock-bot/shutdown'; import { onShutdown, setShutdownTimeout } from '@stock-bot/shutdown';
import { queueManager } from './services/queue.service'; import { queueManager } from './services/queue.service';
import { initializeBatchCache } from './utils/batch-helpers';
import { import {
healthRoutes, healthRoutes,
queueRoutes, queueRoutes,
@ -34,6 +35,11 @@ async function initializeServices() {
logger.info('Initializing data service...'); logger.info('Initializing data service...');
try { try {
// Initialize batch cache FIRST - before queue service
logger.info('Starting batch cache initialization...');
await initializeBatchCache();
logger.info('Batch cache initialized');
// Initialize queue service (Redis connections should be ready now) // Initialize queue service (Redis connections should be ready now)
logger.info('Starting queue service initialization...'); logger.info('Starting queue service initialization...');
await queueManager.initialize(); await queueManager.initialize();

View file

@ -30,8 +30,6 @@ export interface BatchResult {
// Cache instance for payload storage // Cache instance for payload storage
let cacheProvider: CacheProvider | null = null; let cacheProvider: CacheProvider | null = null;
let cacheInitialized = false;
let cacheInitPromise: Promise<void> | null = null;
function getCache(): CacheProvider { function getCache(): CacheProvider {
if (!cacheProvider) { if (!cacheProvider) {
@ -44,27 +42,15 @@ function getCache(): CacheProvider {
return cacheProvider; return cacheProvider;
} }
async function ensureCacheReady(): Promise<void> { /**
if (cacheInitialized) { * Initialize the batch cache before any batch operations
return; * This should be called during application startup
} */
export async function initializeBatchCache(): Promise<void> {
if (cacheInitPromise) { logger.info('Initializing batch cache...');
return cacheInitPromise; const cache = getCache();
} await cache.waitForReady(10000);
logger.info('Batch cache initialized successfully');
cacheInitPromise = (async () => {
const cache = getCache();
try {
await cache.waitForReady(10000);
cacheInitialized = true;
} catch (error) {
logger.warn('Cache initialization timeout, proceeding anyway', { error });
// Don't throw - let operations continue with potential fallback
}
})();
return cacheInitPromise;
} }
/** /**
@ -238,11 +224,12 @@ export async function processBatchJob(jobData: any, queue: QueueService): Promis
logger.error('Invalid payload data', { payloadKey, payload }); logger.error('Invalid payload data', { payloadKey, payload });
throw new Error(`Invalid payload data for key: ${payloadKey}`); throw new Error(`Invalid payload data for key: ${payloadKey}`);
} }
const { items, processorStr, options } = payload; const { items, processorStr, options } = payload;
// Deserialize the processor function // Deserialize the processor function
const processor = new Function('return ' + processorStr)(); const processor = new Function('return ' + processorStr)();
const jobs = items.map((item: any, index: number) => ({ const jobs = items.map((item: any, index: number) => ({
name: 'process-item', name: 'process-item',
data: { data: {
@ -292,9 +279,6 @@ async function storePayload<T>(
processor: (item: T, index: number) => any, processor: (item: T, index: number) => any,
options: ProcessOptions options: ProcessOptions
): Promise<string> { ): Promise<string> {
// Ensure cache is ready using shared initialization
await ensureCacheReady();
const cache = getCache(); const cache = getCache();
const key = `payload_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; const key = `payload_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
@ -315,8 +299,7 @@ async function storePayload<T>(
logger.debug('Storing batch payload', { logger.debug('Storing batch payload', {
key, key,
itemCount: items.length, itemCount: items.length
cacheReady: cache.isReady()
}); });
await cache.set(key, payload, options.ttl || 86400); await cache.set(key, payload, options.ttl || 86400);
@ -330,23 +313,14 @@ async function storePayload<T>(
} }
async function loadPayload(key: string): Promise<any> { async function loadPayload(key: string): Promise<any> {
// Ensure cache is ready using shared initialization
await ensureCacheReady();
const cache = getCache(); const cache = getCache();
logger.debug('Loading batch payload', { logger.debug('Loading batch payload', { key });
key,
cacheReady: cache.isReady()
});
const data = await cache.get(key); const data = await cache.get(key);
if (!data) { if (!data) {
logger.error('Payload not found in cache', { logger.error('Payload not found in cache', { key });
key,
cacheReady: cache.isReady()
});
throw new Error(`Payload not found: ${key}`); throw new Error(`Payload not found: ${key}`);
} }