from . import GPRTASClient
from ..utils import qe2x, x2qe
from ....app.tas.server import PORT_DEFAULT
from ....lib.utils.math import cartesian_product, make_grid
import numpy as np
from scipy.spatial.distance import cdist
[docs]
class GPRTASClientStub(GPRTASClient):
"""Class imitating a `GPRTASClient`."""
def __init__(self, *, intensity_function, scenario_name=None,
axes=None, offset=None, limits=None,
metric=None, generate_travel_costs=True, angle_map=None,
counting_time=60., num_rows_cols_grid_init=11,
level_backgr=None, thresh_intens=None, radius_cons_area=0.025,
level_backgr_sim=0, with_noise=False, random_state=None,
sleeping_time_busy=1, port=PORT_DEFAULT):
super().__init__(sleeping_time_busy=sleeping_time_busy, port=port)
self.logger.info(f"Start {self.__class__.__name__} for scenario '{scenario_name}'...")
self.intensity_function = intensity_function
self.scenario_name = scenario_name
self.axes = np.asarray(axes) if axes is not None \
else np.array([[1, 0, 0, 0], [0, 0, 0, 1]])
self.offset = np.asarray(offset) if offset is not None \
else np.array([0, 0, 0, 0])
self.limits = np.asarray(limits) if limits is not None \
else np.array([[0, 1], [0, 1]])
assert len(self.axes) == len(self.limits)
assert np.shape(self.axes)[1] == len(self.offset)
self.metric = metric
self.generate_travel_costs = generate_travel_costs
self.angle_map = angle_map
self.counting_time = counting_time
self.num_rows_cols_grid_init = num_rows_cols_grid_init | 1 # force odd integer value
self.level_backgr = level_backgr
self.thresh_intens = thresh_intens
# compute ellipse matrix for a circle with radius `radius_cons_area`
# on a normalized grid used for each measurement location.
M = np.diag(1 / (radius_cons_area * (self.limits[:, 1] - self.limits[:, 0])))
self.matrix_consumed_ellipses = (M.T @ M).tolist()
self.level_backgr_sim = level_backgr_sim
self.with_noise = with_noise
self.random_state = random_state or np.random.RandomState()
self.dim = len(self.axes)
if self.metric is not None:
x_corners = cartesian_product(*self.limits)
if self.angle_map is not None:
# take only those corner points reachable by the instrument
x_corners = x_corners[[
self.angle_map.is_valid(x2qe(x_corner, self.axes, self.offset))
for x_corner in x_corners]]
# compute maximum of travel costs between the corner points
self.travel_cost_max = np.max(cdist(x_corners, x_corners, metric=self.metric))
self.travel_cost_grid_gen = TravelCostGridGenerator(
metric=self.metric, limits=self.limits, num=30) \
if generate_travel_costs else None
else:
self.travel_cost_max = None
self.travel_cost_grid_gen = None
self.logger.info(f"{self.__class__.__name__} started.")
def _make_intensity_observation(self, x):
# if `self.level_backgr_sim` is callable, normalize `x` and call it
backgr = self.level_backgr_sim((x - self.limits[:, 0]) / (self.limits[:, 1] - self.limits[:, 0])) \
if callable(self.level_backgr_sim) else self.level_backgr_sim
intens = self.intensity_function(x)
if self.with_noise:
backgr = self.random_state.poisson(lam=backgr)
intens = self.random_state.poisson(lam=intens)
return backgr + intens
[docs]
def run(self, measm_pts_max=200):
"""Run the client.
Initialize the client, ask the server for a total of `meas_pts_max` measurement
locations, and finalize.
Parameters
----------
measm_pts_max : int, optional
Maximum number of measurement points. The default is 200.
"""
# ping
method, version = self.ping()
self.logger.info(f"Method: {method}")
self.logger.info(f"Version: {version}")
# initialize
self.initialize()
if len(self.locations) > measm_pts_max:
raise Exception('number of initial points exceeds the maximum specifiedn')
# next_loc <--> result
num_iter = measm_pts_max - len(self.locations)
for _ in range(num_iter):
_, _, stop = self.next(with_stop=True)
# # plot internal state of GPR TAS server
# from ....lib.plotting import plot_interpolation_2d
# import matplotlib.pyplot as plt
# num = 100
# grid, means, stds = self.state_internal(num)
# plot_interpolation_2d(grid, means)
# plot_interpolation_2d(grid, stds)
# plt.show()
if stop:
break
# stop
self.finalize()
self.logger.info("Run finished.")
def initialize(self):
# setup lists for collection of locations and intensities
self.locations = []
self.intensities = []
# reset
self.reset('single', self.axes.tolist(), self.offset.tolist(), self.limits.tolist(),
self.level_backgr, self.thresh_intens,
travel_cost_max=self.travel_cost_max if self.travel_cost_grid_gen is not None else None,
scenario_name=self.scenario_name)
self.logger.info("Initialize...")
x1_0 = self.limits[0, 0]
x2_0 = self.limits[1, 0]
dx1 = (self.limits[0, 1] - self.limits[0, 0]) / (self.num_rows_cols_grid_init - 1)
dx2 = (self.limits[1, 1] - self.limits[1, 0]) / (self.num_rows_cols_grid_init - 1)
pos_init = []
# create initialization grid
for j in range(0, self.num_rows_cols_grid_init):
irange = range(0, self.num_rows_cols_grid_init) if j % 2 == 0 \
else range(self.num_rows_cols_grid_init - 1, -1, -1)
for i in irange:
if i % 2 == j % 2:
x1 = x1_0 + i * dx1
x2 = x2_0 + j * dx2
# x1 += rng.uniform(-0.5*dx1 if i > 0 else 0, 0.5*dx1 if i < n1-1 else 0)
# x2 += rng.uniform(-0.5*dx2 if j > 0 else 0, 0.5*dx2 if j < n2-1 else 0)
pos_init.append((x1 * self.axes[0] + x2 * self.axes[1] + self.offset).tolist())
pos_init = np.array(pos_init)
if self.angle_map is not None:
# take only those positions reachable by the instrument
pos_init = pos_init[[self.angle_map.is_valid(pos) for pos in pos_init]]
x_init = qe2x(pos_init, self.axes, self.offset)
y_init = [self._make_intensity_observation(x) for x in x_init]
cts_init = list(zip(y_init, np.ones(len(y_init))))
travel_times_init = ([0] + [self.metric(x1, x2) for x1, x2 in zip(x_init[:-1], x_init[1:])]) \
if self.metric is not None else (len(x_init) * [0])
self.locations += x_init.tolist()
self.intensities += y_init
# send initialization points
for i, (x, cts, travel_time) in enumerate(zip(x_init, cts_init, travel_times_init)):
travel_cost_grid = None
travel_cost_values = None
# send travel cost grid for last initialization point
if i == len(x_init) - 1 and self.travel_cost_grid_gen is not None:
travel_cost_grid, travel_cost_values = self.travel_cost_grid_gen.generate(x)
travel_cost_grid = travel_cost_grid.tolist()
travel_cost_values = travel_cost_values.tolist()
self.result(locs=[x.tolist()], counts=[cts], matrices_ellipses=[self.matrix_consumed_ellipses],
travel_time=travel_time, counting_time=self.counting_time,
travel_cost_grid=travel_cost_grid, travel_cost_values=travel_cost_values)
self.logger.info("Initialization finished.")
# request heuristically computed experiment parameters
self.logger.info("Request heuristically computed experiment parameters...")
heuris_experi_param = self.heuris_experi_param()
self.logger.info(f"Heuristically computed experimental parameters: {heuris_experi_param}")
return self.locations, self.intensities
def next(self, with_stop=False):
stop = False
while (True):
# next_loc
self.logger.info("Request next location...")
x_next, stop = self.next_loc()
self.logger.info(f"Received next location: {x_next}")
if self.angle_map is not None \
and not self.angle_map.is_valid(x2qe(x_next, self.axes, self.offset)):
# problem_locs
self.logger.info("Next location not valid.")
self.logger.info("Send problem location to server...")
self.problem_locs(locs=[x_next],
matrices_ellipses=[self.matrix_consumed_ellipses])
else:
break
# result
self.logger.info("Compute result...")
I_obs_next = self._make_intensity_observation(x_next)
self.logger.info("Result computed.")
cts_next = [I_obs_next, 1]
travel_time_next = self.metric(self.locations[-1], x_next) \
if self.metric is not None else 0
self.locations.append(x_next)
self.intensities.append(I_obs_next)
travel_cost_grid = None
travel_cost_values = None
if self.travel_cost_grid_gen is not None:
travel_cost_grid, travel_cost_values = self.travel_cost_grid_gen.generate(x_next)
travel_cost_grid = travel_cost_grid.tolist()
travel_cost_values = travel_cost_values.tolist()
self.result(locs=[x_next], counts=[cts_next], matrices_ellipses=[self.matrix_consumed_ellipses],
travel_time=travel_time_next, counting_time=self.counting_time,
travel_cost_grid=travel_cost_grid, travel_cost_values=travel_cost_values)
self.logger.info("Result sent.")
return (self.locations[-1], self.intensities[-1]) \
if with_stop is False else (self.locations[-1], self.intensities[-1], stop)
def finalize(self):
self.logger.info("Stop server...")
self.stop()
self.logger.info("Server stopped.")
class TravelCostGridGenerator():
"""Class for generating travel costs on a grid inside `limits` w.r.t. `metric`."""
def __init__(self, metric, limits, num=5):
self.metric = metric
self.limits = np.asarray(limits)
self.grid = make_grid(self.limits, num=num)
def generate(self, current_x):
"""Generate the grid and compute corresponding travel costs.
Parameters
----------
current_x : array_like
1-D Array with current location from which the costs are supposed to be computed.
Returns
-------
grid : ndarray of float
2-D array with grid locations.
costs : ndarray of float
1-D array with costs corresponding to locations in `grid`.
"""
# check that `current_x` lies within the given `limits`
assert self.limits[0, 0] <= current_x[0] <= self.limits[0, 1]
assert self.limits[1, 0] <= current_x[1] <= self.limits[1, 1]
costs = cdist([current_x], self.grid, metric=self.metric)[0]
return self.grid, costs