Reorganize queue service: replace the proxy-specific queue with a generic data-service job queue (JobData envelope, typed handlers, scheduled tasks)

This commit is contained in:
Bojan Kucera 2025-06-08 14:20:45 -04:00
parent 2bc46cdb2a
commit 8daaff27fd
6 changed files with 116 additions and 662 deletions

View file

@ -1,19 +1,17 @@
/**
* BullMQ Queue Service
* Handles job scheduling and processing for the data service
*/
import { Queue, Worker, QueueEvents } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import type { ProxyInfo } from '@stock-bot/http';
import { Logger } from '@stock-bot/logger';
// Module-level logger — presumably a legacy code path; newer methods log via `this.logger`. TODO confirm remaining usages.
const logger = getLogger('queue-service');
export interface ProxyJobData {
type: 'fetch-and-check' | 'check-specific' | 'clear-cache';
proxies?: ProxyInfo[];
export interface JobData {
type: 'proxy-fetch' | 'proxy-check' | 'market-data' | 'historical-data';
service: 'proxy' | 'market-data' | 'analytics';
provider: string;
operation: string;
payload: any;
priority?: number;
}
export class QueueService {
// Instance logger for queue lifecycle messages.
private logger = new Logger('queue-manager');
// BullMQ primitives: producer handle, job consumer, and event subscriber.
private queue: Queue;
private worker: Worker;
private queueEvents: QueueEvents;
@ -24,151 +22,125 @@ export class QueueService {
port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
};
// Create queue
this.queue = new Queue('proxy-tasks', { connection });
// Create worker
this.worker = new Worker('proxy-tasks', this.processJob.bind(this), {
this.queue = new Queue('data-service-queue', { connection });
this.worker = new Worker('data-service-queue', this.processJob.bind(this), {
connection,
concurrency: 3,
concurrency: 10
});
// Create queue events for monitoring
this.queueEvents = new QueueEvents('proxy-tasks', { connection });
this.queueEvents = new QueueEvents('data-service-queue', { connection });
this.setupEventListeners();
logger.info('Queue service initialized', { connection });
this.setupScheduledTasks();
}
/**
 * Worker callback for every job on the queue.
 * Dispatches on `job.data.type` to the matching private handler.
 * Rethrows on failure so BullMQ records the job as failed.
 */
private async processJob(job: any) {
  const { type, service, provider, operation, payload }: JobData = job.data;
  this.logger.info('Processing job', { id: job.id, type, service, provider, operation });
  try {
    switch (type) {
      case 'proxy-fetch':
        return await this.handleProxyFetch(payload);
      case 'proxy-check':
        return await this.handleProxyCheck(payload);
      case 'market-data':
        return await this.handleMarketData(payload);
      case 'historical-data':
        return await this.handleHistoricalData(payload);
      default:
        // An unknown type is a programming error — fail the job loudly.
        throw new Error(`Unknown job type: ${type}`);
    }
  } catch (error) {
    this.logger.error('Job failed', { id: job.id, type, error });
    throw error; // let BullMQ mark the job failed and fire 'failed' events
  }
}
/**
 * Runs a 'proxy-fetch' job: pulls fresh proxies via the proxy service.
 * The service is imported lazily — presumably to avoid a circular dependency; confirm.
 */
private async handleProxyFetch(payload: any) {
  const mod = await import('./proxy.service');
  return await mod.proxyService.fetchProxiesFromSources();
}
/** Runs a 'proxy-check' job against the proxy list carried in the payload. */
private async handleProxyCheck(payload: { proxies: any[] }) {
  const mod = await import('./proxy.service');
  return await mod.proxyService.checkProxies(payload.proxies);
}
/** Runs a 'market-data' job: fetches live data for the payload's symbol. */
private async handleMarketData(payload: { symbol: string }) {
  const mod = await import('../providers/market-data.provider.js');
  return await mod.marketDataProvider.getLiveData(payload.symbol);
}
/** Runs a 'historical-data' job: fetches bars for `symbol` over [from, to] at `interval`. */
private async handleHistoricalData(payload: { symbol: string; from: Date; to: Date; interval: string }) {
  const { symbol, from, to, interval } = payload;
  const mod = await import('../providers/market-data.provider.js');
  return await mod.marketDataProvider.getHistoricalData(symbol, from, to, interval);
}
/**
 * Wire lifecycle logging.
 * 'completed'/'failed' are observed via QueueEvents (global to the queue);
 * 'progress' is observed on this process's own worker.
 */
private setupEventListeners() {
  this.queueEvents.on('completed', (job) => {
    this.logger.info('Job completed', { id: job.jobId });
  });
  this.queueEvents.on('failed', (job) => {
    this.logger.error('Job failed', { id: job.jobId, error: job.failedReason });
  });
  this.worker.on('progress', (job, progress) => {
    this.logger.debug('Job progress', { id: job.id, progress });
  });
}
/**
 * Register the recurring jobs this service owns.
 * Fire-and-forget on purpose (invoked from the constructor); failures
 * surface through the queue's 'failed' events rather than here.
 */
private setupScheduledTasks() {
  // Market data refresh every minute.
  void this.addRecurringJob({
    type: 'market-data',
    service: 'market-data',
    provider: 'unified-data',
    operation: 'refresh-cache',
    payload: { symbols: ['AAPL', 'GOOGL', 'MSFT'] }
  }, '*/1 * * * *');
  // Proxy fetch-and-check every 15 minutes.
  void this.addRecurringJob({
    type: 'proxy-fetch',
    service: 'proxy',
    provider: 'proxy-service',
    operation: 'fetch-and-check',
    payload: {}
  }, '*/15 * * * *');
  this.logger.info('Scheduled tasks configured');
}
/**
 * Enqueue a one-off job. Caller-supplied `options` override the
 * retention/priority defaults below via spread.
 */
async addJob(jobData: JobData, options?: any) {
  const jobOptions = {
    priority: jobData.priority || 0,
    removeOnComplete: 10,
    removeOnFail: 5,
    ...options
  };
  return this.queue.add(jobData.type, jobData, jobOptions);
}
/**
 * Register a cron-repeated job.
 * The deterministic jobId (service-provider-operation) keeps re-registration
 * on every startup from creating duplicate schedules.
 */
async addRecurringJob(jobData: JobData, cronPattern: string) {
  return this.queue.add(
    `recurring-${jobData.type}`,
    jobData,
    {
      repeat: { pattern: cronPattern },
      // Keep only the latest run of a recurring job around.
      removeOnComplete: 1,
      removeOnFail: 1,
      jobId: `recurring-${jobData.service}-${jobData.provider}-${jobData.operation}`
    }
  );
}
/**
 * Enqueue an immediate proxy fetch-and-check at elevated priority.
 * NOTE(review): this uses the legacy 'fetch-and-check' job type, which the
 * current processJob dispatcher no longer handles — confirm whether this
 * method is still needed or should enqueue a 'proxy-fetch' JobData instead.
 */
async addManualProxyFetch() {
  return await this.queue.add('fetch-and-check',
    { type: 'fetch-and-check' },
    {
      priority: 5,
      removeOnComplete: 5,
      removeOnFail: 3,
    }
  );
}
/**
 * Snapshot counts of jobs in each queue state.
 * The five reads run in parallel; counts can be slightly stale by return time.
 */
async getJobStats() {
  const [waiting, active, completed, failed, delayed] = await Promise.all([
    this.queue.getWaiting(),
    this.queue.getActive(),
    this.queue.getCompleted(),
    this.queue.getFailed(),
    this.queue.getDelayed()
  ]);
  return {
    waiting: waiting.length,
    active: active.length,
    completed: completed.length,
    failed: failed.length,
    delayed: delayed.length
  };
}
/**
 * Job-state counts plus worker count, queue name, and the Dragonfly/Redis
 * connection settings (re-read from the environment, defaulting like the
 * constructor does).
 */
async getQueueStatus() {
  const stats = await this.getJobStats();
  const connection = {
    host: process.env.DRAGONFLY_HOST || 'localhost',
    port: parseInt(process.env.DRAGONFLY_PORT || '6379')
  };
  return {
    ...stats,
    workers: this.getWorkerCount(),
    queue: this.queue.name,
    connection
  };
}
/** Expose the underlying BullMQ queue handle (kept for backward compatibility). */
async getQueue() {
  return this.queue;
}

/** Configured worker concurrency; falls back to 1 when unset. */
getWorkerCount() {
  return this.worker.opts.concurrency || 1;
}
/** Static catalogue of the providers this service dispatches jobs to. */
getRegisteredProviders() {
  const proxyProvider = {
    name: 'proxy-service',
    type: 'proxy',
    operations: ['fetch-and-check', 'check-specific']
  };
  const marketDataProvider = {
    name: 'market-data-provider',
    type: 'market-data',
    operations: ['live-data', 'historical-data']
  };
  return [proxyProvider, marketDataProvider];
}
/**
 * Graceful shutdown: close the worker first so no new jobs start,
 * then the producer handle, then the events subscriber.
 */
async shutdown() {
  this.logger.info('Shutting down queue manager');
  await this.worker.close();
  await this.queue.close();
  await this.queueEvents.close();
}
}
// Single shared instance. `queueService` is kept as an alias so older
// imports keep working; both names refer to the same object (the diff
// residue here previously constructed two independent instances).
export const queueService = new QueueService();
export const queueManager = queueService;