Created
March 23, 2021 12:58
-
-
Save psinger/0b9525742e46f987dee8b3fdccacd7ca to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Calibrated Classifier Model: To calibrate predictions using Platt's scaling, Isotonic Regression or Splines | |
""" | |
import copy | |
import datatable as dt | |
from h2oaicore.mojo import MojoWriter, MojoFrame | |
from h2oaicore.systemutils import config | |
from h2oaicore.models import CustomModel, LightGBMModel | |
from sklearn.preprocessing import LabelEncoder | |
import numpy as np | |
from scipy.special import softmax, expit | |
from sklearn.calibration import CalibratedClassifierCV | |
_global_modules_needed_by_name = ['ml_insights==0.1.4'] # for SplineCalibration | |
class CalibratedClassifierModel: | |
_regression = False | |
_binary = True | |
_multiclass = True | |
_can_use_gpu = True | |
_mojo = True | |
_description = "Calibrated Classifier Model (LightGBM)" | |
le = LabelEncoder() | |
def get_params(self, deep=True): | |
return super().get_params() | |
@staticmethod | |
def do_acceptance_test(): | |
""" | |
Return whether to do acceptance tests during upload of recipe and during start of Driverless AI. | |
Acceptance tests perform a number of sanity checks on small data, and attempt to provide helpful instructions | |
for how to fix any potential issues. Disable if your recipe requires specific data or won't work on random data. | |
""" | |
return False | |
def fit(self, X, y, sample_weight=None, eval_set=None, sample_weight_eval_set=None, **kwargs): | |
assert len(self.__class__.__bases__) == 3 | |
assert CalibratedClassifierModel in self.__class__.__bases__ | |
self.le.fit(self.labels) | |
y_ = self.le.transform(y) | |
whoami = [x for x in self.__class__.__bases__ if (x != CustomModel and x != CalibratedClassifierModel)][0] | |
kwargs_classification = copy.deepcopy(self.params_base) | |
kwargs_update = dict( | |
num_classes=len(self.le.classes_), | |
labels=list(np.arange(len(self.le.classes_))), | |
) | |
kwargs_classification.update(kwargs_update) | |
for k in kwargs: | |
if k in kwargs_classification: | |
kwargs[k] = kwargs_classification[k] | |
model_classification = whoami(context=self.context, | |
unfitted_pipeline_path=self.unfitted_pipeline_path, | |
transformed_features=self.transformed_features, | |
original_user_cols=self.original_user_cols, | |
date_format_strings=self.date_format_strings, **kwargs_classification) | |
eval_set_classification = None | |
val_y = None | |
if eval_set is not None: | |
eval_set_y = self.le.transform(eval_set[0][1]) | |
val_y = eval_set_y.astype(int) | |
eval_set_classification = [(eval_set[0][0], val_y)] | |
# Stratified split with classes control - making sure all classes present in both train and test | |
unique_cls = np.unique(y_) | |
tr_indx, te_indx = [], [] | |
for c in unique_cls: | |
c_indx = np.argwhere(y_ == c).ravel() | |
indx = np.random.permutation(c_indx) | |
if self.params["calib_method"] in ["sigmoid", "isotonic"]: | |
start_indx = max(1, int(self.params["calib_perc"] * len(c_indx))) | |
else: | |
start_indx = max(3, int(self.params["calib_perc"] * len(c_indx))) | |
tr_indx += list(indx[start_indx:]) | |
te_indx += list(indx[:start_indx]) | |
tr_indx = np.array(tr_indx) | |
te_indx = np.array(te_indx) | |
X_train, y_train = X[tr_indx, :], y_.astype(int)[tr_indx] | |
if self.params["calib_method"] in ["sigmoid", "isotonic"]: | |
X_calibrate, y_calibrate = X[te_indx, :].to_pandas(), y[te_indx].ravel() | |
else: | |
X_calibrate, y_calibrate = X[te_indx, :].to_pandas(), y_.astype(int)[te_indx].ravel() | |
if sample_weight is not None: | |
sample_weight_ = sample_weight[tr_indx] | |
sample_weight_calib = sample_weight[te_indx] | |
else: | |
sample_weight_ = sample_weight | |
sample_weight_calib = sample_weight | |
# mimic rest of fit_base not done: | |
# get self.observed_labels | |
model_classification.check_labels_and_response(y_train, val_y=val_y) | |
model_classification.orig_cols = self.orig_cols | |
model_classification.X_shape = self.X_shape | |
model_classification.fit(X_train, y_train, | |
sample_weight=sample_weight_, eval_set=eval_set_classification, | |
sample_weight_eval_set=sample_weight_eval_set, **kwargs) | |
model_classification.fitted = True | |
model_classification.eval_set_used_during_fit = val_y is not None | |
# calibration | |
model_classification.predict_proba = model_classification.predict_simple | |
model_classification.classes_ = self.le.classes_ | |
if self.params["calib_method"] in ["sigmoid", "isotonic"]: | |
calibrator = CalibratedClassifierCV( | |
base_estimator=model_classification, | |
method="isotonic", | |
cv="prefit") | |
calibrator.fit(X_calibrate, y_calibrate, sample_weight=sample_weight_calib) | |
self.calib_method = calibrator.method | |
if calibrator.method == "sigmoid": | |
self.slope = [] | |
self.intercept = [] | |
for c in calibrator.calibrated_classifiers_[0].calibrators_: | |
self.slope.append(c.a_) | |
self.intercept.append(c.b_) | |
elif calibrator.method == "isotonic": | |
self._necessary_X_ = [] | |
self._necessary_y_ = [] | |
self.X_min_ = [] | |
self.X_max_ = [] | |
for c in calibrator.calibrated_classifiers_[0].calibrators_: | |
self._necessary_X_.append(c._necessary_X_) | |
self._necessary_y_.append(c._necessary_y_) | |
self.X_min_.append(c.X_min_) | |
self.X_max_.append(c.X_max_) | |
else: | |
raise RuntimeError('Unknown calibration method in fit()') | |
elif self.params["calib_method"] in ["spline"]: | |
import ml_insights as mli | |
self.calib_method = "spline" | |
spline = mli.SplineCalib( | |
penalty='l2', | |
solver='liblinear', | |
reg_param_vec='default', | |
cv_spline=3, random_state=4451, | |
knot_sample_size=30, | |
) | |
preds = model_classification.predict_proba(X_calibrate) | |
for c in range(preds.shape[1]): | |
if len(np.unique(preds[:, c])) < 3: # we need at least 3 unique points to form the knots | |
preds[:, c] = preds[:, c] + .0001 * np.random.randn(len(preds[:, c])) | |
spline.fit(preds, y_calibrate, verbose=False) # no weight support so far :( | |
self.calib_logodds_scale = spline.logodds_scale | |
self.calib_logodds_eps = spline.logodds_eps | |
self.calib_knot_vec_tr = [] | |
self.calib_basis_coef_vec = [] | |
if spline.n_classes > 2: | |
for calib_ in spline.binary_splinecalibs: | |
self.calib_knot_vec_tr.append(calib_.knot_vec_tr) | |
self.calib_basis_coef_vec.append(calib_.basis_coef_vec) | |
else: | |
self.calib_knot_vec_tr.append(spline.knot_vec_tr) | |
self.calib_basis_coef_vec.append(spline.basis_coef_vec) | |
else: | |
raise RuntimeError('Unknown calibration method in fit()') | |
# calibration | |
varimp = model_classification.imp_features(columns=X.names) | |
varimp.index = varimp['LInteraction'] | |
varimp = varimp['LGain'] | |
varimp = varimp[:len(X.names)] | |
varimp = varimp.reindex(X.names).values | |
importances = varimp | |
iters = model_classification.best_iterations | |
iters = int(max(1, iters)) | |
self.set_model_properties(model=model_classification.model, | |
features=list(X.names), importances=importances, iterations=iters | |
) | |
@staticmethod | |
def _natural_cubic_spline_basis_expansion(xpts, knots): | |
num_knots = len(knots) | |
num_pts = len(xpts) | |
outmat = np.zeros((num_pts, num_knots)) | |
outmat[:, 0] = np.ones(num_pts) | |
outmat[:, 1] = xpts | |
# last knot calc | |
denom = knots[-1] - knots[-2] | |
numer = (np.maximum(xpts - knots[-2], np.zeros(num_pts)) ** 3 - | |
np.maximum(xpts - knots[-1], np.zeros(num_pts)) ** 3) | |
last_knot = numer / denom | |
# current knots calc | |
for i in range(1, num_knots - 1): | |
denom = knots[-1] - knots[i - 1] | |
numer = (np.maximum(xpts - knots[i - 1], np.zeros(num_pts)) ** 3 - | |
np.maximum(xpts - knots[-1], np.zeros(num_pts)) ** 3) | |
outmat[:, i + 1] = (numer / denom) - last_knot | |
return outmat | |
def predict(self, X, **kwargs): | |
from scipy import interpolate | |
X = dt.Frame(X) | |
model, _, _, _ = self.get_model_properties() | |
preds = model.predict_proba(X) | |
if preds.shape[1] <= 2: | |
if self.calib_method == "sigmoid": | |
scaled_preds = expit(-(self.slope[0] * preds[:, 1] + self.intercept[0])) | |
elif self.calib_method == "isotonic": | |
f_ = interpolate.interp1d( | |
self._necessary_X_[0], | |
self._necessary_y_[0], | |
kind='linear', | |
bounds_error='nan' | |
) | |
scaled_preds = f_(np.clip(preds[:, 1], self.X_min_[0], self.X_max_[0])) | |
elif self.calib_method == "spline": | |
y_in_to_use = preds[:, 1] | |
if self.calib_logodds_scale: | |
y_in_to_use = np.minimum(1 - self.calib_logodds_eps, y_in_to_use) | |
y_in_to_use = np.maximum(self.calib_logodds_eps, y_in_to_use) | |
y_model_tr = np.log(y_in_to_use / (1 - y_in_to_use)) | |
else: | |
y_model_tr = y_in_to_use | |
scaled_preds = self._natural_cubic_spline_basis_expansion(y_model_tr, self.calib_knot_vec_tr[0]) | |
scaled_preds = scaled_preds.dot(self.calib_basis_coef_vec[0].T) | |
scaled_preds = 1 / (1 + np.exp(-scaled_preds)) | |
scaled_preds = scaled_preds.ravel() | |
else: | |
raise RuntimeError('Unknown calibration method in predict()') | |
preds[:, 1] = scaled_preds | |
preds[:, 0] = 1. - scaled_preds | |
else: | |
for c in range(preds.shape[1]): | |
if self.calib_method == "sigmoid": | |
scaled_preds = expit(-(self.slope[c] * preds[:, c] + self.intercept[c])) | |
elif self.calib_method == "isotonic": | |
f_ = interpolate.interp1d( | |
self._necessary_X_[c], | |
self._necessary_y_[c], | |
kind='linear', | |
bounds_error='nan' | |
) | |
scaled_preds = f_(np.clip(preds[:, c], self.X_min_[c], self.X_max_[c])) | |
elif self.calib_method == "spline": | |
y_in_to_use = preds[:, c] | |
if self.calib_logodds_scale: | |
y_in_to_use = np.minimum(1 - self.calib_logodds_eps, y_in_to_use) | |
y_in_to_use = np.maximum(self.calib_logodds_eps, y_in_to_use) | |
y_model_tr = np.log(y_in_to_use / (1 - y_in_to_use)) | |
else: | |
y_model_tr = y_in_to_use | |
scaled_preds = self._natural_cubic_spline_basis_expansion(y_model_tr, self.calib_knot_vec_tr[c]) | |
scaled_preds = scaled_preds.dot(self.calib_basis_coef_vec[c].T) | |
scaled_preds = 1 / (1 + np.exp(-scaled_preds)) | |
scaled_preds = scaled_preds.ravel() | |
else: | |
raise RuntimeError('Unknown calibration method in predict()') | |
preds[:, c] = scaled_preds | |
preds = preds / np.sum(preds, 1).reshape(-1, 1) | |
return preds | |
from h2oaicore.mojo import MojoWriter, MojoFrame | |
class CalibratedClassifierLGBMModel(CalibratedClassifierModel, LightGBMModel, CustomModel): | |
_mojo = True | |
@property | |
def has_pred_contribs(self): | |
return False | |
@property | |
def has_output_margin(self): | |
return False | |
def set_default_params(self, **kwargs): | |
super().set_default_params(**kwargs) | |
self.params["calib_perc"] = .1 | |
self.params["calib_method"] = "isotonic" | |
def mutate_params(self, **kwargs): | |
super().mutate_params(**kwargs) | |
self.params["calib_perc"] = np.random.choice([.05, .1, .15, .2]) | |
self.params["calib_method"] = np.random.choice(["isotonic"]) | |
def write_to_mojo(self, mojo: MojoWriter, iframe: MojoFrame, group_uuid=None, group_name=None): | |
return self.to_mojo(mojo=mojo, iframe=iframe, group_uuid=group_uuid, group_name=group_name) | |
def to_mojo(self, mojo: MojoWriter, iframe: MojoFrame, group_uuid=None, group_name=None): | |
from h2oaicore.mojo import MojoColumn | |
from h2oaicore.mojo_transformers import (MjT_ConstBinaryOp, MjT_Sigmoid, MjT_AsType, | |
MjT_Agg, MjT_BinaryOp, MjT_IntervalMap, MjT_Clip, MjT_Log) | |
import uuid | |
group_uuid = str(uuid.uuid4()) | |
group_name = self.__class__.__name__ | |
_iframe = super().write_to_mojo(mojo=mojo, iframe=iframe, group_uuid=group_uuid, group_name=group_name) | |
res = MojoFrame() | |
def _get_new_pair(left, right): | |
pair = MojoFrame() | |
pair.cbind(left) | |
pair.cbind(right) | |
return pair | |
for c in range(len(_iframe)): | |
icol = _iframe.get_column(c) | |
def _get_new_col(name, type_=None): | |
ocol_ = MojoColumn(name=name, dtype=icol.type if type_ is None else type_) | |
oframe_ = MojoFrame(columns=[ocol_]) | |
return oframe_ | |
if self.calib_method == "sigmoid": | |
oframe1 = _get_new_col(icol.name + "_slope") | |
oframe2 = _get_new_col(icol.name + "_intercept") | |
oframe3 = _get_new_col(icol.name + "_negative") | |
oframe4 = _get_new_col(icol.name + "_calibrated", type_="float64") | |
oframe5 = _get_new_col(icol.name + "_astype") | |
mojo += MjT_ConstBinaryOp(iframe=_iframe[c], oframe=oframe1, op="multiply", const=self.slope[c], | |
pos="right", | |
group_uuid=group_uuid, group_name=group_name) | |
mojo += MjT_ConstBinaryOp(iframe=oframe1, oframe=oframe2, op="add", const=self.intercept[c], | |
pos="right", | |
group_uuid=group_uuid, group_name=group_name) | |
mojo += MjT_ConstBinaryOp(iframe=oframe2, oframe=oframe3, op="multiply", const=-1., pos="right", | |
group_uuid=group_uuid, group_name=group_name) | |
mojo += MjT_Sigmoid(iframe=oframe3, oframe=oframe4, group_uuid=group_uuid, group_name=group_name) | |
mojo += MjT_AsType(iframe=oframe4, oframe=oframe5, type="float32", group_uuid=group_uuid, | |
group_name=group_name) | |
res.cbind(oframe5) | |
elif self.calib_method == "isotonic": | |
X = list(self._necessary_X_[c]) | |
y = list(self._necessary_y_[c]) | |
if len(y) == 1: | |
oframe1 = _get_new_col(icol.name + "_zeroing") | |
new_y = _get_new_col(icol.name + "_addingConst") | |
mojo += MjT_ConstBinaryOp(iframe=_iframe[c], oframe=oframe1, op="multiply", const=0, pos="right", | |
group_uuid=group_uuid, group_name=group_name) | |
mojo += MjT_ConstBinaryOp(iframe=oframe1, oframe=new_y, op="add", const=y[0], pos="right", | |
group_uuid=group_uuid, group_name=group_name) | |
else: | |
max_X = X + [self._necessary_X_[c][-1], None] | |
min_X = [self._necessary_X_[c][0]] + X + [None] | |
max_y = y + [self._necessary_y_[c][-1], None] | |
min_y = [self._necessary_y_[c][0]] + y + [None] | |
ocol1 = MojoColumn(name=icol.name + "_maxX", dtype=icol.type) | |
ocol2 = MojoColumn(name=icol.name + "_minX", dtype=icol.type) | |
ocol3 = MojoColumn(name=icol.name + "_maxY", dtype=icol.type) | |
ocol4 = MojoColumn(name=icol.name + "_minY", dtype=icol.type) | |
XY = MojoFrame(columns=[ocol1, ocol2, ocol3, ocol4]) | |
# clipping | |
inp_clipped = _get_new_col(icol.name + "_clipped") | |
mojo += MjT_Clip(iframe=_iframe[c], oframe=inp_clipped, | |
min=self.X_min_[c], max=self.X_max_[c], | |
group_uuid=group_uuid, group_name=group_name | |
) | |
# search for coordinates | |
mojo += MjT_IntervalMap( | |
iframe=inp_clipped, oframe=XY, | |
breakpoints=X, | |
values=[[x1, x0, y1, y0] for x1, x0, y1, y0 in zip(max_X, min_X, max_y, min_y)], | |
group_uuid=group_uuid, group_name=group_name | |
) | |
# interpolation | |
curr_diff = _get_new_col(icol.name + "_currDiff") | |
pair = _get_new_pair(inp_clipped, XY[1]) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=curr_diff, op="subtract", group_uuid=group_uuid, | |
group_name=group_name) | |
y_diff = _get_new_col(icol.name + "_yDiff") | |
pair = _get_new_pair(XY[2], XY[3]) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=y_diff, op="subtract", group_uuid=group_uuid, | |
group_name=group_name) | |
X_diff = _get_new_col(icol.name + "_XDiff") | |
pair = _get_new_pair(XY[0], XY[1]) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=X_diff, op="subtract", group_uuid=group_uuid, | |
group_name=group_name) | |
xy_ratio = _get_new_col(icol.name + "_xyRatio") | |
pair = _get_new_pair(y_diff, X_diff) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=xy_ratio, op="divide", eps=1e-10, group_uuid=group_uuid, | |
group_name=group_name) | |
scaled_cur_diff = _get_new_col(icol.name + "_scaledCurDiff") | |
pair = _get_new_pair(xy_ratio, curr_diff) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=scaled_cur_diff, op="multiply", group_uuid=group_uuid, | |
group_name=group_name) | |
new_y = _get_new_col(icol.name + "_newY") | |
pair = _get_new_pair(XY[3], scaled_cur_diff) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=new_y, op="add", group_uuid=group_uuid, | |
group_name=group_name) | |
res.cbind(new_y) | |
elif self.calib_method == "spline": | |
if self.calib_logodds_scale: | |
oframe1 = _get_new_col(icol.name + "_clipped") | |
mojo += MjT_Clip(iframe=_iframe[c], oframe=oframe1, | |
min=self.calib_logodds_eps, max=1 - self.calib_logodds_eps, | |
group_uuid=group_uuid, group_name=group_name | |
) | |
oframe2 = _get_new_col(icol.name + "_inverse") | |
mojo += MjT_ConstBinaryOp(iframe=oframe1, oframe=oframe2, op="subtract", const=1., pos="left", | |
group_uuid=group_uuid, group_name=group_name) | |
oframe3 = _get_new_col(icol.name + "_ratio") | |
pair = _get_new_pair(oframe1, oframe2) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=oframe3, op="divide", eps=1e-10, group_uuid=group_uuid, | |
group_name=group_name) | |
oframe4 = _get_new_col(icol.name + "_log") | |
mojo += MjT_Log(iframe=oframe3, oframe=oframe4, group_uuid=group_uuid, group_name=group_name) | |
inp = oframe4 | |
else: | |
inp = _iframe[c] | |
knots = self.calib_knot_vec_tr[c] | |
num_knots = len(knots) | |
# zero col | |
zeros = _get_new_col(icol.name + "_zeros") | |
mojo += MjT_ConstBinaryOp(iframe=inp, oframe=zeros, op="multiply", const=0., pos="right", | |
group_uuid=group_uuid, group_name=group_name) | |
# ones col | |
ones = _get_new_col(icol.name + f"_ones") | |
mojo += MjT_ConstBinaryOp(iframe=zeros, oframe=ones, op="add", const=1., pos="right", | |
group_uuid=group_uuid, group_name=group_name) | |
# last knot calc | |
denom = knots[-1] - knots[-2] | |
def _to_mojo_helper(mojo, inp, val, zeros, suffix=""): | |
oframe5 = _get_new_col(icol.name + f"_{suffix}diff") | |
mojo += MjT_ConstBinaryOp(iframe=inp, oframe=oframe5, op="subtract", const=val, pos="right", | |
group_uuid=group_uuid, group_name=group_name) | |
oframe6 = _get_new_col(icol.name + f"_{suffix}max") | |
pair = _get_new_pair(oframe5, zeros) | |
mojo += MjT_Agg(iframe=pair, oframe=oframe6, op="max", group_uuid=group_uuid, group_name=group_name) | |
oframe7 = _get_new_col(icol.name + f"_{suffix}pwr") | |
oframe8 = _get_new_col(icol.name + f"_{suffix}pwr2") | |
oframe9 = _get_new_col(icol.name + f"_{suffix}pwr3") | |
mojo += MjT_ConstBinaryOp(iframe=oframe6, oframe=oframe7, op="multiply", const=1., pos="right", | |
group_uuid=group_uuid, group_name=group_name) | |
pair = _get_new_pair(oframe6, oframe7) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=oframe8, op="multiply", group_uuid=group_uuid, | |
group_name=group_name) | |
pair = _get_new_pair(oframe8, oframe7) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=oframe9, op="multiply", group_uuid=group_uuid, | |
group_name=group_name) | |
return oframe9 | |
last_knot2 = _to_mojo_helper(mojo=mojo, inp=inp, val=knots[-2], zeros=zeros, suffix="last2") | |
last_knot1 = _to_mojo_helper(mojo=mojo, inp=inp, val=knots[-1], zeros=zeros, suffix="last1") | |
oframe5 = _get_new_col(icol.name + f"_last21diff") | |
pair = _get_new_pair(last_knot2, last_knot1) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=oframe5, op="subtract", group_uuid=group_uuid, | |
group_name=group_name) | |
last_knot = _get_new_col(icol.name + f"_lastKnot") | |
mojo += MjT_ConstBinaryOp(iframe=oframe5, oframe=last_knot, op="divide", const=denom, pos="right", | |
group_uuid=group_uuid, group_name=group_name) | |
# all knots calc | |
results = [] | |
for i in range(1, num_knots - 1): | |
denom = knots[-1] - knots[i - 1] | |
knot1 = _to_mojo_helper(mojo=mojo, inp=inp, val=knots[i - 1], zeros=zeros, suffix=f"knot{i}m1") | |
knot2 = _to_mojo_helper(mojo=mojo, inp=inp, val=knots[-1], zeros=zeros, suffix=f"knotm1f{i}") | |
oframe_ = _get_new_col(icol.name + f"_knots_{i}_diff") | |
pair = _get_new_pair(knot1, knot2) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=oframe_, op="subtract", group_uuid=group_uuid, | |
group_name=group_name) | |
div_res = _get_new_col(icol.name + f"_dv_{i}") | |
mojo += MjT_ConstBinaryOp(iframe=oframe_, oframe=div_res, op="divide", const=denom, pos="right", | |
group_uuid=group_uuid, group_name=group_name) | |
diff_res = _get_new_col(icol.name + f"_diff_{i}") | |
pair = _get_new_pair(div_res, last_knot) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=diff_res, op="subtract", group_uuid=group_uuid, | |
group_name=group_name) | |
results.append(diff_res) | |
results = [ones, inp] + results | |
assert len(results) == len(self.calib_basis_coef_vec[c].ravel()), "Something went wrong :(" | |
# linear model | |
results2 = MojoFrame() | |
for i, (frame_, const_) in enumerate(zip(results, self.calib_basis_coef_vec[c].ravel())): | |
res_fr = _get_new_col(icol.name + f"_logits_{i}") | |
mojo += MjT_ConstBinaryOp(iframe=frame_, oframe=res_fr, op="multiply", const=const_, pos="right", | |
group_uuid=group_uuid, group_name=group_name) | |
results2.cbind(res_fr) | |
ocol_logits_sum = _get_new_col(icol.name + f"_logits_sum") | |
mojo += MjT_Agg(iframe=results2, oframe=ocol_logits_sum, op="sum", group_uuid=group_uuid, | |
group_name=group_name) | |
# sigmoid | |
ocol_spline_sigmoid = _get_new_col(icol.name + f"_spline_sigmoid", type_="float64") | |
mojo += MjT_Sigmoid(iframe=ocol_logits_sum, oframe=ocol_spline_sigmoid, group_uuid=group_uuid, | |
group_name=group_name) | |
ocol_spline_sigmoid_astype = _get_new_col(icol.name + f"_spline_sigmoid_astype") | |
mojo += MjT_AsType( | |
iframe=ocol_spline_sigmoid, oframe=ocol_spline_sigmoid_astype, | |
type="float32", | |
group_uuid=group_uuid, group_name=group_name | |
) | |
res.cbind(ocol_spline_sigmoid_astype) | |
else: | |
raise RuntimeError('Unknown calibration method in to_mojo()') | |
# normalization | |
if len(res) > 1: | |
res2 = MojoFrame() | |
oframe_sum = _get_new_col(self.__class__.__name__ + "_sum") | |
mojo += MjT_Agg(iframe=res, oframe=oframe_sum, op="sum", group_uuid=group_uuid, group_name=group_name) | |
for c in range(len(res)): | |
icol = res.get_column(c) | |
oframe1 = _get_new_col(icol.name + "_normalized") | |
pair = _get_new_pair(res[c], oframe_sum) | |
mojo += MjT_BinaryOp(iframe=pair, oframe=oframe1, op="divide", group_uuid=group_uuid, | |
group_name=group_name) | |
res2.cbind(oframe1) | |
res = res2 | |
return res |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment