from time import sleep
from ....app.tas.server import PORT_DEFAULT
from ....lib.communication import Client
[docs]
class TASClient(Client):
"""Base class for a client in a TAS application.
It implements all actions from the communication protocol.
"""
def __init__(self, *, sleeping_time_busy=1, port=PORT_DEFAULT, ip_server="localhost"):
super().__init__('ARIANE', port, ip_server)
self.sleeping_time_busy = sleeping_time_busy
def ping(self):
data = self._busy_loop(action='ping', data={})
return data['method'], data['version']
def reset(self, mode, axes, offset, limits, level_backgr=None, thresh_intens=None,
travel_cost_max=None, scenario_name=None):
data = {'mode': mode,
'axes': axes,
'offset': offset,
'limits': limits,
'level_backgr': level_backgr,
'thresh_intens': thresh_intens,
'travel_cost_max': travel_cost_max,
'scenario_name': scenario_name}
self._busy_loop(action='reset', data=data)
def result(self, locs, counts, matrices_ellipses, travel_time, counting_time,
travel_cost_grid=None, travel_cost_values=None):
data = {'locs': locs,
'counts': counts,
'matrices_ellipses': matrices_ellipses,
'travel_time': travel_time,
'counting_time': counting_time,
'travel_cost_grid': travel_cost_grid,
'travel_cost_values': travel_cost_values}
self._busy_loop(action='result', data=data)
def next_loc(self):
data = self._busy_loop(action='next_loc', data={})
return data['loc'], data['stop']
def heuris_experi_param(self):
data = self._busy_loop(action='heuris_experi_param', data={})
return {k: v for k, v in data.items() if k != 'success'}
def problem_locs(self, locs, matrices_ellipses):
data = {'locs': locs,
'matrices_ellipses': matrices_ellipses}
self._busy_loop(action='problem_locs', data=data)
def state_internal(self, data):
return self._busy_loop(action='state_internal', data=data)
def stop(self):
self._busy_loop(action='stop', data={})
def _busy_loop(self, action, data):
"""Send `action` (with `data`) to the server.
Sending is tried in a loop every `self.sleeping_time_busy` seconds as long as
the server is busy. Once the server is responsive, this function exits the loop.
"""
while(True):
self.send(action, data)
action_r, data_r = self.receive()
assert action_r == action
if 'busy' in data_r and data_r['busy']:
self.logger.info("Server busy...")
sleep(self.sleeping_time_busy)
continue
else:
break
return data_r