di-refactor coming along

This commit is contained in:
Boki 2025-06-22 18:14:34 -04:00
parent 7d9044ab29
commit 60ada5f6a3
20 changed files with 582 additions and 335 deletions

View file

@ -1,5 +1,6 @@
import { getLogger } from '@stock-bot/logger';
import { handlerRegistry, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue';
import { handlerRegistry, createJobHandler, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue';
import type { ServiceContainer } from '@stock-bot/di';
import { exchangeOperations } from './operations';
const logger = getLogger('exchanges-handler');
@ -51,8 +52,23 @@ const exchangesHandlerConfig: HandlerConfig = {
},
};
export function initializeExchangesHandler(): void {
export function initializeExchangesHandler(container: ServiceContainer) {
logger.info('Registering exchanges handler...');
handlerRegistry.registerHandler(HANDLER_NAME, exchangesHandlerConfig);
// Update operations to use container
const containerAwareOperations = Object.entries(exchangeOperations).reduce((acc, [key, operation]) => {
acc[key] = createJobHandler(async (payload: any) => {
return operation(payload, container);
});
return acc;
}, {} as Record<string, any>);
const exchangesHandlerConfigWithContainer: HandlerConfig = {
...exchangesHandlerConfig,
operations: containerAwareOperations,
};
handlerRegistry.register(HANDLER_NAME, exchangesHandlerConfigWithContainer);
logger.info('Exchanges handler registered successfully');
}

View file

@ -1,10 +1,13 @@
import { getLogger } from '@stock-bot/logger';
import { getPostgreSQLClient } from '../../../clients';
import type { ServiceContainer } from '@stock-bot/di';
import type { JobPayload } from '../../../types/job-payloads';
const logger = getLogger('enhanced-sync-clear-postgresql-data');
export async function clearPostgreSQLData(payload: JobPayload): Promise<{
export async function clearPostgreSQLData(
payload: JobPayload,
container: ServiceContainer
): Promise<{
exchangesCleared: number;
symbolsCleared: number;
mappingsCleared: number;
@ -12,7 +15,7 @@ export async function clearPostgreSQLData(payload: JobPayload): Promise<{
logger.info('Clearing existing PostgreSQL data...');
try {
const postgresClient = getPostgreSQLClient();
const postgresClient = container.postgres;
// Start transaction for atomic operations
await postgresClient.query('BEGIN');
@ -50,7 +53,7 @@ export async function clearPostgreSQLData(payload: JobPayload): Promise<{
return { exchangesCleared, symbolsCleared, mappingsCleared };
} catch (error) {
const postgresClient = getPostgreSQLClient();
const postgresClient = container.postgres;
await postgresClient.query('ROLLBACK');
logger.error('Failed to clear PostgreSQL data', { error });
throw error;

View file

@ -1,14 +1,17 @@
import { getLogger } from '@stock-bot/logger';
import { getPostgreSQLClient } from '../../../clients';
import type { ServiceContainer } from '@stock-bot/di';
import type { JobPayload, SyncStatus } from '../../../types/job-payloads';
const logger = getLogger('enhanced-sync-status');
export async function getSyncStatus(payload: JobPayload): Promise<SyncStatus[]> {
export async function getSyncStatus(
payload: JobPayload,
container: ServiceContainer
): Promise<SyncStatus[]> {
logger.info('Getting comprehensive sync status...');
try {
const postgresClient = getPostgreSQLClient();
const postgresClient = container.postgres;
const query = `
SELECT provider, data_type as "dataType", last_sync_at as "lastSyncAt",
last_sync_count as "lastSyncCount", sync_errors as "syncErrors"

View file

@ -1,14 +1,17 @@
import { getLogger } from '@stock-bot/logger';
import { getPostgreSQLClient } from '../../../clients';
import type { ServiceContainer } from '@stock-bot/di';
import type { JobPayload } from '../../../types/job-payloads';
const logger = getLogger('enhanced-sync-exchange-stats');
export async function getExchangeStats(payload: JobPayload): Promise<any> {
export async function getExchangeStats(
payload: JobPayload,
container: ServiceContainer
): Promise<any> {
logger.info('Getting exchange statistics...');
try {
const postgresClient = getPostgreSQLClient();
const postgresClient = container.postgres;
const query = `
SELECT
COUNT(*) as total_exchanges,

View file

@ -1,14 +1,17 @@
import { getLogger } from '@stock-bot/logger';
import { getPostgreSQLClient } from '../../../clients';
import type { ServiceContainer } from '@stock-bot/di';
import type { JobPayload } from '../../../types/job-payloads';
const logger = getLogger('enhanced-sync-provider-mapping-stats');
export async function getProviderMappingStats(payload: JobPayload): Promise<any> {
export async function getProviderMappingStats(
payload: JobPayload,
container: ServiceContainer
): Promise<any> {
logger.info('Getting provider mapping statistics...');
try {
const postgresClient = getPostgreSQLClient();
const postgresClient = container.postgres;
const query = `
SELECT
provider,

View file

@ -1,10 +1,10 @@
import { getLogger } from '@stock-bot/logger';
import { getMongoDBClient, getPostgreSQLClient } from '../../../clients';
import type { ServiceContainer } from '@stock-bot/di';
import type { JobPayload, SyncResult } from '../../../types/job-payloads';
const logger = getLogger('enhanced-sync-all-exchanges');
export async function syncAllExchanges(payload: JobPayload): Promise<SyncResult> {
export async function syncAllExchanges(payload: JobPayload, container: ServiceContainer): Promise<SyncResult> {
const clearFirst = payload.clearFirst || true;
logger.info('Starting comprehensive exchange sync...', { clearFirst });
@ -17,7 +17,7 @@ export async function syncAllExchanges(payload: JobPayload): Promise<SyncResult>
};
try {
const postgresClient = getPostgreSQLClient();
const postgresClient = container.postgres;
// Clear existing data if requested
if (clearFirst) {
@ -28,11 +28,11 @@ export async function syncAllExchanges(payload: JobPayload): Promise<SyncResult>
await postgresClient.query('BEGIN');
// 1. Sync from EOD exchanges (comprehensive global data)
const eodResult = await syncEODExchanges();
const eodResult = await syncEODExchanges(container);
mergeResults(result, eodResult);
// 2. Sync from IB exchanges (detailed asset information)
const ibResult = await syncIBExchanges();
const ibResult = await syncIBExchanges(container);
mergeResults(result, ibResult);
// 3. Update sync status
@ -43,13 +43,14 @@ export async function syncAllExchanges(payload: JobPayload): Promise<SyncResult>
logger.info('Comprehensive exchange sync completed', result);
return result;
} catch (error) {
const postgresClient = getPostgreSQLClient();
const postgresClient = container.postgres;
await postgresClient.query('ROLLBACK');
logger.error('Comprehensive exchange sync failed', { error });
throw error;
}
}
async function clearPostgreSQLData(postgresClient: any): Promise<void> {
logger.info('Clearing existing PostgreSQL data...');
@ -66,8 +67,8 @@ async function clearPostgreSQLData(postgresClient: any): Promise<void> {
logger.info('PostgreSQL data cleared successfully');
}
async function syncEODExchanges(): Promise<SyncResult> {
const mongoClient = getMongoDBClient();
async function syncEODExchanges(container: ServiceContainer): Promise<SyncResult> {
const mongoClient = container.mongodb;
const exchanges = await mongoClient.find('eodExchanges', { active: true });
const result: SyncResult = { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 };
@ -80,7 +81,8 @@ async function syncEODExchanges(): Promise<SyncResult> {
exchange.Name,
exchange.CountryISO2,
exchange.Currency,
0.95 // very high confidence for EOD data
0.95, // very high confidence for EOD data
container
);
result.processed++;
@ -94,8 +96,8 @@ async function syncEODExchanges(): Promise<SyncResult> {
return result;
}
async function syncIBExchanges(): Promise<SyncResult> {
const mongoClient = getMongoDBClient();
async function syncIBExchanges(container: ServiceContainer): Promise<SyncResult> {
const mongoClient = container.mongodb;
const exchanges = await mongoClient.find('ibExchanges', {});
const result: SyncResult = { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 };
@ -108,7 +110,8 @@ async function syncIBExchanges(): Promise<SyncResult> {
exchange.name,
exchange.country_code,
'USD', // IB doesn't specify currency, default to USD
0.85 // good confidence for IB data
0.85, // good confidence for IB data
container
);
result.processed++;
@ -128,16 +131,17 @@ async function createProviderExchangeMapping(
providerExchangeName: string,
countryCode: string | null,
currency: string | null,
confidence: number
confidence: number,
container: ServiceContainer
): Promise<void> {
if (!providerExchangeCode) {
return;
}
const postgresClient = getPostgreSQLClient();
const postgresClient = container.postgres;
// Check if mapping already exists
const existingMapping = await findProviderExchangeMapping(provider, providerExchangeCode);
const existingMapping = await findProviderExchangeMapping(provider, providerExchangeCode, container);
if (existingMapping) {
// Don't override existing mappings to preserve manual work
return;
@ -148,7 +152,8 @@ async function createProviderExchangeMapping(
providerExchangeCode,
providerExchangeName,
countryCode,
currency
currency,
container
);
// Create the provider exchange mapping
@ -175,12 +180,13 @@ async function findOrCreateMasterExchange(
providerCode: string,
providerName: string,
countryCode: string | null,
currency: string | null
currency: string | null,
container: ServiceContainer
): Promise<any> {
const postgresClient = getPostgreSQLClient();
const postgresClient = container.postgres;
// First, try to find exact match
let masterExchange = await findExchangeByCode(providerCode);
let masterExchange = await findExchangeByCode(providerCode, container);
if (masterExchange) {
return masterExchange;
@ -189,7 +195,7 @@ async function findOrCreateMasterExchange(
// Try to find by similar codes (basic mapping)
const basicMapping = getBasicExchangeMapping(providerCode);
if (basicMapping) {
masterExchange = await findExchangeByCode(basicMapping);
masterExchange = await findExchangeByCode(basicMapping, container);
if (masterExchange) {
return masterExchange;
}
@ -230,17 +236,18 @@ function getBasicExchangeMapping(providerCode: string): string | null {
async function findProviderExchangeMapping(
provider: string,
providerExchangeCode: string
providerExchangeCode: string,
container: ServiceContainer
): Promise<any> {
const postgresClient = getPostgreSQLClient();
const postgresClient = container.postgres;
const query =
'SELECT * FROM provider_exchange_mappings WHERE provider = $1 AND provider_exchange_code = $2';
const result = await postgresClient.query(query, [provider, providerExchangeCode]);
return result.rows[0] || null;
}
async function findExchangeByCode(code: string): Promise<any> {
const postgresClient = getPostgreSQLClient();
async function findExchangeByCode(code: string, container: ServiceContainer): Promise<any> {
const postgresClient = container.postgres;
const query = 'SELECT * FROM exchanges WHERE code = $1';
const result = await postgresClient.query(query, [code]);
return result.rows[0] || null;