From 660a2a1ec2f9c84525c6cfc3ce3edec56e4a0462 Mon Sep 17 00:00:00 2001 From: Boki Date: Sun, 15 Jun 2025 12:26:38 -0400 Subject: [PATCH] initial masterExchanges --- apps/data-service/src/index.ts | 9 +- .../src/providers/exchange-sync.provider.ts | 274 ++++++++++++++++++ .../src/providers/proxy.provider.ts | 4 +- .../src/routes/exchange.routes.ts | 128 ++++++++ apps/data-service/src/routes/index.ts | 1 + libs/mongodb-client/src/index.ts | 2 + libs/mongodb-client/src/types.ts | 31 ++ 7 files changed, 446 insertions(+), 3 deletions(-) create mode 100644 apps/data-service/src/providers/exchange-sync.provider.ts create mode 100644 apps/data-service/src/routes/exchange.routes.ts diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index a4ec6ed..92a09a6 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -10,7 +10,7 @@ import { processItems, QueueManager, type QueueConfig } from '@stock-bot/queue'; import { Shutdown } from '@stock-bot/shutdown'; import { initializeIBResources } from './providers/ib.tasks'; import { initializeProxyResources } from './providers/proxy.tasks'; -import { healthRoutes, queueRoutes } from './routes'; +import { exchangeRoutes, healthRoutes, queueRoutes } from './routes'; // Load environment variables loadEnvVariables(); @@ -45,6 +45,12 @@ function createDataServiceQueueManager(): QueueManager { const { initializeProxyProvider } = await import('./providers/proxy.provider'); return initializeProxyProvider(); }, + async () => { + const { initializeExchangeSyncProvider } = await import( + './providers/exchange-sync.provider' + ); + return initializeExchangeSyncProvider(); + }, ], enableScheduledJobs: true, }; @@ -59,6 +65,7 @@ export const queueManager = createDataServiceQueueManager(); export { processItems }; // Register all routes +app.route('', exchangeRoutes); app.route('', healthRoutes); app.route('', queueRoutes); diff --git a/apps/data-service/src/providers/exchange-sync.provider.ts b/apps/data-service/src/providers/exchange-sync.provider.ts new file mode 100644 index 0000000..c522085 --- /dev/null +++ b/apps/data-service/src/providers/exchange-sync.provider.ts @@ -0,0 +1,274 @@ +/** + * Exchange Sync Provider - Queue provider for syncing IB exchanges to master records + */ +import { getLogger } from '@stock-bot/logger'; +import type { MasterExchange } from '@stock-bot/mongodb-client'; +import type { ProviderConfigWithSchedule } from '@stock-bot/queue'; +import { providerRegistry } from '@stock-bot/queue'; + +const logger = getLogger('exchange-sync'); + +export function initializeExchangeSyncProvider() { + logger.info('Registering exchange sync provider...'); + + const exchangeSyncConfig: ProviderConfigWithSchedule = { + name: 'exchange-sync', + + operations: { + 'sync-ib-exchanges': async _payload => { + logger.info('Syncing IB exchanges to master table'); + return await syncIBExchanges(); + }, + + 'get-master-exchange': async (payload: { masterExchangeId: string }) => { + logger.debug('Getting master exchange details', payload); + const exchange = await getMasterExchangeDetails(payload.masterExchangeId); + return { exchange, ...payload }; + }, + }, + + scheduledJobs: [ + { + type: 'exchange-sync-daily', + operation: 'sync-ib-exchanges', + payload: {}, + cronPattern: '0 3 * * *', // Daily at 3 AM + priority: 3, + description: 'Daily sync of IB exchanges to master table', + immediately: true, // Run on startup to test + }, + ], + }; + + providerRegistry.registerWithSchedule(exchangeSyncConfig); + logger.info('Exchange sync provider registered successfully'); +} + +/** + * Sync IB exchanges from actual ibExchanges table - creates 1:1 master records + */ +async function syncIBExchanges(): Promise<{ syncedCount: number; totalExchanges: number }> { + logger.info('Syncing IB exchanges from database...'); + + try { + const { connectMongoDB, getDatabase } = await import('@stock-bot/mongodb-client'); + + // Ensure MongoDB client is connected + await connectMongoDB(); + + const db = getDatabase(); + + // Filter by country code US and CA + const ibExchanges = await db + .collection('ibExchanges') + .find({ + country_code: { $in: ['US', 'CA'] }, + }) + .toArray(); + + logger.info('Found IB exchanges in database', { count: ibExchanges.length }); + + let syncedCount = 0; + + for (const exchange of ibExchanges) { + try { + await createOrUpdateMasterExchange(exchange); + syncedCount++; + + logger.debug('Synced IB exchange', { + ibId: exchange.id, + country: exchange.country_code, + }); + } catch (error) { + logger.error('Failed to sync IB exchange', { exchange: exchange.id, error }); + } + } + + logger.info('IB exchange sync completed', { + syncedCount, + totalExchanges: ibExchanges.length, + }); + + return { syncedCount, totalExchanges: ibExchanges.length }; + } catch (error) { + logger.error('Failed to fetch IB exchanges from database', { error }); + return { syncedCount: 0, totalExchanges: 0 }; + } +} + +/** + * Create or update master exchange record 1:1 from IB exchange + */ +async function createOrUpdateMasterExchange(ibExchange: { + id?: string; + name?: string; + code?: string; + country_code?: string; + currency?: string; + _id?: unknown; +}): Promise { + const { connectMongoDB, getDatabase } = await import('@stock-bot/mongodb-client'); + + await connectMongoDB(); + const db = getDatabase(); + const collection = db.collection('masterExchanges'); + + const masterExchangeId = generateMasterExchangeId(ibExchange); + const now = new Date(); + + // Check if master exchange already exists + const existing = await collection.findOne({ masterExchangeId }); + + if (existing) { + // Update existing record + await collection.updateOne( + { masterExchangeId }, + { + $set: { + officialName: ibExchange.name || `Exchange ${ibExchange.id}`, + country: ibExchange.country_code || 'UNKNOWN', + currency: ibExchange.currency || 'USD', + timezone: inferTimezone(ibExchange), + updated_at: now, + }, + } + ); + + logger.debug('Updated existing master exchange', { masterExchangeId }); + } else { + // Create new master exchange + const masterExchange: MasterExchange = { + masterExchangeId, + shortName: masterExchangeId, // Set shortName to masterExchangeId on creation + officialName: ibExchange.name || `Exchange ${ibExchange.id}`, + country: ibExchange.country_code || 'UNKNOWN', + currency: ibExchange.currency || 'USD', + timezone: inferTimezone(ibExchange), + active: false, // Set active to false only on creation + + sourceMappings: { + ib: { + id: ibExchange.id || ibExchange._id?.toString() || 'unknown', + name: ibExchange.name || `Exchange ${ibExchange.id}`, + code: ibExchange.code || ibExchange.id || '', + aliases: generateAliases(ibExchange), + lastUpdated: now, + }, + }, + + confidence: 1.0, // High confidence for direct IB mapping + verified: true, // Mark as verified since it's direct from IB + + // DocumentBase fields + source: 'exchange-sync-provider', + created_at: now, + updated_at: now, + }; + + await collection.insertOne(masterExchange); + logger.debug('Created new master exchange', { masterExchangeId }); + } +} + +/** + * Get master exchange details + */ +async function getMasterExchangeDetails(masterExchangeId: string): Promise { + try { + const { connectMongoDB, getDatabase } = await import('@stock-bot/mongodb-client'); + + await connectMongoDB(); + const db = getDatabase(); + const collection = db.collection('masterExchanges'); + + return await collection.findOne({ masterExchangeId }); + } catch (error) { + logger.error('Error getting master exchange details', { masterExchangeId, error }); + return null; + } +} + +/** + * Generate master exchange ID from IB exchange + */ +function generateMasterExchangeId(ibExchange: { + name?: string; + code?: string; + id?: string; +}): string { + // Use code if available, otherwise use ID, otherwise generate from name + if (ibExchange.code) { + return ibExchange.code.toUpperCase().replace(/[^A-Z0-9]/g, ''); + } + + if (ibExchange.id) { + return ibExchange.id.toUpperCase().replace(/[^A-Z0-9]/g, ''); + } + + if (ibExchange.name) { + return ibExchange.name + .toUpperCase() + .split(' ') + .slice(0, 2) + .join('_') + .replace(/[^A-Z0-9_]/g, ''); + } + + return 'UNKNOWN_EXCHANGE'; +} + +/** + * Generate aliases for the exchange + */ +function generateAliases(ibExchange: { name?: string; code?: string }): string[] { + const aliases: string[] = []; + + if (ibExchange.name && ibExchange.name.includes(' ')) { + // Add abbreviated version + aliases.push( + ibExchange.name + .split(' ') + .map(w => w[0]) + .join('') + .toUpperCase() + ); + } + + if (ibExchange.code) { + aliases.push(ibExchange.code.toUpperCase()); + } + + return aliases; +} + +/** + * Infer timezone from exchange name/location + */ +function inferTimezone(ibExchange: { name?: string }): string { + if (!ibExchange.name) { + return 'UTC'; + } + + const name = ibExchange.name.toUpperCase(); + + if (name.includes('NEW YORK') || name.includes('NYSE') || name.includes('NASDAQ')) { + return 'America/New_York'; + } + if (name.includes('LONDON')) { + return 'Europe/London'; + } + if (name.includes('TOKYO')) { + return 'Asia/Tokyo'; + } + if (name.includes('SHANGHAI')) { + return 'Asia/Shanghai'; + } + if (name.includes('TORONTO')) { + return 'America/Toronto'; + } + if (name.includes('FRANKFURT')) { + return 'Europe/Berlin'; + } + + return 'UTC'; // Default +} diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index eac5dbf..657b3d4 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -75,10 +75,10 @@ export function initializeProxyProvider() { type: 'proxy-fetch-and-check', operation: 'fetch-from-sources', payload: {}, - cronPattern: '0 */2 * * *', // Every 2 hours + cronPattern: '0 0 * * 0', // Every week at midnight on Sunday priority: 5, description: 'Fetch and validate proxy list from sources', - immediately: true, // Don't run immediately during startup to avoid conflicts + // immediately: true, // Don't run immediately during startup to avoid conflicts }, ], }; diff --git a/apps/data-service/src/routes/exchange.routes.ts b/apps/data-service/src/routes/exchange.routes.ts new file mode 100644 index 0000000..de96151 --- /dev/null +++ b/apps/data-service/src/routes/exchange.routes.ts @@ -0,0 +1,128 @@ +/** + * Exchange Routes - Simple API endpoints for exchange management + */ +import { Hono } from 'hono'; +import { getLogger } from '@stock-bot/logger'; +import type { MasterExchange } from '@stock-bot/mongodb-client'; +import { connectMongoDB, getDatabase } from '@stock-bot/mongodb-client'; +import { queueManager } from '../index'; + +const logger = getLogger('exchange-routes'); + +export const exchangeRoutes = new Hono(); + +// Get master exchange details +exchangeRoutes.get('/api/exchanges/:masterExchangeId', async c => { + try { + const masterExchangeId = c.req.param('masterExchangeId'); + + await connectMongoDB(); + const db = getDatabase(); + const collection = db.collection('masterExchanges'); + const exchange = await collection.findOne({ masterExchangeId }); + + if (!exchange) { + return c.json({ error: 'Exchange not found' }, 404); + } + + return c.json({ + status: 'success', + data: exchange, + }); + } catch (error) { + logger.error('Error getting exchange details', { error }); + return c.json({ error: 'Internal server error' }, 500); + } +}); + +// Get source mapping +exchangeRoutes.get('/api/exchanges/mapping/:sourceName/:sourceId', async c => { + try { + const sourceName = c.req.param('sourceName'); + const sourceId = c.req.param('sourceId'); + + await connectMongoDB(); + const db = getDatabase(); + const collection = db.collection('masterExchanges'); + + const result = await collection.findOne( + { [`sourceMappings.${sourceName}.id`]: sourceId }, + { projection: { masterExchangeId: 1 } } + ); + + if (!result) { + return c.json({ error: 'Mapping not found' }, 404); + } + + return c.json({ + status: 'success', + data: { masterExchangeId: result.masterExchangeId, sourceName, sourceId }, + }); + } catch (error) { + logger.error('Error getting exchange mapping', { error }); + return c.json({ error: 'Internal server error' }, 500); + } +}); + +// Get all exchanges from a specific source +exchangeRoutes.get('/api/exchanges/source/:sourceName', async c => { + try { + const sourceName = c.req.param('sourceName'); + + await connectMongoDB(); + const db = getDatabase(); + const collection = db.collection('masterExchanges'); + + const results = await collection + .find( + { [`sourceMappings.${sourceName}`]: { $exists: true } }, + { + projection: { + masterExchangeId: 1, + [`sourceMappings.${sourceName}`]: 1, + }, + } + ) + .toArray(); + + const exchanges = results.map(result => ({ + masterExchangeId: result.masterExchangeId, + sourceMapping: result.sourceMappings[sourceName], + })); + + return c.json({ + status: 'success', + data: { + sourceName, + count: exchanges.length, + exchanges, + }, + }); + } catch (error) { + logger.error('Error getting source exchanges', { error }); + return c.json({ error: 'Internal server error' }, 500); + } +}); + +// Trigger exchange sync +exchangeRoutes.post('/api/exchanges/sync', async c => { + try { + const job = await queueManager.add('exchange-sync', { + type: 'exchange-sync', + provider: 'exchange-sync', + operation: 'sync-ib-exchanges', + payload: {}, + priority: 2, + }); + + return c.json({ + status: 'success', + message: 'IB exchange sync job queued', + jobId: job.id, + operation: 'sync-ib-exchanges', + }); + } catch (error) { + logger.error('Error triggering exchange sync', { error }); + return c.json({ error: 'Internal server error' }, 500); + } +}); diff --git a/apps/data-service/src/routes/index.ts b/apps/data-service/src/routes/index.ts index 9a1af7d..2dc8cfe 100644 --- a/apps/data-service/src/routes/index.ts +++ b/apps/data-service/src/routes/index.ts @@ -1,5 +1,6 @@ /** * Routes index - exports all route modules */ +export { exchangeRoutes } from './exchange.routes'; export { healthRoutes } from './health.routes'; export { queueRoutes } from './queue.routes'; diff --git a/libs/mongodb-client/src/index.ts b/libs/mongodb-client/src/index.ts index d132030..72f385d 100644 --- a/libs/mongodb-client/src/index.ts +++ b/libs/mongodb-client/src/index.ts @@ -12,6 +12,8 @@ export type { AnalystReport, DocumentBase, EarningsTranscript, + ExchangeSourceMapping, + MasterExchange, NewsArticle, RawDocument, SecFiling, diff --git a/libs/mongodb-client/src/types.ts b/libs/mongodb-client/src/types.ts index cb54811..4680f10 100644 --- a/libs/mongodb-client/src/types.ts +++ b/libs/mongodb-client/src/types.ts @@ -198,3 +198,34 @@ export interface AnalystReport extends DocumentBase { key_points: string[]; financial_projections?: Record; } + +/** + * Exchange-related types + */ +export interface ExchangeSourceMapping { + id: string; + name: string; + code?: string; + aliases?: string[]; + lastUpdated: Date; +} + +export interface MasterExchange extends DocumentBase { + masterExchangeId: string; + shortName?: string; + officialName: string; + country: string; + currency: string; + timezone: string; + active?: boolean; + + tradingHours?: { + open: string; + close: string; + timezone: string; + }; + + sourceMappings: Record; + confidence: number; + verified: boolean; +}