still trying

This commit is contained in:
Boki 2025-06-10 22:16:11 -04:00
parent 682b50d3b2
commit 716c90060a
4 changed files with 110 additions and 130 deletions

View file

@ -2,21 +2,30 @@
* Example usage of the new functional batch processing approach
*/
import { processItems, processSymbols, processProxies, processBatchJob } from '../utils/batch-helpers';
import { processItems, processBatchJob } from '../utils/batch-helpers';
import { queueManager } from '../services/queue.service';
// Example 1: Process a list of symbols for live data
export async function exampleSymbolProcessing() {
const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN'];
const result = await processSymbols(symbols, queueManager, {
operation: 'live-data',
service: 'market-data',
provider: 'yahoo',
totalDelayMs: 60000, // 1 minute total
useBatching: false, // Process directly
priority: 1
});
const result = await processItems(
symbols,
(symbol, index) => ({
symbol,
index,
source: 'batch-processing'
}),
queueManager,
{
totalDelayMs: 60000, // 1 minute total
useBatching: false, // Process directly
priority: 1,
service: 'market-data',
provider: 'yahoo',
operation: 'live-data'
}
);
console.log('Symbol processing result:', result);
// Output: { jobsCreated: 5, mode: 'direct', totalItems: 5, duration: 1234 }
@ -30,12 +39,24 @@ export async function exampleProxyProcessing() {
// ... more proxies
];
const result = await processProxies(proxies, queueManager, {
totalDelayMs: 3600000, // 1 hour total
useBatching: true, // Use batch mode
batchSize: 100, // 100 proxies per batch
priority: 2
});
const result = await processItems(
proxies,
(proxy, index) => ({
proxy,
index,
source: 'batch-processing'
}),
queueManager,
{
totalDelayMs: 3600000, // 1 hour total
useBatching: true, // Use batch mode
batchSize: 100, // 100 proxies per batch
priority: 2,
service: 'proxy',
provider: 'proxy-service',
operation: 'check-proxy'
}
);
console.log('Proxy processing result:', result);
// Output: { jobsCreated: 10, mode: 'batch', totalItems: 1000, batchesCreated: 10, duration: 2345 }
@ -81,15 +102,15 @@ export async function exampleBatchJobProcessor(jobData: any) {
return result;
}
// Example: Simple functional approach
// Example: Simple functional approach using generic processItems
/*
await processSymbols(symbols, queueManager, {
operation: 'live-data',
service: 'data-service',
provider: 'yahoo',
await processItems(symbols, (symbol, index) => ({ symbol, index }), queueManager, {
totalDelayMs: 3600000,
useBatching: true,
batchSize: 200,
priority: 2
priority: 2,
service: 'data-service',
provider: 'yahoo',
operation: 'live-data'
});
*/

View file

@ -14,27 +14,38 @@ const getEvery24HourCron = (): string => {
};
export const proxyProvider: ProviderConfig = {
name: 'proxy-service',
service: 'proxy',
operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => {
name: 'proxy-provider',
service: 'data-service',
operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => {
const { proxyService } = await import('./proxy.tasks');
const { queueManager } = await import('../services/queue.service');
const { processProxies } = await import('../utils/batch-helpers');
const { processItems } = await import('../utils/batch-helpers');
const proxies = await proxyService.fetchProxiesFromSources();
if (proxies.length === 0) {
return { proxiesFetched: 0, jobsCreated: 0 };
} // Use simplified functional approach
const result = await processProxies(proxies, queueManager, {
totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000,
batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'),
useBatching: process.env.PROXY_DIRECT_MODE !== 'true',
priority: 2,
service: 'proxy',
provider: 'proxy-service',
operation: 'check-proxy'
});return {
}
// Use generic function with routing parameters
const result = await processItems(
proxies,
(proxy, index) => ({
proxy,
index,
source: 'batch-processing'
}),
queueManager,
{
totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000,
batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'),
useBatching: process.env.PROXY_DIRECT_MODE !== 'true',
priority: 2,
service: 'data-service',
provider: 'proxy-provider',
operation: 'check-proxy'
}
);return {
proxiesFetched: result.totalItems,
jobsCreated: result.jobsCreated,
mode: result.mode,

View file

@ -30,6 +30,8 @@ export interface BatchResult {
// Cache instance for payload storage
let cacheProvider: CacheProvider | null = null;
let cacheInitialized = false;
let cacheInitPromise: Promise<void> | null = null;
function getCache(): CacheProvider {
if (!cacheProvider) {
@ -42,6 +44,29 @@ function getCache(): CacheProvider {
return cacheProvider;
}
/**
 * Resolves once the cache has been given a chance to become ready.
 * Shares a single module-level initialization promise so concurrent
 * callers wait on one waitForReady() attempt instead of each starting
 * their own.
 */
async function ensureCacheReady(): Promise<void> {
// Fast path: a previous attempt already marked the cache ready.
if (cacheInitialized) {
return;
}
// An attempt is already in flight (or has settled) - share it.
if (cacheInitPromise) {
return cacheInitPromise;
}
cacheInitPromise = (async () => {
const cache = getCache();
try {
// Give the cache up to 10 seconds to come up before proceeding.
await cache.waitForReady(10000);
cacheInitialized = true;
} catch (error) {
logger.warn('Cache initialization timeout, proceeding anyway', { error });
// Don't throw - let operations continue with potential fallback
// NOTE(review): cacheInitPromise is never cleared on failure, so a
// timed-out initialization is never retried by later callers -
// confirm this "try once, then best-effort forever" is intended.
}
})();
return cacheInitPromise;
}
/**
* Main function - processes items either directly or in batches
*/
@ -163,9 +188,9 @@ async function processBatched<T>(
name: 'process-batch',
data: {
type: 'process-batch',
service: 'batch-processor',
provider: 'batch',
operation: 'process-batch-items',
service: options.service || 'generic',
provider: options.provider || 'generic',
operation: options.operation || 'generic',
payload: {
payloadKey,
batchIndex,
@ -222,9 +247,9 @@ export async function processBatchJob(jobData: any, queue: QueueService): Promis
name: 'process-item',
data: {
type: 'process-item',
service: options.service || 'data-service',
service: options.service || 'generic',
provider: options.provider || 'generic',
operation: options.operation || 'process-item',
operation: options.operation || 'generic',
payload: processor(item, index),
priority: options.priority || 1
},
@ -267,11 +292,10 @@ async function storePayload<T>(
processor: (item: T, index: number) => any,
options: ProcessOptions
): Promise<string> {
// Ensure cache is ready using shared initialization
await ensureCacheReady();
const cache = getCache();
// Wait for cache to be ready before storing
await cache.waitForReady(5000);
const key = `payload_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const payload = {
@ -282,9 +306,9 @@ async function storePayload<T>(
priority: options.priority || 1,
retries: options.retries || 3,
// Store routing information for later use
service: options.service || 'data-service',
service: options.service || 'generic',
provider: options.provider || 'generic',
operation: options.operation || 'process-item'
operation: options.operation || 'generic'
},
createdAt: Date.now()
};
@ -306,10 +330,10 @@ async function storePayload<T>(
}
async function loadPayload(key: string): Promise<any> {
const cache = getCache();
// Ensure cache is ready using shared initialization
await ensureCacheReady();
// Wait for cache to be ready before loading
await cache.waitForReady(5000);
const cache = getCache();
logger.debug('Loading batch payload', {
key,
@ -365,70 +389,4 @@ async function addJobsInChunks(queue: QueueService, jobs: any[], chunkSize = 100
return allCreatedJobs;
}
// Convenience functions for common use cases
/**
 * Convenience wrapper around the generic processItems() pipeline for a
 * list of ticker symbols. Each symbol is turned into a payload of
 * `{ symbol, index, source: 'batch-processing' }` and routed with the
 * caller-supplied service/provider/operation.
 */
export async function processSymbols(
  symbols: string[],
  queue: QueueService,
  options: {
    operation: string;
    service: string;
    provider: string;
    totalDelayMs: number;
    useBatching?: boolean;
    batchSize?: number;
    priority?: number;
  }
): Promise<BatchResult> {
  // Payload builder handed to the generic pipeline.
  const toPayload = (symbol: string, index: number) => ({
    symbol,
    index,
    source: 'batch-processing'
  });

  // Fill in defaults before delegating: 100-item batches, lowest
  // priority, direct (non-batched) mode unless requested otherwise.
  const effectiveOptions = {
    totalDelayMs: options.totalDelayMs,
    batchSize: options.batchSize || 100,
    priority: options.priority || 1,
    useBatching: options.useBatching || false,
    service: options.service,
    provider: options.provider,
    operation: options.operation
  };

  return processItems(symbols, toPayload, queue, effectiveOptions);
}
/**
 * Convenience wrapper around the generic processItems() pipeline for a
 * list of proxies. Each proxy is turned into a payload of
 * `{ proxy, index, source: 'batch-processing' }`; routing defaults to
 * the proxy-check job (`data-service` / `proxy-service` / `check-proxy`).
 *
 * Fix: `useBatching` previously defaulted with `|| true`, which always
 * evaluates to true and silently ignored an explicit
 * `useBatching: false`. `??` applies the default only when the option
 * is actually absent.
 */
export async function processProxies(
  proxies: any[],
  queue: QueueService,
  options: {
    totalDelayMs: number;
    useBatching?: boolean;
    batchSize?: number;
    priority?: number;
    service?: string;
    provider?: string;
    operation?: string;
  }
): Promise<BatchResult> {
  return processItems(
    proxies,
    (proxy, index) => ({
      proxy,
      index,
      source: 'batch-processing'
    }),
    queue,
    {
      totalDelayMs: options.totalDelayMs,
      batchSize: options.batchSize || 200,
      priority: options.priority || 2,
      // `??`, not `||`: respect an explicit useBatching === false.
      useBatching: options.useBatching ?? true,
      service: options.service || 'data-service',
      provider: options.provider || 'proxy-service',
      operation: options.operation || 'check-proxy'
    }
  );
}

View file

@ -50,24 +50,14 @@ const result = await processSymbols(['AAPL', 'GOOGL'], queueManager, {
totalDelayMs: 300000,
useBatching: false,
priority: 1,
service: 'market-data',
provider: 'yahoo',
operation: 'live-data'
});
```
### 3. `processProxies()` - Proxy validation
```typescript
import { processProxies } from '../utils/batch-helpers';
const result = await processProxies(proxies, queueManager, {
totalDelayMs: 3600000,
useBatching: true,
batchSize: 200,
priority: 2
});
```
### 4. `processBatchJob()` - Worker batch handler
### 3. `processBatchJob()` - Worker batch handler
```typescript
import { processBatchJob } from '../utils/batch-helpers';