adding data-services

This commit is contained in:
Bojan Kucera 2025-06-03 07:42:48 -04:00
parent e3bfd05b90
commit 405b818c86
139 changed files with 55943 additions and 416 deletions

View file

@ -0,0 +1,104 @@
import { Context } from 'hono';
import { logger } from '@stock-bot/utils';
export class HealthController {
  /**
   * Basic health check endpoint.
   *
   * Runs the dependency probes concurrently and derives the overall status
   * from their results: if any dependency reports 'unhealthy', the service is
   * reported as 'degraded' with HTTP 503 so load balancers can react.
   * (Previously the endpoint reported 'healthy' unconditionally, even when a
   * dependency probe returned 'unhealthy'.)
   */
  async getHealth(c: Context): Promise<Response> {
    try {
      // The probes are independent of each other — run them in parallel.
      const [redis, eventBus] = await Promise.all([
        this.checkRedisHealth(),
        this.checkEventBusHealth(),
      ]);
      const allHealthy = redis.status === 'healthy' && eventBus.status === 'healthy';
      const health = {
        status: allHealthy ? 'healthy' : 'degraded',
        timestamp: new Date().toISOString(),
        service: 'data-processor',
        version: process.env.npm_package_version || '1.0.0',
        uptime: process.uptime(),
        environment: process.env.NODE_ENV || 'development',
        dependencies: { redis, eventBus }
      };
      return c.json(health, allHealthy ? 200 : 503);
    } catch (error) {
      logger.error('Health check failed:', error);
      return c.json({
        status: 'unhealthy',
        timestamp: new Date().toISOString(),
        service: 'data-processor',
        error: error instanceof Error ? error.message : 'Unknown error'
      }, 503);
    }
  }

  /**
   * Detailed health check endpoint.
   *
   * Same dependency-derived status as getHealth, plus host/process
   * information and placeholder pipeline metrics.
   */
  async getDetailedHealth(c: Context): Promise<Response> {
    try {
      const [redis, eventBus] = await Promise.all([
        this.checkRedisHealth(),
        this.checkEventBusHealth(),
      ]);
      const allHealthy = redis.status === 'healthy' && eventBus.status === 'healthy';
      const health = {
        status: allHealthy ? 'healthy' : 'degraded',
        timestamp: new Date().toISOString(),
        service: 'data-processor',
        version: process.env.npm_package_version || '1.0.0',
        uptime: process.uptime(),
        environment: process.env.NODE_ENV || 'development',
        system: {
          platform: process.platform,
          architecture: process.arch,
          nodeVersion: process.version,
          memory: process.memoryUsage(),
          pid: process.pid
        },
        dependencies: { redis, eventBus },
        metrics: {
          activePipelines: 0, // Will be populated by orchestrator
          runningJobs: 0, // Will be populated by orchestrator
          totalProcessedRecords: 0 // Will be populated by orchestrator
        }
      };
      return c.json(health, allHealthy ? 200 : 503);
    } catch (error) {
      logger.error('Detailed health check failed:', error);
      return c.json({
        status: 'unhealthy',
        timestamp: new Date().toISOString(),
        service: 'data-processor',
        error: error instanceof Error ? error.message : 'Unknown error'
      }, 503);
    }
  }

  /**
   * Probes Redis connectivity and measures round-trip latency.
   * NOTE(review): currently a stub — the ping is not implemented yet, so the
   * measured latency is effectively 0 and the probe always succeeds.
   */
  private async checkRedisHealth(): Promise<{ status: string; latency?: number; error?: string }> {
    try {
      const startTime = Date.now();
      // In a real implementation, ping Redis here
      const latency = Date.now() - startTime;
      return {
        status: 'healthy',
        latency
      };
    } catch (error) {
      return {
        status: 'unhealthy',
        error: error instanceof Error ? error.message : 'Redis connection failed'
      };
    }
  }

  /**
   * Probes event bus connectivity.
   * NOTE(review): currently a stub — always reports healthy until the real
   * connection check is wired in.
   */
  private async checkEventBusHealth(): Promise<{ status: string; error?: string }> {
    try {
      // In a real implementation, check event bus connection here
      return {
        status: 'healthy'
      };
    } catch (error) {
      return {
        status: 'unhealthy',
        error: error instanceof Error ? error.message : 'Event bus connection failed'
      };
    }
  }
}

View file

@ -0,0 +1,297 @@
import { Context } from 'hono';
import { logger } from '@stock-bot/utils';
import { DataPipelineOrchestrator } from '../core/DataPipelineOrchestrator';
import { JobStatus } from '../types/DataPipeline';
export class JobController {
  constructor(private orchestrator: DataPipelineOrchestrator) {}

  /**
   * Parses a non-negative integer query parameter, falling back when the
   * value is missing, non-numeric, or negative. Prevents the NaN pagination
   * bug where e.g. `?limit=abc` made `slice(NaN, NaN)` return an empty page
   * and `hasMore` silently evaluate to false.
   */
  private parseNonNegativeInt(raw: string | undefined, fallback: number): number {
    const value = Number.parseInt(raw ?? '', 10);
    return Number.isFinite(value) && value >= 0 ? value : fallback;
  }

  /**
   * GET /api/jobs — lists jobs, optionally filtered by pipeline and status,
   * newest first, with offset/limit pagination.
   */
  async listJobs(c: Context): Promise<Response> {
    try {
      const pipelineId = c.req.query('pipelineId');
      const status = c.req.query('status') as JobStatus | undefined;
      const limit = this.parseNonNegativeInt(c.req.query('limit'), 50);
      const offset = this.parseNonNegativeInt(c.req.query('offset'), 0);

      let jobs = this.orchestrator.listJobs(pipelineId);
      // Filter by status if provided
      if (status) {
        jobs = jobs.filter(job => job.status === status);
      }
      // Sort by creation time (newest first)
      jobs.sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime());
      // Apply pagination
      const totalJobs = jobs.length;
      const paginatedJobs = jobs.slice(offset, offset + limit);

      return c.json({
        success: true,
        data: paginatedJobs,
        pagination: {
          total: totalJobs,
          limit,
          offset,
          hasMore: offset + limit < totalJobs
        }
      });
    } catch (error) {
      logger.error('Failed to list jobs:', error);
      return c.json({
        success: false,
        error: error instanceof Error ? error.message : 'Failed to list jobs'
      }, 500);
    }
  }

  /** GET /api/jobs/:id — returns a single job or 404. */
  async getJob(c: Context): Promise<Response> {
    try {
      const jobId = c.req.param('id');
      const job = this.orchestrator.getJob(jobId);
      if (!job) {
        return c.json({
          success: false,
          error: 'Job not found'
        }, 404);
      }
      return c.json({
        success: true,
        data: job
      });
    } catch (error) {
      logger.error('Failed to get job:', error);
      return c.json({
        success: false,
        error: error instanceof Error ? error.message : 'Failed to get job'
      }, 500);
    }
  }

  /**
   * POST /api/jobs/:id/cancel — cancels a RUNNING or PENDING job.
   * NOTE(review): this mutates the job record directly; the underlying queue
   * worker is not signalled here — confirm cancellation propagates.
   */
  async cancelJob(c: Context): Promise<Response> {
    try {
      const jobId = c.req.param('id');
      const job = this.orchestrator.getJob(jobId);
      if (!job) {
        return c.json({
          success: false,
          error: 'Job not found'
        }, 404);
      }
      if (job.status !== JobStatus.RUNNING && job.status !== JobStatus.PENDING) {
        return c.json({
          success: false,
          error: 'Job cannot be cancelled in current status'
        }, 400);
      }
      // Update job status to cancelled
      job.status = JobStatus.CANCELLED;
      job.completedAt = new Date();
      job.error = 'Job cancelled by user';
      logger.info(`Cancelled job: ${jobId}`);
      return c.json({
        success: true,
        message: 'Job cancelled successfully',
        data: job
      });
    } catch (error) {
      logger.error('Failed to cancel job:', error);
      return c.json({
        success: false,
        error: error instanceof Error ? error.message : 'Failed to cancel job'
      }, 500);
    }
  }

  /**
   * POST /api/jobs/:id/retry — re-runs a FAILED job by creating a fresh job
   * with the same pipeline and parameters.
   */
  async retryJob(c: Context): Promise<Response> {
    try {
      const jobId = c.req.param('id');
      const job = this.orchestrator.getJob(jobId);
      if (!job) {
        return c.json({
          success: false,
          error: 'Job not found'
        }, 404);
      }
      if (job.status !== JobStatus.FAILED) {
        return c.json({
          success: false,
          error: 'Only failed jobs can be retried'
        }, 400);
      }
      // Create a new job with the same parameters
      const newJob = await this.orchestrator.runPipeline(job.pipelineId, job.parameters);
      logger.info(`Retried job: ${jobId} as new job: ${newJob.id}`);
      return c.json({
        success: true,
        message: 'Job retried successfully',
        data: {
          originalJob: job,
          newJob: newJob
        }
      });
    } catch (error) {
      logger.error('Failed to retry job:', error);
      return c.json({
        success: false,
        error: error instanceof Error ? error.message : 'Failed to retry job'
      }, 500);
    }
  }

  /**
   * GET /api/jobs/:id/logs — synthesizes lifecycle log entries from the job
   * record (created/started/completed or failed). A real log store is not
   * wired in yet.
   */
  async getJobLogs(c: Context): Promise<Response> {
    try {
      const jobId = c.req.param('id');
      const job = this.orchestrator.getJob(jobId);
      if (!job) {
        return c.json({
          success: false,
          error: 'Job not found'
        }, 404);
      }
      // In a real implementation, fetch logs from a log store
      const logs = [
        {
          timestamp: job.createdAt,
          level: 'info',
          message: `Job ${jobId} created`
        },
        ...(job.startedAt ? [{
          timestamp: job.startedAt,
          level: 'info',
          message: `Job ${jobId} started`
        }] : []),
        ...(job.completedAt ? [{
          timestamp: job.completedAt,
          level: job.status === JobStatus.COMPLETED ? 'info' : 'error',
          message: job.status === JobStatus.COMPLETED ?
            `Job ${jobId} completed successfully` :
            `Job ${jobId} failed: ${job.error}`
        }] : [])
      ];
      return c.json({
        success: true,
        data: {
          jobId,
          logs,
          totalLogs: logs.length
        }
      });
    } catch (error) {
      logger.error('Failed to get job logs:', error);
      return c.json({
        success: false,
        error: error instanceof Error ? error.message : 'Failed to get job logs'
      }, 500);
    }
  }

  /**
   * GET /api/jobs/:id/metrics — job metrics augmented with derived values:
   * wall-clock duration and success/error percentages (0 when no records
   * were processed, avoiding division by zero).
   */
  async getJobMetrics(c: Context): Promise<Response> {
    try {
      const jobId = c.req.param('id');
      const job = this.orchestrator.getJob(jobId);
      if (!job) {
        return c.json({
          success: false,
          error: 'Job not found'
        }, 404);
      }
      const metrics = {
        ...job.metrics,
        duration: job.completedAt && job.startedAt ?
          job.completedAt.getTime() - job.startedAt.getTime() : null,
        successRate: job.metrics.recordsProcessed > 0 ?
          (job.metrics.recordsSuccessful / job.metrics.recordsProcessed) * 100 : 0,
        errorRate: job.metrics.recordsProcessed > 0 ?
          (job.metrics.recordsFailed / job.metrics.recordsProcessed) * 100 : 0,
        status: job.status,
        startedAt: job.startedAt,
        completedAt: job.completedAt
      };
      return c.json({
        success: true,
        data: metrics
      });
    } catch (error) {
      logger.error('Failed to get job metrics:', error);
      return c.json({
        success: false,
        error: error instanceof Error ? error.message : 'Failed to get job metrics'
      }, 500);
    }
  }

  /**
   * GET /api/jobs/stats — aggregate counts by status, totals across all job
   * metrics, and the 10 most recent jobs (summarized).
   */
  async getJobStats(c: Context): Promise<Response> {
    try {
      const jobs = this.orchestrator.listJobs();
      const stats = {
        total: jobs.length,
        byStatus: {
          pending: jobs.filter(j => j.status === JobStatus.PENDING).length,
          running: jobs.filter(j => j.status === JobStatus.RUNNING).length,
          completed: jobs.filter(j => j.status === JobStatus.COMPLETED).length,
          failed: jobs.filter(j => j.status === JobStatus.FAILED).length,
          cancelled: jobs.filter(j => j.status === JobStatus.CANCELLED).length,
        },
        metrics: {
          totalRecordsProcessed: jobs.reduce((sum, j) => sum + j.metrics.recordsProcessed, 0),
          totalRecordsSuccessful: jobs.reduce((sum, j) => sum + j.metrics.recordsSuccessful, 0),
          totalRecordsFailed: jobs.reduce((sum, j) => sum + j.metrics.recordsFailed, 0),
          averageProcessingTime: jobs.length > 0 ?
            jobs.reduce((sum, j) => sum + j.metrics.processingTimeMs, 0) / jobs.length : 0,
          successRate: jobs.length > 0 ?
            (jobs.filter(j => j.status === JobStatus.COMPLETED).length / jobs.length) * 100 : 0
        },
        recentJobs: jobs
          .sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime())
          .slice(0, 10)
          .map(job => ({
            id: job.id,
            pipelineId: job.pipelineId,
            status: job.status,
            createdAt: job.createdAt,
            processingTime: job.metrics.processingTimeMs,
            recordsProcessed: job.metrics.recordsProcessed
          }))
      };
      return c.json({
        success: true,
        data: stats
      });
    } catch (error) {
      logger.error('Failed to get job stats:', error);
      return c.json({
        success: false,
        error: error instanceof Error ? error.message : 'Failed to get job stats'
      }, 500);
    }
  }
}

View file

@ -0,0 +1,346 @@
import { Context } from 'hono';
import { logger } from '@stock-bot/utils';
import { DataPipelineOrchestrator } from '../core/DataPipelineOrchestrator';
import { DataPipeline, JobStatus, PipelineStatus } from '../types/DataPipeline';
export class PipelineController {
constructor(private orchestrator: DataPipelineOrchestrator) {}
async listPipelines(c: Context): Promise<Response> {
try {
const pipelines = this.orchestrator.listPipelines();
return c.json({
success: true,
data: pipelines,
total: pipelines.length
});
} catch (error) {
logger.error('Failed to list pipelines:', error);
return c.json({
success: false,
error: error instanceof Error ? error.message : 'Failed to list pipelines'
}, 500);
}
}
async createPipeline(c: Context): Promise<Response> {
try {
const pipelineData = await c.req.json();
// Validate required fields
if (!pipelineData.name) {
return c.json({
success: false,
error: 'Pipeline name is required'
}, 400);
}
const pipeline = await this.orchestrator.createPipeline(pipelineData);
logger.info(`Created pipeline: ${pipeline.name} (${pipeline.id})`);
return c.json({
success: true,
data: pipeline
}, 201);
} catch (error) {
logger.error('Failed to create pipeline:', error);
return c.json({
success: false,
error: error instanceof Error ? error.message : 'Failed to create pipeline'
}, 500);
}
}
async getPipeline(c: Context): Promise<Response> {
try {
const pipelineId = c.req.param('id');
const pipeline = this.orchestrator.getPipeline(pipelineId);
if (!pipeline) {
return c.json({
success: false,
error: 'Pipeline not found'
}, 404);
}
return c.json({
success: true,
data: pipeline
});
} catch (error) {
logger.error('Failed to get pipeline:', error);
return c.json({
success: false,
error: error instanceof Error ? error.message : 'Failed to get pipeline'
}, 500);
}
}
async updatePipeline(c: Context): Promise<Response> {
try {
const pipelineId = c.req.param('id');
const updateData = await c.req.json();
const existingPipeline = this.orchestrator.getPipeline(pipelineId);
if (!existingPipeline) {
return c.json({
success: false,
error: 'Pipeline not found'
}, 404);
}
// Update pipeline (in a real implementation, this would use a proper update method)
const updatedPipeline: DataPipeline = {
...existingPipeline,
...updateData,
id: pipelineId, // Ensure ID doesn't change
updatedAt: new Date()
};
// In a real implementation, save to persistent storage
logger.info(`Updated pipeline: ${updatedPipeline.name} (${pipelineId})`);
return c.json({
success: true,
data: updatedPipeline
});
} catch (error) {
logger.error('Failed to update pipeline:', error);
return c.json({
success: false,
error: error instanceof Error ? error.message : 'Failed to update pipeline'
}, 500);
}
}
async deletePipeline(c: Context): Promise<Response> {
try {
const pipelineId = c.req.param('id');
const pipeline = this.orchestrator.getPipeline(pipelineId);
if (!pipeline) {
return c.json({
success: false,
error: 'Pipeline not found'
}, 404);
}
// Check if pipeline is running
const runningJobs = this.orchestrator.listJobs(pipelineId);
if (runningJobs.length > 0) {
return c.json({
success: false,
error: 'Cannot delete pipeline with running jobs'
}, 400);
}
// In a real implementation, delete from persistent storage
logger.info(`Deleted pipeline: ${pipeline.name} (${pipelineId})`);
return c.json({
success: true,
message: 'Pipeline deleted successfully'
});
} catch (error) {
logger.error('Failed to delete pipeline:', error);
return c.json({
success: false,
error: error instanceof Error ? error.message : 'Failed to delete pipeline'
}, 500);
}
}
async runPipeline(c: Context): Promise<Response> {
try {
const pipelineId = c.req.param('id');
const parameters = await c.req.json().catch(() => ({}));
const pipeline = this.orchestrator.getPipeline(pipelineId);
if (!pipeline) {
return c.json({
success: false,
error: 'Pipeline not found'
}, 404);
}
if (pipeline.status !== PipelineStatus.ACTIVE) {
return c.json({
success: false,
error: 'Pipeline is not active'
}, 400);
}
const job = await this.orchestrator.runPipeline(pipelineId, parameters);
logger.info(`Started pipeline job: ${job.id} for pipeline: ${pipelineId}`);
return c.json({
success: true,
data: job
}, 202);
} catch (error) {
logger.error('Failed to run pipeline:', error);
return c.json({
success: false,
error: error instanceof Error ? error.message : 'Failed to run pipeline'
}, 500);
}
}
async schedulePipeline(c: Context): Promise<Response> {
try {
const pipelineId = c.req.param('id');
const { cronExpression } = await c.req.json();
if (!cronExpression) {
return c.json({
success: false,
error: 'Cron expression is required'
}, 400);
}
const pipeline = this.orchestrator.getPipeline(pipelineId);
if (!pipeline) {
return c.json({
success: false,
error: 'Pipeline not found'
}, 404);
}
await this.orchestrator.schedulePipeline(pipelineId, cronExpression);
logger.info(`Scheduled pipeline: ${pipelineId} with cron: ${cronExpression}`);
return c.json({
success: true,
message: 'Pipeline scheduled successfully',
data: {
pipelineId,
cronExpression
}
});
} catch (error) {
logger.error('Failed to schedule pipeline:', error);
return c.json({
success: false,
error: error instanceof Error ? error.message : 'Failed to schedule pipeline'
}, 500);
}
}
async pausePipeline(c: Context): Promise<Response> {
try {
const pipelineId = c.req.param('id');
const pipeline = this.orchestrator.getPipeline(pipelineId);
if (!pipeline) {
return c.json({
success: false,
error: 'Pipeline not found'
}, 404);
}
// Update pipeline status to paused
pipeline.status = PipelineStatus.PAUSED;
pipeline.updatedAt = new Date();
logger.info(`Paused pipeline: ${pipelineId}`);
return c.json({
success: true,
message: 'Pipeline paused successfully',
data: pipeline
});
} catch (error) {
logger.error('Failed to pause pipeline:', error);
return c.json({
success: false,
error: error instanceof Error ? error.message : 'Failed to pause pipeline'
}, 500);
}
}
async resumePipeline(c: Context): Promise<Response> {
try {
const pipelineId = c.req.param('id');
const pipeline = this.orchestrator.getPipeline(pipelineId);
if (!pipeline) {
return c.json({
success: false,
error: 'Pipeline not found'
}, 404);
}
// Update pipeline status to active
pipeline.status = PipelineStatus.ACTIVE;
pipeline.updatedAt = new Date();
logger.info(`Resumed pipeline: ${pipelineId}`);
return c.json({
success: true,
message: 'Pipeline resumed successfully',
data: pipeline
});
} catch (error) {
logger.error('Failed to resume pipeline:', error);
return c.json({
success: false,
error: error instanceof Error ? error.message : 'Failed to resume pipeline'
}, 500);
}
}
async getPipelineMetrics(c: Context): Promise<Response> {
try {
const pipelineId = c.req.param('id');
const pipeline = this.orchestrator.getPipeline(pipelineId);
if (!pipeline) {
return c.json({
success: false,
error: 'Pipeline not found'
}, 404);
}
const jobs = this.orchestrator.listJobs(pipelineId);
const metrics = {
totalJobs: jobs.length,
completedJobs: jobs.filter(j => j.status === 'completed').length,
failedJobs: jobs.filter(j => j.status === 'failed').length,
runningJobs: jobs.filter(j => j.status === 'running').length,
totalRecordsProcessed: jobs.reduce((sum, j) => sum + j.metrics.recordsProcessed, 0),
totalProcessingTime: jobs.reduce((sum, j) => sum + j.metrics.processingTimeMs, 0),
averageProcessingTime: jobs.length > 0 ?
jobs.reduce((sum, j) => sum + j.metrics.processingTimeMs, 0) / jobs.length : 0,
successRate: jobs.length > 0 ?
(jobs.filter(j => j.status === 'completed').length / jobs.length) * 100 : 0
};
return c.json({
success: true,
data: metrics
});
} catch (error) {
logger.error('Failed to get pipeline metrics:', error);
return c.json({
success: false,
error: error instanceof Error ? error.message : 'Failed to get pipeline metrics'
}, 500);
}
}
}

View file

@ -0,0 +1,293 @@
import { EventBus } from '@stock-bot/event-bus';
import { logger } from '@stock-bot/utils';
import { DataPipeline, PipelineStatus, PipelineJob, JobStatus } from '../types/DataPipeline';
import { DataIngestionService } from '../services/DataIngestionService';
import { DataTransformationService } from '../services/DataTransformationService';
import { DataValidationService } from '../services/DataValidationService';
import { DataQualityService } from '../services/DataQualityService';
import { PipelineScheduler } from './PipelineScheduler';
import { JobQueue } from './JobQueue';
export class DataPipelineOrchestrator {
  private eventBus: EventBus;
  private scheduler: PipelineScheduler;
  private jobQueue: JobQueue;
  // All known pipelines, keyed by pipeline id (in-memory only).
  private pipelines: Map<string, DataPipeline> = new Map();
  // Jobs keyed by job id. NOTE(review): completed/failed jobs are never
  // evicted from this map, so it grows unboundedly — confirm whether a
  // retention policy is needed.
  private runningJobs: Map<string, PipelineJob> = new Map();

  constructor(
    private ingestionService: DataIngestionService,
    private transformationService: DataTransformationService,
    private validationService: DataValidationService,
    private qualityService: DataQualityService
  ) {
    this.eventBus = new EventBus();
    this.scheduler = new PipelineScheduler(this);
    this.jobQueue = new JobQueue(this);
  }

  /**
   * Initializes the event bus, scheduler, and job queue, subscribes to
   * pipeline/job event topics, and loads persisted pipelines.
   */
  async initialize(): Promise<void> {
    logger.info('🔄 Initializing Data Pipeline Orchestrator...');
    await this.eventBus.initialize();
    await this.scheduler.initialize();
    await this.jobQueue.initialize();
    // Subscribe to pipeline events
    await this.eventBus.subscribe('data.pipeline.*', this.handlePipelineEvent.bind(this));
    await this.eventBus.subscribe('data.job.*', this.handleJobEvent.bind(this));
    // Load existing pipelines
    await this.loadPipelines();
    logger.info('✅ Data Pipeline Orchestrator initialized');
  }

  /**
   * Registers a new pipeline. The id and timestamps are generated here, and
   * the status is always forced to DRAFT regardless of the input.
   */
  async createPipeline(pipeline: Omit<DataPipeline, 'id' | 'createdAt' | 'updatedAt'>): Promise<DataPipeline> {
    const pipelineWithId: DataPipeline = {
      ...pipeline,
      id: this.generatePipelineId(),
      status: PipelineStatus.DRAFT,
      createdAt: new Date(),
      updatedAt: new Date(),
    };
    this.pipelines.set(pipelineWithId.id, pipelineWithId);
    await this.eventBus.publish('data.pipeline.created', {
      pipelineId: pipelineWithId.id,
      pipeline: pipelineWithId,
    });
    logger.info(`📋 Created pipeline: ${pipelineWithId.name} (${pipelineWithId.id})`);
    return pipelineWithId;
  }

  /**
   * Creates a PENDING job for an ACTIVE pipeline and enqueues it for
   * asynchronous execution. Throws if the pipeline is unknown or not active.
   */
  async runPipeline(pipelineId: string, parameters?: Record<string, any>): Promise<PipelineJob> {
    const pipeline = this.pipelines.get(pipelineId);
    if (!pipeline) {
      throw new Error(`Pipeline not found: ${pipelineId}`);
    }
    if (pipeline.status !== PipelineStatus.ACTIVE) {
      throw new Error(`Pipeline is not active: ${pipeline.status}`);
    }
    const job: PipelineJob = {
      id: this.generateJobId(),
      pipelineId,
      status: JobStatus.PENDING,
      parameters: parameters || {},
      createdAt: new Date(),
      startedAt: null,
      completedAt: null,
      error: null,
      metrics: {
        recordsProcessed: 0,
        recordsSuccessful: 0,
        recordsFailed: 0,
        processingTimeMs: 0,
      },
    };
    this.runningJobs.set(job.id, job);
    // Queue the job for execution
    await this.jobQueue.enqueueJob(job);
    await this.eventBus.publish('data.job.queued', {
      jobId: job.id,
      pipelineId,
      job,
    });
    logger.info(`🚀 Queued pipeline job: ${job.id} for pipeline: ${pipeline.name}`);
    return job;
  }

  /**
   * Executes the four pipeline stages in order (ingestion → transformation →
   * validation → quality checks), publishing started/completed/failed events
   * and recording wall-clock processing time. Re-throws on failure so the
   * job queue can apply its retry policy.
   */
  async executePipelineJob(job: PipelineJob): Promise<void> {
    const pipeline = this.pipelines.get(job.pipelineId);
    if (!pipeline) {
      throw new Error(`Pipeline not found: ${job.pipelineId}`);
    }
    const startTime = Date.now();
    job.status = JobStatus.RUNNING;
    job.startedAt = new Date();
    await this.eventBus.publish('data.job.started', {
      jobId: job.id,
      pipelineId: job.pipelineId,
      job,
    });
    try {
      logger.info(`⚙️ Executing pipeline job: ${job.id}`);
      // Execute pipeline steps
      await this.executeIngestionStep(pipeline, job);
      await this.executeTransformationStep(pipeline, job);
      await this.executeValidationStep(pipeline, job);
      await this.executeQualityChecks(pipeline, job);
      // Complete the job
      job.status = JobStatus.COMPLETED;
      job.completedAt = new Date();
      job.metrics.processingTimeMs = Date.now() - startTime;
      await this.eventBus.publish('data.job.completed', {
        jobId: job.id,
        pipelineId: job.pipelineId,
        job,
      });
      logger.info(`✅ Pipeline job completed: ${job.id} in ${job.metrics.processingTimeMs}ms`);
    } catch (error) {
      job.status = JobStatus.FAILED;
      job.completedAt = new Date();
      job.error = error instanceof Error ? error.message : 'Unknown error';
      job.metrics.processingTimeMs = Date.now() - startTime;
      await this.eventBus.publish('data.job.failed', {
        jobId: job.id,
        pipelineId: job.pipelineId,
        job,
        error: job.error,
      });
      logger.error(`❌ Pipeline job failed: ${job.id}`, error);
      throw error;
    }
  }

  /** Runs the ingestion stage if configured; accumulates record counts onto the job. */
  private async executeIngestionStep(pipeline: DataPipeline, job: PipelineJob): Promise<void> {
    if (!pipeline.steps.ingestion) return;
    logger.info(`📥 Executing ingestion step for job: ${job.id}`);
    const result = await this.ingestionService.ingestData(
      pipeline.steps.ingestion,
      job.parameters
    );
    job.metrics.recordsProcessed += result.recordsProcessed;
    job.metrics.recordsSuccessful += result.recordsSuccessful;
    job.metrics.recordsFailed += result.recordsFailed;
  }

  /** Runs the transformation stage if configured; accumulates record counts onto the job. */
  private async executeTransformationStep(pipeline: DataPipeline, job: PipelineJob): Promise<void> {
    if (!pipeline.steps.transformation) return;
    logger.info(`🔄 Executing transformation step for job: ${job.id}`);
    const result = await this.transformationService.transformData(
      pipeline.steps.transformation,
      job.parameters
    );
    job.metrics.recordsProcessed += result.recordsProcessed;
    job.metrics.recordsSuccessful += result.recordsSuccessful;
    job.metrics.recordsFailed += result.recordsFailed;
  }

  /** Runs the validation stage if configured; accumulates record counts onto the job. */
  private async executeValidationStep(pipeline: DataPipeline, job: PipelineJob): Promise<void> {
    if (!pipeline.steps.validation) return;
    logger.info(`✅ Executing validation step for job: ${job.id}`);
    const result = await this.validationService.validateData(
      pipeline.steps.validation,
      job.parameters
    );
    job.metrics.recordsProcessed += result.recordsProcessed;
    job.metrics.recordsSuccessful += result.recordsSuccessful;
    job.metrics.recordsFailed += result.recordsFailed;
  }

  /** Runs quality checks if configured; does not contribute to job record metrics. */
  private async executeQualityChecks(pipeline: DataPipeline, job: PipelineJob): Promise<void> {
    if (!pipeline.steps.qualityChecks) return;
    logger.info(`🔍 Executing quality checks for job: ${job.id}`);
    await this.qualityService.runQualityChecks(
      pipeline.steps.qualityChecks,
      job.parameters
    );
  }

  /**
   * Registers a cron schedule for the pipeline and records it on the
   * pipeline object. Throws if the pipeline is unknown.
   */
  async schedulePipeline(pipelineId: string, cronExpression: string): Promise<void> {
    const pipeline = this.pipelines.get(pipelineId);
    if (!pipeline) {
      throw new Error(`Pipeline not found: ${pipelineId}`);
    }
    await this.scheduler.schedulePipeline(pipelineId, cronExpression);
    pipeline.schedule = {
      cronExpression,
      enabled: true,
      lastRun: null,
      nextRun: this.scheduler.getNextRunTime(cronExpression),
    };
    await this.eventBus.publish('data.pipeline.scheduled', {
      pipelineId,
      cronExpression,
    });
    logger.info(`📅 Scheduled pipeline: ${pipeline.name} with cron: ${cronExpression}`);
  }

  // Pipeline CRUD operations
  getPipeline(pipelineId: string): DataPipeline | undefined {
    return this.pipelines.get(pipelineId);
  }

  listPipelines(): DataPipeline[] {
    return Array.from(this.pipelines.values());
  }

  getJob(jobId: string): PipelineJob | undefined {
    return this.runningJobs.get(jobId);
  }

  /** Returns all known jobs, optionally restricted to a single pipeline. */
  listJobs(pipelineId?: string): PipelineJob[] {
    const jobs = Array.from(this.runningJobs.values());
    return pipelineId ? jobs.filter(job => job.pipelineId === pipelineId) : jobs;
  }

  private async handlePipelineEvent(event: any): Promise<void> {
    logger.debug('📨 Received pipeline event:', event);
    // Handle pipeline-level events
  }

  private async handleJobEvent(event: any): Promise<void> {
    logger.debug('📨 Received job event:', event);
    // Handle job-level events
  }

  private async loadPipelines(): Promise<void> {
    // In a real implementation, load pipelines from persistent storage
    logger.info('📂 Loading existing pipelines...');
  }

  // ID generators: timestamp + 9 random base-36 chars. slice() replaces the
  // deprecated String.prototype.substr().
  private generatePipelineId(): string {
    return `pipeline_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  private generateJobId(): string {
    return `job_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /** Stops the scheduler and queue, then disconnects the event bus. */
  async shutdown(): Promise<void> {
    logger.info('🔄 Shutting down Data Pipeline Orchestrator...');
    await this.scheduler.shutdown();
    await this.jobQueue.shutdown();
    await this.eventBus.disconnect();
    logger.info('✅ Data Pipeline Orchestrator shutdown complete');
  }
}

View file

@ -0,0 +1,77 @@
import Queue from 'bull';
import { logger } from '@stock-bot/utils';
import { PipelineJob } from '../types/DataPipeline';
import { DataPipelineOrchestrator } from './DataPipelineOrchestrator';
export class JobQueue {
  private queue: Queue.Queue;

  constructor(private orchestrator: DataPipelineOrchestrator) {
    // Bull queue backed by Redis; connection details come from the environment.
    this.queue = new Queue('data-pipeline-jobs', {
      redis: {
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379'),
      },
    });
  }

  /**
   * Wires the worker (up to 5 concurrent pipeline jobs) and lifecycle
   * event logging.
   */
  async initialize(): Promise<void> {
    logger.info('🔄 Initializing Job Queue...');
    // Process jobs with a maximum of 5 concurrent jobs
    this.queue.process('pipeline-job', 5, async (job) => {
      const pipelineJob: PipelineJob = job.data;
      await this.orchestrator.executePipelineJob(pipelineJob);
    });
    // Handle job events
    this.queue.on('completed', (job) => {
      logger.info(`✅ Job completed: ${job.id}`);
    });
    this.queue.on('failed', (job, error) => {
      logger.error(`❌ Job failed: ${job.id}`, error);
    });
    this.queue.on('stalled', (job) => {
      logger.warn(`⚠️ Job stalled: ${job.id}`);
    });
    logger.info('✅ Job Queue initialized');
  }

  /**
   * Adds a pipeline job to the queue using the pipeline job's own id, with
   * bounded history and exponential-backoff retries.
   */
  async enqueueJob(job: PipelineJob): Promise<void> {
    await this.queue.add('pipeline-job', job, {
      jobId: job.id,
      removeOnComplete: 100, // Keep last 100 completed jobs
      removeOnFail: 50, // Keep last 50 failed jobs
      attempts: 3, // Retry failed jobs up to 3 times
      backoff: {
        type: 'exponential',
        delay: 2000,
      },
    });
    logger.info(`📤 Enqueued job: ${job.id}`);
  }

  /**
   * Returns counts of waiting/active/completed/failed jobs. The four queue
   * reads are independent, so they are fetched in parallel (previously
   * awaited sequentially).
   */
  async getJobStats(): Promise<any> {
    const [waiting, active, completed, failed] = await Promise.all([
      this.queue.getWaiting(),
      this.queue.getActive(),
      this.queue.getCompleted(),
      this.queue.getFailed(),
    ]);
    return {
      waiting: waiting.length,
      active: active.length,
      completed: completed.length,
      failed: failed.length,
    };
  }

  /** Closes the underlying Bull queue and its Redis connections. */
  async shutdown(): Promise<void> {
    logger.info('🔄 Shutting down Job Queue...');
    await this.queue.close();
    logger.info('✅ Job Queue shutdown complete');
  }
}

View file

@ -0,0 +1,69 @@
import { CronJob } from 'cron';
import { logger } from '@stock-bot/utils';
import { DataPipelineOrchestrator } from './DataPipelineOrchestrator';
export class PipelineScheduler {
  // One started CronJob per scheduled pipeline, keyed by pipeline id.
  private scheduledJobs: Map<string, CronJob> = new Map();

  constructor(private orchestrator: DataPipelineOrchestrator) {}

  async initialize(): Promise<void> {
    logger.info('🔄 Initializing Pipeline Scheduler...');
    logger.info('✅ Pipeline Scheduler initialized');
  }

  /**
   * (Re)schedules a pipeline on the given cron expression. Any existing
   * schedule for the same pipeline is cancelled first. Scheduled-run
   * failures are logged, not propagated.
   */
  async schedulePipeline(pipelineId: string, cronExpression: string): Promise<void> {
    // Cancel existing schedule if it exists
    if (this.scheduledJobs.has(pipelineId)) {
      this.cancelSchedule(pipelineId);
    }
    const cronJob = new CronJob(
      cronExpression,
      async () => {
        try {
          logger.info(`⏰ Scheduled execution triggered for pipeline: ${pipelineId}`);
          await this.orchestrator.runPipeline(pipelineId);
        } catch (error) {
          logger.error(`❌ Scheduled pipeline execution failed: ${pipelineId}`, error);
        }
      },
      null,
      true, // Start immediately
      'UTC'
    );
    this.scheduledJobs.set(pipelineId, cronJob);
    logger.info(`📅 Scheduled pipeline ${pipelineId} with cron: ${cronExpression}`);
  }

  /** Stops and removes the schedule for a pipeline, if one exists. */
  cancelSchedule(pipelineId: string): void {
    const job = this.scheduledJobs.get(pipelineId);
    if (job) {
      job.stop();
      this.scheduledJobs.delete(pipelineId);
      logger.info(`🚫 Cancelled schedule for pipeline: ${pipelineId}`);
    }
  }

  /**
   * Computes the next fire time for a cron expression without scheduling
   * anything. The cron library requires an onTick callback, so a throwaway
   * job with a no-op tick is built and explicitly NOT started (previously
   * the constructor was called with no callback at all).
   */
  getNextRunTime(cronExpression: string): Date {
    const probe = new CronJob(cronExpression, () => undefined, null, false);
    return probe.nextDate().toDate();
  }

  getScheduledPipelines(): string[] {
    return Array.from(this.scheduledJobs.keys());
  }

  /** Stops every scheduled job and clears the registry. */
  async shutdown(): Promise<void> {
    logger.info('🔄 Shutting down Pipeline Scheduler...');
    for (const [pipelineId, job] of this.scheduledJobs) {
      job.stop();
      logger.info(`🚫 Stopped scheduled job for pipeline: ${pipelineId}`);
    }
    this.scheduledJobs.clear();
    logger.info('✅ Pipeline Scheduler shutdown complete');
  }
}

View file

@ -0,0 +1,107 @@
import { Hono } from 'hono';
import { serve } from 'bun';
import { logger } from '@stock-bot/utils';
import { DataPipelineOrchestrator } from './core/DataPipelineOrchestrator';
import { DataQualityService } from './services/DataQualityService';
import { DataIngestionService } from './services/DataIngestionService';
import { DataTransformationService } from './services/DataTransformationService';
import { DataValidationService } from './services/DataValidationService';
import { HealthController } from './controllers/HealthController';
import { PipelineController } from './controllers/PipelineController';
import { JobController } from './controllers/JobController';
// Hono application instance; every HTTP route below is registered on it.
const app = new Hono();

// Services — one per pipeline stage (quality, ingestion, transformation, validation).
const dataQualityService = new DataQualityService();
const dataIngestionService = new DataIngestionService();
const dataTransformationService = new DataTransformationService();
const dataValidationService = new DataValidationService();

// Core orchestrator: coordinates the four stage services when a pipeline runs.
// Constructed after the services because it holds references to all of them.
const pipelineOrchestrator = new DataPipelineOrchestrator(
  dataIngestionService,
  dataTransformationService,
  dataValidationService,
  dataQualityService
);

// Controllers — HTTP-facing wrappers around the orchestrator / health checks.
const healthController = new HealthController();
const pipelineController = new PipelineController(pipelineOrchestrator);
const jobController = new JobController(pipelineOrchestrator);

// Health endpoints.
// .bind(...) is required because Hono invokes the handler detached from the
// controller instance, which would otherwise lose `this`.
app.get('/health', healthController.getHealth.bind(healthController));
app.get('/health/detailed', healthController.getDetailedHealth.bind(healthController));

// Pipeline management — CRUD plus lifecycle actions (run/schedule/pause/resume).
app.get('/api/pipelines', pipelineController.listPipelines.bind(pipelineController));
app.post('/api/pipelines', pipelineController.createPipeline.bind(pipelineController));
app.get('/api/pipelines/:id', pipelineController.getPipeline.bind(pipelineController));
app.put('/api/pipelines/:id', pipelineController.updatePipeline.bind(pipelineController));
app.delete('/api/pipelines/:id', pipelineController.deletePipeline.bind(pipelineController));
app.post('/api/pipelines/:id/run', pipelineController.runPipeline.bind(pipelineController));
app.post('/api/pipelines/:id/schedule', pipelineController.schedulePipeline.bind(pipelineController));
app.post('/api/pipelines/:id/pause', pipelineController.pausePipeline.bind(pipelineController));
app.post('/api/pipelines/:id/resume', pipelineController.resumePipeline.bind(pipelineController));
app.get('/api/pipelines/:id/metrics', pipelineController.getPipelineMetrics.bind(pipelineController));

// Job management — read-only views plus cancel/retry actions.
// Note: /api/jobs/stats must be registered before /api/jobs/:id so that
// "stats" is not captured as an :id parameter.
app.get('/api/jobs', jobController.listJobs.bind(jobController));
app.get('/api/jobs/stats', jobController.getJobStats.bind(jobController));
app.get('/api/jobs/:id', jobController.getJob.bind(jobController));
app.get('/api/jobs/:id/logs', jobController.getJobLogs.bind(jobController));
app.get('/api/jobs/:id/metrics', jobController.getJobMetrics.bind(jobController));
app.post('/api/jobs/:id/cancel', jobController.cancelJob.bind(jobController));
app.post('/api/jobs/:id/retry', jobController.retryJob.bind(jobController));

// Data quality endpoints — thin inline handlers over dataQualityService.
app.get('/api/data-quality/metrics', async (c) => {
  const metrics = await dataQualityService.getQualityMetrics();
  return c.json({ success: true, data: metrics });
});
app.get('/api/data-quality/report/:dataset', async (c) => {
  const dataset = c.req.param('dataset');
  const report = await dataQualityService.generateReport(dataset);
  return c.json({ success: true, data: report });
});
// Listen port from the environment. Parse explicitly in base 10 so that
// values with a leading zero or "0x" prefix are not reinterpreted.
const PORT = parseInt(process.env.DATA_PROCESSOR_PORT || '5001', 10);

/**
 * Initialize every pipeline stage service, then the orchestrator that depends
 * on them. Order matters: the orchestrator assumes its stage services are
 * ready before its own initialize() runs.
 *
 * Exits the process with code 1 on any failure — the service cannot operate
 * partially wired.
 */
async function initializeServices() {
  try {
    logger.info('🔄 Initializing Data Processor services...');
    await dataQualityService.initialize();
    await dataIngestionService.initialize();
    await dataTransformationService.initialize();
    await dataValidationService.initialize();
    await pipelineOrchestrator.initialize();
    logger.info('✅ Data Processor services initialized successfully');
  } catch (error) {
    logger.error('❌ Failed to initialize Data Processor services:', error);
    process.exit(1);
  }
}
// Graceful shutdown — handle both Ctrl-C (SIGINT) and the termination signal
// sent by process managers and container runtimes (SIGTERM). The original
// registered only SIGINT, so `docker stop` / Kubernetes termination would
// kill the process without letting the orchestrator shut down cleanly.
const shutdown = async (signal: string) => {
  logger.info(`🔄 Gracefully shutting down Data Processor (${signal})...`);
  await pipelineOrchestrator.shutdown();
  process.exit(0);
};
// `void` marks the promise as intentionally fire-and-forget; the handler
// itself must be synchronous for process.on.
process.on('SIGINT', () => void shutdown('SIGINT'));
process.on('SIGTERM', () => void shutdown('SIGTERM'));

// Start the HTTP server only after all services are initialized.
// initializeServices() never rejects: it exits the process on failure.
initializeServices().then(() => {
  serve({
    port: PORT,
    fetch: app.fetch,
  });
  logger.info(`🚀 Data Processor running on port ${PORT}`);
  logger.info(`🔍 Health check: http://localhost:${PORT}/health`);
  logger.info(`📊 API documentation: http://localhost:${PORT}/api`);
});

View file

@ -0,0 +1,200 @@
import { logger } from '@stock-bot/utils';
import { IngestionStep, ProcessingResult, DataSource } from '../types/DataPipeline';
import axios from 'axios';
import * as csv from 'csv-parser';
import * as fs from 'fs';
/**
 * Pulls raw records into the pipeline from external sources. API and file
 * (CSV/JSON) ingestion are implemented; database and stream ingestion are
 * declared but currently throw "not yet implemented".
 */
export class DataIngestionService {
  // Reserved for long-lived source connections. Nothing in this class
  // populates it yet; only its size is exposed via getIngestionMetrics().
  private activeConnections: Map<string, any> = new Map();

  /** Startup hook kept for symmetry with the other pipeline services (no-op). */
  async initialize(): Promise<void> {
    logger.info('🔄 Initializing Data Ingestion Service...');
    logger.info('✅ Data Ingestion Service initialized');
  }

  /**
   * Dispatch to the ingestion strategy matching `step.source.type`.
   * Never rejects: any failure is folded into the returned ProcessingResult
   * as a single INGESTION_ERROR entry with zero records processed.
   */
  async ingestData(step: IngestionStep, parameters: Record<string, any>): Promise<ProcessingResult> {
    const startTime = Date.now();
    logger.info(`📥 Starting data ingestion from ${step.source.type}: ${step.source.connection.url || step.source.connection.host}`);
    try {
      switch (step.source.type) {
        case 'api':
          return await this.ingestFromApi(step.source, parameters);
        case 'file':
          return await this.ingestFromFile(step.source, parameters);
        case 'database':
          return await this.ingestFromDatabase(step.source, parameters);
        case 'stream':
          return await this.ingestFromStream(step.source, parameters);
        default:
          throw new Error(`Unsupported ingestion type: ${step.source.type}`);
      }
    } catch (error) {
      const processingTime = Date.now() - startTime;
      logger.error(`❌ Data ingestion failed after ${processingTime}ms:`, error);
      return {
        recordsProcessed: 0,
        recordsSuccessful: 0,
        recordsFailed: 0,
        errors: [{
          record: 0,
          message: error instanceof Error ? error.message : 'Unknown error',
          code: 'INGESTION_ERROR'
        }],
        metadata: { processingTimeMs: processingTime }
      };
    }
  }

  /**
   * GET the configured URL and coerce the response into an array of records.
   * Accepts a bare array, `{ data: [...] }`, `{ results: [...] }`, or any
   * other JSON value (wrapped into a one-element array). Caller-supplied
   * `parameters` are merged over the source's query params; `apiKey`, when
   * set, is sent as a Bearer token.
   */
  private async ingestFromApi(source: DataSource, parameters: Record<string, any>): Promise<ProcessingResult> {
    const config = {
      method: 'GET',
      url: source.connection.url,
      headers: source.connection.headers || {},
      params: { ...source.connection.params, ...parameters },
    };
    if (source.connection.apiKey) {
      config.headers['Authorization'] = `Bearer ${source.connection.apiKey}`;
    }
    const response = await axios(config);
    const data = response.data;
    // Process the data based on format
    let records: any[] = [];
    if (Array.isArray(data)) {
      records = data;
    } else if (data.data && Array.isArray(data.data)) {
      records = data.data;
    } else if (data.results && Array.isArray(data.results)) {
      records = data.results;
    } else {
      records = [data];
    }
    logger.info(`📊 Ingested ${records.length} records from API: ${source.connection.url}`);
    return {
      recordsProcessed: records.length,
      recordsSuccessful: records.length,
      recordsFailed: 0,
      errors: [],
      metadata: {
        source: 'api',
        url: source.connection.url,
        statusCode: response.status,
        // Approximate payload size via re-serialization of the parsed body.
        responseSize: JSON.stringify(data).length
      }
    };
  }

  /**
   * Ingest from a local file. The path comes from the source URL or, as a
   * fallback, from `parameters.filePath`. Only CSV and JSON are supported.
   */
  private async ingestFromFile(source: DataSource, parameters: Record<string, any>): Promise<ProcessingResult> {
    const filePath = source.connection.url || parameters.filePath;
    if (!filePath) {
      throw new Error('File path is required for file ingestion');
    }
    switch (source.format) {
      case 'csv':
        return await this.ingestCsvFile(filePath);
      case 'json':
        return await this.ingestJsonFile(filePath);
      default:
        throw new Error(`Unsupported file format: ${source.format}`);
    }
  }

  /**
   * Stream-parse a CSV file row by row. Resolves when the stream ends;
   * rejects only on a stream-level error (I/O failure, unreadable file).
   */
  private async ingestCsvFile(filePath: string): Promise<ProcessingResult> {
    return new Promise((resolve, reject) => {
      const records: any[] = [];
      const errors: any[] = [];
      let recordCount = 0;
      fs.createReadStream(filePath)
        .pipe(csv())
        .on('data', (data) => {
          recordCount++;
          try {
            records.push(data);
          } catch (error) {
            // NOTE(review): Array.push does not throw in practice, so this
            // branch is effectively unreachable; kept for parity with the
            // per-record error reporting used elsewhere.
            errors.push({
              record: recordCount,
              message: error instanceof Error ? error.message : 'Parse error',
              code: 'CSV_PARSE_ERROR'
            });
          }
        })
        .on('end', () => {
          logger.info(`📊 Ingested ${records.length} records from CSV: ${filePath}`);
          resolve({
            recordsProcessed: recordCount,
            recordsSuccessful: records.length,
            recordsFailed: errors.length,
            errors,
            metadata: {
              source: 'file',
              format: 'csv',
              filePath
            }
          });
        })
        .on('error', reject);
    });
  }

  /**
   * Read and parse a JSON file in one shot. A top-level array is used as-is;
   * any other JSON value becomes a single-record array. Throws on missing
   * files or malformed JSON (handled by ingestData's catch).
   */
  private async ingestJsonFile(filePath: string): Promise<ProcessingResult> {
    const fileContent = await fs.promises.readFile(filePath, 'utf8');
    const data = JSON.parse(fileContent);
    let records: any[] = [];
    if (Array.isArray(data)) {
      records = data;
    } else {
      records = [data];
    }
    logger.info(`📊 Ingested ${records.length} records from JSON: ${filePath}`);
    return {
      recordsProcessed: records.length,
      recordsSuccessful: records.length,
      recordsFailed: 0,
      errors: [],
      metadata: {
        source: 'file',
        format: 'json',
        filePath,
        // Size in UTF-16 code units of the decoded file content.
        fileSize: fileContent.length
      }
    };
  }

  private async ingestFromDatabase(source: DataSource, parameters: Record<string, any>): Promise<ProcessingResult> {
    // Placeholder for database ingestion
    // In a real implementation, this would connect to various databases
    // (PostgreSQL, MySQL, MongoDB, etc.) and execute queries
    throw new Error('Database ingestion not yet implemented');
  }

  private async ingestFromStream(source: DataSource, parameters: Record<string, any>): Promise<ProcessingResult> {
    // Placeholder for stream ingestion
    // In a real implementation, this would connect to streaming sources
    // (Kafka, Kinesis, WebSocket, etc.)
    throw new Error('Stream ingestion not yet implemented');
  }

  /** Lightweight capability/connection summary for metrics endpoints. */
  async getIngestionMetrics(): Promise<any> {
    return {
      activeConnections: this.activeConnections.size,
      supportedSources: ['api', 'file', 'database', 'stream'],
      supportedFormats: ['json', 'csv', 'xml', 'parquet', 'avro']
    };
  }
}

View file

@ -0,0 +1,373 @@
import { logger } from '@stock-bot/utils';
import { QualityCheckStep, ProcessingResult, QualityCheck, QualityThresholds } from '../types/DataPipeline';
/**
 * Runs configurable data-quality checks (completeness, accuracy, consistency,
 * validity, uniqueness) over in-memory record sets, keeps a bounded rolling
 * history of the resulting metrics, and generates per-dataset reports.
 */
export class DataQualityService {
  // Rolling history of quality-check runs, keyed by a timestamp+sequence key.
  private qualityMetrics: Map<string, any> = new Map();
  // Last generated report per dataset name.
  private qualityReports: Map<string, any> = new Map();
  // Monotonic sequence appended to metric keys so two runs completing within
  // the same millisecond cannot collide: Date.now() alone is not unique and
  // a collision would silently overwrite (lose) a metrics entry.
  private metricsSequence = 0;

  async initialize(): Promise<void> {
    logger.info('🔄 Initializing Data Quality Service...');
    // Start from a clean slate in case initialize() is invoked more than once.
    this.qualityMetrics.clear();
    this.qualityReports.clear();
    logger.info('✅ Data Quality Service initialized');
  }

  /**
   * Execute every check in `step.checks` against `parameters.inputData`.
   * A check scoring below `thresholds.error` marks the whole batch failed;
   * below `thresholds.warning` only logs. Never rejects — service-level
   * failures are reported via a QUALITY_SERVICE_ERROR entry.
   */
  async runQualityChecks(step: QualityCheckStep, parameters: Record<string, any>): Promise<ProcessingResult> {
    const startTime = Date.now();
    logger.info(`🔍 Running ${step.checks.length} quality checks`);
    const inputData = parameters.inputData || [];
    const results: any[] = [];
    const errors: any[] = [];
    let totalScore = 0;
    try {
      for (const check of step.checks) {
        const checkResult = await this.executeQualityCheck(check, inputData);
        results.push(checkResult);
        totalScore += checkResult.score;
        // Compare each check's score against the step-level thresholds.
        if (checkResult.score < step.thresholds.error) {
          errors.push({
            record: 0,
            field: check.field,
            message: `Quality check failed: ${check.name} scored ${checkResult.score}%, below error threshold ${step.thresholds.error}%`,
            code: 'QUALITY_CHECK_ERROR'
          });
        } else if (checkResult.score < step.thresholds.warning) {
          logger.warn(`⚠️ Quality warning: ${check.name} scored ${checkResult.score}%, below warning threshold ${step.thresholds.warning}%`);
        }
      }
      // Guard the division: an empty check list previously produced NaN.
      // With no checks configured, nothing failed, so report a perfect score
      // (consistent with the per-check convention of 100 for empty data).
      const averageScore = step.checks.length > 0 ? totalScore / step.checks.length : 100;
      const processingTime = Date.now() - startTime;
      // Store quality metrics for later aggregation/reporting.
      this.storeQualityMetrics({
        timestamp: new Date(),
        averageScore,
        checksRun: step.checks.length,
        results,
        processingTimeMs: processingTime
      });
      logger.info(`🔍 Quality checks completed: ${averageScore.toFixed(2)}% average score in ${processingTime}ms`);
      return {
        recordsProcessed: inputData.length,
        // The batch passes or fails as a whole: any error-level check failure
        // marks every record failed.
        recordsSuccessful: errors.length === 0 ? inputData.length : 0,
        recordsFailed: errors.length > 0 ? inputData.length : 0,
        errors,
        metadata: {
          qualityScore: averageScore,
          checksRun: step.checks.length,
          results,
          processingTimeMs: processingTime
        }
      };
    } catch (error) {
      const processingTime = Date.now() - startTime;
      logger.error(`❌ Quality checks failed after ${processingTime}ms:`, error);
      return {
        recordsProcessed: inputData.length,
        recordsSuccessful: 0,
        recordsFailed: inputData.length,
        errors: [{
          record: 0,
          message: error instanceof Error ? error.message : 'Unknown quality check error',
          code: 'QUALITY_SERVICE_ERROR'
        }],
        metadata: { processingTimeMs: processingTime }
      };
    }
  }

  /** Dispatch a single check to its type-specific implementation. */
  private async executeQualityCheck(check: QualityCheck, data: any[]): Promise<any> {
    switch (check.type) {
      case 'completeness':
        return this.checkCompleteness(check, data);
      case 'accuracy':
        return this.checkAccuracy(check, data);
      case 'consistency':
        return this.checkConsistency(check, data);
      case 'validity':
        return this.checkValidity(check, data);
      case 'uniqueness':
        return this.checkUniqueness(check, data);
      default:
        throw new Error(`Unsupported quality check type: ${check.type}`);
    }
  }

  /** Percentage of records whose `check.field` is non-null/non-empty. */
  private checkCompleteness(check: QualityCheck, data: any[]): any {
    if (!check.field) {
      throw new Error('Completeness check requires a field');
    }
    const totalRecords = data.length;
    const completeRecords = data.filter(record => {
      const value = this.getFieldValue(record, check.field!);
      return value !== null && value !== undefined && value !== '';
    }).length;
    // Empty input counts as fully complete (nothing is missing).
    const score = totalRecords > 0 ? (completeRecords / totalRecords) * 100 : 100;
    return {
      checkName: check.name,
      type: 'completeness',
      field: check.field,
      score,
      passed: score >= check.threshold,
      details: {
        totalRecords,
        completeRecords,
        missingRecords: totalRecords - completeRecords
      }
    };
  }

  private checkAccuracy(check: QualityCheck, data: any[]): any {
    // Placeholder for accuracy checks
    // In a real implementation, this would validate data against known references
    // or business rules specific to stock market data
    const score = 95; // Mock score
    return {
      checkName: check.name,
      type: 'accuracy',
      field: check.field,
      score,
      passed: score >= check.threshold,
      details: {
        validatedRecords: data.length,
        accurateRecords: Math.floor(data.length * 0.95)
      }
    };
  }

  /**
   * For `symbol` fields: share of values matching the 1–5 uppercase-letter
   * ticker format. For every other field: 100 if all values share one
   * JavaScript type, else 0.
   */
  private checkConsistency(check: QualityCheck, data: any[]): any {
    if (!check.field) {
      throw new Error('Consistency check requires a field');
    }
    // Check for consistent data types and formats
    const fieldValues = data.map(record => this.getFieldValue(record, check.field!));
    const types = [...new Set(fieldValues.map(val => typeof val))];
    // For stock symbols, check consistent format
    if (check.field === 'symbol') {
      const validSymbols = fieldValues.filter(symbol =>
        typeof symbol === 'string' && /^[A-Z]{1,5}$/.test(symbol)
      ).length;
      const score = fieldValues.length > 0 ? (validSymbols / fieldValues.length) * 100 : 100;
      return {
        checkName: check.name,
        type: 'consistency',
        field: check.field,
        score,
        passed: score >= check.threshold,
        details: {
          totalValues: fieldValues.length,
          consistentValues: validSymbols,
          inconsistentValues: fieldValues.length - validSymbols
        }
      };
    }
    // Generic consistency check: all-or-nothing on type homogeneity.
    const score = types.length === 1 ? 100 : 0;
    return {
      checkName: check.name,
      type: 'consistency',
      field: check.field,
      score,
      passed: score >= check.threshold,
      details: {
        dataTypes: types,
        isConsistent: types.length === 1
      }
    };
  }

  /** Percentage of records whose `check.field` passes isValidValue(). */
  private checkValidity(check: QualityCheck, data: any[]): any {
    if (!check.field) {
      throw new Error('Validity check requires a field');
    }
    let validRecords = 0;
    const totalRecords = data.length;
    for (const record of data) {
      const value = this.getFieldValue(record, check.field);
      if (this.isValidValue(check.field, value)) {
        validRecords++;
      }
    }
    const score = totalRecords > 0 ? (validRecords / totalRecords) * 100 : 100;
    return {
      checkName: check.name,
      type: 'validity',
      field: check.field,
      score,
      passed: score >= check.threshold,
      details: {
        totalRecords,
        validRecords,
        invalidRecords: totalRecords - validRecords
      }
    };
  }

  /** Share of distinct values for `check.field` (100 = all unique). */
  private checkUniqueness(check: QualityCheck, data: any[]): any {
    if (!check.field) {
      throw new Error('Uniqueness check requires a field');
    }
    const fieldValues = data.map(record => this.getFieldValue(record, check.field!));
    const uniqueValues = new Set(fieldValues);
    const score = fieldValues.length > 0 ? (uniqueValues.size / fieldValues.length) * 100 : 100;
    return {
      checkName: check.name,
      type: 'uniqueness',
      field: check.field,
      score,
      passed: score >= check.threshold,
      details: {
        totalValues: fieldValues.length,
        uniqueValues: uniqueValues.size,
        duplicateValues: fieldValues.length - uniqueValues.size
      }
    };
  }

  /** Resolve a dotted path ("a.b.c") against a record; undefined on any gap. */
  private getFieldValue(record: any, fieldPath: string): any {
    return fieldPath.split('.').reduce((obj, field) => obj?.[field], record);
  }

  /** Field-specific validity rules for the stock-market domain. */
  private isValidValue(field: string, value: any): boolean {
    switch (field) {
      case 'symbol':
        return typeof value === 'string' && /^[A-Z]{1,5}$/.test(value);
      case 'price':
        return typeof value === 'number' && value > 0 && value < 1000000;
      case 'volume':
        return typeof value === 'number' && value >= 0 && Number.isInteger(value);
      case 'timestamp':
        return value instanceof Date || !isNaN(new Date(value).getTime());
      default:
        // Unknown fields: only require presence.
        return value !== null && value !== undefined;
    }
  }

  /** Append a metrics entry and evict the oldest beyond the 100-entry cap. */
  private storeQualityMetrics(metrics: any): void {
    // Timestamp + sequence guarantees a unique key even for same-millisecond
    // runs (a plain Date.now() key could overwrite a previous entry).
    const key = `metrics_${Date.now()}_${this.metricsSequence++}`;
    this.qualityMetrics.set(key, metrics);
    // Keep only last 100 metrics; Map preserves insertion order, so the
    // first key returned by the iterator is the oldest.
    if (this.qualityMetrics.size > 100) {
      const oldestKey = this.qualityMetrics.keys().next().value;
      if (oldestKey !== undefined) {
        this.qualityMetrics.delete(oldestKey);
      }
    }
  }

  /**
   * Aggregate stored metrics into totals, average score and the 10 most
   * recent runs.
   *
   * @param dataset Reserved for future per-dataset filtering; metrics are
   *                currently global and this parameter is ignored.
   */
  async getQualityMetrics(dataset?: string): Promise<any> {
    const allMetrics = Array.from(this.qualityMetrics.values());
    if (allMetrics.length === 0) {
      return {
        totalChecks: 0,
        averageScore: 0,
        recentResults: []
      };
    }
    const totalChecks = allMetrics.reduce((sum, m) => sum + m.checksRun, 0);
    const averageScore = allMetrics.reduce((sum, m) => sum + m.averageScore, 0) / allMetrics.length;
    const recentResults = allMetrics.slice(-10);
    return {
      totalChecks,
      // Round to two decimals for presentation.
      averageScore: Math.round(averageScore * 100) / 100,
      recentResults,
      summary: {
        totalRuns: allMetrics.length,
        averageProcessingTime: allMetrics.reduce((sum, m) => sum + m.processingTimeMs, 0) / allMetrics.length
      }
    };
  }

  /** Build, cache and return a quality report for one dataset. */
  async generateReport(dataset: string): Promise<any> {
    const metrics = await this.getQualityMetrics(dataset);
    const report = {
      dataset,
      generatedAt: new Date(),
      summary: metrics,
      recommendations: this.generateRecommendations(metrics),
      trends: this.analyzeTrends(metrics.recentResults)
    };
    this.qualityReports.set(dataset, report);
    return report;
  }

  /** Turn aggregate metrics into human-readable advisory strings. */
  private generateRecommendations(metrics: any): string[] {
    const recommendations: string[] = [];
    if (metrics.averageScore < 80) {
      recommendations.push('Overall data quality is below acceptable threshold. Review data ingestion processes.');
    }
    if (metrics.averageScore < 95 && metrics.averageScore >= 80) {
      recommendations.push('Data quality is acceptable but could be improved. Consider implementing additional validation rules.');
    }
    if (metrics.totalChecks === 0) {
      recommendations.push('No quality checks have been run. Implement quality monitoring for your data pipelines.');
    }
    return recommendations;
  }

  /** Compare the two most recent average scores to classify the trend. */
  private analyzeTrends(recentResults: any[]): any {
    if (recentResults.length < 2) {
      return { trend: 'insufficient_data', message: 'Not enough data to analyze trends' };
    }
    const scores = recentResults.map(r => r.averageScore);
    const latestScore = scores[scores.length - 1];
    const previousScore = scores[scores.length - 2];
    if (latestScore > previousScore) {
      return { trend: 'improving', message: 'Data quality is improving' };
    } else if (latestScore < previousScore) {
      return { trend: 'declining', message: 'Data quality is declining' };
    } else {
      return { trend: 'stable', message: 'Data quality is stable' };
    }
  }

  async getAvailableReports(): Promise<string[]> {
    return Array.from(this.qualityReports.keys());
  }

  async getReport(dataset: string): Promise<any | null> {
    return this.qualityReports.get(dataset) || null;
  }
}

View file

@ -0,0 +1,290 @@
import { logger } from '@stock-bot/utils';
import { TransformationStep, ProcessingResult } from '../types/DataPipeline';
/**
 * Applies transformation steps to in-memory record sets. Supports inline
 * JavaScript snippets (compiled with the Function constructor), named custom
 * transformations registered at runtime, and a SQL placeholder.
 */
export class DataTransformationService {
  // Registry of named transformation functions (built-ins plus any
  // registered via registerCustomTransformation()).
  private transformationFunctions: Map<string, Function> = new Map();

  async initialize(): Promise<void> {
    logger.info('🔄 Initializing Data Transformation Service...');
    // Register built-in transformation functions
    this.registerBuiltInTransformations();
    logger.info('✅ Data Transformation Service initialized');
  }

  /**
   * Dispatch to the handler for `step.type`. Never rejects: failures are
   * reported through the returned ProcessingResult's `errors` array with a
   * TRANSFORMATION_ERROR code.
   */
  async transformData(step: TransformationStep, parameters: Record<string, any>): Promise<ProcessingResult> {
    const startTime = Date.now();
    logger.info(`🔄 Starting data transformation: ${step.type}`);
    try {
      switch (step.type) {
        case 'javascript':
          return await this.executeJavaScriptTransformation(step, parameters);
        case 'sql':
          return await this.executeSqlTransformation(step, parameters);
        case 'custom':
          return await this.executeCustomTransformation(step, parameters);
        default:
          throw new Error(`Unsupported transformation type: ${step.type}`);
      }
    } catch (error) {
      const processingTime = Date.now() - startTime;
      logger.error(`❌ Data transformation failed after ${processingTime}ms:`, error);
      return {
        recordsProcessed: 0,
        recordsSuccessful: 0,
        recordsFailed: 0,
        errors: [{
          record: 0,
          message: error instanceof Error ? error.message : 'Unknown error',
          code: 'TRANSFORMATION_ERROR'
        }],
        metadata: { processingTimeMs: processingTime }
      };
    }
  }

  /**
   * Run a user-supplied JavaScript snippet once per record. A record whose
   * snippet returns undefined is passed through unchanged.
   *
   * SECURITY: the snippet is compiled with the Function constructor and runs
   * with full process privileges — this is arbitrary code execution. Pipeline
   * configurations must come only from trusted operators; never feed
   * untrusted input into `configuration.code`.
   */
  private async executeJavaScriptTransformation(step: TransformationStep, parameters: Record<string, any>): Promise<ProcessingResult> {
    const { code, inputData } = step.configuration;
    if (!code || !inputData) {
      throw new Error('JavaScript transformation requires code and inputData configuration');
    }
    const transformedRecords: any[] = [];
    const errors: any[] = [];
    let recordCount = 0;
    // Compile the snippet once — it is loop-invariant; previously it was
    // recompiled for every record. A syntax error now surfaces as a single
    // step-level TRANSFORMATION_ERROR instead of one error per record.
    const transformFunction = new Function('context', `
      const { record, parameters, utils } = context;
      ${code}
    `);
    // Execute transformation for each record
    for (const record of inputData) {
      recordCount++;
      try {
        // Per-record execution context exposed to the snippet.
        const context = {
          record,
          parameters,
          utils: this.getTransformationUtils(),
        };
        const result = transformFunction(context);
        if (result !== undefined) {
          transformedRecords.push(result);
        } else {
          transformedRecords.push(record); // Keep original if no transformation result
        }
      } catch (error) {
        errors.push({
          record: recordCount,
          message: error instanceof Error ? error.message : 'Transformation error',
          code: 'JS_TRANSFORM_ERROR'
        });
      }
    }
    logger.info(`🔄 Transformed ${transformedRecords.length} records using JavaScript`);
    return {
      recordsProcessed: recordCount,
      recordsSuccessful: transformedRecords.length,
      recordsFailed: errors.length,
      errors,
      metadata: {
        transformationType: 'javascript',
        outputData: transformedRecords
      }
    };
  }

  private async executeSqlTransformation(step: TransformationStep, parameters: Record<string, any>): Promise<ProcessingResult> {
    // Placeholder for SQL transformation
    // In a real implementation, this would execute SQL queries against a data warehouse
    // or in-memory SQL engine like DuckDB
    throw new Error('SQL transformation not yet implemented');
  }

  /** Look up and invoke a named transformation from the registry. */
  private async executeCustomTransformation(step: TransformationStep, parameters: Record<string, any>): Promise<ProcessingResult> {
    const { functionName, inputData } = step.configuration;
    if (!functionName) {
      throw new Error('Custom transformation requires functionName configuration');
    }
    const transformFunction = this.transformationFunctions.get(functionName);
    if (!transformFunction) {
      throw new Error(`Custom transformation function not found: ${functionName}`);
    }
    const result = await transformFunction(inputData, parameters);
    logger.info(`🔄 Executed custom transformation: ${functionName}`);
    return result;
  }

  /** Install the built-in named transformations into the registry. */
  private registerBuiltInTransformations(): void {
    // Market data normalization: coerce loosely-typed records into
    // { symbol, price, volume, timestamp, source }.
    this.transformationFunctions.set('normalizeMarketData', (data: any[], parameters: any) => {
      const normalized = data.map(record => ({
        symbol: record.symbol?.toUpperCase(),
        // `|| 0` maps NaN (unparseable input) to a neutral default.
        price: parseFloat(record.price) || 0,
        // Explicit base 10: without a radix, strings such as "0x10" would be
        // parsed as hexadecimal.
        volume: parseInt(record.volume, 10) || 0,
        timestamp: new Date(record.timestamp || Date.now()),
        source: parameters.source || 'unknown'
      }));
      return {
        recordsProcessed: data.length,
        recordsSuccessful: normalized.length,
        recordsFailed: 0,
        errors: [],
        metadata: { outputData: normalized }
      };
    });
    // Financial data aggregation: group by a key field and compute the
    // requested aggregates (avg price, total volume, count).
    this.transformationFunctions.set('aggregateFinancialData', (data: any[], parameters: any) => {
      const { groupBy = 'symbol', aggregations = ['avg', 'sum'] } = parameters;
      const grouped = data.reduce((acc, record) => {
        const key = record[groupBy];
        if (!acc[key]) {
          acc[key] = [];
        }
        acc[key].push(record);
        return acc;
      }, {});
      const aggregated = Object.entries(grouped).map(([key, records]: [string, any[]]) => {
        const result: any = { [groupBy]: key };
        if (aggregations.includes('avg')) {
          result.avgPrice = records.reduce((sum, r) => sum + (r.price || 0), 0) / records.length;
        }
        if (aggregations.includes('sum')) {
          result.totalVolume = records.reduce((sum, r) => sum + (r.volume || 0), 0);
        }
        if (aggregations.includes('count')) {
          result.count = records.length;
        }
        return result;
      });
      return {
        recordsProcessed: data.length,
        recordsSuccessful: aggregated.length,
        recordsFailed: 0,
        errors: [],
        metadata: { outputData: aggregated }
      };
    });
    // Data cleaning: optionally strip null/undefined fields, trim strings,
    // and coerce numeric strings to numbers.
    this.transformationFunctions.set('cleanData', (data: any[], parameters: any) => {
      const { removeNulls = true, trimStrings = true, validateNumbers = true } = parameters;
      const cleaned: any[] = [];
      const errors: any[] = [];
      data.forEach((record, index) => {
        try {
          let cleanRecord = { ...record };
          if (removeNulls) {
            Object.keys(cleanRecord).forEach(key => {
              if (cleanRecord[key] === null || cleanRecord[key] === undefined) {
                delete cleanRecord[key];
              }
            });
          }
          if (trimStrings) {
            Object.keys(cleanRecord).forEach(key => {
              if (typeof cleanRecord[key] === 'string') {
                cleanRecord[key] = cleanRecord[key].trim();
              }
            });
          }
          if (validateNumbers) {
            Object.keys(cleanRecord).forEach(key => {
              if (typeof cleanRecord[key] === 'string' && !isNaN(Number(cleanRecord[key]))) {
                cleanRecord[key] = Number(cleanRecord[key]);
              }
            });
          }
          cleaned.push(cleanRecord);
        } catch (error) {
          errors.push({
            record: index + 1,
            message: error instanceof Error ? error.message : 'Cleaning error',
            code: 'DATA_CLEANING_ERROR'
          });
        }
      });
      return {
        recordsProcessed: data.length,
        recordsSuccessful: cleaned.length,
        recordsFailed: errors.length,
        errors,
        metadata: { outputData: cleaned }
      };
    });
  }

  /** Helper utilities exposed to inline JavaScript snippets as `utils`. */
  private getTransformationUtils() {
    return {
      // Date utilities
      formatDate: (date: Date | string, format: string = 'ISO') => {
        const d = new Date(date);
        switch (format) {
          case 'ISO':
            return d.toISOString();
          case 'YYYY-MM-DD':
            return d.toISOString().split('T')[0];
          default:
            return d.toString();
        }
      },
      // Number utilities
      round: (num: number, decimals: number = 2) => {
        return Math.round(num * Math.pow(10, decimals)) / Math.pow(10, decimals);
      },
      // String utilities
      slugify: (str: string) => {
        return str.toLowerCase().replace(/[^a-z0-9]/g, '-').replace(/-+/g, '-');
      },
      // Market data utilities
      calculatePercentageChange: (current: number, previous: number) => {
        // Avoid division by zero when there is no prior value.
        if (previous === 0) return 0;
        return ((current - previous) / previous) * 100;
      }
    };
  }

  /** Register (or overwrite) a named transformation at runtime. */
  registerCustomTransformation(name: string, func: Function): void {
    this.transformationFunctions.set(name, func);
    logger.info(`✅ Registered custom transformation: ${name}`);
  }

  getAvailableTransformations(): string[] {
    return Array.from(this.transformationFunctions.keys());
  }
}

View file

@ -0,0 +1,303 @@
import { logger } from '@stock-bot/utils';
import { ValidationStep, ProcessingResult, ValidationRule } from '../types/DataPipeline';
import Joi from 'joi';
/**
 * Applies rule-based validation to in-memory record sets. Rules may be
 * structural (required/type/range/pattern) or delegate to named custom
 * validators registered in this service.
 */
export class DataValidationService {
  // Registry of named custom validators (built-ins + runtime registrations).
  private validators: Map<string, Function> = new Map();

  async initialize(): Promise<void> {
    logger.info('🔄 Initializing Data Validation Service...');
    // Register built-in validators
    this.registerBuiltInValidators();
    logger.info('✅ Data Validation Service initialized');
  }

  /**
   * Apply every rule in `step.rules` to each record of
   * `parameters.inputData`. A record passes only if all rules pass. When
   * `step.onFailure === 'stop'`, processing halts at the first failing
   * record; otherwise all records are examined. Never rejects — service
   * failures are reported as a VALIDATION_SERVICE_ERROR entry.
   */
  async validateData(step: ValidationStep, parameters: Record<string, any>): Promise<ProcessingResult> {
    const startTime = Date.now();
    logger.info(`✅ Starting data validation with ${step.rules.length} rules`);
    const inputData = parameters.inputData || [];
    const validRecords: any[] = [];
    const errors: any[] = [];
    let recordCount = 0;
    try {
      for (const record of inputData) {
        recordCount++;
        const recordErrors: any[] = [];
        // Apply all validation rules to this record
        for (const rule of step.rules) {
          try {
            const isValid = await this.applyValidationRule(record, rule);
            if (!isValid) {
              recordErrors.push({
                record: recordCount,
                field: rule.field,
                message: rule.message || `Validation failed for rule: ${rule.type}`,
                code: `VALIDATION_${rule.type.toUpperCase()}_FAILED`
              });
            }
          } catch (error) {
            // A rule that throws (e.g. unknown validator) is reported as a
            // failure for this record rather than aborting the whole batch.
            recordErrors.push({
              record: recordCount,
              field: rule.field,
              message: error instanceof Error ? error.message : 'Validation error',
              code: 'VALIDATION_ERROR'
            });
          }
        }
        if (recordErrors.length === 0) {
          validRecords.push(record);
        } else {
          errors.push(...recordErrors);
          // Handle validation failure based on strategy
          if (step.onFailure === 'stop') {
            break;
          }
        }
      }
      const processingTime = Date.now() - startTime;
      logger.info(`✅ Validation completed: ${validRecords.length}/${recordCount} records valid in ${processingTime}ms`);
      return {
        recordsProcessed: recordCount,
        recordsSuccessful: validRecords.length,
        recordsFailed: recordCount - validRecords.length,
        errors,
        metadata: {
          validationRules: step.rules.length,
          onFailure: step.onFailure,
          processingTimeMs: processingTime,
          // Only records that passed every rule flow to the next stage.
          outputData: validRecords
        }
      };
    } catch (error) {
      const processingTime = Date.now() - startTime;
      logger.error(`❌ Data validation failed after ${processingTime}ms:`, error);
      return {
        recordsProcessed: recordCount,
        recordsSuccessful: 0,
        recordsFailed: recordCount,
        errors: [{
          record: 0,
          message: error instanceof Error ? error.message : 'Unknown validation error',
          code: 'VALIDATION_SERVICE_ERROR'
        }],
        metadata: { processingTimeMs: processingTime }
      };
    }
  }

  /** Dispatch one rule to its type-specific check; true means the rule passed. */
  private async applyValidationRule(record: any, rule: ValidationRule): Promise<boolean> {
    const fieldValue = this.getFieldValue(record, rule.field);
    switch (rule.type) {
      case 'required':
        return this.validateRequired(fieldValue);
      case 'type':
        return this.validateType(fieldValue, rule.value);
      case 'range':
        return this.validateRange(fieldValue, rule.value);
      case 'pattern':
        return this.validatePattern(fieldValue, rule.value);
      case 'custom':
        return await this.validateCustom(record, rule);
      default:
        throw new Error(`Unknown validation rule type: ${rule.type}`);
    }
  }

  /** Resolve a dotted path ("a.b.c") against a record; undefined on any gap. */
  private getFieldValue(record: any, fieldPath: string): any {
    return fieldPath.split('.').reduce((obj, key) => obj?.[key], record);
  }

  /** Present and non-empty (null, undefined and '' all fail; 0/false pass). */
  private validateRequired(value: any): boolean {
    return value !== null && value !== undefined && value !== '';
  }

  /** Check the runtime type of a value against an expected type name. */
  private validateType(value: any, expectedType: string): boolean {
    if (value === null || value === undefined) {
      return false;
    }
    switch (expectedType) {
      case 'string':
        return typeof value === 'string';
      case 'number':
        // NaN is typeof 'number' but is rejected here.
        return typeof value === 'number' && !isNaN(value);
      case 'boolean':
        return typeof value === 'boolean';
      case 'date':
        return value instanceof Date || !isNaN(Date.parse(value));
      case 'array':
        return Array.isArray(value);
      case 'object':
        return typeof value === 'object' && !Array.isArray(value);
      default:
        return false;
    }
  }

  /** Inclusive numeric bounds; either end of the range may be omitted. */
  private validateRange(value: any, range: { min?: number; max?: number }): boolean {
    if (typeof value !== 'number') {
      return false;
    }
    if (range.min !== undefined && value < range.min) {
      return false;
    }
    if (range.max !== undefined && value > range.max) {
      return false;
    }
    return true;
  }

  /** Test a string value against a regular-expression pattern. */
  private validatePattern(value: any, pattern: string): boolean {
    if (typeof value !== 'string') {
      return false;
    }
    const regex = new RegExp(pattern);
    return regex.test(value);
  }

  /**
   * Invoke a registered validator by name (rule.value holds the name).
   * Throws when the validator is unknown; validateData converts that into a
   * per-record VALIDATION_ERROR.
   */
  private async validateCustom(record: any, rule: ValidationRule): Promise<boolean> {
    const validatorName = rule.value as string;
    const validator = this.validators.get(validatorName);
    if (!validator) {
      throw new Error(`Custom validator not found: ${validatorName}`);
    }
    return await validator(record, rule.field);
  }

  /** Install the built-in named validators into the registry. */
  private registerBuiltInValidators(): void {
    // Stock symbol validator
    this.validators.set('stockSymbol', (record: any, field: string) => {
      const symbol = this.getFieldValue(record, field);
      if (typeof symbol !== 'string') return false;
      // Basic stock symbol validation: 1-5 uppercase letters
      return /^[A-Z]{1,5}$/.test(symbol);
    });
    // Price validator
    this.validators.set('stockPrice', (record: any, field: string) => {
      const price = this.getFieldValue(record, field);
      // Must be a positive number
      return typeof price === 'number' && price > 0 && price < 1000000;
    });
    // Volume validator
    this.validators.set('stockVolume', (record: any, field: string) => {
      const volume = this.getFieldValue(record, field);
      // Must be a non-negative integer
      return Number.isInteger(volume) && volume >= 0;
    });
    // Market data timestamp validator
    this.validators.set('marketTimestamp', (record: any, field: string) => {
      const timestamp = this.getFieldValue(record, field);
      if (!timestamp) return false;
      const date = new Date(timestamp);
      if (isNaN(date.getTime())) return false;
      // Check if timestamp is within reasonable bounds (not too old or in future)
      const now = new Date();
      const oneYearAgo = new Date(now.getTime() - 365 * 24 * 60 * 60 * 1000);
      const oneHourInFuture = new Date(now.getTime() + 60 * 60 * 1000);
      return date >= oneYearAgo && date <= oneHourInFuture;
    });
    // Email validator
    this.validators.set('email', (record: any, field: string) => {
      const email = this.getFieldValue(record, field);
      if (typeof email !== 'string') return false;
      const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
      return emailRegex.test(email);
    });
    // JSON schema validator
    // NOTE(review): validateCustom invokes validators as (record, field), so
    // the `schema` parameter here is never supplied and this validator always
    // returns false when reached via a rule — confirm intended wiring before
    // relying on it.
    this.validators.set('jsonSchema', (record: any, field: string, schema?: any) => {
      if (!schema) return false;
      try {
        const joiSchema = Joi.object(schema);
        const { error } = joiSchema.validate(record);
        return !error;
      } catch {
        return false;
      }
    });
    // Data completeness validator: a fixed set of market-data fields must all
    // be present and non-empty, regardless of the rule's `field`.
    this.validators.set('completeness', (record: any, field: string) => {
      const requiredFields = ['symbol', 'price', 'timestamp'];
      return requiredFields.every(f => {
        const value = this.getFieldValue(record, f);
        return value !== null && value !== undefined && value !== '';
      });
    });
  }

  /** Register (or overwrite) a named validator at runtime. */
  registerCustomValidator(name: string, validator: Function): void {
    this.validators.set(name, validator);
    logger.info(`✅ Registered custom validator: ${name}`);
  }

  getAvailableValidators(): string[] {
    return Array.from(this.validators.keys());
  }

  /**
   * Validate a whole array against a Joi object schema in one shot.
   * All-or-nothing: a single schema violation marks every record failed.
   * On success, metadata.outputData carries Joi's (possibly coerced) values.
   */
  async validateSchema(data: any[], schema: any): Promise<ProcessingResult> {
    const joiSchema = Joi.array().items(Joi.object(schema));
    const { error, value } = joiSchema.validate(data);
    if (error) {
      return {
        recordsProcessed: data.length,
        recordsSuccessful: 0,
        recordsFailed: data.length,
        errors: [{
          record: 0,
          message: error.message,
          code: 'SCHEMA_VALIDATION_FAILED'
        }],
        metadata: { schemaValidation: true }
      };
    }
    return {
      recordsProcessed: data.length,
      recordsSuccessful: data.length,
      recordsFailed: 0,
      errors: [],
      metadata: {
        schemaValidation: true,
        outputData: value
      }
    };
  }
}

View file

@ -0,0 +1,178 @@
// Data Pipeline Types

/**
 * A configured data pipeline: its processing steps, lifecycle status,
 * optional schedule, and free-form metadata.
 */
export interface DataPipeline {
  id: string;
  name: string;
  description?: string;
  status: PipelineStatus;
  steps: PipelineSteps;
  schedule?: PipelineSchedule;
  /** Free-form key/value annotations attached by callers. */
  metadata: Record<string, any>;
  createdAt: Date;
  updatedAt: Date;
}
/** Lifecycle state of a pipeline definition. */
export enum PipelineStatus {
  DRAFT = 'draft',
  ACTIVE = 'active',
  PAUSED = 'paused',
  DISABLED = 'disabled',
}
/** The optional stages a pipeline may run; absent stages are skipped. */
export interface PipelineSteps {
  ingestion?: IngestionStep;
  transformation?: TransformationStep;
  validation?: ValidationStep;
  qualityChecks?: QualityCheckStep;
}
/** How raw data enters the pipeline. */
export interface IngestionStep {
  type: 'api' | 'file' | 'database' | 'stream';
  source: DataSource;
  /** Step-specific settings; shape depends on `type`. */
  configuration: Record<string, any>;
  retryPolicy?: RetryPolicy;
}
/** How ingested data is transformed before validation. */
export interface TransformationStep {
  type: 'sql' | 'javascript' | 'python' | 'custom';
  /** Step-specific settings; shape depends on `type`. */
  configuration: Record<string, any>;
  schema?: DataSchema;
}
/** Rule-based validation stage and its failure policy. */
export interface ValidationStep {
  rules: ValidationRule[];
  /** What to do when a rule fails: abort, keep going, or keep going and alert. */
  onFailure: 'stop' | 'continue' | 'alert';
}
/** Data-quality checks and the thresholds that grade their results. */
export interface QualityCheckStep {
  checks: QualityCheck[];
  thresholds: QualityThresholds;
}
/** Recurring-run configuration for a pipeline. */
export interface PipelineSchedule {
  /** Standard cron expression — presumably 5/6-field cron syntax; confirm with the scheduler. */
  cronExpression: string;
  enabled: boolean;
  lastRun: Date | null;
  nextRun: Date | null;
}
// Job Types

/** A single execution of a pipeline, with its lifecycle timestamps and metrics. */
export interface PipelineJob {
  id: string;
  /** The DataPipeline this job is an execution of. */
  pipelineId: string;
  status: JobStatus;
  /** Run-specific parameters supplied when the job was created. */
  parameters: Record<string, any>;
  createdAt: Date;
  /** Null until the job leaves PENDING. */
  startedAt: Date | null;
  /** Null until the job reaches a terminal state. */
  completedAt: Date | null;
  /** Failure description when status is FAILED; otherwise null. */
  error: string | null;
  metrics: JobMetrics;
}
/** Execution state of a job. */
export enum JobStatus {
  PENDING = 'pending',
  RUNNING = 'running',
  COMPLETED = 'completed',
  FAILED = 'failed',
  CANCELLED = 'cancelled',
}
/** Throughput counters accumulated over a job run. */
export interface JobMetrics {
  recordsProcessed: number;
  recordsSuccessful: number;
  recordsFailed: number;
  processingTimeMs: number;
}
// Data Source Types

/** Where ingested data comes from and how it is encoded. */
export interface DataSource {
  type: 'api' | 'file' | 'database' | 'stream';
  connection: ConnectionConfig;
  format?: 'json' | 'csv' | 'xml' | 'parquet' | 'avro';
}
/**
 * Connection settings for a data source. All fields are optional because
 * different source types use different subsets (e.g. url/apiKey for APIs,
 * host/port/database for databases).
 */
export interface ConnectionConfig {
  url?: string;
  host?: string;
  port?: number;
  database?: string;
  username?: string;
  password?: string;
  apiKey?: string;
  /** Extra HTTP headers for API sources. */
  headers?: Record<string, string>;
  /** Extra query/request parameters for API sources. */
  params?: Record<string, any>;
}
// Schema Types

/** Declarative description of a record's fields and cross-field constraints. */
export interface DataSchema {
  fields: SchemaField[];
  constraints?: SchemaConstraint[];
}
/** A single field in a DataSchema. */
export interface SchemaField {
  name: string;
  type: 'string' | 'number' | 'boolean' | 'date' | 'object' | 'array';
  /** Whether the field must be present in every record. */
  required: boolean;
  /** Whether a present field may hold null. */
  nullable: boolean;
  /** Optional format refinement (e.g. a date or string pattern) — semantics depend on the consumer. */
  format?: string;
  description?: string;
}
/** A constraint applied to one field of the schema. */
export interface SchemaConstraint {
  type: 'unique' | 'reference' | 'range' | 'pattern';
  field: string;
  /** Constraint argument; shape depends on `type` (e.g. bounds for 'range', regex for 'pattern'). */
  value: any;
}
// Validation Types

/** A single validation rule applied to one field of each record. */
export interface ValidationRule {
  field: string;
  type: 'required' | 'type' | 'range' | 'pattern' | 'custom';
  /** Rule argument; shape depends on `type` (e.g. bounds, regex, or a custom validator name). */
  value: any;
  /** Optional human-readable message reported when the rule fails. */
  message?: string;
}
// Quality Check Types

/** A named data-quality measurement evaluated against a threshold. */
export interface QualityCheck {
  name: string;
  type: 'completeness' | 'accuracy' | 'consistency' | 'validity' | 'uniqueness';
  /** Field the check targets; omitted for record-level checks. */
  field?: string;
  /** Check expression — evaluation semantics are defined by the check runner; confirm there. */
  condition: string;
  threshold: number;
}
/** Pass/warn/fail cut-offs applied to quality-check scores. */
export interface QualityThresholds {
  error: number; // 0-100 percentage
  warning: number; // 0-100 percentage
}
// Processing Result Types

/** Outcome of a processing/validation pass over a batch of records. */
export interface ProcessingResult {
  recordsProcessed: number;
  recordsSuccessful: number;
  recordsFailed: number;
  errors: ProcessingError[];
  /** Step-specific extras (e.g. validateSchema puts coerced records in `outputData`). */
  metadata: Record<string, any>;
}
/** One failure attributed to a record in the processed batch. */
export interface ProcessingError {
  /** Zero-based index of the failing record within the batch. */
  record: number;
  field?: string;
  message: string;
  /** Machine-readable error code (e.g. 'SCHEMA_VALIDATION_FAILED'). */
  code?: string;
}
// Retry Policy Types

/** How failed ingestion attempts are retried. */
export interface RetryPolicy {
  maxAttempts: number;
  backoffStrategy: 'fixed' | 'exponential' | 'linear';
  /** Delay before the first retry — presumably milliseconds; confirm with the executor. */
  initialDelay: number;
  /** Upper bound the backoff delay is capped at — same unit as initialDelay. */
  maxDelay: number;
}