starting to implement the queue job service
This commit is contained in:
parent
c10a524aa8
commit
8681c34529
9 changed files with 458 additions and 20 deletions
198
apps/data-service/src/services/queue.service.ts
Normal file
198
apps/data-service/src/services/queue.service.ts
Normal file
|
|
@ -0,0 +1,198 @@
|
|||
/**
|
||||
* BullMQ Queue Service
|
||||
* Handles job scheduling and processing for the data service
|
||||
*/
|
||||
import { Queue, Worker, QueueEvents, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import type { ProxyInfo } from '@stock-bot/http';
|
||||
|
||||
const logger = getLogger('queue-service');
|
||||
|
||||
/**
 * Payload carried by every job placed on the 'proxy-tasks' queue.
 */
export interface ProxyJobData {
  /** Discriminator selecting which task the worker performs. */
  type: 'fetch-and-check' | 'check-specific' | 'clear-cache';
  /** Proxies to validate; required only for 'check-specific' jobs. */
  proxies?: ProxyInfo[];
}
|
||||
|
||||
export class QueueService {
|
||||
private queue: Queue;
|
||||
private worker: Worker;
|
||||
private queueEvents: QueueEvents;
|
||||
|
||||
constructor() {
|
||||
const connection = {
|
||||
host: process.env.DRAGONFLY_HOST || 'localhost',
|
||||
port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
|
||||
};
|
||||
|
||||
// Create queue
|
||||
this.queue = new Queue('proxy-tasks', { connection });
|
||||
|
||||
// Create worker
|
||||
this.worker = new Worker('proxy-tasks', this.processJob.bind(this), {
|
||||
connection,
|
||||
concurrency: 3,
|
||||
});
|
||||
|
||||
// Create queue events for monitoring
|
||||
this.queueEvents = new QueueEvents('proxy-tasks', { connection });
|
||||
|
||||
this.setupEventListeners();
|
||||
logger.info('Queue service initialized', { connection });
|
||||
}
|
||||
|
||||
private async processJob(job: any) {
|
||||
const { type, proxies }: ProxyJobData = job.data;
|
||||
logger.info('Processing job', {
|
||||
id: job.id,
|
||||
type,
|
||||
proxiesCount: proxies?.length
|
||||
});
|
||||
|
||||
try {
|
||||
switch (type) {
|
||||
case 'fetch-and-check':
|
||||
// Import proxy service dynamically to avoid circular dependencies
|
||||
const { proxyService } = await import('./proxy.service');
|
||||
return await proxyService.fetchProxiesFromSources();
|
||||
|
||||
case 'check-specific':
|
||||
if (!proxies) throw new Error('Proxies required for check-specific job');
|
||||
const { proxyService: ps } = await import('./proxy.service');
|
||||
return await ps.checkProxies(proxies);
|
||||
|
||||
case 'clear-cache':
|
||||
// Clear proxy cache
|
||||
const { proxyService: pcs } = await import('./proxy.service');
|
||||
// Assuming you have a clearCache method
|
||||
// return await pcs.clearCache();
|
||||
logger.info('Cache clear job processed');
|
||||
return { cleared: true };
|
||||
|
||||
default:
|
||||
throw new Error(`Unknown job type: ${type}`);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Job processing failed', {
|
||||
id: job.id,
|
||||
type,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private setupEventListeners() {
|
||||
this.worker.on('completed', (job) => {
|
||||
logger.info('Job completed', {
|
||||
id: job.id,
|
||||
type: job.data.type,
|
||||
result: job.returnvalue
|
||||
});
|
||||
});
|
||||
|
||||
this.worker.on('failed', (job, err) => {
|
||||
logger.error('Job failed', {
|
||||
id: job?.id,
|
||||
type: job?.data.type,
|
||||
error: err.message
|
||||
});
|
||||
});
|
||||
|
||||
this.worker.on('progress', (job, progress) => {
|
||||
logger.debug('Job progress', {
|
||||
id: job.id,
|
||||
progress: `${progress}%`
|
||||
});
|
||||
});
|
||||
|
||||
this.queueEvents.on('waiting', ({ jobId }) => {
|
||||
logger.debug('Job waiting', { jobId });
|
||||
});
|
||||
|
||||
this.queueEvents.on('active', ({ jobId }) => {
|
||||
logger.debug('Job active', { jobId });
|
||||
});
|
||||
}
|
||||
|
||||
async scheduleRecurringTasks() {
|
||||
// Fetch and check proxies every 15 minutes
|
||||
await this.queue.add('fetch-and-check',
|
||||
{ type: 'fetch-and-check' },
|
||||
{
|
||||
repeat: { pattern: '*/15 * * * *' },
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 5,
|
||||
jobId: 'recurring-proxy-fetch', // Use consistent ID to prevent duplicates
|
||||
}
|
||||
);
|
||||
|
||||
// Clear cache daily at midnight
|
||||
await this.queue.add('clear-cache',
|
||||
{ type: 'clear-cache' },
|
||||
{
|
||||
repeat: { pattern: '0 0 * * *' },
|
||||
removeOnComplete: 1,
|
||||
removeOnFail: 1,
|
||||
jobId: 'daily-cache-clear',
|
||||
}
|
||||
);
|
||||
|
||||
logger.info('Recurring tasks scheduled');
|
||||
}
|
||||
|
||||
async addImmediateProxyCheck(proxies: ProxyInfo[]) {
|
||||
return await this.queue.add('check-specific',
|
||||
{ type: 'check-specific', proxies },
|
||||
{
|
||||
priority: 10,
|
||||
removeOnComplete: 5,
|
||||
removeOnFail: 3,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
async addManualProxyFetch() {
|
||||
return await this.queue.add('fetch-and-check',
|
||||
{ type: 'fetch-and-check' },
|
||||
{
|
||||
priority: 5,
|
||||
removeOnComplete: 5,
|
||||
removeOnFail: 3,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
async getQueueStats() {
|
||||
const [waiting, active, completed, failed, delayed] = await Promise.all([
|
||||
this.queue.getWaiting(),
|
||||
this.queue.getActive(),
|
||||
this.queue.getCompleted(),
|
||||
this.queue.getFailed(),
|
||||
this.queue.getDelayed(),
|
||||
]);
|
||||
|
||||
return {
|
||||
waiting: waiting.length,
|
||||
active: active.length,
|
||||
completed: completed.length,
|
||||
failed: failed.length,
|
||||
delayed: delayed.length,
|
||||
};
|
||||
}
|
||||
|
||||
async getQueue() {
|
||||
return this.queue;
|
||||
}
|
||||
|
||||
async shutdown() {
|
||||
logger.info('Shutting down queue service...');
|
||||
|
||||
await this.worker.close();
|
||||
await this.queue.close();
|
||||
await this.queueEvents.close();
|
||||
|
||||
logger.info('Queue service shut down');
|
||||
}
|
||||
}
|
||||
|
||||
// Shared singleton. NOTE(review): instantiating here is a module side effect —
// the constructor creates the Queue/Worker/QueueEvents connections as soon as
// this module is imported; confirm that is intended for all importers.
export const queueService = new QueueService();
|
||||
Loading…
Add table
Add a link
Reference in a new issue