Skip to content

Instantly share code, notes, and snippets.

@humatic
Last active October 25, 2024 20:53
Show Gist options
  • Save humatic/df255cda1731d5a1bb4bf9af0c744401 to your computer and use it in GitHub Desktop.
Save humatic/df255cda1731d5a1bb4bf9af0c744401 to your computer and use it in GitHub Desktop.
Helper methods for the blog article "Decoding Dynamics: A Quick Guide to SINDy" found at https://humaticlabs.com/blog/sindy-guide/
"""
Description:
Helper methods for the blog article "Decoding Dynamics: A Quick Guide to
SINDy" found at https://humaticlabs.com/blog/sindy-guide/
Name: helpers.py
Version: 1.0.0
Date Created: Oct 24, 2024
Author: Robert Taylor
Email: robert@humaticlabs.com
Website: https://humaticlabs.com/
License: GNU GPLv3
Copyright (c) 2024, Robert Taylor
All rights reserved.
Requirements:
- matplotlib >= 3.9.1
- numpy >= 2.0.0
- scipy >= 1.14.0
- scikit-learn >= 1.5.1
"""
import matplotlib.pyplot as plt
import numpy as np
import time
import warnings
from itertools import product
from scipy.integrate import ode
from sklearn.linear_model import LinearRegression
__all__ = [
'set_figure_resolution',
'lorenz',
'plot_lorenz_3d',
'plot_lorenz_series',
'plot_lorenz_deriv',
'plot_lorenz_deriv_with_est',
'plot_lorenz',
'integrate_lorenz',
'poly_exponents',
'build_poly_label',
'build_theta',
'print_system',
'add_noise',
'get_xi_true',
'masked_linear_regression',
'plot_pareto_lasso',
'plot_pareto_stlsq',
'test_perf',
]
SUPERSCRIPTS = {
0: '⁰', 1: '¹', 2: '²', 3: '³', 4: '⁴',
5: '⁵', 6: '⁶', 7: '⁷', 8: '⁸', 9: '⁹',
}
LORENZ_SIG = 10
LORENZ_RHO = 28
LORENZ_BETA = 8/3
def set_figure_resolution(dpi=300):
"""
Increase the default resolution for PyPlot images.
"""
plt.rcParams['figure.dpi'] = dpi
plt.rcParams['savefig.dpi'] = dpi
def lorenz(t, y, sig=LORENZ_SIG, rho=LORENZ_RHO, beta=LORENZ_BETA):
"""
The right hand side of the Lorenz attractor system (ODE).
"""
return np.array([
sig * (y[1] - y[0]),
y[0] * (rho - y[2]) - y[1],
y[0] * y[1] - beta * y[2],
])
def plot_lorenz_3d(Y, *args, fig=None, figsize=None, **kwargs):
"""
Plot Lorenz in 3D projection.
"""
fig = fig or plt.figure(figsize=figsize)
ax = fig.add_subplot(projection='3d')
ax.plot(Y[0, :], Y[1, :], Y[2, :], *args, **kwargs)
ax.set(xlabel='x', ylabel='y', zlabel='z')
ax.zaxis.labelpad = -3
def plot_lorenz_series(
t_vec,
Y,
lim=True,
labels=None,
name=None,
fig=None,
ax=None,
**kwargs
):
"""
Plot Lorenz time series in 3 stacked figures.
"""
fig = fig or plt.figure(figsize=(10, 5))
ax = fig.subplots(3, 1, sharex=True) if ax is None else ax
# Optionally limit series to last `lim` time units
lim = 10 if lim is True else lim
if isinstance(lim, (int, float)):
ind = (t_vec[-1] - lim) < t_vec
t_vec = t_vec[ind]
Y = Y[:, ind]
labels = labels or ['x', 'y', 'z']
ax[0].plot(t_vec, Y[0], label=name, **kwargs)
ax[0].set(ylabel=labels[0])
ax[1].plot(t_vec, Y[1], **kwargs)
ax[1].set(ylabel=labels[1])
ax[2].plot(t_vec, Y[2], **kwargs)
ax[2].set(ylabel=labels[2], xlabel='t')
return fig, ax
def plot_lorenz_deriv(t_vec, dY, *args, **kwargs):
"""
Plot Lorenz derivative time series.
"""
kwargs.setdefault('labels', ['dx', 'dy', 'dz'])
kwargs.setdefault('color', 'C4')
return plot_lorenz_series(t_vec, dY, *args, **kwargs)
def plot_lorenz_deriv_with_est(t_vec, dX, dX_est, est_label):
"""
Plot an estimated derivative time series compared to the true derivative.
"""
fig, ax = plot_lorenz_deriv(t_vec, dX_est, name=est_label)
fig, ax = plot_lorenz_deriv(
t_vec, dX, name='true', color='#333',
linestyle='dashed', fig=fig, ax=ax)
ax[0].legend(loc='upper right')
mse = ((dX - dX_est)**2).mean()
ax[0].set_title('MSE = %0.3f' % mse)
def plot_lorenz(t_vec, Y, **kwargs):
"""
Plot 3D Lorenz system alongside its time series.
"""
fig = plt.figure(figsize=(12, 4))
subfigs = fig.subfigures(1, 2, wspace=0.01, width_ratios=[1, 2])
lim = kwargs.pop('lim', True)
plot_lorenz_3d(Y[:, :3000], '.', fig=subfigs[0], markersize=1, **kwargs)
plot_lorenz_series(t_vec, Y, lim=lim, fig=subfigs[1])
def integrate_lorenz(t_vec, y0=None):
"""
Integrate Lorenz system over the given time vector using Runge-Kutta.
"""
# Create arrays and set initial condition
Y = np.empty((3, len(t_vec)))
Y[:, 0] = y0 or [1.5961, 0.1859, 4.7297] # initial (x,y,z) coords
# Create explicit Runge-Kutta integrator of order (4)5
r = ode(lorenz).set_integrator('dopri5')
r.set_initial_value(Y[:, 0], t_vec[0])
# Run the integration
for i, t in enumerate(t_vec):
if not r.successful():
break
if i == 0:
continue # skip the initial position
r.integrate(t)
Y[:, i] = r.y
return Y
def poly_exponents(n_feat, max_deg):
"""
Generate all unique polynomial exponents for the given number of features
up to a maximum degree.
"""
def filter_(tup):
return sum(tup) <= max_deg
poly_exp = filter(filter_, product(range(max_deg + 1), repeat=n_feat))
poly_exp = [list(reversed(x)) for x in poly_exp]
return sorted(poly_exp, key=lambda x: sum(x))
def build_poly_label(poly_exp, feats='xyz'):
"""
Build string labels for given combination of exponents.
"""
assert len(poly_exp) == len(feats)
label = ''
for s, e in zip(feats, poly_exp):
if not e:
continue
label += s + SUPERSCRIPTS[e]
return label or '1'
def build_theta(X, n_feat=3, max_deg=5):
"""
Build and return "theta" feature library matrix.
"""
theta = []
theta_cols = []
for exp in poly_exponents(n_feat, max_deg):
candidate = np.ones(X.shape[1], dtype=X.dtype)
for _Y, e in zip(X, exp):
candidate *= np.pow(_Y, e)
label = build_poly_label(exp)
theta.append(candidate)
theta_cols.append(label)
theta = np.stack(theta).T
return theta, theta_cols
def print_system(coefs, col_labels, row_labels=None, thresh=None):
"""
Print the given coefficient matrix as a human-fiendly system of equations.
"""
thresh = 1e-3 if thresh is None else thresh
row_labels = row_labels or ['dx', 'dy', 'dz']
assert len(row_labels) == coefs.shape[0]
assert len(col_labels) == coefs.shape[1]
for i, targ in enumerate(row_labels):
print(f'{targ} = ', end='')
first = True
for j, (name, c) in enumerate(zip(col_labels, coefs[i])):
if abs(c) < thresh:
continue
if first:
op = ''
first = False
else:
op = ' - ' if c < 0 else ' + '
c = abs(c)
val = f'{c:0.3f}'
if name != '1':
val += ' ' + name
print(f'{op}{val}', end='')
print('')
def add_noise(Y, eta):
"""
Add noise to the given signal proportional to each row's standard
deviation. Y is expected to have shape (n_targ, n_samp). `eta` is a
scalar ratio representing the percent noise (e.g. 0.1 for 10% noise).
"""
noise = np.zeros_like(Y)
for i, row in enumerate(Y):
scale = eta * np.std(row)
noise[i] = np.random.normal(loc=0, scale=scale, size=row.shape)
return Y + noise
def get_xi_true(
theta_cols,
sig=LORENZ_SIG,
rho=LORENZ_RHO,
beta=LORENZ_BETA,
):
"""
Build the true (ideal) coefficient matrix "xi" for the Lorenz system.
"""
n_feat = 3
xi_true = np.zeros((n_feat, len(theta_cols)))
def set_(row, col_name, val):
col = theta_cols.index(col_name)
xi_true[row, col] = val
set_(0, 'x¹', -sig)
set_(0, 'y¹', sig)
set_(1, 'x¹', rho)
set_(1, 'x¹z¹', -1)
set_(1, 'y¹', -1)
set_(2, 'x¹y¹', 1)
set_(2, 'z¹', -beta)
return xi_true
def masked_linear_regression(X, y, ind):
"""
Perform simple linear regression using only the given indices of X.
"""
assert X.shape[0] == y.shape[0]
n_targ, n_feat = ind.shape
coef = np.zeros((n_targ, n_feat))
for i in range(n_targ):
ind_i = ind[i]
if np.any(ind_i):
reg = LinearRegression(fit_intercept=False)
coef[i, ind_i] = reg.fit(X[:, ind_i], y[:, i]).coef_
return coef
def plot_pareto_lasso(mse_vec, alpha_vec, log=False):
"""
Plot a Pareto front comparing Lasso's alpha against its MSE.
"""
plt.plot(alpha_vec, mse_vec)
kwargs = dict(xscale='log', yscale='log') if log else {}
plt.gca().set(xlabel='alpha', ylabel='MSE', **kwargs)
def plot_pareto_stlsq(loss_mat, alpha_vec, theshold_vec, log=False):
"""
Plot a Pareto front comparing STLSQ's alpha and threshold against its MSE.
"""
assert loss_mat.shape == (len(alpha_vec), len(theshold_vec))
X, Y = np.meshgrid(alpha_vec, theshold_vec, indexing='ij')
c = plt.pcolor(X, Y, loss_mat, cmap='Reds')
cbar = plt.colorbar(c, ax=plt.gca())
cbar.set_label('MSE')
kwargs = dict(xscale='log', yscale='log') if log else {}
plt.gca().set(xlabel='alpha', ylabel='threshold', **kwargs)
def test_perf(func, loops=10, t_stop=100, max_deg=5):
"""
Measure the average execution time for the given function (LASSO or STLSQ).
"""
t_step = 0.01
t_vec = np.arange(0, t_stop, t_step)
X = integrate_lorenz(t_vec)
dX = lorenz(t_vec, X)
eta = 0.1
X_hat = add_noise(X, eta)
dX_hat = add_noise(dX, eta)
theta, _ = build_theta(X_hat, max_deg=max_deg)
with warnings.catch_warnings():
warnings.simplefilter('ignore')
t0 = time.perf_counter()
for i in range(loops):
func(theta, dX_hat.T)
t1 = time.perf_counter()
avg = (t1 - t0) / loops
return avg, theta.shape[0], theta.shape[1]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment