stock-bot/libs/data/questdb/src/influx-writer.ts
2025-06-21 18:27:00 -04:00

430 lines
10 KiB
TypeScript

import { getLogger } from '@stock-bot/logger';
import type { InfluxLineData, InfluxWriteOptions } from './types';
// Minimal structural view of the QuestDB client, declared locally so this
// module does not need to import the client itself (avoids a circular dependency).
interface QuestDBClientInterface {
// Base HTTP URL of the server; the writer appends `/write` to it when sending lines.
getHttpUrl(): string;
}
/**
 * QuestDB InfluxDB Line Protocol (ILP) writer.
 *
 * Serializes points to ILP text and POSTs them to the `/write` path of the
 * URL returned by the injected client. Lines are either sent immediately or
 * held in an in-memory buffer that is flushed by size, by timer, or manually
 * via {@link QuestDBInfluxWriter.flush}.
 */
export class QuestDBInfluxWriter {
  private readonly logger: ReturnType<typeof getLogger>;
  /** Serialized ILP lines waiting to be sent. */
  private writeBuffer: string[] = [];
  /** Pending timed flush, or `null` when none is scheduled. */
  private flushTimer: NodeJS.Timeout | null = null;
  /**
   * Defaults merged under any per-call options.
   * NOTE: `precision` is not read anywhere in this class; timestamps are
   * always serialized in nanoseconds.
   */
  private readonly defaultOptions: Required<InfluxWriteOptions> = {
    batchSize: 1000,
    flushInterval: 5000,
    autoFlush: true,
    precision: 'ms',
    retryAttempts: 3,
    retryDelay: 1000,
  };

  constructor(private readonly client: QuestDBClientInterface) {
    this.logger = getLogger('questdb-influx-writer');
  }

  /**
   * Write a single data point.
   *
   * With `autoFlush` enabled and an empty buffer the line is sent right away.
   * Otherwise it is appended to the buffer, a timed flush is scheduled (only
   * when `autoFlush` is enabled), and the buffer is flushed as soon as it
   * holds `batchSize` lines.
   *
   * @param measurement Measurement (table) name.
   * @param tags Tag key/value pairs.
   * @param fields Field key/value pairs; at least one non-null field is required.
   * @param timestamp Optional point time; omitted from the line when not given.
   * @param options Per-call overrides of the default write options.
   * @throws Error when the point cannot be serialized or the send ultimately fails.
   */
  public async writePoint(
    measurement: string,
    tags: Record<string, string>,
    fields: Record<string, number | string | boolean>,
    timestamp?: Date,
    options?: Partial<InfluxWriteOptions>
  ): Promise<void> {
    const line = this.buildLineProtocol(measurement, tags, fields, timestamp);
    const opts = { ...this.defaultOptions, ...options };

    if (opts.autoFlush && this.writeBuffer.length === 0) {
      // Nothing queued ahead of this point: send it immediately.
      await this.sendLines([line], opts);
      return;
    }

    this.writeBuffer.push(line);
    if (opts.autoFlush) {
      this.scheduleFlush(opts);
    }
    if (this.writeBuffer.length >= opts.batchSize) {
      await this.flush(opts);
    }
  }

  /**
   * Write multiple data points.
   *
   * With `autoFlush` enabled the points are sent as one request, independent
   * of the buffer. Otherwise they are appended to the buffer and full batches
   * of `batchSize` lines are sent; any remainder stays buffered.
   *
   * @param data Points to serialize.
   * @param options Per-call overrides of the default write options.
   * @throws Error when a point cannot be serialized or a send ultimately fails.
   */
  public async writePoints(
    data: InfluxLineData[],
    options?: Partial<InfluxWriteOptions>
  ): Promise<void> {
    const opts = { ...this.defaultOptions, ...options };
    const lines = data.map(point =>
      this.buildLineProtocol(point.measurement, point.tags, point.fields, point.timestamp)
    );

    if (opts.autoFlush) {
      await this.sendLines(lines, opts);
      return;
    }

    this.writeBuffer.push(...lines);
    while (this.writeBuffer.length >= opts.batchSize) {
      const batch = this.writeBuffer.splice(0, opts.batchSize);
      await this.sendLines(batch, opts);
    }
  }

  /**
   * Write OHLCV candles to the `ohlcv_data` measurement.
   *
   * Each candle becomes one point tagged with `symbol`, `exchange` and a
   * fixed `data_source` of `market_feed`.
   */
  public async writeOHLCV(
    symbol: string,
    exchange: string,
    data: {
      timestamp: Date;
      open: number;
      high: number;
      low: number;
      close: number;
      volume: number;
    }[],
    options?: Partial<InfluxWriteOptions>
  ): Promise<void> {
    const influxData: InfluxLineData[] = data.map(candle => ({
      measurement: 'ohlcv_data',
      tags: {
        symbol,
        exchange,
        data_source: 'market_feed',
      },
      fields: {
        open: candle.open,
        high: candle.high,
        low: candle.low,
        close: candle.close,
        volume: candle.volume,
      },
      timestamp: candle.timestamp,
    }));
    await this.writePoints(influxData, options);
  }

