still trying
This commit is contained in:
parent
2f074271cc
commit
47ff92b567
4 changed files with 110 additions and 130 deletions
|
|
@ -2,21 +2,30 @@
|
||||||
* Example usage of the new functional batch processing approach
|
* Example usage of the new functional batch processing approach
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { processItems, processSymbols, processProxies, processBatchJob } from '../utils/batch-helpers';
|
import { processItems, processBatchJob } from '../utils/batch-helpers';
|
||||||
import { queueManager } from '../services/queue.service';
|
import { queueManager } from '../services/queue.service';
|
||||||
|
|
||||||
// Example 1: Process a list of symbols for live data
|
// Example 1: Process a list of symbols for live data
|
||||||
export async function exampleSymbolProcessing() {
|
export async function exampleSymbolProcessing() {
|
||||||
const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN'];
|
const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN'];
|
||||||
|
|
||||||
const result = await processSymbols(symbols, queueManager, {
|
const result = await processItems(
|
||||||
operation: 'live-data',
|
symbols,
|
||||||
service: 'market-data',
|
(symbol, index) => ({
|
||||||
provider: 'yahoo',
|
symbol,
|
||||||
totalDelayMs: 60000, // 1 minute total
|
index,
|
||||||
useBatching: false, // Process directly
|
source: 'batch-processing'
|
||||||
priority: 1
|
}),
|
||||||
});
|
queueManager,
|
||||||
|
{
|
||||||
|
totalDelayMs: 60000, // 1 minute total
|
||||||
|
useBatching: false, // Process directly
|
||||||
|
priority: 1,
|
||||||
|
service: 'market-data',
|
||||||
|
provider: 'yahoo',
|
||||||
|
operation: 'live-data'
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
console.log('Symbol processing result:', result);
|
console.log('Symbol processing result:', result);
|
||||||
// Output: { jobsCreated: 5, mode: 'direct', totalItems: 5, duration: 1234 }
|
// Output: { jobsCreated: 5, mode: 'direct', totalItems: 5, duration: 1234 }
|
||||||
|
|
@ -30,12 +39,24 @@ export async function exampleProxyProcessing() {
|
||||||
// ... more proxies
|
// ... more proxies
|
||||||
];
|
];
|
||||||
|
|
||||||
const result = await processProxies(proxies, queueManager, {
|
const result = await processItems(
|
||||||
totalDelayMs: 3600000, // 1 hour total
|
proxies,
|
||||||
useBatching: true, // Use batch mode
|
(proxy, index) => ({
|
||||||
batchSize: 100, // 100 proxies per batch
|
proxy,
|
||||||
priority: 2
|
index,
|
||||||
});
|
source: 'batch-processing'
|
||||||
|
}),
|
||||||
|
queueManager,
|
||||||
|
{
|
||||||
|
totalDelayMs: 3600000, // 1 hour total
|
||||||
|
useBatching: true, // Use batch mode
|
||||||
|
batchSize: 100, // 100 proxies per batch
|
||||||
|
priority: 2,
|
||||||
|
service: 'proxy',
|
||||||
|
provider: 'proxy-service',
|
||||||
|
operation: 'check-proxy'
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
console.log('Proxy processing result:', result);
|
console.log('Proxy processing result:', result);
|
||||||
// Output: { jobsCreated: 10, mode: 'batch', totalItems: 1000, batchesCreated: 10, duration: 2345 }
|
// Output: { jobsCreated: 10, mode: 'batch', totalItems: 1000, batchesCreated: 10, duration: 2345 }
|
||||||
|
|
@ -81,15 +102,15 @@ export async function exampleBatchJobProcessor(jobData: any) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Example: Simple functional approach
|
// Example: Simple functional approach using generic processItems
|
||||||
/*
|
/*
|
||||||
await processSymbols(symbols, queueManager, {
|
await processItems(symbols, (symbol, index) => ({ symbol, index }), queueManager, {
|
||||||
operation: 'live-data',
|
|
||||||
service: 'data-service',
|
|
||||||
provider: 'yahoo',
|
|
||||||
totalDelayMs: 3600000,
|
totalDelayMs: 3600000,
|
||||||
useBatching: true,
|
useBatching: true,
|
||||||
batchSize: 200,
|
batchSize: 200,
|
||||||
priority: 2
|
priority: 2,
|
||||||
|
service: 'data-service',
|
||||||
|
provider: 'yahoo',
|
||||||
|
operation: 'live-data'
|
||||||
});
|
});
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -14,27 +14,38 @@ const getEvery24HourCron = (): string => {
|
||||||
};
|
};
|
||||||
|
|
||||||
export const proxyProvider: ProviderConfig = {
|
export const proxyProvider: ProviderConfig = {
|
||||||
name: 'proxy-service',
|
name: 'proxy-provider',
|
||||||
service: 'proxy',
|
service: 'data-service',
|
||||||
operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => {
|
operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => {
|
||||||
const { proxyService } = await import('./proxy.tasks');
|
const { proxyService } = await import('./proxy.tasks');
|
||||||
const { queueManager } = await import('../services/queue.service');
|
const { queueManager } = await import('../services/queue.service');
|
||||||
const { processProxies } = await import('../utils/batch-helpers');
|
const { processItems } = await import('../utils/batch-helpers');
|
||||||
|
|
||||||
const proxies = await proxyService.fetchProxiesFromSources();
|
const proxies = await proxyService.fetchProxiesFromSources();
|
||||||
|
|
||||||
if (proxies.length === 0) {
|
if (proxies.length === 0) {
|
||||||
return { proxiesFetched: 0, jobsCreated: 0 };
|
return { proxiesFetched: 0, jobsCreated: 0 };
|
||||||
} // Use simplified functional approach
|
}
|
||||||
const result = await processProxies(proxies, queueManager, {
|
|
||||||
totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000,
|
// Use generic function with routing parameters
|
||||||
batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'),
|
const result = await processItems(
|
||||||
useBatching: process.env.PROXY_DIRECT_MODE !== 'true',
|
proxies,
|
||||||
priority: 2,
|
(proxy, index) => ({
|
||||||
service: 'proxy',
|
proxy,
|
||||||
provider: 'proxy-service',
|
index,
|
||||||
operation: 'check-proxy'
|
source: 'batch-processing'
|
||||||
});return {
|
}),
|
||||||
|
queueManager,
|
||||||
|
{
|
||||||
|
totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000,
|
||||||
|
batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'),
|
||||||
|
useBatching: process.env.PROXY_DIRECT_MODE !== 'true',
|
||||||
|
priority: 2,
|
||||||
|
service: 'data-service',
|
||||||
|
provider: 'proxy-provider',
|
||||||
|
operation: 'check-proxy'
|
||||||
|
}
|
||||||
|
);return {
|
||||||
proxiesFetched: result.totalItems,
|
proxiesFetched: result.totalItems,
|
||||||
jobsCreated: result.jobsCreated,
|
jobsCreated: result.jobsCreated,
|
||||||
mode: result.mode,
|
mode: result.mode,
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,8 @@ export interface BatchResult {
|
||||||
|
|
||||||
// Cache instance for payload storage
|
// Cache instance for payload storage
|
||||||
let cacheProvider: CacheProvider | null = null;
|
let cacheProvider: CacheProvider | null = null;
|
||||||
|
let cacheInitialized = false;
|
||||||
|
let cacheInitPromise: Promise<void> | null = null;
|
||||||
|
|
||||||
function getCache(): CacheProvider {
|
function getCache(): CacheProvider {
|
||||||
if (!cacheProvider) {
|
if (!cacheProvider) {
|
||||||
|
|
@ -42,6 +44,29 @@ function getCache(): CacheProvider {
|
||||||
return cacheProvider;
|
return cacheProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function ensureCacheReady(): Promise<void> {
|
||||||
|
if (cacheInitialized) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cacheInitPromise) {
|
||||||
|
return cacheInitPromise;
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheInitPromise = (async () => {
|
||||||
|
const cache = getCache();
|
||||||
|
try {
|
||||||
|
await cache.waitForReady(10000);
|
||||||
|
cacheInitialized = true;
|
||||||
|
} catch (error) {
|
||||||
|
logger.warn('Cache initialization timeout, proceeding anyway', { error });
|
||||||
|
// Don't throw - let operations continue with potential fallback
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
return cacheInitPromise;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Main function - processes items either directly or in batches
|
* Main function - processes items either directly or in batches
|
||||||
*/
|
*/
|
||||||
|
|
@ -163,9 +188,9 @@ async function processBatched<T>(
|
||||||
name: 'process-batch',
|
name: 'process-batch',
|
||||||
data: {
|
data: {
|
||||||
type: 'process-batch',
|
type: 'process-batch',
|
||||||
service: 'batch-processor',
|
service: options.service || 'generic',
|
||||||
provider: 'batch',
|
provider: options.provider || 'generic',
|
||||||
operation: 'process-batch-items',
|
operation: options.operation || 'generic',
|
||||||
payload: {
|
payload: {
|
||||||
payloadKey,
|
payloadKey,
|
||||||
batchIndex,
|
batchIndex,
|
||||||
|
|
@ -222,9 +247,9 @@ export async function processBatchJob(jobData: any, queue: QueueService): Promis
|
||||||
name: 'process-item',
|
name: 'process-item',
|
||||||
data: {
|
data: {
|
||||||
type: 'process-item',
|
type: 'process-item',
|
||||||
service: options.service || 'data-service',
|
service: options.service || 'generic',
|
||||||
provider: options.provider || 'generic',
|
provider: options.provider || 'generic',
|
||||||
operation: options.operation || 'process-item',
|
operation: options.operation || 'generic',
|
||||||
payload: processor(item, index),
|
payload: processor(item, index),
|
||||||
priority: options.priority || 1
|
priority: options.priority || 1
|
||||||
},
|
},
|
||||||
|
|
@ -267,11 +292,10 @@ async function storePayload<T>(
|
||||||
processor: (item: T, index: number) => any,
|
processor: (item: T, index: number) => any,
|
||||||
options: ProcessOptions
|
options: ProcessOptions
|
||||||
): Promise<string> {
|
): Promise<string> {
|
||||||
|
// Ensure cache is ready using shared initialization
|
||||||
|
await ensureCacheReady();
|
||||||
|
|
||||||
const cache = getCache();
|
const cache = getCache();
|
||||||
|
|
||||||
// Wait for cache to be ready before storing
|
|
||||||
await cache.waitForReady(5000);
|
|
||||||
|
|
||||||
const key = `payload_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
|
const key = `payload_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
|
||||||
|
|
||||||
const payload = {
|
const payload = {
|
||||||
|
|
@ -282,9 +306,9 @@ async function storePayload<T>(
|
||||||
priority: options.priority || 1,
|
priority: options.priority || 1,
|
||||||
retries: options.retries || 3,
|
retries: options.retries || 3,
|
||||||
// Store routing information for later use
|
// Store routing information for later use
|
||||||
service: options.service || 'data-service',
|
service: options.service || 'generic',
|
||||||
provider: options.provider || 'generic',
|
provider: options.provider || 'generic',
|
||||||
operation: options.operation || 'process-item'
|
operation: options.operation || 'generic'
|
||||||
},
|
},
|
||||||
createdAt: Date.now()
|
createdAt: Date.now()
|
||||||
};
|
};
|
||||||
|
|
@ -306,10 +330,10 @@ async function storePayload<T>(
|
||||||
}
|
}
|
||||||
|
|
||||||
async function loadPayload(key: string): Promise<any> {
|
async function loadPayload(key: string): Promise<any> {
|
||||||
const cache = getCache();
|
// Ensure cache is ready using shared initialization
|
||||||
|
await ensureCacheReady();
|
||||||
|
|
||||||
// Wait for cache to be ready before loading
|
const cache = getCache();
|
||||||
await cache.waitForReady(5000);
|
|
||||||
|
|
||||||
logger.debug('Loading batch payload', {
|
logger.debug('Loading batch payload', {
|
||||||
key,
|
key,
|
||||||
|
|
@ -365,70 +389,4 @@ async function addJobsInChunks(queue: QueueService, jobs: any[], chunkSize = 100
|
||||||
return allCreatedJobs;
|
return allCreatedJobs;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convenience functions for common use cases
|
|
||||||
|
|
||||||
export async function processSymbols(
|
|
||||||
symbols: string[],
|
|
||||||
queue: QueueService,
|
|
||||||
options: {
|
|
||||||
operation: string;
|
|
||||||
service: string;
|
|
||||||
provider: string;
|
|
||||||
totalDelayMs: number;
|
|
||||||
useBatching?: boolean;
|
|
||||||
batchSize?: number;
|
|
||||||
priority?: number;
|
|
||||||
}
|
|
||||||
): Promise<BatchResult> {
|
|
||||||
return processItems(
|
|
||||||
symbols,
|
|
||||||
(symbol, index) => ({
|
|
||||||
symbol,
|
|
||||||
index,
|
|
||||||
source: 'batch-processing'
|
|
||||||
}),
|
|
||||||
queue,
|
|
||||||
{
|
|
||||||
totalDelayMs: options.totalDelayMs,
|
|
||||||
batchSize: options.batchSize || 100,
|
|
||||||
priority: options.priority || 1,
|
|
||||||
useBatching: options.useBatching || false,
|
|
||||||
service: options.service,
|
|
||||||
provider: options.provider,
|
|
||||||
operation: options.operation
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function processProxies(
|
|
||||||
proxies: any[],
|
|
||||||
queue: QueueService,
|
|
||||||
options: {
|
|
||||||
totalDelayMs: number;
|
|
||||||
useBatching?: boolean;
|
|
||||||
batchSize?: number;
|
|
||||||
priority?: number;
|
|
||||||
service?: string;
|
|
||||||
provider?: string;
|
|
||||||
operation?: string;
|
|
||||||
}
|
|
||||||
): Promise<BatchResult> {
|
|
||||||
return processItems(
|
|
||||||
proxies,
|
|
||||||
(proxy, index) => ({
|
|
||||||
proxy,
|
|
||||||
index,
|
|
||||||
source: 'batch-processing'
|
|
||||||
}),
|
|
||||||
queue,
|
|
||||||
{
|
|
||||||
totalDelayMs: options.totalDelayMs,
|
|
||||||
batchSize: options.batchSize || 200,
|
|
||||||
priority: options.priority || 2,
|
|
||||||
useBatching: options.useBatching || true,
|
|
||||||
service: options.service || 'data-service',
|
|
||||||
provider: options.provider || 'proxy-service',
|
|
||||||
operation: options.operation || 'check-proxy'
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -50,24 +50,14 @@ const result = await processSymbols(['AAPL', 'GOOGL'], queueManager, {
|
||||||
provider: 'yahoo',
|
provider: 'yahoo',
|
||||||
totalDelayMs: 300000,
|
totalDelayMs: 300000,
|
||||||
useBatching: false,
|
useBatching: false,
|
||||||
priority: 1
|
priority: 1,
|
||||||
|
service: 'market-data',
|
||||||
|
provider: 'yahoo',
|
||||||
|
operation: 'live-data'
|
||||||
});
|
});
|
||||||
```
|
```
|
||||||
|
|
||||||
### 3. `processProxies()` - Proxy validation
|
### 3. `processBatchJob()` - Worker batch handler
|
||||||
|
|
||||||
```typescript
|
|
||||||
import { processProxies } from '../utils/batch-helpers';
|
|
||||||
|
|
||||||
const result = await processProxies(proxies, queueManager, {
|
|
||||||
totalDelayMs: 3600000,
|
|
||||||
useBatching: true,
|
|
||||||
batchSize: 200,
|
|
||||||
priority: 2
|
|
||||||
});
|
|
||||||
```
|
|
||||||
|
|
||||||
### 4. `processBatchJob()` - Worker batch handler
|
|
||||||
|
|
||||||
```typescript
|
```typescript
|
||||||
import { processBatchJob } from '../utils/batch-helpers';
|
import { processBatchJob } from '../utils/batch-helpers';
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue