fully cleaned things up, few more things to go.

This commit is contained in:
Boki 2025-06-14 15:42:47 -04:00
parent e5170b1c78
commit ad5e353ec3
11 changed files with 180 additions and 362 deletions

View file

@ -9,8 +9,8 @@ import { connectMongoDB, disconnectMongoDB } from '@stock-bot/mongodb-client';
import { Shutdown } from '@stock-bot/shutdown'; import { Shutdown } from '@stock-bot/shutdown';
import { initializeIBResources } from './providers/ib.tasks'; import { initializeIBResources } from './providers/ib.tasks';
import { initializeProxyResources } from './providers/proxy.tasks'; import { initializeProxyResources } from './providers/proxy.tasks';
import { queueManager } from './services/queue-manager.service'; import { initializeQueueManager, shutdownQueueManager } from './services/queue-manager.service';
import { healthRoutes, marketDataRoutes, proxyRoutes, queueRoutes, testRoutes } from './routes'; import { healthRoutes, queueRoutes } from './routes';
// Load environment variables // Load environment variables
loadEnvVariables(); loadEnvVariables();
@ -26,9 +26,6 @@ const shutdown = Shutdown.getInstance({ timeout: 15000 });
// Register all routes // Register all routes
app.route('', healthRoutes); app.route('', healthRoutes);
app.route('', queueRoutes); app.route('', queueRoutes);
app.route('', marketDataRoutes);
app.route('', proxyRoutes);
app.route('', testRoutes);
// Initialize services // Initialize services
async function initializeServices() { async function initializeServices() {
@ -57,7 +54,7 @@ async function initializeServices() {
// Initialize queue manager (includes batch cache initialization) // Initialize queue manager (includes batch cache initialization)
logger.info('Starting queue manager initialization...'); logger.info('Starting queue manager initialization...');
await queueManager.initialize(); await initializeQueueManager();
logger.info('Queue manager initialized'); logger.info('Queue manager initialized');
logger.info('All services initialized successfully'); logger.info('All services initialized successfully');
@ -95,7 +92,7 @@ shutdown.onShutdown(async () => {
shutdown.onShutdown(async () => { shutdown.onShutdown(async () => {
logger.info('Shutting down queue manager...'); logger.info('Shutting down queue manager...');
try { try {
await queueManager.shutdown(); await shutdownQueueManager();
logger.info('Queue manager shut down successfully'); logger.info('Queue manager shut down successfully');
} catch (error) { } catch (error) {
logger.error('Error shutting down queue manager', { error }); logger.error('Error shutting down queue manager', { error });

View file

@ -2,7 +2,7 @@
* Health check routes * Health check routes
*/ */
import { Hono } from 'hono'; import { Hono } from 'hono';
import { queueManager } from '../services/queue.service'; import { queueManager } from '../services/queue-manager.service';
export const healthRoutes = new Hono(); export const healthRoutes = new Hono();
@ -14,7 +14,7 @@ healthRoutes.get('/health', c => {
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
queue: { queue: {
status: 'running', status: 'running',
workers: queueManager.getWorkerCount(), name: queueManager.getQueueName(),
}, },
}); });
}); });

View file

@ -3,6 +3,3 @@
*/ */
export { healthRoutes } from './health.routes'; export { healthRoutes } from './health.routes';
export { queueRoutes } from './queue.routes'; export { queueRoutes } from './queue.routes';
export { marketDataRoutes } from './market-data.routes';
export { proxyRoutes } from './proxy.routes';
export { testRoutes } from './test.routes';

View file

@ -3,7 +3,7 @@
*/ */
import { Hono } from 'hono'; import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import { queueManager } from '../services/queue-manager.service'; import { processItems, queueManager } from '../services/queue-manager.service';
const logger = getLogger('market-data-routes'); const logger = getLogger('market-data-routes');
@ -16,7 +16,7 @@ marketDataRoutes.get('/api/live/:symbol', async c => {
try { try {
// Queue job for live data using Yahoo provider // Queue job for live data using Yahoo provider
const job = await queueManager.addJob('market-data-live', { const job = await queueManager.add('market-data-live', {
type: 'market-data-live', type: 'market-data-live',
provider: 'yahoo-finance', provider: 'yahoo-finance',
operation: 'live-data', operation: 'live-data',
@ -46,7 +46,7 @@ marketDataRoutes.get('/api/historical/:symbol', async c => {
const toDate = to ? new Date(to) : new Date(); // Now const toDate = to ? new Date(to) : new Date(); // Now
// Queue job for historical data using Yahoo provider // Queue job for historical data using Yahoo provider
const job = await queueManager.addJob('market-data-historical', { const job = await queueManager.add('market-data-historical', {
type: 'market-data-historical', type: 'market-data-historical',
provider: 'yahoo-finance', provider: 'yahoo-finance',
operation: 'historical-data', operation: 'historical-data',
@ -94,7 +94,7 @@ marketDataRoutes.post('/api/process-symbols', async c => {
useBatching, useBatching,
}); });
const result = await queueManager.processSymbols(symbols, { const result = await processItems(symbols, queueManager, {
totalDelayMs, totalDelayMs,
useBatching, useBatching,
batchSize, batchSize,

View file

@ -1,76 +0,0 @@
/**
* Proxy management routes
*/
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { queueManager } from '../services/queue.service';
const logger = getLogger('proxy-routes');
export const proxyRoutes = new Hono();
// Proxy management endpoints
proxyRoutes.post('/api/proxy/fetch', async c => {
try {
const job = await queueManager.addJob({
type: 'proxy-fetch',
provider: 'proxy-provider',
operation: 'fetch-and-check',
payload: {},
priority: 5,
});
return c.json({
status: 'success',
jobId: job.id,
message: 'Proxy fetch job queued',
});
} catch (error) {
logger.error('Failed to queue proxy fetch', { error });
return c.json({ status: 'error', message: 'Failed to queue proxy fetch' }, 500);
}
});
proxyRoutes.post('/api/proxy/check', async c => {
try {
const { proxies } = await c.req.json();
const job = await queueManager.addJob({
type: 'proxy-check',
provider: 'proxy-provider',
operation: 'check-specific',
payload: { proxies },
priority: 8,
});
return c.json({
status: 'success',
jobId: job.id,
message: `Proxy check job queued for ${proxies.length} proxies`,
});
} catch (error) {
logger.error('Failed to queue proxy check', { error });
return c.json({ status: 'error', message: 'Failed to queue proxy check' }, 500);
}
});
// Get proxy stats via queue
proxyRoutes.get('/api/proxy/stats', async c => {
try {
const job = await queueManager.addJob({
type: 'proxy-stats',
provider: 'proxy-provider',
operation: 'get-stats',
payload: {},
priority: 3,
});
return c.json({
status: 'success',
jobId: job.id,
message: 'Proxy stats job queued',
});
} catch (error) {
logger.error('Failed to queue proxy stats', { error });
return c.json({ status: 'error', message: 'Failed to queue proxy stats' }, 500);
}
});

View file

@ -23,7 +23,7 @@ queueRoutes.get('/api/queue/status', async c => {
queueRoutes.post('/api/queue/job', async c => { queueRoutes.post('/api/queue/job', async c => {
try { try {
const { name, data, options } = await c.req.json(); const { name, data, options } = await c.req.json();
const job = await queueManager.addJob(name, data, options); const job = await queueManager.add(name, data, options);
return c.json({ status: 'success', jobId: job.id }); return c.json({ status: 'success', jobId: job.id });
} catch (error) { } catch (error) {
logger.error('Failed to add job', { error }); logger.error('Failed to add job', { error });

View file

@ -1,87 +0,0 @@
/**
* Test and development routes for batch processing
*/
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { queueManager } from '../services/queue.service';
const logger = getLogger('test-routes');
export const testRoutes = new Hono();
// Test endpoint for new functional batch processing
testRoutes.post('/api/test/batch-symbols', async c => {
try {
const { symbols, useBatching = false, totalDelayHours = 1 } = await c.req.json();
const { processItems } = await import('../utils/batch-helpers');
if (!symbols || !Array.isArray(symbols)) {
return c.json({ status: 'error', message: 'symbols array is required' }, 400);
}
const result = await processItems(
symbols,
(symbol, index) => ({
symbol,
index,
timestamp: new Date().toISOString(),
}),
queueManager,
{
totalDelayHours,
useBatching,
batchSize: 10,
priority: 1,
provider: 'test-provider',
operation: 'live-data',
}
);
return c.json({
status: 'success',
message: 'Batch processing started',
result,
});
} catch (error) {
logger.error('Failed to start batch symbol processing', { error });
return c.json({ status: 'error', message: 'Failed to start batch processing' }, 500);
}
});
testRoutes.post('/api/test/batch-custom', async c => {
try {
const { items, useBatching = false, totalDelayHours = 0.5 } = await c.req.json();
const { processItems } = await import('../utils/batch-helpers');
if (!items || !Array.isArray(items)) {
return c.json({ status: 'error', message: 'items array is required' }, 400);
}
const result = await processItems(
items,
(item, index) => ({
originalItem: item,
processIndex: index,
timestamp: new Date().toISOString(),
}),
queueManager,
{
totalDelayHours,
useBatching,
batchSize: 5,
priority: 1,
provider: 'test-provider',
operation: 'custom-test',
}
);
return c.json({
status: 'success',
message: 'Custom batch processing started',
result,
});
} catch (error) {
logger.error('Failed to start custom batch processing', { error });
return c.json({ status: 'error', message: 'Failed to start custom batch processing' }, 500);
}
});

View file

@ -1,184 +1,57 @@
/** /**
* Data Service Queue Manager * Data Service Queue Manager
* Uses the new @stock-bot/queue library with provider registry * Uses the enhanced @stock-bot/queue library with provider registry
*/ */
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import type { JobData } from '@stock-bot/queue'; import type { QueueConfig } from '@stock-bot/queue';
import { initializeBatchCache, providerRegistry, QueueManager } from '@stock-bot/queue'; import { processItems, QueueManager } from '@stock-bot/queue';
const logger = getLogger('queue-manager-service'); const logger = getLogger('queue-manager-service');
class DataServiceQueueManager { /**
private queueManager: QueueManager; * Create and configure the enhanced queue manager for data service
private isInitialized = false; */
function createDataServiceQueueManager(): QueueManager {
constructor() { const config: QueueConfig = {
this.queueManager = new QueueManager({ queueName: 'data-service-queue',
queueName: 'data-service-queue', workers: parseInt(process.env.WORKER_COUNT || '5'),
workers: parseInt(process.env.WORKER_COUNT || '5'), concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'),
concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'), redis: {
redis: { host: process.env.DRAGONFLY_HOST || 'localhost',
host: process.env.DRAGONFLY_HOST || 'localhost', port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
port: parseInt(process.env.DRAGONFLY_PORT || '6379'), },
providers: [
// Import and initialize providers lazily
async () => {
const { initializeIBProvider } = await import('../providers/ib.provider');
return initializeIBProvider();
}, },
}); async () => {
} const { initializeProxyProvider } = await import('../providers/proxy.provider');
return initializeProxyProvider();
},
],
enableScheduledJobs: true,
};
async initialize() { return new QueueManager(config);
if (this.isInitialized) {
logger.warn('Queue manager already initialized');
return;
}
logger.info('Initializing data service queue manager...');
try {
// Register all providers
await this.registerProviders();
// Initialize the queue manager
await this.queueManager.initialize();
// Initialize batch cache
await initializeBatchCache(this.queueManager);
// Set up scheduled jobs
await this.setupScheduledJobs();
this.isInitialized = true;
logger.info('Data service queue manager initialized successfully');
} catch (error) {
logger.error('Failed to initialize queue manager', error);
throw error;
}
}
private async registerProviders() {
logger.info('Registering queue providers...');
// Initialize providers using the new provider registry system
const { initializeIBProvider } = await import('../providers/ib.provider');
const { initializeProxyProvider } = await import('../providers/proxy.provider');
// Register providers with scheduled jobs
initializeIBProvider();
initializeProxyProvider();
// Now register all providers from the registry with the queue manager
const allProviders = providerRegistry.getAllProviders();
for (const [providerName, config] of allProviders) {
this.queueManager.registerProvider(providerName, config.operations);
logger.info(`Registered provider: ${providerName}`);
}
// Log scheduled jobs
const scheduledJobs = providerRegistry.getAllScheduledJobs();
logger.info(`Registered ${scheduledJobs.length} scheduled jobs across all providers`);
for (const { provider, job } of scheduledJobs) {
logger.info(
`Scheduled job: ${provider}.${job.type} - ${job.description} (${job.cronPattern})`
);
}
logger.info('All providers registered successfully');
}
private async setupScheduledJobs() {
const scheduledJobs = providerRegistry.getAllScheduledJobs();
if (scheduledJobs.length === 0) {
logger.info('No scheduled jobs found');
return;
}
logger.info(`Setting up ${scheduledJobs.length} scheduled jobs...`);
for (const { provider, job } of scheduledJobs) {
try {
const jobData: JobData = {
type: job.type,
provider,
operation: job.operation,
payload: job.payload,
priority: job.priority,
};
await this.queueManager.add(`recurring-${provider}-${job.operation}`, jobData, {
repeat: {
pattern: job.cronPattern,
tz: 'UTC',
immediately: job.immediately || false,
},
removeOnComplete: 1,
removeOnFail: 1,
attempts: 2,
backoff: {
type: 'fixed',
delay: 5000,
},
});
logger.info(`Scheduled job registered: ${provider}.${job.type} (${job.cronPattern})`);
} catch (error) {
logger.error(`Failed to register scheduled job: ${provider}.${job.type}`, { error });
}
}
logger.info('Scheduled jobs setup complete');
}
async addJob(name: string, data: JobData, options?: Record<string, unknown>) {
if (!this.isInitialized) {
throw new Error('Queue manager not initialized');
}
return this.queueManager.add(name, data, options);
}
async addBulk(jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>) {
if (!this.isInitialized) {
throw new Error('Queue manager not initialized');
}
return this.queueManager.addBulk(jobs);
}
async getStats() {
if (!this.isInitialized) {
throw new Error('Queue manager not initialized');
}
return this.queueManager.getStats();
}
async clean(grace: number) {
if (!this.isInitialized) {
throw new Error('Queue manager not initialized');
}
return this.queueManager.clean(grace);
}
async shutdown() {
if (!this.isInitialized) {
return;
}
logger.info('Shutting down queue manager...');
await this.queueManager.shutdown();
this.isInitialized = false;
logger.info('Queue manager shutdown complete');
}
// Compatibility methods for existing code
getQueueName() {
return this.queueManager.getQueueName();
}
get queue() {
return this.queueManager;
}
} }
// Export singleton instance // Create singleton instance
export const queueManager = new DataServiceQueueManager(); export const queueManager = createDataServiceQueueManager();
// Export convenience functions that use the queue manager
export async function initializeQueueManager() {
logger.info('Initializing data service queue manager...');
await queueManager.initialize();
logger.info('Data service queue manager initialized successfully');
}
export async function shutdownQueueManager() {
logger.info('Shutting down data service queue manager...');
await queueManager.shutdown();
logger.info('Data service queue manager shutdown complete');
}
// Export processItems for direct use
export { processItems };

View file

@ -17,6 +17,7 @@ export type {
ProcessOptions, ProcessOptions,
ProviderConfig, ProviderConfig,
ProviderConfigWithSchedule, ProviderConfigWithSchedule,
ProviderInitializer,
QueueConfig, QueueConfig,
ScheduledJob, ScheduledJob,
} from './types'; } from './types';

View file

@ -2,7 +2,7 @@ import { Queue, QueueEvents, Worker, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import { processBatchJob } from './batch-processor'; import { processBatchJob } from './batch-processor';
import { providerRegistry } from './provider-registry'; import { providerRegistry } from './provider-registry';
import type { JobData, ProviderConfig, QueueConfig } from './types'; import type { JobData, ProviderConfig, ProviderInitializer, QueueConfig } from './types';
const logger = getLogger('queue-manager'); const logger = getLogger('queue-manager');
@ -11,6 +11,8 @@ export class QueueManager {
private workers: Worker[] = []; private workers: Worker[] = [];
private queueEvents!: QueueEvents; private queueEvents!: QueueEvents;
private config: Required<QueueConfig>; private config: Required<QueueConfig>;
private providers: ProviderInitializer[];
private enableScheduledJobs: boolean;
private get isInitialized() { private get isInitialized() {
return !!this.queue; return !!this.queue;
@ -24,6 +26,10 @@ export class QueueManager {
} }
constructor(config: QueueConfig = {}) { constructor(config: QueueConfig = {}) {
// Enhanced configuration
this.providers = config.providers || [];
this.enableScheduledJobs = config.enableScheduledJobs ?? true;
// Set default configuration // Set default configuration
this.config = { this.config = {
workers: config.workers || parseInt(process.env.WORKER_COUNT || '5'), workers: config.workers || parseInt(process.env.WORKER_COUNT || '5'),
@ -45,11 +51,13 @@ export class QueueManager {
}, },
...config.defaultJobOptions, ...config.defaultJobOptions,
}, },
providers: this.providers,
enableScheduledJobs: this.enableScheduledJobs,
}; };
} }
/** /**
* Initialize the queue manager * Initialize the queue manager with enhanced provider and scheduled job support
*/ */
async initialize(): Promise<void> { async initialize(): Promise<void> {
if (this.isInitialized) { if (this.isInitialized) {
@ -57,13 +65,19 @@ export class QueueManager {
return; return;
} }
logger.info('Initializing queue manager...', { logger.info('Initializing enhanced queue manager...', {
queueName: this.config.queueName, queueName: this.config.queueName,
workers: this.config.workers, workers: this.config.workers,
concurrency: this.config.concurrency, concurrency: this.config.concurrency,
providers: this.providers.length,
enableScheduledJobs: this.enableScheduledJobs,
}); });
try { try {
// Step 1: Register all providers
await this.registerProviders();
// Step 2: Initialize core queue infrastructure
const connection = this.getConnection(); const connection = this.getConnection();
const queueName = `{${this.config.queueName}}`; const queueName = `{${this.config.queueName}}`;
@ -76,19 +90,110 @@ export class QueueManager {
// Initialize queue events // Initialize queue events
this.queueEvents = new QueueEvents(queueName, { connection }); this.queueEvents = new QueueEvents(queueName, { connection });
// Start workers // Step 3: Start workers
await this.startWorkers(); await this.startWorkers();
// Setup event listeners // Step 4: Setup event listeners
this.setupEventListeners(); this.setupEventListeners();
logger.info('Queue manager initialized successfully'); // Step 5: Initialize batch cache
const { initializeBatchCache } = await import('./batch-processor');
await initializeBatchCache(this);
// Step 6: Set up scheduled jobs
if (this.enableScheduledJobs) {
await this.setupScheduledJobs();
}
logger.info('Enhanced queue manager initialized successfully');
} catch (error) { } catch (error) {
logger.error('Failed to initialize queue manager', { error }); logger.error('Failed to initialize enhanced queue manager', { error });
throw error; throw error;
} }
} }
/**
* Register all configured providers
*/
private async registerProviders(): Promise<void> {
logger.info('Registering queue providers...', { count: this.providers.length });
// Initialize providers using the configured provider initializers
for (const providerInitializer of this.providers) {
try {
await providerInitializer();
} catch (error) {
logger.error('Failed to initialize provider', { error });
throw error;
}
}
// Now register all providers from the registry with the queue manager
const allProviders = providerRegistry.getAllProviders();
for (const [providerName, config] of allProviders) {
this.registerProvider(providerName, config.operations);
logger.info(`Registered provider: ${providerName}`);
}
// Log scheduled jobs
const scheduledJobs = providerRegistry.getAllScheduledJobs();
logger.info(`Registered ${scheduledJobs.length} scheduled jobs across all providers`);
for (const { provider, job } of scheduledJobs) {
logger.info(
`Scheduled job: ${provider}.${job.type} - ${job.description} (${job.cronPattern})`
);
}
logger.info('All providers registered successfully');
}
/**
* Set up scheduled jobs from provider registry
*/
private async setupScheduledJobs(): Promise<void> {
const scheduledJobs = providerRegistry.getAllScheduledJobs();
if (scheduledJobs.length === 0) {
logger.info('No scheduled jobs found');
return;
}
logger.info(`Setting up ${scheduledJobs.length} scheduled jobs...`);
for (const { provider, job } of scheduledJobs) {
try {
const jobData: JobData = {
type: job.type,
provider,
operation: job.operation,
payload: job.payload,
priority: job.priority,
};
await this.add(`recurring-${provider}-${job.operation}`, jobData, {
repeat: {
pattern: job.cronPattern,
tz: 'UTC',
immediately: job.immediately || false,
},
removeOnComplete: 1,
removeOnFail: 1,
attempts: 2,
backoff: {
type: 'fixed',
delay: 5000,
},
});
logger.info(`Scheduled job registered: ${provider}.${job.type} (${job.cronPattern})`);
} catch (error) {
logger.error(`Failed to register scheduled job: ${provider}.${job.type}`, { error });
}
}
logger.info('Scheduled jobs setup complete');
}
/** /**
* Register a provider with its operations * Register a provider with its operations
*/ */
@ -99,7 +204,7 @@ export class QueueManager {
/** /**
* Add a single job to the queue * Add a single job to the queue
*/ */
async add(name: string, data: JobData, options: any = {}): Promise<Job> { async add(name: string, data: JobData, options: Record<string, unknown> = {}): Promise<Job> {
this.ensureInitialized(); this.ensureInitialized();
return await this.queue.add(name, data, options); return await this.queue.add(name, data, options);
} }
@ -107,7 +212,9 @@ export class QueueManager {
/** /**
* Add multiple jobs to the queue in bulk * Add multiple jobs to the queue in bulk
*/ */
async addBulk(jobs: Array<{ name: string; data: JobData; opts?: any }>): Promise<Job[]> { async addBulk(
jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>
): Promise<Job[]> {
this.ensureInitialized(); this.ensureInitialized();
return await this.queue.addBulk(jobs); return await this.queue.addBulk(jobs);
} }

View file

@ -50,6 +50,8 @@ export interface QueueConfig {
delay: number; delay: number;
}; };
}; };
providers?: ProviderInitializer[];
enableScheduledJobs?: boolean;
} }
export interface JobHandler { export interface JobHandler {
@ -82,3 +84,7 @@ export interface BatchJobData {
totalBatches: number; totalBatches: number;
itemCount: number; itemCount: number;
} }
export interface ProviderInitializer {
(): void | Promise<void>;
}