-
-
Save a2hi6/ac511f101a494197b5fab56a407aa094 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from scipy import stats | |
import ddcma | |
from ddcma import DdCma | |
"""dd-cma.py should be imported from | |
https://gist.github.com/youheiakimoto/1180b67b5a0b1265c204cba991fa8518""" | |
class MyDdCma(DdCma): | |
def set_dynamic_parameters(self, init_matrix, init_S, init_sigma): | |
self.Z = np.array(init_matrix[0,:,:], copy=True) | |
self.C = np.array(init_matrix[1,:,:], copy=True) | |
self.B = np.array(init_matrix[2,:,:], copy=True) | |
self.sqrtC = np.array(init_matrix[3,:,:], copy=True) | |
self.invsqrtC = np.array(init_matrix[4,:,:], copy=True) | |
self.S = np.array(init_S[:], copy=True) | |
self.sigma = init_sigma.copy() | |
pass | |
def mirror(z, lbound, ubound): | |
""" Mirroring constraint handling | |
Parameters | |
---------- | |
z : np.ndarray (1D) | |
solution vector to be mirrorred | |
lbound, ubound : np.ndarray (1D) | |
lower and upper bound of the box constraint | |
Returns | |
------- | |
np.ndarray (1D) : mirrorred solution | |
""" | |
width = ubound - lbound | |
return ubound - np.abs(np.mod(z - lbound, 2 * width) - width) | |
class WRA: | |
""" Worst-case Raking Approximation (WRA) : approximation of the worst-case ranking | |
for the given solution candidates by approximately solving internal maximization problem max_{y} f(x, y) with using CMA-ES | |
""" | |
def __init__(self, f, n, yb, boundy, tau_thr, cmax, Vmin, Tmin, init_y, init_ymean, init_D_y, init_matrix, init_S, init_sigma): | |
""" Initialization of WRA | |
Parameters | |
---------- | |
f : callable f (a solution candidate, a scenario variable) -> float | |
objective function | |
n : int | |
number of dimension for y | |
yb : np.ndarray (2D) | |
lower and upper bound of the box constraint | |
boundy : bool | |
the domain for scenario variable is bounded (== initialization interval) if it is True | |
tau_thr : float | |
threshold of Kendall's tau for early stopping strategy | |
cmax : int | |
parameter for interrupting CMA-ES | |
If worse scenario is found cmax times, CMA-ES is interrupted. | |
Vmin : float | |
termination parameter for maximum standard deviation of CMA-ES | |
Tmin : int | |
minimum iteration for CMA-ES | |
init_y : np.ndarray (2D) | |
initial candidate vector of scenario variables | |
init_ymean : np.ndarray (2D) | |
initial mean vector of CMA-ES for each solution candidate | |
init_D_y : np.ndarray (2D) | |
initial coordinate-wise std of CMA-ES for each solution candidate | |
Initial dynamic parameters of CMA-ES for each solution candidate | |
init_matrix : np.ndarray (4D) | |
init_S : np.ndarray (2D) | |
init_sigma : np.ndarray (1D) | |
Followings are dynamic parameters for solution candidate i | |
init_matrix[i, 0, :,:] = Z | |
init_matrix[i, 1, :,:] = C | |
init_matrix[i, 2, :,:] = B | |
init_matrix[i, 3, :,:] = sqrt.C | |
init_matrix[i, 4, :,:] = invsqrt.C | |
init_S[i, :] = S | |
init_sigma[i] = sigma | |
""" | |
self.f = f | |
self.n = n | |
self.yb = yb | |
self.tau_thr = tau_thr | |
self.cmax = cmax | |
self.Vmin = Vmin | |
self.Tmin = Tmin | |
self.init_ymean = init_ymean | |
self.init_D_y = init_D_y | |
self.yy = init_y | |
self.ymean = init_ymean | |
self.D_y = init_D_y | |
self.init_matrix = init_matrix | |
self.init_S = init_S | |
self.init_sigma = init_sigma | |
self.fcalls = 0 | |
self.boundy = boundy | |
self.tau = 0 | |
self.wrac = [] | |
self.wraneval = [] | |
self.Fnew = [] | |
def __call__(self, arx): | |
return self.wra_ddcma(arx) | |
def wra_ddcma(self, x_t): | |
"""approximating the worst-case ranking of solution candidate | |
Parameters | |
---------- | |
x^t : np.ndarray (2D) | |
solution candidates in iteration t | |
Return | |
idr : np.ndarray (1D) | |
approximated ranking of solution candidates x^t | |
""" | |
ymember = self.yy.shape[0] | |
lambdax = x_t.shape[0] | |
self.fcalls = 0 | |
### Initialization part | |
fx_arr = np.array([[self.f(x, y) for y in self.yy] for x in x_t]) | |
kworst = np.argmax(fx_arr, axis=1) | |
Fold = np.max(fx_arr, axis=1) | |
self.Fnew = Fold.copy() | |
self.fcalls += ymember * lambdax | |
ymean_tilde = np.zeros((lambdax, self.n)) | |
D_y_tilde = np.zeros((lambdax, self.n)) | |
y_tilde = np.zeros((lambdax, self.n)) | |
init_matrix_tilde = np.zeros((lambdax, 5, self.n, self.n)) | |
init_S_tilde = np.ones((lambdax, self.n)) | |
init_sigma_tilde = np.ones(lambdax) | |
for i in range(lambdax): | |
y_tilde[i, :] = self.yy[kworst[i], :] | |
ymean_tilde[i, :] = self.ymean[kworst[i], :] | |
D_y_tilde[i, :] = self.D_y[kworst[i], :] | |
init_matrix_tilde[i, :, :, :] = self.init_matrix[kworst[i], :, :, :] | |
init_S_tilde[i, :] = self.init_S[kworst[i], :] | |
init_sigma_tilde[i] = self.init_sigma[kworst[i]] | |
cma_instances = [MyDdCma(ymean_tilde[i, :], D_y_tilde[i, :], flg_variance_update=False) for i in range(lambdax)] | |
for i in range(lambdax): | |
cma_instances[i].set_dynamic_parameters(init_matrix_tilde[i, :, :, :], init_S_tilde[i, :], init_sigma_tilde[i]) | |
### Worst-case Ranking Approximation | |
h = np.ones(lambdax, dtype=bool) | |
updaten = np.zeros(lambdax) | |
tt = np.zeros(lambdax) | |
self.wrac = np.zeros(lambdax) | |
self.wraneval = np.zeros(lambdax) | |
self.tau = -1.0 | |
while self.tau <= self.tau_thr: | |
for i in range(lambdax): | |
if h[i]==True: | |
wracma = cma_instances[i] | |
Fdash = Fold[i] | |
updaten[i] += 1 | |
c = 0 | |
while c<self.cmax: | |
wrax, wray, wraz = wracma.sample() | |
wracy = np.zeros((wracma.lam, self.n)) | |
fy = np.zeros(wracma.lam) | |
for k in range(wracma.lam): | |
wracy[k, :]=wrax[k, :] | |
if self.boundy==True: | |
wracy[k, :]= mirror(wrax[k, :], self.yb[0, :], self.yb[1, :]) | |
fy[k] = self.f(x_t[i,:], wracy[k,:]) | |
self.fcalls +=1 | |
self.wraneval[i] += 1 | |
if max(fy) > Fdash: | |
yid = np.argmax(fy) | |
y_tilde[i, :] = wracy[yid, :] | |
Fdash = max(fy) | |
c += 1 | |
self.wrac[i] += 1 | |
idy = np.argsort(-fy) | |
wracma.update(idy, wrax, wray, wraz) | |
tt[i] +=1 | |
if self.boundy==True: | |
wraD_y = wracma.D.copy() | |
for k in range(self.n): | |
wracma.D[k] = min(wraD_y[k], (self.yb[1, k] - self.yb[0, k])/4/wracma.sigma) | |
if np.max(wracma.coordinate_std) < self.Vmin and tt[i] >=self.Tmin: | |
h[i] = False | |
break | |
self.Fnew[i] = Fdash | |
self.tau, p_value = stats.kendalltau(self.Fnew, Fold) | |
Fold[:] = self.Fnew[:] | |
for i in range(lambdax): | |
wracma = cma_instances[i] | |
ymean_tilde[i, :] = wracma.xmean | |
D_y_tilde[i, :]= wracma.D | |
init_matrix_tilde[i, 0, :,:] = wracma.Z | |
init_matrix_tilde[i, 1, :,:] = wracma.C | |
init_matrix_tilde[i, 2, :,:] = wracma.B | |
init_matrix_tilde[i, 3, :,:] = wracma.sqrtC | |
init_matrix_tilde[i, 4, :,:] = wracma.invsqrtC | |
init_S_tilde[i, :] = wracma.S | |
init_sigma_tilde[i] = wracma.sigma | |
idr = np.argsort(self.Fnew) | |
self.yy[:,:] = y_tilde[:,:] | |
self.ymean [:,:] = ymean_tilde[:, :] | |
self.D_y[:,:] = D_y_tilde[:, :] | |
self.init_matrix[:, :, :, : ] = init_matrix_tilde[:, :, :, : ] | |
self.init_S[:, :] = init_S_tilde[:, :] | |
self.init_sigma[:] = init_sigma_tilde[:] | |
### postprocess part | |
self.post_process(ymember) | |
return idr | |
def post_process(self, ymember): | |
"""Post process for following two points | |
1 : prevent the coordinate-wise standard deviation from becoming smaller than Vmin | |
2 : prevent the Gaussian distributions of CMA-ES instances from converging to the same point | |
Parameters | |
---------- | |
ymember : int | |
number of the Gaussian distribution | |
""" | |
for i in range(ymember): | |
cp_D_y = self.D_y[i,:].copy() | |
for k in range(self.n): | |
if self.boundy==True: | |
self.D_y[i, k] = min((self.yb[1, k] - self.yb[0, k])/4/self.init_sigma[i], max(cp_D_y[k], self.Vmin/self.init_sigma[i])) | |
else: | |
self.D_y[i, k] = max(cp_D_y[k], self.Vmin/self.init_sigma[i]) | |
for j in range(i+1, ymember): | |
self.yy[j,:], self.init_matrix[j,:,:,:], self.init_S[j,:], self.init_sigma[j], self.ymean[j,:], self.D_y[j,:] = self.purge(\ | |
self.yy[i,:], self.yy[j,:], self.init_matrix[j,:,:,:], self.init_S[j,:], self.init_sigma[j], self.ymean[j,:], self.D_y[j,:], self.Vmin, j) | |
def purge(self, ya, jy, jmatrix, jS, jsigma, jmean, jD, Vmin, j): | |
"""If two Gaussian distribution is closer than Vmin, the distribution is reset. Otherwise, nothing is happen""" | |
mdistance = abs(math.dist(ya, jy)) | |
if mdistance <= self.Vmin*np.sqrt(self.n): | |
jmatrix[:,:,:], jS[:], jsigma = self.set_init_dp() | |
jD[:] = self.init_D_y[j,:] | |
jmean[:] = self.init_ymean[j,:] | |
jyz = np.random.randn(self.n) | |
jyy = jyz * jD + jmean | |
jy = mirror(jyy, self.yb[0, :], self.yb[1, :]) | |
return jy, jmatrix, jS, jsigma, jmean, jD | |
## set initial dynamic parameters for DD-CMA-ES | |
def set_init_dp (self): | |
"""Initialization of the dynamic parameters for CMA-ES""" | |
init_matrix = np.zeros ((5, self.n, self.n)) | |
init_S = np.zeros (self.n) | |
init_matrix[0,:,:] = np.zeros((self.n, self.n)) # Z | |
init_matrix[1,:,:] = np.eye(self.n) # C | |
init_matrix[2,:,:] = np.eye(self.n) # B | |
init_matrix[3,:,:] = np.eye(self.n) # sqrtC | |
init_matrix[4,:,:] = np.eye(self.n) # invsqrtC | |
init_S[:] = np.ones(self.n) # S | |
init_sigma = 1. # sigma | |
return init_matrix, init_S, init_sigma | |
def optimize(f, cmaterm, boundx, boundy, lambdax, init_x, init_D_x, maxitr, maxeval, xb, | |
ymember, yb, init_y, init_ymean, init_D_y, tau_thr, cmax, Vmin, Tmin, logpath='test.txt'): | |
"""Optimize min-max problem min_x max_y f(x,y) by CMA-ES with WRA | |
Parameters | |
---------- | |
f : callable | |
min-max objective function f(x, y) | |
cmaterm : termination criteria for the CMA-ES, 1D array of size 4 | |
cmaterm[0] : minimum sigma | |
cmaterm[1] : maximum ratio between maximum and minimum eigen value of covariance matrix | |
cmaterm[2] : maximum ratio between maximum and minimum standard deviasion | |
cmaterm[3] : minimum value for maximum standard deviasion | |
boundx and boundy : bool | |
the domain for design variable or scenario variable is bounded (== initialization interval) if it is True | |
lambdax : int, (default : 4 + int(3 * log(n))) | |
population size for the CMA-ES | |
init_x : 1D array of size n | |
initial x vector | |
init_D_x : 1D array, positive | |
coordinate-wise std for x update | |
maxitr : integer | |
maximum number of iterations | |
maxeval : int | |
maximum number of f-calls | |
xb and yb : np.ndarray (2D) | |
lower and upper bound of the box constraint for x and y, respectively | |
ymember : int | |
number of initial distribution for CMA-ES approximately solving internal maximization problem | |
tau_thr : float | |
threshold of Kendall's tau for early stopping strategy | |
cmax : int | |
parameter for interrupting CMA-ES | |
If worse scenario is found cmax times, CMA-ES is interrupted. | |
Vmin : float | |
termination parameter for maximum standard deviation of CMA-ES | |
Tmin : int | |
minimum iteration for CMA-ES | |
init_y : np.ndarray (2D) | |
initial candidate vector of scenario variables | |
init_ymean : np.ndarray (2D) | |
initial mean vector of CMA-ES for each solution candidate | |
init_D_y : np.ndarray (2D) | |
initial coordinate-wise std of CMA-ES for each solution candidate | |
Returns | |
------- | |
float : f(x, y) value | |
numpy.ndarray (1d) : x best | |
numpy.ndarray (1d) : y worst | |
list of numpy.ndarray (1d) : list of candidate x | |
list of numpy.ndarray (1d) : list of candidate y | |
""" | |
m = len(init_x) | |
n = len(init_y[0, :]) | |
# Initialization for CMA-ES | |
cma = MyDdCma(init_x, init_D_x, lambdax, flg_variance_update=False) | |
# Initialization for WRA | |
init_matrix = np.zeros((ymember, 5, n, n)) | |
init_S = np.zeros((ymember, n)) | |
init_sigma = np.zeros(ymember) | |
for i in range(ymember): | |
init_matrix[i, 0,:,:] = np.zeros((n, n)) | |
init_matrix[i, 1,:,:] = np.eye(n) | |
init_matrix[i, 2,:,:] = np.eye(n) | |
init_matrix[i, 3,:,:] = np.eye(n) | |
init_matrix[i, 4,:,:] = np.eye(n) | |
init_S[i, :] = np.ones(n) | |
init_sigma[i] = 1. | |
wrar = WRA(f, n, yb, boundy, tau_thr, cmax, Vmin, Tmin, init_y, init_ymean, init_D_y, init_matrix, init_S, init_sigma) | |
neval = 0 | |
conv = 0 | |
## History | |
res = np.zeros(2+4*m+lambdax+2) | |
## Preparation for BIPOP or IPOP CMA-ES | |
binum = 0 | |
tt = 0 | |
## Preparation of result list | |
nominalx = [] | |
nominaly = [] | |
# Main Loop | |
for t in range(maxitr): | |
# CMA Sampling | |
arx, ary, arz = cma.sample() | |
arc = arx.copy() | |
if boundx==True: | |
for i in range(cma.lam): | |
arc [i, :] = mirror (arx[i, :], xb[0, :], xb[1, :]) | |
# wradd_cma | |
idr = wrar(arc) | |
neval += wrar.fcalls | |
# X Update | |
cma.update(idr, arx, ary, arz) | |
if boundx==True: | |
cp_D_x = cma.D.copy() | |
for j in range(m): | |
cma.D[j] = min(cp_D_x[j], (xb[1, j] - xb[0, j])/4/cma.sigma) | |
cmean = cma.xmean | |
if boundx==True: | |
cmean = mirror (cma.xmean, xb[0, :], xb[1, :]) | |
# Output | |
idx = 0 | |
res[idx] = neval; idx += 1 | |
res[idx] = cma.sigma; idx += 1 | |
res[idx:idx+m] = cmean; idx += m | |
res[idx:idx+m] = cma.coordinate_std; idx += m | |
res[idx:idx+m] = cma.S; idx += m | |
res[idx:idx+lambdax] = wrar.Fnew; idx+=m | |
res[idx] = np.sum(wrar.wraneval[:]); idx +=1 | |
res[idx] = np.sum(wrar.wrac[:]) ; idx +=1 | |
with open(logpath, 'ba') as flog: | |
np.savetxt(flog, res, newline=' ') | |
flog.write(b"\n") | |
# Termination | |
if cma.sigma < cmaterm[0]: | |
conv = 3 | |
if np.max(cma.S) / np.min(cma.S) > cmaterm[1]: | |
conv = 4 | |
if np.max(cma.coordinate_std) / np.min(cma.coordinate_std) > cmaterm[2]: | |
conv = 5 | |
if np.max(cma.coordinate_std) < cmaterm[3]: | |
conv = 6 | |
if neval >= maxeval: | |
break | |
if conv > 1: | |
break | |
# check best solution candidate | |
for i in range(cma.lam): | |
nominalx.append(arc[i]) | |
nominaly.append(wrar.yy[i]) | |
fsize=len(nominalx) | |
flist=np.zeros(fsize) | |
wlist=np.zeros(fsize, dtype='int') | |
for i in range(fsize): | |
f_arr = np.array([f(nominalx[i], nominaly[j]) for j in range(fsize)]) | |
flist[i] = np.max(f_arr) | |
wlist[i] = np.argmax(f_arr) | |
fbest = np.min(flist) | |
bid = np.argmin(flist) | |
xbest = nominalx[bid] | |
yworst = nominaly[wlist[bid]] | |
return fbest, xbest, yworst, nominalx, nominaly | |
## test function | |
a = 1. | |
b = 100. | |
c = 1. | |
def ftest(x, y, a=1, b=10, c=1): | |
fxy = a/2*np.dot(x, x) + b * np.dot(x, y) - c/2*np.dot(y, y) | |
return fxy | |
def f(x, y): | |
return ftest(x, y, a, b, c) | |
def worsty(x, yb, b): | |
m = len(x) | |
worstS=np.zeros(m) | |
for i in range(m): | |
worstS[i] = yb[1, i]*np.sign(x[i]) | |
if yb[0,i]/b<=x[i] and x[i]<=yb[1,i]/b: | |
worstS[i] = b*x[i] | |
return worstS | |
def callworst(x, yb): | |
return worsty(x, yb, b) | |
if __name__ == "__main__": | |
import numpy as np | |
import matplotlib.pyplot as plt | |
logpre = 'CMA-WRA_ftest' | |
logpath = logpre + '.txt' | |
logpathx = logpre + '_x.csv' | |
logpathy = logpre + '_y.csv' | |
with open(logpath, 'w'): | |
pass | |
with open(logpathx, 'w'): | |
pass | |
with open(logpathy, 'w'): | |
pass | |
## Experiment | |
n = 20 | |
m = 20 | |
Vmin = 1e-4 | |
Tmin = 10 | |
tau_thr = 0.7 | |
gamma = 1 | |
cmax = int(n*gamma) | |
xb = np.zeros((2, m)) | |
xb[0, :] = np.ones(m)*-3 | |
xb[1, :] = np.ones(m)*3 | |
yb = np.zeros((2, n)) | |
yb[0, :] = np.ones(n)*-3 | |
yb[1, :] = np.ones(n)*3 | |
boundx = True | |
boundy = True | |
## termination criteria | |
cmaterm = [1e-6, 1e+6, 1e+6, 1e-6] | |
## initial parameters for CMA-ES | |
init_x = np.random.rand(m)*(xb[1, :] - xb[0, :]) + xb[0, :] | |
init_sigma = (xb[1, :]-xb[0, :])/4 | |
lambdax = (4 + int(3 * math.log(m))) | |
maxeval = int(5e+06) | |
maxitr = int(5e+5) | |
## initial parameters for WRA | |
ymember = lambdax | |
init_ymean = np.zeros((ymember, n)) | |
init_ysigma = init_ymean.copy() | |
init_y = init_ymean.copy() | |
for i in range(ymember): | |
init_ymean[i,:] = np.random.rand(n)*(yb[1, :] - yb[0, :]) + yb[0, :] | |
init_ysigma[i,:] = (yb[1, :]-yb[0, :])/4 | |
yz = np.random.randn(n) | |
init_y[i, :] = mirror(yz * init_ysigma[i,:] + init_ymean[i,:], yb[0, :], yb[1, :]) | |
fxy, xbest, yworst, x_arr, y_arr = optimize(\ | |
f, cmaterm, boundx, boundy, lambdax, init_x, init_sigma, maxitr, maxeval, xb, | |
ymember, yb, init_y, init_ymean, init_ysigma, tau_thr, cmax, Vmin, Tmin, logpath = logpath | |
) | |
print ('fxy=', fxy) | |
print ('xbest=', xbest) | |
print ('yworst=', yworst) | |
np.savetxt(logpathx, x_arr) | |
np.savetxt(logpathy, y_arr) | |
dat = np.loadtxt(logpath) | |
optimumx = np.zeros(m) | |
worstSo= callworst(optimumx, yb) | |
idp = 2+m | |
Fgap = np.zeros(len(dat[:, 0])) | |
for t in range (len(dat[:, 0])): | |
cmean = dat[t, idp:idp+m] | |
worstS = callworst(cmean, yb) | |
Fgap[t] = abs(f(cmean, worstS)-f(optimumx, worstSo)) | |
plt.rcParams['font.family'] ='sans-serif' | |
plt.rcParams['xtick.direction'] = 'in' | |
plt.rcParams['ytick.direction'] = 'in' | |
plt.rcParams['xtick.major.width'] = 5.0 | |
plt.rcParams['ytick.major.width'] = 5.0 | |
plt.rcParams['font.size'] = 12 | |
plt.rcParams['axes.linewidth'] = 1.0 | |
plt.figure(figsize=(10,7)) | |
## Plot | |
idp=0 | |
ax1 = plt.subplot(221) | |
ax1.plot(dat[:, 0], Fgap[:], label='$F(m^t) - F(x^*)$') | |
ax1.plot(dat[:, 0], dat[:, 1], label=r'$\sigma_x$') | |
plt.xlabel("#f-calls") | |
plt.yscale('log') | |
plt.legend() | |
plt.grid() | |
ax2 = plt.subplot(222) | |
ax2.plot(dat[:, 0], dat[:, 2:2+m]) | |
plt.xlabel("#f-calls") | |
plt.ylabel("Standard deviation") | |
plt.yscale('log') | |
plt.grid() | |
idp = 2+m | |
ax3 = plt.subplot(223) | |
ax3.plot(dat[:, 0], abs(dat[:, idp:idp+m])) | |
plt.xlabel("#f-calls") | |
plt.ylabel("$m^t$") | |
plt.yscale('log') | |
plt.grid() | |
idp +=m | |
ax4 = plt.subplot(224) | |
ax4.plot(dat[:, 0], dat[:, idp:idp+m]) | |
plt.xlabel("#f-calls") | |
plt.ylabel("Eigen value") | |
plt.yscale('log') | |
plt.grid() | |
plt.tight_layout() | |
plt.savefig('WRA-CMA_ftest_b{0}_cmax{1}.pdf'.format(b, cmax)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment