From 47ff92b567b5c788d02c4a8ec548d68e364bd694 Mon Sep 17 00:00:00 2001 From: Boki Date: Tue, 10 Jun 2025 22:16:11 -0400 Subject: [PATCH] still trying --- .../src/examples/batch-processing-examples.ts | 63 ++++++---- .../src/providers/proxy.provider.ts | 39 +++--- apps/data-service/src/utils/batch-helpers.ts | 118 ++++++------------ docs/batch-processing-migration.md | 20 +-- 4 files changed, 110 insertions(+), 130 deletions(-) diff --git a/apps/data-service/src/examples/batch-processing-examples.ts b/apps/data-service/src/examples/batch-processing-examples.ts index 21011b4..72e5e8f 100644 --- a/apps/data-service/src/examples/batch-processing-examples.ts +++ b/apps/data-service/src/examples/batch-processing-examples.ts @@ -2,21 +2,30 @@ * Example usage of the new functional batch processing approach */ -import { processItems, processSymbols, processProxies, processBatchJob } from '../utils/batch-helpers'; +import { processItems, processBatchJob } from '../utils/batch-helpers'; import { queueManager } from '../services/queue.service'; // Example 1: Process a list of symbols for live data export async function exampleSymbolProcessing() { const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN']; - const result = await processSymbols(symbols, queueManager, { - operation: 'live-data', - service: 'market-data', - provider: 'yahoo', - totalDelayMs: 60000, // 1 minute total - useBatching: false, // Process directly - priority: 1 - }); + const result = await processItems( + symbols, + (symbol, index) => ({ + symbol, + index, + source: 'batch-processing' + }), + queueManager, + { + totalDelayMs: 60000, // 1 minute total + useBatching: false, // Process directly + priority: 1, + service: 'market-data', + provider: 'yahoo', + operation: 'live-data' + } + ); console.log('Symbol processing result:', result); // Output: { jobsCreated: 5, mode: 'direct', totalItems: 5, duration: 1234 } @@ -30,12 +39,24 @@ export async function exampleProxyProcessing() { // ... more proxies ]; - const result = await processProxies(proxies, queueManager, { - totalDelayMs: 3600000, // 1 hour total - useBatching: true, // Use batch mode - batchSize: 100, // 100 proxies per batch - priority: 2 - }); + const result = await processItems( + proxies, + (proxy, index) => ({ + proxy, + index, + source: 'batch-processing' + }), + queueManager, + { + totalDelayMs: 3600000, // 1 hour total + useBatching: true, // Use batch mode + batchSize: 100, // 100 proxies per batch + priority: 2, + service: 'proxy', + provider: 'proxy-service', + operation: 'check-proxy' + } + ); console.log('Proxy processing result:', result); // Output: { jobsCreated: 10, mode: 'batch', totalItems: 1000, batchesCreated: 10, duration: 2345 } @@ -81,15 +102,15 @@ export async function exampleBatchJobProcessor(jobData: any) { return result; } -// Example: Simple functional approach +// Example: Simple functional approach using generic processItems /* -await processSymbols(symbols, queueManager, { - operation: 'live-data', - service: 'data-service', - provider: 'yahoo', +await processItems(symbols, (symbol, index) => ({ symbol, index }), queueManager, { totalDelayMs: 3600000, useBatching: true, batchSize: 200, - priority: 2 + priority: 2, + service: 'data-service', + provider: 'yahoo', + operation: 'live-data' }); */ diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index 44b5c97..bf67137 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -14,27 +14,38 @@ const getEvery24HourCron = (): string => { }; export const proxyProvider: ProviderConfig = { - name: 'proxy-service', - service: 'proxy', - operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => { + name: 'proxy-provider', + service: 'data-service', + operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => { const { proxyService } = await import('./proxy.tasks'); const { queueManager } = await import('../services/queue.service'); - const { processProxies } = await import('../utils/batch-helpers'); + const { processItems } = await import('../utils/batch-helpers'); const proxies = await proxyService.fetchProxiesFromSources(); if (proxies.length === 0) { return { proxiesFetched: 0, jobsCreated: 0 }; - } // Use simplified functional approach - const result = await processProxies(proxies, queueManager, { - totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000, - batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'), - useBatching: process.env.PROXY_DIRECT_MODE !== 'true', - priority: 2, - service: 'proxy', - provider: 'proxy-service', - operation: 'check-proxy' - });return { + } + + // Use generic function with routing parameters + const result = await processItems( + proxies, + (proxy, index) => ({ + proxy, + index, + source: 'batch-processing' + }), + queueManager, + { + totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000, + batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'), + useBatching: process.env.PROXY_DIRECT_MODE !== 'true', + priority: 2, + service: 'data-service', + provider: 'proxy-provider', + operation: 'check-proxy' + } + );return { proxiesFetched: result.totalItems, jobsCreated: result.jobsCreated, mode: result.mode, diff --git a/apps/data-service/src/utils/batch-helpers.ts b/apps/data-service/src/utils/batch-helpers.ts index 398fc1c..80f3302 100644 --- a/apps/data-service/src/utils/batch-helpers.ts +++ b/apps/data-service/src/utils/batch-helpers.ts @@ -30,6 +30,8 @@ export interface BatchResult { // Cache instance for payload storage let cacheProvider: CacheProvider | null = null; +let cacheInitialized = false; +let cacheInitPromise: Promise | null = null; function getCache(): CacheProvider { if (!cacheProvider) { @@ -42,6 +44,29 @@ function getCache(): CacheProvider { return cacheProvider; } +async function ensureCacheReady(): Promise { + if (cacheInitialized) { + return; + } + + if (cacheInitPromise) { + return cacheInitPromise; + } + + cacheInitPromise = (async () => { + const cache = getCache(); + try { + await cache.waitForReady(10000); + cacheInitialized = true; + } catch (error) { + logger.warn('Cache initialization timeout, proceeding anyway', { error }); + // Don't throw - let operations continue with potential fallback + } + })(); + + return cacheInitPromise; +} + /** * Main function - processes items either directly or in batches */ @@ -163,9 +188,9 @@ async function processBatched( name: 'process-batch', data: { type: 'process-batch', - service: 'batch-processor', - provider: 'batch', - operation: 'process-batch-items', + service: options.service || 'generic', + provider: options.provider || 'generic', + operation: options.operation || 'generic', payload: { payloadKey, batchIndex, @@ -222,9 +247,9 @@ export async function processBatchJob(jobData: any, queue: QueueService): Promis name: 'process-item', data: { type: 'process-item', - service: options.service || 'data-service', + service: options.service || 'generic', provider: options.provider || 'generic', - operation: options.operation || 'process-item', + operation: options.operation || 'generic', payload: processor(item, index), priority: options.priority || 1 }, @@ -267,11 +292,10 @@ async function storePayload( processor: (item: T, index: number) => any, options: ProcessOptions ): Promise { + // Ensure cache is ready using shared initialization + await ensureCacheReady(); + const cache = getCache(); - - // Wait for cache to be ready before storing - await cache.waitForReady(5000); - const key = `payload_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; const payload = { @@ -282,9 +306,9 @@ async function storePayload( priority: options.priority || 1, retries: options.retries || 3, // Store routing information for later use - service: options.service || 'data-service', + service: options.service || 'generic', provider: options.provider || 'generic', - operation: options.operation || 'process-item' + operation: options.operation || 'generic' }, createdAt: Date.now() }; @@ -306,10 +330,10 @@ async function storePayload( } async function loadPayload(key: string): Promise { - const cache = getCache(); + // Ensure cache is ready using shared initialization + await ensureCacheReady(); - // Wait for cache to be ready before loading - await cache.waitForReady(5000); + const cache = getCache(); logger.debug('Loading batch payload', { key, @@ -365,70 +389,4 @@ async function addJobsInChunks(queue: QueueService, jobs: any[], chunkSize = 100 return allCreatedJobs; } -// Convenience functions for common use cases -export async function processSymbols( - symbols: string[], - queue: QueueService, - options: { - operation: string; - service: string; - provider: string; - totalDelayMs: number; - useBatching?: boolean; - batchSize?: number; - priority?: number; - } -): Promise { - return processItems( - symbols, - (symbol, index) => ({ - symbol, - index, - source: 'batch-processing' - }), - queue, - { - totalDelayMs: options.totalDelayMs, - batchSize: options.batchSize || 100, - priority: options.priority || 1, - useBatching: options.useBatching || false, - service: options.service, - provider: options.provider, - operation: options.operation - } - ); -} - -export async function processProxies( - proxies: any[], - queue: QueueService, - options: { - totalDelayMs: number; - useBatching?: boolean; - batchSize?: number; - priority?: number; - service?: string; - provider?: string; - operation?: string; - } -): Promise { - return processItems( - proxies, - (proxy, index) => ({ - proxy, - index, - source: 'batch-processing' - }), - queue, - { - totalDelayMs: options.totalDelayMs, - batchSize: options.batchSize || 200, - priority: options.priority || 2, - useBatching: options.useBatching || true, - service: options.service || 'data-service', - provider: options.provider || 'proxy-service', - operation: options.operation || 'check-proxy' - } - ); -} diff --git a/docs/batch-processing-migration.md b/docs/batch-processing-migration.md index b596e79..2c7fcaa 100644 --- a/docs/batch-processing-migration.md +++ b/docs/batch-processing-migration.md @@ -50,24 +50,14 @@ const result = await processSymbols(['AAPL', 'GOOGL'], queueManager, { provider: 'yahoo', totalDelayMs: 300000, useBatching: false, - priority: 1 + priority: 1, + service: 'market-data', + provider: 'yahoo', + operation: 'live-data' }); ``` -### 3. `processProxies()` - Proxy validation - -```typescript -import { processProxies } from '../utils/batch-helpers'; - -const result = await processProxies(proxies, queueManager, { - totalDelayMs: 3600000, - useBatching: true, - batchSize: 200, - priority: 2 -}); -``` - -### 4. `processBatchJob()` - Worker batch handler +### 3. `processBatchJob()` - Worker batch handler ```typescript import { processBatchJob } from '../utils/batch-helpers';