# Source code for optrade.analysis.factors

import pandas as pd
import pandas_datareader.data as web
import statsmodels.api as sm
from datetime import datetime
import warnings
import numpy as np
from typing import Dict, Any, Optional, Union, List

# Install a process-wide "ignore" filter at import time for warnings whose
# message starts with "The argument 'date_parser' is deprecated".
# NOTE(review): presumably emitted by pandas while pandas_datareader parses the
# downloaded factor files — confirm the origin before removing this filter.
warnings.filterwarnings("ignore", message="The argument 'date_parser' is deprecated")

# Custom modules
from optrade.data.thetadata import load_stock_data_eod


def get_factor_exposures(
    root: str,
    start_date: str,
    end_date: str,
    mode: str = "ff3",
) -> Dict[str, Any]:
    """
    Calculate factor model exposures for a stock over the specified period.

    Supports Fama-French 3-factor (ff3), Fama-French 5-factor (ff5), and
    Carhart 4-factor (c4) models. Daily stock returns in excess of the
    risk-free rate (the ``RF`` column of the factor data) are regressed with
    OLS on the selected factor columns plus an intercept.

    Args:
        root (str): Root symbol of the underlying security
        start_date (str): Start date in YYYYMMDD format
        end_date (str): End date in YYYYMMDD format
        mode (str): Mode for the factor model. Options: "ff3" (Fama-French 3
            factor), "ff5" (Fama-French 5 factor), or "c4" (Carhart 4 factor).

    Returns:
        Dictionary containing the factor betas:
            - market_beta: Market excess return sensitivity
            - size_beta: Small Minus Big (SMB) factor exposure
            - value_beta: High Minus Low (HML) book-to-market factor exposure
            - momentum_beta: Winners Minus Losers (WML) momentum factor
              (Carhart model only)
            - profitability_beta: Robust Minus Weak (RMW) profitability factor
              (5-factor only)
            - investment_beta: Conservative Minus Aggressive (CMA) investment
              factor (5-factor only)
            - r_squared: Proportion of return variation explained by the factors

    Raises:
        ValueError: If ``mode`` is not a supported model, if a date string does
            not match YYYYMMDD, or if the stock returns and the factor data
            share no dates.
    """
    # Base factor dataset and regressor columns for each supported model.
    model_specs = {
        "ff3": (
            "F-F_Research_Data_Factors_daily",
            ["Mkt-RF", "SMB", "HML"],
        ),
        "ff5": (
            "F-F_Research_Data_5_Factors_2x3_daily",
            ["Mkt-RF", "SMB", "HML", "RMW", "CMA"],
        ),
        "c4": (
            "F-F_Research_Data_Factors_daily",
            ["Mkt-RF", "SMB", "HML", "Mom"],
        ),
    }

    # Validate the mode up front so a typo fails before any data is downloaded.
    if mode not in model_specs:
        raise ValueError(f"Invalid mode: {mode}. Choose 'ff3', 'c4', or 'ff5'.")
    dataset_name, factor_columns = model_specs[mode]

    # Suppress the date_parser deprecation warning
    warnings.filterwarnings(
        "ignore", message="The argument 'date_parser' is deprecated"
    )

    # Convert date strings to datetime objects
    factor_start_date = datetime.strptime(start_date, "%Y%m%d")
    factor_end_date = datetime.strptime(end_date, "%Y%m%d")

    # Get stock data
    stock_data = load_stock_data_eod(
        root=root,
        start_date=start_date,
        end_date=end_date,
        clean_up=True,
        offline=False,
    )

    # Daily simple returns; the first row has no previous close and is NaN.
    stock_data["returns"] = stock_data["close"].pct_change()
    stock_data["Date"] = stock_data["datetime"].dt.date

    # Drop rows containing NaN, then keep only the columns used downstream.
    stock_data = stock_data.dropna()[["Date", "returns"]]

    def _read_famafrench(name):
        # Element 0 of the reader result is the table used as factor data.
        return web.DataReader(
            name,
            "famafrench",
            start=factor_start_date,
            end=factor_end_date,
        )[0]

    factor_data = _read_famafrench(dataset_name)
    if mode == "c4":
        # The momentum column name has trailing spaces; strip them so it can
        # be addressed as "Mom".
        mom_data = _read_famafrench("F-F_Momentum_Factor_daily")
        mom_data.columns = [col.strip() for col in mom_data.columns]
        # Merge FF 3-factor with momentum on the shared date index
        factor_data = pd.merge(
            factor_data, mom_data, left_index=True, right_index=True
        )

    # Convert percentages to decimals
    factor_data = factor_data / 100

    # Expose the date index as a plain-date "Date" column so it matches the
    # stock frame's key.
    factor_data_reset = factor_data.reset_index()
    factor_data_reset["Date"] = factor_data_reset["Date"].dt.date

    # The inner merge keeps only dates present on both sides, so no separate
    # pre-filtering of the factor rows is needed.
    aligned_data = pd.merge(stock_data, factor_data_reset, on="Date", how="inner")
    if aligned_data.empty:
        raise ValueError(
            f"No overlapping dates between stock returns and factor data for "
            f"{root} between {start_date} and {end_date}."
        )

    # Linear regression of excess returns on the factors plus an intercept
    X = sm.add_constant(aligned_data[factor_columns])
    y = aligned_data["returns"] - aligned_data["RF"]  # Excess return
    model = sm.OLS(y, X).fit()

    # Prepare results
    result = {
        "market_beta": model.params.get("Mkt-RF", None),
        "size_beta": model.params.get("SMB", None),
        "value_beta": model.params.get("HML", None),
        "r_squared": model.rsquared,
    }

    # Add momentum for Carhart model
    if mode == "c4":
        result["momentum_beta"] = model.params.get("Mom", None)

    # Add additional factors for 5-factor model
    if mode == "ff5":
        result["profitability_beta"] = model.params.get("RMW", None)
        result["investment_beta"] = model.params.get("CMA", None)

    return result
def factor_categorization(
    factors: Dict[str, Optional[Dict[str, Optional[float]]]], mode: str = "ff3"
) -> Dict[str, Dict[str, str]]:
    """
    Categorize stocks based on their factor model exposures using percentiles.

    Market beta uses fixed cut-offs (above 1.1 is "high", below 0.9 is "low").
    Every other factor is compared against the 30th and 70th percentiles of
    that factor across all stocks in ``factors``.

    Args:
        factors: Nested dictionary where:
            - Outer key is the root symbol
            - Inner key is the factor type
            - Value is the factor beta
            An outer value of None (a stock whose exposures could not be
            computed) is treated as having no factor values.
        mode: Factor model type ("ff3", "ff5", or "c4")

    Returns:
        Nested dictionary with categorizations for each stock and factor.
        Factors whose value is missing or None are omitted; a stock with no
        usable factors maps to an empty dictionary.

    Raises:
        KeyError: If ``mode`` is not one of "ff3", "ff5", or "c4".
    """
    # Factor sets per model. Tuples (not sets) keep output key order stable
    # from run to run.
    model_factors = {
        "ff3": ("market_beta", "size_beta", "value_beta"),
        "ff5": (
            "market_beta",
            "size_beta",
            "value_beta",
            "profitability_beta",
            "investment_beta",
        ),
        "c4": ("market_beta", "size_beta", "value_beta", "momentum_beta"),
    }
    relevant_factors = model_factors[mode]

    # Labels emitted for the high / low / neutral bucket of each factor
    factor_mappings = {
        "market_beta": {"high": "high", "low": "low", "neutral": "neutral"},
        "size_beta": {"high": "small_cap", "low": "large_cap", "neutral": "neutral"},
        "value_beta": {"high": "value", "low": "growth", "neutral": "neutral"},
        "momentum_beta": {"high": "high", "low": "low", "neutral": "neutral"},
        "profitability_beta": {"high": "robust", "low": "weak", "neutral": "neutral"},
        "investment_beta": {
            "high": "conservative",
            "low": "aggressive",
            "neutral": "neutral",
        },
    }

    # Normalise None entries to empty dicts so they neither crash the lookups
    # below nor contribute to the percentiles.
    usable = {root: (betas or {}) for root, betas in factors.items()}

    # Cross-sectional 30th/70th percentile cut-offs per factor. Market beta is
    # skipped because it uses the fixed thresholds instead.
    percentiles = {}
    for factor_type in relevant_factors:
        if factor_type == "market_beta":
            continue
        values = [
            betas[factor_type]
            for betas in usable.values()
            if betas.get(factor_type) is not None
        ]
        if values:
            low_cut, high_cut = np.percentile(values, [30, 70])
            percentiles[factor_type] = (low_cut, high_cut)

    def categorize_factor(factor_type, value):
        """Return the bucket label for one factor value."""
        labels = factor_mappings[factor_type]
        if factor_type == "market_beta":
            low_cut, high_cut = 0.9, 1.1
        elif factor_type in percentiles:
            low_cut, high_cut = percentiles[factor_type]
        else:
            return labels["neutral"]

        if value > high_cut:
            return labels["high"]
        if value < low_cut:
            return labels["low"]
        return labels["neutral"]

    # Build the categorization result
    return {
        root: {
            factor_type: categorize_factor(factor_type, betas[factor_type])
            for factor_type in relevant_factors
            if betas.get(factor_type) is not None
        }
        for root, betas in usable.items()
    }
# Function to calculate factor exposures for multiple stocks
def get_universe_factor_exposures(
    roots: List[str], start_date: str, end_date: str, mode: str = "ff3"
) -> Dict[str, Optional[Dict[str, float]]]:
    """
    Calculate factor model exposures for multiple stocks over the specified period.

    Each root is processed independently. A failure for one root is reported
    with ``print`` and recorded as None, so the remaining roots still run.

    Args:
        roots: List of stock roots to analyze
        start_date: Start date in YYYYMMDD format
        end_date: End date in YYYYMMDD format
        mode: Factor model to use ("ff3", "ff5", or "c4")

    Returns:
        Nested dictionary where:
            - Outer key is the root symbol
            - Inner key is the factor type
            - Value is the factor beta
        The outer value is None for any root whose exposures could not be
        computed.
    """
    # Collect factor betas for all stocks
    all_factors: Dict[str, Optional[Dict[str, float]]] = {}

    for root in roots:
        try:
            all_factors[root] = get_factor_exposures(
                root=root, start_date=start_date, end_date=end_date, mode=mode
            )
        except Exception as e:
            # Deliberate best-effort: keep a None placeholder for the failed
            # root and carry on with the rest of the universe.
            all_factors[root] = None
            print(f"Error processing {root}: {e}")

    return all_factors
# Example usage
if __name__ == "__main__":
    from rich.console import Console

    ctx = Console()

    # Set test period (1 year)
    start_date = "20230101"  # YYYYMMDD format
    end_date = "20231231"  # YYYYMMDD format

    # Define a sample universe of stocks
    sample_universe = [
        "AAPL",
        "MSFT",
        "AMZN",
        "GOOGL",
        "META",
        "TSLA",
        "NVDA",
        "JPM",
        "V",
        "PG",
    ]

    print(
        f"Testing Carhart 4-factor model for sample universe from {start_date} to {end_date}"
    )

    # Calculate factor exposures for all stocks in the universe
    universe_factors = get_universe_factor_exposures(
        sample_universe, start_date, end_date, mode="c4"
    )

    # Symbols whose exposures could not be computed are stored as None; leave
    # them out of the categorization input so they cannot break it.
    valid_factors = {
        symbol: betas
        for symbol, betas in universe_factors.items()
        if betas is not None
    }

    # Categorize the stocks based on their factor exposures
    universe_categorization = factor_categorization(valid_factors, mode="c4")

    print("\nStock Factor Categorization and Values:")
    for symbol, factors in universe_categorization.items():
        ctx.log(f"{symbol}: {factors}")

    print("\nFactor Exposures:")
    for symbol, factors in universe_factors.items():
        ctx.log(f"{symbol}: {factors}")