"""
Created March 24, 2019 23:01
Adaptive Switching Strategy proposed by Akimoto et al. in GECCO (2019)
"""
import math
import time
import numpy as np
from scipy.stats import kendalltau # tau-b in scipy v1.1.0
class AdaptiveSimulationSwitcher:
    """Adaptive Switching Strategy

    In simulation-based optimization we often have access to multiple
    simulators or surrogate models that approximate a computationally
    expensive or intractable objective function with different trade-offs
    between the fidelity and computational time. Such a setting is called
    multi-fidelity optimization.

    This is a python code of the strategy proposed in the following reference
    to adaptively select which simulator to use during optimization of
    comparison-based evolutionary algorithms. Our adaptive switching strategy
    works as a wrapper of multiple simulators: optimization algorithms
    optimize the wrapper function and the adaptive switching strategy selects
    a simulator inside the wrapper.

    Reference
    ---------
    Y. Akimoto, T. Shimizu, T. Yamaguchi
    Adaptive Objective Selection for Multi-Fidelity Optimization
    In Proceedings of Genetic and Evolutionary Computation Conference (2019)

    Example
    -------
    ::

        # Monte Carlo Simulator
        # True objective f(x) = ||x||^2
        # Noisy objective fn(x) = f(x) + N(0, 1)
        # Approximate objective f_k = (1/k) * sum_{i=1}^{k} fn(x) = f(x) + N(0, 1/k)
        class MonteCarloSimulator:
            def __init__(self, k):
                self.std = k**(-0.5)
            def __call__(self, x):
                return np.dot(x, x) + np.random.randn(1)[0] * self.std

        # number of simulators
        m = 10
        # list of different approximate functions
        func_list = [MonteCarloSimulator(2**i) for i in range(m)]
        # list of approximate runtime ratio (if available)
        err_list = [2] * (m-1)
        # Create the switcher (function wrapper)
        # Give this to your optimization algorithm as an objective function.
        adaptf = AdaptiveSimulationSwitcher(func_list, err_list, runtime_estimation=False)
        # Example calls
        print("{:>7} {:>7} {:>7} {:>7} {:>7} {:>7}".format("Itr", "Idx", "Tcurr", "Tprev", "Tnext", "<tau>"))
        for itr in range(1000):
            fvals = adaptf(np.random.randn(10, 20)/2)
            print("{:>7} {:>7} {:>7.1e} {:>7.1e} {:>7.1e} {:>7.1e}".format(itr+1, adaptf.idx_current, adaptf.t_current, adaptf.t_previous, adaptf.t_next, adaptf.tau_avg))
    """

    def __init__(self, func_list, err_list=None, runtime_estimation=True, beta=1.0, tau_thresh=0.5, tau_margin=0.2, tau_lr=0.1):
        """
        Parameters
        ----------
        func_list : list of callable [f1, ..., fm]
            simulators sorted in the ascending order of expected runtime
        err_list : list of float [a1, ..., am-1]
            estimated runtime ratios, ai = runtime(fi+1) / runtime(fi).
            If no good estimates are available, they can all be 0 together
            with `runtime_estimation = True`. If omitted (None), zeros are
            used and `runtime_estimation` is forced to True.
        runtime_estimation : bool
            if True, measured wall-clock time is used as the cost of an
            evaluation batch; otherwise the cost is the batch size times the
            cumulative product of the runtime ratios
        beta : float
            Kendall's tau is computed if the current f has spent beta times
            the elapsed time for the next f
        tau_thresh : float
            threshold for tau value to switch to the next level
        tau_margin : float
            threshold margin. Switching back to the previous level happens
            when the averaged tau exceeds `tau_thresh + tau_margin`.
        tau_lr : float
            learning rate of the moving average of tau that is used for the
            decision to switch back to the previous level
        """
        if err_list is not None:
            assert len(func_list) - 1 == len(err_list)
        else:
            # Without ratio estimates the evaluation-count cost model is
            # meaningless, hence fall back to measured runtime.
            err_list = np.zeros(len(func_list) - 1)
            runtime_estimation = True
        self.func_list = func_list
        self.err_list = err_list
        # cost of one evaluation at each level relative to level 0
        self.cum_err_list = np.hstack(([1.], np.cumprod(err_list)))
        self.runtime_estimation = runtime_estimation
        self.beta = beta
        self.tau_thresh = tau_thresh
        self.tau_margin = tau_margin
        self.tau_avg = 0.
        self.tau_lr = tau_lr
        self.idx_current = 0
        self.t_current = 0
        self.t_next = 0
        self.t_previous = 0
        # The following values are used only for logging.
        # Row: active level. Column: 0 = current, 1 = next, -1 (2) = previous.
        self._time_list = np.zeros((len(func_list), 3))
        self._nfes_list = np.zeros((len(func_list), 3), dtype=int)

    def __call__(self, x_list):
        """Batch Evaluator

        Parameters
        ----------
        x_list : 2d array
            each row is a candidate solution

        Returns
        -------
        f_list : 1d array
            (approximated) objective values. They come from the level that
            is active when this call returns, i.e., from the new level if a
            switch happened during this call.
        """
        flist1, elapsed1 = self._eval(x_list, idx=0)
        self.t_current += elapsed1
        flist = flist1

        n_levels = len(self.func_list)

        # initialize the first estimate of the runtime for next level
        if self.t_next == 0:
            if self.idx_current + 1 < n_levels:
                self.t_next = self.err_list[self.idx_current] * self.t_current
            elif self.idx_current + 1 == n_levels:
                # no next level: never try to move up
                self.t_next = np.inf
        # ... and for the previous level
        if self.t_previous == 0:
            if self.idx_current > 0 and self.err_list[self.idx_current - 1] > 0:
                self.t_previous = self.t_current / self.err_list[self.idx_current - 1]
            elif self.idx_current == 0:
                # no previous level: never try to move down
                self.t_previous = np.inf

        if self.beta * self.t_next < self.t_current:
            # compute f for the next level
            flist2, elapsed2 = self._eval(x_list, idx=1)
            self.t_next += elapsed2
            # compute tau and update idx
            tau, _ = kendalltau(flist1, flist2)
            # A NaN tau (e.g. constant values) compares False: no switch.
            if tau < self.tau_thresh:
                self.idx_current += 1
                self.t_current = self.t_next = self.t_previous = 0
                self.tau_avg = 0
                flist = flist2
        elif self.beta * self.t_previous < self.t_current:
            # compute f for the previous level
            flist3, elapsed3 = self._eval(x_list, idx=-1)
            self.t_previous += elapsed3
            # compute tau and update idx
            tau, _ = kendalltau(flist1, flist3)
            # Skip undefined tau values; a single NaN would otherwise stay in
            # the moving average until the next level switch.
            if not np.isnan(tau):
                self.tau_avg += (tau - self.tau_avg) * self.tau_lr
            if self.tau_avg > self.tau_thresh + self.tau_margin:
                self.idx_current -= 1
                self.t_current = self.t_next = self.t_previous = 0
                self.tau_avg = 0
                flist = flist3
        return flist

    def _eval(self, x_list, idx=0):
        """Evaluate all candidates on the level `idx_current + idx`.

        Parameters
        ----------
        x_list : 2d array
            each row is a candidate solution
        idx : int
            offset from the current level (0: current, 1: next, -1: previous)

        Returns
        -------
        flist : 1d array
            objective values of the rows of `x_list`
        elapsed : float
            measured runtime if `runtime_estimation` is True, otherwise the
            number of evaluations weighted by the cumulative runtime ratio
        """
        level = self.idx_current + idx
        assert 0 <= level < len(self.func_list), "index out of bound"
        f = self.func_list[level]
        # perf_counter is monotonic, unlike time.time
        st = time.perf_counter()
        flist = np.asarray([f(x) for x in x_list])
        et = time.perf_counter()
        # keep elapsed time (or number of evaluations)
        if self.runtime_estimation:
            elapsed = et - st
        else:
            elapsed = len(x_list) * self.cum_err_list[level]
        # log
        self._time_list[self.idx_current, idx] += elapsed
        self._nfes_list[self.idx_current, idx] += len(x_list)
        return flist, elapsed
