import { Pool, QueryResultRow } from 'pg';
import { getLogger } from '@stock-bot/logger';
import { PostgreSQLHealthMonitor } from './health';
import { PostgreSQLQueryBuilder } from './query-builder';
import { PostgreSQLTransactionManager } from './transactions';
import type {
  PostgreSQLClientConfig,
  PostgreSQLConnectionOptions,
  QueryResult,
  TransactionCallback,
} from './types';

/**
 * PostgreSQL Client for Stock Bot
 *
 * Provides type-safe access to PostgreSQL with connection pooling,
 * health monitoring, and transaction support.
 *
 * Lifecycle: construct, `await connect()`, use `query` / `transaction` /
 * the query-builder helpers, then `await disconnect()`.
 */
export class PostgreSQLClient {
  /**
   * Upper bound on bind parameters in a single statement. The wire protocol
   * encodes the parameter count of a Bind message as a 16-bit integer.
   */
  private static readonly MAX_BIND_PARAMETERS = 65535;

  private pool: Pool | null = null;
  private readonly config: PostgreSQLClientConfig;
  private readonly options: PostgreSQLConnectionOptions;
  private readonly logger: ReturnType<typeof getLogger>;
  private readonly healthMonitor: PostgreSQLHealthMonitor;
  private readonly transactionManager: PostgreSQLTransactionManager;
  private isConnected = false;

  /**
   * @param config - Connection settings (host, credentials, pool, timeouts, SSL).
   * @param options - Retry / health-check tuning. Defaults: 3 attempts,
   *   1000 ms base retry delay, 30000 ms health-check interval.
   */
  constructor(config: PostgreSQLClientConfig, options?: PostgreSQLConnectionOptions) {
    this.config = config;
    this.options = {
      retryAttempts: 3,
      retryDelay: 1000,
      healthCheckInterval: 30000,
      ...options,
    };

    this.logger = getLogger('postgres-client');
    this.healthMonitor = new PostgreSQLHealthMonitor(this);
    this.transactionManager = new PostgreSQLTransactionManager(this);
  }

