// stock-bot/apps/data-service/src/services/queue.service.ts
// 282 lines · 7.8 KiB · TypeScript

import { Queue, Worker, QueueEvents } from 'bullmq';
import { Logger } from '@stock-bot/logger';
import { providerRegistry } from './provider-registry.service';
/**
 * Payload stored on every job placed on the data-service queue.
 *
 * The `service` / `provider` / `operation` triple is what the worker passes
 * to the provider registry to look up the handler for the job.
 */
export interface JobData {
  /** Job category; also used as the BullMQ job name when the job is enqueued. */
  type: string;

  /** Service segment of the handler lookup key. */
  service: string;

  /** Provider segment of the handler lookup key. */
  provider: string;

  /** Operation segment of the handler lookup key. */
  operation: string;

  /** Handler-specific input, handed to the resolved handler unchanged. */
  payload: any;

  /** Optional queue priority; treated as 0 when omitted. */
  priority?: number;
}
/**
 * BullMQ-backed job queue for the data service.
 *
 * Owns one queue, one worker and one event stream that all share the same
 * queue name and Redis/Dragonfly connection settings. Jobs carry a
 * {@link JobData} payload; the worker resolves the matching handler through
 * the provider registry and runs it.
 *
 * Construction is cheap and synchronous; call {@link QueueService.initialize}
 * before using any other method that touches the queue.
 */
export class QueueService {
  /** Name shared by the queue, its worker and its event stream. */
  private static readonly QUEUE_NAME = 'data-service-queue';

  /** Port used when DRAGONFLY_PORT is unset or not a valid TCP port. */
  private static readonly DEFAULT_PORT = 6379;

  /** Message thrown by every method that requires a prior initialize(). */
  private static readonly NOT_INITIALIZED_MESSAGE =
    'Queue service not initialized. Call initialize() first.';

  private logger = new Logger('queue-service');
  private queue!: Queue;
  private worker!: Worker;
  private queueEvents!: QueueEvents;
  /** Connection settings captured by the last successful initialize(). */
  private connection: { host: string; port: number } | undefined;
  private isInitialized = false;

  constructor() {
    // Don't initialize in constructor to allow for proper async initialization
  }

  /**
   * Registers providers, connects the queue/worker/event stream and installs
   * the recurring jobs.
   *
   * Calling it again after a successful initialization only logs a warning.
   *
   * @throws Re-throws any provider-registration or connection error. When the
   *   failure happens after queue resources were created, they are closed
   *   first so no connection is left dangling.
   */
  async initialize(): Promise<void> {
    if (this.isInitialized) {
      this.logger.warn('Queue service already initialized');
      return;
    }
    this.logger.info('Initializing queue service...');
    // Register all providers first
    await this.registerProviders();

    const connection = this.resolveConnection();
    this.logger.info('Connecting to Redis/Dragonfly', connection);

    // Everything created so far, so a partial failure can be rolled back.
    const opened: Array<{ close(): Promise<unknown> }> = [];
    try {
      this.queue = new Queue(QueueService.QUEUE_NAME, { connection });
      opened.push(this.queue);
      this.worker = new Worker(QueueService.QUEUE_NAME, this.processJob.bind(this), {
        connection,
        concurrency: 10
      });
      opened.push(this.worker);
      this.queueEvents = new QueueEvents(QueueService.QUEUE_NAME, { connection });
      opened.push(this.queueEvents);

      // Attach listeners before waiting so connection errors and early job
      // events are observed instead of being emitted with nobody listening.
      this.setupEventListeners();

      // Test connection
      await this.queue.waitUntilReady();
      await this.worker.waitUntilReady();
      await this.queueEvents.waitUntilReady();

      this.connection = connection;
      // Must be set before scheduling: addRecurringJob() checks this flag.
      this.isInitialized = true;
      this.logger.info('Queue service initialized successfully');
      await this.setupScheduledTasks();
    } catch (error) {
      this.logger.error('Failed to initialize queue service', { error });
      this.isInitialized = false;
      // Best-effort rollback; close failures are logged and the original
      // error is the one reported to the caller.
      await this.closeAll(opened);
      throw error;
    }
  }

  /**
   * Reads the Redis/Dragonfly endpoint from DRAGONFLY_HOST / DRAGONFLY_PORT.
   *
   * An unset or empty host falls back to `localhost`. A port that is unset,
   * non-numeric or outside 1-65535 falls back to the default port (with a
   * warning when a value was supplied but unusable).
   */
  private resolveConnection(): { host: string; port: number } {
    // `||` on purpose: an empty environment variable should use the default.
    const host = process.env.DRAGONFLY_HOST || 'localhost';
    const rawPort = process.env.DRAGONFLY_PORT;
    if (!rawPort) {
      return { host, port: QueueService.DEFAULT_PORT };
    }
    const port = Number.parseInt(rawPort, 10);
    if (!Number.isInteger(port) || port < 1 || port > 65535) {
      this.logger.warn('Invalid DRAGONFLY_PORT, falling back to default port', {
        value: rawPort,
        fallback: QueueService.DEFAULT_PORT
      });
      return { host, port: QueueService.DEFAULT_PORT };
    }
    return { host, port };
  }

