237 lines
7 KiB
TypeScript
237 lines
7 KiB
TypeScript
import { getLogger } from '@stock-bot/logger';
|
|
import type { IServiceContainer } from '@stock-bot/handlers';
|
|
import type { JobPayload, SyncResult } from '../../../types/job-payloads';
|
|
|
|
// Module-level logger shared by every function in this file; created via getLogger
// with the name 'enhanced-sync-symbols-from-provider'.
const logger = getLogger('enhanced-sync-symbols-from-provider');
|
|
|
|
/**
 * Mongo collection that holds the raw symbol documents of each supported
 * provider, keyed by the lowercase provider name.
 */
const SYMBOL_COLLECTION_BY_PROVIDER: ReadonlyMap<string, string> = new Map([
  ['qm', 'qmSymbols'],
  ['eod', 'eodSymbols'],
  ['ib', 'ibSymbols'],
]);

/** Savepoint name used to isolate each symbol inside the sync transaction. */
const SYMBOL_SAVEPOINT = 'sync_symbol';

/**
 * Reads every symbol document of one provider from MongoDB and writes it to
 * Postgres through `processSingleSymbol`, then records the run via
 * `updateSyncStatus`.
 *
 * Flow:
 *  1. Validate the provider (before anything destructive happens).
 *  2. If `clearFirst` is set, delete all rows of `provider_mappings` and
 *     `symbols` in their own transaction, which is committed before the sync.
 *  3. Open the sync transaction, process each symbol under a savepoint,
 *     update the sync status and commit.
 *
 * NOTE(review): BEGIN / SAVEPOINT / COMMIT only form one transaction if every
 * `container.postgres.query` call runs on the same connection — confirm that
 * the container's client is not a pool handing out a connection per query.
 *
 * @param payload - Job payload; `provider` is required, `clearFirst` is optional.
 * @param container - Service container; `mongodb.getDatabase()` and
 *   `postgres.query()` are used.
 * @returns Counters for the run. `processed` is the number of documents read;
 *   `created` / `updated` / `skipped` are maintained by `processSingleSymbol`;
 *   `errors` counts symbols whose processing threw.
 * @throws Error when the provider is missing or unsupported. Any other failure
 *   outside the per-symbol handling is logged and rethrown after a rollback
 *   attempt.
 */
export async function syncSymbolsFromProvider(
  payload: JobPayload,
  container: IServiceContainer
): Promise<SyncResult> {
  const provider = payload.provider;
  const clearFirst = payload.clearFirst || false;

  if (!provider) {
    throw new Error('Provider is required in payload');
  }

  logger.info(`Starting ${provider} symbols sync...`, { clearFirst });

  const result: SyncResult = {
    processed: 0,
    created: 0,
    updated: 0,
    skipped: 0,
    errors: 0,
  };

  // Tracks whether a BEGIN is currently outstanding so the error path only
  // rolls back when there is actually something to roll back.
  let transactionOpen = false;

  try {
    const mongoClient = container.mongodb;
    const postgresClient = container.postgres;

    // Resolve the provider first: an unsupported provider must fail before the
    // optional clear step gets a chance to delete existing data.
    const collectionName = SYMBOL_COLLECTION_BY_PROVIDER.get(provider.toLowerCase());
    if (collectionName === undefined) {
      throw new Error(`Unsupported provider: ${provider}`);
    }

    // Clear existing data if requested (only symbols and mappings, keep exchanges).
    // This is committed on its own, i.e. it is NOT undone if the sync below fails.
    if (clearFirst) {
      await postgresClient.query('BEGIN');
      transactionOpen = true;
      await postgresClient.query('DELETE FROM provider_mappings');
      await postgresClient.query('DELETE FROM symbols');
      await postgresClient.query('COMMIT');
      transactionOpen = false;
      logger.info('Cleared existing symbols and mappings before sync');
    }

    // Start the sync transaction.
    await postgresClient.query('BEGIN');
    transactionOpen = true;

    const db = mongoClient.getDatabase();
    const symbols: Record<string, unknown>[] = await db
      .collection(collectionName)
      .find({})
      .toArray();

    logger.info(`Found ${symbols.length} ${provider} symbols to process`);
    result.processed = symbols.length;

    for (const symbol of symbols) {
      // A failed statement aborts the surrounding Postgres transaction, so each
      // symbol runs under a savepoint that can be rolled back individually.
      await postgresClient.query(`SAVEPOINT ${SYMBOL_SAVEPOINT}`);
      try {
        await processSingleSymbol(symbol, provider, result, container);
      } catch (error) {
        logger.error('Failed to process symbol', {
          error,
          symbol: symbol.symbol || symbol.code,
          provider,
        });
        result.errors++;
        // Undo only this symbol's writes; if even that fails the transaction is
        // unusable and the error propagates to the outer handler.
        await postgresClient.query(`ROLLBACK TO SAVEPOINT ${SYMBOL_SAVEPOINT}`);
      }
      // Release in both paths so savepoints do not pile up across iterations.
      await postgresClient.query(`RELEASE SAVEPOINT ${SYMBOL_SAVEPOINT}`);
    }

    // Update sync status
    await updateSyncStatus(provider, 'symbols', result.processed, container.postgres);

    await postgresClient.query('COMMIT');
    transactionOpen = false;

    logger.info(`${provider} symbols sync completed`, result);
    return result;
  } catch (error) {
    if (transactionOpen) {
      try {
        await container.postgres.query('ROLLBACK');
      } catch (rollbackError) {
        // Keep the original failure as the error that reaches the caller.
        logger.error(`${provider} symbols sync rollback failed`, { error: rollbackError });
      }
    }
    logger.error(`${provider} symbols sync failed`, { error });
    throw error;
  }
}
|
|
|
|
/**
 * Writes one provider symbol document to Postgres and updates the run counters.
 *
 * The symbol is skipped (`result.skipped++`) when it has no symbol code, no
 * exchange code, or when `findActiveProviderExchangeMapping` returns nothing
 * for its exchange. Otherwise the matching row is updated, or a new one is
 * created, and the provider mapping is upserted; `result.updated` or
 * `result.created` is incremented only after those writes succeeded.
 *
 * @param symbol - Raw provider document. Symbol code is read from `symbol` or
 *   `code`; exchange code from `exchangeCode`, `exchange` or `exchange_id`.
 * @param provider - Provider name, passed unchanged to the lookups and upsert.
 * @param result - Counter object mutated in place.
 * @param container - Service container forwarded to the database helpers.
 */
async function processSingleSymbol(
  symbol: any,
  provider: string,
  result: SyncResult,
  container: IServiceContainer
): Promise<void> {
  const symbolCode = symbol.symbol || symbol.code;
  const exchangeCode = symbol.exchangeCode || symbol.exchange || symbol.exchange_id;

  if (!symbolCode || !exchangeCode) {
    result.skipped++;
    return;
  }

  // Find active provider exchange mapping
  const providerMapping = await findActiveProviderExchangeMapping(provider, exchangeCode, container);
  if (!providerMapping) {
    result.skipped++;
    return;
  }

  // Check if symbol exists
  const masterExchangeId = providerMapping.master_exchange_id;
  const existingSymbol = await findSymbolByCodeAndExchange(symbolCode, masterExchangeId, container);

  let symbolId: string;
  if (existingSymbol) {
    await updateSymbol(existingSymbol.id, symbol, container);
    symbolId = existingSymbol.id;
  } else {
    symbolId = await createSymbol(symbol, masterExchangeId, container);
  }

  await upsertProviderMapping(symbolId, provider, symbol, container);

  // Counters are bumped last so a failed write is never counted as a success.
  if (existingSymbol) {
    result.updated++;
  } else {
    result.created++;
  }
}
|
|
|
|
/**
 * Looks up the active row of `provider_exchange_mappings` for a provider's
 * exchange code, joined with `exchanges` so the master exchange code is
 * available as `master_exchange_code`.
 *
 * @param provider - Provider name matched against `pem.provider`.
 * @param providerExchangeCode - Matched against `pem.provider_exchange_code`.
 * @param container - Service container; its `postgres.query` runs the lookup.
 * @returns The first matching row, or `null` when there is none.
 */
async function findActiveProviderExchangeMapping(
  provider: string,
  providerExchangeCode: string,
  container: IServiceContainer
): Promise<any> {
  const postgresClient = container.postgres;
  const query = `
    SELECT pem.*, e.code as master_exchange_code
    FROM provider_exchange_mappings pem
    JOIN exchanges e ON pem.master_exchange_id = e.id
    WHERE pem.provider = $1 AND pem.provider_exchange_code = $2 AND pem.active = true
  `;

  const result = await postgresClient.query(query, [provider, providerExchangeCode]);
  return result.rows[0] || null;
}
|
|
|
|
/**
 * Fetches the `symbols` row with the given symbol code on the given exchange.
 *
 * @param symbol - Symbol code matched against the `symbol` column.
 * @param exchangeId - Matched against the `exchange_id` column.
 * @param container - Service container; its `postgres.query` runs the lookup.
 * @returns The first matching row, or `null` when there is none.
 */
async function findSymbolByCodeAndExchange(
  symbol: string,
  exchangeId: string,
  container: IServiceContainer
): Promise<any> {
  const { rows } = await container.postgres.query(
    'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2',
    [symbol, exchangeId]
  );
  return rows[0] || null;
}
|
|
|
|
/**
 * Inserts a new row into `symbols` and returns its generated id.
 *
 * @param symbol - Raw provider document. Symbol code is read from `symbol` or
 *   `code`; company name from `companyName`, `name` or `company_name`; country
 *   from `countryCode` or `country_code` (default `'US'`); currency from
 *   `currency` (default `'USD'`).
 * @param exchangeId - Value stored in the `exchange_id` column.
 * @param container - Service container; its `postgres.query` runs the insert.
 * @returns The `id` returned by the INSERT.
 * @throws Error when the INSERT reports no returned row.
 */
async function createSymbol(symbol: any, exchangeId: string, container: IServiceContainer): Promise<string> {
  const postgresClient = container.postgres;
  const query = `
    INSERT INTO symbols (symbol, exchange_id, company_name, country, currency)
    VALUES ($1, $2, $3, $4, $5)
    RETURNING id
  `;

  const result = await postgresClient.query(query, [
    symbol.symbol || symbol.code,
    exchangeId,
    symbol.companyName || symbol.name || symbol.company_name,
    symbol.countryCode || symbol.country_code || 'US',
    symbol.currency || 'USD',
  ]);

  // Fail with a descriptive message instead of a TypeError on `undefined.id`.
  const inserted = result.rows[0];
  if (!inserted) {
    throw new Error('INSERT INTO symbols did not return a row');
  }
  return inserted.id;
}
|
|
|
|
/**
 * Updates company name, country and currency of an existing `symbols` row and
 * sets `updated_at` to `NOW()`. Each column is wrapped in `COALESCE`, so a
 * NULL parameter leaves the stored value unchanged.
 *
 * @param symbolId - Id of the row to update.
 * @param symbol - Raw provider document. Company name is read from
 *   `companyName`, `name` or `company_name`; country from `countryCode` or
 *   `country_code`; currency from `currency`.
 * @param container - Service container; its `postgres.query` runs the update.
 */
async function updateSymbol(symbolId: string, symbol: any, container: IServiceContainer): Promise<void> {
  const postgresClient = container.postgres;
  const query = `
    UPDATE symbols
    SET company_name = COALESCE($2, company_name),
        country = COALESCE($3, country),
        currency = COALESCE($4, currency),
        updated_at = NOW()
    WHERE id = $1
  `;

  const companyName = symbol.companyName || symbol.name || symbol.company_name;
  const country = symbol.countryCode || symbol.country_code;

  await postgresClient.query(query, [symbolId, companyName, country, symbol.currency]);
}
|
|
|
|
/**
 * Inserts a row into `provider_mappings` linking a provider's symbol to a
 * `symbols` row. On a `(provider, provider_symbol)` conflict the existing row
 * gets the new `symbol_id` and `provider_exchange`, and `last_seen` is set to
 * `NOW()`.
 *
 * @param symbolId - Id of the `symbols` row the mapping points to.
 * @param provider - Provider name stored in the `provider` column.
 * @param symbol - Raw provider document. The provider symbol is read from
 *   `qmSearchCode`, `symbol` or `code`; the provider exchange from
 *   `exchangeCode`, `exchange` or `exchange_id`.
 * @param container - Service container; its `postgres.query` runs the upsert.
 */
async function upsertProviderMapping(
  symbolId: string,
  provider: string,
  symbol: any,
  container: IServiceContainer
): Promise<void> {
  const postgresClient = container.postgres;
  const query = `
    INSERT INTO provider_mappings
      (symbol_id, provider, provider_symbol, provider_exchange, last_seen)
    VALUES ($1, $2, $3, $4, NOW())
    ON CONFLICT (provider, provider_symbol)
    DO UPDATE SET
      symbol_id = EXCLUDED.symbol_id,
      provider_exchange = EXCLUDED.provider_exchange,
      last_seen = NOW()
  `;

  const providerSymbol = symbol.qmSearchCode || symbol.symbol || symbol.code;
  const providerExchange = symbol.exchangeCode || symbol.exchange || symbol.exchange_id;

  await postgresClient.query(query, [symbolId, provider, providerSymbol, providerExchange]);
}
|
|
|
|
/**
 * Records a completed sync in `sync_status`. A new row is inserted with the
 * current time, the given count and `sync_errors = NULL`; on a
 * `(provider, data_type)` conflict the existing row is overwritten with the
 * same values and its `updated_at` is set to `NOW()`.
 *
 * @param provider - Provider name stored in the `provider` column.
 * @param dataType - Value stored in the `data_type` column.
 * @param count - Value stored in `last_sync_count`.
 * @param postgresClient - Object whose `query(sql, params)` runs the statement.
 */
async function updateSyncStatus(
  provider: string,
  dataType: string,
  count: number,
  postgresClient: any
): Promise<void> {
  const query = `
    INSERT INTO sync_status (provider, data_type, last_sync_at, last_sync_count, sync_errors)
    VALUES ($1, $2, NOW(), $3, NULL)
    ON CONFLICT (provider, data_type)
    DO UPDATE SET
      last_sync_at = NOW(),
      last_sync_count = EXCLUDED.last_sync_count,
      sync_errors = NULL,
      updated_at = NOW()
  `;

  await postgresClient.query(query, [provider, dataType, count]);
}
|