@youheiakimoto
Created March 24, 2019 23:01
Adaptive Switching Strategy proposed by Akimoto et al. at GECCO 2019
import math
import time
import numpy as np
from scipy.stats import kendalltau # tau-b in scipy v1.1.0
class AdaptiveSimulationSwitcher:
"""Adaptive Switching Strategy
In simulation-basedf optimization we often have access to multiple simulators or surrogate models that approximate a computationally expensive or intractable objective function with different trade-offs between the fidelity and computational time. Such a setting is called multi-fidelity optimization.
This is a python code of the strategy proposed in the following reference to adaptively select which simulator to use during optimization of comparison-based evolutionary algorithms. Our adaptive switching strategy works as a wrapper of multiple simulators: optimization algorithms optimize the wrapper function and the adaptive switching strategy selects a simulator inside the wrapper.
Reference
---------
Y. Akimoto, T. Shimizu, T. Yamaguchi
Adaptive Objective Selection for Multi-Fidelity Optimization
In Proceedings of Genetic and Evolutionary Computation Conference (2019)
Usage
-----
# Monte Calro Simulator
# True objective f(x) = ||x||^2
# Noisy objective fn(x) = f(x) + N(0, 1)
# Approximate objective f_k = (1/k) * sum_{i=1}^{k} fn(x) = f(x) + N(0, 1/k)
class MonteCarloSimulator:
def __init__(self, k):
self.std = k**(-0.5)
def __call__(self, x):
return np.dot(x, x) + np.random.randn(1)[0] * self.std
# number of simulators
m = 10
# list of different approximate functions
func_list = [MonteCarloSimulator(2**i) for i in range(m)]
# list of approximate runtime ratio (if available)
err_list = [2] * (m-1)
# Create the switcher (function wrapper)
# Give this to your optimization algorithm as an objective function.
adaptf = AdaptiveSimulationSwitcher(func_list, err_list, runtime_estimation=False)
# Example calls
print("{:>7} {:>7} {:>7} {:>7} {:>7} {:>7}".format("Itr", "Idx", "Tcurr", "Tprev", "Tnext", "<tau>"))
for itr in range(1000):
fvals = adaptf(np.random.randn(10, 20)/2)
print("{:>7} {:>7} {:>7.1e} {:>7.1e} {:>7.1e} {:>7.1e}".format(itr+1, adaptf.idx_current, adaptf.t_current, adaptf.t_previous, adaptf.t_next, adaptf.tau_avg))
"""
    def __init__(self, func_list, err_list=None, runtime_estimation=True, beta=1.0, tau_thresh=0.5, tau_margin=0.2, tau_lr=0.1):
        """
        Parameters
        ----------
        func_list : list of callable [f1, ..., fm]
            simulators sorted in ascending order of expected runtime
        err_list : list of float [a1, ..., am-1]
            estimated runtime ratios, ai = runtime(fi+1) / runtime(fi).
            If no good estimates are available, they can all be 0;
            in that case use `runtime_estimation=True`.
        runtime_estimation : bool
            measure the elapsed time at runtime to decide when to check Kendall's tau
        beta : float
            Kendall's tau is computed once the current f has spent more than beta times
            the elapsed time of the next (or previous) f
        tau_thresh : float
            threshold on the tau value for switching to the next level
        tau_margin : float
            margin added to `tau_thresh` when deciding to switch back to the previous level
        tau_lr : float
            learning rate of the exponential moving average of tau (`tau_avg`)
        """
        if err_list is not None:
            assert len(func_list) - 1 == len(err_list)
        else:
            err_list = np.zeros(len(func_list) - 1)
            runtime_estimation = True
        self.func_list = func_list
        self.err_list = err_list
        self.cum_err_list = np.hstack(([1.], np.cumprod(err_list)))
        self.runtime_estimation = runtime_estimation
        self.beta = beta
        self.tau_thresh = tau_thresh
        self.tau_margin = tau_margin
        self.tau_avg = 0.
        self.tau_lr = tau_lr
        self.idx_current = 0
        self.t_current = 0
        self.t_next = 0
        self.t_previous = 0
        # The following arrays are used only for logging:
        # rows are fidelity levels; columns hold evaluations at the current (0), next (1), and previous (2, via idx=-1) level
        self._time_list = np.zeros((len(func_list), 3))             # elapsed time (or its surrogate)
        self._nfes_list = np.zeros((len(func_list), 3), dtype=int)  # number of function evaluations
    def __call__(self, x_list):
        """Batch Evaluator

        Parameters
        ----------
        x_list : 2d array
            each row is a candidate solution

        Returns
        -------
        f_list : 1d array
            (approximated) objective values
        """
        flist1, elapsed1 = self._eval(x_list, idx=0)
        self.t_current += elapsed1
        flist = flist1
        # initialize the first estimates of the runtime for the next and previous levels
        if (self.t_next == 0 and self.idx_current + 1 < len(self.func_list)):
            self.t_next = self.err_list[self.idx_current] * self.t_current
        elif (self.t_next == 0 and self.idx_current + 1 == len(self.func_list)):
            self.t_next = np.inf
        if (self.t_previous == 0 and self.idx_current > 0 and self.err_list[self.idx_current - 1] > 0):
            self.t_previous = self.t_current / self.err_list[self.idx_current - 1]
        elif (self.t_previous == 0 and self.idx_current == 0):
            self.t_previous = np.inf
        # compute f for the next (higher-fidelity) level once enough time has been spent on the current one
        if (self.beta * self.t_next < self.t_current):
            flist2, elapsed2 = self._eval(x_list, idx=1)
            self.t_next += elapsed2
            # compute tau between the two levels; switch up if they rank the solutions too differently
            tau, p_value = kendalltau(flist1, flist2)
            if tau < self.tau_thresh:
                self.idx_current += 1
                self.t_current = self.t_next = self.t_previous = 0
                self.tau_avg = 0
            flist = flist2
        # compute f for the previous (lower-fidelity) level
        elif (self.beta * self.t_previous < self.t_current):
            flist3, elapsed3 = self._eval(x_list, idx=-1)
            self.t_previous += elapsed3
            # update a moving average of tau; switch back if the cheaper level agrees well enough
            tau, p_value = kendalltau(flist1, flist3)
            self.tau_avg += (tau - self.tau_avg) * self.tau_lr
            if self.tau_avg > self.tau_thresh + self.tau_margin:
                self.idx_current -= 1
                self.t_current = self.t_next = self.t_previous = 0
                self.tau_avg = 0
                flist = flist3
        return flist
    def _eval(self, x_list, idx=0):
        assert 0 <= self.idx_current + idx < len(self.func_list), "index out of bound"
        # evaluate f at the level `idx` relative to the current one
        f = self.func_list[self.idx_current + idx]
        st = time.time()
        flist1 = np.asarray([f(x_list[i]) for i in range(len(x_list))])
        et = time.time()
        # keep the elapsed time (or the equivalent number of evaluations)
        elapsed1 = et - st if self.runtime_estimation else len(x_list) * self.cum_err_list[self.idx_current + idx]
        # log
        self._time_list[self.idx_current, idx] += elapsed1
        self._nfes_list[self.idx_current, idx] += len(x_list)
        return flist1, elapsed1
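

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original gist): the switcher is driven
# by a simple comparison-based (mu, lambda) evolution strategy with a fixed
# step size on the Monte Carlo toy problem from the class docstring. The
# optimizer, the problem, and all parameter values below are illustrative
# assumptions, not the setup used in the reference.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    class MonteCarloSimulator:
        """Toy simulator: f(x) = ||x||^2 observed with noise N(0, 1/k)."""
        def __init__(self, k):
            self.std = k ** (-0.5)
        def __call__(self, x):
            return np.dot(x, x) + np.random.randn(1)[0] * self.std

    m = 10
    func_list = [MonteCarloSimulator(2 ** i) for i in range(m)]
    err_list = [2] * (m - 1)  # assumed: runtime doubles with each fidelity level
    adaptf = AdaptiveSimulationSwitcher(func_list, err_list, runtime_estimation=False)

    # (mu, lambda)-ES with isotropic sampling and a fixed step size
    dim, lam, mu, sigma = 20, 10, 3, 0.1
    mean = np.random.randn(dim)
    for itr in range(200):
        cand = mean + sigma * np.random.randn(lam, dim)   # sample offspring
        fvals = adaptf(cand)                              # evaluate via the switcher
        order = np.argsort(fvals)                         # comparison-based selection
        mean = np.mean(cand[order[:mu]], axis=0)          # recombine the mu best
        if (itr + 1) % 50 == 0:
            print("itr {:>4}  simulator index {:>2}  best f {:.3e}".format(
                itr + 1, adaptf.idx_current, fvals[order[0]]))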