-
-
Save alexvorndran/aad69fa741e579aad093608ccaab4fe1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import timeit | |
import numpy as np | |
from numba import jit | |
def hurst(sig: np.ndarray) -> float: | |
"""This is (almost) the original version of the code posted at Code Review | |
(see the question https://codereview.stackexchange.com/q/224360/) | |
(C) CC-BY-SA Matthew Anderson (https://codereview.stackexchange.com/users/204532/) | |
""" | |
n = sig.size # num timesteps | |
t = np.arange(1, n+1) | |
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig) | |
mean_t = y / t # running mean | |
# this is more efficient than: | |
s_t = np.empty(n) # s_t = np.zeros(n) | |
r_t = np.empty(n) # r_t = np.zeros(n) | |
for i in range(n): | |
s_t[i] = np.std(sig[:i+1]) | |
x_t = y - t * mean_t[i] | |
r_t[i] = np.ptp(x_t[:i+1]) | |
r_s = r_t / s_t | |
r_s = np.log(r_s)[1:] | |
n = np.log(t)[1:] | |
a = np.column_stack((n, np.ones(n.size))) | |
# see https://docs.scipy.org/doc/numpy/reference/generated/numpy.linalg.lstsq.html | |
# for an explanation why I did set rcond=-1 | |
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0] | |
return hurst_exponent | |
@jit(nopython=True) | |
def hurst_numba(sig): | |
n = sig.size # num timesteps | |
t = np.arange(1, n+1) | |
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig) | |
mean_t = y / t # running mean | |
# this is more efficient than: | |
s_t = np.empty(n) # s_t = np.zeros(n) | |
r_t = np.empty(n) # r_t = np.zeros(n) | |
for i in range(n): | |
s_t[i] = np.std(sig[:i+1]) | |
x_t = y[:i+1] - t[:i+1] * mean_t[i] | |
r_t[i] = np.ptp(x_t) | |
r_s = r_t / s_t | |
r_s = np.log(r_s)[1:] | |
n = np.log(t)[1:] | |
a = np.column_stack((n, np.ones(n.size))) | |
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0] | |
return hurst_exponent | |
def hurst_vect_loop(sig): | |
n = sig.size # num timesteps | |
t = np.arange(1, n+1) | |
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig) | |
mean_t = y / t # running mean | |
s_t = np.empty(n) | |
r_t = np.empty(n) | |
# x_t_v = y.reshape(1, -1) - mean_t.reshape(-1, 1) @ t.reshape(1, -1) | |
for i in range(n): | |
s_t[i] = np.mean((sig[:i+1] - mean_t[i])**2) | |
r_t[i] = np.ptp(y[:i+1] - t[:i+1] * mean_t[i]) | |
s_t = np.sqrt(s_t) | |
r_s = r_t / s_t | |
r_s = np.log(r_s)[1:] | |
n = np.log(t)[1:] | |
a = np.column_stack((n, np.ones(n.size))) | |
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0] | |
return hurst_exponent | |
def hurst_vect_dbl_lc(sig): | |
n = sig.size # num timesteps | |
t = np.arange(1, n+1) | |
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig) | |
mean_t = y / t # running mean | |
s_t = np.sqrt( | |
np.array([np.mean((sig[:i+1] - mean_t[i])**2) for i in range(n)]) | |
) | |
r_t = np.array([np.ptp(y[:i+1] - t[:i+1] * mean_t[i]) for i in range(n)]) | |
r_s = r_t / s_t | |
r_s = np.log(r_s)[1:] | |
n = np.log(t)[1:] | |
a = np.column_stack((n, np.ones(n.size))) | |
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0] | |
return hurst_exponent | |
@jit(nopython=True) | |
def hurst_vect_loop_numba(sig): | |
n = sig.size # num timesteps | |
t = np.arange(1, n + 1) | |
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig) | |
mean_t = y / t # running mean | |
s_t = np.empty(n) | |
r_t = np.empty(n) | |
for i in range(n): | |
s_t[i] = np.mean((sig[:i + 1] - mean_t[i])**2) | |
r_t[i] = np.ptp(y[:i + 1] - t[:i + 1] * mean_t[i]) | |
s_t = np.sqrt(s_t) | |
r_s = r_t / s_t | |
r_s = np.log(r_s)[1:] | |
n = np.log(t)[1:] | |
a = np.column_stack((n, np.ones(n.size))) | |
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0] | |
return hurst_exponent | |
@jit(nopython=True) | |
def hurst_vect_dbl_lc_numba(sig): | |
n = sig.size # num timesteps | |
t = np.arange(1, n+1) | |
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig) | |
mean_t = y / t # running mean | |
s_t = np.sqrt( | |
np.array([np.mean((sig[:i+1] - mean_t[i])**2) for i in range(n)]) | |
) | |
r_t = np.array([np.ptp(y[:i+1] - t[:i+1] * mean_t[i]) for i in range(n)]) | |
r_s = r_t / s_t | |
r_s = np.log(r_s)[1:] | |
n = np.log(t)[1:] | |
a = np.column_stack((n, np.ones(n.size))) | |
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0] | |
return hurst_exponent | |
def hurst_vect_lc(sig): | |
n = sig.size # num timesteps | |
t = np.arange(1, n+1) | |
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig) | |
mean_t = y / t # running mean | |
rs_t = np.array([ | |
[ | |
np.mean((sig[:i+1] - mean_t[i])**2), | |
np.ptp(y[:i+1] - t[:i+1] * mean_t[i]) | |
] for i in range(n) | |
]) | |
rs_t[:, 0] = np.sqrt(rs_t[:, 0]) | |
r_s = rs_t[:, 1] / rs_t[:, 0] | |
r_s = np.log(r_s)[1:] | |
n = np.log(t)[1:] | |
a = np.column_stack((n, np.ones(n.size))) | |
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0] | |
return hurst_exponent | |
@jit(nopython=True) | |
def hurst_vect_lc_numba(sig): | |
n = sig.size # num timesteps | |
t = np.arange(1, n+1) | |
y = sig.cumsum() # marginally more efficient than: np.cumsum(sig) | |
mean_t = y / t # running mean | |
rs_t = np.array([ | |
[ | |
np.sqrt(np.mean((sig[:i+1] - mean_t[i])**2)), | |
np.ptp(y[:i+1] - t[:i+1] * mean_t[i]) | |
] for i in range(n) | |
]) | |
# rs_t[t-1, 0] = np.sqrt(rs_t[t-1, 0]) | |
r_s = np.empty(n) | |
for i in range(n): | |
r_s[i] = rs_t[i, 1] / rs_t[i, 0] | |
r_s = np.log(r_s)[1:] | |
n = np.log(t)[1:] | |
a = np.column_stack((n, np.ones(n.size))) | |
hurst_exponent, _ = np.linalg.lstsq(a, r_s, rcond=-1)[0] | |
return hurst_exponent | |
def time_hurst(fun, signal, n_runs=10, dry_run=False, | |
aggregator=lambda x: sum(x) / len(x)): | |
"""Timing fun using signal over several runs. | |
Allows to enable a dry run, to allow numba to compile the function without | |
affecting the benchmark. The timings are feed to an aggregator function, | |
where the default is average. | |
""" | |
if dry_run: | |
# this is to allow numba to compile the functions and other funny stuff | |
_ = fun(signal) | |
timings = [] | |
for _ in range(n_runs): | |
t_start = timeit.default_timer() | |
exponent = fun(signal) | |
timings.append(timeit.default_timer() - t_start) | |
return aggregator(timings), exponent | |
def run_benchmark(to_benchmark, n_samples, show_exponent=True): | |
"""Benchmark the different versions against each other""" | |
np.random.seed(224360) | |
signal = np.random.random_sample((n_samples, )) | |
n_runs = 10 | |
for label, variant in to_benchmark: | |
timing, exponent = time_hurst(variant, signal, | |
n_runs=n_runs, dry_run=True) | |
print(f"{label}: {timing:.6f}s") | |
if show_exponent: | |
print(exponent) | |
if __name__ == "__main__": | |
import warnings | |
with warnings.catch_warnings(): | |
warnings.simplefilter("ignore") | |
run_benchmark( | |
(("base", hurst), | |
("better_loop", hurst_vect_loop), | |
("double_lc", hurst_vect_dbl_lc), | |
("single_lc", hurst_vect_lc)), | |
n_samples=15360, | |
show_exponent=False | |
) | |
run_benchmark( | |
(("numba", hurst_numba), | |
("better_loop_numba", hurst_vect_loop_numba), | |
("double_lc_numba", hurst_vect_dbl_lc_numba)), | |
#("single_lc_numba", hurst_vect_lc_numba)), | |
n_samples=15360, | |
show_exponent=False | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment