Skip to content

Instantly share code, notes, and snippets.

@alexvorndran alexvorndran/hurst_exponent.py Secret
Created Jul 17, 2019

Embed
What would you like to do?
import timeit
import numpy as np
from numba import jit
def hurst(sig: np.ndarray) -> float:
"""This is (almost) the original version of the code posted at Code Review
(see the question https://codereview.stackexchange.com/q/224360/)
(C) CC-BY-SA Matthew Anderson (https://codereview.stackexchange.com/users/204532/)
"""
n = sig.size # num timesteps
t = np.arange(1, n+1)
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig)
mean_t = y / t # running mean
# this is more efficient than:
s_t = np.empty(n) # s_t = np.zeros(n)
r_t = np.empty(n) # r_t = np.zeros(n)
for i in range(n):
s_t[i] = np.std(sig[:i+1])
x_t = y - t * mean_t[i]
r_t[i] = np.ptp(x_t[:i+1])
r_s = r_t / s_t
r_s = np.log(r_s)[1:]
n = np.log(t)[1:]
a = np.column_stack((n, np.ones(n.size)))
# see https://docs.scipy.org/doc/numpy/reference/generated/numpy.linalg.lstsq.html
# for an explanation why I did set rcond=-1
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0]
return hurst_exponent
@jit(nopython=True)
def hurst_numba(sig):
n = sig.size # num timesteps
t = np.arange(1, n+1)
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig)
mean_t = y / t # running mean
# this is more efficient than:
s_t = np.empty(n) # s_t = np.zeros(n)
r_t = np.empty(n) # r_t = np.zeros(n)
for i in range(n):
s_t[i] = np.std(sig[:i+1])
x_t = y[:i+1] - t[:i+1] * mean_t[i]
r_t[i] = np.ptp(x_t)
r_s = r_t / s_t
r_s = np.log(r_s)[1:]
n = np.log(t)[1:]
a = np.column_stack((n, np.ones(n.size)))
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0]
return hurst_exponent
def hurst_vect_loop(sig):
n = sig.size # num timesteps
t = np.arange(1, n+1)
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig)
mean_t = y / t # running mean
s_t = np.empty(n)
r_t = np.empty(n)
# x_t_v = y.reshape(1, -1) - mean_t.reshape(-1, 1) @ t.reshape(1, -1)
for i in range(n):
s_t[i] = np.mean((sig[:i+1] - mean_t[i])**2)
r_t[i] = np.ptp(y[:i+1] - t[:i+1] * mean_t[i])
s_t = np.sqrt(s_t)
r_s = r_t / s_t
r_s = np.log(r_s)[1:]
n = np.log(t)[1:]
a = np.column_stack((n, np.ones(n.size)))
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0]
return hurst_exponent
def hurst_vect_dbl_lc(sig):
n = sig.size # num timesteps
t = np.arange(1, n+1)
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig)
mean_t = y / t # running mean
s_t = np.sqrt(
np.array([np.mean((sig[:i+1] - mean_t[i])**2) for i in range(n)])
)
r_t = np.array([np.ptp(y[:i+1] - t[:i+1] * mean_t[i]) for i in range(n)])
r_s = r_t / s_t
r_s = np.log(r_s)[1:]
n = np.log(t)[1:]
a = np.column_stack((n, np.ones(n.size)))
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0]
return hurst_exponent
@jit(nopython=True)
def hurst_vect_loop_numba(sig):
n = sig.size # num timesteps
t = np.arange(1, n + 1)
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig)
mean_t = y / t # running mean
s_t = np.empty(n)
r_t = np.empty(n)
for i in range(n):
s_t[i] = np.mean((sig[:i + 1] - mean_t[i])**2)
r_t[i] = np.ptp(y[:i + 1] - t[:i + 1] * mean_t[i])
s_t = np.sqrt(s_t)
r_s = r_t / s_t
r_s = np.log(r_s)[1:]
n = np.log(t)[1:]
a = np.column_stack((n, np.ones(n.size)))
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0]
return hurst_exponent
@jit(nopython=True)
def hurst_vect_dbl_lc_numba(sig):
n = sig.size # num timesteps
t = np.arange(1, n+1)
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig)
mean_t = y / t # running mean
s_t = np.sqrt(
np.array([np.mean((sig[:i+1] - mean_t[i])**2) for i in range(n)])
)
r_t = np.array([np.ptp(y[:i+1] - t[:i+1] * mean_t[i]) for i in range(n)])
r_s = r_t / s_t
r_s = np.log(r_s)[1:]
n = np.log(t)[1:]
a = np.column_stack((n, np.ones(n.size)))
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0]
return hurst_exponent
def hurst_vect_lc(sig):
n = sig.size # num timesteps
t = np.arange(1, n+1)
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig)
mean_t = y / t # running mean
rs_t = np.array([
[
np.mean((sig[:i+1] - mean_t[i])**2),
np.ptp(y[:i+1] - t[:i+1] * mean_t[i])
] for i in range(n)
])
rs_t[:, 0] = np.sqrt(rs_t[:, 0])
r_s = rs_t[:, 1] / rs_t[:, 0]
r_s = np.log(r_s)[1:]
n = np.log(t)[1:]
a = np.column_stack((n, np.ones(n.size)))
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0]
return hurst_exponent
@jit(nopython=True)
def hurst_vect_lc_numba(sig):
n = sig.size # num timesteps
t = np.arange(1, n+1)
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig)
mean_t = y / t # running mean
rs_t = np.array([
[
np.sqrt(np.mean((sig[:i+1] - mean_t[i])**2)),
np.ptp(y[:i+1] - t[:i+1] * mean_t[i])
] for i in range(n)
])
# rs_t[t-1, 0] = np.sqrt(rs_t[t-1, 0])
r_s = np.empty(n)
for i in range(n):
r_s[i] = rs_t[i, 1] / rs_t[i, 0]
r_s = np.log(r_s)[1:]
n = np.log(t)[1:]
a = np.column_stack((n, np.ones(n.size)))
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0]
return hurst_exponent
def time_hurst(fun, signal, n_runs=10, dry_run=False,
aggregator=lambda x: sum(x) / len(x)):
"""Timing fun using signal over several runs.
Allows to enable a dry run, to allow numba to compile the function without
affecting the benchmark. The timings are feed to an aggregator function,
where the default is average.
"""
if dry_run:
# this is to allow numba to compile the functions and other funny stuff
_ = fun(signal)
timings = []
for _ in range(n_runs):
t_start = timeit.default_timer()
exponent = fun(signal)
timings.append(timeit.default_timer() - t_start)
return aggregator(timings), exponent
def run_benchmark(to_benchmark, n_samples, show_exponent=True):
"""Benchmark the different versions against each other"""
np.random.seed(224360)
signal = np.random.random_sample((n_samples, ))
n_runs = 10
for label, variant in to_benchmark:
timing, exponent = time_hurst(variant, signal,
n_runs=n_runs, dry_run=True)
print(f"{label}: {timing:.6f}s")
if show_exponent:
print(exponent)
if __name__ == "__main__":
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
run_benchmark(
(("base", hurst),
("better_loop", hurst_vect_loop),
("double_lc", hurst_vect_dbl_lc),
("single_lc", hurst_vect_lc)),
n_samples=15360,
show_exponent=False
)
run_benchmark(
(("numba", hurst_numba),
("better_loop_numba", hurst_vect_loop_numba),
("double_lc_numba", hurst_vect_dbl_lc_numba)),
#("single_lc_numba", hurst_vect_lc_numba)),
n_samples=15360,
show_exponent=False
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.