Skip to content

Instantly share code, notes, and snippets.

@toshihikoyanase
Last active February 5, 2021 13:09
Show Gist options
  • Save toshihikoyanase/d2bbf883f4f4acde1793ccc7effee4b7 to your computer and use it in GitHub Desktop.
Save toshihikoyanase/d2bbf883f4f4acde1793ccc7effee4b7 to your computer and use it in GitHub Desktop.
A simple objective function to check the Constant Liar heuristic for parallel TPE optimization.
import math

import numpy as np
import optuna
from optuna import distributions
from optuna.samplers import TPESampler
from optuna.study import StudyDirection
from optuna.trial import TrialState
def default_gamma(x):
    # type: (int) -> int
    """Return the size of the "below" group for TPE: ceil(0.1 * x), capped at 25."""
    candidate = int(np.ceil(0.1 * x))
    if candidate > 25:
        return 25
    return candidate
def default_weights(x):
    # type: (int) -> np.ndarray
    """Return TPE observation weights for ``x`` trials.

    The newest 25 trials get full weight 1.0; for older trials the weight
    ramps down linearly toward ``1 / x``. With 25 or fewer trials every
    trial is weighted equally.
    """
    if x == 0:
        # No observations yet.
        return np.asarray([])
    if x < 25:
        # Few observations: weight them all equally.
        return np.ones(x)
    # Older trials ramp from 1/x up to 1.0; the latest 25 stay flat at 1.0.
    old_part = np.linspace(1.0 / x, 1.0, num=x - 25)
    recent_part = np.ones(25)
    return np.concatenate([old_part, recent_part], axis=0)
class ClTPESampler(TPESampler):
    """TPE sampler extended with the Constant Liar heuristic.

    RUNNING trials are treated as if they had already finished with the
    fixed objective value ``constant_liar_l`` (see ``_get_observation_pairs``),
    which discourages concurrent workers from proposing points close to
    trials that are still in flight.
    """

    def __init__(
        self,
        consider_prior=True,  # type: bool
        prior_weight=1.0,  # type: float
        consider_magic_clip=True,  # type: bool
        consider_endpoints=False,  # type: bool
        n_startup_trials=10,  # type: int
        n_ei_candidates=24,  # type: int
        gamma=default_gamma,  # type: Callable[[int], int]
        weights=default_weights,  # type: Callable[[int], np.ndarray]
        seed=None,  # type: Optional[int]
        *,
        constant_liar_l  # type: int
    ):
        # type: (...) -> None
        # The "lie" value assigned to every RUNNING trial.
        self._constant_liar_l = constant_liar_l
        super(ClTPESampler, self).__init__(
            consider_prior,
            prior_weight,
            consider_magic_clip,
            consider_endpoints,
            n_startup_trials,
            n_ei_candidates,
            gamma,
            weights,
            seed,
        )

    def sample_independent(self, study, trial, param_name, param_distribution):
        # type: (Study, FrozenTrial, str, BaseDistribution) -> Any
        """Sample a parameter value, feeding Constant-Liar observation pairs to TPE."""
        values, scores = _get_observation_pairs(
            study, param_name, trial, self._constant_liar_l
        )

        # Fall back to random sampling until enough observations exist.
        if len(values) < self._n_startup_trials:
            return self._random_sampler.sample_independent(
                study, trial, param_name, param_distribution
            )

        below, above = self._split_observation_pairs(values, scores)

        # Ordered dispatch table; order mirrors the original elif chain so
        # isinstance semantics are unchanged.
        numeric_samplers = [
            (distributions.UniformDistribution, self._sample_uniform),
            (distributions.LogUniformDistribution, self._sample_loguniform),
            (distributions.DiscreteUniformDistribution, self._sample_discrete_uniform),
            (distributions.IntUniformDistribution, self._sample_int),
            (distributions.IntLogUniformDistribution, self._sample_int_loguniform),
        ]
        for dist_cls, sampler in numeric_samplers:
            if isinstance(param_distribution, dist_cls):
                return sampler(param_distribution, below, above)

        if isinstance(param_distribution, distributions.CategoricalDistribution):
            index = self._sample_categorical_index(param_distribution, below, above)
            return param_distribution.choices[index]

        supported = [dist_cls.__name__ for dist_cls, _ in numeric_samplers]
        supported.append(distributions.CategoricalDistribution.__name__)
        raise NotImplementedError(
            "The distribution {} is not implemented. "
            "The parameter distribution should be one of the {}".format(
                param_distribution, supported
            )
        )
def _get_observation_pairs(study, param_name, trial, constant_liar_l):
    # type: (Study, str, FrozenTrial, int) -> Tuple[List[Optional[float]], List[Tuple[float, float]]]
    """Collect (param_value, score) observation pairs for TPE's split step.

    Scores are (step-priority, value) tuples in *minimization* orientation.
    COMPLETE trials contribute their final value, PRUNED trials their last
    intermediate value, and — the Constant Liar twist — RUNNING trials are
    scored as if they had finished with ``constant_liar_l``, steering
    parallel workers away from in-flight points.

    Trials whose params do not include ``param_name`` still contribute a
    (None, score) pair.
    """
    # TPE minimizes internally, so flip the sign for maximization studies.
    sign = -1 if study.direction == StudyDirection.MAXIMIZE else 1

    values = []  # type: List[Optional[float]]
    scores = []  # type: List[Tuple[float, float]]
    # Loop variable renamed from ``trial`` to ``t``: the original shadowed
    # the ``trial`` parameter (the trial currently being sampled). Note that
    # the current RUNNING trial itself still contributes a lie pair.
    for t in study._storage.get_all_trials(study._study_id, deepcopy=False):
        if t.state is TrialState.COMPLETE and t.value is not None:
            score = (-float("inf"), sign * t.value)
        elif t.state is TrialState.PRUNED:
            if len(t.intermediate_values) > 0:
                # Use the latest reported intermediate value; earlier prunes
                # rank worse via the -step component.
                step, intermediate_value = max(t.intermediate_values.items())
                if math.isnan(intermediate_value):
                    score = (-step, float("inf"))
                else:
                    score = (-step, sign * intermediate_value)
            else:
                # Pruned before reporting anything: rank after all others.
                score = (float("inf"), 0.0)
        elif t.state is TrialState.RUNNING:
            # Constant Liar: pretend the running trial finished with value l.
            score = (-float("inf"), sign * constant_liar_l)
        else:
            # FAIL/WAITING trials carry no usable signal.
            continue

        param_value = None  # type: Optional[float]
        if param_name in t.params:
            distribution = t.distributions[param_name]
            param_value = distribution.to_internal_repr(t.params[param_name])
        values.append(param_value)
        scores.append(score)
    return values, scores
import time
def objective(trial):
    """Toy quadratic objective with its optimum at (x, y) = (5, 3).

    Sleeps one second to emulate an expensive evaluation, making the effect
    of parallel workers (n_jobs > 1) visible.
    """
    suggestions = [trial.suggest_float(name, 0, 10) for name in ("x", "y")]
    time.sleep(1)  # emulate an expensive evaluation
    return (suggestions[0] - 5) ** 2 + (suggestions[1] - 3) ** 2
# NOTE(review): `study` is never defined in this snippet — presumably it was
# created above with optuna.create_study(sampler=ClTPESampler(constant_liar_l=...))
# and the line was lost when the gist was captured; confirm against the
# original. n_jobs=20 runs trials in 20 threads concurrently, which is the
# parallel setting the Constant Liar heuristic is designed for.
study.optimize(objective, n_trials=200, n_jobs=20)
@toshihikoyanase
Copy link
Author

toshihikoyanase commented Jul 2, 2020

Without the Constant Liar algorithm

Some groups of points that look like bands (e.g., trial 60-80, 80-100, and 100-120) can be seen in the optimization history.

スクリーンショット 2020-07-02 20 17 49

With the Constant Liar algorithm

We cannot see any obvious bands in the plot.

スクリーンショット 2020-07-02 20 19 19

@sachinruk
Copy link

Is the above method based off of this paper? https://arxiv.org/pdf/1602.05149.pdf

@toshihikoyanase
Copy link
Author

Thank you for your question.
I implemented the Constant Liar heuristic based on this article as an example for an Optuna issue, and it appears to be the same as the heuristic described in the paper you shared.

The paper refers to Ginsbourger et al. (2007), and it shows the algorithm as follows:

image

It looks identical to the algorithm in the article.

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment