Skip to content

Instantly share code, notes, and snippets.

@geovedi
Created November 29, 2023 06:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save geovedi/613b27a57adec2bc2a606c83bad2ebde to your computer and use it in GitHub Desktop.
Save geovedi/613b27a57adec2bc2a606c83bad2ebde to your computer and use it in GitHub Desktop.
How to make rolling hurst function faster — TLDR: Use Numba!
import pandas as pd
import numpy as np
from typing import Union, Callable
def generate_ohlcv_data(
start_date: str, end_date: str, freq: str = "D", seed: int = None
) -> pd.DataFrame:
if seed is not None:
np.random.seed(seed)
dates = pd.date_range(start=start_date, end=end_date, freq=freq)
open_prices = np.random.rand(len(dates)) * 100 # Random open prices
close_prices = (
open_prices + np.random.randn(len(dates)) * 10
) # Close prices near open
high_prices = np.maximum(open_prices, close_prices) + np.random.rand(len(dates)) * 5
low_prices = np.minimum(open_prices, close_prices) - np.random.rand(len(dates)) * 5
volume = np.random.randint(100, 10000, len(dates)) # Random volume
data = {
"open": open_prices,
"high": high_prices,
"low": low_prices,
"close": close_prices,
"volume": volume,
}
return pd.DataFrame(data, index=dates)
# smidelis --
# https://discord.com/channels/700048804539400213/785934927011119124/1174743574953340940
def smidelis_hurst(ts, max_lag=20):
"""Compute the Hurst Exponent of the time series vector ts"""
lags = range(2, max_lag)
tau = [np.std(np.subtract(ts[lag:], ts[:-lag])) for lag in lags]
return np.polyfit(np.log(lags), np.log(tau), 1)[0]
def smidelis_rolling_hurst(dataframe, window=128, max_lag=20, column="close"):
"""Compute rolling Hurst Exponent"""
hurst_exp = (
dataframe[column]
.rolling(window=window)
.apply(lambda x: smidelis_hurst(x, max_lag=max_lag), raw=True)
)
return hurst_exp
# basic
def basic_hurst(ts: np.ndarray, max_lag: int = 20) -> float:
lags = np.arange(2, max_lag)
tau = [np.std(ts[lag:] - ts[:-lag]) for lag in lags]
tau_filtered = np.array(tau)[np.array(tau) > 0]
lags_filtered = lags[: len(tau_filtered)]
if len(tau_filtered) == 0 or np.any(tau_filtered <= 0):
return np.nan
return np.polyfit(np.log(lags_filtered), np.log(tau_filtered), 1)[0]
def basic_rolling_hurst(
dataframe: pd.DataFrame, window: int = 128, max_lag: int = 20, column: str = "close"
) -> pd.Series:
series = dataframe[column]
result = np.full(len(series), np.nan)
for start in range(len(series) - window + 1):
window_ts = series.iloc[start : start + window]
if (
not window_ts.isna().any() and np.ptp(window_ts) > 0
): # Check for NaN and non-zero range
result[start + window - 1] = basic_hurst(window_ts.values, max_lag)
return pd.Series(result, index=series.index)
# test_01
def test_01_hurst(ts: np.ndarray, max_lag: int = 20) -> float:
lags = np.arange(2, max_lag)
tau = [np.std(np.subtract(ts[lag:], ts[:-lag])) for lag in lags]
return np.polyfit(np.log(lags), np.log(tau), 1)[0]
def test_01_rolling_hurst(
dataframe: pd.DataFrame, window: int = 128, max_lag: int = 20, column: str = "close"
) -> np.ndarray:
stride_size = dataframe[column].values.strides[0]
rolling_views = np.lib.stride_tricks.as_strided(
dataframe[column].values,
shape=(dataframe[column].size - window + 1, window),
strides=(stride_size, stride_size),
)
hurst_exp = np.apply_along_axis(test_01_hurst, 1, rolling_views, max_lag=max_lag)
return pd.Series(
np.concatenate([np.full(window - 1, np.nan), hurst_exp]), index=dataframe.index
)
# test_02
def test_02_rolling_apply(df: pd.Series, window: int, func: Callable) -> np.ndarray:
result = np.full(df.size, np.nan)
for start in range(df.size - window + 1):
window_data = df.iloc[start : start + window].values
if not np.any(np.isnan(window_data)):
result[start + window - 1] = func(window_data)
return result
def test_02_rolling_hurst(
dataframe: pd.DataFrame, window: int = 128, max_lag: int = 20, column: str = "close"
) -> pd.Series:
hurst_exp = test_02_rolling_apply(
dataframe[column], window, lambda x: test_01_hurst(x, max_lag)
)
return pd.Series(hurst_exp, index=dataframe.index)
# test_03
def test_03_rolling_hurst(
dataframe: pd.DataFrame, window: int = 128, max_lag: int = 20, column: str = "close"
) -> pd.Series:
def apply_hurst_on_window(window_data: np.ndarray) -> float:
if not np.isnan(window_data).any():
return test_01_hurst(window_data, max_lag=max_lag)
else:
return np.nan
hurst_exp = (
dataframe[column]
.rolling(window=window, min_periods=window)
.apply(apply_hurst_on_window, raw=True)
)
return hurst_exp
# optimized with numba
from numba import jit
@jit(nopython=True)
def fast_hurst(ts, max_lag, precomputed_log_lags):
tau = np.empty(max_lag - 2)
for idx, lag in enumerate(range(2, max_lag)):
lagged_diff = ts[lag:] - ts[:-lag]
tau[idx] = np.std(lagged_diff)
valid_tau = tau[tau > 0]
if len(valid_tau) == 0:
return np.nan
# Simple linear regression calculation -- Numba doesn't support all NumPy functions
n = len(valid_tau)
sum_x = precomputed_log_lags[:n].sum()
sum_y = np.log(valid_tau).sum()
sum_xy = (precomputed_log_lags[:n] * np.log(valid_tau)).sum()
sum_x_squared = (precomputed_log_lags[:n] ** 2).sum()
m = (n * sum_xy - sum_x * sum_y) / (n * sum_x_squared - sum_x**2)
return m
def optimized_rolling_hurst(
dataframe: pd.DataFrame, window: int = 128, max_lag: int = 20, column: str = "close"
) -> pd.Series:
series = dataframe[column].fillna(method="ffill")
result = np.full(len(series), np.nan)
precomputed_log_lags = np.log(np.arange(2, max_lag))
for start in range(len(series) - window + 1):
window_ts = series.values[start : start + window]
if np.ptp(window_ts) > 0:
result[start + window - 1] = fast_hurst(
window_ts, max_lag, precomputed_log_lags
)
return pd.Series(result, index=dataframe.index)
def compare_series(series1, series2, tol=1e-5):
return np.isclose(series1, series2, equal_nan=True, atol=tol).all()
"""
df = generate_ohlcv_data('2021-01-01', '2021-12-31', 'D')
x0 = smidelis_rolling_hurst(df)
x1 = test_01_rolling_hurst(df)
x2 = test_02_rolling_hurst(df)
x3 = test_03_rolling_hurst(df)
x4 = optimized_rolling_hurst(df)
list(map(compare_series, [x0, x1, x2, x3, x4], [x0, x0, x0, x0, x0]))
In [195]: %timeit smidelis_rolling_hurst(df)
33.7 ms ± 24.2 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [196]: %timeit test_01_rolling_hurst(df)
32.8 ms ± 45.5 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [197]: %timeit test_02_rolling_hurst(df)
36.6 ms ± 39.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [198]: %timeit test_03_rolling_hurst(df)
33.7 ms ± 48.3 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [199]: %timeit optimized_rolling_hurst(df)
1.9 ms ± 3.63 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment