Shows that scikit-learn cross validation with unweighted metrics can return different optimal parameter values than cross validation that passes sample weights to the metrics.
# ============================================================================
# R.M. Deak                                      wt_cv_eval_is_diff_unwt_cv.py
#
# Runs scikit-learn's cross validation with GridSearchCV and shows that
# different optimal parameter values may be returned by GridSearchCV than
# when using cross validation with sample_weights passed to the scoring
# function.
# ============================================================================
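# The key difference exercised below: GridSearchCV routes sample_weight to
# fit() only, so its scoring is unweighted, while weighted_cv also passes
# sample_weight to the scorer and weights the per-fold average by each fold's
# total test weight.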
from typing import NamedTuple
import numpy as np
import pytest
import sklearn
from hypothesis import assume, given, settings, PrintSettings
from hypothesis.strategies import (composite, integers, floats, lists,
                                   sampled_from)
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV, StratifiedKFold
Data = NamedTuple('Data', [
    ('X', np.ndarray),
    ('y', np.ndarray),
    ('sample_weight', np.ndarray),
    ('search_cv_random_state', int),
    ('cv_split_random_state', int),
    ('clf_random_state', int)
])
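# Scoring names tried by the test. Each is assumed to name a scorer in
# sklearn's SCORERS registry whose underlying metric accepts sample_weight.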
imp_wt_metrics = [
    "accuracy",
    "average_precision",
    "balanced_accuracy",
    "brier_score_loss",
    "explained_variance",
    "f1",
    "f1_macro",
    "f1_micro",
    "f1_weighted",
    "neg_mean_absolute_error",
    "neg_mean_squared_error",
    "neg_mean_squared_log_error",
    "precision",
    "precision_macro",
    "precision_micro",
    "precision_weighted",
    "r2",
    "recall",
    "recall_macro",
    "recall_micro",
    "recall_weighted"
]
def dataset(draw, n):
    # Draws a random two-class dataset of n examples with hypothesis-chosen
    # class balance and label noise.
    rs_mc = draw(integers(min_value=1))
    w_mc = draw(floats(min_value=0.2, max_value=0.8, allow_nan=False))
    flip_y_mc = draw(floats(min_value=0.05, max_value=0.2, allow_nan=False))
    return make_classification(
        n_samples=n,
        n_classes=2,
        n_features=2,
        n_clusters_per_class=1,
        n_informative=1,
        n_redundant=0,
        n_repeated=0,
        weights=[w_mc, 1 - w_mc],
        flip_y=flip_y_mc,
        shuffle=True,
        random_state=rs_mc)
def sample_weights(draw, n, min_value, max_value):
    # Draws n per-example weights, each in [min_value, max_value].
    return np.array(
        draw(
            lists(floats(min_value=min_value, max_value=max_value, allow_nan=False),
                  min_size=n, max_size=n)))
@composite
def _data(draw) -> Data:
    # Composite strategy: a dataset, its sample weights, and three RNG seeds.
    n = draw(integers(min_value=10, max_value=50))
    X, y = dataset(draw, n)
    sample_weight = sample_weights(draw, n, min_value=1.0, max_value=1.0e3)
    seeds = [draw(integers(min_value=1)) for _ in range(3)]
    return Data(X, y, sample_weight, seeds[0], seeds[1], seeds[2])
def is_close(a, b):
    # Used to determine if two cross validation scores are about the same.
    return np.isclose(a, b, rtol=1.e-4, atol=1.e-8)
def best_results(weighted_cv_results):
    # Given weighted cross validation results, get the best score and the set
    # of parameter values that can achieve the best weighted cross validation
    # score.
    def param_val(x): return x[0]
    def score(x): return x[1]
    optimum = max
    best = optimum(weighted_cv_results, key=score)
    best_score = score(best)
    best_params = list(map(param_val,
                           filter(lambda v: is_close(score(v), best_score),
                                  weighted_cv_results)))
    return best_score, best_params
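# Illustrative example (not drawn from the test): if the (param, score)
# prefixes of the results were [(0.1, 0.7), (1.0, 0.9), (10.0, 0.9)], then
# best_results returns (0.9, [1.0, 10.0]) since the last two scores tie for
# the optimum within is_close tolerance.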
def get_cv_best_params(cv_results, best_score):
    # Given the cross validation results from sklearn and the best score
    # returned during cross validation, find all parameter values that have an
    # "equally good" score.
    def score(i): return cv_results['mean_test_score'][i]
    ind = range(len(cv_results['mean_test_score']))
    best_inds = [i for i in ind if is_close(score(i), best_score)]
    # Assume just one parameter name.
    param_name = list(cv_results['params'][0].keys())[0]
    best_param_values = cv_results[f"param_{param_name}"][best_inds]
    return best_param_values
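# For example, with a grid {'C': [0.1, 1.0]}, cv_results['params'] is
# [{'C': 0.1}, {'C': 1.0}] and cv_results['param_C'] is an array-like of the
# tried values, so best_param_values holds every C whose mean test score is
# "equally good" as the best score.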
def weighted_cv(clf, scorer, param_name, param_values, X, y, sample_weight, cv_splitter):
    # This is similar to the cross validation in sklearn but uses sample
    # weights in the scoring function and weights the mean across folds by
    # each fold's total test weight.
    cv_results = []
    for v in param_values:
        m = []
        # cv_splitter.split yields one (train_indices, test_indices) pair per fold.
        for train, test in cv_splitter.split(X, y):
            c = sklearn.clone(clf)
            c.set_params(**{param_name: v})
            c.fit(X[train], y[train], sample_weight=sample_weight[train])
            score = scorer(c, X[test], y[test], sample_weight=sample_weight[test])
            w_sum_test = np.sum(sample_weight[test])
            m.append((score, w_sum_test))
        scores, sum_wts = list(zip(*m))
        weighted_avg_score = np.dot(scores, sum_wts) / np.sum(sum_wts)
        cv_results.append((v, weighted_avg_score, np.array(scores), np.array(sum_wts)))
    best_score, best_params = best_results(cv_results)
    results = {
        "cv_results": cv_results,
        "best_score": best_score,
        "best_params": best_params
    }
    return results
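# A minimal worked example of the averaging above (illustrative numbers, not
# drawn from the test): with fold scores [0.8, 0.6] and fold test-weight sums
# [10.0, 30.0], the weighted mean is (0.8*10 + 0.6*30) / 40 = 0.65, while a
# plain unweighted mean of the fold scores would be 0.70.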
def report_error(unweighted, weighted):
    u = set(unweighted)
    w = set(weighted)
    intersection = u.intersection(w)
    u_outer = sorted(list(u.difference(intersection)))
    w_outer = sorted(list(w.difference(intersection)))
    return (
        "best parameter values don't match."
        f"\n\tunweighted best param values: {u}."
        f"\n\tweighted best param values: {w}."
        f"\n\tunweighted not in weighted: {u_outer}."
        f"\n\tweighted not in unweighted: {w_outer}."
    )
# The test is expected to fail because there are differences when taking
# the sample_weights into account in the metric calculations. If we find
# such a difference, the assertion fails and pytest records an expected
# failure (XFAIL); if no difference is ever found, the test passes
# unexpectedly (XPASS).
@pytest.mark.xfail(raises=AssertionError)
@given(metric_name=sampled_from(imp_wt_metrics), data=_data())
@settings(max_examples=1000, deadline=None, print_blob=PrintSettings.ALWAYS)
def test_wt_cv_eval_is_different_from_unweighted(metric_name, data):
    # For certain metrics, we need a positive and a negative example per fold.
    # We ensure this property with the following ``assume``.
    n = len(data.y)
    assume(np.sum(data.y) not in [0, 1, n - 1, n])
    scorer = sklearn.metrics.scorer.SCORERS[metric_name]
    clf = LogisticRegression(solver="lbfgs", penalty="l2",
                             random_state=data.clf_random_state)
    cv_splitter = StratifiedKFold(n_splits=2, shuffle=True,
                                  random_state=data.cv_split_random_state)
    # 'C' is the inverse of the L2 regularization strength.
    l2_penalties = [0.0001, 0.001, 0.01, 0.1, 1, 10, 100, 1000]
    iid = True  # To suppress deprecation warning. Works with 'warn'.
    scv = GridSearchCV(sklearn.clone(clf), {'C': l2_penalties},
                       scoring=metric_name, cv=cv_splitter, iid=iid)
    scv.fit(data.X, data.y, sample_weight=data.sample_weight)
    # Find all parameter values with the same score as the best score.
    cv_best_params = get_cv_best_params(scv.cv_results_, scv.best_score_)
    wt_cv_results = weighted_cv(
        clf, scorer, "C", l2_penalties,
        data.X, data.y, data.sample_weight, cv_splitter)
    unweighted_best = set(cv_best_params)
    weighted_best = set(wt_cv_results["best_params"])
    assert unweighted_best == weighted_best, report_error(unweighted_best, weighted_best)
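# To reproduce (assuming pytest and hypothesis are installed), run, e.g.:
#     pytest -q wt_cv_eval_is_diff_unwt_cv.py
# Runs where the weighted and unweighted searches disagree raise the
# AssertionError above and are reported as XFAIL (expected failure).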