initial masterExchanges

This commit is contained in:
Boki 2025-06-15 12:26:38 -04:00
parent d068898e32
commit 660a2a1ec2
7 changed files with 446 additions and 3 deletions

View file

@ -10,7 +10,7 @@ import { processItems, QueueManager, type QueueConfig } from '@stock-bot/queue';
import { Shutdown } from '@stock-bot/shutdown'; import { Shutdown } from '@stock-bot/shutdown';
import { initializeIBResources } from './providers/ib.tasks'; import { initializeIBResources } from './providers/ib.tasks';
import { initializeProxyResources } from './providers/proxy.tasks'; import { initializeProxyResources } from './providers/proxy.tasks';
import { healthRoutes, queueRoutes } from './routes'; import { exchangeRoutes, healthRoutes, queueRoutes } from './routes';
// Load environment variables // Load environment variables
loadEnvVariables(); loadEnvVariables();
@ -45,6 +45,12 @@ function createDataServiceQueueManager(): QueueManager {
const { initializeProxyProvider } = await import('./providers/proxy.provider'); const { initializeProxyProvider } = await import('./providers/proxy.provider');
return initializeProxyProvider(); return initializeProxyProvider();
}, },
async () => {
const { initializeExchangeSyncProvider } = await import(
'./providers/exchange-sync.provider'
);
return initializeExchangeSyncProvider();
},
], ],
enableScheduledJobs: true, enableScheduledJobs: true,
}; };
@ -59,6 +65,7 @@ export const queueManager = createDataServiceQueueManager();
export { processItems }; export { processItems };
// Register all routes // Register all routes
app.route('', exchangeRoutes);
app.route('', healthRoutes); app.route('', healthRoutes);
app.route('', queueRoutes); app.route('', queueRoutes);

View file

@ -0,0 +1,274 @@
/**
* Exchange Sync Provider - Queue provider for syncing IB exchanges to master records
*/
import { getLogger } from '@stock-bot/logger';
import type { MasterExchange } from '@stock-bot/mongodb-client';
import type { ProviderConfigWithSchedule } from '@stock-bot/queue';
import { providerRegistry } from '@stock-bot/queue';
const logger = getLogger('exchange-sync');
export function initializeExchangeSyncProvider() {
logger.info('Registering exchange sync provider...');
const exchangeSyncConfig: ProviderConfigWithSchedule = {
name: 'exchange-sync',
operations: {
'sync-ib-exchanges': async _payload => {
logger.info('Syncing IB exchanges to master table');
return await syncIBExchanges();
},
'get-master-exchange': async (payload: { masterExchangeId: string }) => {
logger.debug('Getting master exchange details', payload);
const exchange = await getMasterExchangeDetails(payload.masterExchangeId);
return { exchange, ...payload };
},
},
scheduledJobs: [
{
type: 'exchange-sync-daily',
operation: 'sync-ib-exchanges',
payload: {},
cronPattern: '0 3 * * *', // Daily at 3 AM
priority: 3,
description: 'Daily sync of IB exchanges to master table',
immediately: true, // Run on startup to test
},
],
};
providerRegistry.registerWithSchedule(exchangeSyncConfig);
logger.info('Exchange sync provider registered successfully');
}
/**
* Sync IB exchanges from actual ibExchanges table - creates 1:1 master records
*/
async function syncIBExchanges(): Promise<{ syncedCount: number; totalExchanges: number }> {
logger.info('Syncing IB exchanges from database...');
try {
const { connectMongoDB, getDatabase } = await import('@stock-bot/mongodb-client');
// Ensure MongoDB client is connected
await connectMongoDB();
const db = getDatabase();
// Filter by country code US and CA
const ibExchanges = await db
.collection('ibExchanges')
.find({
country_code: { $in: ['US', 'CA'] },
})
.toArray();
logger.info('Found IB exchanges in database', { count: ibExchanges.length });
let syncedCount = 0;
for (const exchange of ibExchanges) {
try {
await createOrUpdateMasterExchange(exchange);
syncedCount++;
logger.debug('Synced IB exchange', {
ibId: exchange.id,
country: exchange.country_code,
});
} catch (error) {
logger.error('Failed to sync IB exchange', { exchange: exchange.id, error });
}
}
logger.info('IB exchange sync completed', {
syncedCount,
totalExchanges: ibExchanges.length,
});
return { syncedCount, totalExchanges: ibExchanges.length };
} catch (error) {
logger.error('Failed to fetch IB exchanges from database', { error });
return { syncedCount: 0, totalExchanges: 0 };
}
}
/**
* Create or update master exchange record 1:1 from IB exchange
*/
async function createOrUpdateMasterExchange(ibExchange: {
id?: string;
name?: string;
code?: string;
country_code?: string;
currency?: string;
_id?: unknown;
}): Promise<void> {
const { connectMongoDB, getDatabase } = await import('@stock-bot/mongodb-client');
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
const masterExchangeId = generateMasterExchangeId(ibExchange);
const now = new Date();
// Check if master exchange already exists
const existing = await collection.findOne({ masterExchangeId });
if (existing) {
// Update existing record
await collection.updateOne(
{ masterExchangeId },
{
$set: {
officialName: ibExchange.name || `Exchange ${ibExchange.id}`,
country: ibExchange.country_code || 'UNKNOWN',
currency: ibExchange.currency || 'USD',
timezone: inferTimezone(ibExchange),
updated_at: now,
},
}
);
logger.debug('Updated existing master exchange', { masterExchangeId });
} else {
// Create new master exchange
const masterExchange: MasterExchange = {
masterExchangeId,
shortName: masterExchangeId, // Set shortName to masterExchangeId on creation
officialName: ibExchange.name || `Exchange ${ibExchange.id}`,
country: ibExchange.country_code || 'UNKNOWN',
currency: ibExchange.currency || 'USD',
timezone: inferTimezone(ibExchange),
active: false, // Set active to false only on creation
sourceMappings: {
ib: {
id: ibExchange.id || ibExchange._id?.toString() || 'unknown',
name: ibExchange.name || `Exchange ${ibExchange.id}`,
code: ibExchange.code || ibExchange.id || '',
aliases: generateAliases(ibExchange),
lastUpdated: now,
},
},
confidence: 1.0, // High confidence for direct IB mapping
verified: true, // Mark as verified since it's direct from IB
// DocumentBase fields
source: 'exchange-sync-provider',
created_at: now,
updated_at: now,
};
await collection.insertOne(masterExchange);
logger.debug('Created new master exchange', { masterExchangeId });
}
}
/**
* Get master exchange details
*/
async function getMasterExchangeDetails(masterExchangeId: string): Promise<MasterExchange | null> {
try {
const { connectMongoDB, getDatabase } = await import('@stock-bot/mongodb-client');
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
return await collection.findOne({ masterExchangeId });
} catch (error) {
logger.error('Error getting master exchange details', { masterExchangeId, error });
return null;
}
}
/**
* Generate master exchange ID from IB exchange
*/
function generateMasterExchangeId(ibExchange: {
name?: string;
code?: string;
id?: string;
}): string {
// Use code if available, otherwise use ID, otherwise generate from name
if (ibExchange.code) {
return ibExchange.code.toUpperCase().replace(/[^A-Z0-9]/g, '');
}
if (ibExchange.id) {
return ibExchange.id.toUpperCase().replace(/[^A-Z0-9]/g, '');
}
if (ibExchange.name) {
return ibExchange.name
.toUpperCase()
.split(' ')
.slice(0, 2)
.join('_')
.replace(/[^A-Z0-9_]/g, '');
}
return 'UNKNOWN_EXCHANGE';
}
/**
* Generate aliases for the exchange
*/
function generateAliases(ibExchange: { name?: string; code?: string }): string[] {
const aliases: string[] = [];
if (ibExchange.name && ibExchange.name.includes(' ')) {
// Add abbreviated version
aliases.push(
ibExchange.name
.split(' ')
.map(w => w[0])
.join('')
.toUpperCase()
);
}
if (ibExchange.code) {
aliases.push(ibExchange.code.toUpperCase());
}
return aliases;
}
/**
* Infer timezone from exchange name/location
*/
function inferTimezone(ibExchange: { name?: string }): string {
if (!ibExchange.name) {
return 'UTC';
}
const name = ibExchange.name.toUpperCase();
if (name.includes('NEW YORK') || name.includes('NYSE') || name.includes('NASDAQ')) {
return 'America/New_York';
}
if (name.includes('LONDON')) {
return 'Europe/London';
}
if (name.includes('TOKYO')) {
return 'Asia/Tokyo';
}
if (name.includes('SHANGHAI')) {
return 'Asia/Shanghai';
}
if (name.includes('TORONTO')) {
return 'America/Toronto';
}
if (name.includes('FRANKFURT')) {
return 'Europe/Berlin';
}
return 'UTC'; // Default
}

View file

@ -75,10 +75,10 @@ export function initializeProxyProvider() {
type: 'proxy-fetch-and-check', type: 'proxy-fetch-and-check',
operation: 'fetch-from-sources', operation: 'fetch-from-sources',
payload: {}, payload: {},
cronPattern: '0 */2 * * *', // Every 2 hours cronPattern: '0 0 * * 0', // Every week at midnight on Sunday
priority: 5, priority: 5,
description: 'Fetch and validate proxy list from sources', description: 'Fetch and validate proxy list from sources',
immediately: true, // Don't run immediately during startup to avoid conflicts // immediately: true, // Don't run immediately during startup to avoid conflicts
}, },
], ],
}; };

View file

@ -0,0 +1,128 @@
/**
* Exchange Routes - Simple API endpoints for exchange management
*/
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import type { MasterExchange } from '@stock-bot/mongodb-client';
import { connectMongoDB, getDatabase } from '@stock-bot/mongodb-client';
import { queueManager } from '../index';
const logger = getLogger('exchange-routes');
export const exchangeRoutes = new Hono();
// Get master exchange details
exchangeRoutes.get('/api/exchanges/:masterExchangeId', async c => {
try {
const masterExchangeId = c.req.param('masterExchangeId');
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
const exchange = await collection.findOne({ masterExchangeId });
if (!exchange) {
return c.json({ error: 'Exchange not found' }, 404);
}
return c.json({
status: 'success',
data: exchange,
});
} catch (error) {
logger.error('Error getting exchange details', { error });
return c.json({ error: 'Internal server error' }, 500);
}
});
// Get source mapping
exchangeRoutes.get('/api/exchanges/mapping/:sourceName/:sourceId', async c => {
try {
const sourceName = c.req.param('sourceName');
const sourceId = c.req.param('sourceId');
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
const result = await collection.findOne(
{ [`sourceMappings.${sourceName}.id`]: sourceId },
{ projection: { masterExchangeId: 1 } }
);
if (!result) {
return c.json({ error: 'Mapping not found' }, 404);
}
return c.json({
status: 'success',
data: { masterExchangeId: result.masterExchangeId, sourceName, sourceId },
});
} catch (error) {
logger.error('Error getting exchange mapping', { error });
return c.json({ error: 'Internal server error' }, 500);
}
});
// Get all exchanges from a specific source
exchangeRoutes.get('/api/exchanges/source/:sourceName', async c => {
try {
const sourceName = c.req.param('sourceName');
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
const results = await collection
.find(
{ [`sourceMappings.${sourceName}`]: { $exists: true } },
{
projection: {
masterExchangeId: 1,
[`sourceMappings.${sourceName}`]: 1,
},
}
)
.toArray();
const exchanges = results.map(result => ({
masterExchangeId: result.masterExchangeId,
sourceMapping: result.sourceMappings[sourceName],
}));
return c.json({
status: 'success',
data: {
sourceName,
count: exchanges.length,
exchanges,
},
});
} catch (error) {
logger.error('Error getting source exchanges', { error });
return c.json({ error: 'Internal server error' }, 500);
}
});
// Trigger exchange sync
exchangeRoutes.post('/api/exchanges/sync', async c => {
try {
const job = await queueManager.add('exchange-sync', {
type: 'exchange-sync',
provider: 'exchange-sync',
operation: 'sync-ib-exchanges',
payload: {},
priority: 2,
});
return c.json({
status: 'success',
message: 'IB exchange sync job queued',
jobId: job.id,
operation: 'sync-ib-exchanges',
});
} catch (error) {
logger.error('Error triggering exchange sync', { error });
return c.json({ error: 'Internal server error' }, 500);
}
});

View file

@ -1,5 +1,6 @@
/** /**
* Routes index - exports all route modules * Routes index - exports all route modules
*/ */
export { exchangeRoutes } from './exchange.routes';
export { healthRoutes } from './health.routes'; export { healthRoutes } from './health.routes';
export { queueRoutes } from './queue.routes'; export { queueRoutes } from './queue.routes';

View file

@ -12,6 +12,8 @@ export type {
AnalystReport, AnalystReport,
DocumentBase, DocumentBase,
EarningsTranscript, EarningsTranscript,
ExchangeSourceMapping,
MasterExchange,
NewsArticle, NewsArticle,
RawDocument, RawDocument,
SecFiling, SecFiling,

View file

@ -198,3 +198,34 @@ export interface AnalystReport extends DocumentBase {
key_points: string[]; key_points: string[];
financial_projections?: Record<string, number>; financial_projections?: Record<string, number>;
} }
/**
* Exchange-related types
*/
export interface ExchangeSourceMapping {
id: string;
name: string;
code?: string;
aliases?: string[];
lastUpdated: Date;
}
export interface MasterExchange extends DocumentBase {
masterExchangeId: string;
shortName?: string;
officialName: string;
country: string;
currency: string;
timezone: string;
active?: boolean;
tradingHours?: {
open: string;
close: string;
timezone: string;
};
sourceMappings: Record<string, ExchangeSourceMapping>;
confidence: number;
verified: boolean;
}