cleanup old init code on batcher

This commit is contained in:
Boki 2025-06-10 22:28:56 -04:00
parent 716c90060a
commit 9c072f91f1
3 changed files with 20 additions and 40 deletions

View file

@ -6,6 +6,7 @@ import { loadEnvVariables } from '@stock-bot/config';
import { Hono } from 'hono';
import { onShutdown, setShutdownTimeout } from '@stock-bot/shutdown';
import { queueManager } from './services/queue.service';
import { initializeBatchCache } from './utils/batch-helpers';
import {
healthRoutes,
queueRoutes,
@ -34,6 +35,11 @@ async function initializeServices() {
logger.info('Initializing data service...');
try {
// Initialize batch cache FIRST - before queue service
logger.info('Starting batch cache initialization...');
await initializeBatchCache();
logger.info('Batch cache initialized');
// Initialize queue service (Redis connections should be ready now)
logger.info('Starting queue service initialization...');
await queueManager.initialize();

View file

@ -30,8 +30,6 @@ export interface BatchResult {
// Cache instance for payload storage
let cacheProvider: CacheProvider | null = null;
let cacheInitialized = false;
let cacheInitPromise: Promise<void> | null = null;
function getCache(): CacheProvider {
if (!cacheProvider) {
@ -44,27 +42,15 @@ function getCache(): CacheProvider {
return cacheProvider;
}
async function ensureCacheReady(): Promise<void> {
if (cacheInitialized) {
return;
}
if (cacheInitPromise) {
return cacheInitPromise;
}
cacheInitPromise = (async () => {
const cache = getCache();
try {
await cache.waitForReady(10000);
cacheInitialized = true;
} catch (error) {
logger.warn('Cache initialization timeout, proceeding anyway', { error });
// Don't throw - let operations continue with potential fallback
}
})();
return cacheInitPromise;
/**
* Initialize the batch cache before any batch operations
* This should be called during application startup
*/
export async function initializeBatchCache(): Promise<void> {
logger.info('Initializing batch cache...');
const cache = getCache();
await cache.waitForReady(10000);
logger.info('Batch cache initialized successfully');
}
/**
@ -238,11 +224,12 @@ export async function processBatchJob(jobData: any, queue: QueueService): Promis
logger.error('Invalid payload data', { payloadKey, payload });
throw new Error(`Invalid payload data for key: ${payloadKey}`);
}
const { items, processorStr, options } = payload;
// Deserialize the processor function
const processor = new Function('return ' + processorStr)();
const jobs = items.map((item: any, index: number) => ({
name: 'process-item',
data: {
@ -292,9 +279,6 @@ async function storePayload<T>(
processor: (item: T, index: number) => any,
options: ProcessOptions
): Promise<string> {
// Ensure cache is ready using shared initialization
await ensureCacheReady();
const cache = getCache();
const key = `payload_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
@ -315,8 +299,7 @@ async function storePayload<T>(
logger.debug('Storing batch payload', {
key,
itemCount: items.length,
cacheReady: cache.isReady()
itemCount: items.length
});
await cache.set(key, payload, options.ttl || 86400);
@ -330,23 +313,14 @@ async function storePayload<T>(
}
async function loadPayload(key: string): Promise<any> {
// Ensure cache is ready using shared initialization
await ensureCacheReady();
const cache = getCache();
logger.debug('Loading batch payload', {
key,
cacheReady: cache.isReady()
});
logger.debug('Loading batch payload', { key });
const data = await cache.get(key);
if (!data) {
logger.error('Payload not found in cache', {
key,
cacheReady: cache.isReady()
});
logger.error('Payload not found in cache', { key });
throw new Error(`Payload not found: ${key}`);
}