Instantly share code, notes, and snippets.

Embed
What would you like to do?
GPAR-L kernel implementation and test
## Attempt at implementing the Gaussian Process Autoregressive Regression Model (GPAR).
#
# Main reference:
#
# @article{requeima2018gaussian,
# title={The Gaussian Process Autoregressive Regression Model (GPAR)},
# author={Requeima, James and Tebbutt, Will and Bruinsma, Wessel and Turner, Richard E},
# journal={arXiv preprint arXiv:1802.07182},
# year={2018}
# }
import gpflow
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from gpflow.kernels import RBF, White, RationalQuadratic
from gpflow.models import GPR
from gpflow.training import ScipyOptimizer
from gpflow.decors import params_as_tensors
class GPAR_L_Kernel(gpflow.kernels.Kernel):
def __init__(self, X_dim, output_i, base_kernel, output_kernels):
"""
Kernel is used for the i-th output, given by `output_i`
"""
assert len(output_kernels) == output_i - 1
super().__init__(X_dim + output_i - 1)
self.X_dim = X_dim
self.output_i = output_i
self.base_kernel = base_kernel
self.output_kernels = gpflow.ParamList(output_kernels)
@params_as_tensors
def K(self, X, X2=None):
if X2 is None:
X2 = X
K = self.base_kernel.K(X[:, :self.X_dim], X2[:, :self.X_dim])
for j in range(self.output_i - 1):
yy = X[:, self.X_dim + j][:, None] * X2[:, self.X_dim + j][None, :]
K += self.output_kernels[j].K(X[:, :self.X_dim], X2[:, :self.X_dim]) * yy
return K
@params_as_tensors
def Kdiag(self, X):
Kdiag = self.base_kernel.Kdiag(X[:, :self.X_dim])
for j in range(self.output_i - 1):
yy = X[:, self.X_dim + j] ** 2
Kdiag += self.output_kernels[j].Kdiag(X[:, :self.X_dim]) ** yy
return Kdiag
# / -------------------- Experiment 1 ----------------- /
def data(N, noise=0.01):
x = np.linspace(0, 1, N)
f1 = -np.sin(10 * np.pi * (x + 1)) / (2 * x + 1) - x**4
y1 = f1 + noise * np.random.randn(*x.shape)
f2 = np.cos(y1)**2 + np.sin(3 * x)
y2 = f2 + noise * np.random.randn(*x.shape)
f3 = y2 * y1**2 + 3*x
y3 = f3 + noise * np.random.randn(*x.shape)
return np.stack((x, y1, y2, y3), axis=1)
#### Main
# Constants
N = 40
SIGMA = 0.05
XY = data(N, noise=SIGMA)
XY_true = data(500, noise=0)
X_DIM = 1
OUTPUTS = 3
models = []
# Training
for i in range(OUTPUTS):
base_kernel = RationalQuadratic(X_DIM)
output_kernels = [RationalQuadratic(X_DIM) for _ in range(i)]
gpar_kernel = GPAR_L_Kernel(X_DIM, i + 1, base_kernel, output_kernels)
model = GPR(XY[:, :X_DIM + i], XY[:, [X_DIM + i]], gpar_kernel)
print(model.compute_log_likelihood())
ScipyOptimizer().minimize(model, maxiter=1000)
print(model.compute_log_likelihood())
models.append(model)
# Test time
x_test = np.linspace(0, 1, 500).reshape(-1, 1)
for i in range(OUTPUTS):
y, _ = models[i].predict_f(x_test)
x_test = np.concatenate((x_test, y), axis=1)
# Plotting
fig, axes = plt.subplots(1, 2)
axes[0].plot(XY[:, 0], XY[:, 2], "o", ms=2, label="true")
axes[0].plot(XY_true[:, 0], XY_true[:, 2], ms=2, label="data")
axes[0].plot(x_test[:, 0], x_test[:, 2], label="prediction")
axes[0].legend()
axes[1].plot(XY[:, 0], XY[:, 3], "o", ms=2, label="true")
axes[1].plot(XY_true[:, 0], XY_true[:, 3], ms=2, label="data")
axes[1].plot(x_test[:, 0], x_test[:, 3], label="prediction")
axes[1].legend()
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment