From 7e7255eed10d227154cd746614642d0322ada755 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florent=20Altch=C3=A9?= Date: Wed, 26 Aug 2020 15:22:58 +0100 Subject: [PATCH] Export typing annotations when available. PiperOrigin-RevId: 328527159 --- physics_planning_games/README.md | 108 ++++ .../board_games/__init__.py | 73 +++ .../board_games/_internal/arenas.py | 164 ++++++ .../board_games/_internal/boards.py | 295 ++++++++++ .../board_games/_internal/goboard_7x7.png | Bin 0 -> 7077 bytes .../board_games/_internal/observations.py | 118 ++++ .../board_games/_internal/pieces.py | 169 ++++++ .../board_games/_internal/pieces_test.py | 67 +++ .../board_games/_internal/registry.py | 36 ++ .../board_games/_internal/tags.py | 23 + .../board_games/board_games_test.py | 39 ++ physics_planning_games/board_games/go.py | 154 +++++ .../board_games/go_logic.py | 526 ++++++++++++++++++ .../board_games/go_logic_test.py | 145 +++++ .../board_games/jaco_arm_board_game.py | 135 +++++ .../board_games/logic_base.py | 116 ++++ .../board_games/tic_tac_toe.py | 110 ++++ .../board_games/tic_tac_toe_logic.py | 265 +++++++++ .../board_games/tic_tac_toe_logic_test.py | 204 +++++++ physics_planning_games/explore.py | 66 +++ physics_planning_games/mujoban/__init__.py | 19 + physics_planning_games/mujoban/boxoban.py | 101 ++++ physics_planning_games/mujoban/mujoban.py | 451 +++++++++++++++ .../mujoban/mujoban_level.py | 140 +++++ .../mujoban/mujoban_level_test.py | 53 ++ physics_planning_games/mujoban/mujoban_pad.py | 126 +++++ .../mujoban/mujoban_test.py | 75 +++ physics_planning_games/mujoban/props.py | 62 +++ physics_planning_games/requirements.txt | 6 + 29 files changed, 3846 insertions(+) create mode 100644 physics_planning_games/README.md create mode 100644 physics_planning_games/board_games/__init__.py create mode 100644 physics_planning_games/board_games/_internal/arenas.py create mode 100644 physics_planning_games/board_games/_internal/boards.py create mode 100644 
physics_planning_games/board_games/_internal/goboard_7x7.png create mode 100644 physics_planning_games/board_games/_internal/observations.py create mode 100644 physics_planning_games/board_games/_internal/pieces.py create mode 100644 physics_planning_games/board_games/_internal/pieces_test.py create mode 100644 physics_planning_games/board_games/_internal/registry.py create mode 100644 physics_planning_games/board_games/_internal/tags.py create mode 100644 physics_planning_games/board_games/board_games_test.py create mode 100644 physics_planning_games/board_games/go.py create mode 100644 physics_planning_games/board_games/go_logic.py create mode 100644 physics_planning_games/board_games/go_logic_test.py create mode 100644 physics_planning_games/board_games/jaco_arm_board_game.py create mode 100644 physics_planning_games/board_games/logic_base.py create mode 100644 physics_planning_games/board_games/tic_tac_toe.py create mode 100644 physics_planning_games/board_games/tic_tac_toe_logic.py create mode 100644 physics_planning_games/board_games/tic_tac_toe_logic_test.py create mode 100644 physics_planning_games/explore.py create mode 100644 physics_planning_games/mujoban/__init__.py create mode 100644 physics_planning_games/mujoban/boxoban.py create mode 100644 physics_planning_games/mujoban/mujoban.py create mode 100644 physics_planning_games/mujoban/mujoban_level.py create mode 100644 physics_planning_games/mujoban/mujoban_level_test.py create mode 100644 physics_planning_games/mujoban/mujoban_pad.py create mode 100644 physics_planning_games/mujoban/mujoban_test.py create mode 100644 physics_planning_games/mujoban/props.py create mode 100644 physics_planning_games/requirements.txt diff --git a/physics_planning_games/README.md b/physics_planning_games/README.md new file mode 100644 index 0000000..75ec466 --- /dev/null +++ b/physics_planning_games/README.md @@ -0,0 +1,108 @@ +# Physically Embedded Planning Environments + +This repository contains the three environments 
introduced in +'Physically Embedded Planning Problems: New Challenges for Reinforcement +Learning' + +If you use this package, please cite our accompanying [tech report]: + +``` +@misc{, + title={Physically Embedded Planning Problems: New Challenges for + Reinforcement Learning}, + author={Mehdi Mirza, Andrew Jaegle, Jonathan J. Hunt, Arthur Guez, + Saran Tunyasuvunakool, Alistair Muldal, Théophane Weber, + Peter Karkus, Sébastien Racanière, Lars Buesing, + Timothy Lillicrap, Nicolas Heess}, + year={2020}, + eprint={}, + archivePrefix={arXiv}, + primaryClass={cs.RO} +} +``` + +## Requirements and Installation + +This repository is divided into 'mujoban' and 'board_games' folders. +Both of them are built on top of [dm_control] which requires MuJoCo. Please +follow [these] instructions to install MuJoCo. +Other dependencies can be installed +by: +``` +pip3 install -r requirements.txt +``` + +### Board games +The game logic is based on [open_spiel]. Please install as instructed [here]. +[gnugo] is required to play the game of Go against a non-random opponent. [gnugo] can be installed in Ubuntu by: +``` +apt install gnugo +``` +. Board game scripts except gnugo binary to be at: `/usr/games/gnugo` +## Example usage + +The code snippets below show examples of instantiating each of the environments. 
+ +### Mujoban + +```python +from dm_control import composer +from dm_control.locomotion import walkers +from physics_planning_games.mujoban.mujoban import Mujoban +from physics_planning_games.mujoban.mujoban_level import MujobanLevel +from physics_planning_games.mujoban.boxoban import boxoban_level_generator + +walker = walkers.JumpingBallWithHead(add_ears=True, camera_height=0.25) +maze = MujobanLevel(boxoban_level_generator) +task = Mujoban(walker=walker, + maze=maze, + control_timestep=0.1, + top_camera_height=96, + top_camera_width=96) +env = composer.Environment(time_limit=1000, task=task) +``` + +### Board games + +```python +from physics_planning_games import board_games + +environment_name = 'go_7x7' +env = board_games.load(environment_name=environment_name) +``` + +### Stepping through environment. + +The returned environments are of type of `dm_env.Environment` and can be stepped +through as shown here with random actions: + +```python +import numpy as np + +timestep = env.reset() +action_spec = env.action_spec() +while True: + action = np.stack([ + np.random.uniform(low=minimum, high=maximum) + for minimum, maximum in zip(action_spec.minimum, action_spec.maximum) + ]) + timestep = env.step(action) +``` + +### Visualization + +For visualization of the environments `explore.py` loads them using the [viewer] +from [dm_control]. + +## More details + +For more details please refer to the [tech report], [dm_control] and [dm_env]. 
+ +[tech report]: https://arxiv.org/abs/ +[dm_control]: https://github.com/deepmind/dm_control +[dm_env]: https://github.com/deepmind/dm_env +[gnugo]: https://www.gnu.org/software/gnugo/ +[open_spiel]: https://github.com/deepmind/open_spiel +[here]: https://github.com/deepmind/open_spiel/blob/master/docs/install.md +[these]: https://github.com/deepmind/dm_control#requirements-and-installation +[viewer]: https://github.com/deepmind/dm_control/tree/master/dm_control/viewer diff --git a/physics_planning_games/board_games/__init__.py b/physics_planning_games/board_games/__init__.py new file mode 100644 index 0000000..b5dfc42 --- /dev/null +++ b/physics_planning_games/board_games/__init__.py @@ -0,0 +1,73 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +"""Physically-grounded board game environments.""" + +from dm_control import composer as _composer + +from physics_planning_games.board_games import go as _go +from physics_planning_games.board_games import tic_tac_toe as _tic_tac_toe +from physics_planning_games.board_games._internal import registry as _registry + +_registry.done_importing_tasks() + +ALL = tuple(_registry.get_all_names()) +TAGS = tuple(_registry.get_tags()) + + +def get_environments_by_tag(tag): + """Returns the names of all environments matching a given tag. + + Args: + tag: A string from `TAGS`. 
+ + Returns: + A tuple of environment names. + """ + return tuple(_registry.get_names_by_tag(tag)) + + +def load(environment_name, + env_kwargs=None, + seed=None, + time_limit=float('inf'), + strip_singleton_obs_buffer_dim=False): + """Loads an environment from board_games. + + Args: + environment_name: String, the name of the environment to load. Must be in + `ALL`. + env_kwargs: extra params to pass to task creation. + seed: Optional, either an int seed or an `np.random.RandomState` + object. If None (default), the random number generator will self-seed + from a platform-dependent source of entropy. + time_limit: (optional) A float, the time limit in seconds beyond which an + episode is forced to terminate. + strip_singleton_obs_buffer_dim: (optional) A boolean, if `True`, + the array shape of observations with `buffer_size == 1` will not have a + leading buffer dimension. + + Returns: + An instance of `composer.Environment`. + """ + if env_kwargs is not None: + task = _registry.get_constructor(environment_name)(**env_kwargs) + else: + task = _registry.get_constructor(environment_name)() + return _composer.Environment( + task=task, + time_limit=time_limit, + strip_singleton_obs_buffer_dim=strip_singleton_obs_buffer_dim, + random_state=seed) diff --git a/physics_planning_games/board_games/_internal/arenas.py b/physics_planning_games/board_games/_internal/arenas.py new file mode 100644 index 0000000..ab42df5 --- /dev/null +++ b/physics_planning_games/board_games/_internal/arenas.py @@ -0,0 +1,164 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +"""Board game-specific arena classes.""" + + +from dm_control import composer +from dm_control.composer.observation import observable +from dm_control.mujoco import wrapper + +# Robot geoms will be assigned to this group in order to disable their +# visibility to the top-down camera. +ROBOT_GEOM_GROUP = 1 + + +class Standard(composer.Arena): + """"Board game-specific arena class.""" + + def _build(self, name=None): + """Initializes this arena. + + Args: + name: (optional) A string, the name of this arena. If `None`, use the + model name defined in the MJCF file. + """ + super(Standard, self)._build(name=name) + + # Add visual assets. + self.mjcf_model.asset.add( + 'texture', + type='skybox', + builtin='gradient', + rgb1=(0.4, 0.6, 0.8), + rgb2=(0., 0., 0.), + width=100, + height=100) + groundplane_texture = self.mjcf_model.asset.add( + 'texture', + name='groundplane', + type='2d', + builtin='checker', + rgb1=(0.2, 0.3, 0.4), + rgb2=(0.1, 0.2, 0.3), + width=300, + height=300, + mark='edge', + markrgb=(.8, .8, .8)) + groundplane_material = self.mjcf_model.asset.add( + 'material', + name='groundplane', + texture=groundplane_texture, + texrepeat=(5, 5), + texuniform='true', + reflectance=0.2) + + # Add ground plane. 
+ self.mjcf_model.worldbody.add( + 'geom', + name='ground', + type='plane', + material=groundplane_material, + size=(1, 1, 0.1), + friction=(0.4,), + solimp=(0.95, 0.99, 0.001), + solref=(0.002, 1)) + + # Add lighting + self.mjcf_model.worldbody.add( + 'light', + pos=(0, 0, 1.5), + dir=(0, 0, -1), + diffuse=(0.7, 0.7, 0.7), + specular=(.3, .3, .3), + directional='false', + castshadow='true') + + # Add some fixed cameras to the arena. + self._front_camera = self.mjcf_model.worldbody.add( + 'camera', + name='front', + pos=(0., -0.6, 0.75), + xyaxes=(1., 0., 0., 0., 0.7, 0.75)) + + # Ensures a 7x7 go board fits into the view from camera + self._front_camera_2 = self.mjcf_model.worldbody.add( + 'camera', + name='front_2', + pos=(0., -0.65, 0.85), + xyaxes=(1., 0., 0., 0., 0.85, 0.6)) + + self._top_down_camera = self.mjcf_model.worldbody.add( + 'camera', + name='top_down', + pos=(0., 0., 0.5), + xyaxes=(1., 0., 0., 0., 1., 0.)) + + # Always initialize the free camera so that it points at the origin. + self.mjcf_model.statistic.center = (0., 0., 0.) + + def _build_observables(self): + return ArenaObservables(self) + + @property + def front_camera(self): + return self._front_camera + + @property + def front_camera_2(self): + return self._front_camera_2 + + @property + def top_down_camera(self): + return self._top_down_camera + + def attach_offset(self, entity, offset, attach_site=None): + """Attaches another entity at a position offset from the attachment site. + + Args: + entity: The `Entity` to attach. + offset: A length 3 array-like object representing the XYZ offset. + attach_site: (optional) The site to which to attach the entity's model. + If not set, defaults to self.attachment_site. + Returns: + The frame of the attached model. 
+ """ + frame = self.attach(entity, attach_site=attach_site) + frame.pos = offset + return frame + + +class ArenaObservables(composer.Observables): + """Observables belonging to the arena.""" + + @composer.observable + def front_camera(self): + return observable.MJCFCamera(mjcf_element=self._entity.front_camera) + + @composer.observable + def front_camera_2(self): + return observable.MJCFCamera(mjcf_element=self._entity.front_camera_2) + + @composer.observable + def top_down_camera(self): + return observable.MJCFCamera(mjcf_element=self._entity.top_down_camera) + + @composer.observable + def top_down_camera_invisible_robot(self): + # Custom scene options for making robot geoms invisible. + robot_geoms_invisible = wrapper.MjvOption() + robot_geoms_invisible.geomgroup[ROBOT_GEOM_GROUP] = 0 + return observable.MJCFCamera(mjcf_element=self._entity.top_down_camera, + scene_option=robot_geoms_invisible) diff --git a/physics_planning_games/board_games/_internal/boards.py b/physics_planning_games/board_games/_internal/boards.py new file mode 100644 index 0000000..7032c1d --- /dev/null +++ b/physics_planning_games/board_games/_internal/boards.py @@ -0,0 +1,295 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ + +"""Composer entities corresponding to game boards.""" + +import os + +from dm_control import composer +from dm_control import mjcf +import numpy as np + +from dm_control.utils import io as resources + +_TOUCH_THRESHOLD = 1e-3 # Activation threshold for touch sensors (N). + +# whether to display underlying sensors for Goboard (useful to align texture) +_SHOW_DEBUG_GRID = False +_TEXTURE_PATH = os.path.join(os.path.dirname(__file__), 'goboard_7x7.png') + + +def _make_checkerboard(rows, + columns, + square_halfwidth, + height=0.01, + sensor_size=0.7, + name='checkerboard'): + """Builds a checkerboard with touch sensors centered on each square.""" + root = mjcf.RootElement(model=name) + black_mat = root.asset.add('material', name='black', rgba=(0.2, 0.2, 0.2, 1)) + white_mat = root.asset.add('material', name='white', rgba=(0.8, 0.8, 0.8, 1)) + sensor_mat = root.asset.add('material', name='sensor', rgba=(0, 1, 0, 0.3)) + root.default.geom.set_attributes( + type='box', size=(square_halfwidth, square_halfwidth, height)) + root.default.site.set_attributes( + type='box', + size=(sensor_size * square_halfwidth,) * 2 + (0.5 * height,), + material=sensor_mat, group=composer.SENSOR_SITES_GROUP) + + xpos = (np.arange(columns) - 0.5*(columns - 1)) * 2 * square_halfwidth + ypos = (np.arange(rows) - 0.5*(rows - 1)) * 2 * square_halfwidth + geoms = [] + touch_sensors = [] + for i in range(rows): + for j in range(columns): + geom_mat = black_mat if ((i % 2) == (j % 2)) else white_mat + name = '{}_{}'.format(i, j) + geoms.append( + root.worldbody.add( + 'geom', + pos=(xpos[j], ypos[i], height), + name=name, + material=geom_mat)) + site = root.worldbody.add('site', pos=(xpos[j], ypos[i], 2*height), + name=name) + touch_sensors.append(root.sensor.add('touch', site=site, name=name)) + + return root, geoms, touch_sensors + + +def _make_goboard(boardsize, + square_halfwidth, + height=0.01, + sensor_size=0.7, 
+ name='goboard'): + """Builds a Go with touch sensors centered on each intersection.""" + y_offset = -0.08 + rows = boardsize + columns = boardsize + root = mjcf.RootElement(model=name) + if _SHOW_DEBUG_GRID: + black_mat = root.asset.add('material', name='black', + rgba=(0.2, 0.2, 0.2, 0.5)) + white_mat = root.asset.add('material', name='white', + rgba=(0.8, 0.8, 0.8, 0.5)) + else: + transparent_mat = root.asset.add('material', name='intersection', + rgba=(0, 1, 0, 0.0)) + + sensor_mat = root.asset.add('material', name='sensor', rgba=(0, 1, 0, 0.3)) + + contents = resources.GetResource(_TEXTURE_PATH) + root.asset.add('texture', name='goboard', type='2d', + file=mjcf.Asset(contents, '.png')) + board_mat = root.asset.add( + 'material', name='goboard', texture='goboard', + texrepeat=[0.97, 0.97]) + + root.default.geom.set_attributes( + type='box', size=(square_halfwidth, square_halfwidth, height)) + root.default.site.set_attributes( + type='box', + size=(sensor_size * square_halfwidth,) * 2 + (0.5 * height,), + material=sensor_mat, group=composer.SENSOR_SITES_GROUP) + + board_height = height + if _SHOW_DEBUG_GRID: + board_height = 0.5*height + + root.worldbody.add( + 'geom', + pos=(0, 0+y_offset, height), + type='box', + size=(square_halfwidth * boardsize,) * 2 + (board_height,), + name=name, + material=board_mat) + + xpos = (np.arange(columns) - 0.5*(columns - 1)) * 2 * square_halfwidth + ypos = (np.arange(rows) - 0.5*(rows - 1)) * 2 * square_halfwidth + y_offset + geoms = [] + touch_sensors = [] + for i in range(rows): + for j in range(columns): + name = '{}_{}'.format(i, j) + if _SHOW_DEBUG_GRID: + transparent_mat = black_mat if ((i % 2) == (j % 2)) else white_mat + geoms.append( + root.worldbody.add( + 'geom', + pos=(xpos[j], ypos[i], height), + name=name, + material=transparent_mat)) + site = root.worldbody.add('site', pos=(xpos[j], ypos[i], 2*height), + name=name) + touch_sensors.append(root.sensor.add('touch', site=site, name=name)) + + pass_geom = 
root.worldbody.add( + 'geom', + pos=(0, y_offset, 0.0), + size=(square_halfwidth*boardsize*2, + square_halfwidth*boardsize) + (0.5 * height,), + name='pass', + material=transparent_mat) + site = root.worldbody.add('site', pos=(0, y_offset, 0.0), + size=(square_halfwidth*boardsize*2, + square_halfwidth*boardsize) + (0.5 * height,), + name='pass') + pass_sensor = root.sensor.add('touch', site=site, name='pass') + + return root, geoms, touch_sensors, pass_geom, pass_sensor + + +class CheckerBoard(composer.Entity): + """An entity representing a checkerboard.""" + + def __init__(self, *args, **kwargs): + super(CheckerBoard, self).__init__(*args, **kwargs) + self._contact_from_before_substep = None + + def _build(self, rows=3, columns=3, square_halfwidth=0.05): + """Builds a `CheckerBoard` entity. + + Args: + rows: Integer, the number of rows. + columns: Integer, the number of columns. + square_halfwidth: Float, the halfwidth of the squares on the board. + """ + root, geoms, touch_sensors = _make_checkerboard( + rows=rows, columns=columns, square_halfwidth=square_halfwidth) + self._mjcf_model = root + self._geoms = np.array(geoms).reshape(rows, columns) + self._touch_sensors = np.array(touch_sensors).reshape(rows, columns) + + @property + def mjcf_model(self): + return self._mjcf_model + + def before_substep(self, physics, random_state): + del random_state # Unused. + # Cache a copy of the array of active contacts before each substep. 
+ self._contact_from_before_substep = physics.data.contact.copy() + + def validate_finger_touch(self, physics, row, col, hand): + # Geom for the board square + geom_id = physics.bind(self._geoms[row, col]).element_id + # finger geoms + finger_geoms_ids = set(physics.bind(hand.finger_geoms).element_id) + contacts = self._contact_from_before_substep + + set1, set2 = set([geom_id]), finger_geoms_ids + for contact in contacts: + finger_tile_contact = ((contact.geom1 in set1 and + contact.geom2 in set2) or + (contact.geom1 in set2 and contact.geom2 in set1)) + if finger_tile_contact: + return True + return False + + def get_contact_pos(self, physics, row, col): + geom_id = physics.bind(self._geoms[row, col]).element_id + # Here we use the array of active contacts from the previous substep, rather + # than the current values in `physics.data.contact`. This is because we use + # touch sensors to detect when a square on the board is being pressed, and + # the pressure readings are based on forces that were calculated at the end + # of the previous substep. It's possible that `physics.data.contact` no + # longer contains any active contacts involving the board geoms, even though + # the touch sensors are telling us that one of the squares on the board is + # being pressed. + contact = self._contact_from_before_substep + involves_geom = (contact.geom1 == geom_id) | (contact.geom2 == geom_id) + [relevant_contact_ids] = np.where(involves_geom) + if relevant_contact_ids.size: + # If there are multiple contacts involving this square of the board, just + # pick the first one. + return contact[relevant_contact_ids[0]].pos.copy() + else: + print("Touch sensor at ({},{}) doesn't have any active contacts!".format( + row, col)) + return False + + def get_contact_indices(self, physics): + pressures = physics.bind(self._touch_sensors.ravel()).sensordata + # If any of the touch sensors exceed the threshold, return the (row, col) + # indices of the most strongly activated sensor. 
+ if np.any(pressures > _TOUCH_THRESHOLD): + return np.unravel_index(np.argmax(pressures), self._touch_sensors.shape) + else: + return None + + def sample_pos_inside_touch_sensor(self, physics, random_state, row, col): + bound_site = physics.bind(self._touch_sensors[row, col].site) + jitter = bound_site.size * np.array([1., 1., 0.]) + return bound_site.xpos + random_state.uniform(-jitter, jitter) + + +class GoBoard(CheckerBoard): + """An entity representing a Goboard.""" + + def _build(self, boardsize=7, square_halfwidth=0.05): + """Builds a `GoBoard` entity. + + Args: + boardsize: Integer, the size of the board (boardsize x boardsize). + square_halfwidth: Float, the halfwidth of the squares on the board. + """ + + if boardsize != 7: + raise ValueError('Only boardsize of 7x7 is implemented at the moment') + + root, geoms, touch_sensors, pass_geom, pass_sensor = _make_goboard( + boardsize=boardsize, square_halfwidth=square_halfwidth) + self._mjcf_model = root + self._geoms = np.array(geoms).reshape(boardsize, boardsize) + self._touch_sensors = np.array(touch_sensors).reshape(boardsize, boardsize) + self._pass_geom = pass_geom + self._pass_sensor = pass_sensor + + def get_contact_indices(self, physics): + pressures = physics.bind(self._touch_sensors.ravel()).sensordata + # Deal with pass first + pass_pressure = physics.bind(self._pass_sensor).sensordata + if pass_pressure > np.max(pressures) and pass_pressure > _TOUCH_THRESHOLD: + return -1, -1 + + # If any of the other touch sensors exceed the threshold, return the + # (row, col) indices of the most strongly activated sensor. 
+ if np.any(pressures > _TOUCH_THRESHOLD): + return np.unravel_index(np.argmax(pressures), self._touch_sensors.shape) + else: + return None + + def validate_finger_touch(self, physics, row, col, hand): + # Geom for the board square + if row == -1 and col == -1: + geom_id = physics.bind(self._pass_geom).element_id + else: + geom_id = physics.bind(self._geoms[row, col]).element_id + # finger geoms + finger_geoms_ids = set(physics.bind(hand.finger_geoms).element_id) + contacts = self._contact_from_before_substep + + set1, set2 = set([geom_id]), finger_geoms_ids + for contact in contacts: + finger_tile_contact = ((contact.geom1 in set1 and + contact.geom2 in set2) or + (contact.geom1 in set2 and contact.geom2 in set1)) + if finger_tile_contact: + return True + return False + + def sample_pos_inside_touch_sensor(self, physics, random_state, row, col): + bound_site = physics.bind(self._touch_sensors[row, col].site) + jitter = bound_site.size * np.array([0.25, 0.25, 0.]) + return bound_site.xpos + random_state.uniform(-jitter, jitter) diff --git a/physics_planning_games/board_games/_internal/goboard_7x7.png b/physics_planning_games/board_games/_internal/goboard_7x7.png new file mode 100644 index 0000000000000000000000000000000000000000..5e5f1036b0b8e496adf1964d4d7bb2e3ddf14ad3 GIT binary patch literal 7077 zcmeHMZA?>V6z;|b3tyUr*$L`2!>zhm1({tPF4m7=8$UYd{%E6u3d1q4u3{*Q7$M*X zQ(V){5*hoQs}I5d|e#9k$J)uGPP;7Am5g^eGj5=z zI`8o7hW31L{N=ZAC`&x(S*j`yBLhFu6Pom@5!V|M^IyqGH+Q;Ps+>hR>{Fihh?cJV z*LG`}6yFrDE6xy=rS5cD%C+Va&-V1_o7wPdopI(qdvA7|Lt{?Ss9GEi_TEuuZ)9D@ zcMc8XZESHkbv?7kl@_2m_=acc9`+<#R@(}&GNpcaJPS>c$?sW2NZ^7UF zdyIl7D`CZ~Xnd`IvCmy4H3%2C&^NS5|2-Sg;3ub1K;bR(N5L;ERbt=fI^DpW8tf3k?F#*z*$k zsSM9g*aJ?yb~PRtJ$xBI!HZo86t^QrW-){l19+x(Bnb+?7{l{|nh$(@GekNt?!ny@ z#GM5r99?`egc?qP10x_45ct+KpSx5LcK|M~y^eo}9m}1%NeC)946p}JE&%WVZ^TbU zag5Y>$zpRRT|9@FF`61Q9k$_sKvDuXu3`<^moWkJj)1rsY=FE1Okf@n;F9cc3joh% z5|aec z17RFd0)3<*bH*BwI?uw7!7>lS6d;K2BpFIP+ZF}xQYzksP=UNj#BFCRC51{fDbQ^= 
zMmBXTXf!cZ0AyYhP=vW;DW6N2DR4+6ci~WX@<|AA9P)VC$fpCXAaLl3Ar{o3upoE5 z|2>dtvJA~O0s_IsfPYL8xTJiIXk^rEijZhh+(3T#lZ7xalCed^HUw{!xjAe~Srt(O zd`zkXur1adNlgf3ZdyM$)YG~Xl;*j`L(x1?Z~$sJKh@#+DW8KPKPlj`?kGbgtS~!Z zF^1<&lc@C3sw`spFsm71`BdgW*^Sw`H}=2y`N~Wep^T%qz4fbNTgxVo07Z165UzPH zpv2%|RD=CxB*mL*fFRlv0xBCOE8~4gm5sw%8}JTC`C;h~A=cPxe+DX^UnQVe@tu;B z6d|zI*PSZLDpLAc#t%V#ymZq3H4_(|AYiRp+llR^(3-lYuYg!|q)i|+I+I#dR8JbRZPid;@+f(lB zGA7#Yzqi|2Fff(r`kEn<6PgTlV;raqO`D0HE_5t?d$9NuIT~5Vm`IfZ*%)UKJau!1 z>KE{biXGNcqVd*&(fmON^X-mhvw7<~`3pPW*Ek@Mp-T;IL;{7a7u9?i0!3ONpQSr1g>fl~tf9nB9uG21=yQMm<|J=vRj!lDJ7|dud?7cWOa^2id zqM-d)e-f_ia0s&&$bn35bUhkd(TCq-BkE@i=&oq@gmm#BCIt>K0FHsJC^rIlBuarl zySWj-y^Uxu+cBZI4oF#+K*Oabe`~BB)(}8$Vi0NwAgR~?pFaBE>~Pt6ff>jHd} z6P@Y(<>d2@><5|PcjP|RNI%E~`_}aEzCulW3YsigUt}%ry8T3qeasHiA7|f%&u= len(markers): + raise RuntimeError( + _NO_MORE_MARKERS_AVAILABLE.format(move_count, player_id)) + bound_marker = physics.bind(markers[move_count]) + bound_marker.pos = pos + # TODO(alimuldal): Set orientation as well (random? same as contact frame?) + bound_marker.group = _VISIBLE_SITE_GROUP + self._move_counts[player_id] += 1 + + if bpos: + self._marker_ids[player_id][bpos[0]][bpos[1]] = move_count + + +class MarkersObservables(composer.Observables): + """Observables for a `Markers` entity.""" + + @composer.observable + def position(self): + """Cartesian positions of all marker sites. + + Returns: + An `observable.MJCFFeature` instance. When called with an instance of + `physics` as the argument, this will return a numpy float64 array of shape + (num_players * num_markers, 3) where each row contains the cartesian + position of a marker. Unplaced markers will have position (0, 0, 0). 
+ """ + return observable.MJCFFeature( + 'xpos', list(itertools.chain.from_iterable(self._entity.markers))) diff --git a/physics_planning_games/board_games/_internal/pieces_test.py b/physics_planning_games/board_games/_internal/pieces_test.py new file mode 100644 index 0000000..d695836 --- /dev/null +++ b/physics_planning_games/board_games/_internal/pieces_test.py @@ -0,0 +1,67 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +"""Tests for physics_planning_games.board_games._internal.pieces.py.""" + +from absl.testing import absltest +from dm_control import mjcf +import numpy as np + +from physics_planning_games.board_games._internal import pieces + + +class MarkersTest(absltest.TestCase): + + def test_position_observable(self): + num_per_player = 3 + markers = pieces.Markers(num_per_player=num_per_player) + physics = mjcf.Physics.from_mjcf_model(markers.mjcf_model) + all_positions = [ + [(0, 1, 2), (3, 4, 5), (6, 7, 8)], # Player 0 + [(-1, 2, -3), (4, -5, 6)], # Player 1 + ] + for player_id, positions in enumerate(all_positions): + for marker_pos in positions: + markers.mark(physics=physics, player_id=player_id, pos=marker_pos) + expected_positions = np.zeros((2, num_per_player, 3), dtype=np.double) + expected_positions[0, :len(all_positions[0])] = all_positions[0] + expected_positions[1, :len(all_positions[1])] = all_positions[1] + 
observed_positions = markers.observables.position(physics) + np.testing.assert_array_equal( + expected_positions.reshape(-1, 3), observed_positions) + + def test_invalid_player_id(self): + markers = pieces.Markers(num_per_player=5) + physics = mjcf.Physics.from_mjcf_model(markers.mjcf_model) + invalid_player_id = 99 + with self.assertRaisesWithLiteralMatch( + ValueError, pieces._INVALID_PLAYER_ID.format(1, 99)): + markers.mark(physics=physics, player_id=invalid_player_id, pos=(1, 2, 3)) + + def test_too_many_moves(self): + num_per_player = 5 + player_id = 0 + markers = pieces.Markers(num_per_player=num_per_player) + physics = mjcf.Physics.from_mjcf_model(markers.mjcf_model) + for _ in range(num_per_player): + markers.mark(physics=physics, player_id=player_id, pos=(1, 2, 3)) + with self.assertRaisesWithLiteralMatch( + RuntimeError, + pieces._NO_MORE_MARKERS_AVAILABLE.format(num_per_player, player_id)): + markers.mark(physics=physics, player_id=player_id, pos=(1, 2, 3)) + + +if __name__ == '__main__': + absltest.main() diff --git a/physics_planning_games/board_games/_internal/registry.py b/physics_planning_games/board_games/_internal/registry.py new file mode 100644 index 0000000..4fea6d0 --- /dev/null +++ b/physics_planning_games/board_games/_internal/registry.py @@ -0,0 +1,36 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ + +"""A global registry of constructors for board game environments.""" + + +from dm_control.utils import containers + +_ALL_CONSTRUCTORS = containers.TaggedTasks(allow_overriding_keys=False) + +add = _ALL_CONSTRUCTORS.add +get_constructor = _ALL_CONSTRUCTORS.__getitem__ +get_all_names = _ALL_CONSTRUCTORS.keys +get_tags = _ALL_CONSTRUCTORS.tags +get_names_by_tag = _ALL_CONSTRUCTORS.tagged + +# This disables the check that prevents the same task constructor name from +# being added to the container more than once. This is done in order to allow +# individual task modules to be reloaded without also reloading `registry.py` +# first (e.g. when "hot-reloading" environments in domain explorer). + + +def done_importing_tasks(): + _ALL_CONSTRUCTORS.allow_overriding_keys = True diff --git a/physics_planning_games/board_games/_internal/tags.py b/physics_planning_games/board_games/_internal/tags.py new file mode 100644 index 0000000..3f8d4fd --- /dev/null +++ b/physics_planning_games/board_games/_internal/tags.py @@ -0,0 +1,23 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ + +"""String constants used to annotate task constructors.""" + +FEATURES = 'features' +VISION = 'vision' + +EASY = 'easy' +MED = 'medium' +HARD = 'hard' diff --git a/physics_planning_games/board_games/board_games_test.py b/physics_planning_games/board_games/board_games_test.py new file mode 100644 index 0000000..82ffedf --- /dev/null +++ b/physics_planning_games/board_games/board_games_test.py @@ -0,0 +1,39 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ + +"""Basic smoke test for board_games environments.""" + +from absl.testing import absltest +from dm_env import test_utils + +from physics_planning_games import board_games + + +class GoTest(test_utils.EnvironmentTestMixin, absltest.TestCase): + + def make_object_under_test(self): + return board_games.load(environment_name='go_7x7', seed=0) + + +class TicTacToeTest(test_utils.EnvironmentTestMixin, absltest.TestCase): + + def make_object_under_test(self): + return board_games.load( + environment_name='tic_tac_toe_mixture_opponent_markers_features', + seed=0) + + +if __name__ == '__main__': + absltest.main() diff --git a/physics_planning_games/board_games/go.py b/physics_planning_games/board_games/go.py new file mode 100644 index 0000000..742793f --- /dev/null +++ b/physics_planning_games/board_games/go.py @@ -0,0 +1,154 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ + +"""A Go board game.""" + +from dm_control.composer.observation import observable +import numpy as np +from physics_planning_games.board_games import go_logic +from physics_planning_games.board_games import jaco_arm_board_game +from physics_planning_games.board_games._internal import boards +from physics_planning_games.board_games._internal import observations +from physics_planning_games.board_games._internal import pieces +from physics_planning_games.board_games._internal import registry +from physics_planning_games.board_games._internal import tags + +_BLACK = (0., 0., 0., 0.75) +_WHITE = (1., 1., 1., 0.75) + +_GO_PIECE_SIZE = 0.04 +_DEFAULT_OPPONENT_MIXTURE = 0.2 + + +class Go(jaco_arm_board_game.JacoArmBoardGame): + """Single-player Go of configurable size.""" + + def __init__(self, board_size, observation_settings, opponent=None, + reset_arm_after_move=True): + """Initializes a `Go` task. + + Args: + board_size: board size + observation_settings: An `observations.ObservationSettings` namedtuple + specifying configuration options for each category of observation. + opponent: Go opponent to use for the opponent player actions. + reset_arm_after_move: Whether to reset arm to random position after every + piece being placed on the board. 
+ """ + game_logic = go_logic.GoGameLogic(board_size=board_size) + + if opponent is None: + opponent = go_logic.GoGTPOpponent(board_size=board_size, + mixture_p=_DEFAULT_OPPONENT_MIXTURE) + + self._last_valid_move_is_pass = False + super(Go, self).__init__(observation_settings=observation_settings, + opponent=opponent, + game_logic=game_logic, + board=boards.GoBoard(boardsize=board_size), + markers=pieces.Markers( + player_colors=(_BLACK, _WHITE), + halfwidth=_GO_PIECE_SIZE, + num_per_player=board_size*board_size*2, + observable_options=observations.make_options( + observation_settings, + observations.MARKER_OBSERVABLES), + board_size=board_size)) + self._reset_arm_after_move = reset_arm_after_move + # Add an observable exposing the move history (to reconstruct game states) + move_history_observable = observable.Generic( + lambda physics: self._game_logic.get_move_history()) + move_history_observable.configure( + **observation_settings.board_state._asdict()) + self._task_observables['move_history'] = move_history_observable + + @property + def name(self): + return 'Go' + + @property + def control_timestep(self): + return 0.05 + + def after_substep(self, physics, random_state): + if not self._made_move_this_step: + # which board square received the most contact pressure + indices = self._board.get_contact_indices(physics) + if not indices: + return + row, col = indices + # Makes sure that contact with that board square involved a finger + finger_touch = self._board.validate_finger_touch(physics, + row, col, self._hand) + if not finger_touch: + return + + pass_action = True if (row == -1 and col == -1) else False + if pass_action and self._last_valid_move_is_pass: + # Don't allow two passes in a row (otherwise hard to only pass once) + valid_move = False + else: + valid_move = self._game_logic.apply( + player=jaco_arm_board_game.SELF, + action=go_logic.GoMarkerAction(row=int(row), col=int(col), + pass_action=pass_action)) + + if valid_move: + 
self._made_move_this_step = True + if not pass_action: + self._last_valid_move_is_pass = False + marker_pos = self._board.get_contact_pos( + physics=physics, row=row, col=col) + self._markers.mark(physics=physics, + player_id=jaco_arm_board_game.SELF, + pos=marker_pos, + bpos=(row, col)) + else: + self._last_valid_move_is_pass = True + if not self._game_logic.is_game_over: + opponent_move = self._game_opponent.policy( + game_logic=self._game_logic, player=jaco_arm_board_game.OPPONENT, + random_state=random_state) + assert opponent_move + assert self._game_logic.apply(player=jaco_arm_board_game.OPPONENT, + action=opponent_move) + marker_pos = self._board.sample_pos_inside_touch_sensor( + physics=physics, + random_state=random_state, + row=opponent_move.row, + col=opponent_move.col) + self._markers.mark(physics=physics, + player_id=jaco_arm_board_game.OPPONENT, + pos=marker_pos, + bpos=(opponent_move.row, + opponent_move.col)) + if self._reset_arm_after_move: + self._tcp_initializer(physics, random_state) + + # Redraw all markers that are on the board (after captures) + self._markers.make_all_invisible(physics) + board = self._game_logic.get_board_state() + black_stones = np.transpose(np.nonzero(board[:, :, 1])) + white_stones = np.transpose(np.nonzero(board[:, :, 2])) + if black_stones.size > 0: + self._markers.make_visible_by_bpos(physics, 0, black_stones) + if white_stones.size > 0: + self._markers.make_visible_by_bpos(physics, 1, white_stones) + + +@registry.add(tags.EASY, tags.FEATURES) +def go_7x7(): + return Go(board_size=7, + observation_settings=observations.PERFECT_FEATURES) diff --git a/physics_planning_games/board_games/go_logic.py b/physics_planning_games/board_games/go_logic.py new file mode 100644 index 0000000..fc9c6ca --- /dev/null +++ b/physics_planning_games/board_games/go_logic.py @@ -0,0 +1,526 @@ +# Copyright 2020 DeepMind Technologies Limited. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +"""Logic for the Go Game.""" + +import abc +import collections +import enum +import shutil +import subprocess + +from absl import logging +import numpy as np + +from dm_control.utils import io as resources +from physics_planning_games.board_games import logic_base +import pyspiel + +GNUGO_PATH = '/usr/games/gnugo' +GoMarkerAction = collections.namedtuple('GoMarkerAction', + ['row', 'col', 'pass_action']) + +# Note that there is no 'i' in these Go board coordinates +# (cf https://senseis.xmp.net/?Coordinates) +_X_CHARS = 'abcdefghjklmnopqrstuvwxyz' +_X_MAP = {c: x for c, x in zip(_X_CHARS, range(len(_X_CHARS)))} + + +def _go_marker_to_int(go_marker, board_size): + """Convert GoMarkerAction into GoPoint integer encoding of move. + + Args: + go_marker: GoMarkerAction. + board_size: Board size of the go board (e.g. 9 or 19). + + Returns: + GoPoint int value. + """ + if go_marker.pass_action: + return board_size * board_size + else: + return int((go_marker.row) * board_size + go_marker.col) + + +def _int_to_go_marker(move_int, board_size): + """Decode the integer move encoding to a GoMarkerAction. + + Args: + move_int: Integer encoding the go move. + board_size: Board size of the go board (e.g. 9 or 19). + + Returns: + GoMarkerAction encoding of move. 
+ """ + if move_int == board_size * board_size: + go_marker_action = GoMarkerAction(row=-1, col=-1, pass_action=True) + else: + row = move_int // board_size + col = move_int % board_size + go_marker_action = GoMarkerAction(row=row, col=col, pass_action=False) + + return go_marker_action + + +def _go_marker_to_str(go_marker): + if go_marker.pass_action: + return 'PASS' + else: + move_str = _X_CHARS[go_marker.col] + str(go_marker.row + 1) + return move_str + + +def _str_to_go_marker(move_str): + """Convert from a 2-letter Go move str (e.g. + + a3) to a GoMarker. + + Args: + move_str: String describing the move (e.g. a3). + + Returns: + GoMarkerAction encoding of move. + """ + move_str = move_str.lower() + if move_str == 'pass': + action = GoMarkerAction(row=-1, col=-1, pass_action=True) + elif move_str == 'resign': + raise NotImplementedError('Not dealing with resign') + else: + assert len(move_str) == 2 + col, row = move_str[0], move_str[1] + col = _X_MAP[col] + row = int(row) - 1 + action = GoMarkerAction(row=row, col=col, pass_action=False) + return action + + +def _get_gnugo_ref_config(level=1, binary_path=None): + """Reference config for GnuGo. + + Args: + level: GnuGo level + binary_path: string pointing to GnuGo binary + + Returns: + Config dict that can be passed to gtp engine + """ + + try: + gnugo_binary_path = resources.GetResourceFilename(binary_path) + except FileNotFoundError: + gnugo_binary_path = shutil.which('gnugo') + if not gnugo_binary_path: + raise FileNotFoundError('Not able to locate gnugo library. 
', + 'Try installing it by: apt install gnugo') + + gnugo_extra_flags = ['--mode', 'gtp'] + gnugo_extra_flags += ['--chinese-rules', '--capture-all-dead'] + gtp_player_cfg = { + 'name': 'gnugo', + 'binary_path': gnugo_binary_path, + 'level': level, + 'extra_flags': gnugo_extra_flags, + } + return gtp_player_cfg + + +class Stone(enum.Enum): + EMPTY = 1 + WHITE = 2 + BLACK = 3 + + def __lt__(self, other): + value = int(self.value) + return value < other.value + + +def gtp_to_sgf_point(gtp_point, board_size): + """Format a GTP point according to the SGF format.""" + if gtp_point.lower() == 'pass' or gtp_point.lower() == 'resign': + return 'tt' + column, row = gtp_point[0], gtp_point[1:] + # GTP doesn't use i, but SGF does, so we need to convert. + gtp_columns = 'abcdefghjklmnopqrstuvwxyz' + sgf_columns = 'abcdefghijklmnopqrstuvwxyz' + x = gtp_columns.find(column.lower()) + y = board_size - int(row) + return '%s%s' % (sgf_columns[x], sgf_columns[y]) + + +class Gtp(object): + """Wrapper around Go playing program that communicates using GTP.""" + + __metaclass__ = abc.ABCMeta + + def __init__(self, checkpoint_file=None): + self.stones = { + '.': Stone.EMPTY, + '+': Stone.EMPTY, + 'O': Stone.WHITE, + 'X': Stone.BLACK + } + self.moves = [] + self.comments = [] + self.handicap = 0 + self.board_size = 19 + self.komi = 0 + self.free_handicap = None + self.byo_yomi_time = None + self.checkpoint_file = checkpoint_file + self.stderr = None + + def set_board_size(self, size): + self.board_size = size + self.gtp_command('boardsize %d' % size) + self.gtp_command('clear_board') + + def set_komi(self, komi): + self.komi = komi + self.gtp_command('komi %s' % komi) + + def set_free_handicap(self, vertices): + self.free_handicap = vertices + self.gtp_command('set_free_handicap %s' % vertices) + + def place_free_handicap(self, n): + self.free_handicap = self.gtp_command('place_free_handicap %d' % n) + return self.free_handicap + + def make_move(self, move, record=True): + 
self.gtp_command('play %s' % move) + if record: + self._record_move(move) + + def set_byo_yomi_time(self, t): + self.byo_yomi_time = t + + def num_moves(self): + return len(self.moves) + + def clear_board(self): + self.moves = [] + self.comments = [] + self.gtp_command('clear_board') + + def generate_move(self, color): + if self.byo_yomi_time is not None: + self.gtp_command('time_left %s %d 1' % (color, self.byo_yomi_time)) + move = '%s %s' % (color, self.gtp_command( + 'genmove %s' % color).split(' ')[-1].lower()) + self._record_move(move, stderr=self.stderr) + return move + + def board(self): + raw_board = self.gtp_command('showboard', log=False)[1:].strip() + rows = [line.strip().split(' ')[0] for line in raw_board.split('\n')][1:-1] + rows = [''.join(row.split(' ')[1:-1]) for row in rows] + return [[self.stones[cell] for cell in row] for row in rows] + + def quit(self): + self.gtp_command('quit') + + def final_status(self, status): + return self.gtp_command('final_status_list %s' % status)[2:].replace( + '\n', ' ').split(' ') + + def fixed_handicap(self, handicap): + self.handicap = handicap + self.gtp_command('fixed_handicap %d' % handicap) + + def undo(self, num_moves): + self.gtp_command('gg-undo %d' % num_moves) + for _ in range(num_moves): + self.moves.pop() + self.comments.pop() + + def _record_move(self, move, stderr=None): + self.moves.append(move) + self.comments.append(stderr) + + if self.checkpoint_file: + with open(self.checkpoint_file, 'w') as f: + f.write(self.to_sgf()) + + def to_sgf(self): + sgf = '(;PB[Black]PW[White]KM[%.1f]HA[%d]SZ[19]' % (self.komi, + self.handicap) + for i, move in enumerate(self.moves): + sgf += '\n;' + self._format_sgf_move(move) + if self.comments[i]: + sgf += 'C[' + self._sgf_escape(self.comments[i]) + ']' + return sgf + ')' + + def _format_sgf_move(self, move): + """Format a move according to the SGF format.""" + color, vertex = str(move).split(' ') + return '%s[%s]' % (color[0].upper(), + gtp_to_sgf_point(vertex, 
self.board_size)) + + def _sgf_escape(self, text): + return ''.join(['\\' + t if t == ']' or t == '\\' else t for t in text]) + + @abc.abstractmethod + def gtp_command(self, command, log=True): + """Executes a GTP command and returns its response. + + Args: + command: The GTP command to run, no trailing newline. + log: Whether to log command and response to INFO. + + Returns: + The GTP response. + Raises: + GtpError: if the response is not ok (doesn't start with '='). + """ + pass + + +class GtpError(Exception): + + def __init__(self, response): + super(GtpError, self).__init__() + self.response = response + + def __str__(self): + return self.response + + +class GoEngine(Gtp): + """GTP-based Go engine. + + Supports at least GnuGo and Pachi. + + For GnuGo, at least specify ['--mode', 'gtp'] in extra_flags. + """ + + def __init__(self, command='', checkpoint_file=None, extra_flags=None): + super(GoEngine, self).__init__(checkpoint_file) + if extra_flags: + command = [command] + extra_flags + self.p = subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=True) + + def gtp_command(self, command, log=True): + if log: + logging.info('GTP: %s', command) + self.p.stdin.write(command) + self.p.stdin.write('\n') + self.p.stdin.flush() + + response = [self.p.stdout.readline()] + while response[-1] != '\n': + response.append(self.p.stdout.readline()) + response = ''.join(response).strip() + + if log: + logging.info('GTP: %s', response) + + if response[0][0] != '=': + raise GtpError(response) + + return response + + +class GoGameLogic(logic_base.OpenSpielBasedLogic): + """Logic for Go game.""" + + def __init__(self, board_size, gnugo_level=1, komi=5.5): + self._board_size = board_size + self._komi = komi + gtp_player_cfg = _get_gnugo_ref_config( + level=gnugo_level, + binary_path=GNUGO_PATH) + + self._gtp_player = GoEngine( + command=gtp_player_cfg['binary_path'], + extra_flags=gtp_player_cfg['extra_flags']) + 
self._gtp_player.set_board_size(board_size) + self.reset() + + def board_size(self): + return self._board_size + + def get_gtp_player(self): + return self._gtp_player + + def reset(self): + """Resets the game state.""" + # For now we always assume we are the starting player and use a random + # opponent. + self._gtp_player.gtp_command('clear_board', log=False) + self._gtp_player.set_board_size(self._board_size) + self._gtp_player.set_komi(self._komi) + game = pyspiel.load_game( + 'go', {'board_size': pyspiel.GameParameter(self._board_size)}) + self._open_spiel_state = game.new_initial_state() + + self._moves = np.ones( + (self._board_size * self._board_size * 2,), dtype=np.int32) * -1 + self._move_id = 0 + + def show_board(self): + self._gtp_player.gtp_command('showboard') + + def get_gtp_reward(self): + self._gtp_player.gtp_command('final_score') + + def get_board_state(self): + """Returns the logical board state as a numpy array. + + Returns: A boolean array of shape (H, W, C), where H=3, W=3 (height and + width of the board) and C=4 for the 4 planes. The 4 planes are, in order, + unmarked, black (player 0), white (player 1) and komi (this layer is + always all the same value indicating whether white is to play). + """ + board_state = np.reshape( + np.array(self._open_spiel_state.observation_tensor(0), dtype=np.bool), + [4, self._board_size, self._board_size]) + board_state = np.transpose(board_state, [1, 2, 0]) + board_state = board_state[:, :, [2, 0, 1, 3]] + return board_state + + def set_state_from_history(self, move_history): + self.reset() + move_history = np.squeeze(move_history.numpy()) + for t in range(move_history.size): + if move_history[t] < 0: + break + else: + self.apply(t % 2, move_history[t]) + # self.show_board() + + def get_move_history(self): + """Returns the move history as padded numpy array.""" + return self._moves + + def apply(self, player, action): + """Checks whether action is valid, and if so applies it to the game state. 
+ + Args: + player: Integer specifying the player ID; either 0 or 1. + action: A `GoMarkerAction` instance (or numpy.int32) which represent the + action in the board of size `board_size`. + + Returns: + True if the action was valid, else False. + """ + if isinstance(action, GoMarkerAction): + action = _go_marker_to_int(action, self._board_size) + + if self._open_spiel_state.current_player() != player: + return False + + legal_actions = self._open_spiel_state.legal_actions() + if np.isin(action, legal_actions): + self._open_spiel_state.apply_action(action) + was_valid_move = True + else: + was_valid_move = False + + if not was_valid_move: + return False + + self._moves[self._move_id] = action + self._move_id += 1 + + # Apply to the Go program + player_color = 'B' if player == 0 else 'W' + action_str = _go_marker_to_str(_int_to_go_marker(action, self._board_size)) + self._gtp_player.gtp_command('play {} {}'.format(player_color, action_str)) + + return was_valid_move + + +def gen_move(game_logic, player): + """Generate move from GTP player and game state defined in game_logic.""" + player_color = 'B' if player == 0 else 'W' + gtp_player = game_logic.get_gtp_player() + move_str = gtp_player.gtp_command( + 'reg_genmove {}'.format(player_color), log=True) + move_str = move_str[2:].lower() + action = _str_to_go_marker(move_str) + return action + + +def gen_random_move(game_logic, random_state): + """Generate random move for current state in game logic.""" + if game_logic.is_game_over: + return None + valid_moves = game_logic.open_spiel_state.legal_actions() + assert valid_moves + move = random_state.choice(valid_moves) + go_action = _int_to_go_marker(move, board_size=game_logic.board_size()) + return go_action + + +class GoGTPOpponent(logic_base.Opponent): + """Use external binary Pachi to generate opponent moves.""" + + def __init__(self, board_size, mixture_p=0.0): + """Initialize Go opponent. 
+ + Args: + board_size: Go board size (int) + mixture_p: Probability of playing a random move (amongst legal moves). + """ + self._board_size = board_size + self._mixture_p = mixture_p + + def reset(self): + pass + + def policy(self, game_logic, player, random_state): + """Return policy action. + + Args: + game_logic: Go game logic state. + player: Integer specifying the player ID; either 0 or 1. + random_state: Numpy random state object. + + Returns: + GoMarkerAction indicating opponent move. + """ + if random_state.rand() < self._mixture_p: + return gen_random_move(game_logic, random_state) + else: + return gen_move(game_logic, player) + + +class GoRandomOpponent(logic_base.Opponent): + """An easy opponent for Go.""" + + def __init__(self, board_size): + self._board_size = board_size + + def reset(self): + """Resets the opponent's internal state (not implemented).""" + pass + + def policy(self, game_logic, player, random_state): + """Return a random, valid move. + + Args: + game_logic: TicTacToeGameLogic state of the game. + player: Integer specifying the player ID; either 0 or 1. + random_state: An instance of `np.random.RandomState` + + Returns: + GoMarkerAction of opponent. + """ + return gen_random_move(game_logic, random_state) diff --git a/physics_planning_games/board_games/go_logic_test.py b/physics_planning_games/board_games/go_logic_test.py new file mode 100644 index 0000000..3d20850 --- /dev/null +++ b/physics_planning_games/board_games/go_logic_test.py @@ -0,0 +1,145 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +from absl.testing import absltest +from absl.testing import parameterized +import numpy as np + +from physics_planning_games.board_games import go_logic + + +class GoGameLogicTest(parameterized.TestCase): + + def setUp(self): + super(GoGameLogicTest, self).setUp() + self.logic = go_logic.GoGameLogic(board_size=5) + self.expected_board_state = np.zeros((5, 5, 4), dtype=bool) + self.expected_board_state[:, :, 0] = True + + def test_valid_move_sequence(self): + np.testing.assert_array_equal(self.logic.get_board_state(), + self.expected_board_state) + + action = go_logic.GoMarkerAction(col=1, row=2, pass_action=False) + self.assertTrue(self.logic.apply(player=0, action=action), + msg='Invalid action: {}'.format(action)) + + def test_pass(self): + action = go_logic.GoMarkerAction(col=0, row=0, pass_action=True) + self.assertTrue(self.logic.apply(player=0, action=action), + msg='Invalid action: {}'.format(action)) + self.expected_board_state[:, :, 3] = True + np.testing.assert_array_equal(self.logic.get_board_state(), + self.expected_board_state) + + def test_invalid_move_sequence(self): + np.testing.assert_array_equal(self.logic.get_board_state(), + self.expected_board_state) + action = go_logic.GoMarkerAction(col=1, row=2, pass_action=False) + self.assertTrue(self.logic.apply(player=0, action=action), + msg='Invalid action: {}'.format(action)) + self.expected_board_state[action.row, action.col, 0] = False + self.expected_board_state[action.row, action.col, 1] = True + 
self.expected_board_state[:, :, 3] = True + np.testing.assert_array_equal(self.logic.get_board_state(), + self.expected_board_state) + + action = go_logic.GoMarkerAction(col=1, row=2, pass_action=False) + self.assertFalse(self.logic.apply(player=0, action=action), + msg='Invalid action was accepted: {}'.format(action)) + + # Player 1 tries to move in the same location as player 0. + self.assertFalse(self.logic.apply(player=1, action=action), + msg='Invalid action was accepted: {}'.format(action)) + + # The board state should not have changed as a result of invalid actions. + np.testing.assert_array_equal(self.logic.get_board_state(), + self.expected_board_state) + + def test_random_opponent_vs_gnugo(self): + """Play random v gnugo opponents and check that optimal largely wins. + """ + board_size = 9 + rand_state = np.random.RandomState(42) + pachi_opponent = go_logic.GoGTPOpponent(board_size) + random_opponent = go_logic.GoRandomOpponent(board_size) + players = [pachi_opponent, random_opponent] + pachi_returns = [] + random_returns = [] + + for _ in range(3): + logic = go_logic.GoGameLogic(board_size) + pachi_opponent.reset() + random_opponent.reset() + + rand_state.shuffle(players) + current_player_idx = 0 + + while not logic.is_game_over: + current_player = players[current_player_idx] + action = current_player.policy(logic, current_player_idx, rand_state) + valid_action = logic.apply(current_player_idx, action) + self.assertTrue(valid_action, + msg='Opponent {} selected invalid action {}'.format( + current_player, action)) + current_player_idx = (current_player_idx + 1) % 2 + + # Record the winner. 
+ reward = logic.get_reward + if players[0] == pachi_opponent: + pachi_return = reward[0] + random_return = reward[1] + else: + pachi_return = reward[1] + random_return = reward[0] + pachi_returns.append(pachi_return) + random_returns.append(random_return) + + mean_pachi_returns = np.mean(pachi_returns) + mean_random_returns = np.mean(random_returns) + self.assertGreater(mean_pachi_returns, 0.95) + self.assertLess(mean_random_returns, 0.05) + + @parameterized.named_parameters([ + dict(testcase_name='00', + row=0, col=0), + dict(testcase_name='01', + row=1, col=0)]) + def test_go_marker_to_int(self, row, col): + go_marker = go_logic.GoMarkerAction(row=row, col=col, pass_action=False) + int_action = go_logic._go_marker_to_int(go_marker, board_size=19) + recovered_go_marker = go_logic._int_to_go_marker(int_action, board_size=19) + self.assertEqual(go_marker, recovered_go_marker, + msg='Initial go marker {}, recovered {}'.format( + go_marker, recovered_go_marker)) + + @parameterized.named_parameters([ + dict(testcase_name='00', + row=0, col=0), + dict(testcase_name='01', + row=1, col=0)]) + def test_go_marker_to_str(self, row, col): + go_marker = go_logic.GoMarkerAction(row=row, col=col, pass_action=False) + str_action = go_logic._go_marker_to_str(go_marker) + recovered_go_marker = go_logic._str_to_go_marker(str_action) + self.assertEqual(go_marker, + recovered_go_marker, + msg='Initial go marker {}, recovered {}, ' + 'str_action {}'.format(go_marker, recovered_go_marker, + str_action)) + + +if __name__ == '__main__': + absltest.main() diff --git a/physics_planning_games/board_games/jaco_arm_board_game.py b/physics_planning_games/board_games/jaco_arm_board_game.py new file mode 100644 index 0000000..b6a86e4 --- /dev/null +++ b/physics_planning_games/board_games/jaco_arm_board_game.py @@ -0,0 +1,135 @@ +# Copyright 2020 DeepMind Technologies Limited. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +"""Shared base class for two-player Jaco arm board games. +""" + +import functools + +from dm_control import composer +from dm_control.composer import initializers +from dm_control.composer.observation import observable +from dm_control.composer.variation import distributions +from dm_control.composer.variation import rotations +from dm_control.entities.manipulators import base +from dm_control.entities.manipulators import kinova +import numpy as np + +from physics_planning_games.board_games._internal import arenas +from physics_planning_games.board_games._internal import observations + +_ARM_Y_OFFSET = 0.4 +_TCP_LOWER_BOUNDS = (-0.1, -0.1, 0.2) +_TCP_UPPER_BOUNDS = (0.1, 0.1, 0.4) + +# Player IDs +SELF = 0 +OPPONENT = 1 + + +def _uniform_downward_rotation(): + angle = distributions.Uniform(-np.pi, np.pi, single_sample=True) + quaternion = rotations.QuaternionFromAxisAngle(axis=(0., 0., 1.), angle=angle) + return functools.partial(rotations.QuaternionPreMultiply(quaternion), + initial_value=base.DOWN_QUATERNION) + + +class JacoArmBoardGame(composer.Task): + """Base class for two-player checker-like board games.""" + + def __init__(self, observation_settings, opponent, game_logic, board, + markers): + """Initializes the task. 
+ + Args: + observation_settings: An `observations.ObservationSettings` namedtuple + specifying configuration options for each category of observation. + opponent: Opponent used for generating opponent moves. + game_logic: Logic for keeping track of the logical state of the board. + board: Board to use. + markers: Markers to use. + """ + self._game_logic = game_logic + self._game_opponent = opponent + arena = arenas.Standard(observable_options=observations.make_options( + observation_settings, observations.ARENA_OBSERVABLES)) + arena.attach(board) + arm = kinova.JacoArm(observable_options=observations.make_options( + observation_settings, observations.JACO_ARM_OBSERVABLES)) + hand = kinova.JacoHand(observable_options=observations.make_options( + observation_settings, observations.JACO_HAND_OBSERVABLES)) + arm.attach(hand) + arena.attach_offset(arm, offset=(0, _ARM_Y_OFFSET, 0)) + arena.attach(markers) + + # Geoms belonging to the arm and hand are placed in a custom group in order + # to disable their visibility to the top-down camera. NB: we assume that + # there are no other geoms in ROBOT_GEOM_GROUP that don't belong to the + # robot (this is usually the case since the default geom group is 0). If + # there are then these will also be invisible to the top-down camera. + for robot_geom in arm.mjcf_model.find_all('geom'): + robot_geom.group = arenas.ROBOT_GEOM_GROUP + + self._arena = arena + self._board = board + self._arm = arm + self._hand = hand + self._markers = markers + self._tcp_initializer = initializers.ToolCenterPointInitializer( + hand=hand, arm=arm, + position=distributions.Uniform(_TCP_LOWER_BOUNDS, _TCP_UPPER_BOUNDS), + quaternion=_uniform_downward_rotation()) + + # Add an observable exposing the logical state of the board. 
+ board_state_observable = observable.Generic( + lambda physics: self._game_logic.get_board_state()) + board_state_observable.configure( + **observation_settings.board_state._asdict()) + self._task_observables = {'board_state': board_state_observable} + + @property + def root_entity(self): + return self._arena + + @property + def arm(self): + return self._arm + + @property + def hand(self): + return self._hand + + @property + def task_observables(self): + return self._task_observables + + def get_reward(self, physics): + del physics # Unused. + return self._game_logic.get_reward[SELF] + + def should_terminate_episode(self, physics): + return self._game_logic.is_game_over + + def initialize_episode(self, physics, random_state): + self._tcp_initializer(physics, random_state) + self._game_logic.reset() + self._game_opponent.reset() + + def before_step(self, physics, action, random_state): + super(JacoArmBoardGame, self).before_step(physics, action, random_state) + self._made_move_this_step = False + + def after_substep(self, physics, random_state): + raise NotImplementedError('Subclass must implement after_substep.') diff --git a/physics_planning_games/board_games/logic_base.py b/physics_planning_games/board_games/logic_base.py new file mode 100644 index 0000000..369d530 --- /dev/null +++ b/physics_planning_games/board_games/logic_base.py @@ -0,0 +1,116 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================
+
+"""Abstract base classes and utility functions for logical aspects of the games.
+"""
+
+import abc
+
+ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
+
+
+class GameLogic(ABC):
+  """Define the abstract game logic class.
+  """
+
+  @abc.abstractmethod
+  def __init__(self):
+    pass
+
+  @abc.abstractmethod
+  def reset(self):
+    pass
+
+  @abc.abstractproperty
+  def is_game_over(self):
+    """Boolean specifying whether the current game has ended."""
+
+  @abc.abstractproperty
+  def get_reward(self):
+    pass
+
+  @abc.abstractmethod
+  def get_board_state(self):
+    """Returns the logical board state as a numpy array."""
+
+  @abc.abstractmethod
+  def apply(self, player, action):
+    """Checks whether action is valid, and if so applies it to the game state.
+
+    Args:
+      player: Integer specifying the player ID; either 0 or 1.
+      action: A `GoMarkerAction` instance.
+
+    Returns:
+      True if the action was valid, else False.
+    """
+
+
+class OpenSpielBasedLogic(GameLogic):
+  """GameLogic using OpenSpiel for tracking game state.
+  """
+
+  @property
+  def is_game_over(self):
+    """Boolean specifying whether the current game has ended."""
+    return self._open_spiel_state.is_terminal()
+
+  @property
+  def get_reward(self):
+    """Returns a dictionary that maps from `{player_id: player_reward}`."""
+
+    if self.is_game_over:
+      player0_return = self._open_spiel_state.player_return(0)
+      # Translate from OpenSpiel returns to 0.5 for draw, -1 for loss,
+      # +1 for win.
+      if player0_return == 0.:
+        reward = {0: 0.5, 1: 0.5}
+      elif player0_return == 1.:
+        reward = {0: 1., 1: 0.}
+      else:
+        assert player0_return == -1.
+ reward = {0: 0., 1: 1.} + else: + reward = {0: 0., + 1: 0.} + return reward + + @property + def open_spiel_state(self): + """OpenSpiel object representing the underlying game state.""" + return self._open_spiel_state + + +class Opponent(ABC): + """Abstract Opponent class.""" + + @abc.abstractmethod + def __init__(self): + pass + + @abc.abstractmethod + def reset(self): + pass + + @abc.abstractmethod + def policy(self, game_logic, random_state): + """Return policy action. + + Args: + game_logic: Go game logic state. + random_state: Numpy random state object. + Returns: + NamedTuple indicating opponent move. + """ diff --git a/physics_planning_games/board_games/tic_tac_toe.py b/physics_planning_games/board_games/tic_tac_toe.py new file mode 100644 index 0000000..ba42f53 --- /dev/null +++ b/physics_planning_games/board_games/tic_tac_toe.py @@ -0,0 +1,110 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ + +"""A Tic Tac Toe task.""" + +from physics_planning_games.board_games import jaco_arm_board_game +from physics_planning_games.board_games import tic_tac_toe_logic +from physics_planning_games.board_games._internal import boards +from physics_planning_games.board_games._internal import observations +from physics_planning_games.board_games._internal import pieces +from physics_planning_games.board_games._internal import registry +from physics_planning_games.board_games._internal import tags + + +class TicTacToe(jaco_arm_board_game.JacoArmBoardGame): + """Single-player Tic Tac Toe.""" + + def __init__(self, observation_settings, opponent=None, + reset_arm_after_move=True): + """Initializes a `TicTacToe` task. + + Args: + observation_settings: An `observations.ObservationSettings` namedtuple + specifying configuration options for each category of observation. + opponent: TicTacToeOpponent used for generating opponent moves. + reset_arm_after_move: Whether to reset arm to random position after every + piece being placed on the board. 
+ """ + game_logic = tic_tac_toe_logic.TicTacToeGameLogic() + if opponent is None: + opponent = tic_tac_toe_logic.TicTacToeRandomOpponent() + + markers = pieces.Markers(num_per_player=5, + observable_options=observations.make_options( + observation_settings, + observations.MARKER_OBSERVABLES)) + self._reset_arm_after_move = reset_arm_after_move + super(TicTacToe, self).__init__(observation_settings=observation_settings, + opponent=opponent, + game_logic=game_logic, + board=boards.CheckerBoard(), + markers=markers) + + @property + def control_timestep(self): + return 0.05 + + def after_substep(self, physics, random_state): + if not self._made_move_this_step: + indices = self._board.get_contact_indices(physics) + if not indices: + return + row, col = indices + valid_move = self._game_logic.apply( + player=jaco_arm_board_game.SELF, + action=tic_tac_toe_logic.SingleMarkerAction(row=row, col=col)) + if valid_move: + self._made_move_this_step = True + marker_pos = self._board.get_contact_pos( + physics=physics, row=row, col=col) + self._markers.mark(physics=physics, player_id=jaco_arm_board_game.SELF, + pos=marker_pos) + if not self._game_logic.is_game_over: + opponent_move = self._game_opponent.policy( + game_logic=self._game_logic, random_state=random_state) + assert opponent_move + assert self._game_logic.apply(player=jaco_arm_board_game.OPPONENT, + action=opponent_move) + marker_pos = self._board.sample_pos_inside_touch_sensor( + physics=physics, + random_state=random_state, + row=opponent_move.row, + col=opponent_move.col) + self._markers.mark(physics=physics, + player_id=jaco_arm_board_game.OPPONENT, + pos=marker_pos) + if self._reset_arm_after_move: + self._tcp_initializer(physics, random_state) + + +@registry.add(tags.EASY, tags.FEATURES) +def tic_tac_toe_markers_features(**unused_kwargs): + return TicTacToe(observation_settings=observations.PERFECT_FEATURES) + + +@registry.add(tags.MED, tags.FEATURES) +def 
tic_tac_toe_mixture_opponent_markers_features(mixture_p=0.25): + print('Creating tictactoe task with random/optimal opponent mixture, p={}' + .format(mixture_p)) + return TicTacToe( + observation_settings=observations.PERFECT_FEATURES, + opponent=tic_tac_toe_logic.TicTacToeMixtureOpponent(mixture_p)) + + +@registry.add(tags.HARD, tags.FEATURES) +def tic_tac_toe_optimal_opponent_markers_features(**unused_kwargs): + return TicTacToe(observation_settings=observations.PERFECT_FEATURES, + opponent=tic_tac_toe_logic.TicTacToeOptimalOpponent()) diff --git a/physics_planning_games/board_games/tic_tac_toe_logic.py b/physics_planning_games/board_games/tic_tac_toe_logic.py new file mode 100644 index 0000000..3705cde --- /dev/null +++ b/physics_planning_games/board_games/tic_tac_toe_logic.py @@ -0,0 +1,265 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ + +"""TicTacToe logic wrapper for use in manipulation tasks.""" + +import collections +import itertools + +import numpy as np + +from physics_planning_games.board_games import logic_base +from open_spiel.python.algorithms import minimax +import pyspiel + + +SingleMarkerAction = collections.namedtuple('SingleMarkerAction', + ['row', 'col']) +force_random_start_position = False + + +class TicTacToeGameLogic(logic_base.OpenSpielBasedLogic): + """Logic for TicTacToe game.""" + + def __init__(self): + self.reset() + + def reset(self): + """Resets the game state.""" + # For now we always assume we are the starting player. + + game = pyspiel.load_game('tic_tac_toe') + self._open_spiel_state = game.new_initial_state() + + if force_random_start_position: + # For debugging purposes only, force some random moves + rand_state = np.random.RandomState(46) + rand_player = TicTacToeRandomOpponent() + num_moves = 4 + for _ in range(num_moves): + action = rand_player.policy(self, rand_state) + action_1d = np.ravel_multi_index(action, (3, 3)) + self._open_spiel_state.apply_action(action_1d) + + def get_board_state(self): + """Returns the logical board state as a numpy array. + + Returns: + A boolean array of shape (H, W, C), where H=3, W=3 (height and width + of the board) and C=3 for the 3 planes. The 3 planes are, in order, + unmarked squares, x's (player 0) and y's (player 1). + """ + board_state = np.reshape( + np.array(self._open_spiel_state.observation_tensor(0), dtype=np.bool), + [3, 3, 3]) + board_state = np.transpose(board_state, [1, 2, 0]) + board_state = board_state[:, :, [0, 2, 1]] + return board_state + + def apply(self, player, action): + """Checks whether action is valid, and if so applies it to the game state. + + Args: + player: Integer specifying the player ID; either 0 or 1. + action: A `SingleMarkerAction` instance. + + Returns: + True if the action was valid, else False. 
+ """ + action_value = np.ravel_multi_index((action.row, action.col), (3, 3)) + if self._open_spiel_state.current_player() != player: + return False + + try: + self._open_spiel_state.apply_action(action_value) + was_valid_move = True + except RuntimeError: + was_valid_move = False + + return was_valid_move + + +class TicTacToeRandomOpponent(logic_base.Opponent): + """An easy opponent for TicTacToe.""" + + def __init__(self): + pass + + def reset(self): + """Resets the opponent's internal state (not implemented).""" + pass + + def policy(self, game_logic, random_state): + """Return a random, valid move. + + Args: + game_logic: TicTacToeGameLogic state of the game. + random_state: An instance of `np.random.RandomState` + + Returns: + SingleMarkerAction of opponent. + """ + if game_logic.is_game_over: + return None + + valid_moves = game_logic.open_spiel_state.legal_actions() + assert valid_moves + move = random_state.choice(valid_moves) + row, col = np.unravel_index(move, dims=(3, 3)) + return SingleMarkerAction(row=row, col=col) + + +class TicTacToeMixtureOpponent(logic_base.Opponent): + """A TicTacToe opponent which makes a mixture of optimal and random moves. + + The optimal mixture component uses minimax search. + """ + + def __init__(self, mixture_p): + """Initialize the mixture opponent. + + Args: + mixture_p: The mixture probability. We choose moves from the random + opponent with probability mixture_p and moves from the optimal + opponent with probability 1 - mixture_p. 
+ """ + + self._random_opponent = TicTacToeRandomOpponent() + self._optimal_opponent = TicTacToeOptimalOpponent() + self._mixture_p = mixture_p + + def reset(self): + pass + + def policy(self, game_logic, random_state): + if random_state.rand() < self._mixture_p: + return self._random_opponent.policy(game_logic, random_state) + else: + return self._optimal_opponent.policy(game_logic, random_state) + + +class TicTacToeOptimalOpponent(logic_base.Opponent): + """A TicTacToe opponent which makes perfect moves. + + Uses minimax search. + """ + + def __init__(self): + pass + + def reset(self): + pass + + def policy(self, game_logic, random_state): + action = tic_tac_toe_minimax(game_logic.open_spiel_state, random_state) + return action + + +def numpy_array_to_open_spiel_state(board_state): + """Take a numpy observation [3x3x3] bool area and create an OpenSpiel state. + + Args: + board_state: 3x3x3 bool array with [col, row, c] with c indexing, in order, + empty squares, x moves, y moves. + + Returns: + open_spiel_state: OpenSpiel state of this position. + """ + game = pyspiel.load_game('tic_tac_toe') + open_spiel_state = game.new_initial_state() + + x_moves = np.flatnonzero(board_state[:, :, 1]) + y_moves = np.flatnonzero(board_state[:, :, 2]) + + for x_m, y_m in itertools.zip_longest(x_moves, y_moves): + if open_spiel_state.is_terminal(): + break + open_spiel_state.apply_action(x_m) + if open_spiel_state.is_terminal(): + break + if y_m is not None: + open_spiel_state.apply_action(y_m) + + return open_spiel_state + + +def open_spiel_move_to_single_marker_action(action): + row, col = np.unravel_index(action, dims=(3, 3)) + return SingleMarkerAction(row=row, col=col) + + +def tic_tac_toe_random_move(state, random_state): + """Returns a legal move at random from current state. + + Args: + state: World state of the game. Either an OpenSpiel state + or a numpy encoding of the board. 
+ random_state: numpy random state used for choosing randomly if there is more + than one optimal action. + + Returns: + action: SingleMarkerAction of a random move. + """ + if isinstance(state, np.ndarray): + spiel_state = numpy_array_to_open_spiel_state(state) + else: + spiel_state = state + if spiel_state.is_terminal(): + return False + + legal_actions = spiel_state.legal_actions() + action = random_state.choice(legal_actions) + return open_spiel_move_to_single_marker_action(action) + + +def tic_tac_toe_minimax(state, random_state): + """Tree search from the world_state in order to find the optimal action. + + Args: + state: World state of the game. Either an OpenSpiel state + or a numpy encoding of the board. + random_state: numpy random state used for choosing randomly if there is more + than one optimal action. + + Returns: + action: SingleMarkerAction of an optimal move. + """ + if isinstance(state, np.ndarray): + spiel_state = numpy_array_to_open_spiel_state(state) + else: + spiel_state = state + if spiel_state.is_terminal(): + return False + + current_player = spiel_state.current_player() + legal_actions = spiel_state.legal_actions() + best_actions = [] + best_value = -100 + + for action in legal_actions: + state_after_action = spiel_state.clone() + state_after_action.apply_action(action) + value, _ = minimax.expectiminimax(state_after_action, 100, None, + current_player) + if value > best_value: + best_value = value + best_actions = [action] + elif value == best_value: + best_actions.append(action) + + assert best_actions + action = random_state.choice(best_actions) + + return open_spiel_move_to_single_marker_action(action) diff --git a/physics_planning_games/board_games/tic_tac_toe_logic_test.py b/physics_planning_games/board_games/tic_tac_toe_logic_test.py new file mode 100644 index 0000000..9b15300 --- /dev/null +++ b/physics_planning_games/board_games/tic_tac_toe_logic_test.py @@ -0,0 +1,204 @@ +# Copyright 2020 DeepMind Technologies Limited. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +from absl.testing import absltest +from absl.testing import parameterized +import numpy as np + +from physics_planning_games.board_games import tic_tac_toe_logic + + +class TicTacToeGameLogicTest(parameterized.TestCase): + + def setUp(self): + super(TicTacToeGameLogicTest, self).setUp() + self.logic = tic_tac_toe_logic.TicTacToeGameLogic() + self.expected_board_state = np.zeros((3, 3, 3), dtype=bool) + self.expected_board_state[..., 0] = True # All positions initially empty. 
+ + def test_valid_move_sequence(self): + np.testing.assert_array_equal(self.logic.get_board_state(), + self.expected_board_state) + + action = tic_tac_toe_logic.SingleMarkerAction(col=1, row=2) + self.assertTrue(self.logic.apply(player=0, action=action), + msg='Invalid action: {}'.format(action)) + self.expected_board_state[action.row, action.col, 0] = False + self.expected_board_state[action.row, action.col, 1] = True + np.testing.assert_array_equal(self.logic.get_board_state(), + self.expected_board_state) + + action = tic_tac_toe_logic.SingleMarkerAction(col=0, row=1) + self.assertTrue(self.logic.apply(player=1, action=action), + msg='Invalid action: {}'.format(action)) + self.expected_board_state[action.row, action.col, 0] = False + self.expected_board_state[action.row, action.col, 2] = True + np.testing.assert_array_equal(self.logic.get_board_state(), + self.expected_board_state) + + def test_invalid_move_sequence(self): + np.testing.assert_array_equal(self.logic.get_board_state(), + self.expected_board_state) + action = tic_tac_toe_logic.SingleMarkerAction(col=1, row=2) + self.assertTrue(self.logic.apply(player=0, action=action), + msg='Invalid action: {}'.format(action)) + self.expected_board_state[action.row, action.col, 0] = False + self.expected_board_state[action.row, action.col, 1] = True + np.testing.assert_array_equal(self.logic.get_board_state(), + self.expected_board_state) + + # Player 0 tries to move again in the same location. + action = tic_tac_toe_logic.SingleMarkerAction(col=1, row=2) + self.assertFalse(self.logic.apply(player=0, action=action), + msg='Invalid action was accepted: {}'.format(action)) + + # Player 1 tries to move in the same location as player 0. + self.assertFalse(self.logic.apply(player=1, action=action), + msg='Invalid action was accepted: {}'.format(action)) + + # The board state should not have changed as a result of invalid actions. 
+ np.testing.assert_array_equal(self.logic.get_board_state(), + self.expected_board_state) + + @parameterized.named_parameters([ + dict(testcase_name='player_0_win', + move_sequence=((0, 0, 0), + (1, 0, 1), + (0, 1, 0), + (1, 2, 1), + (0, 2, 0)), + winner_id=0), + dict(testcase_name='player_1_win', + move_sequence=((0, 0, 0), + (1, 0, 2), + (0, 1, 0), + (1, 1, 1), + (0, 0, 1), + (1, 2, 0)), + winner_id=1), + dict(testcase_name='draw', + move_sequence=((0, 0, 0), + (1, 1, 1), + (0, 1, 0), + (1, 2, 0), + (0, 0, 2), + (1, 0, 1), + (0, 2, 1), + (1, 2, 2), + (0, 1, 2)), + winner_id=None)]) + def test_reward_and_termination(self, move_sequence, winner_id): + for (player_id, row, col) in move_sequence: + self.assertFalse(self.logic.is_game_over) + self.assertDictEqual(self.logic.get_reward, {0: 0.0, 1: 0.0}) + action = tic_tac_toe_logic.SingleMarkerAction(col=col, row=row) + self.assertTrue(self.logic.apply(player=player_id, action=action), + msg='Invalid action: {}'.format(action)) + self.assertTrue(self.logic.is_game_over) + rewards = self.logic.get_reward + if winner_id is not None: + loser_id = 1 - winner_id + self.assertDictEqual(rewards, {winner_id: 1.0, loser_id: 0.0}) + else: # Draw + self.assertDictEqual(rewards, {0: 0.5, 1: 0.5}) + + def test_random_opponent_vs_optimal(self): + """Play random v optimal opponents and check that optimal largely wins. 
+ """ + rand_state = np.random.RandomState(42) + optimal_opponent = tic_tac_toe_logic.TicTacToeOptimalOpponent() + random_opponent = tic_tac_toe_logic.TicTacToeRandomOpponent() + players = [optimal_opponent, random_opponent] + optimal_returns = [] + random_returns = [] + + for _ in range(20): + logic = tic_tac_toe_logic.TicTacToeGameLogic() + optimal_opponent.reset() + random_opponent.reset() + + rand_state.shuffle(players) + current_player_idx = 0 + + while not logic.is_game_over: + current_player = players[current_player_idx] + action = current_player.policy(logic, rand_state) + self.assertTrue(logic.apply(current_player_idx, action), + msg='Opponent {} selected invalid action {}'.format( + current_player, action)) + current_player_idx = (current_player_idx + 1) % 2 + + # Record the winner. + reward = logic.get_reward + if players[0] == optimal_opponent: + optimal_return = reward[0] + random_return = reward[1] + else: + optimal_return = reward[1] + random_return = reward[0] + optimal_returns.append(optimal_return) + random_returns.append(random_return) + + mean_optimal_returns = np.mean(optimal_returns) + mean_random_returns = np.mean(random_returns) + self.assertGreater(mean_optimal_returns, 0.9) + self.assertLess(mean_random_returns, 0.1) + + @parameterized.named_parameters([ + dict(testcase_name='pos0', + move_sequence=((0, 0, 1), + (1, 1, 1), + (0, 0, 2), + (1, 1, 2)), + optimal_move=(0, 0)), + dict(testcase_name='pos1', + move_sequence=((0, 0, 1), + (1, 1, 2), + (0, 0, 2), + (1, 1, 1)), + optimal_move=(0, 0)), + dict(testcase_name='pos2', + move_sequence=((0, 2, 1), + (1, 1, 2), + (0, 2, 2), + (1, 1, 1)), + optimal_move=(2, 0)), + ]) + def test_minimax_policy(self, move_sequence, optimal_move): + rand_state = np.random.RandomState(42) + for (player_id, row, col) in move_sequence: + action = tic_tac_toe_logic.SingleMarkerAction(col=col, row=row) + self.assertTrue(self.logic.apply(player=player_id, action=action), + msg='Invalid action: {}'.format(action)) + + 
state = self.logic.open_spiel_state + planner_action = tic_tac_toe_logic.tic_tac_toe_minimax(state, + rand_state) + self.assertEqual(planner_action, optimal_move) + + # Do the same but with np array as input + self.logic = tic_tac_toe_logic.TicTacToeGameLogic() + for (player_id, row, col) in move_sequence: + action = tic_tac_toe_logic.SingleMarkerAction(col=col, row=row) + self.assertTrue(self.logic.apply(player=player_id, action=action), + msg='Invalid action: {}'.format(action)) + + board = self.logic.get_board_state() + planner_action = tic_tac_toe_logic.tic_tac_toe_minimax(board, + rand_state) + self.assertEqual(planner_action, optimal_move) + +if __name__ == '__main__': + absltest.main() diff --git a/physics_planning_games/explore.py b/physics_planning_games/explore.py new file mode 100644 index 0000000..19748ae --- /dev/null +++ b/physics_planning_games/explore.py @@ -0,0 +1,66 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +"""Visualize physical planning games in Domain Explorer. 
+""" + +import functools + +from absl import app +from absl import flags +from dm_control import composer +from dm_control import viewer +from dm_control.locomotion import walkers + +from physics_planning_games import board_games +from physics_planning_games.mujoban.boxoban import boxoban_level_generator +from physics_planning_games.mujoban.mujoban import Mujoban +from physics_planning_games.mujoban.mujoban_level import MujobanLevel + +flags.DEFINE_enum('environment_name', 'mujoban', [ + 'mujoban', 'go_7x7', 'tic_tac_toe_markers_features', + 'tic_tac_toe_mixture_opponent_markers_features', + 'tic_tac_toe_optimal_opponent_markers_features'], + 'Name of an environment to load.') +FLAGS = flags.FLAGS + +TIME_LIMIT = 1000 +CONTROL_TIMESTEP = .1 + + +def main(argv): + if len(argv) > 1: + raise app.UsageError('Too many command-line arguments.') + + environment_name = FLAGS.environment_name + if environment_name == 'mujoban': + walker = walkers.JumpingBallWithHead(add_ears=True, camera_height=0.25) + arena = MujobanLevel(boxoban_level_generator) + task = Mujoban( + walker=walker, + maze=arena, + control_timestep=CONTROL_TIMESTEP, + top_camera_height=64, + top_camera_width=48) + env = composer.Environment( + time_limit=TIME_LIMIT, task=task, strip_singleton_obs_buffer_dim=True) + else: + env = functools.partial( + board_games.load, environment_name=environment_name) + + viewer.launch(env) + +if __name__ == '__main__': + app.run(main) diff --git a/physics_planning_games/mujoban/__init__.py b/physics_planning_games/mujoban/__init__.py new file mode 100644 index 0000000..d41b43a --- /dev/null +++ b/physics_planning_games/mujoban/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#    https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""Top-level module definitions for mujoban."""
+
+from physics_planning_games.mujoban.mujoban import Mujoban
+from physics_planning_games.mujoban.mujoban_level import MujobanLevel
diff --git a/physics_planning_games/mujoban/boxoban.py b/physics_planning_games/mujoban/boxoban.py
new file mode 100644
index 0000000..12cdf9f
--- /dev/null
+++ b/physics_planning_games/mujoban/boxoban.py
@@ -0,0 +1,101 @@
+# Copyright 2020 DeepMind Technologies Limited.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""Level generator for Mujoban based on levels from the following dataset.
+
+https://github.com/deepmind/boxoban-levels/
+"""
+
+import glob
+import os
+import zipfile
+
+import numpy as np
+import requests
+
+BOXOBAN_URL = "https://github.com/deepmind/boxoban-levels/archive/master.zip"
+
+
+def boxoban_level_generator(levels_set="unfiltered", data_split="valid"):
+  env = Boxoban(levels_set=levels_set, data_split=data_split)
+  while True:
+    index = np.random.randint(0, env.num_levels-1)
+    yield env.levels[index]
+
+
+class Boxoban(object):
+  """Class for loading and generating Boxoban levels."""
+
+  def __init__(self,
+               levels_set="unfiltered",
+               data_split="valid"):
+    self._levels_set = levels_set
+    self._data_split = data_split
+    self._levels = []
+
+    data_file_path_local = os.path.join(os.path.dirname(__file__),
+                                        "boxoban_cache",
+                                        "{}_{}.npz".format(self._levels_set,
+                                                           self._data_split))
+
+    data_file_path_global = os.path.join("/tmp/boxoban_cache",
+                                         "{}_{}.npz".format(self._levels_set,
+                                                            self._data_split))
+
+    if os.path.exists(data_file_path_local):
+      self.levels = np.load(data_file_path_local)["levels"]
+    elif os.path.exists(data_file_path_global):
+      self.levels = np.load(data_file_path_global)["levels"]
+    else:
+      self.levels = self.get_data()
+    self.num_levels = len(self.levels)
+
+  def get_data(self):
+    """Downloads and caches the data."""
+    try:
+      cache_path = os.path.join(
+          os.path.dirname(__file__), "boxoban_cache")
+      os.makedirs(cache_path, exist_ok=True)
+    except PermissionError:
+      cache_path = os.path.join("/tmp/boxoban_cache")
+      if not os.path.exists(cache_path):
+        os.makedirs(cache_path, exist_ok=True)
+
+    # Get the zip file
+    zip_file_path = os.path.join(cache_path, "master.zip")
+    if not os.path.exists(zip_file_path):
+      response = requests.get(BOXOBAN_URL, stream=True)
+      handle = open(zip_file_path, "wb")
+      for chunk in response.iter_content(chunk_size=512):
+        if chunk:
+          handle.write(chunk)
+      handle.close()
+
+      with zipfile.ZipFile(zip_file_path, "r") as zipref:
+        zipref.extractall(cache_path)
+
+    # convert to npz
+ path = os.path.join(cache_path, "boxoban-levels-master", + self._levels_set, + self._data_split) + files = glob.glob(path + "/*.txt") + levels = "".join([open(f, "r").read() for f in files]) + levels = levels.split("\n;") + levels = ["\n".join(item.split("\n")[1:]) for item in levels] + levels = np.asarray(levels) + data_file_path = os.path.join( + cache_path, "{}_{}.npz".format(self._levels_set, self._data_split)) + np.savez(data_file_path, levels=levels) + return levels diff --git a/physics_planning_games/mujoban/mujoban.py b/physics_planning_games/mujoban/mujoban.py new file mode 100644 index 0000000..8370f21 --- /dev/null +++ b/physics_planning_games/mujoban/mujoban.py @@ -0,0 +1,451 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +"""MuJoban task. + +Mujoban is a single player puzzle-solving game embedded in the MuJoCo +simulation environment. The puzzle is based on the 2D game of Sokoban, +where an agent situated on a grid has to push boxes onto target locations. 
+""" + +import collections + +from dm_control import composer +from dm_control.composer.observation import observable as observable_lib +from dm_control.locomotion.arenas import labmaze_textures +from dm_control.locomotion.arenas.mazes import MazeWithTargets +from dm_env import specs +import numpy as np +from six.moves import range +from six.moves import zip + +from physics_planning_games.mujoban import mujoban_level +from physics_planning_games.mujoban.mujoban_pad import MujobanPad +from physics_planning_games.mujoban.props import BoxWithSites + +_FLOOR_GAP_CHAR = '#' +_AMBIENT_HEADLIGHT = 0.8 +_BOX_SIZE = 0.4 +_BOX_HEIGHT = 0.15 +_BOX_MASS = 2.5 +_BOX_FRICTION = [0.5, 0.005, 0.0001] + +_BOX_RGBA = [173. / 255., 179. / 255., 60. / 255., 1.] +_BOX_PRESSED_RGBA = [0, 0, 1, 1] +_TARGET_RGBA = [1.0, 0., 0., 1.] +_PRESSED_TARGET_RGBA = [0., 1., 0., 1.] + +_PEG_SIZE = 0.05 +_PEG_HEIGHT = 0.25 +_PEG_RGBA = [0.5, 0.5, 0.5, 1] +_PEG_ANGLE = np.pi / 4 + +# Aliveness in [-1., 0.]. +_ALIVE_THRESHOLD = -0.5 + +# Constants used by the full entity layer +_WALL_LAYER = 0 +_TARGET_LAYER = 1 +_SOKOBAN_LAYER = 2 +_BOX_LAYER = 3 + + +def _round_positions(boxes, walker, last_round_walker): + """Round float positions to snap objects to grid.""" + round_walker = np.round(walker).astype('int32') + round_boxes = [np.round(box).astype('int32') for box in boxes] + for box in round_boxes: + if np.array_equal(box, round_walker): + round_walker = last_round_walker + return round_boxes, round_walker + + +class Mujoban(composer.Task): + """Requires objects to be moved onto matching-colored floor pads. + + Agent only receives instantaneous rewards of +1 for the + timestep in which a box first enters a target, and -1 for the + timestep in which a box leaves the target. There is an additional reward of + +10 when all the boxes are put on targets, at which point the episode + terminates. 
+ """ + + def __init__(self, + walker, + maze, + target_height=0, + box_prop=None, + box_size=None, + box_mass=None, + with_grid_pegs=False, + detection_tolerance=0.0, + physics_timestep=0.001, + control_timestep=0.025, + top_camera_height=128, + top_camera_width=128, + box_on_target_reward=1.0, + level_solved_reward=10.0): + """Initializes this task. + + Args: + walker: A `Walker` object. + maze: A `BaseMaze` object. + target_height: The height of the target pads above the ground, in meters. + box_prop: An optional `Primitive` prop to use as the box. + box_size: An optional three element sequence defining the half lengths of + the sides of the box. + box_mass: Box mass. If this is a list or tuple, a random value is sampled + from the truncated exponential distribution in [a, b) where a = + box_mass[0] and b = box_mass[1], with scale factor box_mass[2] * (b - + a). + with_grid_pegs: Whether to add solid pegs at the corners of the maze + grid cells. This helps to enforce the usual Sokoban rules where + diagonal movements are forbidden. + detection_tolerance: A maximum length scale (in metres) within which a + box is allowed to stick outside a target pad while still activating it. + For example, if this is set to 0.1 then a box will activate a pad if it + sticks out of the pad by no more than 10 centimetres. + physics_timestep: The time step of the physics simulation. + control_timestep: Should be an integer multiple of the physics time step. + top_camera_height: An int; the height of the top camera in the + observation. Setting this to 0 will disable the top camera. + top_camera_width: An int; the width of the top camera in the observation. + Setting this to 0 will disable the top camera. + box_on_target_reward: A float; reward for putting a box on a target. + level_solved_reward: A float: reward for solving the level. 
+ """ + skybox_texture = labmaze_textures.SkyBox(style='sky_03') + wall_textures = labmaze_textures.WallTextures(style='style_01') + floor_textures = labmaze_textures.FloorTextures(style='style_01') + + self._detection_tolerance = detection_tolerance + self._box_prop = box_prop + self._box_on_target_reward = box_on_target_reward + self._level_solved_reward = level_solved_reward + + self._maze = maze + self._arena = MazeWithTargets( + maze=maze, + xy_scale=1, + z_height=1, + skybox_texture=skybox_texture, + wall_textures=wall_textures, + floor_textures=floor_textures) + self._walker = walker + self._arena.mjcf_model.visual.headlight.ambient = [_AMBIENT_HEADLIGHT] * 3 + self._arena.text_maze_regenerated_hook = self._regenerate_positions + self._first_step = True + + # Targets. + self._targets = [] + self._target_positions = [] + + # Boxes. + self._box_size = box_size or [_BOX_SIZE] * 2 + [_BOX_HEIGHT] + self._box_mass = box_mass or _BOX_MASS + self._boxes = [] + self._box_positions = [] + self._with_grid_pegs = with_grid_pegs + self._peg_body = None + self._last_walker_position = None + + # Create walkers and corresponding observables. + self._walker.create_root_joints(self._arena.attach(self._walker)) + enabled_observables = [self._walker.observables.sensors_touch, + self._walker.observables.orientation] + enabled_observables += self._walker.observables.proprioception + enabled_observables += self._walker.observables.kinematic_sensors + for observable in enabled_observables: + observable.enabled = True + if top_camera_width and top_camera_height: + self._arena.observables.top_camera.enabled = True + self._arena.observables.top_camera.width = top_camera_width + self._arena.observables.top_camera.height = top_camera_height + # symbolic entity repenstaion in labyrinth format. + self._entity_layer = self._maze.entity_layer + # pixel layer is same as pixel rendering of symbolic sokoban. 
+ self._pixel_layer = np.zeros(self._entity_layer.shape + (3,), dtype='uint8') + self._full_entity_layer = np.zeros(self._entity_layer.shape + (4,), + dtype='bool') + pixel_layer_obs = observable_lib.Generic(lambda _: self._pixel_layer) + pixel_layer_obs.enabled = True + full_entity_layer_obs = observable_lib.Generic( + lambda _: self._full_entity_layer) + full_entity_layer_obs.enabled = True + self._task_observables = collections.OrderedDict({ + 'pixel_layer': pixel_layer_obs, + 'full_entity_layer': full_entity_layer_obs, + }) + # Set time steps. + self.set_timesteps( + physics_timestep=physics_timestep, control_timestep=control_timestep) + self._discount = 1. + + @property + def name(self): + return 'Mujoban' + + @property + def root_entity(self): + return self._arena + + def _regenerate_positions(self): + self._object_positions = self._arena.find_token_grid_positions( + [mujoban_level.TARGET_CHAR, mujoban_level.BOX_CHAR]) + self._box_positions = self._arena.grid_to_world_positions( + self._object_positions[mujoban_level.BOX_CHAR]) + target_grid_positions = self._object_positions[mujoban_level.TARGET_CHAR] + self._target_positions = self._arena.grid_to_world_positions( + target_grid_positions) + + for idx in range(len(self._target_positions)): + target_grid_position = target_grid_positions[idx] + grid_y, grid_x = target_grid_position + self._arena.maze.variations_layer[grid_y, grid_x] = _FLOOR_GAP_CHAR + + def initialize_episode_mjcf(self, random_state): + self._arena.regenerate() + + # Clear existing targets and boxes + for target in self._targets: + target.detach() + self._targets = [] + for box in self._boxes: + box.detach() + self._boxes = [] + self._arena.mjcf_model.contact.remove('pair') + + for _ in range(self._maze.num_targets): + target = MujobanPad( + size=self._arena.xy_scale, + height=0, + detection_tolerance=self._detection_tolerance) + self._arena.attach(target) + self._targets.append(target) + + for _ in range(self._maze.num_boxes): + box = 
self._box_prop + if not box: + box = BoxWithSites(half_lengths=self._box_size) + box.geom.mass = _BOX_MASS + box.geom.rgba = [0, 0, 0, 1] # Will be randomized for each episode. + frame = self._arena.attach(box) + frame.add('joint', type='slide', axis=[1, 0, 0], name='x_slider') + frame.add('joint', type='slide', axis=[0, 1, 0], name='y_slider') + frame.add('joint', type='slide', axis=[0, 0, 1], name='z_slider') + self._boxes.append(box) + for target in self._targets: + target.register_box(box) + + # Reduce the friction between box and ground. + ground_geom = self._arena.mjcf_model.find('geom', 'ground') + self._arena.mjcf_model.contact.add( + 'pair', + geom1=box.geom, + geom2=ground_geom, + condim=6, + friction=[ + _BOX_FRICTION[0], _BOX_FRICTION[0], _BOX_FRICTION[1], + _BOX_FRICTION[2], _BOX_FRICTION[2] + ]) + + # Set box masses. + for box in self._boxes: + box.geom.mass = _BOX_MASS + box.geom.rgba[:] = _BOX_RGBA + + for target in self._targets: + target.rgba[:] = _TARGET_RGBA + target.pressed_rgba[:] = _PRESSED_TARGET_RGBA + + if self._with_grid_pegs: + if self._peg_body is not None: + self._peg_body.remove() + + self._peg_body = self._arena.mjcf_model.worldbody.add('body') + for y in range(self._arena.maze.height - 1): + for x in range(self._arena.maze.width - 1): + peg_x, peg_y, _ = self._arena.grid_to_world_positions( + [[x + 0.5, y + 0.5]])[0] + self._peg_body.add( + 'geom', type='box', + size=[_PEG_SIZE / np.sqrt(2), + _PEG_SIZE / np.sqrt(2), + _PEG_HEIGHT / 2], + pos=[peg_x, peg_y, _PEG_HEIGHT / 2], + quat=[np.cos(_PEG_ANGLE / 2), 0, 0, np.sin(_PEG_ANGLE / 2)], + rgba=_PEG_RGBA) + + def initialize_episode(self, physics, random_state): + self._first_step = True + self._was_activated = [False] * len(self._targets) + self._is_solved = False + self._discount = 1. 
+ + self._walker.reinitialize_pose(physics, random_state) + spawn_position = self._arena.spawn_positions[0] + spawn_rotation = random_state.uniform(-np.pi, np.pi) + spawn_quat = np.array( + [np.cos(spawn_rotation / 2), 0, 0, + np.sin(spawn_rotation / 2)]) + self._walker.shift_pose( + physics, [spawn_position[0], spawn_position[1], 0.0], spawn_quat) + + for box, box_xy_position in zip(self._boxes, self._box_positions): + # Position at the middle of a maze cell. + box_position = np.array( + [box_xy_position[0], box_xy_position[1], self._box_size[2]]) + + # Commit the box's final pose. + box.set_pose(physics, position=box_position, quaternion=[1., 0., 0., 0.]) + + for target, target_position in zip(self._targets, self._target_positions): + target.set_pose(physics, position=target_position) + target.reset(physics) + + self._update_entity_pixel_layers(physics) + + def before_step(self, physics, actions, random_state): + if isinstance(actions, list): + actions = np.concatenate(actions) + super(Mujoban, self).before_step(physics, actions, random_state) + if self._first_step: + self._first_step = False + else: + self._was_activated = [target.activated for target in self._targets] + + def _get_object_positions_in_grid(self, physics): + box_positions = self._arena.world_to_grid_positions( + [physics.bind(box.geom).xpos for box in self._boxes]) + walker_position = self._arena.world_to_grid_positions( + [physics.bind(self._walker.root_body).xpos])[0] + + return box_positions, walker_position + + def _update_entity_pixel_layers(self, physics): + """Updates the pixel observation and both layered representations. + + Mujoban offers 3 grid representations of the world: + * the pixel layer: this is a grid representations with an RGB value at + each grid point; + * the entity layer: this is a grid representation with a character at + each grid point. This representation hides information since if Sokoban + or a box are over a target, then the target is occluded. 
This is the + official entity layer used by arenas which is based on dm_control labmaze; + * the full entity layer: this is a grid represention with a boolean vector + of length 4 at each grid point. The first value is `True` iff there is a + wall at this location. The second value is `True` iff there is a target at + this location. The third value is for Sokoban, and fourth value is for + boxes. Note that this is not a one-hot encoding since Sokoban or a box + can share the same location as a target. + + Args: + physics: a Mujoco physics object. + + Raises: + RuntimeError: if a box or walker are overlapping with a wall. + """ + # The entity layer from the maze is a string that shows the maze at the + # *beginning* of the level. This is fixed throughout an episode. + entity_layer = self._maze.entity_layer.copy() + box_positions, walker_position = self._get_object_positions_in_grid(physics) + # round positions to snap to grid. + box_positions, walker_position = _round_positions( + box_positions, walker_position, self._last_walker_position) + + # setup pixel layer + map_size = entity_layer.shape + pixel_layer = np.ndarray(map_size + (3,), dtype='uint8') + pixel_layer.fill(128) + # setup full entity layer + full_entity_layer = np.zeros(map_size + (4,), dtype='bool') + # remove boxes and agent + entity_layer[entity_layer == mujoban_level.BOX_CHAR] = '.' + entity_layer[entity_layer == 'P'] = '.' + # draw empty space and goals + pixel_layer[entity_layer == '.'] = [0, 0, 0] + pixel_layer[entity_layer == 'G'] = [255, 0, 0] + full_entity_layer[:, :, _WALL_LAYER] = True + full_entity_layer[:, :, _WALL_LAYER][entity_layer == '.'] = False + full_entity_layer[:, :, _WALL_LAYER][entity_layer == 'G'] = False + full_entity_layer[:, :, _TARGET_LAYER][entity_layer == 'G'] = True + + # update boxes + for pos in box_positions: + # to ensure we are not changing the walls. 
+ if entity_layer[pos[0], pos[1]] == '*': + raise RuntimeError('Box and wall positions are overlapping and this ', + 'should not happen. It requires investigation and ', + 'and fixing.') + # the entity layer has no representation of box on goal. + entity_layer[pos[0], pos[1]] = mujoban_level.BOX_CHAR + if np.array_equal(pixel_layer[pos[0], pos[1]], [255, 0, 0]): + pixel_layer[pos[0], pos[1]] = [0, 255, 0] # box on goal + else: + pixel_layer[pos[0], pos[1]] = [255, 255, 0] + full_entity_layer[pos[0], pos[1], _BOX_LAYER] = True + + # update player + if entity_layer[walker_position[0], walker_position[1]] == '*': + raise RuntimeError('Walker and wall positions are overlapping and this ', + 'should have not happen. It requires investigation ', + 'and fixing.') + + entity_layer[walker_position[0], walker_position[1]] = 'P' + pixel_layer[walker_position[0], walker_position[1]] = 0, 0, 255 + full_entity_layer[ + walker_position[0], walker_position[1], _SOKOBAN_LAYER] = True + + self._last_walker_position = walker_position + self._entity_layer = entity_layer + self._pixel_layer = pixel_layer + self._full_entity_layer = full_entity_layer + + def after_step(self, physics, random_state): + super(Mujoban, self).after_step(physics, random_state) + for box in self._boxes: + physics.bind(box.geom).rgba = _BOX_RGBA + for target in self._targets: + if target.activated: + target.activator.rgba = _BOX_PRESSED_RGBA + self._update_entity_pixel_layers(physics) + self._is_solved = all([target.activated for target in self._targets]) + if self._is_solved: + self._discount = 0. 
+ + def get_reward(self, physics): + reward = 0.0 + for target, was_activated in zip(self._targets, self._was_activated): + if target.activated and not was_activated: + reward += self._box_on_target_reward + elif was_activated and not target.activated: + reward -= self._box_on_target_reward + if self._is_solved: + reward += self._level_solved_reward + return reward + + def get_discount(self, physics): + return self._discount + + def should_terminate_episode(self, physics): + is_dead = self._walker.aliveness(physics) < _ALIVE_THRESHOLD + return self._is_solved or is_dead + + def get_reward_spec(self): + return specs.ArraySpec(shape=[], dtype=np.float32) + + @property + def task_observables(self): + return self._task_observables diff --git a/physics_planning_games/mujoban/mujoban_level.py b/physics_planning_games/mujoban/mujoban_level.py new file mode 100644 index 0000000..3f6e93e --- /dev/null +++ b/physics_planning_games/mujoban/mujoban_level.py @@ -0,0 +1,140 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +"""Module for generating Mujoban level. + +""" + + +import labmaze + + +BOX_CHAR = 'B' +TARGET_CHAR = labmaze.defaults.OBJECT_TOKEN + +_DEFAULT_LEVEL = """ + ##### + # # +#### # # +# . .# # +# . 
def _ascii_to_text_grid_level(ascii_level):
  """Goes from official Sokoban ASCII art to string understood by Mujoban.

  The character mapping is: '$' (box) -> BOX_CHAR, '.' (goal) -> TARGET_CHAR,
  ' ' (floor) -> '.', '#' (wall) -> '*', '@' (player) -> 'P'. Rows are padded
  on the right with wall characters so the grid is rectangular.

  Args:
    ascii_level: a multiline string; each character is a location in a
      gridworld.

  Returns:
    A rectangular text-grid string terminated by a newline.
  """
  level = ascii_level
  if level[:1] == '\n':
    level = level[1:]
  # Order matters: goals must become TARGET_CHAR before floors become '.'.
  for old, new in (('$', BOX_CHAR), ('.', TARGET_CHAR), (' ', '.'),
                   ('#', '*'), ('@', 'P')):
    level = level.replace(old, new)
  if level[-1] == '\n':
    level = level[:-1]
  # Pad every row to the widest row's length using wall characters.
  rows = level.split('\n')
  width = max(len(row) for row in rows)
  return '\n'.join(row.ljust(width, '*') for row in rows) + '\n'
+ """ + self._level_iterator = ascii_level_generator() + self.regenerate() + + def regenerate(self): + """Regenerates the maze if required.""" + level = next(self._level_iterator) + self._entity_layer = labmaze.TextGrid(_ascii_to_text_grid_level(level)) + self._variation_layer = self._entity_layer.copy() + self._variation_layer[:] = '.' + self._num_boxes = (self._entity_layer == BOX_CHAR).sum() + num_targets = (self._entity_layer == TARGET_CHAR).sum() + if num_targets != self._num_boxes: + raise ValueError('Number of targets {} should equal number of boxes {}.' + .format(num_targets, self._num_boxes)) + + @property + def num_boxes(self): + return self._num_boxes + + @property + def num_targets(self): + return self._num_boxes + + @property + def entity_layer(self): + return self._entity_layer + + @property + def variations_layer(self): + return self._variation_layer + + @property + def height(self): + return self._entity_layer.shape[0] + + @property + def width(self): + return self._entity_layer.shape[1] diff --git a/physics_planning_games/mujoban/mujoban_level_test.py b/physics_planning_games/mujoban/mujoban_level_test.py new file mode 100644 index 0000000..842586b --- /dev/null +++ b/physics_planning_games/mujoban/mujoban_level_test.py @@ -0,0 +1,53 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ + +"""Tests for mujoban_level.""" + + +from absl.testing import absltest + +from physics_planning_games.mujoban import mujoban_level + + +_LEVEL = """ +##### +# @#### +# $. # +###$.# # +# $.# # +# #$. # +# ### +######""" + +_GRID_LEVEL = """******** +*..P**** +*..BG..* +***BG*.* +*..BG*.* +*.*BG..* +*....*** +******** +""" + + +class MujobanLevelTest(absltest.TestCase): + + def test_ascii_to_text_grid_level(self): + grid_level = mujoban_level._ascii_to_text_grid_level(_LEVEL) + self.assertEqual(_GRID_LEVEL, grid_level) + + +if __name__ == '__main__': + absltest.main() diff --git a/physics_planning_games/mujoban/mujoban_pad.py b/physics_planning_games/mujoban/mujoban_pad.py new file mode 100644 index 0000000..c637972 --- /dev/null +++ b/physics_planning_games/mujoban/mujoban_pad.py @@ -0,0 +1,126 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +"""A floor pad that is activated through touch.""" + +import weakref +from dm_control import composer +from dm_control import mjcf +import numpy as np + + +def _get_activator_box(pad_xpos, pad_size, boxes, tolerance=0.0): + """Returns the activator box, if any. 
Otherwise returns None.""" + # Ignore the height + pad_min = pad_xpos[0:2] - pad_size[0:2] + pad_max = pad_xpos[0:2] + pad_size[0:2] + for box in boxes: + box_xpos = np.array(box.xpos[0:2]) + box_size = np.array(box.size[0:2]) + + min_ = pad_min + box_size - tolerance + max_ = pad_max - box_size + tolerance + in_range = np.logical_and(box_xpos >= min_, box_xpos <= max_).all() + if in_range: + return box + # No activator box was found + return None + + +class MujobanPad(composer.Entity): + """A less sensitive floor pad for Mujoban.""" + + def _build(self, rgba=None, pressed_rgba=None, + size=1, height=0.02, detection_tolerance=0.0, name='mujoban_pad'): + rgba = tuple(rgba or (1, 0, 0, 1)) + pressed_rgba = tuple(pressed_rgba or (0.2, 0, 0, 1)) + self._mjcf_root = mjcf.RootElement(model=name) + self._site = self._mjcf_root.worldbody.add( + 'site', type='box', name='site', + pos=[0, 0, (height / 2 or -0.001)], + size=[size / 2, size / 2, (height / 2 or 0.001)], rgba=rgba) + self._activated = False + self._rgba = np.array(rgba, dtype=np.float) + self._pressed_rgba = np.array(pressed_rgba, dtype=np.float) + self._activator = None + self._detection_tolerance = detection_tolerance + self._boxes = [] + + @property + def rgba(self): + return self._rgba + + @property + def pressed_rgba(self): + return self._pressed_rgba + + def register_box(self, box_entity): + self._boxes.append(weakref.proxy(box_entity)) + + @property + def site(self): + return self._site + + @property + def boxes(self): + return self._boxes + + @property + def activator(self): + return self._activator if self._activated else None + + @property + def mjcf_model(self): + return self._mjcf_root + + def initialize_episode_mjcf(self, unused_random_state): + self._activated = False + + def initialize_episode(self, physics, unused_random_state): + self._update_activation(physics) + + def _update_activation(self, physics): + # Note: we get the physically bound box, not an object from self._boxes. 
+ # That's because the generator expression below generates bound objects. + box = _get_activator_box( + pad_xpos=np.array(physics.bind(self._site).xpos), + pad_size=np.array(physics.bind(self._site).size), + boxes=(physics.bind(box.geom) for box in self._boxes), + tolerance=self._detection_tolerance,) + if box: + self._activated = True + self._activator = box + else: + self._activated = False + self._activator = None + if self._activated: + physics.bind(self._site).rgba = self._pressed_rgba + else: + physics.bind(self._site).rgba = self._rgba + + def before_step(self, physics, unused_random_state): + self._update_activation(physics) + + def after_substep(self, physics, unused_random_state): + self._update_activation(physics) + + @property + def activated(self): + """Whether this floor pad is pressed at the moment.""" + return self._activated + + def reset(self, physics): + self._activated = False + physics.bind(self._site).rgba = self._rgba diff --git a/physics_planning_games/mujoban/mujoban_test.py b/physics_planning_games/mujoban/mujoban_test.py new file mode 100644 index 0000000..5809c28 --- /dev/null +++ b/physics_planning_games/mujoban/mujoban_test.py @@ -0,0 +1,75 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ + +"""Tests for Mujoban.""" + + +from absl.testing import absltest +from dm_control import composer +from dm_control.locomotion import walkers +import dm_env as environment +import numpy as np + +from physics_planning_games.mujoban.mujoban import Mujoban +from physics_planning_games.mujoban.mujoban_level import MujobanLevel + + +TIME_LIMIT = 5 +CONTROL_TIMESTEP = .1 + + +class MujobanTest(absltest.TestCase): + + def test(self): + walker = walkers.JumpingBallWithHead(add_ears=True, camera_height=0.25) + arena = MujobanLevel() + task = Mujoban( + walker=walker, + maze=arena, + control_timestep=CONTROL_TIMESTEP, + top_camera_height=64, + top_camera_width=48) + env = composer.Environment( + time_limit=TIME_LIMIT, + task=task, + strip_singleton_obs_buffer_dim=True) + time_step = env.reset() + self.assertEqual( + set([ + 'pixel_layer', 'full_entity_layer', 'top_camera', + 'walker/body_height', 'walker/end_effectors_pos', + 'walker/joints_pos', 'walker/joints_vel', + 'walker/sensors_accelerometer', 'walker/sensors_gyro', + 'walker/sensors_touch', 'walker/sensors_velocimeter', + 'walker/world_zaxis', 'walker/orientation', + ]), set(time_step.observation.keys())) + top_camera = time_step.observation['top_camera'] + self.assertEqual(np.uint8, top_camera.dtype) + self.assertEqual((64, 48, 3), top_camera.shape) + all_step_types = [] + # Run enough actions that we are guaranteed to have restarted the + # episode at least once. 
+ for _ in range(int(2*TIME_LIMIT/CONTROL_TIMESTEP)): + action = 2*np.random.random(env.action_spec().shape) - 1 + time_step = env.step(action) + all_step_types.append(time_step.step_type) + self.assertEqual(set([environment.StepType.FIRST, + environment.StepType.MID, + environment.StepType.LAST]), + set(all_step_types)) + + +if __name__ == '__main__': + absltest.main() diff --git a/physics_planning_games/mujoban/props.py b/physics_planning_games/mujoban/props.py new file mode 100644 index 0000000..09008e3 --- /dev/null +++ b/physics_planning_games/mujoban/props.py @@ -0,0 +1,62 @@ +# Copyright 2020 DeepMind Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +"""Box props used in Mujoban that the agent pushes. 
+""" + +import itertools + +from dm_control import composer +from dm_control.entities import props + + +class Box(props.Primitive): + """A class representing a box prop.""" + + def _build(self, half_lengths=None, mass=None, name='box'): + half_lengths = half_lengths or [0.05, 0.1, 0.15] + super(Box, self)._build(geom_type='box', + size=half_lengths, + mass=mass, + name=name) + + +class BoxWithSites(Box): + """A class representing a box prop with sites on the corners.""" + + def _build(self, half_lengths=None, mass=None, name='box'): + half_lengths = half_lengths or [0.05, 0.1, 0.15] + super(BoxWithSites, self)._build(half_lengths=half_lengths, mass=mass, + name=name) + + corner_positions = itertools.product([half_lengths[0], -half_lengths[0]], + [half_lengths[1], -half_lengths[1]], + [half_lengths[2], -half_lengths[2]]) + corner_sites = [] + for i, corner_pos in enumerate(corner_positions): + corner_sites.append( + self.mjcf_model.worldbody.add( + 'site', + type='sphere', + name='corner_{}'.format(i), + size=[0.1], + pos=corner_pos, + rgba=[1, 0, 0, 1.0], + group=composer.SENSOR_SITES_GROUP)) + self._corner_sites = tuple(corner_sites) + + @property + def corner_sites(self): + return self._corner_sites diff --git a/physics_planning_games/requirements.txt b/physics_planning_games/requirements.txt new file mode 100644 index 0000000..3a20edc --- /dev/null +++ b/physics_planning_games/requirements.txt @@ -0,0 +1,6 @@ +absl-py == 0.9.0 +dm-control +dm-env +labmaze +numpy == 1.19.1 +requests == 2.24.0