Source code for optrade.data.thetadata

import os
from pathlib import Path
import httpx
import csv
import pandas as pd
import numpy as np
from typing import Optional, Tuple
from datetime import datetime
from rich.console import Console

# Custom modules
from optrade.utils.directories import clean_up_dir
from optrade.utils.error_handlers import (
    DataValidationError,
    INCOMPATIBLE_START_DATE,
    INCOMPATIBLE_END_DATE,
    MISSING_DATES,
    UNKNOWN_ERROR,
)

SCRIPT_DIR = Path(__file__).resolve().parent
BASE_URL = "http://127.0.0.1:25510/v2"


def get_roots(
    sec: str = "option",
    save_dir: Optional[str] = None,
    clean_up: bool = False,
    offline: bool = False,
    dev_mode: bool = False,
) -> pd.DataFrame:
    """
    Fetches all root symbols for a given security type.

    Args:
        sec (str): The security type. Options: 'option', 'stock', 'index'.
        save_dir (str): Base directory for the CSV file. The file is written to
            ``<save_dir>/historical_data/roots/all_<sec>/roots.csv``. If None, the
            working directory is used as the base (the module directory when
            ``dev_mode`` is True, the current working directory otherwise).
        clean_up (bool): Whether to clean up the CSV files after merging. If True, the CSV files are
            saved in a temp folder and then subsequently deleted before returning the df.
        offline (bool): Whether to work in offline mode, using previously saved data.
        dev_mode (bool): Whether to run in development mode.

    Returns:
        pd.DataFrame: The DataFrame containing the root symbols for the given security type.

    Raises:
        FileNotFoundError: If ``offline`` is True and no previously saved CSV exists.
    """
    # Set up directory structure
    working_dir = SCRIPT_DIR if dev_mode else Path.cwd()
    base_dir = working_dir if save_dir is None else Path(save_dir)
    out_dir = base_dir / "historical_data" / "roots"

    # If clean_up is True, save the CSVs in a temp folder, which will be deleted later
    if clean_up and not offline:
        temp_dir = working_dir / "temp" / "roots"
        out_dir = temp_dir

    out_dir = Path(out_dir) / ("all_" + sec)
    out_dir.mkdir(parents=True, exist_ok=True)
    file_path = out_dir / "roots.csv"

    # Offline mode: return what was saved by an earlier online run.
    if offline:
        try:
            return pd.read_csv(file_path)
        except FileNotFoundError as err:
            raise FileNotFoundError(
                f"No offline data found at {file_path}. "
                "Please run with offline=False and clean_up=False first to download the required data."
            ) from err

    url = BASE_URL + f"/list/roots/{sec}"
    params = {"use_csv": "true"}
    first_page = True

    while url is not None:
        response = httpx.get(url, params=params, timeout=10)
        response.raise_for_status()

        # Parse CSV, skipping empty rows; the first remaining row is the header.
        rows = [row for row in csv.reader(response.text.split("\n")) if row]
        page_df = pd.DataFrame(rows[1:], columns=["root"])

        # The first page creates the file (with header); later pages are appended.
        if first_page:
            page_df.to_csv(file_path, index=False)
            first_page = False
        else:
            page_df.to_csv(file_path, mode="a", header=False, index=False)

        # The Next-Page header tells us whether more data is available.
        next_page = response.headers.get("Next-Page")
        if next_page and next_page != "null":
            url = next_page
            params = None  # the next-page URL is requested without extra params
        else:
            url = None

    # Load the accumulated CSV into a single DataFrame
    df = pd.read_csv(file_path)

    # Clean up the temp folder if requested
    if clean_up:
        clean_up_dir(temp_dir)

    return df
def get_expirations(
    root: str,
    save_dir: str = ".",
    clean_up: bool = False,
    offline: bool = False,
    dev_mode: bool = False,
) -> pd.DataFrame:
    """
    Fetch option expiration dates for a given root symbol and save to CSV.

    Args:
        root (str): The root symbol to get expirations for.
        save_dir (str): Base directory for the CSV file (default: current directory). The file is
            written to ``<save_dir>/historical_data/roots/expirations/<root>/expirations.csv``.
        clean_up (bool): Whether to clean up the CSV files after merging. If True, the CSV files are
            saved in a temp folder and then subsequently deleted before returning the df.
        offline (bool): Whether to work in offline mode, using previously saved data.
        dev_mode (bool): Whether to run in development mode.

    Returns:
        pd.DataFrame: The DataFrame containing the expiration dates (column ``date``) for the
        given root symbol. Empty (but with the ``date`` column) if no dates were returned.

    Raises:
        FileNotFoundError: If ``offline`` is True and no previously saved CSV exists.
    """
    # Set up directory structure
    working_dir = SCRIPT_DIR if dev_mode else Path.cwd()
    base_dir = working_dir if save_dir is None else Path(save_dir)
    out_dir = base_dir / "historical_data" / "roots" / "expirations"

    # If clean_up is True, save the CSVs in a temp folder, which will be deleted later
    if clean_up and not offline:
        temp_dir = working_dir / "temp" / "expirations"
        out_dir = temp_dir

    out_dir = Path(out_dir) / root
    out_dir.mkdir(parents=True, exist_ok=True)
    file_path = out_dir / "expirations.csv"

    # Offline mode: return what was saved by an earlier online run.
    if offline:
        try:
            return pd.read_csv(file_path)
        except FileNotFoundError as err:
            raise FileNotFoundError(
                f"No offline data found at {file_path}. "
                "Please run with offline=False and clean_up=False first to download the required data."
            ) from err

    url = BASE_URL + "/list/expirations"
    params = {"root": root}
    first_page = True

    while url is not None:
        response = httpx.get(url, params=params, timeout=10)
        response.raise_for_status()

        # Keep only rows whose first field is purely numeric; this skips the
        # header row and any blank lines.
        dates = [
            row[0]
            for row in csv.reader(response.text.split("\n"))
            if len(row) > 0 and row[0].strip().isdigit()
        ]

        # Naming the column explicitly keeps the header even when the page is
        # empty, so the CSV written below can always be read back.
        page_df = pd.DataFrame({"date": dates})

        # The first page creates the file (with header); later pages are appended.
        if first_page:
            page_df.to_csv(file_path, index=False)
            first_page = False
        else:
            page_df.to_csv(file_path, mode="a", header=False, index=False)

        # Check for next page
        next_page = response.headers.get("Next-Page")
        if next_page and next_page != "null":
            url = next_page
            params = None  # the next-page URL is requested without extra params
            print(f"Paginating to {url}")
        else:
            url = None

    # Load the accumulated CSV into a single DataFrame
    df = pd.read_csv(file_path)

    # Clean up the temp folder if requested
    if clean_up:
        clean_up_dir(temp_dir)

    return df
def get_strikes(
    root: str,
    exp: str,
    save_dir: str = ".",
    clean_up: bool = False,
    offline: bool = False,
    dev_mode: bool = False,
) -> pd.DataFrame:
    """
    Fetch option strike prices for a given root symbol and expiration, saving to CSV.

    Args:
        root (str): The root symbol to get strikes for.
        exp (str): The expiration date to get strikes for.
        save_dir (str): Base directory for the CSV file (default: current directory). The file is
            written to ``<save_dir>/historical_data/roots/strikes/<root>/<exp>/strikes.csv``.
        clean_up (bool): Whether to clean up the CSV files after merging. If True, the CSV files are
            saved in a temp folder and then subsequently deleted before returning the df.
        offline (bool): Whether to work in offline mode, using previously saved data.
        dev_mode (bool): Whether to run in development mode.

    Returns:
        pd.DataFrame: The DataFrame containing the strike prices (column ``strike``, raw values
        divided by 1000) for the given root and expiration. Empty (but with the ``strike``
        column) if no strikes were returned.

    Raises:
        FileNotFoundError: If ``offline`` is True and no previously saved CSV exists.
    """
    # Set up directory structure
    working_dir = SCRIPT_DIR if dev_mode else Path.cwd()
    base_dir = working_dir if save_dir is None else Path(save_dir)
    out_dir = base_dir / "historical_data" / "roots" / "strikes"

    # If clean_up is True, save the CSVs in a temp folder, which will be deleted later
    if clean_up and not offline:
        temp_dir = working_dir / "temp" / "strikes"
        out_dir = temp_dir

    out_dir = Path(out_dir) / root / exp
    out_dir.mkdir(parents=True, exist_ok=True)
    file_path = out_dir / "strikes.csv"

    # Offline mode: return what was saved by an earlier online run.
    if offline:
        try:
            return pd.read_csv(file_path)
        except FileNotFoundError as err:
            raise FileNotFoundError(
                f"No offline data found at {file_path}. "
                "Please run with offline=False and clean_up=False first to download the required data."
            ) from err

    url = BASE_URL + "/list/strikes"
    params = {"root": root, "exp": exp}
    first_page = True

    while url is not None:
        response = httpx.get(url, params=params, timeout=10)
        response.raise_for_status()

        # Keep only rows whose first field is purely numeric (skips the header
        # row and blank lines) and divide by 1000 to convert to dollars.
        strikes = [
            float(row[0]) / 1000
            for row in csv.reader(response.text.split("\n"))
            if len(row) > 0 and row[0].strip().isdigit()
        ]

        # Naming the column explicitly avoids a KeyError on an empty page and
        # keeps the CSV header so the file can always be read back.
        page_df = pd.DataFrame({"strike": strikes}, dtype="float64")

        # The first page creates the file (with header); later pages are appended.
        if first_page:
            page_df.to_csv(file_path, index=False)
            first_page = False
        else:
            page_df.to_csv(file_path, mode="a", header=False, index=False)

        # Check for next page
        next_page = response.headers.get("Next-Page")
        if next_page and next_page != "null":
            url = next_page
            params = None  # the next-page URL is requested without extra params
            print(f"Paginating to {url}")
        else:
            url = None

    # Load the accumulated CSV into a single DataFrame
    df = pd.read_csv(file_path)

    # Clean up the temp folder if requested
    if clean_up:
        clean_up_dir(temp_dir)

    return df
def find_optimal_exp(
    root: str,
    start_date: str,
    target_tte: int,
    tte_tolerance: Tuple[int, int],
    clean_up: bool = False,
    dev_mode: bool = False,
) -> Tuple[Optional[str], Optional[int]]:
    """
    Returns the closest valid TTE to target_tte within tolerance range and its expiration date.

    Args:
        root: The root symbol of the underlying security
        start_date: The start date in YYYYMMDD format
        target_tte: Desired days to expiry (e.g., 30)
        tte_tolerance: (min_tte, max_tte) acceptable range (inclusive on both ends)
        clean_up (bool): Forwarded to ``get_expirations``. If True, the CSV files are saved in a
            temp folder and then subsequently deleted before returning.
        dev_mode (bool): Forwarded to ``get_expirations``.

    Returns:
        Tuple[str, int]: A tuple containing the optimal expiration date (in YYYYMMDD format)
        and the corresponding time-to-expiration in days.

    Raises:
        ValueError: If no expiration falls inside ``tte_tolerance``.
    """
    min_tte, max_tte = tte_tolerance

    # ravel() (rather than squeeze()) always yields a 1-D array, so a table
    # holding a single expiration is still iterable.
    expirations = get_expirations(
        root=root,
        clean_up=clean_up,
        dev_mode=dev_mode,
    ).values.ravel()

    start_dt = datetime.strptime(start_date, "%Y%m%d")

    # Collect (tte, expiration) pairs whose TTE lies within the tolerance band
    valid_pairs = []
    for exp in expirations:
        tte = (datetime.strptime(str(exp), "%Y%m%d") - start_dt).days
        if min_tte <= tte <= max_tte:
            valid_pairs.append((tte, exp))

    if not valid_pairs:
        raise ValueError(
            f"No valid TTE found within tolerance range {tte_tolerance}. "
            "Please try a wider tolerance band."
        )

    # Pick the pair closest to the target TTE (first one wins on ties)
    optimal_tte, optimal_exp = min(
        valid_pairs, key=lambda pair: abs(pair[0] - target_tte)
    )
    return str(optimal_exp), optimal_tte
def _fetch_stock_pages(
    url: str,
    params: Optional[dict],
    out_path: Path,
    ctx,
    is_quote: bool,
) -> None:
    """
    Download every page of a stock history endpoint and accumulate it in one CSV.

    Args:
        url: Endpoint URL of the first page.
        params: Query parameters for the first page; follow-up pages are requested
            from the ``Next-Page`` URL without extra parameters.
        out_path: CSV file to create (first page) and append to (later pages).
        ctx: Object with a ``log`` method used to report pagination.
        is_quote: If True, rows stamped 16:00:00 are dropped and ``datetime`` is
            moved to the first column (quote files); OHLC files are left as-is.
    """
    first_page = True
    while url is not None:
        response = httpx.get(url, params=params, timeout=1000)
        response.raise_for_status()

        # Parse the body once: the first row holds the column names.
        rows = list(csv.reader(response.text.split("\n")))
        page_df = pd.DataFrame(rows[1:], columns=rows[0])

        # Everything except the raw date string is numeric
        numeric_columns = page_df.columns.difference(["date"])
        page_df[numeric_columns] = page_df[numeric_columns].astype("float64")

        # Build a 'YYYY-MM-DD HH:MM:SS' datetime from date + ms_of_day
        time_of_day = pd.to_datetime(
            pd.to_numeric(page_df["ms_of_day"]), unit="ms"
        ).dt.time
        date = pd.to_datetime(page_df["date"], format="%Y%m%d").dt.strftime("%Y-%m-%d")
        page_df["datetime"] = pd.to_datetime(date + " " + time_of_day.astype(str))

        # date and ms_of_day are now redundant
        page_df = page_df.drop(columns=["date", "ms_of_day"])

        if is_quote:
            # OHLC data ends at 15:59:00, so drop the 16:00:00 quote rows
            page_df = page_df[
                page_df["datetime"].dt.time != pd.to_datetime("16:00:00").time()
            ]
            # Make datetime the first column
            page_df = page_df.reindex(
                columns=["datetime"] + list(page_df.columns.drop("datetime"))
            )

        # The first page creates the file (with header); later pages are appended.
        if first_page:
            page_df.to_csv(out_path, index=False)
            first_page = False
        else:
            page_df.to_csv(out_path, mode="a", header=False, index=False)

        # The Next-Page header tells us whether more data is available.
        next_page = response.headers.get("Next-Page")
        if next_page and next_page != "null":
            url = next_page
            params = None
            ctx.log(f"Paginating to {url}")
        else:
            url = None


def load_stock_data(
    root: str,
    start_date: str,
    end_date: str,
    interval_min: int = 1,
    save_dir: Optional[str] = None,
    clean_up: bool = False,
    offline: bool = False,
    dev_mode: bool = False,
) -> pd.DataFrame:
    """
    Gets historical quote-level data (NBBO) and OHLC (Open High Low Close) from ThetaData API for
    stocks across multiple exchanges, aggregated by interval_min (lowest resolution: 1min).

    .. note::
        Data from OHLC ends at 15:59:00, while quote data ends at 16:00:00, so for simplicity
        we remove all rows with 16:00:00 in datetime from quote data, before merging quote
        and OHLC data.

    Args:
        root (str): The root symbol of the underlying security.
        start_date (str): The start date of the data in YYYYMMDD format.
        end_date (str): The end date of the data in YYYYMMDD format.
        interval_min (int): The interval in minutes between data points.
        save_dir (str): The directory to save the data. Files go to
            ``<save_dir>/stocks/<root>/<start_date>_<end_date>/``; if None,
            ``<working_dir>/historical_data/stocks/...`` is used.
        clean_up (bool): Whether to clean up the CSV files after merging. If True, the CSV files are
            saved in a temp folder and then subsequently deleted before returning the df.
        offline (bool): Whether to work in offline mode, using previously saved data.
        dev_mode (bool): Whether to run in development mode.

    Returns:
        pd.DataFrame: The merged NBBO quote and OHLCVC data, with an added ``mid_price`` column.

    Raises:
        FileNotFoundError: If ``offline`` is True and no previously merged CSV exists.
    """
    # Set up directory structure
    working_dir = SCRIPT_DIR if dev_mode else Path.cwd()
    if save_dir is None:
        out_dir = working_dir / "historical_data" / "stocks"
    else:
        out_dir = Path(save_dir) / "stocks"

    if clean_up and not offline:
        temp_dir = working_dir / "temp" / "stocks"
        out_dir = Path(temp_dir)

    out_dir = out_dir / root / f"{start_date}_{end_date}"
    out_dir.mkdir(parents=True, exist_ok=True)

    # Define file paths
    quote_file_path = out_dir / "quote.csv"
    ohlc_file_path = out_dir / "ohlc.csv"
    merged_file_path = out_dir / "merged.csv"

    # Offline mode: return what was saved by an earlier online run.
    if offline:
        try:
            return pd.read_csv(merged_file_path, parse_dates=["datetime"])
        except FileNotFoundError as err:
            raise FileNotFoundError(
                f"No offline data found at {merged_file_path}. "
                "Please run with offline=False and clean_up=False first to download the required data."
            ) from err

    ctx = Console()
    params = {
        "root": root,
        "start_date": start_date,
        "end_date": end_date,
        "use_csv": "true",
        "ivl": str(interval_min * 60000),  # minutes -> milliseconds
        "venue": "utp_cta",  # Merged UTP & CTA data
    }

    # Download both series with the same query parameters
    _fetch_stock_pages(
        BASE_URL + "/hist/stock/quote", params, quote_file_path, ctx, is_quote=True
    )
    _fetch_stock_pages(
        BASE_URL + "/hist/stock/ohlc", params, ohlc_file_path, ctx, is_quote=False
    )

    # Read CSVs with datetime parsing
    quote_df = pd.read_csv(quote_file_path, parse_dates=["datetime"])
    ohlc_df = pd.read_csv(ohlc_file_path, parse_dates=["datetime"])

    # Merge on datetime to ensure proper alignment
    merged_df = pd.merge(quote_df, ohlc_df, on="datetime", how="inner")

    # Remove any duplicate columns that might exist in both dataframes
    merged_df = merged_df.loc[:, ~merged_df.columns.duplicated()]

    # Drop rows containing NaN (e.g. the blank trailing line of each response)
    merged_df = merged_df.dropna()

    # Calculate regular mid prices
    merged_df["mid_price"] = (merged_df["bid"] + merged_df["ask"]) / 2

    # Either discard the temp folder or persist the merged result
    if clean_up:
        clean_up_dir(temp_dir)
    else:
        merged_df.to_csv(merged_file_path, index=False)

    return merged_df
def load_stock_data_eod(
    root: str,
    start_date: str,
    end_date: str,
    save_dir: Optional[str] = None,
    clean_up: bool = False,
    offline: bool = False,
    dev_mode: bool = False,
) -> pd.DataFrame:
    """
    Gets historical End of Day (EOD) report from ThetaData API for stocks across multiple exchanges.
    Each report is generated around 17:15:00 ET and contain NBBO and OHLCVC data.

    Args:
        root (str): The root symbol of the underlying security.
        start_date (str): The start date of the data in YYYYMMDD format.
        end_date (str): The end date of the data in YYYYMMDD format.
        save_dir (str): The directory to save the data. The file goes to
            ``<save_dir>/stocks/<root>/<start_date>_<end_date>/eod.csv``; if None,
            ``<working_dir>/historical_data/stocks/...`` is used.
        clean_up (bool): Whether to clean up the CSV files after merging. If True, the CSV files are
            saved in a temp folder and then subsequently deleted before returning the df.
        offline (bool): Whether to work in offline mode, using previously saved data.
        dev_mode (bool): Whether to run in development mode.

    Returns:
        pd.DataFrame: The EOD data with a ``datetime`` column first and an added ``mid_price`` column.

    Raises:
        FileNotFoundError: If ``offline`` is True and no previously saved CSV exists.
    """
    # Set up directory structure
    working_dir = SCRIPT_DIR if dev_mode else Path.cwd()
    if save_dir is None:
        out_dir = working_dir / "historical_data" / "stocks"
    else:
        out_dir = Path(save_dir) / "stocks"

    if clean_up and not offline:
        temp_dir = working_dir / "temp" / "stocks"
        out_dir = Path(temp_dir)

    out_dir = out_dir / root / f"{start_date}_{end_date}"
    out_dir.mkdir(parents=True, exist_ok=True)

    eod_path = out_dir / "eod.csv"

    # Offline mode: return what was saved by an earlier online run.
    if offline:
        try:
            return pd.read_csv(eod_path, parse_dates=["datetime"])
        except FileNotFoundError as err:
            raise FileNotFoundError(
                f"No offline data found at {eod_path}. "
                "Please run with offline=False and clean_up=False first to download the required data."
            ) from err

    ctx = Console()
    params = {
        "root": root,
        "start_date": start_date,
        "end_date": end_date,
        "use_csv": "true",
    }

    # <-- EOD data -->
    eod_url = BASE_URL + "/hist/stock/eod"
    first_page = True
    while eod_url is not None:
        eod_response = httpx.get(eod_url, params=params, timeout=1000)
        eod_response.raise_for_status()

        # Parse the body once: the first row holds the column names.
        rows = list(csv.reader(eod_response.text.split("\n")))
        page_df = pd.DataFrame(rows[1:], columns=rows[0])

        # Everything except the raw date string is numeric
        numeric_columns = page_df.columns.difference(["date"])
        page_df[numeric_columns] = page_df[numeric_columns].astype("float64")

        # Build a 'YYYY-MM-DD HH:MM:SS' datetime from date + ms_of_day
        time_of_day = pd.to_datetime(
            pd.to_numeric(page_df["ms_of_day"]), unit="ms"
        ).dt.time
        date = pd.to_datetime(page_df["date"], format="%Y%m%d").dt.strftime("%Y-%m-%d")

        # format='mixed' handles time strings with and without fractional seconds
        page_df["date_time_str"] = date + " " + time_of_day.astype(str)
        page_df["datetime"] = pd.to_datetime(page_df["date_time_str"], format="mixed")

        # Remove the temporary column and other redundant columns
        page_df = page_df.drop(
            columns=["date_time_str", "date", "ms_of_day", "ms_of_day2"]
        )

        # Make datetime the first column
        page_df = page_df.reindex(
            columns=["datetime"] + list(page_df.columns.drop("datetime"))
        )

        # The first page creates the file (with header); later pages are appended.
        if first_page:
            page_df.to_csv(eod_path, index=False)
            first_page = False
        else:
            page_df.to_csv(eod_path, mode="a", header=False, index=False)

        # The Next-Page header tells us whether more data is available.
        next_page = eod_response.headers.get("Next-Page")
        if next_page and next_page != "null":
            eod_url = next_page
            params = None
            ctx.log(f"Paginating to {eod_url}")
        else:
            eod_url = None

    # Reload everything and drop rows containing NaN (e.g. blank trailing lines)
    eod_df = pd.read_csv(eod_path, parse_dates=["datetime"]).dropna()

    # Calculate regular mid prices
    eod_df["mid_price"] = (eod_df["bid"] + eod_df["ask"]) / 2

    # Either discard the temp folder or persist the cleaned result
    if clean_up:
        clean_up_dir(temp_dir)
    else:
        eod_df.to_csv(eod_path, index=False)

    return eod_df
def find_optimal_strike(
    root: str,
    start_date: str,
    exp: str,
    right: str,
    interval_min: int,
    moneyness: str,
    strike_band: Optional[float] = 0.05,
    volatility_scaled: bool = False,
    hist_vol: Optional[float] = None,
    volatility_scalar: Optional[float] = 1.0,
    clean_up: bool = False,
    offline: bool = False,
    deterministic: Optional[
        bool
    ] = True,  # TODO: Implement deterministic algorithm or random selection
    dev_mode: bool = False,
) -> float:
    """
    Finds the optimal strike price for option return forecasting, prioritizing strikes that are
    likely to provide meaningful price movement data.

    The "current price" is the mean mid price of the underlying on ``start_date`` (or on the
    following calendar day if loading ``start_date`` fails). For "ATM" the strike closest to that
    price is returned. For "OTM"/"ITM" the strike closest to the relevant edge of the band
    ``current_price * (1 +/- band)`` is returned: the upper edge for OTM calls and ITM puts, the
    lower edge for ITM calls and OTM puts.

    Args:
        root: The root symbol of the option
        start_date: The start date in YYYYMMDD format
        exp: The expiration date in YYYYMMDD format
        right: Option type - "C" for call; any other value is treated as a put
        interval_min: The interval in minutes between data points (the resolution of the data).
        moneyness: Desired moneyness - "OTM", "ITM", or "ATM"
        strike_band: Base percentage distance from current price for strike selection
        volatility_scaled: Whether to adjust strike_band based on historical volatility
        hist_vol: Historical volatility to use for scaling strike_band (required if volatility_scaled=True).
        volatility_scalar: The number of standard deviations to scale the strike_band by.
        clean_up (bool): Whether to clean up the CSV files after merging. If True, the CSV files are
            saved in a temp folder and then subsequently deleted before returning the df.
        offline (bool): Whether to work in offline mode, using previously saved data.
        deterministic: Currently unused (stochastic mode not yet implemented).
        dev_mode: Whether to run in development mode (True) or production mode (False).

    Returns:
        float: The optimal strike price for option return forecasting based on the specified criteria.

    Raises:
        ValueError: If ``moneyness`` is not one of "OTM", "ITM", "ATM"; if ``volatility_scaled``
            is True but ``hist_vol`` or ``volatility_scalar`` is None; or if no strikes are
            available for the expiration.
    """
    if moneyness not in ("OTM", "ITM", "ATM"):
        raise ValueError(f"Invalid moneyness: {moneyness}")

    # Get the underlying's prices for the start date
    try:
        stock_data = load_stock_data(
            root=root,
            start_date=start_date,
            end_date=start_date,
            interval_min=interval_min,
            clean_up=clean_up,
            offline=offline,
            dev_mode=dev_mode,
        )
    except Exception:
        # Shift start_date by 1 day if no data is found. Catching Exception
        # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
        new_start_date = (
            pd.to_datetime(start_date, format="%Y%m%d") + pd.Timedelta(days=1)
        ).strftime("%Y%m%d")
        stock_data = load_stock_data(
            root=root,
            start_date=new_start_date,
            end_date=new_start_date,
            interval_min=interval_min,
            clean_up=clean_up,
            offline=offline,
            dev_mode=dev_mode,
        )
        print(f"Stock data not found for {start_date}, shifting to {new_start_date}")

    # Use the average mid price for the day as the current price
    current_price = stock_data["mid_price"].mean()

    # ravel() (rather than squeeze()) keeps a single strike indexable as a 1-D array
    strikes = get_strikes(
        root=root,
        exp=exp,
        clean_up=clean_up,
        offline=offline,
        dev_mode=dev_mode,
    ).values.ravel()
    if strikes.size == 0:
        raise ValueError(f"No strikes available for {root} expiring {exp}.")

    if moneyness == "ATM":
        target = current_price
    else:
        if volatility_scaled:
            if hist_vol is None:
                raise ValueError("Historical volatility must be provided")
            if volatility_scalar is None:
                raise ValueError("Volatility scalar must be provided")
            band = volatility_scalar * hist_vol  # (num_SDs) * (SD)
        else:
            band = strike_band

        lower = current_price - band * current_price
        upper = current_price + band * current_price

        # A call is OTM when the strike is above spot, a put when it is below;
        # ITM is the mirror image. So OTM calls and ITM puts aim at the upper
        # edge of the band, ITM calls and OTM puts at the lower edge.
        aims_above_spot = (right == "C") == (moneyness == "OTM")
        target = upper if aims_above_spot else lower

    # Closest listed strike to the target price
    return strikes[np.argmin(np.abs(strikes - target))]
def _fetch_option_pages(
    url: str,
    params: Optional[dict],
    out_path: Path,
    ctx,
    is_quote: bool,
) -> None:
    """
    Download every page of an option history endpoint and accumulate it in one CSV.

    Args:
        url: Endpoint URL of the first page.
        params: Query parameters for the first page; follow-up pages are requested
            from the ``Next-Page`` URL without extra parameters.
        out_path: CSV file to create (first page) and append to (later pages).
        ctx: Object with a ``log`` method used to report pagination.
        is_quote: If True, rows stamped 16:00:00 are dropped and ``datetime`` is
            moved to the first column (quote files); OHLC files are left as-is.
    """
    first_page = True
    while url is not None:
        response = httpx.get(url, params=params, timeout=1000)
        response.raise_for_status()

        # Parse the body once: the first row holds the column names.
        rows = list(csv.reader(response.text.split("\n")))
        page_df = pd.DataFrame(rows[1:], columns=rows[0])

        # Everything except the raw date string is numeric
        numeric_columns = page_df.columns.difference(["date"])
        page_df[numeric_columns] = page_df[numeric_columns].astype("float64")

        # Build a 'YYYY-MM-DD HH:MM:SS' datetime from date + ms_of_day
        time_of_day = pd.to_datetime(
            pd.to_numeric(page_df["ms_of_day"]), unit="ms"
        ).dt.time
        date = pd.to_datetime(page_df["date"], format="%Y%m%d").dt.strftime("%Y-%m-%d")
        page_df["datetime"] = pd.to_datetime(date + " " + time_of_day.astype(str))

        # date and ms_of_day are now redundant
        page_df = page_df.drop(columns=["date", "ms_of_day"])

        if is_quote:
            # OHLC data ends at 15:59:00, so drop the 16:00:00 quote rows
            page_df = page_df[
                page_df["datetime"].dt.time != pd.to_datetime("16:00:00").time()
            ]
            # Make datetime the first column
            page_df = page_df[
                ["datetime"] + [col for col in page_df.columns if col != "datetime"]
            ]

        # The first page creates the file (with header); later pages are appended.
        if first_page:
            page_df.to_csv(out_path, index=False)
            first_page = False
        else:
            page_df.to_csv(out_path, mode="a", header=False, index=False)

        # The Next-Page header tells us whether more data is available.
        next_page = response.headers.get("Next-Page")
        if next_page and next_page != "null":
            url = next_page
            params = None
            ctx.log(f"Paginating to {url}")
        else:
            url = None


def load_option_data(
    root: str,
    start_date: str,
    end_date: str,
    exp: str,
    strike: float,
    interval_min: int,
    right: str,
    save_dir: Optional[str] = None,
    clean_up: bool = False,
    offline: bool = False,
    count_ohlc_zeros: bool = False,
    dev_mode: bool = False,
) -> pd.DataFrame:
    """
    Gets historical quote-level data (NBBO) and OHLC (Open High Low Close) from ThetaData API for
    options across multiple exchanges, aggregated by interval_min (lowest resolution: 1min).

    .. note::
        Data from OHLC ends at 15:59:00, while quote data ends at 16:00:00, so for simplicity
        we remove all rows with 16:00:00 in datetime from quote data, before merging quote
        and OHLC data.

    Args:
        root (str): The root symbol of the underlying security.
        start_date (str): The start date of the data in YYYYMMDD format.
        end_date (str): The end date of the data in YYYYMMDD format.
        exp (str): The expiration date of the option in YYYYMMDD format.
        strike (float): The strike price of the option in dollars.
        interval_min (int): The interval in minutes between data points.
        right (str): The type of option, either 'C' for call or 'P' for put.
        save_dir (str): The directory to save the data. Files go to
            ``<save_dir>/options/<root>/<right>/<start_date>_<end_date>/<strike>strike_<exp>exp/``;
            if None, ``<working_dir>/historical_data/options/...`` is used.
        clean_up (bool): Whether to clean up the CSV files after merging. If True, the CSV files are
            saved in a temp folder and then subsequently deleted before returning the df.
        offline (bool): Whether to work in offline mode, using previously saved data.
        count_ohlc_zeros (bool): Whether to log the proportion of zero values in OHLC transactions data.
        dev_mode (bool): Whether to run in development mode.

    Returns:
        pd.DataFrame: Merged DataFrame containing quote-level (NBBO) and OHLC data for the
        specified option, with an added ``mid_price`` column.

    Raises:
        FileNotFoundError: If ``offline`` is True and no previously merged CSV exists.
    """
    # Dollars -> 1/10th cents. round() before int() so binary float error
    # (e.g. 1.005 * 1000 == 1004.9999999999999) cannot truncate to the wrong strike.
    strike_str = str(int(round(strike * 1000)))

    # Set up directory structure
    working_dir = SCRIPT_DIR if dev_mode else Path.cwd()
    if save_dir is None:
        out_dir = working_dir / "historical_data" / "options"
    else:
        out_dir = Path(save_dir) / "options"

    if clean_up and not offline:
        temp_dir = working_dir / "temp" / "options"
        out_dir = Path(temp_dir)

    out_dir = (
        out_dir
        / root
        / right
        / f"{start_date}_{end_date}"
        / f"{strike_str}strike_{exp}exp"
    )
    out_dir.mkdir(parents=True, exist_ok=True)

    # Define file paths
    quote_file_path = out_dir / "quote.csv"
    ohlc_file_path = out_dir / "ohlc.csv"
    merged_file_path = out_dir / "merged.csv"

    # Offline mode: return what was saved by an earlier online run.
    if offline:
        try:
            return pd.read_csv(merged_file_path, parse_dates=["datetime"])
        except FileNotFoundError as err:
            raise FileNotFoundError(
                f"No offline data found at {merged_file_path}. "
                "Please run with offline=False and clean_up=False first to download the required data."
            ) from err

    ctx = Console()
    params = {
        "root": root,
        "exp": exp,
        "strike": strike_str,
        "right": right,
        "start_date": start_date,
        "end_date": end_date,
        "use_csv": "true",
        "ivl": str(interval_min * 60000),  # minutes -> milliseconds
    }

    # Download both series with the same query parameters
    _fetch_option_pages(
        BASE_URL + "/hist/option/quote", params, quote_file_path, ctx, is_quote=True
    )
    _fetch_option_pages(
        BASE_URL + "/hist/option/ohlc", params, ohlc_file_path, ctx, is_quote=False
    )

    # Read CSVs with datetime parsing
    quote_df = pd.read_csv(quote_file_path, parse_dates=["datetime"])
    ohlc_df = pd.read_csv(ohlc_file_path, parse_dates=["datetime"])

    # Merge on datetime to ensure proper alignment
    merged_df = pd.merge(quote_df, ohlc_df, on="datetime", how="inner")

    # Remove any duplicate columns that might exist in both dataframes
    merged_df = merged_df.loc[:, ~merged_df.columns.duplicated()]

    # Drop rows containing NaN (e.g. the blank trailing line of each response)
    merged_df = merged_df.dropna()

    # Calculate regular mid prices
    merged_df["mid_price"] = (merged_df["bid"] + merged_df["ask"]) / 2

    # Convert datetime to pandas datetime if it isn't already
    if not pd.api.types.is_datetime64_ns_dtype(merged_df["datetime"]):
        merged_df["datetime"] = pd.to_datetime(merged_df["datetime"])

    # Handle market open midprices (9:30 AM) separately using OHLC data
    stamps = merged_df["datetime"].dt
    market_open_mask = (
        (stamps.hour == 9) & (stamps.minute == 30) & (stamps.second == 0)
    )
    merged_df.loc[market_open_mask, "mid_price"] = merged_df.loc[
        market_open_mask, ["open", "close"]
    ].mean(axis=1)

    # Back-fill remaining zeros with the next non-zero value. This works by
    # position, so gaps in the index left by dropna() and runs of consecutive
    # zeros are handled; trailing zeros with nothing after them stay zero.
    mid_price = merged_df["mid_price"]
    merged_df["mid_price"] = mid_price.mask(mid_price == 0).bfill().fillna(0)

    # Verify fix
    remaining_zeros = merged_df[merged_df["mid_price"] == 0]
    if not remaining_zeros.empty:
        ctx.log("Still have zeros at:", remaining_zeros.index)

    if count_ohlc_zeros:
        # Report the share of zeros per OHLC column (these are not back-filled)
        for column in ("open", "close", "high", "low"):
            zero_count = (merged_df[column] == 0).sum()
            ctx.log(
                f"Proportion of zeros in the {column}: {zero_count / len(merged_df):.2f}"
            )

    # Either discard the temp folder or persist the merged result
    if clean_up:
        clean_up_dir(temp_dir)
    else:
        merged_df.to_csv(merged_file_path, index=False)

    return merged_df
def _raise_for_misaligned_dates(
    option_df: pd.DataFrame,
    stock_df: pd.DataFrame,
    warning: bool,
) -> None:
    """
    Diagnose why option and stock data differ in length; always raises DataValidationError.

    Args:
        option_df (pd.DataFrame): Option data with a ``datetime`` column.
        stock_df (pd.DataFrame): Stock data with a ``datetime`` column.
        warning (bool): Forwarded unchanged to ``DataValidationError``.

    Raises:
        DataValidationError: With error code INCOMPATIBLE_START_DATE, INCOMPATIBLE_END_DATE,
            MISSING_DATES or UNKNOWN_ERROR, depending on how the timestamps differ.
    """
    # First and last timestamps actually present in the option data
    real_start_date = option_df["datetime"].iloc[0]
    real_end_date = option_df["datetime"].iloc[-1]

    # Stock rows from the option's first timestamp onwards...
    stock_filtered_start = stock_df[
        stock_df["datetime"] >= real_start_date
    ].reset_index(drop=True)
    # ...and additionally no later than the option's last timestamp
    stock_filtered = stock_filtered_start[
        stock_filtered_start["datetime"] <= real_end_date
    ].reset_index(drop=True)

    option_datetimes = option_df["datetime"].to_list()

    # Case 1: trimming the stock data at the front is enough, i.e. the option
    # data starts LATER than the stock data (queried start precedes the contract)
    if option_datetimes == stock_filtered_start["datetime"].to_list():
        real_start_date = real_start_date.strftime("%Y%m%d")
        queried_start_date = stock_df["datetime"].iloc[0].strftime("%Y%m%d")
        raise DataValidationError(
            message=f"Option data queried on start_date={queried_start_date}, but the contract doesn't start until start_date={real_start_date}.",
            error_code=INCOMPATIBLE_START_DATE,
            real_start_date=real_start_date,
            warning=warning,
        )

    # Case 2: the stock data must be trimmed at both ends, i.e. the option data
    # starts later AND ends earlier than the stock data
    if option_datetimes == stock_filtered["datetime"].to_list():
        real_start_date = real_start_date.strftime("%Y%m%d")
        real_end_date = real_end_date.strftime("%Y%m%d")
        queried_start_date = stock_df["datetime"].iloc[0].strftime("%Y%m%d")
        queried_end_date = stock_df["datetime"].iloc[-1].strftime("%Y%m%d")
        raise DataValidationError(
            message=f"Option data queried on start_date={queried_start_date} with exp={queried_end_date}, but the contract starts on start_date={real_start_date} and ends on {real_end_date}.",
            error_code=INCOMPATIBLE_END_DATE,
            real_start_date=real_start_date,
            real_end_date=real_end_date,
            warning=warning,
        )

    # Case 3: option timestamps are a subset of the trimmed stock timestamps,
    # i.e. timestamps are missing somewhere between the first and last option rows
    option_dates_set = set(option_df["datetime"])
    stock_dates_set = set(stock_filtered["datetime"])
    if option_dates_set.issubset(stock_dates_set):
        # Timestamps in option data that don't exist in stock data. Given the
        # subset check above this is empty here; kept for symmetric reporting.
        missing_in_stock = sorted(option_dates_set - stock_dates_set)
        # Timestamps in stock data that don't exist in option data
        missing_in_option = sorted(stock_dates_set - option_dates_set)

        # NOTE(review): the [-3:-1] tail slices below omit the very last
        # timestamp -- confirm whether [-3:] was intended.
        error_message = ""
        if missing_in_stock:
            error_message += f"\nDates in option data missing from stock data: {missing_in_stock[:3]}"
            if len(missing_in_stock) > 3:
                error_message += f" ...{missing_in_stock[-3:-1]}. Total number of missing dates: {len(missing_in_stock)}."
        if missing_in_option:
            error_message += f"\nDates in stock data missing from option data: {missing_in_option[:3]}"
            if len(missing_in_option) > 3:
                error_message += f" ...{missing_in_option[-3:-1]}. Total number of missing dates: {len(missing_in_option)}."

        raise DataValidationError(
            message=error_message
            + "This discrepancy is likely due to anomalous market events.",
            error_code=MISSING_DATES,
            warning=warning,
        )

    # Case 4: none of the known patterns explains the mismatch
    raise DataValidationError(
        message=f"Unknown error. Filtering stock data by start_date={real_start_date} and end_date={real_end_date} did not resolve the issue, and neither is option dates a subset of stock dates.",
        error_code=UNKNOWN_ERROR,
        warning=warning,
    )


def load_all_data(
    root: str,
    start_date: str,
    exp: str,
    interval_min: int,
    right: str,
    strike: float,
    save_dir: Optional[str] = None,
    clean_up: bool = False,
    offline: bool = False,
    warning: bool = False,
    dev_mode: bool = False,
) -> pd.DataFrame:
    """
    Loads option data and underlying stock data, then merges them on ``datetime``.

    Option data comes from ``load_option_data`` and stock data from ``load_stock_data``,
    both queried from ``start_date`` to ``exp`` at ``interval_min`` resolution. The columns
    listed in ``base_columns`` are prefixed with ``option_`` / ``stock_`` before merging.

    .. note::
        Data from OHLC ends at 15:59:00, while quote data ends at 16:00:00, so for simplicity
        rows with 16:00:00 in datetime are removed from quote data before quote and OHLC data
        are merged.

    Args:
        root (str): The root symbol of the underlying security.
        start_date (str): The start date of the data in YYYYMMDD format.
        exp (str): The expiration date of the option in YYYYMMDD format. Also used as the
            end date of both queries.
        interval_min (int): The interval in minutes between data points.
        right (str): The type of option, either 'C' for call or 'P' for put.
        strike (float): The strike price of the option in dollars.
        save_dir (str): Base directory for saved data. The ``combined``, ``options`` and
            ``stocks`` folders are created side by side under it. Defaults to
            ``<working dir>/historical_data``.
        clean_up (bool): Whether to clean up the CSV files after merging. If True, the CSV files
            are saved in a temp folder and then subsequently deleted before returning the df.
        offline (bool): Whether to use offline (already saved) data instead of calling ThetaData
            API directly (default: False).
        warning (bool): Forwarded to ``DataValidationError`` when the option and stock
            timestamps do not line up.
        dev_mode (bool): Whether to run in development mode (uses the module directory rather
            than the current working directory as the working directory).

    Returns:
        pd.DataFrame: The combined data for the option and the underlying.

    Raises:
        FileNotFoundError: If ``offline`` is True and no combined CSV has been saved yet.
        DataValidationError: If option and stock data have different lengths.
    """
    # Set up directory structure
    working_dir = SCRIPT_DIR if dev_mode else Path.cwd()

    # Resolve the base directory once, so that "options" and "stocks" are siblings of
    # "combined" instead of being derived from the already-extended "combined" path.
    base_dir = working_dir / "historical_data" if save_dir is None else Path(save_dir)
    save_dir = base_dir / "combined"
    options_dir = base_dir / "options"
    stocks_dir = base_dir / "stocks"

    # With clean_up, the combined CSV goes to a temp folder that is deleted at the end
    if clean_up and not offline:
        temp_dir = working_dir / "temp" / "combined"
        save_dir = Path(temp_dir)

    save_dir = (
        save_dir / root / right / f"{start_date}_{exp}" / f"{strike}strike_{exp}exp"
    )
    save_dir.mkdir(parents=True, exist_ok=True)

    # Define file paths
    combined_file_path = save_dir / "combined.csv"

    # If offline mode is enabled, read and return the combined data. This assumes data is already saved.
    if offline:
        try:
            return pd.read_csv(combined_file_path, parse_dates=["datetime"])
        except FileNotFoundError:
            raise FileNotFoundError(
                f"No offline data found at {combined_file_path}."
                "Please run with offline=False and clean_up=False first to download the required data."
            )

    option_df = load_option_data(
        root=root,
        start_date=start_date,
        end_date=exp,
        exp=exp,
        strike=strike,
        interval_min=interval_min,
        right=right,
        save_dir=options_dir,
        clean_up=clean_up,
        offline=offline,
        dev_mode=dev_mode,
    )

    stock_df = load_stock_data(
        root=root,
        start_date=start_date,
        end_date=exp,
        interval_min=interval_min,
        save_dir=stocks_dir,
        clean_up=clean_up,
        offline=offline,
        dev_mode=dev_mode,
    )

    # Columns shared by both frames; everything except the merge key gets a prefix
    base_columns = [
        "datetime",
        "mid_price",
        "bid_size",
        "bid_exchange",
        "bid",
        "bid_condition",
        "ask_size",
        "ask_exchange",
        "ask",
        "ask_condition",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "count",
    ]
    option_columns = {col: "option_" + col for col in base_columns if col != "datetime"}
    option_df = option_df.rename(columns=option_columns)

    # Same for stocks
    stock_columns = {col: "stock_" + col for col in base_columns if col != "datetime"}
    stock_df = stock_df.rename(columns=stock_columns)

    # Lengths of option_df and stock_df must match exactly; otherwise diagnose and raise
    if len(option_df) != len(stock_df):
        _raise_for_misaligned_dates(option_df, stock_df, warning)

    # Merge the two dataframes and save
    df = pd.merge(option_df, stock_df, on="datetime")

    # Clean up temp files
    if clean_up:
        clean_up_dir(temp_dir)
    else:
        # Save combined data
        df.to_csv(combined_file_path, index=False)

    return df
if __name__ == "__main__":
    # Manual smoke test for the listing helpers: get_strikes, get_expirations, get_roots
    from rich.console import Console

    console = Console()

    # Flags shared by all three calls below
    shared_flags = {"clean_up": False, "offline": False, "dev_mode": True}

    strikes_df = get_strikes(exp="20240419", root="MSFT", **shared_flags)
    expirations_df = get_expirations(root="AAPL", **shared_flags)
    roots_df = get_roots(**shared_flags)

    # Log the results in the same order they were fetched
    for frame in (strikes_df, expirations_df, roots_df):
        console.log(frame)

    # The remaining manual tests are kept disabled; uncomment one to run it.

    # # Test: find_optimal_exp
    # try:
    #     optimal_exp, optimal_tte = find_optimal_exp(
    #         root="AAPL", start_date="20241107", tte_tolerance=(20, 40), target_tte=37
    #     )
    #     print(f"Found valid TTE: {optimal_tte}")
    #     print(f"Expiration date: {optimal_exp}")
    # except ValueError as e:
    #     print(f"Error: {e}")

    # # Test: find_optimal_strike
    # optimal_strike = find_optimal_strike(
    #     root="AAPL",
    #     # start_date="20241107",
    #     # exp="20241206",
    #     start_date="20230113",
    #     exp="20230209",
    #     right="C",
    #     interval_min=1,
    #     moneyness="ITM",
    #     strike_band=0.10,
    #     hist_vol=0.20,
    #     volatility_scaled=True,
    #     volatility_scalar=2.0,
    #     clean_up=True,
    #     offline=False,
    #     dev_mode=True,
    # )
    # # 20230113 and expiration: 20230209
    # from rich.console import Console
    # console = Console()
    # console.log(
    #     f"Optimal strike of {optimal_strike} found successfully!", style="bold green"
    # )

    # # Test: load_stock_data
    # df = load_stock_data(
    #     root="AAPL",
    #     start_date="20230101",
    #     end_date="20230231",
    #     clean_up=False,
    #     offline=False,
    #     dev_mode=True,
    # )
    # print(df.head())
    # df = load_stock_data_eod(
    #     root="BALL",
    #     start_date="20230101",
    #     end_date="20230231",
    #     clean_up=False,
    #     offline=False,
    #     dev_mode=True,
    # )
    # print(df)

    # # Test: load_option_data
    # df = load_option_data(
    #     root="MSFT",
    #     start_date="20240105",
    #     end_date="20240205",
    #     right="C",
    #     exp="20240419",
    #     strike=400,
    #     interval_min=1,
    #     clean_up=False,
    #     offline=False,
    #     dev_mode=True,
    # )
    # print(df.head())

    # # Test: load_all_data
    # df = load_all_data(
    #     root="AAPL",
    #     start_date="20241107",
    #     exp="20250117",
    #     interval_min=1,
    #     right="C",
    #     strike=225,
    #     clean_up=False,
    #     offline=False,
    #     dev_mode=True,
    # )
    # print(df.head())