Last active
September 6, 2023 19:25
-
-
Save terjehaukaas/1a4dd7146c70672e1d5524bfda235c49 to your computer and use it in GitHub Desktop.
modifyCorrelationMatrix()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ------------------------------------------------------------------------ | |
# The following Python code is implemented by Professor Terje Haukaas at | |
# the University of British Columbia in Vancouver, Canada. It is made | |
# freely available online at terje.civil.ubc.ca together with notes, | |
# examples, and additional Python code. Please be cautious when using | |
# this code; it may contain bugs and comes without warranty of any kind. | |
# ------------------------------------------------------------------------ | |
import numpy as np | |
from scipy.stats import norm, lognorm, uniform | |
def modifyCorrelationMatrix(means, stdvs, distributions, correlation): | |
numRV = len(means) | |
R0 = np.identity(numRV) | |
for i in range(len(correlation)): | |
rv_i = int(correlation[i, 0]) | |
rv_j = int(correlation[i, 1]) | |
rho_original = correlation[i, 2] | |
mean_i = means[rv_i - 1] | |
mean_j = means[rv_j - 1] | |
stdv_i = stdvs[rv_i - 1] | |
stdv_j = stdvs[rv_j - 1] | |
distr_i = distributions[rv_i - 1] | |
distr_j = distributions[rv_j - 1] | |
# See Appx. 3A in A. Der Kiureghian's 2022 textbook "Structural and System Reliability" | |
if distr_i == "Normal" and distr_j == "Normal": | |
rho_new = rho_original | |
elif distr_i == "Lognormal" and distr_j == "Lognormal": | |
cov_i = stdv_i / mean_i | |
cov_j = stdv_j / mean_j | |
rho_new = np.log(1+rho_original*cov_i*cov_j) / np.sqrt(np.log(1+cov_i**2) * np.log(1+cov_j**2)) | |
elif distr_i == "Uniform" and distr_j == "Uniform": | |
C = 1.047 - 0.047 * rho_original**2 | |
rho_new = C * rho_original | |
elif (distr_i == "Normal" and distr_j == "Uniform") or (distr_i == "Uniform" and distr_j == "Normal"): | |
C = 1.023 | |
rho_new = C * rho_original | |
elif distr_i == "Normal" and distr_j == "Lognormal": | |
cov = stdv_j / mean_j | |
C = cov / np.sqrt(np.log(1+cov**2)) | |
rho_new = C * rho_original | |
elif distr_i == "Lognormal" and distr_j == "Normal": | |
cov = stdv_i / mean_i | |
C = cov / np.sqrt(np.log(1+cov**2)) | |
rho_new = C * rho_original | |
elif distr_i == "Lognormal" and distr_j == "Uniform": | |
cov = stdv_i / mean_i | |
C = 1.019 + 0.014*cov + 0.01*rho_original**2 + 0.249 * cov**2 | |
rho_new = C * rho_original | |
elif distr_i == "Uniform" and distr_j == "Lognormal": | |
cov = stdv_j / mean_j | |
C = 1.019 + 0.014*cov + 0.01*rho_original**2 + 0.249 * cov**2 | |
rho_new = C * rho_original | |
else: | |
rho_new = modifyCorrelationCoefficient(mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j, rho_original) | |
R0[rv_i-1, rv_j-1] = rho_new | |
R0[rv_j-1, rv_i-1] = rho_new | |
return R0 | |
def modifyCorrelationCoefficient(mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j, rho_original): | |
tolerance = 1e-6 | |
p = 1e-4 | |
rho_old = rho_original | |
for i in range(1, 100+1): | |
# Evaluate the function | |
f = residualFunction(rho_original, rho_old, mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j) | |
# Evaluate the perturbed function | |
f_perturbed = residualFunction(rho_original, (rho_old+p), mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j) | |
# Evaluate derivative of function | |
df = (f_perturbed - f) / p | |
# Take a Newton step | |
rho_new = rho_old - f/df | |
# Check convergence | |
if np.abs(1.0 - np.abs(rho_old/rho_new)) < tolerance: | |
return rho_new | |
else: | |
if i==100: | |
print("Error: The Newton algorithm within the Nataf transformation did not converge") | |
return 0.0 | |
else: | |
rho_old = rho_new | |
def residualFunction(rho_original, rho, mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j): | |
return rho_original - doubleIntegral(rho, mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j) | |
def doubleIntegral(rho, mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j): | |
# The grid of integration points: | |
# 1, 2, ..., i, ..., 2*n in x-direction with intervals h | |
# 1, 2, ..., j, ..., 2*m in y-direction with intervals k | |
# Selected integration boundaries | |
bound = 5.6 | |
z_i0 = -bound | |
z_in = bound | |
z_j0 = -bound | |
z_jm = bound | |
# Half the number of integration points | |
numPoints = 25 | |
n = numPoints | |
m = numPoints | |
# Integration pointervals | |
h = (z_in-z_i0)/(2.0*n) | |
k = (z_jm-z_j0)/(2.0*m) | |
# Grid of integration points | |
z_i = [] | |
z_j = [] | |
for i in range(2*n+1): | |
z_i.append(z_i0 + i*h) | |
for j in range(2*m+1): | |
z_j.append(z_j0 + j*k) | |
# Computing sums (naming terms according to p. 126 in "Numerical Methods" by Faires & Burden) | |
term1 = 0.0 | |
term2 = 0.0 | |
term3 = 0.0 | |
term4 = 0.0 | |
term5 = 0.0 | |
term6 = 0.0 | |
term7 = 0.0 | |
term8 = 0.0 | |
term9 = 0.0 | |
term10 = 0.0 | |
term11 = 0.0 | |
term12 = 0.0 | |
term13 = 0.0 | |
term14 = 0.0 | |
term15 = 0.0 | |
term16 = 0.0 | |
term1 = integrand( distr_i, z_i[0] , mean_i, stdv_i, distr_j, z_j[0] , mean_j, stdv_j, rho) | |
term4 = integrand( distr_i, z_i[2*n] , mean_i, stdv_i, distr_j, z_j[0] , mean_j, stdv_j, rho) | |
term13 = integrand(distr_i, z_i[0] , mean_i, stdv_i, distr_j, z_j[2*m], mean_j, stdv_j, rho) | |
term16 = integrand(distr_i, z_i[2*n], mean_i, stdv_i, distr_j, z_j[2*m], mean_j, stdv_j, rho) | |
for i in range(1, n+1): | |
term3 += integrand(distr_i, z_i[2*i-1], mean_i, stdv_i, distr_j, z_j[0], mean_j, stdv_j, rho) | |
term15 += integrand(distr_i, z_i[2*i-1], mean_i, stdv_i, distr_j, z_j[2*m], mean_j, stdv_j, rho) | |
for i in range(1, n): | |
term2 += integrand(distr_i, z_i[2*i], mean_i, stdv_i, distr_j, z_j[0], mean_j, stdv_j, rho) | |
term14 += integrand(distr_i, z_i[2*i], mean_i, stdv_i, distr_j, z_j[2*m], mean_j, stdv_j, rho) | |
for j in range(1, m): | |
term5 += integrand(distr_i, z_i[0], mean_i, stdv_i, distr_j, z_j[2*j], mean_j, stdv_j, rho) | |
term8 += integrand(distr_i, z_i[2*n], mean_i, stdv_i, distr_j, z_j[2*j], mean_j, stdv_j, rho) | |
for j in range(1, m+1): | |
term9 += integrand(distr_i, z_i[0], mean_i, stdv_i, distr_j, z_j[2*j-1], mean_j, stdv_j, rho) | |
term12 += integrand(distr_i, z_i[2*n], mean_i, stdv_i, distr_j, z_j[2*j-1], mean_j, stdv_j, rho) | |
for j in range(1, m): | |
for i in range(1, n): | |
term6 += integrand(distr_i, z_i[2*i], mean_i, stdv_i, distr_j, z_j[2*j], mean_j, stdv_j, rho) | |
for j in range(1, m): | |
for i in range(1, n+1): | |
term7 += integrand(distr_i, z_i[2*i-1], mean_i, stdv_i, distr_j, z_j[2*j], mean_j, stdv_j, rho) | |
for j in range(1, m+1): | |
for i in range(1, n): | |
term10 += integrand(distr_i, z_i[2*i], mean_i, stdv_i, distr_j, z_j[2*j-1], mean_j, stdv_j, rho) | |
for j in range(1, m+1): | |
for i in range(1, n+1): | |
term11 += integrand(distr_i, z_i[2*i-1], mean_i, stdv_i, distr_j, z_j[2*j-1], mean_j, stdv_j, rho) | |
par1 = term1 + 2.0*term2 + 4.0*term3 + term4 | |
par2 = term5 + 2.0*term6 + 4.0*term7 + term8 | |
par3 = term9 + 2.0*term10 + 4.0*term11 + term12 | |
par4 = term13 + 2.0*term14 + 4.0*term15 + term16 | |
return h*k/9.0 * (par1 + 2.0*par2 + 4.0*par3 + par4) | |
def integrand(distr_i, z_i, mean_i, stdv_i, distr_j, z_j, mean_j, stdv_j, rho): | |
x_i = 0 | |
x_j = 0 | |
if distr_i == "Normal": | |
x_i = z_i * stdv_i + mean_i | |
elif distr_i == "Lognormal": | |
mu = np.log(mean_i) - 0.5 * np.log(1 + (stdv_i / mean_i) * (stdv_i / mean_i)) | |
sigma = np.sqrt(np.log((stdv_i / mean_i) * (stdv_i / mean_i) + 1)) | |
x_i = np.exp(z_i * sigma + mu) | |
elif distr_i == "Uniform": | |
halfspan = np.sqrt(3) * stdv_i | |
a = mean_i - halfspan | |
x_i = uniform.ppf(norm.cdf(z_i), a, 2 * halfspan) | |
else: | |
print("Error: Distribution type not found!") | |
if distr_j == "Normal": | |
x_j = z_j * stdv_j + mean_j | |
elif distr_j == "Lognormal": | |
mu = np.log(mean_j) - 0.5 * np.log(1 + (stdv_j / mean_j) * (stdv_j / mean_j)) | |
sigma = np.sqrt(np.log((stdv_j / mean_j) * (stdv_j / mean_j) + 1)) | |
x_j = np.exp(z_j * sigma + mu) | |
elif distr_j == "Uniform": | |
halfspan = np.sqrt(3) * stdv_j | |
a = mean_j - halfspan | |
x_j = uniform.ppf(norm.cdf(z_j), a, 2 * halfspan) | |
else: | |
print("Error: Distribution type not found!") | |
return (x_i-mean_i)/stdv_i * (x_j-mean_j)/stdv_j * phi2(z_i, z_j, rho) | |
def phi2(z_i, z_j, rho): | |
par = z_i * z_i + z_j * z_j - 2.0 * rho * z_i * z_j | |
theExp = np.exp(-par/(2.0 * (1.0 - rho * rho))) | |
return theExp/(2.0*3.14159265359 * np.sqrt(1.0-rho*rho)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment