import { getLogger } from '@stock-bot/logger';
import { getMongoDBClient, getPostgreSQLClient } from '../../../clients';
import type { JobPayload, SyncResult } from '../../../types/job-payloads';

const logger = getLogger('enhanced-sync-symbols-from-provider');

/**
 * Lower-cased provider name -> MongoDB collection holding that provider's raw symbols.
 * A Map (rather than an object literal) so that inherited keys such as
 * "constructor" can never be mistaken for a supported provider.
 */
const PROVIDER_SYMBOL_COLLECTIONS: ReadonlyMap<string, string> = new Map([
  ['qm', 'qmSymbols'],
  ['eod', 'eodSymbols'],
  ['ib', 'ibSymbols'],
]);

/**
 * Copies one provider's symbols from MongoDB into the PostgreSQL `symbols` and
 * `provider_mappings` tables, then records the run in `sync_status`.
 *
 * The provider is validated and its symbols are read from MongoDB *before* any
 * destructive work, so an unsupported provider or an unreadable source never
 * wipes the existing tables when `clearFirst` is set.
 *
 * @param payload - Job payload; `provider` is required ('qm', 'eod' or 'ib',
 *   case-insensitive). When `clearFirst` is truthy, `provider_mappings` and
 *   `symbols` are emptied (in their own transaction) before the sync.
 * @returns Counters for processed / created / updated / skipped / errored symbols.
 * @throws Error when `provider` is missing or unsupported, or when any database
 *   operation outside the per-symbol loop fails (after attempting a rollback).
 */
export async function syncSymbolsFromProvider(payload: JobPayload): Promise<SyncResult> {
  const provider = payload.provider;
  const clearFirst = payload.clearFirst || false;

  if (!provider) {
    throw new Error('Provider is required in payload');
  }

  logger.info(`Starting ${provider} symbols sync...`, { clearFirst });

  const result: SyncResult = {
    processed: 0,
    created: 0,
    updated: 0,
    skipped: 0,
    errors: 0,
  };

  // Tracks whether a BEGIN is outstanding so the error path only rolls back
  // a transaction that actually exists.
  let transactionOpen = false;

  try {
    // Validate the provider before touching any data.
    const collectionName = PROVIDER_SYMBOL_COLLECTIONS.get(provider.toLowerCase());
    if (!collectionName) {
      throw new Error(`Unsupported provider: ${provider}`);
    }

    const mongoClient = getMongoDBClient();
    const postgresClient = getPostgreSQLClient();

    // Read the source symbols first: if MongoDB is unavailable we fail here,
    // before the optional clear below has deleted anything.
    const db = mongoClient.getDatabase();
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- raw provider documents have no shared schema
    const symbols: Record<string, any>[] = await db.collection(collectionName).find({}).toArray();

    logger.info(`Found ${symbols.length} ${provider} symbols to process`);
    result.processed = symbols.length;

    // Clear existing data if requested (only symbols and mappings, keep exchanges)
    if (clearFirst) {
      await postgresClient.query('BEGIN');
      transactionOpen = true;
      await postgresClient.query('DELETE FROM provider_mappings');
      await postgresClient.query('DELETE FROM symbols');
      await postgresClient.query('COMMIT');
      transactionOpen = false;
      logger.info('Cleared existing symbols and mappings before sync');
    }

    // Start transaction
    await postgresClient.query('BEGIN');
    transactionOpen = true;

    for (const symbol of symbols) {
      try {
        await processSingleSymbol(symbol, provider, result);
      } catch (error) {
        // NOTE(review): if postgresClient is a single connection, a failed
        // statement aborts the surrounding transaction and every later
        // statement fails until ROLLBACK; a per-symbol SAVEPOINT would be
        // needed to really continue. If it is a pool, BEGIN/COMMIT may not
        // share a connection with the helper queries at all. Confirm which.
        logger.error('Failed to process symbol', {
          error,
          symbol: symbol.symbol || symbol.code,
          provider,
        });
        result.errors++;
      }
    }

    // Update sync status
    await updateSyncStatus(provider, 'symbols', result.processed, postgresClient);

    await postgresClient.query('COMMIT');
    transactionOpen = false;

    logger.info(`${provider} symbols sync completed`, result);
    return result;
  } catch (error) {
    if (transactionOpen) {
      try {
        const postgresClient = getPostgreSQLClient();
        await postgresClient.query('ROLLBACK');
      } catch (rollbackError) {
        // Log instead of throwing so the original failure is what callers see.
        logger.error(`${provider} symbols sync rollback failed`, { error: rollbackError });
      }
    }
    logger.error(`${provider} symbols sync failed`, { error });
    throw error;
  }
}

/**
 * Creates or updates a single symbol and its provider mapping, bumping the
 * matching counter on `result`.
 *
 * The symbol is counted as skipped when it has no symbol code, no exchange
 * code, or no active provider-exchange mapping for its exchange.
 *
 * @param symbol - Raw provider document (field names vary by provider).
 * @param provider - Provider name exactly as received in the job payload.
 * @param result - Running counters; mutated in place.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- raw provider documents have no shared schema
async function processSingleSymbol(symbol: any, provider: string, result: SyncResult): Promise<void> {
  const symbolCode = symbol.symbol || symbol.code;
  const exchangeCode = symbol.exchangeCode || symbol.exchange || symbol.exchange_id;

  if (!symbolCode || !exchangeCode) {
    result.skipped++;
    return;
  }

  // Find active provider exchange mapping
  const providerMapping = await findActiveProviderExchangeMapping(provider, exchangeCode);

  if (!providerMapping) {
    result.skipped++;
    return;
  }

  // Check if symbol exists
  const existingSymbol = await findSymbolByCodeAndExchange(
    symbolCode,
    providerMapping.master_exchange_id
  );

  if (existingSymbol) {
    await updateSymbol(existingSymbol.id, symbol);
    await upsertProviderMapping(existingSymbol.id, provider, symbol);
    result.updated++;
  } else {
    const newSymbolId = await createSymbol(symbol, providerMapping.master_exchange_id);
    await upsertProviderMapping(newSymbolId, provider, symbol);
    result.created++;
  }
}

/**
 * Looks up the active `provider_exchange_mappings` row for a provider's
 * exchange code, joined with the master exchange's code.
 *
 * @returns The first matching row (with `master_exchange_code` added), or
 *   `null` when there is none.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- row shape is defined by the database schema
async function findActiveProviderExchangeMapping(provider: string, providerExchangeCode: string): Promise<any> {
  const postgresClient = getPostgreSQLClient();
  const query = `
    SELECT pem.*, e.code as master_exchange_code
    FROM provider_exchange_mappings pem
    JOIN exchanges e ON pem.master_exchange_id = e.id
    WHERE pem.provider = $1 AND pem.provider_exchange_code = $2 AND pem.active = true
  `;
  const result = await postgresClient.query(query, [provider, providerExchangeCode]);
  return result.rows[0] || null;
}

/**
 * Fetches the `symbols` row for a symbol code on a given exchange.
 *
 * @returns The first matching row, or `null` when the symbol does not exist.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- row shape is defined by the database schema
async function findSymbolByCodeAndExchange(symbol: string, exchangeId: string): Promise<any> {
  const postgresClient = getPostgreSQLClient();
  const query = 'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2';
  const result = await postgresClient.query(query, [symbol, exchangeId]);
  return result.rows[0] || null;
}

/**
 * Inserts a new `symbols` row from a raw provider document.
 * Country falls back to 'US' and currency to 'USD' when the document has none.
 *
 * @returns The `id` of the inserted row.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- raw provider documents have no shared schema
async function createSymbol(symbol: any, exchangeId: string): Promise<string> {
  const postgresClient = getPostgreSQLClient();
  const query = `
    INSERT INTO symbols (symbol, exchange_id, company_name, country, currency)
    VALUES ($1, $2, $3, $4, $5)
    RETURNING id
  `;
  const result = await postgresClient.query(query, [
    symbol.symbol || symbol.code,
    exchangeId,
    symbol.companyName || symbol.name || symbol.company_name,
    symbol.countryCode || symbol.country_code || 'US',
    symbol.currency || 'USD',
  ]);
  return result.rows[0].id;
}

/**
 * Refreshes company name, country and currency on an existing symbol.
 * Fields missing from the provider document are left untouched (COALESCE
 * keeps the stored value), and `updated_at` is always bumped.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- raw provider documents have no shared schema
async function updateSymbol(symbolId: string, symbol: any): Promise<void> {
  const postgresClient = getPostgreSQLClient();
  const query = `
    UPDATE symbols
    SET company_name = COALESCE($2, company_name),
        country = COALESCE($3, country),
        currency = COALESCE($4, currency),
        updated_at = NOW()
    WHERE id = $1
  `;
  await postgresClient.query(query, [
    symbolId,
    symbol.companyName || symbol.name || symbol.company_name,
    symbol.countryCode || symbol.country_code,
    symbol.currency,
  ]);
}

/**
 * Inserts the provider -> symbol mapping, or on a (provider, provider_symbol)
 * conflict re-points it at `symbolId` and refreshes the exchange and
 * `last_seen` timestamp.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- raw provider documents have no shared schema
async function upsertProviderMapping(symbolId: string, provider: string, symbol: any): Promise<void> {
  const postgresClient = getPostgreSQLClient();
  const query = `
    INSERT INTO provider_mappings (symbol_id, provider, provider_symbol, provider_exchange, last_seen)
    VALUES ($1, $2, $3, $4, NOW())
    ON CONFLICT (provider, provider_symbol)
    DO UPDATE SET
      symbol_id = EXCLUDED.symbol_id,
      provider_exchange = EXCLUDED.provider_exchange,
      last_seen = NOW()
  `;
  await postgresClient.query(query, [
    symbolId,
    provider,
    symbol.qmSearchCode || symbol.symbol || symbol.code,
    symbol.exchangeCode || symbol.exchange || symbol.exchange_id,
  ]);
}

/**
 * Records a successful sync in `sync_status`: inserts the (provider, data_type)
 * row or, if it already exists, updates its timestamp and count and clears
 * `sync_errors`.
 *
 * @param postgresClient - Client to run the statement on; the caller passes
 *   the one it opened its transaction with.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- client type is not exported to this module
async function updateSyncStatus(provider: string, dataType: string, count: number, postgresClient: any): Promise<void> {
  const query = `
    INSERT INTO sync_status (provider, data_type, last_sync_at, last_sync_count, sync_errors)
    VALUES ($1, $2, NOW(), $3, NULL)
    ON CONFLICT (provider, data_type)
    DO UPDATE SET
      last_sync_at = NOW(),
      last_sync_count = EXCLUDED.last_sync_count,
      sync_errors = NULL,
      updated_at = NOW()
  `;
  await postgresClient.query(query, [provider, dataType, count]);
}