Skip to content

Instantly share code, notes, and snippets.

@hsteinshiromoto
Last active July 20, 2021 23:30
Show Gist options
  • Save hsteinshiromoto/4ba05756a9ad887be70288984f013d8c to your computer and use it in GitHub Desktop.
Save hsteinshiromoto/4ba05756a9ad887be70288984f013d8c to your computer and use it in GitHub Desktop.
bayesiansearchcv.py
import copy
import sys
import traceback
from collections.abc import Iterable
from typing import Union

import numpy as np
import pandas as pd
import scipy as sp
from hyperopt import STATUS_FAIL, STATUS_OK, Trials, fmin, hp, space_eval, tpe
from sklearn.metrics import get_scorer
from sklearn.model_selection import KFold, StratifiedKFold
class BayesSearchCV:
def __init__(self, estimator, param_distributions: dict, scoring: dict
,n_iter: int=10, weights_matrix: np.ndarray=None, cv: Union[int, Iterable]=5, random_state: int=None
,algo=tpe.suggest, trials: Trials=Trials()) -> None:
"""Use Bayesian optimisation to search for hyperpameters and selects best estimator based on validation sets
Args:
estimator: Estimator object
param_distributions (dict): Search space containing hyperparameters
scoring (dict): {metric: opt_value} Dict of performance metrics to measure the estimator performance and their corresponding optimal values.
Select one from sklearn.metrics.SCORERS.keys()
n_iter (int, optional): Max number of iterations. Defaults to 10.
weights_matrix (np.ndarray, optional): Symmetric positive definite matrix used to calculate the quadratic loss function
cv (int or Iterable, optional): int, cross-validation generator or an iterable. Defaults to 5.
random_state (int, optional): Pseudo random number generator state used for random uniform sampling. Defaults to None.
algo (optional): Algorithm to for distribution search. Defaults to tpe.suggest.
trials (Trials, optional): [description]. Defaults to Trials().
"""
self.estimator = estimator
self.param_distributions = param_distributions
self.n_iter = n_iter
self.random_state = random_state
self.weights_matrix = weights_matrix or np.identity(len(scoring))
self.cv = cv
self.algo = algo
self.trials = trials
self.scoring = scoring
def fit(self, X: pd.DataFrame, y=None) -> None:
"""Find optimal hyperparameters and fit estimator
Args:
X (pd.DataFrame): Predictors
y (pd.DataFrame): Target
"""
self.cv_results_ = pd.DataFrame()
self.min_loss = np.inf
if not self._check_spd(self.weights_matrix):
msg = f"Expected weights matrix to be symmetric positive definite."
raise ValueError(msg)
for iteration, (train_index, val_index) in enumerate(self._get_splits(X, y)):
X_train, X_val = X[train_index], X[val_index]
y_train, y_val = y[train_index], y[val_index]
objective = lambda space: self._cost(X=X_train, y=y_train, hyperparameters=space)
try:
hyperparameters = fmin(fn=objective, space=self.param_distributions
,algo=self.algo, max_evals=self.n_iter
,trials=self.trials)
except KeyError:
exc_info = sys.exc_info()
traceback.print_exception(*exc_info)
return {'status': STATUS_FAIL,
'exception': str(sys.exc_info())}
estimator = self._instantiate_estimator(X_train, y_train, hyperarameters=hyperparameters)
loss_df, current_loss = self._cost(X_val, y_val, hyperparameters, estimator=estimator, return_loss_df=True)
loss_df["cv_iteration"] = iteration
if current_loss < self.min_loss:
self.min_loss = current_loss
self.best_estimator_ = estimator
self.best_hyperparameters_ = hyperparameters
self.cv_results_ = pd.concat([self.cv_results_ , loss_df.copy()])
self.cv_results_.rename(columns={col: f"{col}_loss" for col in self.cv_results_.columns if col != "loss"}, inplace=True)
self.cv_results_.sort_values(by="loss", inplace=True)
self.cv_results_.reset_index(inplace=True, drop=True)
def _get_splits(self, X: pd.DataFrame, y=None):
"""Instantiate and/or get training and validation datasets
Args:
X (pd.DataFrame): Predictor
y (pd.DataFrame): Target
Yields:
[type]: Train and test indices
Raises:
NotImplementedError: Only KFold and StratifiedKFold are implemented
"""
if isinstance(self.cv, int):
self.cv = KFold(n_splits=self.cv, random_state=self.random_state)
elif isinstance(self.cv, StratifiedKFold):
pass
else:
msg = f"Cross validation not yet implemented for type {type(self.cv)}"
NotImplementedError(msg)
for train_index, test_index in self.cv.split(X, y):
yield train_index, test_index
def _cost(self, X: pd.DataFrame, y: pd.DataFrame, hyperparameters: dict
,estimator=None, return_loss_df: bool=False) -> dict:
"""Evaluates the cost function for the trained estimator using a quadratic loss function
Args:
X (pd.DataFrame): Predictor
y (pd.DataFrame): Target
hyperarameters (dict): Estimator hyperparameters
return_loss_df (bool): Returns fit loss data frame
Returns:
(dict)
"""
loss_dict = {metric_name: [] for metric_name in self.scoring}
if not estimator:
estimator = self._instantiate_estimator(X, y, hyperparameters)
for p_metric, opt_value in self.scoring.items():
scorer = get_scorer(p_metric)
loss_dict[p_metric].append((opt_value - scorer(estimator, X, y))**2)
loss_df = pd.DataFrame.from_dict(loss_dict)
loss = loss_df.values.dot(self.weights_matrix.dot(loss_df.T.values))
loss_df["loss"] = loss
if return_loss_df:
return loss_df, loss
return {'loss': np.sqrt(loss), 'status': STATUS_OK}
def _instantiate_estimator(self, X: pd.DataFrame, y: pd.DataFrame
,hyperarameters: dict):
"""Instantiate estimator with selected hyperparameters
Args:
X (pd.DataFrame): Predictors
y (pd.DataFrame): Target
hyperarameters (dict): Estimator hyperparameters
Returns:
[type]: Estimator
"""
estimator_cls = self.estimator.__class__
estimator = estimator_cls(**hyperarameters)
estimator.fit(X, y)
return estimator
@staticmethod
def _check_spd(m: np.ndarray, rtol: float=1e-6, atol: float=1e-9) -> bool:
"""Checks if a matrix is symmetric positive definite
Args:
m (np.ndarray): Matrix
rtol (float): Relative tolerance to verify if m is symmetric
atol (float): Absolute tolerance to verify if m is symmetric
Returns:
(bool): True if matrix is symmetric positive definite
"""
try:
# Check if matrix is positive definite
np.linalg.cholesky(m)
except np.linalg.linalg.LinAlgError as err:
if 'Matrix is not positive definite' in err.message:
return False
else:
raise
else:
# Now that m is positive definite, check if it is symmetric
return np.allclose(m, m.T, rtol=rtol, atol=atol)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment