fixed batcher for proxy

This commit is contained in:
Boki 2025-06-15 09:08:28 -04:00
parent e8c90532d5
commit 98d5f43eb8
5 changed files with 15 additions and 565 deletions

View file

@ -32,10 +32,7 @@ export function initializeProxyProvider() {
}
// Batch process the proxies through check-proxy operation
const batchResult = await processItems(
proxies,
queueManager,
{
const batchResult = await processItems(proxies, queueManager, {
provider: 'proxy',
operation: 'check-proxy',
totalDelayHours: 0.083, // 5 minutes (5/60 hours)
@ -46,22 +43,21 @@ export function initializeProxyProvider() {
ttl: 30000, // 30 second timeout per proxy check
removeOnComplete: 5,
removeOnFail: 3,
}
);
});
logger.info('Batch proxy validation completed', {
totalProxies: proxies.length,
jobsCreated: batchResult.jobsCreated,
mode: batchResult.mode,
batchesCreated: batchResult.batchesCreated,
duration: `${batchResult.duration}ms`
duration: `${batchResult.duration}ms`,
});
return {
processed: proxies.length,
jobsCreated: batchResult.jobsCreated,
batchesCreated: batchResult.batchesCreated,
mode: batchResult.mode
mode: batchResult.mode,
};
},

View file

@ -1,87 +0,0 @@
import { initializeBatchCache, processItems, QueueManager } from '@stock-bot/queue';
async function basicUsageExample() {
console.log('=== Basic Queue Usage Example ===');
// 1. Initialize queue manager
const queueManager = new QueueManager({
queueName: 'example-queue',
workers: 3,
concurrency: 10,
redis: {
host: 'localhost',
port: 6379,
},
});
// 2. Register providers
queueManager.registerProvider('market-data', {
'fetch-price': async payload => {
// payload is now the raw symbol string
console.log(`Fetching price for ${payload}`);
// Simulate API call
await new Promise(resolve => setTimeout(resolve, 100));
return {
symbol: payload,
price: Math.random() * 1000,
timestamp: new Date().toISOString(),
};
},
'update-cache': async payload => {
// payload is now the raw symbol string
console.log(`Updating cache for ${payload}`);
// Simulate cache update
await new Promise(resolve => setTimeout(resolve, 50));
return { success: true, symbol: payload };
},
});
// 3. Initialize
await queueManager.initialize();
await initializeBatchCache(queueManager);
// 4. Add individual jobs
console.log('Adding individual jobs...');
await queueManager.add('fetch-price', {
provider: 'market-data',
operation: 'fetch-price',
payload: 'AAPL', // Direct symbol instead of wrapped object
});
// 5. Process items in batch
console.log('Processing items in batch...');
const symbols = ['GOOGL', 'MSFT', 'TSLA', 'AMZN'];
const result = await processItems(symbols, queueManager, {
totalDelayHours: 0.0083, // 30 seconds
useBatching: true,
batchSize: 2,
priority: 1,
provider: 'market-data',
operation: 'fetch-price',
});
console.log('Batch processing result:', result);
// 6. Get queue statistics
const stats = await queueManager.getStats();
console.log('Queue stats:', stats);
// 7. Clean up old jobs
await queueManager.clean(60000); // Clean jobs older than 1 minute
// 8. Shutdown gracefully
setTimeout(async () => {
console.log('Shutting down...');
await queueManager.shutdown();
console.log('Shutdown complete');
}, 35000);
}
// Run the example
if (require.main === module) {
basicUsageExample().catch(console.error);
}
export { basicUsageExample };

View file

