GPAR-L kernel implementation and test
# Attempt at implementing the Gaussian Process Autoregressive Regression Model (GPAR).
#
# Main reference:
#
# @article{requeima2018gaussian,
#   title={The Gaussian Process Autoregressive Regression Model (GPAR)},
#   author={Requeima, James and Tebbutt, Will and Bruinsma, Wessel and Turner, Richard E},
#   journal={arXiv preprint arXiv:1802.07182},
#   year={2018}
# }
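#
# The kernel below follows the linear ("GPAR-L") form: conditioned on the
# previous outputs, output i gets a GP whose covariance is linear in
# y_1, ..., y_{i-1},
#
#   k_i([x, y_{1:i-1}], [x', y'_{1:i-1}]) = k_base(x, x') + sum_j y_j * y'_j * k_j(x, x'),
#
# so each output's model takes the original input x concatenated with the
# preceding outputs as its input.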
import gpflow
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from gpflow.kernels import RBF, White, RationalQuadratic
from gpflow.models import GPR
from gpflow.training import ScipyOptimizer
from gpflow.decors import params_as_tensors
class GPAR_L_Kernel(gpflow.kernels.Kernel):
    def __init__(self, X_dim, output_i, base_kernel, output_kernels):
        """
        Kernel for the i-th output, given by `output_i` (1-indexed).
        It acts on inputs of the form [x, y_1, ..., y_{i-1}]: the original
        input concatenated with the preceding outputs.
        """
        assert len(output_kernels) == output_i - 1
        super().__init__(X_dim + output_i - 1)
        self.X_dim = X_dim
        self.output_i = output_i
        self.base_kernel = base_kernel
        self.output_kernels = gpflow.ParamList(output_kernels)
    @params_as_tensors
    def K(self, X, X2=None):
        if X2 is None:
            X2 = X
        # Base covariance on the original input dimensions only.
        K = self.base_kernel.K(X[:, :self.X_dim], X2[:, :self.X_dim])
        for j in range(self.output_i - 1):
            # Term linear in the j-th previous output, with an
            # input-dependent coefficient kernel.
            yy = X[:, self.X_dim + j][:, None] * X2[:, self.X_dim + j][None, :]
            K += self.output_kernels[j].K(X[:, :self.X_dim], X2[:, :self.X_dim]) * yy
        return K
    @params_as_tensors
    def Kdiag(self, X):
        Kdiag = self.base_kernel.Kdiag(X[:, :self.X_dim])
        for j in range(self.output_i - 1):
            yy = X[:, self.X_dim + j] ** 2
            # Multiply (not exponentiate) by yy, consistent with K above.
            Kdiag += self.output_kernels[j].Kdiag(X[:, :self.X_dim]) * yy
        return Kdiag
# / -------------------- Experiment 1 ----------------- /
def data(N, noise=0.01):
    """Synthetic autoregressive chain: y2 depends on y1, and y3 on y1 and y2."""
    x = np.linspace(0, 1, N)
    f1 = -np.sin(10 * np.pi * (x + 1)) / (2 * x + 1) - x**4
    y1 = f1 + noise * np.random.randn(*x.shape)
    f2 = np.cos(y1)**2 + np.sin(3 * x)
    y2 = f2 + noise * np.random.randn(*x.shape)
    f3 = y2 * y1**2 + 3*x
    y3 = f3 + noise * np.random.randn(*x.shape)
    return np.stack((x, y1, y2, y3), axis=1)
#### Main
# Constants
N = 40
SIGMA = 0.05
XY = data(N, noise=SIGMA)
XY_true = data(500, noise=0)
X_DIM = 1
OUTPUTS = 3
models = []
# Training
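# Each output i is a standard GPR with inputs [x, y_1, ..., y_{i-1}] (the
# observed previous outputs) and target y_i, so the models can be trained
# one at a time.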
for i in range(OUTPUTS):
    base_kernel = RationalQuadratic(X_DIM)
    output_kernels = [RationalQuadratic(X_DIM) for _ in range(i)]
    gpar_kernel = GPAR_L_Kernel(X_DIM, i + 1, base_kernel, output_kernels)
    model = GPR(XY[:, :X_DIM + i], XY[:, [X_DIM + i]], gpar_kernel)
    print(model.compute_log_likelihood())
    ScipyOptimizer().minimize(model, maxiter=1000)
    print(model.compute_log_likelihood())
    models.append(model)
# Test time
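# Predictions are chained autoregressively: the predicted mean of each output
# is appended to the test inputs of the next model. Only the mean is fed
# forward here, so predictive uncertainty is not propagated between outputs.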
x_test = np.linspace(0, 1, 500).reshape(-1, 1)
for i in range(OUTPUTS):
    y, _ = models[i].predict_f(x_test)
    x_test = np.concatenate((x_test, y), axis=1)
# Plotting
fig, axes = plt.subplots(1, 2)
axes[0].plot(XY[:, 0], XY[:, 2], "o", ms=2, label="data")
axes[0].plot(XY_true[:, 0], XY_true[:, 2], ms=2, label="true")
axes[0].plot(x_test[:, 0], x_test[:, 2], label="prediction")
axes[0].legend()
axes[1].plot(XY[:, 0], XY[:, 3], "o", ms=2, label="data")
axes[1].plot(XY_true[:, 0], XY_true[:, 3], ms=2, label="true")
axes[1].plot(x_test[:, 0], x_test[:, 3], label="prediction")
axes[1].legend()
plt.show()