From eca03962936c40c2ccc57bb03eaea743ccee9187 Mon Sep 17 00:00:00 2001 From: Boki Date: Tue, 10 Jun 2025 23:08:46 -0400 Subject: [PATCH] simplified providers a bit --- .../src/examples/batch-processing-examples.ts | 116 ------------------ .../src/providers/proxy.provider.ts | 7 +- .../data-service/src/providers/proxy.tasks.ts | 2 - .../src/providers/quotemedia.provider.ts | 1 - .../src/providers/yahoo.provider.ts | 1 - apps/data-service/src/routes/proxy.routes.ts | 9 +- .../src/services/provider-registry.service.ts | 49 ++++---- .../src/services/queue.service.ts | 67 ++++------ apps/data-service/src/utils/batch-helpers.ts | 5 - 9 files changed, 48 insertions(+), 209 deletions(-) delete mode 100644 apps/data-service/src/examples/batch-processing-examples.ts diff --git a/apps/data-service/src/examples/batch-processing-examples.ts b/apps/data-service/src/examples/batch-processing-examples.ts deleted file mode 100644 index 72e5e8f..0000000 --- a/apps/data-service/src/examples/batch-processing-examples.ts +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Example usage of the new functional batch processing approach - */ - -import { processItems, processBatchJob } from '../utils/batch-helpers'; -import { queueManager } from '../services/queue.service'; - -// Example 1: Process a list of symbols for live data -export async function exampleSymbolProcessing() { - const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN']; - - const result = await processItems( - symbols, - (symbol, index) => ({ - symbol, - index, - source: 'batch-processing' - }), - queueManager, - { - totalDelayMs: 60000, // 1 minute total - useBatching: false, // Process directly - priority: 1, - service: 'market-data', - provider: 'yahoo', - operation: 'live-data' - } - ); - - console.log('Symbol processing result:', result); - // Output: { jobsCreated: 5, mode: 'direct', totalItems: 5, duration: 1234 } -} - -// Example 2: Process proxies in batches -export async function exampleProxyProcessing() { - const proxies = [ - { host: '1.1.1.1', port: 8080 }, - { host: '2.2.2.2', port: 3128 }, - // ... more proxies - ]; - - const result = await processItems( - proxies, - (proxy, index) => ({ - proxy, - index, - source: 'batch-processing' - }), - queueManager, - { - totalDelayMs: 3600000, // 1 hour total - useBatching: true, // Use batch mode - batchSize: 100, // 100 proxies per batch - priority: 2, - service: 'proxy', - provider: 'proxy-service', - operation: 'check-proxy' - } - ); - - console.log('Proxy processing result:', result); - // Output: { jobsCreated: 10, mode: 'batch', totalItems: 1000, batchesCreated: 10, duration: 2345 } -} - -// Example 3: Custom processing with generic function -export async function exampleCustomProcessing() { - const customData = [ - { id: 1, name: 'Item 1' }, - { id: 2, name: 'Item 2' }, - { id: 3, name: 'Item 3' } - ]; - - const result = await processItems( - customData, - (item, index) => ({ - // Transform each item for processing - itemId: item.id, - itemName: item.name, - processIndex: index, - timestamp: new Date().toISOString() - }), - queueManager, - { - totalDelayMs: 30000, // 30 seconds total - useBatching: false, // Direct processing - priority: 1, - retries: 3 - } - ); - - console.log('Custom processing result:', result); -} - -// Example 4: Batch job processor (used by workers) -export async function exampleBatchJobProcessor(jobData: any) { - // This would be called by a BullMQ worker when processing batch jobs - const result = await processBatchJob(jobData, queueManager); - - console.log('Batch job processed:', result); - // Output: { batchIndex: 0, itemsProcessed: 100, jobsCreated: 100 } - - return result; -} - -// Example: Simple functional approach using generic processItems -/* -await processItems(symbols, (symbol, index) => ({ symbol, index }), queueManager, { - totalDelayMs: 3600000, - useBatching: true, - batchSize: 200, - priority: 2, - service: 'data-service', - provider: 'yahoo', - operation: 'live-data' -}); -*/ diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index bf67137..59d3a12 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -15,8 +15,7 @@ const getEvery24HourCron = (): string => { export const proxyProvider: ProviderConfig = { name: 'proxy-provider', - service: 'data-service', - operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => { + operations: {'fetch-and-check': async (payload: { sources?: string[] }) => { const { proxyService } = await import('./proxy.tasks'); const { queueManager } = await import('../services/queue.service'); const { processItems } = await import('../utils/batch-helpers'); @@ -35,13 +34,11 @@ export const proxyProvider: ProviderConfig = { index, source: 'batch-processing' }), - queueManager, - { + queueManager, { totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000, batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'), useBatching: process.env.PROXY_DIRECT_MODE !== 'true', priority: 2, - service: 'data-service', provider: 'proxy-provider', operation: 'check-proxy' } diff --git a/apps/data-service/src/providers/proxy.tasks.ts b/apps/data-service/src/providers/proxy.tasks.ts index df160b5..c24fe2d 100644 --- a/apps/data-service/src/providers/proxy.tasks.ts +++ b/apps/data-service/src/providers/proxy.tasks.ts @@ -152,7 +152,6 @@ export async function queueProxyFetch(): Promise { const { queueManager } = await import('../services/queue.service'); const job = await queueManager.addJob({ type: 'proxy-fetch', - service: 'proxy', provider: 'proxy-service', operation: 'fetch-and-check', payload: {}, @@ -170,7 +169,6 @@ export async function queueProxyCheck(proxies: ProxyInfo[]): Promise { const { queueManager } = await import('../services/queue.service'); const job = await queueManager.addJob({ type: 'proxy-check', - service: 'proxy', provider: 'proxy-service', operation: 'check-specific', payload: { proxies }, diff --git a/apps/data-service/src/providers/quotemedia.provider.ts b/apps/data-service/src/providers/quotemedia.provider.ts index 2203c23..257fa6a 100644 --- a/apps/data-service/src/providers/quotemedia.provider.ts +++ b/apps/data-service/src/providers/quotemedia.provider.ts @@ -5,7 +5,6 @@ const logger = getLogger('quotemedia-provider'); export const quotemediaProvider: ProviderConfig = { name: 'quotemedia', - service: 'market-data', operations: { 'live-data': async (payload: { symbol: string; fields?: string[] }) => { logger.info('Fetching live data from QuoteMedia', { symbol: payload.symbol }); diff --git a/apps/data-service/src/providers/yahoo.provider.ts b/apps/data-service/src/providers/yahoo.provider.ts index 2eb26b1..23c851a 100644 --- a/apps/data-service/src/providers/yahoo.provider.ts +++ b/apps/data-service/src/providers/yahoo.provider.ts @@ -5,7 +5,6 @@ const logger = getLogger('yahoo-provider'); export const yahooProvider: ProviderConfig = { name: 'yahoo-finance', - service: 'market-data', operations: { 'live-data': async (payload: { symbol: string; modules?: string[] }) => { diff --git a/apps/data-service/src/routes/proxy.routes.ts b/apps/data-service/src/routes/proxy.routes.ts index 1d899e6..bbbf1d7 100644 --- a/apps/data-service/src/routes/proxy.routes.ts +++ b/apps/data-service/src/routes/proxy.routes.ts @@ -14,8 +14,7 @@ proxyRoutes.post('/api/proxy/fetch', async (c) => { try { const job = await queueManager.addJob({ type: 'proxy-fetch', - service: 'proxy', - provider: 'proxy-service', + provider: 'proxy-provider', operation: 'fetch-and-check', payload: {}, priority: 5 @@ -37,8 +36,7 @@ proxyRoutes.post('/api/proxy/check', async (c) => { const { proxies } = await c.req.json(); const job = await queueManager.addJob({ type: 'proxy-check', - service: 'proxy', - provider: 'proxy-service', + provider: 'proxy-provider', operation: 'check-specific', payload: { proxies }, priority: 8 @@ -60,8 +58,7 @@ proxyRoutes.get('/api/proxy/stats', async (c) => { try { const job = await queueManager.addJob({ type: 'proxy-stats', - service: 'proxy', - provider: 'proxy-service', + provider: 'proxy-provider', operation: 'get-stats', payload: {}, priority: 3 diff --git a/apps/data-service/src/services/provider-registry.service.ts b/apps/data-service/src/services/provider-registry.service.ts index ac7ab0f..59fdcbd 100644 --- a/apps/data-service/src/services/provider-registry.service.ts +++ b/apps/data-service/src/services/provider-registry.service.ts @@ -4,6 +4,15 @@ export interface JobHandler { (payload: any): Promise; } +export interface JobData { + type?: string; + provider: string; + operation: string; + payload: any; + priority?: number; + immediately?: boolean; +} + export interface ScheduledJob { type: string; operation: string; @@ -16,7 +25,6 @@ export interface ScheduledJob { export interface ProviderConfig { name: string; - service: string; operations: Record; scheduledJobs?: ScheduledJob[]; } @@ -27,51 +35,47 @@ export class ProviderRegistry { /** * Register a provider with its operations - */ registerProvider(config: ProviderConfig): void { - const key = `${config.service}:${config.name}`; - this.providers.set(key, config); - this.logger.info(`Registered provider: ${key}`, { + */ + registerProvider(config: ProviderConfig): void { + // const key = `${config.service}:${config.name}`; + this.providers.set(config.name, config); + this.logger.info(`Registered provider: ${config.name}`, { operations: Object.keys(config.operations), scheduledJobs: config.scheduledJobs?.length || 0 }); } - /** * Get a job handler for a specific provider and operation */ - getHandler(service: string, provider: string, operation: string): JobHandler | null { - const key = `${service}:${provider}`; - const providerConfig = this.providers.get(key); + getHandler(provider: string, operation: string): JobHandler | null { + const providerConfig = this.providers.get(provider); if (!providerConfig) { - this.logger.warn(`Provider not found: ${key}`); + this.logger.warn(`Provider not found: ${provider}`); return null; } const handler = providerConfig.operations[operation]; if (!handler) { - this.logger.warn(`Operation not found: ${operation} in provider ${key}`); + this.logger.warn(`Operation not found: ${operation} in provider ${provider}`); return null; } return handler; } - /** - * Get all registered providers + * Get all scheduled jobs from all providers */ getAllScheduledJobs(): Array<{ - service: string; provider: string; job: ScheduledJob; }> { - const allJobs: Array<{ service: string; provider: string; job: ScheduledJob }> = []; + const allJobs: Array<{ provider: string; job: ScheduledJob }> = []; for (const [key, config] of this.providers) { if (config.scheduledJobs) { for (const job of config.scheduledJobs) { allJobs.push({ - service: config.service, provider: config.name, job }); @@ -88,21 +92,12 @@ export class ProviderRegistry { config })); } - /** * Check if a provider exists */ - hasProvider(service: string, provider: string): boolean { - return this.providers.has(`${service}:${provider}`); + hasProvider(provider: string): boolean { + return this.providers.has(provider); } - - /** - * Get providers by service type - */ - getProvidersByService(service: string): ProviderConfig[] { - return Array.from(this.providers.values()).filter(provider => provider.service === service); - } - /** * Clear all providers (useful for testing) */ diff --git a/apps/data-service/src/services/queue.service.ts b/apps/data-service/src/services/queue.service.ts index 11e1eef..438dc6c 100644 --- a/apps/data-service/src/services/queue.service.ts +++ b/apps/data-service/src/services/queue.service.ts @@ -1,16 +1,6 @@ import { Queue, Worker, QueueEvents } from 'bullmq'; import { getLogger } from '@stock-bot/logger'; -import { providerRegistry } from './provider-registry.service'; - -export interface JobData { - type: string; - service: string; - provider: string; - operation: string; - payload: any; - priority?: number; - immediately?: boolean; -} +import { providerRegistry, JobData } from './provider-registry.service'; export class QueueService { private logger = getLogger('queue-service'); @@ -135,13 +125,11 @@ export class QueueService { this.logger.error('Failed to register providers', { error }); throw error; } - } - private async processJob(job: any) { - const { service, provider, operation, payload }: JobData = job.data; + } private async processJob(job: any) { + const { provider, operation, payload }: JobData = job.data; this.logger.info('Processing job', { id: job.id, - service, provider, operation, payloadKeys: Object.keys(payload || {}) @@ -155,10 +143,10 @@ export class QueueService { } // Get handler from registry - const handler = providerRegistry.getHandler(service, provider, operation); + const handler = providerRegistry.getHandler(provider, operation); if (!handler) { - throw new Error(`No handler found for ${service}:${provider}:${operation}`); + throw new Error(`No handler found for ${provider}:${operation}`); } // Execute the handler @@ -166,7 +154,6 @@ export class QueueService { this.logger.info('Job completed successfully', { id: job.id, - service, provider, operation }); @@ -177,7 +164,6 @@ export class QueueService { const errorMessage = error instanceof Error ? error.message : String(error); this.logger.error('Job failed', { id: job.id, - service, provider, operation, error: errorMessage @@ -220,12 +206,10 @@ export class QueueService { let successCount = 0; let failureCount = 0; let updatedCount = 0; - let newCount = 0; - - // Process each scheduled job - for (const { service, provider, job } of allScheduledJobs) { + let newCount = 0; // Process each scheduled job + for (const { provider, job } of allScheduledJobs) { try { - const jobKey = `${service}-${provider}-${job.operation}`; + const jobKey = `${provider}-${job.operation}`; // Check if this job already exists const existingJob = existingJobs.find(existing => @@ -257,7 +241,6 @@ export class QueueService { await this.addRecurringJob({ type: job.type, - service: service, provider: provider, operation: job.operation, payload: job.payload, @@ -267,7 +250,6 @@ export class QueueService { this.logger.info('Scheduled job registered', { type: job.type, - service, provider, operation: job.operation, cronPattern: job.cronPattern, @@ -280,7 +262,6 @@ export class QueueService { } catch (error) { this.logger.error('Failed to register scheduled job', { type: job.type, - service, provider, error: error instanceof Error ? error.message : String(error) }); @@ -300,12 +281,12 @@ export class QueueService { this.logger.error('Failed to setup scheduled tasks', error); } } - async addJob(jobData: JobData, options?: any) { if (!this.isInitialized) { throw new Error('Queue service not initialized. Call initialize() first.'); } - return this.queue.add(jobData.type, jobData, { + const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; + return this.queue.add(jobType, jobData, { priority: jobData.priority || 0, removeOnComplete: 10, removeOnFail: 5, @@ -318,9 +299,8 @@ export class QueueService { throw new Error('Queue service not initialized. Call initialize() first.'); } - try { - // Create a unique job key for this specific job - const jobKey = `${jobData.service}-${jobData.provider}-${jobData.operation}`; + try { // Create a unique job key for this specific job + const jobKey = `${jobData.provider}-${jobData.operation}`; // Get all existing repeatable jobs const existingJobs = await this.queue.getRepeatableJobs(); @@ -336,19 +316,18 @@ export class QueueService { jobKey, existingPattern: existingJob.pattern, newPattern: cronPattern - }); - - // Remove the existing job - await this.queue.removeRepeatableByKey(existingJob.key); + }); // Remove the existing job + if (existingJob.key) { + await this.queue.removeRepeatableByKey(existingJob.key); + } // Small delay to ensure cleanup is complete await new Promise(resolve => setTimeout(resolve, 100)); } else { this.logger.info('Creating new recurring job', { jobKey, cronPattern }); - } - - // Add the new/updated recurring job - const job = await this.queue.add(jobData.type, jobData, { + } // Add the new/updated recurring job + const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`; + const job = await this.queue.add(jobType, jobData, { repeat: { pattern: cronPattern, tz: 'UTC', @@ -435,21 +414,17 @@ export class QueueService { } return this.workers.length; } - getRegisteredProviders() { return providerRegistry.getProviders().map(({ key, config }) => ({ key, name: config.name, - service: config.service, operations: Object.keys(config.operations), scheduledJobs: config.scheduledJobs?.length || 0 })); } - getScheduledJobsInfo() { - return providerRegistry.getAllScheduledJobs().map(({ service, provider, job }) => ({ - id: `${service}-${provider}-${job.type}`, - service, + return providerRegistry.getAllScheduledJobs().map(({ provider, job }) => ({ + id: `${provider}-${job.type}`, provider, type: job.type, operation: job.operation, diff --git a/apps/data-service/src/utils/batch-helpers.ts b/apps/data-service/src/utils/batch-helpers.ts index 3179df8..c441d11 100644 --- a/apps/data-service/src/utils/batch-helpers.ts +++ b/apps/data-service/src/utils/batch-helpers.ts @@ -15,7 +15,6 @@ export interface ProcessOptions { removeOnComplete?: number; removeOnFail?: number; // Job routing information - service?: string; provider?: string; operation?: string; } @@ -121,7 +120,6 @@ async function processDirect( name: 'process-item', data: { type: 'process-item', - service: options.service || 'data-service', provider: options.provider || 'generic', operation: options.operation || 'process-item', payload: processor(item, index), @@ -174,7 +172,6 @@ async function processBatched( name: 'process-batch', data: { type: 'process-batch', - service: options.service || 'generic', provider: options.provider || 'generic', operation: 'process-batch-items', payload: { @@ -234,7 +231,6 @@ export async function processBatchJob(jobData: any, queue: QueueService): Promis name: 'process-item', data: { type: 'process-item', - service: options.service || 'generic', provider: options.provider || 'generic', operation: options.operation || 'generic', payload: processor(item, index), @@ -297,7 +293,6 @@ async function storePayload( priority: options.priority || 1, retries: options.retries || 3, // Store routing information for later use - service: options.service || 'generic', provider: options.provider || 'generic', operation: options.operation || 'generic' },