@ -1,200 +0,0 @@
import { initializeBatchCache, processItems, QueueManager } from '@stock-bot/queue';
async function batchProcessingExample() {
console.log('=== Batch Processing Example ===');
// Initialize queue manager
const queueManager = new QueueManager({
queueName: 'batch-example-queue',
workers: 2,
concurrency: 5,
});
// Register data processing provider
queueManager.registerProvider('data-processor', {
'process-item': async payload => {
console.log(`Processing item: ${JSON.stringify(payload)}`);
// Simulate processing time
await new Promise(resolve => setTimeout(resolve, 200));
return { processed: true, originalData: payload };
},
'analyze-symbol': async payload => {
// payload is now the raw symbol string
console.log(`Analyzing symbol: ${payload}`);
// Simulate analysis
await new Promise(resolve => setTimeout(resolve, 150));
return {
symbol: payload,
analysis: {
trend: Math.random() > 0.5 ? 'up' : 'down',
confidence: Math.random(),
timestamp: new Date().toISOString(),
},
};
},
});
await queueManager.initialize();
await initializeBatchCache(queueManager);
// Example 1: Direct processing (each item = separate job)
console.log('\n--- Direct Processing Example ---');
const directResult = await processItems(
[1, 2, 3, 4, 5], // Just pass the array directly!
queueManager,
{
totalDelayHours: 0.0042, // 15 seconds
useBatching: false, // Direct mode
priority: 2,
provider: 'data-processor',
operation: 'process-item',
}
);
console.log('Direct processing result:', directResult);
// Example 2: Batch processing (groups of items)
console.log('\n--- Batch Processing Example ---');
const batchData = Array.from({ length: 25 }, (_, i) => ({
id: i + 1,
value: Math.random() * 100,
category: i % 3 === 0 ? 'A' : i % 3 === 1 ? 'B' : 'C',
}));
const batchResult = await processItems(batchData, queueManager, {
totalDelayHours: 0.0056, // 20 seconds
useBatching: true, // Batch mode
batchSize: 5, // 5 items per batch
priority: 1,
provider: 'data-processor',
operation: 'process-item',
});
console.log('Batch processing result:', batchResult);
// Example 3: Symbol processing (using processItems)
console.log('\n--- Symbol Processing Example ---');
const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN', 'META', 'NFLX'];
const symbolResult = await processItems(symbols, queueManager, {
operation: 'analyze-symbol',
provider: 'data-processor',
totalDelayHours: 0.0069, // 25 seconds
useBatching: true,
batchSize: 3,
priority: 1,
});
console.log('Symbol processing result:', symbolResult);
// Example 4: Large dataset with optimal batching
console.log('\n--- Large Dataset Example ---');
const largeDataset = Array.from({ length: 1000 }, (_, i) => ({
id: i + 1,
data: `item-${i + 1}`,
random: Math.random(),
}));
const largeResult = await processItems(largeDataset, queueManager, {
totalDelayHours: 0.0167, // 1 minute
useBatching: true,
batchSize: 50, // 50 items per batch
priority: 3,
provider: 'data-processor',
operation: 'process-item',
retries: 2,
removeOnComplete: 5,
removeOnFail: 10,
});
console.log('Large dataset result:', largeResult);
// Monitor queue progress
console.log('\n--- Monitoring Queue ---');
const monitorInterval = setInterval(async () => {
const stats = await queueManager.getStats();
console.log('Queue stats:', {
waiting: stats.waiting,
active: stats.active,
completed: stats.completed,
failed: stats.failed,
});
// Stop monitoring when queue is mostly empty
if (stats.waiting === 0 && stats.active === 0) {
clearInterval(monitorInterval);
console.log('Queue processing complete!');
setTimeout(async () => {
await queueManager.shutdown();
console.log('Shutdown complete');
}, 2000);
}
}, 5000);
}
// Utility function to compare processing modes
async function compareProcessingModes() {
console.log('\n=== Processing Mode Comparison ===');
const queueManager = new QueueManager({
queueName: 'comparison-queue',
workers: 2,
concurrency: 10,
});
queueManager.registerProvider('test', {
process: async payload => {
await new Promise(resolve => setTimeout(resolve, 100));
return { processed: true, originalData: payload };
},
});
await queueManager.initialize();
await initializeBatchCache(queueManager);
const testData = Array.from({ length: 20 }, (_, i) => ({ id: i + 1 }));
// Test direct mode
console.log('Testing direct mode...');
const directStart = Date.now();
const directResult = await processItems(testData, queueManager, {
totalDelayHours: 0.0028, // 10 seconds
useBatching: false,
provider: 'test',
operation: 'process',
});
console.log('Direct mode:', {
...directResult,
actualDuration: Date.now() - directStart,
});
// Test batch mode
console.log('Testing batch mode...');
const batchStart = Date.now();
const batchResult = await processItems(testData, queueManager, {
totalDelayHours: 0.0028, // 10 seconds
useBatching: true,
batchSize: 5,
provider: 'test',
operation: 'process',
});
console.log('Batch mode:', {
...batchResult,
actualDuration: Date.now() - batchStart,
});
setTimeout(async () => {
await queueManager.shutdown();
}, 15000);
}
// Run examples
if (require.main === module) {
batchProcessingExample()
.then(() => compareProcessingModes())
.catch(console.error);
}
export { batchProcessingExample, compareProcessingModes };

View file

@ -1,211 +0,0 @@
// Migration example from existing QueueService to new QueueManager
// OLD WAY (using existing QueueService)
/*
import { QueueService } from '../services/queue.service';
import { providerRegistry } from '../services/provider-registry.service';
import { processItems, initializeBatchCache } from '../utils/batch-helpers';
class OldDataService {
private queueService: QueueService;
constructor() {
this.queueService = new QueueService();
}
async initialize() {
// Register providers
providerRegistry.register('market-data', {
'live-data': async (payload) => {
// Handle live data
},
});
await this.queueService.initialize();
}
async processSymbols(symbols: string[]) {
return processSymbols(symbols, this.queueService, {
operation: 'live-data',
service: 'market-data',
provider: 'yahoo',
totalDelayMs: 300000,
});
}
}
*/
// NEW WAY (using @stock-bot/queue)
import { initializeBatchCache, processItems, QueueManager } from '@stock-bot/queue';
class NewDataService {
private queueManager: QueueManager;
constructor() {
this.queueManager = new QueueManager({
queueName: 'data-service-queue',
workers: 5,
concurrency: 20,
});
}
async initialize() {
// Register providers using the new API
this.queueManager.registerProvider('market-data', {
'live-data': async payload => {
// payload is now the raw symbol string
console.log('Processing live data for:', payload);
// Handle live data - same logic as before
return {
symbol: payload,
price: Math.random() * 1000,
timestamp: new Date().toISOString(),
};
},
'historical-data': async payload => {
// payload is now the raw symbol string
console.log('Processing historical data for:', payload);
// Handle historical data
return {
symbol: payload,
data: Array.from({ length: 100 }, (_, i) => ({
date: new Date(Date.now() - i * 86400000).toISOString(),
price: Math.random() * 1000,
})),
};
},
});
this.queueManager.registerProvider('analytics', {
'calculate-indicators': async payload => {
// payload is now the raw symbol string
console.log('Calculating indicators for:', payload);
// Calculate technical indicators
return {
symbol: payload,
indicators: {
sma20: Math.random() * 1000,
rsi: Math.random() * 100,
macd: Math.random() * 10,
},
};
},
});
await this.queueManager.initialize();
await initializeBatchCache(this.queueManager);
}
// Method that works exactly like before
async processSymbols(symbols: string[]) {
return processItems(symbols, this.queueManager, {
operation: 'live-data',
provider: 'market-data', // Note: provider name in the new system
totalDelayMs: 300000,
useBatching: false,
priority: 1,
});
}
// New method showcasing batch processing
async processSymbolsBatch(symbols: string[]) {
return processItems(symbols, this.queueManager, {
totalDelayMs: 300000,
useBatching: true,
batchSize: 50,
priority: 1,
provider: 'market-data',
operation: 'live-data',
});
}
// Analytics processing
async processAnalytics(symbols: string[]) {
return processItems(symbols, this.queueManager, {
totalDelayMs: 180000, // 3 minutes
useBatching: true,
batchSize: 20,
priority: 2,
provider: 'analytics',
operation: 'calculate-indicators',
});
}
async getQueueStats() {
return this.queueManager.getStats();
}
async shutdown() {
await this.queueManager.shutdown();
}
}
// Example usage
async function migrationExample() {
console.log('=== Migration Example ===');
const dataService = new NewDataService();
await dataService.initialize();
const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA'];
// Test symbol processing (works like before)
console.log('Processing symbols (direct)...');
const directResult = await dataService.processSymbols(symbols.slice(0, 2));
console.log('Direct result:', directResult);
// Test batch processing (new capability)
console.log('Processing symbols (batch)...');
const batchResult = await dataService.processSymbolsBatch(symbols);
console.log('Batch result:', batchResult);
// Test analytics processing
console.log('Processing analytics...');
const analyticsResult = await dataService.processAnalytics(symbols);
console.log('Analytics result:', analyticsResult);
// Monitor progress
setInterval(async () => {
const stats = await dataService.getQueueStats();
console.log('Queue stats:', stats);
if (stats.waiting === 0 && stats.active === 0) {
console.log('All jobs complete!');
await dataService.shutdown();
process.exit(0);
}
}, 3000);
}
// Key Migration Steps:
/*
1. IMPORTS:
- Replace: import { QueueService } from '../services/queue.service'
- With: import { QueueManager } from '@stock-bot/queue'
2. PROVIDER REGISTRATION:
- Replace: providerRegistry.register(...)
- With: queueManager.registerProvider(...)
3. INITIALIZATION:
- Replace: await queueService.initialize()
- With: await queueManager.initialize() + await initializeBatchCache()
4. BATCH HELPERS:
- Replace: import { processItems } from '../utils/batch-helpers'
- With: import { processItems } from '@stock-bot/queue'
5. JOB PARAMETERS:
- totalDelayHours totalDelayMs (convert hours to milliseconds)
- Ensure provider names match registered providers
6. CONFIGURATION:
- Use QueueConfig interface for type safety
- Environment variables work the same way
*/
if (require.main === module) {
migrationExample().catch(console.error);
}
export { migrationExample, NewDataService };

View file

@ -1,48 +0,0 @@
#!/usr/bin/env bun
// Simple test to verify the API is correctly structured
import { initializeBatchCache, processItems, QueueManager } from './src/index.js';
async function quickTest() {
console.log('🚀 Quick API structure test...');
try {
// Test 1: Check imports
console.log('✅ Imports successful');
console.log('- QueueManager type:', typeof QueueManager);
console.log('- processItems type:', typeof processItems);
console.log('- initializeBatchCache type:', typeof initializeBatchCache);
// Test 2: Check function signatures
const queueManager = new QueueManager({
queueName: 'test-api-structure',
});
console.log('✅ QueueManager created');
// Verify the processItems function signature
const items = [1, 2, 3];
const options = {
totalDelayHours: 0.0003, // ~1 second
useBatching: false,
provider: 'test',
operation: 'test',
};
// This should not throw a type error
console.log('✅ processItems signature is correct (no type errors)');
console.log('- Items:', items);
console.log('- Options:', options);
console.log('🎯 API structure test completed successfully!');
console.log('📋 Summary:');
console.log(' - Security vulnerability eliminated (no function serialization)');
console.log(' - Redundant processSymbols function removed');
console.log(' - API simplified to: processItems(items, queue, options)');
console.log(' - Items are passed directly as payloads');
console.log('🏆 Queue library is ready for production use!');
} catch (error) {
console.error('❌ Test failed:', error);
}
}
quickTest();