added new exchanges system
This commit is contained in:
parent
95eda4a842
commit
263e9513b7
98 changed files with 4643 additions and 1496 deletions
306
apps/data-sync-service/src/services/sync-manager.ts
Normal file
306
apps/data-sync-service/src/services/sync-manager.ts
Normal file
|
|
@ -0,0 +1,306 @@
|
|||
/**
|
||||
* Sync Manager - Handles syncing raw MongoDB data to PostgreSQL master records
|
||||
*/
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import { getMongoDBClient } from '@stock-bot/mongodb-client';
|
||||
import { getPostgreSQLClient } from '@stock-bot/postgres-client';
|
||||
|
||||
const logger = getLogger('sync-manager');
|
||||
|
||||
export class SyncManager {
|
||||
private isInitialized = false;
|
||||
private mongoClient: any;
|
||||
private postgresClient: any;
|
||||
|
||||
async initialize(): Promise<void> {
|
||||
if (this.isInitialized) {
|
||||
logger.warn('Sync manager already initialized');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
this.mongoClient = getMongoDBClient();
|
||||
this.postgresClient = getPostgreSQLClient();
|
||||
|
||||
this.isInitialized = true;
|
||||
logger.info('Sync manager initialized successfully');
|
||||
} catch (error) {
|
||||
logger.error('Failed to initialize sync manager', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async shutdown(): Promise<void> {
|
||||
if (!this.isInitialized) {
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info('Shutting down sync manager...');
|
||||
this.isInitialized = false;
|
||||
logger.info('Sync manager shut down successfully');
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync QM symbols from MongoDB to PostgreSQL
|
||||
*/
|
||||
async syncQMSymbols(): Promise<{ processed: number; created: number; updated: number }> {
|
||||
if (!this.isInitialized) {
|
||||
throw new Error('Sync manager not initialized');
|
||||
}
|
||||
|
||||
logger.info('Starting QM symbols sync...');
|
||||
|
||||
try {
|
||||
// 1. Get all QM symbols from MongoDB
|
||||
const qmSymbols = await this.mongoClient.find('qmSymbols', {});
|
||||
logger.info(`Found ${qmSymbols.length} QM symbols to process`);
|
||||
|
||||
let created = 0;
|
||||
let updated = 0;
|
||||
|
||||
for (const symbol of qmSymbols) {
|
||||
try {
|
||||
// 2. Resolve exchange
|
||||
const exchangeId = await this.resolveExchange(symbol.exchangeCode || symbol.exchange);
|
||||
|
||||
if (!exchangeId) {
|
||||
logger.warn('Unknown exchange, skipping symbol', {
|
||||
symbol: symbol.symbol,
|
||||
exchange: symbol.exchangeCode || symbol.exchange,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// 3. Check if symbol exists
|
||||
const existingSymbol = await this.findSymbol(symbol.symbol, exchangeId);
|
||||
|
||||
if (existingSymbol) {
|
||||
// Update existing
|
||||
await this.updateSymbol(existingSymbol.id, symbol);
|
||||
await this.upsertProviderMapping(existingSymbol.id, 'qm', symbol);
|
||||
updated++;
|
||||
} else {
|
||||
// Create new
|
||||
const newSymbolId = await this.createSymbol(symbol, exchangeId);
|
||||
await this.upsertProviderMapping(newSymbolId, 'qm', symbol);
|
||||
created++;
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to process symbol', { error, symbol: symbol.symbol });
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Update sync status
|
||||
await this.updateSyncStatus('qm', 'symbols', qmSymbols.length);
|
||||
|
||||
const result = { processed: qmSymbols.length, created, updated };
|
||||
logger.info('QM symbols sync completed', result);
|
||||
return result;
|
||||
} catch (error) {
|
||||
logger.error('QM symbols sync failed', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync QM exchanges from MongoDB to PostgreSQL
|
||||
*/
|
||||
async syncQMExchanges(): Promise<{ processed: number; created: number; updated: number }> {
|
||||
if (!this.isInitialized) {
|
||||
throw new Error('Sync manager not initialized');
|
||||
}
|
||||
|
||||
logger.info('Starting QM exchanges sync...');
|
||||
|
||||
try {
|
||||
// 1. Get all QM exchanges from MongoDB
|
||||
const qmExchanges = await this.mongoClient.find('qmExchanges', {});
|
||||
logger.info(`Found ${qmExchanges.length} QM exchanges to process`);
|
||||
|
||||
let created = 0;
|
||||
let updated = 0;
|
||||
|
||||
for (const exchange of qmExchanges) {
|
||||
try {
|
||||
// 2. Check if exchange exists
|
||||
const existingExchange = await this.findExchange(exchange.exchangeCode);
|
||||
|
||||
if (existingExchange) {
|
||||
// Update existing
|
||||
await this.updateExchange(existingExchange.id, exchange);
|
||||
updated++;
|
||||
} else {
|
||||
// Create new
|
||||
await this.createExchange(exchange);
|
||||
created++;
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to process exchange', { error, exchange: exchange.exchangeCode });
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Update sync status
|
||||
await this.updateSyncStatus('qm', 'exchanges', qmExchanges.length);
|
||||
|
||||
const result = { processed: qmExchanges.length, created, updated };
|
||||
logger.info('QM exchanges sync completed', result);
|
||||
return result;
|
||||
} catch (error) {
|
||||
logger.error('QM exchanges sync failed', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get sync status for all providers
|
||||
*/
|
||||
async getSyncStatus(): Promise<any[]> {
|
||||
const query = 'SELECT * FROM sync_status ORDER BY provider, data_type';
|
||||
const result = await this.postgresClient.query(query);
|
||||
return result.rows;
|
||||
}
|
||||
|
||||
// Helper methods
|
||||
|
||||
private async resolveExchange(exchangeCode: string): Promise<string | null> {
|
||||
if (!exchangeCode) return null;
|
||||
|
||||
// Simple mapping - expand this as needed
|
||||
const exchangeMap: Record<string, string> = {
|
||||
NASDAQ: 'NASDAQ',
|
||||
NYSE: 'NYSE',
|
||||
TSX: 'TSX',
|
||||
TSE: 'TSX', // TSE maps to TSX
|
||||
LSE: 'LSE',
|
||||
CME: 'CME',
|
||||
};
|
||||
|
||||
const normalizedCode = exchangeMap[exchangeCode.toUpperCase()];
|
||||
if (!normalizedCode) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const query = 'SELECT id FROM exchanges WHERE code = $1';
|
||||
const result = await this.postgresClient.query(query, [normalizedCode]);
|
||||
return result.rows[0]?.id || null;
|
||||
}
|
||||
|
||||
private async findSymbol(symbol: string, exchangeId: string): Promise<any> {
|
||||
const query = 'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2';
|
||||
const result = await this.postgresClient.query(query, [symbol, exchangeId]);
|
||||
return result.rows[0] || null;
|
||||
}
|
||||
|
||||
private async createSymbol(qmSymbol: any, exchangeId: string): Promise<string> {
|
||||
const query = `
|
||||
INSERT INTO symbols (symbol, exchange_id, company_name, country, currency)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
RETURNING id
|
||||
`;
|
||||
|
||||
const result = await this.postgresClient.query(query, [
|
||||
qmSymbol.symbol,
|
||||
exchangeId,
|
||||
qmSymbol.companyName || qmSymbol.name,
|
||||
qmSymbol.countryCode || 'US',
|
||||
qmSymbol.currency || 'USD',
|
||||
]);
|
||||
|
||||
return result.rows[0].id;
|
||||
}
|
||||
|
||||
private async updateSymbol(symbolId: string, qmSymbol: any): Promise<void> {
|
||||
const query = `
|
||||
UPDATE symbols
|
||||
SET company_name = COALESCE($2, company_name),
|
||||
country = COALESCE($3, country),
|
||||
currency = COALESCE($4, currency),
|
||||
updated_at = NOW()
|
||||
WHERE id = $1
|
||||
`;
|
||||
|
||||
await this.postgresClient.query(query, [
|
||||
symbolId,
|
||||
qmSymbol.companyName || qmSymbol.name,
|
||||
qmSymbol.countryCode,
|
||||
qmSymbol.currency,
|
||||
]);
|
||||
}
|
||||
|
||||
private async upsertProviderMapping(
|
||||
symbolId: string,
|
||||
provider: string,
|
||||
qmSymbol: any
|
||||
): Promise<void> {
|
||||
const query = `
|
||||
INSERT INTO provider_mappings
|
||||
(symbol_id, provider, provider_symbol, provider_exchange, last_seen)
|
||||
VALUES ($1, $2, $3, $4, NOW())
|
||||
ON CONFLICT (provider, provider_symbol)
|
||||
DO UPDATE SET
|
||||
symbol_id = EXCLUDED.symbol_id,
|
||||
provider_exchange = EXCLUDED.provider_exchange,
|
||||
last_seen = NOW()
|
||||
`;
|
||||
|
||||
await this.postgresClient.query(query, [
|
||||
symbolId,
|
||||
provider,
|
||||
qmSymbol.qmSearchCode || qmSymbol.symbol,
|
||||
qmSymbol.exchangeCode || qmSymbol.exchange,
|
||||
]);
|
||||
}
|
||||
|
||||
private async findExchange(exchangeCode: string): Promise<any> {
|
||||
const query = 'SELECT * FROM exchanges WHERE code = $1';
|
||||
const result = await this.postgresClient.query(query, [exchangeCode]);
|
||||
return result.rows[0] || null;
|
||||
}
|
||||
|
||||
private async createExchange(qmExchange: any): Promise<void> {
|
||||
const query = `
|
||||
INSERT INTO exchanges (code, name, country, currency)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (code) DO NOTHING
|
||||
`;
|
||||
|
||||
await this.postgresClient.query(query, [
|
||||
qmExchange.exchangeCode || qmExchange.exchange,
|
||||
qmExchange.exchangeShortName || qmExchange.name,
|
||||
qmExchange.countryCode || 'US',
|
||||
'USD', // Default currency, can be improved
|
||||
]);
|
||||
}
|
||||
|
||||
private async updateExchange(exchangeId: string, qmExchange: any): Promise<void> {
|
||||
const query = `
|
||||
UPDATE exchanges
|
||||
SET name = COALESCE($2, name),
|
||||
country = COALESCE($3, country),
|
||||
updated_at = NOW()
|
||||
WHERE id = $1
|
||||
`;
|
||||
|
||||
await this.postgresClient.query(query, [
|
||||
exchangeId,
|
||||
qmExchange.exchangeShortName || qmExchange.name,
|
||||
qmExchange.countryCode,
|
||||
]);
|
||||
}
|
||||
|
||||
private async updateSyncStatus(provider: string, dataType: string, count: number): Promise<void> {
|
||||
const query = `
|
||||
UPDATE sync_status
|
||||
SET last_sync_at = NOW(),
|
||||
last_sync_count = $3,
|
||||
sync_errors = NULL,
|
||||
updated_at = NOW()
|
||||
WHERE provider = $1 AND data_type = $2
|
||||
`;
|
||||
|
||||
await this.postgresClient.query(query, [provider, dataType, count]);
|
||||
}
|
||||
}
|
||||
|
||||
// Export singleton instance
|
||||
export const syncManager = new SyncManager();
|
||||
Loading…
Add table
Add a link
Reference in a new issue