from .....lib.gaussian_process_regression import GaussianProcessRegressorBase, \
LogGaussianProcessRegressor
from .....lib.gaussian_process_regression.implementations.sklearn \
import ScikitLearnGaussianProcessRegressor, kernels
from abc import abstractmethod
import numpy as np
[docs]
class TASGaussianProcessRegressorBase(GaussianProcessRegressorBase):
    """Common base class for the TAS-related Gaussian process regressors.

    On top of the parent regressor interface it stores the ``limits`` handed
    to the constructor and exposes them, together with the dimension derived
    from them, as read-only properties.
    """

    def __init__(self, limits):
        """Initialise the parent class and remember ``limits``.

        Args:
            limits: Array-like converted with ``np.asarray``. Its length
                along the first axis defines ``dim``.
                NOTE(review): other code in this file indexes it as
                ``limits[:, 0]`` / ``limits[:, 1]``, so a ``(dim, 2)`` layout
                of lower/upper bounds is presumably expected -- confirm.
        """
        super().__init__()
        self.limits_ = np.asarray(limits)

    @property
    def dim(self):
        """Length of ``limits`` along its first axis."""
        return len(self.limits)

    @property
    def limits(self):
        """The limits array stored at construction time."""
        return self.limits_
[docs]
class TASLogGaussianProcessRegressor(LogGaussianProcessRegressor, TASGaussianProcessRegressorBase):
    """Log-Gaussian process regressor for the TAS setting.

    Wires a scikit-learn based regressor implementation, using a
    ``ConstantKernel * RBF`` kernel, into ``LogGaussianProcessRegressor`` and
    records the limits through ``TASGaussianProcessRegressorBase``.
    """

    def __init__(self, *, limits,
                 kernel_num_restarts_optim, kernel_bounds_variance, kernel_bounds_length_scales,
                 random_state=None):
        """Build the kernel and the underlying regressor implementation.

        All arguments are keyword-only.

        Args:
            limits: Limits forwarded to ``TASGaussianProcessRegressorBase``;
                ``len(limits)`` also sets the number of RBF length scales.
            kernel_num_restarts_optim: Forwarded as ``n_restarts_optimizer``.
            kernel_bounds_variance: Second argument of
                ``kernels.ConstantKernel`` (presumably the bounds of the
                constant factor -- confirm against the kernel wrapper).
            kernel_bounds_length_scales: Second argument of ``kernels.RBF``
                (presumably the bounds of the length scales -- confirm).
            random_state: Forwarded unchanged to the regressor
                implementation.
        """
        n_dims = len(limits)

        # One initial length scale of 1.0 per dimension, and an initial
        # constant factor of 1.0.
        amplitude = kernels.ConstantKernel(1., kernel_bounds_variance)
        smoothness = kernels.RBF([1.] * n_dims, kernel_bounds_length_scales)

        backend_options = dict(
            kernel=amplitude * smoothness,
            optimizer="fmin_l_bfgs_b",
            n_restarts_optimizer=kernel_num_restarts_optim,
            normalize_y=False,
            copy_X_train=True,
            random_state=random_state,
        )
        backend = ScikitLearnGaussianProcessRegressor(**backend_options)

        # Both parents are initialised explicitly because they take different
        # constructor arguments.
        LogGaussianProcessRegressor.__init__(self, backend)
        TASGaussianProcessRegressorBase.__init__(self, limits)
[docs]
class PotentialRotationDecorator(TASGaussianProcessRegressorBase):
    """Decorator class for TAS-related Gaussian process regressors.

    It rotates the coordinate system if a criterion involving length scales of
    the Gaussian process is fulfilled: as soon as any kernel length scale of
    the wrapped regressor lies outside ``[length_scales_min,
    length_scales_max]`` after a fit, the wrapped regressor is refitted on
    rotated inputs and every later input is rotated before being passed on.
    Once activated, the rotation is never switched off again by this class.
    """

    def __init__(self, tas_gpr: TASGaussianProcessRegressorBase,
                 rotation_matrix, length_scales_min_fact=1e-3, length_scales_max_fact=1.) -> None:
        """Wrap ``tas_gpr`` and prepare the (initially inactive) rotation.

        Args:
            tas_gpr: Regressor to decorate; its ``limits`` are adopted.
            rotation_matrix: Array-like of shape ``(dim, dim)`` with
                determinant 1 (up to floating-point tolerance). Orthogonality
                is not verified here, but ``_rotate_inv`` uses the transpose
                as the inverse, so a non-orthogonal matrix would make
                ``x_train`` inconsistent with the data passed in.
            length_scales_min_fact: Factor applied to the extent of the
                limits (``limits[:, 1] - limits[:, 0]``) to obtain the
                smallest accepted length scale per dimension.
            length_scales_max_fact: Same, for the largest accepted length
                scale. Must be greater than ``length_scales_min_fact``.

        Raises:
            AssertionError: If the shape or determinant of
                ``rotation_matrix`` is invalid, or if the factors are not
                strictly ordered.
        """
        super().__init__(tas_gpr.limits)
        self.tas_gpr = tas_gpr

        assert np.shape(rotation_matrix) == (self.dim, self.dim), \
            "rotation_matrix must have shape (dim, dim)"
        # Check that the matrix can be a proper rotation. The determinant of a
        # rotation built from sin/cos values is generally only 1 up to
        # rounding error, so compare with a tolerance instead of `== 1`.
        assert np.isclose(np.linalg.det(rotation_matrix), 1.), \
            "rotation_matrix must have determinant 1"
        self.rotation_matrix = np.asarray(rotation_matrix)

        assert length_scales_min_fact < length_scales_max_fact, \
            "length_scales_min_fact must be smaller than length_scales_max_fact"
        extent = self.limits[:, 1] - self.limits[:, 0]
        self.length_scales_min = length_scales_min_fact * extent
        self.length_scales_max = length_scales_max_fact * extent

        self.rotation_active = False

    def __call__(self, x, return_std=False):
        """Evaluate the wrapped regressor at ``x`` (rotated first if the rotation is active)."""
        if self.rotation_active:
            x = self._rotate(x)
        return self.tas_gpr(x, return_std=return_std)

    def fit(self, x, y, noise):
        """Fit the wrapped regressor and activate the rotation if needed.

        With an active rotation the wrapped regressor is fitted on rotated
        inputs. Otherwise it is fitted on ``x`` as given; if the resulting
        length scales violate the bounds, it is fitted again on the rotated
        inputs and the rotation stays active from then on.
        """
        if self.rotation_active:
            self.tas_gpr.fit(self._rotate(x), y, noise)
            return

        self.tas_gpr.fit(x, y, noise)
        if self._rotation_needed():
            self.tas_gpr.fit(self._rotate(x), y, noise)
            self.rotation_active = True

    @property
    def x_train(self):
        """Training inputs of the wrapped regressor, rotated back if the rotation is active."""
        if self.rotation_active:
            return self._rotate_inv(self.tas_gpr.x_train)
        return self.tas_gpr.x_train

    @property
    def y_train(self):
        """``y_train`` of the wrapped regressor."""
        return self.tas_gpr.y_train

    @property
    def noise(self):
        """``noise`` of the wrapped regressor."""
        return self.tas_gpr.noise

    @property
    def kernel(self):
        """``kernel`` of the wrapped regressor."""
        return self.tas_gpr.kernel

    @property
    def kernel_hyperparameters(self):
        """``kernel_hyperparameters`` of the wrapped regressor."""
        return self.tas_gpr.kernel_hyperparameters

    @property
    def kernel_length_scales(self):
        """``kernel_length_scales`` of the wrapped regressor."""
        return self.tas_gpr.kernel_length_scales

    def add_data(self, x_new, y_new, noise):
        """Add data to the wrapped regressor and activate the rotation if needed.

        With an active rotation the new inputs are rotated before being
        added. Otherwise they are added as given; if the length scales then
        violate the bounds, the wrapped regressor is refitted on its complete
        (so far unrotated) training set in rotated coordinates.
        """
        if self.rotation_active:
            self.tas_gpr.add_data(self._rotate(x_new), y_new, noise)
            return

        self.tas_gpr.add_data(x_new, y_new, noise)
        if self._rotation_needed():
            # The stored training inputs are still unrotated at this point,
            # so rotating them exactly once is correct.
            self.tas_gpr.fit(self._rotate(self.tas_gpr.x_train),
                             self.tas_gpr.y_train,
                             self.tas_gpr.noise)
            self.rotation_active = True

    def stop_optimization(self):
        """Forward the request to the wrapped regressor."""
        self.tas_gpr.stop_optimization()

    def _rotate(self, x):
        """Apply ``rotation_matrix`` to ``x`` relative to the corner ``limits[:, 0]``.

        The points are shifted by ``limits[:, 0]``, multiplied by the matrix
        from the right, and shifted back.
        """
        a = self.limits[:, 0]
        return np.dot(x - a, self.rotation_matrix) + a

    def _rotate_inv(self, x):
        """Undo ``_rotate`` using the transposed matrix (valid for orthogonal matrices)."""
        a = self.limits[:, 0]
        return np.dot(x - a, self.rotation_matrix.T) + a

    def _rotation_needed(self):
        """Return whether any kernel length scale lies outside the accepted range."""
        length_scales = self.kernel_length_scales
        return np.any(length_scales > self.length_scales_max) \
            or np.any(length_scales < self.length_scales_min)