Source code for ariane.app.tas.client.stubs

from . import GPRTASClient
from ..utils import qe2x, x2qe
from ....app.tas.server import PORT_DEFAULT
from ....lib.utils.math import cartesian_product, make_grid

import numpy as np
from scipy.spatial.distance import cdist


[docs] class GPRTASClientStub(GPRTASClient): """Class imitating a `GPRTASClient`.""" def __init__(self, *, intensity_function, scenario_name=None, axes=None, offset=None, limits=None, metric=None, generate_travel_costs=True, angle_map=None, counting_time=60., num_rows_cols_grid_init=11, level_backgr=None, thresh_intens=None, radius_cons_area=0.025, level_backgr_sim=0, with_noise=False, random_state=None, sleeping_time_busy=1, port=PORT_DEFAULT): super().__init__(sleeping_time_busy=sleeping_time_busy, port=port) self.logger.info(f"Start {self.__class__.__name__} for scenario '{scenario_name}'...") self.intensity_function = intensity_function self.scenario_name = scenario_name self.axes = np.asarray(axes) if axes is not None \ else np.array([[1, 0, 0, 0], [0, 0, 0, 1]]) self.offset = np.asarray(offset) if offset is not None \ else np.array([0, 0, 0, 0]) self.limits = np.asarray(limits) if limits is not None \ else np.array([[0, 1], [0, 1]]) assert len(self.axes) == len(self.limits) assert np.shape(self.axes)[1] == len(self.offset) self.metric = metric self.generate_travel_costs = generate_travel_costs self.angle_map = angle_map self.counting_time = counting_time self.num_rows_cols_grid_init = num_rows_cols_grid_init | 1 # force odd integer value self.level_backgr = level_backgr self.thresh_intens = thresh_intens # compute ellipse matrix for a circle with radius `radius_cons_area` # on a normalized grid used for each measurement location. M = np.diag(1 / (radius_cons_area * (self.limits[:, 1] - self.limits[:, 0]))) self.matrix_consumed_ellipses = (M.T @ M).tolist() self.level_backgr_sim = level_backgr_sim self.with_noise = with_noise self.random_state = random_state or np.random.RandomState() self.dim = len(self.axes) if self.metric is not None: x_corners = cartesian_product(*self.limits) if self.angle_map is not None: # take only those corner points reachable by the instrument x_corners = x_corners[[ self.angle_map.is_valid(x2qe(x_corner, self.axes, self.offset)) for x_corner in x_corners]] # compute maximum of travel costs between the corner points self.travel_cost_max = np.max(cdist(x_corners, x_corners, metric=self.metric)) self.travel_cost_grid_gen = TravelCostGridGenerator( metric=self.metric, limits=self.limits, num=30) \ if generate_travel_costs else None else: self.travel_cost_max = None self.travel_cost_grid_gen = None self.logger.info(f"{self.__class__.__name__} started.") def _make_intensity_observation(self, x): # if `self.level_backgr_sim` is callable, normalize `x` and call it backgr = self.level_backgr_sim((x - self.limits[:, 0]) / (self.limits[:, 1] - self.limits[:, 0])) \ if callable(self.level_backgr_sim) else self.level_backgr_sim intens = self.intensity_function(x) if self.with_noise: backgr = self.random_state.poisson(lam=backgr) intens = self.random_state.poisson(lam=intens) return backgr + intens
[docs] def run(self, measm_pts_max=200): """Run the client. Initialize the client, ask the server for a total of `meas_pts_max` measurement locations, and finalize. Parameters ---------- measm_pts_max : int, optional Maximum number of measurement points. The default is 200. """ # ping method, version = self.ping() self.logger.info(f"Method: {method}") self.logger.info(f"Version: {version}") # initialize self.initialize() if len(self.locations) > measm_pts_max: raise Exception('number of initial points exceeds the maximum specifiedn') # next_loc <--> result num_iter = measm_pts_max - len(self.locations) for _ in range(num_iter): _, _, stop = self.next(with_stop=True) # # plot internal state of GPR TAS server # from ....lib.plotting import plot_interpolation_2d # import matplotlib.pyplot as plt # num = 100 # grid, means, stds = self.state_internal(num) # plot_interpolation_2d(grid, means) # plot_interpolation_2d(grid, stds) # plt.show() if stop: break # stop self.finalize() self.logger.info("Run finished.")
def initialize(self): # setup lists for collection of locations and intensities self.locations = [] self.intensities = [] # reset self.reset('single', self.axes.tolist(), self.offset.tolist(), self.limits.tolist(), self.level_backgr, self.thresh_intens, travel_cost_max=self.travel_cost_max if self.travel_cost_grid_gen is not None else None, scenario_name=self.scenario_name) self.logger.info("Initialize...") x1_0 = self.limits[0, 0] x2_0 = self.limits[1, 0] dx1 = (self.limits[0, 1] - self.limits[0, 0]) / (self.num_rows_cols_grid_init - 1) dx2 = (self.limits[1, 1] - self.limits[1, 0]) / (self.num_rows_cols_grid_init - 1) pos_init = [] # create initialization grid for j in range(0, self.num_rows_cols_grid_init): irange = range(0, self.num_rows_cols_grid_init) if j % 2 == 0 \ else range(self.num_rows_cols_grid_init - 1, -1, -1) for i in irange: if i % 2 == j % 2: x1 = x1_0 + i * dx1 x2 = x2_0 + j * dx2 # x1 += rng.uniform(-0.5*dx1 if i > 0 else 0, 0.5*dx1 if i < n1-1 else 0) # x2 += rng.uniform(-0.5*dx2 if j > 0 else 0, 0.5*dx2 if j < n2-1 else 0) pos_init.append((x1 * self.axes[0] + x2 * self.axes[1] + self.offset).tolist()) pos_init = np.array(pos_init) if self.angle_map is not None: # take only those positions reachable by the instrument pos_init = pos_init[[self.angle_map.is_valid(pos) for pos in pos_init]] x_init = qe2x(pos_init, self.axes, self.offset) y_init = [self._make_intensity_observation(x) for x in x_init] cts_init = list(zip(y_init, np.ones(len(y_init)))) travel_times_init = ([0] + [self.metric(x1, x2) for x1, x2 in zip(x_init[:-1], x_init[1:])]) \ if self.metric is not None else (len(x_init) * [0]) self.locations += x_init.tolist() self.intensities += y_init # send initialization points for i, (x, cts, travel_time) in enumerate(zip(x_init, cts_init, travel_times_init)): travel_cost_grid = None travel_cost_values = None # send travel cost grid for last initialization point if i == len(x_init) - 1 and self.travel_cost_grid_gen is not None: travel_cost_grid, travel_cost_values = self.travel_cost_grid_gen.generate(x) travel_cost_grid = travel_cost_grid.tolist() travel_cost_values = travel_cost_values.tolist() self.result(locs=[x.tolist()], counts=[cts], matrices_ellipses=[self.matrix_consumed_ellipses], travel_time=travel_time, counting_time=self.counting_time, travel_cost_grid=travel_cost_grid, travel_cost_values=travel_cost_values) self.logger.info("Initialization finished.") # request heuristically computed experiment parameters self.logger.info("Request heuristically computed experiment parameters...") heuris_experi_param = self.heuris_experi_param() self.logger.info(f"Heuristically computed experimental parameters: {heuris_experi_param}") return self.locations, self.intensities def next(self, with_stop=False): stop = False while (True): # next_loc self.logger.info("Request next location...") x_next, stop = self.next_loc() self.logger.info(f"Received next location: {x_next}") if self.angle_map is not None \ and not self.angle_map.is_valid(x2qe(x_next, self.axes, self.offset)): # problem_locs self.logger.info("Next location not valid.") self.logger.info("Send problem location to server...") self.problem_locs(locs=[x_next], matrices_ellipses=[self.matrix_consumed_ellipses]) else: break # result self.logger.info("Compute result...") I_obs_next = self._make_intensity_observation(x_next) self.logger.info("Result computed.") cts_next = [I_obs_next, 1] travel_time_next = self.metric(self.locations[-1], x_next) \ if self.metric is not None else 0 self.locations.append(x_next) self.intensities.append(I_obs_next) travel_cost_grid = None travel_cost_values = None if self.travel_cost_grid_gen is not None: travel_cost_grid, travel_cost_values = self.travel_cost_grid_gen.generate(x_next) travel_cost_grid = travel_cost_grid.tolist() travel_cost_values = travel_cost_values.tolist() self.result(locs=[x_next], counts=[cts_next], matrices_ellipses=[self.matrix_consumed_ellipses], travel_time=travel_time_next, counting_time=self.counting_time, travel_cost_grid=travel_cost_grid, travel_cost_values=travel_cost_values) self.logger.info("Result sent.") return (self.locations[-1], self.intensities[-1]) \ if with_stop is False else (self.locations[-1], self.intensities[-1], stop) def finalize(self): self.logger.info("Stop server...") self.stop() self.logger.info("Server stopped.")
class TravelCostGridGenerator(): """Class for generating travel costs on a grid inside `limits` w.r.t. `metric`.""" def __init__(self, metric, limits, num=5): self.metric = metric self.limits = np.asarray(limits) self.grid = make_grid(self.limits, num=num) def generate(self, current_x): """Generate the grid and compute corresponding travel costs. Parameters ---------- current_x : array_like 1-D Array with current location from which the costs are supposed to be computed. Returns ------- grid : ndarray of float 2-D array with grid locations. costs : ndarray of float 1-D array with costs corresponding to locations in `grid`. """ # check that `current_x` lies within the given `limits` assert self.limits[0, 0] <= current_x[0] <= self.limits[0, 1] assert self.limits[1, 0] <= current_x[1] <= self.limits[1, 1] costs = cdist([current_x], self.grid, metric=self.metric)[0] return self.grid, costs