stock-bot/database/postgres/scripts/setup-ib.ts

366 lines
11 KiB
TypeScript

#!/usr/bin/env bun
/**
* Interactive Brokers complete setup script
* Sets up schema and populates IB exchanges from ib-exchanges.json into PostgreSQL
*/
import { postgresConfig } from '@stock-bot/config';
import { getLogger } from '@stock-bot/logger';
import { PostgreSQLClient } from '@stock-bot/postgres-client';
import { readFileSync } from 'fs';
import { join } from 'path';
// Initialize logger
// Module-level logger obtained from getLogger under the name 'ib-setup';
// the functions below all report progress and failures through it.
const logger = getLogger('ib-setup');
// Type definitions based on the JSON structure
/**
 * One exchange entry as read from `ib-exchanges.json`.
 *
 * `id`, `name` and `country_code` must be non-blank for an entry to be kept by
 * `loadExchangesData`; `country`, `region` and `assets` are written as NULL by
 * `populateExchanges` when they are empty.
 */
interface IBExchange {
/** Exchange identifier; used as the de-duplication key and stored as `exchange_code`. */
id: string;
/** Exchange name; stored as `exchange_name`. */
name: string;
/** Country value; stored in the `country` column. */
country: string;
/** Region value; stored in the `region` column. */
region: string;
/** Asset description as it appears in the JSON; stored in the `assets` column. */
assets: string;
/** Country code; stored in the `country_code` column. */
country_code: string;
}
/**
 * Opens a PostgreSQL connection and verifies it with a `SELECT version()` round trip.
 *
 * `PostgreSQLClient` is constructed without arguments; the host, port and database
 * taken from `postgresConfig` are only used here for logging.
 *
 * @returns A connected client. The caller owns it and must call `disconnect()`.
 * @throws Rethrows whatever error the connect call or the verification query raised.
 *         If the connection had already been opened, it is closed before rethrowing
 *         so a failed verification does not leak it.
 */
async function connectToDatabase(): Promise<PostgreSQLClient> {
  logger.info('Connecting to PostgreSQL', {
    host: postgresConfig.POSTGRES_HOST,
    port: postgresConfig.POSTGRES_PORT,
    database: postgresConfig.POSTGRES_DATABASE
  });

  let client: PostgreSQLClient | null = null;
  let connected = false;
  try {
    client = new PostgreSQLClient();
    await client.connect();
    connected = true;
    logger.info('Connected to PostgreSQL database');

    // Test the connection
    const result = await client.query('SELECT version()');
    // version() returns a banner such as "PostgreSQL 16.1 on x86_64-...": keep the
    // product name plus the version number (the first token alone is always "PostgreSQL").
    const banner: unknown = result.rows[0]?.version;
    const version =
      typeof banner === 'string' ? banner.split(' ').slice(0, 2).join(' ') : 'unknown';
    logger.info('PostgreSQL connection verified', { version });

    return client;
  } catch (error) {
    logger.error('Failed to connect to PostgreSQL', { error });
    if (client && connected) {
      // The caller never receives this client, so nobody else could close it.
      try {
        await client.disconnect();
      } catch (closeError) {
        // Keep the original failure as the error that propagates.
        logger.warn('Failed to close PostgreSQL connection after error', { error: closeError });
      }
    }
    throw error;
  }
}
/**
 * Applies the IB schema SQL file to the database and then verifies the result.
 *
 * The file `database/postgres/providers/01-ib.sql` is resolved against the current
 * working directory and sent to the server in a single `query` call. A failure whose
 * message contains "already exists" is treated as success; any other failure is
 * logged and rethrown.
 *
 * @param client Connected PostgreSQL client used to run the SQL and the verification.
 * @throws Rethrows file-read errors, non-"already exists" query errors, and
 *         verification errors.
 */
async function runSchemaSetup(client: PostgreSQLClient) {
  try {
    logger.info('Loading schema SQL file');
    const sqlFile = join(process.cwd(), 'database/postgres/providers/01-ib.sql');
    const ddl = readFileSync(sqlFile, 'utf-8');

    logger.info('Executing schema setup');
    // The whole file goes out as one statement so multi-line function bodies stay intact.
    try {
      await client.query(ddl);
      logger.info('Schema setup completed successfully');
    } catch (queryError: unknown) {
      const reason = queryError instanceof Error ? queryError.message : String(queryError);
      const objectsAlreadyPresent = reason.includes('already exists');
      if (!objectsAlreadyPresent) {
        logger.error('Error executing schema setup', { error: reason });
        throw queryError;
      }
      logger.info('Schema setup completed (some objects already existed)');
    }

    // Confirm the schema is really there before reporting success to the caller.
    await verifySchemaSetup(client);
  } catch (failure) {
    logger.error('Schema setup failed', { error: failure });
    throw failure;
  }
}
/**
 * Confirms that the `ib_data` schema exists and logs the tables and routines
 * that `information_schema` reports for it.
 *
 * @param client Connected PostgreSQL client used for the catalog queries.
 * @throws Error when the `ib_data` schema is missing; query errors are rethrown.
 */
async function verifySchemaSetup(client: PostgreSQLClient) {
  logger.info('Verifying schema setup');
  try {
    // SQL text is left unindented so the query strings stay unchanged.
    const schemaResult = await client.query(`
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name = 'ib_data'
`);
    const schemaMissing = schemaResult.rows.length === 0;
    if (schemaMissing) {
      throw new Error('ib_data schema was not created');
    }

    const tableResult = await client.query(`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'ib_data'
ORDER BY table_name
`);
    const tableNames = Array.from(tableResult.rows, (entry: any) => entry.table_name);

    const routineResult = await client.query(`
SELECT routine_name
FROM information_schema.routines
WHERE routine_schema = 'ib_data'
ORDER BY routine_name
`);
    const routineNames = Array.from(routineResult.rows, (entry: any) => entry.routine_name);

    logger.info('Schema verification completed', {
      schema: 'ib_data',
      tables: tableNames,
      functions: routineNames
    });
  } catch (failure) {
    logger.error('Schema verification failed', { error: failure });
    throw failure;
  }
}
/**
 * Reads `apps/data-service/src/setup/ib-exchanges.json` (resolved against the current
 * working directory), drops incomplete entries and de-duplicates by exchange id.
 *
 * An entry is kept only when it is an object whose `id`, `name` and `country_code`
 * are non-blank strings. Malformed entries (`null`, primitives, non-string fields)
 * are filtered out like any other incomplete entry instead of crashing the load.
 * When several entries share an `id`, the first one wins.
 *
 * @returns The valid, unique exchanges in file order.
 * @throws Rethrows file-read and JSON syntax errors; throws an Error when the file's
 *         top-level value is not an array.
 */
async function loadExchangesData(): Promise<IBExchange[]> {
  const isNonBlankString = (value: unknown): value is string =>
    typeof value === 'string' && value.trim() !== '';

  // Only the three required fields are checked; the remaining fields are tolerated
  // as missing because they are replaced with NULL when the rows are written.
  const isCompleteExchange = (entry: unknown): entry is IBExchange => {
    if (typeof entry !== 'object' || entry === null) {
      return false;
    }
    const candidate = entry as Partial<Record<keyof IBExchange, unknown>>;
    return (
      isNonBlankString(candidate.id) &&
      isNonBlankString(candidate.name) &&
      isNonBlankString(candidate.country_code)
    );
  };

  try {
    // Look for the JSON file in the project root
    const jsonPath = join(process.cwd(), 'apps/data-service/src/setup/ib-exchanges.json');
    logger.info('Loading exchanges from file', { path: jsonPath });
    const jsonData = readFileSync(jsonPath, 'utf-8');

    // Strip lines that start with `//` so a commented file still parses as JSON.
    const cleanJsonData = jsonData.replace(/^\/\/.*$/gm, '');
    const parsed: unknown = JSON.parse(cleanJsonData);
    if (!Array.isArray(parsed)) {
      throw new Error(`Expected a JSON array of exchanges in ${jsonPath}`);
    }

    // Filter out incomplete entries
    const validExchanges = parsed.filter(isCompleteExchange);

    // Deduplicate by exchange code (keep the first occurrence)
    const exchangeMap = new Map<string, IBExchange>();
    for (const exchange of validExchanges) {
      if (!exchangeMap.has(exchange.id)) {
        exchangeMap.set(exchange.id, exchange);
      }
    }
    const uniqueExchanges = Array.from(exchangeMap.values());

    const filteredOut = parsed.length - validExchanges.length;
    const duplicatesRemoved = validExchanges.length - uniqueExchanges.length;

    logger.info('Exchanges loaded successfully', {
      totalExchanges: parsed.length,
      validExchanges: validExchanges.length,
      uniqueExchanges: uniqueExchanges.length,
      duplicatesRemoved,
      filteredOut
    });
    if (filteredOut !== 0) {
      logger.warn('Some exchanges were filtered out due to incomplete data', {
        filteredCount: filteredOut
      });
    }
    if (duplicatesRemoved !== 0) {
      logger.warn('Duplicate exchange codes found and removed', {
        duplicateCount: duplicatesRemoved
      });
    }
    return uniqueExchanges;
  } catch (error) {
    logger.error('Error loading exchanges JSON', { error });
    throw error;
  }
}
/**
 * Upserts the given exchanges into `ib_data.exchanges` through `client.batchUpsert`,
 * keyed on `exchange_code` and sent in chunks of 100.
 *
 * Empty `country`, `region` and `assets` values are written as NULL.
 *
 * @param client Connected PostgreSQL client.
 * @param exchanges Exchanges to write; `id` becomes `exchange_code`, `name` becomes
 *                  `exchange_name`.
 * @throws Rethrows any error raised by the batch upsert.
 */
async function populateExchanges(client: PostgreSQLClient, exchanges: IBExchange[]): Promise<void> {
  logger.info('Starting batch exchange population', {
    totalExchanges: exchanges.length
  });

  try {
    // Shape each exchange into a row for the target table (column order preserved).
    const rows = exchanges.map(({ id, name, country, region, country_code, assets }) => ({
      exchange_code: id,
      exchange_name: name,
      country: country || null,
      region: region || null,
      country_code: country_code,
      assets: assets || null
    }));

    const outcome = await client.batchUpsert(
      'ib_data.exchanges',
      rows,
      'exchange_code',
      { chunkSize: 100 }
    );

    const { insertedCount, updatedCount } = outcome;
    logger.info('Batch exchange population completed', {
      insertedCount: insertedCount,
      updatedCount: updatedCount,
      totalProcessed: insertedCount + updatedCount
    });
  } catch (failure) {
    logger.error('Batch exchange population failed', { error: failure });
    throw failure;
  }
}
/**
 * Reads back what is stored in `ib_data.exchanges` and logs a summary: the total
 * row count, per-region counts (only when at least one region row is returned),
 * and the first five of the ten sample rows fetched in `exchange_code` order.
 *
 * @param client Connected PostgreSQL client.
 * @throws Rethrows any query error after logging it.
 */
async function verifyData(client: PostgreSQLClient) {
  logger.info('Verifying populated data');
  try {
    // SQL text is left unindented so the query strings stay unchanged.
    const countResult = await client.query(`
SELECT COUNT(*) as count FROM ib_data.exchanges
`);

    const byRegionResult = await client.query(`
SELECT region, COUNT(*) as count
FROM ib_data.exchanges
WHERE region IS NOT NULL
GROUP BY region
ORDER BY count DESC
`);

    const sampleResult = await client.query(`
SELECT
exchange_code,
exchange_name,
country,
region,
country_code,
assets
FROM ib_data.exchanges
ORDER BY exchange_code
LIMIT 10
`);

    const totalExchanges = countResult.rows[0].count;
    logger.info('Data verification completed', { totalExchanges });

    const regionRows = byRegionResult.rows;
    if (regionRows.length > 0) {
      const regions = regionRows.map((entry: any) => {
        return { region: entry.region, count: entry.count };
      });
      logger.info('Exchanges by region', { regions });
    }

    // Only the first five of the fetched rows are logged.
    const shown = sampleResult.rows.slice(0, 5);
    const samples = shown.map((entry: any) => {
      return {
        code: entry.exchange_code,
        name: entry.exchange_name,
        country: entry.country,
        region: entry.region,
        assets: entry.assets
      };
    });
    logger.info('Sample exchanges', { samples });
  } catch (failure) {
    logger.error('Data verification failed', { error: failure });
    throw failure;
  }
}
/**
 * Runs the complete Interactive Brokers setup: connect, create the schema, load the
 * exchange list from JSON, upsert it, and verify the stored data.
 *
 * Returns early (after closing the connection) when no valid exchanges are found.
 * On failure the error is logged, a troubleshooting hint is added for refused
 * connections and missing databases, the connection is closed, and the process
 * exits with code 1.
 *
 * The exit is issued only after cleanup: calling `process.exit` inside the `catch`
 * would terminate the process before the `finally` block could disconnect.
 */
async function main() {
  logger.info('Starting Interactive Brokers complete setup (schema + data)');
  logger.info('Database configuration', {
    database: postgresConfig.POSTGRES_DATABASE,
    host: postgresConfig.POSTGRES_HOST,
    port: postgresConfig.POSTGRES_PORT,
    user: postgresConfig.POSTGRES_USERNAME,
    ssl: postgresConfig.POSTGRES_SSL
  });

  let client: PostgreSQLClient | null = null;
  let failed = false;
  try {
    // Connect to database
    client = await connectToDatabase();

    // Step 1: Setup schema
    logger.info('Step 1: Setting up database schema');
    await runSchemaSetup(client);

    // Step 2: Load exchange data
    logger.info('Step 2: Loading exchange data');
    const exchanges = await loadExchangesData();
    if (exchanges.length === 0) {
      logger.warn('No valid exchanges found to process');
      return;
    }

    // Step 3: Populate exchanges with batch upsert
    logger.info('Step 3: Populating exchanges (batch mode)');
    await populateExchanges(client, exchanges);

    // Step 4: Verify the data
    logger.info('Step 4: Verifying setup and data');
    await verifyData(client);

    logger.info('Interactive Brokers setup completed successfully');
    logger.info('Next steps', {
      suggestions: [
        'Start your data service',
        'Begin collecting market data',
        'Connect to Interactive Brokers API'
      ]
    });
  } catch (error: unknown) {
    failed = true;
    logger.error('IB setup failed', { error });

    // Provide helpful error messages
    if (error && typeof error === 'object' && 'code' in error && error.code === 'ECONNREFUSED') {
      logger.error('Database connection refused', {
        troubleshooting: [
          'Make sure PostgreSQL is running',
          'Check your database configuration in .env file',
          'Verify the database connection details'
        ]
      });
    } else if (error && typeof error === 'object' && 'message' in error &&
      typeof error.message === 'string' &&
      error.message.includes('database') &&
      error.message.includes('does not exist')) {
      logger.error('Database does not exist', {
        suggestion: `Create database first: createdb ${postgresConfig.POSTGRES_DATABASE}`
      });
    }
  } finally {
    if (client) {
      try {
        await client.disconnect();
        logger.info('Database connection closed');
      } catch (closeError) {
        // After a successful run a failed disconnect is still the caller's problem;
        // after a failed run the setup error already decides the exit status.
        if (!failed) {
          throw closeError;
        }
        logger.warn('Failed to close database connection', { error: closeError });
      }
    }
  }

  if (failed) {
    process.exit(1);
  }
}
// Run the script
// `import.meta.main` guards the auto-run so that importing `setupIB` from another
// module does not start a setup (the shebang targets Bun, which is expected to set
// this flag only for the entry-point file).
if (import.meta.main) {
main().catch((error) => {
// Last-resort handler for rejections that escape main(): report on stderr and
// exit with a non-zero status.
console.error('IB setup script failed:', error);
process.exit(1);
});
}
// Expose the setup routine under the name `setupIB` for programmatic use.
export { main as setupIB };