Skip to content

Instantly share code, notes, and snippets.

@youheiakimoto
Created July 10, 2023 04:47
Show Gist options
  • Save youheiakimoto/f342a139a6dfcfd55dbc5e345dbf4f9f to your computer and use it in GitHub Desktop.
Save youheiakimoto/f342a139a6dfcfd55dbc5e345dbf4f9f to your computer and use it in GitHub Desktop.
PSA-CMA-ES: CMA-ES with Population Size Adaptation [Nishida and Akimoto, GECCO 2018]
# coding:utf-8
import sys
import os
from abc import ABCMeta, abstractmethod, abstractproperty
import time
import pprint
import math
from collections import deque
from functools import partial
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
from scipy.stats import norm
from numpy.core.umath import floor
from numpy.random.mtrand import rand
try:
import quadprog
except ImportError:
print("`quadprog` not found. Try `pip install quadprog`.")
print("Some constraint handling techniques are unavailable.")
mpl.rc('lines', linewidth=2, markersize=8)
mpl.rc('font', size=12)
mpl.rc('grid', color='0.75', linestyle=':')
mpl.rc('ps', useafm=True) # Force to use
mpl.rc('pdf', use14corefonts=True) # only Type 1 fonts
mpl.rc('text', usetex=True) # for a paper submision
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
NITER = 0
MEASURE = 1
ELAPSED = 2
CRITERION = 3
PROFILE = 'profile'
EXT = '.dat'
DELIM = '_'
PATTERNS = ["/", "\\", "|", "-", "+", "x", "o", "O", ".", "*", ""]
if sys.version_info[0] >= 3:
# For Python Version >= 3.0
basestring = str
else:
# For Python Version < 3.0
range = xrange
class CmaSolution:
"""Cma Solution Class
Attributes
----------
arxraw, arxmut, arxfeas : numpy.ndarray[float]
raw solution, mutation vector, repaired solution
They are all 1 dimensional, and must have the same shape
arffeas, arpen, arfit : float
objective value at repaired solution, penalty value, (final) fitness value
They are all 1 dimensional.
"""
def __init__(self, array_like):
"""
Parameters
----------
array_like : array_like
solution vector to be set to `arxfeas`
"""
self.arxfeas = np.asarray(array_like)
self.arxraw = np.copy(self.arxfeas)
self.arxmut = np.zeros(len(self.arxfeas))
self.x_to_eval = np.copy(self.arxfeas)
self.arffeas = np.nan
self.arpen = np.nan
self.arfit = np.nan
class NormalOrderStatistics(object):
"""Compute Moments of Normal Order Statistics
Requires
--------
numpy
scipy.stats.norm
"""
def __init__(self, n):
"""Normal Order Statistics from `n` populations
Normal order statistics of population size `n` are the ordered
random variables
N_{1:n} < N_{2:n} < ... < N_{n:n}
that are drawn from the standard normal distribution N(0, 1)
independently.
Parameters
----------
n : int
population size
"""
self._n = n
self._pr = np.arange(1, n + 1, dtype=float) / (n + 1)
self._q0r = norm.ppf(self._pr)
self._q1r = 1.0 / norm.pdf(self._q0r)
self._q2r = self._q0r * self._q1r**2
self._q3r = (1.0 + 2.0 * self._q0r**2) * self._q1r**3
self._q4r = self._q0r * (7.0 + 6.0 * self._q0r**2) * self._q1r**4
def exp(self):
"""Expectation of the normal order statistics, using Taylor Expansion.
Returns
-------
1D ndarray : array of expectation of the normal order statistics
Algorithm
---------
Eq. (4.6.3)--(4.6.5) combined with Example 4.6 in "Order Statistics".
"""
result = self._q0r
result += self._pr * (1 - self._pr) * self._q2r / (2 * self._n + 4)
result += self._pr * (1 - self._pr) * (
1 - 2 * self._pr) * self._q3r / (3 * (self._n + 2)**2)
result += (self._pr *
(1 - self._pr))**2 * self._q4r / (8 * (self._n + 2)**2)
return result
def var(self):
"""Variance of the normal order statistics, using Taylor Expansion.
Returns
-------
1D ndarray : array of variance of the normal order statistics
Algorithm
---------
Eq. (4.6.3)--(4.6.5) combined with Example 4.6 in "Order Statistics".
"""
result = self._pr * (1 - self._pr) * self._q1r**2 / (self._n + 2)
result += self._pr * (1 - self._pr) * (
1 - 2 * self._pr) * 2 * self._q1r * self._q2r / ((self._n + 2)**2)
result += (self._pr * (1 - self._pr))**2 * (
self._q1r * self._q3r + self._q2r**2 / 2) / (self._n + 2)**2
return result
def cov(self):
"""Covariance of the normal order statistics, using Taylor Expansion.
Returns
-------
2D ndarray : array of covariance of the normal order statistics
Algorithm
---------
Eq. (4.6.3)--(4.6.5) combined with Example 4.6 in "Order Statistics".
"""
result = np.outer(self._pr**2 * self._q2r, (1 - self._pr)
**2 * self._q2r) / 2
result += np.outer(self._pr * (1 - 2 * self._pr) * self._q2r,
(1 - self._pr) * self._q1r)
result += np.outer(self._pr * self._q1r,
(1 - self._pr) * (1 - 2 * self._pr) * self._q2r)
result += np.outer(self._pr**2 * (1 - self._pr) * self._q3r,
(1 - self._pr) * self._q1r) / 2
result += np.outer(self._pr * self._q1r, self._pr * (1 - self._pr)
**2 * self._q3r) / 2
result /= (self._n + 2)**2
result += np.outer(self._pr * self._q1r,
(1 - self._pr) * self._q1r) / (self._n + 2)
return np.triu(result) + np.triu(result, k=1).T
def blom(self):
"""Blom's Approximation of the Expectation of Normal Order Statistics
Returns
-------
1D ndarray : array of expectation of the normal order statistics
"""
alpha = 0.375
pir = (np.arange(1, self._n + 1) - alpha) / (self._n + 1 - 2 * alpha)
return norm.ppf(pir)
def davis_stephens(self):
"""Refinement of Covariance Matrix by Algorithm 128
Returns
-------
2D ndarray : array of covariance of the normal order statistics
See
---
https://statistics.stanford.edu/sites/default/files/SOL%20ONR%20254.pdf
"""
result = self.cov()
n = self._n
for i in range((n + 1) // 2):
rowsum = np.sum(result[i])
free = np.sum(result[i, i:n - i])
result[i, i:n - i] *= 1 + (1 - rowsum) / free
result[i:n - i, i] = result[i, i:n - i]
result[n - i - 1, i:n - i] = (result[i, i:n - i])[::-1]
result[i:n - i, n - i - 1] = (result[i, i:n - i])[::-1]
return result
def sigma_normalization_factor(dim, weights, cm=1.):
"""sigma = snf * ||xmean|| """
nos = NormalOrderStatistics(len(weights))
nlam = nos.blom()
beta = -np.dot(nlam, weights)
if len(weights) < 50:
nnlam = nos.davis_stephens()
gamma = beta**2 + np.dot(np.dot(nnlam, weights), weights)
else:
gamma = beta**2
muw = np.sum(np.abs(weights)) ** 2 / np.dot(weights, weights)
return beta * muw / (dim - 1 + gamma * muw) / cm
def compute_nearest_feasible(vec_x, mat_inv_c, mat_a, vec_b):
"""Find the nearest feasible solution
It solve the following convex quadratic programming:
argmin_{y in R^N} f(y) = (x - y) * C^{-1} * (x - y) / N
subject to A*y <= b
Parameters
----------
vec_x : 1d array-like
x
mat_inv_c : 2d array-like
C^{-1}
mat_a : 2d array-like
A
vec_b : 1d array-like
b
Returns
-------
y (1d array-like) : solution
f (float) : f(y)
iact (1d array-like) : list of indeces of the active constraints at y
Note
----
The solver uses quadprog package.
The following is taken from the solve_qp docstring.
============================================================================
Solve a strictly convex quadratic program
Minimize 1/2 x^T G x - a^T x
Subject to C.T x >= b
This routine uses the the Goldfarb/Idnani dual algorithm [1].
References
---------
... [1] D. Goldfarb and A. Idnani (1983). A numerically stable dual
method for solving strictly convex quadratic programs.
Mathematical Programming, 27, 1-33.
Parameters
----------
G : array, shape=(n, n)
matrix appearing in the quadratic function to be minimized
a : array, shape=(n,)
vector appearing in the quadratic function to be minimized
C : array, shape=(n, m)
matrix defining the constraints under which we want to minimize the
quadratic function
b : array, shape=(m), default=None
vector defining the constraints
meq : int, default=0
the first meq constraints are treated as equality constraints,
all further as inequality constraints (defaults to 0).
factorized : bool, default=False
If True, then we are passing :math:`R^{−1}` (where :math:`G = R^T R`)
instead of the matrix G in the argument G.
Returns
-------
x : array, shape=(n,)
vector containing the solution of the quadratic programming problem.
f : float
the value of the quadratic function at the solution.
xu : array, shape=(n,)
vector containing the unconstrained minimizer of the quadratic function
iterations : tuple
2-tuple. the first component contains the number of iterations the
algorithm needed, the second indicates how often constraints became
inactive after becoming active first.
lagrangian : array, shape=(m,)
vector with the Lagragian at the solution.
iact : array
vector with the indices of the active constraints at the solution.
============================================================================
In our case,
G = C^{-1}
a^T = C^{-1} * x
-C.T = A
b = -b
"""
inv_c_x = np.dot(mat_inv_c, vec_x)
scaling1 = np.max(np.abs(mat_inv_c))
scaling2 = np.max(np.abs(mat_a))
y, f, _, _, _, iact = quadprog.solve_qp(G=mat_inv_c / scaling1,
a=inv_c_x / scaling1,
C=-mat_a.T / scaling2,
b=-vec_b / scaling2,
meq=0,
factorized=False)
f = ((2.0 * scaling1) * f + np.dot(vec_x, inv_c_x)) / len(vec_x)
return y, f, iact
def ranking(v):
"""Index sort, Lower rank, Upper rank
Examples
--------
v = np.array([1, 1, 0, 3])
idx, lower, upper = rank(v)
idx == [2, 0, 1, 3] or [2, 1, 0, 3]
lower == [0, 1, 1, 3]
upper == [0, 2, 2, 3]
Parameters
----------
v : array-like
Returns
-------
idx : idx[j] is the index of `j`th smallest among `v`
lower : lower[j] is the lower rank of the index idx[j]
upper : upper[j] is the upper rank of the index idx[j]
"""
idx = np.argsort(v)
lower = np.arange(v.shape[0], dtype=int)
upper = np.arange(v.shape[0], dtype=int)
dup = np.where(np.abs(v[idx[1:]] - v[idx[:-1]]) <= np.spacing(v[idx[1:]]))[0]
if dup.shape[0] > 0:
jump = np.where(dup[1:] - dup[:-1] > 1)[0]
if jump.shape[0] == 0:
lower[dup + 1] = lower[dup[0]]
upper[dup] = upper[dup[-1] + 1]
else:
j = 0
for i in jump:
lower[dup[j:i+1] + 1] = lower[dup[j]]
upper[dup[j:i+1]] = upper[dup[i] + 1]
j = i+1
lower[dup[j:] + 1] = lower[dup[j]]
upper[dup[j:]] = upper[dup[-1] + 1]
return idx, lower, upper
def resample(dim, sample_func, func_list, non_positive_indices=None, maxresample=100, cls=CmaSolution):
"""Resampling Infeasibility Handling
The functions will
* raise Exception or
* return np.inf or np.nan or None
if they are not well-defined
It resamples a candidate solution until its function values are well-defined.
Moreover, if non_positive_indeces is given, it resamples until the corresponding
function values will be non positive.
Parameters
----------
dim : int
dimension
sample_func : callable
function to generate a candidate solution.
It takes a list of CmaSolusion instance as an argument
func_list : list of callable
list of functions for which the candidate solutions are resampled
until they are feasible
non_positive_indices : list of indeces, optional
candidate solutions are resampled until the functions with the given indices
return non-positive values
maxresample : int, optional
maximum number of resample
cls : class, optional
Solusion Class, subclass of CmaSolution
"""
assert issubclass(cls, CmaSolution)
non_positive_indices = non_positive_indices if non_positive_indices is not None else []
neval_func = [0] * len(func_list)
idx_feas = maxresample
resampled = []
for i in range(maxresample):
indi = cls(np.zeros(dim))
is_feas = True
indi = sample_func([indi])
indi[0].values = [None] * len(func_list)
for j, f in enumerate(func_list):
try:
indi[0].values[j] = f(indi[0])
except:
indi[0].values[j] = np.inf
if indi[0].values[j] in (None, np.nan):
indi[0].values[j] = np.inf
neval_func[j] += 1
if (indi[0].values[j] == np.inf) or ((j in non_positive_indices) and (indi[0].values[j] > 0)):
is_feas = False
break
resampled += indi
if is_feas:
idx_feas = i
break
return idx_feas, resampled, neval_func
class ABCIterativeAlgorithm(object):
"""Iterative Algorithm Interface
Main functionalities are: onestep, check, log, run, plotme, and plot
onestep : perform one iteration of the algorithm,
call _onestep
check : check if a termination condition is satisfied,
call _check
log : write and display the internal states,
call _log_preprocess before taking a log
run : perform onestep-check-log loop untile
a termination condition is satisfied
plotme : plot the log data, shortcut of `plot`
plot : plot the log data
Derived Classes need to implement the following methods and properties
_onestep : one iteration of the algorithm
_check : termination checker
_log_preprocess : optional, preprocessing for log output.
mainly for monkeypatch.
measure : measure (int) of the run-length of the algorithm
criterion : measure (float) of the progress of the algorithm
recommendation : current recommendation (ndarray)
See
---
RandomSearch, RandomSearchOption
"""
__metaclass__ = ABCMeta
def __init__(self, opts):
"""
Parameters
----------
opts : IterativeAlgorithmOption or its derived class instance
It can be either parsed or not parsed.
"""
self._niter = 0
self._time_step = 0.0
self._time_check = 0.0
self._time_log = 0.0
self._is_satisfied = False
self._condition = ''
assert isinstance(opts, IterativeAlgorithmOption)
if opts.is_parsed:
self.opts = opts
else:
self.opts = opts.parse()
self.opts_original = opts
# Logger dictionary
self.logger = dict()
if self.opts.log_prefix and self.opts.log_span > 0:
self.profile_logger = self.opts.log_prefix + DELIM + PROFILE + EXT
with open(self.profile_logger, 'w') as f:
f.write('#' + type(self).__name__ + "\n")
for key in self.opts.log_variable_list:
self.logger[key] = self.opts.log_prefix + DELIM + key + EXT
with open(self.logger[key], 'w') as f:
f.write('#' + type(self).__name__ + "\n")
@property
def is_satisfied(self):
return self._is_satisfied
@property
def condition(self):
return self._condition
def onestep(self):
t0 = time.perf_counter()
self._onestep()
self._time_step += time.perf_counter() - t0
self._niter += 1
def check(self):
t0 = time.perf_counter()
self._is_satisfied, self._condition = self._check()
self._time_check += time.perf_counter() - t0
return self._is_satisfied, self._condition
def log(self, flgforce=False, condition=''):
"""Take a log
Parameters
----------
flgforce : bool, optional
force to take a log.
condition : str, optional
termination condition.
"""
elapsed_time = self.time_step + self.time_check
# Check if the log should be taken
if ((flgforce and self.opts.log_span > 0) or
(self.opts.log_span >= 1 and self.niter % self.opts.log_span == 0)
or (1.0 > self.opts.log_span > 0.0 and
elapsed_time * self.opts.log_span > self.time_log)):
t0 = time.perf_counter()
# Preprocess
self._log_preprocess()
# Display
displayline = "{0} {1} {2} {3}".format(
repr(self.niter),
repr(self.measure), repr(elapsed_time), str(self.criterion))
if self.opts.log_display:
print(displayline)
if condition:
print('# End with condition = ' + condition)
with open(self.profile_logger, 'a') as f:
f.write(displayline + "\n")
if condition:
f.write('# End with condition = ' + condition)
for key, log in self.logger.items():
key_split = key.split('.')
key = key_split.pop(0)
var = getattr(self, key) # var = self.__dict__[key] # Y.A. on 2017/09/22
for i in key_split:
var = getattr(var, i) # var.__dict__[i] # Y.A. on 2017/09/22
if isinstance(var, np.ndarray) and len(var.shape) > 1:
var = var.flatten()
varlist = np.hstack(
(self.niter, self.measure, elapsed_time, var))
with open(log, 'a') as f:
f.write(' '.join(map(repr, varlist)) + "\n")
self._time_log += time.perf_counter() - t0
def _log_preprocess(self):
"""Preprocess for logging
This function is called at the top of `log` function.
By default, it does nothing. This method will be implemented
when one wants to record some values that are not the attribute
of the `iterative_algorithm`.
Two possible usage of this functions are:
1. to implement this method in a derived class
2. to monkeypatch this method outside the class definition
An example of the second usage is as follows
Example
-------
Assume that we want to record the product of `a` and `b`
defined in the `iterative_algorithm`.
>>> # Define a function to be monkeypatched
>>> def monkeypatch(self):
>>> a = self.iterative_algorithm.a
>>> b = self.iterative_algorithm.b
>>> # set a variable to `self.iterative_algorithm`
>>> self.iterative_algorithm.a_times_b = a * b
>>> # Replace `_log_preprocess`
>>> ABCIterativeAlgorithm._log_preprocess = monkeypatch
"""
pass
def run(self):
"""Run the iterative algorithm
If you want to monitor the behavior of the algorithm during the run,
it is possible to do it by either using the multithreading or run a different interpreter
Example 1
---------
from threading import Thread
th = Thread(target=algo.run, name='algo.run') # algo is an instance of this class
th.start()
algo.plotme()
Example 2
---------
algo.run()
# Open a separate interpreter
ABCIterativeAlgorithmOption.plot(...)
"""
is_satisfied = False
while not is_satisfied:
if self.opts.check_exception:
try:
self.onestep()
is_satisfied, condition = self.check()
self.log(flgforce=is_satisfied, condition=condition)
except Exception as e:
is_satisfied = True
self.log(flgforce=True, condition='exception: ' + str(e))
else:
self.onestep()
is_satisfied, condition = self.check()
self.log(flgforce=is_satisfied, condition=condition)
def plotme(self, xaxis=MEASURE, **kwargs):
"""Plot the results using `plot` function
Parameters
----------
xaxis : int
see xaxis in `plot`
kwargs : optional parameters
optional arguments to `plot`
"""
return ABCIterativeAlgorithm.plot(opts=self.opts, xaxis=xaxis, **kwargs)
@staticmethod
def plot(prefix='',
xaxis=MEASURE,
variable_list=None,
opts=None,
ncols=None,
figsize=None,
cmap_='winter'):
"""Plot the result
This allows to plot previously generated or post-processed data that
has the same format as the log file generated by ``log``.
Parameters
----------
prefix : str
the prefix of the log file
xaxis : int
NITER == 0. vs iter
MEASURE == 1. vs measure
ELAPSED == 2. vs cpu time in sec.
variable_list : list of str
names of variables
opts : OptionBaseClass or its derived class
If this is given, the other parameters are not needed
Returns
-------
fig : figure object.
figure object
axdict : dictionary of axes
the keys are the names of variables given in `variable_list`
The log files must be located at ``prefix`` + PROFILE + EXT,
and ``prefix`` + ``variable_list[i]`` + EXT.
"""
if opts:
prefix = opts.log_prefix
variable_list = opts.log_variable_list
if variable_list is None:
variable_list = []
# Default settings
nfigs = 1 + len(variable_list)
if ncols is None:
ncols = int(np.ceil(np.sqrt(nfigs)))
nrows = int(np.ceil(nfigs / ncols))
if figsize is None:
figsize = (4 * ncols, 3 * nrows)
axdict = dict()
# Figure
fig = plt.figure(figsize=figsize)
# The first figure
x = np.loadtxt(prefix + DELIM + PROFILE + EXT)
x = x[~np.isnan(x[:, xaxis]), :] # remove columns where xaxis is nan
# Axis
ax = plt.subplot(nrows, ncols, 1)
ax.set_title('criterion')
ax.grid(True)
ax.grid(which='major', linewidth=0.50)
ax.grid(which='minor', linewidth=0.25)
plt.plot(x[:, xaxis], x[:, 3:])
ax.xaxis.set_major_formatter(mpl.ticker.FuncFormatter(my_formatter))
ax.yaxis.set_major_formatter(mpl.ticker.FuncFormatter(my_formatter))
axdict['criterion'] = ax
# The other figures
idx = 1
for key in variable_list:
idx += 1
x = np.loadtxt(prefix + DELIM + key + EXT)
x = x[~np.isnan(
x[:, xaxis]), :] # remove columns where xaxis is nan
ax = plt.subplot(nrows, ncols, idx)
ax.set_title(r'\detokenize{' + key + '}')
ax.grid(True)
ax.grid(which='major', linewidth=0.50)
ax.grid(which='minor', linewidth=0.25)
if False: # Before 2017/02/11
plt.plot(x[:, xaxis], x[:, 3:])
else: # New
cmap = plt.get_cmap(cmap_)
cNorm = mpl.colors.Normalize(vmin=0, vmax=x.shape[1] - 1)
scalarMap = mpl.cm.ScalarMappable(norm=cNorm, cmap=cmap)
for i in range(x.shape[1] - 3):
plt.plot(
x[:, xaxis], x[:, 3 + i], color=scalarMap.to_rgba(i))
ax.xaxis.set_major_formatter(
mpl.ticker.FuncFormatter(my_formatter))
ax.yaxis.set_major_formatter(
mpl.ticker.FuncFormatter(my_formatter))
axdict[key] = ax
plt.tight_layout() # NOTE: not sure if it works fine
return fig, axdict
@property
def niter(self):
return self._niter
@property
def time_step(self):
return self._time_step
@property
def time_check(self):
return self._time_check
@property
def time_log(self):
return self._time_log
@abstractmethod
def _onestep(self):
"""Do one step (one iteration) of the algorithm"""
pass
@abstractmethod
def _check(self):
"""Check the termination criteria
Returns
-------
flag : bool
True if one of the termination condition is satisfied, False otherwise
condition : str
String of condition, empty string '' if no condition is satisfied
"""
if self.criterion < self.opts.check_target:
return True, 'target'
elif self.niter >= self.opts.check_maxiter:
return True, 'maxiter'
elif self.time_step + self.time_check >= self.opts.check_maxsec:
return True, 'maxsec'
elif self.measure >= self.opts.check_maxruntime:
return True, 'maxruntime'
else:
return False, ''
@abstractproperty
def measure(self):
"""Measure (int) of the run-length of the algorithm"""
@abstractproperty
def criterion(self):
"""Measure (float) of the progress of the algorithm"""
@abstractproperty
def recommendation(self):
"""The current recommendation (ndarray)"""
def my_formatter(x, pos):
"""Float Number Format for Axes"""
float_str = "{0:2.1e}".format(x)
if "e" in float_str:
base, exponent = float_str.split("e")
return r"{0}e{1}".format(base, int(exponent))
else:
return r"" + float_str + ""
def _myeval(s, g, l):
if isinstance(s, basestring):
return eval(s, g, l)
else:
return s
class OptionBaseClass(object):
def __init__(self, **kwargs):
"""OptionBaseClass
It is a container of variables, whose values can depends on the other variables.
The method `parse` automatically determines the right order to parse the variables
and returns a new instance consisting of the parsed values.
Parameters
----------
The pair (key, value) in the keyward arguments is stored in the instance as
instance.key = value
"""
for key in kwargs:
setattr(self, key, kwargs[key])
self.is_parsed = False
def __str__(self):
return type(self).__name__ + str(self.__dict__)
def setattr_from_local(self, locals_):
for key in locals_:
if key != 'self' and key != 'kwargs':
setattr(self, key, locals_[key])
def disp(self):
"""Display all the options"""
print(type(self).__name__)
pp = pprint.PrettyPrinter()
pp.pprint(self.__dict__)
def to_latex(self):
"""Parse the option to LaTeX
Return
------
LaTeX string.
"""
res = ''
res += '\\begin{longtable}{ll}\n'
res += '\\caption{The parameters of ' + type(self).__name__ + '.}\\\\\n'
res += '\\hline\n'
res += 'Key & Value \\\\\n'
res += '\\hline\n'
for key in sorted(list(self.__dict__)):
if key != 'is_parsed':
res += '\\detokenize{' + str(key) + '} & \\detokenize{' + str(getattr(self, key)) + '}\\\\\n'
res += '\\hline\n'
res += '\\end{longtable}\n'
return res
def parse(self, env=None, flg_print=False):
"""Parse the member variables that are string expressions
Parameters
----------
env : dict, default None
The dictionary of a namespace. A typical choice is globals().
The values in the instance are evaluated on the environment `env`
updated with globals() called inside the method.
flg_print : bool, default False
Print the parse process.
Returns
-------
parsed : an instance of the same class as `self`
all the member variables are parsed.
Example
-------
Case 1.
>>> opts = OptionBaseClass(N='10', M='int(N)', L='int(N)')
>>> parsed = opts.parse()
>>> parsed.disp()
OptionBaseClass
{'L': 10, 'M': 10, 'N': 10, 'is_parsed': True}
Case 2.
repr(N) is evaluated before it is passed to the function
>>> N = 100
>>> opts = OptionBaseClass(N='10', L=repr(N))
>>> parsed = opts.parse()
>>> parsed.disp()
OptionBaseClass
{'L': 100, 'N': 10, 'is_parsed': True}
Case 3.
>>> N = 100
>>> opts = OptionBaseClass(M='N', L=repr(N))
>>> parsed = opts.parse(globals())
>>> parsed.disp()
OptionBaseClass
{'L': 100, 'M': 100, 'is_parsed': True}
In the following cases, one may obtain unexpected outputs.
Case A.
opts = OptionBaseClass(N='10', M='N')
The output of `parse` method is undefined.
If `M` is evaluated before `N`, the result will be M = '10' (string).
To prevent this unexpected behavior, consider to cast the variable like
opts = OptionBaseClass(N='10', M='int(N)')
Case B.
opts = OptionBaseClass(N='M', M='N')
Obviously, the variables can not be evaluated if there is a pair of
variables that depend on each other.
Case C.
N = 100
mypow = pow
opts = OptionBaseClass(M='mypow(N, L)', L='2')
parsed = opts.parse(globals())
To refer to variables and objects defined in the caller,
call `parse` with env=globals()
Case D.
Call `parse` with env=globals() if some modules are required
to evaluate some variables
import numpy as np
opts = OptionBaseClass(N='np.arange(5)', M='np.sqrt(N)')
parsed = opts.parse(globals())
"""
if self.is_parsed:
print("Already parsed. Returns itself.")
return self
parsed_dict = dict()
failure_count = 0
key_list = list(self.__dict__)
key_list.remove('is_parsed')
if env is None:
env = dict(globals())
else:
env = dict(env)
env.update(globals())
env.update(self.__dict__)
while key_list:
if failure_count >= len(key_list):
print("Some options couldn't be parsed: " + str(key_list))
print("Their values are as follows.")
for key in key_list:
print(key + ': ' + getattr(self, key))
print("\n" + "To find out the reason, see the document of " +
"`OptionBaseClass.parse`, and\n" +
"A. type-cast the option variables (see Case A);\n" +
"B. remove a cycle of variables (see Case B);\n" +
"C. call `parse` with env=globals() " +
"to use global variables (see Case C);\n" +
"D. import modules and functions such as " +
"`numpy`, `exp`, `sqrt` (see Case D).\n")
print("Here are the parsed values:")
pp = pprint.PrettyPrinter()
pp.pprint(parsed_dict)
print("Try 'OptionBaseClass.parse' with 'flg_print' option.")
raise ValueError()
key = key_list.pop()
try:
val = _myeval(getattr(self, key), env, parsed_dict)
parsed_dict[key] = val
if flg_print:
print(key + ': ' + repr(val))
failure_count = 0
except:
key_list.insert(0, key)
failure_count += 1
parsed = self.create()
key_list = list(self.__dict__)
key_list.remove('is_parsed')
for key in key_list:
setattr(parsed, key, parsed_dict[key])
parsed.is_parsed = True
return parsed
@classmethod
def create(cls, **kwargs):
return cls(**kwargs)
class IterativeAlgorithmOption(OptionBaseClass):
"""Option Container for Iterative Algorithm Class"""
def __init__(self,
log_prefix=repr('.'),
log_span=repr(0.1),
log_display=repr(True),
log_variable_list=repr([]),
check_target="-np.inf",
check_maxiter="np.inf",
check_maxsec="np.inf",
check_maxruntime="np.inf",
check_exception=repr(False),
**kwargs):
OptionBaseClass.__init__(self, **kwargs)
self.setattr_from_local(locals())
class TolfChecker(object):
def __init__(self, size=20):
"""
Parameters
----------
size : int
Nnumber of points for which the value is restored
For CMA-ES, 10 + (N / lambda) will be a reasonable value.
"""
self._min_hist = np.empty(size) * np.nan
self._l_quartile_hist = np.empty(size) * np.nan
self._median_hist = np.empty(size) * np.nan
self._u_quartile_hist = np.empty(size) * np.nan
self._max_hist = np.empty(size) * np.nan
self._pop_hist = np.empty(size) * np.nan
self._next_position = 0
def update(self, arf):
"""Add values to the history and remove elements from history if needed"""
self._min_hist[self._next_position] = np.nanmin(arf)
self._l_quartile_hist[self._next_position] = np.nanpercentile(arf, 25)
self._median_hist[self._next_position] = np.nanmedian(arf)
self._u_quartile_hist[self._next_position] = np.nanpercentile(arf, 75)
self._max_hist[self._next_position] = np.nanmax(arf)
self._next_position = (
self._next_position + 1) % self._min_hist.shape[0]
def check(self, tolfun=1e-9):
# alias to check_absolute
return self.check_relative(tolfun)
def check_relative(self, tolfun=1e-11):
iqr = np.nanmedian(self._u_quartile_hist - self._l_quartile_hist)
med_ = abs(np.nanmedian(self._min_hist))
if not np.isnan(iqr) and not np.isnan(med_):
return iqr < tolfun * med_
else:
return False
def check_absolute(self, tolfun=1e-11):
iqr = np.nanmedian(self._u_quartile_hist - self._l_quartile_hist)
if not np.isnan(iqr):
return iqr < tolfun
else:
return False
def check_flatarea(self):
return np.all(self._l_quartile_hist - self._min_hist == 0)
class TolxChecker(object):
def __init__(self, dim, size=20):
"""
Parameters
----------
size : int
number of points for which the value is restored
"""
self._min_hist = np.empty((size, dim)) * np.nan
self._l_quartile_hist = np.empty((size, dim)) * np.nan
self._median_hist = np.empty((size, dim)) * np.nan
self._u_quartile_hist = np.empty((size, dim)) * np.nan
self._max_hist = np.empty((size, dim)) * np.nan
self._next_position = 0
def update(self, arx):
"""Add values to the history and remove elements from history if needed"""
self._min_hist[self._next_position] = np.nanmin(arx, axis=0)
self._l_quartile_hist[self._next_position] = np.nanpercentile(
arx, 25, axis=0)
self._median_hist[self._next_position] = np.nanmedian(arx, axis=0)
self._u_quartile_hist[self._next_position] = np.nanpercentile(
arx, 75, axis=0)
self._max_hist[self._next_position] = np.nanmax(arx, axis=0)
self._next_position = (
self._next_position + 1) % self._min_hist.shape[0]
def check(self, tolx=1e-10):
# alias to check_absolute
return self.check_relative(tolx)
def check_relative(self, tolx=1e-10):
iqr = np.nanmedian(self._u_quartile_hist - self._l_quartile_hist, axis=0)
med_ = np.abs(self._median_hist).min(axis=0)
if not np.all(np.isnan(iqr)) and not np.all(np.isnan(med_)):
return np.any(iqr < tolx * med_)
else:
return False
def check_absolute(self, tolx=1e-10):
iqr = np.nanmedian(self._u_quartile_hist - self._l_quartile_hist, axis=0)
if not np.all(np.isnan(iqr)):
return np.any(iqr < tolx)
else:
return False
def weight(pop, xmean, transform_inverse, base_weight=None, cone=0, cmu=1):
"""Compute Weights
Parameters
----------
pop : list of CmaSolution instances
population to be weighted
xmean : ndarray 1d
mean vector
transform_inverse : callable
function to inverse-transform a solution
pre_weight : array-like
pre-defined weight values
cone, cmu : float, optional
learning rate for rank-one and rank-mu
0 <= cone + cmu <= 1
If cmu = 1, the resulting weights are all non-negative
Returns
-------
ndarray (1D)
scaled weights
"""
lam = len(pop)
N = len(xmean)
ww = pre_weight(N, lam=lam, base_weight=base_weight, cone=cone, cmu=cmu)
# Treat Tie
idx, lr, ur = ranking(np.asarray([indi.arfit for indi in pop]))
w = np.array([np.mean(ww[lr[i]:ur[i]+1]) for i in range(lam)])
# assert np.all(idx == pop.idx)
# Scale Weights
for i in range(lam):
if w[i] < 0:
arz = transform_inverse(pop[idx[i]].arxraw - xmean)
w[i] *= N / np.sum(arz**2)
#arz = transform_inverse(pop.arxraw - xmean)
#w[w < 0] *= N / np.sum(arz[idx[w < 0]]**2, axis=1)
return w
def pre_weight(N, lam=None, base_weight=None, cone=0, cmu=1):
"""Compute Weights
Parameters
----------
N : int
dimension of search space
lam : int
pop size
base_weight : array-like
pre-defined weight values
cone, cmu : float, optional
learning rate for rank-one and rank-mu
0 <= cone + cmu <= 1
If cmu = 1, the resulting weights are all non-negative
Returns
-------
ndarray (1D)
pre-weights
"""
if lam is None:
assert base_weight is not None
lam = len(base_weight)
if base_weight is None:
ww = np.array([math.log((lam + 1) / 2.0) - math.log(1 + i) for i in range(lam)])
else:
assert len(base_weight) == lam
ww = np.array(base_weight, copy=True)
# Taken from Niko's CMA-ES Tutorial (Appendix A)
ww[ww > 0] /= np.sum(ww[ww > 0]) if np.any(ww > 0) else 1
mueff_pos = 1.0 / np.sum(ww[ww > 0] ** 2) if np.any(ww > 0) else 0
if cmu > 0:
# Taken from Niko's CMA-ES Tutorial (Appendix A)
ww[ww < 0] /= -np.sum(ww[ww < 0]) if np.any(ww < 0) else 1
mueff_neg = 1.0 / np.sum(ww[ww < 0] ** 2) if np.any(ww < 0) else 0
teig = max(1, 1 / (10 * N * (cmu + cone))) # eigh performed every teig iterations
amu = (1 + cone / cmu) / (1 + N * ((20 * N / (20 * N - 1)) ** teig - 1))
amuw = 1 + 2 * mueff_neg / (mueff_pos + 2)
amupos = (1 - cone - cmu) / (N * cmu)
ww[ww < 0] *= np.nanmin((amu, amuw, amupos))
return ww
class MixedIntegerHandler(object):
def __init__(self, stair_width, cs=None, ds=None, minstd=0, correction_mode='sigma', flg_integer_mutation=True):
"""Helper for Mixed Integer Problems
Three functionarities are implimented:
integer_mutation : mutation for integer variables
mixed_integer_csa : csa variant for mixed integer
repair_coordinate_std : force the minimum std for integer variables
Parameters
----------
stair_width : array-like
stair width for integer variables. 0 for float variables.
cs, ds : float
cumulation factor and damping factor
minstd : float or array-like, default = 0
minimum (coordinate-wize) standard deviation
correction_mode : str, optional, default = 'sigma'
mode selection for covariance correction
'none' : no correction
'sigma' : sigma correction
'coordinate_std' : coordinate_std correction
flg_integer_mutation : bool, optional, default = True
flag for integer mutation
"""
N = len(stair_width)
self.stair_width = stair_width
# for update
self.ps = np.zeros(N)
self.cs = cs
self.ds = ds
self.chiN = math.sqrt(N) * (1.0 - 1.0 / (4.0 * N) + 1.0 / (21.0 * N * N))
self.hsig = True
self.last_sig_change_fac = 1.0
self.correction_mode = correction_mode
self.flg_integer_mutation = flg_integer_mutation
# for repair_coordinate_std
ien = np.sum(self.stair_width > 0.0)
if np.isscalar(minstd):
self.minstd = np.asarray([max(minstd, self.stair_width[i] / (2.0 * ien + 1.0)) for i in range(N)])
else:
self.minstd = np.asarray([max(minstd[i], self.stair_width[i] / (2.0 * ien + 1.0)) for i in range(N)])
def integer_mutation(self, lam, xmean, coordinate_std, bestx=None):
"""Integer Mutation
Parameters
----------
lam : int
number of candidate solutions
xmean, coordinate_std : 1d array-like
mean vector and the coordinate wise standard deviation (sqrt of diag of sigma**2 * C)
bestx : 1d array-like, optional
best solution from the last iteration. If not given, best solution injection is disabled.
Returns
-------
integer mutation vectors
"""
m = lam
n = len(xmean)
if not self.flg_integer_mutation:
return np.zeros((m, n))
if bestx is None:
bestx = xmean
Rint = np.zeros((m, n), dtype=int)
# Compute IR: masking vector: the components where an integer mutation will be applied
IR = (2.0 * coordinate_std < self.stair_width)
absIR = np.sum(IR)
# compute lambda_int: number of solutions that the integer mutation will be applied
if absIR == 0:
lamint = 0
elif absIR == n:
lamint = m // 2
else:
lamint = min( m // 10 + absIR + 1, m // 2 - 1 )
if lamint > 0:
# compute Rp
rnddim = np.nonzero(IR)[0][np.random.permutation(absIR)]
Rp = np.zeros((lamint, n))
for i in range(lamint):
Rp[i, rnddim[i % absIR]] = 1
# compute Rpp
# Rpp = np.random.rand(lamint, n) >= 0.7 ** (1.0 / absIR)
Rpp = np.zeros((lamint, n))
Rpp[:, rnddim] = np.random.geometric(p=0.7 ** (1.0 / absIR), size=(lamint, absIR)) - 1
# compute Iplusminus: sign-swiching (dim, lamint)-array for Rint, with each element being +-1
#Iplusminus = np.random.randint(low=0, high=2, size=(lamint, n))
Iplusminus = np.random.choice([-1, 1], size=(lamint, n))
# compute Rint: integer mutation
rndidx = np.random.permutation(m)
Rint[rndidx[:lamint], :] = Iplusminus * (Rp + Rpp)
#Rint[rndidx[lamint], stair_width > 0.0] = np.floor((bestx - xmean)[stair_width > 0.0] / stair_width[stair_width > 0.0])
mask = np.where(self.stair_width > 0.0)[0]
Rint[rndidx[lamint], mask] = np.floor(bestx[mask] / self.stair_width[mask]) - np.floor(xmean[mask] / self.stair_width[mask])
return Rint * self.stair_width
def mixed_integer_csa(self, weights, arz, coordinate_std):
"""Update Step-Size by CSA variation for Mixed Integer
Correction for the flat (ineffective) dimension is implemented.
Parameters
----------
weights: 1d array-like
recombination weights. Each value corresponds to each row of arz
arz : 2d array-like
inversely_transformed steps (rows)
coordinate_std : 1d array-like
coordinate-wise standard deviation of gaussian
Returns
-------
last_sig_change_fac (float) : multiplicant to update sigma
hsig (bool) : flag to check if the step-size is not going to be increased rapidly, normally 1.
Note
----
The correction is required since CSA suffer from ineffective axis.
For mixed integer problems the correction can be done explicitly, but
it is not possible in general. A preferred way would be to use other
step-size control mechanisms such as TPA.
"""
# update evolution path
w = np.zeros(len(weights))
w[weights > 0.0] = weights[weights > 0]
muw = 1.0 / np.dot(w, w)
dz = np.dot(w, arz)
self.ps *= (1.0 - self.cs)
self.ps += math.sqrt(self.cs * (2.0 - self.cs) * muw) * dz
# update sigma
self.hsig = np.linalg.norm(self.ps) < self.chiN * (1.5 + 1.0 / (len(self.ps) - 0.5))
vecIsigma = (coordinate_std / math.sqrt(self.cs) >= 0.2 * self.stair_width)
count_Isig = np.sum(vecIsigma)
normps = np.linalg.norm(self.ps[vecIsigma])
if count_Isig > 0:
chin_mi = math.sqrt(count_Isig) * (1.0 - 1.0 / (4.0 * count_Isig) + 1.0 / (21.0 * count_Isig * count_Isig))
self.last_sig_change_fac = math.exp((self.cs / self.ds) * (normps / chin_mi - 1.0))
else:
self.last_sig_change_fac = 1.0
return self.last_sig_change_fac, self.hsig
def std_correction(self, coordinate_std):
"""Std correction to prevent the coordinate-wise std to be too small
Parameters
coordinate_std : 1d array-like
coordinate-wise standard deviation of gaussian
Returns
-------
1d ndarray : multiplicant to update std
"""
if self.correction_mode == 'sigma':
return max(1.0, np.max(self.minstd / coordinate_std))
elif self.correction_mode == 'coordinate_std':
return np.fmax(1.0, self.minstd / coordinate_std)
elif self.correction_mode == 'none':
return 1
else:
raise ValueError(self.correction_mode)
class PeriodicVariableHandler(object):
def __init__(self, lower, upper):
"""Periodic Variable Handler for CMA by Yamaguchi 2016 (in japanese)
Parameters
----------
lower, upper : array-like
period = [lower, upper]. Set -np.inf, np.inf for non-periodic variables
"""
self.lower = lower
self.upper = upper
def project_x(self, x):
"""Return x projected into the right period
Parameters
----------
x : array-like
solution vector
Return
------
projected arx : 2d array-like
Note
----
Unnecessary to call this function if the objective function and the constraint functions
are well-defined outside the right period.
"""
period = self.upper - self.lower
mask = period < np.inf
projected = x.copy()
if np.ndim(x) == 1:
projected[mask] -= np.floor((x[mask] - self.lower[mask]) / period[mask]) * period[mask]
elif np.ndim(x) == 2:
projected[:, mask] -= np.floor((x[:, mask] - self.lower[mask]) / period[mask]) * period[mask]
else:
raise TypeError
return projected
def project_distribution(self, mean, coordinate_std):
"""project N(mean, cov)
Parameters
----------
mean : array-like
mean vector
coordinate_std : array-like
sqrt of the diagonal elements of the covariance matrix. Note: sigma * diag(C)**0.5 for CMA
Returns
-------
projected_mean (array-like) : projected mean
projection_diag (array-like) : diagonal matrix to project the covariance matrix and others.
Note
----
1. The evolution path for rank-1 update should be projected by the return diagonal matrix.
2. In case of full covariance matrix C, projection by D (i.e., D*C*D) can be done as
C *= D
C *= D[np.newaxis].T
pc *= D
3. In case that the mean vector is recorded for the later use (e.g., for evolution path update),
projecting the mean vector will affect the code.
"""
projected_mean = self.project_x(mean)
D = np.fmin((self.upper - self.lower) / coordinate_std / 4.0, 1.0)
return projected_mean, D
class MirroringBoxConstrainHandler(object):
"""Mirroring Box Constraint Handling
f(x) = f(2*UB - x) if x > UB
f(x) = f(2*LB - x) if x < LB
Note
----
If mirroring is applied, the corresponding variables become periodic,
where the period is [LB - (UB - LB)/2, UB + (UB - LB)/2].
Consider to apply PeriodicVariableHandler together with this class.
See example for the usage
"""
def __init__(self, lower, upper, mode='linear'):
"""
Parameters
----------
lower, upper : array-like
if no bounds, put -np.inf (for lb) or np.inf (for ub)
mode : str, optional
('linear', 'linquad')
"""
assert mode in ('linear', 'linquad')
self.lb = lower
self.ub = upper
self.mode = mode
if self.mode == 'linquad':
self.al = np.asarray([min( (self.ub[i] - self.lb[i]) / 2.0, (1.0 + np.abs(self.lb[i])) / 20.0 )
if np.isfinite(self.lb[i]) else 1 for i in range(len(self.lb))])
self.au = np.asarray([min( (self.ub[i] - self.lb[i]) / 2.0, (1.0 + np.abs(self.ub[i])) / 20.0 )
if np.isfinite(self.ub[i]) else 1 for i in range(len(self.ub))])
else:
self.al = np.zeros(len(self.lb))
self.au = np.zeros(len(self.ub))
def get_mirrored_x(self, x):
x_mirrored = np.array(x, copy=True)
mask_l = (self.lb > -np.inf)
mask_u = (self.ub < np.inf)
mask_lu = mask_l * mask_u
# step 1: projection of x into a base period [l - 2al - (u - l)/2, u + 2au + (u - l)/2]
lb = self.lb[mask_l] - self.al[mask_l]
ub = self.ub[mask_u] + self.au[mask_u]
ll = 1.5 * self.lb[mask_lu] - 0.5 * self.ub[mask_lu] - 2.0 * self.al[mask_lu]
uu = 1.5 * self.ub[mask_lu] - 0.5 * self.lb[mask_lu] + 2.0 * self.au[mask_lu]
if np.ndim(x) == 1:
x_mirrored[mask_lu] = x[mask_lu] - np.floor((x[mask_lu] - ll) / (uu - ll)) * (uu - ll)
# assert np.all(ll <= x_mirrored[mask_lu]) and np.all(x_mirrored[mask_lu] < uu)
elif np.ndim(x) == 2:
x_mirrored[:, mask_lu] = x[:, mask_lu] - np.floor((x[:, mask_lu] - ll) / (uu - ll)) * (uu - ll)
else:
raise ValueError
# step 2: mirroring
if np.ndim(x) == 1:
x_mirrored[mask_l] = lb + np.abs(lb - x_mirrored[mask_l])
x_mirrored[mask_u] = ub - np.abs(ub - x_mirrored[mask_u])
elif np.ndim(x) == 2:
x_mirrored[:, mask_l] = lb + np.abs(lb - x_mirrored[:, mask_l])
x_mirrored[:, mask_u] = ub - np.abs(ub - x_mirrored[:, mask_u])
else:
raise ValueError
if self.mode is 'linquad':
import warnings
warnings.warn("This option is not yet tested. Check carefully and remove warning.")
mask_al = (x_mirrored < self.lb + self.al)
mask_au = (x_mirrored > self.ub - self.au)
if np.ndim(x) == 1:
x_mirrored[mask_al] = self.lb + (x_mirrored[mask_al] - lb) ** 2 / 4.0 / self.al[mask_al]
x_mirrored[mask_au] = self.ub - (x_mirrored[mask_au] - ub) ** 2 / 4.0 / self.au[mask_au]
elif np.ndim(x) == 2:
x_mirrored[:, mask_al] = self.lb + (x_mirrored[:, mask_al] - lb) ** 2 / 4.0 / self.al[mask_al]
x_mirrored[:, mask_au] = self.ub - (x_mirrored[:, mask_au] - ub) ** 2 / 4.0 / self.au[mask_au]
else:
raise ValueError
return x_mirrored
class AdaptivePenaltyFunctionBoxConstraintHandler(object):
def __init__(self, lower_bound, upper_bound, lambda_=1, mode='sakamoto17'):
"""Box Constraint Handling Based on Adaptive Penalty Function
Two major variants are implemented
hansen09 : The original version. Erratum and the modification presented in the author version of [Hansen 09] is implemented.
sakamoto17 : The improved version [Sakamoto 17].
Parameters
----------
lower_bound, upper_bound : ndarray
lower and upper bound of each variable.
lambda_ : int, optional
lambda, population size, sample size per iteration
To support adaptive population size, lambda_ should be the
minimal number for the population size
mode : str, defalt = 'sakamoto17'
version ('hansen09', 'sakamoto17')
"""
self.lb = lower_bound
self.ub = upper_bound
N = self.lb.shape[0]
self.penalty = np.zeros(N)
self.iqr_hist = deque(maxlen=20 + 3 * N // lambda_)
self.flg_winit = False
self.sqdm = 1.0
self.meanviolation = 1.0
assert mode in ('hansen09', 'sakamoto17'), 'Unsupported mode {}'.format(mode)
self.mode = mode
def repair(self, x, out=None):
"""Push each variable outside the feasible set onto the boundary
Parameters
----------
x : ndarray
each row vector represents a point to be repaired
out : ndarray, optional
array where the output (repaired point) is stored
Returns
-------
out : ndarray
Repaired x. If `out` is passed, its reference is returned.
"""
return np.clip(x, self.lb, self.ub, out)
def check(self, x):
"""Check if x is feasible
Parameters
----------
x : ndarray (1D or 2D)
a vector to be checked, or a matrix such that each row is a vector to be checked
Returns
-------
ndarray (same dimension as x) : each value is -1 if x[i] < LB[i], 1 if x[i] > UB[i], 0 otherwise
"""
return -1 * (x < self.lb) + (x > self.ub)
def update_penalty_factor(self, lam, mueff, f_repaired, xmean, coordinate_std):
# Update interquartile history
q75, q50, q25 = np.percentile(f_repaired, [75, 50, 25])
iqr = q75 - q25
delf = iqr / np.mean(coordinate_std ** 2)
if np.isfinite(delf) and delf > 0:
self.iqr_hist.append(delf)
else:
self.iqr_hist.append(q50)
print('Warning: Bad Initialization. An Invalid IQR observed.')
print('delf = {}'.format(delf))
# Penalty Weight Initialization
if (not np.all(self.check(xmean) == 0) and
(not self.flg_winit or len(self.iqr_hist) < 3)):
# This initialization is performed at most twice (in first and second iterations)
assert len(self.iqr_hist) > 0, 'No Feasible Solution. Try something like xmean = (lb + ub) / 2 and sigma = (ub - lb) / 6.'
self.penalty[:] = 2.0 * self._effective_iqr_median(lam)
self.flg_winit = True
if self.flg_winit and len(self.iqr_hist) >= 3:
# Increase the penalty coefficient
dm = xmean - self.repair(xmean)
N = xmean.shape[0]
delm = np.abs(dm) / coordinate_std
delth = 3 * max(1, math.sqrt(N) / mueff)
dgam = min(1, mueff / (10 * N))
if self.mode in ('hansen09'):
self.penalty *= np.exp(
(dgam / 2) * np.tanh(np.fmax(0, delm - delth) / 3))
self.penalty[self.penalty >
5 * np.median(self.iqr_hist)] *= math.exp(-dgam / 3)
elif self.mode in ('sakamoto17'):
mvio = np.fmax(0, delm - delth)
self.penalty *= np.exp((dgam / 2) * np.tanh(mvio / 3))
# Update the penalty coefficient based on IQR history
scale = self._effective_iqr_median(lam) / np.mean(self.penalty)
if 3.0 * scale < 1.0:
# Decrease
self.penalty[:] *= 3.0 * scale
# elif scale > 1.0: #
# # Increase # Change 2017.02.17
# self.penalty[mvio > 0] *= scale #
# Cinv_array = 1 / (sigma**2 * variance)
# vCv_sqrt = np.sqrt(Cinv_array)
# self.sqdm = np.sum((dm * Cinv_array / vCv_sqrt)**2)
self.sqdm = np.sum(delm**2)
def compute_penalty(self, x_raw, x_repaired, coordinate_std):
"""Compute Penalty
Parameters
----------
x_raw, x_repaired : ndarray
each row of them is a point before and after the repair
coordinate_std : ndarray (1D)
coordinate-wise std of distribution
"""
assert x_raw.shape == x_repaired.shape
N = len(coordinate_std)
if self.mode in ('hansen09'):
return np.dot((x_repaired - x_raw)**2, self.penalty) / self.penalty.shape[0]
elif self.mode in ('sakamoto17'):
Cinv = 1.0 / coordinate_std ** 2
vCv = Cinv.copy()
self.meanviolation = np.dot(((x_raw-x_repaired) * Cinv.T)**2 / vCv.T, np.ones(N)) / N
return np.dot((x_repaired - x_raw)**2, self.penalty) / self.penalty.shape[0]
def _effective_iqr_median(self, lam):
"""Effective IQR Median
Let L be the length of the IQR history (iqr_hist).
L = 0 : error
L <= 3 : median of iqr_hist
L > 3 : let t be the current iteration.
step 1. compute the median of three latest IQRs (medIQR)
step 2. find the maximum T such that
| log(IQR[-3-t]) - log(medIQR) | < log(5.0)
for all t <= T <= maximum history length.
step 3. compute the median of IQR[-3-T], ... , IQR[-1]
Parameters
----------
lam : int
population size
"""
N = self.lb.shape[0]
if len(self.iqr_hist) == 0:
raise RuntimeError
elif len(self.iqr_hist) <= 3:
return np.median(self.iqr_hist)
else:
arr_iqr_hist = np.asarray(self.iqr_hist)[:20+3*N//lam]
med = np.median(arr_iqr_hist[-3:])
tmp = np.abs(np.log(arr_iqr_hist[:-3]) - np.log(med))[::-1]
i = np.where(tmp > np.log(5.0))[0]
if len(i) > 0:
i = i[0] + 3
else:
i = len(arr_iqr_hist)
return np.median(arr_iqr_hist[-1:-i - 1:-1])
class AdaptivePenaltyFunctionLinearConstraintHandler(object):
# Created by Sakamoto 2017.02.16, Revised by YA on 2017.10.13
def __init__(self,
matA,
vecb,
method='vLCH',
dth_number=3):
self.A = matA.reshape(1, len(matA)).astype(float) if matA.ndim == 1 else matA.astype(float)
self.b = vecb.ravel().astype(float) if vecb.ndim == 2 else vecb.astype(float)
# self.b[:] -= 1e-15
N = self.A.shape[1]
self.iqr_hist = deque(maxlen=20 + 3 * N)
self.winit = False
self.method = method
self.dth_num = dth_number
if method in ('vLCH'):
self.penalty = np.zeros(self.A.shape[0])
elif method in ('eLCH'):
self.penalty = np.zeros(N)
else:
raise NameError('wrong lch-name')
def violation(self, x):
"""
Parameters
----------
x : ndarray (1D or 2D)
Returns
-------
"""
g_value = np.dot(x, self.A.T) - self.b
return g_value
# def repair(self, x, transform_inverse=None, epsrel=1e-8, epsabs=1e-14):
# N = x.shape[-1]
# y = x.reshape((-1, N))
# yfeas = np.empty(y.shape)
# invC = np.eye(N)
# if transform_inverse is not None:
# invC = transform_inverse(invC)
# invC = transform_inverse(invC.T)
# for i in range(y.shape[0]):
# yfeas[i] = compute_nearest_feasible(vec_x=y[i], mat_inv_c=invC, mat_a=self.A, vec_b=self.b)[0]
# # Force xfeas to be feasible
# while np.any(self.violation(yfeas[i]) > 0):
# dy = yfeas[i] - y[i]
# ndy = np.linalg.norm(dy)
# dy /= ndy
# yfeas[i] += (epsabs + epsrel * ndy) * dy
# yfeas.shape = x.shape
# return yfeas
def repair(self, x, transform_inverse=None, init_eps=1e-14):
if np.any(self.violation(x) > 0):
N = x.shape[-1]
y = x.reshape((-1, N))
yfeas = y.copy()
mat_inv_c = np.eye(N)
if transform_inverse is not None:
mat_inv_c = transform_inverse(transform_inverse(mat_inv_c).T)
for i in range(y.shape[0]):
eps = init_eps
while np.any(self.violation(yfeas[i]) > 0):
yfeas[i] = compute_nearest_feasible(vec_x=y[i],
mat_inv_c=mat_inv_c,
mat_a=self.A,
vec_b=self.b-eps)[0]
eps *= 10
return yfeas
else:
return x
def update_penalty_factor(self, mueff, f_repaired, xmean, coordinate_std, transform):
N = len(xmean)
lam = len(f_repaired)
damp = min(1, mueff / (10 * N))
dth = (self.dth_num * max(1, N ** 0.5 / mueff))
# Update interquartile history
q75, q50, q25 = np.percentile(f_repaired, [75, 50, 25])
iqr = q75 - q25
delf = iqr / np.mean(coordinate_std ** 2)
if np.isfinite(delf) and delf > 0:
self.iqr_hist.append(delf)
else: # Add 2017.02.16
self.iqr_hist.append(q50)
print('Warning: Bad Initialization. An Invalid IQR observed.')
print('delf = {}'.format(delf))
# Prepare
violation = self.violation(xmean)
trimed = self._effective_iqr_median(lam)
# Penalty coefficients Initialization
# This initialization is performed at most twice (in first and second iterations)
if np.any(violation > 0) and (not self.winit or len(self.iqr_hist) <= 2):
self.penalty[:] = 2.0 * trimed
self.winit = True
# Update the penalty coefficient
if self.winit and len(self.iqr_hist) > 2:
if self.method == 'vLCH':
# === v-LCH === #
sqrtCv = transform(self.A)
vCv = np.sum(sqrtCv ** 2, axis=1)
edist = (violation / np.sqrt(vCv)) - dth
elif self.method in ('eLCH'):
# === e-LCH === #
dm = xmean - self.repair(xmean)
edist = (np.abs(dm) / coordinate_std) - dth
else:
raise ValueError
# Increase penalty coefficient
self.penalty *= np.exp((edist > 0) * np.tanh(edist / self.dth_num) * damp / 2.)
# Decrease penalty coefficient
scale = trimed / np.mean(self.penalty)
if 3.0 * scale < 1.0:
self.penalty[:] *= 3.0 * scale
def compute_penalty(self, x_raw, x_repaired):
"""Compute Penalty
Parameters
----------
x_raw, x_repaired : ndarray
each row of them is a point before and after the repair
Returns
-------
ndarray (1D)
array of penalty values
"""
assert x_raw.shape == x_repaired.shape
N = x_repaired.shape[-1]
if (self.method == 'vLCH') or (not self.method):
# === v-LCH === #
return np.dot(np.fmax(np.dot(x_raw - x_repaired, self.A.T), 0) ** 2, self.penalty) / N
elif self.method == 'eLCH':
# === e-LCH === #
return np.dot((x_raw - x_repaired) ** 2, self.penalty) / N
def _effective_iqr_median(self, lam):
"""Effective IQR Median
Let L be the length of the IQR history (iqr_hist).
L = 0 : error
L <= 3 : median of iqr_hist
L > 3 : let t be the current iteration.
step 1. compute the median of three latest IQRs (medIQR)
step 2. find the maximum T such that
| log(IQR[-3-t]) - log(medIQR) | < log(5.0)
for all t <= T <= maximum history length.
step 3. compute the median of IQR[-3-T], ... , IQR[-1]
Parameters
----------
lam : int
population size
"""
N = self.A.shape[1]
if len(self.iqr_hist) == 0:
raise RuntimeError
elif len(self.iqr_hist) <= 3:
return np.median(self.iqr_hist)
else:
arr_iqr_hist = np.asarray(self.iqr_hist)[:20+3*N//lam]
med = np.median(arr_iqr_hist[-3:])
tmp = np.abs(np.log(arr_iqr_hist[:-3]) - np.log(med))[::-1]
i = np.where(tmp > np.log(5.0))[0]
if len(i) > 0:
i = i[0] + 3
else:
i = len(arr_iqr_hist)
return np.median(arr_iqr_hist[-1:-i - 1:-1])
class UncertaintyHandler(object):
"""Uncertainty Handling Utilities based on [Hansen 09]"""
def __init__(self,
N,
func=None,
r_lam=0.1,
theta=0.2,
alpha_sigma=None,
alpha_teval=1.5,
teval_min=1,
teval_max=np.inf,
eps_reev=1e-7,
cut_limit=np.inf,
cum_noise=1,
batch_evaluation=False,
additive_gauss=False):
self.func = func
self.r_lam = r_lam
self.theta = theta
self.alpha_sigma = 1 + 2 / (N + 10) if alpha_sigma is None else alpha_sigma
self.alpha_teval = alpha_teval
self.teval_min = teval_min
self.teval_max = teval_max
self.eps_reev = eps_reev
self.cut_limit = cut_limit
self.cum_noise = cum_noise
self.noiselevel = 0
self.teval = teval_min
self.batch_evaluation = batch_evaluation
self.additive_gauss = additive_gauss
def eval(self, indi):
"""Evaluate averaged objective
Parameters
----------
indi : CmaSolution (or its derived class) instance
Returns
-------
averaged objective function value (indi.arffeas = f(indi.x_to_eval))
"""
arf = 0.
if self.additive_gauss:
arf = self.func(indi, n_average=self.teval)
else:
for t in range(self.teval):
arf += self.func(indi)
arf /= self.teval
indi.arffeas = arf
return arf
@staticmethod
def uncertainty_quantification(arf1, arf2, theta=0.2, cutlimit=np.Inf):
"""Uncertainty Quantification based on Hansen et al. (2009)"""
# input argument verification
lam = arf1.size
lamreev = arf2.size
assert (arf1.ndim == arf2.ndim ==
1), "arf1 and arf2 should be numpy 1D-array"
assert (lam >= lamreev), "arf1.size must be no less than arf2.size"
if lam > lamreev:
arf2 = np.hstack((arf2, arf1[lamreev:lam]))
# compute rank change in rankDelta
idx = np.hstack((arf1, arf2)).argsort(axis=0)
ranks = idx.argsort(axis=0)
rank_old = ranks[0:lamreev]
rank_new = ranks[lam:lam + lamreev]
rankDelta = rank_new - rank_old - np.sign(rank_new - rank_old)
# compute the measurement
X = np.tile(np.arange(2 * lam - 1), (lamreev, 1)).T
Lim1 = np.tile(rank_new - (rank_new > rank_old), (2 * lam - 1, 1))
Lim2 = np.tile(rank_old - (rank_old > rank_new), (2 * lam - 1, 1))
pcntile1 = np.percentile(np.abs(X - Lim1), 50 * theta, axis=0)
pcntile2 = np.percentile(np.abs(X - Lim2), 50 * theta, axis=0)
s = 2 * np.abs(rankDelta) - pcntile1 - pcntile2
# cut-off limit
idx = np.abs(s) > cutlimit
s[idx] = np.sign(s[idx]) * cutlimit
# return
return np.mean(s)
def measure_noise(self, pop, func_mutate=None):
# Re-Evaluation
fpr = max(self.r_lam * len(pop), 2)
lamreev = int(floor(fpr) + (rand(1) < (fpr - floor(fpr))))
pop_re = [None] * lamreev
arf_re = [None] * lamreev
for i in range(lamreev):
if func_mutate is not None:
pop_re[i] = type(pop[i])(func_mutate(pop[i].x_to_eval, self.eps_reev))
else:
pop_re[i] = type(pop[i])(pop[i].x_to_eval)
pop_re[i].x_to_eval = pop_re[i].arxfeas
arf_re[i] = self.eval(pop_re[i])
# update the noise level
s = self.uncertainty_quantification(
np.asarray([pop[i].arffeas for i in range(len(pop))]),
np.asarray(arf_re), theta=self.theta, cutlimit=self.cut_limit)
self.noiselevel += self.cum_noise * (s - self.noiselevel)
return pop_re, arf_re, self.teval * lamreev
@staticmethod
def argsort(arf, arf_re):
# input argument verification
arf_ = np.asarray(arf)
arf_re_ = np.asarray(arf_re)
lam = arf_.size
lamreev = arf_re_.size
assert (arf_.ndim == arf_re_.ndim ==
1), "arf1 and arf2 should be numpy 1D-array"
assert (lam >= lamreev), "arf1.size must be no less than arf2.size"
arf2 = np.hstack((arf_re_, arf_[lamreev:lam]))
# compute rank change in rankDelta
idx = np.hstack((arf_, arf2)).argsort(axis=0)
ranks = idx.argsort(axis=0)
rank_old = ranks[0:lamreev]
rank_new = ranks[lam:lam + lamreev]
rankDelta = rank_new - rank_old - np.sign(rank_new - rank_old)
rank_avg = ranks[:lam] + ranks[lam:]
rank_diff = np.zeros(lam, dtype=int)
rank_diff[:len(rankDelta)] = np.abs(rankDelta)
x = np.array(
[i for i in zip(rank_avg, rank_diff, (arf_ + arf2) / 2)],
dtype=np.dtype('i8, i8, f8')) #Modified by knishida on June 28, 2017
# x = np.array(
# zip(rank_avg, rank_diff, (arf + arf2) / 2),
# dtype=np.dtype('i8, i8, f8'))
return np.argsort(x)
def uncertainty_handling(self, sigma):
# Update teval or sigma
if self.noiselevel > 0:
if self.teval < self.teval_max:
self.teval = int(
min(
max(self.teval + 1, self.alpha_teval * self.teval),
self.teval_max))
else:
sigma *= self.alpha_sigma
elif self.noiselevel < 0:
self.teval = int(
max(
min(self.teval - 1, self.teval / self.alpha_teval),
self.teval_min))
return self.teval, sigma
class PopSizeAdaptation(object):
def __init__(self, lam_min=4, lam_max=np.inf, alpha=1.4, beta=0.5, dt=2.0, granularity=1):
self.lam_min = lam_min
self.lam_max = lam_max
self.psa_alpha = alpha
self.psa_beta = beta
self.psa_dt = dt
self.psa_granularity = granularity
self.pt_factor = 0.0
self.pmnorm_list = []
self.pcnorm_list = []
assert lam_min % granularity == 0
def register_cma(self, cma):
if not hasattr(self, 'cma'):
assert isinstance(cma, ABCCma)
self.cma = cma
self.N = self.cma.opts.N
self.lam_dash = cma.popsize
self.pxmean = np.zeros(cma.opts.N)
self.pcov = np.zeros((cma.opts.N, cma.opts.N))
self.old_M = self.cma.xmean.copy()
self.old_invLt = self.cma.transform_inverse(np.eye(self.N))
def update(self):
# compute defference of the distribution parameters between before and after update
new_Lt = self.cma.transform(np.eye(self.N))
# L is supposed to be the square root, not cholesky
delta_xmean = self.cma.xmean - self.old_M
delta_xmean = np.dot(self.old_invLt.T, delta_xmean)
delta_cov = np.dot(new_Lt, self.old_invLt)
delta_cov = np.dot(delta_cov.T, delta_cov) - np.eye(self.N)
# If L is not the square root, but cholesky, the following should work.
# invDD, B = np.linalg.eigh(np.dot(self.old_invLt, self.old_invLt.T))
# invSqrtC = np.dot(B, (B * np.sqrt(invDD)).T)
# delta_xmean = self.cma.xmean - self.old_M
# delta_xmean = np.dot(invSqrtC, delta_xmean)
# delta_cov = np.dot(invSqrtC, new_Lt)
# delta_cov = np.dot(delta_cov, delta_cov.T) - np.eye(self.N)
# update the evolution path
factor = self.cma.expected_update_snorm()
pnormfac = math.sqrt(self.psa_beta * (2 - self.psa_beta) / factor)
self.pxmean *= (1 - self.psa_beta)
self.pxmean += pnormfac * delta_xmean
self.pcov *= (1 - self.psa_beta)
self.pcov += pnormfac * delta_cov
# compute the norm of the evolution path
self.pmnorm = np.sum(self.pxmean ** 2)
self.pcnorm = np.sum(self.pcov ** 2) / 2
self.pnorm = self.pmnorm + self.pcnorm
self.pcnorm_list.append(self.pmnorm)
self.pmnorm_list.append(self.pcnorm)
# update the normarization factor of the evolution path
self.pt_factor = (1 - self.psa_beta)**2 * self.pt_factor + (self.psa_beta * (2 - self.psa_beta))
# update the population size
self.lam_dash *= math.exp(1. / self.psa_dt * (self.pt_factor - self.pnorm / self.psa_alpha))
self.lam_dash = min(max(self.lam_dash, self.lam_min), self.lam_max)
lam = self.psa_granularity * int(round(self.lam_dash / self.psa_granularity))
self.cma.popsize = lam # do the whole job, including sigma normalization
# record
self.old_M = self.cma.xmean.copy()
self.old_invLt = self.cma.transform_inverse(np.eye(self.N))
class InjectionHelper(object):
def __init__(self, dim, clip_x=None, clip_m=None):
"""Injection Mechanism based on [Hansen 11]"""
self.N = dim
self.clip_x = clip_x if clip_x else math.sqrt(self.N) + 2.0 * self.N / (self.N + 2.0)
self.clip_m = clip_m if clip_m else math.sqrt(2.0 *self.N) + 2.0 * self.N / (self.N + 2.0)
def clip(self, a, b):
assert a > 0 and b > 0
return 1.0 if a > b else a / b
def x_for_inject_direc(self, v, xmean, func_transform_inverse, random=True):
"""Generate x corresponding to the give direction v
x = xmean + t * v / norm(transorm_inverse(v)),
where t is norm(randn(N)) if random option is True, sqrt(N) otherwise.
"""
if random:
t = np.linalg.norm(np.random.randn(len(v)))
else:
t = np.sqrt(len(v))
x = xmean + (t / np.linalg.norm(func_transform_inverse(v))) * v
return x
def x_for_inject_grad(self, v, xmean, func_transform, random=True):
"""Generate x corresponding to the give gradient c
x = xmean + t * transform(transform(v)) / norm(transorm(v)),
where t is norm(randn(N)) if random option is True, sqrt(N) otherwise.
"""
if random:
t = np.linalg.norm(np.random.randn(len(v)))
else:
t = np.sqrt(len(v))
sqrtcv = func_transform(v)
x = xmean + (t / np.linalg.norm(sqrtcv)) * func_transform(sqrtcv)
return x
def scaled_step_for_inject_x(self, x, xmean, func_transform_inverse):
"""Generate a step y corresponding to the injected point x
The step-length is scaled so that it will be comparable to the other solutions.
"""
y = np.array(x, copy=True) - xmean
z = func_transform_inverse(y)
y *= self.clip(self.clip_x, np.linalg.norm(z))
return y
def scaled_step_for_inject_mean(self, x, xmean, muw, func_transform_inverse):
"""Generate a mean shift vector dm corresponding to the injected mean x
The step-length is scaled so that it will be comparable to the standard cases.
Note
----
if a mean vector is injected, set cmu = 0 and perform the other update.
"""
dm = x - xmean
dm *= self.clip(self.clip_m, np.sqrt(muw) * np.linalg.norm(func_transform_inverse(dm)))
return dm
class FuncWrapper:
"""Function Wrapper Class for Benchmark"""
def __init__(self, func, batcheval=False):
"""
Parameters
----------
func : callable
It takes numpy.ndarray and returns to the objective function values.
batcheval : bool
batch evaluation is available if it is true
"""
self._func = func
self._batcheval = batcheval
def __call__(self, cma_solution, *args, **kwargs):
"""
Parameters
----------
cma_solution : (list of) CmaSolution (or its derived) class instance(s),
Returns
-------
(list of) cma_solution.arffeas (over-written)
"""
if isinstance(cma_solution, list):
if self._batcheval:
xarr = np.asarray([sol.x_to_eval for sol in cma_solution])
farr = self._func(xarr, *args, **kwargs)
for i in range(len(cma_solution)):
cma_solution[i].arffeas = farr[i]
return list(farr)
else:
raise NotImplementedError
else:
assert isinstance(cma_solution, CmaSolution), "type(cma_solution) == {}".format(type(cma_solution))
cma_solution.arffeas = self._func(cma_solution.x_to_eval, *args, **kwargs)
return cma_solution.arffeas
class ABCCma(ABCIterativeAlgorithm):
"""Interface for Variations of CMA-ES
The main purpose of this interface is to support the functionarities
implemented in this file `cmaes_util` for any variations of CMA-ES.
"""
def __init__(self, opts):
"""Constructor of `ABCCma`
Parameters
----------
opts : ABCCmaOption
"""
super(ABCCma, self).__init__(opts)
assert hasattr(self.opts, 'N')
assert hasattr(self.opts, 'lam')
# Tolerance Checker
if self.opts.check_tolf_iter > 0:
self.tolf_checker = TolfChecker(self.opts.check_tolf_iter)
self.tolf = self.opts.check_tolf
self.tolfrel = self.opts.check_tolfrel
if self.opts.check_tolx_iter > 0:
self.tolx_checker = TolxChecker(self.opts.N, self.opts.check_tolx_iter)
self.tolx = self.opts.check_tolx
self.tolxrel = self.opts.check_tolxrel
# Mixed Integer
if self.opts.mixed_integer_stair_width is not None:
self.mixed_integer_handler = MixedIntegerHandler(self.opts.mixed_integer_stair_width,
minstd=self.opts.mixed_integer_minstd,
correction_mode=self.opts.mixed_integer_correction_mode,
flg_integer_mutation=self.opts.mixed_integer_flg_integer_mutation)
# Periodic Variable
if self.opts.periodic_lower is not None and self.opts.periodic_upper is not None:
self.periodic_handler = PeriodicVariableHandler(
lower=self.opts.periodic_lower,
upper=self.opts.periodic_upper)
# Box Constraint
if self.opts.lower_bound is not None or self.opts.upper_bound is not None:
if self.opts.box_constraint_handler in ('mirror'):
self.box_constraint_handler = MirroringBoxConstrainHandler(
lower=self.opts.lower_bound,
upper=self.opts.upper_bound)
if not hasattr(self, 'periodic_handler'):
raise UserWarning('Periodic Helper with period (1.5 * LB - 0.5 * UB, 1.5 * UB - 0.5 * LB) is desired to be used with Mirroring BCH.')
elif self.opts.box_constraint_handler in ('hansen09', 'sakamoto17'):
self.box_constraint_handler = AdaptivePenaltyFunctionBoxConstraintHandler(
lower_bound=self.opts.lower_bound,
upper_bound=self.opts.upper_bound,
mode=self.opts.box_constraint_handler)
else:
raise NotImplementedError('Unsupported value {} for opts.box_constraint_handler'.format(self.opts.box_constraint_handler))
# Linear Constraint
if self.opts.lch_b is not None and self.opts.lch_A is not None:
if self.opts.lch_method in ('vLCH', 'eLCH'):
# TODO: AdaptivePenaltyFunctionBoxConstraintHandler should not be used at the same time
self.linear_constraint_handler = AdaptivePenaltyFunctionLinearConstraintHandler(
matA=self.opts.lch_A,
vecb=self.opts.lch_b,
method=self.opts.lch_method)
else:
raise NotImplementedError(
'Unsupported value {} for opts.lch_method'.format(self.opts.lch_method))
# Quantifiable/Relaxable/Simulation-based/Known (QRSK) Constraint
# if self.opts.qrsk_constraint is not None:
# if self.opts.qrsk_constraint_handler == "augmented_lagrangian":
# self.qrsk_constraint_handler = AugmentedLagrangianConstraintHandler(*self.opts.qrsk_constraint)
# else:
# raise NotImplementedError(
# 'Unsupported value {} for opts.qrsk_constraint_handler'.format(self.opts.qrsk_constraint_handler))
# Uncertainty
if self.opts.uncertainty:
if (hasattr(self, 'box_constraint_handler') and isinstance(self.box_constraint_handler, AdaptivePenaltyFunctionBoxConstraintHandler)):
assert self.opts.uh_eps_reev == 0, "self.opts.uh_eps_reev must be 0 if combined with `AdaptivePenaltyFunctionBoxConstraintHandler`"
self.uncertainty_handler = UncertaintyHandler(
N=self.opts.N,
r_lam=self.opts.uh_r_lam,
theta=self.opts.uh_theta,
alpha_sigma=self.opts.uh_alpha_sigma,
alpha_teval=self.opts.uh_alpha_teval,
teval_min=self.opts.uh_teval_min,
teval_max=self.opts.uh_teval_max,
eps_reev=self.opts.uh_eps_reev,
cut_limit=self.opts.uh_cut_limit,
cum_noise=self.opts.uh_cum_noise,
batch_evaluation=self.opts.batch_evaluation,
additive_gauss=self.opts.additive_gauss)
# Population Size Adaptation. Note: do not forget `register` cma
if self.opts.psa:
self.popsize_adaptation = PopSizeAdaptation(lam_min=self.opts.psa_lam_min,
lam_max=self.opts.psa_lam_max,
alpha=self.opts.psa_alpha,
beta=self.opts.psa_beta,
dt=self.opts.psa_dt,
granularity=self.opts.psa_granularity)
# Injection
self.injection_helper = InjectionHelper(
dim=self.opts.N,
clip_x=self.opts.injection_clip_x,
clip_m=self.opts.injection_clip_m)
self.best_from_last = None
self.best_so_far = None
self.arxinj = []
# Solution Class
self.cls_solution = self.opts.cls_solution
# Record
self.fbest = np.inf
self.neval = 0
@property
@abstractmethod
def popsize(self):
"""int : population size
Notes
-----
Setting the popsize should update all the relevant parameters such as
recombination weights and learning rates.
If necessary, the step-size normalization should be done here.
"""
pass
@popsize.setter
@abstractmethod
def popsize(self, pop_size):
pass
@property
@abstractmethod
def xmean(self):
""":obj:`numpy.ndarray` (1 dimensional) : Mean vector"""
pass
@xmean.setter
@abstractmethod
def xmean(self, x):
pass
@property
@abstractmethod
def coordinate_std(self):
""":obj:`numpy.ndarray` (1 dimensional) : coordinate-wise standard deviation"""
pass
@abstractmethod
def multiply_covariance_by(self, d):
"""Multiply the covariance matrix by the diagonal matrix
Parameter
---------
d : float or 1D array
a scalar or a vector of elements of a diagonal matrix
Note
----
In case of full covariance matrix, one might need to re-compute
the eigen decomposition since d * C * d may be sufficiently different
from C.
Naive way is as follows:
C *= d
C *= d[np.newaxis].T
"""
pass
@abstractmethod
def transform(self, x):
"""Transform the covariance of a random vector from I to S = s**2*C
Parameters
----------
x : 1D or 2D array-like
vector(s) with zero mean and identity covariance
Returns
-------
xt (same type as x) : transformed to make covariance S
Note
----
Naive way is as follows
DD, B = np.linalg.eigh(C)
D = np.sqrt(D)
sqrtS = np.dot(B * (sigma * D), B.T)
return np.dot(x, sqrtS)
"""
pass
@abstractmethod
def transform_inverse(self, x):
"""Transform the covariance of a random vector from S = s**2*C to I
Parameters
----------
x : 1D or 2D array-like
vector(s) with zero mean and covariance S
Returns
-------
xt (same type as x) : transformed to make covariance I
Note
----
Naive way is as follows
DD, B = np.linalg.eigh(C)
D = np.sqrt(D)
invsqrtS = np.dot(B / (sigma * D), B.T)
return np.dot(x, invsqrtS)
"""
pass
@abstractmethod
def _update(self, pop):
"""Update the distribution parameters and others
Parameters
----------
pop : list of CmaSolution (or its derived class) instances
"""
pass
@property
def measure(self):
return self.neval
@property
def criterion(self):
return self.fbest
@property
def recommendation(self):
return self.xmean
def expected_update_snorm(self):
"""Compute the expected square norm of the normalized parameter update
Returns
-------
float
expected square norm of the normalized parameter update
"""
raise NotImplementedError('Necessary for PopSizeAdaptation')
def mutate(self, x, mut_strength=1.0):
"""Mutate x with a given mutation strength
Parameters
----------
x : 1D or 2D array-like
vector(s) to be mutated
mut_strength : float, default = 1.0
mutation strength
Returns
-------
mutants (same shape as x)
"""
x_ = np.asarray(x)
z = np.random.randn(*x_.shape)
return x_ + mut_strength * self.transform(z)
def repair(self, x):
"""Repair Solution
Parameters
----------
x : 1D or 2D array-like
vector(s) to be repaired
Returns
-------
repaired vectors (same shape as x) to be evaluated
"""
# Linear Constraint Support
if hasattr(self, 'linear_constraint_handler'):
if isinstance(self.linear_constraint_handler, AdaptivePenaltyFunctionLinearConstraintHandler):
return self.linear_constraint_handler.repair(x)
else:
self.raise_value_error('linear_constraint_handler')
# Box Constraint Support
elif hasattr(self, 'box_constraint_handler'):
if isinstance(self.box_constraint_handler, AdaptivePenaltyFunctionBoxConstraintHandler):
return self.box_constraint_handler.repair(x)
elif isinstance(self.box_constraint_handler, MirroringBoxConstrainHandler):
return self.box_constraint_handler.get_mirrored_x(x)
else:
self.raise_value_error('box_constraint_handler')
else:
return x
def inject_solution(self, pop):
"""Inject Solutions
Parameters
----------
pop : list of CmaSolution (or its derived class) instances
"""
self.arxinj += pop
def _check(self):
"""Inherited from ABCIterativeAlgorithm"""
is_satisfied, condition = super(ABCCma, self)._check()
if not is_satisfied:
is_satisfied, condition = self._check_tolerance()
return is_satisfied, condition
def _onestep(self):
"""Inherited from ABCIterativeAlgorithm"""
assert hasattr(self, 'func')
# Population Size Adaptation
if hasattr(self, 'popsize_adaptation'):
self.popsize_adaptation.register_cma(self)
# Best Solution Injection
if self.opts.injection_mode == 'best' and self.best_so_far is not None:
self.inject_solution([self.best_so_far])
# Augmented Lagrangian
# if (hasattr(self, 'qrsk_constraint_handler') and
# isinstance(self.qrsk_constraint_handler, AugmentedLagrangianConstraintHandler)):
# xmean_solution = self.cls_solution(self.xmean)
# xmean_solution.arxfeas[:] = self.repair(xmean_solution.arxraw + xmean_solution.arxmut)
# xmean_solution.x_to_eval[:] = xmean_solution.arxfeas[:]
# self.qrsk_constraint_handler.update(xmean_solution,
# self.evaluate_objective(xmean_solution))
# Sampling and Evaluation
pop, nevals = self.sample_and_evaluate(self.opts.N, self.popsize, parallel=self.opts.parallel)
assert len(nevals) == 1, 'counting the numbers of calls of multiple functions not supported yet.'
# Uncertainty Support
if hasattr(self, 'uncertainty_handler'):
# TODO: Combination with Constraint Handler not yet supported
pop_re, arf_re, neval_re = self.uncertainty_handler.measure_noise(pop, self.mutate)
self.neval += neval_re + self.uncertainty_handler.teval * nevals[0]
teval, sigmafac = self.uncertainty_handler.uncertainty_handling(1.0)
self.multiply_covariance_by(sigmafac)
idx = self.uncertainty_handler.argsort(np.asarray([indi.arffeas for indi in pop]),
np.asarray([indi.arffeas for indi in pop_re]))
for i in range(len(pop)):
# In this case `arfit` is the ranking starting from 0
pop[idx[i]].arfit = i
else:
self.neval += nevals[0]
# Update
self._update(pop)
# Population Size Adaptation
if hasattr(self, 'popsize_adaptation'):
self.popsize_adaptation.update()
# Mixed Integer
if hasattr(self, 'mixed_integer_handler'):
std = self.mixed_integer_handler.std_correction(self.coordinate_std)
self.multiply_covariance_by(std)
# Periodic
if hasattr(self, 'periodic_handler'):
m, D = self.periodic_handler.project_distribution(self.xmean, self.coordinate_std)
self.multiply_covariance_by(D)
self.xmean = m
# Finalize
self.pop = pop
self.fbest = np.min([indi.arfit for indi in pop])
self.best_from_last = pop[np.argmin([indi.arffeas for indi in pop])]
if self.best_so_far is None or self.best_so_far.arffeas >= self.best_from_last.arffeas:
self.best_so_far = self.best_from_last
def sample_candidate(self, pop):
"""Generate Candidate Solutions
The following attributes are filled
arxraw : mutant vector
arxmut : integer mutation vector
arxfeas : repaired solution (originally arxraw + arxmut)
Parameters
----------
pop : list of CmaSolution (or its derived class) instances
list of (empty) candidate solutions
Returns
-------
pop : list of filled candidate solutions
Support
-------
* Mirrored Mutation
* Integer Mutation
* Repair Operator
* Periodic Variable
See
---
`MixedIntegerHandler`, `mutate`, `repair`
"""
n_sample = len(pop)
# arxraw
for idx in range(n_sample):
indi = pop[idx]
if not self.opts.mirror or idx % 2 == 0:
indi.arxraw[:] = self.mutate(self.xmean, mut_strength=1.0)
else:
indi.arxraw[:] = 2.0 * self.xmean - pop[idx - 1].arxraw
# arxmut
if hasattr(self, 'mixed_integer_handler'):
if not isinstance(self.mixed_integer_handler, MixedIntegerHandler):
self.raise_value_error('mixed_integer_handler')
else:
bestx = self.best_from_last.arxfeas if self.best_from_last else None
arxmut = self.mixed_integer_handler.integer_mutation(n_sample, self.xmean, self.coordinate_std, bestx=bestx)
else:
arxmut = np.zeros((n_sample, len(pop[0].arxmut)))
# arxfeas and x_to_eval
for idx in range(n_sample):
indi = pop[idx]
indi.arxmut[:] = arxmut[idx]
indi.arxfeas[:] = self.repair(indi.arxraw + indi.arxmut)
# Periodic Support
if hasattr(self, 'periodic_handler'):
indi.x_to_eval[:] = self.periodic_handler.project_x(indi.arxfeas)
else:
indi.x_to_eval[:] = np.asarray(indi.arxfeas)
return pop
def evaluate_objective(self, pop, parallel=False):
"""Compute objective function value
pop[i].arffeas <- f(pop[i].x_to_eval)
Parameters
----------
pop : list of CmaSolution
solutions to be evaluated
parallel : bool, optional
evaluation will be parallelized if it is True.
Returns
list of objective values (float)
"""
assert hasattr(self, 'func')
# Uncertainty Handling Support
if hasattr(self, 'uncertainty_handler'):
self.uncertainty_handler.func = self.func
fobj = self.uncertainty_handler.eval
# def fobj(x):
# assert isinstance(x, CmaSolution)
# if self.opts.additive_gauss:
# f = self.func(x, n_average=self.uncertainty_handler.teval)
# else:
# f = 0.0
# for t in range(self.uncertainty_handler.teval):
# f += self.func(x)
# f /= self.uncertainty_handler.teval
# return f
else:
fobj = self.func
if isinstance(pop, CmaSolution):
return fobj(pop)
elif parallel:
# Parallel Evaluation
flist = fobj(pop)
# TODO: Below Not working. f-values are not set to pop
# lam = len(pop)
# n_cpu = multiprocessing.cpu_count()
# lam_cpu = np.linspace(0, lam, num=n_cpu+1, endpoint=True, dtype=int)
# subpop_list = [pop[lam_cpu[i]:lam_cpu[i+1]] for i in range(n_cpu)]
# with multiprocessing.Pool(processes=n_cpu) as pool:
# flist_list = pool.map(self.evaluate_objective, subpop_list)
# flist = []
# for fsublist in flist_list:
# flist += fsublist
else:
# Serial Evaluation
flist = [fobj(indi) for indi in pop]
return flist
def sample_and_evaluate(self, dim, lam, parallel=False):
"""Sample and Evaluate
Parameters
----------
dim : int
dimension
lam : int
number of candidate solutions
mueff : float, optional
1 / sum of (cm * wi) ** 2
cm : learning rate for mean
wi : recombination weights used for m-update
It is only necessary for `AdaptivePenaltyFunctionBoxConstraintHandler`
parallel : bool, optional
parallel evaluation will be performed if it is True
Returns
-------
list of CmaSolution (or its derived class) instances : solution set including injected solutions
1D array-like : number of function calls
"""
n_injected = len(self.arxinj)
lam_new = lam - n_injected
# Execution
total_nevals = np.zeros(1)
# Resampling Support
if self.opts.resampling > 1:
pop = []
for i in range(lam_new):
idx_feas, resampled, nevals = resample(dim=dim,
sample_func=self.sample_candidate,
func_list=[partial(self.evaluate_objective, parallel=parallel)],
non_positive_indices=None,
maxresample=self.opts.resampling,
cls=self.cls_solution)
resampled[-1].arffeas = resampled[-1].values[0]
pop.append(resampled[-1])
if len(nevals) > 1:
# TODO: nevals for constraint not supported yet
raise NotImplementedError('Only resampling for objective is supported.')
total_nevals += np.asarray(nevals)
else:
pop = [self.cls_solution(np.zeros(dim) * np.nan) for i in range(lam_new)]
pop = self.sample_candidate(pop)
self.evaluate_objective(pop, parallel=parallel)
total_nevals += np.asarray([lam_new])
# Evaluate Injected
for indi in self.arxinj:
if np.isnan(indi.arffeas):
indi.arxfeas[:] = self.repair(indi.arxraw + indi.arxmut)
# Periodic Support
if hasattr(self, 'periodic_handler'):
indi.x_to_eval[:] = self.periodic_handler.project_x(indi.arxfeas)
else:
indi.x_to_eval[:] = np.asarray(indi.arxfeas)
self.evaluate_objective(indi)
total_nevals[0] += 1
pop += [indi]
self.arxinj = []
# Penalty
# Linear Constraint Support
if (hasattr(self, 'linear_constraint_handler') and
isinstance(self.linear_constraint_handler, AdaptivePenaltyFunctionLinearConstraintHandler)):
assert hasattr(self, 'mueff')
self.linear_constraint_handler.update_penalty_factor(self.mueff,
[indi.arffeas for indi in pop],
self.xmean,
self.coordinate_std,
self.transform)
for indi in pop:
indi.arpen = self.linear_constraint_handler.compute_penalty(indi.arxraw + indi.arxmut, indi.arxfeas)
elif (hasattr(self, 'box_constraint_handler') and
isinstance(self.box_constraint_handler, AdaptivePenaltyFunctionBoxConstraintHandler)):
assert hasattr(self, 'mueff')
self.box_constraint_handler.update_penalty_factor(lam, self.mueff, [indi.arffeas for indi in pop], self.xmean, self.coordinate_std)
for indi in pop:
indi.arpen = self.box_constraint_handler.compute_penalty(indi.arxraw + indi.arxmut, indi.arxfeas, self.coordinate_std)
# elif (hasattr(self, 'qrsk_constraint_handler') and
# isinstance(self.qrsk_constraint_handler, AugmentedLagrangianConstraintHandler)):
# for indi in pop:
# indi.arpen = self.qrsk_constraint_handler(indi)
elif (hasattr(self, 'stochastic_constraint_handler') and
isinstance(self.stochastic_constraint_handler, StochasticConstraintHandler)):
pop = self.stochastic_constraint_handler(pop)
else:
for indi in pop:
indi.arpen = 0.0
for indi in pop:
indi.arfit = indi.arffeas + indi.arpen
return pop, total_nevals
def _check_tolerance(self):
"""Check Tolerance"""
is_satisfied = False
condition = ''
if hasattr(self, 'tolf_checker'):
self.tolf_checker.update([indi.arfit for indi in self.pop])
if not is_satisfied and self.tolf_checker.check_absolute(self.tolf):
is_satisfied = True
condition = 'tolf'
if not is_satisfied and self.tolf_checker.check_relative(self.tolfrel):
is_satisfied = True
condition = 'tolfrel'
if not is_satisfied and self.tolf_checker.check_flatarea():
is_satisfied = True
condition = 'flatarea'
if hasattr(self, 'tolx_checker'):
self.tolx_checker.update([indi.arxraw for indi in self.pop])
if not is_satisfied and self.tolx_checker.check_absolute(self.tolx):
is_satisfied = True
condition = 'tolx'
if not is_satisfied and self.tolx_checker.check_relative(self.tolxrel):
is_satisfied = True
condition = 'tolxrel'
return is_satisfied, condition
def raise_value_error(self, attribute_name):
raise ValueError("Unsupported type {} for self.{}".format(type(getattr(self, attribute_name)), attribute_name))
class ABCCmaOption(IterativeAlgorithmOption):
def __init__(self,
# Termination
check_tolf_iter='20 + N // lam # tolf is disabled if 0',
check_tolf='check_target / 1e3',
check_tolfrel='1e-12',
check_tolx_iter='20 + N // lam # tolx is disabled if 0',
check_tolx='1e-10',
check_tolxrel='1e-12',
# Mixed Integer
mixed_integer_stair_width='None # stair width',
mixed_integer_minstd='0 # minimum coordinate_std',
mixed_integer_correction_mode='"sigma" # correction mode',
mixed_integer_flg_integer_mutation='True # flag for integer mutation',
# Uncertainty
uncertainty='False',
uh_r_lam=0.1,
uh_theta=0.2,
uh_alpha_sigma=None,
uh_alpha_teval=1.5,
uh_teval_min=1,
uh_teval_max=np.inf,
uh_eps_reev=1e-7,
uh_cut_limit=np.inf,
uh_cum_noise=1,
batch_evaluation=repr(False),
additive_gauss=repr(False),
# Resampling
resampling='1 # number of resampling',
# Periodic
periodic_lower='None # lower bound of periodic variables',
periodic_upper='None # upper bound of periodic variables',
# Box Constraint
lower_bound='None # None or ndarray',
upper_bound='None # None or ndarray',
box_constraint_handler='"mirror" # ("mirror", "hansen09", "sakamoto17")',
# Linear Constraint Handler
lch_A='None # A in np.dot(A, x) <= b',
lch_b='None # b in np.dot(A, x) <= b ',
lch_method='"vLCH" # ("vLCH", "eLCH")',
# Quantifiable/Relaxable/Simulation-based/Known (QRSK) Constraint Handler
# qrsk_constraint='None # tuple of constraint functions',
# qrsk_constraint_handler='"augmented_lagrangian"',
# Injection Helper
injection_clip_x='None # float of clip for x injection',
injection_clip_m='None # float of clip for mean injection',
injection_mode='None # "best" or "initial" or None',
# Mirrored Sampling
mirror='False # (bool) Mirrored Sampling Mode',
# Parallel Evaluation
parallel='False # (bool) Parallel Function Evaluation',
# Population Size Adaptation
psa='False # (bool) Population Size Adaptation Mode',
psa_lam_min='4 # minimum population size',
psa_lam_max='np.inf # maximum population size',
psa_alpha='1.4 # threshold',
psa_beta='0.5 # cumulation factor for normalized update',
psa_dt='1/float(psa_beta) # dampling for popsize update',
psa_granularity='1 # guranularity of adapted popsize',
# Solution Class
cls_solution=CmaSolution,
# Others
**kwargs):
super(ABCCmaOption, self).__init__(**kwargs)
self.setattr_from_local(locals())
class Cma(ABCCma):
"""Covariance Matrix Adaptation Evolution Strategy for Continuous Optimization
"""
@property
def popsize(self):
return self._lam
@popsize.setter
def popsize(self, pop_size):
self._lam = pop_size
opts = self.opts_original
opts.lam = self._lam
parsed = opts.parse()
self._w = parsed.w # TODO:
self.mueff = parsed.mueff # TODO:
self.cm = parsed.cm
self.cc = parsed.cc
self.cone = parsed.cone
self.cmu = parsed.cmu
self.ssa = parsed.ssa
if self.ssa == 'TPA':
self.cs = parsed.tpa_cs
self.ds = parsed.tpa_ds
if not hasattr(self, 'ps'):
self.ps = 0
elif self.ssa == 'CSA':
self.cs = parsed.csa_cs
self.ds = parsed.csa_ds
if not hasattr(self, 'ps'):
self.ps = np.zeros(self.N)
self.ps_factor = 0.0
elif self.ssa == 'NCSA':
self.cs = parsed.ncsa_cs
self.ds = parsed.ncsa_ds
if not hasattr(self, 'ps'):
self.ps = np.zeros(self.N)
elif self.ssa == 'HSA':
self.cs = parsed.hsa_cs
self.ds = parsed.hsa_ds
self.asig = parsed.hsa_as
if not hasattr(self, 'ps'):
self.ps = np.zeros(self.N)
elif self.ssa == 'MICSA':
self.mixed_integer_handler.cs = parsed.csa_cs
self.mixed_integer_handler.ds = parsed.csa_ds
elif self.ssa == 'NoSSA':
pass
else:
raise NotImplementedError('ssa = ' + self.ssa +
' is not implemented.')
self._w = pre_weight(self.N, self._lam, self._w, self.cone, self.cmu)
if hasattr(self, '_last_alpha'):
# update step-size
next_alpha = sigma_normalization_factor(self.N, self._w, self.cm)
self.sigma *= next_alpha / self._last_alpha
self._last_alpha = next_alpha
else:
self._last_alpha = sigma_normalization_factor(self.N, self._w, self.cm)
@property
def xmean(self):
return self._xmean
@xmean.setter
def xmean(self, x):
self._xmean[:] = x
@property
def coordinate_std(self):
return self.sigma * np.sqrt(np.diag(self.cov))
def multiply_covariance_by(self, d):
s = math.exp(np.mean(np.log(d)))
nd = d / s
if np.any(nd != 1):
self.cov *= nd[np.newaxis].T
self.cov *= nd
self.sigma *= s
else:
self.sigma *= s
def transform(self, x):
return np.dot(np.asarray(x), self.sqrtC) * self.sigma
def transform_inverse(self, x):
return np.dot(np.asarray(x), self.invsqrtC) / self.sigma
def _decompose(self, det_normalize=False):
"""Decompose the covariance matrix and update relevant parameters"""
DD, self.B = np.linalg.eigh(self.cov)
self.D = np.sqrt(DD)
if not (self.D.max() < self.D.min() * 1e7):
raise RuntimeError('Condition number > 1e7 or nan appears.')
if det_normalize:
gamma = np.exp(np.mean(np.log(self.D)))
self.D /= gamma
self.pc /= gamma
self.cov /= gamma ** 2
self.sqrtC = np.dot(self.B * self.D, self.B.T)
self.invsqrtC = np.dot(self.B / self.D, self.B.T)
self.eigneval = self.measure
return gamma if det_normalize else 1
def _update(self, pop):
"""Update the distribution parameters and others
See Also : `ABCCma`
Parameters
----------
pop : list of CmaSolution (or its derived class) instances
"""
idx = np.argsort([indi.arfit for indi in pop])
sary = np.asarray([(pop[i].arxraw - self.xmean) / self.sigma for i in idx])
sarx = np.asarray([pop[i].arxraw + pop[i].arxmut for i in idx])
lam = len(pop)
# idx = pop.idx
# sary = (pop.arxraw[idx] - self.xmean) / self.sigma
# sarx = pop.arxraw[idx] + pop.arxmut[idx]
# lam = pop.arxraw.shape[0]
w = weight(pop, self.xmean, self.transform_inverse,
base_weight=self._w, cone=self.cone, cmu=self.cmu)
mueff = np.sum(w[w > 0]) / np.sum(w[w > 0] ** 2) if np.any(w > 0) else 0
self.w = w # used for PSA
# TODO: break if mueff == 0 (no positive update performed)
# at this moment, negative update is also killed
if mueff == 0:
return
# Update sigma
if self.ssa == 'CSA':
self.ps_factor = (1 - self.cs) ** 2 * self.ps_factor + self.cs * (2 - self.cs)
self.ps *= (1 - self.cs)
self.ps += math.sqrt(self.cs * (2 - self.cs) * mueff) * self.sigma * self.transform_inverse(np.dot(w[w > 0], sary[w > 0]))
normsquared = np.sum(self.ps * self.ps)
self.sigma *= math.exp((math.sqrt(normsquared) / self.chiN - math.sqrt(self.ps_factor)) * self.cs / self.ds)
# hsig = normsquared < (2.0 + 4.0 / (self.N + 1)) * self.N
hsig = normsquared < ((1.4 + 2.0 / (self.N + 1)) * self.chiN * math.sqrt(self.ps_factor)) ** 2
elif self.ssa == 'TPA':
# find symmetric points
nsary = sary / np.linalg.norm(sary, axis=1)[np.newaxis].T
ndx = self.dx / np.linalg.norm(self.dx)
for ip in range(lam):
if np.allclose(nsary[ip], ndx):
break
for im in range(lam):
if np.allclose(nsary[im], -ndx):
break
# update
if ip < lam and im < lam:
alpha_act = im - ip
alpha_act /= float(lam - 1)
self.ps += self.cs * (alpha_act - self.ps)
self.sigma *= math.exp(self.ps / self.ds)
hsig = self.ps < 0.5
else:
hsig = True
elif self.ssa == 'HSA':
# Akimoto. Ph.D. Thesis.
self.ps *= (1 - self.cs)
self.ps += math.sqrt(self.cs * (2 - self.cs) * mueff) * self.sigma * self.transform_inverse(np.dot(w[w > 0], sary[w > 0]))
musig = np.dot(w[w > 0], np.linalg.norm(self.transform_inverse(sary[w > 0]), axis=1)) * self.sigma
psnorm = np.linalg.norm(self.ps)
self.sigma *= math.exp((((1 - self.asig) * musig + self.asig * psnorm) / self.chiN - 1) / self.ds)
hsig = psnorm**2 < (2 + 4 / (self.N + 1)) * self.N
elif self.ssa == 'NCSA':
nstep = np.dot(w[w > 0], self.transform_inverse(sary[w > 0])) * self.sigma
self.ps *= (1 - self.cs)
self.ps += math.sqrt(self.cs * (2 - self.cs) / np.sum(nstep * nstep)) * nstep
normsquared = np.sum(self.ps * self.ps)
self.sigma *= math.exp((normsquared - 1) * self.cs / self.ds)
hsig = normsquared < 2.0
elif self.ssa == 'MICSA':
sigfac, hsig = self.mixed_integer_handler.mixed_integer_csa(w, self.sigma * self.transform_inverse(sary), self.coordinate_std)
self.sigma *= sigfac
elif self.ssa == 'NoSSA':
hsig = 1
else:
raise NotImplementedError('ssa = ' + self.ssa + ' is not implemented.')
# Update xmean
self.dx = np.dot(w[w > 0], sarx[w > 0]) - np.sum(w[w > 0]) * self.xmean
self.xmean += self.cm * self.dx
# Cumulation
self.pc = (1 - self.cc) * self.pc + hsig * math.sqrt(self.cc * (2 - self.cc) * mueff) * np.dot(w[w > 0], sary[w > 0])
self.pc_factor = (1 - self.cc) ** 2 * self.pc_factor + hsig * self.cc * (2 - self.cc)
# Rank-mu
if self.cmu == 0:
rank_mu = np.zeros((self.N, self.N))
elif self.opts.active_update:
rank_mu = np.dot(sary.T * w, sary) - np.sum(w) * self.cov
else:
rank_mu = np.dot(sary[w > 0].T * w[w > 0], sary[w > 0]) - np.sum(w[w > 0]) * self.cov
# Rank-one
if self.cone == 0:
rank_one = np.zeros((self.N, self.N))
else:
rank_one = np.outer(self.pc, self.pc) - self.pc_factor * self.cov
self.cov += self.cmu * rank_mu + self.cone * rank_one
# update the square root of the covariance matrix
if (self.neval - self.eigneval) > self.opts.eigen_frequency:
self._decompose(det_normalize=self.opts.det_normalization)
# Inject Two Symmetric Solutions for TPA
if self.ssa == 'TPA':
tmp = self.transform_inverse(self.dx)
dy = (np.linalg.norm(np.random.randn(self.N)) /
np.sqrt(np.dot(tmp, tmp))) * self.dx
inj = self.cls_solution(self.xmean + dy)
inj.arxmut[:] = 0.0
self.inject_solution([inj])
inj = self.cls_solution(self.xmean - dy)
inj.arxmut[:] = 0.0
self.inject_solution([inj])
def __init__(self, opts, func):
super(Cma, self).__init__(opts)
self.N = self.opts.N
self.chiN = np.sqrt(self.N) * (1.0 - 1.0 / (4.0 * self.N) + 1.0 / (21.0 * self.N * self.N))
self.func = func
self._xmean = np.array(self.opts.xmean)
if isinstance(self.opts.sigma, np.ndarray):
self.sigma = np.exp(np.log(self.opts.sigma).mean())
self.D = self.opts.sigma / self.sigma
else:
self.sigma = self.opts.sigma
self.D = np.ones(self.N)
self.cov = np.diag(self.D**2) # cov = B * D^2 * B^t
self.pc_factor = 0.0
self.pc = np.zeros(self.N)
self.dx = np.zeros(self.N) * np.nan
self.sigma *= self._decompose(det_normalize=True)
self.popsize = self.opts.lam
def expected_update_snorm(self):
"""Compute the expected square norm of the normalized parameter update
Returns
-------
float
expected square norm of the normalized parameter update
"""
N = self.N
w = self.w
if self.ssa == 'CSA':
sfactor_a6 = 4. * self.ps_factor * (N / self.chiN ** 2 - 1) * (self.cs / self.ds) ** 2
sfactor_a5 = 1 + sfactor_a6 * 2
elif self.ssa == 'NoSSA':
sfactor_a5 = 1.0
sfactor_a6 = 0.0
else:
raise NotImplementedError()
if self.opts.active_update:
v = w
else:
v = np.zeros(w.shape)
v[w > 0] = w[w > 0]
mfactor = self.cm**2 * N * np.sum(w[w > 0] ** 2)
rankmu_factor = (N * (N + 1)) * (self.cmu**2 * np.dot(v, v))
muone_factor = (N * (N + 1)) * (self.cc * (2.0 - self.cc) * self.cone * self.cmu * np.sum(w[w > 0]**2 * v[w > 0]) / np.sum(w[w > 0] ** 2))
rankone_factor = self.cone**2 * (self.pc_factor**2 * N**2 + (2 * self.pc_factor**2 - 2 * self.pc_factor + 1) * N)
cfactor = rankmu_factor + muone_factor + rankone_factor
return mfactor + (sfactor_a5 * cfactor + N * sfactor_a6) / 2
class CmaOption(ABCCmaOption):
def __init__(self,
N = '0',
sigma = '0.0',
xmean = 'np.zeros(N)',
lam = '4 + int(3 * np.log(N))',
active_update = 'False',
w = 'np.log((lam + 1) / 2.0) - np.log(np.arange(1, lam+1))',
mueff = 'np.sum(w[w>0])**2 / np.sum(w[w>0] ** 2)',
cm = '1 # learning rate for m-update',
cc='(4+mueff/N) / (N+4+2*mueff/N)',
cone='2 / ((N+1.3)**2+mueff)',
cmu='min(1-cone, 2 * (mueff-2+1/mueff) / ((N+2)**2+mueff))',
eigen_frequency='lam/(10*N*(cone+cmu)) # decomposition performed after given #f-calls',
ssa='"CSA" # ("CSA", "TPA", "NCSA", "HSA", "MICSA", "NoSSA")',
tpa_cs='0.3',
tpa_ds='np.sqrt(N)',
csa_cs='(mueff+2)/(N+mueff+5)',
csa_ds='1 + 2.0*max(0, np.sqrt((mueff-1)/(N+1))-1) + csa_cs',
ncsa_cs='3.0 / (6.0 + N/mueff**0.9)',
ncsa_ds='2.0',
hsa_cs='(mueff+2)/(N+mueff+5)',
hsa_ds='(1 + hsa_cs) / (1 + hsa_cs - hsa_as)',
hsa_as='np.sqrt((N + 1) / (N + mueff))',
det_normalization='False # (bool)',
**kwargs):
super(CmaOption, self).__init__(**kwargs)
self.setattr_from_local(locals())
class PsaCma(Cma):
pass
class PsaCmaOption(CmaOption):
def __init__(self,
# Initial Parameters of the Sampling Distribution
N='0',
sigma='0.0',
xmean='np.zeros(N)',
# Parameters for the Population Size Adaptaiton
psa='True # (bool) Population Size Adaptation Mode',
psa_lam_min='int(lam) # minimum population size',
psa_lam_max='np.inf # maximum population size',
psa_alpha='1.4 # threshold',
psa_beta='0.4 # cumulation factor for normalized update',
psa_dt='1/float(psa_beta) # dampling for popsize update',
psa_granularity='1 # guranularity of adapted popsize',
**kwargs):
super(PsaCmaOption, self).__init__(**kwargs)
self.setattr_from_local(locals())
if __name__ == '__main__':
## Requirements for the objective function
# argument `x`: a single candidate solution or a matrix of candidate solutions. np.ndarray (`x.ndim in (1, 2)`)
# - If `x.ndim == 2`, then `x.shape` is in the order (population size, objective function dimensionality).
# - For parallel simulation, it is necessary to accept `x` such that `x.ndim == 2`. (Parallelization must be implemented separately in the function.)
# The return value is an evaluation value or a one-dimensional array of evaluation values for each solution.
def sphere(x):
x = np.array(x)
if x.ndim == 1:
return np.sum(x**2)
elif x.ndim == 2:
return np.sum(x**2, axis=1)
else:
raise ValueError('x.ndim has to be <= 2, but given x.ndim was {}'.format(x.ndim))
def rastrigin(x):
a = 10.0
x = np.array(x)
if x.ndim == 1:
return np.sum(x**2 + a * (1 - np.cos(2 * np.pi * x)))
elif x.ndim == 2:
return np.sum(x**2 + a * (1 - np.cos(2 * np.pi * x)), axis=1)
else:
raise ValueError('x.ndim has to be <= 2, but given x.ndim was {}'.format(x.ndim))
## FuncWrapper
# Pass a function object to `func`.
# If multiple solutions can be evaluated in parallel, set `batcheval` to `True`.
# Otherwise (default), `batcheval=False`.
# fobj = FuncWrapper(func=sphere, batcheval=True)
fobj = FuncWrapper(func=rastrigin, batcheval=True)
## Configuration object generation
### Minimum parameters that must be entered.
# - `N`: Number of dimensions of the objective function.
# - `sigma`: Initial step size.
# - `xmean`: Initial mean vector
### Parameters that can be adjusted as needed
# - `psa_lam_min`: minimum population size, natural number, minimum `4`. Default `4 + floor(3*log(N))`.
# - `psa_lam_max`: maximum population size, natural number, minimum `4`. default `np.inf`.
# - `psa_granularity`: granularity of the population size, natural number, minimum `1`. Default `1`. The conversion of a real-valued population size to a natural number is made to a multiple of the value specified here.
# - `parallel`: `True` if the solutions are evaluated in parallel. Default `False`.
# - The solution population is evaluated in parallel only if `batcheval=True` in FuncWrapper and this `parallel` is set to `True`. In this case, the input to the objective function is a two-dimensional array.
# - Note that an error occurs if `batcheval=False` and `parallel=True`.
# - In other cases, the functions are evaluated one by one. In this case, the input to the destination function is a 1D array.
### Exit Conditions
# The terminating condition begins with `check_`.
# By default, the search continues until the distribution converges and no significant mutations are found on the computer.
# Example.)
# - `check_target`: The target for the best solution evaluation.
# - `check_maxruntime`: The maximum number of function evaluations.
# - `check_maxiter`: the maximum number of iterations. `check_maxiter`: The maximum number of updates of the distribution.
# - `check_maxsec`: The maximum execution time in seconds.
### Logging parameters
# The parameters starting with `log_`.
# - `log_display`: Display the progress (number of iterations, number of evaluations, elapsed time, and the value of the best solution obtained for the iteration) at runtime.
# - `log_prefix`: Prefix of the log file. The default is `"'. `log_prefix`, so that the log file is created as a hidden file.
# - `log_span`: The logging interval, which can be 0, a positive real number less than 1, or an integer greater than or equal to 1.
# - `0`: no logging.
# - `<1`: interval at which the ratio of the logging time to the algorithm computation time is around the specified value.
# - Integer: Log when the number of elapsed iterations is a multiple of the specified value.
# - `log_variable_list`: The list of parameters to log. If you enter an empty list, only the best solution values per iteration will be logged.
# Example.)
# - `xmean`: Mean vector of the distribution
# - `sigma`: Step size
# - `D`: Square root of the eigenvalues of the covariance matrix of the distribution
# - `popsize`: the population size (integer).
# - `popsize_adaptation.lam_dash`: population size (real number) kept inside PSA
opts = PsaCmaOption(N=10, sigma=2, xmean='sigma*np.ones(N)')
opts.psa_granularity = 10
opts.parallel = True
opts.check_target = 1e-8
opts.check_maxruntime = 1e5
opts.log_display = True
opts.log_prefix = repr('test')
opts.log_span = 0.1
opts.log_variable_list = repr(['xmean', 'D', 'sigma', 'popsize', 'popsize_adaptation.lam_dash'])
## Run
algo = PsaCma(opts=opts, func=fobj)
algo.run()
## plot
# The `plot` method allows you to generate a figure from a log file.
# - `prefix`: The log file prefix.
# - `variable_list`: The list of parameters to plot.
# - `xaxis`: value for the horizontal axis.
# - `0`: number of iterations.
# - `1`: number of evaluations.
# - `2`: execution time.
# It is possible to omit the `prefix` and `variable_list` by using the `plotme` method.
# It returns a `figure` object (first return value) for the entire figure and a `dictionary` (second return value) of `axis` objects for each parameter.
# - The figure can be modified after it has been generated, as in the following example. (The display will reflect this only when in `nbagg` mode.)
fig, axdict = algo.plot(prefix=algo.opts.log_prefix, variable_list=algo.opts.log_variable_list, xaxis=0)
#fig, axdict = algo.plotme()
for key in axdict:
if all(line.get_ydata().min() > 0 for line in axdict[key].get_lines()):
axdict[key].set_yscale('log')
# plt.savefig('test.eps')
plt.savefig('test.png')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment