added routes and simplified batch processor

This commit is contained in:
Boki 2025-06-10 20:59:53 -04:00
parent 0357908b69
commit 4883daa3e2
12 changed files with 1130 additions and 238 deletions

View file

@ -0,0 +1,117 @@
/**
* Example usage of the new functional batch processing approach
*/
import { processItems, processSymbols, processProxies, processBatchJob } from '../utils/batch-helpers';
import { queueManager } from '../services/queue.service';
// Example 1: Process a list of symbols for live data
export async function exampleSymbolProcessing() {
const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN'];
const result = await processSymbols(symbols, queueManager, {
operation: 'live-data',
service: 'market-data',
provider: 'yahoo',
totalDelayMs: 60000, // 1 minute total
useBatching: false, // Process directly
priority: 1
});
console.log('Symbol processing result:', result);
// Output: { jobsCreated: 5, mode: 'direct', totalItems: 5, duration: 1234 }
}
// Example 2: Process proxies in batches
export async function exampleProxyProcessing() {
const proxies = [
{ host: '1.1.1.1', port: 8080 },
{ host: '2.2.2.2', port: 3128 },
// ... more proxies
];
const result = await processProxies(proxies, queueManager, {
totalDelayMs: 3600000, // 1 hour total
useBatching: true, // Use batch mode
batchSize: 100, // 100 proxies per batch
priority: 2
});
console.log('Proxy processing result:', result);
// Output: { jobsCreated: 10, mode: 'batch', totalItems: 1000, batchesCreated: 10, duration: 2345 }
}
// Example 3: Custom processing with generic function
export async function exampleCustomProcessing() {
const customData = [
{ id: 1, name: 'Item 1' },
{ id: 2, name: 'Item 2' },
{ id: 3, name: 'Item 3' }
];
const result = await processItems(
customData,
(item, index) => ({
// Transform each item for processing
itemId: item.id,
itemName: item.name,
processIndex: index,
timestamp: new Date().toISOString()
}),
queueManager,
{
totalDelayMs: 30000, // 30 seconds total
useBatching: false, // Direct processing
priority: 1,
retries: 3
}
);
console.log('Custom processing result:', result);
}
// Example 4: Batch job processor (used by workers)
export async function exampleBatchJobProcessor(jobData: any) {
// This would be called by a BullMQ worker when processing batch jobs
const result = await processBatchJob(jobData, queueManager);
console.log('Batch job processed:', result);
// Output: { batchIndex: 0, itemsProcessed: 100, jobsCreated: 100 }
return result;
}
// Comparison: Old vs New approach
// OLD COMPLEX WAY:
/*
const batchProcessor = new BatchProcessor(queueManager);
await batchProcessor.initialize();
await batchProcessor.processItems({
items: symbols,
batchSize: 200,
totalDelayMs: 3600000,
jobNamePrefix: 'yahoo-live',
operation: 'live-data',
service: 'data-service',
provider: 'yahoo',
priority: 2,
createJobData: (symbol, index) => ({ symbol }),
useBatching: true,
removeOnComplete: 5,
removeOnFail: 3
});
*/
// NEW SIMPLE WAY:
/*
await processSymbols(symbols, queueManager, {
operation: 'live-data',
service: 'data-service',
provider: 'yahoo',
totalDelayMs: 3600000,
useBatching: true,
batchSize: 200,
priority: 2
});
*/

View file

@ -4,8 +4,15 @@
import { getLogger } from '@stock-bot/logger';
import { loadEnvVariables } from '@stock-bot/config';
import { Hono } from 'hono';
import { serve } from '@hono/node-server';
import { onShutdown, setShutdownTimeout } from '@stock-bot/shutdown';
import { queueManager } from './services/queue.service';
import {
healthRoutes,
queueRoutes,
marketDataRoutes,
proxyRoutes,
testRoutes
} from './routes';
// Load environment variables
loadEnvVariables();
@ -14,194 +21,13 @@ const app = new Hono();
const logger = getLogger('data-service');
const PORT = parseInt(process.env.DATA_SERVICE_PORT || '3002');
// Health check endpoint
app.get('/health', (c) => {
return c.json({
service: 'data-service',
status: 'healthy',
timestamp: new Date().toISOString(),
queue: {
status: 'running',
workers: queueManager.getWorkerCount()
}
});
});
// Queue management endpoints
app.get('/api/queue/status', async (c) => {
try {
const status = await queueManager.getQueueStatus();
return c.json({ status: 'success', data: status });
} catch (error) {
logger.error('Failed to get queue status', { error });
return c.json({ status: 'error', message: 'Failed to get queue status' }, 500);
}
});
app.post('/api/queue/job', async (c) => {
try {
const jobData = await c.req.json();
const job = await queueManager.addJob(jobData);
return c.json({ status: 'success', jobId: job.id });
} catch (error) {
logger.error('Failed to add job', { error });
return c.json({ status: 'error', message: 'Failed to add job' }, 500);
}
});
// Market data endpoints
app.get('/api/live/:symbol', async (c) => {
const symbol = c.req.param('symbol');
logger.info('Live data request', { symbol });
try { // Queue job for live data using Yahoo provider
const job = await queueManager.addJob({
type: 'market-data-live',
service: 'market-data',
provider: 'yahoo-finance',
operation: 'live-data',
payload: { symbol }
});
return c.json({
status: 'success',
message: 'Live data job queued',
jobId: job.id,
symbol
});
} catch (error) {
logger.error('Failed to queue live data job', { symbol, error });
return c.json({ status: 'error', message: 'Failed to queue live data job' }, 500);
}
});
app.get('/api/historical/:symbol', async (c) => {
const symbol = c.req.param('symbol');
const from = c.req.query('from');
const to = c.req.query('to');
logger.info('Historical data request', { symbol, from, to });
try {
const fromDate = from ? new Date(from) : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // 30 days ago
const toDate = to ? new Date(to) : new Date(); // Now
// Queue job for historical data using Yahoo provider
const job = await queueManager.addJob({
type: 'market-data-historical',
service: 'market-data',
provider: 'yahoo-finance',
operation: 'historical-data',
payload: {
symbol,
from: fromDate.toISOString(),
to: toDate.toISOString()
}
}); return c.json({
status: 'success',
message: 'Historical data job queued',
jobId: job.id,
symbol,
from: fromDate,
to: toDate
});
} catch (error) {
logger.error('Failed to queue historical data job', { symbol, from, to, error });
return c.json({ status: 'error', message: 'Failed to queue historical data job' }, 500); }
});
// Proxy management endpoints
app.post('/api/proxy/fetch', async (c) => {
try {
const job = await queueManager.addJob({
type: 'proxy-fetch',
service: 'proxy',
provider: 'proxy-service',
operation: 'fetch-and-check',
payload: {},
priority: 5
});
return c.json({
status: 'success',
jobId: job.id,
message: 'Proxy fetch job queued'
});
} catch (error) {
logger.error('Failed to queue proxy fetch', { error });
return c.json({ status: 'error', message: 'Failed to queue proxy fetch' }, 500);
}
});
app.post('/api/proxy/check', async (c) => {
try {
const { proxies } = await c.req.json();
const job = await queueManager.addJob({
type: 'proxy-check',
service: 'proxy',
provider: 'proxy-service',
operation: 'check-specific',
payload: { proxies },
priority: 8
});
return c.json({
status: 'success',
jobId: job.id,
message: `Proxy check job queued for ${proxies.length} proxies`
});
} catch (error) {
logger.error('Failed to queue proxy check', { error });
return c.json({ status: 'error', message: 'Failed to queue proxy check' }, 500);
}
});
// Get proxy stats via queue
app.get('/api/proxy/stats', async (c) => {
try {
const job = await queueManager.addJob({
type: 'proxy-stats',
service: 'proxy',
provider: 'proxy-service',
operation: 'get-stats',
payload: {},
priority: 3
});
return c.json({
status: 'success',
jobId: job.id,
message: 'Proxy stats job queued'
});
} catch (error) {
logger.error('Failed to queue proxy stats', { error });
return c.json({ status: 'error', message: 'Failed to queue proxy stats' }, 500);
}
});
// Provider registry endpoints
app.get('/api/providers', async (c) => {
try {
const providers = queueManager.getRegisteredProviders();
return c.json({ status: 'success', providers });
} catch (error) {
logger.error('Failed to get providers', { error });
return c.json({ status: 'error', message: 'Failed to get providers' }, 500);
}
});
// Add new endpoint to see scheduled jobs
app.get('/api/scheduled-jobs', async (c) => {
try {
const jobs = queueManager.getScheduledJobsInfo();
return c.json({
status: 'success',
count: jobs.length,
jobs
});
} catch (error) {
logger.error('Failed to get scheduled jobs info', { error });
return c.json({ status: 'error', message: 'Failed to get scheduled jobs' }, 500);
}
});
// Register all routes
app.route('', healthRoutes);
app.route('', queueRoutes);
app.route('', marketDataRoutes);
app.route('', proxyRoutes);
app.route('', testRoutes);
// Initialize services
async function initializeServices() {
@ -221,22 +47,45 @@ async function initializeServices() {
}
// Start server
let server: any = null;
async function startServer() {
await initializeServices();
// Start the HTTP server using Bun's native serve
server = Bun.serve({
port: PORT,
fetch: app.fetch,
development: process.env.NODE_ENV === 'development',
});
logger.info(`Data Service started on port ${PORT}`);
// Register shutdown callbacks
setupShutdownHandlers();
}
// Graceful shutdown
process.on('SIGINT', async () => {
logger.info('Received SIGINT, shutting down gracefully...');
await queueManager.shutdown();
process.exit(0);
});
process.on('SIGTERM', async () => {
logger.info('Received SIGTERM, shutting down gracefully...');
await queueManager.shutdown();
process.exit(0);
});
// Setup shutdown handlers using the shutdown library
function setupShutdownHandlers() {
// Set shutdown timeout to 15 seconds
setShutdownTimeout(15000);
// Register cleanup for HTTP server
onShutdown(async () => {
if (server) {
logger.info('Stopping HTTP server...');
server.stop();
}
});
// Register cleanup for queue manager
onShutdown(async () => {
logger.info('Shutting down queue manager...');
await queueManager.shutdown();
});
logger.info('Shutdown handlers registered');
}
startServer().catch(error => {
logger.error('Failed to start server', { error });

View file

@ -1,7 +1,6 @@
import { ProxyInfo } from 'libs/http/src/types';
import { ProviderConfig } from '../services/provider-registry.service';
import { getLogger } from '@stock-bot/logger';
import { BatchProcessor } from '../utils/batch-processor';
// Create logger for this provider
const logger = getLogger('proxy-provider');
@ -17,10 +16,10 @@ const getEvery24HourCron = (): string => {
export const proxyProvider: ProviderConfig = {
name: 'proxy-service',
service: 'proxy',
operations: {
'fetch-and-check': async (payload: { sources?: string[] }) => {
operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => {
const { proxyService } = await import('./proxy.tasks');
const { queueManager } = await import('../services/queue.service');
const { processProxies } = await import('../utils/batch-helpers');
const proxies = await proxyService.fetchProxiesFromSources();
@ -28,44 +27,25 @@ export const proxyProvider: ProviderConfig = {
return { proxiesFetched: 0, jobsCreated: 0 };
}
const batchProcessor = new BatchProcessor(queueManager);
// Simplified configuration
const result = await batchProcessor.processItems({
items: proxies,
// Use simplified functional approach
const result = await processProxies(proxies, queueManager, {
totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000,
batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'),
totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000 ,
jobNamePrefix: 'proxy',
operation: 'check-proxy',
service: 'proxy',
provider: 'proxy-service',
priority: 2,
useBatching: process.env.PROXY_DIRECT_MODE !== 'true', // Simple boolean flag
createJobData: (proxy: ProxyInfo) => ({
proxy,
source: 'fetch-and-check'
}),
removeOnComplete: 5,
removeOnFail: 3
});
return {
useBatching: process.env.PROXY_DIRECT_MODE !== 'true',
priority: 2
}); return {
proxiesFetched: result.totalItems,
...result
jobsCreated: result.jobsCreated,
mode: result.mode,
batchesCreated: result.batchesCreated,
processingTimeMs: result.duration
};
},
'process-proxy-batch': async (payload: any) => {
// Process a batch of proxies - uses the fetch-and-check JobNamePrefix process-(proxy)-batch
}, 'process-proxy-batch': async (payload: any) => {
// Process a batch using the simplified batch helpers
const { processBatchJob } = await import('../utils/batch-helpers');
const { queueManager } = await import('../services/queue.service');
const batchProcessor = new BatchProcessor(queueManager);
return await batchProcessor.processBatch(
payload,
(proxy: ProxyInfo) => ({
proxy,
source: payload.config?.source || 'batch-processing'
})
);
return await processBatchJob(payload, queueManager);
},
'check-proxy': async (payload: {

View file

@ -0,0 +1,20 @@
/**
* Health check routes
*/
import { Hono } from 'hono';
import { queueManager } from '../services/queue.service';
export const healthRoutes = new Hono();
// Health check endpoint
healthRoutes.get('/health', (c) => {
return c.json({
service: 'data-service',
status: 'healthy',
timestamp: new Date().toISOString(),
queue: {
status: 'running',
workers: queueManager.getWorkerCount()
}
});
});

View file

@ -0,0 +1,8 @@
/**
* Routes index - exports all route modules
*/
export { healthRoutes } from './health.routes';
export { queueRoutes } from './queue.routes';
export { marketDataRoutes } from './market-data.routes';
export { proxyRoutes } from './proxy.routes';
export { testRoutes } from './test.routes';

View file

@ -0,0 +1,74 @@
/**
* Market data routes
*/
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { queueManager } from '../services/queue.service';
const logger = getLogger('market-data-routes');
export const marketDataRoutes = new Hono();
// Market data endpoints
marketDataRoutes.get('/api/live/:symbol', async (c) => {
const symbol = c.req.param('symbol');
logger.info('Live data request', { symbol });
try {
// Queue job for live data using Yahoo provider
const job = await queueManager.addJob({
type: 'market-data-live',
service: 'market-data',
provider: 'yahoo-finance',
operation: 'live-data',
payload: { symbol }
});
return c.json({
status: 'success',
message: 'Live data job queued',
jobId: job.id,
symbol
});
} catch (error) {
logger.error('Failed to queue live data job', { symbol, error });
return c.json({ status: 'error', message: 'Failed to queue live data job' }, 500);
}
});
marketDataRoutes.get('/api/historical/:symbol', async (c) => {
const symbol = c.req.param('symbol');
const from = c.req.query('from');
const to = c.req.query('to');
logger.info('Historical data request', { symbol, from, to });
try {
const fromDate = from ? new Date(from) : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // 30 days ago
const toDate = to ? new Date(to) : new Date(); // Now
// Queue job for historical data using Yahoo provider
const job = await queueManager.addJob({
type: 'market-data-historical',
service: 'market-data',
provider: 'yahoo-finance',
operation: 'historical-data',
payload: {
symbol,
from: fromDate.toISOString(),
to: toDate.toISOString()
}
});
return c.json({
status: 'success',
message: 'Historical data job queued',
jobId: job.id,
symbol,
from: fromDate,
to: toDate
});
} catch (error) {
logger.error('Failed to queue historical data job', { symbol, from, to, error });
return c.json({ status: 'error', message: 'Failed to queue historical data job' }, 500);
}
});

View file

@ -0,0 +1,79 @@
/**
* Proxy management routes
*/
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { queueManager } from '../services/queue.service';
const logger = getLogger('proxy-routes');
export const proxyRoutes = new Hono();
// Proxy management endpoints
proxyRoutes.post('/api/proxy/fetch', async (c) => {
try {
const job = await queueManager.addJob({
type: 'proxy-fetch',
service: 'proxy',
provider: 'proxy-service',
operation: 'fetch-and-check',
payload: {},
priority: 5
});
return c.json({
status: 'success',
jobId: job.id,
message: 'Proxy fetch job queued'
});
} catch (error) {
logger.error('Failed to queue proxy fetch', { error });
return c.json({ status: 'error', message: 'Failed to queue proxy fetch' }, 500);
}
});
proxyRoutes.post('/api/proxy/check', async (c) => {
try {
const { proxies } = await c.req.json();
const job = await queueManager.addJob({
type: 'proxy-check',
service: 'proxy',
provider: 'proxy-service',
operation: 'check-specific',
payload: { proxies },
priority: 8
});
return c.json({
status: 'success',
jobId: job.id,
message: `Proxy check job queued for ${proxies.length} proxies`
});
} catch (error) {
logger.error('Failed to queue proxy check', { error });
return c.json({ status: 'error', message: 'Failed to queue proxy check' }, 500);
}
});
// Get proxy stats via queue
proxyRoutes.get('/api/proxy/stats', async (c) => {
try {
const job = await queueManager.addJob({
type: 'proxy-stats',
service: 'proxy',
provider: 'proxy-service',
operation: 'get-stats',
payload: {},
priority: 3
});
return c.json({
status: 'success',
jobId: job.id,
message: 'Proxy stats job queued'
});
} catch (error) {
logger.error('Failed to queue proxy stats', { error });
return c.json({ status: 'error', message: 'Failed to queue proxy stats' }, 500);
}
});

View file

@ -0,0 +1,58 @@
/**
* Queue management routes
*/
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { queueManager } from '../services/queue.service';
const logger = getLogger('queue-routes');
export const queueRoutes = new Hono();
// Queue management endpoints
queueRoutes.get('/api/queue/status', async (c) => {
try {
const status = await queueManager.getQueueStatus();
return c.json({ status: 'success', data: status });
} catch (error) {
logger.error('Failed to get queue status', { error });
return c.json({ status: 'error', message: 'Failed to get queue status' }, 500);
}
});
queueRoutes.post('/api/queue/job', async (c) => {
try {
const jobData = await c.req.json();
const job = await queueManager.addJob(jobData);
return c.json({ status: 'success', jobId: job.id });
} catch (error) {
logger.error('Failed to add job', { error });
return c.json({ status: 'error', message: 'Failed to add job' }, 500);
}
});
// Provider registry endpoints
queueRoutes.get('/api/providers', async (c) => {
try {
const providers = queueManager.getRegisteredProviders();
return c.json({ status: 'success', providers });
} catch (error) {
logger.error('Failed to get providers', { error });
return c.json({ status: 'error', message: 'Failed to get providers' }, 500);
}
});
// Add new endpoint to see scheduled jobs
queueRoutes.get('/api/scheduled-jobs', async (c) => {
try {
const jobs = queueManager.getScheduledJobsInfo();
return c.json({
status: 'success',
count: jobs.length,
jobs
});
} catch (error) {
logger.error('Failed to get scheduled jobs info', { error });
return c.json({ status: 'error', message: 'Failed to get scheduled jobs' }, 500);
}
});

View file

@ -0,0 +1,77 @@
/**
* Test and development routes for batch processing
*/
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { queueManager } from '../services/queue.service';
const logger = getLogger('test-routes');
export const testRoutes = new Hono();
// Test endpoint for new functional batch processing
testRoutes.post('/api/test/batch-symbols', async (c) => {
try {
const { symbols, useBatching = false, totalDelayMs = 60000 } = await c.req.json();
const { processSymbols } = await import('../utils/batch-helpers');
if (!symbols || !Array.isArray(symbols)) {
return c.json({ status: 'error', message: 'symbols array is required' }, 400);
}
const result = await processSymbols(symbols, queueManager, {
operation: 'live-data',
service: 'test',
provider: 'test-provider',
totalDelayMs,
useBatching,
batchSize: 10,
priority: 1
});
return c.json({
status: 'success',
message: 'Batch processing started',
result
});
} catch (error) {
logger.error('Failed to start batch symbol processing', { error });
return c.json({ status: 'error', message: 'Failed to start batch processing' }, 500);
}
});
testRoutes.post('/api/test/batch-custom', async (c) => {
try {
const { items, useBatching = false, totalDelayMs = 30000 } = await c.req.json();
const { processItems } = await import('../utils/batch-helpers');
if (!items || !Array.isArray(items)) {
return c.json({ status: 'error', message: 'items array is required' }, 400);
}
const result = await processItems(
items,
(item, index) => ({
originalItem: item,
processIndex: index,
timestamp: new Date().toISOString()
}),
queueManager,
{
totalDelayMs,
useBatching,
batchSize: 5,
priority: 1
}
);
return c.json({
status: 'success',
message: 'Custom batch processing started',
result
});
} catch (error) {
logger.error('Failed to start custom batch processing', { error });
return c.json({ status: 'error', message: 'Failed to start custom batch processing' }, 500);
}
});

View file

@ -136,7 +136,6 @@ export class QueueService {
throw error;
}
}
private async processJob(job: any) {
const { service, provider, operation, payload }: JobData = job.data;
@ -149,6 +148,12 @@ export class QueueService {
});
try {
// Handle special batch processing jobs
if (operation === 'process-batch-items') {
const { processBatchJob } = await import('../utils/batch-helpers');
return await processBatchJob(payload, this);
}
// Get handler from registry
const handler = providerRegistry.getHandler(service, provider, operation);

View file

@ -0,0 +1,389 @@
import { getLogger } from '@stock-bot/logger';
import { createCache, CacheProvider } from '@stock-bot/cache';
import type { QueueService } from '../services/queue.service';
const logger = getLogger('batch-helpers');
// Simple interfaces
export interface ProcessOptions {
totalDelayMs: number;
batchSize?: number;
priority?: number;
useBatching?: boolean;
retries?: number;
ttl?: number;
removeOnComplete?: number;
removeOnFail?: number;
}
export interface BatchResult {
jobsCreated: number;
mode: 'direct' | 'batch';
totalItems: number;
batchesCreated?: number;
duration: number;
}
// Cache instance for payload storage
let cacheProvider: CacheProvider | null = null;
function getCache(): CacheProvider {
if (!cacheProvider) {
cacheProvider = createCache({
keyPrefix: 'batch:',
ttl: 86400, // 24 hours default
enableMetrics: true
});
}
return cacheProvider;
}
/**
* Main function - processes items either directly or in batches
*/
export async function processItems<T>(
items: T[],
processor: (item: T, index: number) => any,
queue: QueueService,
options: ProcessOptions
): Promise<BatchResult> {
const startTime = Date.now();
if (items.length === 0) {
return {
jobsCreated: 0,
mode: 'direct',
totalItems: 0,
duration: 0
};
}
logger.info('Starting batch processing', {
totalItems: items.length,
mode: options.useBatching ? 'batch' : 'direct',
batchSize: options.batchSize,
totalDelayHours: (options.totalDelayMs / 1000 / 60 / 60).toFixed(1)
});
try {
const result = options.useBatching
? await processBatched(items, processor, queue, options)
: await processDirect(items, processor, queue, options);
const duration = Date.now() - startTime;
logger.info('Batch processing completed', {
...result,
duration: `${(duration / 1000).toFixed(1)}s`
});
return { ...result, duration };
} catch (error) {
logger.error('Batch processing failed', error);
throw error;
}
}
/**
* Process items directly - each item becomes a separate job
*/
async function processDirect<T>(
items: T[],
processor: (item: T, index: number) => any,
queue: QueueService,
options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> {
const delayPerItem = Math.floor(options.totalDelayMs / items.length);
logger.info('Creating direct jobs', {
totalItems: items.length,
delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`
});
const jobs = items.map((item, index) => ({
name: 'process-item',
data: {
type: 'process-item',
service: 'batch-processor',
provider: 'direct',
operation: 'process-single-item',
payload: processor(item, index),
priority: options.priority || 1
},
opts: {
delay: index * delayPerItem,
priority: options.priority || 1,
attempts: options.retries || 3,
removeOnComplete: options.removeOnComplete || 10,
removeOnFail: options.removeOnFail || 5
}
}));
const createdJobs = await addJobsInChunks(queue, jobs);
return {
totalItems: items.length,
jobsCreated: createdJobs.length,
mode: 'direct'
};
}
/**
* Process items in batches - groups of items are stored and processed together
*/
async function processBatched<T>(
items: T[],
processor: (item: T, index: number) => any,
queue: QueueService,
options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> {
const batchSize = options.batchSize || 100;
const batches = createBatches(items, batchSize);
const delayPerBatch = Math.floor(options.totalDelayMs / batches.length);
logger.info('Creating batch jobs', {
totalItems: items.length,
batchSize,
totalBatches: batches.length,
delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`
});
const batchJobs = await Promise.all(
batches.map(async (batch, batchIndex) => {
const payloadKey = await storePayload(batch, processor, options);
return {
name: 'process-batch',
data: {
type: 'process-batch',
service: 'batch-processor',
provider: 'batch',
operation: 'process-batch-items',
payload: {
payloadKey,
batchIndex,
totalBatches: batches.length,
itemCount: batch.length
},
priority: options.priority || 2
},
opts: {
delay: batchIndex * delayPerBatch,
priority: options.priority || 2,
attempts: options.retries || 3,
removeOnComplete: options.removeOnComplete || 10,
removeOnFail: options.removeOnFail || 5
}
};
})
);
const createdJobs = await addJobsInChunks(queue, batchJobs);
return {
totalItems: items.length,
jobsCreated: createdJobs.length,
batchesCreated: batches.length,
mode: 'batch'
};
}
/**
* Process a batch job - loads payload from cache and creates individual jobs
*/
export async function processBatchJob(jobData: any, queue: QueueService): Promise<any> {
const { payloadKey, batchIndex, totalBatches, itemCount } = jobData;
logger.debug('Processing batch job', {
batchIndex,
totalBatches,
itemCount
});
try {
const payload = await loadPayload(payloadKey);
const { items, processorStr, options } = payload;
// Deserialize processor function (in production, use safer alternatives)
const processor = new Function('return ' + processorStr)();
const jobs = items.map((item: any, index: number) => ({
name: 'process-item',
data: {
type: 'process-item',
service: 'batch-processor',
provider: 'batch-item',
operation: 'process-single-item',
payload: processor(item, index),
priority: options.priority || 1
},
opts: {
delay: index * (options.delayPerItem || 1000),
priority: options.priority || 1,
attempts: options.retries || 3
}
}));
const createdJobs = await addJobsInChunks(queue, jobs);
// Cleanup payload after successful processing
await cleanupPayload(payloadKey);
return {
batchIndex,
itemsProcessed: items.length,
jobsCreated: createdJobs.length
};
} catch (error) {
logger.error('Batch job processing failed', { batchIndex, error });
throw error;
}
}
// Helper functions
function createBatches<T>(items: T[], batchSize: number): T[][] {
const batches: T[][] = [];
for (let i = 0; i < items.length; i += batchSize) {
batches.push(items.slice(i, i + batchSize));
}
return batches;
}
async function storePayload<T>(
items: T[],
processor: (item: T, index: number) => any,
options: ProcessOptions
): Promise<string> {
const cache = getCache();
const key = `payload_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const payload = {
items,
processorStr: processor.toString(),
options: {
delayPerItem: 1000,
priority: options.priority || 1,
retries: options.retries || 3
},
createdAt: Date.now()
};
await cache.set(key, JSON.stringify(payload), options.ttl || 86400);
logger.debug('Stored batch payload', {
key,
itemCount: items.length
});
return key;
}
async function loadPayload(key: string): Promise<any> {
const cache = getCache();
const data = await cache.get(key);
if (!data) {
throw new Error(`Payload not found: ${key}`);
}
return JSON.parse(data as string);
}
async function cleanupPayload(key: string): Promise<void> {
try {
const cache = getCache();
await cache.del(key);
logger.debug('Cleaned up payload', { key });
} catch (error) {
logger.warn('Failed to cleanup payload', { key, error });
}
}
async function addJobsInChunks(queue: QueueService, jobs: any[], chunkSize = 100): Promise<any[]> {
const allCreatedJobs = [];
for (let i = 0; i < jobs.length; i += chunkSize) {
const chunk = jobs.slice(i, i + chunkSize);
try {
const createdJobs = await queue.addBulk(chunk);
allCreatedJobs.push(...createdJobs);
// Small delay between chunks to avoid overwhelming Redis
if (i + chunkSize < jobs.length) {
await new Promise(resolve => setTimeout(resolve, 100));
}
} catch (error) {
logger.error('Failed to add job chunk', {
startIndex: i,
chunkSize: chunk.length,
error
});
}
}
return allCreatedJobs;
}
// Convenience functions for common use cases
export async function processSymbols(
symbols: string[],
queue: QueueService,
options: {
operation: string;
service: string;
provider: string;
totalDelayMs: number;
useBatching?: boolean;
batchSize?: number;
priority?: number;
}
): Promise<BatchResult> {
return processItems(
symbols,
(symbol, index) => ({
symbol,
index,
source: 'batch-processing'
}),
queue,
{
totalDelayMs: options.totalDelayMs,
batchSize: options.batchSize || 100,
priority: options.priority || 1,
useBatching: options.useBatching || false
}
);
}
export async function processProxies(
proxies: any[],
queue: QueueService,
options: {
totalDelayMs: number;
useBatching?: boolean;
batchSize?: number;
priority?: number;
}
): Promise<BatchResult> {
return processItems(
proxies,
(proxy, index) => ({
proxy,
index,
source: 'batch-processing'
}),
queue,
{
totalDelayMs: options.totalDelayMs,
batchSize: options.batchSize || 200,
priority: options.priority || 2,
useBatching: options.useBatching || true
}
);
}