@avidale
Created July 16, 2022 11:32

import numpy as np
import scipy.optimize
from sklearn.linear_model import Ridge
from sklearn.isotonic import IsotonicRegression
from sklearn.base import BaseEstimator, RegressorMixin


class MonotonicRegression(BaseEstimator, RegressorMixin):
    """Smooth increasing piecewise linear regression.

    During training, it minimizes MSE plus `alpha` times the mean absolute
    change in slope between adjacent segments.
    """

    def __init__(self, alpha=1.0, max_cuts=100, cuts_method='uniform', max_iter=1000):
        self.alpha = alpha
        self.max_cuts = max_cuts
        self.cuts_method = cuts_method
        self.max_iter = max_iter

    def _make_features(self, X):
        # feature i of sample x = how much of the segment
        # [cutpoints_[i], cutpoints_[i + 1]] lies below x
        fe = np.array([
            [max(0, min(x, self.cutpoints_[i + 1]) - self.cutpoints_[i]) for i in range(self.n_)]
            for x in X
        ])
        return fe

    def fit(self, X, y):
        X = np.array(X).ravel()
        cutpoints = set(X)
        if len(cutpoints) > self.max_cuts:
            if self.cuts_method == 'quantile':
                cutpoints = np.percentile(X, np.linspace(0, 100, self.max_cuts))
            else:
                cutpoints = np.linspace(min(X), max(X), self.max_cuts)
            cutpoints = set(cutpoints)
        self.cutpoints_ = np.array(sorted(cutpoints))
        self.n_ = len(self.cutpoints_) - 1
        fe = self._make_features(X)
        # alternative: initialize with a plain linear regression
        # lr0 = LinearRegression().fit(X[:, np.newaxis], y)
        # x0 = [lr0.intercept_ + lr0.coef_[0] * self.cutpoints_[0]] + [lr0.coef_[0] for _ in range(self.n_)]
        # initialize with a non-smoothed monotonic regression
        lr0 = Ridge(1e-6, positive=True).fit(fe, y)
        x0 = np.concatenate([[lr0.intercept_], lr0.coef_])
        # the intercept is unconstrained, the per-segment slopes must be non-negative
        bounds = [[None, None]] + [[0, None] for _ in range(self.n_)]

        def fun(params):
            # main loss: mean squared error
            preds = params[0] + fe @ params[1:]
            resid = preds - y
            loss = np.mean(resid ** 2)
            # gradient of the main loss
            gloss = np.zeros_like(params)
            gloss[0] = np.mean(resid) * 2
            gloss[1:] = 2 / len(resid) * resid @ fe
            # regularization: mean absolute change of slope between adjacent segments
            dparam = params[1:-1] - params[2:]
            reg = np.mean(np.abs(dparam))
            # gradient of the regularization (normalized by len(dparam) to match np.mean)
            greg = np.zeros_like(params)
            greg[1:-1] += np.sign(dparam) / len(dparam)
            greg[2:] -= np.sign(dparam) / len(dparam)
            return loss + self.alpha * reg, gloss + self.alpha * greg

        res = scipy.optimize.minimize(
            fun,
            x0=x0,
            method='SLSQP',
            jac=True,
            bounds=bounds,
            options={'maxiter': self.max_iter},
        )
        self.params_ = res.x
        return self

    def predict(self, X):
        fe = self._make_features(np.array(X).ravel())
        return self.params_[0] + fe @ self.params_[1:]

    def to_isotonic(self, min_rel_diff=0):
        """Convert this object to a scikit-learn isotonic regression, which predicts more efficiently.

        If `min_rel_diff > 0`, cutpoints whose relative slope change is below this threshold are dropped.
        """
        x_points = self.cutpoints_
        if min_rel_diff:
            p = self.params_[1:]
            # keep only the interior cutpoints where the slope changes noticeably
            fltr = np.abs(p[:-1] - p[1:]) / np.abs(p[:-1] + p[1:]) >= min_rel_diff
            x_points = np.concatenate([[x_points[0]], x_points[1:-1][fltr], [x_points[-1]]])
        y_points = self.predict(x_points[:, np.newaxis])
        iso2 = IsotonicRegression(out_of_bounds='clip')
        iso2.X_thresholds_, iso2.y_thresholds_ = x_points, y_points
        iso2.X_min_ = min(x_points)
        iso2.X_max_ = max(x_points)
        iso2._build_f(x_points, y_points)
        iso2.increasing_ = True
        return iso2
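
# ---------------------------------------------------------------------------
# Usage sketch (added for illustration; not part of the original gist).
# The synthetic data and the `alpha`, `max_cuts`, and `min_rel_diff` values
# below are assumptions chosen only to demonstrate the API.
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    x_train = np.sort(rng.uniform(0, 10, size=200))
    y_train = np.log1p(x_train) + rng.normal(scale=0.1, size=x_train.shape)

    # fit the smooth monotonic model and check the in-sample fit
    model = MonotonicRegression(alpha=1.0, max_cuts=50).fit(x_train, y_train)
    print('train R^2:', model.score(x_train[:, np.newaxis], y_train))

    # convert to a plain scikit-learn IsotonicRegression for faster prediction;
    # inputs outside the training range are clipped because of out_of_bounds='clip'
    iso = model.to_isotonic(min_rel_diff=0.01)
    x_test = np.linspace(-1, 11, 5)
    print(iso.predict(x_test))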