This commit is contained in:
Bojan Kucera 2025-06-08 14:36:23 -04:00
parent 8daaff27fd
commit 811fc86c92
3 changed files with 78 additions and 138 deletions

View file

@ -1,115 +0,0 @@
/**
* Queue API Endpoints
* REST API for monitoring and controlling job queues
*/
import { Router } from 'express';
import { getLogger } from '@stock-bot/logger';
import { proxyQueueIntegration } from '../services/proxy-queue-integration';
const logger = getLogger('queue-api');
const router = Router();
/**
* GET /api/queue/stats
* Get queue statistics
*/
router.get('/stats', async (req, res) => {
try {
const stats = await proxyQueueIntegration.getStats();
res.json({ success: true, data: stats });
} catch (error) {
logger.error('Failed to get queue stats', { error });
res.status(500).json({
success: false,
error: 'Failed to get queue statistics'
});
}
});
/**
* POST /api/queue/proxy/fetch
* Manually trigger proxy fetching
*/
router.post('/proxy/fetch', async (req, res) => {
try {
const job = await proxyQueueIntegration.triggerProxyFetch();
res.json({
success: true,
data: {
jobId: job.id,
message: 'Proxy fetch job queued'
}
});
} catch (error) {
logger.error('Failed to trigger proxy fetch', { error });
res.status(500).json({
success: false,
error: 'Failed to queue proxy fetch job'
});
}
});
/**
* POST /api/queue/proxy/check
* Check specific proxies
*/
router.post('/proxy/check', async (req, res) => {
try {
const { proxies } = req.body;
if (!Array.isArray(proxies) || proxies.length === 0) {
return res.status(400).json({
success: false,
error: 'Proxies array is required'
});
}
const job = await proxyQueueIntegration.checkSpecificProxies(proxies);
res.json({
success: true,
data: {
jobId: job.id,
proxiesCount: proxies.length,
message: 'Proxy check job queued'
}
});
} catch (error) {
logger.error('Failed to queue proxy check', { error });
res.status(500).json({
success: false,
error: 'Failed to queue proxy check job'
});
}
});
/**
* GET /api/queue/health
* Health check for queue service
*/
router.get('/health', async (req, res) => {
try {
const stats = await proxyQueueIntegration.getStats();
const isHealthy = stats.active >= 0; // Basic health check
res.status(isHealthy ? 200 : 503).json({
success: isHealthy,
data: {
status: isHealthy ? 'healthy' : 'unhealthy',
stats,
timestamp: new Date().toISOString()
}
});
} catch (error) {
logger.error('Queue health check failed', { error });
res.status(503).json({
success: false,
data: {
status: 'unhealthy',
error: 'Queue service unavailable',
timestamp: new Date().toISOString()
}
});
}
});
export default router;

View file

@ -122,15 +122,13 @@ async function initializeServices() {
logger.info('Initializing data service...');
try {
// Queue manager is initialized automatically when imported
logger.info('Queue manager initialized');
// Initialize providers
// Initialize queue service
await queueManager.initialize();
logger.info('Queue service initialized');
logger.info('All services initialized successfully');
} catch (error) {
logger.error('Failed to initialize services', { error });
process.exit(1);
throw error;
}
}
@ -155,6 +153,19 @@ async function startServer() {
logger.info(' GET /api/providers - List registered providers');
}
// Graceful shutdown
process.on('SIGINT', async () => {
logger.info('Received SIGINT, shutting down gracefully...');
await queueManager.shutdown();
process.exit(0);
});
process.on('SIGTERM', async () => {
logger.info('Received SIGTERM, shutting down gracefully...');
await queueManager.shutdown();
process.exit(0);
});
startServer().catch(error => {
logger.error('Failed to start server', { error });
process.exit(1);

View file

@ -11,26 +11,53 @@ export interface JobData {
}
export class QueueService {
private logger = new Logger('queue-manager');
private queue: Queue;
private worker: Worker;
private queueEvents: QueueEvents;
private logger = new Logger('queue-service');
private queue!: Queue;
private worker!: Worker;
private queueEvents!: QueueEvents;
private isInitialized = false;
constructor() {
// Don't initialize in constructor to allow for proper async initialization
}
async initialize() {
if (this.isInitialized) {
this.logger.warn('Queue service already initialized');
return;
}
this.logger.info('Initializing queue service...');
const connection = {
host: process.env.DRAGONFLY_HOST || 'localhost',
port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
};
this.queue = new Queue('data-service-queue', { connection });
this.worker = new Worker('data-service-queue', this.processJob.bind(this), {
connection,
concurrency: 10
});
this.queueEvents = new QueueEvents('data-service-queue', { connection });
this.logger.info('Connecting to Redis/Dragonfly', connection);
this.setupEventListeners();
this.setupScheduledTasks();
try {
this.queue = new Queue('data-service-queue', { connection });
this.worker = new Worker('data-service-queue', this.processJob.bind(this), {
connection,
concurrency: 10
});
this.queueEvents = new QueueEvents('data-service-queue', { connection });
// Test connection
await this.queue.waitUntilReady();
await this.worker.waitUntilReady();
await this.queueEvents.waitUntilReady();
this.setupEventListeners();
this.setupScheduledTasks();
this.isInitialized = true;
this.logger.info('Queue service initialized successfully');
} catch (error) {
this.logger.error('Failed to initialize queue service', { error });
throw error;
}
}
private async processJob(job: any) {
@ -111,8 +138,10 @@ export class QueueService {
this.logger.info('Scheduled tasks configured');
}
async addJob(jobData: JobData, options?: any) {
if (!this.isInitialized) {
throw new Error('Queue service not initialized. Call initialize() first.');
}
return this.queue.add(jobData.type, jobData, {
priority: jobData.priority || 0,
removeOnComplete: 10,
@ -122,6 +151,9 @@ export class QueueService {
}
async addRecurringJob(jobData: JobData, cronPattern: string) {
if (!this.isInitialized) {
throw new Error('Queue service not initialized. Call initialize() first.');
}
return this.queue.add(
`recurring-${jobData.type}`,
jobData,
@ -135,6 +167,9 @@ export class QueueService {
}
async getJobStats() {
if (!this.isInitialized) {
throw new Error('Queue service not initialized. Call initialize() first.');
}
const [waiting, active, completed, failed, delayed] = await Promise.all([
this.queue.getWaiting(),
this.queue.getActive(),
@ -150,8 +185,10 @@ export class QueueService {
failed: failed.length,
delayed: delayed.length
};
}
async getQueueStatus() {
} async getQueueStatus() {
if (!this.isInitialized) {
throw new Error('Queue service not initialized. Call initialize() first.');
}
const stats = await this.getJobStats();
return {
...stats,
@ -165,6 +202,9 @@ export class QueueService {
}
getWorkerCount() {
if (!this.isInitialized) {
return 0;
}
return this.worker.opts.concurrency || 1;
}
@ -174,12 +214,16 @@ export class QueueService {
{ name: 'market-data-provider', type: 'market-data', operations: ['live-data', 'historical-data'] }
];
}
async shutdown() {
this.logger.info('Shutting down queue manager');
if (!this.isInitialized) {
this.logger.warn('Queue service not initialized, nothing to shutdown');
return;
}
this.logger.info('Shutting down queue service');
await this.worker.close();
await this.queue.close();
await this.queueEvents.close();
this.isInitialized = false;
}
}