| .. | ||
| src | ||
| debug-batch-cleanup.ts | ||
| package.json | ||
| README.md | ||
| test-simplified-api.ts | ||
| tsconfig.json | ||
| turbo.json | ||
@stock-bot/queue
A reusable queue library with batch processing capabilities for the stock-bot project.
Features
- Queue Management: Built on BullMQ with Redis backing
- Batch Processing: Efficient processing of large datasets
- Provider Registry: Pluggable job handler system
- Cache Integration: Uses @stock-bot/cache for payload storage
- TypeScript Support: Full type safety and IntelliSense
- Configurable: Flexible configuration for different environments
Installation
npm install @stock-bot/queue
Quick Start
Basic Queue Setup
import { QueueManager, providerRegistry } from '@stock-bot/queue';
// Initialize queue manager
const queueManager = new QueueManager({
queueName: 'my-service-queue',
workers: 5,
concurrency: 20,
redis: {
host: 'localhost',
port: 6379,
},
});
// Register providers
providerRegistry.register('market-data', {
'fetch-price': async (payload) => {
// Handle price fetching
return { price: 100, symbol: payload.symbol };
},
'update-data': async (payload) => {
// Handle data updates
return { success: true };
},
});
// Initialize
await queueManager.initialize();
Batch Processing
import { processItems, initializeBatchCache } from '@stock-bot/queue';
// Initialize cache first
await initializeBatchCache();
// Process items in batches
const result = await processItems(
['AAPL', 'GOOGL', 'MSFT'],
(symbol, index) => ({ symbol, timestamp: Date.now() }),
queueManager,
{
totalDelayMs: 60000, // 1 minute total
useBatching: true,
batchSize: 100,
priority: 1,
provider: 'market-data',
operation: 'fetch-price',
}
);
console.log(result);
// {
// jobsCreated: 1,
// mode: 'batch',
// totalItems: 3,
// batchesCreated: 1,
// duration: 150
// }
Generic Processing
import { processItems } from '@stock-bot/queue';
const result = await processItems(
['AAPL', 'GOOGL', 'MSFT'],
(symbol, index) => ({
symbol,
index,
timestamp: Date.now(),
}),
queueManager,
{
operation: 'live-data',
provider: 'yahoo',
totalDelayMs: 300000, // 5 minutes
useBatching: false,
priority: 1,
}
);
API Reference
QueueManager
The main queue management class.
Constructor
new QueueManager(config?: QueueConfig)
Methods
- initialize(): Initialize the queue and workers
- registerProvider(name, config): Register a job provider
- add(name, data, options): Add a single job
- addBulk(jobs): Add multiple jobs in bulk
- getStats(): Get queue statistics
- pause(): Pause job processing
- resume(): Resume job processing
- clean(grace, limit): Clean completed/failed jobs
- shutdown(): Shut down the queue manager
Batch Processing Functions
processItems()
Process items either directly or in batches.
processItems<T>(
items: T[],
processor: (item: T, index: number) => any,
queue: QueueManager,
options: ProcessOptions
): Promise<BatchResult>
processBatchJob()
Process a batch job (used internally by workers).
processBatchJob(
jobData: BatchJobData,
queue: QueueManager
): Promise<any>
Provider Registry
Manage job handlers for different providers.
// Register provider
providerRegistry.register('provider-name', {
'operation-1': async (payload) => { /* handle */ },
'operation-2': async (payload) => { /* handle */ },
});
// Check provider exists
if (providerRegistry.hasProvider('provider-name')) {
// Provider is registered
}
// Get handler
const handler = providerRegistry.getHandler('provider-name', 'operation-1');
Configuration
QueueConfig
interface QueueConfig {
workers?: number; // Number of worker processes
concurrency?: number; // Jobs per worker
redis?: {
host?: string;
port?: number;
password?: string;
db?: number;
};
queueName?: string; // Name for the queue
defaultJobOptions?: {
removeOnComplete?: number;
removeOnFail?: number;
attempts?: number;
backoff?: {
type: string;
delay: number;
};
};
}
ProcessOptions
interface ProcessOptions {
totalDelayMs: number; // Total time to spread jobs over
batchSize?: number; // Items per batch (batch mode)
priority?: number; // Job priority
useBatching?: boolean; // Use batch vs direct mode
retries?: number; // Number of retry attempts
ttl?: number; // Cache TTL for batch payloads
removeOnComplete?: number; // Keep N completed jobs
removeOnFail?: number; // Keep N failed jobs
provider?: string; // Provider name for job routing
operation?: string; // Operation name for job routing
}
Migration from Existing Queue
If you're migrating from an existing queue implementation:
1. Replace imports:
   // Before
   import { QueueService } from '../services/queue.service';
   import { processItems } from '../utils/batch-helpers';
   // After
   import { QueueManager, processItems } from '@stock-bot/queue';
2. Update initialization:
   // Before
   const queueService = new QueueService();
   await queueService.initialize();
   // After
   const queueManager = new QueueManager();
   await queueManager.initialize();
3. Update provider registration:
   // Before
   providerRegistry.register('provider', config);
   // After
   queueManager.registerProvider('provider', config);
Examples
See the /examples directory for complete implementation examples:
- basic-usage.ts - Basic queue setup and job processing
- batch-processing.ts - Advanced batch processing scenarios
- provider-setup.ts - Provider registration patterns
- migration-example.ts - Migration from existing queue service
Best Practices
1. Initialize cache before batch operations:
   await initializeBatchCache();
2. Use appropriate batch sizes:
   - Small items: 500-1000 per batch
   - Large items: 50-100 per batch
3. Set reasonable delays:
   - Spread jobs over time to avoid overwhelming services
   - Consider rate limits of external APIs
4. Clean up periodically:
   await queueManager.clean(24 * 60 * 60 * 1000); // Clean jobs older than 24h
5. Monitor queue stats:
   const stats = await queueManager.getStats();
   console.log('Queue status:', stats);
Environment Variables
- WORKER_COUNT: Number of worker processes (default: 5)
- WORKER_CONCURRENCY: Jobs per worker (default: 20)
- DRAGONFLY_HOST: Redis/Dragonfly host (default: localhost)
- DRAGONFLY_PORT: Redis/Dragonfly port (default: 6379)
- DRAGONFLY_PASSWORD: Redis/Dragonfly password
- DRAGONFLY_DB: Redis/Dragonfly database number (default: 0)