Source code for ariane.app.tas.server._server_gpr.server

from .gpr import PotentialRotationDecorator, TASLogGaussianProcessRegressor
from ...server import PORT_DEFAULT, TASServer
from ..... import VERSION as ARIANE_VERSION
from .....lib.gaussian_process_regression.learning import acquisitions
from .....lib.utils import logging, ROOT_DIRECTORY
from .....lib.utils.config import FOLDER as FOLDER_CONFIG
from .....lib.utils.math import make_grid, normalized_linear_interpolator, rotation_matrix_2d

import numpy as np
from scipy.optimize import differential_evolution


VERSION = ARIANE_VERSION

CONFIG_ID = "ariane-tas-server-gpr"
CONFIG_FILE_NAME = f"{CONFIG_ID}.cfg"
CONFIG_FILE_PATH = f"{ROOT_DIRECTORY}/{FOLDER_CONFIG}/{CONFIG_FILE_NAME}"


[docs] def compute_heuris_experi_param(intens, level_backgr_diffs_rel_max, level_backgr_diffs_abs_min, level_backgr_decile_max, thresh_intens_fact, level_backgr=None): """Compute heuristic for experimental parameters like the background level and intensity threshold with intensities in `intens`. """ # if there are not enough intensities, return default values if len(intens) < 10: return level_backgr, None # compute deciles (prepend minimum and append maximum intensity for computing buckets) deciles = [-np.inf] + [np.percentile(intens, q=10 * n) for n in range(1, 10)] + [np.inf] # buckets are limited by deciles of intensities buckets = [intens[np.logical_and(deciles[n] < intens, intens <= deciles[n + 1])] for n in range(10)] # values are computed as the median of buckets # (if not empty (happens if several intensities have the same value), # otherwise take corresponding decile) medians = np.array([np.median(bucket) if len(bucket) > 0 else deciles[n] for n, bucket in enumerate(buckets)]) # compute background level using (relative and absolute) differences # in `medians` (first derivative) diffs_abs = medians[1:] - medians[:-1] diffs_rel = diffs_abs / medians[:-1] if level_backgr is None: index_level_backgr = min(level_backgr_decile_max - 1, np.argmax( np.logical_or( np.logical_and( diffs_rel > level_backgr_diffs_rel_max, diffs_abs >= level_backgr_diffs_abs_min), np.arange(len(diffs_abs)) == len(diffs_abs) - 1))) level_backgr = medians[index_level_backgr] else: index_level_backgr = max(0, np.argmax(level_backgr < medians) - 1) # compute intensity threshold thresh_intens = level_backgr + thresh_intens_fact * (max(medians) - level_backgr) return level_backgr, thresh_intens
[docs] def maximize_obj_fct(obj_fct, limits, *, random_state=None): res = differential_evolution(func=lambda x: -obj_fct(x), bounds=limits, seed=random_state) if not res.success: raise Exception("maximization of objective function was not successful") return res.x, -res.fun
[docs] def difference_hyperparameters(param1, param2): """Compute the difference of two arrays of kernel hyperparameters. Parameters ---------- param1, param2 : array_like Both arrays have to be of the same length. Returns ------- difference_hyperparameters : float """ assert len(param1) == len(param2) return np.linalg.norm(np.log(param1) - np.log(param2), ord=2) \ / np.linalg.norm(np.log(param2), ord=2)
[docs] def criterion_kernel_optims_stop(num_kernel_optims, num_kernel_optims_min, num_kernel_optims_max, diffs_kernel, kernel_optims_stop_num_last, kernel_optims_stop_eps): """Boolean criterion for stopping optimization of kernel hyperparameters. If True, the optimization is supposed to be stopped for performance reasons. """ return num_kernel_optims > num_kernel_optims_max \ or (num_kernel_optims >= num_kernel_optims_min and len(diffs_kernel) >= kernel_optims_stop_num_last and np.mean(diffs_kernel[-kernel_optims_stop_num_last:]) < kernel_optims_stop_eps)
[docs] class GPRTASServer(TASServer): """Class implementing template methods from `TASServer` based on log-Gaussian processes.""" def __init__(self, gpr_acq, *, config, random_seed=None, port=PORT_DEFAULT): super().__init__(version=VERSION, method='GPR', config=config, random_seed=random_seed, port=port) self.logger.info(f"Start {self.__class__.__name__}...") # read configuration config = self.config[CONFIG_ID] level_backgr_diffs_rel_max = float(config["level_backgr_diffs_rel_max"]) level_backgr_diffs_abs_min = float(config["level_backgr_diffs_abs_min"]) level_backgr_decile_max = int(config["level_backgr_decile_max"]) thresh_intens_fact = float(config["thresh_intens_fact"]) num_kernel_optim_restarts = int(config["num_kernel_optim_restarts"]) kernel_bounds_variance = \ [float(config["kernel_variance_bounds_min"]), float(config["kernel_variance_bounds_max"])] kernel_bounds_length_scales = \ [float(config["kernel_length_scales_bounds_min"]), float(config["kernel_length_scales_bounds_max"])] num_kernel_optims_min = int(config["num_kernel_optims_min"]) num_kernel_optims_max = int(config["num_kernel_optims_max"]) kernel_optims_stop_num_last = int(config["kernel_optims_stop_num_last"]) kernel_optims_stop_eps = float(config["kernel_optims_stop_eps"]) length_scales_min_fact = float(config["length_scales_min_fact"]) length_scales_max_fact = float(config["length_scales_max_fact"]) # num_meas_pts_post_init_min = int(config["num_meas_pts_post_init_min"]) # meas_pts_stop_num_last = int(config["meas_pts_stop_num_last"]) # meas_pts_stop_eps = float(config["meas_pts_stop_eps"]) self.gpr_acq = gpr_acq # parameters for autonomous computation of background level assert level_backgr_diffs_rel_max > 0 assert level_backgr_diffs_abs_min > 0 assert 1 <= level_backgr_decile_max <= 9 self.level_backgr_diffs_rel_max = level_backgr_diffs_rel_max self.level_backgr_diffs_abs_min = level_backgr_diffs_abs_min self.level_backgr_decile_max = level_backgr_decile_max # parameters for autonomous computation of intensity threshold assert 0 < thresh_intens_fact <= 1 self.thresh_intens_fact = thresh_intens_fact # parameters for kernel optimization assert num_kernel_optim_restarts >= 0 assert len(kernel_bounds_variance) == len(kernel_bounds_length_scales) == 2 self.num_kernel_optim_restarts = num_kernel_optim_restarts self.kernel_bounds_variance = kernel_bounds_variance self.kernel_bounds_length_scales = kernel_bounds_length_scales # parameters for criterion of stopping kernel optimization assert 2 <= kernel_optims_stop_num_last <= num_kernel_optims_min <= num_kernel_optims_max self.num_kernel_optims_min = num_kernel_optims_min self.num_kernel_optims_max = num_kernel_optims_max self.kernel_optims_stop_num_last = kernel_optims_stop_num_last self.kernel_optims_stop_eps = kernel_optims_stop_eps # factors for identification of erroneous length scales assert length_scales_min_fact > 0 assert length_scales_max_fact > 0 self.length_scales_min_fact = length_scales_min_fact self.length_scales_max_fact = length_scales_max_fact # # parameters for criterion of stopping algorithm # assert num_meas_pts_post_init_min > 0 # assert meas_pts_stop_num_last > 1 # assert meas_pts_stop_eps > 0 # self.num_meas_pts_post_init_min = num_meas_pts_post_init_min # self.meas_pts_stop_num_last = meas_pts_stop_num_last # self.meas_pts_stop_eps = meas_pts_stop_eps self._init() self.logger.info(f"{self.__class__.__name__} started.") def _init(self): self.x_train_init = np.array([]) self.num_meas_pts_init = None # number of initial measurement points self.det_cts = np.array([]) # detector counts self.mon_cts = np.array([]) # monitor counts self.mon_cts0 = None # first monitor count for normalization self.intens = np.array([]) # normalized intensities self.matrices_consumed_ellipses = np.array([]) # matrices for ellipses around consumed locs self.x_problem = np.array([]) # problem locations self.matrices_problem_ellipses = np.array([]) # matrices for ellipses around problem locs self.tas_gpr = None # variables for status of kernel optimizations self.kernel_optim_stopped = False self.num_kernel_optims = 0 self.diffs_kernel = [] # variables for logging acquisitions self.acqs = [] self.diffs_acqs = [] self.level_backgr_automated = None # indicating automatically computed background level self.thresh_intens_automated = None # indicating automatically computed intensity threshold # travel cost-related variables self.travel_cost_fct = None self.travel_costs_centers = [] self.travel_costs_grids = [] self.travel_costs_values = [] self.travel_times = [] # pairs of travel time and corresponding number of measurement points self.counting_times = [] # pairs of counting time and corresponding number of measurement points @property def method_initializable(self): return len(self.intens) > 10 def _init_method(self): # set background level and intensity threshold self.level_backgr_automated = False self.thresh_intens_automated = False if self.level_backgr is None or self.thresh_intens is None: level_backgr, thresh_intens = compute_heuris_experi_param( self.intens, self.level_backgr_diffs_rel_max, self.level_backgr_diffs_abs_min, self.level_backgr_decile_max, self.thresh_intens_fact, self.level_backgr) if self.level_backgr is None: self.level_backgr = level_backgr self.level_backgr_automated = True if self.thresh_intens is None: self.thresh_intens = thresh_intens self.thresh_intens_automated = True assert self.level_backgr < self.thresh_intens # compute initial training data and corresponding noise y_train_init, error_init = process_det_cts_to_training_data( self.det_cts, self.mon_cts, self.mon_cts0, self.level_backgr, self.thresh_intens) noise_train_init = error_init**2 # instantiate and initialize potentially rotated TAS log-GPR self.tas_gpr = PotentialRotationDecorator( TASLogGaussianProcessRegressor( limits=self.limits, kernel_num_restarts_optim=self.num_kernel_optim_restarts, kernel_bounds_variance=self.kernel_bounds_variance, kernel_bounds_length_scales=self.kernel_bounds_length_scales, random_state=self.random_state), rotation_matrix_2d(45), length_scales_min_fact=self.length_scales_min_fact, length_scales_max_fact=self.length_scales_max_fact) self.tas_gpr.fit(self.x_train_init, y_train_init, noise=noise_train_init) self.num_meas_pts_init = len(self.x_train_init) del self.x_train_init # not needed after initialization # set GPR acquisition function self.gpr_acq_fct = self.gpr_acq(self.tas_gpr) # set objective function def obj_fct(x): diffs_x_train = (x - self.x_train)[:, np.newaxis, :] if np.any(diffs_x_train @ self.matrices_consumed_ellipses @ np.transpose(diffs_x_train, axes=(0, 2, 1)) <= 1): return -1 if len(self.x_problem) > 0: diffs_x_problem = (x - self.x_problem)[:, np.newaxis, :] if np.any(diffs_x_problem @ self.matrices_problem_ellipses @ np.transpose(diffs_x_problem, axes=(0, 2, 1)) <= 1): return -1 return self.gpr_acq_fct(x) / ((self.travel_cost_fct(x) / self.travel_cost_max + 1) if self.travel_cost_fct is not None else 1) self.obj_fct = obj_fct self.logger.info("Initialization finished.") self.logger.info(f"Background level: {self.level_backgr}") self.logger.info(f"Intensity threshold: {self.thresh_intens}") @property def method_initialized(self): return self.gpr_initialized @property def param_optim_stopped(self): return self.kernel_optim_stopped @property def gpr_initialized(self): return self.tas_gpr is not None @property def x_train(self): return self.tas_gpr.x_train @property def y_train(self): return self.tas_gpr.y_train @property def noise_train(self): return self.tas_gpr.noise @property def current_x(self): return self.x_train[-1] def reset(self, mode, axes, offset, limits, level_backgr=None, thresh_intens=None, travel_cost_max=None, scenario_name=None): self._init() self.mode = mode self.axes = np.array(axes) self.offset = np.array(offset) self.limits = np.array(limits) self.level_backgr = level_backgr self.thresh_intens = thresh_intens self.travel_cost_max = travel_cost_max self.scenario_name = scenario_name def result(self, locs, counts, matrices_ellipses, travel_time, counting_time, travel_cost_grid=None, travel_cost_values=None): # get new measurement locations, counts, and ellipse matrices x_train_new = np.array(locs) cts_new = np.array(counts) matrices_consumed_ellipses_new = np.array(matrices_ellipses) travel_time_new = travel_time counting_time_new = counting_time # get detector counts det_cts_new = cts_new[:, 0] det_cts_new[det_cts_new < 1] = 1 # get monitor counts mon_cts_new = cts_new[:, 1] mon_cts_new[mon_cts_new < 1] = 1 # initialize normalizing monitor counts if not done yet if self.mon_cts0 is None: self.mon_cts0 = float(cts_new[0, 1]) # store detector counts, monitor counts, and (normalized) intensities self.det_cts = np.append(self.det_cts, det_cts_new) self.mon_cts = np.append(self.mon_cts, mon_cts_new) self.intens = np.append(self.intens, normalize_det_cts(det_cts_new, mon_cts_new, self.mon_cts0)) if not self.gpr_initialized: self.x_train_init = np.append(self.x_train_init, x_train_new, axis=0) \ if len(self.x_train_init) > 0 else x_train_new else: # truncate new counts using background and intensity threshold y_train_new, error_train_new = process_det_cts_to_training_data( det_cts_new, mon_cts_new, self.mon_cts0, self.level_backgr, self.thresh_intens) noise_train_new = error_train_new**2 if self.kernel_optim_stopped: self.tas_gpr.add_data(x_train_new, y_train_new, noise=noise_train_new) else: # add data, compute difference of kernel hyperparameters, # and stop kernel optimizations if criterion is met param1 = self.tas_gpr.kernel_hyperparameters self.tas_gpr.add_data(x_train_new, y_train_new, noise=noise_train_new) param2 = self.tas_gpr.kernel_hyperparameters diff_kernel = difference_hyperparameters(param1, param2) self.diffs_kernel.append(diff_kernel) self.num_kernel_optims += 1 if criterion_kernel_optims_stop( self.num_kernel_optims, self.num_kernel_optims_min, self.num_kernel_optims_max, self.diffs_kernel, self.kernel_optims_stop_num_last, self.kernel_optims_stop_eps): self.tas_gpr.stop_optimization() self.kernel_optim_stopped = True # print log message with total number of measurement points received self.logger.info("Total number of measurement points received: " + str(len(self.x_train if self.gpr_initialized else self.x_train_init))) # append ellipse matrices self.matrices_consumed_ellipses = \ np.append(self.matrices_consumed_ellipses, matrices_consumed_ellipses_new, axis=0) \ if len(self.matrices_consumed_ellipses) > 0 else matrices_consumed_ellipses_new # if available, get travel cost grid and compute a travel cost function through linear interpolation. # otherwise, set travel cost function to `None`. if travel_cost_grid is not None and travel_cost_values is not None: travel_cost_grid = np.array(travel_cost_grid) travel_cost_values = np.array(travel_cost_values) self.travel_costs_centers.append(x_train_new[-1].tolist()) self.travel_costs_grids.append(travel_cost_grid.tolist()) self.travel_costs_values.append(travel_cost_values.tolist()) self.travel_cost_fct = normalized_linear_interpolator( travel_cost_grid, travel_cost_values) else: self.travel_cost_fct = None # append travel and counting time self.travel_times.append([travel_time_new, len(x_train_new)]) self.counting_times.append([counting_time_new, len(x_train_new)]) # print log messages with cumulative travel time, counting time, and total time cum_travel_time = round(sum([t for t, _ in self.travel_times]) / 3600, 2) cum_counting_time = round(sum([t for t, _ in self.counting_times]) / 3600, 2) self.logger.info(f"Cum. travel time: {cum_travel_time} hours") self.logger.info(f"Cum. counting time: {cum_counting_time} hours") self.logger.info(f"Total time: {cum_travel_time + cum_counting_time} hours") def next_loc(self): x_train_next, _ = maximize_obj_fct( self.obj_fct, limits=self.limits, random_state=self.random_state) # self.acqs.append(self.tas_gpr([x_train_next], return_std=True)[1][0]) # if len(self.acqs) > 1: # self.diffs_acqs.append(np.abs(self.acqs[-2] - self.acqs[-1]) # / self.acqs[-1]) # if len(self.diffs_acqs) > 0: # self.logger.info(f"Mean of last {self.meas_pts_stop_num_last} diffs acqs:", # np.mean(self.diffs_acqs[-self.meas_pts_stop_num_last:])) # self.logger.info("Upper bound for stopping server:", self.meas_pts_stop_eps) # stop = len(self.x_train)+1 - self.num_meas_pts_init > self.num_meas_pts_post_init_min \ # and len(self.diffs_acqs) >= self.meas_pts_stop_num_last \ # and float(np.mean(self.diffs_acqs[-self.meas_pts_stop_num_last:])) < self.meas_pts_stop_eps stop = False return x_train_next.tolist(), stop def heuris_experi_param(self): return compute_heuris_experi_param( self.intens, self.level_backgr_diffs_rel_max, self.level_backgr_diffs_abs_min, self.level_backgr_decile_max, self.thresh_intens_fact) def problem_locs(self, locs, matrices_ellipses): x_problem_new = np.array(locs) matrices_problem_ellipse_new = np.array(matrices_ellipses) self.x_problem = np.append(self.x_problem, x_problem_new, axis=0) \ if len(self.x_problem) > 0 else x_problem_new self.matrices_problem_ellipses = \ np.append(self.matrices_problem_ellipses, matrices_problem_ellipse_new, axis=0) \ if len(self.matrices_problem_ellipses) > 0 else matrices_problem_ellipse_new def state_internal(self, data): # discretize the posterior mean and standard deviation function on a grid. # the number(s) of points per dimension is given in `data['num']` as a `list` or `int`. num = data['num'] grid = make_grid(self.limits, num) means, stds = self.tas_gpr(grid, return_std=True) ret_data = {'grid': grid.tolist(), 'means': means.tolist(), 'stds': stds.tolist()} return ret_data def stop(self): pass
[docs] def save_state(self, id=None): # save state to a JSON file for diagnostics data_state = {} data_state['method'] = self.method data_state['version'] = self.version data_state['start_time'] = self.start_time.isoformat(' ', timespec='seconds') data_state['end_time'] = self.end_time.isoformat(' ', timespec='seconds') \ if self.end_time is not None else None data_state['scenario_name'] = self.scenario_name data_state['mode'] = self.mode data_state['axes'] = self.axes.tolist() data_state['offset'] = self.offset.tolist() data_state['limits'] = self.limits.tolist() data_state['method_initialized'] = self.method_initialized data_state['x_train'] = np.round(self.x_train if self.method_initialized else self.x_train_init, decimals=3).tolist() data_state['y_train'] = np.round(self.y_train, decimals=2).tolist() \ if self.method_initialized else None data_state['noise_train'] = np.round(self.noise_train, decimals=2).tolist() \ if self.method_initialized else None data_state['det_cts'] = self.det_cts.tolist() data_state['mon_cts'] = self.mon_cts.tolist() data_state['mon_cts0'] = self.mon_cts0 data_state['intens'] = np.round(self.intens, decimals=2).tolist() data_state['matrices_consumed_ellipses'] = \ np.round(self.matrices_consumed_ellipses, decimals=2).tolist() data_state['x_problem'] = np.round(self.x_problem, decimals=3).tolist() data_state['matrices_problem_ellipses'] = \ np.round(self.matrices_problem_ellipses, decimals=2).tolist() data_state['acq_function'] = self.gpr_acq.__name__ data_state['num_meas_pts_init'] = self.num_meas_pts_init \ if self.method_initialized else len(data_state['x_train']) data_state['level_backgr_automated'] = self.level_backgr_automated data_state['level_backgr'] = float(np.round(self.level_backgr, decimals=2)) \ if self.level_backgr is not None else None data_state['level_backgr_diffs_rel_max'] = self.level_backgr_diffs_rel_max data_state['level_backgr_diffs_abs_min'] = self.level_backgr_diffs_abs_min data_state['level_backgr_decile_max'] = self.level_backgr_decile_max data_state['thresh_intens_automated'] = self.thresh_intens_automated data_state['thresh_intens'] = float(np.round(self.thresh_intens, decimals=2)) \ if self.thresh_intens is not None else None data_state['thresh_intens_fact'] = self.thresh_intens_fact data_state['kernel_optims_stopped'] = self.kernel_optim_stopped data_state['rotation_active'] = self.tas_gpr.rotation_active \ if self.method_initialized else None data_state['kernel_final'] = str(self.tas_gpr.kernel) \ if self.method_initialized else None data_state['num_kernel_optims'] = self.num_kernel_optims data_state['num_kernel_optim_restarts'] = self.num_kernel_optim_restarts data_state['kernel_bounds_variance'] = self.kernel_bounds_variance data_state['kernel_bounds_length_scales'] = self.kernel_bounds_length_scales data_state['num_kernel_optims_min'] = self.num_kernel_optims_min data_state['num_kernel_optims_max'] = self.num_kernel_optims_max data_state['kernel_optims_stop_num_last'] = self.kernel_optims_stop_num_last data_state['kernel_optims_stop_eps'] = self.kernel_optims_stop_eps data_state['length_scales_min_fact'] = self.length_scales_min_fact data_state['length_scales_max_fact'] = self.length_scales_max_fact # data_state['num_meas_pts_post_init_min'] = self.num_meas_pts_post_init_min # data_state['meas_pts_stop_num_last'] = self.meas_pts_stop_num_last # data_state['meas_pts_stop_eps'] = self.meas_pts_stop_eps data_state['travel_cost_max'] = float(np.round(self.travel_cost_max, decimals=3)) if len(self.travel_costs_centers) > 0: step = max(len(self.travel_costs_centers) // 5, 1) data_state['travel_costs_centers'] = \ np.round(self.travel_costs_centers[::step], decimals=2).tolist() data_state['travel_costs_grids'] = \ np.round(self.travel_costs_grids[::step], decimals=2).tolist() data_state['travel_costs_values'] = \ np.round(self.travel_costs_values[::step], decimals=2).tolist() data_state['travel_times'] = [[round(t, ndigits=2), n] for t, n in self.travel_times] data_state['counting_times'] = [[round(t, ndigits=2), n] for t, n in self.counting_times] data_state['random_seed'] = self.random_seed logfile_spec = f"ariane_{self.start_time.isoformat('_', timespec='seconds')}" + \ (f"_{self.scenario_name}" if self.scenario_name is not None else "") + \ (f"_{id}" if id is not None else "") logging.save_dict(data_state, f"{logfile_spec}.json")
[docs] def normalize_det_cts(det_cts, mon_cts, mon_cts0): """Normalize detector counts to reference value `mon_cts0`.""" assert len(det_cts) == len(mon_cts) return det_cts / mon_cts * mon_cts0
[docs] def process_det_cts_to_training_data(det_cts, mon_cts, mon_cts0, level_backgr, thresh_intens): """Process detector counts before using it for training. Normalize detector counts by monitor counts with a reference value and adjust them by truncating to a intensity threshold and subtracting a background level. Parameters ---------- det_cts : array_like Detector counts. mon_cts : array_like Monitor counts. mon_cts0 : float Reference value for monitor counts. level_backgr : float Background level. thresh_intens : float Intensity threshold. Returns ------- intens_train : ndarray of float Intensities for training error_train : ndarray of float Errors (noise) for training. The array has the same length as `intens_train`. """ assert len(det_cts) == len(mon_cts) # normalize detector counts to "intensities" intens_train = normalize_det_cts(det_cts, mon_cts, mon_cts0) # normalize error in detector counts error_train = normalize_det_cts(np.sqrt(det_cts), mon_cts, mon_cts0) if thresh_intens is not None: # set errors of intensities above threshold to "error" in threshold value error_train[intens_train > thresh_intens] = np.sqrt(thresh_intens) # truncate intensities above threshold intens_train[intens_train > thresh_intens] = thresh_intens if level_backgr is not None: # subtract background level from intensities intens_train -= level_backgr # ensure that intensities count at least 1 intens_train[intens_train < 1] = 1 return intens_train, error_train
# auxiliary function for running a server instance from outside
[docs] def run(*, config=None, random_seed=None, port=None): if config is None: config = CONFIG_FILE_PATH if port is None: port = PORT_DEFAULT server = GPRTASServer(gpr_acq=acquisitions.standard_deviation, config=config, random_seed=random_seed, port=port) server.run_service()