  /**
   * Dynamically imports the proxy, QuoteMedia and Yahoo providers and adds
   * them to the provider registry.
   *
   * @throws Re-throws any import or registration error after logging it.
   */
  private async registerProviders(): Promise<void> {
    this.logger.info('Registering providers...');
    try {
      // Import and register all providers
      const { proxyProvider } = await import('../providers/proxy.provider');
      const { quotemediaProvider } = await import('../providers/quotemedia.provider');
      const { yahooProvider } = await import('../providers/yahoo.provider');
      providerRegistry.registerProvider(proxyProvider);
      providerRegistry.registerProvider(quotemediaProvider);
      providerRegistry.registerProvider(yahooProvider);
      this.logger.info('All providers registered successfully');
    } catch (error) {
      this.logger.error('Failed to register providers', { error });
      throw error;
    }
  }

  /**
   * Worker processor: looks up the handler for the job's
   * service/provider/operation triple and runs it with the job payload.
   *
   * @param job The job handed over by the worker; only `id` and `data` are read.
   * @returns Whatever the handler resolves to (becomes the job's return value).
   * @throws If no handler is registered for the triple, or the handler fails;
   *   the error is logged and re-thrown so the job is marked as failed.
   */
  private async processJob(job: { id?: string; data: JobData }): Promise<unknown> {
    const { service, provider, operation, payload } = job.data;
    this.logger.info('Processing job', {
      id: job.id,
      service,
      provider,
      operation,
      payloadKeys: Object.keys(payload || {})
    });
    try {
      // Get handler from registry
      const handler = providerRegistry.getHandler(service, provider, operation);
      if (!handler) {
        throw new Error(`No handler found for ${service}:${provider}:${operation}`);
      }
      // Execute the handler
      const result = await handler(payload);
      this.logger.info('Job completed successfully', {
        id: job.id,
        service,
        provider,
        operation
      });
      return result;
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      this.logger.error('Job failed', {
        id: job.id,
        service,
        provider,
        operation,
        error: errorMessage
      });
      throw error;
    }
  }

  /**
   * Wires logging for job lifecycle events and for connection-level errors.
   *
   * The `error` listeners matter: an `error` event emitted on an emitter with
   * no listener is thrown instead of being reported.
   */
  private setupEventListeners(): void {
    this.queueEvents.on('completed', ({ jobId }) => {
      this.logger.info('Job completed', { id: jobId });
    });
    this.queueEvents.on('failed', ({ jobId, failedReason }) => {
      this.logger.error('Job failed', { id: jobId, error: failedReason });
    });
    this.queueEvents.on('error', (error) => {
      this.logger.error('Queue events error', { error });
    });
    this.worker.on('progress', (job, progress) => {
      this.logger.debug('Job progress', { id: job.id, progress });
    });
    this.worker.on('error', (error) => {
      this.logger.error('Worker error', { error });
    });
    this.queue.on('error', (error) => {
      this.logger.error('Queue error', { error });
    });
  }

  /**
   * Installs the built-in recurring jobs, in order.
   *
   * Best-effort: the first failure is logged and stops the remaining
   * schedules from being installed, but it does not fail initialization.
   */
  private async setupScheduledTasks(): Promise<void> {
    const schedules: ReadonlyArray<{ job: JobData; cron: string }> = [
      {
        // Market data refresh every minute using Yahoo Finance
        job: {
          type: 'market-data-refresh',
          service: 'market-data',
          provider: 'yahoo-finance',
          operation: 'live-data',
          payload: { symbol: 'AAPL' }
        },
        cron: '*/1 * * * *'
      },
      {
        // Market data refresh using QuoteMedia every 2 minutes
        job: {
          type: 'market-data-quotemedia',
          service: 'market-data',
          provider: 'quotemedia',
          operation: 'batch-quotes',
          payload: { symbols: ['GOOGL', 'MSFT', 'TSLA'] }
        },
        cron: '*/2 * * * *'
      },
      {
        // Proxy fetch every 15 minutes
        job: {
          type: 'proxy-maintenance',
          service: 'proxy',
          provider: 'proxy-service',
          operation: 'fetch-and-check',
          payload: {}
        },
        cron: '*/15 * * * *'
      },
      {
        // Proxy cleanup daily at 2 AM
        job: {
          type: 'proxy-cleanup',
          service: 'proxy',
          provider: 'proxy-service',
          operation: 'cleanup-old-data',
          payload: { daysToKeep: 7 }
        },
        cron: '0 2 * * *'
      }
    ];

    try {
      for (const { job, cron } of schedules) {
        await this.addRecurringJob(job, cron);
      }
      this.logger.info('Scheduled tasks configured');
    } catch (error) {
      this.logger.error('Failed to setup scheduled tasks', { error });
    }
  }

  /** Throws the shared "not initialized" error unless initialize() succeeded. */
  private ensureInitialized(): void {
    if (!this.isInitialized) {
      throw new Error(QueueService.NOT_INITIALIZED_MESSAGE);
    }
  }

  /**
   * Enqueues a one-off job named after `jobData.type`.
   *
   * @param jobData Job payload; `priority` defaults to 0 when omitted.
   * @param options Extra BullMQ job options; they override the defaults
   *   (priority, removeOnComplete: 10, removeOnFail: 5).
   * @returns The job created by the queue.
   * @throws If the service has not been initialized.
   */
  async addJob(jobData: JobData, options?: any) {
    this.ensureInitialized();
    return this.queue.add(jobData.type, jobData, {
      priority: jobData.priority ?? 0,
      removeOnComplete: 10,
      removeOnFail: 5,
      ...options
    });
  }

  /**
   * Enqueues a repeating job named `recurring-<type>`.
   *
   * The job id is derived from service/provider/operation, so the same
   * triple always maps to the same recurring job id.
   *
   * @param jobData Job payload.
   * @param cronPattern Cron expression controlling the repetition.
   * @returns The job created by the queue.
   * @throws If the service has not been initialized.
   */
  async addRecurringJob(jobData: JobData, cronPattern: string) {
    this.ensureInitialized();
    return this.queue.add(
      `recurring-${jobData.type}`,
      jobData,
      {
        repeat: { pattern: cronPattern },
        removeOnComplete: 1,
        removeOnFail: 1,
        jobId: `recurring-${jobData.service}-${jobData.provider}-${jobData.operation}`
      }
    );
  }

  /**
   * Returns the number of jobs per state.
   *
   * Uses the queue's count getters so only counters are read, rather than
   * loading every job object just to measure an array length.
   *
   * @throws If the service has not been initialized.
   */
  async getJobStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
  }> {
    this.ensureInitialized();
    const [waiting, active, completed, failed, delayed] = await Promise.all([
      this.queue.getWaitingCount(),
      this.queue.getActiveCount(),
      this.queue.getCompletedCount(),
      this.queue.getFailedCount(),
      this.queue.getDelayedCount()
    ]);
    return { waiting, active, completed, failed, delayed };
  }

  /**
   * Returns job counts plus worker concurrency, queue name and the
   * connection settings the service was initialized with.
   *
   * @throws If the service has not been initialized.
   */
  async getQueueStatus() {
    this.ensureInitialized();
    const stats = await this.getJobStats();
    return {
      ...stats,
      workers: this.getWorkerCount(),
      queue: this.queue.name,
      // Copy so callers cannot mutate the stored settings.
      connection: { ...(this.connection ?? this.resolveConnection()) }
    };
  }

  /**
   * Returns the worker's configured concurrency (falling back to 1), or 0
   * when the service is not initialized. Note that this is the concurrency
   * setting of the single worker, not a count of worker instances.
   */
  getWorkerCount(): number {
    if (!this.isInitialized) {
      return 0;
    }
    return this.worker.opts.concurrency || 1;
  }

  /** Lists every registered provider with its key, name, service and operation names. */
  getRegisteredProviders() {
    return providerRegistry.getProviders().map(({ key, config }) => ({
      key,
      name: config.name,
      service: config.service,
      operations: Object.keys(config.operations)
    }));
  }

  /**
   * Closes the worker, the queue and the event stream, in that order.
   *
   * Every resource is attempted even when an earlier close fails, and the
   * service is marked uninitialized afterwards either way.
   *
   * @throws The first close error, after all resources have been attempted.
   */
  async shutdown(): Promise<void> {
    if (!this.isInitialized) {
      this.logger.warn('Queue service not initialized, nothing to shutdown');
      return;
    }
    this.logger.info('Shutting down queue service');
    const failures = await this.closeAll([this.worker, this.queue, this.queueEvents]);
    this.isInitialized = false;
    if (failures.length > 0) {
      throw failures[0];
    }
  }

  /**
   * Closes each resource in order, logging (not throwing) individual failures.
   *
   * @returns The errors raised while closing, in encounter order.
   */
  private async closeAll(
    resources: ReadonlyArray<{ close(): Promise<unknown> }>
  ): Promise<unknown[]> {
    const failures: unknown[] = [];
    for (const resource of resources) {
      try {
        await resource.close();
      } catch (error) {
        this.logger.error('Failed to close queue resource', { error });
        failures.push(error);
      }
    }
    return failures;
  }
}
/**
 * Shared module-level QueueService instance.
 *
 * The constructor does not set up the queue; `initialize()` must be called
 * before jobs can be added or queue statistics requested.
 */
export const queueManager = new QueueService();