Last active
October 25, 2024 20:53
-
-
Save humatic/df255cda1731d5a1bb4bf9af0c744401 to your computer and use it in GitHub Desktop.
Helper methods for the blog article "Decoding Dynamics: A Quick Guide to SINDy" found at https://humaticlabs.com/blog/sindy-guide/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Description: | |
Helper methods for the blog article "Decoding Dynamics: A Quick Guide to | |
SINDy" found at https://humaticlabs.com/blog/sindy-guide/ | |
Name: helpers.py | |
Version: 1.0.0 | |
Date Created: Oct 24, 2024 | |
Author: Robert Taylor | |
Email: robert@humaticlabs.com | |
Website: https://humaticlabs.com/ | |
License: GNU GPLv3 | |
Copyright (c) 2024, Robert Taylor | |
All rights reserved. | |
Requirements: | |
- matplotlib >= 3.9.1 | |
- numpy >= 2.0.0 | |
- scipy >= 1.14.0 | |
- scikit-learn >= 1.5.1 | |
""" | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import time | |
import warnings | |
from itertools import product | |
from scipy.integrate import ode | |
from sklearn.linear_model import LinearRegression | |
__all__ = [ | |
'set_figure_resolution', | |
'lorenz', | |
'plot_lorenz_3d', | |
'plot_lorenz_series', | |
'plot_lorenz_deriv', | |
'plot_lorenz_deriv_with_est', | |
'plot_lorenz', | |
'integrate_lorenz', | |
'poly_exponents', | |
'build_poly_label', | |
'build_theta', | |
'print_system', | |
'add_noise', | |
'get_xi_true', | |
'masked_linear_regression', | |
'plot_pareto_lasso', | |
'plot_pareto_stlsq', | |
'test_perf', | |
] | |
SUPERSCRIPTS = { | |
0: '⁰', 1: '¹', 2: '²', 3: '³', 4: '⁴', | |
5: '⁵', 6: '⁶', 7: '⁷', 8: '⁸', 9: '⁹', | |
} | |
LORENZ_SIG = 10 | |
LORENZ_RHO = 28 | |
LORENZ_BETA = 8/3 | |
def set_figure_resolution(dpi=300): | |
""" | |
Increase the default resolution for PyPlot images. | |
""" | |
plt.rcParams['figure.dpi'] = dpi | |
plt.rcParams['savefig.dpi'] = dpi | |
def lorenz(t, y, sig=LORENZ_SIG, rho=LORENZ_RHO, beta=LORENZ_BETA): | |
""" | |
The right hand side of the Lorenz attractor system (ODE). | |
""" | |
return np.array([ | |
sig * (y[1] - y[0]), | |
y[0] * (rho - y[2]) - y[1], | |
y[0] * y[1] - beta * y[2], | |
]) | |
def plot_lorenz_3d(Y, *args, fig=None, figsize=None, **kwargs): | |
""" | |
Plot Lorenz in 3D projection. | |
""" | |
fig = fig or plt.figure(figsize=figsize) | |
ax = fig.add_subplot(projection='3d') | |
ax.plot(Y[0, :], Y[1, :], Y[2, :], *args, **kwargs) | |
ax.set(xlabel='x', ylabel='y', zlabel='z') | |
ax.zaxis.labelpad = -3 | |
def plot_lorenz_series( | |
t_vec, | |
Y, | |
lim=True, | |
labels=None, | |
name=None, | |
fig=None, | |
ax=None, | |
**kwargs | |
): | |
""" | |
Plot Lorenz time series in 3 stacked figures. | |
""" | |
fig = fig or plt.figure(figsize=(10, 5)) | |
ax = fig.subplots(3, 1, sharex=True) if ax is None else ax | |
# Optionally limit series to last `lim` time units | |
lim = 10 if lim is True else lim | |
if isinstance(lim, (int, float)): | |
ind = (t_vec[-1] - lim) < t_vec | |
t_vec = t_vec[ind] | |
Y = Y[:, ind] | |
labels = labels or ['x', 'y', 'z'] | |
ax[0].plot(t_vec, Y[0], label=name, **kwargs) | |
ax[0].set(ylabel=labels[0]) | |
ax[1].plot(t_vec, Y[1], **kwargs) | |
ax[1].set(ylabel=labels[1]) | |
ax[2].plot(t_vec, Y[2], **kwargs) | |
ax[2].set(ylabel=labels[2], xlabel='t') | |
return fig, ax | |
def plot_lorenz_deriv(t_vec, dY, *args, **kwargs): | |
""" | |
Plot Lorenz derivative time series. | |
""" | |
kwargs.setdefault('labels', ['dx', 'dy', 'dz']) | |
kwargs.setdefault('color', 'C4') | |
return plot_lorenz_series(t_vec, dY, *args, **kwargs) | |
def plot_lorenz_deriv_with_est(t_vec, dX, dX_est, est_label): | |
""" | |
Plot an estimated derivative time series compared to the true derivative. | |
""" | |
fig, ax = plot_lorenz_deriv(t_vec, dX_est, name=est_label) | |
fig, ax = plot_lorenz_deriv( | |
t_vec, dX, name='true', color='#333', | |
linestyle='dashed', fig=fig, ax=ax) | |
ax[0].legend(loc='upper right') | |
mse = ((dX - dX_est)**2).mean() | |
ax[0].set_title('MSE = %0.3f' % mse) | |
def plot_lorenz(t_vec, Y, **kwargs): | |
""" | |
Plot 3D Lorenz system alongside its time series. | |
""" | |
fig = plt.figure(figsize=(12, 4)) | |
subfigs = fig.subfigures(1, 2, wspace=0.01, width_ratios=[1, 2]) | |
lim = kwargs.pop('lim', True) | |
plot_lorenz_3d(Y[:, :3000], '.', fig=subfigs[0], markersize=1, **kwargs) | |
plot_lorenz_series(t_vec, Y, lim=lim, fig=subfigs[1]) | |
def integrate_lorenz(t_vec, y0=None): | |
""" | |
Integrate Lorenz system over the given time vector using Runge-Kutta. | |
""" | |
# Create arrays and set initial condition | |
Y = np.empty((3, len(t_vec))) | |
Y[:, 0] = y0 or [1.5961, 0.1859, 4.7297] # initial (x,y,z) coords | |
# Create explicit Runge-Kutta integrator of order (4)5 | |
r = ode(lorenz).set_integrator('dopri5') | |
r.set_initial_value(Y[:, 0], t_vec[0]) | |
# Run the integration | |
for i, t in enumerate(t_vec): | |
if not r.successful(): | |
break | |
if i == 0: | |
continue # skip the initial position | |
r.integrate(t) | |
Y[:, i] = r.y | |
return Y | |
def poly_exponents(n_feat, max_deg): | |
""" | |
Generate all unique polynomial exponents for the given number of features | |
up to a maximum degree. | |
""" | |
def filter_(tup): | |
return sum(tup) <= max_deg | |
poly_exp = filter(filter_, product(range(max_deg + 1), repeat=n_feat)) | |
poly_exp = [list(reversed(x)) for x in poly_exp] | |
return sorted(poly_exp, key=lambda x: sum(x)) | |
def build_poly_label(poly_exp, feats='xyz'): | |
""" | |
Build string labels for given combination of exponents. | |
""" | |
assert len(poly_exp) == len(feats) | |
label = '' | |
for s, e in zip(feats, poly_exp): | |
if not e: | |
continue | |
label += s + SUPERSCRIPTS[e] | |
return label or '1' | |
def build_theta(X, n_feat=3, max_deg=5): | |
""" | |
Build and return "theta" feature library matrix. | |
""" | |
theta = [] | |
theta_cols = [] | |
for exp in poly_exponents(n_feat, max_deg): | |
candidate = np.ones(X.shape[1], dtype=X.dtype) | |
for _Y, e in zip(X, exp): | |
candidate *= np.pow(_Y, e) | |
label = build_poly_label(exp) | |
theta.append(candidate) | |
theta_cols.append(label) | |
theta = np.stack(theta).T | |
return theta, theta_cols | |
def print_system(coefs, col_labels, row_labels=None, thresh=None): | |
""" | |
Print the given coefficient matrix as a human-fiendly system of equations. | |
""" | |
thresh = 1e-3 if thresh is None else thresh | |
row_labels = row_labels or ['dx', 'dy', 'dz'] | |
assert len(row_labels) == coefs.shape[0] | |
assert len(col_labels) == coefs.shape[1] | |
for i, targ in enumerate(row_labels): | |
print(f'{targ} = ', end='') | |
first = True | |
for j, (name, c) in enumerate(zip(col_labels, coefs[i])): | |
if abs(c) < thresh: | |
continue | |
if first: | |
op = '' | |
first = False | |
else: | |
op = ' - ' if c < 0 else ' + ' | |
c = abs(c) | |
val = f'{c:0.3f}' | |
if name != '1': | |
val += ' ' + name | |
print(f'{op}{val}', end='') | |
print('') | |
def add_noise(Y, eta): | |
""" | |
Add noise to the given signal proportional to each row's standard | |
deviation. Y is expected to have shape (n_targ, n_samp). `eta` is a | |
scalar ratio representing the percent noise (e.g. 0.1 for 10% noise). | |
""" | |
noise = np.zeros_like(Y) | |
for i, row in enumerate(Y): | |
scale = eta * np.std(row) | |
noise[i] = np.random.normal(loc=0, scale=scale, size=row.shape) | |
return Y + noise | |
def get_xi_true( | |
theta_cols, | |
sig=LORENZ_SIG, | |
rho=LORENZ_RHO, | |
beta=LORENZ_BETA, | |
): | |
""" | |
Build the true (ideal) coefficient matrix "xi" for the Lorenz system. | |
""" | |
n_feat = 3 | |
xi_true = np.zeros((n_feat, len(theta_cols))) | |
def set_(row, col_name, val): | |
col = theta_cols.index(col_name) | |
xi_true[row, col] = val | |
set_(0, 'x¹', -sig) | |
set_(0, 'y¹', sig) | |
set_(1, 'x¹', rho) | |
set_(1, 'x¹z¹', -1) | |
set_(1, 'y¹', -1) | |
set_(2, 'x¹y¹', 1) | |
set_(2, 'z¹', -beta) | |
return xi_true | |
def masked_linear_regression(X, y, ind): | |
""" | |
Perform simple linear regression using only the given indices of X. | |
""" | |
assert X.shape[0] == y.shape[0] | |
n_targ, n_feat = ind.shape | |
coef = np.zeros((n_targ, n_feat)) | |
for i in range(n_targ): | |
ind_i = ind[i] | |
if np.any(ind_i): | |
reg = LinearRegression(fit_intercept=False) | |
coef[i, ind_i] = reg.fit(X[:, ind_i], y[:, i]).coef_ | |
return coef | |
def plot_pareto_lasso(mse_vec, alpha_vec, log=False): | |
""" | |
Plot a Pareto front comparing Lasso's alpha against its MSE. | |
""" | |
plt.plot(alpha_vec, mse_vec) | |
kwargs = dict(xscale='log', yscale='log') if log else {} | |
plt.gca().set(xlabel='alpha', ylabel='MSE', **kwargs) | |
def plot_pareto_stlsq(loss_mat, alpha_vec, theshold_vec, log=False): | |
""" | |
Plot a Pareto front comparing STLSQ's alpha and threshold against its MSE. | |
""" | |
assert loss_mat.shape == (len(alpha_vec), len(theshold_vec)) | |
X, Y = np.meshgrid(alpha_vec, theshold_vec, indexing='ij') | |
c = plt.pcolor(X, Y, loss_mat, cmap='Reds') | |
cbar = plt.colorbar(c, ax=plt.gca()) | |
cbar.set_label('MSE') | |
kwargs = dict(xscale='log', yscale='log') if log else {} | |
plt.gca().set(xlabel='alpha', ylabel='threshold', **kwargs) | |
def test_perf(func, loops=10, t_stop=100, max_deg=5): | |
""" | |
Measure the average execution time for the given function (LASSO or STLSQ). | |
""" | |
t_step = 0.01 | |
t_vec = np.arange(0, t_stop, t_step) | |
X = integrate_lorenz(t_vec) | |
dX = lorenz(t_vec, X) | |
eta = 0.1 | |
X_hat = add_noise(X, eta) | |
dX_hat = add_noise(dX, eta) | |
theta, _ = build_theta(X_hat, max_deg=max_deg) | |
with warnings.catch_warnings(): | |
warnings.simplefilter('ignore') | |
t0 = time.perf_counter() | |
for i in range(loops): | |
func(theta, dX_hat.T) | |
t1 = time.perf_counter() | |
avg = (t1 - t0) / loops | |
return avg, theta.shape[0], theta.shape[1] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment