Skip to content

Instantly share code, notes, and snippets.

@a2hi6
Last active February 10, 2023 11:11
Show Gist options
  • Save a2hi6/ac511f101a494197b5fab56a407aa094 to your computer and use it in GitHub Desktop.
Save a2hi6/ac511f101a494197b5fab56a407aa094 to your computer and use it in GitHub Desktop.
import math
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
import ddcma
from ddcma import DdCma
"""dd-cma.py should be imported from
https://gist.github.com/youheiakimoto/1180b67b5a0b1265c204cba991fa8518"""
class MyDdCma(DdCma):
def set_dynamic_parameters(self, init_matrix, init_S, init_sigma):
self.Z = np.array(init_matrix[0,:,:], copy=True)
self.C = np.array(init_matrix[1,:,:], copy=True)
self.B = np.array(init_matrix[2,:,:], copy=True)
self.sqrtC = np.array(init_matrix[3,:,:], copy=True)
self.invsqrtC = np.array(init_matrix[4,:,:], copy=True)
self.S = np.array(init_S[:], copy=True)
self.sigma = init_sigma.copy()
pass
def mirror(z, lbound, ubound):
""" Mirroring constraint handling
Parameters
----------
z : np.ndarray (1D)
solution vector to be mirrorred
lbound, ubound : np.ndarray (1D)
lower and upper bound of the box constraint
Returns
-------
np.ndarray (1D) : mirrorred solution
"""
width = ubound - lbound
return ubound - np.abs(np.mod(z - lbound, 2 * width) - width)
class WRA:
""" Worst-case Raking Approximation (WRA) : approximation of the worst-case ranking
for the given solution candidates by approximately solving internal maximization problem max_{y} f(x, y) with using CMA-ES
"""
def __init__(self, f, n, yb, boundy, tau_thr, cmax, Vmin, Tmin, init_y, init_ymean, init_D_y, init_matrix, init_S, init_sigma):
""" Initialization of WRA
Parameters
----------
f : callable f (a solution candidate, a scenario variable) -> float
objective function
n : int
number of dimension for y
yb : np.ndarray (2D)
lower and upper bound of the box constraint
boundy : bool
the domain for scenario variable is bounded (== initialization interval) if it is True
tau_thr : float
threshold of Kendall's tau for early stopping strategy
cmax : int
parameter for interrupting CMA-ES
If worse scenario is found cmax times, CMA-ES is interrupted.
Vmin : float
termination parameter for maximum standard deviation of CMA-ES
Tmin : int
minimum iteration for CMA-ES
init_y : np.ndarray (2D)
initial candidate vector of scenario variables
init_ymean : np.ndarray (2D)
initial mean vector of CMA-ES for each solution candidate
init_D_y : np.ndarray (2D)
initial coordinate-wise std of CMA-ES for each solution candidate
Initial dynamic parameters of CMA-ES for each solution candidate
init_matrix : np.ndarray (4D)
init_S : np.ndarray (2D)
init_sigma : np.ndarray (1D)
Followings are dynamic parameters for solution candidate i
init_matrix[i, 0, :,:] = Z
init_matrix[i, 1, :,:] = C
init_matrix[i, 2, :,:] = B
init_matrix[i, 3, :,:] = sqrt.C
init_matrix[i, 4, :,:] = invsqrt.C
init_S[i, :] = S
init_sigma[i] = sigma
"""
self.f = f
self.n = n
self.yb = yb
self.tau_thr = tau_thr
self.cmax = cmax
self.Vmin = Vmin
self.Tmin = Tmin
self.init_ymean = init_ymean
self.init_D_y = init_D_y
self.yy = init_y
self.ymean = init_ymean
self.D_y = init_D_y
self.init_matrix = init_matrix
self.init_S = init_S
self.init_sigma = init_sigma
self.fcalls = 0
self.boundy = boundy
self.tau = 0
self.wrac = []
self.wraneval = []
self.Fnew = []
def __call__(self, arx):
return self.wra_ddcma(arx)
def wra_ddcma(self, x_t):
"""approximating the worst-case ranking of solution candidate
Parameters
----------
x^t : np.ndarray (2D)
solution candidates in iteration t
Return
idr : np.ndarray (1D)
approximated ranking of solution candidates x^t
"""
ymember = self.yy.shape[0]
lambdax = x_t.shape[0]
self.fcalls = 0
### Initialization part
fx_arr = np.array([[self.f(x, y) for y in self.yy] for x in x_t])
kworst = np.argmax(fx_arr, axis=1)
Fold = np.max(fx_arr, axis=1)
self.Fnew = Fold.copy()
self.fcalls += ymember * lambdax
ymean_tilde = np.zeros((lambdax, self.n))
D_y_tilde = np.zeros((lambdax, self.n))
y_tilde = np.zeros((lambdax, self.n))
init_matrix_tilde = np.zeros((lambdax, 5, self.n, self.n))
init_S_tilde = np.ones((lambdax, self.n))
init_sigma_tilde = np.ones(lambdax)
for i in range(lambdax):
y_tilde[i, :] = self.yy[kworst[i], :]
ymean_tilde[i, :] = self.ymean[kworst[i], :]
D_y_tilde[i, :] = self.D_y[kworst[i], :]
init_matrix_tilde[i, :, :, :] = self.init_matrix[kworst[i], :, :, :]
init_S_tilde[i, :] = self.init_S[kworst[i], :]
init_sigma_tilde[i] = self.init_sigma[kworst[i]]
cma_instances = [MyDdCma(ymean_tilde[i, :], D_y_tilde[i, :], flg_variance_update=False) for i in range(lambdax)]
for i in range(lambdax):
cma_instances[i].set_dynamic_parameters(init_matrix_tilde[i, :, :, :], init_S_tilde[i, :], init_sigma_tilde[i])
### Worst-case Ranking Approximation
h = np.ones(lambdax, dtype=bool)
updaten = np.zeros(lambdax)
tt = np.zeros(lambdax)
self.wrac = np.zeros(lambdax)
self.wraneval = np.zeros(lambdax)
self.tau = -1.0
while self.tau <= self.tau_thr:
for i in range(lambdax):
if h[i]==True:
wracma = cma_instances[i]
Fdash = Fold[i]
updaten[i] += 1
c = 0
while c<self.cmax:
wrax, wray, wraz = wracma.sample()
wracy = np.zeros((wracma.lam, self.n))
fy = np.zeros(wracma.lam)
for k in range(wracma.lam):
wracy[k, :]=wrax[k, :]
if self.boundy==True:
wracy[k, :]= mirror(wrax[k, :], self.yb[0, :], self.yb[1, :])
fy[k] = self.f(x_t[i,:], wracy[k,:])
self.fcalls +=1
self.wraneval[i] += 1
if max(fy) > Fdash:
yid = np.argmax(fy)
y_tilde[i, :] = wracy[yid, :]
Fdash = max(fy)
c += 1
self.wrac[i] += 1
idy = np.argsort(-fy)
wracma.update(idy, wrax, wray, wraz)
tt[i] +=1
if self.boundy==True:
wraD_y = wracma.D.copy()
for k in range(self.n):
wracma.D[k] = min(wraD_y[k], (self.yb[1, k] - self.yb[0, k])/4/wracma.sigma)
if np.max(wracma.coordinate_std) < self.Vmin and tt[i] >=self.Tmin:
h[i] = False
break
self.Fnew[i] = Fdash
self.tau, p_value = stats.kendalltau(self.Fnew, Fold)
Fold[:] = self.Fnew[:]
for i in range(lambdax):
wracma = cma_instances[i]
ymean_tilde[i, :] = wracma.xmean
D_y_tilde[i, :]= wracma.D
init_matrix_tilde[i, 0, :,:] = wracma.Z
init_matrix_tilde[i, 1, :,:] = wracma.C
init_matrix_tilde[i, 2, :,:] = wracma.B
init_matrix_tilde[i, 3, :,:] = wracma.sqrtC
init_matrix_tilde[i, 4, :,:] = wracma.invsqrtC
init_S_tilde[i, :] = wracma.S
init_sigma_tilde[i] = wracma.sigma
idr = np.argsort(self.Fnew)
self.yy[:,:] = y_tilde[:,:]
self.ymean [:,:] = ymean_tilde[:, :]
self.D_y[:,:] = D_y_tilde[:, :]
self.init_matrix[:, :, :, : ] = init_matrix_tilde[:, :, :, : ]
self.init_S[:, :] = init_S_tilde[:, :]
self.init_sigma[:] = init_sigma_tilde[:]
### postprocess part
self.post_process(ymember)
return idr
def post_process(self, ymember):
"""Post process for following two points
1 : prevent the coordinate-wise standard deviation from becoming smaller than Vmin
2 : prevent the Gaussian distributions of CMA-ES instances from converging to the same point
Parameters
----------
ymember : int
number of the Gaussian distribution
"""
for i in range(ymember):
cp_D_y = self.D_y[i,:].copy()
for k in range(self.n):
if self.boundy==True:
self.D_y[i, k] = min((self.yb[1, k] - self.yb[0, k])/4/self.init_sigma[i], max(cp_D_y[k], self.Vmin/self.init_sigma[i]))
else:
self.D_y[i, k] = max(cp_D_y[k], self.Vmin/self.init_sigma[i])
for j in range(i+1, ymember):
self.yy[j,:], self.init_matrix[j,:,:,:], self.init_S[j,:], self.init_sigma[j], self.ymean[j,:], self.D_y[j,:] = self.purge(\
self.yy[i,:], self.yy[j,:], self.init_matrix[j,:,:,:], self.init_S[j,:], self.init_sigma[j], self.ymean[j,:], self.D_y[j,:], self.Vmin, j)
def purge(self, ya, jy, jmatrix, jS, jsigma, jmean, jD, Vmin, j):
"""If two Gaussian distribution is closer than Vmin, the distribution is reset. Otherwise, nothing is happen"""
mdistance = abs(math.dist(ya, jy))
if mdistance <= self.Vmin*np.sqrt(self.n):
jmatrix[:,:,:], jS[:], jsigma = self.set_init_dp()
jD[:] = self.init_D_y[j,:]
jmean[:] = self.init_ymean[j,:]
jyz = np.random.randn(self.n)
jyy = jyz * jD + jmean
jy = mirror(jyy, self.yb[0, :], self.yb[1, :])
return jy, jmatrix, jS, jsigma, jmean, jD
## set initial dynamic parameters for DD-CMA-ES
def set_init_dp (self):
"""Initialization of the dynamic parameters for CMA-ES"""
init_matrix = np.zeros ((5, self.n, self.n))
init_S = np.zeros (self.n)
init_matrix[0,:,:] = np.zeros((self.n, self.n)) # Z
init_matrix[1,:,:] = np.eye(self.n) # C
init_matrix[2,:,:] = np.eye(self.n) # B
init_matrix[3,:,:] = np.eye(self.n) # sqrtC
init_matrix[4,:,:] = np.eye(self.n) # invsqrtC
init_S[:] = np.ones(self.n) # S
init_sigma = 1. # sigma
return init_matrix, init_S, init_sigma
def optimize(f, cmaterm, boundx, boundy, lambdax, init_x, init_D_x, maxitr, maxeval, xb,
ymember, yb, init_y, init_ymean, init_D_y, tau_thr, cmax, Vmin, Tmin, logpath='test.txt'):
"""Optimize min-max problem min_x max_y f(x,y) by CMA-ES with WRA
Parameters
----------
f : callable
min-max objective function f(x, y)
cmaterm : termination criteria for the CMA-ES, 1D array of size 4
cmaterm[0] : minimum sigma
cmaterm[1] : maximum ratio between maximum and minimum eigen value of covariance matrix
cmaterm[2] : maximum ratio between maximum and minimum standard deviasion
cmaterm[3] : minimum value for maximum standard deviasion
boundx and boundy : bool
the domain for design variable or scenario variable is bounded (== initialization interval) if it is True
lambdax : int, (default : 4 + int(3 * log(n)))
population size for the CMA-ES
init_x : 1D array of size n
initial x vector
init_D_x : 1D array, positive
coordinate-wise std for x update
maxitr : integer
maximum number of iterations
maxeval : int
maximum number of f-calls
xb and yb : np.ndarray (2D)
lower and upper bound of the box constraint for x and y, respectively
ymember : int
number of initial distribution for CMA-ES approximately solving internal maximization problem
tau_thr : float
threshold of Kendall's tau for early stopping strategy
cmax : int
parameter for interrupting CMA-ES
If worse scenario is found cmax times, CMA-ES is interrupted.
Vmin : float
termination parameter for maximum standard deviation of CMA-ES
Tmin : int
minimum iteration for CMA-ES
init_y : np.ndarray (2D)
initial candidate vector of scenario variables
init_ymean : np.ndarray (2D)
initial mean vector of CMA-ES for each solution candidate
init_D_y : np.ndarray (2D)
initial coordinate-wise std of CMA-ES for each solution candidate
Returns
-------
float : f(x, y) value
numpy.ndarray (1d) : x best
numpy.ndarray (1d) : y worst
list of numpy.ndarray (1d) : list of candidate x
list of numpy.ndarray (1d) : list of candidate y
"""
m = len(init_x)
n = len(init_y[0, :])
# Initialization for CMA-ES
cma = MyDdCma(init_x, init_D_x, lambdax, flg_variance_update=False)
# Initialization for WRA
init_matrix = np.zeros((ymember, 5, n, n))
init_S = np.zeros((ymember, n))
init_sigma = np.zeros(ymember)
for i in range(ymember):
init_matrix[i, 0,:,:] = np.zeros((n, n))
init_matrix[i, 1,:,:] = np.eye(n)
init_matrix[i, 2,:,:] = np.eye(n)
init_matrix[i, 3,:,:] = np.eye(n)
init_matrix[i, 4,:,:] = np.eye(n)
init_S[i, :] = np.ones(n)
init_sigma[i] = 1.
wrar = WRA(f, n, yb, boundy, tau_thr, cmax, Vmin, Tmin, init_y, init_ymean, init_D_y, init_matrix, init_S, init_sigma)
neval = 0
conv = 0
## History
res = np.zeros(2+4*m+lambdax+2)
## Preparation for BIPOP or IPOP CMA-ES
binum = 0
tt = 0
## Preparation of result list
nominalx = []
nominaly = []
# Main Loop
for t in range(maxitr):
# CMA Sampling
arx, ary, arz = cma.sample()
arc = arx.copy()
if boundx==True:
for i in range(cma.lam):
arc [i, :] = mirror (arx[i, :], xb[0, :], xb[1, :])
# wradd_cma
idr = wrar(arc)
neval += wrar.fcalls
# X Update
cma.update(idr, arx, ary, arz)
if boundx==True:
cp_D_x = cma.D.copy()
for j in range(m):
cma.D[j] = min(cp_D_x[j], (xb[1, j] - xb[0, j])/4/cma.sigma)
cmean = cma.xmean
if boundx==True:
cmean = mirror (cma.xmean, xb[0, :], xb[1, :])
# Output
idx = 0
res[idx] = neval; idx += 1
res[idx] = cma.sigma; idx += 1
res[idx:idx+m] = cmean; idx += m
res[idx:idx+m] = cma.coordinate_std; idx += m
res[idx:idx+m] = cma.S; idx += m
res[idx:idx+lambdax] = wrar.Fnew; idx+=m
res[idx] = np.sum(wrar.wraneval[:]); idx +=1
res[idx] = np.sum(wrar.wrac[:]) ; idx +=1
with open(logpath, 'ba') as flog:
np.savetxt(flog, res, newline=' ')
flog.write(b"\n")
# Termination
if cma.sigma < cmaterm[0]:
conv = 3
if np.max(cma.S) / np.min(cma.S) > cmaterm[1]:
conv = 4
if np.max(cma.coordinate_std) / np.min(cma.coordinate_std) > cmaterm[2]:
conv = 5
if np.max(cma.coordinate_std) < cmaterm[3]:
conv = 6
if neval >= maxeval:
break
if conv > 1:
break
# check best solution candidate
for i in range(cma.lam):
nominalx.append(arc[i])
nominaly.append(wrar.yy[i])
fsize=len(nominalx)
flist=np.zeros(fsize)
wlist=np.zeros(fsize, dtype='int')
for i in range(fsize):
f_arr = np.array([f(nominalx[i], nominaly[j]) for j in range(fsize)])
flist[i] = np.max(f_arr)
wlist[i] = np.argmax(f_arr)
fbest = np.min(flist)
bid = np.argmin(flist)
xbest = nominalx[bid]
yworst = nominaly[wlist[bid]]
return fbest, xbest, yworst, nominalx, nominaly
## test function
a = 1.
b = 100.
c = 1.
def ftest(x, y, a=1, b=10, c=1):
fxy = a/2*np.dot(x, x) + b * np.dot(x, y) - c/2*np.dot(y, y)
return fxy
def f(x, y):
return ftest(x, y, a, b, c)
def worsty(x, yb, b):
m = len(x)
worstS=np.zeros(m)
for i in range(m):
worstS[i] = yb[1, i]*np.sign(x[i])
if yb[0,i]/b<=x[i] and x[i]<=yb[1,i]/b:
worstS[i] = b*x[i]
return worstS
def callworst(x, yb):
return worsty(x, yb, b)
if __name__ == "__main__":
import numpy as np
import matplotlib.pyplot as plt
logpre = 'CMA-WRA_ftest'
logpath = logpre + '.txt'
logpathx = logpre + '_x.csv'
logpathy = logpre + '_y.csv'
with open(logpath, 'w'):
pass
with open(logpathx, 'w'):
pass
with open(logpathy, 'w'):
pass
## Experiment
n = 20
m = 20
Vmin = 1e-4
Tmin = 10
tau_thr = 0.7
gamma = 1
cmax = int(n*gamma)
xb = np.zeros((2, m))
xb[0, :] = np.ones(m)*-3
xb[1, :] = np.ones(m)*3
yb = np.zeros((2, n))
yb[0, :] = np.ones(n)*-3
yb[1, :] = np.ones(n)*3
boundx = True
boundy = True
## termination criteria
cmaterm = [1e-6, 1e+6, 1e+6, 1e-6]
## initial parameters for CMA-ES
init_x = np.random.rand(m)*(xb[1, :] - xb[0, :]) + xb[0, :]
init_sigma = (xb[1, :]-xb[0, :])/4
lambdax = (4 + int(3 * math.log(m)))
maxeval = int(5e+06)
maxitr = int(5e+5)
## initial parameters for WRA
ymember = lambdax
init_ymean = np.zeros((ymember, n))
init_ysigma = init_ymean.copy()
init_y = init_ymean.copy()
for i in range(ymember):
init_ymean[i,:] = np.random.rand(n)*(yb[1, :] - yb[0, :]) + yb[0, :]
init_ysigma[i,:] = (yb[1, :]-yb[0, :])/4
yz = np.random.randn(n)
init_y[i, :] = mirror(yz * init_ysigma[i,:] + init_ymean[i,:], yb[0, :], yb[1, :])
fxy, xbest, yworst, x_arr, y_arr = optimize(\
f, cmaterm, boundx, boundy, lambdax, init_x, init_sigma, maxitr, maxeval, xb,
ymember, yb, init_y, init_ymean, init_ysigma, tau_thr, cmax, Vmin, Tmin, logpath = logpath
)
print ('fxy=', fxy)
print ('xbest=', xbest)
print ('yworst=', yworst)
np.savetxt(logpathx, x_arr)
np.savetxt(logpathy, y_arr)
dat = np.loadtxt(logpath)
optimumx = np.zeros(m)
worstSo= callworst(optimumx, yb)
idp = 2+m
Fgap = np.zeros(len(dat[:, 0]))
for t in range (len(dat[:, 0])):
cmean = dat[t, idp:idp+m]
worstS = callworst(cmean, yb)
Fgap[t] = abs(f(cmean, worstS)-f(optimumx, worstSo))
plt.rcParams['font.family'] ='sans-serif'
plt.rcParams['xtick.direction'] = 'in'
plt.rcParams['ytick.direction'] = 'in'
plt.rcParams['xtick.major.width'] = 5.0
plt.rcParams['ytick.major.width'] = 5.0
plt.rcParams['font.size'] = 12
plt.rcParams['axes.linewidth'] = 1.0
plt.figure(figsize=(10,7))
## Plot
idp=0
ax1 = plt.subplot(221)
ax1.plot(dat[:, 0], Fgap[:], label='$F(m^t) - F(x^*)$')
ax1.plot(dat[:, 0], dat[:, 1], label=r'$\sigma_x$')
plt.xlabel("#f-calls")
plt.yscale('log')
plt.legend()
plt.grid()
ax2 = plt.subplot(222)
ax2.plot(dat[:, 0], dat[:, 2:2+m])
plt.xlabel("#f-calls")
plt.ylabel("Standard deviation")
plt.yscale('log')
plt.grid()
idp = 2+m
ax3 = plt.subplot(223)
ax3.plot(dat[:, 0], abs(dat[:, idp:idp+m]))
plt.xlabel("#f-calls")
plt.ylabel("$m^t$")
plt.yscale('log')
plt.grid()
idp +=m
ax4 = plt.subplot(224)
ax4.plot(dat[:, 0], dat[:, idp:idp+m])
plt.xlabel("#f-calls")
plt.ylabel("Eigen value")
plt.yscale('log')
plt.grid()
plt.tight_layout()
plt.savefig('WRA-CMA_ftest_b{0}_cmax{1}.pdf'.format(b, cmax))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment