// stock-bot/libs/questdb-client/src/schema.ts
//
// 404 lines
// 13 KiB
// TypeScript

import { getLogger } from '@stock-bot/logger';
import type { TableSchema, IndexDefinition, TableNames, QueryResult } from './types';
// Minimal structural view of the QuestDB client, declared locally to avoid a
// circular dependency. Only `query` is declared: it takes a SQL string plus
// optional parameters and resolves to a QueryResult typed by `T`.
interface QuestDBClientInterface {
query<T = any>(sql: string, params?: any[]): Promise<QueryResult<T>>;
}
/**
 * QuestDB Schema Manager
 *
 * Holds a registry of table schemas (a predefined set plus any added through
 * {@link QuestDBSchemaManager.addSchema}) and issues the DDL / maintenance
 * statements for them through the injected client.
 *
 * Table names passed to the public methods are interpolated into SQL text as
 * identifiers, so callers must only pass trusted names.
 */
export class QuestDBSchemaManager {
  private readonly logger: ReturnType<typeof getLogger>;
  private readonly schemas: Map<string, TableSchema> = new Map();

  /**
   * @param client - Object used to execute SQL; only its `query` method is used.
   */
  constructor(private readonly client: QuestDBClientInterface) {
    this.logger = getLogger('questdb-schema-manager');
    this.initializeSchemas();
  }