  /**
   * Write indicator values to the `market_analytics` measurement.
   *
   * Only indicators that are neither `undefined` nor `null` become fields;
   * when none are present nothing is written.
   */
  public async writeMarketAnalytics(
    symbol: string,
    exchange: string,
    analytics: {
      timestamp: Date;
      rsi?: number;
      macd?: number;
      signal?: number;
      histogram?: number;
      bollinger_upper?: number;
      bollinger_lower?: number;
      volume_sma?: number;
    },
    options?: Partial<InfluxWriteOptions>
  ): Promise<void> {
    const fields: Record<string, number> = {};
    for (const [key, value] of Object.entries(analytics)) {
      // `timestamp` is passed separately as the point time, not as a field.
      if (key !== 'timestamp' && value !== undefined && value !== null) {
        fields[key] = value as number;
      }
    }

    if (Object.keys(fields).length === 0) {
      this.logger.debug('No analytics fields to write', { symbol, timestamp: analytics.timestamp });
      return;
    }

    await this.writePoint(
      'market_analytics',
      { symbol, exchange },
      fields,
      analytics.timestamp,
      options
    );
  }

  /**
   * Write one trade execution to the `trade_executions` measurement.
   *
   * `symbol` and `side` are always tags; `orderId` and `strategy` are added
   * as `order_id` / `strategy` tags only when non-empty.
   */
  public async writeTradeExecution(
    execution: {
      symbol: string;
      side: 'buy' | 'sell';
      quantity: number;
      price: number;
      timestamp: Date;
      executionTime: number;
      orderId?: string;
      strategy?: string;
    },
    options?: Partial<InfluxWriteOptions>
  ): Promise<void> {
    const tags: Record<string, string> = {
      symbol: execution.symbol,
      side: execution.side,
    };
    if (execution.orderId) {
      tags['order_id'] = execution.orderId;
    }
    if (execution.strategy) {
      tags['strategy'] = execution.strategy;
    }

    await this.writePoint(
      'trade_executions',
      tags,
      {
        quantity: execution.quantity,
        price: execution.price,
        execution_time: execution.executionTime,
      },
      execution.timestamp,
      options
    );
  }

  /**
   * Write one sample to the `performance_metrics` measurement.
   *
   * `operation` and the stringified `success` flag are tags, `errorCode` is
   * an `error_code` tag when non-empty, and `responseTime` is the only field.
   */
  public async writePerformanceMetrics(
    metrics: {
      timestamp: Date;
      operation: string;
      responseTime: number;
      success: boolean;
      errorCode?: string;
    },
    options?: Partial<InfluxWriteOptions>
  ): Promise<void> {
    const tags: Record<string, string> = {
      operation: metrics.operation,
      success: metrics.success.toString(),
    };
    if (metrics.errorCode) {
      tags['error_code'] = metrics.errorCode;
    }

    await this.writePoint(
      'performance_metrics',
      tags,
      { response_time: metrics.responseTime },
      metrics.timestamp,
      options
    );
  }

  /**
   * Send everything currently buffered and cancel any pending timed flush.
   *
   * The buffer is emptied before the send starts, so lines are not re-queued
   * if the send fails.
   *
   * @throws Error when the send ultimately fails.
   */
  public async flush(options?: Partial<InfluxWriteOptions>): Promise<void> {
    if (this.writeBuffer.length === 0) {
      return;
    }
    const opts = { ...this.defaultOptions, ...options };
    const lines = this.writeBuffer.splice(0); // take ownership of all queued lines
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }
    await this.sendLines(lines, opts);
  }

  /**
   * @returns Number of lines currently buffered.
   */
  public getBufferSize(): number {
    return this.writeBuffer.length;
  }

  /**
   * Discard all buffered lines without sending them and cancel any pending
   * timed flush.
   */
  public clearBuffer(): void {
    this.writeBuffer.length = 0;
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }
  }

  /**
   * Serialize one point as `measurement[,tag=value...] field=value[,...] [timestamp]`.
   *
   * Tags and fields whose value is `undefined` or `null` are skipped.
   *
   * @throws Error when no field remains after filtering (a line without
   *   fields is not valid line protocol) or when `timestamp` is an invalid Date.
   */
  private buildLineProtocol(
    measurement: string,
    tags: Record<string, string>,
    fields: Record<string, number | string | boolean>,
    timestamp?: Date
  ): string {
    const escapedMeasurement = measurement.replace(/[, =]/g, '\\$&');

    const tagString = Object.entries(tags)
      .filter(([, value]) => value !== undefined && value !== null)
      .map(([key, value]) => `${this.escapeTagKey(key)}=${this.escapeTagValue(value)}`)
      .join(',');

    const fieldString = Object.entries(fields)
      .filter(([, value]) => value !== undefined && value !== null)
      .map(([key, value]) => `${this.escapeFieldKey(key)}=${this.formatFieldValue(value)}`)
      .join(',');

    if (fieldString === '') {
      // Fail fast instead of sending a line the server can only reject.
      throw new Error(`Cannot build line for "${measurement}": at least one field is required`);
    }

    let line = escapedMeasurement;
    if (tagString) {
      line += `,${tagString}`;
    }
    line += ` ${fieldString}`;

    if (timestamp) {
      const millis = timestamp.getTime();
      if (Number.isNaN(millis)) {
        throw new Error(`Cannot build line for "${measurement}": invalid timestamp`);
      }
      // getTime() is in milliseconds; the line carries nanoseconds.
      line += ` ${(millis * 1000000).toString()}`;
    }
    return line;
  }

  /**
   * POST lines to `<client HTTP URL>/write`, retrying on any failure.
   *
   * One initial attempt plus up to `retryAttempts` retries are made; the wait
   * before retry number k is `retryDelay * k` milliseconds.
   *
   * @throws Error describing the last failure once every attempt has failed.
   */
  private async sendLines(lines: string[], options: Required<InfluxWriteOptions>): Promise<void> {
    if (lines.length === 0) {
      return;
    }
    // Line protocol is newline-delimited; terminate the final line as well.
    const payload = `${lines.join('\n')}\n`;
    // Always make at least one attempt, even if retryAttempts is negative.
    const maxAttempts = Math.max(0, options.retryAttempts) + 1;

    for (let attempt = 1; ; attempt += 1) {
      try {
        const response = await fetch(`${this.client.getHttpUrl()}/write`, {
          method: 'POST',
          headers: {
            'Content-Type': 'text/plain',
          },
          body: payload,
        });
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        this.logger.debug(`Successfully wrote ${lines.length} lines to QuestDB`);
        return;
      } catch (error) {
        const willRetry = attempt < maxAttempts;
        this.logger.warn(`Write attempt ${attempt} failed`, {
          error,
          linesCount: lines.length,
          willRetry,
        });
        if (!willRetry) {
          const reason = error instanceof Error ? error.message : String(error);
          throw new Error(`Failed to write to QuestDB after ${attempt} attempts: ${reason}`);
        }
        await this.sleep(options.retryDelay * attempt); // linear backoff
      }
    }
  }

  /**
   * Arm a one-shot timer that flushes the buffer after `flushInterval` ms.
   * Does nothing when a timer is already pending or `autoFlush` is disabled.
   */
  private scheduleFlush(options: Required<InfluxWriteOptions>): void {
    if (this.flushTimer || !options.autoFlush) {
      return;
    }
    this.flushTimer = setTimeout(async () => {
      // The timer has fired, so drop the handle first: flush() returns early on
      // an empty buffer and would otherwise leave a stale handle behind that
      // blocks every later scheduleFlush() call.
      this.flushTimer = null;
      try {
        await this.flush(options);
      } catch (error) {
        this.logger.error('Scheduled flush failed', error);
      }
    }, options.flushInterval);
  }

  /**
   * Render a field value: strings are double-quoted with `\` and `"`
   * backslash-escaped, booleans become `true`/`false`, numbers use their
   * default string form.
   */
  private formatFieldValue(value: number | string | boolean): string {
    if (typeof value === 'string') {
      return `"${value.replace(/[\\"]/g, '\\$&')}"`;
    }
    if (typeof value === 'boolean') {
      return value ? 'true' : 'false';
    }
    return value.toString();
  }

  /** Backslash-escape commas, spaces and equals signs in a tag key. */
  private escapeTagKey(key: string): string {
    return key.replace(/[, =]/g, '\\$&');
  }

  /** Backslash-escape commas, spaces and equals signs in a tag value. */
  private escapeTagValue(value: string): string {
    return value.replace(/[, =]/g, '\\$&');
  }

  /** Backslash-escape commas, spaces and equals signs in a field key. */
  private escapeFieldKey(key: string): string {
    return key.replace(/[, =]/g, '\\$&');
  }

  /** Resolve after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Release resources: discards buffered lines (they are NOT sent) and
   * cancels any pending timed flush.
   */
  public destroy(): void {
    this.clearBuffer();
    this.logger.debug('InfluxDB writer destroyed');
  }
}