import { Pool } from 'pg'; import { questdbConfig } from '@stock-bot/config'; import { getLogger } from '@stock-bot/logger'; import { QuestDBHealthMonitor } from './health'; import { QuestDBInfluxWriter } from './influx-writer'; import { QuestDBQueryBuilder } from './query-builder'; import { QuestDBSchemaManager } from './schema'; import type { BaseTimeSeriesData, InsertResult, QueryResult, QuestDBClientConfig, QuestDBConnectionOptions, TableNames, } from './types'; /** * QuestDB Client for Stock Bot * * Provides high-performance time-series data access with support for * multiple protocols (HTTP, PostgreSQL, InfluxDB Line Protocol). */ export class QuestDBClient { private pgPool: Pool | null = null; private readonly config: QuestDBClientConfig; private readonly options: QuestDBConnectionOptions; private readonly logger = getLogger('QuestDBClient'); private readonly healthMonitor: QuestDBHealthMonitor; private readonly influxWriter: QuestDBInfluxWriter; private readonly schemaManager: QuestDBSchemaManager; private isConnected = false; constructor(config?: Partial, options?: QuestDBConnectionOptions) { this.config = this.buildConfig(config); this.options = { protocol: 'pg', retryAttempts: 3, retryDelay: 1000, healthCheckInterval: 30000, ...options, }; this.healthMonitor = new QuestDBHealthMonitor(this); this.influxWriter = new QuestDBInfluxWriter(this); this.schemaManager = new QuestDBSchemaManager(this); } /** * Connect to QuestDB */ async connect(): Promise { if (this.isConnected) { return; } let lastError: Error | null = null; for (let attempt = 1; attempt <= this.options.retryAttempts!; attempt++) { try { this.logger.info( `Connecting to QuestDB (attempt ${attempt}/${this.options.retryAttempts})...` ); // Connect via PostgreSQL wire protocol this.pgPool = new Pool(this.buildPgPoolConfig()); // Test the connection const client = await this.pgPool.connect(); await client.query('SELECT 1'); client.release(); this.isConnected = true; this.logger.info('Successfully connected to QuestDB'); // Initialize schema await this.schemaManager.initializeDatabase(); // Start health monitoring this.healthMonitor.startMonitoring(); return; } catch (error) { lastError = error as Error; this.logger.error(`QuestDB connection attempt ${attempt} failed:`, error); if (this.pgPool) { await this.pgPool.end(); this.pgPool = null; } if (attempt < this.options.retryAttempts!) { await this.delay(this.options.retryDelay! * attempt); } } } throw new Error( `Failed to connect to QuestDB after ${this.options.retryAttempts} attempts: ${lastError?.message}` ); } /** * Disconnect from QuestDB */ async disconnect(): Promise { if (!this.isConnected) { return; } try { this.healthMonitor.stopMonitoring(); if (this.pgPool) { await this.pgPool.end(); this.pgPool = null; } this.isConnected = false; this.logger.info('Disconnected from QuestDB'); } catch (error) { this.logger.error('Error disconnecting from QuestDB:', error); throw error; } } /** * Execute a SQL query */ async query(sql: string, params?: any[]): Promise> { if (!this.pgPool) { throw new Error('QuestDB client not connected'); } const startTime = Date.now(); try { const result = await this.pgPool.query(sql, params); const executionTime = Date.now() - startTime; this.logger.debug(`Query executed in ${executionTime}ms`, { query: sql.substring(0, 100), rowCount: result.rowCount, }); return { rows: result.rows, rowCount: result.rowCount || 0, executionTime, metadata: { columns: result.fields?.map((field: any) => ({ name: field.name, type: this.mapDataType(field.dataTypeID), })) || [], }, }; } catch (error) { const executionTime = Date.now() - startTime; this.logger.error(`Query failed after ${executionTime}ms:`, { error: (error as Error).message, query: sql, params, }); throw error; } } /** * Write OHLCV data using InfluxDB Line Protocol */ async writeOHLCV( symbol: string, exchange: string, data: Array<{ timestamp: Date; open: number; high: number; low: number; close: number; volume: number; }> ): Promise { return await this.influxWriter.writeOHLCV(symbol, exchange, data); } /** * Write market analytics data */ async writeMarketAnalytics( symbol: string, exchange: string, analytics: { timestamp: Date; rsi?: number; macd?: number; signal?: number; histogram?: number; bollinger_upper?: number; bollinger_lower?: number; volume_sma?: number; } ): Promise { return await this.influxWriter.writeMarketAnalytics(symbol, exchange, analytics); } /** * Get a query builder instance */ queryBuilder(): QuestDBQueryBuilder { return new QuestDBQueryBuilder(this); } /** * Create a SELECT query builder */ select(...columns: string[]): QuestDBQueryBuilder { return this.queryBuilder().select(...columns); } /** * Create an aggregation query builder */ aggregate(table: TableNames): QuestDBQueryBuilder { return this.queryBuilder().from(table); } /** * Execute a time-series specific query with SAMPLE BY */ async sampleBy( table: TableNames, columns: string[], interval: string, timeColumn: string = 'timestamp', where?: string, params?: any[] ): Promise> { const columnsStr = columns.join(', '); const whereClause = where ? `WHERE ${where}` : ''; const sql = ` SELECT ${columnsStr} FROM ${table} ${whereClause} SAMPLE BY ${interval} ALIGN TO CALENDAR `; return await this.query(sql, params); } /** * Get latest values by symbol using LATEST BY */ async latestBy( table: TableNames, columns: string | string[] = '*', keyColumns: string | string[] = 'symbol' ): Promise> { const columnsStr = Array.isArray(columns) ? columns.join(', ') : columns; const keyColumnsStr = Array.isArray(keyColumns) ? keyColumns.join(', ') : keyColumns; const sql = ` SELECT ${columnsStr} FROM ${table} LATEST BY ${keyColumnsStr} `; return await this.query(sql); } /** * Execute ASOF JOIN for time-series correlation */ async asofJoin( leftTable: TableNames, rightTable: TableNames, joinCondition: string, columns?: string[], where?: string, params?: any[] ): Promise> { const columnsStr = columns ? columns.join(', ') : '*'; const whereClause = where ? `WHERE ${where}` : ''; const sql = ` SELECT ${columnsStr} FROM ${leftTable} ASOF JOIN ${rightTable} ON ${joinCondition} ${whereClause} `; return await this.query(sql, params); } /** * Get database statistics */ async getStats(): Promise { const result = await this.query(` SELECT table_name, row_count, partition_count, size_bytes FROM tables() WHERE table_name NOT LIKE 'sys.%' ORDER BY row_count DESC `); return result.rows; } /** * Get table information */ async getTableInfo(tableName: string): Promise { const result = await this.query(`SELECT * FROM table_columns WHERE table_name = ?`, [ tableName, ]); return result.rows; } /** * Check if PostgreSQL pool is healthy */ isPgPoolHealthy(): boolean { return this.pgPool !== null && !this.pgPool.ended; } /** * Get HTTP endpoint URL */ getHttpUrl(): string { const protocol = this.config.tls?.enabled ? 'https' : 'http'; return `${protocol}://${this.config.host}:${this.config.httpPort}`; } /** * Get InfluxDB endpoint URL */ getInfluxUrl(): string { const protocol = this.config.tls?.enabled ? 'https' : 'http'; return `${protocol}://${this.config.host}:${this.config.influxPort}`; } /** * Get health monitor instance */ getHealthMonitor(): QuestDBHealthMonitor { return this.healthMonitor; } /** * Get schema manager instance */ getSchemaManager(): QuestDBSchemaManager { return this.schemaManager; } /** * Get InfluxDB writer instance */ getInfluxWriter(): QuestDBInfluxWriter { return this.influxWriter; } /** * Optimize table by rebuilding partitions */ async optimizeTable(tableName: string): Promise { await this.query(`VACUUM TABLE ${tableName}`); this.logger.info(`Optimized table: ${tableName}`); } /** * Create a table with time-series optimizations */ async createTable( tableName: string, columns: string, partitionBy: string = 'DAY', timestampColumn: string = 'timestamp' ): Promise { const sql = ` CREATE TABLE IF NOT EXISTS ${tableName} ( ${columns} ) TIMESTAMP(${timestampColumn}) PARTITION BY ${partitionBy} `; await this.query(sql); this.logger.info(`Created table: ${tableName}`); } /** * Check if client is connected */ get connected(): boolean { return this.isConnected && !!this.pgPool; } /** * Get the PostgreSQL connection pool */ get connectionPool(): Pool | null { return this.pgPool; } /** * Get configuration */ get configuration(): QuestDBClientConfig { return { ...this.config }; } private buildConfig(config?: Partial): QuestDBClientConfig { return { host: config?.host || questdbConfig.QUESTDB_HOST, httpPort: config?.httpPort || questdbConfig.QUESTDB_HTTP_PORT, pgPort: config?.pgPort || questdbConfig.QUESTDB_PG_PORT, influxPort: config?.influxPort || questdbConfig.QUESTDB_INFLUX_PORT, user: config?.user || questdbConfig.QUESTDB_USER, password: config?.password || questdbConfig.QUESTDB_PASSWORD, database: config?.database || questdbConfig.QUESTDB_DEFAULT_DATABASE, tls: { enabled: questdbConfig.QUESTDB_TLS_ENABLED, verifyServerCert: questdbConfig.QUESTDB_TLS_VERIFY_SERVER_CERT, ...config?.tls, }, timeouts: { connection: questdbConfig.QUESTDB_CONNECTION_TIMEOUT, request: questdbConfig.QUESTDB_REQUEST_TIMEOUT, ...config?.timeouts, }, retryAttempts: questdbConfig.QUESTDB_RETRY_ATTEMPTS, ...config, }; } private buildPgPoolConfig(): any { return { host: this.config.host, port: this.config.pgPort, database: this.config.database, user: this.config.user, password: this.config.password, connectionTimeoutMillis: this.config.timeouts?.connection, query_timeout: this.config.timeouts?.request, ssl: this.config.tls?.enabled ? { rejectUnauthorized: this.config.tls.verifyServerCert, } : false, min: 2, max: 10, }; } private mapDataType(typeId: number): string { // Map PostgreSQL type IDs to QuestDB types const typeMap: Record = { 16: 'BOOLEAN', 20: 'LONG', 21: 'INT', 23: 'INT', 25: 'STRING', 700: 'FLOAT', 701: 'DOUBLE', 1043: 'STRING', 1082: 'DATE', 1114: 'TIMESTAMP', 1184: 'TIMESTAMP', }; return typeMap[typeId] || 'STRING'; } private delay(ms: number): Promise { return new Promise(resolve => setTimeout(resolve, ms)); } }