Skip to content

Instantly share code, notes, and snippets.

@jmbr
Created November 3, 2022 22:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jmbr/47c9ac5b20adbd3c1777fbaf844e2302 to your computer and use it in GitHub Desktop.
Save jmbr/47c9ac5b20adbd3c1777fbaf844e2302 to your computer and use it in GitHub Desktop.
Code for experiments on Bayesian optimization in dimensionally-reduced models
from abc import ABC, abstractmethod
from typing import Optional
import numpy as np
from diffusion_maps import diffusion_maps
from mapping import Mapping
class Embedding(ABC):
"""Embedding of a point cloud.
"""
points: Optional[np.ndarray] = None
images: Optional[np.ndarray] = None
@abstractmethod
def embed(self, points: np.ndarray) -> None:
pass
@abstractmethod
def __call__(self, X: np.ndarray) -> np.ndarray:
pass
@abstractmethod
def preimage(self, Y: np.ndarray) -> np.ndarray:
pass
class IdentityEmbedding(Embedding):
"""Trivial embedding.
"""
def __init__(self) -> None:
self.points: Optional[np.ndarray] = None
self.images: Optional[np.ndarray] = None
def embed(self, points: np.ndarray) -> None:
if self.points is None:
self.points = points.copy()
else:
self.points = np.concatenate((self.points, points))
self.images = self.points
def __call__(self, X: np.ndarray) -> np.ndarray:
return np.atleast_2d(X)
def preimage(self, Y: np.ndarray) -> np.ndarray:
return np.atleast_2d(Y)
class DiffusionMapsEmbedding(Embedding):
"""Embedding using diffusion maps.
"""
def __init__(self, epsilon: float,
codomain_dim: int) -> None:
self.epsilon = epsilon
self.codomain_dim = codomain_dim
self.points: Optional[np.ndarray] = None
self.images: Optional[np.ndarray] = None
self.mapping: Optional[Mapping] = None
self.inverse_mapping: Optional[Mapping] = None
def embed(self, points: np.ndarray) -> None:
if self.points is None:
self.points = points.copy()
else:
self.points = np.concatenate((self.points, points))
ew, ev = diffusion_maps(self.points, self.epsilon**2,
num_eigenpairs=1 + self.codomain_dim)
self.ew = ew.copy()
self.ev = ev.copy()
self.images = ev * ew[np.newaxis, :]**self.epsilon
self.mapping = Mapping(self.points, self.images)
self.inverse_mapping = Mapping(self.images, self.points)
def __call__(self, X: np.ndarray) -> np.ndarray:
"""Return diffusion map coordinates.
"""
assert self.mapping is not None
return self.mapping(X)
def preimage(self, Y: np.ndarray) -> np.ndarray:
"""Return preimage of diffusion map coordinates.
"""
assert self.inverse_mapping is not None
return self.inverse_mapping(Y)
def copy(self):
dme = DiffusionMapsEmbedding(self.epsilon, self.codomain_dim)
dme.embed(self.points.copy())
return dme
from typing import List, Optional
import numpy as np
import scipy
import matplotlib.pyplot as plt
import seaborn as sns
from embedding import Embedding
from point_sampler import PointSampler
from optimization_problem import (AcquisitionFunctionFactory,
SurrogateFunction,
ObjectiveFunction)
DEFAULT_NUM_STEPS: int = 0
DEFAULT_TIME_STEP_LENGTH: float = 1e-8
def relax_points(objective_function: ObjectiveFunction, points: np.ndarray,
num_steps: int = DEFAULT_NUM_STEPS,
time_step_length: float = DEFAULT_TIME_STEP_LENGTH) \
-> np.ndarray:
"""Relax points using gradient descent.
"""
relaxed_points = np.empty_like(points)
for i, point in enumerate(points):
for n in range(num_steps):
grad = objective_function.gradient(point).squeeze()
point -= time_step_length * grad
relaxed_points[i, :] = point
return relaxed_points
class ExperimentError(Exception):
pass
class BaseExperiment:
def __init__(self, embedding: Embedding,
point_sampler: PointSampler,
objective_function: ObjectiveFunction,
acquisition_function_factory: AcquisitionFunctionFactory,
points_per_iteration: int,
radius: float,
initial_point: np.ndarray) -> None:
self.embedding: Embedding = embedding
self.point_sampler: PointSampler = point_sampler
self.points_per_iteration: int = points_per_iteration
self.radius: float = radius # XXX radius belongs to PointSampler
self.initial_point: np.ndarray = initial_point.copy()
self.initial_points = []
self.optima: List[np.ndarray] = []
self.acquisition_function_factory: AcquisitionFunctionFactory \
= acquisition_function_factory
self.objective_function: ObjectiveFunction = objective_function
def __iter__(self):
return self
def __next__(self):
new_points = self.sample_new_points()
self.embedding.embed(new_points)
points, images = self.embedding.points, self.embedding.images
self.acquisition_function = self.acquisition_function_factory.get(
self.objective_function, points, images)
bounds = list(zip(images.min(axis=0) - 0.0025, # XXX hardcoded
images.max(axis=0) + 0.0025))
results = scipy.optimize.minimize(
self.acquisition_function, x0=images[-1, :],
jac=self.acquisition_function.gradient,
bounds=bounds)
print(results)
print()
if results.success is False:
raise ExperimentError(results)
optimum = self.embedding.preimage(results.x)
self.optima.append(relax_points(
self.objective_function, np.atleast_2d(optimum), 2, 1e-3))
return self
class ExperimentMetropolis(BaseExperiment):
def sample_new_points(self):
initial_point = self.optima[-1] if self.optima else self.initial_point
self.latest_raw_points = self.point_sampler.sample_neighbors(
initial_point, self.radius, self.points_per_iteration)
return self.latest_raw_points.copy()
class ExperimentMetropolisPlusRelaxation(ExperimentMetropolis):
def sample_new_points(self):
self.latest_relaxed_points = relax_points(
self.objective_function, super().sample_new_points())
return self.latest_relaxed_points.copy()
class ExperimentMetropolisPlusRelaxationOnlyOnce(ExperimentMetropolis):
def sample_new_points(self):
if self.optima:
# Just add the latest optimum into the available samples.
self.initial_points.append(self.optima[-1].squeeze())
new_point = relax_points(self.objective_function,
self.optima[-1],
num_steps=1)
# return np.atleast_2d(self.optima[-1])
return new_point.copy()
else:
# First iteration.
self.initial_points.append(self.initial_point.squeeze())
new_points = self.point_sampler.sample_neighbors(
self.initial_point, self.radius, self.points_per_iteration)
self.latest_raw_points = new_points.copy()
new_points = relax_points(self.objective_function, new_points)
self.latest_relaxed_points = new_points.copy()
return new_points
class ExperimentPlainBayesianOptimization(BaseExperiment):
def sample_new_points(self):
print('points\n', self.embedding.points)
if self.optima:
# # Just add the latest optimum into the available samples.
# new_point = relax_points(self.objective_function,
# self.optima[-1],
# num_steps=1)
# # return np.atleast_2d(self.optima[-1])
# return new_point.copy()
self.initial_points.append(self.optima[-1].squeeze())
print('.')
return self.optima[-1].copy()
else:
# First iteration.
self.initial_points.append(self.initial_point.squeeze())
new_points = self.point_sampler.sample_neighbors(
self.initial_point, self.radius, self.points_per_iteration)
self.latest_raw_points = new_points.copy()
new_points = relax_points(self.objective_function, new_points)
self.latest_relaxed_points = new_points.copy()
return new_points
from typing import List
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
class Mapping:
"""Mapping between two sets of points.
"""
def __init__(self, X: np.ndarray, Y: np.ndarray) -> None:
self.interpolators: List[GaussianProcessRegressor] = []
for y in Y.T:
interpolator = GaussianProcessRegressor(copy_X_train=False)
interpolator.fit(X, y)
self.interpolators.append(interpolator)
def __call__(self, X: np.ndarray) -> np.ndarray:
X = np.atleast_2d(X)
values = np.zeros((X.shape[0], len(self.interpolators)))
for i, interpolator in enumerate(self.interpolators):
values[:, i] = interpolator.predict(X)
return values
def sample(self, X: np.ndarray, num_samples: int) -> np.ndarray:
Z = np.zeros((num_samples, len(self.interpolators)))
for i, interpolator in enumerate(self.interpolators):
Z[:, i] = interpolator.sample_y(np.atleast_2d(X), num_samples)
return Z
"""Metropolis sampler module.
"""
import itertools
import math
from typing import Callable, Any
import numpy as np
class Metropolis:
"""Implementation of the Metropolis-Hastings algorithm.
"""
def __init__(self, potential: Callable[[np.ndarray, Any], float],
temperature: float,
delta: float,
initial_point: np.ndarray) -> None:
self.potential = potential
self.temperature = temperature
self.delta = delta
self.initial_point = initial_point.copy()
self.current_point = initial_point.copy()
self.dim = initial_point.shape[1]
self.probability = self._compute_probability(initial_point)
self.accepted = 1
self.total = 1
def __repr__(self) -> str:
return (f'{self.__class__.__name__}(potential={self.potential}, '
+ f'temperature={self.temperature}, delta={self.delta}, '
+ f'initial_point={self.current_point})')
def __iter__(self) -> 'Metropolis':
return self
def __next__(self) -> np.ndarray:
candidate = self._generate_candidate()
p_old = self.probability
p_new = self._compute_probability(candidate)
if p_new >= p_old or np.random.rand() * p_old < p_new:
self.current_point = candidate
self.probability = p_new
self.accepted += 1
self.total += 1
return self.current_point
def _generate_candidate(self) -> np.ndarray:
r = self.delta * (2 * np.random.rand(self.dim) - np.ones(self.dim))
return self.current_point + r
def get_acceptance_ratio(self) -> float:
"""Return current acceptance ratio.
"""
return self.accepted / self.total
def _compute_probability(self, point: np.ndarray) -> float:
return math.exp(-self.potential(point) / self.temperature)
def draw_samples(self, num_samples: int, step: int = 1) -> np.ndarray:
"""Sample a number of points from the chain.
"""
points = np.zeros((num_samples // step, self.dim))
iterator = itertools.islice(self, 0, num_samples, step)
for i, point in enumerate(iterator):
points[i, :] = point
return points
import numpy as np
from sklearn.neighbors import DistanceMetric
from sklearn.gaussian_process import GaussianProcessRegressor
class ObjectiveFunction:
def __init__(self, spring_constant1: float, spring_constant2: float,
angle: float) -> None:
self.k1 = spring_constant1
self.k2 = spring_constant2
self.theta = angle
def __call__(self, X: np.ndarray) -> np.ndarray:
X = np.atleast_2d(X)
return (self.k1 * (np.arctan2(X[:, 1], X[:, 0]) - self.theta)**2
+ self.k2 * (X[:, 0]**2 + X[:, 1]**2 - 1.0)**2)
def gradient(self, X: np.ndarray) -> np.ndarray:
X = np.atleast_2d(X)
k1, k2 = self.k1, self.k2
return np.array([-k1 * (4 * np.arctan2(X[:, 1], X[:, 0]) - np.pi)
* X[:, 1] / (2 * X[:, 0]**2 + 2 * X[:, 1]**2)
+ 4 * k2 * (X[:, 0]**2 + X[:, 1]**2 - 1) * X[:, 0],
k1 * (4 * np.arctan2(X[:, 1], X[:, 0]) - np.pi)
* X[:, 0] / (2 * X[:, 0]**2 + 2 * X[:, 1]**2)
+ 4 * k2 * (X[:, 0]**2 + X[:, 1]**2 - 1) * X[:, 1]])
class SurrogateFunction:
def __init__(self, X, y):
# self.gpr = GaussianProcessRegressor(copy_X_train=False)
self.gpr = GaussianProcessRegressor(normalize_y=True)
self.gpr.fit(X, y)
self.length_scale = self.gpr.kernel_.get_params()['k2__length_scale']
def __call__(self, X: np.ndarray, return_std: bool = False):
result = self.gpr.predict(np.atleast_2d(X),
return_std=return_std)
if return_std is True:
return (result[0].squeeze(),
result[1].squeeze())
else:
return result.squeeze()
def gradient(self, X: np.ndarray) -> np.ndarray:
"""Evaluate the gradient of a Gaussian process at a single point.
"""
print('gradient', X)
x = np.atleast_2d(X)
gpr = self.gpr
kxX = gpr.kernel_(x, gpr.X_train_)
length_scale = gpr.kernel_.get_params()['k2__length_scale']
x_minus_X = x - gpr.X_train_
return ((kxX * gpr.alpha_ * (-x_minus_X.T / length_scale**2)).sum(axis=1)
* gpr._y_train_std)
class AcquisitionFunction:
def __init__(self,
objective_function: ObjectiveFunction,
points: np.ndarray, images: np.ndarray,
kappa: float, tau: float) -> None:
self.points = points
self.images = images
self.objective_function = objective_function
self.surrogate_function = SurrogateFunction(
images, objective_function(points))
self.kappa = kappa
self.tau = tau
self.metric = DistanceMetric.get_metric('euclidean')
self.analyze_distances()
def analyze_distances(self) -> None:
distances = np.triu(self.metric.pairwise(self.images))
distances = distances[distances > 0.0]
self.median_distance = np.median(distances)
self.std_distance = np.std(distances)
def _distance_to_data(self, Y: np.ndarray) -> float:
return self.metric.pairwise(np.atleast_2d(Y), self.images).min()
def __call__(self, Y: np.ndarray) -> float:
Y = np.atleast_2d(Y)
mean, std = self.surrogate_function(Y, True)
distance_to_data = self._distance_to_data(Y[-100:, :]) # XXX constant
self.analyze_distances()
return (mean - self.kappa * std
+ self.tau * distance_to_data / self.median_distance)
def gradient(self, X: np.ndarray) -> np.ndarray:
"""Evaluate gradient of a Gaussian process at a single point."""
assert self.kappa == 0.0 and self.tau == 0.0
return self.surrogate_function.gradient(X)
class AcquisitionFunctionFactory:
def __init__(self, kappa: float, tau: float) -> None:
self.kappa = kappa
self.tau = tau
def get(self, objective_function: ObjectiveFunction,
points: np.ndarray, images: np.ndarray) -> AcquisitionFunction:
return AcquisitionFunction(
objective_function, points, images, self.kappa, self.tau)
from typing import Optional
from abc import ABC, abstractmethod
import random
import numpy as np
import scipy.spatial
from metropolis import Metropolis
from optimization_problem import ObjectiveFunction
class PointSampler(ABC):
@abstractmethod
def sample_points(self, num_points: int) -> np.ndarray:
pass
@abstractmethod
def sample_neighbors(self, point: np.ndarray, radius: float,
max_neighbors: Optional[int] = None) -> np.ndarray:
pass
def make_synthetic_data(num_points, noise_variance: float,
shuffle: bool = True) -> np.ndarray:
"""Sample num_points from quarter-circle with additive noise.
"""
t = np.linspace(0.0, np.pi / 2.0, num_points)
points = np.array([np.cos(t), np.sin(t)]).T
covariance_matrix = noise_variance * np.eye(points.shape[-1])
noise = np.random.multivariate_normal([0, 0], covariance_matrix,
size=num_points)
permutation = np.arange(num_points)
if shuffle is True:
random.shuffle(permutation)
points = points[permutation, :]
return points + noise
class PoolPointSampler(PointSampler):
"""Sample points from a generated data set.
"""
def __init__(self, total_points: int, noise_variance: float) \
-> None:
self.pool = make_synthetic_data(total_points, noise_variance,
shuffle=False)
self.kdtree = scipy.spatial.cKDTree(self.pool)
def sample_points(self, num_points: int) -> np.ndarray:
permutation = np.random.permutation(self.pool.shape[0])
return self.pool[permutation[:num_points], :]
def sample_neighbors(self, point: np.ndarray, radius: float,
max_neighbors: Optional[int] = None) -> np.ndarray:
point = np.atleast_2d(point)
new_indices = self.kdtree.query_ball_point(point.squeeze(), r=radius)
random.shuffle(new_indices)
return self.pool[new_indices[:max_neighbors], :]
class MetropolisPointSampler(PointSampler):
def __init__(self, objective_function: ObjectiveFunction,
temperature: float, delta: float,
initial_point: np.ndarray) -> None:
self.objective_function = objective_function
self.temperature = temperature
self.delta = delta
self.initial_point = np.atleast_2d(initial_point)
def sample_points(self, num_points: int) -> np.ndarray:
metropolis = Metropolis(
self.objective_function, self.temperature, self.delta,
self.initial_point)
return metropolis.draw_samples(10 * num_points, step=10)
def sample_neighbors(self, point: np.ndarray,
radius: float = None,
max_neighbors: Optional[int] = None) -> np.ndarray:
self.initial_point = np.atleast_2d(point)
metropolis = Metropolis(
self.objective_function, self.temperature, self.delta,
np.atleast_2d(point))
return metropolis.draw_samples(10 * max_neighbors, step=10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment