stock-bot/apps/data-services/data-processor/src/services/DataIngestionService.ts

200 lines
6.2 KiB
TypeScript

import { getLogger } from '@stock-bot/logger';
// Module-level logger obtained from @stock-bot/logger under the name 'DataIngestionService';
// every method of the class below logs through it.
const logger = getLogger('DataIngestionService');
import { IngestionStep, ProcessingResult, DataSource } from '../types/DataPipeline';
import axios from 'axios';
import csv from 'csv-parser';
import * as fs from 'fs';
/**
 * Reads records from a pipeline step's data source and reports how many were
 * ingested as a `ProcessingResult`.
 *
 * Implemented sources: HTTP APIs (GET via axios) and local files (CSV, JSON).
 * The `database` and `stream` source types are placeholders that always throw.
 */
export class DataIngestionService {
  /** Connection registry; nothing in this class adds entries, it is only read for metrics. */
  private activeConnections: Map<string, any> = new Map();

  /** Logs start-up messages. No resources are acquired here. */
  async initialize(): Promise<void> {
    logger.info('🔄 Initializing Data Ingestion Service...');
    logger.info('✅ Data Ingestion Service initialized');
  }

  /**
   * Dispatches ingestion to the handler that matches `step.source.type`.
   *
   * This method never rejects: any error thrown by a handler (including an
   * unsupported source type) is logged and converted into a result with zero
   * counts and a single `INGESTION_ERROR` entry.
   *
   * @param step - Pipeline step whose `source` describes where to read from.
   * @param parameters - Runtime parameters forwarded to the handler.
   * @returns The handler's result, or a failure result carrying the error
   *   message and the elapsed time in `metadata.processingTimeMs`.
   */
  async ingestData(step: IngestionStep, parameters: Record<string, any>): Promise<ProcessingResult> {
    const startTime = Date.now();
    logger.info(`📥 Starting data ingestion from ${step.source.type}: ${step.source.connection.url || step.source.connection.host}`);
    try {
      switch (step.source.type) {
        case 'api':
          return await this.ingestFromApi(step.source, parameters);
        case 'file':
          return await this.ingestFromFile(step.source, parameters);
        case 'database':
          return await this.ingestFromDatabase(step.source, parameters);
        case 'stream':
          return await this.ingestFromStream(step.source, parameters);
        default:
          throw new Error(`Unsupported ingestion type: ${step.source.type}`);
      }
    } catch (error) {
      const processingTime = Date.now() - startTime;
      logger.error(`❌ Data ingestion failed after ${processingTime}ms:`, error);
      return {
        recordsProcessed: 0,
        recordsSuccessful: 0,
        recordsFailed: 0,
        errors: [{
          record: 0,
          message: error instanceof Error ? error.message : 'Unknown error',
          code: 'INGESTION_ERROR'
        }],
        metadata: { processingTimeMs: processingTime }
      };
    }
  }

  /**
   * Issues a GET request to `source.connection.url` and counts the records in
   * the response body.
   *
   * Query parameters are the connection's `params` overlaid with `parameters`.
   * When `apiKey` is set it is sent as a `Bearer` Authorization header.
   *
   * Record extraction: a top-level array is used as-is; otherwise a `data` or
   * `results` array property is used; any other non-null body counts as one
   * record; a `null`/`undefined` body counts as zero records.
   *
   * @throws Whatever axios throws for a failed request.
   */
  private async ingestFromApi(source: DataSource, parameters: Record<string, any>): Promise<ProcessingResult> {
    // Copy the configured headers so adding Authorization never mutates the
    // caller's DataSource (and the token does not leak into shared config).
    const headers = { ...(source.connection.headers || {}) };
    if (source.connection.apiKey) {
      headers['Authorization'] = `Bearer ${source.connection.apiKey}`;
    }
    const config = {
      method: 'GET',
      url: source.connection.url,
      headers,
      params: { ...source.connection.params, ...parameters },
    };
    const response = await axios(config);
    const data = response.data;

    // Normalise the payload into a list of records.
    let records: any[];
    if (data === null || data === undefined) {
      // An empty body has no records; guard before dereferencing data.data.
      records = [];
    } else if (Array.isArray(data)) {
      records = data;
    } else if (data.data && Array.isArray(data.data)) {
      records = data.data;
    } else if (data.results && Array.isArray(data.results)) {
      records = data.results;
    } else {
      records = [data];
    }

    // JSON.stringify(undefined) returns undefined rather than a string.
    const serialized = JSON.stringify(data);
    const responseSize = serialized === undefined ? 0 : serialized.length;

    logger.info(`📊 Ingested ${records.length} records from API: ${source.connection.url}`);
    return {
      recordsProcessed: records.length,
      recordsSuccessful: records.length,
      recordsFailed: 0,
      errors: [],
      metadata: {
        source: 'api',
        url: source.connection.url,
        statusCode: response.status,
        responseSize
      }
    };
  }

  /**
   * Resolves the file path (`source.connection.url`, falling back to
   * `parameters.filePath`) and delegates to the reader for `source.format`.
   *
   * @throws Error when no path is available or the format is not `csv`/`json`.
   */
  private async ingestFromFile(source: DataSource, parameters: Record<string, any>): Promise<ProcessingResult> {
    const filePath = source.connection.url || parameters.filePath;
    if (!filePath) {
      throw new Error('File path is required for file ingestion');
    }
    switch (source.format) {
      case 'csv':
        return await this.ingestCsvFile(filePath);
      case 'json':
        return await this.ingestJsonFile(filePath);
      default:
        throw new Error(`Unsupported file format: ${source.format}`);
    }
  }

  /**
   * Streams a CSV file through csv-parser and counts the rows it emits.
   *
   * @returns Resolves when the parser emits `end`; rejects when either the
   *   file stream or the parser emits `error`.
   */
  private async ingestCsvFile(filePath: string): Promise<ProcessingResult> {
    return new Promise((resolve, reject) => {
      const records: any[] = [];
      const fileStream = fs.createReadStream(filePath);

      const fail = (error: unknown): void => {
        // Release the file descriptor; destroying an already-failed stream is a no-op.
        fileStream.destroy();
        reject(error);
      };

      // pipe() does not forward errors from the source stream, so a missing or
      // unreadable file must be handled on the read stream itself. Without this
      // listener the 'error' event is unhandled and the promise never settles.
      fileStream.on('error', fail);

      fileStream
        .pipe(csv())
        .on('data', (row: any) => {
          records.push(row);
        })
        .on('end', () => {
          logger.info(`📊 Ingested ${records.length} records from CSV: ${filePath}`);
          resolve({
            recordsProcessed: records.length,
            recordsSuccessful: records.length,
            recordsFailed: 0,
            errors: [],
            metadata: {
              source: 'file',
              format: 'csv',
              filePath
            }
          });
        })
        .on('error', fail);
    });
  }

  /**
   * Reads a whole JSON file as UTF-8 and counts its records: a top-level array
   * contributes one record per element, any other value counts as one record.
   *
   * `metadata.fileSize` is the length of the decoded text in UTF-16 code
   * units, not the on-disk byte size.
   *
   * @throws On read failure or invalid JSON.
   */
  private async ingestJsonFile(filePath: string): Promise<ProcessingResult> {
    const fileContent = await fs.promises.readFile(filePath, 'utf8');
    const data = JSON.parse(fileContent);
    const records: any[] = Array.isArray(data) ? data : [data];
    logger.info(`📊 Ingested ${records.length} records from JSON: ${filePath}`);
    return {
      recordsProcessed: records.length,
      recordsSuccessful: records.length,
      recordsFailed: 0,
      errors: [],
      metadata: {
        source: 'file',
        format: 'json',
        filePath,
        fileSize: fileContent.length
      }
    };
  }

  /**
   * Placeholder for database ingestion (PostgreSQL, MySQL, MongoDB, etc.).
   *
   * @throws Always; database ingestion is not implemented.
   */
  private async ingestFromDatabase(_source: DataSource, _parameters: Record<string, any>): Promise<ProcessingResult> {
    throw new Error('Database ingestion not yet implemented');
  }

  /**
   * Placeholder for stream ingestion (Kafka, Kinesis, WebSocket, etc.).
   *
   * @throws Always; stream ingestion is not implemented.
   */
  private async ingestFromStream(_source: DataSource, _parameters: Record<string, any>): Promise<ProcessingResult> {
    throw new Error('Stream ingestion not yet implemented');
  }

  /**
   * Returns the number of tracked connections plus static lists of source
   * types and formats.
   *
   * NOTE: the lists are hard-coded and broader than what this class currently
   * handles: `database` and `stream` always throw, and only `csv` and `json`
   * are dispatched by `ingestFromFile`.
   */
  async getIngestionMetrics(): Promise<any> {
    return {
      activeConnections: this.activeConnections.size,
      supportedSources: ['api', 'file', 'database', 'stream'],
      supportedFormats: ['json', 'csv', 'xml', 'parquet', 'avro']
    };
  }
}