stock-bot/libs/core/queue
2025-07-06 19:23:11 -04:00
..
src cleaned up stuff 2025-07-06 19:23:11 -04:00
test cleaned up stuff 2025-07-06 19:23:11 -04:00
package.json tests 2025-06-25 10:47:00 -04:00
README.md restructured libs to be more aligned with core components 2025-06-23 19:51:48 -04:00
tsconfig.json huge refactor to remove depenencie hell and add typesafe container 2025-06-24 09:37:51 -04:00
turbo.json restructured libs to be more aligned with core components 2025-06-23 19:51:48 -04:00

@stock-bot/queue

A reusable queue library with batch processing capabilities for the stock-bot project.

Features

  • Queue Management: Built on BullMQ with Redis backing
  • Batch Processing: Efficient processing of large datasets
  • Provider Registry: Pluggable job handler system
  • Cache Integration: Uses @stock-bot/cache for payload storage
  • TypeScript Support: Full type safety and IntelliSense
  • Configurable: Flexible configuration for different environments

Installation

npm install @stock-bot/queue

Quick Start

Basic Queue Setup

import { QueueManager, handlerRegistry } from '@stock-bot/queue';

// Initialize queue manager (typically done via dependency injection)
const queueManager = new QueueManager({
  redis: {
    host: 'localhost',
    port: 6379,
  },
});

// Get or create a queue
const queue = queueManager.getQueue('my-service-queue', {
  workers: 5,
  concurrency: 20,
});

// Register handlers
handlerRegistry.register('market-data', {
  'fetch-price': async (payload) => {
    // Handle price fetching
    return { price: 100, symbol: payload.symbol };
  },
  'update-data': async (payload) => {
    // Handle data updates
    return { success: true };
  },
});

// Queue is ready to use - no initialization needed

Batch Processing

import { processItems, initializeBatchCache } from '@stock-bot/queue';

// Initialize cache first
await initializeBatchCache();

// Process items in batches
const result = await processItems(
  ['AAPL', 'GOOGL', 'MSFT'],
  (symbol, index) => ({ symbol, timestamp: Date.now() }),
  queueManager,
  {
    totalDelayMs: 60000, // 1 minute total
    useBatching: true,
    batchSize: 100,
    priority: 1,
    provider: 'market-data',
    operation: 'fetch-price',
  }
);

console.log(result);
// {
//   jobsCreated: 1,
//   mode: 'batch',
//   totalItems: 3,
//   batchesCreated: 1,
//   duration: 150
// }

Generic Processing

import { processItems } from '@stock-bot/queue';

const result = await processItems(
  ['AAPL', 'GOOGL', 'MSFT'],
  (symbol, index) => ({
    symbol,
    index,
    timestamp: Date.now(),
  }),
  queueManager,
  {
    operation: 'live-data',
    provider: 'yahoo',
    totalDelayMs: 300000, // 5 minutes
    useBatching: false,
    priority: 1,
  }
);

API Reference

QueueManager

The main queue management class.

Constructor

new QueueManager(config?: QueueConfig)

Methods

  • initialize(): Initialize the queue and workers
  • registerProvider(name, config): Register a job provider
  • add(name, data, options): Add a single job
  • addBulk(jobs): Add multiple jobs in bulk
  • getStats(): Get queue statistics
  • pause(): Pause job processing
  • resume(): Resume job processing
  • clean(grace, limit): Clean completed/failed jobs
  • shutdown(): Shutdown the queue manager

Batch Processing Functions

processItems()

Process items either directly or in batches.

processItems<T>(
  items: T[],
  processor: (item: T, index: number) => any,
  queue: QueueManager,
  options: ProcessOptions
): Promise<BatchResult>

processBatchJob()

Process a batch job (used internally by workers).

processBatchJob(
  jobData: BatchJobData,
  queue: QueueManager
): Promise<any>

Provider Registry

Manage job handlers for different providers.

// Register provider
providerRegistry.register('provider-name', {
  'operation-1': async (payload) => { /* handle */ },
  'operation-2': async (payload) => { /* handle */ },
});

// Check provider exists
if (providerRegistry.hasProvider('provider-name')) {
  // Provider is registered
}

// Get handler
const handler = providerRegistry.getHandler('provider-name', 'operation-1');

Configuration

QueueConfig

interface QueueConfig {
  workers?: number;           // Number of worker processes
  concurrency?: number;       // Jobs per worker
  redis?: {
    host?: string;
    port?: number;
    password?: string;
    db?: number;
  };
  queueName?: string;         // Name for the queue
  defaultJobOptions?: {
    removeOnComplete?: number;
    removeOnFail?: number;
    attempts?: number;
    backoff?: {
      type: string;
      delay: number;
    };
  };
}

ProcessOptions

interface ProcessOptions {
  totalDelayMs: number;       // Total time to spread jobs over
  batchSize?: number;         // Items per batch (batch mode)
  priority?: number;          // Job priority
  useBatching?: boolean;      // Use batch vs direct mode
  retries?: number;           // Number of retry attempts
  ttl?: number;               // Cache TTL for batch payloads
  removeOnComplete?: number;  // Keep N completed jobs
  removeOnFail?: number;      // Keep N failed jobs
  provider?: string;          // Provider name for job routing
  operation?: string;         // Operation name for job routing
}

Migration from Existing Queue

If you're migrating from an existing queue implementation:

  1. Replace imports:

    // Before
    import { QueueService } from '../services/queue.service';
    import { processItems } from '../utils/batch-helpers';
    
    // After
    import { QueueManager, processItems } from '@stock-bot/queue';
    
  2. Update initialization:

    // Before
    const queueService = new QueueService();
    await queueService.initialize();
    
    // After
    const queueManager = new QueueManager({
      redis: { host: 'localhost', port: 6379 }
    });
    // No initialization needed
    
  3. Update provider registration:

    // Before
    providerRegistry.register('provider', config);
    
    // After
    handlerRegistry.register('provider', config);
    

Examples

See the /examples directory for complete implementation examples:

  • basic-usage.ts - Basic queue setup and job processing
  • batch-processing.ts - Advanced batch processing scenarios
  • provider-setup.ts - Provider registration patterns
  • migration-example.ts - Migration from existing queue service

Best Practices

  1. Initialize cache before batch operations:

    await initializeBatchCache();
    
  2. Use appropriate batch sizes:

    • Small items: 500-1000 per batch
    • Large items: 50-100 per batch
  3. Set reasonable delays:

    • Spread jobs over time to avoid overwhelming services
    • Consider rate limits of external APIs
  4. Clean up periodically:

    const queue = queueManager.getQueue('my-queue');
    await queue.clean(24 * 60 * 60 * 1000); // Clean jobs older than 24h
    
  5. Monitor queue stats:

    const queue = queueManager.getQueue('my-queue');
    const stats = await queue.getStats();
    console.log('Queue status:', stats);
    

Environment Variables

  • WORKER_COUNT: Number of worker processes (default: 5)
  • WORKER_CONCURRENCY: Jobs per worker (default: 20)
  • DRAGONFLY_HOST: Redis/Dragonfly host (default: localhost)
  • DRAGONFLY_PORT: Redis/Dragonfly port (default: 6379)
  • DRAGONFLY_PASSWORD: Redis/Dragonfly password
  • DRAGONFLY_DB: Redis/Dragonfly database number (default: 0)