Source code for ariane.app.tas.server._server_gpr.gpr

from .....lib.gaussian_process_regression import GaussianProcessRegressorBase, \
    LogGaussianProcessRegressor
from .....lib.gaussian_process_regression.implementations.sklearn \
    import ScikitLearnGaussianProcessRegressor, kernels

from abc import abstractmethod
import numpy as np


class TASGaussianProcessRegressorBase(GaussianProcessRegressorBase):
    """Common base for Gaussian process regressors used in the TAS setting.

    On top of what ``GaussianProcessRegressorBase`` provides, an instance
    remembers the ``limits`` it was constructed with (converted to a NumPy
    array) and exposes them, together with their length, as read-only
    properties.
    """

    def __init__(self, limits):
        """Initialise the parent class and store ``limits`` as an array.

        :param limits: array-like converted with ``np.asarray``; presumably
            one ``(lower, upper)`` pair per dimension -- confirm with callers.
        """
        super().__init__()
        self.limits_ = np.asarray(limits)

    @property
    def limits(self):
        """The limits passed to the constructor, as a NumPy array."""
        return self.limits_

    @property
    def dim(self):
        """Length of ``limits`` along its first axis."""
        return len(self.limits)
class TASLogGaussianProcessRegressor(LogGaussianProcessRegressor,
                                     TASGaussianProcessRegressorBase):
    """Log-Gaussian process regressor for the TAS setting.

    The regressor wraps a ``ScikitLearnGaussianProcessRegressor`` whose kernel
    is the product of a constant kernel and an RBF kernel with one length
    scale per entry of ``limits``.
    """

    def __init__(self, *, limits, kernel_num_restarts_optim,
                 kernel_bounds_variance, kernel_bounds_length_scales,
                 random_state=None):
        """Build the kernel and the wrapped regressor, then init both bases.

        All arguments are keyword-only.

        :param limits: forwarded to ``TASGaussianProcessRegressorBase``; its
            length also fixes the number of RBF length scales.
        :param kernel_num_restarts_optim: passed on as
            ``n_restarts_optimizer``.
        :param kernel_bounds_variance: second argument of the constant kernel.
        :param kernel_bounds_length_scales: second argument of the RBF kernel.
        :param random_state: passed on unchanged to the wrapped regressor.
        """
        # Both factors start from the value 1; the bounds are supplied by the
        # caller.
        amplitude_part = kernels.ConstantKernel(1., kernel_bounds_variance)
        initial_length_scales = [1.] * len(limits)
        rbf_part = kernels.RBF(initial_length_scales,
                               kernel_bounds_length_scales)

        backend = ScikitLearnGaussianProcessRegressor(
            kernel=amplitude_part * rbf_part,
            optimizer="fmin_l_bfgs_b",
            n_restarts_optimizer=kernel_num_restarts_optim,
            normalize_y=False,
            copy_X_train=True,
            random_state=random_state,
        )

        # The two bases take different constructor arguments, so each one is
        # initialised explicitly rather than through a single super() call.
        LogGaussianProcessRegressor.__init__(self, backend)
        TASGaussianProcessRegressorBase.__init__(self, limits)
