added initial integration tests with bun
This commit is contained in:
parent
3e451558ac
commit
fb22815450
52 changed files with 7588 additions and 364 deletions
247
libs/mongodb-client/src/aggregation.ts
Normal file
247
libs/mongodb-client/src/aggregation.ts
Normal file
|
|
@ -0,0 +1,247 @@
|
|||
import type { MongoDBClient } from './client';
|
||||
import type { CollectionNames } from './types';
|
||||
|
||||
/**
|
||||
* MongoDB Aggregation Builder
|
||||
*
|
||||
* Provides a fluent interface for building MongoDB aggregation pipelines
|
||||
*/
|
||||
export class MongoDBAggregationBuilder {
|
||||
private pipeline: any[] = [];
|
||||
private readonly client: MongoDBClient;
|
||||
private collection: CollectionNames | null = null;
|
||||
|
||||
constructor(client: MongoDBClient) {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the collection to aggregate on
|
||||
*/
|
||||
from(collection: CollectionNames): this {
|
||||
this.collection = collection;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a match stage
|
||||
*/
|
||||
match(filter: any): this {
|
||||
this.pipeline.push({ $match: filter });
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a group stage
|
||||
*/
|
||||
group(groupBy: any): this {
|
||||
this.pipeline.push({ $group: groupBy });
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a sort stage
|
||||
*/
|
||||
sort(sortBy: any): this {
|
||||
this.pipeline.push({ $sort: sortBy });
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a limit stage
|
||||
*/
|
||||
limit(count: number): this {
|
||||
this.pipeline.push({ $limit: count });
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a skip stage
|
||||
*/
|
||||
skip(count: number): this {
|
||||
this.pipeline.push({ $skip: count });
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a project stage
|
||||
*/
|
||||
project(projection: any): this {
|
||||
this.pipeline.push({ $project: projection });
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an unwind stage
|
||||
*/
|
||||
unwind(field: string, options?: any): this {
|
||||
this.pipeline.push({
|
||||
$unwind: options ? { path: field, ...options } : field
|
||||
});
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a lookup stage (join)
|
||||
*/
|
||||
lookup(from: string, localField: string, foreignField: string, as: string): this {
|
||||
this.pipeline.push({
|
||||
$lookup: {
|
||||
from,
|
||||
localField,
|
||||
foreignField,
|
||||
as
|
||||
}
|
||||
});
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a custom stage
|
||||
*/
|
||||
addStage(stage: any): this {
|
||||
this.pipeline.push(stage);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the aggregation pipeline
|
||||
*/
|
||||
async execute<T = any>(): Promise<T[]> {
|
||||
if (!this.collection) {
|
||||
throw new Error('Collection not specified. Use .from() to set the collection.');
|
||||
}
|
||||
|
||||
const collection = this.client.getCollection(this.collection);
|
||||
return await collection.aggregate<T>(this.pipeline).toArray();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the pipeline array
|
||||
*/
|
||||
getPipeline(): any[] {
|
||||
return [...this.pipeline];
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the pipeline
|
||||
*/
|
||||
reset(): this {
|
||||
this.pipeline = [];
|
||||
this.collection = null;
|
||||
return this;
|
||||
}
|
||||
|
||||
// Convenience methods for common aggregations
|
||||
|
||||
/**
|
||||
* Sentiment analysis aggregation
|
||||
*/
|
||||
sentimentAnalysis(symbol?: string, timeframe?: { start: Date; end: Date }): this {
|
||||
this.from('sentiment_data');
|
||||
|
||||
const matchConditions: any = {};
|
||||
if (symbol) matchConditions.symbol = symbol;
|
||||
if (timeframe) {
|
||||
matchConditions.timestamp = {
|
||||
$gte: timeframe.start,
|
||||
$lte: timeframe.end
|
||||
};
|
||||
}
|
||||
|
||||
if (Object.keys(matchConditions).length > 0) {
|
||||
this.match(matchConditions);
|
||||
}
|
||||
|
||||
return this.group({
|
||||
_id: {
|
||||
symbol: '$symbol',
|
||||
sentiment: '$sentiment_label'
|
||||
},
|
||||
count: { $sum: 1 },
|
||||
avgScore: { $avg: '$sentiment_score' },
|
||||
avgConfidence: { $avg: '$confidence' }
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* News article aggregation by publication
|
||||
*/
|
||||
newsByPublication(symbols?: string[]): this {
|
||||
this.from('news_articles');
|
||||
|
||||
if (symbols && symbols.length > 0) {
|
||||
this.match({ symbols: { $in: symbols } });
|
||||
}
|
||||
|
||||
return this.group({
|
||||
_id: '$publication',
|
||||
articleCount: { $sum: 1 },
|
||||
symbols: { $addToSet: '$symbols' },
|
||||
avgSentiment: { $avg: '$sentiment_score' },
|
||||
latestArticle: { $max: '$published_date' }
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* SEC filings by company
|
||||
*/
|
||||
secFilingsByCompany(filingTypes?: string[]): this {
|
||||
this.from('sec_filings');
|
||||
|
||||
if (filingTypes && filingTypes.length > 0) {
|
||||
this.match({ filing_type: { $in: filingTypes } });
|
||||
}
|
||||
|
||||
return this.group({
|
||||
_id: {
|
||||
cik: '$cik',
|
||||
company: '$company_name'
|
||||
},
|
||||
filingCount: { $sum: 1 },
|
||||
filingTypes: { $addToSet: '$filing_type' },
|
||||
latestFiling: { $max: '$filing_date' },
|
||||
symbols: { $addToSet: '$symbols' }
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Document processing status summary
|
||||
*/
|
||||
processingStatusSummary(collection: CollectionNames): this {
|
||||
this.from(collection);
|
||||
|
||||
return this.group({
|
||||
_id: '$processing_status',
|
||||
count: { $sum: 1 },
|
||||
avgSizeBytes: { $avg: '$size_bytes' },
|
||||
oldestDocument: { $min: '$created_at' },
|
||||
newestDocument: { $max: '$created_at' }
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Time-based aggregation (daily/hourly counts)
|
||||
*/
|
||||
timeBasedCounts(
|
||||
collection: CollectionNames,
|
||||
dateField: string = 'created_at',
|
||||
interval: 'hour' | 'day' | 'week' | 'month' = 'day'
|
||||
): this {
|
||||
this.from(collection);
|
||||
|
||||
const dateFormat = {
|
||||
hour: { $dateToString: { format: '%Y-%m-%d %H:00:00', date: `$${dateField}` } },
|
||||
day: { $dateToString: { format: '%Y-%m-%d', date: `$${dateField}` } },
|
||||
week: { $dateToString: { format: '%Y-W%V', date: `$${dateField}` } },
|
||||
month: { $dateToString: { format: '%Y-%m', date: `$${dateField}` } }
|
||||
};
|
||||
|
||||
return this.group({
|
||||
_id: dateFormat[interval],
|
||||
count: { $sum: 1 },
|
||||
firstDocument: { $min: `$${dateField}` },
|
||||
lastDocument: { $max: `$${dateField}` }
|
||||
}).sort({ _id: 1 });
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue