diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index b026b20..8062df7 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -109,8 +109,8 @@ async function initializeServices() { type: 'exponential', delay: 1000, }, - removeOnComplete: true, - removeOnFail: false, + removeOnComplete: 10, + removeOnFail: 5, }, workers: 2, concurrency: 1, @@ -132,13 +132,11 @@ async function initializeServices() { // Initialize handlers (register handlers and scheduled jobs) logger.debug('Initializing data handlers...'); const { initializeWebShareProvider } = await import('./handlers/webshare/webshare.handler'); - const { initializeExchangeSyncProvider } = await import('./handlers/exchange-sync/exchange-sync.handler'); const { initializeIBProvider } = await import('./handlers/ib/ib.handler'); const { initializeProxyProvider } = await import('./handlers/proxy/proxy.handler'); const { initializeQMProvider } = await import('./handlers/qm/qm.handler'); initializeWebShareProvider(); - initializeExchangeSyncProvider(); initializeIBProvider(); initializeProxyProvider(); initializeQMProvider(); diff --git a/apps/data-sync-service/package.json b/apps/data-sync-service/package.json index b9506cd..bdf6351 100644 --- a/apps/data-sync-service/package.json +++ b/apps/data-sync-service/package.json @@ -12,10 +12,13 @@ "clean": "rm -rf dist" }, "dependencies": { + "@stock-bot/cache": "*", "@stock-bot/config": "*", "@stock-bot/logger": "*", "@stock-bot/mongodb-client": "*", "@stock-bot/postgres-client": "*", + "@stock-bot/questdb-client": "*", + "@stock-bot/queue": "*", "@stock-bot/shutdown": "*", "hono": "^4.0.0" }, diff --git a/apps/data-sync-service/src/clients.ts b/apps/data-sync-service/src/clients.ts deleted file mode 100644 index 5dfe266..0000000 --- a/apps/data-sync-service/src/clients.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { PostgreSQLClient } from '@stock-bot/postgres-client'; -import { MongoDBClient } from '@stock-bot/mongodb-client'; - -let postgresClient: PostgreSQLClient | null = null; -let mongodbClient: MongoDBClient | null = null; - -export function setPostgreSQLClient(client: PostgreSQLClient): void { - postgresClient = client; -} - -export function getPostgreSQLClient(): PostgreSQLClient { - if (!postgresClient) { - throw new Error('PostgreSQL client not initialized. Call setPostgreSQLClient first.'); - } - return postgresClient; -} - -export function setMongoDBClient(client: MongoDBClient): void { - mongodbClient = client; -} - -export function getMongoDBClient(): MongoDBClient { - if (!mongodbClient) { - throw new Error('MongoDB client not initialized. Call setMongoDBClient first.'); - } - return mongodbClient; -} \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/exchanges/exchanges.handler.ts b/apps/data-sync-service/src/handlers/exchanges/exchanges.handler.ts new file mode 100644 index 0000000..06aa283 --- /dev/null +++ b/apps/data-sync-service/src/handlers/exchanges/exchanges.handler.ts @@ -0,0 +1,58 @@ +import { getLogger } from '@stock-bot/logger'; +import { handlerRegistry, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue'; +import { exchangeOperations } from './operations'; + +const logger = getLogger('exchanges-handler'); + +const HANDLER_NAME = 'exchanges'; + +const exchangesHandlerConfig: HandlerConfig = { + concurrency: 1, + maxAttempts: 3, + scheduledJobs: [ + { + operation: 'sync-all-exchanges', + cronPattern: '0 0 * * 0', // Weekly on Sunday at midnight + payload: { clearFirst: true }, + priority: 10, + immediately: false, + } as ScheduledJobConfig, + { + operation: 'sync-qm-exchanges', + cronPattern: '0 1 * * *', // Daily at 1 AM + payload: {}, + priority: 5, + immediately: false, + } as ScheduledJobConfig, + { + operation: 'sync-ib-exchanges', + cronPattern: '0 3 * * *', // Daily at 3 AM + payload: {}, + priority: 3, + immediately: false, + } as ScheduledJobConfig, + { + operation: 'sync-qm-provider-mappings', + cronPattern: '0 3 * * *', // Daily at 3 AM + payload: {}, + priority: 7, + immediately: false, + } as ScheduledJobConfig, + ], + operations: { + 'sync-all-exchanges': exchangeOperations.syncAllExchanges, + 'sync-qm-exchanges': exchangeOperations.syncQMExchanges, + 'sync-ib-exchanges': exchangeOperations.syncIBExchanges, + 'sync-qm-provider-mappings': exchangeOperations.syncQMProviderMappings, + 'clear-postgresql-data': exchangeOperations.clearPostgreSQLData, + 'get-exchange-stats': exchangeOperations.getExchangeStats, + 'get-provider-mapping-stats': exchangeOperations.getProviderMappingStats, + 'enhanced-sync-status': exchangeOperations['enhanced-sync-status'], + }, +}; + +export function initializeExchangesHandler(): void { + logger.info('Registering exchanges handler...'); + handlerRegistry.registerHandler(HANDLER_NAME, exchangesHandlerConfig); + logger.info('Exchanges handler registered successfully'); +} \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/exchanges/operations/clear-postgresql-data.operations.ts b/apps/data-sync-service/src/handlers/exchanges/operations/clear-postgresql-data.operations.ts new file mode 100644 index 0000000..8880530 --- /dev/null +++ b/apps/data-sync-service/src/handlers/exchanges/operations/clear-postgresql-data.operations.ts @@ -0,0 +1,60 @@ +import { getLogger } from '@stock-bot/logger'; +import { getPostgreSQLClient } from '@stock-bot/postgres-client'; +import type { JobPayload } from '../../../types/job-payloads'; + +const logger = getLogger('enhanced-sync-clear-postgresql-data'); + +export async function clearPostgreSQLData(payload: JobPayload): Promise<{ + exchangesCleared: number; + symbolsCleared: number; + mappingsCleared: number; +}> { + logger.info('Clearing existing PostgreSQL data...'); + + try { + const postgresClient = getPostgreSQLClient(); + + // Start transaction for atomic operations + await postgresClient.query('BEGIN'); + + // Get counts before clearing + const exchangeCountResult = await postgresClient.query( + 'SELECT COUNT(*) as count FROM exchanges' + ); + const symbolCountResult = await postgresClient.query( + 'SELECT COUNT(*) as count FROM symbols' + ); + const mappingCountResult = await postgresClient.query( + 'SELECT COUNT(*) as count FROM provider_mappings' + ); + + const exchangesCleared = parseInt(exchangeCountResult.rows[0].count); + const symbolsCleared = parseInt(symbolCountResult.rows[0].count); + const mappingsCleared = parseInt(mappingCountResult.rows[0].count); + + // Clear data in correct order (respect foreign keys) + await postgresClient.query('DELETE FROM provider_mappings'); + await postgresClient.query('DELETE FROM symbols'); + await postgresClient.query('DELETE FROM exchanges'); + + // Reset sync status + await postgresClient.query( + 'UPDATE sync_status SET last_sync_at = NULL, last_sync_count = 0, sync_errors = NULL' + ); + + await postgresClient.query('COMMIT'); + + logger.info('PostgreSQL data cleared successfully', { + exchangesCleared, + symbolsCleared, + mappingsCleared, + }); + + return { exchangesCleared, symbolsCleared, mappingsCleared }; + } catch (error) { + const postgresClient = getPostgreSQLClient(); + await postgresClient.query('ROLLBACK'); + logger.error('Failed to clear PostgreSQL data', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/exchanges/operations/enhanced-sync-status.operations.ts b/apps/data-sync-service/src/handlers/exchanges/operations/enhanced-sync-status.operations.ts new file mode 100644 index 0000000..2a42451 --- /dev/null +++ b/apps/data-sync-service/src/handlers/exchanges/operations/enhanced-sync-status.operations.ts @@ -0,0 +1,26 @@ +import { getLogger } from '@stock-bot/logger'; +import { getPostgreSQLClient } from '@stock-bot/postgres-client'; +import type { JobPayload, SyncStatus } from '../../../types/job-payloads'; + +const logger = getLogger('enhanced-sync-status'); + +export async function getSyncStatus(payload: JobPayload): Promise { + logger.info('Getting comprehensive sync status...'); + + try { + const postgresClient = getPostgreSQLClient(); + const query = ` + SELECT provider, data_type as "dataType", last_sync_at as "lastSyncAt", + last_sync_count as "lastSyncCount", sync_errors as "syncErrors" + FROM sync_status + ORDER BY provider, data_type + `; + const result = await postgresClient.query(query); + + logger.info(`Retrieved sync status for ${result.rows.length} entries`); + return result.rows; + } catch (error) { + logger.error('Failed to get sync status', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/exchanges/operations/exchange-stats.operations.ts b/apps/data-sync-service/src/handlers/exchanges/operations/exchange-stats.operations.ts new file mode 100644 index 0000000..74806d3 --- /dev/null +++ b/apps/data-sync-service/src/handlers/exchanges/operations/exchange-stats.operations.ts @@ -0,0 +1,28 @@ +import { getLogger } from '@stock-bot/logger'; +import { getPostgreSQLClient } from '@stock-bot/postgres-client'; +import type { JobPayload } from '../../../types/job-payloads'; + +const logger = getLogger('enhanced-sync-exchange-stats'); + +export async function getExchangeStats(payload: JobPayload): Promise { + logger.info('Getting exchange statistics...'); + + try { + const postgresClient = getPostgreSQLClient(); + const query = ` + SELECT + COUNT(*) as total_exchanges, + COUNT(CASE WHEN active = true THEN 1 END) as active_exchanges, + COUNT(DISTINCT country) as countries, + COUNT(DISTINCT currency) as currencies + FROM exchanges + `; + const result = await postgresClient.query(query); + + logger.info('Retrieved exchange statistics'); + return result.rows[0]; + } catch (error) { + logger.error('Failed to get exchange statistics', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/exchanges/operations/index.ts b/apps/data-sync-service/src/handlers/exchanges/operations/index.ts new file mode 100644 index 0000000..b5157d7 --- /dev/null +++ b/apps/data-sync-service/src/handlers/exchanges/operations/index.ts @@ -0,0 +1,19 @@ +import { syncAllExchanges } from './sync-all-exchanges.operations'; +import { syncQMExchanges } from './qm-exchanges.operations'; +import { syncIBExchanges } from './sync-ib-exchanges.operations'; +import { syncQMProviderMappings } from './sync-qm-provider-mappings.operations'; +import { clearPostgreSQLData } from './clear-postgresql-data.operations'; +import { getExchangeStats } from './exchange-stats.operations'; +import { getProviderMappingStats } from './provider-mapping-stats.operations'; +import { getSyncStatus } from './enhanced-sync-status.operations'; + +export const exchangeOperations = { + syncAllExchanges, + syncQMExchanges, + syncIBExchanges, + syncQMProviderMappings, + clearPostgreSQLData, + getExchangeStats, + getProviderMappingStats, + 'enhanced-sync-status': getSyncStatus, +}; \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/exchanges/operations/provider-mapping-stats.operations.ts b/apps/data-sync-service/src/handlers/exchanges/operations/provider-mapping-stats.operations.ts new file mode 100644 index 0000000..416f8dc --- /dev/null +++ b/apps/data-sync-service/src/handlers/exchanges/operations/provider-mapping-stats.operations.ts @@ -0,0 +1,32 @@ +import { getLogger } from '@stock-bot/logger'; +import { getPostgreSQLClient } from '@stock-bot/postgres-client'; +import type { JobPayload } from '../../../types/job-payloads'; + +const logger = getLogger('enhanced-sync-provider-mapping-stats'); + +export async function getProviderMappingStats(payload: JobPayload): Promise { + logger.info('Getting provider mapping statistics...'); + + try { + const postgresClient = getPostgreSQLClient(); + const query = ` + SELECT + provider, + COUNT(*) as total_mappings, + COUNT(CASE WHEN active = true THEN 1 END) as active_mappings, + COUNT(CASE WHEN verified = true THEN 1 END) as verified_mappings, + COUNT(CASE WHEN auto_mapped = true THEN 1 END) as auto_mapped, + AVG(confidence) as avg_confidence + FROM provider_exchange_mappings + GROUP BY provider + ORDER BY provider + `; + const result = await postgresClient.query(query); + + logger.info('Retrieved provider mapping statistics'); + return result.rows; + } catch (error) { + logger.error('Failed to get provider mapping statistics', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/exchanges/operations/qm-exchanges.operations.ts b/apps/data-sync-service/src/handlers/exchanges/operations/qm-exchanges.operations.ts new file mode 100644 index 0000000..5646854 --- /dev/null +++ b/apps/data-sync-service/src/handlers/exchanges/operations/qm-exchanges.operations.ts @@ -0,0 +1,103 @@ +import { getLogger } from '@stock-bot/logger'; +import { getMongoDBClient } from '@stock-bot/mongodb-client'; +import { getPostgreSQLClient } from '@stock-bot/postgres-client'; +import type { JobPayload } from '../../../types/job-payloads'; + +const logger = getLogger('sync-qm-exchanges'); + +export async function syncQMExchanges(payload: JobPayload): Promise<{ processed: number; created: number; updated: number }> { + logger.info('Starting QM exchanges sync...'); + + try { + const mongoClient = getMongoDBClient(); + const postgresClient = getPostgreSQLClient(); + + // 1. Get all QM exchanges from MongoDB + const qmExchanges = await mongoClient.find('qmExchanges', {}); + logger.info(`Found ${qmExchanges.length} QM exchanges to process`); + + let created = 0; + let updated = 0; + + for (const exchange of qmExchanges) { + try { + // 2. Check if exchange exists + const existingExchange = await findExchange(exchange.exchangeCode, postgresClient); + + if (existingExchange) { + // Update existing + await updateExchange(existingExchange.id, exchange, postgresClient); + updated++; + } else { + // Create new + await createExchange(exchange, postgresClient); + created++; + } + } catch (error) { + logger.error('Failed to process exchange', { error, exchange: exchange.exchangeCode }); + } + } + + // 3. Update sync status + await updateSyncStatus('qm', 'exchanges', qmExchanges.length, postgresClient); + + const result = { processed: qmExchanges.length, created, updated }; + logger.info('QM exchanges sync completed', result); + return result; + } catch (error) { + logger.error('QM exchanges sync failed', { error }); + throw error; + } +} + +// Helper functions +async function findExchange(exchangeCode: string, postgresClient: any): Promise { + const query = 'SELECT * FROM exchanges WHERE code = $1'; + const result = await postgresClient.query(query, [exchangeCode]); + return result.rows[0] || null; +} + +async function createExchange(qmExchange: any, postgresClient: any): Promise { + const query = ` + INSERT INTO exchanges (code, name, country, currency, visible) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (code) DO NOTHING + `; + + await postgresClient.query(query, [ + qmExchange.exchangeCode || qmExchange.exchange, + qmExchange.exchangeShortName || qmExchange.name, + qmExchange.countryCode || 'US', + 'USD', // Default currency, can be improved + true, // New exchanges are visible by default + ]); +} + +async function updateExchange(exchangeId: string, qmExchange: any, postgresClient: any): Promise { + const query = ` + UPDATE exchanges + SET name = COALESCE($2, name), + country = COALESCE($3, country), + updated_at = NOW() + WHERE id = $1 + `; + + await postgresClient.query(query, [ + exchangeId, + qmExchange.exchangeShortName || qmExchange.name, + qmExchange.countryCode, + ]); +} + +async function updateSyncStatus(provider: string, dataType: string, count: number, postgresClient: any): Promise { + const query = ` + UPDATE sync_status + SET last_sync_at = NOW(), + last_sync_count = $3, + sync_errors = NULL, + updated_at = NOW() + WHERE provider = $1 AND data_type = $2 + `; + + await postgresClient.query(query, [provider, dataType, count]); +} \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/exchanges/operations/sync-all-exchanges.operations.ts b/apps/data-sync-service/src/handlers/exchanges/operations/sync-all-exchanges.operations.ts new file mode 100644 index 0000000..5c289dd --- /dev/null +++ b/apps/data-sync-service/src/handlers/exchanges/operations/sync-all-exchanges.operations.ts @@ -0,0 +1,267 @@ +import { getLogger } from '@stock-bot/logger'; +import { getMongoDBClient } from "@stock-bot/mongodb-client"; +import { getPostgreSQLClient } from '@stock-bot/postgres-client'; +import type { JobPayload, SyncResult } from '../../../types/job-payloads'; + +const logger = getLogger('enhanced-sync-all-exchanges'); + +export async function syncAllExchanges(payload: JobPayload): Promise { + const clearFirst = payload.clearFirst || true; + logger.info('Starting comprehensive exchange sync...', { clearFirst }); + + const result: SyncResult = { + processed: 0, + created: 0, + updated: 0, + skipped: 0, + errors: 0, + }; + + try { + const postgresClient = getPostgreSQLClient(); + + // Clear existing data if requested + if (clearFirst) { + await clearPostgreSQLData(postgresClient); + } + + // Start transaction for atomic operations + await postgresClient.query('BEGIN'); + + // 1. Sync from EOD exchanges (comprehensive global data) + const eodResult = await syncEODExchanges(); + mergeResults(result, eodResult); + + // 2. Sync from IB exchanges (detailed asset information) + const ibResult = await syncIBExchanges(); + mergeResults(result, ibResult); + + // 3. Update sync status + await updateSyncStatus('all', 'exchanges', result.processed, postgresClient); + + await postgresClient.query('COMMIT'); + + logger.info('Comprehensive exchange sync completed', result); + return result; + } catch (error) { + const postgresClient = getPostgreSQLClient(); + await postgresClient.query('ROLLBACK'); + logger.error('Comprehensive exchange sync failed', { error }); + throw error; + } +} + +async function clearPostgreSQLData(postgresClient: any): Promise { + logger.info('Clearing existing PostgreSQL data...'); + + // Clear data in correct order (respect foreign keys) + await postgresClient.query('DELETE FROM provider_mappings'); + await postgresClient.query('DELETE FROM symbols'); + await postgresClient.query('DELETE FROM exchanges'); + + // Reset sync status + await postgresClient.query( + 'UPDATE sync_status SET last_sync_at = NULL, last_sync_count = 0, sync_errors = NULL' + ); + + logger.info('PostgreSQL data cleared successfully'); +} + +async function syncEODExchanges(): Promise { + const mongoClient = getMongoDBClient(); + const exchanges = await mongoClient.find('eodExchanges', { active: true }); + const result: SyncResult = { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 }; + + for (const exchange of exchanges) { + try { + // Create provider exchange mapping for EOD + await createProviderExchangeMapping( + 'eod', // provider + exchange.Code, + exchange.Name, + exchange.CountryISO2, + exchange.Currency, + 0.95 // very high confidence for EOD data + ); + + result.processed++; + result.created++; // Count as created mapping + } catch (error) { + logger.error('Failed to process EOD exchange', { error, exchange }); + result.errors++; + } + } + + return result; +} + +async function syncIBExchanges(): Promise { + const mongoClient = getMongoDBClient(); + const exchanges = await mongoClient.find('ibExchanges', {}); + const result: SyncResult = { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 }; + + for (const exchange of exchanges) { + try { + // Create provider exchange mapping for IB + await createProviderExchangeMapping( + 'ib', // provider + exchange.exchange_id, + exchange.name, + exchange.country_code, + 'USD', // IB doesn't specify currency, default to USD + 0.85 // good confidence for IB data + ); + + result.processed++; + result.created++; // Count as created mapping + } catch (error) { + logger.error('Failed to process IB exchange', { error, exchange }); + result.errors++; + } + } + + return result; +} + +async function createProviderExchangeMapping( + provider: string, + providerExchangeCode: string, + providerExchangeName: string, + countryCode: string | null, + currency: string | null, + confidence: number +): Promise { + if (!providerExchangeCode) { + return; + } + + const postgresClient = getPostgreSQLClient(); + + // Check if mapping already exists + const existingMapping = await findProviderExchangeMapping(provider, providerExchangeCode); + if (existingMapping) { + // Don't override existing mappings to preserve manual work + return; + } + + // Find or create master exchange + const masterExchange = await findOrCreateMasterExchange( + providerExchangeCode, + providerExchangeName, + countryCode, + currency + ); + + // Create the provider exchange mapping + const query = ` + INSERT INTO provider_exchange_mappings + (provider, provider_exchange_code, provider_exchange_name, master_exchange_id, + country_code, currency, confidence, active, auto_mapped) + VALUES ($1, $2, $3, $4, $5, $6, $7, false, true) + ON CONFLICT (provider, provider_exchange_code) DO NOTHING + `; + + await postgresClient.query(query, [ + provider, + providerExchangeCode, + providerExchangeName, + masterExchange.id, + countryCode, + currency, + confidence, + ]); +} + +async function findOrCreateMasterExchange( + providerCode: string, + providerName: string, + countryCode: string | null, + currency: string | null +): Promise { + const postgresClient = getPostgreSQLClient(); + + // First, try to find exact match + let masterExchange = await findExchangeByCode(providerCode); + + if (masterExchange) { + return masterExchange; + } + + // Try to find by similar codes (basic mapping) + const basicMapping = getBasicExchangeMapping(providerCode); + if (basicMapping) { + masterExchange = await findExchangeByCode(basicMapping); + if (masterExchange) { + return masterExchange; + } + } + + // Create new master exchange (inactive by default) + const query = ` + INSERT INTO exchanges (code, name, country, currency, active) + VALUES ($1, $2, $3, $4, false) + ON CONFLICT (code) DO UPDATE SET + name = COALESCE(EXCLUDED.name, exchanges.name), + country = COALESCE(EXCLUDED.country, exchanges.country), + currency = COALESCE(EXCLUDED.currency, exchanges.currency) + RETURNING id, code, name, country, currency + `; + + const result = await postgresClient.query(query, [ + providerCode, + providerName || providerCode, + countryCode || 'US', + currency || 'USD', + ]); + + return result.rows[0]; +} + +function getBasicExchangeMapping(providerCode: string): string | null { + const mappings: Record = { + NYE: 'NYSE', + NAS: 'NASDAQ', + TO: 'TSX', + LN: 'LSE', + LON: 'LSE', + }; + + return mappings[providerCode.toUpperCase()] || null; +} + +async function findProviderExchangeMapping(provider: string, providerExchangeCode: string): Promise { + const postgresClient = getPostgreSQLClient(); + const query = 'SELECT * FROM provider_exchange_mappings WHERE provider = $1 AND provider_exchange_code = $2'; + const result = await postgresClient.query(query, [provider, providerExchangeCode]); + return result.rows[0] || null; +} + +async function findExchangeByCode(code: string): Promise { + const postgresClient = getPostgreSQLClient(); + const query = 'SELECT * FROM exchanges WHERE code = $1'; + const result = await postgresClient.query(query, [code]); + return result.rows[0] || null; +} + +async function updateSyncStatus(provider: string, dataType: string, count: number, postgresClient: any): Promise { + const query = ` + INSERT INTO sync_status (provider, data_type, last_sync_at, last_sync_count, sync_errors) + VALUES ($1, $2, NOW(), $3, NULL) + ON CONFLICT (provider, data_type) + DO UPDATE SET + last_sync_at = NOW(), + last_sync_count = EXCLUDED.last_sync_count, + sync_errors = NULL, + updated_at = NOW() + `; + + await postgresClient.query(query, [provider, dataType, count]); +} + +function mergeResults(target: SyncResult, source: SyncResult): void { + target.processed += source.processed; + target.created += source.created; + target.updated += source.updated; + target.skipped += source.skipped; + target.errors += source.errors; +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/exchange-sync/exchange-sync.handler.ts b/apps/data-sync-service/src/handlers/exchanges/operations/sync-ib-exchanges.operations.ts similarity index 66% rename from apps/data-service/src/handlers/exchange-sync/exchange-sync.handler.ts rename to apps/data-sync-service/src/handlers/exchanges/operations/sync-ib-exchanges.operations.ts index fc60d80..3f924e6 100644 --- a/apps/data-service/src/handlers/exchange-sync/exchange-sync.handler.ts +++ b/apps/data-sync-service/src/handlers/exchanges/operations/sync-ib-exchanges.operations.ts @@ -1,263 +1,206 @@ -/** - * Exchange Sync Provider - Queue provider for syncing IB exchanges to master records - */ -import { getLogger } from '@stock-bot/logger'; -import type { MasterExchange } from '@stock-bot/mongodb-client'; -import type { HandlerConfigWithSchedule } from '@stock-bot/queue'; -import { createJobHandler, handlerRegistry } from '@stock-bot/queue'; -import type { IBExchange } from '../types/exchange.types'; - -const logger = getLogger('exchange-sync'); - -export function initializeExchangeSyncProvider() { - logger.info('Registering exchange sync provider...'); - - const exchangeSyncConfig: HandlerConfigWithSchedule = { - name: 'exchange-sync', - - operations: { - 'sync-ib-exchanges': createJobHandler(async () => { - logger.info('Syncing IB exchanges to master table'); - return await syncIBExchanges(); - }), - - 'get-master-exchange': createJobHandler(async (payload: { masterExchangeId: string }) => { - logger.debug('Getting master exchange details', payload); - const exchange = await getMasterExchangeDetails(payload.masterExchangeId); - return { exchange, ...payload }; - }), - }, - - scheduledJobs: [ - { - type: 'exchange-sync-daily', - operation: 'sync-ib-exchanges', - cronPattern: '0 3 * * *', // Daily at 3 AM - priority: 3, - description: 'Daily sync of IB exchanges to master table', - immediately: true, // Run on startup to test - }, - ], - }; - - handlerRegistry.registerWithSchedule(exchangeSyncConfig); - logger.info('Exchange sync provider registered successfully'); -} - -/** - * Sync IB exchanges from actual ibExchanges table - creates 1:1 master records - */ -async function syncIBExchanges(): Promise<{ syncedCount: number; totalExchanges: number }> { - logger.info('Syncing IB exchanges from database...'); - - try { - const { connectMongoDB, getDatabase } = await import('@stock-bot/mongodb-client'); - - // Ensure MongoDB client is connected - await connectMongoDB(); - - const db = getDatabase(); - - // Filter by country code US and CA - const ibExchanges = await db - .collection('ibExchanges') - .find({ - country_code: { $in: ['US', 'CA'] }, - }) - .toArray(); - - logger.info('Found IB exchanges in database', { count: ibExchanges.length }); - - let syncedCount = 0; - - for (const exchange of ibExchanges) { - try { - await createOrUpdateMasterExchange(exchange); - syncedCount++; - - logger.debug('Synced IB exchange', { - ibId: exchange.id, - country: exchange.country_code, - }); - } catch (error) { - logger.error('Failed to sync IB exchange', { exchange: exchange.id, error }); - } - } - - logger.info('IB exchange sync completed', { - syncedCount, - totalExchanges: ibExchanges.length, - }); - - return { syncedCount, totalExchanges: ibExchanges.length }; - } catch (error) { - logger.error('Failed to fetch IB exchanges from database', { error }); - return { syncedCount: 0, totalExchanges: 0 }; - } -} - -/** - * Create or update master exchange record 1:1 from IB exchange - */ -async function createOrUpdateMasterExchange(ibExchange: IBExchange): Promise { - const { connectMongoDB, getDatabase } = await import('@stock-bot/mongodb-client'); - - await connectMongoDB(); - const db = getDatabase(); - const collection = db.collection('masterExchanges'); - - const masterExchangeId = generateMasterExchangeId(ibExchange); - const now = new Date(); - - // Check if master exchange already exists - const existing = await collection.findOne({ masterExchangeId }); - - if (existing) { - // Update existing record - await collection.updateOne( - { masterExchangeId }, - { - $set: { - officialName: ibExchange.name || `Exchange ${ibExchange.id}`, - country: ibExchange.country_code || 'UNKNOWN', - currency: ibExchange.currency || 'USD', - timezone: inferTimezone(ibExchange), - updated_at: now, - }, - } - ); - - logger.debug('Updated existing master exchange', { masterExchangeId }); - } else { - // Create new master exchange - const masterExchange: MasterExchange = { - masterExchangeId, - shortName: masterExchangeId, // Set shortName to masterExchangeId on creation - officialName: ibExchange.name || `Exchange ${ibExchange.id}`, - country: ibExchange.country_code || 'UNKNOWN', - currency: ibExchange.currency || 'USD', - timezone: inferTimezone(ibExchange), - active: false, // Set active to false only on creation - - sourceMappings: { - ib: { - id: ibExchange.id || ibExchange._id?.toString() || 'unknown', - name: ibExchange.name || `Exchange ${ibExchange.id}`, - code: ibExchange.code || ibExchange.id || '', - aliases: generateAliases(ibExchange), - lastUpdated: now, - }, - }, - - confidence: 1.0, // High confidence for direct IB mapping - verified: true, // Mark as verified since it's direct from IB - - // DocumentBase fields - source: 'exchange-sync-provider', - created_at: now, - updated_at: now, - }; - - await collection.insertOne(masterExchange); - logger.debug('Created new master exchange', { masterExchangeId }); - } -} - -/** - * Get master exchange details - */ -async function getMasterExchangeDetails(masterExchangeId: string): Promise { - try { - const { connectMongoDB, getDatabase } = await import('@stock-bot/mongodb-client'); - - await connectMongoDB(); - const db = getDatabase(); - const collection = db.collection('masterExchanges'); - - return await collection.findOne({ masterExchangeId }); - } catch (error) { - logger.error('Error getting master exchange details', { masterExchangeId, error }); - return null; - } -} - -/** - * Generate master exchange ID from IB exchange - */ -function generateMasterExchangeId(ibExchange: IBExchange): string { - // Use code if available, otherwise use ID, otherwise generate from name - if (ibExchange.code) { - return ibExchange.code.toUpperCase().replace(/[^A-Z0-9]/g, ''); - } - - if (ibExchange.id) { - return ibExchange.id.toUpperCase().replace(/[^A-Z0-9]/g, ''); - } - - if (ibExchange.name) { - return ibExchange.name - .toUpperCase() - .split(' ') - .slice(0, 2) - .join('_') - .replace(/[^A-Z0-9_]/g, ''); - } - - return 'UNKNOWN_EXCHANGE'; -} - -/** - * Generate aliases for the exchange - */ -function generateAliases(ibExchange: IBExchange): string[] { - const aliases: string[] = []; - - if (ibExchange.name && ibExchange.name.includes(' ')) { - // Add abbreviated version - aliases.push( - ibExchange.name - .split(' ') - .map(w => w[0]) - .join('') - .toUpperCase() - ); - } - - if (ibExchange.code) { - aliases.push(ibExchange.code.toUpperCase()); - } - - return aliases; -} - -/** - * Infer timezone from exchange name/location - */ -function inferTimezone(ibExchange: IBExchange): string { - if (!ibExchange.name) { - return 'UTC'; - } - - const name = ibExchange.name.toUpperCase(); - - if (name.includes('NEW YORK') || name.includes('NYSE') || name.includes('NASDAQ')) { - return 'America/New_York'; - } - if (name.includes('LONDON')) { - return 'Europe/London'; - } - if (name.includes('TOKYO')) { - return 'Asia/Tokyo'; - } - if (name.includes('SHANGHAI')) { - return 'Asia/Shanghai'; - } - if (name.includes('TORONTO')) { - return 'America/Toronto'; - } - if (name.includes('FRANKFURT')) { - return 'Europe/Berlin'; - } - - return 'UTC'; // Default -} +import { getLogger } from '@stock-bot/logger'; +import { getMongoDBClient } from '@stock-bot/mongodb-client'; +import type { JobPayload } from '../../../types/job-payloads'; +import type { MasterExchange } from '@stock-bot/mongodb-client'; + +const logger = getLogger('sync-ib-exchanges'); + +interface IBExchange { + id?: string; + _id?: any; + name?: string; + code?: string; + country_code?: string; + currency?: string; +} + +export async function syncIBExchanges(payload: JobPayload): Promise<{ syncedCount: number; totalExchanges: number }> { + logger.info('Syncing IB exchanges from database...'); + + try { + const mongoClient = getMongoDBClient(); + const db = mongoClient.getDatabase(); + + // Filter by country code US and CA + const ibExchanges = await db + .collection('ibExchanges') + .find({ + country_code: { $in: ['US', 'CA'] }, + }) + .toArray(); + + logger.info('Found IB exchanges in database', { count: ibExchanges.length }); + + let syncedCount = 0; + + for (const exchange of ibExchanges) { + try { + await createOrUpdateMasterExchange(exchange); + syncedCount++; + + logger.debug('Synced IB exchange', { + ibId: exchange.id, + country: exchange.country_code, + }); + } catch (error) { + logger.error('Failed to sync IB exchange', { exchange: exchange.id, error }); + } + } + + logger.info('IB exchange sync completed', { + syncedCount, + totalExchanges: ibExchanges.length, + }); + + return { syncedCount, totalExchanges: ibExchanges.length }; + } catch (error) { + logger.error('Failed to fetch IB exchanges from database', { error }); + return { syncedCount: 0, totalExchanges: 0 }; + } +} + +/** + * Create or update master exchange record 1:1 from IB exchange + */ +async function createOrUpdateMasterExchange(ibExchange: IBExchange): Promise { + const mongoClient = getMongoDBClient(); + const db = mongoClient.getDatabase(); + const collection = db.collection('masterExchanges'); + + const masterExchangeId = generateMasterExchangeId(ibExchange); + const now = new Date(); + + // Check if master exchange already exists + const existing = await collection.findOne({ masterExchangeId }); + + if (existing) { + // Update existing record + await collection.updateOne( + { masterExchangeId }, + { + $set: { + officialName: ibExchange.name || `Exchange ${ibExchange.id}`, + country: ibExchange.country_code || 'UNKNOWN', + currency: ibExchange.currency || 'USD', + timezone: inferTimezone(ibExchange), + updated_at: now, + }, + } + ); + + logger.debug('Updated existing master exchange', { masterExchangeId }); + } else { + // Create new master exchange + const masterExchange: MasterExchange = { + masterExchangeId, + shortName: masterExchangeId, // Set shortName to masterExchangeId on creation + officialName: ibExchange.name || `Exchange ${ibExchange.id}`, + country: ibExchange.country_code || 'UNKNOWN', + currency: ibExchange.currency || 'USD', + timezone: inferTimezone(ibExchange), + active: false, // Set active to false only on creation + + sourceMappings: { + ib: { + id: ibExchange.id || ibExchange._id?.toString() || 'unknown', + name: ibExchange.name || `Exchange ${ibExchange.id}`, + code: ibExchange.code || ibExchange.id || '', + aliases: generateAliases(ibExchange), + lastUpdated: now, + }, + }, + + confidence: 1.0, // High confidence for direct IB mapping + verified: true, // Mark as verified since it's direct from IB + + // DocumentBase fields + source: 'ib-exchange-sync', + created_at: now, + updated_at: now, + }; + + await collection.insertOne(masterExchange); + logger.debug('Created new master exchange', { masterExchangeId }); + } +} + +/** + * Generate master exchange ID from IB exchange + */ +function generateMasterExchangeId(ibExchange: IBExchange): string { + // Use code if available, otherwise use ID, otherwise generate from name + if (ibExchange.code) { + return ibExchange.code.toUpperCase().replace(/[^A-Z0-9]/g, ''); + } + + if (ibExchange.id) { + return ibExchange.id.toUpperCase().replace(/[^A-Z0-9]/g, ''); + } + + if (ibExchange.name) { + return ibExchange.name + .toUpperCase() + .split(' ') + .slice(0, 2) + .join('_') + .replace(/[^A-Z0-9_]/g, ''); + } + + return 'UNKNOWN_EXCHANGE'; +} + +/** + * Generate aliases for the exchange + */ +function generateAliases(ibExchange: IBExchange): string[] { + const aliases: string[] = []; + + if (ibExchange.name && ibExchange.name.includes(' ')) { + // Add abbreviated version + aliases.push( + ibExchange.name + .split(' ') + .map(w => w[0]) + .join('') + .toUpperCase() + ); + } + + if (ibExchange.code) { + aliases.push(ibExchange.code.toUpperCase()); + } + + return aliases; +} + +/** + * Infer timezone from exchange name/location + */ +function inferTimezone(ibExchange: IBExchange): string { + if (!ibExchange.name) { + return 'UTC'; + } + + const name = ibExchange.name.toUpperCase(); + + if (name.includes('NEW YORK') || name.includes('NYSE') || name.includes('NASDAQ')) { + return 'America/New_York'; + } + if (name.includes('LONDON')) { + return 'Europe/London'; + } + if (name.includes('TOKYO')) { + return 'Asia/Tokyo'; + } + if (name.includes('SHANGHAI')) { + return 'Asia/Shanghai'; + } + if (name.includes('TORONTO')) { + return 'America/Toronto'; + } + if (name.includes('FRANKFURT')) { + return 'Europe/Berlin'; + } + + return 'UTC'; // Default +} \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/exchanges/operations/sync-qm-provider-mappings.operations.ts b/apps/data-sync-service/src/handlers/exchanges/operations/sync-qm-provider-mappings.operations.ts new file mode 100644 index 0000000..73a7107 --- /dev/null +++ b/apps/data-sync-service/src/handlers/exchanges/operations/sync-qm-provider-mappings.operations.ts @@ -0,0 +1,204 @@ +import { getLogger } from '@stock-bot/logger'; +import { getMongoDBClient } from "@stock-bot/mongodb-client"; +import { getPostgreSQLClient } from '@stock-bot/postgres-client'; +import type { JobPayload, SyncResult } from '../../../types/job-payloads'; + +const logger = getLogger('enhanced-sync-qm-provider-mappings'); + +export async function syncQMProviderMappings(payload: JobPayload): Promise { + logger.info('Starting QM provider exchange mappings sync...'); + + const result: SyncResult = { + processed: 0, + created: 0, + updated: 0, + skipped: 0, + errors: 0, + }; + + try { + const mongoClient = getMongoDBClient(); + const postgresClient = getPostgreSQLClient(); + + // Start transaction + await postgresClient.query('BEGIN'); + + // Get unique exchange combinations from QM symbols + const db = mongoClient.getDatabase(); + const pipeline = [ + { + $group: { + _id: { + exchangeCode: '$exchangeCode', + exchange: '$exchange', + countryCode: '$countryCode', + }, + count: { $sum: 1 }, + sampleExchange: { $first: '$exchange' }, + }, + }, + { + $project: { + exchangeCode: '$_id.exchangeCode', + exchange: '$_id.exchange', + countryCode: '$_id.countryCode', + count: 1, + sampleExchange: 1, + }, + }, + ]; + + const qmExchanges = await db.collection('qmSymbols').aggregate(pipeline).toArray(); + logger.info(`Found ${qmExchanges.length} unique QM exchange combinations`); + + for (const exchange of qmExchanges) { + try { + // Create provider exchange mapping for QM + await createProviderExchangeMapping( + 'qm', // provider + exchange.exchangeCode, + exchange.sampleExchange || exchange.exchangeCode, + exchange.countryCode, + exchange.countryCode === 'CA' ? 'CAD' : 'USD', // Simple currency mapping + 0.8 // good confidence for QM data + ); + + result.processed++; + result.created++; + } catch (error) { + logger.error('Failed to process QM exchange mapping', { error, exchange }); + result.errors++; + } + } + + await postgresClient.query('COMMIT'); + + logger.info('QM provider exchange mappings sync completed', result); + return result; + } catch (error) { + const postgresClient = getPostgreSQLClient(); + await postgresClient.query('ROLLBACK'); + logger.error('QM provider exchange mappings sync failed', { error }); + throw error; + } +} + +async function createProviderExchangeMapping( + provider: string, + providerExchangeCode: string, + providerExchangeName: string, + countryCode: string | null, + currency: string | null, + confidence: number +): Promise { + if (!providerExchangeCode) { + return; + } + + const postgresClient = getPostgreSQLClient(); + + // Check if mapping already exists + const existingMapping = await findProviderExchangeMapping(provider, providerExchangeCode); + if (existingMapping) { + // Don't override existing mappings to preserve manual work + return; + } + + // Find or create master exchange + const masterExchange = await findOrCreateMasterExchange( + providerExchangeCode, + providerExchangeName, + countryCode, + currency + ); + + // Create the provider exchange mapping + const query = ` + INSERT INTO provider_exchange_mappings + (provider, provider_exchange_code, provider_exchange_name, master_exchange_id, + country_code, currency, confidence, active, auto_mapped) + VALUES ($1, $2, $3, $4, $5, $6, $7, false, true) + ON CONFLICT (provider, provider_exchange_code) DO NOTHING + `; + + await postgresClient.query(query, [ + provider, + providerExchangeCode, + providerExchangeName, + masterExchange.id, + countryCode, + currency, + confidence, + ]); +} + +async function findProviderExchangeMapping(provider: string, providerExchangeCode: string): Promise { + const postgresClient = getPostgreSQLClient(); + const query = 'SELECT * FROM provider_exchange_mappings WHERE provider = $1 AND provider_exchange_code = $2'; + const result = await postgresClient.query(query, [provider, providerExchangeCode]); + return result.rows[0] || null; +} + +async function findOrCreateMasterExchange( + providerCode: string, + providerName: string, + countryCode: string | null, + currency: string | null +): Promise { + const postgresClient = getPostgreSQLClient(); + + // First, try to find exact match + let masterExchange = await findExchangeByCode(providerCode); + + if (masterExchange) { + return masterExchange; + } + + // Try to find by similar codes (basic mapping) + const basicMapping = getBasicExchangeMapping(providerCode); + if (basicMapping) { + masterExchange = await findExchangeByCode(basicMapping); + if (masterExchange) { + return masterExchange; + } + } + + // Create new master exchange (inactive by default) + const query = ` + INSERT INTO exchanges (code, name, country, currency, active) + VALUES ($1, $2, $3, $4, false) + ON CONFLICT (code) DO UPDATE SET + name = COALESCE(EXCLUDED.name, exchanges.name), + country = COALESCE(EXCLUDED.country, exchanges.country), + currency = COALESCE(EXCLUDED.currency, exchanges.currency) + RETURNING id, code, name, country, currency + `; + + const result = await postgresClient.query(query, [ + providerCode, + providerName || providerCode, + countryCode || 'US', + currency || 'USD', + ]); + + return result.rows[0]; +} + +function getBasicExchangeMapping(providerCode: string): string | null { + const mappings: Record = { + NYE: 'NYSE', + NAS: 'NASDAQ', + TO: 'TSX', + LN: 'LSE', + LON: 'LSE', + }; + + return mappings[providerCode.toUpperCase()] || null; +} + +async function findExchangeByCode(code: string): Promise { + const postgresClient = getPostgreSQLClient(); + const query = 'SELECT * FROM exchanges WHERE code = $1'; + const result = await postgresClient.query(query, [code]); + return result.rows[0] || null; +} \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/symbols/operations/index.ts b/apps/data-sync-service/src/handlers/symbols/operations/index.ts new file mode 100644 index 0000000..378fdd1 --- /dev/null +++ b/apps/data-sync-service/src/handlers/symbols/operations/index.ts @@ -0,0 +1,9 @@ +import { syncQMSymbols } from './qm-symbols.operations'; +import { syncSymbolsFromProvider } from './sync-symbols-from-provider.operations'; +import { getSyncStatus } from './sync-status.operations'; + +export const symbolOperations = { + syncQMSymbols, + syncSymbolsFromProvider, + getSyncStatus, +}; \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/symbols/operations/qm-symbols.operations.ts b/apps/data-sync-service/src/handlers/symbols/operations/qm-symbols.operations.ts new file mode 100644 index 0000000..6f86841 --- /dev/null +++ b/apps/data-sync-service/src/handlers/symbols/operations/qm-symbols.operations.ts @@ -0,0 +1,168 @@ +import { getLogger } from '@stock-bot/logger'; +import { getMongoDBClient } from '@stock-bot/mongodb-client'; +import { getPostgreSQLClient } from '@stock-bot/postgres-client'; +import type { JobPayload } from '../../../types/job-payloads'; + +const logger = getLogger('sync-qm-symbols'); + +export async function syncQMSymbols(payload: JobPayload): Promise<{ processed: number; created: number; updated: number }> { + logger.info('Starting QM symbols sync...'); + + try { + const mongoClient = getMongoDBClient(); + const postgresClient = getPostgreSQLClient(); + + // 1. Get all QM symbols from MongoDB + const qmSymbols = await mongoClient.find('qmSymbols', {}); + logger.info(`Found ${qmSymbols.length} QM symbols to process`); + + let created = 0; + let updated = 0; + + for (const symbol of qmSymbols) { + try { + // 2. Resolve exchange + const exchangeId = await resolveExchange(symbol.exchangeCode || symbol.exchange, postgresClient); + + if (!exchangeId) { + logger.warn('Unknown exchange, skipping symbol', { + symbol: symbol.symbol, + exchange: symbol.exchangeCode || symbol.exchange, + }); + continue; + } + + // 3. Check if symbol exists + const existingSymbol = await findSymbol(symbol.symbol, exchangeId, postgresClient); + + if (existingSymbol) { + // Update existing + await updateSymbol(existingSymbol.id, symbol, postgresClient); + await upsertProviderMapping(existingSymbol.id, 'qm', symbol, postgresClient); + updated++; + } else { + // Create new + const newSymbolId = await createSymbol(symbol, exchangeId, postgresClient); + await upsertProviderMapping(newSymbolId, 'qm', symbol, postgresClient); + created++; + } + } catch (error) { + logger.error('Failed to process symbol', { error, symbol: symbol.symbol }); + } + } + + // 4. Update sync status + await updateSyncStatus('qm', 'symbols', qmSymbols.length, postgresClient); + + const result = { processed: qmSymbols.length, created, updated }; + logger.info('QM symbols sync completed', result); + return result; + } catch (error) { + logger.error('QM symbols sync failed', { error }); + throw error; + } +} + +// Helper functions +async function resolveExchange(exchangeCode: string, postgresClient: any): Promise { + if (!exchangeCode) return null; + + // Simple mapping - expand this as needed + const exchangeMap: Record = { + NASDAQ: 'NASDAQ', + NYSE: 'NYSE', + TSX: 'TSX', + TSE: 'TSX', // TSE maps to TSX + LSE: 'LSE', + CME: 'CME', + }; + + const normalizedCode = exchangeMap[exchangeCode.toUpperCase()]; + if (!normalizedCode) { + return null; + } + + const query = 'SELECT id FROM exchanges WHERE code = $1'; + const result = await postgresClient.query(query, [normalizedCode]); + return result.rows[0]?.id || null; +} + +async function findSymbol(symbol: string, exchangeId: string, postgresClient: any): Promise { + const query = 'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2'; + const result = await postgresClient.query(query, [symbol, exchangeId]); + return result.rows[0] || null; +} + +async function createSymbol(qmSymbol: any, exchangeId: string, postgresClient: any): Promise { + const query = ` + INSERT INTO symbols (symbol, exchange_id, company_name, country, currency) + VALUES ($1, $2, $3, $4, $5) + RETURNING id + `; + + const result = await postgresClient.query(query, [ + qmSymbol.symbol, + exchangeId, + qmSymbol.companyName || qmSymbol.name, + qmSymbol.countryCode || 'US', + qmSymbol.currency || 'USD', + ]); + + return result.rows[0].id; +} + +async function updateSymbol(symbolId: string, qmSymbol: any, postgresClient: any): Promise { + const query = ` + UPDATE symbols + SET company_name = COALESCE($2, company_name), + country = COALESCE($3, country), + currency = COALESCE($4, currency), + updated_at = NOW() + WHERE id = $1 + `; + + await postgresClient.query(query, [ + symbolId, + qmSymbol.companyName || qmSymbol.name, + qmSymbol.countryCode, + qmSymbol.currency, + ]); +} + +async function upsertProviderMapping( + symbolId: string, + provider: string, + qmSymbol: any, + postgresClient: any +): Promise { + const query = ` + INSERT INTO provider_mappings + (symbol_id, provider, provider_symbol, provider_exchange, last_seen) + VALUES ($1, $2, $3, $4, NOW()) + ON CONFLICT (provider, provider_symbol) + DO UPDATE SET + symbol_id = EXCLUDED.symbol_id, + provider_exchange = EXCLUDED.provider_exchange, + last_seen = NOW() + `; + + await postgresClient.query(query, [ + symbolId, + provider, + qmSymbol.qmSearchCode || qmSymbol.symbol, + qmSymbol.exchangeCode || qmSymbol.exchange, + ]); +} + +async function updateSyncStatus(provider: string, dataType: string, count: number, postgresClient: any): Promise { + const query = ` + UPDATE sync_status + SET last_sync_at = NOW(), + last_sync_count = $3, + sync_errors = NULL, + updated_at = NOW() + WHERE provider = $1 AND data_type = $2 + `; + + await postgresClient.query(query, [provider, dataType, count]); +} \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/symbols/operations/sync-status.operations.ts b/apps/data-sync-service/src/handlers/symbols/operations/sync-status.operations.ts new file mode 100644 index 0000000..768b199 --- /dev/null +++ b/apps/data-sync-service/src/handlers/symbols/operations/sync-status.operations.ts @@ -0,0 +1,21 @@ +import { getLogger } from '@stock-bot/logger'; +import { getPostgreSQLClient } from '@stock-bot/postgres-client'; +import type { JobPayload } from '../../../types/job-payloads'; + +const logger = getLogger('sync-status'); + +export async function getSyncStatus(payload: JobPayload): Promise[]> { + logger.info('Getting sync status...'); + + try { + const postgresClient = getPostgreSQLClient(); + const query = 'SELECT * FROM sync_status ORDER BY provider, data_type'; + const result = await postgresClient.query(query); + + logger.info(`Retrieved sync status for ${result.rows.length} entries`); + return result.rows; + } catch (error) { + logger.error('Failed to get sync status', { error }); + throw error; + } +} \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/symbols/operations/sync-symbols-from-provider.operations.ts b/apps/data-sync-service/src/handlers/symbols/operations/sync-symbols-from-provider.operations.ts new file mode 100644 index 0000000..7a8b9a6 --- /dev/null +++ b/apps/data-sync-service/src/handlers/symbols/operations/sync-symbols-from-provider.operations.ts @@ -0,0 +1,216 @@ +import { getLogger } from '@stock-bot/logger'; +import { getMongoDBClient } from "@stock-bot/mongodb-client"; +import { getPostgreSQLClient } from '@stock-bot/postgres-client'; +import type { JobPayload, SyncResult } from '../../../types/job-payloads'; + +const logger = getLogger('enhanced-sync-symbols-from-provider'); + +export async function syncSymbolsFromProvider(payload: JobPayload): Promise { + const provider = payload.provider; + const clearFirst = payload.clearFirst || false; + + if (!provider) { + throw new Error('Provider is required in payload'); + } + + logger.info(`Starting ${provider} symbols sync...`, { clearFirst }); + + const result: SyncResult = { + processed: 0, + created: 0, + updated: 0, + skipped: 0, + errors: 0, + }; + + try { + const mongoClient = getMongoDBClient(); + const postgresClient = getPostgreSQLClient(); + + // Clear existing data if requested (only symbols and mappings, keep exchanges) + if (clearFirst) { + await postgresClient.query('BEGIN'); + await postgresClient.query('DELETE FROM provider_mappings'); + await postgresClient.query('DELETE FROM symbols'); + await postgresClient.query('COMMIT'); + logger.info('Cleared existing symbols and mappings before sync'); + } + + // Start transaction + await postgresClient.query('BEGIN'); + + let symbols: Record[] = []; + + // Get symbols based on provider + const db = mongoClient.getDatabase(); + switch (provider.toLowerCase()) { + case 'qm': + symbols = await db.collection('qmSymbols').find({}).toArray(); + break; + case 'eod': + symbols = await db.collection('eodSymbols').find({}).toArray(); + break; + case 'ib': + symbols = await db.collection('ibSymbols').find({}).toArray(); + break; + default: + throw new Error(`Unsupported provider: ${provider}`); + } + + logger.info(`Found ${symbols.length} ${provider} symbols to process`); + result.processed = symbols.length; + + for (const symbol of symbols) { + try { + await processSingleSymbol(symbol, provider, result); + } catch (error) { + logger.error('Failed to process symbol', { + error, + symbol: symbol.symbol || symbol.code, + provider, + }); + result.errors++; + } + } + + // Update sync status + await updateSyncStatus(provider, 'symbols', result.processed, postgresClient); + + await postgresClient.query('COMMIT'); + + logger.info(`${provider} symbols sync completed`, result); + return result; + } catch (error) { + const postgresClient = getPostgreSQLClient(); + await postgresClient.query('ROLLBACK'); + logger.error(`${provider} symbols sync failed`, { error }); + throw error; + } +} + +async function processSingleSymbol(symbol: any, provider: string, result: SyncResult): Promise { + const symbolCode = symbol.symbol || symbol.code; + const exchangeCode = symbol.exchangeCode || symbol.exchange || symbol.exchange_id; + + if (!symbolCode || !exchangeCode) { + result.skipped++; + return; + } + + // Find active provider exchange mapping + const providerMapping = await findActiveProviderExchangeMapping(provider, exchangeCode); + + if (!providerMapping) { + result.skipped++; + return; + } + + // Check if symbol exists + const existingSymbol = await findSymbolByCodeAndExchange( + symbolCode, + providerMapping.master_exchange_id + ); + + if (existingSymbol) { + await updateSymbol(existingSymbol.id, symbol); + await upsertProviderMapping(existingSymbol.id, provider, symbol); + result.updated++; + } else { + const newSymbolId = await createSymbol(symbol, providerMapping.master_exchange_id); + await upsertProviderMapping(newSymbolId, provider, symbol); + result.created++; + } +} + +async function findActiveProviderExchangeMapping(provider: string, providerExchangeCode: string): Promise { + const postgresClient = getPostgreSQLClient(); + const query = ` + SELECT pem.*, e.code as master_exchange_code + FROM provider_exchange_mappings pem + JOIN exchanges e ON pem.master_exchange_id = e.id + WHERE pem.provider = $1 AND pem.provider_exchange_code = $2 AND pem.active = true + `; + const result = await postgresClient.query(query, [provider, providerExchangeCode]); + return result.rows[0] || null; +} + +async function findSymbolByCodeAndExchange(symbol: string, exchangeId: string): Promise { + const postgresClient = getPostgreSQLClient(); + const query = 'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2'; + const result = await postgresClient.query(query, [symbol, exchangeId]); + return result.rows[0] || null; +} + +async function createSymbol(symbol: any, exchangeId: string): Promise { + const postgresClient = getPostgreSQLClient(); + const query = ` + INSERT INTO symbols (symbol, exchange_id, company_name, country, currency) + VALUES ($1, $2, $3, $4, $5) + RETURNING id + `; + + const result = await postgresClient.query(query, [ + symbol.symbol || symbol.code, + exchangeId, + symbol.companyName || symbol.name || symbol.company_name, + symbol.countryCode || symbol.country_code || 'US', + symbol.currency || 'USD', + ]); + + return result.rows[0].id; +} + +async function updateSymbol(symbolId: string, symbol: any): Promise { + const postgresClient = getPostgreSQLClient(); + const query = ` + UPDATE symbols + SET company_name = COALESCE($2, company_name), + country = COALESCE($3, country), + currency = COALESCE($4, currency), + updated_at = NOW() + WHERE id = $1 + `; + + await postgresClient.query(query, [ + symbolId, + symbol.companyName || symbol.name || symbol.company_name, + symbol.countryCode || symbol.country_code, + symbol.currency, + ]); +} + +async function upsertProviderMapping(symbolId: string, provider: string, symbol: any): Promise { + const postgresClient = getPostgreSQLClient(); + const query = ` + INSERT INTO provider_mappings + (symbol_id, provider, provider_symbol, provider_exchange, last_seen) + VALUES ($1, $2, $3, $4, NOW()) + ON CONFLICT (provider, provider_symbol) + DO UPDATE SET + symbol_id = EXCLUDED.symbol_id, + provider_exchange = EXCLUDED.provider_exchange, + last_seen = NOW() + `; + + await postgresClient.query(query, [ + symbolId, + provider, + symbol.qmSearchCode || symbol.symbol || symbol.code, + symbol.exchangeCode || symbol.exchange || symbol.exchange_id, + ]); +} + +async function updateSyncStatus(provider: string, dataType: string, count: number, postgresClient: any): Promise { + const query = ` + INSERT INTO sync_status (provider, data_type, last_sync_at, last_sync_count, sync_errors) + VALUES ($1, $2, NOW(), $3, NULL) + ON CONFLICT (provider, data_type) + DO UPDATE SET + last_sync_at = NOW(), + last_sync_count = EXCLUDED.last_sync_count, + sync_errors = NULL, + updated_at = NOW() + `; + + await postgresClient.query(query, [provider, dataType, count]); +} \ No newline at end of file diff --git a/apps/data-sync-service/src/handlers/symbols/symbols.handler.ts b/apps/data-sync-service/src/handlers/symbols/symbols.handler.ts new file mode 100644 index 0000000..6fdd17f --- /dev/null +++ b/apps/data-sync-service/src/handlers/symbols/symbols.handler.ts @@ -0,0 +1,41 @@ +import { getLogger } from '@stock-bot/logger'; +import { handlerRegistry, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue'; +import { symbolOperations } from './operations'; + +const logger = getLogger('symbols-handler'); + +const HANDLER_NAME = 'symbols'; + +const symbolsHandlerConfig: HandlerConfig = { + concurrency: 1, + maxAttempts: 3, + scheduledJobs: [ + { + operation: 'sync-qm-symbols', + cronPattern: '0 2 * * *', // Daily at 2 AM + payload: {}, + priority: 5, + immediately: false, + } as ScheduledJobConfig, + { + operation: 'sync-symbols-qm', + cronPattern: '0 4 * * *', // Daily at 4 AM + payload: { provider: 'qm', clearFirst: false }, + priority: 5, + immediately: false, + } as ScheduledJobConfig, + ], + operations: { + 'sync-qm-symbols': symbolOperations.syncQMSymbols, + 'sync-symbols-qm': symbolOperations.syncSymbolsFromProvider, + 'sync-symbols-eod': symbolOperations.syncSymbolsFromProvider, + 'sync-symbols-ib': symbolOperations.syncSymbolsFromProvider, + 'sync-status': symbolOperations.getSyncStatus, + }, +}; + +export function initializeSymbolsHandler(): void { + logger.info('Registering symbols handler...'); + handlerRegistry.registerHandler(HANDLER_NAME, symbolsHandlerConfig); + logger.info('Symbols handler registered successfully'); +} \ No newline at end of file diff --git a/apps/data-sync-service/src/index.ts b/apps/data-sync-service/src/index.ts index b175564..cf0ebaf 100644 --- a/apps/data-sync-service/src/index.ts +++ b/apps/data-sync-service/src/index.ts @@ -1,38 +1,35 @@ -/** - * Data Sync Service - Sync raw MongoDB data to PostgreSQL master records - */ - // Framework imports +import { initializeServiceConfig } from '@stock-bot/config'; import { Hono } from 'hono'; import { cors } from 'hono/cors'; // Library imports -import { initializeServiceConfig } from '@stock-bot/config'; import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger'; -import { createAndConnectMongoDBClient, MongoDBClient } from '@stock-bot/mongodb-client'; -import { createAndConnectPostgreSQLClient, PostgreSQLClient } from '@stock-bot/postgres-client'; +import { connectMongoDB } from '@stock-bot/mongodb-client'; +import { connectPostgreSQL } from '@stock-bot/postgres-client'; +import { QueueManager, type QueueManagerConfig } from '@stock-bot/queue'; import { Shutdown } from '@stock-bot/shutdown'; -import { enhancedSyncManager } from './services/enhanced-sync-manager'; -import { syncManager } from './services/sync-manager'; // Local imports -import { setMongoDBClient, setPostgreSQLClient } from './clients'; -import { enhancedSyncRoutes, healthRoutes, statsRoutes, syncRoutes } from './routes'; +import { healthRoutes, enhancedSyncRoutes, statsRoutes, syncRoutes } from './routes'; -// Initialize configuration with automatic monorepo config inheritance -const config = await initializeServiceConfig(); +const config = initializeServiceConfig(); +console.log('Data Sync Service Configuration:', JSON.stringify(config, null, 2)); const serviceConfig = config.service; const databaseConfig = config.database; +const queueConfig = config.queue; -// Initialize logger with config -const loggingConfig = config.logging; -if (loggingConfig) { +if (config.log) { setLoggerConfig({ - logLevel: loggingConfig.level, + logLevel: config.log.level, logConsole: true, logFile: false, environment: config.environment, + hideObject: config.log.hideObject, }); } +// Create logger AFTER config is set +const logger = getLogger('data-sync-service'); + const app = new Hono(); // Add CORS middleware @@ -40,17 +37,15 @@ app.use( '*', cors({ origin: '*', - allowMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'], + allowMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH'], allowHeaders: ['Content-Type', 'Authorization'], credentials: false, }) ); - -const logger = getLogger('data-sync-service'); const PORT = serviceConfig.port; let server: ReturnType | null = null; -let postgresClient: PostgreSQLClient | null = null; -let mongoClient: MongoDBClient | null = null; +// Singleton clients are managed in libraries +let queueManager: QueueManager | null = null; // Initialize shutdown manager const shutdown = Shutdown.getInstance({ timeout: 15000 }); @@ -66,10 +61,10 @@ async function initializeServices() { logger.info('Initializing data sync service...'); try { - // Initialize MongoDB client + // Initialize MongoDB client singleton logger.debug('Connecting to MongoDB...'); const mongoConfig = databaseConfig.mongodb; - mongoClient = await createAndConnectMongoDBClient({ + await connectMongoDB({ uri: mongoConfig.uri, database: mongoConfig.database, host: mongoConfig.host || 'localhost', @@ -80,13 +75,12 @@ async function initializeServices() { serverSelectionTimeout: 5000, }, }); - setMongoDBClient(mongoClient); logger.info('MongoDB connected'); - // Initialize PostgreSQL client + // Initialize PostgreSQL client singleton logger.debug('Connecting to PostgreSQL...'); const pgConfig = databaseConfig.postgres; - postgresClient = await createAndConnectPostgreSQLClient({ + await connectPostgreSQL({ host: pgConfig.host, port: pgConfig.port, database: pgConfig.database, @@ -98,14 +92,98 @@ async function initializeServices() { idleTimeoutMillis: pgConfig.idleTimeout || 30000, }, }); - setPostgreSQLClient(postgresClient); logger.info('PostgreSQL connected'); - // Initialize sync managers - logger.debug('Initializing sync managers...'); - await syncManager.initialize(); - await enhancedSyncManager.initialize(); - logger.info('Sync managers initialized'); + // Initialize queue system (with delayed worker start) + logger.debug('Initializing queue system...'); + const queueManagerConfig: QueueManagerConfig = { + redis: queueConfig?.redis || { + host: 'localhost', + port: 6379, + db: 1, + }, + defaultQueueOptions: { + defaultJobOptions: queueConfig?.defaultJobOptions || { + attempts: 3, + backoff: { + type: 'exponential', + delay: 1000, + }, + removeOnComplete: 10, + removeOnFail: 5, + }, + workers: 2, + concurrency: 1, + enableMetrics: true, + enableDLQ: true, + }, + enableScheduledJobs: true, + delayWorkerStart: true, // Prevent workers from starting until all singletons are ready + }; + + queueManager = QueueManager.getOrInitialize(queueManagerConfig); + logger.info('Queue system initialized'); + + // Initialize handlers (register handlers and scheduled jobs) + logger.debug('Initializing sync handlers...'); + const { initializeExchangesHandler } = await import('./handlers/exchanges/exchanges.handler'); + const { initializeSymbolsHandler } = await import('./handlers/symbols/symbols.handler'); + + initializeExchangesHandler(); + initializeSymbolsHandler(); + + logger.info('Sync handlers initialized'); + + // Create scheduled jobs from registered handlers + logger.debug('Creating scheduled jobs from registered handlers...'); + const { handlerRegistry } = await import('@stock-bot/queue'); + const allHandlers = handlerRegistry.getAllHandlers(); + + let totalScheduledJobs = 0; + for (const [handlerName, config] of allHandlers) { + if (config.scheduledJobs && config.scheduledJobs.length > 0) { + const queue = queueManager.getQueue(handlerName); + + for (const scheduledJob of config.scheduledJobs) { + // Include handler and operation info in job data + const jobData = { + handler: handlerName, + operation: scheduledJob.operation, + payload: scheduledJob.payload || {}, + }; + + // Build job options from scheduled job config + const jobOptions = { + priority: scheduledJob.priority, + delay: scheduledJob.delay, + repeat: { + immediately: scheduledJob.immediately, + }, + }; + + await queue.addScheduledJob( + scheduledJob.operation, + jobData, + scheduledJob.cronPattern, + jobOptions + ); + totalScheduledJobs++; + logger.debug('Scheduled job created', { + handler: handlerName, + operation: scheduledJob.operation, + cronPattern: scheduledJob.cronPattern, + immediately: scheduledJob.immediately, + priority: scheduledJob.priority, + }); + } + } + } + logger.info('Scheduled jobs created', { totalJobs: totalScheduledJobs }); + + // Now that all singletons are initialized and jobs are scheduled, start the workers + logger.debug('Starting queue workers...'); + queueManager.startAllWorkers(); + logger.info('Queue workers started'); logger.info('All services initialized successfully'); } catch (error) { @@ -127,8 +205,22 @@ async function startServer() { logger.info(`Data Sync Service started on port ${PORT}`); } -// Register shutdown handlers -shutdown.onShutdown(async () => { +// Register shutdown handlers with priorities +// Priority 1: Queue system (highest priority) +shutdown.onShutdownHigh(async () => { + logger.info('Shutting down queue system...'); + try { + if (queueManager) { + await queueManager.shutdown(); + } + logger.info('Queue system shut down'); + } catch (error) { + logger.error('Error shutting down queue system', { error }); + } +}, 'Queue System'); + +// Priority 1: HTTP Server (high priority) +shutdown.onShutdownHigh(async () => { if (server) { logger.info('Stopping HTTP server...'); try { @@ -138,42 +230,33 @@ shutdown.onShutdown(async () => { logger.error('Error stopping HTTP server', { error }); } } -}); +}, 'HTTP Server'); -shutdown.onShutdown(async () => { - logger.info('Shutting down sync managers...'); - try { - await syncManager.shutdown(); - await enhancedSyncManager.shutdown(); - logger.info('Sync managers shut down'); - } catch (error) { - logger.error('Error shutting down sync managers', { error }); - } -}); - -shutdown.onShutdown(async () => { +// Priority 2: Database connections (medium priority) +shutdown.onShutdownMedium(async () => { logger.info('Disconnecting from databases...'); try { - if (mongoClient) { - await mongoClient.disconnect(); - } - if (postgresClient) { - await postgresClient.disconnect(); - } + const { disconnectMongoDB } = await import('@stock-bot/mongodb-client'); + const { disconnectPostgreSQL } = await import('@stock-bot/postgres-client'); + + await disconnectMongoDB(); + await disconnectPostgreSQL(); logger.info('Database connections closed'); } catch (error) { logger.error('Error closing database connections', { error }); } -}); +}, 'Databases'); -shutdown.onShutdown(async () => { +// Priority 3: Logger shutdown (lowest priority - runs last) +shutdown.onShutdownLow(async () => { try { + logger.info('Shutting down loggers...'); await shutdownLoggers(); - // process.stdout.write('Data sync service loggers shut down\n'); - } catch (error) { - process.stderr.write(`Error shutting down loggers: ${error}\n`); + // Don't log after shutdown + } catch { + // Silently ignore logger shutdown errors } -}); +}, 'Loggers'); // Start the service startServer().catch(error => { @@ -181,4 +264,4 @@ startServer().catch(error => { process.exit(1); }); -logger.info('Data sync service startup initiated'); +logger.info('Data sync service startup initiated'); \ No newline at end of file diff --git a/apps/data-sync-service/src/routes/enhanced-sync.routes.ts b/apps/data-sync-service/src/routes/enhanced-sync.routes.ts index 60c58bd..ba17805 100644 --- a/apps/data-sync-service/src/routes/enhanced-sync.routes.ts +++ b/apps/data-sync-service/src/routes/enhanced-sync.routes.ts @@ -1,6 +1,6 @@ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { enhancedSyncManager } from '../services/enhanced-sync-manager'; +import { QueueManager } from '@stock-bot/queue'; const logger = getLogger('enhanced-sync-routes'); const enhancedSync = new Hono(); @@ -9,10 +9,18 @@ const enhancedSync = new Hono(); enhancedSync.post('/exchanges/all', async c => { try { const clearFirst = c.req.query('clear') === 'true'; - const result = await enhancedSyncManager.syncAllExchanges(clearFirst); - return c.json({ success: true, result }); + const queueManager = QueueManager.getInstance(); + const exchangesQueue = queueManager.getQueue('exchanges'); + + const job = await exchangesQueue.addJob('sync-all-exchanges', { + handler: 'exchanges', + operation: 'sync-all-exchanges', + payload: { clearFirst }, + }); + + return c.json({ success: true, jobId: job.id, message: 'Enhanced exchange sync job queued' }); } catch (error) { - logger.error('Enhanced exchange sync failed', { error }); + logger.error('Failed to queue enhanced exchange sync job', { error }); return c.json( { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, 500 @@ -22,10 +30,18 @@ enhancedSync.post('/exchanges/all', async c => { enhancedSync.post('/provider-mappings/qm', async c => { try { - const result = await enhancedSyncManager.syncQMProviderMappings(); - return c.json({ success: true, result }); + const queueManager = QueueManager.getInstance(); + const exchangesQueue = queueManager.getQueue('exchanges'); + + const job = await exchangesQueue.addJob('sync-qm-provider-mappings', { + handler: 'exchanges', + operation: 'sync-qm-provider-mappings', + payload: {}, + }); + + return c.json({ success: true, jobId: job.id, message: 'QM provider mappings sync job queued' }); } catch (error) { - logger.error('QM provider mappings sync failed', { error }); + logger.error('Failed to queue QM provider mappings sync job', { error }); return c.json( { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, 500 @@ -37,10 +53,18 @@ enhancedSync.post('/symbols/:provider', async c => { try { const provider = c.req.param('provider'); const clearFirst = c.req.query('clear') === 'true'; - const result = await enhancedSyncManager.syncSymbolsFromProvider(provider, clearFirst); - return c.json({ success: true, result }); + const queueManager = QueueManager.getInstance(); + const symbolsQueue = queueManager.getQueue('symbols'); + + const job = await symbolsQueue.addJob(`sync-symbols-${provider}`, { + handler: 'symbols', + operation: `sync-symbols-${provider}`, + payload: { provider, clearFirst }, + }); + + return c.json({ success: true, jobId: job.id, message: `${provider} symbols sync job queued` }); } catch (error) { - logger.error('Enhanced symbol sync failed', { error }); + logger.error('Failed to queue enhanced symbol sync job', { error }); return c.json( { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, 500 @@ -51,8 +75,18 @@ enhancedSync.post('/symbols/:provider', async c => { // Enhanced status endpoints enhancedSync.get('/status/enhanced', async c => { try { - const status = await enhancedSyncManager.getSyncStatus(); - return c.json(status); + const queueManager = QueueManager.getInstance(); + const exchangesQueue = queueManager.getQueue('exchanges'); + + const job = await exchangesQueue.addJob('enhanced-sync-status', { + handler: 'exchanges', + operation: 'enhanced-sync-status', + payload: {}, + }); + + // Wait for job to complete and return result + const result = await job.waitUntilFinished(); + return c.json(result); } catch (error) { logger.error('Failed to get enhanced sync status', { error }); return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500); diff --git a/apps/data-sync-service/src/routes/stats.routes.ts b/apps/data-sync-service/src/routes/stats.routes.ts index 813679c..8112c9c 100644 --- a/apps/data-sync-service/src/routes/stats.routes.ts +++ b/apps/data-sync-service/src/routes/stats.routes.ts @@ -1,6 +1,6 @@ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { enhancedSyncManager } from '../services/enhanced-sync-manager'; +import { QueueManager } from '@stock-bot/queue'; const logger = getLogger('stats-routes'); const stats = new Hono(); @@ -8,8 +8,18 @@ const stats = new Hono(); // Statistics endpoints stats.get('/exchanges', async c => { try { - const statsData = await enhancedSyncManager.getExchangeStats(); - return c.json(statsData); + const queueManager = QueueManager.getInstance(); + const exchangesQueue = queueManager.getQueue('exchanges'); + + const job = await exchangesQueue.addJob('get-exchange-stats', { + handler: 'exchanges', + operation: 'get-exchange-stats', + payload: {}, + }); + + // Wait for job to complete and return result + const result = await job.waitUntilFinished(); + return c.json(result); } catch (error) { logger.error('Failed to get exchange stats', { error }); return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500); @@ -18,8 +28,18 @@ stats.get('/exchanges', async c => { stats.get('/provider-mappings', async c => { try { - const statsData = await enhancedSyncManager.getProviderMappingStats(); - return c.json(statsData); + const queueManager = QueueManager.getInstance(); + const exchangesQueue = queueManager.getQueue('exchanges'); + + const job = await exchangesQueue.addJob('get-provider-mapping-stats', { + handler: 'exchanges', + operation: 'get-provider-mapping-stats', + payload: {}, + }); + + // Wait for job to complete and return result + const result = await job.waitUntilFinished(); + return c.json(result); } catch (error) { logger.error('Failed to get provider mapping stats', { error }); return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500); diff --git a/apps/data-sync-service/src/routes/sync.routes.ts b/apps/data-sync-service/src/routes/sync.routes.ts index 68eaf47..487e31d 100644 --- a/apps/data-sync-service/src/routes/sync.routes.ts +++ b/apps/data-sync-service/src/routes/sync.routes.ts @@ -1,7 +1,6 @@ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; -import { syncManager } from '../services/sync-manager'; -import { enhancedSyncManager } from '../services/enhanced-sync-manager'; +import { QueueManager } from '@stock-bot/queue'; const logger = getLogger('sync-routes'); const sync = new Hono(); @@ -9,10 +8,18 @@ const sync = new Hono(); // Manual sync trigger endpoints sync.post('/symbols', async c => { try { - const result = await syncManager.syncQMSymbols(); - return c.json({ success: true, result }); + const queueManager = QueueManager.getInstance(); + const symbolsQueue = queueManager.getQueue('symbols'); + + const job = await symbolsQueue.addJob('sync-qm-symbols', { + handler: 'symbols', + operation: 'sync-qm-symbols', + payload: {}, + }); + + return c.json({ success: true, jobId: job.id, message: 'QM symbols sync job queued' }); } catch (error) { - logger.error('Manual symbol sync failed', { error }); + logger.error('Failed to queue symbol sync job', { error }); return c.json( { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, 500 @@ -22,10 +29,18 @@ sync.post('/symbols', async c => { sync.post('/exchanges', async c => { try { - const result = await syncManager.syncQMExchanges(); - return c.json({ success: true, result }); + const queueManager = QueueManager.getInstance(); + const exchangesQueue = queueManager.getQueue('exchanges'); + + const job = await exchangesQueue.addJob('sync-qm-exchanges', { + handler: 'exchanges', + operation: 'sync-qm-exchanges', + payload: {}, + }); + + return c.json({ success: true, jobId: job.id, message: 'QM exchanges sync job queued' }); } catch (error) { - logger.error('Manual exchange sync failed', { error }); + logger.error('Failed to queue exchange sync job', { error }); return c.json( { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, 500 @@ -36,8 +51,18 @@ sync.post('/exchanges', async c => { // Get sync status sync.get('/status', async c => { try { - const status = await syncManager.getSyncStatus(); - return c.json(status); + const queueManager = QueueManager.getInstance(); + const symbolsQueue = queueManager.getQueue('symbols'); + + const job = await symbolsQueue.addJob('sync-status', { + handler: 'symbols', + operation: 'sync-status', + payload: {}, + }); + + // Wait for job to complete and return result + const result = await job.waitUntilFinished(); + return c.json(result); } catch (error) { logger.error('Failed to get sync status', { error }); return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500); @@ -47,10 +72,20 @@ sync.get('/status', async c => { // Clear data endpoint sync.post('/clear', async c => { try { - const result = await enhancedSyncManager.clearPostgreSQLData(); + const queueManager = QueueManager.getInstance(); + const exchangesQueue = queueManager.getQueue('exchanges'); + + const job = await exchangesQueue.addJob('clear-postgresql-data', { + handler: 'exchanges', + operation: 'clear-postgresql-data', + payload: {}, + }); + + // Wait for job to complete and return result + const result = await job.waitUntilFinished(); return c.json({ success: true, result }); } catch (error) { - logger.error('Clear PostgreSQL data failed', { error }); + logger.error('Failed to clear PostgreSQL data', { error }); return c.json( { success: false, error: error instanceof Error ? error.message : 'Unknown error' }, 500 diff --git a/apps/data-sync-service/src/services/enhanced-sync-manager.ts b/apps/data-sync-service/src/services/enhanced-sync-manager.ts deleted file mode 100644 index 8730c63..0000000 --- a/apps/data-sync-service/src/services/enhanced-sync-manager.ts +++ /dev/null @@ -1,771 +0,0 @@ -/** - * Enhanced Sync Manager - Improved syncing with comprehensive exchange data - */ -import { getLogger } from '@stock-bot/logger'; -import { MongoDBClient } from '@stock-bot/mongodb-client'; -import { PostgreSQLClient } from '@stock-bot/postgres-client'; -import { getMongoDBClient, getPostgreSQLClient } from '../clients'; - -const logger = getLogger('enhanced-sync-manager'); - -interface ExchangeMapping { - id: string; - code: string; - name: string; - country: string; - currency: string; -} - -interface SyncResult { - processed: number; - created: number; - updated: number; - skipped: number; - errors: number; -} - -interface SyncStatus { - provider: string; - dataType: string; - lastSyncAt?: Date; - lastSyncCount: number; - syncErrors?: string; -} - -export class EnhancedSyncManager { - private isInitialized = false; - private mongoClient: MongoDBClient; - private postgresClient: PostgreSQLClient; - private exchangeCache: Map = new Map(); - - async initialize(): Promise { - if (this.isInitialized) { - logger.warn('Enhanced sync manager already initialized'); - return; - } - - try { - this.mongoClient = getMongoDBClient(); - this.postgresClient = getPostgreSQLClient(); - - // Pre-load exchange mappings for performance - await this.loadExchangeCache(); - - this.isInitialized = true; - logger.info('Enhanced sync manager initialized successfully'); - } catch (error) { - logger.error('Failed to initialize enhanced sync manager', { error }); - throw error; - } - } - - /** - * Helper method to get MongoDB database for direct queries - */ - private getMongoDatabase() { - return this.mongoClient.getDatabase(); - } - - async shutdown(): Promise { - if (!this.isInitialized) { - return; - } - - logger.info('Shutting down enhanced sync manager...'); - this.exchangeCache.clear(); - this.isInitialized = false; - logger.info('Enhanced sync manager shut down successfully'); - } - - /** - * Clear all exchange and symbol data from PostgreSQL - */ - async clearPostgreSQLData(): Promise<{ - exchangesCleared: number; - symbolsCleared: number; - mappingsCleared: number; - }> { - if (!this.isInitialized) { - throw new Error('Enhanced sync manager not initialized'); - } - - logger.info('Clearing existing PostgreSQL data...'); - - try { - // Start transaction for atomic operations - await this.postgresClient.query('BEGIN'); - - // Get counts before clearing - const exchangeCountResult = await this.postgresClient.query( - 'SELECT COUNT(*) as count FROM exchanges' - ); - const symbolCountResult = await this.postgresClient.query( - 'SELECT COUNT(*) as count FROM symbols' - ); - const mappingCountResult = await this.postgresClient.query( - 'SELECT COUNT(*) as count FROM provider_mappings' - ); - - const exchangesCleared = parseInt(exchangeCountResult.rows[0].count); - const symbolsCleared = parseInt(symbolCountResult.rows[0].count); - const mappingsCleared = parseInt(mappingCountResult.rows[0].count); - - // Clear data in correct order (respect foreign keys) - await this.postgresClient.query('DELETE FROM provider_mappings'); - await this.postgresClient.query('DELETE FROM symbols'); - await this.postgresClient.query('DELETE FROM exchanges'); - - // Reset sync status - await this.postgresClient.query( - 'UPDATE sync_status SET last_sync_at = NULL, last_sync_count = 0, sync_errors = NULL' - ); - - await this.postgresClient.query('COMMIT'); - - logger.info('PostgreSQL data cleared successfully', { - exchangesCleared, - symbolsCleared, - mappingsCleared, - }); - - return { exchangesCleared, symbolsCleared, mappingsCleared }; - } catch (error) { - await this.postgresClient.query('ROLLBACK'); - logger.error('Failed to clear PostgreSQL data', { error }); - throw error; - } - } - - /** - * Comprehensive exchange sync from all MongoDB providers - */ - async syncAllExchanges(clearFirst: boolean = true): Promise { - if (!this.isInitialized) { - throw new Error('Enhanced sync manager not initialized'); - } - - logger.info('Starting comprehensive exchange sync...', { clearFirst }); - - const result: SyncResult = { - processed: 0, - created: 0, - updated: 0, - skipped: 0, - errors: 0, - }; - - try { - // Clear existing data if requested - if (clearFirst) { - await this.clearPostgreSQLData(); - } - - // Start transaction for atomic operations - await this.postgresClient.query('BEGIN'); - - // 1. Sync from EOD exchanges (comprehensive global data) - const eodResult = await this.syncEODExchanges(); - this.mergeResults(result, eodResult); - - // 2. Sync from IB exchanges (detailed asset information) - const ibResult = await this.syncIBExchanges(); - this.mergeResults(result, ibResult); - - // 3. Update sync status - await this.updateSyncStatus('all', 'exchanges', result.processed); - - await this.postgresClient.query('COMMIT'); - - // Refresh exchange cache with new data - await this.loadExchangeCache(); - - logger.info('Comprehensive exchange sync completed', result); - return result; - } catch (error) { - await this.postgresClient.query('ROLLBACK'); - logger.error('Comprehensive exchange sync failed', { error }); - throw error; - } - } - - /** - * Sync QM provider exchange mappings - */ - async syncQMProviderMappings(): Promise { - if (!this.isInitialized) { - throw new Error('Enhanced sync manager not initialized'); - } - - logger.info('Starting QM provider exchange mappings sync...'); - - const result: SyncResult = { - processed: 0, - created: 0, - updated: 0, - skipped: 0, - errors: 0, - }; - - try { - // Start transaction - await this.postgresClient.query('BEGIN'); - - // Get unique exchange combinations from QM symbols - const db = this.getMongoDatabase(); - const pipeline = [ - { - $group: { - _id: { - exchangeCode: '$exchangeCode', - exchange: '$exchange', - countryCode: '$countryCode', - }, - count: { $sum: 1 }, - sampleExchange: { $first: '$exchange' }, - }, - }, - { - $project: { - exchangeCode: '$_id.exchangeCode', - exchange: '$_id.exchange', - countryCode: '$_id.countryCode', - count: 1, - sampleExchange: 1, - }, - }, - ]; - - const qmExchanges = await db.collection('qmSymbols').aggregate(pipeline).toArray(); - logger.info(`Found ${qmExchanges.length} unique QM exchange combinations`); - - for (const exchange of qmExchanges) { - try { - // Create provider exchange mapping for QM - await this.createProviderExchangeMapping( - 'qm', // provider - exchange.exchangeCode, - exchange.sampleExchange || exchange.exchangeCode, - exchange.countryCode, - exchange.countryCode === 'CA' ? 'CAD' : 'USD', // Simple currency mapping - 0.8 // good confidence for QM data - ); - - result.processed++; - result.created++; - } catch (error) { - logger.error('Failed to process QM exchange mapping', { error, exchange }); - result.errors++; - } - } - - await this.postgresClient.query('COMMIT'); - - logger.info('QM provider exchange mappings sync completed', result); - return result; - } catch (error) { - await this.postgresClient.query('ROLLBACK'); - logger.error('QM provider exchange mappings sync failed', { error }); - throw error; - } - } - - /** - * Enhanced symbol sync with multi-provider mapping - */ - async syncSymbolsFromProvider( - provider: string, - clearFirst: boolean = false - ): Promise { - if (!this.isInitialized) { - throw new Error('Enhanced sync manager not initialized'); - } - - logger.info(`Starting ${provider} symbols sync...`, { clearFirst }); - - const result: SyncResult = { - processed: 0, - created: 0, - updated: 0, - skipped: 0, - errors: 0, - }; - - try { - // Clear existing data if requested (only symbols and mappings, keep exchanges) - if (clearFirst) { - await this.postgresClient.query('BEGIN'); - await this.postgresClient.query('DELETE FROM provider_mappings'); - await this.postgresClient.query('DELETE FROM symbols'); - await this.postgresClient.query('COMMIT'); - logger.info('Cleared existing symbols and mappings before sync'); - } - - // Start transaction - await this.postgresClient.query('BEGIN'); - - let symbols: Record[] = []; - - // Get symbols based on provider - const db = this.getMongoDatabase(); - switch (provider.toLowerCase()) { - case 'qm': - symbols = await db.collection('qmSymbols').find({}).toArray(); - break; - case 'eod': - symbols = await db.collection('eodSymbols').find({}).toArray(); - break; - case 'ib': - symbols = await db.collection('ibSymbols').find({}).toArray(); - break; - default: - throw new Error(`Unsupported provider: ${provider}`); - } - - logger.info(`Found ${symbols.length} ${provider} symbols to process`); - result.processed = symbols.length; - - for (const symbol of symbols) { - try { - await this.processSingleSymbol(symbol, provider, result); - } catch (error) { - logger.error('Failed to process symbol', { - error, - symbol: symbol.symbol || symbol.code, - provider, - }); - result.errors++; - } - } - - // Update sync status - await this.updateSyncStatus(provider, 'symbols', result.processed); - - await this.postgresClient.query('COMMIT'); - - logger.info(`${provider} symbols sync completed`, result); - return result; - } catch (error) { - await this.postgresClient.query('ROLLBACK'); - logger.error(`${provider} symbols sync failed`, { error }); - throw error; - } - } - - /** - * Get comprehensive sync status - */ - async getSyncStatus(): Promise { - const query = ` - SELECT provider, data_type as "dataType", last_sync_at as "lastSyncAt", - last_sync_count as "lastSyncCount", sync_errors as "syncErrors" - FROM sync_status - ORDER BY provider, data_type - `; - const result = await this.postgresClient.query(query); - return result.rows; - } - - /** - * Get exchange statistics - */ - async getExchangeStats(): Promise { - const query = ` - SELECT - COUNT(*) as total_exchanges, - COUNT(CASE WHEN active = true THEN 1 END) as active_exchanges, - COUNT(DISTINCT country) as countries, - COUNT(DISTINCT currency) as currencies - FROM exchanges - `; - const result = await this.postgresClient.query(query); - return result.rows[0]; - } - - /** - * Get provider exchange mapping statistics - */ - async getProviderMappingStats(): Promise { - const query = ` - SELECT - provider, - COUNT(*) as total_mappings, - COUNT(CASE WHEN active = true THEN 1 END) as active_mappings, - COUNT(CASE WHEN verified = true THEN 1 END) as verified_mappings, - COUNT(CASE WHEN auto_mapped = true THEN 1 END) as auto_mapped, - AVG(confidence) as avg_confidence - FROM provider_exchange_mappings - GROUP BY provider - ORDER BY provider - `; - const result = await this.postgresClient.query(query); - return result.rows; - } - - // Private helper methods - - private async loadExchangeCache(): Promise { - const query = 'SELECT id, code, name, country, currency FROM exchanges'; - const result = await this.postgresClient.query(query); - - this.exchangeCache.clear(); - for (const exchange of result.rows) { - this.exchangeCache.set(exchange.code.toUpperCase(), exchange); - } - - logger.info(`Loaded ${this.exchangeCache.size} exchanges into cache`); - } - - private async syncEODExchanges(): Promise { - const db = this.getMongoDatabase(); - const exchanges = await db.collection('eodExchanges').find({ active: true }).toArray(); - const result: SyncResult = { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 }; - - for (const exchange of exchanges) { - try { - // Create provider exchange mapping for EOD - await this.createProviderExchangeMapping( - 'eod', // provider - exchange.Code, - exchange.Name, - exchange.CountryISO2, - exchange.Currency, - 0.95 // very high confidence for EOD data - ); - - result.processed++; - result.created++; // Count as created mapping - } catch (error) { - logger.error('Failed to process EOD exchange', { error, exchange }); - result.errors++; - } - } - - return result; - } - - private async syncIBExchanges(): Promise { - const db = this.getMongoDatabase(); - const exchanges = await db.collection('ibExchanges').find({}).toArray(); - const result: SyncResult = { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 }; - - for (const exchange of exchanges) { - try { - // Create provider exchange mapping for IB - await this.createProviderExchangeMapping( - 'ib', // provider - exchange.exchange_id, - exchange.name, - exchange.country_code, - 'USD', // IB doesn't specify currency, default to USD - 0.85 // good confidence for IB data - ); - - result.processed++; - result.created++; // Count as created mapping - } catch (error) { - logger.error('Failed to process IB exchange', { error, exchange }); - result.errors++; - } - } - - return result; - } - - /** - * Create or update a provider exchange mapping - * This method intelligently maps provider exchanges to master exchanges - */ - private async createProviderExchangeMapping( - provider: string, - providerExchangeCode: string, - providerExchangeName: string, - countryCode: string | null, - currency: string | null, - confidence: number - ): Promise { - if (!providerExchangeCode) { - return; - } - - // Check if mapping already exists - const existingMapping = await this.findProviderExchangeMapping(provider, providerExchangeCode); - if (existingMapping) { - // Don't override existing mappings to preserve manual work - return; - } - - // Find or create master exchange - const masterExchange = await this.findOrCreateMasterExchange( - providerExchangeCode, - providerExchangeName, - countryCode, - currency - ); - - // Create the provider exchange mapping - const query = ` - INSERT INTO provider_exchange_mappings - (provider, provider_exchange_code, provider_exchange_name, master_exchange_id, - country_code, currency, confidence, active, auto_mapped) - VALUES ($1, $2, $3, $4, $5, $6, $7, false, true) - ON CONFLICT (provider, provider_exchange_code) DO NOTHING - `; - - await this.postgresClient.query(query, [ - provider, - providerExchangeCode, - providerExchangeName, - masterExchange.id, - countryCode, - currency, - confidence, - ]); - } - - /** - * Find or create a master exchange based on provider data - */ - private async findOrCreateMasterExchange( - providerCode: string, - providerName: string, - countryCode: string | null, - currency: string | null - ): Promise { - // First, try to find exact match - let masterExchange = await this.findExchangeByCode(providerCode); - - if (masterExchange) { - return masterExchange; - } - - // Try to find by similar codes (basic mapping) - const basicMapping = this.getBasicExchangeMapping(providerCode); - if (basicMapping) { - masterExchange = await this.findExchangeByCode(basicMapping); - if (masterExchange) { - return masterExchange; - } - } - - // Create new master exchange (inactive by default) - const query = ` - INSERT INTO exchanges (code, name, country, currency, active) - VALUES ($1, $2, $3, $4, false) - ON CONFLICT (code) DO UPDATE SET - name = COALESCE(EXCLUDED.name, exchanges.name), - country = COALESCE(EXCLUDED.country, exchanges.country), - currency = COALESCE(EXCLUDED.currency, exchanges.currency) - RETURNING id, code, name, country, currency - `; - - const result = await this.postgresClient.query(query, [ - providerCode, - providerName || providerCode, - countryCode || 'US', - currency || 'USD', - ]); - - const newExchange = result.rows[0]; - - // Update cache - this.exchangeCache.set(newExchange.code.toUpperCase(), newExchange); - - return newExchange; - } - - /** - * Basic exchange code mapping for common cases - */ - private getBasicExchangeMapping(providerCode: string): string | null { - const mappings: Record = { - NYE: 'NYSE', - NAS: 'NASDAQ', - TO: 'TSX', - LN: 'LSE', - LON: 'LSE', - }; - - return mappings[providerCode.toUpperCase()] || null; - } - - private async findProviderExchangeMapping( - provider: string, - providerExchangeCode: string - ): Promise { - const query = - 'SELECT * FROM provider_exchange_mappings WHERE provider = $1 AND provider_exchange_code = $2'; - const result = await this.postgresClient.query(query, [provider, providerExchangeCode]); - return result.rows[0] || null; - } - - private async processSingleSymbol( - symbol: any, - provider: string, - result: SyncResult - ): Promise { - const symbolCode = symbol.symbol || symbol.code; - const exchangeCode = symbol.exchangeCode || symbol.exchange || symbol.exchange_id; - - if (!symbolCode || !exchangeCode) { - result.skipped++; - return; - } - - // Find active provider exchange mapping - const providerMapping = await this.findActiveProviderExchangeMapping(provider, exchangeCode); - - if (!providerMapping) { - result.skipped++; - return; - } - - // Check if symbol exists - const existingSymbol = await this.findSymbolByCodeAndExchange( - symbolCode, - providerMapping.master_exchange_id - ); - - if (existingSymbol) { - await this.updateSymbol(existingSymbol.id, symbol); - await this.upsertProviderMapping(existingSymbol.id, provider, symbol); - result.updated++; - } else { - const newSymbolId = await this.createSymbol(symbol, providerMapping.master_exchange_id); - await this.upsertProviderMapping(newSymbolId, provider, symbol); - result.created++; - } - } - - private async findActiveProviderExchangeMapping( - provider: string, - providerExchangeCode: string - ): Promise { - const query = ` - SELECT pem.*, e.code as master_exchange_code - FROM provider_exchange_mappings pem - JOIN exchanges e ON pem.master_exchange_id = e.id - WHERE pem.provider = $1 AND pem.provider_exchange_code = $2 AND pem.active = true - `; - const result = await this.postgresClient.query(query, [provider, providerExchangeCode]); - return result.rows[0] || null; - } - - private async findExchangeByCode(code: string): Promise { - const query = 'SELECT * FROM exchanges WHERE code = $1'; - const result = await this.postgresClient.query(query, [code]); - return result.rows[0] || null; - } - - private async findSymbolByCodeAndExchange(symbol: string, exchangeId: string): Promise { - const query = 'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2'; - const result = await this.postgresClient.query(query, [symbol, exchangeId]); - return result.rows[0] || null; - } - - private async createSymbol(symbol: any, exchangeId: string): Promise { - const query = ` - INSERT INTO symbols (symbol, exchange_id, company_name, country, currency) - VALUES ($1, $2, $3, $4, $5) - RETURNING id - `; - - const result = await this.postgresClient.query(query, [ - symbol.symbol || symbol.code, - exchangeId, - symbol.companyName || symbol.name || symbol.company_name, - symbol.countryCode || symbol.country_code || 'US', - symbol.currency || 'USD', - ]); - - return result.rows[0].id; - } - - private async updateSymbol(symbolId: string, symbol: any): Promise { - const query = ` - UPDATE symbols - SET company_name = COALESCE($2, company_name), - country = COALESCE($3, country), - currency = COALESCE($4, currency), - updated_at = NOW() - WHERE id = $1 - `; - - await this.postgresClient.query(query, [ - symbolId, - symbol.companyName || symbol.name || symbol.company_name, - symbol.countryCode || symbol.country_code, - symbol.currency, - ]); - } - - private async upsertProviderMapping( - symbolId: string, - provider: string, - symbol: any - ): Promise { - const query = ` - INSERT INTO provider_mappings - (symbol_id, provider, provider_symbol, provider_exchange, last_seen) - VALUES ($1, $2, $3, $4, NOW()) - ON CONFLICT (provider, provider_symbol) - DO UPDATE SET - symbol_id = EXCLUDED.symbol_id, - provider_exchange = EXCLUDED.provider_exchange, - last_seen = NOW() - `; - - await this.postgresClient.query(query, [ - symbolId, - provider, - symbol.qmSearchCode || symbol.symbol || symbol.code, - symbol.exchangeCode || symbol.exchange || symbol.exchange_id, - ]); - } - - private async updateSyncStatus(provider: string, dataType: string, count: number): Promise { - const query = ` - INSERT INTO sync_status (provider, data_type, last_sync_at, last_sync_count, sync_errors) - VALUES ($1, $2, NOW(), $3, NULL) - ON CONFLICT (provider, data_type) - DO UPDATE SET - last_sync_at = NOW(), - last_sync_count = EXCLUDED.last_sync_count, - sync_errors = NULL, - updated_at = NOW() - `; - - await this.postgresClient.query(query, [provider, dataType, count]); - } - - private normalizeCountryCode(countryCode: string): string | null { - if (!countryCode) { - return null; - } - - // Map common country variations to ISO 2-letter codes - const countryMap: Record = { - 'United States': 'US', - USA: 'US', - Canada: 'CA', - 'United Kingdom': 'GB', - UK: 'GB', - Germany: 'DE', - Japan: 'JP', - Australia: 'AU', - }; - - const normalized = countryMap[countryCode]; - return normalized || (countryCode.length === 2 ? countryCode.toUpperCase() : null); - } - - private mergeResults(target: SyncResult, source: SyncResult): void { - target.processed += source.processed; - target.created += source.created; - target.updated += source.updated; - target.skipped += source.skipped; - target.errors += source.errors; - } -} - -// Export singleton instance -export const enhancedSyncManager = new EnhancedSyncManager(); diff --git a/apps/data-sync-service/src/services/index.ts b/apps/data-sync-service/src/services/index.ts deleted file mode 100644 index 43ab0c3..0000000 --- a/apps/data-sync-service/src/services/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export { syncManager } from './sync-manager'; -export { enhancedSyncManager } from './enhanced-sync-manager'; diff --git a/apps/data-sync-service/src/services/sync-manager.ts b/apps/data-sync-service/src/services/sync-manager.ts deleted file mode 100644 index 884e724..0000000 --- a/apps/data-sync-service/src/services/sync-manager.ts +++ /dev/null @@ -1,306 +0,0 @@ -/** - * Sync Manager - Handles syncing raw MongoDB data to PostgreSQL master records - */ -import { getLogger } from '@stock-bot/logger'; -import { getMongoDBClient, getPostgreSQLClient } from '../clients'; - -const logger = getLogger('sync-manager'); - -export class SyncManager { - private isInitialized = false; - private mongoClient: unknown; - private postgresClient: unknown; - - async initialize(): Promise { - if (this.isInitialized) { - logger.warn('Sync manager already initialized'); - return; - } - - try { - this.mongoClient = getMongoDBClient(); - this.postgresClient = getPostgreSQLClient(); - - this.isInitialized = true; - logger.info('Sync manager initialized successfully'); - } catch (error) { - logger.error('Failed to initialize sync manager', { error }); - throw error; - } - } - - async shutdown(): Promise { - if (!this.isInitialized) { - return; - } - - logger.info('Shutting down sync manager...'); - this.isInitialized = false; - logger.info('Sync manager shut down successfully'); - } - - /** - * Sync QM symbols from MongoDB to PostgreSQL - */ - async syncQMSymbols(): Promise<{ processed: number; created: number; updated: number }> { - if (!this.isInitialized) { - throw new Error('Sync manager not initialized'); - } - - logger.info('Starting QM symbols sync...'); - - try { - // 1. Get all QM symbols from MongoDB - const qmSymbols = await this.mongoClient.find('qmSymbols', {}); - logger.info(`Found ${qmSymbols.length} QM symbols to process`); - - let created = 0; - let updated = 0; - - for (const symbol of qmSymbols) { - try { - // 2. Resolve exchange - const exchangeId = await this.resolveExchange(symbol.exchangeCode || symbol.exchange); - - if (!exchangeId) { - logger.warn('Unknown exchange, skipping symbol', { - symbol: symbol.symbol, - exchange: symbol.exchangeCode || symbol.exchange, - }); - continue; - } - - // 3. Check if symbol exists - const existingSymbol = await this.findSymbol(symbol.symbol, exchangeId); - - if (existingSymbol) { - // Update existing - await this.updateSymbol(existingSymbol.id, symbol); - await this.upsertProviderMapping(existingSymbol.id, 'qm', symbol); - updated++; - } else { - // Create new - const newSymbolId = await this.createSymbol(symbol, exchangeId); - await this.upsertProviderMapping(newSymbolId, 'qm', symbol); - created++; - } - } catch (error) { - logger.error('Failed to process symbol', { error, symbol: symbol.symbol }); - } - } - - // 4. Update sync status - await this.updateSyncStatus('qm', 'symbols', qmSymbols.length); - - const result = { processed: qmSymbols.length, created, updated }; - logger.info('QM symbols sync completed', result); - return result; - } catch (error) { - logger.error('QM symbols sync failed', { error }); - throw error; - } - } - - /** - * Sync QM exchanges from MongoDB to PostgreSQL - */ - async syncQMExchanges(): Promise<{ processed: number; created: number; updated: number }> { - if (!this.isInitialized) { - throw new Error('Sync manager not initialized'); - } - - logger.info('Starting QM exchanges sync...'); - - try { - // 1. Get all QM exchanges from MongoDB - const qmExchanges = await this.mongoClient.find('qmExchanges', {}); - logger.info(`Found ${qmExchanges.length} QM exchanges to process`); - - let created = 0; - let updated = 0; - - for (const exchange of qmExchanges) { - try { - // 2. Check if exchange exists - const existingExchange = await this.findExchange(exchange.exchangeCode); - - if (existingExchange) { - // Update existing - await this.updateExchange(existingExchange.id, exchange); - updated++; - } else { - // Create new - await this.createExchange(exchange); - created++; - } - } catch (error) { - logger.error('Failed to process exchange', { error, exchange: exchange.exchangeCode }); - } - } - - // 3. Update sync status - await this.updateSyncStatus('qm', 'exchanges', qmExchanges.length); - - const result = { processed: qmExchanges.length, created, updated }; - logger.info('QM exchanges sync completed', result); - return result; - } catch (error) { - logger.error('QM exchanges sync failed', { error }); - throw error; - } - } - - /** - * Get sync status for all providers - */ - async getSyncStatus(): Promise[]> { - const query = 'SELECT * FROM sync_status ORDER BY provider, data_type'; - const result = await this.postgresClient.query(query); - return result.rows; - } - - // Helper methods - - private async resolveExchange(exchangeCode: string): Promise { - if (!exchangeCode) {return null;} - - // Simple mapping - expand this as needed - const exchangeMap: Record = { - NASDAQ: 'NASDAQ', - NYSE: 'NYSE', - TSX: 'TSX', - TSE: 'TSX', // TSE maps to TSX - LSE: 'LSE', - CME: 'CME', - }; - - const normalizedCode = exchangeMap[exchangeCode.toUpperCase()]; - if (!normalizedCode) { - return null; - } - - const query = 'SELECT id FROM exchanges WHERE code = $1'; - const result = await this.postgresClient.query(query, [normalizedCode]); - return result.rows[0]?.id || null; - } - - private async findSymbol(symbol: string, exchangeId: string): Promise { - const query = 'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2'; - const result = await this.postgresClient.query(query, [symbol, exchangeId]); - return result.rows[0] || null; - } - - private async createSymbol(qmSymbol: any, exchangeId: string): Promise { - const query = ` - INSERT INTO symbols (symbol, exchange_id, company_name, country, currency) - VALUES ($1, $2, $3, $4, $5) - RETURNING id - `; - - const result = await this.postgresClient.query(query, [ - qmSymbol.symbol, - exchangeId, - qmSymbol.companyName || qmSymbol.name, - qmSymbol.countryCode || 'US', - qmSymbol.currency || 'USD', - ]); - - return result.rows[0].id; - } - - private async updateSymbol(symbolId: string, qmSymbol: any): Promise { - const query = ` - UPDATE symbols - SET company_name = COALESCE($2, company_name), - country = COALESCE($3, country), - currency = COALESCE($4, currency), - updated_at = NOW() - WHERE id = $1 - `; - - await this.postgresClient.query(query, [ - symbolId, - qmSymbol.companyName || qmSymbol.name, - qmSymbol.countryCode, - qmSymbol.currency, - ]); - } - - private async upsertProviderMapping( - symbolId: string, - provider: string, - qmSymbol: any - ): Promise { - const query = ` - INSERT INTO provider_mappings - (symbol_id, provider, provider_symbol, provider_exchange, last_seen) - VALUES ($1, $2, $3, $4, NOW()) - ON CONFLICT (provider, provider_symbol) - DO UPDATE SET - symbol_id = EXCLUDED.symbol_id, - provider_exchange = EXCLUDED.provider_exchange, - last_seen = NOW() - `; - - await this.postgresClient.query(query, [ - symbolId, - provider, - qmSymbol.qmSearchCode || qmSymbol.symbol, - qmSymbol.exchangeCode || qmSymbol.exchange, - ]); - } - - private async findExchange(exchangeCode: string): Promise { - const query = 'SELECT * FROM exchanges WHERE code = $1'; - const result = await this.postgresClient.query(query, [exchangeCode]); - return result.rows[0] || null; - } - - private async createExchange(qmExchange: any): Promise { - const query = ` - INSERT INTO exchanges (code, name, country, currency, visible) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (code) DO NOTHING - `; - - await this.postgresClient.query(query, [ - qmExchange.exchangeCode || qmExchange.exchange, - qmExchange.exchangeShortName || qmExchange.name, - qmExchange.countryCode || 'US', - 'USD', // Default currency, can be improved - true, // New exchanges are visible by default - ]); - } - - private async updateExchange(exchangeId: string, qmExchange: any): Promise { - const query = ` - UPDATE exchanges - SET name = COALESCE($2, name), - country = COALESCE($3, country), - updated_at = NOW() - WHERE id = $1 - `; - - await this.postgresClient.query(query, [ - exchangeId, - qmExchange.exchangeShortName || qmExchange.name, - qmExchange.countryCode, - ]); - } - - private async updateSyncStatus(provider: string, dataType: string, count: number): Promise { - const query = ` - UPDATE sync_status - SET last_sync_at = NOW(), - last_sync_count = $3, - sync_errors = NULL, - updated_at = NOW() - WHERE provider = $1 AND data_type = $2 - `; - - await this.postgresClient.query(query, [provider, dataType, count]); - } -} - -// Export singleton instance -export const syncManager = new SyncManager(); diff --git a/apps/data-sync-service/src/types/job-payloads.ts b/apps/data-sync-service/src/types/job-payloads.ts new file mode 100644 index 0000000..6d53852 --- /dev/null +++ b/apps/data-sync-service/src/types/job-payloads.ts @@ -0,0 +1,27 @@ +export interface JobPayload { + [key: string]: any; +} + +export interface SyncResult { + processed: number; + created: number; + updated: number; + skipped: number; + errors: number; +} + +export interface SyncStatus { + provider: string; + dataType: string; + lastSyncAt?: Date; + lastSyncCount: number; + syncErrors?: string; +} + +export interface ExchangeMapping { + id: string; + code: string; + name: string; + country: string; + currency: string; +} \ No newline at end of file diff --git a/libs/config/src/schemas/service.schema.ts b/libs/config/src/schemas/service.schema.ts index 10f11e5..5268c85 100644 --- a/libs/config/src/schemas/service.schema.ts +++ b/libs/config/src/schemas/service.schema.ts @@ -41,8 +41,8 @@ export const queueConfigSchema = z.object({ type: z.enum(['exponential', 'fixed']).default('exponential'), delay: z.number().default(1000), }).default({}), - removeOnComplete: z.boolean().default(true), - removeOnFail: z.boolean().default(false), + removeOnComplete: z.number().default(10), + removeOnFail: z.number().default(5), }).default({}), });