Create a gist now

Instantly share code, notes, and snippets.

Robust Spline Regression with scikit learn
#!/usr/bin/env python
2D BSpline Regression with Scikitlearn.
import matplotlib.pyplot as plt
import numpy as np
import scipy.interpolate as si
import itertools
from sklearn.base import TransformerMixin
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LinearRegression, RANSACRegressor,\
TheilSenRegressor, HuberRegressor
def main():
# generate test data
npoints = 1000
x = np.random.uniform(low=-1., high=1., size=npoints)
y = np.random.uniform(low=-1., high=1., size=npoints)
z = np.exp(-0.5 * (x**2 + y**2))
# add outliers
z_noise = np.random.uniform(low=0., high=1., size=npoints)
mask_good = z_noise < 0.9
z_noise[mask_good] = 0.
z_noise *= 10
z += z_noise
# prepare BSpline Basis
knotsx = np.linspace(-1, 1., 5)
knotsy = np.linspace(-1, 1., 5)
knots = np.array([knotsx, knotsy])
bspline_features = BSpline2DFeatures(knots)
# prepare grid for prediction
x1d = np.linspace(-1., 1., 100)
y1d = np.linspace(-1., 1., 100)
extent = x1d[0], x1d[-1], y1d[0], y1d[-1]
xgrid, ygrid = np.meshgrid(x1d, y1d, indexing='ij')
X_predict = np.array([xgrid.flatten(), ygrid.flatten()]).T
# assemble feature matrix and construct linear model
X = np.array([x, y]).T
estimators = [('Least-Square', LinearRegression(fit_intercept=False)),
('Theil-Sen', TheilSenRegressor(random_state=42)),
('RANSAC', RANSACRegressor(random_state=42)),
('HuberRegressor', HuberRegressor())]
# prepare plotting
nestimators = len(estimators)
norm = plt.Normalize(0., 1.5)
fig = plt.figure(figsize=(8, 5))
fig.suptitle('Robust 2D B-Spline Regression with SKLearn')
ax = plt.subplot2grid((2, nestimators), (0, 0))
ax.imshow(np.exp(-0.5*(xgrid**2 + ygrid**2)), extent=extent,
origin='lower', norm=norm)
ax.set(title='input function')
ax = plt.subplot2grid((2, nestimators), (0, 1))
ax.scatter(x[mask_good], y[mask_good], s=1, c=z[mask_good], norm=norm)
ax.scatter(x[~mask_good], y[~mask_good], s=5, marker='x', c='black')
ax.set(title='data points')
for iestimator, (label, estimator) in enumerate(estimators):
ax = plt.subplot2grid((2, nestimators), (1, iestimator))
model = make_pipeline(bspline_features, estimator), z)
z_predicted = model.predict(X_predict).reshape(xgrid.shape)
ax.imshow(z_predicted.T, origin='lower', extent=extent, norm=norm)
ax.scatter(x[mask_good], y[mask_good], s=1, c=z[mask_good], norm=norm)
ax.scatter(x[~mask_good], y[~mask_good], s=5, marker='x', c='black')
fig.subplots_adjust(top=0.9, hspace=0.2)
plt.figtext(0.55, 0.6,
r'The exponential function $\exp{-\frac{1}{2}(x^2 + y^2)}$'
'\nis fit with different regression models.'
'\n10% Outliers with value 10 are added\nto the data points'
' (black color).\n'
'The least-square model is strongly\ninfluenced by the '
'outliers, in contrast\nto the robust regression models.',
bbox={'facecolor': 'blue', 'alpha': 0.1, 'pad': 10})
class BSpline2DFeatures(TransformerMixin):
def __init__(self, knots, degree=3, periodic=False):
self.nfeatures = len(knots)
self.knots = knots
self.splines = BsplineND(knots, degree=degree, periodic=periodic)
def fit(self, X, y=None):
return self
def transform(self, X):
nsamples, nfeatures = X.shape
assert nfeatures == self.nfeatures, 'Error with spline dimension'
features = self.splines.evaluate(X.T)
features = features.reshape(self.splines.ncoeffs_all, nsamples).T
return features
class BsplineND():
def __init__(self, knots, degree=3, periodic=False):
:param knots: a list of the spline knots with ndim = len(knots)
self.ndim = len(knots)
self.splines = []
self.knots = knots = degree
self.periodic = periodic
if self.periodic:
self.ncoeffs_valid = [len(knots1d) + 2 for knots1d in self.knots]
self.ncoeffs_valid = [len(knots1d) for knots1d in self.knots]
for idim, knots1d in enumerate(knots):
nknots1d = len(knots1d)
y_dummy = np.zeros(nknots1d)
knots1d, coeffs, degree = si.splrep(knots1d, y_dummy, k=degree,
self.splines.append((knots1d, coeffs, degree))
self.ncoeffs = [len(coeffs) for knots, coeffs, degree in self.splines]
self.ncoeffs_all = np.product(self.ncoeffs_valid)
def evaluate(self, position):
:param position: a numpy array with size [ndim, npoints]
:returns: a numpy array with size [nspl1, nspl2, ..., nsplN, npts]
with the spline basis evaluated at the input points
ndim, npts = position.shape
values_shape = self.ncoeffs_valid + [npts]
values = np.empty(values_shape)
ranges = [range(icoeffs) for icoeffs in self.ncoeffs_valid]
for icoeffs in itertools.product(*ranges):
values_dim = np.empty((ndim, npts))
for idim, icoeff in enumerate(icoeffs):
coeffs = [1.0 if ispl == icoeff else 0.0 for ispl in
values_dim[idim] = si.splev(
(self.splines[idim][0], coeffs,
values[icoeffs] = np.product(values_dim, axis=0)
return values
if __name__ == "__main__":
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment