247 lines
5.5 KiB
TypeScript
247 lines
5.5 KiB
TypeScript
import type { Document } from 'mongodb';
|
|
import type { MongoDBClient } from './client';
|
|
import type { CollectionNames } from './types';
|
|
|
|
/**
 * MongoDB Aggregation Builder
 *
 * Provides a fluent interface for building MongoDB aggregation pipelines.
 *
 * Every stage method appends one stage to an internal pipeline and returns
 * `this`, so calls can be chained and finished with `execute()`.
 *
 * The convenience methods (`sentimentAnalysis`, `newsByPublication`, ...)
 * select their target collection and append their own stages. They do NOT
 * clear stages added earlier, so call `reset()` first when reusing a builder.
 */
export class MongoDBAggregationBuilder {
  private pipeline: any[] = [];
  private readonly client: MongoDBClient;
  private collection: CollectionNames | null = null;

  /**
   * @param client - Client used by `execute()` to obtain the target collection.
   */
  constructor(client: MongoDBClient) {
    this.client = client;
  }

  /**
   * Set the collection the pipeline will run against.
   */
  from(collection: CollectionNames): this {
    this.collection = collection;
    return this;
  }

  /**
   * Append a `$match` stage.
   */
  match(filter: any): this {
    return this.addStage({ $match: filter });
  }

  /**
   * Append a `$group` stage. `groupBy` is the full `$group` spec, including `_id`.
   */
  group(groupBy: any): this {
    return this.addStage({ $group: groupBy });
  }

  /**
   * Append a `$sort` stage.
   */
  sort(sortBy: any): this {
    return this.addStage({ $sort: sortBy });
  }

  /**
   * Append a `$limit` stage.
   *
   * @param count - Maximum number of documents to pass on; must be a positive integer.
   * @throws RangeError if `count` is not a positive integer. MongoDB rejects such
   *   values when the pipeline runs, so the error is raised early, while building.
   */
  limit(count: number): this {
    if (!Number.isInteger(count) || count <= 0) {
      throw new RangeError(`$limit must be a positive integer, got ${count}`);
    }
    return this.addStage({ $limit: count });
  }

  /**
   * Append a `$skip` stage.
   *
   * @param count - Number of documents to skip; must be a non-negative integer.
   * @throws RangeError if `count` is not a non-negative integer. MongoDB rejects
   *   such values when the pipeline runs.
   */
  skip(count: number): this {
    if (!Number.isInteger(count) || count < 0) {
      throw new RangeError(`$skip must be a non-negative integer, got ${count}`);
    }
    return this.addStage({ $skip: count });
  }

  /**
   * Append a `$project` stage.
   */
  project(projection: any): this {
    return this.addStage({ $project: projection });
  }

  /**
   * Append an `$unwind` stage.
   *
   * @param field - Array field to unwind. `$unwind` requires a `$`-prefixed
   *   field path, so a missing `$` is added (`'tags'` becomes `'$tags'`).
   * @param options - Extra `$unwind` options (e.g. `includeArrayIndex`,
   *   `preserveNullAndEmptyArrays`). When given, the document form
   *   `{ path, ...options }` is emitted; otherwise the short string form is used.
   */
  unwind(field: string, options?: any): this {
    const path = field.startsWith('$') ? field : `$${field}`;
    return this.addStage({
      $unwind: options ? { path, ...options } : path
    });
  }

  /**
   * Append an equality `$lookup` stage (left outer join).
   *
   * @param from - Collection to join with.
   * @param localField - Field in the input documents.
   * @param foreignField - Field in the `from` collection's documents.
   * @param as - Output array field that receives the matched documents.
   */
  lookup(from: string, localField: string, foreignField: string, as: string): this {
    return this.addStage({
      $lookup: { from, localField, foreignField, as }
    });
  }

  /**
   * Append an arbitrary, caller-built stage verbatim.
   */
  addStage(stage: any): this {
    this.pipeline.push(stage);
    return this;
  }

  /**
   * Run the pipeline against the collection chosen with `from()` (or by a
   * convenience method) and collect every result document.
   *
   * @returns All documents produced by the pipeline.
   * @throws Error if no collection has been set.
   */
  async execute<T extends Document = Document>(): Promise<T[]> {
    if (!this.collection) {
      throw new Error('Collection not specified. Use .from() to set the collection.');
    }

    const collection = this.client.getCollection(this.collection);
    return await collection.aggregate<T>(this.pipeline).toArray();
  }

  /**
   * Get a copy of the pipeline array.
   *
   * The copy is shallow: adding or removing entries does not affect the
   * builder, but mutating a stage object does.
   */
  getPipeline(): any[] {
    return [...this.pipeline];
  }

  /**
   * Clear all stages and the selected collection so the builder can be reused.
   */
  reset(): this {
    this.pipeline = [];
    this.collection = null;
    return this;
  }

  // Convenience methods for common aggregations

  /**
   * Sentiment counts and averages per (symbol, sentiment label) on `sentiment_data`.
   *
   * @param symbol - If given (non-empty), only documents with this `symbol` are included.
   * @param timeframe - If given, `timestamp` must fall in `[start, end]` (inclusive).
   */
  sentimentAnalysis(symbol?: string, timeframe?: { start: Date; end: Date }): this {
    this.from('sentiment_data');

    const matchConditions: any = {};
    if (symbol) matchConditions.symbol = symbol;
    if (timeframe) {
      matchConditions.timestamp = {
        $gte: timeframe.start,
        $lte: timeframe.end
      };
    }

    // Skip an empty $match so the pipeline stays minimal.
    if (Object.keys(matchConditions).length > 0) {
      this.match(matchConditions);
    }

    return this.group({
      _id: {
        symbol: '$symbol',
        sentiment: '$sentiment_label'
      },
      count: { $sum: 1 },
      avgScore: { $avg: '$sentiment_score' },
      avgConfidence: { $avg: '$confidence' }
    });
  }

  /**
   * Article counts, sentiment and latest publication date per publication on `news_articles`.
   *
   * @param symbols - If non-empty, only articles whose `symbols` match one of these are included.
   */
  newsByPublication(symbols?: string[]): this {
    this.from('news_articles');

    if (symbols && symbols.length > 0) {
      this.match({ symbols: { $in: symbols } });
    }

    return this.group({
      _id: '$publication',
      articleCount: { $sum: 1 },
      // NOTE(review): if `symbols` is stored as an array, $addToSet collects whole
      // arrays (an array of arrays) rather than individual symbols — confirm intent.
      symbols: { $addToSet: '$symbols' },
      avgSentiment: { $avg: '$sentiment_score' },
      latestArticle: { $max: '$published_date' }
    });
  }

  /**
   * Filing counts, filing types and latest filing date per company on `sec_filings`.
   *
   * @param filingTypes - If non-empty, only filings whose `filing_type` is one of these are included.
   */
  secFilingsByCompany(filingTypes?: string[]): this {
    this.from('sec_filings');

    if (filingTypes && filingTypes.length > 0) {
      this.match({ filing_type: { $in: filingTypes } });
    }

    return this.group({
      _id: {
        cik: '$cik',
        company: '$company_name'
      },
      filingCount: { $sum: 1 },
      filingTypes: { $addToSet: '$filing_type' },
      latestFiling: { $max: '$filing_date' },
      // NOTE(review): as in newsByPublication, an array-valued `symbols`
      // field yields an array of arrays here — confirm intent.
      symbols: { $addToSet: '$symbols' }
    });
  }

  /**
   * Per-`processing_status` document counts, average `size_bytes` and
   * `created_at` range for the given collection.
   */
  processingStatusSummary(collection: CollectionNames): this {
    this.from(collection);

    return this.group({
      _id: '$processing_status',
      count: { $sum: 1 },
      avgSizeBytes: { $avg: '$size_bytes' },
      oldestDocument: { $min: '$created_at' },
      newestDocument: { $max: '$created_at' }
    });
  }

  /**
   * Time-based aggregation: document counts bucketed by hour/day/week/month.
   *
   * Buckets are keyed by a formatted date string and sorted ascending.
   * Weekly keys use ISO-8601 week numbering (`YYYY-Www`).
   *
   * @param collection - Collection to aggregate.
   * @param dateField - Date field used for bucketing (default `created_at`).
   * @param interval - Bucket size (default `day`).
   * @param timezone - Optional Olson timezone name or UTC offset passed to
   *   `$dateToString`. When omitted, MongoDB formats in UTC.
   */
  timeBasedCounts(
    collection: CollectionNames,
    dateField: string = 'created_at',
    interval: 'hour' | 'day' | 'week' | 'month' = 'day',
    timezone?: string
  ): this {
    this.from(collection);

    const fieldPath = `$${dateField}`;

    // The ISO week (%V) must be paired with the ISO week-numbering year (%G).
    // Using the calendar year (%Y) would label e.g. 2024-12-30 (ISO week 1 of
    // 2025) as "2024-W01" and merge it with the first week of 2024.
    const formats: Record<'hour' | 'day' | 'week' | 'month', string> = {
      hour: '%Y-%m-%d %H:00:00',
      day: '%Y-%m-%d',
      week: '%G-W%V',
      month: '%Y-%m'
    };

    const bucketKey = {
      $dateToString: {
        format: formats[interval],
        date: fieldPath,
        ...(timezone !== undefined ? { timezone } : {})
      }
    };

    return this.group({
      _id: bucketKey,
      count: { $sum: 1 },
      firstDocument: { $min: fieldPath },
      lastDocument: { $max: fieldPath }
    }).sort({ _id: 1 });
  }
}
|