running prettier for cleanup
This commit is contained in:
parent
fe7733aeb5
commit
d85cd58acd
151 changed files with 29158 additions and 27966 deletions
|
|
@ -1,436 +1,430 @@
|
|||
import { getLogger } from '@stock-bot/logger';
|
||||
import type {
|
||||
InfluxLineData,
|
||||
InfluxWriteOptions,
|
||||
BaseTimeSeriesData
|
||||
} from './types';
|
||||
|
||||
// Interface to avoid circular dependency
|
||||
interface QuestDBClientInterface {
|
||||
getHttpUrl(): string;
|
||||
}
|
||||
|
||||
/**
|
||||
* QuestDB InfluxDB Line Protocol Writer
|
||||
*
|
||||
* Provides high-performance data ingestion using InfluxDB Line Protocol
|
||||
* which QuestDB supports natively for optimal time-series data insertion.
|
||||
*/
|
||||
export class QuestDBInfluxWriter {
|
||||
private readonly logger: ReturnType<typeof getLogger>;
|
||||
private writeBuffer: string[] = [];
|
||||
private flushTimer: NodeJS.Timeout | null = null;
|
||||
private readonly defaultOptions: Required<InfluxWriteOptions> = {
|
||||
batchSize: 1000,
|
||||
flushInterval: 5000,
|
||||
autoFlush: true,
|
||||
precision: 'ms',
|
||||
retryAttempts: 3,
|
||||
retryDelay: 1000
|
||||
};
|
||||
constructor(private readonly client: QuestDBClientInterface) {
|
||||
this.logger = getLogger('questdb-influx-writer');
|
||||
}
|
||||
|
||||
/**
|
||||
* Write single data point using InfluxDB Line Protocol
|
||||
*/
|
||||
public async writePoint(
|
||||
measurement: string,
|
||||
tags: Record<string, string>,
|
||||
fields: Record<string, number | string | boolean>,
|
||||
timestamp?: Date,
|
||||
options?: Partial<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
const line = this.buildLineProtocol(measurement, tags, fields, timestamp);
|
||||
const opts = { ...this.defaultOptions, ...options };
|
||||
|
||||
if (opts.autoFlush && this.writeBuffer.length === 0) {
|
||||
// Single point write - send immediately
|
||||
await this.sendLines([line], opts);
|
||||
} else {
|
||||
// Add to buffer
|
||||
this.writeBuffer.push(line);
|
||||
|
||||
if (opts.autoFlush) {
|
||||
this.scheduleFlush(opts);
|
||||
}
|
||||
|
||||
// Flush if buffer is full
|
||||
if (this.writeBuffer.length >= opts.batchSize) {
|
||||
await this.flush(opts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write multiple data points
|
||||
*/
|
||||
public async writePoints(
|
||||
data: InfluxLineData[],
|
||||
options?: Partial<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
const opts = { ...this.defaultOptions, ...options };
|
||||
const lines = data.map(point =>
|
||||
this.buildLineProtocol(point.measurement, point.tags, point.fields, point.timestamp)
|
||||
);
|
||||
|
||||
if (opts.autoFlush) {
|
||||
// Send immediately for batch writes
|
||||
await this.sendLines(lines, opts);
|
||||
} else {
|
||||
// Add to buffer
|
||||
this.writeBuffer.push(...lines);
|
||||
|
||||
// Flush if buffer exceeds batch size
|
||||
while (this.writeBuffer.length >= opts.batchSize) {
|
||||
const batch = this.writeBuffer.splice(0, opts.batchSize);
|
||||
await this.sendLines(batch, opts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write OHLCV data optimized for QuestDB
|
||||
*/
|
||||
public async writeOHLCV(
|
||||
symbol: string,
|
||||
exchange: string,
|
||||
data: {
|
||||
timestamp: Date;
|
||||
open: number;
|
||||
high: number;
|
||||
low: number;
|
||||
close: number;
|
||||
volume: number;
|
||||
}[],
|
||||
options?: Partial<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
const influxData: InfluxLineData[] = data.map(candle => ({
|
||||
measurement: 'ohlcv_data',
|
||||
tags: {
|
||||
symbol,
|
||||
exchange,
|
||||
data_source: 'market_feed'
|
||||
},
|
||||
fields: {
|
||||
open: candle.open,
|
||||
high: candle.high,
|
||||
low: candle.low,
|
||||
close: candle.close,
|
||||
volume: candle.volume
|
||||
},
|
||||
timestamp: candle.timestamp
|
||||
}));
|
||||
|
||||
await this.writePoints(influxData, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write market analytics data
|
||||
*/
|
||||
public async writeMarketAnalytics(
|
||||
symbol: string,
|
||||
exchange: string,
|
||||
analytics: {
|
||||
timestamp: Date;
|
||||
rsi?: number;
|
||||
macd?: number;
|
||||
signal?: number;
|
||||
histogram?: number;
|
||||
bollinger_upper?: number;
|
||||
bollinger_lower?: number;
|
||||
volume_sma?: number;
|
||||
},
|
||||
options?: Partial<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
const fields: Record<string, number> = {};
|
||||
|
||||
// Only include defined values
|
||||
Object.entries(analytics).forEach(([key, value]) => {
|
||||
if (key !== 'timestamp' && value !== undefined && value !== null) {
|
||||
fields[key] = value as number;
|
||||
}
|
||||
});
|
||||
|
||||
if (Object.keys(fields).length === 0) {
|
||||
this.logger.warn('No analytics fields to write', { symbol, timestamp: analytics.timestamp });
|
||||
return;
|
||||
}
|
||||
|
||||
await this.writePoint(
|
||||
'market_analytics',
|
||||
{ symbol, exchange },
|
||||
fields,
|
||||
analytics.timestamp,
|
||||
options
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write trade execution data
|
||||
*/
|
||||
public async writeTradeExecution(
|
||||
execution: {
|
||||
symbol: string;
|
||||
side: 'buy' | 'sell';
|
||||
quantity: number;
|
||||
price: number;
|
||||
timestamp: Date;
|
||||
executionTime: number;
|
||||
orderId?: string;
|
||||
strategy?: string;
|
||||
},
|
||||
options?: Partial<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
const tags: Record<string, string> = {
|
||||
symbol: execution.symbol,
|
||||
side: execution.side
|
||||
};
|
||||
|
||||
if (execution.orderId) {
|
||||
tags.order_id = execution.orderId;
|
||||
}
|
||||
|
||||
if (execution.strategy) {
|
||||
tags.strategy = execution.strategy;
|
||||
}
|
||||
|
||||
await this.writePoint(
|
||||
'trade_executions',
|
||||
tags,
|
||||
{
|
||||
quantity: execution.quantity,
|
||||
price: execution.price,
|
||||
execution_time: execution.executionTime
|
||||
},
|
||||
execution.timestamp,
|
||||
options
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write performance metrics
|
||||
*/
|
||||
public async writePerformanceMetrics(
|
||||
metrics: {
|
||||
timestamp: Date;
|
||||
operation: string;
|
||||
responseTime: number;
|
||||
success: boolean;
|
||||
errorCode?: string;
|
||||
},
|
||||
options?: Partial<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
const tags: Record<string, string> = {
|
||||
operation: metrics.operation,
|
||||
success: metrics.success.toString()
|
||||
};
|
||||
|
||||
if (metrics.errorCode) {
|
||||
tags.error_code = metrics.errorCode;
|
||||
}
|
||||
|
||||
await this.writePoint(
|
||||
'performance_metrics',
|
||||
tags,
|
||||
{ response_time: metrics.responseTime },
|
||||
metrics.timestamp,
|
||||
options
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Manually flush the write buffer
|
||||
*/
|
||||
public async flush(options?: Partial<InfluxWriteOptions>): Promise<void> {
|
||||
if (this.writeBuffer.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const opts = { ...this.defaultOptions, ...options };
|
||||
const lines = this.writeBuffer.splice(0); // Clear buffer
|
||||
|
||||
if (this.flushTimer) {
|
||||
clearTimeout(this.flushTimer);
|
||||
this.flushTimer = null;
|
||||
}
|
||||
|
||||
await this.sendLines(lines, opts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current buffer size
|
||||
*/
|
||||
public getBufferSize(): number {
|
||||
return this.writeBuffer.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear the buffer without writing
|
||||
*/
|
||||
public clearBuffer(): void {
|
||||
this.writeBuffer.length = 0;
|
||||
if (this.flushTimer) {
|
||||
clearTimeout(this.flushTimer);
|
||||
this.flushTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build InfluxDB Line Protocol string
|
||||
*/
|
||||
private buildLineProtocol(
|
||||
measurement: string,
|
||||
tags: Record<string, string>,
|
||||
fields: Record<string, number | string | boolean>,
|
||||
timestamp?: Date
|
||||
): string {
|
||||
// Escape special characters in measurement name
|
||||
const escapedMeasurement = measurement.replace(/[, =]/g, '\\$&');
|
||||
|
||||
// Build tags string
|
||||
const tagString = Object.entries(tags)
|
||||
.filter(([_, value]) => value !== undefined && value !== null)
|
||||
.map(([key, value]) => `${this.escapeTagKey(key)}=${this.escapeTagValue(value)}`)
|
||||
.join(',');
|
||||
|
||||
// Build fields string
|
||||
const fieldString = Object.entries(fields)
|
||||
.filter(([_, value]) => value !== undefined && value !== null)
|
||||
.map(([key, value]) => `${this.escapeFieldKey(key)}=${this.formatFieldValue(value)}`)
|
||||
.join(',');
|
||||
|
||||
// Build timestamp
|
||||
const timestampString = timestamp ?
|
||||
Math.floor(timestamp.getTime() * 1000000).toString() : // Convert to nanoseconds
|
||||
'';
|
||||
|
||||
// Combine parts
|
||||
let line = escapedMeasurement;
|
||||
if (tagString) {
|
||||
line += `,${tagString}`;
|
||||
}
|
||||
line += ` ${fieldString}`;
|
||||
if (timestampString) {
|
||||
line += ` ${timestampString}`;
|
||||
}
|
||||
|
||||
return line;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send lines to QuestDB via HTTP endpoint
|
||||
*/
|
||||
private async sendLines(
|
||||
lines: string[],
|
||||
options: Required<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
if (lines.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = lines.join('\n');
|
||||
let attempt = 0;
|
||||
|
||||
while (attempt <= options.retryAttempts) {
|
||||
try {
|
||||
// QuestDB InfluxDB Line Protocol endpoint
|
||||
const response = await fetch(`${this.client.getHttpUrl()}/write`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'text/plain',
|
||||
},
|
||||
body: payload
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
|
||||
}
|
||||
|
||||
this.logger.debug(`Successfully wrote ${lines.length} lines to QuestDB`);
|
||||
return;
|
||||
|
||||
} catch (error) {
|
||||
attempt++;
|
||||
this.logger.error(`Write attempt ${attempt} failed`, {
|
||||
error,
|
||||
linesCount: lines.length,
|
||||
willRetry: attempt <= options.retryAttempts
|
||||
});
|
||||
|
||||
if (attempt <= options.retryAttempts) {
|
||||
await this.sleep(options.retryDelay * attempt); // Exponential backoff
|
||||
} else {
|
||||
throw new Error(`Failed to write to QuestDB after ${options.retryAttempts} attempts: $error`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule automatic flush
|
||||
*/
|
||||
private scheduleFlush(options: Required<InfluxWriteOptions>): void {
|
||||
if (this.flushTimer || !options.autoFlush) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.flushTimer = setTimeout(async () => {
|
||||
try {
|
||||
await this.flush(options);
|
||||
} catch (error) {
|
||||
this.logger.error('Scheduled flush failed', error);
|
||||
}
|
||||
}, options.flushInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* Format field value for InfluxDB Line Protocol
|
||||
*/
|
||||
private formatFieldValue(value: number | string | boolean): string {
|
||||
if (typeof value === 'string') {
|
||||
return `"${value.replace(/"/g, '\\"')}"`;
|
||||
} else if (typeof value === 'boolean') {
|
||||
return value ? 'true' : 'false';
|
||||
} else {
|
||||
return value.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Escape tag key
|
||||
*/
|
||||
private escapeTagKey(key: string): string {
|
||||
return key.replace(/[, =]/g, '\\$&');
|
||||
}
|
||||
|
||||
/**
|
||||
* Escape tag value
|
||||
*/
|
||||
private escapeTagValue(value: string): string {
|
||||
return value.replace(/[, =]/g, '\\$&');
|
||||
}
|
||||
|
||||
/**
|
||||
* Escape field key
|
||||
*/
|
||||
private escapeFieldKey(key: string): string {
|
||||
return key.replace(/[, =]/g, '\\$&');
|
||||
}
|
||||
|
||||
/**
|
||||
* Sleep utility
|
||||
*/
|
||||
private sleep(ms: number): Promise<void> {
|
||||
return new Promise(resolve => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup resources
|
||||
*/
|
||||
public destroy(): void {
|
||||
this.clearBuffer();
|
||||
this.logger.info('InfluxDB writer destroyed');
|
||||
}
|
||||
}
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import type { BaseTimeSeriesData, InfluxLineData, InfluxWriteOptions } from './types';
|
||||
|
||||
// Interface to avoid circular dependency
|
||||
interface QuestDBClientInterface {
|
||||
getHttpUrl(): string;
|
||||
}
|
||||
|
||||
/**
|
||||
* QuestDB InfluxDB Line Protocol Writer
|
||||
*
|
||||
* Provides high-performance data ingestion using InfluxDB Line Protocol
|
||||
* which QuestDB supports natively for optimal time-series data insertion.
|
||||
*/
|
||||
export class QuestDBInfluxWriter {
|
||||
private readonly logger: ReturnType<typeof getLogger>;
|
||||
private writeBuffer: string[] = [];
|
||||
private flushTimer: NodeJS.Timeout | null = null;
|
||||
private readonly defaultOptions: Required<InfluxWriteOptions> = {
|
||||
batchSize: 1000,
|
||||
flushInterval: 5000,
|
||||
autoFlush: true,
|
||||
precision: 'ms',
|
||||
retryAttempts: 3,
|
||||
retryDelay: 1000,
|
||||
};
|
||||
constructor(private readonly client: QuestDBClientInterface) {
|
||||
this.logger = getLogger('questdb-influx-writer');
|
||||
}
|
||||
|
||||
/**
|
||||
* Write single data point using InfluxDB Line Protocol
|
||||
*/
|
||||
public async writePoint(
|
||||
measurement: string,
|
||||
tags: Record<string, string>,
|
||||
fields: Record<string, number | string | boolean>,
|
||||
timestamp?: Date,
|
||||
options?: Partial<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
const line = this.buildLineProtocol(measurement, tags, fields, timestamp);
|
||||
const opts = { ...this.defaultOptions, ...options };
|
||||
|
||||
if (opts.autoFlush && this.writeBuffer.length === 0) {
|
||||
// Single point write - send immediately
|
||||
await this.sendLines([line], opts);
|
||||
} else {
|
||||
// Add to buffer
|
||||
this.writeBuffer.push(line);
|
||||
|
||||
if (opts.autoFlush) {
|
||||
this.scheduleFlush(opts);
|
||||
}
|
||||
|
||||
// Flush if buffer is full
|
||||
if (this.writeBuffer.length >= opts.batchSize) {
|
||||
await this.flush(opts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write multiple data points
|
||||
*/
|
||||
public async writePoints(
|
||||
data: InfluxLineData[],
|
||||
options?: Partial<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
const opts = { ...this.defaultOptions, ...options };
|
||||
const lines = data.map(point =>
|
||||
this.buildLineProtocol(point.measurement, point.tags, point.fields, point.timestamp)
|
||||
);
|
||||
|
||||
if (opts.autoFlush) {
|
||||
// Send immediately for batch writes
|
||||
await this.sendLines(lines, opts);
|
||||
} else {
|
||||
// Add to buffer
|
||||
this.writeBuffer.push(...lines);
|
||||
|
||||
// Flush if buffer exceeds batch size
|
||||
while (this.writeBuffer.length >= opts.batchSize) {
|
||||
const batch = this.writeBuffer.splice(0, opts.batchSize);
|
||||
await this.sendLines(batch, opts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write OHLCV data optimized for QuestDB
|
||||
*/
|
||||
public async writeOHLCV(
|
||||
symbol: string,
|
||||
exchange: string,
|
||||
data: {
|
||||
timestamp: Date;
|
||||
open: number;
|
||||
high: number;
|
||||
low: number;
|
||||
close: number;
|
||||
volume: number;
|
||||
}[],
|
||||
options?: Partial<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
const influxData: InfluxLineData[] = data.map(candle => ({
|
||||
measurement: 'ohlcv_data',
|
||||
tags: {
|
||||
symbol,
|
||||
exchange,
|
||||
data_source: 'market_feed',
|
||||
},
|
||||
fields: {
|
||||
open: candle.open,
|
||||
high: candle.high,
|
||||
low: candle.low,
|
||||
close: candle.close,
|
||||
volume: candle.volume,
|
||||
},
|
||||
timestamp: candle.timestamp,
|
||||
}));
|
||||
|
||||
await this.writePoints(influxData, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write market analytics data
|
||||
*/
|
||||
public async writeMarketAnalytics(
|
||||
symbol: string,
|
||||
exchange: string,
|
||||
analytics: {
|
||||
timestamp: Date;
|
||||
rsi?: number;
|
||||
macd?: number;
|
||||
signal?: number;
|
||||
histogram?: number;
|
||||
bollinger_upper?: number;
|
||||
bollinger_lower?: number;
|
||||
volume_sma?: number;
|
||||
},
|
||||
options?: Partial<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
const fields: Record<string, number> = {};
|
||||
|
||||
// Only include defined values
|
||||
Object.entries(analytics).forEach(([key, value]) => {
|
||||
if (key !== 'timestamp' && value !== undefined && value !== null) {
|
||||
fields[key] = value as number;
|
||||
}
|
||||
});
|
||||
|
||||
if (Object.keys(fields).length === 0) {
|
||||
this.logger.warn('No analytics fields to write', { symbol, timestamp: analytics.timestamp });
|
||||
return;
|
||||
}
|
||||
|
||||
await this.writePoint(
|
||||
'market_analytics',
|
||||
{ symbol, exchange },
|
||||
fields,
|
||||
analytics.timestamp,
|
||||
options
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write trade execution data
|
||||
*/
|
||||
public async writeTradeExecution(
|
||||
execution: {
|
||||
symbol: string;
|
||||
side: 'buy' | 'sell';
|
||||
quantity: number;
|
||||
price: number;
|
||||
timestamp: Date;
|
||||
executionTime: number;
|
||||
orderId?: string;
|
||||
strategy?: string;
|
||||
},
|
||||
options?: Partial<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
const tags: Record<string, string> = {
|
||||
symbol: execution.symbol,
|
||||
side: execution.side,
|
||||
};
|
||||
|
||||
if (execution.orderId) {
|
||||
tags.order_id = execution.orderId;
|
||||
}
|
||||
|
||||
if (execution.strategy) {
|
||||
tags.strategy = execution.strategy;
|
||||
}
|
||||
|
||||
await this.writePoint(
|
||||
'trade_executions',
|
||||
tags,
|
||||
{
|
||||
quantity: execution.quantity,
|
||||
price: execution.price,
|
||||
execution_time: execution.executionTime,
|
||||
},
|
||||
execution.timestamp,
|
||||
options
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write performance metrics
|
||||
*/
|
||||
public async writePerformanceMetrics(
|
||||
metrics: {
|
||||
timestamp: Date;
|
||||
operation: string;
|
||||
responseTime: number;
|
||||
success: boolean;
|
||||
errorCode?: string;
|
||||
},
|
||||
options?: Partial<InfluxWriteOptions>
|
||||
): Promise<void> {
|
||||
const tags: Record<string, string> = {
|
||||
operation: metrics.operation,
|
||||
success: metrics.success.toString(),
|
||||
};
|
||||
|
||||
if (metrics.errorCode) {
|
||||
tags.error_code = metrics.errorCode;
|
||||
}
|
||||
|
||||
await this.writePoint(
|
||||
'performance_metrics',
|
||||
tags,
|
||||
{ response_time: metrics.responseTime },
|
||||
metrics.timestamp,
|
||||
options
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Manually flush the write buffer
|
||||
*/
|
||||
public async flush(options?: Partial<InfluxWriteOptions>): Promise<void> {
|
||||
if (this.writeBuffer.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const opts = { ...this.defaultOptions, ...options };
|
||||
const lines = this.writeBuffer.splice(0); // Clear buffer
|
||||
|
||||
if (this.flushTimer) {
|
||||
clearTimeout(this.flushTimer);
|
||||
this.flushTimer = null;
|
||||
}
|
||||
|
||||
await this.sendLines(lines, opts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current buffer size
|
||||
*/
|
||||
public getBufferSize(): number {
|
||||
return this.writeBuffer.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear the buffer without writing
|
||||
*/
|
||||
public clearBuffer(): void {
|
||||
this.writeBuffer.length = 0;
|
||||
if (this.flushTimer) {
|
||||
clearTimeout(this.flushTimer);
|
||||
this.flushTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build InfluxDB Line Protocol string
|
||||
*/
|
||||
private buildLineProtocol(
|
||||
measurement: string,
|
||||
tags: Record<string, string>,
|
||||
fields: Record<string, number | string | boolean>,
|
||||
timestamp?: Date
|
||||
): string {
|
||||
// Escape special characters in measurement name
|
||||
const escapedMeasurement = measurement.replace(/[, =]/g, '\\$&');
|
||||
|
||||
// Build tags string
|
||||
const tagString = Object.entries(tags)
|
||||
.filter(([_, value]) => value !== undefined && value !== null)
|
||||
.map(([key, value]) => `${this.escapeTagKey(key)}=${this.escapeTagValue(value)}`)
|
||||
.join(',');
|
||||
|
||||
// Build fields string
|
||||
const fieldString = Object.entries(fields)
|
||||
.filter(([_, value]) => value !== undefined && value !== null)
|
||||
.map(([key, value]) => `${this.escapeFieldKey(key)}=${this.formatFieldValue(value)}`)
|
||||
.join(',');
|
||||
|
||||
// Build timestamp
|
||||
const timestampString = timestamp
|
||||
? Math.floor(timestamp.getTime() * 1000000).toString() // Convert to nanoseconds
|
||||
: '';
|
||||
|
||||
// Combine parts
|
||||
let line = escapedMeasurement;
|
||||
if (tagString) {
|
||||
line += `,${tagString}`;
|
||||
}
|
||||
line += ` ${fieldString}`;
|
||||
if (timestampString) {
|
||||
line += ` ${timestampString}`;
|
||||
}
|
||||
|
||||
return line;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send lines to QuestDB via HTTP endpoint
|
||||
*/
|
||||
private async sendLines(lines: string[], options: Required<InfluxWriteOptions>): Promise<void> {
|
||||
if (lines.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = lines.join('\n');
|
||||
let attempt = 0;
|
||||
|
||||
while (attempt <= options.retryAttempts) {
|
||||
try {
|
||||
// QuestDB InfluxDB Line Protocol endpoint
|
||||
const response = await fetch(`${this.client.getHttpUrl()}/write`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'text/plain',
|
||||
},
|
||||
body: payload,
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
|
||||
}
|
||||
|
||||
this.logger.debug(`Successfully wrote ${lines.length} lines to QuestDB`);
|
||||
return;
|
||||
} catch (error) {
|
||||
attempt++;
|
||||
this.logger.error(`Write attempt ${attempt} failed`, {
|
||||
error,
|
||||
linesCount: lines.length,
|
||||
willRetry: attempt <= options.retryAttempts,
|
||||
});
|
||||
|
||||
if (attempt <= options.retryAttempts) {
|
||||
await this.sleep(options.retryDelay * attempt); // Exponential backoff
|
||||
} else {
|
||||
throw new Error(
|
||||
`Failed to write to QuestDB after ${options.retryAttempts} attempts: $error`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule automatic flush
|
||||
*/
|
||||
private scheduleFlush(options: Required<InfluxWriteOptions>): void {
|
||||
if (this.flushTimer || !options.autoFlush) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.flushTimer = setTimeout(async () => {
|
||||
try {
|
||||
await this.flush(options);
|
||||
} catch (error) {
|
||||
this.logger.error('Scheduled flush failed', error);
|
||||
}
|
||||
}, options.flushInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* Format field value for InfluxDB Line Protocol
|
||||
*/
|
||||
private formatFieldValue(value: number | string | boolean): string {
|
||||
if (typeof value === 'string') {
|
||||
return `"${value.replace(/"/g, '\\"')}"`;
|
||||
} else if (typeof value === 'boolean') {
|
||||
return value ? 'true' : 'false';
|
||||
} else {
|
||||
return value.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Escape tag key
|
||||
*/
|
||||
private escapeTagKey(key: string): string {
|
||||
return key.replace(/[, =]/g, '\\$&');
|
||||
}
|
||||
|
||||
/**
|
||||
* Escape tag value
|
||||
*/
|
||||
private escapeTagValue(value: string): string {
|
||||
return value.replace(/[, =]/g, '\\$&');
|
||||
}
|
||||
|
||||
/**
|
||||
* Escape field key
|
||||
*/
|
||||
private escapeFieldKey(key: string): string {
|
||||
return key.replace(/[, =]/g, '\\$&');
|
||||
}
|
||||
|
||||
/**
|
||||
* Sleep utility
|
||||
*/
|
||||
private sleep(ms: number): Promise<void> {
|
||||
return new Promise(resolve => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup resources
|
||||
*/
|
||||
public destroy(): void {
|
||||
this.clearBuffer();
|
||||
this.logger.info('InfluxDB writer destroyed');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue