414 lines
11 KiB
TypeScript
414 lines
11 KiB
TypeScript
import { Pool, QueryResultRow } from 'pg';
|
|
import { getLogger } from '@stock-bot/logger';
|
|
import { PostgreSQLHealthMonitor } from './health';
|
|
import { PostgreSQLQueryBuilder } from './query-builder';
|
|
import { PostgreSQLTransactionManager } from './transactions';
|
|
import type {
|
|
PostgreSQLClientConfig,
|
|
PostgreSQLConnectionOptions,
|
|
QueryResult,
|
|
TransactionCallback,
|
|
} from './types';
|
|
|
|
/**
|
|
* PostgreSQL Client for Stock Bot
|
|
*
|
|
* Provides type-safe access to PostgreSQL with connection pooling,
|
|
* health monitoring, and transaction support.
|
|
*/
|
|
export class PostgreSQLClient {
|
|
private pool: Pool | null = null;
|
|
private readonly config: PostgreSQLClientConfig;
|
|
private readonly options: PostgreSQLConnectionOptions;
|
|
private readonly logger: ReturnType<typeof getLogger>;
|
|
private readonly healthMonitor: PostgreSQLHealthMonitor;
|
|
private readonly transactionManager: PostgreSQLTransactionManager;
|
|
private isConnected = false;
|
|
|
|
constructor(config: PostgreSQLClientConfig, options?: PostgreSQLConnectionOptions) {
|
|
this.config = config;
|
|
this.options = {
|
|
retryAttempts: 3,
|
|
retryDelay: 1000,
|
|
healthCheckInterval: 30000,
|
|
...options,
|
|
};
|
|
|
|
this.logger = getLogger('postgres-client');
|
|
this.healthMonitor = new PostgreSQLHealthMonitor(this);
|
|
this.transactionManager = new PostgreSQLTransactionManager(this);
|
|
}
|
|
|
|
/**
|
|
* Connect to PostgreSQL
|
|
*/
|
|
async connect(): Promise<void> {
|
|
if (this.isConnected && this.pool) {
|
|
return;
|
|
}
|
|
|
|
let lastError: Error | null = null;
|
|
|
|
for (let attempt = 1; attempt <= this.options.retryAttempts!; attempt++) {
|
|
try {
|
|
this.logger.info(
|
|
`Connecting to PostgreSQL (attempt ${attempt}/${this.options.retryAttempts})...`
|
|
);
|
|
|
|
this.pool = new Pool(this.buildPoolConfig());
|
|
|
|
// Test the connection
|
|
const client = await this.pool.connect();
|
|
await client.query('SELECT 1');
|
|
client.release();
|
|
|
|
this.isConnected = true;
|
|
this.logger.info('Successfully connected to PostgreSQL');
|
|
|
|
// Start health monitoring
|
|
this.healthMonitor.start();
|
|
|
|
// Setup error handlers
|
|
this.setupErrorHandlers();
|
|
|
|
return;
|
|
} catch (error) {
|
|
lastError = error as Error;
|
|
this.logger.error(`PostgreSQL connection attempt ${attempt} failed:`, error);
|
|
|
|
if (this.pool) {
|
|
await this.pool.end();
|
|
this.pool = null;
|
|
}
|
|
|
|
if (attempt < this.options.retryAttempts!) {
|
|
await this.delay(this.options.retryDelay! * attempt);
|
|
}
|
|
}
|
|
}
|
|
|
|
throw new Error(
|
|
`Failed to connect to PostgreSQL after ${this.options.retryAttempts} attempts: ${lastError?.message}`
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Disconnect from PostgreSQL
|
|
*/
|
|
async disconnect(): Promise<void> {
|
|
if (!this.pool) {
|
|
return;
|
|
}
|
|
|
|
try {
|
|
this.healthMonitor.stop();
|
|
await this.pool.end();
|
|
this.isConnected = false;
|
|
this.pool = null;
|
|
this.logger.info('Disconnected from PostgreSQL');
|
|
} catch (error) {
|
|
this.logger.error('Error disconnecting from PostgreSQL:', error);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Execute a query
|
|
*/
|
|
async query<T extends QueryResultRow = any>(
|
|
text: string,
|
|
params?: any[]
|
|
): Promise<QueryResult<T>> {
|
|
if (!this.pool) {
|
|
throw new Error('PostgreSQL client not connected');
|
|
}
|
|
|
|
const startTime = Date.now();
|
|
|
|
try {
|
|
const result = await this.pool.query<T>(text, params);
|
|
const executionTime = Date.now() - startTime;
|
|
|
|
this.logger.debug(`Query executed in ${executionTime}ms`, {
|
|
query: text.substring(0, 100),
|
|
params: params?.length,
|
|
});
|
|
|
|
return {
|
|
...result,
|
|
executionTime,
|
|
} as QueryResult<T>;
|
|
} catch (error) {
|
|
const executionTime = Date.now() - startTime;
|
|
this.logger.error(`Query failed after ${executionTime}ms:`, {
|
|
error,
|
|
query: text,
|
|
params,
|
|
});
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Execute multiple queries in a transaction
|
|
*/
|
|
async transaction<T>(callback: TransactionCallback<T>): Promise<T> {
|
|
return await this.transactionManager.execute(callback);
|
|
}
|
|
|
|
/**
|
|
* Get a query builder instance
|
|
*/
|
|
queryBuilder(): PostgreSQLQueryBuilder {
|
|
return new PostgreSQLQueryBuilder(this);
|
|
}
|
|
|
|
/**
|
|
* Create a new query builder with SELECT
|
|
*/
|
|
select(columns: string | string[] = '*'): PostgreSQLQueryBuilder {
|
|
return this.queryBuilder().select(columns);
|
|
}
|
|
|
|
/**
|
|
* Create a new query builder with INSERT
|
|
*/
|
|
insert(table: string): PostgreSQLQueryBuilder {
|
|
return this.queryBuilder().insert(table);
|
|
}
|
|
|
|
/**
|
|
* Create a new query builder with UPDATE
|
|
*/
|
|
update(table: string): PostgreSQLQueryBuilder {
|
|
return this.queryBuilder().update(table);
|
|
}
|
|
|
|
/**
|
|
* Create a new query builder with DELETE
|
|
*/
|
|
delete(table: string): PostgreSQLQueryBuilder {
|
|
return this.queryBuilder().delete(table);
|
|
}
|
|
|
|
/**
|
|
* Execute a stored procedure or function
|
|
*/
|
|
async callFunction<T extends QueryResultRow = any>(
|
|
functionName: string,
|
|
params?: any[]
|
|
): Promise<QueryResult<T>> {
|
|
const placeholders = params ? params.map((_, i) => `$${i + 1}`).join(', ') : '';
|
|
const query = `SELECT * FROM ${functionName}(${placeholders})`;
|
|
return await this.query<T>(query, params);
|
|
}
|
|
|
|
/**
|
|
* Batch upsert operation for high-performance inserts/updates
|
|
*/
|
|
async batchUpsert(
|
|
tableName: string,
|
|
data: Record<string, unknown>[],
|
|
conflictColumn: string,
|
|
options: {
|
|
chunkSize?: number;
|
|
excludeColumns?: string[];
|
|
} = {}
|
|
): Promise<{ insertedCount: number; updatedCount: number }> {
|
|
if (!this.pool) {
|
|
throw new Error('PostgreSQL client not connected');
|
|
}
|
|
|
|
if (data.length === 0) {
|
|
return { insertedCount: 0, updatedCount: 0 };
|
|
}
|
|
|
|
const { chunkSize = 1000, excludeColumns = [] } = options;
|
|
const columns = Object.keys(data[0] ?? {}).filter(col => !excludeColumns.includes(col));
|
|
const updateColumns = columns.filter(col => col !== conflictColumn);
|
|
|
|
let totalInserted = 0;
|
|
let totalUpdated = 0;
|
|
|
|
// Process in chunks to avoid memory issues and parameter limits
|
|
for (let i = 0; i < data.length; i += chunkSize) {
|
|
const chunk = data.slice(i, i + chunkSize);
|
|
|
|
// Build placeholders for this chunk
|
|
const placeholders = chunk.map((_, rowIndex) => {
|
|
const rowPlaceholders = columns.map((_, colIndex) => {
|
|
return `$${rowIndex * columns.length + colIndex + 1}`;
|
|
});
|
|
return `(${rowPlaceholders.join(', ')})`;
|
|
});
|
|
|
|
// Flatten the chunk data
|
|
const values = chunk.flatMap(row => columns.map(col => row[col as keyof typeof row]));
|
|
|
|
// Build the upsert query
|
|
const updateClauses = updateColumns.map(col => `${col} = EXCLUDED.${col}`);
|
|
const query = `
|
|
INSERT INTO ${tableName} (${columns.join(', ')})
|
|
VALUES ${placeholders.join(', ')}
|
|
ON CONFLICT (${conflictColumn})
|
|
DO UPDATE SET
|
|
${updateClauses.join(', ')},
|
|
updated_at = NOW()
|
|
RETURNING (xmax = 0) AS is_insert
|
|
`;
|
|
|
|
try {
|
|
const startTime = Date.now();
|
|
const result = await this.pool.query(query, values);
|
|
const executionTime = Date.now() - startTime;
|
|
|
|
// Count inserts vs updates
|
|
const inserted = result.rows.filter((row: { is_insert: boolean }) => row.is_insert).length;
|
|
const updated = result.rows.length - inserted;
|
|
|
|
totalInserted += inserted;
|
|
totalUpdated += updated;
|
|
|
|
this.logger.debug(`Batch upsert chunk processed in ${executionTime}ms`, {
|
|
chunkSize: chunk.length,
|
|
inserted,
|
|
updated,
|
|
table: tableName,
|
|
});
|
|
} catch (error) {
|
|
this.logger.error(`Batch upsert failed on chunk ${Math.floor(i / chunkSize) + 1}:`, {
|
|
error,
|
|
table: tableName,
|
|
chunkStart: i,
|
|
chunkSize: chunk.length,
|
|
});
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
this.logger.info('Batch upsert completed', {
|
|
table: tableName,
|
|
totalRecords: data.length,
|
|
inserted: totalInserted,
|
|
updated: totalUpdated,
|
|
});
|
|
|
|
return { insertedCount: totalInserted, updatedCount: totalUpdated };
|
|
}
|
|
|
|
/**
|
|
* Check if a table exists
|
|
*/
|
|
async tableExists(tableName: string, schemaName: string = 'public'): Promise<boolean> {
|
|
const result = await this.query(
|
|
`SELECT EXISTS (
|
|
SELECT FROM information_schema.tables
|
|
WHERE table_schema = $1 AND table_name = $2
|
|
)`,
|
|
[schemaName, tableName]
|
|
);
|
|
return result.rows[0].exists;
|
|
}
|
|
|
|
/**
|
|
* Get table schema information
|
|
*/
|
|
async getTableSchema(tableName: string, schemaName: string = 'public'): Promise<any[]> {
|
|
const result = await this.query(
|
|
`SELECT
|
|
column_name,
|
|
data_type,
|
|
is_nullable,
|
|
column_default,
|
|
character_maximum_length
|
|
FROM information_schema.columns
|
|
WHERE table_schema = $1 AND table_name = $2
|
|
ORDER BY ordinal_position`,
|
|
[schemaName, tableName]
|
|
);
|
|
return result.rows;
|
|
}
|
|
|
|
/**
|
|
* Execute EXPLAIN for query analysis
|
|
*/
|
|
async explain(query: string, params?: any[]): Promise<any[]> {
|
|
const explainQuery = `EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) ${query}`;
|
|
const result = await this.query(explainQuery, params);
|
|
return result.rows[0]['QUERY PLAN'];
|
|
}
|
|
|
|
/**
|
|
* Get database statistics
|
|
*/
|
|
async getStats(): Promise<any> {
|
|
const result = await this.query(`
|
|
SELECT
|
|
(SELECT count(*) FROM pg_stat_activity WHERE state = 'active') as active_connections,
|
|
(SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') as idle_connections,
|
|
(SELECT setting FROM pg_settings WHERE name = 'max_connections') as max_connections,
|
|
pg_size_pretty(pg_database_size(current_database())) as database_size
|
|
`);
|
|
return result.rows[0];
|
|
}
|
|
|
|
/**
|
|
* Check if client is connected
|
|
*/
|
|
get connected(): boolean {
|
|
return this.isConnected && !!this.pool;
|
|
}
|
|
|
|
/**
|
|
* Get the underlying connection pool
|
|
*/
|
|
get connectionPool(): Pool | null {
|
|
return this.pool;
|
|
}
|
|
|
|
|
|
private buildPoolConfig(): any {
|
|
return {
|
|
host: this.config.host,
|
|
port: this.config.port,
|
|
database: this.config.database,
|
|
user: this.config.username,
|
|
password: this.config.password,
|
|
min: this.config.poolSettings?.min,
|
|
max: this.config.poolSettings?.max,
|
|
idleTimeoutMillis: this.config.poolSettings?.idleTimeoutMillis,
|
|
connectionTimeoutMillis: this.config.timeouts?.connection,
|
|
query_timeout: this.config.timeouts?.query,
|
|
statement_timeout: this.config.timeouts?.statement,
|
|
lock_timeout: this.config.timeouts?.lock,
|
|
idle_in_transaction_session_timeout: this.config.timeouts?.idleInTransaction,
|
|
ssl: this.config.ssl?.enabled
|
|
? {
|
|
rejectUnauthorized: this.config.ssl.rejectUnauthorized,
|
|
}
|
|
: false,
|
|
};
|
|
}
|
|
|
|
private setupErrorHandlers(): void {
|
|
if (!this.pool) {
|
|
return;
|
|
}
|
|
|
|
this.pool.on('error', error => {
|
|
this.logger.error('PostgreSQL pool error:', error);
|
|
});
|
|
|
|
this.pool.on('connect', () => {
|
|
this.logger.debug('New PostgreSQL client connected');
|
|
});
|
|
|
|
this.pool.on('remove', () => {
|
|
this.logger.debug('PostgreSQL client removed from pool');
|
|
});
|
|
}
|
|
|
|
private delay(ms: number): Promise<void> {
|
|
return new Promise(resolve => setTimeout(resolve, ms));
|
|
}
|
|
}
|