stock-bot/libs/questdb-client/src/client.ts

476 lines
12 KiB
TypeScript

import { Pool } from 'pg';
import { questdbConfig } from '@stock-bot/config';
import { getLogger } from '@stock-bot/logger';
import { QuestDBHealthMonitor } from './health';
import { QuestDBInfluxWriter } from './influx-writer';
import { QuestDBQueryBuilder } from './query-builder';
import { QuestDBSchemaManager } from './schema';
import type {
BaseTimeSeriesData,
InsertResult,
QueryResult,
QuestDBClientConfig,
QuestDBConnectionOptions,
TableNames,
} from './types';
/**
 * QuestDB client for Stock Bot.
 *
 * Owns a PostgreSQL-wire connection pool for SQL queries and exposes helper
 * objects (health monitor, InfluxDB line-protocol writer, schema manager,
 * query builder) that are constructed with a reference to this client.
 */
export class QuestDBClient {
  /** PostgreSQL type OID -> QuestDB type name used in query result metadata. */
  private static readonly PG_TYPE_NAMES: Readonly<Record<number, string>> = {
    16: 'BOOLEAN',
    20: 'LONG',
    21: 'INT',
    23: 'INT',
    25: 'STRING',
    700: 'FLOAT',
    701: 'DOUBLE',
    1043: 'STRING',
    1082: 'DATE',
    1114: 'TIMESTAMP',
    1184: 'TIMESTAMP',
  };

  private pgPool: Pool | null = null;
  private readonly config: QuestDBClientConfig;
  private readonly options: QuestDBConnectionOptions;
  private readonly logger = getLogger('QuestDBClient');
  private readonly healthMonitor: QuestDBHealthMonitor;
  private readonly influxWriter: QuestDBInfluxWriter;
  private readonly schemaManager: QuestDBSchemaManager;
  private isConnected = false;

  /**
   * @param config  Partial connection settings; missing values fall back to
   *                `questdbConfig`.
   * @param options Connection behaviour overrides (protocol, retry attempts,
   *                retry delay, health-check interval).
   */
  constructor(config?: Partial<QuestDBClientConfig>, options?: QuestDBConnectionOptions) {
    this.config = this.buildConfig(config);
    this.options = {
      protocol: 'pg',
      retryAttempts: 3,
      retryDelay: 1000,
      healthCheckInterval: 30000,
      ...options,
    };
    this.healthMonitor = new QuestDBHealthMonitor(this);
    this.influxWriter = new QuestDBInfluxWriter(this);
    this.schemaManager = new QuestDBSchemaManager(this);
  }

  /**
   * Connect to QuestDB over the PostgreSQL wire protocol.
   *
   * Each attempt creates a pool, probes it with `SELECT 1`, initialises the
   * schema and starts health monitoring. A failed attempt tears the pool down
   * and waits `retryDelay * attempt` ms before the next one. No-op when
   * already connected.
   *
   * @throws Error when every attempt fails; the message carries the last error.
   */
  async connect(): Promise<void> {
    if (this.isConnected) {
      return;
    }

    // Fall back to the constructor defaults when a caller passed an explicit
    // `undefined`, and always make at least one attempt.
    const maxAttempts = Math.max(1, this.options.retryAttempts ?? 3);
    const baseDelay = this.options.retryDelay ?? 1000;
    let lastError: Error | null = null;

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        this.logger.info(`Connecting to QuestDB (attempt ${attempt}/${maxAttempts})...`);

        this.pgPool = new Pool(this.buildPgPoolConfig());

        // Probe the connection. The client must be released even when the
        // probe fails, otherwise ending the pool can wait on it indefinitely.
        const client = await this.pgPool.connect();
        try {
          await client.query('SELECT 1');
        } finally {
          client.release();
        }

        this.isConnected = true;
        this.logger.info('Successfully connected to QuestDB');

        await this.schemaManager.initializeDatabase();
        this.healthMonitor.startMonitoring();
        return;
      } catch (error) {
        lastError = error instanceof Error ? error : new Error(String(error));
        this.logger.error(`QuestDB connection attempt ${attempt} failed:`, error);

        // A failure after the probe (e.g. schema init) must not leave the
        // client flagged as connected with no pool behind it.
        this.isConnected = false;
        try {
          await this.closePool();
        } catch (cleanupError) {
          // Best effort: a cleanup failure must not abort the retry loop.
          this.logger.error('Failed to close QuestDB pool after connection error:', cleanupError);
        }

        if (attempt < maxAttempts) {
          await this.delay(baseDelay * attempt);
        }
      }
    }

    throw new Error(
      `Failed to connect to QuestDB after ${maxAttempts} attempts: ${lastError?.message}`
    );
  }

  /**
   * Disconnect from QuestDB: stop health monitoring and end the pool.
   * No-op when not connected.
   *
   * @throws Rethrows any error raised while stopping monitoring or ending the pool.
   */
  async disconnect(): Promise<void> {
    if (!this.isConnected) {
      return;
    }
    try {
      this.healthMonitor.stopMonitoring();
      await this.closePool();
      this.logger.info('Disconnected from QuestDB');
    } catch (error) {
      this.logger.error('Error disconnecting from QuestDB:', error);
      throw error;
    } finally {
      // Connected only while a pool is still held: once the pool reference is
      // gone (even if ending it failed) a later connect() must be able to run.
      this.isConnected = this.pgPool !== null;
    }
  }

  /**
   * Execute a SQL query through the PostgreSQL pool.
   *
   * @param sql    SQL text.
   * @param params Optional bind parameters passed to the pool unchanged.
   * @returns Rows, row count, wall-clock execution time in ms and column metadata.
   * @throws Error when the client has no pool; rethrows driver errors after logging.
   */
  async query<T = any>(sql: string, params?: any[]): Promise<QueryResult<T>> {
    if (!this.pgPool) {
      throw new Error('QuestDB client not connected');
    }
    const startTime = Date.now();
    try {
      const result = await this.pgPool.query(sql, params);
      const executionTime = Date.now() - startTime;
      this.logger.debug(`Query executed in ${executionTime}ms`, {
        query: sql.substring(0, 100),
        rowCount: result.rowCount,
      });
      return {
        rows: result.rows,
        rowCount: result.rowCount ?? 0,
        executionTime,
        metadata: {
          columns:
            result.fields?.map((field: any) => ({
              name: field.name,
              type: this.mapDataType(field.dataTypeID),
            })) ?? [],
        },
      };
    } catch (error) {
      const executionTime = Date.now() - startTime;
      this.logger.error(`Query failed after ${executionTime}ms:`, {
        error: (error as Error).message,
        query: sql,
        params,
      });
      throw error;
    }
  }

  /**
   * Write OHLCV bars; delegates to the InfluxDB line-protocol writer.
   */
  async writeOHLCV(
    symbol: string,
    exchange: string,
    data: Array<{
      timestamp: Date;
      open: number;
      high: number;
      low: number;
      close: number;
      volume: number;
    }>
  ): Promise<void> {
    return this.influxWriter.writeOHLCV(symbol, exchange, data);
  }

  /**
   * Write market analytics data; delegates to the InfluxDB line-protocol writer.
   */
  async writeMarketAnalytics(
    symbol: string,
    exchange: string,
    analytics: {
      timestamp: Date;
      rsi?: number;
      macd?: number;
      signal?: number;
      histogram?: number;
      bollinger_upper?: number;
      bollinger_lower?: number;
      volume_sma?: number;
    }
  ): Promise<void> {
    return this.influxWriter.writeMarketAnalytics(symbol, exchange, analytics);
  }

  /**
   * Get a new query builder bound to this client.
   */
  queryBuilder(): QuestDBQueryBuilder {
    return new QuestDBQueryBuilder(this);
  }

  /**
   * Create a query builder with the given SELECT columns.
   */
  select(...columns: string[]): QuestDBQueryBuilder {
    return this.queryBuilder().select(...columns);
  }

  /**
   * Create a query builder already pointed at `table`.
   */
  aggregate(table: TableNames): QuestDBQueryBuilder {
    return this.queryBuilder().from(table);
  }

  /**
   * Execute a `SAMPLE BY ... ALIGN TO CALENDAR` query.
   *
   * `columns`, `interval` and `where` are inserted into the SQL text verbatim,
   * so they must come from trusted code; use `params` for values.
   *
   * @param timeColumn Currently unused by the generated SQL; kept for
   *                   interface compatibility.
   */
  async sampleBy<T = any>(
    table: TableNames,
    columns: string[],
    interval: string,
    timeColumn: string = 'timestamp',
    where?: string,
    params?: any[]
  ): Promise<QueryResult<T>> {
    const columnsStr = columns.join(', ');
    const whereClause = where ? `WHERE ${where}` : '';
    const sql = `
      SELECT ${columnsStr}
      FROM ${table}
      ${whereClause}
      SAMPLE BY ${interval}
      ALIGN TO CALENDAR
    `;
    return this.query<T>(sql, params);
  }

  /**
   * Get the latest row per key using `LATEST BY`.
   *
   * `columns` and `keyColumns` are inserted into the SQL text verbatim.
   */
  async latestBy<T = any>(
    table: TableNames,
    columns: string | string[] = '*',
    keyColumns: string | string[] = 'symbol'
  ): Promise<QueryResult<T>> {
    const columnsStr = Array.isArray(columns) ? columns.join(', ') : columns;
    const keyColumnsStr = Array.isArray(keyColumns) ? keyColumns.join(', ') : keyColumns;
    const sql = `
      SELECT ${columnsStr}
      FROM ${table}
      LATEST BY ${keyColumnsStr}
    `;
    return this.query<T>(sql);
  }

  /**
   * Execute an `ASOF JOIN` between two tables.
   *
   * `joinCondition`, `columns` and `where` are inserted into the SQL text
   * verbatim; use `params` for values.
   */
  async asofJoin<T = any>(
    leftTable: TableNames,
    rightTable: TableNames,
    joinCondition: string,
    columns?: string[],
    where?: string,
    params?: any[]
  ): Promise<QueryResult<T>> {
    const columnsStr = columns ? columns.join(', ') : '*';
    const whereClause = where ? `WHERE ${where}` : '';
    const sql = `
      SELECT ${columnsStr}
      FROM ${leftTable}
      ASOF JOIN ${rightTable} ON ${joinCondition}
      ${whereClause}
    `;
    return this.query<T>(sql, params);
  }

  /**
   * Get per-table statistics from `tables()`, excluding `sys.%` tables.
   *
   * NOTE(review): the selected column names (row_count, partition_count,
   * size_bytes) must match what the deployed QuestDB version returns from
   * `tables()` — confirm against the server.
   */
  async getStats(): Promise<any> {
    const result = await this.query(`
      SELECT
      table_name,
      row_count,
      partition_count,
      size_bytes
      FROM tables()
      WHERE table_name NOT LIKE 'sys.%'
      ORDER BY row_count DESC
    `);
    return result.rows;
  }

  /**
   * Get column metadata for a table via QuestDB's `table_columns()` function.
   *
   * The name is embedded as a SQL string literal with single quotes doubled,
   * because node-postgres does not understand `?` placeholders and
   * `table_columns` is a function rather than a queryable table.
   */
  async getTableInfo(tableName: string): Promise<any> {
    const quotedName = tableName.replace(/'/g, "''");
    const result = await this.query(`SELECT * FROM table_columns('${quotedName}')`);
    return result.rows;
  }

  /**
   * Check whether a pool exists and has not been ended.
   */
  isPgPoolHealthy(): boolean {
    return this.pgPool !== null && !this.pgPool.ended;
  }

  /**
   * Get the HTTP endpoint URL (https when TLS is enabled).
   */
  getHttpUrl(): string {
    const protocol = this.config.tls?.enabled ? 'https' : 'http';
    return `${protocol}://${this.config.host}:${this.config.httpPort}`;
  }

  /**
   * Get the InfluxDB endpoint URL (https when TLS is enabled).
   */
  getInfluxUrl(): string {
    const protocol = this.config.tls?.enabled ? 'https' : 'http';
    return `${protocol}://${this.config.host}:${this.config.influxPort}`;
  }

  /**
   * Get the health monitor instance.
   */
  getHealthMonitor(): QuestDBHealthMonitor {
    return this.healthMonitor;
  }

  /**
   * Get the schema manager instance.
   */
  getSchemaManager(): QuestDBSchemaManager {
    return this.schemaManager;
  }

  /**
   * Get the InfluxDB writer instance.
   */
  getInfluxWriter(): QuestDBInfluxWriter {
    return this.influxWriter;
  }

  /**
   * Run `VACUUM TABLE` on the given table.
   *
   * `tableName` is inserted into the SQL text verbatim.
   */
  async optimizeTable(tableName: string): Promise<void> {
    await this.query(`VACUUM TABLE ${tableName}`);
    this.logger.info(`Optimized table: ${tableName}`);
  }

  /**
   * Create a table (if absent) with a designated timestamp and partitioning.
   *
   * All arguments are inserted into the SQL text verbatim.
   *
   * @param columns Column definitions, i.e. the text between the parentheses.
   */
  async createTable(
    tableName: string,
    columns: string,
    partitionBy: string = 'DAY',
    timestampColumn: string = 'timestamp'
  ): Promise<void> {
    const sql = `
      CREATE TABLE IF NOT EXISTS ${tableName} (
      ${columns}
      ) TIMESTAMP(${timestampColumn}) PARTITION BY ${partitionBy}
    `;
    await this.query(sql);
    this.logger.info(`Created table: ${tableName}`);
  }

  /**
   * Whether the client is flagged as connected and still holds a pool.
   */
  get connected(): boolean {
    return this.isConnected && !!this.pgPool;
  }

  /**
   * The PostgreSQL connection pool, or null when not connected.
   */
  get connectionPool(): Pool | null {
    return this.pgPool;
  }

  /**
   * A shallow copy of the resolved configuration.
   */
  get configuration(): QuestDBClientConfig {
    return { ...this.config };
  }

  /**
   * Resolve the effective configuration.
   *
   * Caller values win over `questdbConfig` defaults, but a key that is absent
   * or explicitly `undefined` falls back to the default. `tls` and `timeouts`
   * are merged field by field so a partial override keeps the other defaults.
   */
  private buildConfig(config?: Partial<QuestDBClientConfig>): QuestDBClientConfig {
    return {
      // Spread first so any extra keys pass through while the explicitly
      // resolved fields below take precedence over raw (possibly undefined) ones.
      ...config,
      host: config?.host ?? questdbConfig.QUESTDB_HOST,
      httpPort: config?.httpPort ?? questdbConfig.QUESTDB_HTTP_PORT,
      pgPort: config?.pgPort ?? questdbConfig.QUESTDB_PG_PORT,
      influxPort: config?.influxPort ?? questdbConfig.QUESTDB_INFLUX_PORT,
      user: config?.user ?? questdbConfig.QUESTDB_USER,
      password: config?.password ?? questdbConfig.QUESTDB_PASSWORD,
      database: config?.database ?? questdbConfig.QUESTDB_DEFAULT_DATABASE,
      tls: {
        ...config?.tls,
        enabled: config?.tls?.enabled ?? questdbConfig.QUESTDB_TLS_ENABLED,
        verifyServerCert:
          config?.tls?.verifyServerCert ?? questdbConfig.QUESTDB_TLS_VERIFY_SERVER_CERT,
      },
      timeouts: {
        ...config?.timeouts,
        connection: config?.timeouts?.connection ?? questdbConfig.QUESTDB_CONNECTION_TIMEOUT,
        request: config?.timeouts?.request ?? questdbConfig.QUESTDB_REQUEST_TIMEOUT,
      },
      retryAttempts: config?.retryAttempts ?? questdbConfig.QUESTDB_RETRY_ATTEMPTS,
    };
  }

  /**
   * Translate the resolved configuration into node-postgres pool options.
   */
  private buildPgPoolConfig(): any {
    return {
      host: this.config.host,
      port: this.config.pgPort,
      database: this.config.database,
      user: this.config.user,
      password: this.config.password,
      connectionTimeoutMillis: this.config.timeouts?.connection,
      query_timeout: this.config.timeouts?.request,
      ssl: this.config.tls?.enabled
        ? {
            rejectUnauthorized: this.config.tls.verifyServerCert,
          }
        : false,
      min: 2,
      max: 10,
    };
  }

  /**
   * Map a PostgreSQL type OID to a QuestDB type name; unknown OIDs map to 'STRING'.
   */
  private mapDataType(typeId: number): string {
    return QuestDBClient.PG_TYPE_NAMES[typeId] ?? 'STRING';
  }

  /**
   * End the current pool, if any. The reference is dropped before `end()` is
   * awaited so a failing `end()` never leaves an unusable pool attached.
   */
  private async closePool(): Promise<void> {
    const pool = this.pgPool;
    this.pgPool = null;
    if (pool) {
      await pool.end();
    }
  }

  /** Resolve after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}