Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Robust Spline Regression with scikit learn
#!/usr/bin/env python
"""
2D BSpline Regression with Scikitlearn.
"""
import matplotlib.pyplot as plt
import numpy as np
import scipy.interpolate as si
import itertools
from sklearn.base import TransformerMixin
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LinearRegression, RANSACRegressor,\
TheilSenRegressor, HuberRegressor
def main():
# generate test data
npoints = 1000
x = np.random.uniform(low=-1., high=1., size=npoints)
y = np.random.uniform(low=-1., high=1., size=npoints)
z = np.exp(-0.5 * (x**2 + y**2))
# add outliers
z_noise = np.random.uniform(low=0., high=1., size=npoints)
mask_good = z_noise < 0.9
z_noise[mask_good] = 0.
z_noise *= 10
z += z_noise
# prepare BSpline Basis
knotsx = np.linspace(-1, 1., 5)
knotsy = np.linspace(-1, 1., 5)
knots = np.array([knotsx, knotsy])
bspline_features = BSpline2DFeatures(knots)
# prepare grid for prediction
x1d = np.linspace(-1., 1., 100)
y1d = np.linspace(-1., 1., 100)
extent = x1d[0], x1d[-1], y1d[0], y1d[-1]
xgrid, ygrid = np.meshgrid(x1d, y1d, indexing='ij')
X_predict = np.array([xgrid.flatten(), ygrid.flatten()]).T
# assemble feature matrix and construct linear model
X = np.array([x, y]).T
estimators = [('Least-Square', LinearRegression(fit_intercept=False)),
('Theil-Sen', TheilSenRegressor(random_state=42)),
('RANSAC', RANSACRegressor(random_state=42)),
('HuberRegressor', HuberRegressor())]
# prepare plotting
nestimators = len(estimators)
norm = plt.Normalize(0., 1.5)
fig = plt.figure(figsize=(8, 5))
fig.suptitle('Robust 2D B-Spline Regression with SKLearn')
ax = plt.subplot2grid((2, nestimators), (0, 0))
ax.imshow(np.exp(-0.5*(xgrid**2 + ygrid**2)), extent=extent,
origin='lower', norm=norm)
ax.set(title='input function')
ax = plt.subplot2grid((2, nestimators), (0, 1))
ax.scatter(x[mask_good], y[mask_good], s=1, c=z[mask_good], norm=norm)
ax.scatter(x[~mask_good], y[~mask_good], s=5, marker='x', c='black')
ax.set_aspect(1.0)
ax.set(title='data points')
for iestimator, (label, estimator) in enumerate(estimators):
ax = plt.subplot2grid((2, nestimators), (1, iestimator))
model = make_pipeline(bspline_features, estimator)
model.fit(X, z)
z_predicted = model.predict(X_predict).reshape(xgrid.shape)
ax.imshow(z_predicted.T, origin='lower', extent=extent, norm=norm)
ax.scatter(x[mask_good], y[mask_good], s=1, c=z[mask_good], norm=norm)
ax.scatter(x[~mask_good], y[~mask_good], s=5, marker='x', c='black')
ax.set(title=label)
fig.subplots_adjust(top=0.9, hspace=0.2)
plt.figtext(0.55, 0.6,
r'The exponential function $\exp{-\frac{1}{2}(x^2 + y^2)}$'
'\nis fit with different regression models.'
'\n10% Outliers with value 10 are added\nto the data points'
' (black color).\n'
'The least-square model is strongly\ninfluenced by the '
'outliers, in contrast\nto the robust regression models.',
style='italic',
bbox={'facecolor': 'blue', 'alpha': 0.1, 'pad': 10})
plt.show()
class BSpline2DFeatures(TransformerMixin):
def __init__(self, knots, degree=3, periodic=False):
self.nfeatures = len(knots)
self.knots = knots
self.splines = BsplineND(knots, degree=degree, periodic=periodic)
def fit(self, X, y=None):
return self
def transform(self, X):
nsamples, nfeatures = X.shape
assert nfeatures == self.nfeatures, 'Error with spline dimension'
features = self.splines.evaluate(X.T)
features = features.reshape(self.splines.ncoeffs_all, nsamples).T
return features
class BsplineND():
def __init__(self, knots, degree=3, periodic=False):
"""
:param knots: a list of the spline knots with ndim = len(knots)
"""
self.ndim = len(knots)
self.splines = []
self.knots = knots
self.degree = degree
self.periodic = periodic
if self.periodic:
self.ncoeffs_valid = [len(knots1d) + 2 for knots1d in self.knots]
else:
self.ncoeffs_valid = [len(knots1d) for knots1d in self.knots]
for idim, knots1d in enumerate(knots):
nknots1d = len(knots1d)
y_dummy = np.zeros(nknots1d)
knots1d, coeffs, degree = si.splrep(knots1d, y_dummy, k=degree,
per=periodic)
self.splines.append((knots1d, coeffs, degree))
self.ncoeffs = [len(coeffs) for knots, coeffs, degree in self.splines]
self.ncoeffs_all = np.product(self.ncoeffs_valid)
def evaluate(self, position):
"""
:param position: a numpy array with size [ndim, npoints]
:returns: a numpy array with size [nspl1, nspl2, ..., nsplN, npts]
with the spline basis evaluated at the input points
"""
ndim, npts = position.shape
values_shape = self.ncoeffs_valid + [npts]
values = np.empty(values_shape)
ranges = [range(icoeffs) for icoeffs in self.ncoeffs_valid]
for icoeffs in itertools.product(*ranges):
values_dim = np.empty((ndim, npts))
for idim, icoeff in enumerate(icoeffs):
coeffs = [1.0 if ispl == icoeff else 0.0 for ispl in
range(self.ncoeffs[idim])]
values_dim[idim] = si.splev(
position[idim],
(self.splines[idim][0], coeffs, self.degree))
values[icoeffs] = np.product(values_dim, axis=0)
return values
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment