# Source code for optrade.utils.volatility

import pandas as pd
import pandas_market_calendars as mcal
import numpy as np
import warnings
from rich.console import Console

# Suppress any warning whose text begins with the message below (the `message`
# argument is a regular expression matched against the start of the warning).
# NOTE(review): presumably emitted by a dependency that still passes the
# deprecated `date_parser` argument to pandas - confirm the source.
warnings.filterwarnings("ignore", message="The argument 'date_parser' is deprecated")

# Custom modules
from optrade.data.thetadata import load_stock_data


def get_historical_vol(
    stock_data: pd.DataFrame,
    volatility_type: str = "period",
    verbose: bool = False,
) -> float:
    """
    Calculate historical volatility from intraday quotes.

    Uses mid prices ((bid + ask) / 2) for the log-return calculation to avoid
    bid-ask bounce, and excludes every overnight return (last interval of one
    day to first interval of the next) before taking the standard deviation.
    The function does not filter by time of day itself: it expects the caller
    to pass regular-trading-hours data with the same number of rows on every
    day.

    Args:
        stock_data: DataFrame with "datetime", "bid" and "ask" columns. The
            "datetime" column may hold datetime values or strings in the
            format "YYYY-MM-DD HH:MM:SS". Rows must be sorted in strictly
            ascending datetime order with regular intervals within each day.
        volatility_type: Type of volatility to calculate. Options: "daily",
            "period", "annualized".
        verbose: If True, log the detected intervals/returns per day.

    Returns:
        Volatility value based on the specified type:
        "daily" is the interval volatility scaled by sqrt(returns per day),
        "period" scales the daily value by sqrt(number of trading days), and
        "annualized" scales the daily value by sqrt(252).

    Raises:
        ValueError: If `volatility_type` is not one of the supported options,
            if the data is empty, unsorted, contains unparseable timestamps,
            has fewer than two rows per day, or has a different number of
            rows on different days.
    """
    # Validate the requested type up front so a typo fails before any work
    # is done, and report the offending value.
    if volatility_type not in ("daily", "period", "annualized"):
        raise ValueError(f"Invalid volatility type: {volatility_type}")

    # Convert once and use the converted values everywhere, so string
    # timestamps work as well as datetime64 columns.
    datetimes = pd.to_datetime(stock_data["datetime"])
    if datetimes.isna().any():
        raise ValueError("stock_data contains missing or unparseable datetimes")
    if not (datetimes.diff().dropna() > pd.Timedelta(0)).all():
        raise ValueError("DataFrame must be sorted by datetime in ascending order")

    # Calculate mid prices
    mid_prices = ((stock_data["bid"] + stock_data["ask"]) / 2).to_numpy()

    # One entry per calendar day, in chronological order
    trading_days = datetimes.dt.normalize()
    rows_per_day = trading_days.value_counts().sort_index()
    num_trading_days = len(rows_per_day)
    if num_trading_days == 0:
        raise ValueError("stock_data contains no rows")

    # Intervals per day are taken from the first day in the data
    intervals_per_day = int(rows_per_day.iloc[0])
    returns_per_day = intervals_per_day - 1

    if verbose:
        # The console is only needed for this optional log line
        Console().log(
            f"Intervals per day are {intervals_per_day} and returns per day are {returns_per_day}"
        )

    if returns_per_day < 1:
        raise ValueError(
            "At least two observations per day are required to compute intraday returns"
        )

    # The sqrt(returns_per_day) scaling below is only meaningful when every
    # day contributes the same number of returns.
    if (rows_per_day != intervals_per_day).any():
        raise ValueError(
            f"Every trading day must contain {intervals_per_day} rows; "
            f"found per-day counts between {int(rows_per_day.min())} and {int(rows_per_day.max())}"
        )

    # Calculate log returns using mid prices
    log_returns = np.log(mid_prices[1:] / mid_prices[:-1])

    # Keep a return only when both of its prices fall on the same day; this
    # removes overnight returns deterministically from the timestamps
    # themselves rather than from positional arithmetic.
    day_keys = trading_days.to_numpy()
    is_intraday = day_keys[1:] == day_keys[:-1]
    valid_returns = log_returns[is_intraday]

    # Standard deviation of interval returns (population form, numpy default)
    interval_vol = np.std(valid_returns)

    # Scale up to daily volatility (sqrt of number of returns per day)
    daily_vol = interval_vol * np.sqrt(returns_per_day)

    # Scale to the number of trading days in the dataset
    period_vol = daily_vol * np.sqrt(num_trading_days)

    if volatility_type == "daily":
        return daily_vol
    if volatility_type == "period":
        return period_vol
    # "annualized": rescale the period value to 252 trading days
    return period_vol * np.sqrt(252 / num_trading_days)
def get_train_historical_vol(
    root: str,
    start_date: str,
    end_date: str,
    interval_min: int,
    volatility_window: float,
    volatility_type: str,
) -> float:
    """
    Compute historical volatility over the leading part of a dataset.

    The stock data for the whole [start_date, end_date] range is loaded, then
    truncated to the first `volatility_window` fraction of the calendar-day
    span before being handed to `get_historical_vol`.

    Args:
        root: Underlying stock symbol
        start_date: Start date for the total dataset in YYYYMMDD format
        end_date: End date for the total dataset in YYYYMMDD format
        interval_min: Interval in minutes for the underlying stock data
        volatility_window: Proportion of total days to use for the historical
            volatility calculation
        volatility_type: Type of historical volatility to use.
            Options: "daily", "period", "annualized".

    Returns:
        Historical volatility value based on the specified type.
    """
    date_format = "%Y%m%d"

    # Calendar-day span of the full dataset
    last_day = pd.to_datetime(end_date, format=date_format)
    first_day = pd.to_datetime(start_date, format=date_format)
    total_days = (last_day - first_day).days

    # Number of leading calendar days used for the volatility estimate
    # (fraction truncated towards zero)
    num_vol_days = int(volatility_window * total_days)
    vol_end_date = (first_day + pd.Timedelta(days=num_vol_days)).strftime(date_format)

    full_history = load_stock_data(
        root=root,
        start_date=start_date,
        end_date=end_date,
        interval_min=interval_min,
        clean_up=True,
    )

    # Select only the first num_vol_days for calculating volatility.
    # NOTE(review): the cutoff is a "YYYYMMDD" string; if the column is
    # datetime64 it is presumably interpreted as midnight of that day, which
    # would exclude intraday rows on the cutoff date itself - confirm intent.
    within_window = full_history["datetime"] <= vol_end_date
    vol_history = full_history.loc[within_window]

    return get_historical_vol(vol_history, volatility_type)
def get_previous_trading_day(date: pd.Timestamp, n_days: int = 1) -> pd.Timestamp:
    """
    Returns the timestamp of the n-th previous NYSE trading day before `date`.

    When `date` itself is not a trading day (weekend or holiday), counting
    starts from the last session before it, so `n_days=1` on a Saturday yields
    the preceding Friday. With `n_days=0` the most recent trading day on or
    before `date` is returned.

    Args:
        date: A pd.Timestamp
        n_days: How many trading days to go back (must be >= 0)

    Returns:
        pd.Timestamp of the previous trading day, as produced by the NYSE
        calendar's `valid_days`.

    Raises:
        ValueError: If `n_days` is negative, or if the calendar does not
            return enough trading days to go back `n_days` from `date`.
    """
    if n_days < 0:
        raise ValueError(f"n_days must be non-negative, got {n_days}")

    nyse = mcal.get_calendar("NYSE")

    # At most 5 of every 7 calendar days are sessions, so a fixed pad on top
    # of n_days is not enough for larger lookbacks. Two calendar days per
    # requested trading day plus padding covers weekends and holiday clusters.
    window_days = 2 * n_days + 15
    window_start = date - pd.Timedelta(days=window_days)

    sessions = nyse.valid_days(
        start_date=window_start.strftime("%Y-%m-%d"),
        end_date=date.strftime("%Y-%m-%d"),
    )

    # If `date` is a session it is the last entry and must be skipped over;
    # otherwise the last entry is already the first previous trading day.
    date_is_session = len(sessions) > 0 and sessions[-1].date() == date.date()
    steps_back = n_days + 1 if date_is_session else max(n_days, 1)

    if len(sessions) < steps_back:
        raise ValueError(f"Not enough trading days available to go back {n_days} from {date}")

    return pd.Timestamp(sessions[-steps_back])
def get_rolling_volatility(
    reference_df: pd.DataFrame,
    root: str,
    interval_min: int = 20,
    return_type: str = "log",
    time_col: str = "datetime",
    dev_mode: bool = False,
) -> pd.Series:
    """
    Computes realized volatility over a lookback window ending at each
    timestamp in reference_df[time_col], with diagnostics for missing data.

    One-minute stock data is loaded from 10 trading days before the earliest
    reference timestamp up to the latest one. For each reference timestamp
    the window is the `interval_min` rows immediately preceding it in that
    data; the volatility is the standard deviation of the returns inside the
    window. Values that could not be computed are filled by linear
    interpolation in both directions.

    Args:
        reference_df: DataFrame containing the timestamps to evaluate
        root: Stock symbol (e.g., "AAPL")
        interval_min: Length of the lookback window, counted in rows of the
            1-minute data (i.e. minutes when the data has no gaps)
        return_type: "log" for log returns; any other value uses simple
            percentage returns
        time_col: Name of datetime column in reference_df
        dev_mode: Passed through to load_stock_data

    Returns:
        pd.Series of realized volatility values, aligned with
        reference_df.index and named "realized_vol_{interval_min}min". An
        empty reference_df yields an empty Series.
    """
    reference_df = reference_df.copy()
    reference_df[time_col] = pd.to_datetime(reference_df[time_col])
    reference_times = reference_df[time_col]
    series_name = f"realized_vol_{interval_min}min"

    # Nothing to evaluate: return early instead of failing on the NaT that
    # min()/max() would produce for an empty column.
    if reference_times.empty:
        return pd.Series([], index=reference_df.index, name=series_name, dtype=float)

    from optrade.data.thetadata import load_stock_data

    # Get stock data covering enough history
    start_dt = get_previous_trading_day(reference_times.min(), n_days=10)
    end_dt = reference_times.max()

    stock_data = load_stock_data(
        root=root,
        start_date=start_dt.strftime("%Y%m%d"),
        end_date=end_dt.strftime("%Y%m%d"),
        interval_min=1,
        dev_mode=dev_mode,
    )
    stock_data["datetime"] = pd.to_datetime(stock_data["datetime"])
    stock_data["mid_price"] = (stock_data["bid"] + stock_data["ask"]) / 2
    stock_data.set_index("datetime", inplace=True)
    num_rows = len(stock_data)

    out_vols = []
    skipped_timestamps = []
    short_windows = []
    misaligned_timestamps = []

    for t in reference_times:
        # Position of the first row at or after t; the window ends just
        # before it, so the price at t itself is not part of the window.
        end_idx = stock_data.index.searchsorted(t)
        start_idx = end_idx - interval_min

        if start_idx < 0:
            skipped_timestamps.append(t)
            out_vols.append(np.nan)
            continue

        # searchsorted never returns more than len(index), so a timestamp
        # beyond the loaded data shows up as end_idx == len(index). Record it
        # for the diagnostics; the trailing window is still used, so the
        # returned values are unaffected by this bookkeeping.
        if end_idx >= num_rows:
            misaligned_timestamps.append(t)

        price_window = stock_data.iloc[start_idx:end_idx]["mid_price"]

        if len(price_window) < 2:
            short_windows.append(t)
            out_vols.append(np.nan)
            continue

        returns = (
            np.log(price_window).diff().dropna()
            if return_type == "log"
            else price_window.pct_change().dropna()
        )
        out_vols.append(np.std(returns))

    rolling_vol = pd.Series(out_vols, index=reference_df.index, name=series_name)
    rolling_vol = rolling_vol.interpolate(method="linear", limit_direction="both")

    # === Diagnostics
    # NaNs are counted after interpolation, i.e. only gaps that could not be
    # filled are reported here.
    total_nans = rolling_vol.isna().sum()
    if total_nans > 0:
        print(f"[VOL WARNING] {total_nans} NaNs in rolling_vol_{interval_min}min")
    if skipped_timestamps:
        print(f"[VOL DIAG] Skipped {len(skipped_timestamps)} timestamps due to insufficient lookback")
    if short_windows:
        print(f"[VOL DIAG] {len(short_windows)} windows had <2 prices")
    if misaligned_timestamps:
        print(f"[VOL DIAG] {len(misaligned_timestamps)} timestamps were outside stock_data index")

    return rolling_vol
if __name__ == "__main__":
    # Example usage: load 20-minute AAPL bars for January 2023 as the
    # reference grid, then compute a 600-row rolling volatility on it.
    demo_request = {
        "root": "AAPL",
        "start_date": "20230101",
        "end_date": "20230131",
        "interval_min": 20,
    }

    reference_df = load_stock_data(**demo_request, dev_mode=True)

    vol_df = get_rolling_volatility(
        root=demo_request["root"],
        reference_df=reference_df,
        time_col="datetime",
        interval_min=600,
        return_type="log",
    )

    print(vol_df.head())
    print(vol_df.tail())

    # Check if NaNs in vol_df
    has_nans = vol_df.isna().any()
    print("NaN values found in vol_df" if has_nans else "No NaN values in vol_df")

    print(reference_df.head())
    print(reference_df.tail())