class PotentialRotationDecorator(TASGaussianProcessRegressorBase):
    """Decorator class for TAS-related Gaussian process regressors.

    It rotates the coordinate system if a criterion involving length scales
    of the Gaussian process is fulfilled: as soon as any kernel length scale
    of the wrapped regressor leaves the interval
    ``[length_scales_min, length_scales_max]``, the wrapped regressor is
    refitted on rotated training inputs and every later input is rotated
    before being handed on. Once activated, the rotation is never switched
    off again by this class.
    """

    def __init__(self, tas_gpr: TASGaussianProcessRegressorBase, rotation_matrix,
                 length_scales_min_fact=1e-3, length_scales_max_fact=1.) -> None:
        """Wrap ``tas_gpr`` and prepare the rotation criterion.

        :param tas_gpr: regressor to wrap; its ``limits`` are adopted.
        :param rotation_matrix: array-like of shape ``(dim, dim)`` that must
            be a proper rotation (orthogonal with determinant +1).
        :param length_scales_min_fact: factor applied to
            ``limits[:, 1] - limits[:, 0]`` to get the smallest length scales
            accepted without rotating.
        :param length_scales_max_fact: same, for the largest length scales.
        :raises AssertionError: if the matrix has the wrong shape, is not a
            proper rotation, or the minimum factor is not below the maximum.
        """
        super().__init__(tas_gpr.limits)
        self.tas_gpr = tas_gpr

        rotation_matrix = np.asarray(rotation_matrix)
        assert rotation_matrix.shape == (self.dim, self.dim), \
            "rotation_matrix must have shape (dim, dim)"
        # Check that the matrix is a proper rotation. Floating-point rotation
        # matrices rarely have a determinant of exactly 1, so compare with a
        # tolerance. Orthogonality is checked as well because _rotate_inv
        # uses the transpose as the inverse.
        assert np.allclose(np.dot(rotation_matrix, rotation_matrix.T),
                           np.eye(self.dim)), \
            "rotation_matrix must be orthogonal"
        assert np.isclose(np.linalg.det(rotation_matrix), 1.), \
            "rotation_matrix must have determinant +1"
        self.rotation_matrix = rotation_matrix

        assert length_scales_min_fact < length_scales_max_fact
        extent = self.limits[:, 1] - self.limits[:, 0]
        self.length_scales_min = length_scales_min_fact * extent
        self.length_scales_max = length_scales_max_fact * extent

        self.rotation_active = False

    def __call__(self, x, return_std=False):
        """Evaluate the wrapped regressor at ``x``, rotating ``x`` first if
        the rotation is active."""
        if self.rotation_active:
            x = self._rotate(x)
        return self.tas_gpr(x, return_std=return_std)

    def fit(self, x, y, noise):
        """Fit the wrapped regressor and activate the rotation if needed.

        While the rotation is inactive, the regressor is first fitted in the
        original coordinates; if the resulting length scales violate the
        criterion, it is fitted again on the rotated inputs and the rotation
        is activated.
        """
        if self.rotation_active:
            self.tas_gpr.fit(self._rotate(x), y, noise)
        else:
            self.tas_gpr.fit(x, y, noise)
            if self._rotation_needed():
                self.tas_gpr.fit(self._rotate(x), y, noise)
                self.rotation_active = True

    @property
    def x_train(self):
        """Training inputs in the original (unrotated) coordinates."""
        return self._rotate_inv(self.tas_gpr.x_train) \
            if self.rotation_active else self.tas_gpr.x_train

    @property
    def y_train(self):
        """Training targets of the wrapped regressor."""
        return self.tas_gpr.y_train

    @property
    def noise(self):
        """Noise of the wrapped regressor."""
        return self.tas_gpr.noise

    @property
    def kernel(self):
        """Kernel of the wrapped regressor."""
        return self.tas_gpr.kernel

    @property
    def kernel_hyperparameters(self):
        """Kernel hyperparameters of the wrapped regressor."""
        return self.tas_gpr.kernel_hyperparameters

    @property
    def kernel_length_scales(self):
        """Kernel length scales of the wrapped regressor."""
        return self.tas_gpr.kernel_length_scales

    def add_data(self, x_new, y_new, noise):
        """Add data to the wrapped regressor and activate the rotation if
        needed.

        While the rotation is inactive, the data are added in the original
        coordinates; if the length scales then violate the criterion, the
        wrapped regressor is refitted on all of its training data in rotated
        coordinates and the rotation is activated.
        """
        if self.rotation_active:
            self.tas_gpr.add_data(self._rotate(x_new), y_new, noise)
        else:
            self.tas_gpr.add_data(x_new, y_new, noise)
            if self._rotation_needed():
                # The wrapped regressor still holds unrotated inputs here.
                self.tas_gpr.fit(self._rotate(self.tas_gpr.x_train),
                                 self.tas_gpr.y_train, self.tas_gpr.noise)
                self.rotation_active = True

    def stop_optimization(self):
        """Forward the stop request to the wrapped regressor."""
        self.tas_gpr.stop_optimization()

    def _rotate(self, x):
        """Rotate ``x`` about the point ``limits[:, 0]``."""
        a = self.limits[:, 0]
        return np.dot(x - a, self.rotation_matrix) + a

    def _rotate_inv(self, x):
        """Undo ``_rotate``; the transpose is the inverse of a rotation."""
        a = self.limits[:, 0]
        return np.dot(x - a, self.rotation_matrix.T) + a

    def _rotation_needed(self):
        """Return whether any kernel length scale lies outside the interval
        ``[length_scales_min, length_scales_max]``."""
        return np.any(self.kernel_length_scales > self.length_scales_max) \
            or np.any(self.kernel_length_scales < self.length_scales_min)