switching to generic queue lib

This commit is contained in:
Boki 2025-06-14 15:28:51 -04:00
parent 6c548416d1
commit e5170b1c78
15 changed files with 500 additions and 1086 deletions

View file

@ -18,10 +18,10 @@
"@stock-bot/http": "*",
"@stock-bot/logger": "*",
"@stock-bot/mongodb-client": "*",
"@stock-bot/queue": "*",
"@stock-bot/questdb-client": "*",
"@stock-bot/shutdown": "*",
"@stock-bot/types": "*",
"bullmq": "^5.53.2",
"hono": "^4.0.0",
"p-limit": "^6.2.0",
"ws": "^8.0.0"

View file

@ -9,8 +9,7 @@ import { connectMongoDB, disconnectMongoDB } from '@stock-bot/mongodb-client';
import { Shutdown } from '@stock-bot/shutdown';
import { initializeIBResources } from './providers/ib.tasks';
import { initializeProxyResources } from './providers/proxy.tasks';
import { queueManager } from './services/queue.service';
import { initializeBatchCache } from './utils/batch-helpers';
import { queueManager } from './services/queue-manager.service';
import { healthRoutes, marketDataRoutes, proxyRoutes, queueRoutes, testRoutes } from './routes';
// Load environment variables
@ -46,25 +45,20 @@ async function initializeServices() {
await Browser.initialize();
logger.info('Browser resources initialized');
// Initialize batch cache FIRST - before queue service
logger.info('Starting batch cache initialization...');
await initializeBatchCache();
logger.info('Batch cache initialized');
// Initialize proxy resources
logger.info('Starting proxy resources initialization...');
await initializeProxyResources();
logger.info('Proxy resources initialized');
// Initialize proxy cache - before queue service
logger.info('Starting proxy cache initialization...');
await initializeProxyResources(true); // Wait for cache during startup
logger.info('Proxy cache initialized');
// Initialize IB resources
logger.info('Starting IB resources initialization...');
await initializeIBResources();
logger.info('IB resources initialized');
// Initialize IB cache - before queue service
logger.info('Starting IB cache initialization...');
await initializeIBResources(true); // Wait for cache during startup
logger.info('IB cache initialized');
// Initialize queue service (Redis connections should be ready now)
logger.info('Starting queue service initialization...');
// Initialize queue manager (includes batch cache initialization)
logger.info('Starting queue manager initialization...');
await queueManager.initialize();
logger.info('Queue service initialized');
logger.info('Queue manager initialized');
logger.info('All services initialized successfully');
} catch (error) {

View file

@ -1,42 +1,82 @@
/**
* Interactive Brokers Provider for new queue system
*/
import { getLogger } from '@stock-bot/logger';
import { ProviderConfig } from '../services/provider-registry.service';
import type { ProviderConfigWithSchedule } from '@stock-bot/queue';
import { providerRegistry } from '@stock-bot/queue';
const logger = getLogger('ib-provider');
export const ibProvider: ProviderConfig = {
name: 'ib',
operations: {
'ib-exchanges-and-symbols': async () => {
const { ibTasks } = await import('./ib.tasks');
logger.info('Fetching symbol summary from IB');
const sessionHeaders = await ibTasks.fetchSession();
logger.info('Fetched symbol summary from IB');
// Initialize and register the IB provider
export function initializeIBProvider() {
logger.info('Registering IB provider with scheduled jobs...');
if (sessionHeaders) {
logger.info('Fetching exchanges from IB');
const exchanges = await ibTasks.fetchExchanges(sessionHeaders);
logger.info('Fetched exchanges from IB', { count: exchanges.length });
const ibProviderConfig: ProviderConfigWithSchedule = {
name: 'ib',
operations: {
'fetch-session': async _payload => {
// payload contains session configuration (not used in current implementation)
logger.debug('Processing session fetch request');
const { fetchSession } = await import('./ib.tasks');
return fetchSession();
},
// do the same as above but for symbols
logger.info('Fetching symbols from IB');
const symbols = await ibTasks.fetchSymbols(sessionHeaders);
logger.info('Fetched symbols from IB', { symbols });
'fetch-exchanges': async _payload => {
// payload should contain session headers
logger.debug('Processing exchanges fetch request');
const { fetchSession, fetchExchanges } = await import('./ib.tasks');
const sessionHeaders = await fetchSession();
if (sessionHeaders) {
return fetchExchanges(sessionHeaders);
}
throw new Error('Failed to get session headers');
},
return { exchangesCount: exchanges?.length, symbolsCount: symbols?.length };
}
'fetch-symbols': async _payload => {
// payload should contain session headers
logger.debug('Processing symbols fetch request');
const { fetchSession, fetchSymbols } = await import('./ib.tasks');
const sessionHeaders = await fetchSession();
if (sessionHeaders) {
return fetchSymbols(sessionHeaders);
}
throw new Error('Failed to get session headers');
},
'ib-exchanges-and-symbols': async _payload => {
// Legacy operation for scheduled jobs
logger.info('Fetching symbol summary from IB');
const { fetchSession, fetchExchanges, fetchSymbols } = await import('./ib.tasks');
const sessionHeaders = await fetchSession();
logger.info('Fetched symbol summary from IB');
if (sessionHeaders) {
logger.info('Fetching exchanges from IB');
const exchanges = await fetchExchanges(sessionHeaders);
logger.info('Fetched exchanges from IB', { count: exchanges?.length });
logger.info('Fetching symbols from IB');
const symbols = await fetchSymbols(sessionHeaders);
logger.info('Fetched symbols from IB', { symbols });
return { exchangesCount: exchanges?.length, symbolsCount: symbols?.length };
}
},
},
},
scheduledJobs: [
{
type: 'ib-exchanges-and-symbols',
operation: 'ib-exchanges-and-symbols',
payload: {},
cronPattern: '0 0 * * 0', // Every Sunday at midnight
priority: 5,
description: 'Fetch and update IB exchanges and symbols data',
// immediately: true, // Don't run immediately during startup to avoid conflicts
},
],
};
scheduledJobs: [
{
type: 'ib-exchanges-and-symbols',
operation: 'ib-exchanges-and-symbols',
payload: {},
// should remove and just run at the same time so app restarts don't keep adding the same jobs
cronPattern: '0 0 * * 0',
priority: 5,
// immediately: true, // Don't run immediately during startup to avoid conflicts
description: 'Fetch and update IB exchanges and symbols data',
},
],
};
providerRegistry.registerWithSchedule(ibProviderConfig);
logger.info('IB provider registered successfully with scheduled jobs');
}

View file

@ -1,98 +1,48 @@
import { ProxyInfo } from 'libs/http/src/types';
/**
* Proxy Provider for new queue system
*/
import { ProxyInfo } from '@stock-bot/http';
import { getLogger } from '@stock-bot/logger';
import { ProviderConfig } from '../services/provider-registry.service';
import type { ProviderConfigWithSchedule } from '@stock-bot/queue';
import { providerRegistry } from '@stock-bot/queue';
// Create logger for this provider
const logger = getLogger('proxy-provider');
// This will run at the same time each day as when the app started
const getEvery24HourCron = (): string => {
const now = new Date();
const hours = now.getHours();
const minutes = now.getMinutes();
return `${minutes} ${hours} * * *`; // Every day at startup time
};
// Initialize and register the Proxy provider
export function initializeProxyProvider() {
logger.info('Registering proxy provider with scheduled jobs...');
export const proxyProvider: ProviderConfig = {
name: 'proxy-provider',
operations: {
'fetch-and-check': async (_payload: { sources?: string[] }) => {
const { proxyService } = await import('./proxy.tasks');
const { queueManager } = await import('../services/queue.service');
const { processItems } = await import('../utils/batch-helpers');
const proxies = await proxyService.fetchProxiesFromSources();
if (proxies.length === 0) {
return { proxiesFetched: 0, jobsCreated: 0 };
}
// Use generic function with routing parameters
const result = await processItems(
proxies,
(proxy, index) => ({
proxy,
index,
source: 'batch-processing',
}),
queueManager,
{
totalDelayHours: 12, //parseFloat(process.env.PROXY_VALIDATION_HOURS || '1'),
batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'),
useBatching: process.env.PROXY_DIRECT_MODE !== 'true',
provider: 'proxy-provider',
operation: 'check-proxy',
}
);
return result;
},
'process-batch-items': async (payload: any) => {
// Process a batch using the simplified batch helpers
const { processBatchJob } = await import('../utils/batch-helpers');
const { queueManager } = await import('../services/queue.service');
return await processBatchJob(payload, queueManager);
},
'check-proxy': async (payload: {
proxy: ProxyInfo;
source?: string;
batchIndex?: number;
itemIndex?: number;
total?: number;
}) => {
const { checkProxy } = await import('./proxy.tasks');
try {
const result = await checkProxy(payload.proxy);
logger.debug('Proxy validated', {
proxy: `${payload.proxy.host}:${payload.proxy.port}`,
isWorking: result.isWorking,
responseTime: result.responseTime,
const proxyProviderConfig: ProviderConfigWithSchedule = {
name: 'proxy',
operations: {
'check-proxy': async (payload: ProxyInfo) => {
// payload is now the raw proxy info object
logger.debug('Processing proxy check request', {
proxy: `${payload.host}:${payload.port}`,
});
return { result, proxy: payload.proxy };
} catch (error) {
logger.warn('Proxy validation failed', {
proxy: `${payload.proxy.host}:${payload.proxy.port}`,
error: error instanceof Error ? error.message : String(error),
});
return { result: { isWorking: false, error: String(error) }, proxy: payload.proxy };
}
const { checkProxy } = await import('./proxy.tasks');
return checkProxy(payload);
},
'fetch-from-sources': async _payload => {
// Fetch proxies from all configured sources
logger.info('Processing fetch proxies from sources request');
const { fetchProxiesFromSources } = await import('./proxy.tasks');
return fetchProxiesFromSources();
},
},
},
scheduledJobs: [
// {
// type: 'proxy-maintenance',
// operation: 'fetch-and-check',
// payload: {},
// // should remove and just run at the same time so app restarts don't keep adding the same jobs
// cronPattern: getEvery24HourCron(),
// priority: 5,
// immediately: true, // Don't run immediately during startup to avoid conflicts
// description: 'Fetch and validate proxy list from sources',
// },
],
};
scheduledJobs: [
{
type: 'proxy-fetch-and-check',
operation: 'fetch-from-sources',
payload: {},
cronPattern: '0 */2 * * *', // Every 2 hours
priority: 5,
description: 'Fetch and validate proxy list from sources',
// immediately: true, // Don't run immediately during startup to avoid conflicts
},
],
};
providerRegistry.registerWithSchedule(proxyProviderConfig);
logger.info('Proxy provider registered successfully with scheduled jobs');
}

View file

@ -3,7 +3,7 @@
*/
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { queueManager } from '../services/queue.service';
import { queueManager } from '../services/queue-manager.service';
const logger = getLogger('market-data-routes');
@ -16,9 +16,8 @@ marketDataRoutes.get('/api/live/:symbol', async c => {
try {
// Queue job for live data using Yahoo provider
const job = await queueManager.addJob({
const job = await queueManager.addJob('market-data-live', {
type: 'market-data-live',
service: 'market-data',
provider: 'yahoo-finance',
operation: 'live-data',
payload: { symbol },
@ -47,9 +46,8 @@ marketDataRoutes.get('/api/historical/:symbol', async c => {
const toDate = to ? new Date(to) : new Date(); // Now
// Queue job for historical data using Yahoo provider
const job = await queueManager.addJob({
const job = await queueManager.addJob('market-data-historical', {
type: 'market-data-historical',
service: 'market-data',
provider: 'yahoo-finance',
operation: 'historical-data',
payload: {
@ -72,3 +70,50 @@ marketDataRoutes.get('/api/historical/:symbol', async c => {
return c.json({ status: 'error', message: 'Failed to queue historical data job' }, 500);
}
});
// Batch processing endpoint using new queue system
marketDataRoutes.post('/api/process-symbols', async c => {
try {
const {
symbols,
provider = 'ib',
operation = 'fetch-session',
useBatching = true,
totalDelayMs = 30000,
batchSize = 10,
} = await c.req.json();
if (!symbols || !Array.isArray(symbols) || symbols.length === 0) {
return c.json({ status: 'error', message: 'Invalid symbols array' }, 400);
}
logger.info('Batch processing symbols', {
count: symbols.length,
provider,
operation,
useBatching,
});
const result = await queueManager.processSymbols(symbols, {
totalDelayMs,
useBatching,
batchSize,
priority: 2,
provider,
operation,
retries: 2,
removeOnComplete: 5,
removeOnFail: 10,
});
return c.json({
status: 'success',
message: 'Batch processing initiated',
result,
symbols: symbols.length,
});
} catch (error) {
logger.error('Failed to process symbols batch', { error });
return c.json({ status: 'error', message: 'Failed to process symbols batch' }, 500);
}
});

View file

@ -3,7 +3,7 @@
*/
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { queueManager } from '../services/queue.service';
import { queueManager } from '../services/queue-manager.service';
const logger = getLogger('queue-routes');
@ -12,8 +12,8 @@ export const queueRoutes = new Hono();
// Queue management endpoints
queueRoutes.get('/api/queue/status', async c => {
try {
const status = await queueManager.getQueueStatus();
return c.json({ status: 'success', data: status });
const stats = await queueManager.getStats();
return c.json({ status: 'success', data: stats });
} catch (error) {
logger.error('Failed to get queue status', { error });
return c.json({ status: 'error', message: 'Failed to get queue status' }, 500);
@ -22,8 +22,8 @@ queueRoutes.get('/api/queue/status', async c => {
queueRoutes.post('/api/queue/job', async c => {
try {
const jobData = await c.req.json();
const job = await queueManager.addJob(jobData);
const { name, data, options } = await c.req.json();
const job = await queueManager.addJob(name, data, options);
return c.json({ status: 'success', jobId: job.id });
} catch (error) {
logger.error('Failed to add job', { error });
@ -34,9 +34,9 @@ queueRoutes.post('/api/queue/job', async c => {
// Provider registry endpoints
queueRoutes.get('/api/providers', async c => {
try {
const { providerRegistry } = await import('../services/provider-registry.service');
const providers = providerRegistry.getProviders();
return c.json({ status: 'success', providers });
const { providerRegistry } = await import('@stock-bot/queue');
const configs = providerRegistry.getProviderConfigs();
return c.json({ status: 'success', providers: configs });
} catch (error) {
logger.error('Failed to get providers', { error });
return c.json({ status: 'error', message: 'Failed to get providers' }, 500);
@ -46,7 +46,7 @@ queueRoutes.get('/api/providers', async c => {
// Add new endpoint to see scheduled jobs
queueRoutes.get('/api/scheduled-jobs', async c => {
try {
const { providerRegistry } = await import('../services/provider-registry.service');
const { providerRegistry } = await import('@stock-bot/queue');
const jobs = providerRegistry.getAllScheduledJobs();
return c.json({
status: 'success',
@ -59,13 +59,14 @@ queueRoutes.get('/api/scheduled-jobs', async c => {
}
});
queueRoutes.post('/api/queue/drain', async c => {
queueRoutes.post('/api/queue/clean', async c => {
try {
await queueManager.drainQueue();
const status = await queueManager.getQueueStatus();
return c.json({ status: 'success', message: 'Queue drained', queueStatus: status });
const { grace = 60000 } = await c.req.json(); // Default 1 minute
await queueManager.clean(grace);
const stats = await queueManager.getStats();
return c.json({ status: 'success', message: 'Queue cleaned', queueStats: stats });
} catch (error) {
logger.error('Failed to drain queue', { error });
return c.json({ status: 'error', message: 'Failed to drain queue' }, 500);
logger.error('Failed to clean queue', { error });
return c.json({ status: 'error', message: 'Failed to clean queue' }, 500);
}
});

View file

@ -1,135 +0,0 @@
import { getLogger } from '@stock-bot/logger';
/** Async function that executes one queue job's payload. */
export interface JobHandler {
  (payload: any): Promise<any>;
}

/** Shape of the data attached to a queued job. */
export interface JobData {
  type?: string;
  provider: string;
  operation: string;
  payload: any;
  priority?: number;
  immediately?: boolean;
}

/** A cron-driven job contributed by a provider. */
export interface ScheduledJob {
  type: string;
  operation: string;
  payload: any;
  cronPattern: string;
  priority?: number;
  description?: string;
  immediately?: boolean;
}

/** A named provider: its operation handlers plus optional scheduled jobs. */
export interface ProviderConfig {
  name: string;
  operations: Record<string, JobHandler>;
  scheduledJobs?: ScheduledJob[];
}

/** Public surface of a provider registry instance. */
export interface ProviderRegistry {
  registerProvider: (config: ProviderConfig) => void;
  getHandler: (provider: string, operation: string) => JobHandler | null;
  getAllScheduledJobs: () => Array<{ provider: string; job: ScheduledJob }>;
  getProviders: () => Array<{ key: string; config: ProviderConfig }>;
  hasProvider: (provider: string) => boolean;
  clear: () => void;
}

/**
 * Create a new provider registry instance.
 *
 * The registry is a closure over a single Map keyed by provider name;
 * re-registering a name silently replaces the previous config.
 */
export function createProviderRegistry(): ProviderRegistry {
  const logger = getLogger('provider-registry');
  const byName = new Map<string, ProviderConfig>();

  /** Store (or replace) a provider under its name and log what it exposes. */
  const registerProvider = (config: ProviderConfig): void => {
    byName.set(config.name, config);
    logger.info(`Registered provider: ${config.name}`, {
      operations: Object.keys(config.operations),
      scheduledJobs: config.scheduledJobs?.length || 0,
    });
  };

  /** Resolve the handler for a provider/operation pair; null (with a warning) when missing. */
  const getHandler = (provider: string, operation: string): JobHandler | null => {
    const config = byName.get(provider);
    if (!config) {
      logger.warn(`Provider not found: ${provider}`);
      return null;
    }
    const handler = config.operations[operation];
    if (!handler) {
      logger.warn(`Operation not found: ${operation} in provider ${provider}`);
      return null;
    }
    return handler;
  };

  /** Flatten every provider's scheduled jobs into one provider-tagged list. */
  const getAllScheduledJobs = (): Array<{ provider: string; job: ScheduledJob }> =>
    Array.from(byName.values()).flatMap(config =>
      (config.scheduledJobs ?? []).map(job => ({ provider: config.name, job }))
    );

  /** Snapshot of every registered provider together with its registry key. */
  const getProviders = (): Array<{ key: string; config: ProviderConfig }> =>
    Array.from(byName.entries()).map(([key, config]) => ({ key, config }));

  /** True when a provider with this name has been registered. */
  const hasProvider = (provider: string): boolean => byName.has(provider);

  /** Drop every registered provider (useful for testing). */
  const clear = (): void => {
    byName.clear();
    logger.info('All providers cleared');
  };

  return {
    registerProvider,
    getHandler,
    getAllScheduledJobs,
    getProviders,
    hasProvider,
    clear,
  };
}

// Create the default shared registry instance
export const providerRegistry = createProviderRegistry();

View file

@ -0,0 +1,184 @@
/**
* Data Service Queue Manager
* Uses the new @stock-bot/queue library with provider registry
*/
import { getLogger } from '@stock-bot/logger';
import type { JobData } from '@stock-bot/queue';
import { initializeBatchCache, providerRegistry, QueueManager } from '@stock-bot/queue';
const logger = getLogger('queue-manager-service');
/**
 * Application-level wrapper around the shared @stock-bot/queue QueueManager.
 *
 * Responsibilities: wire up this service's providers (IB, proxy) from the
 * provider registry, initialize the batch cache, install each provider's
 * repeatable (cron) jobs, and guard every public operation behind an
 * initialization check.
 */
class DataServiceQueueManager {
  private queueManager: QueueManager;
  private isInitialized = false;

  constructor() {
    // Worker sizing and the Redis endpoint are env-driven so deployments can
    // tune them without code changes; defaults match the old queue.service.
    this.queueManager = new QueueManager({
      queueName: 'data-service-queue',
      workers: parseInt(process.env.WORKER_COUNT || '5'),
      concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'),
      redis: {
        host: process.env.DRAGONFLY_HOST || 'localhost',
        port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
      },
    });
  }

  /**
   * One-shot startup: register providers, initialize the queue manager,
   * initialize the batch cache, then install the providers' scheduled jobs.
   * Calling it a second time is a no-op that logs a warning.
   */
  async initialize() {
    if (this.isInitialized) {
      logger.warn('Queue manager already initialized');
      return;
    }
    logger.info('Initializing data service queue manager...');
    try {
      // Register all providers
      await this.registerProviders();
      // Initialize the queue manager
      await this.queueManager.initialize();
      // Initialize batch cache
      await initializeBatchCache(this.queueManager);
      // Set up scheduled jobs
      await this.setupScheduledJobs();
      this.isInitialized = true;
      logger.info('Data service queue manager initialized successfully');
    } catch (error) {
      // Wrap in { error } for consistency with the rest of this file's logging.
      logger.error('Failed to initialize queue manager', { error });
      throw error;
    }
  }

  /**
   * Import and run each provider module's initializer, then mirror every
   * registry entry into the queue manager so its workers can dispatch jobs.
   */
  private async registerProviders() {
    logger.info('Registering queue providers...');
    // Initialize providers using the new provider registry system.
    // Dynamic imports avoid loading provider modules before the queue exists.
    const { initializeIBProvider } = await import('../providers/ib.provider');
    const { initializeProxyProvider } = await import('../providers/proxy.provider');
    // Register providers with scheduled jobs
    initializeIBProvider();
    initializeProxyProvider();
    // Now register all providers from the registry with the queue manager
    const allProviders = providerRegistry.getAllProviders();
    for (const [providerName, config] of allProviders) {
      this.queueManager.registerProvider(providerName, config.operations);
      logger.info(`Registered provider: ${providerName}`);
    }
    // Log scheduled jobs
    const scheduledJobs = providerRegistry.getAllScheduledJobs();
    logger.info(`Registered ${scheduledJobs.length} scheduled jobs across all providers`);
    for (const { provider, job } of scheduledJobs) {
      logger.info(
        `Scheduled job: ${provider}.${job.type} - ${job.description} (${job.cronPattern})`
      );
    }
    logger.info('All providers registered successfully');
  }

  /**
   * Install each provider's cron job as a repeatable job. A failure to
   * register one job is logged and does not abort the remaining jobs.
   */
  private async setupScheduledJobs() {
    const scheduledJobs = providerRegistry.getAllScheduledJobs();
    if (scheduledJobs.length === 0) {
      logger.info('No scheduled jobs found');
      return;
    }
    logger.info(`Setting up ${scheduledJobs.length} scheduled jobs...`);
    for (const { provider, job } of scheduledJobs) {
      try {
        const jobData: JobData = {
          type: job.type,
          provider,
          operation: job.operation,
          payload: job.payload,
          priority: job.priority,
        };
        // NOTE(review): no explicit jobId is set here, unlike the old
        // queue.service — confirm repeatable jobs don't accumulate across
        // app restarts.
        await this.queueManager.add(`recurring-${provider}-${job.operation}`, jobData, {
          repeat: {
            pattern: job.cronPattern,
            tz: 'UTC',
            immediately: job.immediately || false,
          },
          removeOnComplete: 1,
          removeOnFail: 1,
          attempts: 2,
          backoff: {
            type: 'fixed',
            delay: 5000,
          },
        });
        logger.info(`Scheduled job registered: ${provider}.${job.type} (${job.cronPattern})`);
      } catch (error) {
        logger.error(`Failed to register scheduled job: ${provider}.${job.type}`, { error });
      }
    }
    logger.info('Scheduled jobs setup complete');
  }

  /** Enqueue a single job; throws if called before initialize(). */
  async addJob(name: string, data: JobData, options?: Record<string, unknown>) {
    if (!this.isInitialized) {
      throw new Error('Queue manager not initialized');
    }
    return this.queueManager.add(name, data, options);
  }

  /** Enqueue many jobs in one call; throws if called before initialize(). */
  async addBulk(jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>) {
    if (!this.isInitialized) {
      throw new Error('Queue manager not initialized');
    }
    return this.queueManager.addBulk(jobs);
  }

  /**
   * Batch-process a list of symbols (used by the /api/process-symbols route,
   * which previously had no backing method on this class).
   * NOTE(review): delegates to the underlying QueueManager — confirm
   * @stock-bot/queue exposes processSymbols with this signature.
   */
  async processSymbols(symbols: string[], options?: Record<string, unknown>) {
    if (!this.isInitialized) {
      throw new Error('Queue manager not initialized');
    }
    return this.queueManager.processSymbols(symbols, options);
  }

  /** Current queue statistics, delegated to the underlying QueueManager. */
  async getStats() {
    if (!this.isInitialized) {
      throw new Error('Queue manager not initialized');
    }
    return this.queueManager.getStats();
  }

  /** Remove finished jobs older than `grace` milliseconds. */
  async clean(grace: number) {
    if (!this.isInitialized) {
      throw new Error('Queue manager not initialized');
    }
    return this.queueManager.clean(grace);
  }

  /** Graceful shutdown; a no-op when the manager was never initialized. */
  async shutdown() {
    if (!this.isInitialized) {
      return;
    }
    logger.info('Shutting down queue manager...');
    await this.queueManager.shutdown();
    this.isInitialized = false;
    logger.info('Queue manager shutdown complete');
  }

  // Compatibility methods for existing code
  getQueueName() {
    return this.queueManager.getQueueName();
  }

  // NOTE(review): despite the name, this exposes the whole QueueManager
  // rather than a raw queue — kept for backward compatibility with callers
  // of the old queue.service.
  get queue() {
    return this.queueManager;
  }
}

// Export singleton instance
export const queueManager = new DataServiceQueueManager();

View file

@ -1,419 +0,0 @@
import { Queue, QueueEvents, Worker, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { providerRegistry, type JobData } from './provider-registry.service';
/**
 * BullMQ-backed queue service: owns one queue, its QueueEvents stream, and a
 * pool of workers, and dispatches each job's (provider, operation) pair to a
 * handler from the provider registry. Replaced by queue-manager.service in
 * the move to the generic @stock-bot/queue library.
 */
export class QueueService {
  private logger = getLogger('queue-service');
  // Definite-assignment (!) because queue/events are created in initialize(),
  // not in the constructor.
  private queue!: Queue;
  private workers: Worker[] = [];
  private queueEvents!: QueueEvents;
  // Worker pool size, per-worker concurrency, and the Redis endpoint are all
  // env-driven; defaults: 5 workers x 20 concurrency, localhost:6379.
  private config = {
    workers: parseInt(process.env.WORKER_COUNT || '5'),
    concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'),
    redis: {
      host: process.env.DRAGONFLY_HOST || 'localhost',
      port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
    },
  };
  // "Initialized" is inferred from the queue having been constructed.
  private get isInitialized() {
    return !!this.queue;
  }
  constructor() {
    // Don't initialize in constructor to allow for proper async initialization
  }
  /**
   * Bring the whole service up in order: register providers, create the
   * queue + events, create workers, wait for all of them to be ready, then
   * wire event logging and install scheduled tasks. Logs and rethrows on
   * any failure.
   */
  async initialize() {
    if (this.isInitialized) {
      this.logger.warn('Queue service already initialized');
      return;
    }
    this.logger.info('Initializing queue service...');
    try {
      // Step 1: Register providers
      await this.registerProviders();
      // Step 2: Setup queue and workers
      const connection = this.getConnection();
      // Braces make the name a Redis hash tag (keeps all queue keys in one
      // cluster slot) — presumably for Dragonfly/cluster compatibility.
      const queueName = '{data-service-queue}';
      this.queue = new Queue(queueName, {
        connection,
        defaultJobOptions: {
          removeOnComplete: 10,
          removeOnFail: 5,
          attempts: 3,
          backoff: {
            type: 'exponential',
            delay: 1000,
          },
        },
      });
      this.queueEvents = new QueueEvents(queueName, { connection });
      // Step 3: Create workers
      const { workerCount, totalConcurrency } = this.createWorkers(queueName, connection);
      // Step 4: Wait for readiness (parallel)
      await Promise.all([
        this.queue.waitUntilReady(),
        this.queueEvents.waitUntilReady(),
        ...this.workers.map(worker => worker.waitUntilReady()),
      ]);
      // Step 5: Setup events and scheduled tasks
      this.setupQueueEvents();
      await this.setupScheduledTasks();
      this.logger.info('Queue service initialized successfully', {
        workers: workerCount,
        totalConcurrency,
      });
    } catch (error) {
      this.logger.error('Failed to initialize queue service', { error });
      throw error;
    }
  }
  /**
   * Build the shared Redis connection options.
   * maxRetriesPerRequest: null is what BullMQ expects for blocking
   * connections. NOTE(review): retryDelayOnFailover does not appear to be a
   * documented ioredis option — confirm it has any effect.
   */
  private getConnection() {
    return {
      ...this.config.redis,
      maxRetriesPerRequest: null,
      retryDelayOnFailover: 100,
      lazyConnect: false,
    };
  }
  /**
   * Create the configured number of workers, all bound to processJob, each
   * with its own connection copy, and return the pool's sizing summary.
   */
  private createWorkers(queueName: string, connection: any) {
    for (let i = 0; i < this.config.workers; i++) {
      const worker = new Worker(queueName, this.processJob.bind(this), {
        connection: { ...connection },
        concurrency: this.config.concurrency,
        maxStalledCount: 1,
        stalledInterval: 30000,
      });
      // Setup events inline
      worker.on('ready', () => this.logger.info(`Worker ${i + 1} ready`));
      worker.on('error', error => this.logger.error(`Worker ${i + 1} error`, { error }));
      this.workers.push(worker);
    }
    return {
      workerCount: this.config.workers,
      totalConcurrency: this.config.workers * this.config.concurrency,
    };
  }
  /** Debug-level logging for the full job lifecycle on the events stream. */
  private setupQueueEvents() {
    // Add comprehensive logging to see job flow
    this.queueEvents.on('added', job => {
      this.logger.debug('Job added to queue', {
        id: job.jobId,
      });
    });
    this.queueEvents.on('waiting', job => {
      this.logger.debug('Job moved to waiting', {
        id: job.jobId,
      });
    });
    this.queueEvents.on('active', job => {
      this.logger.debug('Job became active', {
        id: job.jobId,
      });
    });
    this.queueEvents.on('delayed', job => {
      this.logger.debug('Job delayed', {
        id: job.jobId,
        delay: job.delay,
      });
    });
    this.queueEvents.on('completed', job => {
      this.logger.debug('Job completed', {
        id: job.jobId,
      });
    });
    this.queueEvents.on('failed', (job, error) => {
      this.logger.debug('Job failed', {
        id: job.jobId,
        error: String(error),
      });
    });
  }
  /**
   * Dynamically import each provider module and register its exported
   * ProviderConfig with the shared registry. Any single failure aborts
   * initialization.
   */
  private async registerProviders() {
    this.logger.info('Registering providers...');
    try {
      // Define providers to register
      const providers = [
        { module: '../providers/proxy.provider', export: 'proxyProvider' },
        { module: '../providers/ib.provider', export: 'ibProvider' },
        // { module: '../providers/yahoo.provider', export: 'yahooProvider' },
      ];
      // Import and register all providers
      for (const { module, export: exportName } of providers) {
        const providerModule = await import(module);
        providerRegistry.registerProvider(providerModule[exportName]);
      }
      this.logger.info('All providers registered successfully');
    } catch (error) {
      this.logger.error('Failed to register providers', { error });
      throw error;
    }
  }
  /**
   * Worker callback: route a job to its provider/operation handler.
   * 'process-batch-items' is special-cased because its helper needs the
   * queue service itself (to enqueue follow-up jobs), not just the payload.
   * Errors are logged and rethrown so BullMQ marks the job failed.
   */
  private async processJob(job: Job) {
    const { provider, operation, payload }: JobData = job.data;
    this.logger.info('Processing job', {
      id: job.id,
      provider,
      operation,
      payloadKeys: Object.keys(payload || {}),
    });
    try {
      let result;
      if (operation === 'process-batch-items') {
        // Special handling for batch processing - requires 2 parameters
        const { processBatchJob } = await import('../utils/batch-helpers');
        result = await processBatchJob(payload, this);
      } else {
        // Regular handler lookup - requires 1 parameter
        const handler = providerRegistry.getHandler(provider, operation);
        if (!handler) {
          throw new Error(`No handler found for ${provider}:${operation}`);
        }
        result = await handler(payload);
      }
      this.logger.info('Job completed successfully', {
        id: job.id,
        provider,
        operation,
      });
      return result;
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      this.logger.error('Job failed', {
        id: job.id,
        provider,
        operation,
        error: errorMessage,
      });
      throw error;
    }
  }
  /**
   * Enqueue many jobs in one round trip.
   * NOTE(review): unlike addJob, this does not check isInitialized — it will
   * throw a TypeError on this.queue if called before initialize().
   */
  async addBulk(jobs: any[]): Promise<any[]> {
    return await this.queue.addBulk(jobs);
  }
  /** Sum of every worker's configured concurrency (1 when unset). */
  private getTotalConcurrency() {
    return this.workers.reduce((total, worker) => total + (worker.opts.concurrency || 1), 0);
  }
  /**
   * Register every provider's scheduled job as a recurring job, in parallel,
   * and log a summary of successes/failures.
   */
  private async setupScheduledTasks() {
    const allScheduledJobs = providerRegistry.getAllScheduledJobs();
    if (allScheduledJobs.length === 0) {
      this.logger.warn('No scheduled jobs found in providers');
      return;
    }
    this.logger.info('Setting up scheduled tasks...', { count: allScheduledJobs.length });
    // Use Promise.allSettled for parallel processing + better error handling
    const results = await Promise.allSettled(
      allScheduledJobs.map(async ({ provider, job }) => {
        await this.addRecurringJob(
          {
            type: job.type,
            provider,
            operation: job.operation,
            payload: job.payload,
            priority: job.priority,
            immediately: job.immediately || false,
          },
          job.cronPattern
        );
        return { provider, operation: job.operation };
      })
    );
    // Log results
    const successful = results.filter(r => r.status === 'fulfilled');
    const failed = results.filter(r => r.status === 'rejected');
    if (failed.length > 0) {
      // NOTE(review): 'index' here is the position within 'failed', not within
      // allScheduledJobs — when successes and failures are mixed, the error is
      // attributed to the wrong job.
      failed.forEach((result, index) => {
        const { provider, job } = allScheduledJobs[index];
        this.logger.error('Failed to register scheduled job', {
          provider,
          operation: job.operation,
          error: result.reason,
        });
      });
    }
    this.logger.info('Scheduled tasks setup complete', {
      successful: successful.length,
      failed: failed.length,
    });
  }
  /**
   * Shared enqueue path: derives the job name from type (or
   * provider-operation) and applies default retention, letting callers
   * override via options.
   */
  private async addJobInternal(jobData: JobData, options: any = {}) {
    if (!this.isInitialized) {
      throw new Error('Queue service not initialized');
    }
    const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`;
    return this.queue.add(jobType, jobData, {
      priority: jobData.priority || undefined,
      removeOnComplete: 10,
      removeOnFail: 5,
      ...options,
    });
  }
  /** Enqueue a single job; throws if called before initialize(). */
  async addJob(jobData: JobData, options?: any) {
    return this.addJobInternal(jobData, options);
  }
  /**
   * Enqueue a cron-repeating job. The stable jobId
   * (recurring-<provider>-<operation>) deduplicates the schedule across app
   * restarts; repeats keep only the last completed/failed run.
   */
  async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) {
    const jobKey = `recurring-${jobData.provider}-${jobData.operation}`;
    return this.addJobInternal(jobData, {
      repeat: {
        pattern: cronPattern,
        tz: 'UTC',
        immediately: jobData.immediately || false,
      },
      jobId: jobKey,
      removeOnComplete: 1,
      removeOnFail: 1,
      attempts: 2,
      backoff: {
        type: 'fixed',
        delay: 5000,
      },
    });
  }
  /**
   * Counts of jobs in each state. Note this fetches the full job lists just
   * to take their lengths.
   */
  async getJobStats() {
    if (!this.isInitialized) {
      throw new Error('Queue service not initialized. Call initialize() first.');
    }
    const [waiting, active, completed, failed, delayed] = await Promise.all([
      this.queue.getWaiting(),
      this.queue.getActive(),
      this.queue.getCompleted(),
      this.queue.getFailed(),
      this.queue.getDelayed(),
    ]);
    return {
      waiting: waiting.length,
      active: active.length,
      completed: completed.length,
      failed: failed.length,
      delayed: delayed.length,
    };
  }
  /** Drain waiting/delayed jobs; silently a no-op before initialize(). */
  async drainQueue() {
    if (this.isInitialized) {
      await this.queue.drain();
    }
  }
  /** Job-state counts plus worker-pool sizing, for the status endpoint. */
  async getQueueStatus() {
    if (!this.isInitialized) {
      throw new Error('Queue service not initialized');
    }
    const stats = await this.getJobStats();
    return {
      ...stats,
      workers: this.workers.length,
      concurrency: this.getTotalConcurrency(),
    };
  }
  /**
   * Graceful shutdown: close workers first (each with a 5s timeout, forcing
   * close on failure), then queue and events (3s timeouts). If the graceful
   * path itself throws, force-close everything and rethrow.
   */
  async shutdown() {
    if (!this.isInitialized) {
      this.logger.warn('Queue service not initialized, nothing to shutdown');
      return;
    }
    this.logger.info('Shutting down queue service gracefully...');
    try {
      // Step 1: Stop accepting new jobs and wait for current jobs to finish
      this.logger.debug('Closing workers gracefully...');
      const workerClosePromises = this.workers.map(async (worker, index) => {
        this.logger.debug(`Closing worker ${index + 1}/${this.workers.length}`);
        try {
          // Wait for current jobs to finish, then close
          await Promise.race([
            worker.close(),
            new Promise((_, reject) =>
              setTimeout(() => reject(new Error(`Worker ${index + 1} close timeout`)), 5000)
            ),
          ]);
          this.logger.debug(`Worker ${index + 1} closed successfully`);
        } catch (error) {
          this.logger.error(`Failed to close worker ${index + 1}`, { error });
          // Force close if graceful close fails
          await worker.close(true);
        }
      });
      await Promise.allSettled(workerClosePromises);
      this.logger.debug('All workers closed');
      // Step 2: Close queue and events with timeout protection
      this.logger.debug('Closing queue and events...');
      await Promise.allSettled([
        Promise.race([
          this.queue.close(),
          new Promise((_, reject) =>
            setTimeout(() => reject(new Error('Queue close timeout')), 3000)
          ),
        ]).catch(error => this.logger.error('Queue close error', { error })),
        Promise.race([
          this.queueEvents.close(),
          new Promise((_, reject) =>
            setTimeout(() => reject(new Error('QueueEvents close timeout')), 3000)
          ),
        ]).catch(error => this.logger.error('QueueEvents close error', { error })),
      ]);
      this.logger.info('Queue service shutdown completed successfully');
    } catch (error) {
      this.logger.error('Error during queue service shutdown', { error });
      // Force close everything as last resort
      try {
        await Promise.allSettled([
          ...this.workers.map(worker => worker.close(true)),
          this.queue.close(),
          this.queueEvents.close(),
        ]);
      } catch (forceCloseError) {
        this.logger.error('Force close also failed', { error: forceCloseError });
      }
      throw error;
    }
  }
}
// Module-level singleton shared by routes and main.ts
export const queueManager = new QueueService();

View file

@ -1,364 +0,0 @@
import { CacheProvider, createCache } from '@stock-bot/cache';
import { getLogger } from '@stock-bot/logger';
import type { QueueService } from '../services/queue.service';
const logger = getLogger('batch-helpers');
// Simple interfaces
/**
 * Tuning knobs for processItems: pacing, batching, retry/retention and
 * job-routing configuration.
 */
export interface ProcessOptions {
  // Window (in hours) over which all jobs/batches are spread evenly.
  totalDelayHours: number;
  // Items per batch when useBatching is true (default 100).
  batchSize?: number;
  // Queue priority forwarded to each created job.
  priority?: number;
  // true → group items into cached batches; false/omitted → one job per item.
  useBatching?: boolean;
  // Per-job attempt count (default 3).
  retries?: number;
  // Cache TTL in seconds for stored batch payloads (default 86400 = 24 h).
  ttl?: number;
  // Completed/failed job retention counts (defaults 10 / 5).
  removeOnComplete?: number;
  removeOnFail?: number;
  // Job routing information (both default to 'generic' when omitted)
  provider?: string;
  operation?: string;
}
/** Summary returned by processItems describing what was enqueued. */
export interface BatchResult {
  // Number of queue jobs actually created.
  jobsCreated: number;
  // Which path ran: one job per item ('direct') or grouped ('batch').
  mode: 'direct' | 'batch';
  // Size of the input item list.
  totalItems: number;
  // Present only in 'batch' mode: how many batches were formed.
  batchesCreated?: number;
  // Wall-clock duration of the enqueue phase in milliseconds.
  duration: number;
}
// Shared, lazily-created cache client used to stash batch payloads.
let cacheProvider: CacheProvider | null = null;

/**
 * Return the shared cache client, creating it on first use
 * (key prefix 'batch:', 24 h default TTL, metrics enabled).
 */
function getCache(): CacheProvider {
  cacheProvider =
    cacheProvider ??
    createCache({
      keyPrefix: 'batch:',
      ttl: 86400, // 24 hours default
      enableMetrics: true,
    });
  return cacheProvider;
}
/**
 * Warm up the shared batch cache. Call once during application startup,
 * before any batch operations run.
 */
export async function initializeBatchCache(): Promise<void> {
  logger.info('Initializing batch cache...');
  // Block until the cache provider reports ready (10 s ceiling).
  await getCache().waitForReady(10000);
  logger.info('Batch cache initialized successfully');
}
/**
 * Entry point for fan-out processing: turns `items` into queue jobs, either
 * one job per item ('direct') or grouped into cache-backed batches ('batch'),
 * spread evenly over options.totalDelayHours.
 *
 * @param items     Items to enqueue work for; an empty array short-circuits.
 * @param processor Maps an item (and its index) to a job payload.
 * @param queue     Queue service the jobs are added to.
 * @param options   Pacing, batching, retry and routing configuration.
 * @returns Summary of jobs/batches created and elapsed time in ms.
 */
export async function processItems<T>(
  items: T[],
  processor: (item: T, index: number) => any,
  queue: QueueService,
  options: ProcessOptions
): Promise<BatchResult> {
  const startTime = Date.now();
  // Edge case: nothing to do — report an empty 'direct' run.
  if (items.length === 0) {
    return {
      jobsCreated: 0,
      mode: 'direct',
      totalItems: 0,
      duration: 0,
    };
  }
  logger.info('Starting batch processing', {
    totalItems: items.length,
    mode: options.useBatching ? 'batch' : 'direct',
    batchSize: options.batchSize,
    totalDelayHours: options.totalDelayHours,
  });
  try {
    const result = options.useBatching
      ? await processBatched(items, processor, queue, options)
      : await processDirect(items, processor, queue, options);
    const duration = Date.now() - startTime;
    logger.info('Batch processing completed', {
      ...result,
      duration: `${(duration / 1000).toFixed(1)}s`,
    });
    return { ...result, duration };
  } catch (error) {
    // Fix: wrap the error in an object so this call matches the structured
    // `{ error }` logging convention used by every other call in this file.
    logger.error('Batch processing failed', { error });
    throw error;
  }
}
/**
 * Direct mode: every item becomes its own 'process-item' job, with delays
 * spaced evenly so the whole set completes within the configured window.
 */
async function processDirect<T>(
  items: T[],
  processor: (item: T, index: number) => any,
  queue: QueueService,
  options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> {
  // Evenly divide the total window across all items.
  const spacingMs = (options.totalDelayHours * 60 * 60 * 1000) / items.length;
  logger.info('Creating direct jobs', {
    totalItems: items.length,
    delayPerItem: `${(spacingMs / 1000).toFixed(1)}s`,
  });
  const jobs: any[] = [];
  for (const [index, item] of items.entries()) {
    jobs.push({
      name: 'process-item',
      data: {
        type: 'process-item',
        provider: options.provider || 'generic',
        operation: options.operation || 'process-item',
        payload: processor(item, index),
        priority: options.priority || undefined,
      },
      opts: {
        delay: index * spacingMs,
        priority: options.priority || undefined,
        attempts: options.retries || 3,
        removeOnComplete: options.removeOnComplete || 10,
        removeOnFail: options.removeOnFail || 5,
      },
    });
  }
  const created = await addJobsInChunks(queue, jobs);
  return {
    totalItems: items.length,
    jobsCreated: created.length,
    mode: 'direct',
  };
}
/**
 * Batch mode: items are grouped, each group's payload is stashed in the
 * cache, and one 'process-batch' job referencing the payload key is
 * enqueued per group, spaced evenly across the configured window.
 */
async function processBatched<T>(
  items: T[],
  processor: (item: T, index: number) => any,
  queue: QueueService,
  options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> {
  const batchSize = options.batchSize || 100;
  const batches = createBatches(items, batchSize);
  // Evenly divide the total window across all batches.
  const delayPerBatch = (options.totalDelayHours * 60 * 60 * 1000) / batches.length;
  logger.info('Creating batch jobs', {
    totalItems: items.length,
    batchSize,
    totalBatches: batches.length,
    delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`,
  });
  // Store each batch's payload, then build the job referencing it.
  const buildJob = async (batch: T[], batchIndex: number) => {
    const payloadKey = await storePayload(batch, processor, options);
    return {
      name: 'process-batch',
      data: {
        type: 'process-batch',
        provider: options.provider || 'generic',
        operation: 'process-batch-items',
        payload: {
          payloadKey,
          batchIndex,
          totalBatches: batches.length,
          itemCount: batch.length,
        },
        priority: options.priority || undefined,
      },
      opts: {
        delay: batchIndex * delayPerBatch,
        priority: options.priority || undefined,
        attempts: options.retries || 3,
        removeOnComplete: options.removeOnComplete || 10,
        removeOnFail: options.removeOnFail || 5,
      },
    };
  };
  const batchJobs = await Promise.all(batches.map(buildJob));
  const createdJobs = await addJobsInChunks(queue, batchJobs);
  return {
    totalItems: items.length,
    jobsCreated: createdJobs.length,
    batchesCreated: batches.length,
    mode: 'batch',
  };
}
/**
 * Worker-side handler for a 'process-batch' job: loads the stored batch
 * payload from the cache, revives the serialized processor function, and
 * enqueues one 'process-item' job per item in the batch.
 *
 * @param jobData Batch descriptor ({ payloadKey, batchIndex, totalBatches, itemCount }).
 * @param queue   Queue service used to enqueue the resulting item jobs.
 * @returns Summary with the batch index, items processed and jobs created.
 * @throws When the payload is missing/invalid or loading it fails.
 */
export async function processBatchJob(jobData: any, queue: QueueService): Promise<any> {
  const { payloadKey, batchIndex, totalBatches, itemCount } = jobData;
  logger.debug('Processing batch job', {
    batchIndex,
    totalBatches,
    itemCount,
  });
  try {
    const payload = await loadPayload(payloadKey);
    // Defensive check: the cache entry may have expired or been corrupted.
    if (!payload || !payload.items || !payload.processorStr) {
      logger.error('Invalid payload data', { payloadKey, payload });
      throw new Error(`Invalid payload data for key: ${payloadKey}`);
    }
    const { items, processorStr, options } = payload;
    // Deserialize the processor function.
    // SECURITY NOTE(review): `new Function` evaluates code stored in the
    // cache — acceptable only because the payload is written by this same
    // service (storePayload); never feed it externally-controlled data.
    const processor = new Function('return ' + processorStr)();
    const jobs = items.map((item: any, index: number) => ({
      name: 'process-item',
      data: {
        type: 'process-item',
        provider: options.provider || 'generic',
        operation: options.operation || 'generic',
        payload: processor(item, index),
        priority: options.priority || undefined,
      },
      opts: {
        // Stagger items within the batch (default 1 s apart).
        delay: index * (options.delayPerItem || 1000),
        priority: options.priority || undefined,
        attempts: options.retries || 3,
      },
    }));
    const createdJobs = await addJobsInChunks(queue, jobs);
    // Cleanup payload after successful processing so the cache doesn't grow.
    await cleanupPayload(payloadKey);
    return {
      batchIndex,
      itemsProcessed: items.length,
      jobsCreated: createdJobs.length,
    };
  } catch (error) {
    logger.error('Batch job processing failed', { batchIndex, error });
    throw error;
  }
}
// Helper functions
/**
 * Split `items` into consecutive chunks of at most `batchSize` elements.
 * The final batch may be smaller; an empty input yields no batches.
 *
 * @throws Error when batchSize is not positive — the original loop would
 *         otherwise never advance and spin forever.
 */
function createBatches<T>(items: T[], batchSize: number): T[][] {
  if (batchSize <= 0) {
    throw new Error(`batchSize must be positive, got: ${batchSize}`);
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}
/**
 * Serialize a batch (items + stringified processor + routing options) and
 * store it in the cache under a collision-resistant key.
 *
 * NOTE(review): the processor is persisted via Function.prototype.toString
 * and later revived with `new Function` in processBatchJob — it must be
 * self-contained (no captured closures).
 *
 * @returns The cache key the batch payload was stored under.
 */
async function storePayload<T>(
  items: T[],
  processor: (item: T, index: number) => any,
  options: ProcessOptions
): Promise<string> {
  const cache = getCache();
  // Create more specific key: batch:provider:operation:payload_timestamp_random
  const timestamp = Date.now();
  // slice(2, 11) keeps 9 random base-36 chars; replaces the deprecated
  // String.prototype.substr(2, 9) with identical output.
  const randomId = Math.random().toString(36).slice(2, 11);
  const provider = options.provider || 'generic';
  const operation = options.operation || 'generic';
  const key = `${provider}:${operation}:payload_${timestamp}_${randomId}`;
  const payload = {
    items,
    processorStr: processor.toString(),
    options: {
      delayPerItem: 1000,
      priority: options.priority || undefined,
      retries: options.retries || 3,
      // Store routing information for later use (already defaulted above)
      provider,
      operation,
    },
    createdAt: Date.now(),
  };
  logger.debug('Storing batch payload', {
    key,
    itemCount: items.length,
  });
  await cache.set(key, payload, options.ttl || 86400);
  logger.debug('Stored batch payload successfully', {
    key,
    itemCount: items.length,
  });
  return key;
}
/**
 * Fetch a previously stored batch payload from the cache.
 * @throws Error when the key has expired or was never written.
 */
async function loadPayload(key: string): Promise<any> {
  const cache = getCache();
  logger.debug('Loading batch payload', { key });
  const payload = await cache.get(key);
  if (!payload) {
    logger.error('Payload not found in cache', { key });
    throw new Error(`Payload not found: ${key}`);
  }
  logger.debug('Loaded batch payload successfully', { key });
  return payload;
}
/**
 * Best-effort deletion of a consumed batch payload. Failures are only
 * warned about — a stale entry will age out via its TTL anyway.
 */
async function cleanupPayload(key: string): Promise<void> {
  try {
    await getCache().del(key);
    logger.debug('Cleaned up payload', { key });
  } catch (error) {
    logger.warn('Failed to cleanup payload', { key, error });
  }
}
/**
 * Add jobs to the queue in fixed-size chunks, pausing 100 ms between chunks
 * to avoid overwhelming Redis. A failed chunk is logged and skipped so the
 * remaining chunks still get enqueued (best-effort by design).
 *
 * @returns Every job object the queue reported as created.
 */
async function addJobsInChunks(queue: QueueService, jobs: any[], chunkSize = 100): Promise<any[]> {
  const created: any[] = [];
  for (let offset = 0; offset < jobs.length; offset += chunkSize) {
    const batch = jobs.slice(offset, offset + chunkSize);
    try {
      created.push(...(await queue.addBulk(batch)));
      // Small delay between chunks to avoid overwhelming Redis.
      if (offset + chunkSize < jobs.length) {
        await new Promise(resolve => setTimeout(resolve, 100));
      }
    } catch (error) {
      logger.error('Failed to add job chunk', {
        startIndex: offset,
        chunkSize: batch.length,
        error,
      });
    }
  }
  return created;
}

View file

@ -25,6 +25,7 @@
{ "path": "../../libs/event-bus" },
{ "path": "../../libs/shutdown" },
{ "path": "../../libs/utils" },
{ "path": "../../libs/browser" }
{ "path": "../../libs/browser" },
{ "path": "../../libs/queue" }
]
}

View file

@ -9,3 +9,14 @@ export { initializeBatchCache, processBatchJob, processItems } from './batch-pro
export { QueueManager } from './queue-manager';
export { providerRegistry } from './provider-registry';
// Re-export types for convenience
export type {
BatchResult,
JobHandler,
ProcessOptions,
ProviderConfig,
ProviderConfigWithSchedule,
QueueConfig,
ScheduledJob,
} from './types';

View file

@ -1,13 +1,14 @@
import { getLogger } from '@stock-bot/logger';
import type { JobHandler, ProviderConfig } from './types';
import type { JobHandler, ProviderConfig, ProviderConfigWithSchedule, ScheduledJob } from './types';
const logger = getLogger('provider-registry');
class ProviderRegistry {
private providers = new Map<string, ProviderConfig>();
private providerSchedules = new Map<string, ScheduledJob[]>();
/**
* Register a provider with its operations
* Register a provider with its operations (simple config)
*/
register(providerName: string, config: ProviderConfig): void {
logger.info(`Registering provider: ${providerName}`, {
@ -17,6 +18,22 @@ class ProviderRegistry {
this.providers.set(providerName, config);
}
/**
 * Register a provider from a full config: its operation handlers plus any
 * cron-scheduled jobs it wants run on its behalf.
 */
registerWithSchedule(config: ProviderConfigWithSchedule): void {
  const { name, operations, scheduledJobs } = config;
  logger.info(`Registering provider with schedule: ${name}`, {
    operations: Object.keys(operations),
    scheduledJobs: scheduledJobs?.length || 0,
  });
  this.providers.set(name, operations);
  // Track a schedule entry only when at least one job was declared.
  if (scheduledJobs?.length) {
    this.providerSchedules.set(name, scheduledJobs);
  }
}
/**
* Get a handler for a specific provider and operation
*/
@ -38,6 +55,69 @@ class ProviderRegistry {
return handler;
}
/**
 * Flatten every provider's schedule into a single list, tagging each job
 * with the provider it came from.
 */
getAllScheduledJobs(): Array<{ provider: string; job: ScheduledJob }> {
  return Array.from(this.providerSchedules, ([provider, jobs]) =>
    jobs.map(job => ({ provider, job }))
  ).flat();
}
/**
 * Scheduled jobs registered by one provider; empty array when none exist.
 */
getScheduledJobs(provider: string): ScheduledJob[] {
  const jobs = this.providerSchedules.get(provider);
  return jobs === undefined ? [] : jobs;
}
/**
 * Whether the given provider registered any scheduled jobs
 * (true only when registerWithSchedule stored a non-empty schedule).
 */
hasScheduledJobs(provider: string): boolean {
  return this.providerSchedules.has(provider);
}
/**
* Get all registered providers with their configurations
*/
getProviderConfigs(): Array<{ name: string; operations: string[]; scheduledJobs: number }> {
return Array.from(this.providers.keys()).map(name => ({
name,
operations: Object.keys(this.providers.get(name) || {}),
scheduledJobs: this.providerSchedules.get(name)?.length || 0,
}));
}
/**
 * Full registration data for every provider — operations plus optional
 * schedule — in the shape the queue manager consumes when registering.
 */
getAllProviders(): Map<string, { operations: ProviderConfig; scheduledJobs?: ScheduledJob[] }> {
  const combined = new Map<string, { operations: ProviderConfig; scheduledJobs?: ScheduledJob[] }>();
  for (const [name, operations] of this.providers) {
    combined.set(name, {
      operations,
      // undefined when the provider declared no schedule
      scheduledJobs: this.providerSchedules.get(name),
    });
  }
  return combined;
}
/**
* Get all registered providers
*/
@ -72,6 +152,7 @@ class ProviderRegistry {
* Remove a provider
*/
unregister(provider: string): boolean {
  // Report whether the provider itself existed; drop its schedule either way.
  const existed = this.providers.delete(provider);
  this.providerSchedules.delete(provider);
  return existed;
}
@ -80,20 +161,28 @@ class ProviderRegistry {
*/
// Remove all registered providers and their scheduled jobs.
clear(): void {
  this.providers.clear();
  this.providerSchedules.clear();
}
/**
* Get registry statistics
*/
getStats(): { providers: number; totalOperations: number } {
getStats(): { providers: number; totalOperations: number; totalScheduledJobs: number } {
let totalOperations = 0;
let totalScheduledJobs = 0;
for (const config of this.providers.values()) {
totalOperations += Object.keys(config).length;
}
for (const jobs of this.providerSchedules.values()) {
totalScheduledJobs += jobs.length;
}
return {
providers: this.providers.size,
totalOperations,
totalScheduledJobs,
};
}
}

View file

@ -56,10 +56,26 @@ export interface JobHandler {
(payload: any): Promise<any>;
}
/** A cron-scheduled job declared by a provider. */
export interface ScheduledJob {
  // Job type identifier used when enqueueing.
  type: string;
  // Provider operation the job invokes.
  operation: string;
  // Arbitrary payload passed to the operation handler.
  payload: any;
  // Cron expression controlling when the job runs.
  cronPattern: string;
  // Optional queue priority.
  priority?: number;
  // Human-readable description of the schedule.
  description?: string;
  // Presumably: also run once immediately on registration — confirm against scheduler.
  immediately?: boolean;
}
/** Maps operation names to their job handlers for a single provider. */
export interface ProviderConfig {
  [operation: string]: JobHandler;
}
/** Full provider registration: name, operation handlers, optional cron jobs. */
export interface ProviderConfigWithSchedule {
  // Unique provider name used as the registry key.
  name: string;
  // Operation name → handler mapping.
  operations: Record<string, JobHandler>;
  // Optional cron-scheduled jobs to run for this provider.
  scheduledJobs?: ScheduledJob[];
}
export interface BatchJobData {
payloadKey: string;
batchIndex: number;

View file

@ -14,7 +14,8 @@
"sourceMap": true,
"moduleResolution": "node",
"resolveJsonModule": true,
"allowSyntheticDefaultImports": true
"allowSyntheticDefaultImports": true,
"composite": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "**/*.test.ts", "**/*.spec.ts"]