Created
May 28, 2018 03:26
-
-
Save caesoma/ee16f5fbcca8c9dfb9eb03cf34837896 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import numpy | |
from numpy.random import normal as rnormal | |
from matplotlib.pyplot import figure, plot, show | |
import matplotlib.pyplot as pyplot | |
# Define the kernel functions | |
def se_kernel_pt(xp, xq, la, lb): | |
"""squared exponential kernel for a pair of points """ | |
corr = numpy.exp(-(numpy.abs(xp-xq)**2)/((la**2)+(lb**2))) | |
return corr | |
# functions to compute the 'K' matrices of the gaussian processes | |
def compute_multi_kss_matrix(xTest, params, Kf): | |
""" Computes the covariance matrix for test points of multiple channels, multiple bandwidths, and a positive semi-definite matrix with process and "cross-process" variance. M matrices are produced. """ | |
M = xTest.shape[0] # channels | |
N = xTest.shape[1] # number of data points for all channels (assumes they are equal) | |
# empty matrix with terms computed on the loop below | |
Kss = numpy.full([M,N, N], -numpy.Infinity) | |
# compute each K_l,k matrix terms of the K** matrix | |
for l in range(0,M): | |
for k in range(0,M): | |
la, lb = params[l], params[l] | |
for xpq in range(0,N): | |
Kss[l][xpq][xpq] = Kf[l][l]*1 # computes values on the diagonal, with correlation equal to unity | |
for xp in range(0,N): | |
for xq in range(xp+1,N): | |
Kss[l][xp][xq] = Kf[l][l]*se_kernel_pt(xTest[l][xp], xTest[l][xq], la, lb) | |
Kss[l][xq][xp] = Kss[l][xp][xq] # computes terms off of the diagonal, and their symmetric values ont he opposite side of the diagonal | |
return Kss | |
def compute_multi_k_matrix(xTrain, params, Kf): | |
""" computes the covariance matrix for training points of multiple channels, multiple bandwidths, and a positive semi-definite matrix with process and "cross-process" variance """ | |
M = xTrain.shape[0] | |
N = xTrain.shape[1] | |
ylen = M*N | |
K = numpy.full([M*N, M*N], -numpy.Infinity) | |
for l in range(0,M): | |
subl = l*N | |
for k in range(0,M): | |
subk = k*N | |
la, lb = params[l], params[k] | |
for xp in range(0,N): | |
for xq in range(xp, N): | |
K[subl+xp][subk+xq] = Kf[l][k]*se_kernel_pt(xTrain[l][xp], xTrain[k][xq], la, lb) | |
K[subk+xq][subl+xp] = K[subl+xp][subk+xq] | |
return K | |
def compute_multi_ks_matrix(xTrain, xTest, params, Kf): | |
""" computes the coavariance matrix between training and test points of multiple channels, with the appropriate cross-channel parameters """ | |
M = xTrain.shape[0] | |
N = xTrain.shape[1] | |
testN = xTest.shape[1] | |
ylen = M*N | |
Ks = numpy.full([M, testN, M*N], -numpy.Infinity) | |
for l in range(0,M): | |
lb = params[l] | |
for xp in range(0,testN): | |
for trn in range(0,M): | |
la = params[trn] | |
subtrn = trn*N | |
for xq in range(N): | |
Ks[l][xp][subtrn+xq] = Kf[l][trn]*se_kernel_pt(xTest[l][xp],xTrain[trn][xq], la, lb) | |
return Ks | |
# Training data | |
mu = 0 # mean is assumed to be zero, but does not have to be | |
sigman = 1 # standard deviation value from the real function values | |
# x training point values for each channel | |
xTrain1 = numpy.arange(2.05, 9.05, 1) | |
xTrain2 = xTrain1 | |
xTrain3 = xTrain1 | |
# array with training data points for all channels | |
xTrain = numpy.array([xTrain1, xTrain2, xTrain3]) | |
# number of channels and training data points | |
M = xTrain.shape[0] | |
N = xTrain.shape[1] | |
# noisy observed values for the x training points | |
y1 = mu + xTrain1*numpy.sin(xTrain1) + rnormal(0, sigman, xTrain.shape[1]) | |
y2 = mu + xTrain2*numpy.cos(xTrain2) + rnormal(0, sigman, xTrain.shape[1]) | |
y3 = mu + (10-xTrain3)*numpy.sin(xTrain3) + rnormal(0, sigman, xTrain.shape[1]) | |
y = numpy.array([y1, y2, y3]) # array with the multiple channels | |
yVec = y.reshape([1,-1])[0] # concatenated form of data | |
# test points | |
xTest1 = numpy.arange(0, 10.1, 0.1) | |
xTest2 = xTest1 | |
xTest3 = xTest1 | |
xTest = numpy.array([xTest1, xTest2, xTest3]) | |
# "hyperparameters", i.e. parameters | |
params = numpy.sqrt([0.2, 0.1, 0.3]) # kernel 'bandwidth' parameters | |
Kf = numpy.array([[5, 0.7, 0.4],[0.7, 4, 0.5],[0.4, 0.5, 3]]) # signal variance matrix | |
# compute covariance matrices | |
Kss = compute_multi_kss_matrix(xTest, params, Kf) | |
K = compute_multi_k_matrix(xTrain, params, Kf) | |
Ks = compute_multi_ks_matrix(xTrain, xTest, params, Kf) | |
# get cholesky decomposition of the covariance matrix | |
L = numpy.linalg.cholesky(K + (sigman**2)*numpy.eye(M*N)) | |
# compute solutions for mean channel values, following algorithm in Rasmussen and Williams, adapted for multiple channels | |
alpha = numpy.linalg.solve(L.T, numpy.linalg.solve(L, yVec.T)) | |
fs = numpy.array([numpy.dot(Ks[l], alpha) for l in range(M)]) | |
# compute variances for the channels | |
vee = numpy.array([numpy.linalg.solve(L, Ks[l].T) for l in range(M)]) | |
Var = numpy.array([Kss[l] - numpy.dot(vee[l].T, vee[l]) for l in range(M)]) | |
sd = numpy.array([numpy.sqrt(numpy.diag(Var[l])) for l in range(M)]) | |
# the functions generating the 'true' values | |
fully1 = mu+xTest1*numpy.sin(xTest1) | |
fully2 = mu+xTest2*numpy.cos(xTest2) | |
fully3 = mu+(xTest3[-1]-xTest3)*numpy.sin(xTest3) | |
fully = numpy.array([fully1, fully2, fully3]) | |
# plot results | |
channelcolor = ["CornFlowerBlue", "Crimson", "DarkOrange"] | |
pyplot.figure() | |
for l in range(M): | |
pyplot.plot(xTest[l].T, fully[l], color=channelcolor[l]) | |
pyplot.plot(xTrain[l].T, y[l].T, 'o', color=channelcolor[l]) | |
pyplot.axis([0, 10, 2*y.min(), 2*y.max()]) | |
pyplot.title('True model') | |
for l in range(M): | |
pyplot.figure() | |
pyplot.plot(xTest[l], fully[l], color="Black", alpha=0.9-l*0.2) | |
pyplot.plot(xTest[l].T, fs[l], color=channelcolor[l]) | |
pyplot.fill_between(xTest[l], fs[l]-2*sd[l], fs[l]+2*sd[l], color=channelcolor[l], alpha=0.2) | |
pyplot.plot(xTrain[l].T, y[l].T, 'o', color=channelcolor[l]) | |
pyplot.axis([0, 10, 2*y.min(), y.max()*2]) | |
pyplot.title('Samples from the GP posterior') | |
pyplot.show() | |
# exec(open("./threechannel_gaussianprocess.py").read()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment