import pandas as pd
import numpy as np
from typing import Optional, List
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime
from optrade.utils.volatility import get_rolling_volatility
from py_vollib.black_scholes.implied_volatility import implied_volatility
from py_vollib.black_scholes.greeks.analytical import delta, gamma, vega, theta, rho
# --- Datetime features ---
def dt_features(
    df: pd.DataFrame,
    feats: List[str],
    dt_col: Optional[str] = "datetime",
    market_open_time: Optional[str] = "09:30:00",
    market_close_time: Optional[str] = "16:00:00",
) -> pd.DataFrame:
    """Generates datetime features for options.

    Args:
        df: DataFrame containing a datetime column.
        feats: List of datetime features to generate. Options include:

            - minute_of_day: Minute of trading day (0-389 for standard session)
            - sin_minute_of_day: Sine transformation of time of day (continuous circular feature)
            - cos_minute_of_day: Cosine transformation of time of day (continuous circular feature)
            - day_of_week: Day of week (0=Monday, 4=Friday)
            - hour_of_week: Hour position in trading week as proportion (0.0-1.0)
            - sin_hour_of_week: Sine transformation of hour of week (continuous circular feature)
            - cos_hour_of_week: Cosine transformation of hour of week (continuous circular feature)
        dt_col: Name of datetime column. If None, the column is detected in
            this order: a column named "datetime", the first datetime-typed
            column (naive or timezone-aware), then the first column whose
            label contains "date" or "time". Defaults to datetime.
        market_open_time: Market open time in HH:MM:SS format (seconds are
            ignored). Defaults to 09:30:00.
        market_close_time: Market close time in HH:MM:SS format (seconds are
            ignored). Defaults to 16:00:00.

    Returns:
        Copy of the original DataFrame with additional datetime feature
        columns, prefixed with dt\\_. All generated columns are float64. The
        input DataFrame is never modified.

    Raises:
        ValueError: If ``dt_col`` is None and no datetime column can be found.

    Examples:
        Basic usage:

        >>> import pandas as pd
        >>> data = pd.DataFrame({
        ...     "datetime": pd.date_range("2023-01-02 09:30:00", periods=5, freq="1min")
        ... })
        >>> feats = ["minute_of_day", "day_of_week"]
        >>> result = dt_features(data, feats)
        >>> result.columns
        Index(['datetime', 'dt_minute_of_day', 'dt_day_of_week'], dtype='object')

        Using custom datetime column name:

        >>> data = pd.DataFrame({
        ...     "timestamp": pd.date_range("2023-01-02 09:30:00", periods=5, freq="1min")
        ... })
        >>> result = dt_features(data, feats, dt_col="timestamp")
        >>> result.columns
        Index(['timestamp', 'dt_minute_of_day', 'dt_day_of_week'], dtype='object')
    """
    # Work on a copy so the caller's DataFrame is left untouched.
    result_df = df.copy()

    # Find the datetime column if not specified.
    if dt_col is None:
        if "datetime" in df.columns:
            dt_col = "datetime"
        else:
            # Any datetime-typed column; "datetimetz" also picks up
            # timezone-aware columns, which "datetime64" alone excludes.
            datetime_cols = df.select_dtypes(
                include=["datetime64", "datetimetz"]
            ).columns
            if len(datetime_cols) > 0:
                dt_col = datetime_cols[0]
            else:
                # Last resort: match on the label. str() keeps non-string
                # labels (e.g. integers) from raising AttributeError.
                time_related_cols = [
                    col
                    for col in df.columns
                    if "date" in str(col).lower() or "time" in str(col).lower()
                ]
                if not time_related_cols:
                    raise ValueError("Could not find a datetime column")
                dt_col = time_related_cols[0]

    # Ensure the column is datetime typed; unparseable values become NaT.
    if not pd.api.types.is_datetime64_any_dtype(df[dt_col]):
        result_df[dt_col] = pd.to_datetime(df[dt_col], errors="coerce")

    # Parse market hours and express them as minutes after midnight.
    market_open = pd.to_datetime(market_open_time).time()
    market_close = pd.to_datetime(market_close_time).time()
    open_minutes = market_open.hour * 60 + market_open.minute
    close_minutes = market_close.hour * 60 + market_close.minute
    trading_minutes_per_day = close_minutes - open_minutes

    minute_feats = ("minute_of_day", "sin_minute_of_day", "cos_minute_of_day")
    week_feats = ("hour_of_week", "sin_hour_of_week", "cos_hour_of_week")
    wants_week = any(name in feats for name in week_feats)

    # Minutes relative to market open (0 = open) are shared by several
    # features, so compute them once and only when actually needed.
    minutes_since_open = None
    if wants_week or any(name in feats for name in minute_feats):
        stamps = result_df[dt_col]
        minutes_since_open = stamps.dt.hour * 60 + stamps.dt.minute - open_minutes

    if "minute_of_day" in feats:
        result_df["dt_minute_of_day"] = minutes_since_open.astype("float64")

    # Cyclic time-of-day encoding, scaled so that open -> 0 and close -> 2*pi.
    if "sin_minute_of_day" in feats or "cos_minute_of_day" in feats:
        normalized_time = 2 * np.pi * minutes_since_open / trading_minutes_per_day
        if "sin_minute_of_day" in feats:
            result_df["dt_sin_minute_of_day"] = np.sin(normalized_time).astype(
                "float64"
            )
        if "cos_minute_of_day" in feats:
            result_df["dt_cos_minute_of_day"] = np.cos(normalized_time).astype(
                "float64"
            )

    if "day_of_week" in feats:
        result_df["dt_day_of_week"] = result_df[dt_col].dt.day_of_week.astype("float64")

    # Hour-of-week features, assuming a 5-day trading week.
    if wants_week:
        trading_hours_per_day = trading_minutes_per_day / 60
        total_trading_hours_per_week = 5 * trading_hours_per_day

        # Trading hours contributed by the days already completed this week
        # (day_of_week: 0=Monday).
        dow = result_df[dt_col].dt.day_of_week
        hours_from_completed_days = dow * trading_hours_per_day

        # Only count minutes inside the session: timestamps before the open
        # contribute 0, timestamps after the close contribute a full day.
        market_minutes = minutes_since_open.clip(
            lower=0, upper=trading_minutes_per_day
        )
        hours_elapsed = hours_from_completed_days + market_minutes / 60

        if "hour_of_week" in feats:
            # Proportion of the trading week elapsed.
            result_df["dt_hour_of_week"] = (
                hours_elapsed / total_trading_hours_per_week
            ).astype("float64")

        if "sin_hour_of_week" in feats or "cos_hour_of_week" in feats:
            # Map the trading week onto [0, 2*pi].
            normalized_week_time = (
                2 * np.pi * hours_elapsed / total_trading_hours_per_week
            )
            if "sin_hour_of_week" in feats:
                result_df["dt_sin_hour_of_week"] = np.sin(normalized_week_time).astype(
                    "float64"
                )
            if "cos_hour_of_week" in feats:
                result_df["dt_cos_hour_of_week"] = np.cos(normalized_week_time).astype(
                    "float64"
                )

    return result_df
# --- Time-to-expiration features ---
def tte_features(
    df: pd.DataFrame,
    feats: List[str],
    exp: str,
) -> pd.DataFrame:
    """
    Generate Time to Expiration (TTE) features for a given DataFrame.

    Args:
        df (pd.DataFrame): DataFrame containing datetime column in format "YYYY-MM-DD HH:MM:SS".
            The datetime column is detected in this order: a column named
            "datetime", the first datetime64 column, then the first column
            whose label contains "date" or "time".
        feats (List): List of features to generate. Options include:

            - "linear": keep the raw TTE in minutes as column "tte_minutes"
            - "tte": raw TTE in minutes as column "tte"
            - "inverse": 1/TTE (in minutes); inf where TTE is not positive
            - "sqrt": √(TTE minutes)
            - "inverse_sqrt": 1/√(TTE minutes); inf where TTE is not positive
            - "exp_decay": exp(-TTE/contract_length), where contract_length is
              the largest TTE (in minutes) found in ``df``
            - "all": every feature above
        exp (str): The expiration date of the option in YYYYMMDD format. The expiration time
            is assumed to be 16:30 (4:30 PM) on the expiration date.

    Returns:
        pd.DataFrame: A copy of the original DataFrame with additional TTE feature columns
        (e.g., "tte\\_inverse"). All TTE features are float64. If ``feats`` is
        empty, ``df`` itself is returned unchanged (not a copy).

    Raises:
        ValueError: If ``exp`` is not a valid YYYYMMDD date, or if no datetime
            column can be found.
    """
    # Nothing requested: hand back the input as-is.
    if not feats:
        return df

    # Work on a copy so the caller's DataFrame is left untouched.
    result_df = df.copy()

    # Expiration is taken to be 16:30 on the expiration date.
    exp_datetime = datetime.strptime(exp, "%Y%m%d").replace(
        hour=16, minute=30, second=0
    )

    # Locate the datetime column.
    if "datetime" in df.columns:
        dt_col = "datetime"
    else:
        datetime_cols = df.select_dtypes(include=["datetime64"]).columns
        if len(datetime_cols) > 0:
            dt_col = datetime_cols[0]
        else:
            # Last resort: match on the label. str() keeps non-string labels
            # (e.g. integers) from raising AttributeError.
            time_related_cols = [
                col
                for col in df.columns
                if "date" in str(col).lower() or "time" in str(col).lower()
            ]
            if not time_related_cols:
                raise ValueError("Could not find a datetime column")
            dt_col = time_related_cols[0]

    # Ensure the column is datetime typed; unparseable values become NaT.
    if not pd.api.types.is_datetime64_any_dtype(df[dt_col]):
        result_df[dt_col] = pd.to_datetime(df[dt_col], errors="coerce")

    # TTE in minutes; negative for timestamps after expiration.
    result_df["tte_minutes"] = (
        exp_datetime - result_df[dt_col]
    ).dt.total_seconds().astype("float64") / 60
    tte = result_df["tte_minutes"]

    # The longest TTE present serves as the contract length for exp_decay.
    contract_length = tte.max()

    wants_all = "all" in feats

    if wants_all or "tte" in feats:
        # Linear TTE (raw minutes).
        result_df["tte"] = tte.astype("float64")

    if wants_all or "inverse" in feats:
        # 1/minutes; non-positive TTE maps to inf instead of dividing by zero.
        result_df["tte_inverse"] = np.where(tte > 0, 1 / tte, np.inf).astype(
            "float64"
        )

    if wants_all or "sqrt" in feats:
        result_df["tte_sqrt"] = np.sqrt(tte).astype("float64")

    if wants_all or "inverse_sqrt" in feats:
        # 1/sqrt(minutes); non-positive TTE maps to inf.
        result_df["tte_inverse_sqrt"] = np.where(
            tte > 0, 1 / np.sqrt(tte), np.inf
        ).astype("float64")

    if wants_all or "exp_decay" in feats:
        # Exponential decay with lambda = 1/contract_length.
        result_df["tte_exp_decay"] = np.exp(-tte / contract_length).astype("float64")

    # "tte_minutes" is an intermediate column unless "linear" (or "all") was
    # requested; it is already float64 when kept.
    if not wants_all and "linear" not in feats:
        result_df = result_df.drop("tte_minutes", axis=1)

    return result_df
# --- Volatility features ---
def get_volatility_features(
    df: pd.DataFrame,
    feats: List[str],
    root: str,
    right: str,
    risk_free_rate: float = 0.045,
    rolling_volatility_range: Optional[List[int]] = None,
) -> pd.DataFrame:
    """
    Computes volatility features from stock and option data.

    Args:
        df: Input DataFrame. The implied-volatility / Greek features read the
            columns "stock_mid_price", "strike", "tte_minutes" and
            "option_mid_price" from every row.
        feats: List of feature names to compute. Recognized names:

            - "rolling_volatility": one "rolling_volatility_{n}min" column per
              entry of ``rolling_volatility_range``
            - "vol_ratio": ratio of the shortest-window to the longest-window
              rolling volatility (the rolling columns are added as well)
            - "implied_volatility": Black-Scholes implied volatility
            - "delta", "gamma", "vega", "theta", "rho": Black-Scholes Greeks
              evaluated at the row's implied volatility
        root: Underlying symbol, forwarded to ``get_rolling_volatility``.
        right: Option right; "C" is priced as a call, anything else as a put.
        risk_free_rate: Annualized risk-free rate used for pricing.
        rolling_volatility_range: Window lengths in minutes for the rolling
            volatility features. Required when "rolling_volatility" or
            "vol_ratio" is requested.

    Returns:
        Copy of ``df`` with the requested feature columns added. Rows for which
        the implied volatility or any requested Greek cannot be computed get
        NaN in all of those columns.

    Raises:
        ValueError: If a rolling volatility feature is requested without
            ``rolling_volatility_range``.
    """
    df = df.copy()

    # Stock-level volatility
    if "rolling_volatility" in feats or "vol_ratio" in feats:
        # Explicit raise rather than assert, so the check survives `python -O`.
        if rolling_volatility_range is None:
            raise ValueError(
                "rolling_volatility_range is required for rolling_volatility or vol_ratio features"
            )

        for interval_min in rolling_volatility_range:
            # NOTE(review): presumably returns values aligned with df's rows;
            # confirm against get_rolling_volatility.
            df[f"rolling_volatility_{interval_min}min"] = get_rolling_volatility(
                reference_df=df, root=root, interval_min=interval_min
            )

        if "vol_ratio" in feats:
            # Ratio of short-term to long-term volatility.
            short_window = min(rolling_volatility_range)
            long_window = max(rolling_volatility_range)
            df[f"vol_ratio_{short_window}min_to_{long_window}min"] = (
                df[f"rolling_volatility_{short_window}min"]
                / df[f"rolling_volatility_{long_window}min"]
            )

    # Implied volatility and the Greeks
    greek_names = ("delta", "gamma", "vega", "theta", "rho")
    requested_greeks = [name for name in greek_names if name in feats]
    compute_iv = "implied_volatility" in feats

    if compute_iv or requested_greeks:
        greek_funcs = {
            "delta": delta,
            "gamma": gamma,
            "vega": vega,
            "theta": theta,
            "rho": rho,
        }
        flag = "c" if right == "C" else "p"  # same for every row
        minutes_per_year = 365 * 24 * 60

        ivs = []
        greek_columns = {name: [] for name in requested_greeks}

        for _, row in df.iterrows():
            # Compute the whole row first and append exactly once afterwards.
            # A failure part-way through (e.g. a Greek raising after the IV
            # succeeded) must not leave the result lists with unequal lengths.
            try:
                S = row["stock_mid_price"]
                K = row["strike"]
                t = row["tte_minutes"] / minutes_per_year  # years to expiry
                price = row["option_mid_price"]

                iv = implied_volatility(price, S, K, t, risk_free_rate, flag)
                row_greeks = {
                    name: greek_funcs[name](flag, S, K, t, risk_free_rate, iv)
                    for name in requested_greeks
                }
            except Exception:
                # Deliberate best-effort: rows the pricer rejects become NaN
                # instead of aborting the whole computation.
                iv = np.nan
                row_greeks = dict.fromkeys(requested_greeks, np.nan)

            ivs.append(iv)
            for name in requested_greeks:
                greek_columns[name].append(row_greeks[name])

        if compute_iv:
            df["implied_volatility"] = ivs
        for name in requested_greeks:
            df[name] = greek_columns[name]

    return df
if __name__ == "__main__":
    # Manual smoke test: load one contract's data and run it through the
    # feature pipeline, logging the head of every resulting column.
    #
    # An earlier check exercised get_volatility_features on its own; it is
    # kept here, disabled, for reference:
    #
    #     vol_feats = ["vol_ratio", "rolling_volatility"]
    #     rolling_volatility_range = [20, 60]
    #     df = get_volatility_features(
    #         df=df,
    #         feats=vol_feats,
    #         root=root,
    #         right=right,
    #         rolling_volatility_range=rolling_volatility_range,
    #     )
    #     ctx.log(df.head())
    #     short_window = rolling_volatility_range[0]
    #     long_window = rolling_volatility_range[1]
    #     vol_features = [
    #         "rolling_volatility_20min",
    #         "rolling_volatility_60min",
    #         f"vol_ratio_{short_window}min_to_{long_window}min",
    #     ]
    #     ctx.log(df[vol_features].head())

    # Test: transform_features
    from rich.console import Console
    from optrade.data.contracts import Contract

    ctx = Console()

    root = "AAPL"
    right = "C"

    contract = Contract.find_optimal(
        root=root,
        start_date="20230103",
        target_tte=30,
        right=right,
        tte_tolerance=(15, 40),
        moneyness="ATM",
        interval_min=20,
    )
    df = contract.load_data(dev_mode=True, offline=True)
    ctx.log(df.head())

    option_feats = [
        "option_mid_price",
        "option_bid_size",
        "option_bid",
        "option_ask_size",
        "option_close",
        "option_volume",
        "option_count",
    ]
    stock_feats = [
        "stock_mid_price",
        "stock_bid_size",
        "stock_bid",
        "stock_ask_size",
        "stock_ask",
        "stock_volume",
        "stock_count",
    ]

    # NOTE(review): transform_features is neither defined nor imported in the
    # visible part of this file - confirm where it is supposed to come from.
    df = transform_features(
        df=df,
        core_feats=option_feats + stock_feats,
        tte_feats=["sqrt", "exp_decay"],
        datetime_feats=["sin_minute_of_day", "cos_minute_of_day"],
        vol_feats=["rolling_volatility", "vol_ratio"],
        strike=contract.strike,
        exp=contract.exp,
        root=root,
        right=right,
        rolling_volatility_range=[20, 60],
    )

    # Log the first rows of every column of the transformed frame.
    for col in df.columns:
        ctx.log(f"{col}: {df[col].head()}")