GPAR-L kernel implementation and test
Gist by @vdutor, created March 2, 2018.
## Attempt at implementing the Gaussian Process Autoregressive Regression Model (GPAR).
#
# Main reference:
#
# @article{requeima2018gaussian,
#   title={The Gaussian Process Autoregressive Regression Model (GPAR)},
#   author={Requeima, James and Tebbutt, Will and Bruinsma, Wessel and Turner, Richard E},
#   journal={arXiv preprint arXiv:1802.07182},
#   year={2018}
# }
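#
# For output i, the kernel implemented below follows the GPAR-L (linear)
# construction: the previous outputs enter as extra input columns and
# contribute linearly, so each conditional model remains an ordinary GP.
# In the notation of the code (k_base and k_j the base and per-output kernels):
#
#   k((x, y_1, ..., y_{i-1}), (x', y'_1, ..., y'_{i-1}))
#       = k_base(x, x') + sum_{j < i} k_j(x, x') * y_j * y'_j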
import gpflow
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from gpflow.kernels import RationalQuadratic
from gpflow.models import GPR
from gpflow.training import ScipyOptimizer
from gpflow.decors import params_as_tensors

class GPAR_L_Kernel(gpflow.kernels.Kernel):
    def __init__(self, X_dim, output_i, base_kernel, output_kernels):
        """
        Kernel for the i-th output (1-indexed, given by `output_i`).
        Inputs are the X_dim original input dimensions followed by the
        output_i - 1 previously observed outputs.
        """
        assert len(output_kernels) == output_i - 1
        super().__init__(X_dim + output_i - 1)
        self.X_dim = X_dim
        self.output_i = output_i
        self.base_kernel = base_kernel
        self.output_kernels = gpflow.ParamList(output_kernels)

    @params_as_tensors
    def K(self, X, X2=None):
        if X2 is None:
            X2 = X
        K = self.base_kernel.K(X[:, :self.X_dim], X2[:, :self.X_dim])
        for j in range(self.output_i - 1):
            # Outer product of the j-th previous output values.
            yy = X[:, self.X_dim + j][:, None] * X2[:, self.X_dim + j][None, :]
            K += self.output_kernels[j].K(X[:, :self.X_dim], X2[:, :self.X_dim]) * yy
        return K

    @params_as_tensors
    def Kdiag(self, X):
        Kdiag = self.base_kernel.Kdiag(X[:, :self.X_dim])
        for j in range(self.output_i - 1):
            yy = X[:, self.X_dim + j] ** 2
            Kdiag += self.output_kernels[j].Kdiag(X[:, :self.X_dim]) * yy
        return Kdiag
# / -------------------- Experiment 1 ----------------- /

def data(N, noise=0.01):
    """Generate N points of the 3-output toy chain; columns are (x, y1, y2, y3)."""
    x = np.linspace(0, 1, N)
    f1 = -np.sin(10 * np.pi * (x + 1)) / (2 * x + 1) - x ** 4
    y1 = f1 + noise * np.random.randn(*x.shape)
    f2 = np.cos(y1) ** 2 + np.sin(3 * x)
    y2 = f2 + noise * np.random.randn(*x.shape)
    f3 = y2 * y1 ** 2 + 3 * x
    y3 = f3 + noise * np.random.randn(*x.shape)
    return np.stack((x, y1, y2, y3), axis=1)
#### Main
# Constants
N = 40
SIGMA = 0.05
XY = data(N, noise=SIGMA)
XY_true = data(500, noise=0)
X_DIM = 1
OUTPUTS = 3
models = []
# Training
for i in range(OUTPUTS):
    base_kernel = RationalQuadratic(X_DIM)
    output_kernels = [RationalQuadratic(X_DIM) for _ in range(i)]
    gpar_kernel = GPAR_L_Kernel(X_DIM, i + 1, base_kernel, output_kernels)
    # Inputs for output i+1 are x plus the i previously observed outputs.
    model = GPR(XY[:, :X_DIM + i], XY[:, [X_DIM + i]], gpar_kernel)
    print(model.compute_log_likelihood())
    ScipyOptimizer().minimize(model, maxiter=1000)
    print(model.compute_log_likelihood())
    models.append(model)
# Test time
x_test = np.linspace(0, 1, 500).reshape(-1, 1)
for i in range(OUTPUTS):
    y, _ = models[i].predict_f(x_test)
    x_test = np.concatenate((x_test, y), axis=1)
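
# Note: the loop above feeds only the predictive *means* of earlier outputs
# into later models, discarding their uncertainty. A sketch of propagating
# uncertainty by Monte Carlo instead (assumes GPflow 1.x's `predict_f_samples`;
# `predict_with_samples` is a hypothetical helper, not part of the model):
def predict_with_samples(models, x, num_samples=20):
    samples = np.tile(x, (num_samples, 1, 1))  # (S, N, X_DIM)
    for model in models:
        # Draw one joint sample per chain and append it as a new input column.
        y = np.stack([model.predict_f_samples(s, 1)[0] for s in samples])
        samples = np.concatenate((samples, y), axis=2)
    return samples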
# Plotting
fig, axes = plt.subplots(1, 2)
axes[0].plot(XY[:, 0], XY[:, 2], "o", ms=2, label="data")
axes[0].plot(XY_true[:, 0], XY_true[:, 2], ms=2, label="true")
axes[0].plot(x_test[:, 0], x_test[:, 2], label="prediction")
axes[0].legend()
axes[1].plot(XY[:, 0], XY[:, 3], "o", ms=2, label="data")
axes[1].plot(XY_true[:, 0], XY_true[:, 3], ms=2, label="true")
axes[1].plot(x_test[:, 0], x_test[:, 3], label="prediction")
axes[1].legend()
plt.show()