stock-bot/libs/data/postgres/src/client.ts
2025-06-22 08:45:14 -04:00

604 lines
17 KiB
TypeScript

import { Pool, QueryResultRow } from 'pg';
import { PostgreSQLHealthMonitor } from './health';
import { PostgreSQLQueryBuilder } from './query-builder';
import { PostgreSQLTransactionManager } from './transactions';
import type {
PostgreSQLClientConfig,
PostgreSQLConnectionOptions,
QueryResult,
TransactionCallback,
PoolMetrics,
ConnectionEvents,
DynamicPoolConfig,
} from './types';
/**
* PostgreSQL Client for Stock Bot
*
* Provides type-safe access to PostgreSQL with connection pooling,
* health monitoring, and transaction support.
*/
export class PostgreSQLClient {
private pool: Pool | null = null;
private readonly config: PostgreSQLClientConfig;
private readonly options: PostgreSQLConnectionOptions;
private readonly logger: any;
private readonly healthMonitor: PostgreSQLHealthMonitor;
private readonly transactionManager: PostgreSQLTransactionManager;
private isConnected = false;
private readonly metrics: PoolMetrics;
private readonly events?: ConnectionEvents;
private dynamicPoolConfig?: DynamicPoolConfig;
private poolMonitorInterval?: NodeJS.Timeout;
constructor(config: PostgreSQLClientConfig, logger?: any, options?: PostgreSQLConnectionOptions, events?: ConnectionEvents) {
this.config = config;
this.options = {
retryAttempts: 3,
retryDelay: 1000,
healthCheckInterval: 30000,
...options,
};
this.events = events;
this.logger = logger || console;
this.healthMonitor = new PostgreSQLHealthMonitor(this);
this.transactionManager = new PostgreSQLTransactionManager(this);
this.metrics = {
totalConnections: 0,
activeConnections: 0,
idleConnections: 0,
waitingRequests: 0,
errors: 0,
created: new Date(),
};
}
/**
* Connect to PostgreSQL
*/
async connect(): Promise<void> {
if (this.isConnected && this.pool) {
return;
}
let lastError: Error | null = null;
for (let attempt = 1; attempt <= (this.options.retryAttempts ?? 3); attempt++) {
try {
this.logger.info(
`Connecting to PostgreSQL (attempt ${attempt}/${this.options.retryAttempts})...`
);
this.pool = new Pool(this.buildPoolConfig());
// Test the connection
const client = await this.pool.connect();
await client.query('SELECT 1');
client.release();
this.isConnected = true;
// Update metrics
const poolConfig = this.config.poolSettings;
this.metrics.totalConnections = poolConfig?.max || 10;
this.metrics.idleConnections = poolConfig?.min || 2;
// Fire connection event
if (this.events?.onConnect) {
await Promise.resolve(this.events.onConnect());
}
// Fire pool created event
if (this.events?.onPoolCreated) {
await Promise.resolve(this.events.onPoolCreated());
}
this.logger.info('Successfully connected to PostgreSQL', {
poolSize: this.metrics.totalConnections,
});
// Start health monitoring
this.healthMonitor.start();
// Setup error handlers
this.setupErrorHandlers();
// Setup pool event listeners for metrics
this.setupPoolMetrics();
// Start dynamic pool monitoring if enabled
if (this.dynamicPoolConfig?.enabled) {
this.startPoolMonitoring();
}
return;
} catch (error) {
lastError = error as Error;
this.metrics.errors++;
this.metrics.lastError = lastError.message;
// Fire error event
if (this.events?.onError) {
await Promise.resolve(this.events.onError(lastError));
}
this.logger.error(`PostgreSQL connection attempt ${attempt} failed:`, error);
if (this.pool) {
await this.pool.end();
this.pool = null;
}
if (attempt < (this.options.retryAttempts ?? 3)) {
await this.delay((this.options.retryDelay ?? 1000) * attempt);
}
}
}
throw new Error(
`Failed to connect to PostgreSQL after ${this.options.retryAttempts} attempts: ${lastError?.message}`
);
}
/**
* Disconnect from PostgreSQL
*/
async disconnect(): Promise<void> {
if (!this.pool) {
return;
}
try {
// Stop pool monitoring
if (this.poolMonitorInterval) {
clearInterval(this.poolMonitorInterval);
this.poolMonitorInterval = undefined;
}
this.healthMonitor.stop();
await this.pool.end();
this.isConnected = false;
this.pool = null;
// Fire disconnect event
if (this.events?.onDisconnect) {
await Promise.resolve(this.events.onDisconnect());
}
this.logger.info('Disconnected from PostgreSQL');
} catch (error) {
this.logger.error('Error disconnecting from PostgreSQL:', error);
throw error;
}
}
/**
* Execute a query
*/
async query<T extends QueryResultRow = any>(
text: string,
params?: any[]
): Promise<QueryResult<T>> {
if (!this.pool) {
throw new Error('PostgreSQL client not connected');
}
const startTime = Date.now();
try {
const result = await this.pool.query<T>(text, params);
const executionTime = Date.now() - startTime;
this.logger.debug(`Query executed in ${executionTime}ms`, {
query: text.substring(0, 100),
params: params?.length,
});
return {
...result,
executionTime,
} as QueryResult<T>;
} catch (error) {
const executionTime = Date.now() - startTime;
this.logger.error(`Query failed after ${executionTime}ms:`, {
error,
query: text,
params,
});
throw error;
}
}
/**
* Execute multiple queries in a transaction
*/
async transaction<T>(callback: TransactionCallback<T>): Promise<T> {
return await this.transactionManager.execute(callback);
}
/**
* Get a query builder instance
*/
queryBuilder(): PostgreSQLQueryBuilder {
return new PostgreSQLQueryBuilder(this);
}
/**
* Create a new query builder with SELECT
*/
select(columns: string | string[] = '*'): PostgreSQLQueryBuilder {
return this.queryBuilder().select(columns);
}
/**
* Create a new query builder with INSERT
*/
insert(table: string): PostgreSQLQueryBuilder {
return this.queryBuilder().insert(table);
}
/**
* Create a new query builder with UPDATE
*/
update(table: string): PostgreSQLQueryBuilder {
return this.queryBuilder().update(table);
}
/**
* Create a new query builder with DELETE
*/
delete(table: string): PostgreSQLQueryBuilder {
return this.queryBuilder().delete(table);
}
/**
* Execute a stored procedure or function
*/
async callFunction<T extends QueryResultRow = any>(
functionName: string,
params?: any[]
): Promise<QueryResult<T>> {
const placeholders = params ? params.map((_, i) => `$${i + 1}`).join(', ') : '';
const query = `SELECT * FROM ${functionName}(${placeholders})`;
return await this.query<T>(query, params);
}
/**
* Batch upsert operation for high-performance inserts/updates
*/
async batchUpsert(
tableName: string,
data: Record<string, unknown>[],
conflictColumn: string,
options: {
chunkSize?: number;
excludeColumns?: string[];
} = {}
): Promise<{ insertedCount: number; updatedCount: number }> {
if (!this.pool) {
throw new Error('PostgreSQL client not connected');
}
if (data.length === 0) {
return { insertedCount: 0, updatedCount: 0 };
}
const { chunkSize = 1000, excludeColumns = [] } = options;
const columns = Object.keys(data[0] ?? {}).filter(col => !excludeColumns.includes(col));
const updateColumns = columns.filter(col => col !== conflictColumn);
let totalInserted = 0;
let totalUpdated = 0;
// Process in chunks to avoid memory issues and parameter limits
for (let i = 0; i < data.length; i += chunkSize) {
const chunk = data.slice(i, i + chunkSize);
// Build placeholders for this chunk
const placeholders = chunk.map((_, rowIndex) => {
const rowPlaceholders = columns.map((_, colIndex) => {
return `$${rowIndex * columns.length + colIndex + 1}`;
});
return `(${rowPlaceholders.join(', ')})`;
});
// Flatten the chunk data
const values = chunk.flatMap(row => columns.map(col => row[col as keyof typeof row]));
// Build the upsert query
const updateClauses = updateColumns.map(col => `${col} = EXCLUDED.${col}`);
const query = `
INSERT INTO ${tableName} (${columns.join(', ')})
VALUES ${placeholders.join(', ')}
ON CONFLICT (${conflictColumn})
DO UPDATE SET
${updateClauses.join(', ')},
updated_at = NOW()
RETURNING (xmax = 0) AS is_insert
`;
try {
const startTime = Date.now();
const result = await this.pool.query(query, values);
const executionTime = Date.now() - startTime;
// Count inserts vs updates
const inserted = result.rows.filter((row: { is_insert: boolean }) => row.is_insert).length;
const updated = result.rows.length - inserted;
totalInserted += inserted;
totalUpdated += updated;
this.logger.debug(`Batch upsert chunk processed in ${executionTime}ms`, {
chunkSize: chunk.length,
inserted,
updated,
table: tableName,
});
} catch (error) {
this.logger.error(`Batch upsert failed on chunk ${Math.floor(i / chunkSize) + 1}:`, {
error,
table: tableName,
chunkStart: i,
chunkSize: chunk.length,
});
throw error;
}
}
this.logger.info('Batch upsert completed', {
table: tableName,
totalRecords: data.length,
inserted: totalInserted,
updated: totalUpdated,
});
return { insertedCount: totalInserted, updatedCount: totalUpdated };
}
/**
* Check if a table exists
*/
async tableExists(tableName: string, schemaName: string = 'public'): Promise<boolean> {
const result = await this.query(
`SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = $1 AND table_name = $2
)`,
[schemaName, tableName]
);
return result.rows[0].exists;
}
/**
* Get table schema information
*/
async getTableSchema(tableName: string, schemaName: string = 'public'): Promise<any[]> {
const result = await this.query(
`SELECT
column_name,
data_type,
is_nullable,
column_default,
character_maximum_length
FROM information_schema.columns
WHERE table_schema = $1 AND table_name = $2
ORDER BY ordinal_position`,
[schemaName, tableName]
);
return result.rows;
}
/**
* Execute EXPLAIN for query analysis
*/
async explain(query: string, params?: any[]): Promise<any[]> {
const explainQuery = `EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) ${query}`;
const result = await this.query(explainQuery, params);
return result.rows[0]['QUERY PLAN'];
}
/**
* Get database statistics
*/
async getStats(): Promise<any> {
const result = await this.query(`
SELECT
(SELECT count(*) FROM pg_stat_activity WHERE state = 'active') as active_connections,
(SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') as idle_connections,
(SELECT setting FROM pg_settings WHERE name = 'max_connections') as max_connections,
pg_size_pretty(pg_database_size(current_database())) as database_size
`);
return result.rows[0];
}
/**
* Check if client is connected
*/
get connected(): boolean {
return this.isConnected && !!this.pool;
}
/**
* Get the underlying connection pool
*/
get connectionPool(): Pool | null {
return this.pool;
}
private buildPoolConfig(): any {
return {
host: this.config.host,
port: this.config.port,
database: this.config.database,
user: this.config.username,
password: this.config.password,
min: this.config.poolSettings?.min,
max: this.config.poolSettings?.max,
idleTimeoutMillis: this.config.poolSettings?.idleTimeoutMillis,
connectionTimeoutMillis: this.config.timeouts?.connection,
query_timeout: this.config.timeouts?.query,
statement_timeout: this.config.timeouts?.statement,
lock_timeout: this.config.timeouts?.lock,
idle_in_transaction_session_timeout: this.config.timeouts?.idleInTransaction,
ssl: this.config.ssl?.enabled
? {
rejectUnauthorized: this.config.ssl.rejectUnauthorized,
}
: false,
};
}
private setupErrorHandlers(): void {
if (!this.pool) {
return;
}
this.pool.on('error', error => {
this.logger.error('PostgreSQL pool error:', error);
});
this.pool.on('connect', () => {
this.logger.debug('New PostgreSQL client connected');
});
this.pool.on('remove', () => {
this.logger.debug('PostgreSQL client removed from pool');
});
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
/**
* Get current pool metrics
*/
getPoolMetrics(): PoolMetrics {
// Update last used timestamp
this.metrics.lastUsed = new Date();
// Update metrics from pool if available
if (this.pool) {
this.metrics.totalConnections = this.pool.totalCount;
this.metrics.idleConnections = this.pool.idleCount;
this.metrics.waitingRequests = this.pool.waitingCount;
this.metrics.activeConnections = this.metrics.totalConnections - this.metrics.idleConnections;
}
return { ...this.metrics };
}
/**
* Set dynamic pool configuration
*/
setDynamicPoolConfig(config: DynamicPoolConfig): void {
this.dynamicPoolConfig = config;
if (config.enabled && this.isConnected && !this.poolMonitorInterval) {
this.startPoolMonitoring();
} else if (!config.enabled && this.poolMonitorInterval) {
clearInterval(this.poolMonitorInterval);
this.poolMonitorInterval = undefined;
}
}
/**
* Start monitoring pool and adjust size dynamically
*/
private startPoolMonitoring(): void {
if (!this.dynamicPoolConfig || this.poolMonitorInterval) {
return;
}
this.poolMonitorInterval = setInterval(() => {
this.evaluatePoolSize();
}, this.dynamicPoolConfig.evaluationInterval);
}
/**
* Setup pool event listeners for metrics
*/
private setupPoolMetrics(): void {
if (!this.pool) {
return;
}
// Track when connections are acquired
this.pool.on('acquire', () => {
this.metrics.activeConnections++;
this.metrics.idleConnections--;
});
// Track when connections are released
this.pool.on('release', () => {
this.metrics.activeConnections--;
this.metrics.idleConnections++;
});
}
/**
* Evaluate and adjust pool size based on usage
*/
private async evaluatePoolSize(): Promise<void> {
if (!this.dynamicPoolConfig || !this.pool) {
return;
}
const metrics = this.getPoolMetrics();
const { minSize, maxSize, scaleUpThreshold, scaleDownThreshold } = this.dynamicPoolConfig;
const currentSize = metrics.totalConnections;
const utilization = currentSize > 0 ? ((metrics.activeConnections / currentSize) * 100) : 0;
this.logger.debug('Pool utilization', {
utilization: `${utilization.toFixed(1)}%`,
active: metrics.activeConnections,
total: currentSize,
waiting: metrics.waitingRequests,
});
// Scale up if utilization is high or there are waiting requests
if ((utilization > scaleUpThreshold || metrics.waitingRequests > 0) && currentSize < maxSize) {
const newSize = Math.min(currentSize + this.dynamicPoolConfig.scaleUpIncrement, maxSize);
this.logger.info('Would scale up connection pool', { from: currentSize, to: newSize, utilization });
// Note: pg module doesn't support dynamic resizing, would need reconnection
}
// Scale down if utilization is low
else if (utilization < scaleDownThreshold && currentSize > minSize) {
const newSize = Math.max(currentSize - this.dynamicPoolConfig.scaleDownIncrement, minSize);
this.logger.info('Would scale down connection pool', { from: currentSize, to: newSize, utilization });
// Note: pg module doesn't support dynamic resizing, would need reconnection
}
}
/**
* Enable pool warmup on connect
*/
async warmupPool(): Promise<void> {
if (!this.pool || !this.isConnected) {
throw new Error('Client not connected');
}
const minSize = this.config.poolSettings?.min || 2;
const promises: Promise<void>[] = [];
// Create minimum connections by running parallel queries
for (let i = 0; i < minSize; i++) {
promises.push(
this.pool.query('SELECT 1')
.then(() => {
this.logger.debug(`Warmed up connection ${i + 1}/${minSize}`);
})
.catch(error => {
this.logger.warn(`Failed to warm up connection ${i + 1}`, { error });
})
);
}
await Promise.allSettled(promises);
this.logger.info('Connection pool warmup complete', { connections: minSize });
}
}