Skip to content

Instantly share code, notes, and snippets.

@terjehaukaas
Last active September 6, 2023 19:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save terjehaukaas/1a4dd7146c70672e1d5524bfda235c49 to your computer and use it in GitHub Desktop.
Save terjehaukaas/1a4dd7146c70672e1d5524bfda235c49 to your computer and use it in GitHub Desktop.
modifyCorrelationMatrix()
# ------------------------------------------------------------------------
# The following Python code is implemented by Professor Terje Haukaas at
# the University of British Columbia in Vancouver, Canada. It is made
# freely available online at terje.civil.ubc.ca together with notes,
# examples, and additional Python code. Please be cautious when using
# this code; it may contain bugs and comes without warranty of any kind.
# ------------------------------------------------------------------------
import numpy as np
from scipy.stats import norm, lognorm, uniform
def modifyCorrelationMatrix(means, stdvs, distributions, correlation):
numRV = len(means)
R0 = np.identity(numRV)
for i in range(len(correlation)):
rv_i = int(correlation[i, 0])
rv_j = int(correlation[i, 1])
rho_original = correlation[i, 2]
mean_i = means[rv_i - 1]
mean_j = means[rv_j - 1]
stdv_i = stdvs[rv_i - 1]
stdv_j = stdvs[rv_j - 1]
distr_i = distributions[rv_i - 1]
distr_j = distributions[rv_j - 1]
# See Appx. 3A in A. Der Kiureghian's 2022 textbook "Structural and System Reliability"
if distr_i == "Normal" and distr_j == "Normal":
rho_new = rho_original
elif distr_i == "Lognormal" and distr_j == "Lognormal":
cov_i = stdv_i / mean_i
cov_j = stdv_j / mean_j
rho_new = np.log(1+rho_original*cov_i*cov_j) / np.sqrt(np.log(1+cov_i**2) * np.log(1+cov_j**2))
elif distr_i == "Uniform" and distr_j == "Uniform":
C = 1.047 - 0.047 * rho_original**2
rho_new = C * rho_original
elif (distr_i == "Normal" and distr_j == "Uniform") or (distr_i == "Uniform" and distr_j == "Normal"):
C = 1.023
rho_new = C * rho_original
elif distr_i == "Normal" and distr_j == "Lognormal":
cov = stdv_j / mean_j
C = cov / np.sqrt(np.log(1+cov**2))
rho_new = C * rho_original
elif distr_i == "Lognormal" and distr_j == "Normal":
cov = stdv_i / mean_i
C = cov / np.sqrt(np.log(1+cov**2))
rho_new = C * rho_original
elif distr_i == "Lognormal" and distr_j == "Uniform":
cov = stdv_i / mean_i
C = 1.019 + 0.014*cov + 0.01*rho_original**2 + 0.249 * cov**2
rho_new = C * rho_original
elif distr_i == "Uniform" and distr_j == "Lognormal":
cov = stdv_j / mean_j
C = 1.019 + 0.014*cov + 0.01*rho_original**2 + 0.249 * cov**2
rho_new = C * rho_original
else:
rho_new = modifyCorrelationCoefficient(mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j, rho_original)
R0[rv_i-1, rv_j-1] = rho_new
R0[rv_j-1, rv_i-1] = rho_new
return R0
def modifyCorrelationCoefficient(mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j, rho_original):
tolerance = 1e-6
p = 1e-4
rho_old = rho_original
for i in range(1, 100+1):
# Evaluate the function
f = residualFunction(rho_original, rho_old, mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j)
# Evaluate the perturbed function
f_perturbed = residualFunction(rho_original, (rho_old+p), mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j)
# Evaluate derivative of function
df = (f_perturbed - f) / p
# Take a Newton step
rho_new = rho_old - f/df
# Check convergence
if np.abs(1.0 - np.abs(rho_old/rho_new)) < tolerance:
return rho_new
else:
if i==100:
print("Error: The Newton algorithm within the Nataf transformation did not converge")
return 0.0
else:
rho_old = rho_new
def residualFunction(rho_original, rho, mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j):
return rho_original - doubleIntegral(rho, mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j)
def doubleIntegral(rho, mean_i, mean_j, stdv_i, stdv_j, distr_i, distr_j):
# The grid of integration points:
# 1, 2, ..., i, ..., 2*n in x-direction with intervals h
# 1, 2, ..., j, ..., 2*m in y-direction with intervals k
# Selected integration boundaries
bound = 5.6
z_i0 = -bound
z_in = bound
z_j0 = -bound
z_jm = bound
# Half the number of integration points
numPoints = 25
n = numPoints
m = numPoints
# Integration pointervals
h = (z_in-z_i0)/(2.0*n)
k = (z_jm-z_j0)/(2.0*m)
# Grid of integration points
z_i = []
z_j = []
for i in range(2*n+1):
z_i.append(z_i0 + i*h)
for j in range(2*m+1):
z_j.append(z_j0 + j*k)
# Computing sums (naming terms according to p. 126 in "Numerical Methods" by Faires & Burden)
term1 = 0.0
term2 = 0.0
term3 = 0.0
term4 = 0.0
term5 = 0.0
term6 = 0.0
term7 = 0.0
term8 = 0.0
term9 = 0.0
term10 = 0.0
term11 = 0.0
term12 = 0.0
term13 = 0.0
term14 = 0.0
term15 = 0.0
term16 = 0.0
term1 = integrand( distr_i, z_i[0] , mean_i, stdv_i, distr_j, z_j[0] , mean_j, stdv_j, rho)
term4 = integrand( distr_i, z_i[2*n] , mean_i, stdv_i, distr_j, z_j[0] , mean_j, stdv_j, rho)
term13 = integrand(distr_i, z_i[0] , mean_i, stdv_i, distr_j, z_j[2*m], mean_j, stdv_j, rho)
term16 = integrand(distr_i, z_i[2*n], mean_i, stdv_i, distr_j, z_j[2*m], mean_j, stdv_j, rho)
for i in range(1, n+1):
term3 += integrand(distr_i, z_i[2*i-1], mean_i, stdv_i, distr_j, z_j[0], mean_j, stdv_j, rho)
term15 += integrand(distr_i, z_i[2*i-1], mean_i, stdv_i, distr_j, z_j[2*m], mean_j, stdv_j, rho)
for i in range(1, n):
term2 += integrand(distr_i, z_i[2*i], mean_i, stdv_i, distr_j, z_j[0], mean_j, stdv_j, rho)
term14 += integrand(distr_i, z_i[2*i], mean_i, stdv_i, distr_j, z_j[2*m], mean_j, stdv_j, rho)
for j in range(1, m):
term5 += integrand(distr_i, z_i[0], mean_i, stdv_i, distr_j, z_j[2*j], mean_j, stdv_j, rho)
term8 += integrand(distr_i, z_i[2*n], mean_i, stdv_i, distr_j, z_j[2*j], mean_j, stdv_j, rho)
for j in range(1, m+1):
term9 += integrand(distr_i, z_i[0], mean_i, stdv_i, distr_j, z_j[2*j-1], mean_j, stdv_j, rho)
term12 += integrand(distr_i, z_i[2*n], mean_i, stdv_i, distr_j, z_j[2*j-1], mean_j, stdv_j, rho)
for j in range(1, m):
for i in range(1, n):
term6 += integrand(distr_i, z_i[2*i], mean_i, stdv_i, distr_j, z_j[2*j], mean_j, stdv_j, rho)
for j in range(1, m):
for i in range(1, n+1):
term7 += integrand(distr_i, z_i[2*i-1], mean_i, stdv_i, distr_j, z_j[2*j], mean_j, stdv_j, rho)
for j in range(1, m+1):
for i in range(1, n):
term10 += integrand(distr_i, z_i[2*i], mean_i, stdv_i, distr_j, z_j[2*j-1], mean_j, stdv_j, rho)
for j in range(1, m+1):
for i in range(1, n+1):
term11 += integrand(distr_i, z_i[2*i-1], mean_i, stdv_i, distr_j, z_j[2*j-1], mean_j, stdv_j, rho)
par1 = term1 + 2.0*term2 + 4.0*term3 + term4
par2 = term5 + 2.0*term6 + 4.0*term7 + term8
par3 = term9 + 2.0*term10 + 4.0*term11 + term12
par4 = term13 + 2.0*term14 + 4.0*term15 + term16
return h*k/9.0 * (par1 + 2.0*par2 + 4.0*par3 + par4)
def integrand(distr_i, z_i, mean_i, stdv_i, distr_j, z_j, mean_j, stdv_j, rho):
x_i = 0
x_j = 0
if distr_i == "Normal":
x_i = z_i * stdv_i + mean_i
elif distr_i == "Lognormal":
mu = np.log(mean_i) - 0.5 * np.log(1 + (stdv_i / mean_i) * (stdv_i / mean_i))
sigma = np.sqrt(np.log((stdv_i / mean_i) * (stdv_i / mean_i) + 1))
x_i = np.exp(z_i * sigma + mu)
elif distr_i == "Uniform":
halfspan = np.sqrt(3) * stdv_i
a = mean_i - halfspan
x_i = uniform.ppf(norm.cdf(z_i), a, 2 * halfspan)
else:
print("Error: Distribution type not found!")
if distr_j == "Normal":
x_j = z_j * stdv_j + mean_j
elif distr_j == "Lognormal":
mu = np.log(mean_j) - 0.5 * np.log(1 + (stdv_j / mean_j) * (stdv_j / mean_j))
sigma = np.sqrt(np.log((stdv_j / mean_j) * (stdv_j / mean_j) + 1))
x_j = np.exp(z_j * sigma + mu)
elif distr_j == "Uniform":
halfspan = np.sqrt(3) * stdv_j
a = mean_j - halfspan
x_j = uniform.ppf(norm.cdf(z_j), a, 2 * halfspan)
else:
print("Error: Distribution type not found!")
return (x_i-mean_i)/stdv_i * (x_j-mean_j)/stdv_j * phi2(z_i, z_j, rho)
def phi2(z_i, z_j, rho):
par = z_i * z_i + z_j * z_j - 2.0 * rho * z_i * z_j
theExp = np.exp(-par/(2.0 * (1.0 - rho * rho)))
return theExp/(2.0*3.14159265359 * np.sqrt(1.0-rho*rho))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment