deleted a lot of the stuff

This commit is contained in:
Bojan Kucera 2025-06-04 10:50:05 -04:00
parent d22f7aafa0
commit 3e451558ac
173 changed files with 1313 additions and 30205 deletions

View file

@ -3,6 +3,19 @@
*/
import { cleanEnv, str, num, bool } from 'envalid';
export interface ProviderConfig {
name: string;
type: 'rest' | 'websocket';
enabled: boolean;
baseUrl?: string;
apiKey?: string;
apiSecret?: string;
rateLimits?: {
maxRequestsPerMinute?: number;
maxRequestsPerSecond?: number;
maxRequestsPerHour?: number;
};
}
/**
* Data providers configuration with validation and defaults
*/
@ -53,6 +66,8 @@ export const dataProvidersConfig = cleanEnv(process.env, {
* Helper function to get provider-specific configuration
*/
export function getProviderConfig(providerName: string) {
// make a interface for the provider config
const name = providerName.toUpperCase();
switch (name) {
@ -128,6 +143,20 @@ export function getDefaultProvider() {
// Export typed configuration object
export type DataProvidersConfig = typeof dataProvidersConfig;
export class DataProviders {
static getProviderConfig(providerName: string): ProviderConfig {
return getProviderConfig(providerName);
}
static getEnabledProviders(): ProviderConfig[] {
return getEnabledProviders();
}
static getDefaultProvider(): ProviderConfig {
return getDefaultProvider();
}
}
// Export individual config values for convenience
export const {

View file

@ -11,11 +11,11 @@
"test": "jest"
},
"dependencies": {
"@stock-bot/types": "workspace:*",
"@stock-bot/config": "workspace:*",
"winston": "^3.11.0",
"winston-loki": "^6.0.8",
"winston-daily-rotate-file": "^4.7.1"
"@stock-bot/types": "workspace:*",
"pino": "^9.7.0",
"pino-loki": "^2.6.0",
"pino-pretty": "^13.0.0"
},
"devDependencies": {
"@types/jest": "^29.5.2",

View file

@ -21,11 +21,15 @@ export {
LogThrottle
} from './utils';
// Express middleware
// Hono middleware
export {
loggingMiddleware,
errorLoggingMiddleware,
createRequestLogger
createRequestLogger,
performanceMiddleware,
securityMiddleware,
businessEventMiddleware,
comprehensiveLoggingMiddleware
} from './middleware';
// Type exports

View file

@ -1,23 +1,123 @@
/**
* Enhanced logger with Loki integration for Stock Bot platform
* Enhanced Pino-based logger with Loki integration for Stock Bot platform
*
* Features:
* - Multiple log levels (debug, info, warn, error)
* - Console and file logging
* - High performance JSON logging with Pino
* - Multiple log levels (debug, info, warn, error, http, verbose, silly)
* - Console and file logging with pino-pretty formatting
* - Loki integration for centralized logging
* - Structured logging with metadata
* - Performance optimized with batching
* - Flexible message handling (string or object)
* - Service-specific context
* - TypeScript-friendly interfaces
*/
import winston from 'winston';
import LokiTransport from 'winston-loki';
import DailyRotateFile from 'winston-daily-rotate-file';
import pino from 'pino';
import { loggingConfig, lokiConfig } from '@stock-bot/config';
import type { LogLevel, LogContext, LogMetadata } from './types';
// Global logger instances cache
const loggerInstances = new Map<string, winston.Logger>();
const loggerInstances = new Map<string, pino.Logger>();
// Pino log level mapping from string to number
const PINO_LEVELS: Record<LogLevel, number> = {
silly: 10,
debug: 20,
verbose: 25,
http: 30,
info: 30,
warn: 40,
error: 50
};
/**
* Create transport configuration for Pino based on options
*/
function createTransports(serviceName: string, options?: {
enableConsole?: boolean;
enableFile?: boolean;
enableLoki?: boolean;
}): any {
const {
enableConsole = loggingConfig.LOG_CONSOLE,
enableFile = loggingConfig.LOG_FILE,
enableLoki = true
} = options || {};
const targets: any[] = [];
// Console transport with pretty formatting
if (enableConsole) {
targets.push({
target: 'pino-pretty',
level: loggingConfig.LOG_LEVEL,
options: {
colorize: true,
translateTime: 'yyyy-mm-dd HH:MM:ss.l',
ignore: 'pid,hostname',
messageFormat: '[{service}] {msg}',
customPrettifiers: {
service: (service: string) => `${service}`,
level: (level: string) => `[${level.toUpperCase()}]`
}
}
});
}
// File transport for general logs
if (enableFile) {
targets.push({
target: 'pino/file',
level: loggingConfig.LOG_LEVEL,
options: {
destination: `${loggingConfig.LOG_FILE_PATH}/${serviceName}.log`,
mkdir: true
}
});
// Separate error file if enabled
if (loggingConfig.LOG_ERROR_FILE) {
targets.push({
target: 'pino/file',
level: 'error',
options: {
destination: `${loggingConfig.LOG_FILE_PATH}/${serviceName}-error.log`,
mkdir: true
}
});
}
}
// Loki transport for centralized logging
if (enableLoki && lokiConfig.LOKI_HOST) {
targets.push({
target: 'pino-loki',
level: loggingConfig.LOG_LEVEL,
options: {
batching: true,
interval: lokiConfig.LOKI_FLUSH_INTERVAL_MS,
host: lokiConfig.LOKI_URL || `http://${lokiConfig.LOKI_HOST}:${lokiConfig.LOKI_PORT}`,
basicAuth: lokiConfig.LOKI_USERNAME && lokiConfig.LOKI_PASSWORD
? {
username: lokiConfig.LOKI_USERNAME,
password: lokiConfig.LOKI_PASSWORD
}
: undefined,
labels: {
service: serviceName,
environment: lokiConfig.LOKI_ENVIRONMENT_LABEL,
...(lokiConfig.LOKI_DEFAULT_LABELS ? JSON.parse(lokiConfig.LOKI_DEFAULT_LABELS) : {})
},
timeout: lokiConfig.LOKI_PUSH_TIMEOUT || 10000,
silenceErrors: false
}
});
}
return {
targets
};
}
/**
* Create or retrieve a logger instance for a specific service
@ -27,7 +127,7 @@ export function createLogger(serviceName: string, options?: {
enableLoki?: boolean;
enableFile?: boolean;
enableConsole?: boolean;
}): winston.Logger {
}): pino.Logger {
const key = `${serviceName}-${JSON.stringify(options || {})}`;
if (loggerInstances.has(key)) {
@ -41,124 +141,52 @@ export function createLogger(serviceName: string, options?: {
}
/**
* Build a winston logger with all configured transports
* Build a Pino logger with all configured transports
*/
function buildLogger(serviceName: string, options?: {
level?: LogLevel;
enableLoki?: boolean;
enableFile?: boolean;
enableConsole?: boolean;
}): winston.Logger {
}): pino.Logger {
const {
level = loggingConfig.LOG_LEVEL as LogLevel,
enableLoki = true,
enableFile = loggingConfig.LOG_FILE,
enableConsole = loggingConfig.LOG_CONSOLE
} = options || {}; // Base logger configuration
const transports: winston.transport[] = [];
} = options || {};
// Console transport
if (enableConsole) {
transports.push(new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
winston.format.simple(),
winston.format.printf(({ timestamp, level, service, message, metadata }) => {
const meta = metadata && Object.keys(metadata).length > 0
? `\n${JSON.stringify(metadata, null, 2)}`
: '';
return `${timestamp} [${level}] [${service}] ${message}${meta}`;
})
)
}));
}
const transport = createTransports(serviceName, {
enableConsole,
enableFile,
enableLoki
});
// File transport with daily rotation
if (enableFile) {
// General log file
transports.push(new DailyRotateFile({
filename: `${loggingConfig.LOG_FILE_PATH}/${serviceName}-%DATE%.log`,
datePattern: loggingConfig.LOG_FILE_DATE_PATTERN,
zippedArchive: true,
maxSize: loggingConfig.LOG_FILE_MAX_SIZE,
maxFiles: loggingConfig.LOG_FILE_MAX_FILES,
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
)
}));
// Separate error log file
if (loggingConfig.LOG_ERROR_FILE) {
transports.push(new DailyRotateFile({
level: 'error',
filename: `${loggingConfig.LOG_FILE_PATH}/${serviceName}-error-%DATE%.log`,
datePattern: loggingConfig.LOG_FILE_DATE_PATTERN,
zippedArchive: true,
maxSize: loggingConfig.LOG_FILE_MAX_SIZE,
maxFiles: loggingConfig.LOG_FILE_MAX_FILES,
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
)
}));
}
}
// Loki transport for centralized logging
if (enableLoki && lokiConfig.LOKI_HOST) {
try {
const lokiTransport = new LokiTransport({
host: lokiConfig.LOKI_URL || `http://${lokiConfig.LOKI_HOST}:${lokiConfig.LOKI_PORT}`,
labels: {
service: serviceName,
environment: lokiConfig.LOKI_ENVIRONMENT_LABEL,
...(lokiConfig.LOKI_DEFAULT_LABELS ? JSON.parse(lokiConfig.LOKI_DEFAULT_LABELS) : {})
},
json: true,
batching: true,
interval: lokiConfig.LOKI_FLUSH_INTERVAL_MS,
timeout: lokiConfig.LOKI_PUSH_TIMEOUT,
basicAuth: lokiConfig.LOKI_USERNAME && lokiConfig.LOKI_PASSWORD
? `${lokiConfig.LOKI_USERNAME}:${lokiConfig.LOKI_PASSWORD}`
: undefined,
onConnectionError: (err) => {
console.error('Loki connection error:', err);
}
});
transports.push(lokiTransport);
} catch (error) {
console.warn('Failed to initialize Loki transport:', error);
}
}
const loggerConfig: winston.LoggerOptions = {
level,
format: winston.format.combine(
winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss.SSS' }),
winston.format.errors({ stack: true }),
winston.format.metadata({
fillExcept: ['message', 'level', 'timestamp', 'service']
}),
winston.format.json()
),
defaultMeta: {
const loggerConfig: pino.LoggerOptions = {
level: PINO_LEVELS[level] ? level : 'info',
customLevels: PINO_LEVELS,
useOnlyCustomLevels: false,
timestamp: () => `,"timestamp":"${new Date().toISOString()}"`,
formatters: {
level: (label: string) => ({ level: label }),
bindings: () => ({})
},
base: {
service: serviceName,
environment: loggingConfig.LOG_ENVIRONMENT,
version: loggingConfig.LOG_SERVICE_VERSION
},
transports
transport
};
return winston.createLogger(loggerConfig);
return pino(loggerConfig);
}
/**
* Enhanced Logger class with convenience methods
* Enhanced Logger class with convenience methods and flexible message handling
*/
export class Logger {
private winston: winston.Logger;
private pino: pino.Logger;
private serviceName: string;
private context: LogContext;
@ -170,34 +198,90 @@ export class Logger {
}) {
this.serviceName = serviceName;
this.context = context;
this.winston = createLogger(serviceName, options);
this.pino = createLogger(serviceName, options);
}
/**
* Flexible log method that accepts string or object messages
*/
log(level: LogLevel, message: string | object, metadata?: LogMetadata): void {
const logData = {
...this.context,
...metadata,
timestamp: new Date().toISOString()
};
// Map custom log levels to Pino levels
const pinoLevel = this.mapToPinoLevel(level);
if (typeof message === 'string') {
(this.pino as any)[pinoLevel](logData, message);
} else {
(this.pino as any)[pinoLevel]({ ...logData, ...message });
}
}
/**
* Map custom log levels to Pino levels
*/
private mapToPinoLevel(level: LogLevel): string {
switch (level) {
case 'silly':
case 'verbose':
case 'debug':
return 'debug';
case 'http':
case 'info':
return 'info';
case 'warn':
return 'warn';
case 'error':
return 'error';
default:
return 'info';
}
}
/**
* Debug level logging
*/
debug(message: string, metadata?: LogMetadata): void {
debug(message: string | object, metadata?: LogMetadata): void {
this.log('debug', message, metadata);
}
/**
* Verbose level logging
*/
verbose(message: string | object, metadata?: LogMetadata): void {
// Map verbose to debug level since Pino doesn't have verbose by default
this.log('debug', message, { ...metadata, originalLevel: 'verbose' });
}
/**
* Silly level logging
*/
silly(message: string | object, metadata?: LogMetadata): void {
// Map silly to debug level since Pino doesn't have silly by default
this.log('debug', message, { ...metadata, originalLevel: 'silly' });
}
/**
* Info level logging
*/
info(message: string, metadata?: LogMetadata): void {
info(message: string | object, metadata?: LogMetadata): void {
this.log('info', message, metadata);
}
/**
* Warning level logging
*/
warn(message: string, metadata?: LogMetadata): void {
warn(message: string | object, metadata?: LogMetadata): void {
this.log('warn', message, metadata);
}
/**
* Error level logging
*/
error(message: string, error?: Error | any, metadata?: LogMetadata): void {
error(message: string | object, error?: Error | any, metadata?: LogMetadata): void {
const logData: LogMetadata = { ...metadata };
if (error) {
@ -218,7 +302,7 @@ export class Logger {
/**
* HTTP request logging
*/
http(message: string, requestData?: {
http(message: string | object, requestData?: {
method?: string;
url?: string;
statusCode?: number;
@ -228,7 +312,7 @@ export class Logger {
}): void {
if (!loggingConfig.LOG_HTTP_REQUESTS) return;
this.log('http', message, {
this.log('info', message, {
request: requestData,
type: 'http_request'
});
@ -237,7 +321,7 @@ export class Logger {
/**
* Performance/timing logging
*/
performance(message: string, timing: {
performance(message: string | object, timing: {
operation: string;
duration: number;
startTime?: number;
@ -254,7 +338,7 @@ export class Logger {
/**
* Business event logging
*/
business(message: string, event: {
business(message: string | object, event: {
type: string;
entity?: string;
action?: string;
@ -271,7 +355,7 @@ export class Logger {
/**
* Security event logging
*/
security(message: string, event: {
security(message: string | object, event: {
type: 'authentication' | 'authorization' | 'access' | 'vulnerability';
user?: string;
resource?: string;
@ -301,33 +385,27 @@ export class Logger {
}
/**
* Internal logging method
* Get the underlying Pino logger
*/
private log(level: LogLevel, message: string, metadata?: LogMetadata): void {
const logData = {
...this.context,
...metadata,
timestamp: new Date().toISOString()
};
this.winston.log(level, message, logData);
getPinoLogger(): pino.Logger {
return this.pino;
}
/**
* Get the underlying winston logger
* Compatibility method for existing code that expects Winston logger
*/
getWinstonLogger(): winston.Logger {
return this.winston;
getWinstonLogger(): any {
console.warn('getWinstonLogger() is deprecated. Use getPinoLogger() instead.');
return this.pino;
}
/**
* Gracefully close all transports
*/
async close(): Promise<void> {
return new Promise((resolve) => {
this.winston.end(() => {
resolve();
});
});
// Pino doesn't require explicit closing like Winston
// But we can flush any pending logs
this.pino.flush();
}
}
@ -342,15 +420,13 @@ export function getLogger(serviceName: string, context?: LogContext): Logger {
* Shutdown all logger instances gracefully
*/
export async function shutdownLoggers(): Promise<void> {
const closePromises = Array.from(loggerInstances.values()).map(logger =>
new Promise<void>((resolve) => {
logger.end(() => {
resolve();
});
})
);
// Flush all logger instances
const flushPromises = Array.from(loggerInstances.values()).map(logger => {
logger.flush();
return Promise.resolve();
});
await Promise.all(closePromises);
await Promise.all(flushPromises);
loggerInstances.clear();
}

View file

@ -1,10 +1,10 @@
/**
* Hono middleware for request logging
* Comprehensive Hono middleware for request logging with Pino integration
*/
import type { Context, Next } from 'hono';
import { Logger } from './logger';
import { generateCorrelationId, createTimer } from './utils';
import { generateCorrelationId, createTimer, sanitizeMetadata, LogThrottle } from './utils';
export interface LoggingMiddlewareOptions {
logger?: Logger;
@ -14,10 +14,18 @@ export interface LoggingMiddlewareOptions {
logRequestBody?: boolean;
logResponseBody?: boolean;
maxBodySize?: number;
enablePerformanceMetrics?: boolean;
enableThrottling?: boolean;
throttleConfig?: {
maxLogs?: number;
windowMs?: number;
};
sensitiveHeaders?: string[];
redactSensitiveData?: boolean;
}
/**
* Hono middleware for HTTP request/response logging
* Hono middleware for HTTP request/response logging with enhanced features
*/
export function loggingMiddleware(options: LoggingMiddlewareOptions = {}) {
const {
@ -26,11 +34,19 @@ export function loggingMiddleware(options: LoggingMiddlewareOptions = {}) {
skipSuccessfulRequests = false,
logRequestBody = false,
logResponseBody = false,
maxBodySize = 1024
maxBodySize = 1024,
enablePerformanceMetrics = true,
enableThrottling = false,
throttleConfig = { maxLogs: 100, windowMs: 60000 },
sensitiveHeaders = ['authorization', 'cookie', 'x-api-key', 'x-auth-token'],
redactSensitiveData = true
} = options;
// Create logger if not provided
const logger = options.logger || new Logger(serviceName);
// Create throttle instance if enabled
const throttle = enableThrottling ? new LogThrottle(throttleConfig.maxLogs, throttleConfig.windowMs) : null;
return async (c: Context, next: Next) => {
const url = new URL(c.req.url);
@ -39,7 +55,15 @@ export function loggingMiddleware(options: LoggingMiddlewareOptions = {}) {
// Skip certain paths
if (skipPaths.some(skipPath => path.startsWith(skipPath))) {
return next();
} // Generate correlation ID
}
// Check throttling
const throttleKey = `${c.req.method}-${path}`;
if (throttle && !throttle.shouldLog(throttleKey)) {
return next();
}
// Generate correlation ID
const correlationId = generateCorrelationId();
// Store correlation ID in context for later use
c.set('correlationId', correlationId);
@ -47,7 +71,23 @@ export function loggingMiddleware(options: LoggingMiddlewareOptions = {}) {
c.header('x-correlation-id', correlationId);
// Start timer
const timer = createTimer(`${c.req.method} ${path}`); // Extract request metadata
const timer = createTimer(`${c.req.method} ${path}`);
// Extract request headers (sanitized)
const rawHeaders = Object.fromEntries(
Array.from(c.req.raw.headers.entries())
);
const headers = redactSensitiveData
? Object.fromEntries(
Object.entries(rawHeaders).map(([key, value]) => [
key,
sensitiveHeaders.some(sensitive => key.toLowerCase().includes(sensitive.toLowerCase()))
? '[REDACTED]'
: value
])
)
: rawHeaders; // Extract request metadata following LogMetadata.request structure
const requestMetadata: any = {
method: c.req.method,
url: c.req.url,
@ -55,9 +95,9 @@ export function loggingMiddleware(options: LoggingMiddlewareOptions = {}) {
userAgent: c.req.header('user-agent'),
contentType: c.req.header('content-type'),
contentLength: c.req.header('content-length'),
ip: c.req.header('x-forwarded-for') || c.req.header('x-real-ip') || 'unknown'
ip: c.req.header('x-forwarded-for') || c.req.header('x-real-ip') || 'unknown',
headers: headers
};
// Add request body if enabled
if (logRequestBody) {
try {
@ -69,8 +109,9 @@ export function loggingMiddleware(options: LoggingMiddlewareOptions = {}) {
}
} catch (error) {
// Body might not be available or already consumed
requestMetadata.bodyError = 'Unable to read request body';
}
} // Log request start
} // Log request start with structured data
logger.http('HTTP Request started', {
method: c.req.method,
url: c.req.url,
@ -78,71 +119,103 @@ export function loggingMiddleware(options: LoggingMiddlewareOptions = {}) {
ip: c.req.header('x-forwarded-for') || c.req.header('x-real-ip') || 'unknown'
});
// Process request
await next();
let error: Error | null = null;
let responseTime: number = 0;
// Calculate response time
const timing = timer.end();
// Get response information
const response = c.res;
const status = response.status;
// Determine log level based on status code
const isError = status >= 400;
const isSuccess = status >= 200 && status < 300;
// Skip successful requests if configured
if (skipSuccessfulRequests && isSuccess) {
return;
}
try {
// Process request
await next();
} catch (err) {
error = err instanceof Error ? err : new Error(String(err));
throw error; // Re-throw to maintain error handling flow
} finally {
// Calculate response time
const timing = timer.end();
responseTime = timing.duration;
// Get response information
const response = c.res;
const status = response.status;
// Determine log level based on status code
const isError = status >= 400 || error !== null;
const isSuccess = status >= 200 && status < 300;
const isRedirect = status >= 300 && status < 400;
// Skip successful requests if configured
if (skipSuccessfulRequests && isSuccess && !error) {
return;
}
const responseMetadata: any = {
correlationId,
request: requestMetadata,
response: {
// Extract response headers (sanitized)
const rawResponseHeaders = Object.fromEntries(
Array.from(response.headers.entries())
);
const responseHeaders = redactSensitiveData
? Object.fromEntries(
Object.entries(rawResponseHeaders).map(([key, value]) => [
key,
sensitiveHeaders.some(sensitive => key.toLowerCase().includes(sensitive.toLowerCase()))
? '[REDACTED]'
: value
])
)
: rawResponseHeaders;
const responseMetadata: any = {
statusCode: status,
statusText: response.statusText,
headers: responseHeaders,
contentLength: response.headers.get('content-length'),
contentType: response.headers.get('content-type')
},
performance: timing
};
};
// Add response body if enabled
if (logResponseBody) {
try {
const responseBody = await response.clone().text();
if (responseBody) {
responseMetadata.response.body = responseBody.length > maxBodySize
? responseBody.substring(0, maxBodySize) + '...[truncated]'
: responseBody;
// Add response body if enabled and not an error
if (logResponseBody && !isError) {
try {
const responseBody = await response.clone().text();
if (responseBody) {
responseMetadata.body = responseBody.length > maxBodySize
? responseBody.substring(0, maxBodySize) + '...[truncated]'
: responseBody;
}
} catch (bodyError) {
responseMetadata.bodyError = 'Unable to read response body';
}
} catch (error) {
// Response body might not be available
}
} // Log based on status code
if (isError) {
logger.error('HTTP Request failed', undefined, {
} // Performance metrics matching LogMetadata.performance structure
const performanceMetrics = {
operation: timing.operation,
duration: timing.duration,
startTime: enablePerformanceMetrics ? timing.startTime : undefined,
endTime: enablePerformanceMetrics ? timing.endTime : undefined
}; // Create comprehensive log entry
const logEntry: any = {
correlationId,
request: requestMetadata,
response: {
response: responseMetadata,
performance: performanceMetrics,
type: error ? 'custom' : isError ? 'custom' : 'http_request'
};
// Sanitize if needed
const finalLogEntry = redactSensitiveData ? sanitizeMetadata(logEntry) : logEntry; // Log based on status code and errors
if (error) {
logger.error('HTTP Request error', error, finalLogEntry);
} else if (isError) {
logger.warn('HTTP Request failed', finalLogEntry);
} else if (isRedirect) {
logger.info('HTTP Request redirected', finalLogEntry);
} else {
logger.http('HTTP Request completed', {
method: c.req.method,
url: c.req.url,
statusCode: status,
statusText: response.statusText,
contentLength: response.headers.get('content-length'),
contentType: response.headers.get('content-type')
},
performance: timing
});
} else {
logger.http('HTTP Request completed', {
method: c.req.method,
url: c.req.url,
statusCode: status,
responseTime: timing.duration,
userAgent: c.req.header('user-agent'),
ip: c.req.header('x-forwarded-for') || c.req.header('x-real-ip') || 'unknown'
});
responseTime: timing.duration,
userAgent: c.req.header('user-agent'),
ip: c.req.header('x-forwarded-for') || c.req.header('x-real-ip') || 'unknown'
});
}
}
};
}
@ -198,3 +271,192 @@ export function createRequestLogger(c: Context, baseLogger: Logger): Logger {
ip: c.req.header('x-forwarded-for') || c.req.header('x-real-ip') || 'unknown'
});
}
/**
* Performance monitoring middleware for specific operations
*/
export function performanceMiddleware(operationName?: string, logger?: Logger) {
return async (c: Context, next: Next) => {
const perfLogger = logger || new Logger('performance');
const operation = operationName || `${c.req.method} ${new URL(c.req.url).pathname}`;
const timer = createTimer(operation);
try {
await next();
const timing = timer.end();
perfLogger.info('Operation completed', {
type: 'performance',
performance: timing,
correlationId: c.get('correlationId')
});
} catch (error) {
const timing = timer.end();
perfLogger.warn('Operation failed', {
type: 'performance',
performance: timing,
correlationId: c.get('correlationId'),
error: error instanceof Error ? {
name: error.name,
message: error.message,
stack: error.stack
} : { message: String(error) }
});
throw error;
}
};
}
/**
* Security event logging middleware
*/
export function securityMiddleware(logger?: Logger) {
return async (c: Context, next: Next) => {
const secLogger = logger || new Logger('security');
const url = new URL(c.req.url);
// Log authentication attempts
const authHeader = c.req.header('authorization');
if (authHeader) {
secLogger.info('Authentication attempt', {
type: 'security_event',
security: {
type: 'authentication',
ip: c.req.header('x-forwarded-for') || c.req.header('x-real-ip') || 'unknown',
resource: url.pathname,
action: 'access_attempt'
},
correlationId: c.get('correlationId')
});
}
await next();
// Log access control
const status = c.res.status;
if (status === 401 || status === 403) {
secLogger.warn('Access denied', {
type: 'security_event',
security: {
type: 'authorization',
result: 'failure',
ip: c.req.header('x-forwarded-for') || c.req.header('x-real-ip') || 'unknown',
resource: url.pathname,
action: c.req.method,
severity: status === 401 ? 'medium' : 'high'
},
correlationId: c.get('correlationId')
});
}
};
}
/**
* Business event logging middleware for trading operations
*/
export function businessEventMiddleware(logger?: Logger) {
return async (c: Context, next: Next) => {
const bizLogger = logger || new Logger('business');
const url = new URL(c.req.url);
const path = url.pathname;
// Check if this is a business-critical endpoint
const businessEndpoints = [
'/api/orders',
'/api/trades',
'/api/portfolio',
'/api/strategies',
'/api/signals'
];
const isBusinessEndpoint = businessEndpoints.some(endpoint => path.startsWith(endpoint));
if (isBusinessEndpoint) {
const timer = createTimer(`business_${c.req.method}_${path}`);
try {
await next();
const timing = timer.end();
const status = c.res.status;
bizLogger.info('Business operation completed', {
type: 'business_event',
business: {
type: 'trading_operation',
action: c.req.method,
result: status >= 200 && status < 300 ? 'success' : 'failure'
},
performance: timing,
correlationId: c.get('correlationId')
});
} catch (error) {
const timing = timer.end();
bizLogger.error('Business operation failed', error, {
type: 'business_event',
business: {
type: 'trading_operation',
action: c.req.method,
result: 'failure'
},
performance: timing,
correlationId: c.get('correlationId')
});
throw error;
}
} else {
await next();
}
};
}
/**
* Comprehensive logging middleware that combines all logging features
*/
export function comprehensiveLoggingMiddleware(options: LoggingMiddlewareOptions & {
enableSecurity?: boolean;
enableBusiness?: boolean;
enablePerformance?: boolean;
} = {}) {
const {
enableSecurity = true,
enableBusiness = true,
enablePerformance = true,
...loggingOptions
} = options;
return async (c: Context, next: Next) => {
const middlewares: Array<(c: Context, next: Next) => Promise<void>> = [];
// Add security middleware
if (enableSecurity) {
middlewares.push(securityMiddleware(options.logger));
}
// Add performance middleware
if (enablePerformance) {
middlewares.push(performanceMiddleware(undefined, options.logger));
}
// Add business event middleware
if (enableBusiness) {
middlewares.push(businessEventMiddleware(options.logger));
}
// Add main logging middleware
middlewares.push(loggingMiddleware(loggingOptions));
// Execute middleware chain
let index = 0;
async function dispatch(i: number): Promise<void> {
if (i >= middlewares.length) {
return next();
}
const middleware = middlewares[i];
return middleware(c, () => dispatch(i + 1));
}
return dispatch(0);
};
}

View file

@ -0,0 +1,6 @@
// Data Processing and Pipeline Types
export * from './sources';
export * from './processors';
export * from './pipelines';
export * from './transformations';
export * from './quality';

View file

@ -0,0 +1,181 @@
import { AuthenticationConfig } from '../config/security';
import { ProcessingError } from './processors';
// Data Pipeline Types
export interface ProcessingPipeline {
id: string;
name: string;
description?: string;
version: string;
processors: PipelineProcessor[];
inputFilter: InputFilter;
outputTargets: OutputTargets;
schedule?: PipelineSchedule;
status: PipelineStatus;
metadata: PipelineMetadata;
}
export interface PipelineProcessor {
processorId: string;
order: number;
config?: Record<string, any>;
conditions?: ProcessingCondition[];
parallelBranches?: PipelineProcessor[][];
}
export interface ProcessingCondition {
field: string;
operator: 'equals' | 'not_equals' | 'contains' | 'regex' | 'greater_than' | 'less_than';
value: any;
logicalOperator?: 'AND' | 'OR';
}
export interface InputFilter {
symbols?: string[];
sources?: string[];
dataTypes?: string[];
timeRange?: TimeRange;
conditions?: ProcessingCondition[];
}
export interface TimeRange {
start?: Date;
end?: Date;
duration?: string;
timezone?: string;
}
export interface OutputTargets {
eventBus?: boolean;
database?: DatabaseTarget;
cache?: CacheTarget;
websocket?: WebSocketTarget;
dataProcessor?: ServiceTarget;
featureStore?: ServiceTarget;
file?: FileTarget;
api?: ApiTarget;
}
export interface DatabaseTarget {
enabled: boolean;
connectionId: string;
table: string;
batchSize?: number;
flushInterval?: number;
}
export interface CacheTarget {
enabled: boolean;
connectionId: string;
keyPattern: string;
ttl?: number;
compression?: boolean;
}
export interface WebSocketTarget {
enabled: boolean;
rooms?: string[];
filters?: Record<string, any>;
}
export interface ServiceTarget {
enabled: boolean;
serviceId: string;
endpoint: string;
batchSize?: number;
timeout?: number;
}
export interface FileTarget {
enabled: boolean;
path: string;
format: 'json' | 'csv' | 'parquet' | 'avro';
compression?: 'gzip' | 'bzip2' | 'lz4';
rotation?: FileRotation;
}
export interface FileRotation {
size?: string;
time?: string;
count?: number;
}
export interface ApiTarget {
enabled: boolean;
url: string;
method: string;
headers?: Record<string, string>;
authentication?: AuthenticationConfig;
}
export interface PipelineSchedule {
enabled: boolean;
cronExpression?: string;
interval?: string;
timezone?: string;
startDate?: Date;
endDate?: Date;
}
export enum PipelineStatus {
DRAFT = 'draft',
ACTIVE = 'active',
PAUSED = 'paused',
ERROR = 'error',
COMPLETED = 'completed',
ARCHIVED = 'archived'
}
export interface PipelineMetadata {
createdBy: string;
createdAt: Date;
updatedBy: string;
updatedAt: Date;
tags: string[];
documentation?: string;
changeLog?: ChangeLogEntry[];
}
export interface ChangeLogEntry {
version: string;
timestamp: Date;
author: string;
changes: string[];
breaking: boolean;
}
export interface PipelineExecution {
id: string;
pipelineId: string;
status: ExecutionStatus;
startTime: Date;
endTime?: Date;
recordsProcessed: number;
recordsSuccess: number;
recordsError: number;
errors: ProcessingError[];
metadata: ExecutionMetadata;
}
export enum ExecutionStatus {
PENDING = 'pending',
RUNNING = 'running',
COMPLETED = 'completed',
FAILED = 'failed',
CANCELLED = 'cancelled',
TIMEOUT = 'timeout'
}
export interface ExecutionMetadata {
triggeredBy: 'schedule' | 'manual' | 'event';
trigger?: string;
parameters?: Record<string, any>;
resources?: ResourceUsage;
}
export interface ResourceUsage {
cpuTimeMs: number;
memoryPeakMB: number;
diskIOBytes: number;
networkIOBytes: number;
}

View file

@ -0,0 +1,103 @@
// Data Processing Types
export interface DataProcessor {
id: string;
name: string;
type: ProcessorType;
enabled: boolean;
priority: number;
config: ProcessorConfig;
inputSchema?: DataSchema;
outputSchema?: DataSchema;
metrics: ProcessorMetrics;
}
export enum ProcessorType {
ENRICHMENT = 'enrichment',
VALIDATION = 'validation',
NORMALIZATION = 'normalization',
AGGREGATION = 'aggregation',
FILTER = 'filter',
TRANSFORMATION = 'transformation',
QUALITY_CHECK = 'quality_check',
DEDUPLICATION = 'deduplication'
}
export interface ProcessorConfig {
batchSize?: number;
timeoutMs?: number;
retryAttempts?: number;
parallelism?: number;
parameters?: Record<string, any>;
errorHandling?: ErrorHandlingConfig;
}
export interface ErrorHandlingConfig {
strategy: 'fail_fast' | 'skip_invalid' | 'retry' | 'quarantine';
maxErrors?: number;
quarantineLocation?: string;
notificationChannels?: string[];
}
export interface ProcessorMetrics {
recordsProcessed: number;
recordsSuccess: number;
recordsError: number;
avgProcessingTimeMs: number;
lastProcessed: Date;
errorRate: number;
throughputPerSecond: number;
}
export interface ProcessingError {
processorId: string;
pipelineId: string;
timestamp: Date;
inputData: any;
error: string;
stackTrace?: string;
severity: 'low' | 'medium' | 'high' | 'critical';
recoverable: boolean;
}
export interface DataSchema {
version: string;
fields: SchemaField[];
primaryKeys?: string[];
foreignKeys?: ForeignKey[];
indexes?: Index[];
constraints?: SchemaConstraint[];
}
export interface SchemaField {
name: string;
type: string;
nullable: boolean;
description?: string;
defaultValue?: any;
format?: string;
pattern?: string;
enum?: any[];
}
export interface ForeignKey {
fields: string[];
referencedTable: string;
referencedFields: string[];
onDelete?: 'cascade' | 'restrict' | 'set_null';
onUpdate?: 'cascade' | 'restrict' | 'set_null';
}
export interface Index {
name: string;
fields: string[];
unique: boolean;
type: 'btree' | 'hash' | 'gin' | 'gist';
}
export interface SchemaConstraint {
name: string;
type: 'check' | 'unique' | 'not_null' | 'range';
fields: string[];
condition?: string;
parameters?: Record<string, any>;
}

View file

@ -0,0 +1,193 @@
// Data Quality Types
export interface DataQuality {
id: string;
assetId: string;
overallScore: number;
dimensions: QualityDimension[];
rules: QualityRule[];
issues: QualityIssue[];
trend: QualityTrend;
lastAssessment: Date;
nextAssessment?: Date;
}
export interface QualityDimension {
name: QualityDimensionType;
score: number;
weight: number;
description: string;
metrics: QualityMetric[];
status: 'pass' | 'warn' | 'fail';
}
export enum QualityDimensionType {
COMPLETENESS = 'completeness',
ACCURACY = 'accuracy',
CONSISTENCY = 'consistency',
VALIDITY = 'validity',
UNIQUENESS = 'uniqueness',
TIMELINESS = 'timeliness',
INTEGRITY = 'integrity',
CONFORMITY = 'conformity',
RELEVANCE = 'relevance'
}
export interface QualityRule {
id: string;
name: string;
description: string;
dimension: QualityDimensionType;
type: QualityRuleType;
field?: string;
condition: string;
threshold: number;
severity: 'low' | 'medium' | 'high' | 'critical';
enabled: boolean;
metadata?: Record<string, any>;
}
export enum QualityRuleType {
NULL_CHECK = 'null_check',
RANGE_CHECK = 'range_check',
PATTERN_CHECK = 'pattern_check',
REFERENCE_CHECK = 'reference_check',
DUPLICATE_CHECK = 'duplicate_check',
FRESHNESS_CHECK = 'freshness_check',
OUTLIER_CHECK = 'outlier_check',
CUSTOM = 'custom'
}
export interface QualityMetric {
name: string;
value: number;
unit?: string;
threshold?: number;
status: 'pass' | 'warn' | 'fail';
timestamp: Date;
trend?: 'improving' | 'stable' | 'degrading';
}
export interface QualityIssue {
id: string;
ruleId: string;
severity: 'low' | 'medium' | 'high' | 'critical';
description: string;
field?: string;
affectedRows?: number;
sampleValues?: any[];
detectedAt: Date;
status: 'open' | 'acknowledged' | 'resolved' | 'false_positive';
assignee?: string;
resolution?: string;
resolvedAt?: Date;
impact?: QualityImpact;
}
export interface QualityImpact {
scope: 'field' | 'table' | 'dataset' | 'system';
affectedRecords: number;
downstreamAssets: string[];
businessImpact: 'low' | 'medium' | 'high' | 'critical';
estimatedCost?: number;
}
export interface QualityTrend {
timeframe: 'day' | 'week' | 'month' | 'quarter';
dataPoints: QualityDataPoint[];
trend: 'improving' | 'stable' | 'degrading';
changeRate: number;
projectedScore?: number;
}
export interface QualityDataPoint {
timestamp: Date;
score: number;
dimensionScores: Record<QualityDimensionType, number>;
issueCount: number;
rulesPassed: number;
rulesTotal: number;
}
export interface QualityProfile {
assetId: string;
fieldProfiles: FieldProfile[];
rowCount: number;
columnCount: number;
dataTypes: Record<string, string>;
nullCounts: Record<string, number>;
uniqueCounts: Record<string, number>;
profiledAt: Date;
}
export interface FieldProfile {
fieldName: string;
dataType: string;
nullCount: number;
uniqueCount: number;
minValue?: any;
maxValue?: any;
avgValue?: number;
medianValue?: number;
standardDeviation?: number;
topValues?: ValueFrequency[];
histogram?: HistogramBucket[];
}
export interface ValueFrequency {
value: any;
count: number;
percentage: number;
}
export interface HistogramBucket {
min: number;
max: number;
count: number;
percentage: number;
}
export interface QualityReport {
id: string;
assetId: string;
reportType: 'summary' | 'detailed' | 'trend' | 'comparison';
generatedAt: Date;
period: string;
summary: QualitySummary;
details?: QualityDetails;
recommendations?: QualityRecommendation[];
}
export interface QualitySummary {
overallScore: number;
scoreTrend: 'improving' | 'stable' | 'degrading';
totalIssues: number;
criticalIssues: number;
resolvedIssues: number;
rulesPassed: number;
rulesTotal: number;
}
export interface QualityDetails {
dimensionBreakdown: Record<QualityDimensionType, QualityDimension>;
topIssues: QualityIssue[];
ruleResults: QualityRuleResult[];
historicalTrend: QualityDataPoint[];
}
export interface QualityRuleResult {
ruleId: string;
ruleName: string;
status: 'pass' | 'fail';
score: number;
violations: number;
executionTime: number;
}
export interface QualityRecommendation {
type: 'rule_adjustment' | 'data_cleanup' | 'process_improvement' | 'monitoring';
priority: 'low' | 'medium' | 'high';
description: string;
impact: string;
effort: string;
steps: string[];
}

View file

@ -0,0 +1,84 @@
// Data Source Configuration Types
export interface DataSourceConfig {
id: string;
name: string;
type: 'websocket' | 'rest' | 'fix' | 'stream' | 'file' | 'database';
enabled: boolean;
priority: number;
rateLimit: DataRateLimitConfig;
connection: DataConnectionConfig;
subscriptions: SubscriptionConfig;
symbols: string[];
retryPolicy: RetryPolicyConfig;
healthCheck: HealthCheckConfig;
metadata?: Record<string, any>;
}
export interface DataRateLimitConfig {
requestsPerSecond: number;
burstLimit: number;
windowMs?: number;
}
export interface DataConnectionConfig {
url: string;
headers?: Record<string, string>;
queryParams?: Record<string, string>;
authentication?: DataAuthenticationConfig;
timeout?: number;
keepAlive?: boolean;
compression?: boolean;
}
export interface DataAuthenticationConfig {
type: 'apikey' | 'oauth' | 'basic' | 'jwt' | 'cert';
credentials: Record<string, string>;
refreshInterval?: number;
}
export interface SubscriptionConfig {
quotes: boolean;
trades: boolean;
orderbook: boolean;
candles: boolean;
news: boolean;
fundamentals?: boolean;
options?: boolean;
futures?: boolean;
}
export interface RetryPolicyConfig {
maxRetries: number;
backoffMultiplier: number;
maxBackoffMs: number;
enableJitter?: boolean;
}
export interface HealthCheckConfig {
intervalMs: number;
timeoutMs: number;
expectedLatencyMs: number;
failureThreshold?: number;
}
export interface DataSourceStatus {
id: string;
status: 'connected' | 'disconnected' | 'connecting' | 'error';
lastUpdate: Date;
connectionTime?: Date;
disconnectionTime?: Date;
errorCount: number;
messagesReceived: number;
latencyMs: number;
throughputPerSecond: number;
}
export interface DataSourceError {
sourceId: string;
timestamp: Date;
type: 'connection' | 'authentication' | 'ratelimit' | 'data' | 'timeout' | 'parsing';
message: string;
details?: Record<string, any>;
severity: 'low' | 'medium' | 'high' | 'critical';
recoverable: boolean;
}

View file

@ -0,0 +1,119 @@
// Data Transformation Types
export interface DataTransformation {
id: string;
name: string;
type: TransformationType;
description?: string;
code?: string;
inputFields: string[];
outputFields: string[];
logic: string;
parameters?: Record<string, any>;
validationRules?: ValidationRule[];
}
export enum TransformationType {
FILTER = 'filter',
AGGREGATE = 'aggregate',
JOIN = 'join',
UNION = 'union',
PIVOT = 'pivot',
UNPIVOT = 'unpivot',
SORT = 'sort',
DEDUPLICATE = 'deduplicate',
CALCULATE = 'calculate',
CAST = 'cast',
RENAME = 'rename',
SPLIT = 'split',
MERGE = 'merge',
NORMALIZE = 'normalize',
ENRICH = 'enrich'
}
export interface ValidationRule {
field: string;
type: ValidationType;
parameters?: Record<string, any>;
errorMessage?: string;
severity: 'warning' | 'error' | 'critical';
}
export enum ValidationType {
REQUIRED = 'required',
TYPE_CHECK = 'type_check',
RANGE = 'range',
PATTERN = 'pattern',
LENGTH = 'length',
UNIQUE = 'unique',
REFERENCE = 'reference',
CUSTOM = 'custom'
}
export interface TransformationRule {
id: string;
sourceField: string;
targetField: string;
transformation: string;
condition?: string;
parameters?: Record<string, any>;
}
export interface MappingRule {
sourceField: string;
targetField: string;
transformation?: string;
defaultValue?: any;
required: boolean;
}
export interface AggregationRule {
groupByFields: string[];
aggregations: AggregationFunction[];
having?: string;
windowFunction?: WindowFunction;
}
export interface AggregationFunction {
function: 'sum' | 'avg' | 'count' | 'min' | 'max' | 'first' | 'last' | 'stddev' | 'variance';
field: string;
alias?: string;
distinct?: boolean;
}
export interface WindowFunction {
type: 'row_number' | 'rank' | 'dense_rank' | 'lag' | 'lead' | 'first_value' | 'last_value';
partitionBy?: string[];
orderBy?: OrderByClause[];
frame?: WindowFrame;
}
export interface OrderByClause {
field: string;
direction: 'asc' | 'desc';
nulls?: 'first' | 'last';
}
export interface WindowFrame {
type: 'rows' | 'range';
start: FrameBoundary;
end: FrameBoundary;
}
export interface FrameBoundary {
type: 'unbounded_preceding' | 'current_row' | 'unbounded_following' | 'preceding' | 'following';
offset?: number;
}
export interface JoinRule {
type: 'inner' | 'left' | 'right' | 'full' | 'cross';
leftTable: string;
rightTable: string;
joinConditions: JoinCondition[];
selectFields?: string[];
}
export interface JoinCondition {
leftField: string;
rightField: string;
operator: '=' | '<>' | '<' | '>' | '<=' | '>=';
}