From e5170b1c787e22c016845379b59e73e99acf7673 Mon Sep 17 00:00:00 2001 From: Boki Date: Sat, 14 Jun 2025 15:28:51 -0400 Subject: [PATCH] switching to generic queue lib --- apps/data-service/package.json | 2 +- apps/data-service/src/index.ts | 30 +- .../data-service/src/providers/ib.provider.ts | 106 +++-- .../src/providers/proxy.provider.ts | 132 ++---- .../src/routes/market-data.routes.ts | 55 ++- apps/data-service/src/routes/queue.routes.ts | 31 +- .../src/services/provider-registry.service.ts | 135 ------ .../src/services/queue-manager.service.ts | 184 ++++++++ .../src/services/queue.service.ts | 419 ------------------ apps/data-service/src/utils/batch-helpers.ts | 364 --------------- apps/data-service/tsconfig.json | 3 +- libs/queue/src/index.ts | 11 + libs/queue/src/provider-registry.ts | 95 +++- libs/queue/src/types.ts | 16 + libs/queue/tsconfig.json | 3 +- 15 files changed, 500 insertions(+), 1086 deletions(-) delete mode 100644 apps/data-service/src/services/provider-registry.service.ts create mode 100644 apps/data-service/src/services/queue-manager.service.ts delete mode 100644 apps/data-service/src/services/queue.service.ts delete mode 100644 apps/data-service/src/utils/batch-helpers.ts diff --git a/apps/data-service/package.json b/apps/data-service/package.json index cf40b9d..7ce4e6d 100644 --- a/apps/data-service/package.json +++ b/apps/data-service/package.json @@ -18,10 +18,10 @@ "@stock-bot/http": "*", "@stock-bot/logger": "*", "@stock-bot/mongodb-client": "*", + "@stock-bot/queue": "*", "@stock-bot/questdb-client": "*", "@stock-bot/shutdown": "*", "@stock-bot/types": "*", - "bullmq": "^5.53.2", "hono": "^4.0.0", "p-limit": "^6.2.0", "ws": "^8.0.0" diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index 8700374..b08f31e 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -9,8 +9,7 @@ import { connectMongoDB, disconnectMongoDB } from '@stock-bot/mongodb-client'; import { Shutdown } from '@stock-bot/shutdown'; import { initializeIBResources } from './providers/ib.tasks'; import { initializeProxyResources } from './providers/proxy.tasks'; -import { queueManager } from './services/queue.service'; -import { initializeBatchCache } from './utils/batch-helpers'; +import { queueManager } from './services/queue-manager.service'; import { healthRoutes, marketDataRoutes, proxyRoutes, queueRoutes, testRoutes } from './routes'; // Load environment variables @@ -46,25 +45,20 @@ async function initializeServices() { await Browser.initialize(); logger.info('Browser resources initialized'); - // Initialize batch cache FIRST - before queue service - logger.info('Starting batch cache initialization...'); - await initializeBatchCache(); - logger.info('Batch cache initialized'); + // Initialize proxy resources + logger.info('Starting proxy resources initialization...'); + await initializeProxyResources(); + logger.info('Proxy resources initialized'); - // Initialize proxy cache - before queue service - logger.info('Starting proxy cache initialization...'); - await initializeProxyResources(true); // Wait for cache during startup - logger.info('Proxy cache initialized'); + // Initialize IB resources + logger.info('Starting IB resources initialization...'); + await initializeIBResources(); + logger.info('IB resources initialized'); - // Initialize proxy cache - before queue service - logger.info('Starting proxy cache initialization...'); - await initializeIBResources(true); // Wait for cache during startup - logger.info('Proxy cache initialized'); - - // Initialize queue service (Redis connections should be ready now) - logger.info('Starting queue service initialization...'); + // Initialize queue manager (includes batch cache initialization) + logger.info('Starting queue manager initialization...'); await queueManager.initialize(); - logger.info('Queue service initialized'); + logger.info('Queue manager initialized'); logger.info('All services initialized successfully'); } catch (error) { diff --git a/apps/data-service/src/providers/ib.provider.ts b/apps/data-service/src/providers/ib.provider.ts index 3b1edd2..3225935 100644 --- a/apps/data-service/src/providers/ib.provider.ts +++ b/apps/data-service/src/providers/ib.provider.ts @@ -1,42 +1,82 @@ +/** + * Interactive Brokers Provider for new queue system + */ import { getLogger } from '@stock-bot/logger'; -import { ProviderConfig } from '../services/provider-registry.service'; +import type { ProviderConfigWithSchedule } from '@stock-bot/queue'; +import { providerRegistry } from '@stock-bot/queue'; const logger = getLogger('ib-provider'); -export const ibProvider: ProviderConfig = { - name: 'ib', - operations: { - 'ib-exchanges-and-symbols': async () => { - const { ibTasks } = await import('./ib.tasks'); - logger.info('Fetching symbol summary from IB'); - const sessionHeaders = await ibTasks.fetchSession(); - logger.info('Fetched symbol summary from IB'); +// Initialize and register the IB provider +export function initializeIBProvider() { + logger.info('Registering IB provider with scheduled jobs...'); - if (sessionHeaders) { - logger.info('Fetching exchanges from IB'); - const exchanges = await ibTasks.fetchExchanges(sessionHeaders); - logger.info('Fetched exchanges from IB', { count: exchanges.lenght }); + const ibProviderConfig: ProviderConfigWithSchedule = { + name: 'ib', + operations: { + 'fetch-session': async _payload => { + // payload contains session configuration (not used in current implementation) + logger.debug('Processing session fetch request'); + const { fetchSession } = await import('./ib.tasks'); + return fetchSession(); + }, - // do the same as above but for symbols - logger.info('Fetching symbols from IB'); - const symbols = await ibTasks.fetchSymbols(sessionHeaders); - logger.info('Fetched symbols from IB', { symbols }); + 'fetch-exchanges': async _payload => { + // payload should contain session headers + logger.debug('Processing exchanges fetch request'); + const { fetchSession, fetchExchanges } = await import('./ib.tasks'); + const sessionHeaders = await fetchSession(); + if (sessionHeaders) { + return fetchExchanges(sessionHeaders); + } + throw new Error('Failed to get session headers'); + }, - return { exchangesCount: exchanges?.length, symbolsCount: symbols?.length }; - } + 'fetch-symbols': async _payload => { + // payload should contain session headers + logger.debug('Processing symbols fetch request'); + const { fetchSession, fetchSymbols } = await import('./ib.tasks'); + const sessionHeaders = await fetchSession(); + if (sessionHeaders) { + return fetchSymbols(sessionHeaders); + } + throw new Error('Failed to get session headers'); + }, + + 'ib-exchanges-and-symbols': async _payload => { + // Legacy operation for scheduled jobs + logger.info('Fetching symbol summary from IB'); + const { fetchSession, fetchExchanges, fetchSymbols } = await import('./ib.tasks'); + + const sessionHeaders = await fetchSession(); + logger.info('Fetched symbol summary from IB'); + + if (sessionHeaders) { + logger.info('Fetching exchanges from IB'); + const exchanges = await fetchExchanges(sessionHeaders); + logger.info('Fetched exchanges from IB', { count: exchanges?.length }); + + logger.info('Fetching symbols from IB'); + const symbols = await fetchSymbols(sessionHeaders); + logger.info('Fetched symbols from IB', { symbols }); + + return { exchangesCount: exchanges?.length, symbolsCount: symbols?.length }; + } + }, }, - }, + scheduledJobs: [ + { + type: 'ib-exchanges-and-symbols', + operation: 'ib-exchanges-and-symbols', + payload: {}, + cronPattern: '0 0 * * 0', // Every Sunday at midnight + priority: 5, + description: 'Fetch and update IB exchanges and symbols data', + // immediately: true, // Don't run immediately during startup to avoid conflicts + }, + ], + }; - scheduledJobs: [ - { - type: 'ib-exchanges-and-symbols', - operation: 'ib-exchanges-and-symbols', - payload: {}, - // should remove and just run at the same time so app restarts dont keeping adding same jobs - cronPattern: '0 0 * * 0', - priority: 5, - // immediately: true, // Don't run immediately during startup to avoid conflicts - description: 'Fetch and validate proxy list from sources', - }, - ], -}; + providerRegistry.registerWithSchedule(ibProviderConfig); + logger.info('IB provider registered successfully with scheduled jobs'); +} diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index 0cb492d..3432b1c 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -1,98 +1,48 @@ -import { ProxyInfo } from 'libs/http/src/types'; +/** + * Proxy Provider for new queue system + */ +import { ProxyInfo } from '@stock-bot/http'; import { getLogger } from '@stock-bot/logger'; -import { ProviderConfig } from '../services/provider-registry.service'; +import type { ProviderConfigWithSchedule } from '@stock-bot/queue'; +import { providerRegistry } from '@stock-bot/queue'; -// Create logger for this provider const logger = getLogger('proxy-provider'); -// This will run at the same time each day as when the app started -const getEvery24HourCron = (): string => { - const now = new Date(); - const hours = now.getHours(); - const minutes = now.getMinutes(); - return `${minutes} ${hours} * * *`; // Every day at startup time -}; +// Initialize and register the Proxy provider +export function initializeProxyProvider() { + logger.info('Registering proxy provider with scheduled jobs...'); -export const proxyProvider: ProviderConfig = { - name: 'proxy-provider', - operations: { - 'fetch-and-check': async (_payload: { sources?: string[] }) => { - const { proxyService } = await import('./proxy.tasks'); - const { queueManager } = await import('../services/queue.service'); - const { processItems } = await import('../utils/batch-helpers'); - - const proxies = await proxyService.fetchProxiesFromSources(); - - if (proxies.length === 0) { - return { proxiesFetched: 0, jobsCreated: 0 }; - } - - // Use generic function with routing parameters - const result = await processItems( - proxies, - (proxy, index) => ({ - proxy, - index, - source: 'batch-processing', - }), - queueManager, - { - totalDelayHours: 12, //parseFloat(process.env.PROXY_VALIDATION_HOURS || '1'), - batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'), - useBatching: process.env.PROXY_DIRECT_MODE !== 'true', - provider: 'proxy-provider', - operation: 'check-proxy', - } - ); - return result; - }, - 'process-batch-items': async (payload: any) => { - // Process a batch using the simplified batch helpers - const { processBatchJob } = await import('../utils/batch-helpers'); - const { queueManager } = await import('../services/queue.service'); - - return await processBatchJob(payload, queueManager); - }, - - 'check-proxy': async (payload: { - proxy: ProxyInfo; - source?: string; - batchIndex?: number; - itemIndex?: number; - total?: number; - }) => { - const { checkProxy } = await import('./proxy.tasks'); - - try { - const result = await checkProxy(payload.proxy); - - logger.debug('Proxy validated', { - proxy: `${payload.proxy.host}:${payload.proxy.port}`, - isWorking: result.isWorking, - responseTime: result.responseTime, + const proxyProviderConfig: ProviderConfigWithSchedule = { + name: 'proxy', + operations: { + 'check-proxy': async (payload: ProxyInfo) => { + // payload is now the raw proxy info object + logger.debug('Processing proxy check request', { + proxy: `${payload.host}:${payload.port}`, }); - - return { result, proxy: payload.proxy }; - } catch (error) { - logger.warn('Proxy validation failed', { - proxy: `${payload.proxy.host}:${payload.proxy.port}`, - error: error instanceof Error ? error.message : String(error), - }); - - return { result: { isWorking: false, error: String(error) }, proxy: payload.proxy }; - } + const { checkProxy } = await import('./proxy.tasks'); + return checkProxy(payload); + }, + 'fetch-from-sources': async _payload => { + // Fetch proxies from all configured sources + logger.info('Processing fetch proxies from sources request'); + const { fetchProxiesFromSources } = await import('./proxy.tasks'); + return fetchProxiesFromSources(); + }, }, - }, - scheduledJobs: [ - // { - // type: 'proxy-maintenance', - // operation: 'fetch-and-check', - // payload: {}, - // // should remove and just run at the same time so app restarts dont keeping adding same jobs - // cronPattern: getEvery24HourCron(), - // priority: 5, - // immediately: true, // Don't run immediately during startup to avoid conflicts - // description: 'Fetch and validate proxy list from sources', - // }, - ], -}; + scheduledJobs: [ + { + type: 'proxy-fetch-and-check', + operation: 'fetch-from-sources', + payload: {}, + cronPattern: '0 */2 * * *', // Every 2 hours + priority: 5, + description: 'Fetch and validate proxy list from sources', + // immediately: true, // Don't run immediately during startup to avoid conflicts + }, + ], + }; + + providerRegistry.registerWithSchedule(proxyProviderConfig); + logger.info('Proxy provider registered successfully with scheduled jobs'); +} diff --git a/apps/data-service/src/routes/market-data.routes.ts b/apps/data-service/src/routes/market-data.routes.ts index 490d1b9..2b34509 100644 --- a/apps/data-service/src/routes/market-data.routes.ts +++ b/apps/data-service/src/routes/market-data.routes.ts @@ -3,7 +3,7 @@ */ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { queueManager } from '../services/queue.service'; +import { queueManager } from '../services/queue-manager.service'; const logger = getLogger('market-data-routes'); @@ -16,9 +16,8 @@ marketDataRoutes.get('/api/live/:symbol', async c => { try { // Queue job for live data using Yahoo provider - const job = await queueManager.addJob({ + const job = await queueManager.addJob('market-data-live', { type: 'market-data-live', - service: 'market-data', provider: 'yahoo-finance', operation: 'live-data', payload: { symbol }, @@ -47,9 +46,8 @@ marketDataRoutes.get('/api/historical/:symbol', async c => { const toDate = to ? new Date(to) : new Date(); // Now // Queue job for historical data using Yahoo provider - const job = await queueManager.addJob({ + const job = await queueManager.addJob('market-data-historical', { type: 'market-data-historical', - service: 'market-data', provider: 'yahoo-finance', operation: 'historical-data', payload: { @@ -72,3 +70,50 @@ marketDataRoutes.get('/api/historical/:symbol', async c => { return c.json({ status: 'error', message: 'Failed to queue historical data job' }, 500); } }); + +// Batch processing endpoint using new queue system +marketDataRoutes.post('/api/process-symbols', async c => { + try { + const { + symbols, + provider = 'ib', + operation = 'fetch-session', + useBatching = true, + totalDelayMs = 30000, + batchSize = 10, + } = await c.req.json(); + + if (!symbols || !Array.isArray(symbols) || symbols.length === 0) { + return c.json({ status: 'error', message: 'Invalid symbols array' }, 400); + } + + logger.info('Batch processing symbols', { + count: symbols.length, + provider, + operation, + useBatching, + }); + + const result = await queueManager.processSymbols(symbols, { + totalDelayMs, + useBatching, + batchSize, + priority: 2, + provider, + operation, + retries: 2, + removeOnComplete: 5, + removeOnFail: 10, + }); + + return c.json({ + status: 'success', + message: 'Batch processing initiated', + result, + symbols: symbols.length, + }); + } catch (error) { + logger.error('Failed to process symbols batch', { error }); + return c.json({ status: 'error', message: 'Failed to process symbols batch' }, 500); + } +}); diff --git a/apps/data-service/src/routes/queue.routes.ts b/apps/data-service/src/routes/queue.routes.ts index a39ea45..3c0422e 100644 --- a/apps/data-service/src/routes/queue.routes.ts +++ b/apps/data-service/src/routes/queue.routes.ts @@ -3,7 +3,7 @@ */ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { queueManager } from '../services/queue.service'; +import { queueManager } from '../services/queue-manager.service'; const logger = getLogger('queue-routes'); @@ -12,8 +12,8 @@ export const queueRoutes = new Hono(); // Queue management endpoints queueRoutes.get('/api/queue/status', async c => { try { - const status = await queueManager.getQueueStatus(); - return c.json({ status: 'success', data: status }); + const stats = await queueManager.getStats(); + return c.json({ status: 'success', data: stats }); } catch (error) { logger.error('Failed to get queue status', { error }); return c.json({ status: 'error', message: 'Failed to get queue status' }, 500); @@ -22,8 +22,8 @@ queueRoutes.get('/api/queue/status', async c => { queueRoutes.post('/api/queue/job', async c => { try { - const jobData = await c.req.json(); - const job = await queueManager.addJob(jobData); + const { name, data, options } = await c.req.json(); + const job = await queueManager.addJob(name, data, options); return c.json({ status: 'success', jobId: job.id }); } catch (error) { logger.error('Failed to add job', { error }); @@ -34,9 +34,9 @@ queueRoutes.post('/api/queue/job', async c => { // Provider registry endpoints queueRoutes.get('/api/providers', async c => { try { - const { providerRegistry } = await import('../services/provider-registry.service'); - const providers = providerRegistry.getProviders(); - return c.json({ status: 'success', providers }); + const { providerRegistry } = await import('@stock-bot/queue'); + const configs = providerRegistry.getProviderConfigs(); + return c.json({ status: 'success', providers: configs }); } catch (error) { logger.error('Failed to get providers', { error }); return c.json({ status: 'error', message: 'Failed to get providers' }, 500); @@ -46,7 +46,7 @@ queueRoutes.get('/api/providers', async c => { // Add new endpoint to see scheduled jobs queueRoutes.get('/api/scheduled-jobs', async c => { try { - const { providerRegistry } = await import('../services/provider-registry.service'); + const { providerRegistry } = await import('@stock-bot/queue'); const jobs = providerRegistry.getAllScheduledJobs(); return c.json({ status: 'success', @@ -59,13 +59,14 @@ queueRoutes.get('/api/scheduled-jobs', async c => { } }); -queueRoutes.post('/api/queue/drain', async c => { +queueRoutes.post('/api/queue/clean', async c => { try { - await queueManager.drainQueue(); - const status = await queueManager.getQueueStatus(); - return c.json({ status: 'success', message: 'Queue drained', queueStatus: status }); + const { grace = 60000 } = await c.req.json(); // Default 1 minute + await queueManager.clean(grace); + const stats = await queueManager.getStats(); + return c.json({ status: 'success', message: 'Queue cleaned', queueStats: stats }); } catch (error) { - logger.error('Failed to drain queue', { error }); - return c.json({ status: 'error', message: 'Failed to drain queue' }, 500); + logger.error('Failed to clean queue', { error }); + return c.json({ status: 'error', message: 'Failed to clean queue' }, 500); } }); diff --git a/apps/data-service/src/services/provider-registry.service.ts b/apps/data-service/src/services/provider-registry.service.ts deleted file mode 100644 index d485d97..0000000 --- a/apps/data-service/src/services/provider-registry.service.ts +++ /dev/null @@ -1,135 +0,0 @@ -import { getLogger } from '@stock-bot/logger'; - -export interface JobHandler { - (payload: any): Promise; -} - -export interface JobData { - type?: string; - provider: string; - operation: string; - payload: any; - priority?: number; - immediately?: boolean; -} - -export interface ScheduledJob { - type: string; - operation: string; - payload: any; - cronPattern: string; - priority?: number; - description?: string; - immediately?: boolean; -} - -export interface ProviderConfig { - name: string; - operations: Record; - scheduledJobs?: ScheduledJob[]; -} - -export interface ProviderRegistry { - registerProvider: (config: ProviderConfig) => void; - getHandler: (provider: string, operation: string) => JobHandler | null; - getAllScheduledJobs: () => Array<{ provider: string; job: ScheduledJob }>; - getProviders: () => Array<{ key: string; config: ProviderConfig }>; - hasProvider: (provider: string) => boolean; - clear: () => void; -} - -/** - * Create a new provider registry instance - */ -export function createProviderRegistry(): ProviderRegistry { - const logger = getLogger('provider-registry'); - const providers = new Map(); - - /** - * Register a provider with its operations - */ - function registerProvider(config: ProviderConfig): void { - providers.set(config.name, config); - logger.info(`Registered provider: ${config.name}`, { - operations: Object.keys(config.operations), - scheduledJobs: config.scheduledJobs?.length || 0, - }); - } - - /** - * Get a job handler for a specific provider and operation - */ - function getHandler(provider: string, operation: string): JobHandler | null { - const providerConfig = providers.get(provider); - - if (!providerConfig) { - logger.warn(`Provider not found: ${provider}`); - return null; - } - - const handler = providerConfig.operations[operation]; - if (!handler) { - logger.warn(`Operation not found: ${operation} in provider ${provider}`); - return null; - } - - return handler; - } - - /** - * Get all scheduled jobs from all providers - */ - function getAllScheduledJobs(): Array<{ provider: string; job: ScheduledJob }> { - const allJobs: Array<{ provider: string; job: ScheduledJob }> = []; - - for (const [, config] of providers) { - if (config.scheduledJobs) { - for (const job of config.scheduledJobs) { - allJobs.push({ - provider: config.name, - job, - }); - } - } - } - - return allJobs; - } - - /** - * Get all registered providers with their configurations - */ - function getProviders(): Array<{ key: string; config: ProviderConfig }> { - return Array.from(providers.entries()).map(([key, config]) => ({ - key, - config, - })); - } - - /** - * Check if a provider exists - */ - function hasProvider(provider: string): boolean { - return providers.has(provider); - } - - /** - * Clear all providers (useful for testing) - */ - function clear(): void { - providers.clear(); - logger.info('All providers cleared'); - } - - return { - registerProvider, - getHandler, - getAllScheduledJobs, - getProviders, - hasProvider, - clear, - }; -} - -// Create the default shared registry instance -export const providerRegistry = createProviderRegistry(); diff --git a/apps/data-service/src/services/queue-manager.service.ts b/apps/data-service/src/services/queue-manager.service.ts new file mode 100644 index 0000000..478c2f0 --- /dev/null +++ b/apps/data-service/src/services/queue-manager.service.ts @@ -0,0 +1,184 @@ +/** + * Data Service Queue Manager + * Uses the new @stock-bot/queue library with provider registry + */ +import { getLogger } from '@stock-bot/logger'; +import type { JobData } from '@stock-bot/queue'; +import { initializeBatchCache, providerRegistry, QueueManager } from '@stock-bot/queue'; + +const logger = getLogger('queue-manager-service'); + +class DataServiceQueueManager { + private queueManager: QueueManager; + private isInitialized = false; + + constructor() { + this.queueManager = new QueueManager({ + queueName: 'data-service-queue', + workers: parseInt(process.env.WORKER_COUNT || '5'), + concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'), + redis: { + host: process.env.DRAGONFLY_HOST || 'localhost', + port: parseInt(process.env.DRAGONFLY_PORT || '6379'), + }, + }); + } + + async initialize() { + if (this.isInitialized) { + logger.warn('Queue manager already initialized'); + return; + } + + logger.info('Initializing data service queue manager...'); + + try { + // Register all providers + await this.registerProviders(); + + // Initialize the queue manager + await this.queueManager.initialize(); + + // Initialize batch cache + await initializeBatchCache(this.queueManager); + + // Set up scheduled jobs + await this.setupScheduledJobs(); + + this.isInitialized = true; + logger.info('Data service queue manager initialized successfully'); + } catch (error) { + logger.error('Failed to initialize queue manager', error); + throw error; + } + } + + private async registerProviders() { + logger.info('Registering queue providers...'); + + // Initialize providers using the new provider registry system + const { initializeIBProvider } = await import('../providers/ib.provider'); + const { initializeProxyProvider } = await import('../providers/proxy.provider'); + + // Register providers with scheduled jobs + initializeIBProvider(); + initializeProxyProvider(); + + // Now register all providers from the registry with the queue manager + const allProviders = providerRegistry.getAllProviders(); + for (const [providerName, config] of allProviders) { + this.queueManager.registerProvider(providerName, config.operations); + logger.info(`Registered provider: ${providerName}`); + } + + // Log scheduled jobs + const scheduledJobs = providerRegistry.getAllScheduledJobs(); + logger.info(`Registered ${scheduledJobs.length} scheduled jobs across all providers`); + for (const { provider, job } of scheduledJobs) { + logger.info( + `Scheduled job: ${provider}.${job.type} - ${job.description} (${job.cronPattern})` + ); + } + + logger.info('All providers registered successfully'); + } + + private async setupScheduledJobs() { + const scheduledJobs = providerRegistry.getAllScheduledJobs(); + + if (scheduledJobs.length === 0) { + logger.info('No scheduled jobs found'); + return; + } + + logger.info(`Setting up ${scheduledJobs.length} scheduled jobs...`); + + for (const { provider, job } of scheduledJobs) { + try { + const jobData: JobData = { + type: job.type, + provider, + operation: job.operation, + payload: job.payload, + priority: job.priority, + }; + + await this.queueManager.add(`recurring-${provider}-${job.operation}`, jobData, { + repeat: { + pattern: job.cronPattern, + tz: 'UTC', + immediately: job.immediately || false, + }, + removeOnComplete: 1, + removeOnFail: 1, + attempts: 2, + backoff: { + type: 'fixed', + delay: 5000, + }, + }); + + logger.info(`Scheduled job registered: ${provider}.${job.type} (${job.cronPattern})`); + } catch (error) { + logger.error(`Failed to register scheduled job: ${provider}.${job.type}`, { error }); + } + } + + logger.info('Scheduled jobs setup complete'); + } + + async addJob(name: string, data: JobData, options?: Record) { + if (!this.isInitialized) { + throw new Error('Queue manager not initialized'); + } + + return this.queueManager.add(name, data, options); + } + + async addBulk(jobs: Array<{ name: string; data: JobData; opts?: Record }>) { + if (!this.isInitialized) { + throw new Error('Queue manager not initialized'); + } + + return this.queueManager.addBulk(jobs); + } + + async getStats() { + if (!this.isInitialized) { + throw new Error('Queue manager not initialized'); + } + + return this.queueManager.getStats(); + } + + async clean(grace: number) { + if (!this.isInitialized) { + throw new Error('Queue manager not initialized'); + } + + return this.queueManager.clean(grace); + } + + async shutdown() { + if (!this.isInitialized) { + return; + } + + logger.info('Shutting down queue manager...'); + await this.queueManager.shutdown(); + this.isInitialized = false; + logger.info('Queue manager shutdown complete'); + } + + // Compatibility methods for existing code + getQueueName() { + return this.queueManager.getQueueName(); + } + + get queue() { + return this.queueManager; + } +} + +// Export singleton instance +export const queueManager = new DataServiceQueueManager(); diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts deleted file mode 100644 index 7f95dc5..0000000 --- a/apps/data-service/src/services/queue.service.ts +++ /dev/null @@ -1,419 +0,0 @@ -import { Queue, QueueEvents, Worker, type Job } from 'bullmq'; -import { getLogger } from '@stock-bot/logger'; -import { providerRegistry, type JobData } from './provider-registry.service'; - -export class QueueService { - private logger = getLogger('queue-service'); - private queue!: Queue; - private workers: Worker[] = []; - private queueEvents!: QueueEvents; - - private config = { - workers: parseInt(process.env.WORKER_COUNT || '5'), - concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'), - redis: { - host: process.env.DRAGONFLY_HOST || 'localhost', - port: parseInt(process.env.DRAGONFLY_PORT || '6379'), - }, - }; - - private get isInitialized() { - return !!this.queue; - } - - constructor() { - // Don't initialize in constructor to allow for proper async initialization - } - async initialize() { - if (this.isInitialized) { - this.logger.warn('Queue service already initialized'); - return; - } - - this.logger.info('Initializing queue service...'); - - try { - // Step 1: Register providers - await this.registerProviders(); - - // Step 2: Setup queue and workers - const connection = this.getConnection(); - const queueName = '{data-service-queue}'; - this.queue = new Queue(queueName, { - connection, - defaultJobOptions: { - removeOnComplete: 10, - removeOnFail: 5, - attempts: 3, - backoff: { - type: 'exponential', - delay: 1000, - }, - }, - }); - - this.queueEvents = new QueueEvents(queueName, { connection }); - - // Step 3: Create workers - const { workerCount, totalConcurrency } = this.createWorkers(queueName, connection); - - // Step 4: Wait for readiness (parallel) - await Promise.all([ - this.queue.waitUntilReady(), - this.queueEvents.waitUntilReady(), - ...this.workers.map(worker => worker.waitUntilReady()), - ]); - - // Step 5: Setup events and scheduled tasks - this.setupQueueEvents(); - await this.setupScheduledTasks(); - - this.logger.info('Queue service initialized successfully', { - workers: workerCount, - totalConcurrency, - }); - } catch (error) { - this.logger.error('Failed to initialize queue service', { error }); - throw error; - } - } - private getConnection() { - return { - ...this.config.redis, - maxRetriesPerRequest: null, - retryDelayOnFailover: 100, - lazyConnect: false, - }; - } - - private createWorkers(queueName: string, connection: any) { - for (let i = 0; i < this.config.workers; i++) { - const worker = new Worker(queueName, this.processJob.bind(this), { - connection: { ...connection }, - concurrency: this.config.concurrency, - maxStalledCount: 1, - stalledInterval: 30000, - }); - - // Setup events inline - worker.on('ready', () => this.logger.info(`Worker ${i + 1} ready`)); - worker.on('error', error => this.logger.error(`Worker ${i + 1} error`, { error })); - - this.workers.push(worker); - } - - return { - workerCount: this.config.workers, - totalConcurrency: this.config.workers * this.config.concurrency, - }; - } - private setupQueueEvents() { - // Add comprehensive logging to see job flow - this.queueEvents.on('added', job => { - this.logger.debug('Job added to queue', { - id: job.jobId, - }); - }); - - this.queueEvents.on('waiting', job => { - this.logger.debug('Job moved to waiting', { - id: job.jobId, - }); - }); - - this.queueEvents.on('active', job => { - this.logger.debug('Job became active', { - id: job.jobId, - }); - }); - - this.queueEvents.on('delayed', job => { - this.logger.debug('Job delayed', { - id: job.jobId, - delay: job.delay, - }); - }); - - this.queueEvents.on('completed', job => { - this.logger.debug('Job completed', { - id: job.jobId, - }); - }); - - this.queueEvents.on('failed', (job, error) => { - this.logger.debug('Job failed', { - id: job.jobId, - error: String(error), - }); - }); - } - private async registerProviders() { - this.logger.info('Registering providers...'); - - try { - // Define providers to register - const providers = [ - { module: '../providers/proxy.provider', export: 'proxyProvider' }, - { module: '../providers/ib.provider', export: 'ibProvider' }, - // { module: '../providers/yahoo.provider', export: 'yahooProvider' }, - ]; - - // Import and register all providers - for (const { module, export: exportName } of providers) { - const providerModule = await import(module); - providerRegistry.registerProvider(providerModule[exportName]); - } - - this.logger.info('All providers registered successfully'); - } catch (error) { - this.logger.error('Failed to register providers', { error }); - throw error; - } - } - private async processJob(job: Job) { - const { provider, operation, payload }: JobData = job.data; - - this.logger.info('Processing job', { - id: job.id, - provider, - operation, - payloadKeys: Object.keys(payload || {}), - }); - try { - let result; - - if (operation === 'process-batch-items') { - // Special handling for batch processing - requires 2 parameters - const { processBatchJob } = await import('../utils/batch-helpers'); - result = await processBatchJob(payload, this); - } else { - // Regular handler lookup - requires 1 parameter - const handler = providerRegistry.getHandler(provider, operation); - - if (!handler) { - throw new Error(`No handler found for ${provider}:${operation}`); - } - - result = await handler(payload); - } - - this.logger.info('Job completed successfully', { - id: job.id, - provider, - operation, - }); - - return result; - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); - this.logger.error('Job failed', { - id: job.id, - provider, - operation, - error: errorMessage, - }); - throw error; - } - } - - async addBulk(jobs: any[]): Promise { - return await this.queue.addBulk(jobs); - } - - private getTotalConcurrency() { - return this.workers.reduce((total, worker) => total + (worker.opts.concurrency || 1), 0); - } - - private async setupScheduledTasks() { - const allScheduledJobs = providerRegistry.getAllScheduledJobs(); - - if (allScheduledJobs.length === 0) { - this.logger.warn('No scheduled jobs found in providers'); - return; - } - - this.logger.info('Setting up scheduled tasks...', { count: allScheduledJobs.length }); - - // Use Promise.allSettled for parallel processing + better error handling - const results = await Promise.allSettled( - allScheduledJobs.map(async ({ provider, job }) => { - await this.addRecurringJob( - { - type: job.type, - provider, - operation: job.operation, - payload: job.payload, - priority: job.priority, - immediately: job.immediately || false, - }, - job.cronPattern - ); - - return { provider, operation: job.operation }; - }) - ); - - // Log results - const successful = results.filter(r => r.status === 'fulfilled'); - const failed = results.filter(r => r.status === 'rejected'); - - if (failed.length > 0) { - failed.forEach((result, index) => { - const { provider, job } = allScheduledJobs[index]; - this.logger.error('Failed to register scheduled job', { - provider, - operation: job.operation, - error: result.reason, - }); - }); - } - - this.logger.info('Scheduled tasks setup complete', { - successful: successful.length, - failed: failed.length, - }); - } - private async addJobInternal(jobData: JobData, options: any = {}) { - if (!this.isInitialized) { - throw new Error('Queue service not initialized'); - } - - const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; - return this.queue.add(jobType, jobData, { - priority: jobData.priority || undefined, - removeOnComplete: 10, - removeOnFail: 5, - ...options, - }); - } - - async addJob(jobData: JobData, options?: any) { - return this.addJobInternal(jobData, options); - } - - async addRecurringJob(jobData: JobData, cronPattern: string, options?: any) { - const jobKey = `recurring-${jobData.provider}-${jobData.operation}`; - - return this.addJobInternal(jobData, { - repeat: { - pattern: cronPattern, - tz: 'UTC', - immediately: jobData.immediately || false, - }, - jobId: jobKey, - removeOnComplete: 1, - removeOnFail: 1, - attempts: 2, - backoff: { - type: 'fixed', - delay: 5000, - }, - ...options, - }); - } - async getJobStats() { - if (!this.isInitialized) { - throw new Error('Queue service not initialized. Call initialize() first.'); - } - const [waiting, active, completed, failed, delayed] = await Promise.all([ - this.queue.getWaiting(), - this.queue.getActive(), - this.queue.getCompleted(), - this.queue.getFailed(), - this.queue.getDelayed(), - ]); - - return { - waiting: waiting.length, - active: active.length, - completed: completed.length, - failed: failed.length, - delayed: delayed.length, - }; - } - async drainQueue() { - if (this.isInitialized) { - await this.queue.drain(); - } - } - async getQueueStatus() { - if (!this.isInitialized) { - throw new Error('Queue service not initialized'); - } - - const stats = await this.getJobStats(); - return { - ...stats, - workers: this.workers.length, - concurrency: this.getTotalConcurrency(), - }; - } - async shutdown() { - if (!this.isInitialized) { - this.logger.warn('Queue service not initialized, nothing to shutdown'); - return; - } - - this.logger.info('Shutting down queue service gracefully...'); - - try { - // Step 1: Stop accepting new jobs and wait for current jobs to finish - this.logger.debug('Closing workers gracefully...'); - const workerClosePromises = this.workers.map(async (worker, index) => { - this.logger.debug(`Closing worker ${index + 1}/${this.workers.length}`); - try { - // Wait for current jobs to finish, then close - await Promise.race([ - worker.close(), - new Promise((_, reject) => - setTimeout(() => reject(new Error(`Worker ${index + 1} close timeout`)), 5000) - ), - ]); - this.logger.debug(`Worker ${index + 1} closed successfully`); - } catch (error) { - this.logger.error(`Failed to close worker ${index + 1}`, { error }); - // Force close if graceful close fails - await worker.close(true); - } - }); - - await Promise.allSettled(workerClosePromises); - this.logger.debug('All workers closed'); - - // Step 2: Close queue and events with timeout protection - this.logger.debug('Closing queue and events...'); - await Promise.allSettled([ - Promise.race([ - this.queue.close(), - new Promise((_, reject) => - setTimeout(() => reject(new Error('Queue close timeout')), 3000) - ), - ]).catch(error => this.logger.error('Queue close error', { error })), - - Promise.race([ - this.queueEvents.close(), - new Promise((_, reject) => - setTimeout(() => reject(new Error('QueueEvents close timeout')), 3000) - ), - ]).catch(error => this.logger.error('QueueEvents close error', { error })), - ]); - - this.logger.info('Queue service shutdown completed successfully'); - } catch (error) { - this.logger.error('Error during queue service shutdown', { error }); - // Force close everything as last resort - try { - await Promise.allSettled([ - ...this.workers.map(worker => worker.close(true)), - this.queue.close(), - this.queueEvents.close(), - ]); - } catch (forceCloseError) { - this.logger.error('Force close also failed', { error: forceCloseError }); - } - throw error; - } - } -} - -export const queueManager = new QueueService(); diff --git a/apps/data-service/src/utils/batch-helpers.ts b/apps/data-service/src/utils/batch-helpers.ts deleted file mode 100644 index d859eaa..0000000 --- a/apps/data-service/src/utils/batch-helpers.ts +++ /dev/null @@ -1,364 +0,0 @@ -import { CacheProvider, createCache } from '@stock-bot/cache'; -import { getLogger } from '@stock-bot/logger'; -import type { QueueService } from '../services/queue.service'; - -const logger = getLogger('batch-helpers'); - -// Simple interfaces -export interface ProcessOptions { - totalDelayHours: number; - batchSize?: number; - priority?: number; - useBatching?: boolean; - retries?: number; - ttl?: number; - removeOnComplete?: number; - removeOnFail?: number; - // Job routing information - provider?: string; - operation?: string; -} - -export interface BatchResult { - jobsCreated: number; - mode: 'direct' | 'batch'; - totalItems: number; - batchesCreated?: number; - duration: number; -} - -// Cache instance for payload storage -let cacheProvider: CacheProvider | null = null; - -function getCache(): CacheProvider { - if (!cacheProvider) { - cacheProvider = createCache({ - keyPrefix: 'batch:', - ttl: 86400, // 24 hours default - enableMetrics: true, - }); - } - return cacheProvider; -} - -/** - * Initialize the batch cache before any batch operations - * This should be called during application startup - */ -export async function initializeBatchCache(): Promise { - logger.info('Initializing batch cache...'); - const cache = getCache(); - await cache.waitForReady(10000); - logger.info('Batch cache initialized successfully'); -} - -/** - * Main function - processes items either directly or in batches - */ -export async function processItems( - items: T[], - processor: (item: T, index: number) => any, - queue: QueueService, - options: ProcessOptions -): Promise { - const startTime = Date.now(); - - if (items.length === 0) { - return { - jobsCreated: 0, - mode: 'direct', - totalItems: 0, - duration: 0, - }; - } - - logger.info('Starting batch processing', { - totalItems: items.length, - mode: options.useBatching ? 'batch' : 'direct', - batchSize: options.batchSize, - totalDelayHours: options.totalDelayHours, - }); - - try { - const result = options.useBatching - ? await processBatched(items, processor, queue, options) - : await processDirect(items, processor, queue, options); - - const duration = Date.now() - startTime; - - logger.info('Batch processing completed', { - ...result, - duration: `${(duration / 1000).toFixed(1)}s`, - }); - - return { ...result, duration }; - } catch (error) { - logger.error('Batch processing failed', error); - throw error; - } -} - -/** - * Process items directly - each item becomes a separate job - */ -async function processDirect( - items: T[], - processor: (item: T, index: number) => any, - queue: QueueService, - options: ProcessOptions -): Promise> { - const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; - const delayPerItem = totalDelayMs / items.length; - - logger.info('Creating direct jobs', { - totalItems: items.length, - delayPerItem: `${(delayPerItem / 1000).toFixed(1)}s`, - }); - - const jobs = items.map((item, index) => ({ - name: 'process-item', - data: { - type: 'process-item', - provider: options.provider || 'generic', - operation: options.operation || 'process-item', - payload: processor(item, index), - priority: options.priority || undefined, - }, - opts: { - delay: index * delayPerItem, - priority: options.priority || undefined, - attempts: options.retries || 3, - removeOnComplete: options.removeOnComplete || 10, - removeOnFail: options.removeOnFail || 5, - }, - })); - - const createdJobs = await addJobsInChunks(queue, jobs); - - return { - totalItems: items.length, - jobsCreated: createdJobs.length, - mode: 'direct', - }; -} - -/** - * Process items in batches - groups of items are stored and processed together - */ -async function processBatched( - items: T[], - processor: (item: T, index: number) => any, - queue: QueueService, - options: ProcessOptions -): Promise> { - const batchSize = options.batchSize || 100; - const batches = createBatches(items, batchSize); - const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; - const delayPerBatch = totalDelayMs / batches.length; - - logger.info('Creating batch jobs', { - totalItems: items.length, - batchSize, - totalBatches: batches.length, - delayPerBatch: `${(delayPerBatch / 1000 / 60).toFixed(2)} minutes`, - }); - - const batchJobs = await Promise.all( - batches.map(async (batch, batchIndex) => { - const payloadKey = await storePayload(batch, processor, options); - - return { - name: 'process-batch', - data: { - type: 'process-batch', - provider: options.provider || 'generic', - operation: 'process-batch-items', - payload: { - payloadKey, - batchIndex, - totalBatches: batches.length, - itemCount: batch.length, - }, - priority: options.priority || undefined, - }, - opts: { - delay: batchIndex * delayPerBatch, - priority: options.priority || undefined, - attempts: options.retries || 3, - removeOnComplete: options.removeOnComplete || 10, - removeOnFail: options.removeOnFail || 5, - }, - }; - }) - ); - - const createdJobs = await addJobsInChunks(queue, batchJobs); - - return { - totalItems: items.length, - jobsCreated: createdJobs.length, - batchesCreated: batches.length, - mode: 'batch', - }; -} - -/** - * Process a batch job - loads payload from cache and creates individual jobs - */ -export async function processBatchJob(jobData: any, queue: QueueService): Promise { - const { payloadKey, batchIndex, totalBatches, itemCount } = jobData; - - logger.debug('Processing batch job', { - batchIndex, - totalBatches, - itemCount, - }); - - try { - const payload = await loadPayload(payloadKey); - if (!payload || !payload.items || !payload.processorStr) { - logger.error('Invalid payload data', { payloadKey, payload }); - throw new Error(`Invalid payload data for key: ${payloadKey}`); - } - - const { items, processorStr, options } = payload; - - // Deserialize the processor function - const processor = new Function('return ' + processorStr)(); - - const jobs = items.map((item: any, index: number) => ({ - name: 'process-item', - data: { - type: 'process-item', - provider: options.provider || 'generic', - operation: options.operation || 'generic', - payload: processor(item, index), - priority: options.priority || undefined, - }, - opts: { - delay: index * (options.delayPerItem || 1000), - priority: options.priority || undefined, - attempts: options.retries || 3, - }, - })); - - const createdJobs = await addJobsInChunks(queue, jobs); - - // Cleanup payload after successful processing - await cleanupPayload(payloadKey); - - return { - batchIndex, - itemsProcessed: items.length, - jobsCreated: createdJobs.length, - }; - } catch (error) { - logger.error('Batch job processing failed', { batchIndex, error }); - throw error; - } -} - -// Helper functions - -function createBatches(items: T[], batchSize: number): T[][] { - const batches: T[][] = []; - for (let i = 0; i < items.length; i += batchSize) { - batches.push(items.slice(i, i + batchSize)); - } - return batches; -} - -async function storePayload( - items: T[], - processor: (item: T, index: number) => any, - options: ProcessOptions -): Promise { - const cache = getCache(); - - // Create more specific key: batch:provider:operation:payload_timestamp_random - const timestamp = Date.now(); - const randomId = Math.random().toString(36).substr(2, 9); - const provider = options.provider || 'generic'; - const operation = options.operation || 'generic'; - - const key = `${provider}:${operation}:payload_${timestamp}_${randomId}`; - - const payload = { - items, - processorStr: processor.toString(), - options: { - delayPerItem: 1000, - priority: options.priority || undefined, - retries: options.retries || 3, - // Store routing information for later use - provider: options.provider || 'generic', - operation: options.operation || 'generic', - }, - createdAt: Date.now(), - }; - - logger.debug('Storing batch payload', { - key, - itemCount: items.length, - }); - - await cache.set(key, payload, options.ttl || 86400); - - logger.debug('Stored batch payload successfully', { - key, - itemCount: items.length, - }); - - return key; -} - -async function loadPayload(key: string): Promise { - const cache = getCache(); - - logger.debug('Loading batch payload', { key }); - - const data = await cache.get(key); - - if (!data) { - logger.error('Payload not found in cache', { key }); - throw new Error(`Payload not found: ${key}`); - } - - logger.debug('Loaded batch payload successfully', { key }); - return data; -} - -async function cleanupPayload(key: string): Promise { - try { - const cache = getCache(); - await cache.del(key); - logger.debug('Cleaned up payload', { key }); - } catch (error) { - logger.warn('Failed to cleanup payload', { key, error }); - } -} - -async function addJobsInChunks(queue: QueueService, jobs: any[], chunkSize = 100): Promise { - const allCreatedJobs = []; - - for (let i = 0; i < jobs.length; i += chunkSize) { - const chunk = jobs.slice(i, i + chunkSize); - try { - const createdJobs = await queue.addBulk(chunk); - allCreatedJobs.push(...createdJobs); - - // Small delay between chunks to avoid overwhelming Redis - if (i + chunkSize < jobs.length) { - await new Promise(resolve => setTimeout(resolve, 100)); - } - } catch (error) { - logger.error('Failed to add job chunk', { - startIndex: i, - chunkSize: chunk.length, - error, - }); - } - } - - return allCreatedJobs; -} diff --git a/apps/data-service/tsconfig.json b/apps/data-service/tsconfig.json index 55a3676..8dc7fd0 100644 --- a/apps/data-service/tsconfig.json +++ b/apps/data-service/tsconfig.json @@ -25,6 +25,7 @@ { "path": "../../libs/event-bus" }, { "path": "../../libs/shutdown" }, { "path": "../../libs/utils" }, - { "path": "../../libs/browser" } + { "path": "../../libs/browser" }, + { "path": "../../libs/queue" } ] } diff --git a/libs/queue/src/index.ts b/libs/queue/src/index.ts index 7078fbe..f10c791 100644 --- a/libs/queue/src/index.ts +++ b/libs/queue/src/index.ts @@ -9,3 +9,14 @@ export { initializeBatchCache, processBatchJob, processItems } from './batch-pro export { QueueManager } from './queue-manager'; export { providerRegistry } from './provider-registry'; + +// Re-export types for convenience +export type { + BatchResult, + JobHandler, + ProcessOptions, + ProviderConfig, + ProviderConfigWithSchedule, + QueueConfig, + ScheduledJob, +} from './types'; diff --git a/libs/queue/src/provider-registry.ts b/libs/queue/src/provider-registry.ts index e36643c..d3ea938 100644 --- a/libs/queue/src/provider-registry.ts +++ b/libs/queue/src/provider-registry.ts @@ -1,13 +1,14 @@ import { getLogger } from '@stock-bot/logger'; -import type { JobHandler, ProviderConfig } from './types'; +import type { JobHandler, ProviderConfig, ProviderConfigWithSchedule, ScheduledJob } from './types'; const logger = getLogger('provider-registry'); class ProviderRegistry { private providers = new Map(); + private providerSchedules = new Map(); /** - * Register a provider with its operations + * Register a provider with its operations (simple config) */ register(providerName: string, config: ProviderConfig): void { logger.info(`Registering provider: ${providerName}`, { @@ -17,6 +18,22 @@ class ProviderRegistry { this.providers.set(providerName, config); } + /** + * Register a provider with operations and scheduled jobs (full config) + */ + registerWithSchedule(config: ProviderConfigWithSchedule): void { + logger.info(`Registering provider with schedule: ${config.name}`, { + operations: Object.keys(config.operations), + scheduledJobs: config.scheduledJobs?.length || 0, + }); + + this.providers.set(config.name, config.operations); + + if (config.scheduledJobs && config.scheduledJobs.length > 0) { + this.providerSchedules.set(config.name, config.scheduledJobs); + } + } + /** * Get a handler for a specific provider and operation */ @@ -38,6 +55,69 @@ class ProviderRegistry { return handler; } + /** + * Get all scheduled jobs from all providers + */ + getAllScheduledJobs(): Array<{ provider: string; job: ScheduledJob }> { + const allJobs: Array<{ provider: string; job: ScheduledJob }> = []; + + for (const [providerName, jobs] of this.providerSchedules) { + for (const job of jobs) { + allJobs.push({ + provider: providerName, + job, + }); + } + } + + return allJobs; + } + + /** + * Get scheduled jobs for a specific provider + */ + getScheduledJobs(provider: string): ScheduledJob[] { + return this.providerSchedules.get(provider) || []; + } + + /** + * Check if a provider has scheduled jobs + */ + hasScheduledJobs(provider: string): boolean { + return this.providerSchedules.has(provider); + } + + /** + * Get all registered providers with their configurations + */ + getProviderConfigs(): Array<{ name: string; operations: string[]; scheduledJobs: number }> { + return Array.from(this.providers.keys()).map(name => ({ + name, + operations: Object.keys(this.providers.get(name) || {}), + scheduledJobs: this.providerSchedules.get(name)?.length || 0, + })); + } + + /** + * Get all providers with their full configurations for queue manager registration + */ + getAllProviders(): Map { + const result = new Map< + string, + { operations: ProviderConfig; scheduledJobs?: ScheduledJob[] } + >(); + + for (const [name, operations] of this.providers) { + const scheduledJobs = this.providerSchedules.get(name); + result.set(name, { + operations, + scheduledJobs, + }); + } + + return result; + } + /** * Get all registered providers */ @@ -72,6 +152,7 @@ class ProviderRegistry { * Remove a provider */ unregister(provider: string): boolean { + this.providerSchedules.delete(provider); return this.providers.delete(provider); } @@ -80,20 +161,28 @@ class ProviderRegistry { */ clear(): void { this.providers.clear(); + this.providerSchedules.clear(); } /** * Get registry statistics */ - getStats(): { providers: number; totalOperations: number } { + getStats(): { providers: number; totalOperations: number; totalScheduledJobs: number } { let totalOperations = 0; + let totalScheduledJobs = 0; + for (const config of this.providers.values()) { totalOperations += Object.keys(config).length; } + for (const jobs of this.providerSchedules.values()) { + totalScheduledJobs += jobs.length; + } + return { providers: this.providers.size, totalOperations, + totalScheduledJobs, }; } } diff --git a/libs/queue/src/types.ts b/libs/queue/src/types.ts index e49cdb3..a973381 100644 --- a/libs/queue/src/types.ts +++ b/libs/queue/src/types.ts @@ -56,10 +56,26 @@ export interface JobHandler { (payload: any): Promise; } +export interface ScheduledJob { + type: string; + operation: string; + payload: any; + cronPattern: string; + priority?: number; + description?: string; + immediately?: boolean; +} + export interface ProviderConfig { [operation: string]: JobHandler; } +export interface ProviderConfigWithSchedule { + name: string; + operations: Record; + scheduledJobs?: ScheduledJob[]; +} + export interface BatchJobData { payloadKey: string; batchIndex: number; diff --git a/libs/queue/tsconfig.json b/libs/queue/tsconfig.json index 3c2a392..d601b74 100644 --- a/libs/queue/tsconfig.json +++ b/libs/queue/tsconfig.json @@ -14,7 +14,8 @@ "sourceMap": true, "moduleResolution": "node", "resolveJsonModule": true, - "allowSyntheticDefaultImports": true + "allowSyntheticDefaultImports": true, + "composite": true }, "include": ["src/**/*"], "exclude": ["node_modules", "dist", "**/*.test.ts", "**/*.spec.ts"]