stock-bot/libs/questdb-client/src/query-builder.ts

368 lines
8.7 KiB
TypeScript

import { Logger } from '@stock-bot/logger';
import type {
QueryResult,
TimeSeriesQuery,
AggregationQuery,
TimeRange,
TableNames
} from './types';
/**
 * Minimal client contract the query builder depends on.
 *
 * Declared locally instead of importing the concrete client type in order to
 * avoid a circular dependency.
 */
interface QuestDBClientInterface {
/**
 * Takes a SQL string plus an optional positional parameter list and resolves
 * to a `QueryResult<T>`.
 */
query<T = any>(sql: string, params?: any[]): Promise<QueryResult<T>>;
}
/**
 * QuestDB Query Builder
 *
 * Fluent builder that accumulates the clauses of a single SELECT statement and
 * renders them into one SQL string, including the QuestDB-specific
 * `SAMPLE BY` and `LATEST BY` clauses.
 *
 * Values passed to `whereSymbol`, `whereSymbolIn` and `whereExchange` are
 * emitted as escaped string literals. Everything passed to `select`,
 * `selectAgg`, `from`, `where`, `groupBy`, `orderBy`, `sampleBy` and
 * `latestBy` is inserted into the SQL verbatim, so those arguments must never
 * come from untrusted input.
 */
export class QuestDBQueryBuilder {
  private readonly logger: Logger;

  /** Clause state of the query currently being assembled. */
  private query!: {
    select: string[];
    from: string;
    where: string[];
    groupBy: string[];
    orderBy: string[];
    limit?: number;
    sampleBy?: string;
    latestBy?: string[];
    timeRange?: TimeRange;
  };

  /**
   * @param client Client used by {@link execute} to run the built SQL.
   */
  constructor(private readonly client: QuestDBClientInterface) {
    this.logger = new Logger('QuestDBQueryBuilder');
    this.reset();
  }

  /**
   * Discard all accumulated clauses so the builder can be reused.
   *
   * @returns This builder.
   */
  private reset(): QuestDBQueryBuilder {
    this.query = {
      select: [],
      from: '',
      where: [],
      groupBy: [],
      orderBy: [],
      limit: undefined,
      sampleBy: undefined,
      latestBy: undefined,
      timeRange: undefined
    };
    return this;
  }

  /**
   * Render a value as a single-quoted SQL string literal, doubling any
   * embedded single quote so the value cannot terminate the literal early.
   */
  private static quote(value: string): string {
    return `'${String(value).replace(/'/g, "''")}'`;
  }

  /**
   * Render the (negated) offset argument for `dateadd`.
   *
   * A negative amount must not be rendered as `--n`, because `--` starts a SQL
   * line comment; the double negation is folded into a positive offset instead.
   *
   * @throws Error when `amount` is not a finite number.
   */
  private static negatedOffset(amount: number, label: string): string {
    if (!Number.isFinite(amount)) {
      throw new Error(`${label} must be a finite number`);
    }
    return amount < 0 ? `${-amount}` : `-${amount}`;
  }

  /**
   * Start a new query.
   *
   * @param client Client used to execute the query.
   * @returns A fresh builder bound to `client`.
   */
  public static create(client: QuestDBClientInterface): QuestDBQueryBuilder {
    return new QuestDBQueryBuilder(client);
  }

  /**
   * Append columns (or raw select expressions) to the SELECT list.
   *
   * @param columns Expressions inserted verbatim.
   */
  public select(...columns: string[]): QuestDBQueryBuilder {
    this.query.select.push(...columns);
    return this;
  }

  /**
   * Append aliased expressions to the SELECT list.
   *
   * @param aggregations Map of alias to expression; each entry is rendered as
   *   `<expression> as <alias>`.
   */
  public selectAgg(aggregations: Record<string, string>): QuestDBQueryBuilder {
    for (const [alias, expression] of Object.entries(aggregations)) {
      this.query.select.push(`${expression} as ${alias}`);
    }
    return this;
  }

  /**
   * Set the table to select from (replaces any previous value).
   *
   * @param table Table name inserted verbatim.
   */
  public from(table: TableNames | string): QuestDBQueryBuilder {
    this.query.from = table;
    return this;
  }

  /**
   * Append a raw condition; all conditions are joined with `AND`.
   *
   * @param condition SQL predicate inserted verbatim.
   */
  public where(condition: string): QuestDBQueryBuilder {
    this.query.where.push(condition);
    return this;
  }

  /**
   * Append `symbol = '<symbol>'`.
   *
   * @param symbol Value to match; emitted as an escaped string literal.
   */
  public whereSymbol(symbol: string): QuestDBQueryBuilder {
    this.query.where.push(`symbol = ${QuestDBQueryBuilder.quote(symbol)}`);
    return this;
  }

  /**
   * Append `symbol IN (...)`.
   *
   * @param symbols Values to match; each is emitted as an escaped string
   *   literal. An empty list matches nothing, so it is rendered as the
   *   never-true predicate `1 = 0` instead of the invalid `symbol IN ()`.
   */
  public whereSymbolIn(symbols: string[]): QuestDBQueryBuilder {
    if (symbols.length === 0) {
      this.query.where.push('1 = 0');
      return this;
    }
    const symbolList = symbols.map(s => QuestDBQueryBuilder.quote(s)).join(', ');
    this.query.where.push(`symbol IN (${symbolList})`);
    return this;
  }

  /**
   * Append `exchange = '<exchange>'`.
   *
   * @param exchange Value to match; emitted as an escaped string literal.
   */
  public whereExchange(exchange: string): QuestDBQueryBuilder {
    this.query.where.push(`exchange = ${QuestDBQueryBuilder.quote(exchange)}`);
    return this;
  }

  /**
   * Restrict `timestamp` to the inclusive range `[startTime, endTime]`.
   *
   * Both bounds are rendered with `Date.prototype.toISOString`, which throws a
   * `RangeError` for an invalid date.
   */
  public whereTimeRange(startTime: Date, endTime: Date): QuestDBQueryBuilder {
    this.query.timeRange = { startTime, endTime };
    this.query.where.push(
      `timestamp >= '${startTime.toISOString()}' AND timestamp <= '${endTime.toISOString()}'`
    );
    return this;
  }

  /**
   * Restrict `timestamp` to later than `now()` minus `hours` hours.
   *
   * @throws Error when `hours` is not a finite number.
   */
  public whereLastHours(hours: number): QuestDBQueryBuilder {
    const offset = QuestDBQueryBuilder.negatedOffset(hours, 'hours');
    this.query.where.push(`timestamp > dateadd('h', ${offset}, now())`);
    return this;
  }

  /**
   * Restrict `timestamp` to later than `now()` minus `days` days.
   *
   * @throws Error when `days` is not a finite number.
   */
  public whereLastDays(days: number): QuestDBQueryBuilder {
    const offset = QuestDBQueryBuilder.negatedOffset(days, 'days');
    this.query.where.push(`timestamp > dateadd('d', ${offset}, now())`);
    return this;
  }

  /**
   * Append columns to the GROUP BY list.
   *
   * @param columns Expressions inserted verbatim.
   */
  public groupBy(...columns: string[]): QuestDBQueryBuilder {
    this.query.groupBy.push(...columns);
    return this;
  }

  /**
   * Append an ORDER BY term.
   *
   * @param column Expression inserted verbatim.
   * @param direction Sort direction, ascending by default.
   */
  public orderBy(column: string, direction: 'ASC' | 'DESC' = 'ASC'): QuestDBQueryBuilder {
    this.query.orderBy.push(`${column} ${direction}`);
    return this;
  }

  /**
   * Append `timestamp DESC` to the ORDER BY list (most recent first).
   */
  public orderByTimeDesc(): QuestDBQueryBuilder {
    this.query.orderBy.push('timestamp DESC');
    return this;
  }

  /**
   * Set the LIMIT value (replaces any previous value).
   *
   * @param count Row limit; `0` is honoured and rendered as `LIMIT 0`.
   */
  public limit(count: number): QuestDBQueryBuilder {
    this.query.limit = count;
    return this;
  }

  /**
   * Set the SAMPLE BY interval (QuestDB specific).
   *
   * @param interval Interval text inserted verbatim, e.g. `1m`.
   */
  public sampleBy(interval: string): QuestDBQueryBuilder {
    this.query.sampleBy = interval;
    return this;
  }

  /**
   * Set the LATEST BY columns (QuestDB specific); replaces any previous list.
   *
   * @param columns Column names inserted verbatim.
   */
  public latestBy(...columns: string[]): QuestDBQueryBuilder {
    this.query.latestBy = columns;
    return this;
  }

  /**
   * Build the SQL, run it through the client and reset the builder.
   *
   * The builder is reset whether the client call succeeds or fails. If
   * {@link build} itself throws, nothing is executed and the state is kept.
   *
   * @returns The client's result for the built SQL.
   * @throws Whatever the client rejects with, after logging it.
   */
  public async execute<T = any>(): Promise<QueryResult<T>> {
    const sql = this.build();
    this.logger.debug('Executing query', { sql });
    try {
      return await this.client.query<T>(sql);
    } catch (error) {
      this.logger.error('Query execution failed', { sql, error });
      throw error;
    } finally {
      // Reset for the next query, on success and on error alike.
      this.reset();
    }
  }

  /**
   * Render the accumulated clauses as a SQL string without changing the
   * builder's state, so it can be called repeatedly while building.
   *
   * Clause order: SELECT, FROM, WHERE, LATEST BY, SAMPLE BY, GROUP BY,
   * ORDER BY, LIMIT. An empty SELECT list is rendered as `*`.
   *
   * @throws Error when no table has been set with {@link from}.
   */
  public build(): string {
    const state = this.query;
    if (!state.from) {
      throw new Error('FROM clause is required');
    }

    // Default to `*` locally; the stored select list must stay untouched so a
    // later select() call does not end up as `SELECT *, col`.
    const columns = state.select.length > 0 ? state.select : ['*'];
    const clauses: string[] = [`SELECT ${columns.join(', ')} FROM ${state.from}`];

    if (state.where.length > 0) {
      clauses.push(`WHERE ${state.where.join(' AND ')}`);
    }
    // LATEST BY (QuestDB specific - must come before GROUP BY).
    // NOTE(review): QuestDB documents `LATEST ON <ts> PARTITION BY <cols>`
    // after WHERE and a deprecated `LATEST BY <cols>` before WHERE - confirm
    // that this placement is accepted by the target server version.
    if (state.latestBy && state.latestBy.length > 0) {
      clauses.push(`LATEST BY ${state.latestBy.join(', ')}`);
    }
    // SAMPLE BY (QuestDB specific)
    if (state.sampleBy) {
      clauses.push(`SAMPLE BY ${state.sampleBy}`);
    }
    if (state.groupBy.length > 0) {
      clauses.push(`GROUP BY ${state.groupBy.join(', ')}`);
    }
    if (state.orderBy.length > 0) {
      clauses.push(`ORDER BY ${state.orderBy.join(', ')}`);
    }
    // Check for "set" rather than truthiness so that limit(0) is not dropped.
    if (typeof state.limit === 'number' && !Number.isNaN(state.limit)) {
      clauses.push(`LIMIT ${state.limit}`);
    }

    return clauses.join(' ');
  }

  /**
   * Get the built query without executing it (alias of {@link build}).
   */
  public toSQL(): string {
    return this.build();
  }

  // Predefined query methods for common use cases

  /**
   * Builder for the latest OHLCV row per symbol from `ohlcv_data`.
   *
   * @param client Client used to execute the query.
   * @param symbols Symbols to include.
   * @param exchange Optional exchange filter; skipped when empty or omitted.
   */
  public static latestOHLCV(
    client: QuestDBClientInterface,
    symbols: string[],
    exchange?: string
  ): QuestDBQueryBuilder {
    const builder = QuestDBQueryBuilder.create(client)
      .select('symbol', 'timestamp', 'open', 'high', 'low', 'close', 'volume')
      .from('ohlcv_data')
      .whereSymbolIn(symbols)
      .latestBy('symbol')
      .orderByTimeDesc();
    if (exchange) {
      builder.whereExchange(exchange);
    }
    return builder;
  }

  /**
   * Builder for OHLCV aggregates of one symbol from `ohlcv_data`, sampled by
   * `interval` over the last `hours` hours, newest first.
   *
   * @param client Client used to execute the query.
   * @param symbol Symbol to aggregate.
   * @param interval SAMPLE BY interval, inserted verbatim.
   * @param hours Look-back window in hours (default 24).
   */
  public static ohlcvTimeSeries(
    client: QuestDBClientInterface,
    symbol: string,
    interval: string,
    hours: number = 24
  ): QuestDBQueryBuilder {
    return QuestDBQueryBuilder.create(client)
      .selectAgg({
        'first_open': 'first(open)',
        'max_high': 'max(high)',
        'min_low': 'min(low)',
        'last_close': 'last(close)',
        'sum_volume': 'sum(volume)'
      })
      .from('ohlcv_data')
      .whereSymbol(symbol)
      .whereLastHours(hours)
      .sampleBy(interval)
      .orderByTimeDesc();
  }

  /**
   * Builder for indicator rows from `market_analytics` for the given symbols
   * over the last `hours` hours, ordered by symbol and then newest first.
   *
   * @param client Client used to execute the query.
   * @param symbols Symbols to include.
   * @param hours Look-back window in hours (default 1).
   */
  public static marketAnalytics(
    client: QuestDBClientInterface,
    symbols: string[],
    hours: number = 1
  ): QuestDBQueryBuilder {
    return QuestDBQueryBuilder.create(client)
      .select('symbol', 'timestamp', 'rsi', 'macd', 'bollinger_upper', 'bollinger_lower', 'volume_sma')
      .from('market_analytics')
      .whereSymbolIn(symbols)
      .whereLastHours(hours)
      .orderBy('symbol')
      .orderByTimeDesc();
  }

  /**
   * Builder for per-minute aggregates from `performance_metrics` within the
   * inclusive range `[startTime, endTime]`.
   *
   * @param client Client used to execute the query.
   * @param startTime Inclusive lower bound.
   * @param endTime Inclusive upper bound.
   */
  public static performanceMetrics(
    client: QuestDBClientInterface,
    startTime: Date,
    endTime: Date
  ): QuestDBQueryBuilder {
    return QuestDBQueryBuilder.create(client)
      .selectAgg({
        'total_trades': 'count(*)',
        'avg_response_time': 'avg(response_time)',
        'max_response_time': 'max(response_time)',
        'error_rate': 'sum(case when success = false then 1 else 0 end) * 100.0 / count(*)'
      })
      .from('performance_metrics')
      .whereTimeRange(startTime, endTime)
      .sampleBy('1m');
  }

  /**
   * Builder for rows from `trade_executions` over the last `hours` hours,
   * newest first.
   *
   * @param client Client used to execute the query.
   * @param symbol Optional symbol filter; skipped when empty or omitted.
   * @param hours Look-back window in hours (default 24).
   */
  public static tradeExecutions(
    client: QuestDBClientInterface,
    symbol?: string,
    hours: number = 24
  ): QuestDBQueryBuilder {
    const builder = QuestDBQueryBuilder.create(client)
      .select('symbol', 'timestamp', 'side', 'quantity', 'price', 'execution_time')
      .from('trade_executions')
      .whereLastHours(hours)
      .orderByTimeDesc();
    if (symbol) {
      builder.whereSymbol(symbol);
    }
    return builder;
  }
}