work on new engine

This commit is contained in:
Boki 2025-07-04 11:24:27 -04:00
parent 44476da13f
commit a1e5a21847
126 changed files with 3425 additions and 6695 deletions

View file

@ -0,0 +1,282 @@
use crate::{ExecutionHandler, FillSimulator, Order, ExecutionResult, OrderStatus, Fill, OrderBookSnapshot, OrderType, Side, MarketMicrostructure};
use crate::analytics::{MarketImpactModel, ImpactModelType};
use chrono::Utc;
use parking_lot::Mutex;
use std::collections::HashMap;
// Simulated execution for backtest and paper trading
pub struct SimulatedExecution {
fill_simulator: Box<dyn FillSimulator>,
pending_orders: Mutex<HashMap<String, Order>>,
}
impl SimulatedExecution {
pub fn new(fill_simulator: Box<dyn FillSimulator>) -> Self {
Self {
fill_simulator,
pending_orders: Mutex::new(HashMap::new()),
}
}
pub fn check_pending_orders(&self, orderbook: &OrderBookSnapshot) -> Vec<ExecutionResult> {
let mut results = Vec::new();
let mut pending = self.pending_orders.lock();
pending.retain(|order_id, order| {
if let Some(fill) = self.fill_simulator.simulate_fill(order, orderbook) {
results.push(ExecutionResult {
order_id: order_id.clone(),
status: OrderStatus::Filled,
fills: vec![fill],
});
false // Remove from pending
} else {
true // Keep in pending
}
});
results
}
}
#[async_trait::async_trait]
impl ExecutionHandler for SimulatedExecution {
async fn execute_order(&mut self, order: Order) -> Result<ExecutionResult, String> {
// For market orders, execute immediately
// For limit orders, add to pending
match &order.order_type {
OrderType::Market => {
// In simulation, market orders always fill
// The orchestrator will provide the orderbook for realistic fills
Ok(ExecutionResult {
order_id: order.id.clone(),
status: OrderStatus::Pending,
fills: vec![],
})
}
OrderType::Limit { .. } => {
self.pending_orders.lock().insert(order.id.clone(), order.clone());
Ok(ExecutionResult {
order_id: order.id,
status: OrderStatus::Accepted,
fills: vec![],
})
}
_ => Err("Order type not yet implemented".to_string()),
}
}
fn get_fill_simulator(&self) -> Option<&dyn FillSimulator> {
Some(&*self.fill_simulator)
}
}
// Backtest fill simulator - uses historical data
pub struct BacktestFillSimulator {
slippage_model: SlippageModel,
impact_model: MarketImpactModel,
microstructure_cache: Mutex<HashMap<String, MarketMicrostructure>>,
}
impl BacktestFillSimulator {
pub fn new() -> Self {
Self {
slippage_model: SlippageModel::default(),
impact_model: MarketImpactModel::new(ImpactModelType::SquareRoot),
microstructure_cache: Mutex::new(HashMap::new()),
}
}
pub fn with_impact_model(mut self, model_type: ImpactModelType) -> Self {
self.impact_model = MarketImpactModel::new(model_type);
self
}
pub fn set_microstructure(&self, symbol: String, microstructure: MarketMicrostructure) {
self.microstructure_cache.lock().insert(symbol, microstructure);
}
}
impl FillSimulator for BacktestFillSimulator {
fn simulate_fill(&self, order: &Order, orderbook: &OrderBookSnapshot) -> Option<Fill> {
match &order.order_type {
OrderType::Market => {
// Get market microstructure if available
let microstructure_guard = self.microstructure_cache.lock();
let maybe_microstructure = microstructure_guard.get(&order.symbol);
// Calculate price with market impact
let (price, _impact) = if let Some(microstructure) = maybe_microstructure {
// Use sophisticated market impact model
let impact_estimate = self.impact_model.estimate_impact(
order.quantity,
order.side,
microstructure,
match order.side {
Side::Buy => &orderbook.asks,
Side::Sell => &orderbook.bids,
},
Utc::now(),
);
let base_price = match order.side {
Side::Buy => orderbook.asks.first()?.price,
Side::Sell => orderbook.bids.first()?.price,
};
let impact_price = match order.side {
Side::Buy => base_price * (1.0 + impact_estimate.total_impact / 10000.0),
Side::Sell => base_price * (1.0 - impact_estimate.total_impact / 10000.0),
};
(impact_price, impact_estimate.total_impact)
} else {
// Fallback to simple slippage model
match order.side {
Side::Buy => {
let base_price = orderbook.asks.first()?.price;
let slippage = self.slippage_model.calculate_slippage(order.quantity, &orderbook.asks);
(base_price + slippage, slippage * 10000.0 / base_price)
}
Side::Sell => {
let base_price = orderbook.bids.first()?.price;
let slippage = self.slippage_model.calculate_slippage(order.quantity, &orderbook.bids);
(base_price - slippage, slippage * 10000.0 / base_price)
}
}
};
// Calculate realistic commission
let commission_rate = 0.0005; // 5 bps for institutional
let min_commission = 1.0;
let commission = (order.quantity * price * commission_rate).max(min_commission);
Some(Fill {
timestamp: Utc::now(), // Will be overridden by backtest engine
price,
quantity: order.quantity,
commission,
})
}
OrderType::Limit { price: limit_price } => {
// Check if limit can be filled
match order.side {
Side::Buy => {
if orderbook.asks.first()?.price <= *limit_price {
Some(Fill {
timestamp: Utc::now(),
price: *limit_price,
quantity: order.quantity,
commission: order.quantity * limit_price * 0.001,
})
} else {
None
}
}
Side::Sell => {
if orderbook.bids.first()?.price >= *limit_price {
Some(Fill {
timestamp: Utc::now(),
price: *limit_price,
quantity: order.quantity,
commission: order.quantity * limit_price * 0.001,
})
} else {
None
}
}
}
}
_ => None,
}
}
}
// Paper trading fill simulator - uses real order book
pub struct PaperFillSimulator {
use_real_orderbook: bool,
add_latency_ms: u64,
}
impl PaperFillSimulator {
pub fn new() -> Self {
Self {
use_real_orderbook: true,
add_latency_ms: 100, // Simulate 100ms latency
}
}
}
impl FillSimulator for PaperFillSimulator {
fn simulate_fill(&self, order: &Order, orderbook: &OrderBookSnapshot) -> Option<Fill> {
// Similar to backtest but with more realistic modeling
// Consider actual order book depth
// Add realistic latency simulation
// Respect position size limits based on actual liquidity
// For now, similar implementation to backtest
BacktestFillSimulator::new().simulate_fill(order, orderbook)
}
}
// Real broker execution for live trading
pub struct BrokerExecution {
broker: String,
account_id: String,
// In real implementation, would have broker API client
}
impl BrokerExecution {
pub fn new(broker: String, account_id: String) -> Self {
Self {
broker,
account_id,
}
}
}
#[async_trait::async_trait]
impl ExecutionHandler for BrokerExecution {
async fn execute_order(&mut self, order: Order) -> Result<ExecutionResult, String> {
// In real implementation, would:
// 1. Connect to broker API
// 2. Submit order
// 3. Handle broker responses
// 4. Track order status
// Placeholder for now
Ok(ExecutionResult {
order_id: order.id,
status: OrderStatus::Pending,
fills: vec![],
})
}
fn get_fill_simulator(&self) -> Option<&dyn FillSimulator> {
None // Real broker doesn't simulate
}
}
// Slippage model for realistic fills
#[derive(Default)]
struct SlippageModel {
base_slippage_bps: f64,
impact_coefficient: f64,
}
impl SlippageModel {
fn calculate_slippage(&self, quantity: f64, levels: &[crate::PriceLevel]) -> f64 {
// Simple linear impact model
// In reality would use square-root or more sophisticated model
let total_liquidity: f64 = levels.iter().map(|l| l.size).sum();
let participation_rate = quantity / total_liquidity.max(1.0);
let spread = if levels.len() >= 2 {
(levels[1].price - levels[0].price).abs()
} else {
levels[0].price * 0.0001 // 1 bps if only one level
};
spread * participation_rate * self.impact_coefficient
}
}

View file

@ -0,0 +1,152 @@
use crate::{MarketDataSource, MarketUpdate};
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use std::collections::VecDeque;
use super::mock_data_generator::MockDataGenerator;
// Historical data source for backtesting
pub struct HistoricalDataSource {
data_queue: Mutex<VecDeque<MarketUpdate>>,
current_position: Mutex<usize>,
}
impl HistoricalDataSource {
pub fn new() -> Self {
Self {
data_queue: Mutex::new(VecDeque::new()),
current_position: Mutex::new(0),
}
}
// This would be called by the orchestrator to load data
pub fn load_data(&self, data: Vec<MarketUpdate>) {
eprintln!("HistoricalDataSource::load_data called with {} items", data.len());
// Log first few items
for (i, update) in data.iter().take(3).enumerate() {
eprintln!(" Item {}: symbol={}, time={}", i, update.symbol, update.timestamp);
}
let mut queue = self.data_queue.lock();
queue.clear();
queue.extend(data);
*self.current_position.lock() = 0;
eprintln!("Data loaded successfully. Queue size: {}", queue.len());
}
pub fn data_len(&self) -> usize {
self.data_queue.lock().len()
}
// Generate mock data for testing
pub fn generate_mock_data(
&self,
symbol: String,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
seed: Option<u64>
) {
let mut generator = MockDataGenerator::new(seed.unwrap_or(42));
let data = generator.generate_mixed_data(symbol, start_time, end_time);
self.load_data(data);
}
}
#[async_trait::async_trait]
impl MarketDataSource for HistoricalDataSource {
async fn get_next_update(&mut self) -> Option<MarketUpdate> {
let queue = self.data_queue.lock();
let mut position = self.current_position.lock();
if *position < queue.len() {
let update = queue[*position].clone();
*position += 1;
Some(update)
} else {
None
}
}
fn seek_to_time(&mut self, timestamp: DateTime<Utc>) -> Result<(), String> {
let queue = self.data_queue.lock();
let mut position = self.current_position.lock();
eprintln!("HistoricalDataSource::seek_to_time called");
eprintln!(" Target time: {}", timestamp);
eprintln!(" Queue size: {}", queue.len());
if queue.is_empty() {
eprintln!(" WARNING: Queue is empty!");
return Ok(());
}
eprintln!(" First item time: {}", queue.front().map(|u| u.timestamp.to_string()).unwrap_or("N/A".to_string()));
eprintln!(" Last item time: {}", queue.back().map(|u| u.timestamp.to_string()).unwrap_or("N/A".to_string()));
// Binary search for the timestamp
match queue.binary_search_by_key(&timestamp, |update| update.timestamp) {
Ok(pos) => {
*position = pos;
eprintln!(" Found exact match at position {}", pos);
Ok(())
}
Err(pos) => {
// Position where it would be inserted
*position = pos;
eprintln!(" No exact match, would insert at position {}", pos);
Ok(())
}
}
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
}
// Live data source for paper and live trading
pub struct LiveDataSource {
// Channel to receive data from the orchestrator
data_receiver: tokio::sync::Mutex<Option<tokio::sync::mpsc::Receiver<MarketUpdate>>>,
}
impl LiveDataSource {
pub fn new() -> Self {
Self {
data_receiver: tokio::sync::Mutex::new(None),
}
}
pub async fn set_receiver(&self, receiver: tokio::sync::mpsc::Receiver<MarketUpdate>) {
*self.data_receiver.lock().await = Some(receiver);
}
}
#[async_trait::async_trait]
impl MarketDataSource for LiveDataSource {
async fn get_next_update(&mut self) -> Option<MarketUpdate> {
let mut receiver_guard = self.data_receiver.lock().await;
if let Some(receiver) = receiver_guard.as_mut() {
receiver.recv().await
} else {
None
}
}
fn seek_to_time(&mut self, _timestamp: DateTime<Utc>) -> Result<(), String> {
Err("Cannot seek in live data source".to_string())
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
}

View file

@ -0,0 +1,476 @@
use crate::{MarketMicrostructure, PriceLevel, Quote, Trade, Bar, Side};
use chrono::{DateTime, Utc, Duration, Timelike};
use rand::prelude::*;
use rand_distr::{Normal, Pareto, Beta};
pub struct OrderBookReconstructor {
tick_size: f64,
lot_size: f64,
num_levels: usize,
spread_model: SpreadModel,
depth_model: DepthModel,
}
#[derive(Clone)]
pub enum SpreadModel {
Fixed { spread_ticks: u32 },
Dynamic { base_bps: f64, volatility_factor: f64 },
InformedTrader { base_bps: f64, information_decay: f64 },
}
#[derive(Clone)]
pub enum DepthModel {
Linear { base_size: f64, decay_rate: f64 },
Exponential { base_size: f64, decay_factor: f64 },
PowerLaw { alpha: f64, x_min: f64 },
}
impl OrderBookReconstructor {
pub fn new(tick_size: f64, lot_size: f64) -> Self {
Self {
tick_size,
lot_size,
num_levels: 10,
spread_model: SpreadModel::Dynamic {
base_bps: 2.0,
volatility_factor: 1.5
},
depth_model: DepthModel::Exponential {
base_size: 1000.0,
decay_factor: 0.7
},
}
}
pub fn reconstruct_from_trades_and_quotes(
&self,
trades: &[(DateTime<Utc>, Trade)],
quotes: &[(DateTime<Utc>, Quote)],
timestamp: DateTime<Utc>,
) -> (Vec<PriceLevel>, Vec<PriceLevel>) {
// Find the most recent quote before timestamp
let recent_quote = quotes.iter()
.filter(|(t, _)| *t <= timestamp)
.last()
.map(|(_, q)| q);
// Find recent trades to estimate market conditions
let recent_trades: Vec<_> = trades.iter()
.filter(|(t, _)| {
let age = timestamp - *t;
age < Duration::minutes(5) && age >= Duration::zero()
})
.map(|(_, t)| t)
.collect();
if let Some(quote) = recent_quote {
// Start with actual quote
self.build_full_book(quote, &recent_trades, timestamp)
} else if !recent_trades.is_empty() {
// Reconstruct from trades only
self.reconstruct_from_trades_only(&recent_trades, timestamp)
} else {
// No data - return empty book
(vec![], vec![])
}
}
fn build_full_book(
&self,
top_quote: &Quote,
recent_trades: &[&Trade],
_timestamp: DateTime<Utc>,
) -> (Vec<PriceLevel>, Vec<PriceLevel>) {
let mut bids = Vec::with_capacity(self.num_levels);
let mut asks = Vec::with_capacity(self.num_levels);
// Add top of book
bids.push(PriceLevel {
price: top_quote.bid,
size: top_quote.bid_size,
order_count: Some(self.estimate_order_count(top_quote.bid_size)),
});
asks.push(PriceLevel {
price: top_quote.ask,
size: top_quote.ask_size,
order_count: Some(self.estimate_order_count(top_quote.ask_size)),
});
// Calculate spread and volatility from recent trades
let (_spread_bps, _volatility) = self.estimate_market_conditions(recent_trades, top_quote);
// Build deeper levels
for i in 1..self.num_levels {
// Bid levels
let bid_price = top_quote.bid - (i as f64 * self.tick_size);
let bid_size = self.calculate_level_size(i, top_quote.bid_size, &self.depth_model);
bids.push(PriceLevel {
price: bid_price,
size: bid_size,
order_count: Some(self.estimate_order_count(bid_size)),
});
// Ask levels
let ask_price = top_quote.ask + (i as f64 * self.tick_size);
let ask_size = self.calculate_level_size(i, top_quote.ask_size, &self.depth_model);
asks.push(PriceLevel {
price: ask_price,
size: ask_size,
order_count: Some(self.estimate_order_count(ask_size)),
});
}
(bids, asks)
}
fn reconstruct_from_trades_only(
&self,
recent_trades: &[&Trade],
_timestamp: DateTime<Utc>,
) -> (Vec<PriceLevel>, Vec<PriceLevel>) {
if recent_trades.is_empty() {
return (vec![], vec![]);
}
// Estimate mid price from trades
let prices: Vec<f64> = recent_trades.iter().map(|t| t.price).collect();
let mid_price = prices.iter().sum::<f64>() / prices.len() as f64;
// Estimate spread from trade price variance
let variance = prices.iter()
.map(|p| (p - mid_price).powi(2))
.sum::<f64>() / prices.len() as f64;
let estimated_spread = variance.sqrt() * 2.0; // Rough approximation
// Build synthetic book
let bid_price = (mid_price - estimated_spread / 2.0 / self.tick_size).round() * self.tick_size;
let ask_price = (mid_price + estimated_spread / 2.0 / self.tick_size).round() * self.tick_size;
// Estimate sizes from trade volumes
let avg_trade_size = recent_trades.iter()
.map(|t| t.size)
.sum::<f64>() / recent_trades.len() as f64;
let mut bids = Vec::with_capacity(self.num_levels);
let mut asks = Vec::with_capacity(self.num_levels);
for i in 0..self.num_levels {
let level_size = avg_trade_size * 10.0 / (i + 1) as f64; // Decay with depth
bids.push(PriceLevel {
price: bid_price - (i as f64 * self.tick_size),
size: level_size,
order_count: Some(self.estimate_order_count(level_size)),
});
asks.push(PriceLevel {
price: ask_price + (i as f64 * self.tick_size),
size: level_size,
order_count: Some(self.estimate_order_count(level_size)),
});
}
(bids, asks)
}
fn calculate_level_size(&self, level: usize, _top_size: f64, model: &DepthModel) -> f64 {
let size = match model {
DepthModel::Linear { base_size, decay_rate } => {
base_size - (level as f64 * decay_rate)
}
DepthModel::Exponential { base_size, decay_factor } => {
base_size * decay_factor.powi(level as i32)
}
DepthModel::PowerLaw { alpha, x_min } => {
x_min * ((level + 1) as f64).powf(-alpha)
}
};
// Round to lot size and ensure positive
((size / self.lot_size).round() * self.lot_size).max(self.lot_size)
}
fn estimate_order_count(&self, size: f64) -> u32 {
// Estimate based on typical order size distribution
let avg_order_size = 100.0;
let base_count = (size / avg_order_size).ceil() as u32;
// Add some randomness
let mut rng = thread_rng();
let variation = rng.gen_range(0.8..1.2);
((base_count as f64 * variation) as u32).max(1)
}
fn estimate_market_conditions(
&self,
recent_trades: &[&Trade],
quote: &Quote,
) -> (f64, f64) {
if recent_trades.is_empty() {
let spread_bps = ((quote.ask - quote.bid) / quote.bid) * 10000.0;
return (spread_bps, 0.02); // Default 2% volatility
}
// Calculate spread in bps
let mid_price = (quote.bid + quote.ask) / 2.0;
let spread_bps = ((quote.ask - quote.bid) / mid_price) * 10000.0;
// Estimate volatility from trade prices
let prices: Vec<f64> = recent_trades.iter().map(|t| t.price).collect();
let returns: Vec<f64> = prices.windows(2)
.map(|w| (w[1] / w[0]).ln())
.collect();
let volatility = if !returns.is_empty() {
let mean_return = returns.iter().sum::<f64>() / returns.len() as f64;
let variance = returns.iter()
.map(|r| (r - mean_return).powi(2))
.sum::<f64>() / returns.len() as f64;
variance.sqrt() * (252.0_f64).sqrt() // Annualize
} else {
0.02 // Default 2%
};
(spread_bps, volatility)
}
}
// Market data synthesizer for generating realistic data
pub struct MarketDataSynthesizer {
base_price: f64,
tick_size: f64,
base_spread_bps: f64,
volatility: f64,
mean_reversion_speed: f64,
jump_intensity: f64,
jump_size_dist: Normal<f64>,
volume_dist: Pareto<f64>,
intraday_pattern: Vec<f64>,
}
impl MarketDataSynthesizer {
pub fn new(symbol_params: &MarketMicrostructure) -> Self {
let jump_size_dist = Normal::new(0.0, symbol_params.volatility * 0.1).unwrap();
let volume_dist = Pareto::new(1.0, 1.5).unwrap();
Self {
base_price: 100.0, // Will be updated with actual price
tick_size: symbol_params.tick_size,
base_spread_bps: symbol_params.avg_spread_bps,
volatility: symbol_params.volatility,
mean_reversion_speed: 0.1,
jump_intensity: 0.05, // 5% chance of jump per time step
jump_size_dist,
volume_dist,
intraday_pattern: symbol_params.intraday_volume_profile.clone(),
}
}
pub fn generate_quote_sequence(
&mut self,
start_price: f64,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
interval_ms: i64,
) -> Vec<(DateTime<Utc>, Quote)> {
self.base_price = start_price;
let mut quotes = Vec::new();
let mut current_time = start_time;
let mut mid_price = start_price;
let mut spread_factor;
let mut rng = thread_rng();
while current_time <= end_time {
// Generate price movement
let dt = interval_ms as f64 / 1000.0 / 86400.0; // Convert to days
// Ornstein-Uhlenbeck process with jumps
let drift = -self.mean_reversion_speed * (mid_price / self.base_price - 1.0).ln();
let diffusion = self.volatility * (dt.sqrt()) * rng.gen::<f64>();
// Add jump component
let jump = if rng.gen::<f64>() < self.jump_intensity * dt {
mid_price * self.jump_size_dist.sample(&mut rng)
} else {
0.0
};
mid_price *= 1.0 + drift * dt + diffusion + jump;
mid_price = (mid_price / self.tick_size).round() * self.tick_size;
// Dynamic spread based on volatility and time of day
let hour_index = current_time.hour() as usize;
let volume_factor = if hour_index < self.intraday_pattern.len() {
self.intraday_pattern[hour_index]
} else {
0.04 // Default 4% of daily volume per hour
};
// Wider spreads during low volume periods
spread_factor = 1.0 / volume_factor.sqrt();
let spread_bps = self.base_spread_bps * spread_factor;
let half_spread = mid_price * spread_bps / 20000.0;
// Generate bid/ask
let bid = ((mid_price - half_spread) / self.tick_size).floor() * self.tick_size;
let ask = ((mid_price + half_spread) / self.tick_size).ceil() * self.tick_size;
// Generate sizes with correlation to spread
let size_multiplier = 1.0 / spread_factor; // Tighter spread = more size
let bid_size = (self.volume_dist.sample(&mut rng) * 1000.0 * size_multiplier).round();
let ask_size = (self.volume_dist.sample(&mut rng) * 1000.0 * size_multiplier).round();
quotes.push((current_time, Quote {
bid,
ask,
bid_size,
ask_size,
}));
current_time = current_time + Duration::milliseconds(interval_ms);
}
quotes
}
pub fn generate_trade_sequence(
&mut self,
quotes: &[(DateTime<Utc>, Quote)],
trade_intensity: f64,
) -> Vec<(DateTime<Utc>, Trade)> {
let mut trades = Vec::new();
let mut rng = thread_rng();
let beta_dist = Beta::new(2.0, 5.0).unwrap(); // Skewed towards smaller trades
for (time, quote) in quotes {
// Poisson process for trade arrivals
let num_trades = rng.gen_range(0..((trade_intensity * 10.0) as u32));
for i in 0..num_trades {
// Determine trade side (slight bias based on spread)
let spread_ratio = (quote.ask - quote.bid) / quote.bid;
let buy_prob = 0.5 - spread_ratio * 10.0; // More sells when spread is wide
let side = if rng.gen::<f64>() < buy_prob {
Side::Buy
} else {
Side::Sell
};
// Trade price (sometimes inside spread for large trades)
let price = match side {
Side::Buy => {
if rng.gen::<f64>() < 0.9 {
quote.ask // Take liquidity
} else {
// Provide liquidity (inside spread)
quote.bid + (quote.ask - quote.bid) * rng.gen::<f64>()
}
}
Side::Sell => {
if rng.gen::<f64>() < 0.9 {
quote.bid // Take liquidity
} else {
// Provide liquidity (inside spread)
quote.bid + (quote.ask - quote.bid) * rng.gen::<f64>()
}
}
};
// Trade size (power law distribution)
let size_percentile = beta_dist.sample(&mut rng);
let base_size = match side {
Side::Buy => quote.ask_size,
Side::Sell => quote.bid_size,
};
let size = (base_size * size_percentile * 0.1).round().max(1.0);
// Add small time offset for multiple trades
let trade_time = *time + Duration::milliseconds(i as i64 * 100);
trades.push((trade_time, Trade {
price,
size,
side,
}));
}
}
trades.sort_by_key(|(t, _)| *t);
trades
}
pub fn aggregate_to_bars(
&self,
trades: &[(DateTime<Utc>, Trade)],
bar_duration: Duration,
) -> Vec<(DateTime<Utc>, Bar)> {
if trades.is_empty() {
return Vec::new();
}
let mut bars = Vec::new();
let mut current_bar_start = trades[0].0;
let mut current_bar_end = current_bar_start + bar_duration;
let mut open = 0.0;
let mut high = 0.0;
let mut low = f64::MAX;
let mut close = 0.0;
let mut volume = 0.0;
let mut vwap_numerator = 0.0;
let mut first_trade = true;
for (time, trade) in trades {
// Check if we need to start a new bar
while *time >= current_bar_end {
if volume > 0.0 {
bars.push((current_bar_start, Bar {
open,
high,
low,
close,
volume,
vwap: Some(vwap_numerator / volume),
}));
}
// Reset for new bar
current_bar_start = current_bar_end;
current_bar_end = current_bar_start + bar_duration;
open = 0.0;
high = 0.0;
low = f64::MAX;
close = 0.0;
volume = 0.0;
vwap_numerator = 0.0;
first_trade = true;
}
// Update current bar
if first_trade {
open = trade.price;
first_trade = false;
}
high = high.max(trade.price);
low = low.min(trade.price);
close = trade.price;
volume += trade.size;
vwap_numerator += trade.price * trade.size;
}
// Add final bar if it has data
if volume > 0.0 {
bars.push((current_bar_start, Bar {
open,
high,
low,
close,
volume,
vwap: Some(vwap_numerator / volume),
}));
}
bars
}
}

View file

@ -0,0 +1,229 @@
use crate::{MarketUpdate, MarketDataType, Quote, Trade, Bar, Side};
use chrono::{DateTime, Utc, Duration};
use rand::{Rng, SeedableRng};
use rand::rngs::StdRng;
use rand_distr::{Normal, Distribution};
pub struct MockDataGenerator {
rng: StdRng,
base_price: f64,
volatility: f64,
spread_bps: f64,
volume_mean: f64,
volume_std: f64,
}
impl MockDataGenerator {
pub fn new(seed: u64) -> Self {
Self {
rng: StdRng::seed_from_u64(seed),
base_price: 100.0,
volatility: 0.02, // 2% daily volatility
spread_bps: 5.0, // 5 basis points spread
volume_mean: 1_000_000.0,
volume_std: 200_000.0,
}
}
pub fn with_params(seed: u64, base_price: f64, volatility: f64, spread_bps: f64) -> Self {
Self {
rng: StdRng::seed_from_u64(seed),
base_price,
volatility,
spread_bps,
volume_mean: 1_000_000.0,
volume_std: 200_000.0,
}
}
pub fn generate_quotes(
&mut self,
symbol: String,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
interval_ms: i64,
) -> Vec<MarketUpdate> {
let mut updates = Vec::new();
let mut current_time = start_time;
let mut price = self.base_price;
let price_dist = Normal::new(0.0, self.volatility).unwrap();
let volume_dist = Normal::new(self.volume_mean, self.volume_std).unwrap();
while current_time <= end_time {
// Generate price movement
let return_pct = price_dist.sample(&mut self.rng) / 100.0;
price *= 1.0 + return_pct;
price = price.max(0.01); // Ensure positive price
// Calculate bid/ask
let half_spread = price * self.spread_bps / 20000.0;
let bid = price - half_spread;
let ask = price + half_spread;
// Generate volume
let volume = volume_dist.sample(&mut self.rng).max(0.0) as u32;
updates.push(MarketUpdate {
symbol: symbol.clone(),
timestamp: current_time,
data: MarketDataType::Quote(Quote {
bid,
ask,
bid_size: (volume / 10) as f64,
ask_size: (volume / 10) as f64,
}),
});
current_time = current_time + Duration::milliseconds(interval_ms);
}
updates
}
pub fn generate_trades(
&mut self,
symbol: String,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
trades_per_minute: u32,
) -> Vec<MarketUpdate> {
let mut updates = Vec::new();
let mut current_time = start_time;
let mut price = self.base_price;
let price_dist = Normal::new(0.0, self.volatility / 100.0).unwrap();
let volume_dist = Normal::new(100.0, 50.0).unwrap();
let interval_ms = 60_000 / trades_per_minute as i64;
while current_time <= end_time {
// Generate price movement
let return_pct = price_dist.sample(&mut self.rng);
price *= 1.0 + return_pct;
price = price.max(0.01);
// Generate trade size
let raw_size: f64 = volume_dist.sample(&mut self.rng);
let size = raw_size.max(1.0) as u32;
// Random buy/sell
let is_buy = self.rng.gen_bool(0.5);
updates.push(MarketUpdate {
symbol: symbol.clone(),
timestamp: current_time,
data: MarketDataType::Trade(Trade {
price,
size: size as f64,
side: if is_buy { Side::Buy } else { Side::Sell },
}),
});
current_time = current_time + Duration::milliseconds(interval_ms);
}
updates
}
pub fn generate_bars(
&mut self,
symbol: String,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
timeframe: &str,
) -> Vec<MarketUpdate> {
let mut updates = Vec::new();
let mut current_time = start_time;
let mut price = self.base_price;
let interval = match timeframe {
"1m" => Duration::minutes(1),
"5m" => Duration::minutes(5),
"15m" => Duration::minutes(15),
"1h" => Duration::hours(1),
"1d" => Duration::days(1),
_ => Duration::minutes(1),
};
let price_dist = Normal::new(0.0, self.volatility).unwrap();
let volume_dist = Normal::new(self.volume_mean, self.volume_std).unwrap();
while current_time <= end_time {
// Generate OHLC
let open = price;
let mut high = open;
let mut low = open;
// Simulate intrabar movements
for _ in 0..4 {
let move_pct = price_dist.sample(&mut self.rng) / 100.0;
price *= 1.0 + move_pct;
price = price.max(0.01);
high = high.max(price);
low = low.min(price);
}
let close = price;
let volume = volume_dist.sample(&mut self.rng).max(0.0) as u64;
updates.push(MarketUpdate {
symbol: symbol.clone(),
timestamp: current_time,
data: MarketDataType::Bar(Bar {
open,
high,
low,
close,
volume: volume as f64,
vwap: Some((open + high + low + close) / 4.0),
}),
});
current_time = current_time + interval;
}
updates
}
pub fn generate_mixed_data(
&mut self,
symbol: String,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> Vec<MarketUpdate> {
let mut all_updates = Vec::new();
// Generate quotes every 100ms
let quotes = self.generate_quotes(
symbol.clone(),
start_time,
end_time,
100
);
all_updates.extend(quotes);
// Generate trades
let trades = self.generate_trades(
symbol.clone(),
start_time,
end_time,
20 // 20 trades per minute
);
all_updates.extend(trades);
// Generate 1-minute bars
let bars = self.generate_bars(
symbol,
start_time,
end_time,
"1m"
);
all_updates.extend(bars);
// Sort by timestamp
all_updates.sort_by_key(|update| update.timestamp);
all_updates
}
}

View file

@ -0,0 +1,51 @@
pub mod time_providers;
pub mod market_data_sources;
pub mod execution_handlers;
pub mod market_microstructure;
pub mod mock_data_generator;
use crate::{MarketDataSource, ExecutionHandler, TimeProvider, TradingMode};
// Factory functions to create appropriate implementations based on mode
pub fn create_market_data_source(mode: &TradingMode) -> Box<dyn MarketDataSource> {
match mode {
TradingMode::Backtest { .. } => {
Box::new(market_data_sources::HistoricalDataSource::new())
}
TradingMode::Paper { .. } | TradingMode::Live { .. } => {
Box::new(market_data_sources::LiveDataSource::new())
}
}
}
pub fn create_execution_handler(mode: &TradingMode) -> Box<dyn ExecutionHandler> {
match mode {
TradingMode::Backtest { .. } => {
Box::new(execution_handlers::SimulatedExecution::new(
Box::new(execution_handlers::BacktestFillSimulator::new())
))
}
TradingMode::Paper { .. } => {
Box::new(execution_handlers::SimulatedExecution::new(
Box::new(execution_handlers::PaperFillSimulator::new())
))
}
TradingMode::Live { broker, account_id } => {
Box::new(execution_handlers::BrokerExecution::new(
broker.clone(),
account_id.clone()
))
}
}
}
pub fn create_time_provider(mode: &TradingMode) -> Box<dyn TimeProvider> {
match mode {
TradingMode::Backtest { start_time, .. } => {
Box::new(time_providers::SimulatedTime::new(*start_time))
}
TradingMode::Paper { .. } | TradingMode::Live { .. } => {
Box::new(time_providers::SystemTime::new())
}
}
}

View file

@ -0,0 +1,74 @@
use crate::TimeProvider;
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use std::sync::Arc;
// Real-time provider for paper and live trading
pub struct SystemTime;
impl SystemTime {
pub fn new() -> Self {
Self
}
}
impl TimeProvider for SystemTime {
fn now(&self) -> DateTime<Utc> {
Utc::now()
}
fn sleep_until(&self, target: DateTime<Utc>) -> Result<(), String> {
let now = Utc::now();
if target > now {
let duration = (target - now).to_std()
.map_err(|e| format!("Invalid duration: {}", e))?;
std::thread::sleep(duration);
}
Ok(())
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
// Simulated time for backtesting
pub struct SimulatedTime {
current_time: Arc<Mutex<DateTime<Utc>>>,
}
impl SimulatedTime {
pub fn new(start_time: DateTime<Utc>) -> Self {
Self {
current_time: Arc::new(Mutex::new(start_time)),
}
}
pub fn advance_to(&self, new_time: DateTime<Utc>) {
let mut current = self.current_time.lock();
if new_time > *current {
*current = new_time;
}
}
pub fn advance_by(&self, duration: chrono::Duration) {
let mut current = self.current_time.lock();
*current = *current + duration;
}
}
impl TimeProvider for SimulatedTime {
fn now(&self) -> DateTime<Utc> {
*self.current_time.lock()
}
fn sleep_until(&self, _target: DateTime<Utc>) -> Result<(), String> {
// In backtest mode, we don't actually sleep
// Time is controlled by the backtest engine
Ok(())
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}