stock-bot/apps/data-sync-service/src/services/sync-manager.ts
2025-06-20 09:25:06 -04:00

306 lines
9 KiB
TypeScript

/**
* Sync Manager - Handles syncing raw MongoDB data to PostgreSQL master records
*/
import { getLogger } from '@stock-bot/logger';
import { getMongoDBClient, getPostgreSQLClient } from '../clients';
// Module-level logger shared by everything in this file, created via getLogger under the name 'sync-manager'.
const logger = getLogger('sync-manager');
/**
 * Client handle types, derived from the factories imported by this file so the
 * fields below are typed as whatever those factories actually return.
 */
type SyncMongoClient = ReturnType<typeof getMongoDBClient>;
type SyncPostgresClient = ReturnType<typeof getPostgreSQLClient>;

/**
 * Fields of a `qmSymbols` document that this class reads.
 * NOTE(review): derived from the property accesses below, not from a schema;
 * `symbol` is presumed to always be present — confirm against the producer.
 */
interface QMSymbolRecord {
  symbol: string;
  qmSearchCode?: string;
  exchangeCode?: string;
  exchange?: string;
  companyName?: string;
  name?: string;
  countryCode?: string;
  currency?: string;
}

/**
 * Fields of a `qmExchanges` document that this class reads.
 * NOTE(review): derived from the property accesses below, not from a schema.
 */
interface QMExchangeRecord {
  exchangeCode?: string;
  exchange?: string;
  exchangeShortName?: string;
  name?: string;
  countryCode?: string;
}

/** The only column this class needs back from row lookups. */
interface RowWithId {
  id: string;
}

/** Error message used by every "called before initialize()" guard. */
const NOT_INITIALIZED_MESSAGE = 'Sync manager not initialized';

/**
 * Provider exchange code (upper-cased) -> code stored in the `exchanges` table.
 * Simple mapping - expand this as needed. Codes that are not listed here are
 * treated as unknown and the owning symbol is skipped.
 */
const EXCHANGE_CODE_ALIASES: ReadonlyMap<string, string> = new Map([
  ['NASDAQ', 'NASDAQ'],
  ['NYSE', 'NYSE'],
  ['TSX', 'TSX'],
  ['TSE', 'TSX'], // TSE maps to TSX
  ['LSE', 'LSE'],
  ['CME', 'CME'],
]);

/**
 * Copies QM provider data (exchanges and symbols) from MongoDB collections
 * into PostgreSQL tables and records sync progress in `sync_status`.
 *
 * `initialize()` must be called before any sync or status method; every public
 * data method throws `Error('Sync manager not initialized')` otherwise.
 */
export class SyncManager {
  private isInitialized = false;
  private mongoClient: SyncMongoClient | undefined;
  private postgresClient: SyncPostgresClient | undefined;

  /**
   * Acquires the MongoDB and PostgreSQL clients. Calling it again while
   * already initialized only logs a warning.
   *
   * @throws Re-throws whatever the client factories throw, after logging it.
   */
  async initialize(): Promise<void> {
    if (this.isInitialized) {
      logger.warn('Sync manager already initialized');
      return;
    }

    try {
      this.mongoClient = getMongoDBClient();
      this.postgresClient = getPostgreSQLClient();
      this.isInitialized = true;
      logger.info('Sync manager initialized successfully');
    } catch (error) {
      logger.error('Failed to initialize sync manager', { error });
      throw error;
    }
  }

  /**
   * Marks the manager as stopped and drops its client references. The clients
   * themselves are not closed here. A no-op when not initialized.
   */
  async shutdown(): Promise<void> {
    if (!this.isInitialized) {
      return;
    }

    logger.info('Shutting down sync manager...');
    this.isInitialized = false;
    this.mongoClient = undefined;
    this.postgresClient = undefined;
    logger.info('Sync manager shut down successfully');
  }

  /**
   * Sync QM symbols from MongoDB to PostgreSQL.
   *
   * Each document is handled independently: a symbol whose exchange cannot be
   * resolved is skipped, and a symbol whose processing throws is logged and
   * counted as failed without aborting the run.
   *
   * @returns `processed` is the number of documents read (including skipped
   *   and failed ones); `created` / `updated` count successful writes.
   * @throws Error if the manager is not initialized, or if reading the
   *   collection or updating the sync status fails.
   */
  async syncQMSymbols(): Promise<{ processed: number; created: number; updated: number }> {
    this.assertInitialized();
    logger.info('Starting QM symbols sync...');

    try {
      // 1. Get all QM symbols from MongoDB. The documents are untyped at this
      //    boundary, so they are viewed through the fields this class reads.
      const qmSymbols = (await this.mongo.find('qmSymbols', {})) as QMSymbolRecord[];
      logger.info(`Found ${qmSymbols.length} QM symbols to process`);

      let created = 0;
      let updated = 0;
      let skipped = 0;
      let failed = 0;

      // Processed one at a time on purpose: keeps load on PostgreSQL bounded.
      for (const symbol of qmSymbols) {
        const providerExchange = symbol.exchangeCode || symbol.exchange;
        try {
          // 2. Resolve exchange
          const exchangeId = await this.resolveExchange(providerExchange);
          if (!exchangeId) {
            logger.warn('Unknown exchange, skipping symbol', {
              symbol: symbol.symbol,
              exchange: providerExchange,
            });
            skipped++;
            continue;
          }

          // 3. Update the existing master record, or create a new one
          const existingSymbol = await this.findSymbol(symbol.symbol, exchangeId);
          if (existingSymbol) {
            await this.updateSymbol(existingSymbol.id, symbol);
            await this.upsertProviderMapping(existingSymbol.id, 'qm', symbol);
            updated++;
          } else {
            const newSymbolId = await this.createSymbol(symbol, exchangeId);
            await this.upsertProviderMapping(newSymbolId, 'qm', symbol);
            created++;
          }
        } catch (error) {
          failed++;
          logger.error('Failed to process symbol', { error, symbol: symbol.symbol });
        }
      }

      // 4. Update sync status
      await this.updateSyncStatus('qm', 'symbols', qmSymbols.length);

      const result = { processed: qmSymbols.length, created, updated };
      // skipped/failed are logged only, so the returned shape stays unchanged.
      logger.info('QM symbols sync completed', { ...result, skipped, failed });
      return result;
    } catch (error) {
      logger.error('QM symbols sync failed', { error });
      throw error;
    }
  }

  /**
   * Sync QM exchanges from MongoDB to PostgreSQL.
   *
   * The exchange code is taken from `exchangeCode`, falling back to
   * `exchange`, for both the existence check and the insert. Records that
   * carry neither are skipped.
   *
   * @returns `processed` is the number of documents read (including skipped
   *   and failed ones); `created` / `updated` count successful writes.
   * @throws Error if the manager is not initialized, or if reading the
   *   collection or updating the sync status fails.
   */
  async syncQMExchanges(): Promise<{ processed: number; created: number; updated: number }> {
    this.assertInitialized();
    logger.info('Starting QM exchanges sync...');

    try {
      // 1. Get all QM exchanges from MongoDB (untyped at this boundary)
      const qmExchanges = (await this.mongo.find('qmExchanges', {})) as QMExchangeRecord[];
      logger.info(`Found ${qmExchanges.length} QM exchanges to process`);

      let created = 0;
      let updated = 0;
      let skipped = 0;
      let failed = 0;

      for (const exchange of qmExchanges) {
        // Same fallback that createExchange applies, so lookup and insert agree.
        const code = exchange.exchangeCode || exchange.exchange;
        try {
          if (!code) {
            logger.warn('Exchange record has no code, skipping', {
              exchange: exchange.exchangeShortName || exchange.name,
            });
            skipped++;
            continue;
          }

          // 2. Update the existing exchange, or create a new one
          const existingExchange = await this.findExchange(code);
          if (existingExchange) {
            await this.updateExchange(existingExchange.id, exchange);
            updated++;
          } else {
            await this.createExchange(exchange);
            created++;
          }
        } catch (error) {
          failed++;
          logger.error('Failed to process exchange', { error, exchange: code });
        }
      }

      // 3. Update sync status
      await this.updateSyncStatus('qm', 'exchanges', qmExchanges.length);

      const result = { processed: qmExchanges.length, created, updated };
      // skipped/failed are logged only, so the returned shape stays unchanged.
      logger.info('QM exchanges sync completed', { ...result, skipped, failed });
      return result;
    } catch (error) {
      logger.error('QM exchanges sync failed', { error });
      throw error;
    }
  }

  /**
   * Get sync status for all providers.
   *
   * @returns Every row of `sync_status`, ordered by provider then data type.
   * @throws Error if the manager is not initialized.
   */
  async getSyncStatus(): Promise<Record<string, unknown>[]> {
    this.assertInitialized();
    const query = 'SELECT * FROM sync_status ORDER BY provider, data_type';
    const result = await this.postgres.query(query);
    return result.rows;
  }

  // Helper methods

  /** Throws unless `initialize()` has completed. */
  private assertInitialized(): void {
    if (!this.isInitialized) {
      throw new Error(NOT_INITIALIZED_MESSAGE);
    }
  }

  /** MongoDB client; throws if the manager is not initialized. */
  private get mongo(): NonNullable<SyncMongoClient> {
    if (!this.isInitialized || this.mongoClient == null) {
      throw new Error(NOT_INITIALIZED_MESSAGE);
    }
    return this.mongoClient;
  }

  /** PostgreSQL client; throws if the manager is not initialized. */
  private get postgres(): NonNullable<SyncPostgresClient> {
    if (!this.isInitialized || this.postgresClient == null) {
      throw new Error(NOT_INITIALIZED_MESSAGE);
    }
    return this.postgresClient;
  }

  /**
   * Maps a provider exchange code to the id of the matching `exchanges` row.
   *
   * @param exchangeCode Provider code; matched case-insensitively and with
   *   surrounding whitespace ignored.
   * @returns The exchange id, or null when the code is missing, not a string,
   *   not in the alias table, or has no row in `exchanges`.
   */
  private async resolveExchange(exchangeCode: string | undefined): Promise<string | null> {
    if (typeof exchangeCode !== 'string' || !exchangeCode) {
      return null;
    }

    const normalizedCode = EXCHANGE_CODE_ALIASES.get(exchangeCode.trim().toUpperCase());
    if (!normalizedCode) {
      return null;
    }

    const query = 'SELECT id FROM exchanges WHERE code = $1';
    const result = await this.postgres.query(query, [normalizedCode]);
    return result.rows[0]?.id ?? null;
  }

  /** Looks up a symbol row by ticker and exchange id; null when absent. */
  private async findSymbol(symbol: string, exchangeId: string): Promise<RowWithId | null> {
    const query = 'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2';
    const result = await this.postgres.query(query, [symbol, exchangeId]);
    return result.rows[0] ?? null;
  }

  /**
   * Inserts a new symbol row. Country defaults to 'US' and currency to 'USD'
   * when the document does not provide them.
   *
   * @returns The id of the inserted row.
   * @throws Error if the INSERT does not return a row.
   */
  private async createSymbol(qmSymbol: QMSymbolRecord, exchangeId: string): Promise<string> {
    const query = `
      INSERT INTO symbols (symbol, exchange_id, company_name, country, currency)
      VALUES ($1, $2, $3, $4, $5)
      RETURNING id
    `;
    const result = await this.postgres.query(query, [
      qmSymbol.symbol,
      exchangeId,
      qmSymbol.companyName || qmSymbol.name || null,
      qmSymbol.countryCode || 'US',
      qmSymbol.currency || 'USD',
    ]);

    const inserted: RowWithId | undefined = result.rows[0];
    if (!inserted) {
      throw new Error(`INSERT INTO symbols returned no row for symbol ${qmSymbol.symbol}`);
    }
    return inserted.id;
  }

  /**
   * Refreshes name/country/currency of an existing symbol. Missing or blank
   * values are sent as NULL so COALESCE keeps what is already stored.
   */
  private async updateSymbol(symbolId: string, qmSymbol: QMSymbolRecord): Promise<void> {
    const query = `
      UPDATE symbols
      SET company_name = COALESCE($2, company_name),
          country = COALESCE($3, country),
          currency = COALESCE($4, currency),
          updated_at = NOW()
      WHERE id = $1
    `;
    await this.postgres.query(query, [
      symbolId,
      qmSymbol.companyName || qmSymbol.name || null,
      qmSymbol.countryCode || null,
      qmSymbol.currency || null,
    ]);
  }

  /**
   * Links a provider's symbol identifier to a master symbol row, inserting
   * the mapping or, on a (provider, provider_symbol) conflict, re-pointing it
   * and refreshing `last_seen`.
   */
  private async upsertProviderMapping(
    symbolId: string,
    provider: string,
    qmSymbol: QMSymbolRecord
  ): Promise<void> {
    const query = `
      INSERT INTO provider_mappings
        (symbol_id, provider, provider_symbol, provider_exchange, last_seen)
      VALUES ($1, $2, $3, $4, NOW())
      ON CONFLICT (provider, provider_symbol)
      DO UPDATE SET
        symbol_id = EXCLUDED.symbol_id,
        provider_exchange = EXCLUDED.provider_exchange,
        last_seen = NOW()
    `;
    await this.postgres.query(query, [
      symbolId,
      provider,
      qmSymbol.qmSearchCode || qmSymbol.symbol,
      qmSymbol.exchangeCode || qmSymbol.exchange || null,
    ]);
  }

  /** Looks up an exchange row by its exact code; null when absent. */
  private async findExchange(exchangeCode: string): Promise<RowWithId | null> {
    const query = 'SELECT * FROM exchanges WHERE code = $1';
    const result = await this.postgres.query(query, [exchangeCode]);
    return result.rows[0] ?? null;
  }

  /**
   * Inserts a new exchange. An existing row with the same code is left
   * untouched (ON CONFLICT DO NOTHING).
   */
  private async createExchange(qmExchange: QMExchangeRecord): Promise<void> {
    const query = `
      INSERT INTO exchanges (code, name, country, currency, visible)
      VALUES ($1, $2, $3, $4, $5)
      ON CONFLICT (code) DO NOTHING
    `;
    await this.postgres.query(query, [
      qmExchange.exchangeCode || qmExchange.exchange || null,
      qmExchange.exchangeShortName || qmExchange.name || null,
      qmExchange.countryCode || 'US',
      'USD', // Default currency, can be improved
      true, // New exchanges are visible by default
    ]);
  }

  /**
   * Refreshes name/country of an existing exchange. Missing or blank values
   * are sent as NULL so COALESCE keeps what is already stored.
   */
  private async updateExchange(exchangeId: string, qmExchange: QMExchangeRecord): Promise<void> {
    const query = `
      UPDATE exchanges
      SET name = COALESCE($2, name),
          country = COALESCE($3, country),
          updated_at = NOW()
      WHERE id = $1
    `;
    await this.postgres.query(query, [
      exchangeId,
      qmExchange.exchangeShortName || qmExchange.name || null,
      qmExchange.countryCode || null,
    ]);
  }

  /**
   * Stamps the `sync_status` row for (provider, dataType) with the current
   * time and item count, and clears `sync_errors`. This is an UPDATE only:
   * when no matching row exists, nothing is written.
   */
  private async updateSyncStatus(provider: string, dataType: string, count: number): Promise<void> {
    const query = `
      UPDATE sync_status
      SET last_sync_at = NOW(),
          last_sync_count = $3,
          sync_errors = NULL,
          updated_at = NOW()
      WHERE provider = $1 AND data_type = $2
    `;
    await this.postgres.query(query, [provider, dataType, count]);
  }
}
// Shared module-level instance. It is constructed here but not initialized:
// the sync methods throw 'Sync manager not initialized' until initialize() has been called.
export const syncManager = new SyncManager();