"""Feature engineering utilities for financial ML models."""

import logging
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from scipy import stats

# Optional dependencies: keep the module importable without them so the
# feature groups that do not need them can still be used.  Methods that
# require a missing package raise ImportError with a clear message.
try:
    import talib
except ImportError:  # pragma: no cover - environment dependent
    talib = None

try:
    from sklearn.preprocessing import StandardScaler, RobustScaler
except ImportError:  # pragma: no cover - environment dependent
    StandardScaler = None
    RobustScaler = None

logger = logging.getLogger(__name__)


class FeatureEngineer:
    """Feature engineering for financial ML models.

    Builds price, technical, microstructure, fundamental, sentiment,
    time-based and cross-sectional features from an OHLCV DataFrame.

    Attributes:
        lookback_periods: Rolling windows (in bars) used for returns,
            moving averages and relative-strength features.
        scaler: Fitted scaler from the last ``transform_features`` call
            (RobustScaler by default — robust to outliers). ``None`` when
            scikit-learn is not installed.
        feature_names: Column names produced by the last ``create_features``
            call.
    """

    def __init__(self, lookback_periods: Optional[List[int]] = None):
        self.lookback_periods = lookback_periods or [5, 10, 20, 50, 100, 200]
        # RobustScaler is the default because financial features are
        # heavy-tailed; falls back to None without scikit-learn.
        self.scaler = RobustScaler() if RobustScaler is not None else None
        self.feature_names: List[str] = []

    def create_features(
        self,
        data: pd.DataFrame,
        include_technical: bool = True,
        include_microstructure: bool = True,
        include_fundamental: bool = False,
        include_sentiment: bool = False
    ) -> pd.DataFrame:
        """Create a comprehensive feature set for ML models.

        Args:
            data: DataFrame with at least ``open/high/low/close`` columns;
                ``volume``, ``timestamp``, ``symbol`` and fundamental /
                sentiment columns enable additional feature groups.
            include_technical: Add TA-Lib indicator features (requires
                the ``talib`` package).
            include_microstructure: Add spread / liquidity / intraday
                features.
            include_fundamental: Add valuation features (only applied when
                an ``earnings`` column exists).
            include_sentiment: Add sentiment features (only applied when a
                ``sentiment`` column exists).

        Returns:
            DataFrame of features aligned to ``data.index`` with missing
            values already imputed (see ``_handle_missing_values``).
        """
        features = pd.DataFrame(index=data.index)

        # Price-based features (always on).
        logger.info("Creating price-based features...")
        price_features = self._create_price_features(data)
        features = pd.concat([features, price_features], axis=1)

        # Technical indicators.
        if include_technical:
            logger.info("Creating technical indicators...")
            tech_features = self._create_technical_features(data)
            features = pd.concat([features, tech_features], axis=1)

        # Microstructure features.
        if include_microstructure:
            logger.info("Creating microstructure features...")
            micro_features = self._create_microstructure_features(data)
            features = pd.concat([features, micro_features], axis=1)

        # Fundamental features (if available).
        if include_fundamental and 'earnings' in data.columns:
            logger.info("Creating fundamental features...")
            fund_features = self._create_fundamental_features(data)
            features = pd.concat([features, fund_features], axis=1)

        # Sentiment features (if available).
        if include_sentiment and 'sentiment' in data.columns:
            logger.info("Creating sentiment features...")
            sent_features = self._create_sentiment_features(data)
            features = pd.concat([features, sent_features], axis=1)

        # Time-based features (no-op when there is no timestamp column).
        logger.info("Creating time-based features...")
        time_features = self._create_time_features(data)
        features = pd.concat([features, time_features], axis=1)

        # Cross-sectional features (only meaningful with multiple symbols).
        if 'symbol' in data.columns and data['symbol'].nunique() > 1:
            logger.info("Creating cross-sectional features...")
            cross_features = self._create_cross_sectional_features(data)
            features = pd.concat([features, cross_features], axis=1)

        # Record the produced feature names before imputation.
        self.feature_names = features.columns.tolist()

        # Handle missing values.
        features = self._handle_missing_values(features)

        return features

    def _create_price_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Create price-based features (returns, ratios, volatility, momentum)."""
        features = pd.DataFrame(index=data.index)

        # Returns at different horizons.
        for period in self.lookback_periods:
            features[f'returns_{period}'] = data['close'].pct_change(period)
            features[f'log_returns_{period}'] = np.log(data['close'] / data['close'].shift(period))

        # Price ratios.
        features['high_low_ratio'] = data['high'] / data['low']
        features['close_open_ratio'] = data['close'] / data['open']

        # Price position in the day's range; a zero range would divide by
        # zero, so it is mapped to NaN and imputed later.
        features['price_position'] = (data['close'] - data['low']) / (data['high'] - data['low']).replace(0, np.nan)

        # Volume-weighted metrics.
        if 'volume' in data.columns:
            features['vwap'] = (data['close'] * data['volume']).rolling(20).sum() / data['volume'].rolling(20).sum()
            features['volume_ratio'] = data['volume'] / data['volume'].rolling(20).mean()
            features['dollar_volume'] = data['close'] * data['volume']

        # Volatility measures (annualized with sqrt(252) trading days).
        for period in [5, 20, 50]:
            features[f'volatility_{period}'] = data['close'].pct_change().rolling(period).std() * np.sqrt(252)
            features[f'realized_var_{period}'] = (data['close'].pct_change() ** 2).rolling(period).sum()

        # Price momentum (approx. 1/3/6 months of trading days).
        features['momentum_1m'] = data['close'] / data['close'].shift(20) - 1
        features['momentum_3m'] = data['close'] / data['close'].shift(60) - 1
        features['momentum_6m'] = data['close'] / data['close'].shift(120) - 1

        # Relative strength: short MA over long MA.
        for short, long in [(10, 30), (20, 50), (50, 200)]:
            features[f'rs_{short}_{long}'] = (
                data['close'].rolling(short).mean() /
                data['close'].rolling(long).mean()
            )

        return features

    def _create_technical_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Create TA-Lib technical indicator features.

        Raises:
            ImportError: If the optional ``talib`` package is not installed.
        """
        if talib is None:
            raise ImportError(
                "TA-Lib is required for technical indicator features "
                "(pip install TA-Lib)"
            )
        features = pd.DataFrame(index=data.index)

        # Moving averages and price-to-SMA ratio.
        for period in self.lookback_periods:
            sma = talib.SMA(data['close'].values, timeperiod=period)
            ema = talib.EMA(data['close'].values, timeperiod=period)
            features[f'sma_{period}'] = sma
            features[f'ema_{period}'] = ema
            features[f'price_to_sma_{period}'] = data['close'] / sma

        # Bollinger Bands (2 standard deviations).
        for period in [20, 50]:
            upper, middle, lower = talib.BBANDS(
                data['close'].values, timeperiod=period, nbdevup=2, nbdevdn=2
            )
            features[f'bb_upper_{period}'] = upper
            features[f'bb_lower_{period}'] = lower
            features[f'bb_width_{period}'] = (upper - lower) / middle
            features[f'bb_position_{period}'] = (data['close'] - lower) / (upper - lower)

        # RSI.
        for period in [14, 28]:
            features[f'rsi_{period}'] = talib.RSI(data['close'].values, timeperiod=period)

        # MACD (default 12/26/9 parameters).
        macd, signal, hist = talib.MACD(data['close'].values)
        features['macd'] = macd
        features['macd_signal'] = signal
        features['macd_hist'] = hist

        # Stochastic oscillator.
        slowk, slowd = talib.STOCH(
            data['high'].values, data['low'].values, data['close'].values
        )
        features['stoch_k'] = slowk
        features['stoch_d'] = slowd

        # ADX (Average Directional Index).
        features['adx'] = talib.ADX(
            data['high'].values, data['low'].values, data['close'].values
        )

        # ATR (Average True Range).
        for period in [14, 20]:
            features[f'atr_{period}'] = talib.ATR(
                data['high'].values, data['low'].values, data['close'].values,
                timeperiod=period
            )

        # CCI (Commodity Channel Index).
        features['cci'] = talib.CCI(
            data['high'].values, data['low'].values, data['close'].values
        )

        # Williams %R.
        features['williams_r'] = talib.WILLR(
            data['high'].values, data['low'].values, data['close'].values
        )

        # OBV (On Balance Volume) and its EMA.
        if 'volume' in data.columns:
            features['obv'] = talib.OBV(data['close'].values, data['volume'].values)
            features['obv_ema'] = talib.EMA(features['obv'].values, timeperiod=20)

        return features

    def _create_microstructure_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Create market microstructure features (spreads, liquidity, intraday).

        The input frame is never mutated.
        """
        features = pd.DataFrame(index=data.index)

        # Computed once, up front: previously this was defined inside the
        # len(data) > 2 branch but used unconditionally below (NameError
        # for frames with <= 2 rows).
        returns = data['close'].pct_change()

        # Spread estimation (relative high-low range).
        features['hl_spread'] = 2 * (data['high'] - data['low']) / (data['high'] + data['low'])
        features['hl_spread_ma'] = features['hl_spread'].rolling(20).mean()

        # Roll (1984) implied spread: defined only when the first-order
        # serial autocovariance of returns is negative; floor at 0 instead
        # of taking sqrt of a negative number (NaN + RuntimeWarning).
        if len(data) > 2:
            autocov = returns.rolling(20).cov(returns.shift(1))
            features['roll_spread'] = 2 * np.sqrt((-autocov).clip(lower=0))

        # Amihud illiquidity: |return| per dollar traded (scaled by 1e6).
        if 'volume' in data.columns:
            features['amihud'] = (returns.abs() / (data['volume'] * data['close'])).rolling(20).mean() * 1e6
            features['log_amihud'] = np.log(features['amihud'].replace(0, np.nan) + 1e-10)

        # Kyle's lambda (price impact), simplified: rolling correlation of
        # price change with signed volume, scaled by their std ratio.
        if 'volume' in data.columns:
            for period in [20, 50]:
                price_changes = data['close'].pct_change()
                signed_volume = data['volume'] * np.sign(price_changes)
                features[f'kyle_lambda_{period}'] = (
                    price_changes.rolling(period).corr(signed_volume) *
                    price_changes.rolling(period).std() /
                    signed_volume.rolling(period).std()
                )

        # Intraday patterns; work on local Series so the caller's frame is
        # not mutated (the original wrote data['hour'] / data['minute']).
        if 'timestamp' in data.columns:
            timestamps = pd.to_datetime(data['timestamp'])
            hour = timestamps.dt.hour
            minute = timestamps.dt.minute

            # Minutes since market open (assuming a 9:30 AM open).
            features['minutes_since_open'] = (hour - 9) * 60 + minute - 30
            features['minutes_to_close'] = 390 - features['minutes_since_open']  # 6.5-hour day

            # Normalized time of day in [0, 1].
            features['time_of_day_norm'] = features['minutes_since_open'] / 390

        # Order flow imbalance proxies (epsilon avoids division by zero).
        features['high_low_imbalance'] = (data['high'] - data['close']) / (data['close'] - data['low'] + 1e-10)
        features['close_position_in_range'] = (data['close'] - data['low']) / (data['high'] - data['low'] + 1e-10)

        return features

    def _create_fundamental_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Create fundamental analysis features from whichever fundamental
        columns are present (earnings, book_value, dividends, revenue, ...)."""
        features = pd.DataFrame(index=data.index)

        # Price to earnings.
        if 'earnings' in data.columns:
            features['pe_ratio'] = data['close'] / data['earnings']
            features['earnings_yield'] = data['earnings'] / data['close']
            # Relative to its own trailing-year (252 trading days) mean.
            features['pe_relative'] = features['pe_ratio'] / features['pe_ratio'].rolling(252).mean()

        # Price to book.
        if 'book_value' in data.columns:
            features['pb_ratio'] = data['close'] / data['book_value']
            features['pb_relative'] = features['pb_ratio'] / features['pb_ratio'].rolling(252).mean()

        # Dividend yield and growth.
        if 'dividends' in data.columns:
            features['dividend_yield'] = data['dividends'].rolling(252).sum() / data['close']
            features['dividend_growth'] = data['dividends'].pct_change(252)

        # Sales / revenue metrics (assumes 'shares_outstanding' accompanies
        # 'revenue' — TODO confirm against the data pipeline).
        if 'revenue' in data.columns:
            features['price_to_sales'] = data['close'] * data['shares_outstanding'] / data['revenue']
            features['revenue_growth'] = data['revenue'].pct_change(4)  # YoY for quarterly data

        # Profitability metrics (assumes 'shareholders_equity' and 'revenue'
        # accompany 'net_income' — TODO confirm against the data pipeline).
        if 'net_income' in data.columns and 'total_assets' in data.columns:
            features['roe'] = data['net_income'] / data['shareholders_equity']
            features['roa'] = data['net_income'] / data['total_assets']
            features['profit_margin'] = data['net_income'] / data['revenue']

        return features

    def _create_sentiment_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Create sentiment-based features (levels, momentum, divergence)."""
        features = pd.DataFrame(index=data.index)

        if 'sentiment' in data.columns:
            # Raw sentiment level and rolling statistics.
            features['sentiment'] = data['sentiment']
            features['sentiment_ma'] = data['sentiment'].rolling(20).mean()
            features['sentiment_std'] = data['sentiment'].rolling(20).std()

            # Sentiment momentum.
            features['sentiment_change'] = data['sentiment'].pct_change(5)
            features['sentiment_momentum'] = data['sentiment'] - data['sentiment'].shift(20)

            # Sentiment extremes (rolling z-score).
            features['sentiment_zscore'] = (
                (data['sentiment'] - features['sentiment_ma']) /
                features['sentiment_std']
            )

            # Sentiment divergence from price (z-score difference).
            price_zscore = (data['close'] - data['close'].rolling(20).mean()) / data['close'].rolling(20).std()
            features['sentiment_price_divergence'] = features['sentiment_zscore'] - price_zscore

        # News volume features.
        if 'news_count' in data.columns:
            features['news_volume'] = data['news_count']
            features['news_volume_ma'] = data['news_count'].rolling(5).mean()
            features['news_spike'] = data['news_count'] / features['news_volume_ma']

        # Social media features.
        if 'twitter_mentions' in data.columns:
            features['social_volume'] = data['twitter_mentions']
            features['social_momentum'] = data['twitter_mentions'].pct_change(1)
            features['social_vs_avg'] = data['twitter_mentions'] / data['twitter_mentions'].rolling(20).mean()

        return features

    def _create_time_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Create calendar/seasonality features.

        Returns an empty frame (same index) when there is no 'timestamp'
        column.
        """
        features = pd.DataFrame(index=data.index)

        if 'timestamp' in data.columns:
            timestamps = pd.to_datetime(data['timestamp'])

            # Day of week.
            features['day_of_week'] = timestamps.dt.dayofweek
            features['is_monday'] = (features['day_of_week'] == 0).astype(int)
            features['is_friday'] = (features['day_of_week'] == 4).astype(int)

            # Month flags; note these flag quarter-/year-end *months*, not
            # the literal last trading day.
            features['month'] = timestamps.dt.month
            features['is_quarter_end'] = timestamps.dt.month.isin([3, 6, 9, 12]).astype(int)
            features['is_year_end'] = timestamps.dt.month.eq(12).astype(int)

            # Calendar day within month/year.
            features['trading_day_of_month'] = timestamps.dt.day
            features['trading_day_of_year'] = timestamps.dt.dayofyear

            # Cyclic encoding of day-of-year for seasonality.
            features['sin_day_of_year'] = np.sin(2 * np.pi * features['trading_day_of_year'] / 365)
            features['cos_day_of_year'] = np.cos(2 * np.pi * features['trading_day_of_year'] / 365)

            # Options expiration (third Friday of the month).
            features['is_opex_week'] = self._is_options_expiration_week(timestamps)

            # Fed meeting weeks (approximate ~6-week cadence).
            features['is_fed_week'] = self._is_fed_meeting_week(timestamps)

        return features

    def _create_cross_sectional_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Create features comparing each symbol against the market average.

        The input frame is never mutated (the original wrote
        ``data['returns']``); per-symbol returns live in a local Series.
        """
        features = pd.DataFrame(index=data.index)

        # Market-wide aggregates keyed by timestamp.
        market_returns = data.groupby('timestamp')['close'].mean().pct_change()
        market_volume = data.groupby('timestamp')['volume'].mean()

        # Per-symbol returns aligned to the original row index.
        returns = data.groupby('symbol')['close'].pct_change()

        # Broadcast the market aggregates back onto the per-row index;
        # reindex (unlike label indexing) tolerates missing timestamps.
        mkt_ret_row = pd.Series(
            market_returns.reindex(data['timestamp']).to_numpy(), index=data.index
        )
        mkt_vol_row = pd.Series(
            market_volume.reindex(data['timestamp']).to_numpy(), index=data.index
        )

        # Relative performance versus the market.
        features['relative_returns'] = returns - mkt_ret_row
        features['relative_volume'] = data['volume'] / mkt_vol_row

        # Rolling correlation of each symbol's returns with the market.
        # The grouped rolling result carries a (symbol, index) MultiIndex;
        # dropping the symbol level re-aligns it with the row index.
        for period in [20, 50]:
            rolling_corr = (
                returns.groupby(data['symbol'])
                .rolling(period)
                .corr(mkt_ret_row)
                .reset_index(level=0, drop=True)
            )
            features[f'market_correlation_{period}'] = rolling_corr

        # Cross-sectional momentum: percentile rank of returns per timestamp.
        features['cross_sectional_rank'] = returns.groupby(data['timestamp']).rank(pct=True)

        return features

    def _handle_missing_values(self, features: pd.DataFrame) -> pd.DataFrame:
        """Impute missing values: inf -> NaN, short forward-fill, column
        median, then 0 for anything still missing (e.g. all-NaN columns)."""
        # Infinities first, so they do not poison the per-column medians
        # computed below (the original replaced them after imputation).
        features = features.replace([np.inf, -np.inf], np.nan)

        # Forward fill small gaps only (modern API; fillna(method=...) is
        # deprecated in pandas 2.x).
        features = features.ffill(limit=5)

        # Remaining NaNs get the column median; an all-NaN column has a NaN
        # median and is left for the final fill.
        features = features.fillna(features.median())

        # Anything still missing becomes 0.
        features = features.fillna(0)

        return features

    def _is_options_expiration_week(self, timestamps: pd.Series) -> pd.Series:
        """Flag the third Friday of each month (simplified opex marker).

        The third Friday always falls on day 15-21 of the month.
        """
        is_third_week = (timestamps.dt.day >= 15) & (timestamps.dt.day <= 21)
        is_friday = timestamps.dt.dayofweek == 4
        return (is_third_week & is_friday).astype(int)

    def _is_fed_meeting_week(self, timestamps: pd.Series) -> pd.Series:
        """Flag approximate Fed meeting weeks.

        The Fed meets ~8 times per year, roughly every 6 weeks; this is a
        coarse every-6th-ISO-week approximation.
        """
        week_of_year = timestamps.dt.isocalendar().week
        return (week_of_year % 6 == 0).astype(int)

    def transform_features(
        self,
        features: pd.DataFrame,
        method: str = 'robust',
        clip_outliers: bool = True,
        clip_quantile: float = 0.01
    ) -> pd.DataFrame:
        """Clip outliers and scale features for ML models.

        Args:
            features: Feature matrix to transform.
            method: 'robust' (median/IQR) or 'standard' (mean/std).
            clip_outliers: Winsorize each column at the given quantiles
                before scaling.
            clip_quantile: Lower tail quantile (upper tail is symmetric).

        Returns:
            Scaled DataFrame with the same index and columns.

        Raises:
            ValueError: For an unknown scaling method.
            ImportError: If scikit-learn is not installed.
        """
        if StandardScaler is None:
            raise ImportError("scikit-learn is required for feature scaling")

        transformed = features.copy()

        # Clip outliers per column if requested.
        if clip_outliers:
            lower = features.quantile(clip_quantile)
            upper = features.quantile(1 - clip_quantile)
            transformed = features.clip(lower=lower, upper=upper, axis=1)

        # Pick and fit the scaler.
        if method == 'robust':
            scaler = RobustScaler()
        elif method == 'standard':
            scaler = StandardScaler()
        else:
            raise ValueError(f"Unknown scaling method: {method}")

        scaled_values = scaler.fit_transform(transformed)
        transformed = pd.DataFrame(
            scaled_values,
            index=features.index,
            columns=features.columns
        )

        # Keep the fitted scaler for later inverse/consistent transforms.
        self.scaler = scaler

        return transformed

    def get_feature_importance(
        self,
        features: pd.DataFrame,
        target: pd.Series,
        method: str = 'mutual_info'
    ) -> pd.DataFrame:
        """Calculate feature importance scores.

        Args:
            features: Feature matrix (no NaNs for 'mutual_info' /
                'random_forest').
            target: Target series aligned with ``features``.
            method: 'mutual_info', 'correlation' or 'random_forest'.

        Returns:
            DataFrame indexed by feature name, sorted descending by score.

        Raises:
            ValueError: For an unknown method (the original fell through
                to an IndexError on the empty score dict).
        """
        importance_scores = {}

        if method == 'mutual_info':
            from sklearn.feature_selection import mutual_info_regression
            scores = mutual_info_regression(features, target)
            importance_scores['mutual_info'] = scores
        elif method == 'correlation':
            scores = features.corrwith(target).abs()
            importance_scores['correlation'] = scores.values
        elif method == 'random_forest':
            from sklearn.ensemble import RandomForestRegressor
            rf = RandomForestRegressor(n_estimators=100, random_state=42)
            rf.fit(features, target)
            importance_scores['rf_importance'] = rf.feature_importances_
        else:
            raise ValueError(f"Unknown importance method: {method}")

        # Assemble and sort by the (single) computed score column.
        importance_df = pd.DataFrame(
            importance_scores,
            index=features.columns
        ).sort_values(by=list(importance_scores.keys())[0], ascending=False)

        return importance_df