  /**
   * Register the predefined schemas in the in-memory registry.
   *
   * Nothing is sent to the database here; tables are only created by
   * `createTable` / `createAllTables` / `initializeDatabase`.
   *
   * NOTE(review): the `indices` entries are stored on the schema but are not
   * used by `buildCreateTableSQL` in this file.
   */
  private initializeSchemas(): void {
    // OHLCV Data Table
    this.schemas.set('ohlcv_data', {
      tableName: 'ohlcv_data',
      columns: [
        { name: 'symbol', type: 'SYMBOL', nullable: false },
        { name: 'exchange', type: 'SYMBOL', nullable: false },
        { name: 'timestamp', type: 'TIMESTAMP', nullable: false, designated: true },
        { name: 'open', type: 'DOUBLE', nullable: false },
        { name: 'high', type: 'DOUBLE', nullable: false },
        { name: 'low', type: 'DOUBLE', nullable: false },
        { name: 'close', type: 'DOUBLE', nullable: false },
        { name: 'volume', type: 'LONG', nullable: false },
        { name: 'data_source', type: 'SYMBOL', nullable: true }
      ],
      partitionBy: 'DAY',
      orderBy: ['symbol', 'timestamp'],
      indices: [
        { columns: ['symbol'], type: 'HASH' },
        { columns: ['exchange'], type: 'HASH' }
      ]
    });

    // Market Analytics Table
    this.schemas.set('market_analytics', {
      tableName: 'market_analytics',
      columns: [
        { name: 'symbol', type: 'SYMBOL', nullable: false },
        { name: 'exchange', type: 'SYMBOL', nullable: false },
        { name: 'timestamp', type: 'TIMESTAMP', nullable: false, designated: true },
        { name: 'rsi', type: 'DOUBLE', nullable: true },
        { name: 'macd', type: 'DOUBLE', nullable: true },
        { name: 'signal', type: 'DOUBLE', nullable: true },
        { name: 'histogram', type: 'DOUBLE', nullable: true },
        { name: 'bollinger_upper', type: 'DOUBLE', nullable: true },
        { name: 'bollinger_lower', type: 'DOUBLE', nullable: true },
        { name: 'volume_sma', type: 'DOUBLE', nullable: true },
        { name: 'timeframe', type: 'SYMBOL', nullable: true }
      ],
      partitionBy: 'DAY',
      orderBy: ['symbol', 'timestamp'],
      indices: [
        { columns: ['symbol'], type: 'HASH' },
        { columns: ['timeframe'], type: 'HASH' }
      ]
    });

    // Trade Executions Table
    this.schemas.set('trade_executions', {
      tableName: 'trade_executions',
      columns: [
        { name: 'symbol', type: 'SYMBOL', nullable: false },
        { name: 'timestamp', type: 'TIMESTAMP', nullable: false, designated: true },
        { name: 'side', type: 'SYMBOL', nullable: false },
        { name: 'quantity', type: 'DOUBLE', nullable: false },
        { name: 'price', type: 'DOUBLE', nullable: false },
        { name: 'execution_time', type: 'LONG', nullable: false },
        { name: 'order_id', type: 'SYMBOL', nullable: true },
        { name: 'strategy', type: 'SYMBOL', nullable: true },
        { name: 'commission', type: 'DOUBLE', nullable: true }
      ],
      partitionBy: 'DAY',
      orderBy: ['symbol', 'timestamp'],
      indices: [
        { columns: ['symbol'], type: 'HASH' },
        { columns: ['order_id'], type: 'HASH' },
        { columns: ['strategy'], type: 'HASH' }
      ]
    });

    // Performance Metrics Table
    // NOTE(review): the index on `success` targets a BOOLEAN column; confirm
    // the database supports that before indices are ever applied.
    this.schemas.set('performance_metrics', {
      tableName: 'performance_metrics',
      columns: [
        { name: 'timestamp', type: 'TIMESTAMP', nullable: false, designated: true },
        { name: 'operation', type: 'SYMBOL', nullable: false },
        { name: 'response_time', type: 'LONG', nullable: false },
        { name: 'success', type: 'BOOLEAN', nullable: false },
        { name: 'error_code', type: 'SYMBOL', nullable: true },
        { name: 'component', type: 'SYMBOL', nullable: true }
      ],
      partitionBy: 'HOUR',
      orderBy: ['operation', 'timestamp'],
      indices: [
        { columns: ['operation'], type: 'HASH' },
        { columns: ['success'], type: 'HASH' }
      ]
    });

    // Portfolio Positions Table
    this.schemas.set('portfolio_positions', {
      tableName: 'portfolio_positions',
      columns: [
        { name: 'portfolio_id', type: 'SYMBOL', nullable: false },
        { name: 'symbol', type: 'SYMBOL', nullable: false },
        { name: 'timestamp', type: 'TIMESTAMP', nullable: false, designated: true },
        { name: 'quantity', type: 'DOUBLE', nullable: false },
        { name: 'avg_cost', type: 'DOUBLE', nullable: false },
        { name: 'market_value', type: 'DOUBLE', nullable: false },
        { name: 'unrealized_pnl', type: 'DOUBLE', nullable: false },
        { name: 'realized_pnl', type: 'DOUBLE', nullable: false }
      ],
      partitionBy: 'DAY',
      orderBy: ['portfolio_id', 'symbol', 'timestamp'],
      indices: [
        { columns: ['portfolio_id'], type: 'HASH' },
        { columns: ['symbol'], type: 'HASH' }
      ]
    });

    // Risk Metrics Table
    this.schemas.set('risk_metrics', {
      tableName: 'risk_metrics',
      columns: [
        { name: 'portfolio_id', type: 'SYMBOL', nullable: false },
        { name: 'timestamp', type: 'TIMESTAMP', nullable: false, designated: true },
        { name: 'var_1d', type: 'DOUBLE', nullable: true },
        { name: 'var_5d', type: 'DOUBLE', nullable: true },
        { name: 'expected_shortfall', type: 'DOUBLE', nullable: true },
        { name: 'beta', type: 'DOUBLE', nullable: true },
        { name: 'sharpe_ratio', type: 'DOUBLE', nullable: true },
        { name: 'max_drawdown', type: 'DOUBLE', nullable: true },
        { name: 'volatility', type: 'DOUBLE', nullable: true }
      ],
      partitionBy: 'DAY',
      orderBy: ['portfolio_id', 'timestamp'],
      indices: [
        { columns: ['portfolio_id'], type: 'HASH' }
      ]
    });
  }

  /**
   * Create every registered table, one after another, in registration order.
   *
   * @throws Re-throws the first error raised by `createTable`; later tables
   *   are not attempted.
   */
  public async createAllTables(): Promise<void> {
    this.logger.info('Creating all QuestDB tables');

    for (const [tableName, schema] of this.schemas) {
      try {
        await this.createTable(schema);
        this.logger.info(`Table ${tableName} created successfully`);
      } catch (error) {
        this.logger.error(`Failed to create table ${tableName}`, error);
        throw error;
      }
    }
  }

  /**
   * Create a single table from its schema.
   *
   * An error whose message contains "already exists" is treated as success.
   *
   * @param schema - Schema to turn into a `CREATE TABLE IF NOT EXISTS` statement.
   * @throws Any other error raised by the client.
   */
  public async createTable(schema: TableSchema): Promise<void> {
    const sql = this.buildCreateTableSQL(schema);

    try {
      await this.client.query(sql);
      this.logger.info(`Table ${schema.tableName} created`, { sql });
    } catch (error) {
      // Check if table already exists
      if (error instanceof Error && error.message.includes('already exists')) {
        this.logger.info(`Table ${schema.tableName} already exists`);
        return;
      }
      throw error;
    }
  }

  /**
   * Drop a table if it exists.
   *
   * @param tableName - Trusted table identifier (interpolated into the SQL text).
   * @throws Re-throws any client error after logging it.
   */
  public async dropTable(tableName: string): Promise<void> {
    const sql = `DROP TABLE IF EXISTS ${tableName}`;

    try {
      await this.client.query(sql);
      this.logger.info(`Table ${tableName} dropped`);
    } catch (error) {
      this.logger.error(`Failed to drop table ${tableName}`, error);
      throw error;
    }
  }

  /**
   * Check whether a table is listed in `information_schema.tables`.
   *
   * @param tableName - Name to look up; compared as a string literal.
   * @returns `true` when the reported count is greater than zero; `false`
   *   otherwise, including when the lookup itself fails (the failure is logged).
   */
  public async tableExists(tableName: string): Promise<boolean> {
    try {
      // The name is embedded in a single-quoted literal, so embedded quotes are
      // doubled (standard SQL escaping) to keep the statement well-formed.
      const escapedName = tableName.replace(/'/g, "''");
      const result = await this.client.query(`
        SELECT COUNT(*) as count
        FROM information_schema.tables
        WHERE table_name = '${escapedName}'
      `);

      // Drivers may report COUNT(*) as a number, bigint or numeric string;
      // coerce explicitly instead of relying on implicit comparison rules.
      return Number(result.rows[0]?.count ?? 0) > 0;
    } catch (error) {
      this.logger.error(`Error checking if table exists: ${tableName}`, error);
      return false;
    }
  }

  /**
   * Look up a registered schema.
   *
   * @param tableName - Registry key (the schema's table name).
   * @returns The schema, or `undefined` when none is registered under that name.
   */
  public getSchema(tableName: string): TableSchema | undefined {
    return this.schemas.get(tableName);
  }

  /**
   * Register a custom schema, replacing any schema already stored under the
   * same table name. The schema is not validated here.
   *
   * @param schema - Schema to register, keyed by `schema.tableName`.
   */
  public addSchema(schema: TableSchema): void {
    this.schemas.set(schema.tableName, schema);
    this.logger.info(`Schema added for table: ${schema.tableName}`);
  }

  /**
   * List the names of all registered schemas in registration order.
   */
  public getSchemaNames(): string[] {
    return Array.from(this.schemas.keys());
  }

  /**
   * "Optimize" a registered table. Currently this only fetches and logs the
   * table statistics; no optimization statement is issued.
   *
   * @param tableName - Name of a registered table.
   * @throws Error when no schema is registered for `tableName`, or when the
   *   statistics query fails.
   */
  public async optimizeTable(tableName: string): Promise<void> {
    const schema = this.schemas.get(tableName);
    if (!schema) {
      throw new Error(`Schema not found for table: ${tableName}`);
    }

    // QuestDB automatically optimizes, but we can analyze table stats
    try {
      const stats = await this.getTableStats(tableName);
      this.logger.info(`Table ${tableName} stats`, stats);
    } catch (error) {
      this.logger.error(`Failed to optimize table ${tableName}`, error);
      throw error;
    }
  }

  /**
   * Fetch the row count and min/max timestamp of a table.
   *
   * The timestamp column is the designated timestamp of the registered schema
   * when there is one; otherwise the column name `timestamp` is assumed.
   *
   * @param tableName - Trusted table identifier (interpolated into the SQL text).
   * @returns The first result row (`row_count`, `min_timestamp`,
   *   `max_timestamp`), or an empty object when the query returns no rows.
   * @throws Re-throws any client error after logging it.
   */
  public async getTableStats(tableName: string): Promise<any> {
    const timestampColumn = this.getTimestampColumnName(tableName);

    try {
      const result = await this.client.query(`
        SELECT
          COUNT(*) as row_count,
          MIN(${timestampColumn}) as min_timestamp,
          MAX(${timestampColumn}) as max_timestamp
        FROM ${tableName}
      `);
      return result.rows[0] ?? {};
    } catch (error) {
      this.logger.error(`Failed to get table stats for ${tableName}`, error);
      throw error;
    }
  }

  /**
   * Truncate table (remove all data but keep structure).
   *
   * @param tableName - Trusted table identifier (interpolated into the SQL text).
   * @throws Re-throws any client error after logging it.
   */
  public async truncateTable(tableName: string): Promise<void> {
    try {
      await this.client.query(`TRUNCATE TABLE ${tableName}`);
      this.logger.info(`Table ${tableName} truncated`);
    } catch (error) {
      this.logger.error(`Failed to truncate table ${tableName}`, error);
      throw error;
    }
  }

  /**
   * Placeholder for creating partitions ahead of time.
   *
   * No statement is issued: the method only logs that partitioning is automatic.
   *
   * @param tableName - Table name, used in the log message only.
   * @param days - Currently ignored; kept for future extensibility.
   */
  public async createPartitions(tableName: string, days: number = 30): Promise<void> {
    // QuestDB handles partitioning automatically based on the PARTITION BY clause
    // This method is for future extensibility
    this.logger.info(`Partitioning is automatic for table ${tableName}`);
  }

  /**
   * Build the `CREATE TABLE IF NOT EXISTS` statement for a schema.
   *
   * Emits the column list, then `timestamp(<col>)` for the first column flagged
   * `designated`, then `PARTITION BY` when the schema sets `partitionBy`.
   * `orderBy` and `indices` are not part of the generated statement.
   *
   * NOTE(review): non-nullable columns get a `NOT NULL` suffix; confirm the
   * target QuestDB version accepts that clause.
   */
  private buildCreateTableSQL(schema: TableSchema): string {
    const columns = schema.columns
      .map(col => (col.nullable ? `${col.name} ${col.type}` : `${col.name} ${col.type} NOT NULL`))
      .join(', ');

    let sql = `CREATE TABLE IF NOT EXISTS ${schema.tableName} (${columns})`;

    // Add designated timestamp
    const timestampColumn = schema.columns.find(col => col.designated);
    if (timestampColumn) {
      sql += ` timestamp(${timestampColumn.name})`;
    }

    // Add partition by
    if (schema.partitionBy) {
      sql += ` PARTITION BY ${schema.partitionBy}`;
    }

    return sql;
  }

  /**
   * Build index creation SQL (for future use; not called within this class).
   *
   * NOTE(review): this emits generic `CREATE INDEX` syntax; the statement has
   * to be adapted to QuestDB's own index syntax before it is executed.
   */
  private buildCreateIndexSQL(tableName: string, index: IndexDefinition): string {
    const indexName = `idx_${tableName}_${index.columns.join('_')}`;
    const columns = index.columns.join(', ');

    // QuestDB uses different index syntax, this is for future compatibility
    return `CREATE INDEX ${indexName} ON ${tableName} (${columns})`;
  }

  /**
   * Name of the timestamp column to use for statistics on `tableName`: the
   * registered schema's designated column, or `timestamp` when the table is
   * not registered or has no designated column.
   */
  private getTimestampColumnName(tableName: string): string {
    const designated = this.schemas.get(tableName)?.columns.find(col => col.designated);
    return designated?.name ?? 'timestamp';
  }

  /**
   * Validate a schema definition.
   *
   * @throws Error when the table name is missing, there are no columns, or the
   *   number of designated timestamp columns is not exactly one.
   */
  private validateSchema(schema: TableSchema): void {
    if (!schema.tableName) {
      throw new Error('Table name is required');
    }

    if (!schema.columns || schema.columns.length === 0) {
      throw new Error('At least one column is required');
    }

    const timestampColumns = schema.columns.filter(col => col.designated);
    if (timestampColumns.length > 1) {
      throw new Error('Only one designated timestamp column is allowed');
    }
    if (timestampColumns.length === 0) {
      throw new Error('A designated timestamp column is required for time-series tables');
    }
  }

  /**
   * Report, for every registered schema, whether its table currently exists.
   *
   * Tables are checked one at a time. A failed lookup is reported as `false`
   * because `tableExists` does not throw.
   *
   * @returns Map of table name to existence flag.
   */
  public async getTableCreationStatus(): Promise<Record<string, boolean>> {
    const status: Record<string, boolean> = {};

    for (const tableName of this.schemas.keys()) {
      status[tableName] = await this.tableExists(tableName);
    }

    return status;
  }

  /**
   * Validate all registered schemas, create their tables, then log the
   * resulting existence status of each table.
   *
   * @throws Error when a schema is invalid (before any table is created) or
   *   when a table cannot be created.
   */
  public async initializeDatabase(): Promise<void> {
    this.logger.info('Initializing QuestDB schema');

    // Validate all schemas first
    for (const schema of this.schemas.values()) {
      this.validateSchema(schema);
    }

    // Create all tables
    await this.createAllTables();

    // Get creation status
    const status = await this.getTableCreationStatus();
    this.logger.info('Database initialization complete', { tableStatus: status });
  }
}