import { QueryResult as PgQueryResult, Pool, PoolClient, QueryResultRow } from 'pg';
import { postgresConfig } from '@stock-bot/config';
import { getLogger } from '@stock-bot/logger';
import { PostgreSQLHealthMonitor } from './health';
import { PostgreSQLQueryBuilder } from './query-builder';
import { PostgreSQLTransactionManager } from './transactions';
import type {
  PostgreSQLClientConfig,
  PostgreSQLConnectionOptions,
  QueryResult,
  TransactionCallback,
} from './types';

/**
 * PostgreSQL Client for Stock Bot
 *
 * Provides type-safe access to PostgreSQL with connection pooling,
 * health monitoring, and transaction support.
 *
 * Lifecycle: construct, `await connect()`, issue queries, `await disconnect()`.
 * `query()` (and everything built on it) throws if called while no pool exists.
 */
export class PostgreSQLClient {
  /** Fallback used when `options.retryAttempts` is missing or explicitly undefined. */
  private static readonly DEFAULT_RETRY_ATTEMPTS = 3;
  /** Fallback (ms) used when `options.retryDelay` is missing or explicitly undefined. */
  private static readonly DEFAULT_RETRY_DELAY_MS = 1000;

  private pool: Pool | null = null;
  private readonly config: PostgreSQLClientConfig;
  private readonly options: PostgreSQLConnectionOptions;
  private readonly logger: ReturnType<typeof getLogger>;
  private readonly healthMonitor: PostgreSQLHealthMonitor;
  private readonly transactionManager: PostgreSQLTransactionManager;
  private isConnected = false;

  /**
   * @param config - Partial overrides merged on top of the values from `postgresConfig`.
   * @param options - Retry / health-check options; defaults are 3 attempts,
   *   1000 ms base retry delay and a 30000 ms health-check interval.
   */
  constructor(config?: Partial<PostgreSQLClientConfig>, options?: PostgreSQLConnectionOptions) {
    this.config = this.buildConfig(config);
    this.options = {
      retryAttempts: PostgreSQLClient.DEFAULT_RETRY_ATTEMPTS,
      retryDelay: PostgreSQLClient.DEFAULT_RETRY_DELAY_MS,
      healthCheckInterval: 30000,
      ...options,
    };

    this.logger = getLogger('postgres-client');
    this.healthMonitor = new PostgreSQLHealthMonitor(this);
    this.transactionManager = new PostgreSQLTransactionManager(this);
  }

  /**
   * Connect to PostgreSQL
   *
   * Creates the pool, verifies it with a `SELECT 1` probe, then starts health
   * monitoring and installs the pool event handlers. Failed attempts are
   * retried with a linearly growing delay (`retryDelay * attempt`).
   * Returns immediately when already connected.
   *
   * @throws Error when every attempt fails; the message carries the last error's message.
   */
  async connect(): Promise<void> {
    if (this.isConnected && this.pool) {
      return;
    }

    // Resolve here rather than with `!`: an explicit `undefined` passed in the
    // options object overrides the constructor default through the spread.
    const maxAttempts = this.options.retryAttempts ?? PostgreSQLClient.DEFAULT_RETRY_ATTEMPTS;
    const retryDelay = this.options.retryDelay ?? PostgreSQLClient.DEFAULT_RETRY_DELAY_MS;

    let lastError: Error | null = null;

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        this.logger.info(`Connecting to PostgreSQL (attempt ${attempt}/${maxAttempts})...`);

        this.pool = new Pool(this.buildPoolConfig());

        // Test the connection. The client is released even when the probe
        // fails so the cleanup below is not left waiting on a checked-out client.
        const client = await this.pool.connect();
        try {
          await client.query('SELECT 1');
        } finally {
          client.release();
        }

        this.isConnected = true;
        this.logger.info('Successfully connected to PostgreSQL');

        // Start health monitoring
        this.healthMonitor.start();

        // Setup error handlers
        this.setupErrorHandlers();

        return;
      } catch (error) {
        lastError = error instanceof Error ? error : new Error(String(error));
        this.logger.error(`PostgreSQL connection attempt ${attempt} failed:`, error);

        // A step after the probe may have thrown; never report a torn-down pool as connected.
        this.isConnected = false;

        if (this.pool) {
          const failedPool = this.pool;
          this.pool = null;
          try {
            await failedPool.end();
          } catch (endError) {
            // A cleanup failure must not abort the remaining retry attempts.
            this.logger.error('Error closing PostgreSQL pool after failed attempt:', endError);
          }
        }

        if (attempt < maxAttempts) {
          await this.delay(retryDelay * attempt);
        }
      }
    }

    throw new Error(
      `Failed to connect to PostgreSQL after ${maxAttempts} attempts: ${lastError?.message}`
    );
  }

  /**
   * Disconnect from PostgreSQL
   *
   * Stops health monitoring and ends the pool. No-op when there is no pool.
   * The client is marked disconnected even if ending the pool fails, because
   * a pool whose `end()` has been called cannot be reused.
   *
   * @throws Rethrows any error raised while stopping the monitor or ending the pool.
   */
  async disconnect(): Promise<void> {
    const pool = this.pool;
    if (!pool) {
      return;
    }

    try {
      this.healthMonitor.stop();
      await pool.end();
      this.logger.info('Disconnected from PostgreSQL');
    } catch (error) {
      this.logger.error('Error disconnecting from PostgreSQL:', error);
      throw error;
    } finally {
      // Only clear state that still belongs to the pool we just shut down.
      if (this.pool === pool) {
        this.isConnected = false;
        this.pool = null;
      }
    }
  }

  /**
   * Execute a query
   *
   * @param text - SQL text, optionally with `$1`-style placeholders.
   * @param params - Values bound to the placeholders.
   * @returns The driver result extended with `executionTime` (wall-clock ms).
   * @throws Error('PostgreSQL client not connected') when no pool exists;
   *   otherwise rethrows the driver error after logging it.
   */
  async query<T extends QueryResultRow = any>(
    text: string,
    params?: any[]
  ): Promise<QueryResult<T>> {
    if (!this.pool) {
      throw new Error('PostgreSQL client not connected');
    }

    const startTime = Date.now();

    try {
      const result = await this.pool.query<T>(text, params);
      const executionTime = Date.now() - startTime;

      // Success path logs only a query prefix and the parameter count.
      this.logger.debug(`Query executed in ${executionTime}ms`, {
        query: text.substring(0, 100),
        params: params?.length,
      });

      return {
        ...result,
        executionTime,
      } as QueryResult<T>;
    } catch (error) {
      const executionTime = Date.now() - startTime;

      // NOTE(review): the failure path logs the full query text and parameter
      // values; confirm that is acceptable for sensitive data.
      this.logger.error(`Query failed after ${executionTime}ms:`, {
        error,
        query: text,
        params,
      });

      throw error;
    }
  }

  /**
   * Execute multiple queries in a transaction
   *
   * Delegates to the transaction manager.
   */
  async transaction<T>(callback: TransactionCallback<T>): Promise<T> {
    return await this.transactionManager.execute(callback);
  }

  /**
   * Get a query builder instance
   */
  queryBuilder(): PostgreSQLQueryBuilder {
    return new PostgreSQLQueryBuilder(this);
  }

  /**
   * Create a new query builder with SELECT
   */
  select(columns: string | string[] = '*'): PostgreSQLQueryBuilder {
    return this.queryBuilder().select(columns);
  }

  /**
   * Create a new query builder with INSERT
   */
  insert(table: string): PostgreSQLQueryBuilder {
    return this.queryBuilder().insert(table);
  }

  /**
   * Create a new query builder with UPDATE
   */
  update(table: string): PostgreSQLQueryBuilder {
    return this.queryBuilder().update(table);
  }

  /**
   * Create a new query builder with DELETE
   */
  delete(table: string): PostgreSQLQueryBuilder {
    return this.queryBuilder().delete(table);
  }

  /**
   * Execute a stored procedure or function
   *
   * Runs `SELECT * FROM functionName($1, ..., $n)`.
   *
   * @param functionName - Interpolated into the SQL text verbatim (it cannot be
   *   sent as a bind parameter), so it must never come from untrusted input.
   * @param params - Bound as `$1..$n`.
   */
  async callFunction<T extends QueryResultRow = any>(
    functionName: string,
    params?: any[]
  ): Promise<QueryResult<T>> {
    const placeholders = params ? params.map((_, i) => `$${i + 1}`).join(', ') : '';
    const query = `SELECT * FROM ${functionName}(${placeholders})`;
    return await this.query<T>(query, params);
  }

  /**
   * Check if a table exists
   *
   * @param tableName - Table to look up in `information_schema.tables`.
   * @param schemaName - Schema to search; defaults to `public`.
   */
  async tableExists(tableName: string, schemaName: string = 'public'): Promise<boolean> {
    const result = await this.query(
      `SELECT EXISTS (
        SELECT FROM information_schema.tables
        WHERE table_schema = $1 AND table_name = $2
      )`,
      [schemaName, tableName]
    );
    // Guard the row access so a missing row yields false instead of a TypeError.
    return result.rows[0]?.exists === true;
  }

  /**
   * Get table schema information
   *
   * @returns One row per column (name, data type, nullability, default,
   *   character maximum length), ordered by ordinal position.
   */
  async getTableSchema(tableName: string, schemaName: string = 'public'): Promise<any[]> {
    const result = await this.query(
      `SELECT
        column_name,
        data_type,
        is_nullable,
        column_default,
        character_maximum_length
      FROM information_schema.columns
      WHERE table_schema = $1 AND table_name = $2
      ORDER BY ordinal_position`,
      [schemaName, tableName]
    );
    return result.rows;
  }

  /**
   * Execute EXPLAIN for query analysis
   *
   * Uses `EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)`. With ANALYZE, PostgreSQL
   * actually executes the statement, so data-modifying statements take effect.
   *
   * @param query - Statement to analyze; it is appended to the EXPLAIN prefix verbatim.
   * @returns The `QUERY PLAN` column of the first result row.
   */
  async explain(query: string, params?: any[]): Promise<any[]> {
    const explainQuery = `EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) ${query}`;
    const result = await this.query(explainQuery, params);
    return result.rows[0]['QUERY PLAN'];
  }

  /**
   * Get database statistics
   *
   * @returns A single row with `active_connections`, `idle_connections`,
   *   `max_connections` and `database_size`.
   */
  async getStats(): Promise<any> {
    const result = await this.query(`
      SELECT
        (SELECT count(*) FROM pg_stat_activity WHERE state = 'active') as active_connections,
        (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') as idle_connections,
        (SELECT setting FROM pg_settings WHERE name = 'max_connections') as max_connections,
        pg_size_pretty(pg_database_size(current_database())) as database_size
    `);
    return result.rows[0];
  }

  /**
   * Check if client is connected
   */
  get connected(): boolean {
    return this.isConnected && !!this.pool;
  }

  /**
   * Get the underlying connection pool
   */
  get connectionPool(): Pool | null {
    return this.pool;
  }

  /**
   * Merge caller overrides over the defaults from `postgresConfig`.
   *
   * Scalar fields use `??`, so only `null`/`undefined` fall back to the
   * default and an explicit falsy override (e.g. an empty password) is kept.
   * Nested groups are shallow-merged, overrides last.
   */
  private buildConfig(config?: Partial<PostgreSQLClientConfig>): PostgreSQLClientConfig {
    return {
      host: config?.host ?? postgresConfig.POSTGRES_HOST,
      port: config?.port ?? postgresConfig.POSTGRES_PORT,
      database: config?.database ?? postgresConfig.POSTGRES_DATABASE,
      username: config?.username ?? postgresConfig.POSTGRES_USERNAME,
      password: config?.password ?? postgresConfig.POSTGRES_PASSWORD,
      poolSettings: {
        min: postgresConfig.POSTGRES_POOL_MIN,
        max: postgresConfig.POSTGRES_POOL_MAX,
        idleTimeoutMillis: postgresConfig.POSTGRES_POOL_IDLE_TIMEOUT,
        ...config?.poolSettings,
      },
      ssl: {
        enabled: postgresConfig.POSTGRES_SSL,
        rejectUnauthorized: postgresConfig.POSTGRES_SSL_REJECT_UNAUTHORIZED,
        ...config?.ssl,
      },
      timeouts: {
        query: postgresConfig.POSTGRES_QUERY_TIMEOUT,
        connection: postgresConfig.POSTGRES_CONNECTION_TIMEOUT,
        statement: postgresConfig.POSTGRES_STATEMENT_TIMEOUT,
        lock: postgresConfig.POSTGRES_LOCK_TIMEOUT,
        idleInTransaction: postgresConfig.POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
        ...config?.timeouts,
      },
    };
  }

  /**
   * Translate the client configuration into the option object handed to `new Pool(...)`.
   * SSL is passed as `false` unless `ssl.enabled` is set.
   */
  private buildPoolConfig(): any {
    return {
      host: this.config.host,
      port: this.config.port,
      database: this.config.database,
      user: this.config.username,
      password: this.config.password,
      min: this.config.poolSettings?.min,
      max: this.config.poolSettings?.max,
      idleTimeoutMillis: this.config.poolSettings?.idleTimeoutMillis,
      connectionTimeoutMillis: this.config.timeouts?.connection,
      query_timeout: this.config.timeouts?.query,
      statement_timeout: this.config.timeouts?.statement,
      lock_timeout: this.config.timeouts?.lock,
      idle_in_transaction_session_timeout: this.config.timeouts?.idleInTransaction,
      ssl: this.config.ssl?.enabled
        ? {
            rejectUnauthorized: this.config.ssl.rejectUnauthorized,
          }
        : false,
    };
  }

  /**
   * Attach logging listeners for the pool's `error`, `connect` and `remove` events.
   * No-op when there is no pool.
   */
  private setupErrorHandlers(): void {
    if (!this.pool) {
      return;
    }

    this.pool.on('error', error => {
      this.logger.error('PostgreSQL pool error:', error);
    });

    this.pool.on('connect', () => {
      this.logger.debug('New PostgreSQL client connected');
    });

    this.pool.on('remove', () => {
      this.logger.debug('PostgreSQL client removed from pool');
    });
  }

  /** Resolve after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise<void>(resolve => setTimeout(resolve, ms));
  }
}