  /**
   * Connect to PostgreSQL
   *
   * Creates a pool and verifies it with `SELECT 1`. On failure the pool is
   * torn down and the attempt is retried with a linearly growing delay
   * (`retryDelay * attempt`). No-op when already connected.
   *
   * @throws Error when every attempt fails; the message carries the last
   *   underlying error message.
   */
  async connect(): Promise<void> {
    if (this.isConnected && this.pool) {
      return;
    }

    // Resolve once so the loop bound and every message agree, even when the
    // caller passed `retryAttempts: undefined` explicitly.
    const maxAttempts = this.options.retryAttempts ?? 3;
    const baseDelay = this.options.retryDelay ?? 1000;
    let lastError: Error | null = null;

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        this.logger.info(`Connecting to PostgreSQL (attempt ${attempt}/${maxAttempts})...`);

        this.pool = new Pool(this.buildPoolConfig());

        // Test the connection. The client is released even when the probe
        // fails: a checked-out client would keep `pool.end()` below from
        // ever settling.
        const client = await this.pool.connect();
        try {
          await client.query('SELECT 1');
        } finally {
          client.release();
        }

        this.isConnected = true;
        this.logger.info('Successfully connected to PostgreSQL');

        // Start health monitoring
        this.healthMonitor.start();

        // Setup error handlers
        this.setupErrorHandlers();

        return;
      } catch (error) {
        lastError = error instanceof Error ? error : new Error(String(error));
        this.logger.error(`PostgreSQL connection attempt ${attempt} failed:`, error);

        // Never report "connected" for a pool that is about to be discarded.
        this.isConnected = false;

        const failedPool = this.pool;
        this.pool = null;
        if (failedPool) {
          try {
            await failedPool.end();
          } catch (cleanupError) {
            // A cleanup failure must not abort the retry loop or mask the
            // connection error captured above.
            this.logger.error('Error closing failed PostgreSQL pool:', cleanupError);
          }
        }

        if (attempt < maxAttempts) {
          await this.delay(baseDelay * attempt);
        }
      }
    }

    throw new Error(
      `Failed to connect to PostgreSQL after ${maxAttempts} attempts: ${lastError?.message}`
    );
  }

  /**
   * Disconnect from PostgreSQL
   *
   * Stops health monitoring and ends the pool. No-op when no pool exists.
   *
   * @throws Rethrows any error raised while shutting down (after logging it).
   */
  async disconnect(): Promise<void> {
    if (!this.pool) {
      return;
    }

    try {
      this.healthMonitor.stop();
      await this.pool.end();
      this.isConnected = false;
      this.pool = null;
      this.logger.info('Disconnected from PostgreSQL');
    } catch (error) {
      this.logger.error('Error disconnecting from PostgreSQL:', error);
      throw error;
    }
  }

  /**
   * Execute a query
   *
   * @param text - SQL text, using `$1`, `$2`, ... placeholders.
   * @param params - Values bound to the placeholders.
   * @returns The driver result extended with `executionTime` in milliseconds.
   * @throws Error when the client is not connected; rethrows driver errors
   *   after logging them together with the query text and parameters.
   */
  async query<T extends QueryResultRow = any>(
    text: string,
    params?: any[]
  ): Promise<QueryResult<T>> {
    if (!this.pool) {
      throw new Error('PostgreSQL client not connected');
    }

    const startTime = Date.now();

    try {
      const result = await this.pool.query<T>(text, params);
      const executionTime = Date.now() - startTime;

      // Only a prefix of the SQL and the parameter count are logged on success.
      this.logger.debug(`Query executed in ${executionTime}ms`, {
        query: text.substring(0, 100),
        params: params?.length,
      });

      return {
        ...result,
        executionTime,
      } as QueryResult<T>;
    } catch (error) {
      const executionTime = Date.now() - startTime;
      this.logger.error(`Query failed after ${executionTime}ms:`, {
        error,
        query: text,
        params,
      });
      throw error;
    }
  }

  /**
   * Execute multiple queries in a transaction
   *
   * Delegates to the transaction manager.
   *
   * @param callback - Work to run inside the transaction.
   * @returns Whatever the transaction manager resolves with for `callback`.
   */
  async transaction<T>(callback: TransactionCallback<T>): Promise<T> {
    return await this.transactionManager.execute(callback);
  }

  /**
   * Get a query builder instance
   */
  queryBuilder(): PostgreSQLQueryBuilder {
    return new PostgreSQLQueryBuilder(this);
  }

  /**
   * Create a new query builder with SELECT
   */
  select(columns: string | string[] = '*'): PostgreSQLQueryBuilder {
    return this.queryBuilder().select(columns);
  }

  /**
   * Create a new query builder with INSERT
   */
  insert(table: string): PostgreSQLQueryBuilder {
    return this.queryBuilder().insert(table);
  }

  /**
   * Create a new query builder with UPDATE
   */
  update(table: string): PostgreSQLQueryBuilder {
    return this.queryBuilder().update(table);
  }

  /**
   * Create a new query builder with DELETE
   */
  delete(table: string): PostgreSQLQueryBuilder {
    return this.queryBuilder().delete(table);
  }

  /**
   * Execute a stored procedure or function
   *
   * Runs `SELECT * FROM functionName($1, ..., $n)`.
   *
   * @param functionName - Interpolated into the SQL text WITHOUT escaping;
   *   it must come from trusted code, never from user input.
   * @param params - Positional arguments, bound as query parameters.
   */
  async callFunction<T extends QueryResultRow = any>(
    functionName: string,
    params?: any[]
  ): Promise<QueryResult<T>> {
    const placeholders = params ? params.map((_, i) => `$${i + 1}`).join(', ') : '';
    const query = `SELECT * FROM ${functionName}(${placeholders})`;
    return await this.query<T>(query, params);
  }

  /**
   * Batch upsert operation for high-performance inserts/updates
   *
   * Issues one multi-row `INSERT ... ON CONFLICT (conflictColumn) DO UPDATE`
   * per chunk. The column list is taken from the keys of the FIRST row; keys
   * that appear only on later rows are ignored. On conflict every inserted
   * column except the conflict column is overwritten and `updated_at` is set
   * to `NOW()`, so the target table is expected to have an `updated_at`
   * column.
   *
   * Each chunk is a separate statement; this method does not wrap them in a
   * transaction, so a failure on a later chunk does not undo earlier chunks.
   *
   * @param tableName - Target table. Interpolated into SQL WITHOUT escaping,
   *   as are the column names and `conflictColumn`; all must be trusted.
   * @param data - Rows to upsert.
   * @param conflictColumn - Column named in the `ON CONFLICT` target.
   * @param options - `chunkSize`: maximum rows per statement (default 1000,
   *   lowered automatically so a statement never exceeds the bind-parameter
   *   limit). `excludeColumns`: keys of the row objects to leave out.
   * @returns Counts of rows inserted and rows updated.
   * @throws Error when not connected, when `chunkSize` is not >= 1, or when
   *   no columns remain to insert; rethrows driver errors after logging.
   */
  async batchUpsert(
    tableName: string,
    data: Record<string, unknown>[],
    conflictColumn: string,
    options: {
      chunkSize?: number;
      excludeColumns?: string[];
    } = {}
  ): Promise<{ insertedCount: number; updatedCount: number }> {
    const pool = this.pool;
    if (!pool) {
      throw new Error('PostgreSQL client not connected');
    }

    if (data.length === 0) {
      return { insertedCount: 0, updatedCount: 0 };
    }

    const { chunkSize = 1000, excludeColumns = [] } = options;

    // Written as a negated comparison so NaN is rejected too. A step of zero
    // or less would never advance the chunk loop below.
    if (!(chunkSize >= 1)) {
      throw new Error(`batchUpsert chunkSize must be at least 1, got ${chunkSize}`);
    }

    const columns = Object.keys(data[0] ?? {}).filter(col => !excludeColumns.includes(col));
    if (columns.length === 0) {
      throw new Error('batchUpsert requires at least one column to insert');
    }

    // Keep rows * columns within the bind-parameter limit of one statement.
    const maxRowsPerStatement = Math.max(
      1,
      Math.floor(PostgreSQLClient.MAX_BIND_PARAMETERS / columns.length)
    );
    const rowsPerChunk = Math.min(Math.floor(chunkSize), maxRowsPerStatement);

    // `updated_at` is always assigned NOW(); omitting it from the EXCLUDED
    // assignments avoids setting the same column twice when the payload also
    // carries it. Joining a single array keeps the SET list valid when no
    // other column is left to update.
    const setClauses = [
      ...columns
        .filter(col => col !== conflictColumn && col !== 'updated_at')
        .map(col => `${col} = EXCLUDED.${col}`),
      'updated_at = NOW()',
    ].join(', ');
    const columnList = columns.join(', ');

    let totalInserted = 0;
    let totalUpdated = 0;

    // Process in chunks to avoid memory issues and parameter limits
    for (let i = 0; i < data.length; i += rowsPerChunk) {
      const chunk = data.slice(i, i + rowsPerChunk);

      // Build placeholders for this chunk: ($1, $2, ...), ($n+1, ...), ...
      const placeholders = chunk
        .map((_row, rowIndex) => {
          const offset = rowIndex * columns.length;
          const rowPlaceholders = columns.map((_col, colIndex) => `$${offset + colIndex + 1}`);
          return `(${rowPlaceholders.join(', ')})`;
        })
        .join(', ');

      // Flatten the chunk data in the same row-major order as the placeholders
      const values = chunk.flatMap(row => columns.map(col => row[col]));

      // Build the upsert query. `(xmax = 0)` is used as the insert-vs-update
      // marker for each returned row.
      const query = `
        INSERT INTO ${tableName} (${columnList})
        VALUES ${placeholders}
        ON CONFLICT (${conflictColumn})
        DO UPDATE SET ${setClauses}
        RETURNING (xmax = 0) AS is_insert
      `;

      try {
        const startTime = Date.now();
        const result = await pool.query<{ is_insert: boolean }>(query, values);
        const executionTime = Date.now() - startTime;

        // Count inserts vs updates
        const inserted = result.rows.filter(row => row.is_insert).length;
        const updated = result.rows.length - inserted;

        totalInserted += inserted;
        totalUpdated += updated;

        this.logger.debug(`Batch upsert chunk processed in ${executionTime}ms`, {
          chunkSize: chunk.length,
          inserted,
          updated,
          table: tableName,
        });
      } catch (error) {
        this.logger.error(`Batch upsert failed on chunk ${Math.floor(i / rowsPerChunk) + 1}:`, {
          error,
          table: tableName,
          chunkStart: i,
          chunkSize: chunk.length,
        });
        throw error;
      }
    }

    this.logger.info('Batch upsert completed', {
      table: tableName,
      totalRecords: data.length,
      inserted: totalInserted,
      updated: totalUpdated,
    });

    return { insertedCount: totalInserted, updatedCount: totalUpdated };
  }

  /**
   * Check if a table exists
   *
   * @param tableName - Table name to look up in `information_schema.tables`.
   * @param schemaName - Schema to search; defaults to `public`.
   */
  async tableExists(tableName: string, schemaName: string = 'public'): Promise<boolean> {
    const result = await this.query<{ exists: boolean }>(
      `SELECT EXISTS (
        SELECT FROM information_schema.tables
        WHERE table_schema = $1 AND table_name = $2
      )`,
      [schemaName, tableName]
    );
    return result.rows[0]?.exists ?? false;
  }

  /**
   * Get table schema information
   *
   * @returns One row per column (name, data type, nullability, default,
   *   character maximum length), ordered by ordinal position. Empty when the
   *   table has no matching entry in `information_schema.columns`.
   */
  async getTableSchema(tableName: string, schemaName: string = 'public'): Promise<any[]> {
    const result = await this.query(
      `SELECT
        column_name,
        data_type,
        is_nullable,
        column_default,
        character_maximum_length
      FROM information_schema.columns
      WHERE table_schema = $1 AND table_name = $2
      ORDER BY ordinal_position`,
      [schemaName, tableName]
    );
    return result.rows;
  }

  /**
   * Execute EXPLAIN for query analysis
   *
   * WARNING: the `ANALYZE` option makes PostgreSQL actually execute the
   * statement, so data-modifying queries passed here take effect.
   *
   * @param query - Statement to analyze; appended verbatim after `EXPLAIN (...)`.
   * @param params - Values bound to the statement's placeholders.
   * @returns The `QUERY PLAN` value of the first result row.
   */
  async explain(query: string, params?: any[]): Promise<any[]> {
    const explainQuery = `EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) ${query}`;
    const result = await this.query(explainQuery, params);
    return result.rows[0]['QUERY PLAN'];
  }

  /**
   * Get database statistics
   *
   * @returns A single row with `active_connections`, `idle_connections`,
   *   `max_connections` and a human-readable `database_size`.
   */
  async getStats(): Promise<any> {
    const result = await this.query(`
      SELECT
        (SELECT count(*) FROM pg_stat_activity WHERE state = 'active') as active_connections,
        (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') as idle_connections,
        (SELECT setting FROM pg_settings WHERE name = 'max_connections') as max_connections,
        pg_size_pretty(pg_database_size(current_database())) as database_size
    `);
    return result.rows[0];
  }

  /**
   * Check if client is connected
   */
  get connected(): boolean {
    return this.isConnected && !!this.pool;
  }

  /**
   * Get the underlying connection pool
   */
  get connectionPool(): Pool | null {
    return this.pool;
  }

  /**
   * Translate the client configuration into the option object handed to the
   * `Pool` constructor. SSL is disabled unless `config.ssl.enabled` is truthy.
   */
  private buildPoolConfig(): any {
    return {
      host: this.config.host,
      port: this.config.port,
      database: this.config.database,
      user: this.config.username,
      password: this.config.password,
      min: this.config.poolSettings?.min,
      max: this.config.poolSettings?.max,
      idleTimeoutMillis: this.config.poolSettings?.idleTimeoutMillis,
      connectionTimeoutMillis: this.config.timeouts?.connection,
      query_timeout: this.config.timeouts?.query,
      statement_timeout: this.config.timeouts?.statement,
      lock_timeout: this.config.timeouts?.lock,
      idle_in_transaction_session_timeout: this.config.timeouts?.idleInTransaction,
      ssl: this.config.ssl?.enabled
        ? {
            rejectUnauthorized: this.config.ssl.rejectUnauthorized,
          }
        : false,
    };
  }

  /**
   * Attach logging listeners for the pool's `error`, `connect` and `remove`
   * events. No-op when no pool exists.
   */
  private setupErrorHandlers(): void {
    if (!this.pool) {
      return;
    }

    this.pool.on('error', error => {
      this.logger.error('PostgreSQL pool error:', error);
    });

    this.pool.on('connect', () => {
      this.logger.debug('New PostgreSQL client connected');
    });

    this.pool.on('remove', () => {
      this.logger.debug('PostgreSQL client removed from pool');
    });
  }

  /** Resolve after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}