476 lines
12 KiB
TypeScript
476 lines
12 KiB
TypeScript
import { Pool } from 'pg';
|
|
import { questdbConfig } from '@stock-bot/config';
|
|
import { getLogger } from '@stock-bot/logger';
|
|
import { QuestDBHealthMonitor } from './health';
|
|
import { QuestDBInfluxWriter } from './influx-writer';
|
|
import { QuestDBQueryBuilder } from './query-builder';
|
|
import { QuestDBSchemaManager } from './schema';
|
|
import type {
|
|
BaseTimeSeriesData,
|
|
InsertResult,
|
|
QueryResult,
|
|
QuestDBClientConfig,
|
|
QuestDBConnectionOptions,
|
|
TableNames,
|
|
} from './types';
|
|
|
|
/**
|
|
* QuestDB Client for Stock Bot
|
|
*
|
|
* Provides high-performance time-series data access with support for
|
|
* multiple protocols (HTTP, PostgreSQL, InfluxDB Line Protocol).
|
|
*/
|
|
export class QuestDBClient {
|
|
private pgPool: Pool | null = null;
|
|
private readonly config: QuestDBClientConfig;
|
|
private readonly options: QuestDBConnectionOptions;
|
|
private readonly logger = getLogger('QuestDBClient');
|
|
private readonly healthMonitor: QuestDBHealthMonitor;
|
|
private readonly influxWriter: QuestDBInfluxWriter;
|
|
private readonly schemaManager: QuestDBSchemaManager;
|
|
private isConnected = false;
|
|
|
|
constructor(config?: Partial<QuestDBClientConfig>, options?: QuestDBConnectionOptions) {
|
|
this.config = this.buildConfig(config);
|
|
this.options = {
|
|
protocol: 'pg',
|
|
retryAttempts: 3,
|
|
retryDelay: 1000,
|
|
healthCheckInterval: 30000,
|
|
...options,
|
|
};
|
|
|
|
this.healthMonitor = new QuestDBHealthMonitor(this);
|
|
this.influxWriter = new QuestDBInfluxWriter(this);
|
|
this.schemaManager = new QuestDBSchemaManager(this);
|
|
}
|
|
|
|
/**
|
|
* Connect to QuestDB
|
|
*/
|
|
async connect(): Promise<void> {
|
|
if (this.isConnected) {
|
|
return;
|
|
}
|
|
|
|
let lastError: Error | null = null;
|
|
|
|
for (let attempt = 1; attempt <= this.options.retryAttempts!; attempt++) {
|
|
try {
|
|
this.logger.info(
|
|
`Connecting to QuestDB (attempt ${attempt}/${this.options.retryAttempts})...`
|
|
);
|
|
|
|
// Connect via PostgreSQL wire protocol
|
|
this.pgPool = new Pool(this.buildPgPoolConfig());
|
|
|
|
// Test the connection
|
|
const client = await this.pgPool.connect();
|
|
await client.query('SELECT 1');
|
|
client.release();
|
|
|
|
this.isConnected = true;
|
|
this.logger.info('Successfully connected to QuestDB');
|
|
// Initialize schema
|
|
await this.schemaManager.initializeDatabase();
|
|
|
|
// Start health monitoring
|
|
this.healthMonitor.startMonitoring();
|
|
|
|
return;
|
|
} catch (error) {
|
|
lastError = error as Error;
|
|
this.logger.error(`QuestDB connection attempt ${attempt} failed:`, error);
|
|
|
|
if (this.pgPool) {
|
|
await this.pgPool.end();
|
|
this.pgPool = null;
|
|
}
|
|
|
|
if (attempt < this.options.retryAttempts!) {
|
|
await this.delay(this.options.retryDelay! * attempt);
|
|
}
|
|
}
|
|
}
|
|
|
|
throw new Error(
|
|
`Failed to connect to QuestDB after ${this.options.retryAttempts} attempts: ${lastError?.message}`
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Disconnect from QuestDB
|
|
*/
|
|
async disconnect(): Promise<void> {
|
|
if (!this.isConnected) {
|
|
return;
|
|
}
|
|
try {
|
|
this.healthMonitor.stopMonitoring();
|
|
|
|
if (this.pgPool) {
|
|
await this.pgPool.end();
|
|
this.pgPool = null;
|
|
}
|
|
|
|
this.isConnected = false;
|
|
this.logger.info('Disconnected from QuestDB');
|
|
} catch (error) {
|
|
this.logger.error('Error disconnecting from QuestDB:', error);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Execute a SQL query
|
|
*/
|
|
async query<T = any>(sql: string, params?: any[]): Promise<QueryResult<T>> {
|
|
if (!this.pgPool) {
|
|
throw new Error('QuestDB client not connected');
|
|
}
|
|
|
|
const startTime = Date.now();
|
|
|
|
try {
|
|
const result = await this.pgPool.query(sql, params);
|
|
const executionTime = Date.now() - startTime;
|
|
|
|
this.logger.debug(`Query executed in ${executionTime}ms`, {
|
|
query: sql.substring(0, 100),
|
|
rowCount: result.rowCount,
|
|
});
|
|
|
|
return {
|
|
rows: result.rows,
|
|
rowCount: result.rowCount || 0,
|
|
executionTime,
|
|
metadata: {
|
|
columns:
|
|
result.fields?.map((field: any) => ({
|
|
name: field.name,
|
|
type: this.mapDataType(field.dataTypeID),
|
|
})) || [],
|
|
},
|
|
};
|
|
} catch (error) {
|
|
const executionTime = Date.now() - startTime;
|
|
this.logger.error(`Query failed after ${executionTime}ms:`, {
|
|
error: (error as Error).message,
|
|
query: sql,
|
|
params,
|
|
});
|
|
throw error;
|
|
}
|
|
}
|
|
/**
|
|
* Write OHLCV data using InfluxDB Line Protocol
|
|
*/
|
|
async writeOHLCV(
|
|
symbol: string,
|
|
exchange: string,
|
|
data: Array<{
|
|
timestamp: Date;
|
|
open: number;
|
|
high: number;
|
|
low: number;
|
|
close: number;
|
|
volume: number;
|
|
}>
|
|
): Promise<void> {
|
|
return await this.influxWriter.writeOHLCV(symbol, exchange, data);
|
|
}
|
|
|
|
/**
|
|
* Write market analytics data
|
|
*/
|
|
async writeMarketAnalytics(
|
|
symbol: string,
|
|
exchange: string,
|
|
analytics: {
|
|
timestamp: Date;
|
|
rsi?: number;
|
|
macd?: number;
|
|
signal?: number;
|
|
histogram?: number;
|
|
bollinger_upper?: number;
|
|
bollinger_lower?: number;
|
|
volume_sma?: number;
|
|
}
|
|
): Promise<void> {
|
|
return await this.influxWriter.writeMarketAnalytics(symbol, exchange, analytics);
|
|
}
|
|
|
|
/**
|
|
* Get a query builder instance
|
|
*/
|
|
queryBuilder(): QuestDBQueryBuilder {
|
|
return new QuestDBQueryBuilder(this);
|
|
}
|
|
/**
|
|
* Create a SELECT query builder
|
|
*/
|
|
select(...columns: string[]): QuestDBQueryBuilder {
|
|
return this.queryBuilder().select(...columns);
|
|
}
|
|
|
|
/**
|
|
* Create an aggregation query builder
|
|
*/
|
|
aggregate(table: TableNames): QuestDBQueryBuilder {
|
|
return this.queryBuilder().from(table);
|
|
}
|
|
|
|
/**
|
|
* Execute a time-series specific query with SAMPLE BY
|
|
*/
|
|
async sampleBy<T = any>(
|
|
table: TableNames,
|
|
columns: string[],
|
|
interval: string,
|
|
timeColumn: string = 'timestamp',
|
|
where?: string,
|
|
params?: any[]
|
|
): Promise<QueryResult<T>> {
|
|
const columnsStr = columns.join(', ');
|
|
const whereClause = where ? `WHERE ${where}` : '';
|
|
|
|
const sql = `
|
|
SELECT ${columnsStr}
|
|
FROM ${table}
|
|
${whereClause}
|
|
SAMPLE BY ${interval}
|
|
ALIGN TO CALENDAR
|
|
`;
|
|
|
|
return await this.query<T>(sql, params);
|
|
}
|
|
|
|
/**
|
|
* Get latest values by symbol using LATEST BY
|
|
*/
|
|
async latestBy<T = any>(
|
|
table: TableNames,
|
|
columns: string | string[] = '*',
|
|
keyColumns: string | string[] = 'symbol'
|
|
): Promise<QueryResult<T>> {
|
|
const columnsStr = Array.isArray(columns) ? columns.join(', ') : columns;
|
|
const keyColumnsStr = Array.isArray(keyColumns) ? keyColumns.join(', ') : keyColumns;
|
|
|
|
const sql = `
|
|
SELECT ${columnsStr}
|
|
FROM ${table}
|
|
LATEST BY ${keyColumnsStr}
|
|
`;
|
|
|
|
return await this.query<T>(sql);
|
|
}
|
|
|
|
/**
|
|
* Execute ASOF JOIN for time-series correlation
|
|
*/
|
|
async asofJoin<T = any>(
|
|
leftTable: TableNames,
|
|
rightTable: TableNames,
|
|
joinCondition: string,
|
|
columns?: string[],
|
|
where?: string,
|
|
params?: any[]
|
|
): Promise<QueryResult<T>> {
|
|
const columnsStr = columns ? columns.join(', ') : '*';
|
|
const whereClause = where ? `WHERE ${where}` : '';
|
|
|
|
const sql = `
|
|
SELECT ${columnsStr}
|
|
FROM ${leftTable}
|
|
ASOF JOIN ${rightTable} ON ${joinCondition}
|
|
${whereClause}
|
|
`;
|
|
|
|
return await this.query<T>(sql, params);
|
|
}
|
|
|
|
/**
|
|
* Get database statistics
|
|
*/
|
|
async getStats(): Promise<any> {
|
|
const result = await this.query(`
|
|
SELECT
|
|
table_name,
|
|
row_count,
|
|
partition_count,
|
|
size_bytes
|
|
FROM tables()
|
|
WHERE table_name NOT LIKE 'sys.%'
|
|
ORDER BY row_count DESC
|
|
`);
|
|
return result.rows;
|
|
}
|
|
|
|
/**
|
|
* Get table information
|
|
*/
|
|
async getTableInfo(tableName: string): Promise<any> {
|
|
const result = await this.query(`SELECT * FROM table_columns WHERE table_name = ?`, [
|
|
tableName,
|
|
]);
|
|
return result.rows;
|
|
}
|
|
|
|
/**
|
|
* Check if PostgreSQL pool is healthy
|
|
*/
|
|
isPgPoolHealthy(): boolean {
|
|
return this.pgPool !== null && !this.pgPool.ended;
|
|
}
|
|
|
|
/**
|
|
* Get HTTP endpoint URL
|
|
*/
|
|
getHttpUrl(): string {
|
|
const protocol = this.config.tls?.enabled ? 'https' : 'http';
|
|
return `${protocol}://${this.config.host}:${this.config.httpPort}`;
|
|
}
|
|
|
|
/**
|
|
* Get InfluxDB endpoint URL
|
|
*/
|
|
getInfluxUrl(): string {
|
|
const protocol = this.config.tls?.enabled ? 'https' : 'http';
|
|
return `${protocol}://${this.config.host}:${this.config.influxPort}`;
|
|
}
|
|
|
|
/**
|
|
* Get health monitor instance
|
|
*/
|
|
getHealthMonitor(): QuestDBHealthMonitor {
|
|
return this.healthMonitor;
|
|
}
|
|
|
|
/**
|
|
* Get schema manager instance
|
|
*/
|
|
getSchemaManager(): QuestDBSchemaManager {
|
|
return this.schemaManager;
|
|
}
|
|
|
|
/**
|
|
* Get InfluxDB writer instance
|
|
*/
|
|
getInfluxWriter(): QuestDBInfluxWriter {
|
|
return this.influxWriter;
|
|
}
|
|
|
|
/**
|
|
* Optimize table by rebuilding partitions
|
|
*/
|
|
async optimizeTable(tableName: string): Promise<void> {
|
|
await this.query(`VACUUM TABLE ${tableName}`);
|
|
this.logger.info(`Optimized table: ${tableName}`);
|
|
}
|
|
|
|
/**
|
|
* Create a table with time-series optimizations
|
|
*/
|
|
async createTable(
|
|
tableName: string,
|
|
columns: string,
|
|
partitionBy: string = 'DAY',
|
|
timestampColumn: string = 'timestamp'
|
|
): Promise<void> {
|
|
const sql = `
|
|
CREATE TABLE IF NOT EXISTS ${tableName} (
|
|
${columns}
|
|
) TIMESTAMP(${timestampColumn}) PARTITION BY ${partitionBy}
|
|
`;
|
|
|
|
await this.query(sql);
|
|
this.logger.info(`Created table: ${tableName}`);
|
|
}
|
|
|
|
/**
|
|
* Check if client is connected
|
|
*/
|
|
get connected(): boolean {
|
|
return this.isConnected && !!this.pgPool;
|
|
}
|
|
|
|
/**
|
|
* Get the PostgreSQL connection pool
|
|
*/
|
|
get connectionPool(): Pool | null {
|
|
return this.pgPool;
|
|
}
|
|
|
|
/**
|
|
* Get configuration
|
|
*/
|
|
get configuration(): QuestDBClientConfig {
|
|
return { ...this.config };
|
|
}
|
|
|
|
private buildConfig(config?: Partial<QuestDBClientConfig>): QuestDBClientConfig {
|
|
return {
|
|
host: config?.host || questdbConfig.QUESTDB_HOST,
|
|
httpPort: config?.httpPort || questdbConfig.QUESTDB_HTTP_PORT,
|
|
pgPort: config?.pgPort || questdbConfig.QUESTDB_PG_PORT,
|
|
influxPort: config?.influxPort || questdbConfig.QUESTDB_INFLUX_PORT,
|
|
user: config?.user || questdbConfig.QUESTDB_USER,
|
|
password: config?.password || questdbConfig.QUESTDB_PASSWORD,
|
|
database: config?.database || questdbConfig.QUESTDB_DEFAULT_DATABASE,
|
|
tls: {
|
|
enabled: questdbConfig.QUESTDB_TLS_ENABLED,
|
|
verifyServerCert: questdbConfig.QUESTDB_TLS_VERIFY_SERVER_CERT,
|
|
...config?.tls,
|
|
},
|
|
timeouts: {
|
|
connection: questdbConfig.QUESTDB_CONNECTION_TIMEOUT,
|
|
request: questdbConfig.QUESTDB_REQUEST_TIMEOUT,
|
|
...config?.timeouts,
|
|
},
|
|
retryAttempts: questdbConfig.QUESTDB_RETRY_ATTEMPTS,
|
|
...config,
|
|
};
|
|
}
|
|
|
|
private buildPgPoolConfig(): any {
|
|
return {
|
|
host: this.config.host,
|
|
port: this.config.pgPort,
|
|
database: this.config.database,
|
|
user: this.config.user,
|
|
password: this.config.password,
|
|
connectionTimeoutMillis: this.config.timeouts?.connection,
|
|
query_timeout: this.config.timeouts?.request,
|
|
ssl: this.config.tls?.enabled
|
|
? {
|
|
rejectUnauthorized: this.config.tls.verifyServerCert,
|
|
}
|
|
: false,
|
|
min: 2,
|
|
max: 10,
|
|
};
|
|
}
|
|
|
|
private mapDataType(typeId: number): string {
|
|
// Map PostgreSQL type IDs to QuestDB types
|
|
const typeMap: Record<number, string> = {
|
|
16: 'BOOLEAN',
|
|
20: 'LONG',
|
|
21: 'INT',
|
|
23: 'INT',
|
|
25: 'STRING',
|
|
700: 'FLOAT',
|
|
701: 'DOUBLE',
|
|
1043: 'STRING',
|
|
1082: 'DATE',
|
|
1114: 'TIMESTAMP',
|
|
1184: 'TIMESTAMP',
|
|
};
|
|
|
|
return typeMap[typeId] || 'STRING';
|
|
}
|
|
|
|
private delay(ms: number): Promise<void> {
|
|
return new Promise(resolve => setTimeout(resolve, ms));
|
|
}
|
|
}
|