/**
 * Enhanced Sync Manager - Improved syncing with comprehensive exchange data
 */
import { getLogger } from '@stock-bot/logger';
import { MongoDBClient } from '@stock-bot/mongodb-client';
import { PostgreSQLClient } from '@stock-bot/postgres-client';
import { getMongoDBClient, getPostgreSQLClient } from '../clients';

const logger = getLogger('enhanced-sync-manager');

/** Error message used by every public operation invoked before `initialize()`. */
const NOT_INITIALIZED_MESSAGE = 'Enhanced sync manager not initialized';

/** MongoDB collection holding the raw symbols of each supported provider (keys are lower-case). */
const SYMBOL_COLLECTION_BY_PROVIDER: ReadonlyMap<string, string> = new Map([
  ['qm', 'qmSymbols'],
  ['eod', 'eodSymbols'],
  ['ib', 'ibSymbols'],
]);

/** Provider exchange codes (upper-case) that are aliases of a master exchange code. */
const BASIC_EXCHANGE_CODE_ALIASES: ReadonlyMap<string, string> = new Map([
  ['NYE', 'NYSE'],
  ['NAS', 'NASDAQ'],
  ['TO', 'TSX'],
  ['LN', 'LSE'],
  ['LON', 'LSE'],
]);

/** Common country spellings mapped to ISO 2-letter codes. */
const COUNTRY_NAME_TO_ISO2: ReadonlyMap<string, string> = new Map([
  ['United States', 'US'],
  ['USA', 'US'],
  ['Canada', 'CA'],
  ['United Kingdom', 'GB'],
  ['UK', 'GB'],
  ['Germany', 'DE'],
  ['Japan', 'JP'],
  ['Australia', 'AU'],
]);

/** Row shape of the `exchanges` columns that are loaded into the in-memory cache. */
interface ExchangeMapping {
  id: string;
  code: string;
  name: string;
  country: string;
  currency: string;
}

/** Counters describing the outcome of one sync run. */
interface SyncResult {
  processed: number;
  created: number;
  updated: number;
  skipped: number;
  errors: number;
}

/** One row of the `sync_status` table as returned by `getSyncStatus()`. */
interface SyncStatus {
  provider: string;
  dataType: string;
  lastSyncAt?: Date;
  lastSyncCount: number;
  syncErrors?: string;
}

/**
 * Raw provider document read from MongoDB. Field names differ per provider
 * (`symbol`/`code`, `exchangeCode`/`exchange`/`exchange_id`, ...), so it is kept loose.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- provider documents have no shared schema
type ProviderSymbolDocument = Record<string, any>;

/** Returns a fresh, zeroed set of sync counters. */
function createEmptySyncResult(): SyncResult {
  return { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 };
}

/**
 * Copies exchange and symbol data from the provider collections in MongoDB into the
 * master tables in PostgreSQL (`exchanges`, `provider_exchange_mappings`, `symbols`,
 * `provider_mappings`, `sync_status`).
 *
 * `initialize()` must be called before any other public method.
 */
export class EnhancedSyncManager {
  private isInitialized = false;
  // Both clients are assigned in initialize(); every public method checks isInitialized first.
  private mongoClient!: MongoDBClient;
  private postgresClient!: PostgreSQLClient;
  /** Exchanges keyed by upper-cased exchange code. */
  private exchangeCache: Map<string, ExchangeMapping> = new Map();

  /**
   * Acquires the database clients and pre-loads the exchange cache.
   * Calling it again after a successful initialization only logs a warning.
   *
   * @throws Re-throws any error raised while obtaining the clients or loading the cache.
   */
  async initialize(): Promise<void> {
    if (this.isInitialized) {
      logger.warn('Enhanced sync manager already initialized');
      return;
    }

    try {
      this.mongoClient = getMongoDBClient();
      this.postgresClient = getPostgreSQLClient();

      // Pre-load exchange mappings for performance
      await this.loadExchangeCache();

      this.isInitialized = true;
      logger.info('Enhanced sync manager initialized successfully');
    } catch (error) {
      logger.error('Failed to initialize enhanced sync manager', { error });
      throw error;
    }
  }

  /**
   * Helper method to get MongoDB database for direct queries
   */
  private getMongoDatabase() {
    return this.mongoClient.getDatabase();
  }

  /**
   * Clears the exchange cache and marks the manager as uninitialized.
   * The database clients themselves are not closed here.
   */
  async shutdown(): Promise<void> {
    if (!this.isInitialized) {
      return;
    }

    logger.info('Shutting down enhanced sync manager...');
    this.exchangeCache.clear();
    this.isInitialized = false;
    logger.info('Enhanced sync manager shut down successfully');
  }

  /**
   * Clear all exchange and symbol data from PostgreSQL
   *
   * Deletes every row of `provider_mappings`, `symbols` and `exchanges` and resets all
   * `sync_status` rows, inside one transaction.
   *
   * NOTE(review): `provider_exchange_mappings` is not cleared here although it references
   * `exchanges` via `master_exchange_id` — confirm the schema cascades or allows this.
   *
   * @returns The row counts each table held immediately before it was cleared.
   * @throws Error if the manager is not initialized; re-throws any database error after rollback.
   */
  async clearPostgreSQLData(): Promise<{
    exchangesCleared: number;
    symbolsCleared: number;
    mappingsCleared: number;
  }> {
    this.assertInitialized();

    logger.info('Clearing existing PostgreSQL data...');

    try {
      const cleared = await this.runInTransaction(async () => {
        // Get counts before clearing
        const exchangeCountResult = await this.postgresClient.query(
          'SELECT COUNT(*) as count FROM exchanges'
        );
        const symbolCountResult = await this.postgresClient.query(
          'SELECT COUNT(*) as count FROM symbols'
        );
        const mappingCountResult = await this.postgresClient.query(
          'SELECT COUNT(*) as count FROM provider_mappings'
        );

        const exchangesCleared = Number.parseInt(exchangeCountResult.rows[0].count, 10);
        const symbolsCleared = Number.parseInt(symbolCountResult.rows[0].count, 10);
        const mappingsCleared = Number.parseInt(mappingCountResult.rows[0].count, 10);

        // Clear data in correct order (respect foreign keys)
        await this.postgresClient.query('DELETE FROM provider_mappings');
        await this.postgresClient.query('DELETE FROM symbols');
        await this.postgresClient.query('DELETE FROM exchanges');

        // Reset sync status
        await this.postgresClient.query(
          'UPDATE sync_status SET last_sync_at = NULL, last_sync_count = 0, sync_errors = NULL'
        );

        return { exchangesCleared, symbolsCleared, mappingsCleared };
      });

      // The cached exchanges no longer exist once the delete has been committed.
      this.exchangeCache.clear();

      logger.info('PostgreSQL data cleared successfully', cleared);
      return cleared;
    } catch (error) {
      logger.error('Failed to clear PostgreSQL data', { error });
      throw error;
    }
  }

  /**
   * Comprehensive exchange sync from all MongoDB providers
   *
   * Creates provider exchange mappings from the `eodExchanges` and `ibExchanges`
   * collections, records the run in `sync_status` under provider `all`, and reloads
   * the exchange cache.
   *
   * @param clearFirst When true (the default), `clearPostgreSQLData()` runs first in its
   *   own transaction.
   * @returns Combined counters of the EOD and IB passes.
   * @throws Error if the manager is not initialized; re-throws any sync error after rollback.
   */
  async syncAllExchanges(clearFirst: boolean = true): Promise<SyncResult> {
    this.assertInitialized();

    logger.info('Starting comprehensive exchange sync...', { clearFirst });

    const result = createEmptySyncResult();

    try {
      // Clear existing data if requested
      if (clearFirst) {
        await this.clearPostgreSQLData();
      }

      await this.runInTransaction(async () => {
        // 1. Sync from EOD exchanges (comprehensive global data)
        this.mergeResults(result, await this.syncEODExchanges());

        // 2. Sync from IB exchanges (detailed asset information)
        this.mergeResults(result, await this.syncIBExchanges());

        // 3. Update sync status
        await this.updateSyncStatus('all', 'exchanges', result.processed);
      });

      // Refresh exchange cache with new data
      await this.loadExchangeCache();

      logger.info('Comprehensive exchange sync completed', result);
      return result;
    } catch (error) {
      logger.error('Comprehensive exchange sync failed', { error });
      throw error;
    }
  }

  /**
   * Sync QM provider exchange mappings
   *
   * Groups the `qmSymbols` collection by (exchangeCode, exchange, countryCode) and
   * creates one provider exchange mapping per distinct combination.
   *
   * @returns Counters for the run; already-mapped combinations are counted as skipped.
   * @throws Error if the manager is not initialized; re-throws any sync error after rollback.
   */
  async syncQMProviderMappings(): Promise<SyncResult> {
    this.assertInitialized();

    logger.info('Starting QM provider exchange mappings sync...');

    const result = createEmptySyncResult();

    try {
      await this.runInTransaction(async () => {
        // Get unique exchange combinations from QM symbols
        const pipeline = [
          {
            $group: {
              _id: {
                exchangeCode: '$exchangeCode',
                exchange: '$exchange',
                countryCode: '$countryCode',
              },
              count: { $sum: 1 },
              sampleExchange: { $first: '$exchange' },
            },
          },
          {
            $project: {
              exchangeCode: '$_id.exchangeCode',
              exchange: '$_id.exchange',
              countryCode: '$_id.countryCode',
              count: 1,
              sampleExchange: 1,
            },
          },
        ];

        const qmExchanges = await this.getMongoDatabase()
          .collection('qmSymbols')
          .aggregate(pipeline)
          .toArray();
        logger.info(`Found ${qmExchanges.length} unique QM exchange combinations`);

        for (const exchange of qmExchanges) {
          try {
            // Create provider exchange mapping for QM
            const created = await this.createProviderExchangeMapping(
              'qm', // provider
              exchange.exchangeCode,
              exchange.sampleExchange || exchange.exchangeCode,
              exchange.countryCode,
              exchange.countryCode === 'CA' ? 'CAD' : 'USD', // Simple currency mapping
              0.8 // good confidence for QM data
            );
            this.recordMappingOutcome(result, created);
          } catch (error) {
            logger.error('Failed to process QM exchange mapping', { error, exchange });
            result.errors++;
          }
        }
      });

      logger.info('QM provider exchange mappings sync completed', result);
      return result;
    } catch (error) {
      logger.error('QM provider exchange mappings sync failed', { error });
      throw error;
    }
  }

  /**
   * Enhanced symbol sync with multi-provider mapping
   *
   * Reads every document of the provider's symbol collection and creates or updates the
   * matching `symbols` / `provider_mappings` rows. Symbols whose exchange has no *active*
   * provider exchange mapping are skipped.
   *
   * The provider is validated and the source documents are read before anything is
   * deleted, so an unsupported provider or a MongoDB failure cannot wipe existing data.
   *
   * @param provider One of `qm`, `eod`, `ib` (case-insensitive).
   * @param clearFirst When true, all `provider_mappings` and `symbols` rows are deleted
   *   (exchanges are kept) in a separate transaction before the sync starts.
   * @returns Counters for the run; `processed` is the number of source documents.
   * @throws Error if the manager is not initialized or the provider is unsupported;
   *   re-throws any sync error after rollback.
   */
  async syncSymbolsFromProvider(
    provider: string,
    clearFirst: boolean = false
  ): Promise<SyncResult> {
    this.assertInitialized();

    logger.info(`Starting ${provider} symbols sync...`, { clearFirst });

    const result = createEmptySyncResult();

    try {
      // Mappings are stored under lower-case provider names ('qm', 'eod', 'ib'), so the
      // same normalized key is used for the collection lookup and every query below.
      const providerKey = provider.toLowerCase();
      const collectionName = SYMBOL_COLLECTION_BY_PROVIDER.get(providerKey);
      if (collectionName === undefined) {
        throw new Error(`Unsupported provider: ${provider}`);
      }

      // Get symbols based on provider
      const symbols: ProviderSymbolDocument[] = await this.getMongoDatabase()
        .collection(collectionName)
        .find({})
        .toArray();

      logger.info(`Found ${symbols.length} ${provider} symbols to process`);
      result.processed = symbols.length;

      // Clear existing data if requested (only symbols and mappings, keep exchanges)
      if (clearFirst) {
        await this.runInTransaction(async () => {
          await this.postgresClient.query('DELETE FROM provider_mappings');
          await this.postgresClient.query('DELETE FROM symbols');
        });
        logger.info('Cleared existing symbols and mappings before sync');
      }

      await this.runInTransaction(async () => {
        for (const symbol of symbols) {
          try {
            await this.processSingleSymbol(symbol, providerKey, result);
          } catch (error) {
            logger.error('Failed to process symbol', {
              error,
              symbol: symbol.symbol || symbol.code,
              provider,
            });
            result.errors++;
          }
        }

        // Update sync status
        await this.updateSyncStatus(providerKey, 'symbols', result.processed);
      });

      logger.info(`${provider} symbols sync completed`, result);
      return result;
    } catch (error) {
      logger.error(`${provider} symbols sync failed`, { error });
      throw error;
    }
  }

  /**
   * Get comprehensive sync status
   *
   * @returns All `sync_status` rows ordered by provider and data type.
   * @throws Error if the manager is not initialized.
   */
  async getSyncStatus(): Promise<SyncStatus[]> {
    this.assertInitialized();

    const query = `
      SELECT provider, data_type as "dataType", last_sync_at as "lastSyncAt",
             last_sync_count as "lastSyncCount", sync_errors as "syncErrors"
      FROM sync_status
      ORDER BY provider, data_type
    `;
    const result = await this.postgresClient.query(query);
    return result.rows;
  }

  /**
   * Get exchange statistics
   *
   * @returns A single row with `total_exchanges`, `active_exchanges`, `countries` and
   *   `currencies` aggregated over the `exchanges` table.
   * @throws Error if the manager is not initialized.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- row type is defined by the driver
  async getExchangeStats(): Promise<any> {
    this.assertInitialized();

    const query = `
      SELECT
        COUNT(*) as total_exchanges,
        COUNT(CASE WHEN active = true THEN 1 END) as active_exchanges,
        COUNT(DISTINCT country) as countries,
        COUNT(DISTINCT currency) as currencies
      FROM exchanges
    `;
    const result = await this.postgresClient.query(query);
    return result.rows[0];
  }

  /**
   * Get provider exchange mapping statistics
   *
   * @returns One row per provider with total/active/verified/auto-mapped counts and the
   *   average confidence from `provider_exchange_mappings`.
   * @throws Error if the manager is not initialized.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- row type is defined by the driver
  async getProviderMappingStats(): Promise<any> {
    this.assertInitialized();

    const query = `
      SELECT
        provider,
        COUNT(*) as total_mappings,
        COUNT(CASE WHEN active = true THEN 1 END) as active_mappings,
        COUNT(CASE WHEN verified = true THEN 1 END) as verified_mappings,
        COUNT(CASE WHEN auto_mapped = true THEN 1 END) as auto_mapped,
        AVG(confidence) as avg_confidence
      FROM provider_exchange_mappings
      GROUP BY provider
      ORDER BY provider
    `;
    const result = await this.postgresClient.query(query);
    return result.rows;
  }

  // Private helper methods

  /** Throws when a public operation is used before `initialize()` has completed. */
  private assertInitialized(): void {
    if (!this.isInitialized) {
      throw new Error(NOT_INITIALIZED_MESSAGE);
    }
  }

  /**
   * Runs `work` between BEGIN and COMMIT; on any failure issues ROLLBACK and re-throws
   * the original error.
   *
   * NOTE(review): this assumes `postgresClient.query` always runs on the same connection.
   * If the client is pool-backed, BEGIN/COMMIT may land on different connections — confirm.
   * NOTE(review): PostgreSQL aborts a transaction after a failed statement, so callers that
   * catch per-item errors inside `work` may see every following statement fail — confirm
   * whether savepoints are needed.
   */
  private async runInTransaction<T>(work: () => Promise<T>): Promise<T> {
    await this.postgresClient.query('BEGIN');
    try {
      const outcome = await work();
      await this.postgresClient.query('COMMIT');
      return outcome;
    } catch (error) {
      await this.rollbackQuietly();
      throw error;
    }
  }

  /** Issues ROLLBACK and only logs if that fails, so the original error is never masked. */
  private async rollbackQuietly(): Promise<void> {
    try {
      await this.postgresClient.query('ROLLBACK');
    } catch (rollbackError) {
      logger.error('Failed to roll back PostgreSQL transaction', { error: rollbackError });
    }
  }

  /** Replaces the in-memory exchange cache with the current contents of `exchanges`. */
  private async loadExchangeCache(): Promise<void> {
    const query = 'SELECT id, code, name, country, currency FROM exchanges';
    const result = await this.postgresClient.query(query);

    this.exchangeCache.clear();
    for (const exchange of result.rows) {
      this.exchangeCache.set(exchange.code.toUpperCase(), exchange);
    }

    logger.info(`Loaded ${this.exchangeCache.size} exchanges into cache`);
  }

  /** Counts one handled exchange as either a newly created mapping or a skipped one. */
  private recordMappingOutcome(result: SyncResult, created: boolean): void {
    result.processed++;
    if (created) {
      result.created++;
    } else {
      result.skipped++;
    }
  }

  /** Creates provider exchange mappings for every active document in `eodExchanges`. */
  private async syncEODExchanges(): Promise<SyncResult> {
    const exchanges = await this.getMongoDatabase()
      .collection('eodExchanges')
      .find({ active: true })
      .toArray();
    const result = createEmptySyncResult();

    for (const exchange of exchanges) {
      try {
        // Create provider exchange mapping for EOD
        const created = await this.createProviderExchangeMapping(
          'eod', // provider
          exchange.Code,
          exchange.Name,
          exchange.CountryISO2,
          exchange.Currency,
          0.95 // very high confidence for EOD data
        );
        this.recordMappingOutcome(result, created);
      } catch (error) {
        logger.error('Failed to process EOD exchange', { error, exchange });
        result.errors++;
      }
    }

    return result;
  }

  /** Creates provider exchange mappings for every document in `ibExchanges`. */
  private async syncIBExchanges(): Promise<SyncResult> {
    const exchanges = await this.getMongoDatabase().collection('ibExchanges').find({}).toArray();
    const result = createEmptySyncResult();

    for (const exchange of exchanges) {
      try {
        // Create provider exchange mapping for IB
        const created = await this.createProviderExchangeMapping(
          'ib', // provider
          exchange.exchange_id,
          exchange.name,
          exchange.country_code,
          'USD', // IB doesn't specify currency, default to USD
          0.85 // good confidence for IB data
        );
        this.recordMappingOutcome(result, created);
      } catch (error) {
        logger.error('Failed to process IB exchange', { error, exchange });
        result.errors++;
      }
    }

    return result;
  }

  /**
   * Create a provider exchange mapping if none exists yet.
   * Existing mappings are never modified so manual edits are preserved. New mappings are
   * inserted with `active = false` and `auto_mapped = true`.
   *
   * @returns true when a new mapping row was inserted, false when the code was empty or
   *   a mapping for (provider, code) already existed.
   */
  private async createProviderExchangeMapping(
    provider: string,
    providerExchangeCode: string,
    providerExchangeName: string,
    countryCode: string | null,
    currency: string | null,
    confidence: number
  ): Promise<boolean> {
    if (!providerExchangeCode) {
      return false;
    }

    // Check if mapping already exists
    const existingMapping = await this.findProviderExchangeMapping(provider, providerExchangeCode);
    if (existingMapping) {
      // Don't override existing mappings to preserve manual work
      return false;
    }

    // Find or create master exchange
    const masterExchange = await this.findOrCreateMasterExchange(
      providerExchangeCode,
      providerExchangeName,
      countryCode,
      currency
    );

    // Create the provider exchange mapping; RETURNING yields no row when the insert
    // was suppressed by ON CONFLICT, which lets the caller count it as skipped.
    const query = `
      INSERT INTO provider_exchange_mappings
        (provider, provider_exchange_code, provider_exchange_name, master_exchange_id,
         country_code, currency, confidence, active, auto_mapped)
      VALUES ($1, $2, $3, $4, $5, $6, $7, false, true)
      ON CONFLICT (provider, provider_exchange_code) DO NOTHING
      RETURNING provider_exchange_code
    `;

    const insertResult = await this.postgresClient.query(query, [
      provider,
      providerExchangeCode,
      providerExchangeName,
      masterExchange.id,
      countryCode,
      currency,
      confidence,
    ]);

    return insertResult.rows.length > 0;
  }

  /**
   * Find or create a master exchange based on provider data
   *
   * Lookup order: exact code match, then the alias table (e.g. `NYE` -> `NYSE`), and
   * finally an upsert of a new, inactive exchange (country defaults to `US`, currency
   * to `USD`). A newly upserted exchange is also put into the cache.
   */
  private async findOrCreateMasterExchange(
    providerCode: string,
    providerName: string,
    countryCode: string | null,
    currency: string | null
  ): Promise<ExchangeMapping> {
    // First, try to find exact match
    const exactMatch = await this.findExchangeByCode(providerCode);
    if (exactMatch) {
      return exactMatch;
    }

    // Try to find by similar codes (basic mapping)
    const aliasedCode = this.getBasicExchangeMapping(providerCode);
    if (aliasedCode) {
      const aliasedMatch = await this.findExchangeByCode(aliasedCode);
      if (aliasedMatch) {
        return aliasedMatch;
      }
    }

    // Create new master exchange (inactive by default)
    const query = `
      INSERT INTO exchanges (code, name, country, currency, active)
      VALUES ($1, $2, $3, $4, false)
      ON CONFLICT (code) DO UPDATE SET
        name = COALESCE(EXCLUDED.name, exchanges.name),
        country = COALESCE(EXCLUDED.country, exchanges.country),
        currency = COALESCE(EXCLUDED.currency, exchanges.currency)
      RETURNING id, code, name, country, currency
    `;

    const result = await this.postgresClient.query(query, [
      providerCode,
      providerName || providerCode,
      countryCode || 'US',
      currency || 'USD',
    ]);

    const newExchange: ExchangeMapping = result.rows[0];

    // Update cache
    this.exchangeCache.set(newExchange.code.toUpperCase(), newExchange);

    return newExchange;
  }

  /**
   * Basic exchange code mapping for common cases
   *
   * @returns The master exchange code for a known alias (case-insensitive), else null.
   */
  private getBasicExchangeMapping(providerCode: string): string | null {
    return BASIC_EXCHANGE_CODE_ALIASES.get(providerCode.toUpperCase()) ?? null;
  }

  /** Returns the mapping row for (provider, code) regardless of its active flag, or null. */
  private async findProviderExchangeMapping(
    provider: string,
    providerExchangeCode: string
  ): Promise<Record<string, unknown> | null> {
    const query =
      'SELECT * FROM provider_exchange_mappings WHERE provider = $1 AND provider_exchange_code = $2';
    const result = await this.postgresClient.query(query, [provider, providerExchangeCode]);
    return result.rows[0] ?? null;
  }

  /**
   * Creates or updates one symbol and its provider mapping, updating `result` counters.
   * Documents without a symbol code or exchange code, or whose exchange has no active
   * provider exchange mapping, are counted as skipped.
   */
  private async processSingleSymbol(
    symbol: ProviderSymbolDocument,
    provider: string,
    result: SyncResult
  ): Promise<void> {
    const symbolCode = symbol.symbol || symbol.code;
    const exchangeCode = symbol.exchangeCode || symbol.exchange || symbol.exchange_id;

    if (!symbolCode || !exchangeCode) {
      result.skipped++;
      return;
    }

    // Find active provider exchange mapping
    const providerMapping = await this.findActiveProviderExchangeMapping(provider, exchangeCode);

    if (!providerMapping) {
      result.skipped++;
      return;
    }

    // Check if symbol exists
    const existingSymbol = await this.findSymbolByCodeAndExchange(
      symbolCode,
      providerMapping.master_exchange_id
    );

    if (existingSymbol) {
      await this.updateSymbol(existingSymbol.id, symbol);
      await this.upsertProviderMapping(existingSymbol.id, provider, symbol);
      result.updated++;
    } else {
      const newSymbolId = await this.createSymbol(symbol, providerMapping.master_exchange_id);
      await this.upsertProviderMapping(newSymbolId, provider, symbol);
      result.created++;
    }
  }

  /** Returns the active mapping row for (provider, code) joined with its master exchange code, or null. */
  private async findActiveProviderExchangeMapping(
    provider: string,
    providerExchangeCode: string
  ): Promise<{ master_exchange_id: string; master_exchange_code: string } | null> {
    const query = `
      SELECT pem.*, e.code as master_exchange_code
      FROM provider_exchange_mappings pem
      JOIN exchanges e ON pem.master_exchange_id = e.id
      WHERE pem.provider = $1 AND pem.provider_exchange_code = $2 AND pem.active = true
    `;
    const result = await this.postgresClient.query(query, [provider, providerExchangeCode]);
    return result.rows[0] ?? null;
  }

  /** Returns the `exchanges` row whose code equals `code` exactly, or null. */
  private async findExchangeByCode(code: string): Promise<ExchangeMapping | null> {
    const query = 'SELECT * FROM exchanges WHERE code = $1';
    const result = await this.postgresClient.query(query, [code]);
    return result.rows[0] ?? null;
  }

  /** Returns the `symbols` row for (symbol, exchange id), or null. */
  private async findSymbolByCodeAndExchange(
    symbol: string,
    exchangeId: string
  ): Promise<{ id: string } | null> {
    const query = 'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2';
    const result = await this.postgresClient.query(query, [symbol, exchangeId]);
    return result.rows[0] ?? null;
  }

  /**
   * Inserts a new `symbols` row (country defaults to `US`, currency to `USD`).
   *
   * @returns The id of the inserted row.
   */
  private async createSymbol(symbol: ProviderSymbolDocument, exchangeId: string): Promise<string> {
    const query = `
      INSERT INTO symbols (symbol, exchange_id, company_name, country, currency)
      VALUES ($1, $2, $3, $4, $5)
      RETURNING id
    `;

    const result = await this.postgresClient.query(query, [
      symbol.symbol || symbol.code,
      exchangeId,
      symbol.companyName || symbol.name || symbol.company_name,
      symbol.countryCode || symbol.country_code || 'US',
      symbol.currency || 'USD',
    ]);

    return result.rows[0].id;
  }

  /** Updates name, country and currency of a symbol; missing values keep the stored ones. */
  private async updateSymbol(symbolId: string, symbol: ProviderSymbolDocument): Promise<void> {
    const query = `
      UPDATE symbols
      SET company_name = COALESCE($2, company_name),
          country = COALESCE($3, country),
          currency = COALESCE($4, currency),
          updated_at = NOW()
      WHERE id = $1
    `;

    await this.postgresClient.query(query, [
      symbolId,
      symbol.companyName || symbol.name || symbol.company_name,
      symbol.countryCode || symbol.country_code,
      symbol.currency,
    ]);
  }

  /**
   * Inserts the provider-to-symbol mapping, or on a (provider, provider_symbol) conflict
   * re-points it to `symbolId` and refreshes `provider_exchange` and `last_seen`.
   */
  private async upsertProviderMapping(
    symbolId: string,
    provider: string,
    symbol: ProviderSymbolDocument
  ): Promise<void> {
    const query = `
      INSERT INTO provider_mappings
        (symbol_id, provider, provider_symbol, provider_exchange, last_seen)
      VALUES ($1, $2, $3, $4, NOW())
      ON CONFLICT (provider, provider_symbol)
      DO UPDATE SET
        symbol_id = EXCLUDED.symbol_id,
        provider_exchange = EXCLUDED.provider_exchange,
        last_seen = NOW()
    `;

    await this.postgresClient.query(query, [
      symbolId,
      provider,
      symbol.qmSearchCode || symbol.symbol || symbol.code,
      symbol.exchangeCode || symbol.exchange || symbol.exchange_id,
    ]);
  }

  /** Upserts the `sync_status` row for (provider, dataType) with the current time and count, clearing errors. */
  private async updateSyncStatus(provider: string, dataType: string, count: number): Promise<void> {
    const query = `
      INSERT INTO sync_status (provider, data_type, last_sync_at, last_sync_count, sync_errors)
      VALUES ($1, $2, NOW(), $3, NULL)
      ON CONFLICT (provider, data_type)
      DO UPDATE SET
        last_sync_at = NOW(),
        last_sync_count = EXCLUDED.last_sync_count,
        sync_errors = NULL,
        updated_at = NOW()
    `;

    await this.postgresClient.query(query, [provider, dataType, count]);
  }

  /**
   * Maps common country spellings to ISO 2-letter codes; any other 2-character input is
   * returned upper-cased, everything else yields null.
   *
   * NOTE(review): not called anywhere in this file — confirm whether it should be applied
   * to provider country codes or removed.
   */
  private normalizeCountryCode(countryCode: string): string | null {
    if (!countryCode) {
      return null;
    }

    // Map common country variations to ISO 2-letter codes
    const normalized = COUNTRY_NAME_TO_ISO2.get(countryCode);
    if (normalized) {
      return normalized;
    }
    return countryCode.length === 2 ? countryCode.toUpperCase() : null;
  }

  /** Adds every counter of `source` onto `target` in place. */
  private mergeResults(target: SyncResult, source: SyncResult): void {
    target.processed += source.processed;
    target.created += source.created;
    target.updated += source.updated;
    target.skipped += source.skipped;
    target.errors += source.errors;
  }
}

// Export singleton instance
export const enhancedSyncManager = new EnhancedSyncManager();