import math
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
import ddcma
from ddcma import DdCma
""" should be imported from"""
class MyDdCma(DdCma):
def set_dynamic_parameters(self, init_matrix, init_S, init_sigma):
self.Z = np.array(init_matrix[0,:,:], copy=True)
self.C = np.array(init_matrix[1,:,:], copy=True)
self.B = np.array(init_matrix[2,:,:], copy=True)
self.sqrtC = np.array(init_matrix[3,:,:], copy=True)
self.invsqrtC = np.array(init_matrix[4,:,:], copy=True)
self.S = np.array(init_S[:], copy=True)
self.sigma = init_sigma.copy()
def mirror(z, lbound, ubound):
""" Mirroring constraint handling
z : np.ndarray (1D)
solution vector to be mirrorred
lbound, ubound : np.ndarray (1D)
lower and upper bound of the box constraint
np.ndarray (1D) : mirrorred solution
width = ubound - lbound
return ubound - np.abs(np.mod(z - lbound, 2 * width) - width)
class WRA:
""" Worst-case Raking Approximation (WRA) : approximation of the worst-case ranking
for the given solution candidates by approximately solving internal maximization problem max_{y} f(x, y) with using CMA-ES
def __init__(self, f, n, yb, boundy, tau_thr, cmax, Vmin, Tmin, init_y, init_ymean, init_D_y, init_matrix, init_S, init_sigma):
""" Initialization of WRA
f : callable f (a solution candidate, a scenario variable) -> float
objective function
n : int
number of dimension for y
yb : np.ndarray (2D)
lower and upper bound of the box constraint
boundy : bool
the domain for scenario variable is bounded (== initialization interval) if it is True
tau_thr : float
threshold of Kendall's tau for early stopping strategy
cmax : int
parameter for interrupting CMA-ES
If worse scenario is found cmax times, CMA-ES is interrupted.
Vmin : float
termination parameter for maximum standard deviation of CMA-ES
Tmin : int
minimum iteration for CMA-ES
init_y : np.ndarray (2D)
initial candidate vector of scenario variables
init_ymean : np.ndarray (2D)
initial mean vector of CMA-ES for each solution candidate
init_D_y : np.ndarray (2D)
initial coordinate-wise std of CMA-ES for each solution candidate
Initial dynamic parameters of CMA-ES for each solution candidate
init_matrix : np.ndarray (4D)
init_S : np.ndarray (2D)
init_sigma : np.ndarray (1D)
Followings are dynamic parameters for solution candidate i
init_matrix[i, 0, :,:] = Z
init_matrix[i, 1, :,:] = C
init_matrix[i, 2, :,:] = B
init_matrix[i, 3, :,:] = sqrt.C
init_matrix[i, 4, :,:] = invsqrt.C
init_S[i, :] = S
init_sigma[i] = sigma
self.f = f
self.n = n
self.yb = yb
self.tau_thr = tau_thr
self.cmax = cmax
self.Vmin = Vmin
self.Tmin = Tmin
self.init_ymean = init_ymean
self.init_D_y = init_D_y
self.yy = init_y
self.ymean = init_ymean
self.D_y = init_D_y
self.init_matrix = init_matrix
self.init_S = init_S
self.init_sigma = init_sigma
self.fcalls = 0
self.boundy = boundy
self.tau = 0
self.wrac = []
self.wraneval = []
self.Fnew = []
def __call__(self, arx):
return self.wra_ddcma(arx)
def wra_ddcma(self, x_t):
"""approximating the worst-case ranking of solution candidate
x^t : np.ndarray (2D)
solution candidates in iteration t
idr : np.ndarray (1D)
approximated ranking of solution candidates x^t
ymember = self.yy.shape[0]
lambdax = x_t.shape[0]
self.fcalls = 0
### Initialization part
fx_arr = np.array([[self.f(x, y) for y in self.yy] for x in x_t])
kworst = np.argmax(fx_arr, axis=1)
Fold = np.max(fx_arr, axis=1)
self.Fnew = Fold.copy()
self.fcalls += ymember * lambdax
ymean_tilde = np.zeros((lambdax, self.n))
D_y_tilde = np.zeros((lambdax, self.n))
y_tilde = np.zeros((lambdax, self.n))
init_matrix_tilde = np.zeros((lambdax, 5, self.n, self.n))
init_S_tilde = np.ones((lambdax, self.n))
init_sigma_tilde = np.ones(lambdax)
for i in range(lambdax):
y_tilde[i, :] = self.yy[kworst[i], :]
ymean_tilde[i, :] = self.ymean[kworst[i], :]
D_y_tilde[i, :] = self.D_y[kworst[i], :]
init_matrix_tilde[i, :, :, :] = self.init_matrix[kworst[i], :, :, :]
init_S_tilde[i, :] = self.init_S[kworst[i], :]
init_sigma_tilde[i] = self.init_sigma[kworst[i]]
cma_instances = [MyDdCma(ymean_tilde[i, :], D_y_tilde[i, :], flg_variance_update=False) for i in range(lambdax)]
for i in range(lambdax):
cma_instances[i].set_dynamic_parameters(init_matrix_tilde[i, :, :, :], init_S_tilde[i, :], init_sigma_tilde[i])
### Worst-case Ranking Approximation
h = np.ones(lambdax, dtype=bool)
updaten = np.zeros(lambdax)
tt = np.zeros(lambdax)
self.wrac = np.zeros(lambdax)
self.wraneval = np.zeros(lambdax)
self.tau = -1.0
while self.tau <= self.tau_thr:
for i in range(lambdax):
if h[i]==True:
wracma = cma_instances[i]
Fdash = Fold[i]
updaten[i] += 1
c = 0
while c<self.cmax:
wrax, wray, wraz = wracma.sample()
wracy = np.zeros((wracma.lam, self.n))
fy = np.zeros(wracma.lam)
for k in range(wracma.lam):
wracy[k, :]=wrax[k, :]
if self.boundy==True:
wracy[k, :]= mirror(wrax[k, :], self.yb[0, :], self.yb[1, :])
fy[k] = self.f(x_t[i,:], wracy[k,:])
self.fcalls +=1
self.wraneval[i] += 1
if max(fy) > Fdash:
yid = np.argmax(fy)
y_tilde[i, :] = wracy[yid, :]
Fdash = max(fy)
c += 1
self.wrac[i] += 1
idy = np.argsort(-fy)
wracma.update(idy, wrax, wray, wraz)
tt[i] +=1
if self.boundy==True:
wraD_y = wracma.D.copy()
for k in range(self.n):
wracma.D[k] = min(wraD_y[k], (self.yb[1, k] - self.yb[0, k])/4/wracma.sigma)
if np.max(wracma.coordinate_std) < self.Vmin and tt[i] >=self.Tmin:
h[i] = False
self.Fnew[i] = Fdash
self.tau, p_value = stats.kendalltau(self.Fnew, Fold)
Fold[:] = self.Fnew[:]
for i in range(lambdax):
wracma = cma_instances[i]
ymean_tilde[i, :] = wracma.xmean
D_y_tilde[i, :]= wracma.D
init_matrix_tilde[i, 0, :,:] = wracma.Z
init_matrix_tilde[i, 1, :,:] = wracma.C
init_matrix_tilde[i, 2, :,:] = wracma.B
init_matrix_tilde[i, 3, :,:] = wracma.sqrtC
init_matrix_tilde[i, 4, :,:] = wracma.invsqrtC
init_S_tilde[i, :] = wracma.S
init_sigma_tilde[i] = wracma.sigma
idr = np.argsort(self.Fnew)
self.yy[:,:] = y_tilde[:,:]
self.ymean [:,:] = ymean_tilde[:, :]
self.D_y[:,:] = D_y_tilde[:, :]
self.init_matrix[:, :, :, : ] = init_matrix_tilde[:, :, :, : ]
self.init_S[:, :] = init_S_tilde[:, :]
self.init_sigma[:] = init_sigma_tilde[:]
### postprocess part
return idr
def post_process(self, ymember):
"""Post process for following two points
1 : prevent the coordinate-wise standard deviation from becoming smaller than Vmin
2 : prevent the Gaussian distributions of CMA-ES instances from converging to the same point
ymember : int
number of the Gaussian distribution
for i in range(ymember):
cp_D_y = self.D_y[i,:].copy()
for k in range(self.n):
if self.boundy==True:
self.D_y[i, k] = min((self.yb[1, k] - self.yb[0, k])/4/self.init_sigma[i], max(cp_D_y[k], self.Vmin/self.init_sigma[i]))
self.D_y[i, k] = max(cp_D_y[k], self.Vmin/self.init_sigma[i])
for j in range(i+1, ymember):
self.yy[j,:], self.init_matrix[j,:,:,:], self.init_S[j,:], self.init_sigma[j], self.ymean[j,:], self.D_y[j,:] = self.purge(\
self.yy[i,:], self.yy[j,:], self.init_matrix[j,:,:,:], self.init_S[j,:], self.init_sigma[j], self.ymean[j,:], self.D_y[j,:], self.Vmin, j)
def purge(self, ya, jy, jmatrix, jS, jsigma, jmean, jD, Vmin, j):
"""If two Gaussian distribution is closer than Vmin, the distribution is reset. Otherwise, nothing is happen"""
mdistance = abs(math.dist(ya, jy))
if mdistance <= self.Vmin*np.sqrt(self.n):
jmatrix[:,:,:], jS[:], jsigma = self.set_init_dp()
jD[:] = self.init_D_y[j,:]
jmean[:] = self.init_ymean[j,:]
jyz = np.random.randn(self.n)
jyy = jyz * jD + jmean
jy = mirror(jyy, self.yb[0, :], self.yb[1, :])
return jy, jmatrix, jS, jsigma, jmean, jD
## set initial dynamic parameters for DD-CMA-ES
def set_init_dp (self):
"""Initialization of the dynamic parameters for CMA-ES"""
init_matrix = np.zeros ((5, self.n, self.n))
init_S = np.zeros (self.n)
init_matrix[0,:,:] = np.zeros((self.n, self.n)) # Z
init_matrix[1,:,:] = np.eye(self.n) # C
init_matrix[2,:,:] = np.eye(self.n) # B
init_matrix[3,:,:] = np.eye(self.n) # sqrtC
init_matrix[4,:,:] = np.eye(self.n) # invsqrtC
init_S[:] = np.ones(self.n) # S
init_sigma = 1. # sigma
return init_matrix, init_S, init_sigma
def optimize(f, cmaterm, boundx, boundy, lambdax, init_x, init_D_x, maxitr, maxeval, xb,
ymember, yb, init_y, init_ymean, init_D_y, tau_thr, cmax, Vmin, Tmin, logpath='test.txt'):
"""Optimize min-max problem min_x max_y f(x,y) by CMA-ES with WRA
f : callable
min-max objective function f(x, y)
cmaterm : termination criteria for the CMA-ES, 1D array of size 4
cmaterm[0] : minimum sigma
cmaterm[1] : maximum ratio between maximum and minimum eigen value of covariance matrix
cmaterm[2] : maximum ratio between maximum and minimum standard deviasion
cmaterm[3] : minimum value for maximum standard deviasion
boundx and boundy : bool
the domain for design variable or scenario variable is bounded (== initialization interval) if it is True
lambdax : int, (default : 4 + int(3 * log(n)))
population size for the CMA-ES
init_x : 1D array of size n
initial x vector
init_D_x : 1D array, positive
coordinate-wise std for x update
maxitr : integer
maximum number of iterations
maxeval : int
maximum number of f-calls
xb and yb : np.ndarray (2D)
lower and upper bound of the box constraint for x and y, respectively
ymember : int
number of initial distribution for CMA-ES approximately solving internal maximization problem
tau_thr : float
threshold of Kendall's tau for early stopping strategy
cmax : int
parameter for interrupting CMA-ES
If worse scenario is found cmax times, CMA-ES is interrupted.
Vmin : float
termination parameter for maximum standard deviation of CMA-ES
Tmin : int
minimum iteration for CMA-ES
init_y : np.ndarray (2D)
initial candidate vector of scenario variables
init_ymean : np.ndarray (2D)
initial mean vector of CMA-ES for each solution candidate
init_D_y : np.ndarray (2D)
initial coordinate-wise std of CMA-ES for each solution candidate
float : f(x, y) value
numpy.ndarray (1d) : x best
numpy.ndarray (1d) : y worst
list of numpy.ndarray (1d) : list of candidate x
list of numpy.ndarray (1d) : list of candidate y
m = len(init_x)
n = len(init_y[0, :])
# Initialization for CMA-ES
cma = MyDdCma(init_x, init_D_x, lambdax, flg_variance_update=False)
# Initialization for WRA
init_matrix = np.zeros((ymember, 5, n, n))
init_S = np.zeros((ymember, n))
init_sigma = np.zeros(ymember)
for i in range(ymember):
init_matrix[i, 0,:,:] = np.zeros((n, n))
init_matrix[i, 1,:,:] = np.eye(n)
init_matrix[i, 2,:,:] = np.eye(n)
init_matrix[i, 3,:,:] = np.eye(n)
init_matrix[i, 4,:,:] = np.eye(n)
init_S[i, :] = np.ones(n)
init_sigma[i] = 1.
wrar = WRA(f, n, yb, boundy, tau_thr, cmax, Vmin, Tmin, init_y, init_ymean, init_D_y, init_matrix, init_S, init_sigma)
neval = 0
conv = 0
## History
res = np.zeros(2+4*m+lambdax+2)
## Preparation for BIPOP or IPOP CMA-ES
binum = 0
tt = 0
## Preparation of result list
nominalx = []
nominaly = []
# Main Loop
for t in range(maxitr):
# CMA Sampling
arx, ary, arz = cma.sample()
arc = arx.copy()
if boundx==True:
for i in range(cma.lam):
arc [i, :] = mirror (arx[i, :], xb[0, :], xb[1, :])
# wradd_cma
idr = wrar(arc)
neval += wrar.fcalls
# X Update
cma.update(idr, arx, ary, arz)
if boundx==True:
cp_D_x = cma.D.copy()
for j in range(m):
cma.D[j] = min(cp_D_x[j], (xb[1, j] - xb[0, j])/4/cma.sigma)
cmean = cma.xmean
if boundx==True:
cmean = mirror (cma.xmean, xb[0, :], xb[1, :])
# Output
idx = 0
res[idx] = neval; idx += 1
res[idx] = cma.sigma; idx += 1
res[idx:idx+m] = cmean; idx += m
res[idx:idx+m] = cma.coordinate_std; idx += m
res[idx:idx+m] = cma.S; idx += m
res[idx:idx+lambdax] = wrar.Fnew; idx+=m
res[idx] = np.sum(wrar.wraneval[:]); idx +=1
res[idx] = np.sum(wrar.wrac[:]) ; idx +=1
with open(logpath, 'ba') as flog:
np.savetxt(flog, res, newline=' ')
# Termination
if cma.sigma < cmaterm[0]:
conv = 3
if np.max(cma.S) / np.min(cma.S) > cmaterm[1]:
conv = 4
if np.max(cma.coordinate_std) / np.min(cma.coordinate_std) > cmaterm[2]:
conv = 5
if np.max(cma.coordinate_std) < cmaterm[3]:
conv = 6
if neval >= maxeval:
if conv > 1:
# check best solution candidate
for i in range(cma.lam):
wlist=np.zeros(fsize, dtype='int')
for i in range(fsize):
f_arr = np.array([f(nominalx[i], nominaly[j]) for j in range(fsize)])
flist[i] = np.max(f_arr)
wlist[i] = np.argmax(f_arr)
fbest = np.min(flist)
bid = np.argmin(flist)
xbest = nominalx[bid]
yworst = nominaly[wlist[bid]]
return fbest, xbest, yworst, nominalx, nominaly
## test function
a = 1.
b = 100.
c = 1.
def ftest(x, y, a=1, b=10, c=1):
fxy = a/2*, x) + b *, y) - c/2*, y)
return fxy
def f(x, y):
return ftest(x, y, a, b, c)
def worsty(x, yb, b):
m = len(x)
for i in range(m):
worstS[i] = yb[1, i]*np.sign(x[i])
if yb[0,i]/b<=x[i] and x[i]<=yb[1,i]/b:
worstS[i] = b*x[i]
return worstS
def callworst(x, yb):
return worsty(x, yb, b)
if __name__ == "__main__":
import numpy as np
import matplotlib.pyplot as plt
logpre = 'CMA-WRA_ftest'
logpath = logpre + '.txt'
logpathx = logpre + '_x.csv'
logpathy = logpre + '_y.csv'
with open(logpath, 'w'):
with open(logpathx, 'w'):
with open(logpathy, 'w'):
## Experiment
n = 20
m = 20
Vmin = 1e-4
Tmin = 10
tau_thr = 0.7
gamma = 1
cmax = int(n*gamma)
xb = np.zeros((2, m))
xb[0, :] = np.ones(m)*-3
xb[1, :] = np.ones(m)*3
yb = np.zeros((2, n))
yb[0, :] = np.ones(n)*-3
yb[1, :] = np.ones(n)*3
boundx = True
boundy = True
## termination criteria
cmaterm = [1e-6, 1e+6, 1e+6, 1e-6]
## initial parameters for CMA-ES
init_x = np.random.rand(m)*(xb[1, :] - xb[0, :]) + xb[0, :]
init_sigma = (xb[1, :]-xb[0, :])/4
lambdax = (4 + int(3 * math.log(m)))
maxeval = int(5e+06)
maxitr = int(5e+5)
## initial parameters for WRA
ymember = lambdax
init_ymean = np.zeros((ymember, n))
init_ysigma = init_ymean.copy()
init_y = init_ymean.copy()
for i in range(ymember):
init_ymean[i,:] = np.random.rand(n)*(yb[1, :] - yb[0, :]) + yb[0, :]
init_ysigma[i,:] = (yb[1, :]-yb[0, :])/4
yz = np.random.randn(n)
init_y[i, :] = mirror(yz * init_ysigma[i,:] + init_ymean[i,:], yb[0, :], yb[1, :])
fxy, xbest, yworst, x_arr, y_arr = optimize(\
f, cmaterm, boundx, boundy, lambdax, init_x, init_sigma, maxitr, maxeval, xb,
ymember, yb, init_y, init_ymean, init_ysigma, tau_thr, cmax, Vmin, Tmin, logpath = logpath
print ('fxy=', fxy)
print ('xbest=', xbest)
print ('yworst=', yworst)
np.savetxt(logpathx, x_arr)
np.savetxt(logpathy, y_arr)
dat = np.loadtxt(logpath)
optimumx = np.zeros(m)
worstSo= callworst(optimumx, yb)
idp = 2+m
Fgap = np.zeros(len(dat[:, 0]))
for t in range (len(dat[:, 0])):
cmean = dat[t, idp:idp+m]
worstS = callworst(cmean, yb)
Fgap[t] = abs(f(cmean, worstS)-f(optimumx, worstSo))
plt.rcParams[''] ='sans-serif'
plt.rcParams['xtick.direction'] = 'in'
plt.rcParams['ytick.direction'] = 'in'
plt.rcParams['xtick.major.width'] = 5.0
plt.rcParams['ytick.major.width'] = 5.0
plt.rcParams['font.size'] = 12
plt.rcParams['axes.linewidth'] = 1.0
## Plot
ax1 = plt.subplot(221)
ax1.plot(dat[:, 0], Fgap[:], label='$F(m^t) - F(x^*)$')
ax1.plot(dat[:, 0], dat[:, 1], label=r'$\sigma_x$')
ax2 = plt.subplot(222)
ax2.plot(dat[:, 0], dat[:, 2:2+m])
plt.ylabel("Standard deviation")
idp = 2+m
ax3 = plt.subplot(223)
ax3.plot(dat[:, 0], abs(dat[:, idp:idp+m]))
idp +=m
ax4 = plt.subplot(224)
ax4.plot(dat[:, 0], dat[:, idp:idp+m])
plt.ylabel("Eigen value")
plt.savefig('WRA-CMA_ftest_b{0}_cmax{1}.pdf'.format(b, cmax))
