refactored monorepo for more projects
This commit is contained in:
parent
4632c174dc
commit
9492f1b15e
180 changed files with 1438 additions and 424 deletions
34
apps/stock/data-pipeline/src/container-setup.ts
Normal file
34
apps/stock/data-pipeline/src/container-setup.ts
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
/**
|
||||
* Service Container Setup for Data Pipeline
|
||||
* Configures dependency injection for the data pipeline service
|
||||
*/
|
||||
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { AppConfig } from '@stock-bot/config';
|
||||
|
||||
const logger = getLogger('data-pipeline-container');
|
||||
|
||||
/**
|
||||
* Configure the service container for data pipeline workloads
|
||||
*/
|
||||
export function setupServiceContainer(
|
||||
config: AppConfig,
|
||||
container: IServiceContainer
|
||||
): IServiceContainer {
|
||||
logger.info('Configuring data pipeline service container...');
|
||||
|
||||
// Data pipeline specific configuration
|
||||
// This service does more complex queries and transformations
|
||||
const poolSizes = {
|
||||
mongodb: config.environment === 'production' ? 40 : 20,
|
||||
postgres: config.environment === 'production' ? 50 : 25,
|
||||
cache: config.environment === 'production' ? 30 : 15,
|
||||
};
|
||||
|
||||
logger.info('Data pipeline pool sizes configured', poolSizes);
|
||||
|
||||
// The container is already configured with connections
|
||||
// Just return it with our logging
|
||||
return container;
|
||||
}
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import { handlerRegistry, createJobHandler, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import { exchangeOperations } from './operations';
|
||||
|
||||
const logger = getLogger('exchanges-handler');
|
||||
|
||||
const HANDLER_NAME = 'exchanges';
|
||||
|
||||
const exchangesHandlerConfig: HandlerConfig = {
|
||||
concurrency: 1,
|
||||
maxAttempts: 3,
|
||||
scheduledJobs: [
|
||||
{
|
||||
operation: 'sync-all-exchanges',
|
||||
cronPattern: '0 0 * * 0', // Weekly on Sunday at midnight
|
||||
payload: { clearFirst: true },
|
||||
priority: 10,
|
||||
immediately: false,
|
||||
} as ScheduledJobConfig,
|
||||
{
|
||||
operation: 'sync-qm-exchanges',
|
||||
cronPattern: '0 1 * * *', // Daily at 1 AM
|
||||
payload: {},
|
||||
priority: 5,
|
||||
immediately: false,
|
||||
} as ScheduledJobConfig,
|
||||
{
|
||||
operation: 'sync-ib-exchanges',
|
||||
cronPattern: '0 3 * * *', // Daily at 3 AM
|
||||
payload: {},
|
||||
priority: 3,
|
||||
immediately: false,
|
||||
} as ScheduledJobConfig,
|
||||
{
|
||||
operation: 'sync-qm-provider-mappings',
|
||||
cronPattern: '0 3 * * *', // Daily at 3 AM
|
||||
payload: {},
|
||||
priority: 7,
|
||||
immediately: false,
|
||||
} as ScheduledJobConfig,
|
||||
],
|
||||
operations: {
|
||||
'sync-all-exchanges': exchangeOperations.syncAllExchanges,
|
||||
'sync-qm-exchanges': exchangeOperations.syncQMExchanges,
|
||||
'sync-ib-exchanges': exchangeOperations.syncIBExchanges,
|
||||
'sync-qm-provider-mappings': exchangeOperations.syncQMProviderMappings,
|
||||
'clear-postgresql-data': exchangeOperations.clearPostgreSQLData,
|
||||
'get-exchange-stats': exchangeOperations.getExchangeStats,
|
||||
'get-provider-mapping-stats': exchangeOperations.getProviderMappingStats,
|
||||
'enhanced-sync-status': exchangeOperations['enhanced-sync-status'],
|
||||
},
|
||||
};
|
||||
|
||||
export function initializeExchangesHandler(container: IServiceContainer) {
|
||||
logger.info('Registering exchanges handler...');
|
||||
|
||||
// Update operations to use container
|
||||
const containerAwareOperations = Object.entries(exchangeOperations).reduce((acc, [key, operation]) => {
|
||||
acc[key] = createJobHandler(async (payload: any) => {
|
||||
return operation(payload, container);
|
||||
});
|
||||
return acc;
|
||||
}, {} as Record<string, any>);
|
||||
|
||||
const exchangesHandlerConfigWithContainer: HandlerConfig = {
|
||||
...exchangesHandlerConfig,
|
||||
operations: containerAwareOperations,
|
||||
};
|
||||
|
||||
handlerRegistry.register(HANDLER_NAME, exchangesHandlerConfigWithContainer);
|
||||
logger.info('Exchanges handler registered successfully');
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,61 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import type { JobPayload } from '../../../types/job-payloads';
|
||||
|
||||
const logger = getLogger('enhanced-sync-clear-postgresql-data');
|
||||
|
||||
export async function clearPostgreSQLData(
|
||||
payload: JobPayload,
|
||||
container: IServiceContainer
|
||||
): Promise<{
|
||||
exchangesCleared: number;
|
||||
symbolsCleared: number;
|
||||
mappingsCleared: number;
|
||||
}> {
|
||||
logger.info('Clearing existing PostgreSQL data...');
|
||||
|
||||
try {
|
||||
const postgresClient = container.postgres;
|
||||
|
||||
// Start transaction for atomic operations
|
||||
await postgresClient.query('BEGIN');
|
||||
|
||||
// Get counts before clearing
|
||||
const exchangeCountResult = await postgresClient.query(
|
||||
'SELECT COUNT(*) as count FROM exchanges'
|
||||
);
|
||||
const symbolCountResult = await postgresClient.query('SELECT COUNT(*) as count FROM symbols');
|
||||
const mappingCountResult = await postgresClient.query(
|
||||
'SELECT COUNT(*) as count FROM provider_mappings'
|
||||
);
|
||||
|
||||
const exchangesCleared = parseInt(exchangeCountResult.rows[0].count);
|
||||
const symbolsCleared = parseInt(symbolCountResult.rows[0].count);
|
||||
const mappingsCleared = parseInt(mappingCountResult.rows[0].count);
|
||||
|
||||
// Clear data in correct order (respect foreign keys)
|
||||
await postgresClient.query('DELETE FROM provider_mappings');
|
||||
await postgresClient.query('DELETE FROM symbols');
|
||||
await postgresClient.query('DELETE FROM exchanges');
|
||||
|
||||
// Reset sync status
|
||||
await postgresClient.query(
|
||||
'UPDATE sync_status SET last_sync_at = NULL, last_sync_count = 0, sync_errors = NULL'
|
||||
);
|
||||
|
||||
await postgresClient.query('COMMIT');
|
||||
|
||||
logger.info('PostgreSQL data cleared successfully', {
|
||||
exchangesCleared,
|
||||
symbolsCleared,
|
||||
mappingsCleared,
|
||||
});
|
||||
|
||||
return { exchangesCleared, symbolsCleared, mappingsCleared };
|
||||
} catch (error) {
|
||||
const postgresClient = container.postgres;
|
||||
await postgresClient.query('ROLLBACK');
|
||||
logger.error('Failed to clear PostgreSQL data', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import type { JobPayload, SyncStatus } from '../../../types/job-payloads';
|
||||
|
||||
const logger = getLogger('enhanced-sync-status');
|
||||
|
||||
export async function getSyncStatus(
|
||||
payload: JobPayload,
|
||||
container: IServiceContainer
|
||||
): Promise<SyncStatus[]> {
|
||||
logger.info('Getting comprehensive sync status...');
|
||||
|
||||
try {
|
||||
const postgresClient = container.postgres;
|
||||
const query = `
|
||||
SELECT provider, data_type as "dataType", last_sync_at as "lastSyncAt",
|
||||
last_sync_count as "lastSyncCount", sync_errors as "syncErrors"
|
||||
FROM sync_status
|
||||
ORDER BY provider, data_type
|
||||
`;
|
||||
const result = await postgresClient.query(query);
|
||||
|
||||
logger.info(`Retrieved sync status for ${result.rows.length} entries`);
|
||||
return result.rows;
|
||||
} catch (error) {
|
||||
logger.error('Failed to get sync status', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import type { JobPayload } from '../../../types/job-payloads';
|
||||
|
||||
const logger = getLogger('enhanced-sync-exchange-stats');
|
||||
|
||||
export async function getExchangeStats(
|
||||
payload: JobPayload,
|
||||
container: IServiceContainer
|
||||
): Promise<any> {
|
||||
logger.info('Getting exchange statistics...');
|
||||
|
||||
try {
|
||||
const postgresClient = container.postgres;
|
||||
const query = `
|
||||
SELECT
|
||||
COUNT(*) as total_exchanges,
|
||||
COUNT(CASE WHEN active = true THEN 1 END) as active_exchanges,
|
||||
COUNT(DISTINCT country) as countries,
|
||||
COUNT(DISTINCT currency) as currencies
|
||||
FROM exchanges
|
||||
`;
|
||||
const result = await postgresClient.query(query);
|
||||
|
||||
logger.info('Retrieved exchange statistics');
|
||||
return result.rows[0];
|
||||
} catch (error) {
|
||||
logger.error('Failed to get exchange statistics', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
import { clearPostgreSQLData } from './clear-postgresql-data.operations';
|
||||
import { getSyncStatus } from './enhanced-sync-status.operations';
|
||||
import { getExchangeStats } from './exchange-stats.operations';
|
||||
import { getProviderMappingStats } from './provider-mapping-stats.operations';
|
||||
import { syncQMExchanges } from './qm-exchanges.operations';
|
||||
import { syncAllExchanges } from './sync-all-exchanges.operations';
|
||||
import { syncIBExchanges } from './sync-ib-exchanges.operations';
|
||||
import { syncQMProviderMappings } from './sync-qm-provider-mappings.operations';
|
||||
|
||||
/**
 * Registry of all exchange job operations consumed by the handler config.
 * Each value is an async (payload, container) function.
 */
export const exchangeOperations = {
  syncAllExchanges,
  syncQMExchanges,
  syncIBExchanges,
  syncQMProviderMappings,
  clearPostgreSQLData,
  getExchangeStats,
  getProviderMappingStats,
  // Kebab-case key is intentional: the handler config looks this up via
  // exchangeOperations['enhanced-sync-status'] — do not rename.
  'enhanced-sync-status': getSyncStatus,
};
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import type { JobPayload } from '../../../types/job-payloads';
|
||||
|
||||
const logger = getLogger('enhanced-sync-provider-mapping-stats');
|
||||
|
||||
export async function getProviderMappingStats(
|
||||
payload: JobPayload,
|
||||
container: IServiceContainer
|
||||
): Promise<any> {
|
||||
logger.info('Getting provider mapping statistics...');
|
||||
|
||||
try {
|
||||
const postgresClient = container.postgres;
|
||||
const query = `
|
||||
SELECT
|
||||
provider,
|
||||
COUNT(*) as total_mappings,
|
||||
COUNT(CASE WHEN active = true THEN 1 END) as active_mappings,
|
||||
COUNT(CASE WHEN verified = true THEN 1 END) as verified_mappings,
|
||||
COUNT(CASE WHEN auto_mapped = true THEN 1 END) as auto_mapped,
|
||||
AVG(confidence) as avg_confidence
|
||||
FROM provider_exchange_mappings
|
||||
GROUP BY provider
|
||||
ORDER BY provider
|
||||
`;
|
||||
const result = await postgresClient.query(query);
|
||||
|
||||
logger.info('Retrieved provider mapping statistics');
|
||||
return result.rows;
|
||||
} catch (error) {
|
||||
logger.error('Failed to get provider mapping statistics', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,114 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import type { JobPayload } from '../../../types/job-payloads';
|
||||
|
||||
const logger = getLogger('sync-qm-exchanges');

/**
 * Sync QM exchanges from MongoDB into the PostgreSQL `exchanges` table.
 *
 * Reads every document from the `qmExchanges` collection and upserts each
 * one (update when a row with the same code exists, insert otherwise),
 * then records the run in sync_status.
 *
 * @param payload   Job payload (unused here).
 * @param container Service container providing mongodb/postgres clients.
 * @returns Counts of processed/created/updated exchanges.
 * @throws Re-throws any failure occurring outside the per-exchange loop.
 */
export async function syncQMExchanges(
  payload: JobPayload,
  container: IServiceContainer
): Promise<{ processed: number; created: number; updated: number }> {
  logger.info('Starting QM exchanges sync...');

  try {
    const mongoClient = container.mongodb;
    const postgresClient = container.postgres;

    // 1. Get all QM exchanges from MongoDB
    const qmExchanges = await mongoClient.find('qmExchanges', {});
    logger.info(`Found ${qmExchanges.length} QM exchanges to process`);

    let created = 0;
    let updated = 0;

    for (const exchange of qmExchanges) {
      try {
        // 2. Check if exchange exists
        const existingExchange = await findExchange(exchange.exchangeCode, postgresClient);

        if (existingExchange) {
          // Update existing
          await updateExchange(existingExchange.id, exchange, postgresClient);
          updated++;
        } else {
          // Create new
          await createExchange(exchange, postgresClient);
          created++;
        }
      } catch (error) {
        // Per-exchange failures are logged and skipped so one bad document
        // does not abort the whole sync. Note: such failures still count
        // toward `processed` and are not reported separately.
        logger.error('Failed to process exchange', { error, exchange: exchange.exchangeCode });
      }
    }

    // 3. Update sync status
    await updateSyncStatus('qm', 'exchanges', qmExchanges.length, postgresClient);

    const result = { processed: qmExchanges.length, created, updated };
    logger.info('QM exchanges sync completed', result);
    return result;
  } catch (error) {
    logger.error('QM exchanges sync failed', { error });
    throw error;
  }
}
|
||||
|
||||
// Helper functions
|
||||
async function findExchange(exchangeCode: string, postgresClient: any): Promise<any> {
|
||||
const query = 'SELECT * FROM exchanges WHERE code = $1';
|
||||
const result = await postgresClient.query(query, [exchangeCode]);
|
||||
return result.rows[0] || null;
|
||||
}
|
||||
|
||||
async function createExchange(qmExchange: any, postgresClient: any): Promise<void> {
|
||||
const query = `
|
||||
INSERT INTO exchanges (code, name, country, currency, visible)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (code) DO NOTHING
|
||||
`;
|
||||
|
||||
await postgresClient.query(query, [
|
||||
qmExchange.exchangeCode || qmExchange.exchange,
|
||||
qmExchange.exchangeShortName || qmExchange.name,
|
||||
qmExchange.countryCode || 'US',
|
||||
'USD', // Default currency, can be improved
|
||||
true, // New exchanges are visible by default
|
||||
]);
|
||||
}
|
||||
|
||||
async function updateExchange(
|
||||
exchangeId: string,
|
||||
qmExchange: any,
|
||||
postgresClient: any
|
||||
): Promise<void> {
|
||||
const query = `
|
||||
UPDATE exchanges
|
||||
SET name = COALESCE($2, name),
|
||||
country = COALESCE($3, country),
|
||||
updated_at = NOW()
|
||||
WHERE id = $1
|
||||
`;
|
||||
|
||||
await postgresClient.query(query, [
|
||||
exchangeId,
|
||||
qmExchange.exchangeShortName || qmExchange.name,
|
||||
qmExchange.countryCode,
|
||||
]);
|
||||
}
|
||||
|
||||
async function updateSyncStatus(
|
||||
provider: string,
|
||||
dataType: string,
|
||||
count: number,
|
||||
postgresClient: any
|
||||
): Promise<void> {
|
||||
const query = `
|
||||
UPDATE sync_status
|
||||
SET last_sync_at = NOW(),
|
||||
last_sync_count = $3,
|
||||
sync_errors = NULL,
|
||||
updated_at = NOW()
|
||||
WHERE provider = $1 AND data_type = $2
|
||||
`;
|
||||
|
||||
await postgresClient.query(query, [provider, dataType, count]);
|
||||
}
|
||||
|
|
@ -0,0 +1,282 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import type { JobPayload, SyncResult } from '../../../types/job-payloads';
|
||||
|
||||
const logger = getLogger('enhanced-sync-all-exchanges');
|
||||
|
||||
export async function syncAllExchanges(payload: JobPayload, container: IServiceContainer): Promise<SyncResult> {
|
||||
const clearFirst = payload.clearFirst || true;
|
||||
logger.info('Starting comprehensive exchange sync...', { clearFirst });
|
||||
|
||||
const result: SyncResult = {
|
||||
processed: 0,
|
||||
created: 0,
|
||||
updated: 0,
|
||||
skipped: 0,
|
||||
errors: 0,
|
||||
};
|
||||
|
||||
try {
|
||||
const postgresClient = container.postgres;
|
||||
|
||||
// Clear existing data if requested
|
||||
if (clearFirst) {
|
||||
await clearPostgreSQLData(postgresClient);
|
||||
}
|
||||
|
||||
// Start transaction for atomic operations
|
||||
await postgresClient.query('BEGIN');
|
||||
|
||||
// 1. Sync from EOD exchanges (comprehensive global data)
|
||||
const eodResult = await syncEODExchanges(container);
|
||||
mergeResults(result, eodResult);
|
||||
|
||||
// 2. Sync from IB exchanges (detailed asset information)
|
||||
const ibResult = await syncIBExchanges(container);
|
||||
mergeResults(result, ibResult);
|
||||
|
||||
// 3. Update sync status
|
||||
await updateSyncStatus('all', 'exchanges', result.processed, postgresClient);
|
||||
|
||||
await postgresClient.query('COMMIT');
|
||||
|
||||
logger.info('Comprehensive exchange sync completed', result);
|
||||
return result;
|
||||
} catch (error) {
|
||||
const postgresClient = container.postgres;
|
||||
await postgresClient.query('ROLLBACK');
|
||||
logger.error('Comprehensive exchange sync failed', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async function clearPostgreSQLData(postgresClient: any): Promise<void> {
|
||||
logger.info('Clearing existing PostgreSQL data...');
|
||||
|
||||
// Clear data in correct order (respect foreign keys)
|
||||
await postgresClient.query('DELETE FROM provider_mappings');
|
||||
await postgresClient.query('DELETE FROM symbols');
|
||||
await postgresClient.query('DELETE FROM exchanges');
|
||||
|
||||
// Reset sync status
|
||||
await postgresClient.query(
|
||||
'UPDATE sync_status SET last_sync_at = NULL, last_sync_count = 0, sync_errors = NULL'
|
||||
);
|
||||
|
||||
logger.info('PostgreSQL data cleared successfully');
|
||||
}
|
||||
|
||||
async function syncEODExchanges(container: IServiceContainer): Promise<SyncResult> {
|
||||
const mongoClient = container.mongodb;
|
||||
const exchanges = await mongoClient.find('eodExchanges', { active: true });
|
||||
const result: SyncResult = { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 };
|
||||
|
||||
for (const exchange of exchanges) {
|
||||
try {
|
||||
// Create provider exchange mapping for EOD
|
||||
await createProviderExchangeMapping(
|
||||
'eod', // provider
|
||||
exchange.Code,
|
||||
exchange.Name,
|
||||
exchange.CountryISO2,
|
||||
exchange.Currency,
|
||||
0.95, // very high confidence for EOD data
|
||||
container
|
||||
);
|
||||
|
||||
result.processed++;
|
||||
result.created++; // Count as created mapping
|
||||
} catch (error) {
|
||||
logger.error('Failed to process EOD exchange', { error, exchange });
|
||||
result.errors++;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
async function syncIBExchanges(container: IServiceContainer): Promise<SyncResult> {
|
||||
const mongoClient = container.mongodb;
|
||||
const exchanges = await mongoClient.find('ibExchanges', {});
|
||||
const result: SyncResult = { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 };
|
||||
|
||||
for (const exchange of exchanges) {
|
||||
try {
|
||||
// Create provider exchange mapping for IB
|
||||
await createProviderExchangeMapping(
|
||||
'ib', // provider
|
||||
exchange.exchange_id,
|
||||
exchange.name,
|
||||
exchange.country_code,
|
||||
'USD', // IB doesn't specify currency, default to USD
|
||||
0.85, // good confidence for IB data
|
||||
container
|
||||
);
|
||||
|
||||
result.processed++;
|
||||
result.created++; // Count as created mapping
|
||||
} catch (error) {
|
||||
logger.error('Failed to process IB exchange', { error, exchange });
|
||||
result.errors++;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
 * Insert a provider→master exchange mapping, creating the master exchange
 * first if needed.
 *
 * Behavior notes (from the code below):
 *  - A falsy provider code is silently skipped.
 *  - Existing mappings are never overwritten (preserves manual curation);
 *    the INSERT additionally uses ON CONFLICT DO NOTHING as a race guard.
 *  - New mapping rows are created with active=false and auto_mapped=true,
 *    i.e. they await manual review before going live.
 */
async function createProviderExchangeMapping(
  provider: string,
  providerExchangeCode: string,
  providerExchangeName: string,
  countryCode: string | null,
  currency: string | null,
  confidence: number,
  container: IServiceContainer
): Promise<void> {
  if (!providerExchangeCode) {
    return;
  }

  const postgresClient = container.postgres;

  // Check if mapping already exists
  const existingMapping = await findProviderExchangeMapping(provider, providerExchangeCode, container);
  if (existingMapping) {
    // Don't override existing mappings to preserve manual work
    return;
  }

  // Find or create master exchange
  const masterExchange = await findOrCreateMasterExchange(
    providerExchangeCode,
    providerExchangeName,
    countryCode,
    currency,
    container
  );

  // Create the provider exchange mapping
  const query = `
    INSERT INTO provider_exchange_mappings
    (provider, provider_exchange_code, provider_exchange_name, master_exchange_id,
     country_code, currency, confidence, active, auto_mapped)
    VALUES ($1, $2, $3, $4, $5, $6, $7, false, true)
    ON CONFLICT (provider, provider_exchange_code) DO NOTHING
  `;

  await postgresClient.query(query, [
    provider,
    providerExchangeCode,
    providerExchangeName,
    masterExchange.id,
    countryCode,
    currency,
    confidence,
  ]);
}
|
||||
|
||||
/**
 * Resolve a provider exchange code to a master `exchanges` row, creating
 * one when no match exists.
 *
 * Resolution order:
 *  1. Exact code match.
 *  2. Known-alias match via getBasicExchangeMapping (e.g. NYE -> NYSE).
 *  3. Insert a new row (inactive by default); on a concurrent/code
 *     conflict, the upsert fills missing columns and returns the row.
 *
 * @returns The master exchange row (id, code, name, country, currency).
 */
async function findOrCreateMasterExchange(
  providerCode: string,
  providerName: string,
  countryCode: string | null,
  currency: string | null,
  container: IServiceContainer
): Promise<any> {
  const postgresClient = container.postgres;

  // First, try to find exact match
  let masterExchange = await findExchangeByCode(providerCode, container);

  if (masterExchange) {
    return masterExchange;
  }

  // Try to find by similar codes (basic mapping)
  const basicMapping = getBasicExchangeMapping(providerCode);
  if (basicMapping) {
    masterExchange = await findExchangeByCode(basicMapping, container);
    if (masterExchange) {
      return masterExchange;
    }
  }

  // Create new master exchange (inactive by default)
  const query = `
    INSERT INTO exchanges (code, name, country, currency, active)
    VALUES ($1, $2, $3, $4, false)
    ON CONFLICT (code) DO UPDATE SET
      name = COALESCE(EXCLUDED.name, exchanges.name),
      country = COALESCE(EXCLUDED.country, exchanges.country),
      currency = COALESCE(EXCLUDED.currency, exchanges.currency)
    RETURNING id, code, name, country, currency
  `;

  const result = await postgresClient.query(query, [
    providerCode,
    providerName || providerCode,
    countryCode || 'US',   // fall back to US when the provider gave no country
    currency || 'USD',     // fall back to USD when the provider gave no currency
  ]);

  return result.rows[0];
}
|
||||
|
||||
function getBasicExchangeMapping(providerCode: string): string | null {
|
||||
const mappings: Record<string, string> = {
|
||||
NYE: 'NYSE',
|
||||
NAS: 'NASDAQ',
|
||||
TO: 'TSX',
|
||||
LN: 'LSE',
|
||||
LON: 'LSE',
|
||||
};
|
||||
|
||||
return mappings[providerCode.toUpperCase()] || null;
|
||||
}
|
||||
|
||||
async function findProviderExchangeMapping(
|
||||
provider: string,
|
||||
providerExchangeCode: string,
|
||||
container: IServiceContainer
|
||||
): Promise<any> {
|
||||
const postgresClient = container.postgres;
|
||||
const query =
|
||||
'SELECT * FROM provider_exchange_mappings WHERE provider = $1 AND provider_exchange_code = $2';
|
||||
const result = await postgresClient.query(query, [provider, providerExchangeCode]);
|
||||
return result.rows[0] || null;
|
||||
}
|
||||
|
||||
async function findExchangeByCode(code: string, container: IServiceContainer): Promise<any> {
|
||||
const postgresClient = container.postgres;
|
||||
const query = 'SELECT * FROM exchanges WHERE code = $1';
|
||||
const result = await postgresClient.query(query, [code]);
|
||||
return result.rows[0] || null;
|
||||
}
|
||||
|
||||
async function updateSyncStatus(
|
||||
provider: string,
|
||||
dataType: string,
|
||||
count: number,
|
||||
postgresClient: any
|
||||
): Promise<void> {
|
||||
const query = `
|
||||
INSERT INTO sync_status (provider, data_type, last_sync_at, last_sync_count, sync_errors)
|
||||
VALUES ($1, $2, NOW(), $3, NULL)
|
||||
ON CONFLICT (provider, data_type)
|
||||
DO UPDATE SET
|
||||
last_sync_at = NOW(),
|
||||
last_sync_count = EXCLUDED.last_sync_count,
|
||||
sync_errors = NULL,
|
||||
updated_at = NOW()
|
||||
`;
|
||||
|
||||
await postgresClient.query(query, [provider, dataType, count]);
|
||||
}
|
||||
|
||||
function mergeResults(target: SyncResult, source: SyncResult): void {
|
||||
target.processed += source.processed;
|
||||
target.created += source.created;
|
||||
target.updated += source.updated;
|
||||
target.skipped += source.skipped;
|
||||
target.errors += source.errors;
|
||||
}
|
||||
|
|
@ -0,0 +1,209 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { MasterExchange } from '@stock-bot/mongodb';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import type { JobPayload } from '../../../types/job-payloads';
|
||||
|
||||
const logger = getLogger('sync-ib-exchanges');

/**
 * Shape of a raw IB exchange document from the `ibExchanges` Mongo
 * collection. Every field is optional because source documents are not
 * guaranteed to be complete; downstream code applies fallbacks.
 */
interface IBExchange {
  id?: string; // IB's exchange identifier
  _id?: any; // Mongo document id (fallback when `id` is missing)
  name?: string; // human-readable exchange name
  code?: string; // exchange code; preferred source for master ids
  country_code?: string; // country code, e.g. 'US' or 'CA'
  currency?: string; // trading currency (downstream defaults to 'USD')
}
|
||||
|
||||
/**
 * Sync US/CA IB exchanges from MongoDB into the masterExchanges collection.
 *
 * Reads `ibExchanges` documents with country_code in {US, CA} and creates
 * or updates one master exchange per document. Per-document failures are
 * logged and skipped; a top-level fetch failure is swallowed and reported
 * as zero progress rather than thrown (deliberate best-effort behavior).
 *
 * @returns How many documents were synced out of how many were found.
 */
export async function syncIBExchanges(
  payload: JobPayload,
  container: IServiceContainer
): Promise<{ syncedCount: number; totalExchanges: number }> {
  logger.info('Syncing IB exchanges from database...');

  try {
    const mongoClient = container.mongodb;
    const db = mongoClient.getDatabase();

    // Filter by country code US and CA
    const ibExchanges = await db
      .collection<IBExchange>('ibExchanges')
      .find({
        country_code: { $in: ['US', 'CA'] },
      })
      .toArray();

    logger.info('Found IB exchanges in database', { count: ibExchanges.length });

    let syncedCount = 0;

    for (const exchange of ibExchanges) {
      try {
        await createOrUpdateMasterExchange(exchange, container);
        syncedCount++;

        logger.debug('Synced IB exchange', {
          ibId: exchange.id,
          country: exchange.country_code,
        });
      } catch (error) {
        // One bad document must not abort the whole sync.
        logger.error('Failed to sync IB exchange', { exchange: exchange.id, error });
      }
    }

    logger.info('IB exchange sync completed', {
      syncedCount,
      totalExchanges: ibExchanges.length,
    });

    return { syncedCount, totalExchanges: ibExchanges.length };
  } catch (error) {
    // Deliberately swallow the fetch failure: report zero progress instead
    // of failing the job.
    logger.error('Failed to fetch IB exchanges from database', { error });
    return { syncedCount: 0, totalExchanges: 0 };
  }
}
|
||||
|
||||
/**
|
||||
* Create or update master exchange record 1:1 from IB exchange
|
||||
*/
|
||||
/**
 * Create or update master exchange record 1:1 from IB exchange.
 *
 * The master id is derived from the IB code/id/name. On UPDATE only the
 * descriptive fields (officialName, country, currency, timezone) are
 * refreshed; shortName, active, confidence, verified and sourceMappings
 * are creation-only and intentionally left untouched.
 */
async function createOrUpdateMasterExchange(ibExchange: IBExchange, container: IServiceContainer): Promise<void> {
  const mongoClient = container.mongodb;
  const db = mongoClient.getDatabase();
  const collection = db.collection<MasterExchange>('masterExchanges');

  const masterExchangeId = generateMasterExchangeId(ibExchange);
  const now = new Date();

  // Check if master exchange already exists
  const existing = await collection.findOne({ masterExchangeId });

  if (existing) {
    // Update existing record — descriptive fields only (see doc above).
    await collection.updateOne(
      { masterExchangeId },
      {
        $set: {
          officialName: ibExchange.name || `Exchange ${ibExchange.id}`,
          country: ibExchange.country_code || 'UNKNOWN',
          currency: ibExchange.currency || 'USD',
          timezone: inferTimezone(ibExchange),
          updated_at: now,
        },
      }
    );

    logger.debug('Updated existing master exchange', { masterExchangeId });
  } else {
    // Create new master exchange
    const masterExchange: MasterExchange = {
      masterExchangeId,
      shortName: masterExchangeId, // Set shortName to masterExchangeId on creation
      officialName: ibExchange.name || `Exchange ${ibExchange.id}`,
      country: ibExchange.country_code || 'UNKNOWN',
      currency: ibExchange.currency || 'USD',
      timezone: inferTimezone(ibExchange),
      active: false, // Set active to false only on creation

      // Provenance: remember exactly which IB document produced this record.
      sourceMappings: {
        ib: {
          id: ibExchange.id || ibExchange._id?.toString() || 'unknown',
          name: ibExchange.name || `Exchange ${ibExchange.id}`,
          code: ibExchange.code || ibExchange.id || '',
          aliases: generateAliases(ibExchange),
          lastUpdated: now,
        },
      },

      confidence: 1.0, // High confidence for direct IB mapping
      verified: true, // Mark as verified since it's direct from IB

      // DocumentBase fields
      source: 'ib-exchange-sync',
      created_at: now,
      updated_at: now,
    };

    await collection.insertOne(masterExchange);
    logger.debug('Created new master exchange', { masterExchangeId });
  }
}
|
||||
|
||||
/**
|
||||
* Generate master exchange ID from IB exchange
|
||||
*/
|
||||
function generateMasterExchangeId(ibExchange: IBExchange): string {
|
||||
// Use code if available, otherwise use ID, otherwise generate from name
|
||||
if (ibExchange.code) {
|
||||
return ibExchange.code.toUpperCase().replace(/[^A-Z0-9]/g, '');
|
||||
}
|
||||
|
||||
if (ibExchange.id) {
|
||||
return ibExchange.id.toUpperCase().replace(/[^A-Z0-9]/g, '');
|
||||
}
|
||||
|
||||
if (ibExchange.name) {
|
||||
return ibExchange.name
|
||||
.toUpperCase()
|
||||
.split(' ')
|
||||
.slice(0, 2)
|
||||
.join('_')
|
||||
.replace(/[^A-Z0-9_]/g, '');
|
||||
}
|
||||
|
||||
return 'UNKNOWN_EXCHANGE';
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate aliases for the exchange
|
||||
*/
|
||||
function generateAliases(ibExchange: IBExchange): string[] {
|
||||
const aliases: string[] = [];
|
||||
|
||||
if (ibExchange.name && ibExchange.name.includes(' ')) {
|
||||
// Add abbreviated version
|
||||
aliases.push(
|
||||
ibExchange.name
|
||||
.split(' ')
|
||||
.map(w => w[0])
|
||||
.join('')
|
||||
.toUpperCase()
|
||||
);
|
||||
}
|
||||
|
||||
if (ibExchange.code) {
|
||||
aliases.push(ibExchange.code.toUpperCase());
|
||||
}
|
||||
|
||||
return aliases;
|
||||
}
|
||||
|
||||
/**
|
||||
* Infer timezone from exchange name/location
|
||||
*/
|
||||
function inferTimezone(ibExchange: IBExchange): string {
|
||||
if (!ibExchange.name) {
|
||||
return 'UTC';
|
||||
}
|
||||
|
||||
const name = ibExchange.name.toUpperCase();
|
||||
|
||||
if (name.includes('NEW YORK') || name.includes('NYSE') || name.includes('NASDAQ')) {
|
||||
return 'America/New_York';
|
||||
}
|
||||
if (name.includes('LONDON')) {
|
||||
return 'Europe/London';
|
||||
}
|
||||
if (name.includes('TOKYO')) {
|
||||
return 'Asia/Tokyo';
|
||||
}
|
||||
if (name.includes('SHANGHAI')) {
|
||||
return 'Asia/Shanghai';
|
||||
}
|
||||
if (name.includes('TORONTO')) {
|
||||
return 'America/Toronto';
|
||||
}
|
||||
if (name.includes('FRANKFURT')) {
|
||||
return 'Europe/Berlin';
|
||||
}
|
||||
|
||||
return 'UTC'; // Default
|
||||
}
|
||||
|
|
@ -0,0 +1,216 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import type { JobPayload, SyncResult } from '../../../types/job-payloads';
|
||||
|
||||
const logger = getLogger('enhanced-sync-qm-provider-mappings');

/**
 * Build provider exchange mappings for QM.
 *
 * Aggregates the distinct (exchangeCode, exchange, countryCode)
 * combinations present in the qmSymbols collection and inserts one
 * provider mapping per combination, all inside a single PostgreSQL
 * transaction. Per-mapping failures are counted in result.errors but do
 * not abort the batch; a batch-level failure rolls back and re-throws.
 *
 * @returns SyncResult counters for the run.
 */
export async function syncQMProviderMappings(
  payload: JobPayload,
  container: IServiceContainer
): Promise<SyncResult> {
  logger.info('Starting QM provider exchange mappings sync...');

  const result: SyncResult = {
    processed: 0,
    created: 0,
    updated: 0,
    skipped: 0,
    errors: 0,
  };

  try {
    const mongoClient = container.mongodb;
    const postgresClient = container.postgres;

    // Start transaction
    await postgresClient.query('BEGIN');

    // Get unique exchange combinations from QM symbols
    const db = mongoClient.getDatabase();
    const pipeline = [
      {
        // Collapse symbols into one bucket per exchange combination.
        $group: {
          _id: {
            exchangeCode: '$exchangeCode',
            exchange: '$exchange',
            countryCode: '$countryCode',
          },
          count: { $sum: 1 },
          sampleExchange: { $first: '$exchange' },
        },
      },
      {
        // Flatten the _id fields back onto the document.
        $project: {
          exchangeCode: '$_id.exchangeCode',
          exchange: '$_id.exchange',
          countryCode: '$_id.countryCode',
          count: 1,
          sampleExchange: 1,
        },
      },
    ];

    const qmExchanges = await db.collection('qmSymbols').aggregate(pipeline).toArray();
    logger.info(`Found ${qmExchanges.length} unique QM exchange combinations`);

    for (const exchange of qmExchanges) {
      try {
        // Create provider exchange mapping for QM
        await createProviderExchangeMapping(
          'qm', // provider
          exchange.exchangeCode,
          exchange.sampleExchange || exchange.exchangeCode,
          exchange.countryCode,
          exchange.countryCode === 'CA' ? 'CAD' : 'USD', // Simple currency mapping
          0.8, // good confidence for QM data
          container
        );

        result.processed++;
        result.created++;
      } catch (error) {
        // Count the failure and keep going with the remaining mappings.
        logger.error('Failed to process QM exchange mapping', { error, exchange });
        result.errors++;
      }
    }

    await postgresClient.query('COMMIT');

    logger.info('QM provider exchange mappings sync completed', result);
    return result;
  } catch (error) {
    // NOTE(review): if the failure happened before BEGIN, this ROLLBACK
    // runs outside a transaction (PostgreSQL only warns) — confirm ok.
    const postgresClient = container.postgres;
    await postgresClient.query('ROLLBACK');
    logger.error('QM provider exchange mappings sync failed', { error });
    throw error;
  }
}
|
||||
|
||||
|
||||
async function createProviderExchangeMapping(
|
||||
provider: string,
|
||||
providerExchangeCode: string,
|
||||
providerExchangeName: string,
|
||||
countryCode: string | null,
|
||||
currency: string | null,
|
||||
confidence: number,
|
||||
container: IServiceContainer
|
||||
): Promise<void> {
|
||||
if (!providerExchangeCode) {
|
||||
return;
|
||||
}
|
||||
|
||||
const postgresClient = container.postgres;
|
||||
|
||||
// Check if mapping already exists
|
||||
const existingMapping = await findProviderExchangeMapping(provider, providerExchangeCode, container);
|
||||
if (existingMapping) {
|
||||
// Don't override existing mappings to preserve manual work
|
||||
return;
|
||||
}
|
||||
|
||||
// Find or create master exchange
|
||||
const masterExchange = await findOrCreateMasterExchange(
|
||||
providerExchangeCode,
|
||||
providerExchangeName,
|
||||
countryCode,
|
||||
currency,
|
||||
container
|
||||
);
|
||||
|
||||
// Create the provider exchange mapping
|
||||
const query = `
|
||||
INSERT INTO provider_exchange_mappings
|
||||
(provider, provider_exchange_code, provider_exchange_name, master_exchange_id,
|
||||
country_code, currency, confidence, active, auto_mapped)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, false, true)
|
||||
ON CONFLICT (provider, provider_exchange_code) DO NOTHING
|
||||
`;
|
||||
|
||||
await postgresClient.query(query, [
|
||||
provider,
|
||||
providerExchangeCode,
|
||||
providerExchangeName,
|
||||
masterExchange.id,
|
||||
countryCode,
|
||||
currency,
|
||||
confidence,
|
||||
]);
|
||||
}
|
||||
|
||||
async function findProviderExchangeMapping(
|
||||
provider: string,
|
||||
providerExchangeCode: string,
|
||||
container: IServiceContainer
|
||||
): Promise<any> {
|
||||
const postgresClient = container.postgres;
|
||||
const query =
|
||||
'SELECT * FROM provider_exchange_mappings WHERE provider = $1 AND provider_exchange_code = $2';
|
||||
const result = await postgresClient.query(query, [provider, providerExchangeCode]);
|
||||
return result.rows[0] || null;
|
||||
}
|
||||
|
||||
/**
 * Resolve a master exchange row for a provider code.
 *
 * Resolution order:
 *  1. exact match on `exchanges.code`;
 *  2. match via the basic provider-code alias table (e.g. NYE → NYSE);
 *  3. create a new, inactive exchange row.
 *
 * @returns The exchange row ({ id, code, name, country, currency }).
 */
async function findOrCreateMasterExchange(
  providerCode: string,
  providerName: string,
  countryCode: string | null,
  currency: string | null,
  container: IServiceContainer
): Promise<any> {
  const postgresClient = container.postgres;

  // First, try to find exact match
  let masterExchange = await findExchangeByCode(providerCode, container);

  if (masterExchange) {
    return masterExchange;
  }

  // Try to find by similar codes (basic mapping)
  const basicMapping = getBasicExchangeMapping(providerCode);
  if (basicMapping) {
    masterExchange = await findExchangeByCode(basicMapping, container);
    if (masterExchange) {
      return masterExchange;
    }
  }

  // Create new master exchange (inactive by default). ON CONFLICT covers a
  // concurrent insert of the same code; COALESCE keeps the stored value when
  // the incoming one is NULL.
  const query = `
    INSERT INTO exchanges (code, name, country, currency, active)
    VALUES ($1, $2, $3, $4, false)
    ON CONFLICT (code) DO UPDATE SET
      name = COALESCE(EXCLUDED.name, exchanges.name),
      country = COALESCE(EXCLUDED.country, exchanges.country),
      currency = COALESCE(EXCLUDED.currency, exchanges.currency)
    RETURNING id, code, name, country, currency
  `;

  const result = await postgresClient.query(query, [
    providerCode,
    providerName || providerCode,
    countryCode || 'US',
    currency || 'USD',
  ]);

  return result.rows[0];
}
|
||||
|
||||
function getBasicExchangeMapping(providerCode: string): string | null {
|
||||
const mappings: Record<string, string> = {
|
||||
NYE: 'NYSE',
|
||||
NAS: 'NASDAQ',
|
||||
TO: 'TSX',
|
||||
LN: 'LSE',
|
||||
LON: 'LSE',
|
||||
};
|
||||
|
||||
return mappings[providerCode.toUpperCase()] || null;
|
||||
}
|
||||
|
||||
async function findExchangeByCode(code: string, container: IServiceContainer): Promise<any> {
|
||||
const postgresClient = container.postgres;
|
||||
const query = 'SELECT * FROM exchanges WHERE code = $1';
|
||||
const result = await postgresClient.query(query, [code]);
|
||||
return result.rows[0] || null;
|
||||
}
|
||||
33
apps/stock/data-pipeline/src/handlers/index.ts
Normal file
33
apps/stock/data-pipeline/src/handlers/index.ts
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
* Handler initialization for data pipeline service
|
||||
* Registers all handlers with the service container
|
||||
*/
|
||||
|
||||
import type { ServiceContainer } from '@stock-bot/di';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import { initializeExchangesHandler } from './exchanges/exchanges.handler';
|
||||
import { initializeSymbolsHandler } from './symbols/symbols.handler';
|
||||
|
||||
const logger = getLogger('pipeline-handler-init');
|
||||
|
||||
/**
|
||||
* Initialize all handlers with the service container
|
||||
*/
|
||||
export async function initializeAllHandlers(container: ServiceContainer): Promise<void> {
|
||||
logger.info('Initializing data pipeline handlers...');
|
||||
|
||||
try {
|
||||
// Initialize exchanges handler with container
|
||||
initializeExchangesHandler(container);
|
||||
logger.debug('Exchanges handler initialized');
|
||||
|
||||
// Initialize symbols handler with container
|
||||
initializeSymbolsHandler(container);
|
||||
logger.debug('Symbols handler initialized');
|
||||
|
||||
logger.info('All pipeline handlers initialized successfully');
|
||||
} catch (error) {
|
||||
logger.error('Failed to initialize handlers', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
import { syncQMSymbols } from './qm-symbols.operations';
|
||||
import { getSyncStatus } from './sync-status.operations';
|
||||
import { syncSymbolsFromProvider } from './sync-symbols-from-provider.operations';
|
||||
|
||||
// Symbol operations exposed to the queue handler layer.
// `syncSymbolsFromProvider` is shared by the per-provider sync operations.
export const symbolOperations = {
  syncQMSymbols,
  syncSymbolsFromProvider,
  getSyncStatus,
};
|
||||
|
|
@ -0,0 +1,184 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import type { JobPayload } from '../../../types/job-payloads';
|
||||
|
||||
const logger = getLogger('sync-qm-symbols');
|
||||
|
||||
/**
 * Sync every QM symbol document from MongoDB into the Postgres `symbols`
 * table, creating or updating rows and upserting the QM provider mapping.
 *
 * Symbols whose exchange cannot be resolved are skipped (logged as warnings);
 * other per-symbol failures are logged and do not abort the run.
 *
 * @returns processed = total fetched from Mongo (includes skipped/failed),
 *          plus created/updated row counts.
 */
export async function syncQMSymbols(
  payload: JobPayload,
  container: IServiceContainer
): Promise<{ processed: number; created: number; updated: number }> {
  logger.info('Starting QM symbols sync...');

  try {
    const mongoClient = container.mongodb;
    const postgresClient = container.postgres;

    // 1. Get all QM symbols from MongoDB
    const qmSymbols = await mongoClient.find('qmSymbols', {});
    logger.info(`Found ${qmSymbols.length} QM symbols to process`);

    let created = 0;
    let updated = 0;

    for (const symbol of qmSymbols) {
      try {
        // 2. Resolve exchange (hard-coded code map → exchanges.id)
        const exchangeId = await resolveExchange(
          symbol.exchangeCode || symbol.exchange,
          postgresClient
        );

        if (!exchangeId) {
          logger.warn('Unknown exchange, skipping symbol', {
            symbol: symbol.symbol,
            exchange: symbol.exchangeCode || symbol.exchange,
          });
          continue;
        }

        // 3. Check if symbol exists
        const existingSymbol = await findSymbol(symbol.symbol, exchangeId, postgresClient);

        if (existingSymbol) {
          // Update existing
          await updateSymbol(existingSymbol.id, symbol, postgresClient);
          await upsertProviderMapping(existingSymbol.id, 'qm', symbol, postgresClient);
          updated++;
        } else {
          // Create new
          const newSymbolId = await createSymbol(symbol, exchangeId, postgresClient);
          await upsertProviderMapping(newSymbolId, 'qm', symbol, postgresClient);
          created++;
        }
      } catch (error) {
        // NOTE(review): per-symbol failures are logged but not surfaced in
        // the returned counters — confirm whether an error count is needed.
        logger.error('Failed to process symbol', { error, symbol: symbol.symbol });
      }
    }

    // 4. Update sync status
    await updateSyncStatus('qm', 'symbols', qmSymbols.length, postgresClient);

    const result = { processed: qmSymbols.length, created, updated };
    logger.info('QM symbols sync completed', result);
    return result;
  } catch (error) {
    logger.error('QM symbols sync failed', { error });
    throw error;
  }
}
|
||||
|
||||
// Helper functions
|
||||
async function resolveExchange(exchangeCode: string, postgresClient: any): Promise<string | null> {
|
||||
if (!exchangeCode) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Simple mapping - expand this as needed
|
||||
const exchangeMap: Record<string, string> = {
|
||||
NASDAQ: 'NASDAQ',
|
||||
NYSE: 'NYSE',
|
||||
TSX: 'TSX',
|
||||
TSE: 'TSX', // TSE maps to TSX
|
||||
LSE: 'LSE',
|
||||
CME: 'CME',
|
||||
};
|
||||
|
||||
const normalizedCode = exchangeMap[exchangeCode.toUpperCase()];
|
||||
if (!normalizedCode) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const query = 'SELECT id FROM exchanges WHERE code = $1';
|
||||
const result = await postgresClient.query(query, [normalizedCode]);
|
||||
return result.rows[0]?.id || null;
|
||||
}
|
||||
|
||||
async function findSymbol(symbol: string, exchangeId: string, postgresClient: any): Promise<any> {
|
||||
const query = 'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2';
|
||||
const result = await postgresClient.query(query, [symbol, exchangeId]);
|
||||
return result.rows[0] || null;
|
||||
}
|
||||
|
||||
async function createSymbol(
|
||||
qmSymbol: any,
|
||||
exchangeId: string,
|
||||
postgresClient: any
|
||||
): Promise<string> {
|
||||
const query = `
|
||||
INSERT INTO symbols (symbol, exchange_id, company_name, country, currency)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
RETURNING id
|
||||
`;
|
||||
|
||||
const result = await postgresClient.query(query, [
|
||||
qmSymbol.symbol,
|
||||
exchangeId,
|
||||
qmSymbol.companyName || qmSymbol.name,
|
||||
qmSymbol.countryCode || 'US',
|
||||
qmSymbol.currency || 'USD',
|
||||
]);
|
||||
|
||||
return result.rows[0].id;
|
||||
}
|
||||
|
||||
async function updateSymbol(symbolId: string, qmSymbol: any, postgresClient: any): Promise<void> {
|
||||
const query = `
|
||||
UPDATE symbols
|
||||
SET company_name = COALESCE($2, company_name),
|
||||
country = COALESCE($3, country),
|
||||
currency = COALESCE($4, currency),
|
||||
updated_at = NOW()
|
||||
WHERE id = $1
|
||||
`;
|
||||
|
||||
await postgresClient.query(query, [
|
||||
symbolId,
|
||||
qmSymbol.companyName || qmSymbol.name,
|
||||
qmSymbol.countryCode,
|
||||
qmSymbol.currency,
|
||||
]);
|
||||
}
|
||||
|
||||
async function upsertProviderMapping(
|
||||
symbolId: string,
|
||||
provider: string,
|
||||
qmSymbol: any,
|
||||
postgresClient: any
|
||||
): Promise<void> {
|
||||
const query = `
|
||||
INSERT INTO provider_mappings
|
||||
(symbol_id, provider, provider_symbol, provider_exchange, last_seen)
|
||||
VALUES ($1, $2, $3, $4, NOW())
|
||||
ON CONFLICT (provider, provider_symbol)
|
||||
DO UPDATE SET
|
||||
symbol_id = EXCLUDED.symbol_id,
|
||||
provider_exchange = EXCLUDED.provider_exchange,
|
||||
last_seen = NOW()
|
||||
`;
|
||||
|
||||
await postgresClient.query(query, [
|
||||
symbolId,
|
||||
provider,
|
||||
qmSymbol.qmSearchCode || qmSymbol.symbol,
|
||||
qmSymbol.exchangeCode || qmSymbol.exchange,
|
||||
]);
|
||||
}
|
||||
|
||||
async function updateSyncStatus(
|
||||
provider: string,
|
||||
dataType: string,
|
||||
count: number,
|
||||
postgresClient: any
|
||||
): Promise<void> {
|
||||
const query = `
|
||||
UPDATE sync_status
|
||||
SET last_sync_at = NOW(),
|
||||
last_sync_count = $3,
|
||||
sync_errors = NULL,
|
||||
updated_at = NOW()
|
||||
WHERE provider = $1 AND data_type = $2
|
||||
`;
|
||||
|
||||
await postgresClient.query(query, [provider, dataType, count]);
|
||||
}
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import type { JobPayload } from '../../../types/job-payloads';
|
||||
|
||||
const logger = getLogger('sync-status');
|
||||
|
||||
export async function getSyncStatus(
|
||||
payload: JobPayload,
|
||||
container: IServiceContainer
|
||||
): Promise<Record<string, unknown>[]> {
|
||||
logger.info('Getting sync status...');
|
||||
|
||||
try {
|
||||
const postgresClient = container.postgres;
|
||||
const query = 'SELECT * FROM sync_status ORDER BY provider, data_type';
|
||||
const result = await postgresClient.query(query);
|
||||
|
||||
logger.info(`Retrieved sync status for ${result.rows.length} entries`);
|
||||
return result.rows;
|
||||
} catch (error) {
|
||||
logger.error('Failed to get sync status', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,237 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import type { JobPayload, SyncResult } from '../../../types/job-payloads';
|
||||
|
||||
const logger = getLogger('enhanced-sync-symbols-from-provider');
|
||||
|
||||
/**
 * Generic provider symbols sync: loads all symbol documents for
 * `payload.provider` ('qm' | 'eod' | 'ib') from MongoDB and upserts them into
 * Postgres, routing each symbol through an *active* provider exchange mapping.
 *
 * When `payload.clearFirst` is set, provider_mappings and symbols are deleted
 * first. NOTE(review): that delete commits in its own transaction, so a later
 * import failure rolls back only the re-import, not the delete — confirm
 * this is the intended failure mode.
 *
 * @throws Error when payload.provider is missing or unsupported.
 */
export async function syncSymbolsFromProvider(
  payload: JobPayload,
  container: IServiceContainer
): Promise<SyncResult> {
  const provider = payload.provider;
  const clearFirst = payload.clearFirst || false;

  if (!provider) {
    throw new Error('Provider is required in payload');
  }

  logger.info(`Starting ${provider} symbols sync...`, { clearFirst });

  const result: SyncResult = {
    processed: 0,
    created: 0,
    updated: 0,
    skipped: 0,
    errors: 0,
  };

  try {
    const mongoClient = container.mongodb;
    const postgresClient = container.postgres;

    // Clear existing data if requested (only symbols and mappings, keep exchanges)
    if (clearFirst) {
      await postgresClient.query('BEGIN');
      await postgresClient.query('DELETE FROM provider_mappings');
      await postgresClient.query('DELETE FROM symbols');
      await postgresClient.query('COMMIT');
      logger.info('Cleared existing symbols and mappings before sync');
    }

    // Start transaction (covers the import itself)
    await postgresClient.query('BEGIN');

    let symbols: Record<string, unknown>[] = [];

    // Get symbols based on provider
    const db = mongoClient.getDatabase();
    switch (provider.toLowerCase()) {
      case 'qm':
        symbols = await db.collection('qmSymbols').find({}).toArray();
        break;
      case 'eod':
        symbols = await db.collection('eodSymbols').find({}).toArray();
        break;
      case 'ib':
        symbols = await db.collection('ibSymbols').find({}).toArray();
        break;
      default:
        throw new Error(`Unsupported provider: ${provider}`);
    }

    logger.info(`Found ${symbols.length} ${provider} symbols to process`);
    result.processed = symbols.length;

    for (const symbol of symbols) {
      try {
        await processSingleSymbol(symbol, provider, result, container);
      } catch (error) {
        // Per-symbol failures are counted; the run continues.
        logger.error('Failed to process symbol', {
          error,
          symbol: symbol.symbol || symbol.code,
          provider,
        });
        result.errors++;
      }
    }

    // Update sync status (upsert)
    await updateSyncStatus(provider, 'symbols', result.processed, container.postgres);

    await postgresClient.query('COMMIT');

    logger.info(`${provider} symbols sync completed`, result);
    return result;
  } catch (error) {
    // Roll back the in-flight import transaction before re-throwing.
    await container.postgres.query('ROLLBACK');
    logger.error(`${provider} symbols sync failed`, { error });
    throw error;
  }
}
|
||||
|
||||
/**
 * Upsert one provider symbol document into Postgres, mutating the shared
 * `result` counters. Increments `skipped` when the document lacks a
 * symbol/exchange code or when no *active* provider exchange mapping exists
 * for its exchange; otherwise creates or updates the symbol row and its
 * provider mapping.
 */
async function processSingleSymbol(
  symbol: any,
  provider: string,
  result: SyncResult,
  container: IServiceContainer
): Promise<void> {
  // Provider documents use different field names — accept all known variants.
  const symbolCode = symbol.symbol || symbol.code;
  const exchangeCode = symbol.exchangeCode || symbol.exchange || symbol.exchange_id;

  if (!symbolCode || !exchangeCode) {
    result.skipped++;
    return;
  }

  // Find active provider exchange mapping
  const providerMapping = await findActiveProviderExchangeMapping(provider, exchangeCode, container);

  if (!providerMapping) {
    result.skipped++;
    return;
  }

  // Check if symbol exists
  const existingSymbol = await findSymbolByCodeAndExchange(
    symbolCode,
    providerMapping.master_exchange_id,
    container
  );

  if (existingSymbol) {
    await updateSymbol(existingSymbol.id, symbol, container);
    await upsertProviderMapping(existingSymbol.id, provider, symbol, container);
    result.updated++;
  } else {
    const newSymbolId = await createSymbol(symbol, providerMapping.master_exchange_id, container);
    await upsertProviderMapping(newSymbolId, provider, symbol, container);
    result.created++;
  }
}
|
||||
|
||||
async function findActiveProviderExchangeMapping(
|
||||
provider: string,
|
||||
providerExchangeCode: string,
|
||||
container: IServiceContainer
|
||||
): Promise<any> {
|
||||
const postgresClient = container.postgres;
|
||||
const query = `
|
||||
SELECT pem.*, e.code as master_exchange_code
|
||||
FROM provider_exchange_mappings pem
|
||||
JOIN exchanges e ON pem.master_exchange_id = e.id
|
||||
WHERE pem.provider = $1 AND pem.provider_exchange_code = $2 AND pem.active = true
|
||||
`;
|
||||
const result = await postgresClient.query(query, [provider, providerExchangeCode]);
|
||||
return result.rows[0] || null;
|
||||
}
|
||||
|
||||
async function findSymbolByCodeAndExchange(symbol: string, exchangeId: string, container: IServiceContainer): Promise<any> {
|
||||
const postgresClient = container.postgres;
|
||||
const query = 'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2';
|
||||
const result = await postgresClient.query(query, [symbol, exchangeId]);
|
||||
return result.rows[0] || null;
|
||||
}
|
||||
|
||||
async function createSymbol(symbol: any, exchangeId: string, container: IServiceContainer): Promise<string> {
|
||||
const postgresClient = container.postgres;
|
||||
const query = `
|
||||
INSERT INTO symbols (symbol, exchange_id, company_name, country, currency)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
RETURNING id
|
||||
`;
|
||||
|
||||
const result = await postgresClient.query(query, [
|
||||
symbol.symbol || symbol.code,
|
||||
exchangeId,
|
||||
symbol.companyName || symbol.name || symbol.company_name,
|
||||
symbol.countryCode || symbol.country_code || 'US',
|
||||
symbol.currency || 'USD',
|
||||
]);
|
||||
|
||||
return result.rows[0].id;
|
||||
}
|
||||
|
||||
async function updateSymbol(symbolId: string, symbol: any, container: IServiceContainer): Promise<void> {
|
||||
const postgresClient = container.postgres;
|
||||
const query = `
|
||||
UPDATE symbols
|
||||
SET company_name = COALESCE($2, company_name),
|
||||
country = COALESCE($3, country),
|
||||
currency = COALESCE($4, currency),
|
||||
updated_at = NOW()
|
||||
WHERE id = $1
|
||||
`;
|
||||
|
||||
await postgresClient.query(query, [
|
||||
symbolId,
|
||||
symbol.companyName || symbol.name || symbol.company_name,
|
||||
symbol.countryCode || symbol.country_code,
|
||||
symbol.currency,
|
||||
]);
|
||||
}
|
||||
|
||||
async function upsertProviderMapping(
|
||||
symbolId: string,
|
||||
provider: string,
|
||||
symbol: any,
|
||||
container: IServiceContainer
|
||||
): Promise<void> {
|
||||
const postgresClient = container.postgres;
|
||||
const query = `
|
||||
INSERT INTO provider_mappings
|
||||
(symbol_id, provider, provider_symbol, provider_exchange, last_seen)
|
||||
VALUES ($1, $2, $3, $4, NOW())
|
||||
ON CONFLICT (provider, provider_symbol)
|
||||
DO UPDATE SET
|
||||
symbol_id = EXCLUDED.symbol_id,
|
||||
provider_exchange = EXCLUDED.provider_exchange,
|
||||
last_seen = NOW()
|
||||
`;
|
||||
|
||||
await postgresClient.query(query, [
|
||||
symbolId,
|
||||
provider,
|
||||
symbol.qmSearchCode || symbol.symbol || symbol.code,
|
||||
symbol.exchangeCode || symbol.exchange || symbol.exchange_id,
|
||||
]);
|
||||
}
|
||||
|
||||
async function updateSyncStatus(
|
||||
provider: string,
|
||||
dataType: string,
|
||||
count: number,
|
||||
postgresClient: any
|
||||
): Promise<void> {
|
||||
const query = `
|
||||
INSERT INTO sync_status (provider, data_type, last_sync_at, last_sync_count, sync_errors)
|
||||
VALUES ($1, $2, NOW(), $3, NULL)
|
||||
ON CONFLICT (provider, data_type)
|
||||
DO UPDATE SET
|
||||
last_sync_at = NOW(),
|
||||
last_sync_count = EXCLUDED.last_sync_count,
|
||||
sync_errors = NULL,
|
||||
updated_at = NOW()
|
||||
`;
|
||||
|
||||
await postgresClient.query(query, [provider, dataType, count]);
|
||||
}
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import { handlerRegistry, createJobHandler, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue';
|
||||
import type { ServiceContainer } from '@stock-bot/di';
|
||||
import { symbolOperations } from './operations';
|
||||
|
||||
const logger = getLogger('symbols-handler');
|
||||
|
||||
const HANDLER_NAME = 'symbols';

// Queue handler configuration for symbol sync jobs.
// NOTE(review): the `as ScheduledJobConfig` casts bypass excess-property
// checking; `satisfies ScheduledJobConfig` would validate without widening.
const symbolsHandlerConfig: HandlerConfig = {
  concurrency: 1, // symbol syncs mutate shared tables — run one job at a time
  maxAttempts: 3,
  scheduledJobs: [
    {
      operation: 'sync-qm-symbols',
      cronPattern: '0 2 * * *', // Daily at 2 AM
      payload: {},
      priority: 5,
      immediately: false,
    } as ScheduledJobConfig,
    {
      operation: 'sync-symbols-qm',
      cronPattern: '0 4 * * *', // Daily at 4 AM
      payload: { provider: 'qm', clearFirst: false },
      priority: 5,
      immediately: false,
    } as ScheduledJobConfig,
  ],
  // Operation name → implementation. The three sync-symbols-* operations
  // share one implementation; the provider is taken from the job payload.
  operations: {
    'sync-qm-symbols': symbolOperations.syncQMSymbols,
    'sync-symbols-qm': symbolOperations.syncSymbolsFromProvider,
    'sync-symbols-eod': symbolOperations.syncSymbolsFromProvider,
    'sync-symbols-ib': symbolOperations.syncSymbolsFromProvider,
    'sync-status': symbolOperations.getSyncStatus,
  },
};
|
||||
|
||||
export function initializeSymbolsHandler(container: ServiceContainer): void {
|
||||
logger.info('Registering symbols handler...');
|
||||
|
||||
// Update operations to use container
|
||||
const containerAwareOperations = Object.entries(symbolOperations).reduce((acc, [key, operation]) => {
|
||||
acc[key] = createJobHandler(async (payload: any) => {
|
||||
return operation(payload, container);
|
||||
});
|
||||
return acc;
|
||||
}, {} as Record<string, any>);
|
||||
|
||||
const symbolsHandlerConfigWithContainer: HandlerConfig = {
|
||||
...symbolsHandlerConfig,
|
||||
operations: containerAwareOperations,
|
||||
};
|
||||
|
||||
handlerRegistry.register(HANDLER_NAME, symbolsHandlerConfigWithContainer);
|
||||
logger.info('Symbols handler registered successfully');
|
||||
}
|
||||
80
apps/stock/data-pipeline/src/index.ts
Normal file
80
apps/stock/data-pipeline/src/index.ts
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
/**
|
||||
* Data Pipeline Service
|
||||
* Simplified entry point using ServiceApplication framework
|
||||
*/
|
||||
|
||||
import { initializeStockConfig } from '@stock-bot/stock-config';
|
||||
import {
|
||||
ServiceApplication,
|
||||
createServiceContainerFromConfig,
|
||||
initializeServices as initializeAwilixServices,
|
||||
} from '@stock-bot/di';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
|
||||
// Local imports
|
||||
import { initializeAllHandlers } from './handlers';
|
||||
import { createRoutes } from './routes/create-routes';
|
||||
import { setupServiceContainer } from './container-setup';
|
||||
|
||||
// Initialize configuration with service-specific overrides
|
||||
// Initialize configuration with service-specific overrides
const config = initializeStockConfig('dataPipeline');
// NOTE(review): this dumps the full resolved config (potentially including
// database credentials) to stdout — confirm this is safe for production logs.
console.log('Data Pipeline Service Configuration:', JSON.stringify(config, null, 2));

// Create service application
const app = new ServiceApplication(
  config,
  {
    serviceName: 'data-pipeline',
    enableHandlers: true,
    enableScheduledJobs: true,
    corsConfig: {
      origin: '*',
      allowMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH'],
      allowHeaders: ['Content-Type', 'Authorization'],
      credentials: false,
    },
    serviceMetadata: {
      version: '1.0.0',
      description: 'Data processing and transformation pipeline',
      endpoints: {
        health: '/health',
        operations: '/api/operations',
      },
    },
  },
  {
    // Custom lifecycle hooks
    onContainerReady: (container) => {
      // Setup service-specific configuration
      const enhancedContainer = setupServiceContainer(config, container);
      return enhancedContainer;
    },
    onStarted: (port) => {
      const logger = getLogger('data-pipeline');
      logger.info('Data pipeline service startup initiated with ServiceApplication framework');
    },
  }
);

// Container factory function: builds and initializes the DI container with
// every backing service the pipeline needs (Mongo, Postgres, cache, queue).
async function createContainer(config: any) {
  const container = createServiceContainerFromConfig(config, {
    enableQuestDB: config.database.questdb?.enabled || false,
    // Data pipeline needs all databases
    enableMongoDB: true,
    enablePostgres: true,
    enableCache: true,
    enableQueue: true,
    enableBrowser: false, // Data pipeline doesn't need browser
    enableProxy: false, // Data pipeline doesn't need proxy
  });
  await initializeAwilixServices(container);
  return container;
}

// Start the service; a startup failure is fatal for the process.
app.start(createContainer, createRoutes, initializeAllHandlers).catch(error => {
  const logger = getLogger('data-pipeline');
  logger.fatal('Failed to start data pipeline service', { error });
  process.exit(1);
});
|
||||
29
apps/stock/data-pipeline/src/routes/create-routes.ts
Normal file
29
apps/stock/data-pipeline/src/routes/create-routes.ts
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
/**
|
||||
* Route factory for data pipeline service
|
||||
* Creates routes with access to the service container
|
||||
*/
|
||||
|
||||
import { Hono } from 'hono';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import { healthRoutes } from './health.routes';
|
||||
import { createSyncRoutes } from './sync.routes';
|
||||
import { createEnhancedSyncRoutes } from './enhanced-sync.routes';
|
||||
import { createStatsRoutes } from './stats.routes';
|
||||
|
||||
export function createRoutes(container: IServiceContainer): Hono {
|
||||
const app = new Hono();
|
||||
|
||||
// Add container to context for all routes
|
||||
app.use('*', async (c, next) => {
|
||||
c.set('container', container);
|
||||
await next();
|
||||
});
|
||||
|
||||
// Mount routes
|
||||
app.route('/health', healthRoutes);
|
||||
app.route('/sync', createSyncRoutes(container));
|
||||
app.route('/sync', createEnhancedSyncRoutes(container));
|
||||
app.route('/sync/stats', createStatsRoutes(container));
|
||||
|
||||
return app;
|
||||
}
|
||||
154
apps/stock/data-pipeline/src/routes/enhanced-sync.routes.ts
Normal file
154
apps/stock/data-pipeline/src/routes/enhanced-sync.routes.ts
Normal file
|
|
@ -0,0 +1,154 @@
|
|||
import { Hono } from 'hono';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
|
||||
const logger = getLogger('enhanced-sync-routes');
|
||||
|
||||
export function createEnhancedSyncRoutes(container: IServiceContainer) {
|
||||
const enhancedSync = new Hono();
|
||||
|
||||
// Enhanced sync endpoints
|
||||
enhancedSync.post('/exchanges/all', async c => {
|
||||
try {
|
||||
const clearFirst = c.req.query('clear') === 'true';
|
||||
const queueManager = container.queue;
|
||||
if (!queueManager) {
|
||||
return c.json({ success: false, error: 'Queue manager not available' }, 503);
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
|
||||
const job = await exchangesQueue.addJob('sync-all-exchanges', {
|
||||
handler: 'exchanges',
|
||||
operation: 'sync-all-exchanges',
|
||||
payload: { clearFirst },
|
||||
});
|
||||
|
||||
return c.json({ success: true, jobId: job.id, message: 'Enhanced exchange sync job queued' });
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue enhanced exchange sync job', { error });
|
||||
return c.json(
|
||||
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
|
||||
500
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
enhancedSync.post('/provider-mappings/qm', async c => {
|
||||
try {
|
||||
const queueManager = container.queue;
|
||||
if (!queueManager) {
|
||||
return c.json({ success: false, error: 'Queue manager not available' }, 503);
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
|
||||
const job = await exchangesQueue.addJob('sync-qm-provider-mappings', {
|
||||
handler: 'exchanges',
|
||||
operation: 'sync-qm-provider-mappings',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
return c.json({
|
||||
success: true,
|
||||
jobId: job.id,
|
||||
message: 'QM provider mappings sync job queued',
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue QM provider mappings sync job', { error });
|
||||
return c.json(
|
||||
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
|
||||
500
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
enhancedSync.post('/provider-mappings/ib', async c => {
|
||||
try {
|
||||
const queueManager = container.queue;
|
||||
if (!queueManager) {
|
||||
return c.json({ success: false, error: 'Queue manager not available' }, 503);
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
|
||||
const job = await exchangesQueue.addJob('sync-ib-exchanges', {
|
||||
handler: 'exchanges',
|
||||
operation: 'sync-ib-exchanges',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
return c.json({
|
||||
success: true,
|
||||
jobId: job.id,
|
||||
message: 'IB exchanges sync job queued',
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue IB exchanges sync job', { error });
|
||||
return c.json(
|
||||
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
|
||||
500
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
enhancedSync.get('/status', async c => {
|
||||
try {
|
||||
const queueManager = container.queue;
|
||||
if (!queueManager) {
|
||||
return c.json({ success: false, error: 'Queue manager not available' }, 503);
|
||||
}
|
||||
|
||||
const symbolsQueue = queueManager.getQueue('symbols');
|
||||
|
||||
const job = await symbolsQueue.addJob('sync-status', {
|
||||
handler: 'symbols',
|
||||
operation: 'sync-status',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
return c.json({ success: true, jobId: job.id, message: 'Sync status job queued' });
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue sync status job', { error });
|
||||
return c.json(
|
||||
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
|
||||
500
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
enhancedSync.post('/clear/postgresql', async c => {
|
||||
try {
|
||||
const dataType = c.req.query('type') as 'exchanges' | 'provider_mappings' | 'all';
|
||||
const queueManager = container.queue;
|
||||
if (!queueManager) {
|
||||
return c.json({ success: false, error: 'Queue manager not available' }, 503);
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
|
||||
const job = await exchangesQueue.addJob('clear-postgresql-data', {
|
||||
handler: 'exchanges',
|
||||
operation: 'clear-postgresql-data',
|
||||
payload: { dataType: dataType || 'all' },
|
||||
});
|
||||
|
||||
return c.json({
|
||||
success: true,
|
||||
jobId: job.id,
|
||||
message: 'PostgreSQL data clear job queued',
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue PostgreSQL clear job', { error });
|
||||
return c.json(
|
||||
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
|
||||
500
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return enhancedSync;
|
||||
}
|
||||
|
||||
// Legacy export for backward compatibility.
// NOTE(review): constructed with an empty object asserted to IServiceContainer,
// so `container.queue` is undefined and every route on this instance answers
// 503 — presumably kept only so old imports keep resolving; confirm before use.
export const enhancedSyncRoutes = createEnhancedSyncRoutes({} as IServiceContainer);
|
||||
14
apps/stock/data-pipeline/src/routes/health.routes.ts
Normal file
14
apps/stock/data-pipeline/src/routes/health.routes.ts
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
import { Hono } from 'hono';
|
||||
|
||||
const health = new Hono();
|
||||
|
||||
// Basic health check endpoint
|
||||
health.get('/', c => {
|
||||
return c.json({
|
||||
status: 'healthy',
|
||||
service: 'data-pipeline',
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
});
|
||||
|
||||
export { health as healthRoutes };
|
||||
5
apps/stock/data-pipeline/src/routes/index.ts
Normal file
5
apps/stock/data-pipeline/src/routes/index.ts
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
// Export all route modules
|
||||
export { healthRoutes } from './health.routes';
|
||||
export { syncRoutes } from './sync.routes';
|
||||
export { enhancedSyncRoutes } from './enhanced-sync.routes';
|
||||
export { statsRoutes } from './stats.routes';
|
||||
63
apps/stock/data-pipeline/src/routes/stats.routes.ts
Normal file
63
apps/stock/data-pipeline/src/routes/stats.routes.ts
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
import { Hono } from 'hono';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
|
||||
const logger = getLogger('stats-routes');
|
||||
|
||||
export function createStatsRoutes(container: IServiceContainer) {
|
||||
const stats = new Hono();
|
||||
|
||||
// Statistics endpoints
|
||||
stats.get('/exchanges', async c => {
|
||||
try {
|
||||
const queueManager = container.queue;
|
||||
if (!queueManager) {
|
||||
return c.json({ error: 'Queue manager not available' }, 503);
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
|
||||
const job = await exchangesQueue.addJob('get-exchange-stats', {
|
||||
handler: 'exchanges',
|
||||
operation: 'get-exchange-stats',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
// Wait for job to complete and return result
|
||||
const result = await job.waitUntilFinished();
|
||||
return c.json(result);
|
||||
} catch (error) {
|
||||
logger.error('Failed to get exchange stats', { error });
|
||||
return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500);
|
||||
}
|
||||
});
|
||||
|
||||
stats.get('/provider-mappings', async c => {
|
||||
try {
|
||||
const queueManager = container.queue;
|
||||
if (!queueManager) {
|
||||
return c.json({ error: 'Queue manager not available' }, 503);
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
|
||||
const job = await exchangesQueue.addJob('get-provider-mapping-stats', {
|
||||
handler: 'exchanges',
|
||||
operation: 'get-provider-mapping-stats',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
// Wait for job to complete and return result
|
||||
const result = await job.waitUntilFinished();
|
||||
return c.json(result);
|
||||
} catch (error) {
|
||||
logger.error('Failed to get provider mapping stats', { error });
|
||||
return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500);
|
||||
}
|
||||
});
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
// Legacy export for backward compatibility.
// NOTE(review): constructed with an empty object asserted to IServiceContainer,
// so `container.queue` is undefined and these routes always answer 503 —
// presumably kept only so old imports keep resolving; confirm before use.
export const statsRoutes = createStatsRoutes({} as IServiceContainer);
|
||||
95
apps/stock/data-pipeline/src/routes/sync.routes.ts
Normal file
95
apps/stock/data-pipeline/src/routes/sync.routes.ts
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
import { Hono } from 'hono';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
|
||||
const logger = getLogger('sync-routes');
|
||||
|
||||
export function createSyncRoutes(container: IServiceContainer) {
|
||||
const sync = new Hono();
|
||||
|
||||
// Manual sync trigger endpoints
|
||||
sync.post('/symbols', async c => {
|
||||
try {
|
||||
const queueManager = container.queue;
|
||||
if (!queueManager) {
|
||||
return c.json({ success: false, error: 'Queue manager not available' }, 503);
|
||||
}
|
||||
|
||||
const symbolsQueue = queueManager.getQueue('symbols');
|
||||
|
||||
const job = await symbolsQueue.addJob('sync-qm-symbols', {
|
||||
handler: 'symbols',
|
||||
operation: 'sync-qm-symbols',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
return c.json({ success: true, jobId: job.id, message: 'QM symbols sync job queued' });
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue symbol sync job', { error });
|
||||
return c.json(
|
||||
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
|
||||
500
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
sync.post('/exchanges', async c => {
|
||||
try {
|
||||
const queueManager = container.queue;
|
||||
if (!queueManager) {
|
||||
return c.json({ success: false, error: 'Queue manager not available' }, 503);
|
||||
}
|
||||
|
||||
const exchangesQueue = queueManager.getQueue('exchanges');
|
||||
|
||||
const job = await exchangesQueue.addJob('sync-qm-exchanges', {
|
||||
handler: 'exchanges',
|
||||
operation: 'sync-qm-exchanges',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
return c.json({ success: true, jobId: job.id, message: 'QM exchanges sync job queued' });
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue exchange sync job', { error });
|
||||
return c.json(
|
||||
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
|
||||
500
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
sync.post('/symbols/:provider', async c => {
|
||||
try {
|
||||
const provider = c.req.param('provider');
|
||||
const queueManager = container.queue;
|
||||
if (!queueManager) {
|
||||
return c.json({ success: false, error: 'Queue manager not available' }, 503);
|
||||
}
|
||||
|
||||
const symbolsQueue = queueManager.getQueue('symbols');
|
||||
|
||||
const job = await symbolsQueue.addJob('sync-symbols-from-provider', {
|
||||
handler: 'symbols',
|
||||
operation: 'sync-symbols-from-provider',
|
||||
payload: { provider },
|
||||
});
|
||||
|
||||
return c.json({
|
||||
success: true,
|
||||
jobId: job.id,
|
||||
message: `${provider} symbols sync job queued`,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('Failed to queue provider symbol sync job', { error });
|
||||
return c.json(
|
||||
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
|
||||
500
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return sync;
|
||||
}
|
||||
|
||||
// Legacy export for backward compatibility.
// NOTE(review): constructed with an empty object asserted to IServiceContainer,
// so `container.queue` is undefined and these routes always answer 503 —
// presumably kept only so old imports keep resolving; confirm before use.
export const syncRoutes = createSyncRoutes({} as IServiceContainer);
|
||||
27
apps/stock/data-pipeline/src/types/job-payloads.ts
Normal file
27
apps/stock/data-pipeline/src/types/job-payloads.ts
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
/**
 * Arbitrary key/value payload attached to a queued job.
 *
 * NOTE(review): the `any` index signature disables type checking on every
 * payload access; `unknown` would be safer but would break existing consumers
 * that read fields without narrowing — confirm before tightening.
 */
export interface JobPayload {
  [key: string]: any;
}

/** Per-run counters reported by a sync job. */
export interface SyncResult {
  processed: number; // total records examined
  created: number;   // records newly inserted
  updated: number;   // existing records modified
  skipped: number;   // records left untouched
  errors: number;    // records that failed to sync
}

/** Last-known sync state for one provider/data-type pair. */
export interface SyncStatus {
  provider: string;
  dataType: string;
  lastSyncAt?: Date;     // absent when the pair has never synced
  lastSyncCount: number;
  syncErrors?: string;   // presumably a serialized error summary — confirm format
}

/** Exchange identity as mapped from an upstream provider. */
export interface ExchangeMapping {
  id: string;
  code: string;
  name: string;
  country: string;
  currency: string;
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue