running prettier for cleanup
This commit is contained in:
parent
24b7ed15e4
commit
8955544593
151 changed files with 29158 additions and 27966 deletions
|
|
@ -1,380 +1,416 @@
|
|||
import { Queue, Worker, QueueEvents, type Job } from 'bullmq';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import { providerRegistry, type JobData } from './provider-registry.service';
|
||||
|
||||
export class QueueService {
|
||||
private logger = getLogger('queue-service');
|
||||
private queue!: Queue;
|
||||
private workers: Worker[] = [];
|
||||
private queueEvents!: QueueEvents;
|
||||
|
||||
private config = {
|
||||
workers: parseInt(process.env.WORKER_COUNT || '5'),
|
||||
concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'),
|
||||
redis: {
|
||||
host: process.env.DRAGONFLY_HOST || 'localhost',
|
||||
port: parseInt(process.env.DRAGONFLY_PORT || '6379')
|
||||
}
|
||||
};
|
||||
|
||||
private get isInitialized() {
|
||||
return !!this.queue;
|
||||
}
|
||||
|
||||
constructor() {
|
||||
// Don't initialize in constructor to allow for proper async initialization
|
||||
} async initialize() {
|
||||
if (this.isInitialized) {
|
||||
this.logger.warn('Queue service already initialized');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info('Initializing queue service...');
|
||||
|
||||
try {
|
||||
// Step 1: Register providers
|
||||
await this.registerProviders();
|
||||
|
||||
// Step 2: Setup queue and workers
|
||||
const connection = this.getConnection();
|
||||
const queueName = '{data-service-queue}';
|
||||
|
||||
this.queue = new Queue(queueName, {
|
||||
connection,
|
||||
defaultJobOptions: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 5,
|
||||
attempts: 3,
|
||||
backoff: { type: 'exponential', delay: 1000 }
|
||||
}
|
||||
});
|
||||
|
||||
this.queueEvents = new QueueEvents(queueName, { connection });
|
||||
|
||||
// Step 3: Create workers
|
||||
const { workerCount, totalConcurrency } = this.createWorkers(queueName, connection);
|
||||
|
||||
// Step 4: Wait for readiness (parallel)
|
||||
await Promise.all([
|
||||
this.queue.waitUntilReady(),
|
||||
this.queueEvents.waitUntilReady(),
|
||||
...this.workers.map(worker => worker.waitUntilReady())
|
||||
]);
|
||||
|
||||
// Step 5: Setup events and scheduled tasks
|
||||
this.setupQueueEvents();
|
||||
await this.setupScheduledTasks();
|
||||
|
||||
this.logger.info('Queue service initialized successfully', {
|
||||
workers: workerCount,
|
||||
totalConcurrency
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to initialize queue service', { error });
|
||||
throw error;
|
||||
}
|
||||
} private getConnection() {
|
||||
return {
|
||||
...this.config.redis,
|
||||
maxRetriesPerRequest: null,
|
||||
retryDelayOnFailover: 100,
|
||||
lazyConnect: false
|
||||
};
|
||||
}
|
||||
|
||||
private createWorkers(queueName: string, connection: any) {
|
||||
for (let i = 0; i < this.config.workers; i++) {
|
||||
const worker = new Worker(queueName, this.processJob.bind(this), {
|
||||
connection: { ...connection },
|
||||
concurrency: this.config.concurrency,
|
||||
maxStalledCount: 1,
|
||||
stalledInterval: 30000,
|
||||
});
|
||||
|
||||
// Setup events inline
|
||||
worker.on('ready', () => this.logger.info(`Worker ${i + 1} ready`));
|
||||
worker.on('error', (error) => this.logger.error(`Worker ${i + 1} error`, { error }));
|
||||
|
||||
this.workers.push(worker);
|
||||
}
|
||||
|
||||
return {
|
||||
workerCount: this.config.workers,
|
||||
totalConcurrency: this.config.workers * this.config.concurrency
|
||||
};
|
||||
} private setupQueueEvents() {
|
||||
// Only log failures, not every completion
|
||||
this.queueEvents.on('failed', (job, error) => {
|
||||
this.logger.error('Job failed', {
|
||||
id: job.jobId,
|
||||
error: String(error)
|
||||
});
|
||||
});
|
||||
|
||||
// Only log completions in debug mode
|
||||
if (process.env.LOG_LEVEL === 'debug') {
|
||||
this.queueEvents.on('completed', (job) => {
|
||||
this.logger.debug('Job completed', { id: job.jobId });
|
||||
});
|
||||
}
|
||||
}private async registerProviders() {
|
||||
this.logger.info('Registering providers...');
|
||||
|
||||
try {
|
||||
// Define providers to register
|
||||
const providers = [
|
||||
{ module: '../providers/proxy.provider', export: 'proxyProvider' },
|
||||
{ module: '../providers/quotemedia.provider', export: 'quotemediaProvider' },
|
||||
{ module: '../providers/yahoo.provider', export: 'yahooProvider' }
|
||||
];
|
||||
|
||||
// Import and register all providers
|
||||
for (const { module, export: exportName } of providers) {
|
||||
const providerModule = await import(module);
|
||||
providerRegistry.registerProvider(providerModule[exportName]);
|
||||
}
|
||||
|
||||
this.logger.info('All providers registered successfully');
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to register providers', { error });
|
||||
throw error;
|
||||
}
|
||||
}private async processJob(job: Job) {
|
||||
const { provider, operation, payload }: JobData = job.data;
|
||||
|
||||
this.logger.info('Processing job', {
|
||||
id: job.id,
|
||||
provider,
|
||||
operation,
|
||||
payloadKeys: Object.keys(payload || {})
|
||||
}); try {
|
||||
let result;
|
||||
|
||||
if (operation === 'process-batch-items') {
|
||||
// Special handling for batch processing - requires 2 parameters
|
||||
const { processBatchJob } = await import('../utils/batch-helpers');
|
||||
result = await processBatchJob(payload, this);
|
||||
} else {
|
||||
// Regular handler lookup - requires 1 parameter
|
||||
const handler = providerRegistry.getHandler(provider, operation);
|
||||
|
||||
if (!handler) {
|
||||
throw new Error(`No handler found for ${provider}:${operation}`);
|
||||
}
|
||||
|
||||
result = await handler(payload);
|
||||
}
|
||||
|
||||
this.logger.info('Job completed successfully', {
|
||||
id: job.id,
|
||||
provider,
|
||||
operation
|
||||
});
|
||||
|
||||
return result;
|
||||
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
this.logger.error('Job failed', {
|
||||
id: job.id,
|
||||
provider,
|
||||
operation,
|
||||
error: errorMessage
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
} async addBulk(jobs: any[]): Promise<any[]> {
|
||||
return await this.queue.addBulk(jobs);
|
||||
}
|
||||
|
||||
private getTotalConcurrency() {
|
||||
return this.workers.reduce((total, worker) => total + (worker.opts.concurrency || 1), 0);
|
||||
}
|
||||
private async setupScheduledTasks() {
|
||||
const allScheduledJobs = providerRegistry.getAllScheduledJobs();
|
||||
|
||||
if (allScheduledJobs.length === 0) {
|
||||
this.logger.warn('No scheduled jobs found in providers');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info('Setting up scheduled tasks...', { count: allScheduledJobs.length });
|
||||
|
||||
// Use Promise.allSettled for parallel processing + better error handling
|
||||
const results = await Promise.allSettled(
|
||||
allScheduledJobs.map(async ({ provider, job }) => {
|
||||
await this.addRecurringJob({
|
||||
type: job.type,
|
||||
provider,
|
||||
operation: job.operation,
|
||||
payload: job.payload,
|
||||
priority: job.priority,
|
||||
immediately: job.immediately || false
|
||||
}, job.cronPattern);
|
||||
|
||||
return { provider, operation: job.operation };
|
||||
})
|
||||
);
|
||||
|
||||
// Log results
|
||||
const successful = results.filter(r => r.status === 'fulfilled');
|
||||
const failed = results.filter(r => r.status === 'rejected');
|
||||
|
||||
if (failed.length > 0) {
|
||||
failed.forEach((result, index) => {
|
||||
const { provider, job } = allScheduledJobs[index];
|
||||
this.logger.error('Failed to register scheduled job', {
|
||||
provider,
|
||||
operation: job.operation,
|
||||
error: result.reason
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
this.logger.info('Scheduled tasks setup complete', {
|
||||
successful: successful.length,
|
||||
failed: failed.length
|
||||
});
|
||||
} private async addJobInternal(jobData: JobData, options: any = {}) {
|
||||
if (!this.isInitialized) {
|
||||
throw new Error('Queue service not initialized');
|
||||
}
|
||||
|
||||
const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`;
|
||||
return this.queue.add(jobType, jobData, {
|
||||
priority: jobData.priority || 0,
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 5,
|
||||
...options
|
||||
});
|
||||
}
|
||||
|
||||
async addJob(jobData: JobData, options?: any) {
|
||||
return this.addJobInternal(jobData, options);
|
||||
} async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) {
|
||||
const jobKey = `recurring-${jobData.provider}-${jobData.operation}`;
|
||||
|
||||
return this.addJobInternal(jobData, {
|
||||
repeat: {
|
||||
pattern: cronPattern,
|
||||
tz: 'UTC',
|
||||
immediately: jobData.immediately || false,
|
||||
},
|
||||
jobId: jobKey,
|
||||
removeOnComplete: 1,
|
||||
removeOnFail: 1,
|
||||
attempts: 2,
|
||||
backoff: {
|
||||
type: 'fixed',
|
||||
delay: 5000
|
||||
},
|
||||
...options
|
||||
});
|
||||
}
|
||||
async getJobStats() {
|
||||
if (!this.isInitialized) {
|
||||
throw new Error('Queue service not initialized. Call initialize() first.');
|
||||
}
|
||||
const [waiting, active, completed, failed, delayed] = await Promise.all([
|
||||
this.queue.getWaiting(),
|
||||
this.queue.getActive(),
|
||||
this.queue.getCompleted(),
|
||||
this.queue.getFailed(),
|
||||
this.queue.getDelayed()
|
||||
]);
|
||||
|
||||
return {
|
||||
waiting: waiting.length,
|
||||
active: active.length,
|
||||
completed: completed.length,
|
||||
failed: failed.length,
|
||||
delayed: delayed.length
|
||||
};
|
||||
}
|
||||
async drainQueue() {
|
||||
if (this.isInitialized) {
|
||||
await this.queue.drain();
|
||||
}
|
||||
}
|
||||
async getQueueStatus() {
|
||||
if (!this.isInitialized) {
|
||||
throw new Error('Queue service not initialized');
|
||||
}
|
||||
|
||||
const stats = await this.getJobStats();
|
||||
return {
|
||||
...stats,
|
||||
workers: this.workers.length,
|
||||
concurrency: this.getTotalConcurrency()
|
||||
};
|
||||
}
|
||||
async shutdown() {
|
||||
if (!this.isInitialized) {
|
||||
this.logger.warn('Queue service not initialized, nothing to shutdown');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info('Shutting down queue service gracefully...');
|
||||
|
||||
try {
|
||||
// Step 1: Stop accepting new jobs and wait for current jobs to finish
|
||||
this.logger.debug('Closing workers gracefully...');
|
||||
const workerClosePromises = this.workers.map(async (worker, index) => {
|
||||
this.logger.debug(`Closing worker ${index + 1}/${this.workers.length}`);
|
||||
try {
|
||||
// Wait for current jobs to finish, then close
|
||||
await Promise.race([
|
||||
worker.close(),
|
||||
new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error(`Worker ${index + 1} close timeout`)), 5000)
|
||||
)
|
||||
]);
|
||||
this.logger.debug(`Worker ${index + 1} closed successfully`);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to close worker ${index + 1}`, { error });
|
||||
// Force close if graceful close fails
|
||||
await worker.close(true);
|
||||
}
|
||||
});
|
||||
|
||||
await Promise.allSettled(workerClosePromises);
|
||||
this.logger.debug('All workers closed');
|
||||
|
||||
// Step 2: Close queue and events with timeout protection
|
||||
this.logger.debug('Closing queue and events...');
|
||||
await Promise.allSettled([
|
||||
Promise.race([
|
||||
this.queue.close(),
|
||||
new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Queue close timeout')), 3000)
|
||||
)
|
||||
]).catch(error => this.logger.error('Queue close error', { error })),
|
||||
|
||||
Promise.race([
|
||||
this.queueEvents.close(),
|
||||
new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('QueueEvents close timeout')), 3000)
|
||||
)
|
||||
]).catch(error => this.logger.error('QueueEvents close error', { error }))
|
||||
]);
|
||||
|
||||
this.logger.info('Queue service shutdown completed successfully');
|
||||
} catch (error) {
|
||||
this.logger.error('Error during queue service shutdown', { error });
|
||||
// Force close everything as last resort
|
||||
try {
|
||||
await Promise.allSettled([
|
||||
...this.workers.map(worker => worker.close(true)),
|
||||
this.queue.close(),
|
||||
this.queueEvents.close()
|
||||
]);
|
||||
} catch (forceCloseError) {
|
||||
this.logger.error('Force close also failed', { error: forceCloseError });
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const queueManager = new QueueService();
|
||||
import { Queue, QueueEvents, Worker, type Job } from 'bullmq';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import { providerRegistry, type JobData } from './provider-registry.service';
|
||||
|
||||
export class QueueService {
|
||||
private logger = getLogger('queue-service');
|
||||
private queue!: Queue;
|
||||
private workers: Worker[] = [];
|
||||
private queueEvents!: QueueEvents;
|
||||
|
||||
private config = {
|
||||
workers: parseInt(process.env.WORKER_COUNT || '5'),
|
||||
concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'),
|
||||
redis: {
|
||||
host: process.env.DRAGONFLY_HOST || 'localhost',
|
||||
port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
|
||||
},
|
||||
};
|
||||
|
||||
private get isInitialized() {
|
||||
return !!this.queue;
|
||||
}
|
||||
|
||||
constructor() {
|
||||
// Don't initialize in constructor to allow for proper async initialization
|
||||
}
|
||||
async initialize() {
|
||||
if (this.isInitialized) {
|
||||
this.logger.warn('Queue service already initialized');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info('Initializing queue service...');
|
||||
|
||||
try {
|
||||
// Step 1: Register providers
|
||||
await this.registerProviders();
|
||||
|
||||
// Step 2: Setup queue and workers
|
||||
const connection = this.getConnection();
|
||||
const queueName = '{data-service-queue}';
|
||||
this.queue = new Queue(queueName, {
|
||||
connection,
|
||||
defaultJobOptions: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 5,
|
||||
attempts: 3,
|
||||
backoff: { type: 'exponential', delay: 1000 },
|
||||
},
|
||||
});
|
||||
|
||||
this.queueEvents = new QueueEvents(queueName, { connection });
|
||||
|
||||
// Step 3: Create workers
|
||||
const { workerCount, totalConcurrency } = this.createWorkers(queueName, connection);
|
||||
|
||||
// Step 4: Wait for readiness (parallel)
|
||||
await Promise.all([
|
||||
this.queue.waitUntilReady(),
|
||||
this.queueEvents.waitUntilReady(),
|
||||
...this.workers.map(worker => worker.waitUntilReady()),
|
||||
]);
|
||||
|
||||
// Step 5: Setup events and scheduled tasks
|
||||
this.setupQueueEvents();
|
||||
await this.setupScheduledTasks();
|
||||
|
||||
this.logger.info('Queue service initialized successfully', {
|
||||
workers: workerCount,
|
||||
totalConcurrency,
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to initialize queue service', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
private getConnection() {
|
||||
return {
|
||||
...this.config.redis,
|
||||
maxRetriesPerRequest: null,
|
||||
retryDelayOnFailover: 100,
|
||||
lazyConnect: false,
|
||||
};
|
||||
}
|
||||
|
||||
private createWorkers(queueName: string, connection: any) {
|
||||
for (let i = 0; i < this.config.workers; i++) {
|
||||
const worker = new Worker(queueName, this.processJob.bind(this), {
|
||||
connection: { ...connection },
|
||||
concurrency: this.config.concurrency,
|
||||
maxStalledCount: 1,
|
||||
stalledInterval: 30000,
|
||||
});
|
||||
|
||||
// Setup events inline
|
||||
worker.on('ready', () => this.logger.info(`Worker ${i + 1} ready`));
|
||||
worker.on('error', error => this.logger.error(`Worker ${i + 1} error`, { error }));
|
||||
|
||||
this.workers.push(worker);
|
||||
}
|
||||
|
||||
return {
|
||||
workerCount: this.config.workers,
|
||||
totalConcurrency: this.config.workers * this.config.concurrency,
|
||||
};
|
||||
}
|
||||
private setupQueueEvents() {
|
||||
// Add comprehensive logging to see job flow
|
||||
this.queueEvents.on('added', job => {
|
||||
this.logger.debug('Job added to queue', {
|
||||
id: job.jobId,
|
||||
});
|
||||
});
|
||||
|
||||
this.queueEvents.on('waiting', job => {
|
||||
this.logger.debug('Job moved to waiting', {
|
||||
id: job.jobId,
|
||||
});
|
||||
});
|
||||
|
||||
this.queueEvents.on('active', job => {
|
||||
this.logger.debug('Job became active', {
|
||||
id: job.jobId,
|
||||
});
|
||||
});
|
||||
|
||||
this.queueEvents.on('delayed', job => {
|
||||
this.logger.debug('Job delayed', {
|
||||
id: job.jobId,
|
||||
delay: job.delay,
|
||||
});
|
||||
});
|
||||
|
||||
this.queueEvents.on('completed', job => {
|
||||
this.logger.debug('Job completed', {
|
||||
id: job.jobId,
|
||||
});
|
||||
});
|
||||
|
||||
this.queueEvents.on('failed', (job, error) => {
|
||||
this.logger.error('Job failed', {
|
||||
id: job.jobId,
|
||||
error: String(error),
|
||||
});
|
||||
});
|
||||
}
|
||||
private async registerProviders() {
|
||||
this.logger.info('Registering providers...');
|
||||
|
||||
try {
|
||||
// Define providers to register
|
||||
const providers = [
|
||||
{ module: '../providers/proxy.provider', export: 'proxyProvider' },
|
||||
{ module: '../providers/quotemedia.provider', export: 'quotemediaProvider' },
|
||||
{ module: '../providers/yahoo.provider', export: 'yahooProvider' },
|
||||
];
|
||||
|
||||
// Import and register all providers
|
||||
for (const { module, export: exportName } of providers) {
|
||||
const providerModule = await import(module);
|
||||
providerRegistry.registerProvider(providerModule[exportName]);
|
||||
}
|
||||
|
||||
this.logger.info('All providers registered successfully');
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to register providers', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
private async processJob(job: Job) {
|
||||
const { provider, operation, payload }: JobData = job.data;
|
||||
|
||||
this.logger.info('Processing job', {
|
||||
id: job.id,
|
||||
provider,
|
||||
operation,
|
||||
payloadKeys: Object.keys(payload || {}),
|
||||
});
|
||||
try {
|
||||
let result;
|
||||
|
||||
if (operation === 'process-batch-items') {
|
||||
// Special handling for batch processing - requires 2 parameters
|
||||
const { processBatchJob } = await import('../utils/batch-helpers');
|
||||
result = await processBatchJob(payload, this);
|
||||
} else {
|
||||
// Regular handler lookup - requires 1 parameter
|
||||
const handler = providerRegistry.getHandler(provider, operation);
|
||||
|
||||
if (!handler) {
|
||||
throw new Error(`No handler found for ${provider}:${operation}`);
|
||||
}
|
||||
|
||||
result = await handler(payload);
|
||||
}
|
||||
|
||||
this.logger.info('Job completed successfully', {
|
||||
id: job.id,
|
||||
provider,
|
||||
operation,
|
||||
});
|
||||
|
||||
return result;
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
this.logger.error('Job failed', {
|
||||
id: job.id,
|
||||
provider,
|
||||
operation,
|
||||
error: errorMessage,
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async addBulk(jobs: any[]): Promise<any[]> {
|
||||
return await this.queue.addBulk(jobs);
|
||||
}
|
||||
|
||||
private getTotalConcurrency() {
|
||||
return this.workers.reduce((total, worker) => total + (worker.opts.concurrency || 1), 0);
|
||||
}
|
||||
|
||||
private async setupScheduledTasks() {
|
||||
const allScheduledJobs = providerRegistry.getAllScheduledJobs();
|
||||
|
||||
if (allScheduledJobs.length === 0) {
|
||||
this.logger.warn('No scheduled jobs found in providers');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info('Setting up scheduled tasks...', { count: allScheduledJobs.length });
|
||||
|
||||
// Use Promise.allSettled for parallel processing + better error handling
|
||||
const results = await Promise.allSettled(
|
||||
allScheduledJobs.map(async ({ provider, job }) => {
|
||||
await this.addRecurringJob(
|
||||
{
|
||||
type: job.type,
|
||||
provider,
|
||||
operation: job.operation,
|
||||
payload: job.payload,
|
||||
priority: job.priority,
|
||||
immediately: job.immediately || false,
|
||||
},
|
||||
job.cronPattern
|
||||
);
|
||||
|
||||
return { provider, operation: job.operation };
|
||||
})
|
||||
);
|
||||
|
||||
// Log results
|
||||
const successful = results.filter(r => r.status === 'fulfilled');
|
||||
const failed = results.filter(r => r.status === 'rejected');
|
||||
|
||||
if (failed.length > 0) {
|
||||
failed.forEach((result, index) => {
|
||||
const { provider, job } = allScheduledJobs[index];
|
||||
this.logger.error('Failed to register scheduled job', {
|
||||
provider,
|
||||
operation: job.operation,
|
||||
error: result.reason,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
this.logger.info('Scheduled tasks setup complete', {
|
||||
successful: successful.length,
|
||||
failed: failed.length,
|
||||
});
|
||||
}
|
||||
private async addJobInternal(jobData: JobData, options: any = {}) {
|
||||
if (!this.isInitialized) {
|
||||
throw new Error('Queue service not initialized');
|
||||
}
|
||||
|
||||
const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`;
|
||||
return this.queue.add(jobType, jobData, {
|
||||
priority: jobData.priority || 0,
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 5,
|
||||
...options,
|
||||
});
|
||||
}
|
||||
|
||||
async addJob(jobData: JobData, options?: any) {
|
||||
return this.addJobInternal(jobData, options);
|
||||
}
|
||||
|
||||
async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) {
|
||||
const jobKey = `recurring-${jobData.provider}-${jobData.operation}`;
|
||||
|
||||
return this.addJobInternal(jobData, {
|
||||
repeat: {
|
||||
pattern: cronPattern,
|
||||
tz: 'UTC',
|
||||
immediately: jobData.immediately || false,
|
||||
},
|
||||
jobId: jobKey,
|
||||
removeOnComplete: 1,
|
||||
removeOnFail: 1,
|
||||
attempts: 2,
|
||||
backoff: {
|
||||
type: 'fixed',
|
||||
delay: 5000,
|
||||
},
|
||||
...options,
|
||||
});
|
||||
}
|
||||
async getJobStats() {
|
||||
if (!this.isInitialized) {
|
||||
throw new Error('Queue service not initialized. Call initialize() first.');
|
||||
}
|
||||
const [waiting, active, completed, failed, delayed] = await Promise.all([
|
||||
this.queue.getWaiting(),
|
||||
this.queue.getActive(),
|
||||
this.queue.getCompleted(),
|
||||
this.queue.getFailed(),
|
||||
this.queue.getDelayed(),
|
||||
]);
|
||||
|
||||
return {
|
||||
waiting: waiting.length,
|
||||
active: active.length,
|
||||
completed: completed.length,
|
||||
failed: failed.length,
|
||||
delayed: delayed.length,
|
||||
};
|
||||
}
|
||||
async drainQueue() {
|
||||
if (this.isInitialized) {
|
||||
await this.queue.drain();
|
||||
}
|
||||
}
|
||||
async getQueueStatus() {
|
||||
if (!this.isInitialized) {
|
||||
throw new Error('Queue service not initialized');
|
||||
}
|
||||
|
||||
const stats = await this.getJobStats();
|
||||
return {
|
||||
...stats,
|
||||
workers: this.workers.length,
|
||||
concurrency: this.getTotalConcurrency(),
|
||||
};
|
||||
}
|
||||
async shutdown() {
|
||||
if (!this.isInitialized) {
|
||||
this.logger.warn('Queue service not initialized, nothing to shutdown');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info('Shutting down queue service gracefully...');
|
||||
|
||||
try {
|
||||
// Step 1: Stop accepting new jobs and wait for current jobs to finish
|
||||
this.logger.debug('Closing workers gracefully...');
|
||||
const workerClosePromises = this.workers.map(async (worker, index) => {
|
||||
this.logger.debug(`Closing worker ${index + 1}/${this.workers.length}`);
|
||||
try {
|
||||
// Wait for current jobs to finish, then close
|
||||
await Promise.race([
|
||||
worker.close(),
|
||||
new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error(`Worker ${index + 1} close timeout`)), 5000)
|
||||
),
|
||||
]);
|
||||
this.logger.debug(`Worker ${index + 1} closed successfully`);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to close worker ${index + 1}`, { error });
|
||||
// Force close if graceful close fails
|
||||
await worker.close(true);
|
||||
}
|
||||
});
|
||||
|
||||
await Promise.allSettled(workerClosePromises);
|
||||
this.logger.debug('All workers closed');
|
||||
|
||||
// Step 2: Close queue and events with timeout protection
|
||||
this.logger.debug('Closing queue and events...');
|
||||
await Promise.allSettled([
|
||||
Promise.race([
|
||||
this.queue.close(),
|
||||
new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Queue close timeout')), 3000)
|
||||
),
|
||||
]).catch(error => this.logger.error('Queue close error', { error })),
|
||||
|
||||
Promise.race([
|
||||
this.queueEvents.close(),
|
||||
new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('QueueEvents close timeout')), 3000)
|
||||
),
|
||||
]).catch(error => this.logger.error('QueueEvents close error', { error })),
|
||||
]);
|
||||
|
||||
this.logger.info('Queue service shutdown completed successfully');
|
||||
} catch (error) {
|
||||
this.logger.error('Error during queue service shutdown', { error });
|
||||
// Force close everything as last resort
|
||||
try {
|
||||
await Promise.allSettled([
|
||||
...this.workers.map(worker => worker.close(true)),
|
||||
this.queue.close(),
|
||||
this.queueEvents.close(),
|
||||
]);
|
||||
} catch (forceCloseError) {
|
||||
this.logger.error('Force close also failed', { error: forceCloseError });
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const queueManager = new QueueService();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue