Created
March 24, 2019 23:01
-
-
Save youheiakimoto/0630db3d461f855fb09d799d5dc48dd8 to your computer and use it in GitHub Desktop.
Adaptive Switching Strategy proposed by Akimoto et al. in GECCO (2019)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import time | |
import numpy as np | |
from scipy.stats import kendalltau # tau-b in scipy v1.1.0 | |
class AdaptiveSimulationSwitcher: | |
"""Adaptive Switching Strategy | |
In simulation-basedf optimization we often have access to multiple simulators or surrogate models that approximate a computationally expensive or intractable objective function with different trade-offs between the fidelity and computational time. Such a setting is called multi-fidelity optimization. | |
This is a python code of the strategy proposed in the following reference to adaptively select which simulator to use during optimization of comparison-based evolutionary algorithms. Our adaptive switching strategy works as a wrapper of multiple simulators: optimization algorithms optimize the wrapper function and the adaptive switching strategy selects a simulator inside the wrapper. | |
Reference | |
--------- | |
Y. Akimoto, T. Shimizu, T. Yamaguchi | |
Adaptive Objective Selection for Multi-Fidelity Optimization | |
In Proceedings of Genetic and Evolutionary Computation Conference (2019) | |
Usage | |
----- | |
# Monte Calro Simulator | |
# True objective f(x) = ||x||^2 | |
# Noisy objective fn(x) = f(x) + N(0, 1) | |
# Approximate objective f_k = (1/k) * sum_{i=1}^{k} fn(x) = f(x) + N(0, 1/k) | |
class MonteCarloSimulator: | |
def __init__(self, k): | |
self.std = k**(-0.5) | |
def __call__(self, x): | |
return np.dot(x, x) + np.random.randn(1)[0] * self.std | |
# number of simulators | |
m = 10 | |
# list of different approximate functions | |
func_list = [MonteCarloSimulator(2**i) for i in range(m)] | |
# list of approximate runtime ratio (if available) | |
err_list = [2] * (m-1) | |
# Create the switcher (function wrapper) | |
# Give this to your optimization algorithm as an objective function. | |
adaptf = AdaptiveSimulationSwitcher(func_list, err_list, runtime_estimation=False) | |
# Example calls | |
print("{:>7} {:>7} {:>7} {:>7} {:>7} {:>7}".format("Itr", "Idx", "Tcurr", "Tprev", "Tnext", "<tau>")) | |
for itr in range(1000): | |
fvals = adaptf(np.random.randn(10, 20)/2) | |
print("{:>7} {:>7} {:>7.1e} {:>7.1e} {:>7.1e} {:>7.1e}".format(itr+1, adaptf.idx_current, adaptf.t_current, adaptf.t_previous, adaptf.t_next, adaptf.tau_avg)) | |
""" | |
def __init__(self, func_list, err_list=None, runtime_estimation=True, beta=1.0, tau_thresh=0.5, tau_margin=0.2, tau_lr=0.1): | |
""" | |
Parameters | |
---------- | |
func_list : list of callable [f1, ..., fm] | |
simulators sorted in the ascending order of expected runtime | |
err_list : list of float [a1, ..., am-1] | |
estimated runtime ratios, mi = runtime(fi+1) / runtime(fi) | |
If there is not good estimated values, they can be all 0, | |
and use `runtime_estimation = True` | |
runtime_estimation : bool | |
compute elapsed time in runtime to decide when to check Kendall's tau | |
beta : float | |
Kendall's tau is computed if the current f has spent beta times | |
the elapsed time for the next f | |
tau_thresh : float | |
threshold for tau value to switch to the next level | |
tau_margin : float | |
threshold margin. | |
""" | |
if err_list is not None: | |
assert len(func_list) - 1 == len(err_list) | |
else: | |
err_list = np.zeros(len(func_list) - 1) | |
runtime_estimation = True | |
self.func_list = func_list | |
self.err_list = err_list | |
self.cum_err_list = np.hstack(([1.], np.cumprod(err_list))) | |
self.runtime_estimation = runtime_estimation | |
self.beta = beta | |
self.tau_thresh = tau_thresh | |
self.tau_margin = tau_margin | |
self.tau_avg = 0. | |
self.tau_lr = tau_lr | |
self.idx_current = 0 | |
self.t_current = 0 | |
self.t_next = 0 | |
self.t_previous = 0 | |
# The following values are used only for logging | |
self._time_list = np.zeros((len(func_list), 3)) # T0 and T1 for each mode | |
self._nfes_list = np.zeros((len(func_list), 3), dtype=int) # f0 and f1 FEs for each mode | |
def __call__(self, x_list): | |
"""Batch Evaluator | |
Parameters | |
---------- | |
x_list : 2d array | |
each row is a candidate solution | |
Returns | |
------- | |
f_list : 1d array | |
(approximated) objective values | |
""" | |
flist1, elapsed1 = self._eval(x_list, idx=0) | |
self.t_current += elapsed1 | |
flist = flist1 | |
# initialize the first estimate of the runtime for next level | |
if (self.t_next == 0 and self.idx_current + 1 < len(self.func_list)): | |
self.t_next = self.err_list[self.idx_current] * self.t_current | |
elif (self.t_next == 0 and self.idx_current + 1 == len(self.func_list)): | |
self.t_next = np.inf | |
if (self.t_previous == 0 and self.idx_current > 0 and self.err_list[self.idx_current - 1] > 0): | |
self.t_previous = self.t_current / self.err_list[self.idx_current - 1] | |
elif (self.t_previous == 0 and self.idx_current == 0): | |
self.t_previous = np.inf | |
# compute f for the next level | |
if (self.beta * self.t_next < self.t_current): | |
flist2, elapsed2 = self._eval(x_list, idx=1) | |
self.t_next += elapsed2 | |
# compute tau and update idx | |
tau, p_value = kendalltau(flist1, flist2) | |
if tau < self.tau_thresh: | |
self.idx_current += 1 | |
self.t_current = self.t_next = self.t_previous = 0 | |
self.tau_avg = 0 | |
flist = flist2 | |
elif (self.beta * self.t_previous < self.t_current): | |
flist3, elapsed3 = self._eval(x_list, idx=-1) | |
self.t_previous += elapsed3 | |
# compute tau and update idx | |
tau, p_value = kendalltau(flist1, flist3) | |
self.tau_avg += (tau - self.tau_avg) * self.tau_lr | |
if self.tau_avg > self.tau_thresh + self.tau_margin: | |
self.idx_current -= 1 | |
self.t_current = self.t_next = self.t_previous = 0 | |
self.tau_avg = 0 | |
flist = flist3 | |
return flist | |
def _eval(self, x_list, idx=0): | |
assert 0 <= self.idx_current + idx < len(self.func_list), "index out of bound" | |
# evaluate current f | |
f = self.func_list[self.idx_current + idx] | |
st = time.time() | |
flist1 = np.asarray([f(x_list[i]) for i in range(len(x_list))]) | |
et = time.time() | |
# keep elapsed time (or number of evaluations) | |
elapsed1 = et - st if self.runtime_estimation else len(x_list) * self.cum_err_list[self.idx_current + idx] | |
# log | |
self._time_list[self.idx_current, idx] += elapsed1 | |
self._nfes_list[self.idx_current, idx] += len(x_list) | |
# Return | |
return flist1, elapsed1 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment