200 lines
5.7 KiB
TypeScript
200 lines
5.7 KiB
TypeScript
import { initializeBatchCache, processItems, QueueManager } from '@stock-bot/queue';
|
|
|
|
/**
 * Demonstrates four ways of submitting work through `processItems` against a
 * single `QueueManager`: direct mode, batch mode, symbol analysis, and a
 * larger dataset. Afterwards it polls `queueManager.getStats()` every five
 * seconds and shuts the queue down once nothing is waiting or active.
 *
 * The returned promise settles as soon as the four submissions have been
 * awaited and the monitor has been scheduled; it does NOT wait for the
 * monitor to observe an idle queue or for the shutdown to finish.
 *
 * Failures inside the monitor (a rejected `getStats()` or `shutdown()`) are
 * logged with `console.error` instead of surfacing as unhandled rejections,
 * and the polling timer is always cleared when monitoring ends.
 *
 * @returns {Promise<void>}
 */
async function batchProcessingExample() {
  console.log('=== Batch Processing Example ===');

  // Initialize queue manager
  const queueManager = new QueueManager({
    queueName: 'batch-example-queue',
    workers: 2,
    concurrency: 5,
  });

  // Register data processing provider
  queueManager.registerProvider('data-processor', {
    'process-item': async payload => {
      console.log(`Processing item: ${JSON.stringify(payload)}`);
      // Simulate processing time
      await new Promise(resolve => setTimeout(resolve, 200));
      return { processed: true, originalData: payload };
    },

    'analyze-symbol': async payload => {
      // payload is now the raw symbol string
      console.log(`Analyzing symbol: ${payload}`);
      // Simulate analysis
      await new Promise(resolve => setTimeout(resolve, 150));
      return {
        symbol: payload,
        analysis: {
          trend: Math.random() > 0.5 ? 'up' : 'down',
          confidence: Math.random(),
          timestamp: new Date().toISOString(),
        },
      };
    },
  });

  await queueManager.initialize();
  await initializeBatchCache(queueManager);

  // Example 1: Direct processing (each item = separate job)
  console.log('\n--- Direct Processing Example ---');
  const directResult = await processItems(
    [1, 2, 3, 4, 5], // Just pass the array directly!
    queueManager,
    {
      totalDelayHours: 0.0042, // ~15 seconds
      useBatching: false, // Direct mode
      priority: 2,
      provider: 'data-processor',
      operation: 'process-item',
    }
  );

  console.log('Direct processing result:', directResult);

  // Example 2: Batch processing (groups of items)
  console.log('\n--- Batch Processing Example ---');
  const batchData = Array.from({ length: 25 }, (_, i) => ({
    id: i + 1,
    value: Math.random() * 100,
    category: i % 3 === 0 ? 'A' : i % 3 === 1 ? 'B' : 'C',
  }));

  const batchResult = await processItems(batchData, queueManager, {
    totalDelayHours: 0.0056, // ~20 seconds
    useBatching: true, // Batch mode
    batchSize: 5, // 5 items per batch
    priority: 1,
    provider: 'data-processor',
    operation: 'process-item',
  });

  console.log('Batch processing result:', batchResult);

  // Example 3: Symbol processing (using processItems)
  console.log('\n--- Symbol Processing Example ---');
  const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN', 'META', 'NFLX'];

  const symbolResult = await processItems(symbols, queueManager, {
    operation: 'analyze-symbol',
    provider: 'data-processor',
    totalDelayHours: 0.0069, // ~25 seconds
    useBatching: true,
    batchSize: 3,
    priority: 1,
  });

  console.log('Symbol processing result:', symbolResult);

  // Example 4: Large dataset with optimal batching
  console.log('\n--- Large Dataset Example ---');
  const largeDataset = Array.from({ length: 1000 }, (_, i) => ({
    id: i + 1,
    data: `item-${i + 1}`,
    random: Math.random(),
  }));

  const largeResult = await processItems(largeDataset, queueManager, {
    totalDelayHours: 0.0167, // ~1 minute
    useBatching: true,
    batchSize: 50, // 50 items per batch
    priority: 3,
    provider: 'data-processor',
    operation: 'process-item',
    retries: 2,
    removeOnComplete: 5,
    removeOnFail: 10,
  });

  console.log('Large dataset result:', largeResult);

  // Monitor queue progress
  console.log('\n--- Monitoring Queue ---');

  // Set once monitoring has ended so that a slow, overlapping tick cannot
  // schedule a second shutdown.
  let monitoringStopped = false;

  // Never rejects: timer callbacks have no caller to hand an error to, so a
  // shutdown failure is logged here rather than left as an unhandled rejection.
  const shutdownQueue = async () => {
    try {
      await queueManager.shutdown();
      console.log('Shutdown complete');
    } catch (error) {
      console.error('Queue shutdown failed:', error);
    }
  };

  const monitorInterval = setInterval(async () => {
    if (monitoringStopped) {
      return;
    }

    let stats;
    try {
      stats = await queueManager.getStats();
    } catch (error) {
      if (monitoringStopped) {
        return;
      }
      // Without stats we cannot tell when the queue drains; stop polling and
      // release the queue instead of failing again on every tick.
      monitoringStopped = true;
      clearInterval(monitorInterval);
      console.error('Queue monitoring failed:', error);
      await shutdownQueue();
      return;
    }

    // Another tick may have ended monitoring while this one was awaiting.
    if (monitoringStopped) {
      return;
    }

    console.log('Queue stats:', {
      waiting: stats.waiting,
      active: stats.active,
      completed: stats.completed,
      failed: stats.failed,
    });

    // Stop monitoring once nothing is waiting and nothing is active
    if (stats.waiting === 0 && stats.active === 0) {
      monitoringStopped = true;
      clearInterval(monitorInterval);
      console.log('Queue processing complete!');

      // Short grace period before shutting the queue down
      setTimeout(shutdownQueue, 2000);
    }
  }, 5000);
}
|
|
|
|
// Utility function to compare processing modes
|
|
/**
 * Submits the same 20-item dataset twice through `processItems` — once with
 * `useBatching: false` and once with `useBatching: true` (batch size 5) — and
 * logs each result together with how long the `processItems` call took.
 *
 * Note that `actualDuration` is the wall-clock time of the awaited
 * `processItems` call only; this function does not observe when the queued
 * work itself finishes.
 *
 * The queue is shut down by a timer 15 seconds after both submissions; a
 * failing shutdown is logged with `console.error` rather than becoming an
 * unhandled promise rejection.
 *
 * @returns {Promise<void>} Settles once both submissions have been awaited
 *   and the delayed shutdown has been scheduled.
 */
async function compareProcessingModes() {
  console.log('\n=== Processing Mode Comparison ===');

  const queueManager = new QueueManager({
    queueName: 'comparison-queue',
    workers: 2,
    concurrency: 10,
  });

  queueManager.registerProvider('test', {
    process: async payload => {
      // Simulated 100 ms of work per payload
      await new Promise(resolve => setTimeout(resolve, 100));
      return { processed: true, originalData: payload };
    },
  });

  await queueManager.initialize();
  await initializeBatchCache(queueManager);

  const testData = Array.from({ length: 20 }, (_, i) => ({ id: i + 1 }));

  // Test direct mode
  console.log('Testing direct mode...');
  const directStart = Date.now();
  const directResult = await processItems(testData, queueManager, {
    totalDelayHours: 0.0028, // ~10 seconds
    useBatching: false,
    provider: 'test',
    operation: 'process',
  });
  console.log('Direct mode:', {
    ...directResult,
    actualDuration: Date.now() - directStart,
  });

  // Test batch mode
  console.log('Testing batch mode...');
  const batchStart = Date.now();
  const batchResult = await processItems(testData, queueManager, {
    totalDelayHours: 0.0028, // ~10 seconds
    useBatching: true,
    batchSize: 5,
    provider: 'test',
    operation: 'process',
  });
  console.log('Batch mode:', {
    ...batchResult,
    actualDuration: Date.now() - batchStart,
  });

  // Shut the queue down after a fixed delay. The timer callback has no caller
  // to propagate to, so a shutdown failure is caught and logged here.
  setTimeout(async () => {
    try {
      await queueManager.shutdown();
    } catch (error) {
      console.error('Queue shutdown failed:', error);
    }
  }, 15000);
}
|
|
|
|
// When this file is the program entry point, run both examples back to back
// and report any failure on stderr.
if (require.main === module) {
  (async () => {
    try {
      await batchProcessingExample();
      await compareProcessingModes();
    } catch (error) {
      console.error(error);
    }
  })();
}
|
|
|
|
export { batchProcessingExample, compareProcessingModes };
|