Export typing annotations when available.

PiperOrigin-RevId: 328527159
Florent Altché
2020-08-26 15:22:58 +01:00
committed by Saran Tunyasuvunakool
parent 85187de3dc
commit 7e7255eed1
29 changed files with 3846 additions and 0 deletions
@@ -0,0 +1,108 @@
# Physically Embedded Planning Environments
This repository contains the three environments introduced in
'Physically Embedded Planning Problems: New Challenges for Reinforcement
Learning'.
If you use this package, please cite our accompanying [tech report]:
```
@misc{,
title={Physically Embedded Planning Problems: New Challenges for
Reinforcement Learning},
author={Mehdi Mirza and Andrew Jaegle and Jonathan J. Hunt and Arthur Guez and
Saran Tunyasuvunakool and Alistair Muldal and Théophane Weber and
Peter Karkus and Sébastien Racanière and Lars Buesing and
Timothy Lillicrap and Nicolas Heess},
year={2020},
eprint={},
archivePrefix={arXiv},
primaryClass={cs.RO}
}
```
## Requirements and Installation
This repository is divided into 'mujoban' and 'board_games' folders.
Both of them are built on top of [dm_control], which requires MuJoCo. Please
follow [these] instructions to install MuJoCo. The remaining dependencies can
be installed with:
```
pip3 install -r requirements.txt
```
### Board games
The game logic is based on [open_spiel]. Please install it as instructed [here].
[gnugo] is required to play the game of Go against a non-random opponent. It can be installed on Ubuntu with:
```
apt install gnugo
```
The board game scripts expect the gnugo binary to be at `/usr/games/gnugo`.
## Example usage
The code snippets below show examples of instantiating each of the environments.
### Mujoban
```python
from dm_control import composer
from dm_control.locomotion import walkers
from physics_planning_games.mujoban.mujoban import Mujoban
from physics_planning_games.mujoban.mujoban_level import MujobanLevel
from physics_planning_games.mujoban.boxoban import boxoban_level_generator
walker = walkers.JumpingBallWithHead(add_ears=True, camera_height=0.25)
maze = MujobanLevel(boxoban_level_generator)
task = Mujoban(walker=walker,
maze=maze,
control_timestep=0.1,
top_camera_height=96,
top_camera_width=96)
env = composer.Environment(time_limit=1000, task=task)
```
### Board games
```python
from physics_planning_games import board_games
environment_name = 'go_7x7'
env = board_games.load(environment_name=environment_name)
```
### Stepping through an environment
The returned environments are of type `dm_env.Environment` and can be stepped
through, as shown here with random actions:
```python
import numpy as np
timestep = env.reset()
action_spec = env.action_spec()
while True:
action = np.stack([
np.random.uniform(low=minimum, high=maximum)
for minimum, maximum in zip(action_spec.minimum, action_spec.maximum)
])
timestep = env.step(action)
```
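The loop above runs indefinitely. To step through a single episode instead, a minimal sketch (reusing `env` and `action_spec` from above) can exit once `timestep.last()` is true:
```python
timestep = env.reset()
while not timestep.last():
  action = np.stack([
      np.random.uniform(low=minimum, high=maximum)
      for minimum, maximum in zip(action_spec.minimum, action_spec.maximum)
  ])
  timestep = env.step(action)
```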
### Visualization
To visualize the environments, `explore.py` loads them using the [viewer]
from [dm_control].
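As a minimal sketch (assuming the board-game loader shown above), an environment can also be passed straight to the viewer:
```python
from dm_control import viewer

from physics_planning_games import board_games

viewer.launch(environment_loader=lambda: board_games.load(
    environment_name='go_7x7'))
```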
## More details
For more details please refer to the [tech report], [dm_control] and [dm_env].
[tech report]: https://arxiv.org/abs/
[dm_control]: https://github.com/deepmind/dm_control
[dm_env]: https://github.com/deepmind/dm_env
[gnugo]: https://www.gnu.org/software/gnugo/
[open_spiel]: https://github.com/deepmind/open_spiel
[here]: https://github.com/deepmind/open_spiel/blob/master/docs/install.md
[these]: https://github.com/deepmind/dm_control#requirements-and-installation
[viewer]: https://github.com/deepmind/dm_control/tree/master/dm_control/viewer
@@ -0,0 +1,73 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Physically-grounded board game environments."""
from dm_control import composer as _composer
from physics_planning_games.board_games import go as _go
from physics_planning_games.board_games import tic_tac_toe as _tic_tac_toe
from physics_planning_games.board_games._internal import registry as _registry
_registry.done_importing_tasks()
ALL = tuple(_registry.get_all_names())
TAGS = tuple(_registry.get_tags())
def get_environments_by_tag(tag):
"""Returns the names of all environments matching a given tag.
Args:
tag: A string from `TAGS`.
Returns:
A tuple of environment names.
"""
return tuple(_registry.get_names_by_tag(tag))
def load(environment_name,
env_kwargs=None,
seed=None,
time_limit=float('inf'),
strip_singleton_obs_buffer_dim=False):
"""Loads an environment from board_games.
Args:
environment_name: String, the name of the environment to load. Must be in
`ALL`.
env_kwargs: extra params to pass to task creation.
seed: Optional, either an int seed or an `np.random.RandomState`
object. If None (default), the random number generator will self-seed
from a platform-dependent source of entropy.
time_limit: (optional) A float, the time limit in seconds beyond which an
episode is forced to terminate.
strip_singleton_obs_buffer_dim: (optional) A boolean, if `True`,
the array shape of observations with `buffer_size == 1` will not have a
leading buffer dimension.
Returns:
An instance of `composer.Environment`.
"""
if env_kwargs is not None:
task = _registry.get_constructor(environment_name)(**env_kwargs)
else:
task = _registry.get_constructor(environment_name)()
return _composer.Environment(
task=task,
time_limit=time_limit,
strip_singleton_obs_buffer_dim=strip_singleton_obs_buffer_dim,
random_state=seed)
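# Example usage (an illustrative sketch, not part of this module): `env_kwargs`
# is forwarded to the registered task constructor, e.g. the `mixture_p`
# parameter of 'tic_tac_toe_mixture_opponent_markers_features':
#
#   from physics_planning_games import board_games
#   env = board_games.load(
#       environment_name='tic_tac_toe_mixture_opponent_markers_features',
#       env_kwargs={'mixture_p': 0.5},
#       seed=0)
#   timestep = env.reset()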
@@ -0,0 +1,164 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Board game-specific arena classes."""
from dm_control import composer
from dm_control.composer.observation import observable
from dm_control.mujoco import wrapper
# Robot geoms will be assigned to this group in order to disable their
# visibility to the top-down camera.
ROBOT_GEOM_GROUP = 1
class Standard(composer.Arena):
""""Board game-specific arena class."""
def _build(self, name=None):
"""Initializes this arena.
Args:
name: (optional) A string, the name of this arena. If `None`, use the
model name defined in the MJCF file.
"""
super(Standard, self)._build(name=name)
# Add visual assets.
self.mjcf_model.asset.add(
'texture',
type='skybox',
builtin='gradient',
rgb1=(0.4, 0.6, 0.8),
rgb2=(0., 0., 0.),
width=100,
height=100)
groundplane_texture = self.mjcf_model.asset.add(
'texture',
name='groundplane',
type='2d',
builtin='checker',
rgb1=(0.2, 0.3, 0.4),
rgb2=(0.1, 0.2, 0.3),
width=300,
height=300,
mark='edge',
markrgb=(.8, .8, .8))
groundplane_material = self.mjcf_model.asset.add(
'material',
name='groundplane',
texture=groundplane_texture,
texrepeat=(5, 5),
texuniform='true',
reflectance=0.2)
# Add ground plane.
self.mjcf_model.worldbody.add(
'geom',
name='ground',
type='plane',
material=groundplane_material,
size=(1, 1, 0.1),
friction=(0.4,),
solimp=(0.95, 0.99, 0.001),
solref=(0.002, 1))
# Add lighting
self.mjcf_model.worldbody.add(
'light',
pos=(0, 0, 1.5),
dir=(0, 0, -1),
diffuse=(0.7, 0.7, 0.7),
specular=(.3, .3, .3),
directional='false',
castshadow='true')
# Add some fixed cameras to the arena.
self._front_camera = self.mjcf_model.worldbody.add(
'camera',
name='front',
pos=(0., -0.6, 0.75),
xyaxes=(1., 0., 0., 0., 0.7, 0.75))
# Ensures a 7x7 go board fits into the view from camera
self._front_camera_2 = self.mjcf_model.worldbody.add(
'camera',
name='front_2',
pos=(0., -0.65, 0.85),
xyaxes=(1., 0., 0., 0., 0.85, 0.6))
self._top_down_camera = self.mjcf_model.worldbody.add(
'camera',
name='top_down',
pos=(0., 0., 0.5),
xyaxes=(1., 0., 0., 0., 1., 0.))
# Always initialize the free camera so that it points at the origin.
self.mjcf_model.statistic.center = (0., 0., 0.)
def _build_observables(self):
return ArenaObservables(self)
@property
def front_camera(self):
return self._front_camera
@property
def front_camera_2(self):
return self._front_camera_2
@property
def top_down_camera(self):
return self._top_down_camera
def attach_offset(self, entity, offset, attach_site=None):
"""Attaches another entity at a position offset from the attachment site.
Args:
entity: The `Entity` to attach.
offset: A length 3 array-like object representing the XYZ offset.
attach_site: (optional) The site to which to attach the entity's model.
If not set, defaults to self.attachment_site.
Returns:
The frame of the attached model.
"""
frame = self.attach(entity, attach_site=attach_site)
frame.pos = offset
return frame
class ArenaObservables(composer.Observables):
"""Observables belonging to the arena."""
@composer.observable
def front_camera(self):
return observable.MJCFCamera(mjcf_element=self._entity.front_camera)
@composer.observable
def front_camera_2(self):
return observable.MJCFCamera(mjcf_element=self._entity.front_camera_2)
@composer.observable
def top_down_camera(self):
return observable.MJCFCamera(mjcf_element=self._entity.top_down_camera)
@composer.observable
def top_down_camera_invisible_robot(self):
# Custom scene options for making robot geoms invisible.
robot_geoms_invisible = wrapper.MjvOption()
robot_geoms_invisible.geomgroup[ROBOT_GEOM_GROUP] = 0
return observable.MJCFCamera(mjcf_element=self._entity.top_down_camera,
scene_option=robot_geoms_invisible)
@@ -0,0 +1,295 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Composer entities corresponding to game boards."""
import os
from dm_control import composer
from dm_control import mjcf
import numpy as np
from dm_control.utils import io as resources
_TOUCH_THRESHOLD = 1e-3 # Activation threshold for touch sensors (N).
# Whether to display the underlying sensors on the Go board (useful for
# aligning the texture).
_SHOW_DEBUG_GRID = False
_TEXTURE_PATH = os.path.join(os.path.dirname(__file__), 'goboard_7x7.png')
def _make_checkerboard(rows,
columns,
square_halfwidth,
height=0.01,
sensor_size=0.7,
name='checkerboard'):
"""Builds a checkerboard with touch sensors centered on each square."""
root = mjcf.RootElement(model=name)
black_mat = root.asset.add('material', name='black', rgba=(0.2, 0.2, 0.2, 1))
white_mat = root.asset.add('material', name='white', rgba=(0.8, 0.8, 0.8, 1))
sensor_mat = root.asset.add('material', name='sensor', rgba=(0, 1, 0, 0.3))
root.default.geom.set_attributes(
type='box', size=(square_halfwidth, square_halfwidth, height))
root.default.site.set_attributes(
type='box',
size=(sensor_size * square_halfwidth,) * 2 + (0.5 * height,),
material=sensor_mat, group=composer.SENSOR_SITES_GROUP)
xpos = (np.arange(columns) - 0.5*(columns - 1)) * 2 * square_halfwidth
ypos = (np.arange(rows) - 0.5*(rows - 1)) * 2 * square_halfwidth
geoms = []
touch_sensors = []
for i in range(rows):
for j in range(columns):
geom_mat = black_mat if ((i % 2) == (j % 2)) else white_mat
name = '{}_{}'.format(i, j)
geoms.append(
root.worldbody.add(
'geom',
pos=(xpos[j], ypos[i], height),
name=name,
material=geom_mat))
site = root.worldbody.add('site', pos=(xpos[j], ypos[i], 2*height),
name=name)
touch_sensors.append(root.sensor.add('touch', site=site, name=name))
return root, geoms, touch_sensors
def _make_goboard(boardsize,
square_halfwidth,
height=0.01,
sensor_size=0.7,
name='goboard'):
"""Builds a Go with touch sensors centered on each intersection."""
y_offset = -0.08
rows = boardsize
columns = boardsize
root = mjcf.RootElement(model=name)
if _SHOW_DEBUG_GRID:
black_mat = root.asset.add('material', name='black',
rgba=(0.2, 0.2, 0.2, 0.5))
white_mat = root.asset.add('material', name='white',
rgba=(0.8, 0.8, 0.8, 0.5))
else:
transparent_mat = root.asset.add('material', name='intersection',
rgba=(0, 1, 0, 0.0))
sensor_mat = root.asset.add('material', name='sensor', rgba=(0, 1, 0, 0.3))
contents = resources.GetResource(_TEXTURE_PATH)
root.asset.add('texture', name='goboard', type='2d',
file=mjcf.Asset(contents, '.png'))
board_mat = root.asset.add(
'material', name='goboard', texture='goboard',
texrepeat=[0.97, 0.97])
root.default.geom.set_attributes(
type='box', size=(square_halfwidth, square_halfwidth, height))
root.default.site.set_attributes(
type='box',
size=(sensor_size * square_halfwidth,) * 2 + (0.5 * height,),
material=sensor_mat, group=composer.SENSOR_SITES_GROUP)
board_height = height
if _SHOW_DEBUG_GRID:
board_height = 0.5*height
root.worldbody.add(
'geom',
pos=(0, 0+y_offset, height),
type='box',
size=(square_halfwidth * boardsize,) * 2 + (board_height,),
name=name,
material=board_mat)
xpos = (np.arange(columns) - 0.5*(columns - 1)) * 2 * square_halfwidth
ypos = (np.arange(rows) - 0.5*(rows - 1)) * 2 * square_halfwidth + y_offset
geoms = []
touch_sensors = []
for i in range(rows):
for j in range(columns):
name = '{}_{}'.format(i, j)
if _SHOW_DEBUG_GRID:
transparent_mat = black_mat if ((i % 2) == (j % 2)) else white_mat
geoms.append(
root.worldbody.add(
'geom',
pos=(xpos[j], ypos[i], height),
name=name,
material=transparent_mat))
site = root.worldbody.add('site', pos=(xpos[j], ypos[i], 2*height),
name=name)
touch_sensors.append(root.sensor.add('touch', site=site, name=name))
pass_geom = root.worldbody.add(
'geom',
pos=(0, y_offset, 0.0),
size=(square_halfwidth*boardsize*2,
square_halfwidth*boardsize) + (0.5 * height,),
name='pass',
material=transparent_mat)
site = root.worldbody.add('site', pos=(0, y_offset, 0.0),
size=(square_halfwidth*boardsize*2,
square_halfwidth*boardsize) + (0.5 * height,),
name='pass')
pass_sensor = root.sensor.add('touch', site=site, name='pass')
return root, geoms, touch_sensors, pass_geom, pass_sensor
class CheckerBoard(composer.Entity):
"""An entity representing a checkerboard."""
def __init__(self, *args, **kwargs):
super(CheckerBoard, self).__init__(*args, **kwargs)
self._contact_from_before_substep = None
def _build(self, rows=3, columns=3, square_halfwidth=0.05):
"""Builds a `CheckerBoard` entity.
Args:
rows: Integer, the number of rows.
columns: Integer, the number of columns.
square_halfwidth: Float, the halfwidth of the squares on the board.
"""
root, geoms, touch_sensors = _make_checkerboard(
rows=rows, columns=columns, square_halfwidth=square_halfwidth)
self._mjcf_model = root
self._geoms = np.array(geoms).reshape(rows, columns)
self._touch_sensors = np.array(touch_sensors).reshape(rows, columns)
@property
def mjcf_model(self):
return self._mjcf_model
def before_substep(self, physics, random_state):
del random_state # Unused.
# Cache a copy of the array of active contacts before each substep.
self._contact_from_before_substep = physics.data.contact.copy()
def validate_finger_touch(self, physics, row, col, hand):
# Geom for the board square
geom_id = physics.bind(self._geoms[row, col]).element_id
# finger geoms
finger_geoms_ids = set(physics.bind(hand.finger_geoms).element_id)
contacts = self._contact_from_before_substep
set1, set2 = set([geom_id]), finger_geoms_ids
for contact in contacts:
finger_tile_contact = ((contact.geom1 in set1 and
contact.geom2 in set2) or
(contact.geom1 in set2 and contact.geom2 in set1))
if finger_tile_contact:
return True
return False
def get_contact_pos(self, physics, row, col):
geom_id = physics.bind(self._geoms[row, col]).element_id
# Here we use the array of active contacts from the previous substep, rather
# than the current values in `physics.data.contact`. This is because we use
# touch sensors to detect when a square on the board is being pressed, and
# the pressure readings are based on forces that were calculated at the end
# of the previous substep. It's possible that `physics.data.contact` no
# longer contains any active contacts involving the board geoms, even though
# the touch sensors are telling us that one of the squares on the board is
# being pressed.
contact = self._contact_from_before_substep
involves_geom = (contact.geom1 == geom_id) | (contact.geom2 == geom_id)
[relevant_contact_ids] = np.where(involves_geom)
if relevant_contact_ids.size:
# If there are multiple contacts involving this square of the board, just
# pick the first one.
return contact[relevant_contact_ids[0]].pos.copy()
else:
print("Touch sensor at ({},{}) doesn't have any active contacts!".format(
row, col))
return False
def get_contact_indices(self, physics):
pressures = physics.bind(self._touch_sensors.ravel()).sensordata
# If any of the touch sensors exceed the threshold, return the (row, col)
# indices of the most strongly activated sensor.
if np.any(pressures > _TOUCH_THRESHOLD):
return np.unravel_index(np.argmax(pressures), self._touch_sensors.shape)
else:
return None
def sample_pos_inside_touch_sensor(self, physics, random_state, row, col):
bound_site = physics.bind(self._touch_sensors[row, col].site)
jitter = bound_site.size * np.array([1., 1., 0.])
return bound_site.xpos + random_state.uniform(-jitter, jitter)
class GoBoard(CheckerBoard):
"""An entity representing a Goboard."""
def _build(self, boardsize=7, square_halfwidth=0.05):
"""Builds a `GoBoard` entity.
Args:
boardsize: Integer, the size of the board (boardsize x boardsize).
square_halfwidth: Float, the halfwidth of the squares on the board.
"""
if boardsize != 7:
raise ValueError('Only boardsize of 7x7 is implemented at the moment')
root, geoms, touch_sensors, pass_geom, pass_sensor = _make_goboard(
boardsize=boardsize, square_halfwidth=square_halfwidth)
self._mjcf_model = root
self._geoms = np.array(geoms).reshape(boardsize, boardsize)
self._touch_sensors = np.array(touch_sensors).reshape(boardsize, boardsize)
self._pass_geom = pass_geom
self._pass_sensor = pass_sensor
def get_contact_indices(self, physics):
pressures = physics.bind(self._touch_sensors.ravel()).sensordata
# Deal with pass first
pass_pressure = physics.bind(self._pass_sensor).sensordata
if pass_pressure > np.max(pressures) and pass_pressure > _TOUCH_THRESHOLD:
return -1, -1
# If any of the other touch sensors exceed the threshold, return the
# (row, col) indices of the most strongly activated sensor.
if np.any(pressures > _TOUCH_THRESHOLD):
return np.unravel_index(np.argmax(pressures), self._touch_sensors.shape)
else:
return None
def validate_finger_touch(self, physics, row, col, hand):
# Geom for the board square
if row == -1 and col == -1:
geom_id = physics.bind(self._pass_geom).element_id
else:
geom_id = physics.bind(self._geoms[row, col]).element_id
# finger geoms
finger_geoms_ids = set(physics.bind(hand.finger_geoms).element_id)
contacts = self._contact_from_before_substep
set1, set2 = set([geom_id]), finger_geoms_ids
for contact in contacts:
finger_tile_contact = ((contact.geom1 in set1 and
contact.geom2 in set2) or
(contact.geom1 in set2 and contact.geom2 in set1))
if finger_tile_contact:
return True
return False
def sample_pos_inside_touch_sensor(self, physics, random_state, row, col):
bound_site = physics.bind(self._touch_sensors[row, col].site)
jitter = bound_site.size * np.array([0.25, 0.25, 0.])
return bound_site.xpos + random_state.uniform(-jitter, jitter)
Binary file not shown (6.9 KiB).
@@ -0,0 +1,118 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Configuration for observations."""
import collections
import numpy as np
class ObservableSpec(collections.namedtuple(
'ObservableSpec',
['enabled', 'update_interval', 'buffer_size', 'delay', 'aggregator',
'corruptor'])):
"""Configuration options for generic observables."""
__slots__ = ()
class CameraObservableSpec(collections.namedtuple(
'CameraObservableSpec', ('height', 'width') + ObservableSpec._fields)):
"""Configuration options for camera observables."""
__slots__ = ()
class ObservationSettings(collections.namedtuple(
'ObservationSettings',
['proprio', 'ftt', 'prop_pose', 'board_state', 'camera'])):
"""Container of `ObservableSpecs` grouped by category."""
__slots__ = ()
class ObservableNames(collections.namedtuple(
'ObservableNames',
['proprio', 'ftt', 'prop_pose', 'board_state', 'camera'])):
"""Container that groups the names of observables by category."""
__slots__ = ()
def __new__(cls, proprio=(), ftt=(), prop_pose=(), board_state=(), camera=()):
return super(ObservableNames, cls).__new__(
cls,
proprio=proprio,
ftt=ftt,
prop_pose=prop_pose,
board_state=board_state,
camera=camera)
# Global defaults for "feature" observables (i.e. anything that isn't a camera).
_DISABLED_FEATURE = ObservableSpec(
enabled=False,
update_interval=1,
buffer_size=1,
delay=0,
aggregator=None,
corruptor=None)
_ENABLED_FEATURE = _DISABLED_FEATURE._replace(enabled=True)
# Force, torque and touch-sensor readings are scaled using a symmetric
# logarithmic transformation that handles 0 and negative values.
_symlog1p = lambda x, random_state: np.sign(x) * np.log1p(abs(x))
_DISABLED_FTT = _DISABLED_FEATURE._replace(corruptor=_symlog1p)
_ENABLED_FTT = _ENABLED_FEATURE._replace(corruptor=_symlog1p)
# Global defaults for camera observables.
_DISABLED_CAMERA = CameraObservableSpec(
height=84,
width=84,
enabled=False,
update_interval=1,
buffer_size=1,
delay=0,
aggregator=None,
corruptor=None)
_ENABLED_CAMERA = _DISABLED_CAMERA._replace(enabled=True)
# Predefined sets of configurations options to apply to each category of
# observable.
PERFECT_FEATURES = ObservationSettings(
proprio=_ENABLED_FEATURE,
ftt=_ENABLED_FTT,
prop_pose=_ENABLED_FEATURE,
board_state=_ENABLED_FEATURE,
camera=_ENABLED_CAMERA)
ARENA_OBSERVABLES = ObservableNames(camera=['front_camera', 'front_camera_2'])
JACO_ARM_OBSERVABLES = ObservableNames(
proprio=['joints_pos', 'joints_vel'], ftt=['joints_torque'])
JACO_HAND_OBSERVABLES = ObservableNames(
proprio=['joints_pos', 'joints_vel', 'pinch_site_pos', 'pinch_site_rmat'])
MARKER_OBSERVABLES = ObservableNames(prop_pose=['position'])
def make_options(obs_settings, obs_names):
"""Constructs a dict of configuration options for a set of named observables.
Args:
obs_settings: An `ObservationSettings` instance.
obs_names: An `ObservableNames` instance.
Returns:
A nested dict containing `{observable_name: {option_name: value}}`.
"""
observable_options = {}
for category, spec in obs_settings._asdict().items():
for observable_name in getattr(obs_names, category):
observable_options[observable_name] = spec._asdict()
return observable_options
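# Example (an illustrative sketch): combining the predefined settings and names
# above yields one options dict per observable, e.g.
#
#   options = make_options(PERFECT_FEATURES, ARENA_OBSERVABLES)
#   # options == {
#   #     'front_camera':   {'height': 84, 'width': 84, 'enabled': True,
#   #                        'update_interval': 1, 'buffer_size': 1, 'delay': 0,
#   #                        'aggregator': None, 'corruptor': None},
#   #     'front_camera_2': {... same options ...},
#   # }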
@@ -0,0 +1,169 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Entities representing board game pieces."""
import itertools
from dm_control import composer
from dm_control import mjcf
from dm_control.composer.observation import observable
import numpy as np
_VISIBLE_SITE_GROUP = 0
_INVISIBLE_SITE_GROUP = 3
_RED = (1., 0., 0., 0.5)
_BLUE = (0., 0, 1., 0.5)
_INVALID_PLAYER_ID = '`player_id` must be between 0 and {}, got {}.'
_NO_MORE_MARKERS_AVAILABLE = (
'All {} markers for player {} have already been placed.')
class Markers(composer.Entity):
"""A collection of non-physical entities for marking board positions."""
def _build(self,
num_per_player,
player_colors=(_RED, _BLUE),
halfwidth=0.025,
height=0.01,
board_size=7):
"""Builds a `Markers` entity.
Args:
num_per_player: Integer, the total number of markers to create per player.
player_colors: Sequence of (R, G, B, A) values specifying the marker
colors for each player.
halfwidth: Scalar, the halfwidth of each marker.
height: Scalar, height of each marker.
board_size: Integer, the board size; only needed when markers are addressed by board position (the `bpos` argument to `mark`).
"""
root = mjcf.RootElement(model='markers')
root.default.site.set_attributes(type='cylinder', size=(halfwidth, height))
all_markers = []
for i, color in enumerate(player_colors):
player_name = 'player_{}'.format(i)
# TODO(alimuldal): Would look cool if these were textured.
material = root.asset.add('material', name=player_name, rgba=color)
player_markers = []
for j in range(num_per_player):
player_markers.append(
root.worldbody.add(
'site',
name='player_{}_move_{}'.format(i, j),
material=material))
all_markers.append(player_markers)
self._num_players = len(player_colors)
self._mjcf_model = root
self._all_markers = all_markers
self._move_counts = [0] * self._num_players
# To go from integer position to marker index in the all_markers array
self._marker_ids = np.zeros((2, board_size, board_size))
self._board_size = board_size
def _build_observables(self):
return MarkersObservables(self)
@property
def mjcf_model(self):
"""`mjcf.RootElement` for this entity."""
return self._mjcf_model
@property
def markers(self):
"""Marker sites belonging to all players.
Returns:
A nested list, where `markers[i][j]` contains the `mjcf.Element`
corresponding to player i's jth marker.
"""
return self._all_markers
def initialize_episode(self, physics, random_state):
"""Resets the markers at the start of an episode."""
del random_state # Unused.
self._reset(physics)
def _reset(self, physics):
for player_markers in self._all_markers:
for marker in player_markers:
bound_marker = physics.bind(marker)
bound_marker.pos = 0. # Markers are initially placed at the origin.
bound_marker.group = _INVISIBLE_SITE_GROUP
self._move_counts = [0] * self._num_players
self._marker_ids = np.zeros((2, self._board_size, self._board_size),
dtype=np.int32)
def make_all_invisible(self, physics):
for player_markers in self._all_markers:
for marker in player_markers:
bound_marker = physics.bind(marker)
bound_marker.group = _INVISIBLE_SITE_GROUP
def make_visible_by_bpos(self, physics, player_id, all_bpos):
for bpos in all_bpos:
marker_id = self._marker_ids[player_id][bpos[0]][bpos[1]]
marker = self._all_markers[player_id][marker_id]
bound_marker = physics.bind(marker)
bound_marker.group = _VISIBLE_SITE_GROUP
def mark(self, physics, player_id, pos, bpos=None):
"""Enables the visibility of a marker, moves it to the specified position.
Args:
physics: `mjcf.Physics` instance.
player_id: Integer specifying the ID of the player whose marker to use.
pos: Array-like object specifying the cartesian position of the marker.
bpos: Board position, optional integer coordinates to index the markers.
Raises:
ValueError: If `player_id` is invalid.
RuntimeError: If `player_id` has no more available markers.
"""
if not 0 <= player_id < self._num_players:
raise ValueError(
_INVALID_PLAYER_ID.format(self._num_players - 1, player_id))
markers = self._all_markers[player_id]
move_count = self._move_counts[player_id]
if move_count >= len(markers):
raise RuntimeError(
_NO_MORE_MARKERS_AVAILABLE.format(move_count, player_id))
bound_marker = physics.bind(markers[move_count])
bound_marker.pos = pos
# TODO(alimuldal): Set orientation as well (random? same as contact frame?)
bound_marker.group = _VISIBLE_SITE_GROUP
self._move_counts[player_id] += 1
if bpos:
self._marker_ids[player_id][bpos[0]][bpos[1]] = move_count
class MarkersObservables(composer.Observables):
"""Observables for a `Markers` entity."""
@composer.observable
def position(self):
"""Cartesian positions of all marker sites.
Returns:
An `observable.MJCFFeature` instance. When called with an instance of
`physics` as the argument, this will return a numpy float64 array of shape
(num_players * num_markers, 3) where each row contains the cartesian
position of a marker. Unplaced markers will have position (0, 0, 0).
"""
return observable.MJCFFeature(
'xpos', list(itertools.chain.from_iterable(self._entity.markers)))
@@ -0,0 +1,67 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Tests for physics_planning_games.board_games._internal.pieces.py."""
from absl.testing import absltest
from dm_control import mjcf
import numpy as np
from physics_planning_games.board_games._internal import pieces
class MarkersTest(absltest.TestCase):
def test_position_observable(self):
num_per_player = 3
markers = pieces.Markers(num_per_player=num_per_player)
physics = mjcf.Physics.from_mjcf_model(markers.mjcf_model)
all_positions = [
[(0, 1, 2), (3, 4, 5), (6, 7, 8)], # Player 0
[(-1, 2, -3), (4, -5, 6)], # Player 1
]
for player_id, positions in enumerate(all_positions):
for marker_pos in positions:
markers.mark(physics=physics, player_id=player_id, pos=marker_pos)
expected_positions = np.zeros((2, num_per_player, 3), dtype=np.double)
expected_positions[0, :len(all_positions[0])] = all_positions[0]
expected_positions[1, :len(all_positions[1])] = all_positions[1]
observed_positions = markers.observables.position(physics)
np.testing.assert_array_equal(
expected_positions.reshape(-1, 3), observed_positions)
def test_invalid_player_id(self):
markers = pieces.Markers(num_per_player=5)
physics = mjcf.Physics.from_mjcf_model(markers.mjcf_model)
invalid_player_id = 99
with self.assertRaisesWithLiteralMatch(
ValueError, pieces._INVALID_PLAYER_ID.format(1, 99)):
markers.mark(physics=physics, player_id=invalid_player_id, pos=(1, 2, 3))
def test_too_many_moves(self):
num_per_player = 5
player_id = 0
markers = pieces.Markers(num_per_player=num_per_player)
physics = mjcf.Physics.from_mjcf_model(markers.mjcf_model)
for _ in range(num_per_player):
markers.mark(physics=physics, player_id=player_id, pos=(1, 2, 3))
with self.assertRaisesWithLiteralMatch(
RuntimeError,
pieces._NO_MORE_MARKERS_AVAILABLE.format(num_per_player, player_id)):
markers.mark(physics=physics, player_id=player_id, pos=(1, 2, 3))
if __name__ == '__main__':
absltest.main()
@@ -0,0 +1,36 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""A global registry of constructors for board game environments."""
from dm_control.utils import containers
_ALL_CONSTRUCTORS = containers.TaggedTasks(allow_overriding_keys=False)
add = _ALL_CONSTRUCTORS.add
get_constructor = _ALL_CONSTRUCTORS.__getitem__
get_all_names = _ALL_CONSTRUCTORS.keys
get_tags = _ALL_CONSTRUCTORS.tags
get_names_by_tag = _ALL_CONSTRUCTORS.tagged
# This disables the check that prevents the same task constructor name from
# being added to the container more than once. This is done in order to allow
# individual task modules to be reloaded without also reloading `registry.py`
# first (e.g. when "hot-reloading" environments in domain explorer).
def done_importing_tasks():
_ALL_CONSTRUCTORS.allow_overriding_keys = True
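# Example (an illustrative sketch with hypothetical names): task modules
# register constructors with the `add` decorator and callers look them up by
# name, e.g.
#
#   @add(tags.EASY, tags.FEATURES)   # `tags` is imported by the task module.
#   def my_board_game():
#     return SomeTask(...)
#
#   constructor = get_constructor('my_board_game')
#   task = constructor()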
@@ -0,0 +1,23 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""String constants used to annotate task constructors."""
FEATURES = 'features'
VISION = 'vision'
EASY = 'easy'
MED = 'medium'
HARD = 'hard'
@@ -0,0 +1,39 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Basic smoke test for board_games environments."""
from absl.testing import absltest
from dm_env import test_utils
from physics_planning_games import board_games
class GoTest(test_utils.EnvironmentTestMixin, absltest.TestCase):
def make_object_under_test(self):
return board_games.load(environment_name='go_7x7', seed=0)
class TicTacToeTest(test_utils.EnvironmentTestMixin, absltest.TestCase):
def make_object_under_test(self):
return board_games.load(
environment_name='tic_tac_toe_mixture_opponent_markers_features',
seed=0)
if __name__ == '__main__':
absltest.main()
@@ -0,0 +1,154 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""A Go board game."""
from dm_control.composer.observation import observable
import numpy as np
from physics_planning_games.board_games import go_logic
from physics_planning_games.board_games import jaco_arm_board_game
from physics_planning_games.board_games._internal import boards
from physics_planning_games.board_games._internal import observations
from physics_planning_games.board_games._internal import pieces
from physics_planning_games.board_games._internal import registry
from physics_planning_games.board_games._internal import tags
_BLACK = (0., 0., 0., 0.75)
_WHITE = (1., 1., 1., 0.75)
_GO_PIECE_SIZE = 0.04
_DEFAULT_OPPONENT_MIXTURE = 0.2
class Go(jaco_arm_board_game.JacoArmBoardGame):
"""Single-player Go of configurable size."""
def __init__(self, board_size, observation_settings, opponent=None,
reset_arm_after_move=True):
"""Initializes a `Go` task.
Args:
board_size: board size
observation_settings: An `observations.ObservationSettings` namedtuple
specifying configuration options for each category of observation.
opponent: Go opponent to use for the opponent player actions.
reset_arm_after_move: Whether to reset arm to random position after every
piece being placed on the board.
"""
game_logic = go_logic.GoGameLogic(board_size=board_size)
if opponent is None:
opponent = go_logic.GoGTPOpponent(board_size=board_size,
mixture_p=_DEFAULT_OPPONENT_MIXTURE)
self._last_valid_move_is_pass = False
super(Go, self).__init__(observation_settings=observation_settings,
opponent=opponent,
game_logic=game_logic,
board=boards.GoBoard(boardsize=board_size),
markers=pieces.Markers(
player_colors=(_BLACK, _WHITE),
halfwidth=_GO_PIECE_SIZE,
num_per_player=board_size*board_size*2,
observable_options=observations.make_options(
observation_settings,
observations.MARKER_OBSERVABLES),
board_size=board_size))
self._reset_arm_after_move = reset_arm_after_move
# Add an observable exposing the move history (to reconstruct game states)
move_history_observable = observable.Generic(
lambda physics: self._game_logic.get_move_history())
move_history_observable.configure(
**observation_settings.board_state._asdict())
self._task_observables['move_history'] = move_history_observable
@property
def name(self):
return 'Go'
@property
def control_timestep(self):
return 0.05
def after_substep(self, physics, random_state):
if not self._made_move_this_step:
# which board square received the most contact pressure
indices = self._board.get_contact_indices(physics)
if not indices:
return
row, col = indices
# Makes sure that contact with that board square involved a finger
finger_touch = self._board.validate_finger_touch(physics,
row, col, self._hand)
if not finger_touch:
return
pass_action = True if (row == -1 and col == -1) else False
if pass_action and self._last_valid_move_is_pass:
# Don't allow two passes in a row (otherwise hard to only pass once)
valid_move = False
else:
valid_move = self._game_logic.apply(
player=jaco_arm_board_game.SELF,
action=go_logic.GoMarkerAction(row=int(row), col=int(col),
pass_action=pass_action))
if valid_move:
self._made_move_this_step = True
if not pass_action:
self._last_valid_move_is_pass = False
marker_pos = self._board.get_contact_pos(
physics=physics, row=row, col=col)
self._markers.mark(physics=physics,
player_id=jaco_arm_board_game.SELF,
pos=marker_pos,
bpos=(row, col))
else:
self._last_valid_move_is_pass = True
if not self._game_logic.is_game_over:
opponent_move = self._game_opponent.policy(
game_logic=self._game_logic, player=jaco_arm_board_game.OPPONENT,
random_state=random_state)
assert opponent_move
assert self._game_logic.apply(player=jaco_arm_board_game.OPPONENT,
action=opponent_move)
marker_pos = self._board.sample_pos_inside_touch_sensor(
physics=physics,
random_state=random_state,
row=opponent_move.row,
col=opponent_move.col)
self._markers.mark(physics=physics,
player_id=jaco_arm_board_game.OPPONENT,
pos=marker_pos,
bpos=(opponent_move.row,
opponent_move.col))
if self._reset_arm_after_move:
self._tcp_initializer(physics, random_state)
# Redraw all markers that are on the board (after captures)
self._markers.make_all_invisible(physics)
board = self._game_logic.get_board_state()
black_stones = np.transpose(np.nonzero(board[:, :, 1]))
white_stones = np.transpose(np.nonzero(board[:, :, 2]))
if black_stones.size > 0:
self._markers.make_visible_by_bpos(physics, 0, black_stones)
if white_stones.size > 0:
self._markers.make_visible_by_bpos(physics, 1, white_stones)
@registry.add(tags.EASY, tags.FEATURES)
def go_7x7():
return Go(board_size=7,
observation_settings=observations.PERFECT_FEATURES)
File diff suppressed because it is too large.
@@ -0,0 +1,145 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
from absl.testing import absltest
from absl.testing import parameterized
import numpy as np
from physics_planning_games.board_games import go_logic
class GoGameLogicTest(parameterized.TestCase):
def setUp(self):
super(GoGameLogicTest, self).setUp()
self.logic = go_logic.GoGameLogic(board_size=5)
self.expected_board_state = np.zeros((5, 5, 4), dtype=bool)
self.expected_board_state[:, :, 0] = True
def test_valid_move_sequence(self):
np.testing.assert_array_equal(self.logic.get_board_state(),
self.expected_board_state)
action = go_logic.GoMarkerAction(col=1, row=2, pass_action=False)
self.assertTrue(self.logic.apply(player=0, action=action),
msg='Invalid action: {}'.format(action))
def test_pass(self):
action = go_logic.GoMarkerAction(col=0, row=0, pass_action=True)
self.assertTrue(self.logic.apply(player=0, action=action),
msg='Invalid action: {}'.format(action))
self.expected_board_state[:, :, 3] = True
np.testing.assert_array_equal(self.logic.get_board_state(),
self.expected_board_state)
def test_invalid_move_sequence(self):
np.testing.assert_array_equal(self.logic.get_board_state(),
self.expected_board_state)
action = go_logic.GoMarkerAction(col=1, row=2, pass_action=False)
self.assertTrue(self.logic.apply(player=0, action=action),
msg='Invalid action: {}'.format(action))
self.expected_board_state[action.row, action.col, 0] = False
self.expected_board_state[action.row, action.col, 1] = True
self.expected_board_state[:, :, 3] = True
np.testing.assert_array_equal(self.logic.get_board_state(),
self.expected_board_state)
action = go_logic.GoMarkerAction(col=1, row=2, pass_action=False)
self.assertFalse(self.logic.apply(player=0, action=action),
msg='Invalid action was accepted: {}'.format(action))
# Player 1 tries to move in the same location as player 0.
self.assertFalse(self.logic.apply(player=1, action=action),
msg='Invalid action was accepted: {}'.format(action))
# The board state should not have changed as a result of invalid actions.
np.testing.assert_array_equal(self.logic.get_board_state(),
self.expected_board_state)
def test_random_opponent_vs_gnugo(self):
"""Play random v gnugo opponents and check that optimal largely wins.
"""
board_size = 9
rand_state = np.random.RandomState(42)
pachi_opponent = go_logic.GoGTPOpponent(board_size)
random_opponent = go_logic.GoRandomOpponent(board_size)
players = [pachi_opponent, random_opponent]
pachi_returns = []
random_returns = []
for _ in range(3):
logic = go_logic.GoGameLogic(board_size)
pachi_opponent.reset()
random_opponent.reset()
rand_state.shuffle(players)
current_player_idx = 0
while not logic.is_game_over:
current_player = players[current_player_idx]
action = current_player.policy(logic, current_player_idx, rand_state)
valid_action = logic.apply(current_player_idx, action)
self.assertTrue(valid_action,
msg='Opponent {} selected invalid action {}'.format(
current_player, action))
current_player_idx = (current_player_idx + 1) % 2
# Record the winner.
reward = logic.get_reward
if players[0] == pachi_opponent:
pachi_return = reward[0]
random_return = reward[1]
else:
pachi_return = reward[1]
random_return = reward[0]
pachi_returns.append(pachi_return)
random_returns.append(random_return)
mean_pachi_returns = np.mean(pachi_returns)
mean_random_returns = np.mean(random_returns)
self.assertGreater(mean_pachi_returns, 0.95)
self.assertLess(mean_random_returns, 0.05)
@parameterized.named_parameters([
dict(testcase_name='00',
row=0, col=0),
dict(testcase_name='01',
row=1, col=0)])
def test_go_marker_to_int(self, row, col):
go_marker = go_logic.GoMarkerAction(row=row, col=col, pass_action=False)
int_action = go_logic._go_marker_to_int(go_marker, board_size=19)
recovered_go_marker = go_logic._int_to_go_marker(int_action, board_size=19)
self.assertEqual(go_marker, recovered_go_marker,
msg='Initial go marker {}, recovered {}'.format(
go_marker, recovered_go_marker))
@parameterized.named_parameters([
dict(testcase_name='00',
row=0, col=0),
dict(testcase_name='01',
row=1, col=0)])
def test_go_marker_to_str(self, row, col):
go_marker = go_logic.GoMarkerAction(row=row, col=col, pass_action=False)
str_action = go_logic._go_marker_to_str(go_marker)
recovered_go_marker = go_logic._str_to_go_marker(str_action)
self.assertEqual(go_marker,
recovered_go_marker,
msg='Initial go marker {}, recovered {}, '
'str_action {}'.format(go_marker, recovered_go_marker,
str_action))
if __name__ == '__main__':
absltest.main()
@@ -0,0 +1,135 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Shared base class for two-player Jaco arm board games.
"""
import functools
from dm_control import composer
from dm_control.composer import initializers
from dm_control.composer.observation import observable
from dm_control.composer.variation import distributions
from dm_control.composer.variation import rotations
from dm_control.entities.manipulators import base
from dm_control.entities.manipulators import kinova
import numpy as np
from physics_planning_games.board_games._internal import arenas
from physics_planning_games.board_games._internal import observations
_ARM_Y_OFFSET = 0.4
_TCP_LOWER_BOUNDS = (-0.1, -0.1, 0.2)
_TCP_UPPER_BOUNDS = (0.1, 0.1, 0.4)
# Player IDs
SELF = 0
OPPONENT = 1
def _uniform_downward_rotation():
angle = distributions.Uniform(-np.pi, np.pi, single_sample=True)
quaternion = rotations.QuaternionFromAxisAngle(axis=(0., 0., 1.), angle=angle)
return functools.partial(rotations.QuaternionPreMultiply(quaternion),
initial_value=base.DOWN_QUATERNION)
class JacoArmBoardGame(composer.Task):
"""Base class for two-player checker-like board games."""
def __init__(self, observation_settings, opponent, game_logic, board,
markers):
"""Initializes the task.
Args:
observation_settings: An `observations.ObservationSettings` namedtuple
specifying configuration options for each category of observation.
opponent: Opponent used for generating opponent moves.
game_logic: Logic for keeping track of the logical state of the board.
board: Board to use.
markers: Markers to use.
"""
self._game_logic = game_logic
self._game_opponent = opponent
arena = arenas.Standard(observable_options=observations.make_options(
observation_settings, observations.ARENA_OBSERVABLES))
arena.attach(board)
arm = kinova.JacoArm(observable_options=observations.make_options(
observation_settings, observations.JACO_ARM_OBSERVABLES))
hand = kinova.JacoHand(observable_options=observations.make_options(
observation_settings, observations.JACO_HAND_OBSERVABLES))
arm.attach(hand)
arena.attach_offset(arm, offset=(0, _ARM_Y_OFFSET, 0))
arena.attach(markers)
# Geoms belonging to the arm and hand are placed in a custom group in order
# to disable their visibility to the top-down camera. NB: we assume that
# there are no other geoms in ROBOT_GEOM_GROUP that don't belong to the
# robot (this is usually the case since the default geom group is 0). If
# there are then these will also be invisible to the top-down camera.
for robot_geom in arm.mjcf_model.find_all('geom'):
robot_geom.group = arenas.ROBOT_GEOM_GROUP
self._arena = arena
self._board = board
self._arm = arm
self._hand = hand
self._markers = markers
self._tcp_initializer = initializers.ToolCenterPointInitializer(
hand=hand, arm=arm,
position=distributions.Uniform(_TCP_LOWER_BOUNDS, _TCP_UPPER_BOUNDS),
quaternion=_uniform_downward_rotation())
# Add an observable exposing the logical state of the board.
board_state_observable = observable.Generic(
lambda physics: self._game_logic.get_board_state())
board_state_observable.configure(
**observation_settings.board_state._asdict())
self._task_observables = {'board_state': board_state_observable}
@property
def root_entity(self):
return self._arena
@property
def arm(self):
return self._arm
@property
def hand(self):
return self._hand
@property
def task_observables(self):
return self._task_observables
def get_reward(self, physics):
del physics # Unused.
return self._game_logic.get_reward[SELF]
def should_terminate_episode(self, physics):
return self._game_logic.is_game_over
def initialize_episode(self, physics, random_state):
self._tcp_initializer(physics, random_state)
self._game_logic.reset()
self._game_opponent.reset()
def before_step(self, physics, action, random_state):
super(JacoArmBoardGame, self).before_step(physics, action, random_state)
self._made_move_this_step = False
def after_substep(self, physics, random_state):
raise NotImplementedError('Subclass must implement after_substep.')
@@ -0,0 +1,116 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Abstract base classes and utility functions for logical aspects of the games.
"""
import abc
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
class GameLogic(ABC):
"""Define the abstrat game logic class.
"""
@abc.abstractmethod
def __init__(self):
pass
@abc.abstractmethod
def reset(self):
pass
@abc.abstractproperty
def is_game_over(self):
"""Boolean specifying whether the current game has ended."""
@abc.abstractproperty
def get_reward(self):
pass
@abc.abstractmethod
def get_board_state(self):
"""Returns the logical board state as a numpy array."""
@abc.abstractmethod
def apply(self, player, action):
"""Checks whether action is valid, and if so applies it to the game state.
Args:
player: Integer specifying the player ID; either 0 or 1.
action: A game-specific action instance (e.g. a `GoMarkerAction`).
Returns:
True if the action was valid, else False.
"""
class OpenSpielBasedLogic(GameLogic):
"""GameLogic using OpenSpiel for tracking game state.
"""
@property
def is_game_over(self):
"""Boolean specifying whether the current game has ended."""
return self._open_spiel_state.is_terminal()
@property
def get_reward(self):
"""Returns a dictionary that maps from `{player_id: player_reward}`."""
if self.is_game_over:
player0_return = self._open_spiel_state.player_return(0)
# Translate from OpenSpiel returns to 0.5 for draw, -1 for loss,
# +1 for win.
if player0_return == 0.:
reward = {0: 0.5, 1: 0.5}
elif player0_return == 1.:
reward = {0: 1., 1: 0.}
else:
assert player0_return == -1.
reward = {0: 0., 1: 1.}
else:
reward = {0: 0.,
1: 0.}
return reward
@property
def open_spiel_state(self):
"""OpenSpiel object representing the underlying game state."""
return self._open_spiel_state
class Opponent(ABC):
"""Abstract Opponent class."""
@abc.abstractmethod
def __init__(self):
pass
@abc.abstractmethod
def reset(self):
pass
@abc.abstractmethod
def policy(self, game_logic, random_state):
"""Return policy action.
Args:
game_logic: Game logic state.
random_state: Numpy random state object.
Returns:
NamedTuple indicating opponent move.
"""
@@ -0,0 +1,110 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""A Tic Tac Toe task."""
from physics_planning_games.board_games import jaco_arm_board_game
from physics_planning_games.board_games import tic_tac_toe_logic
from physics_planning_games.board_games._internal import boards
from physics_planning_games.board_games._internal import observations
from physics_planning_games.board_games._internal import pieces
from physics_planning_games.board_games._internal import registry
from physics_planning_games.board_games._internal import tags
class TicTacToe(jaco_arm_board_game.JacoArmBoardGame):
"""Single-player Tic Tac Toe."""
def __init__(self, observation_settings, opponent=None,
reset_arm_after_move=True):
"""Initializes a `TicTacToe` task.
Args:
observation_settings: An `observations.ObservationSettings` namedtuple
specifying configuration options for each category of observation.
opponent: TicTacToeOpponent used for generating opponent moves.
reset_arm_after_move: Whether to reset arm to random position after every
piece being placed on the board.
"""
game_logic = tic_tac_toe_logic.TicTacToeGameLogic()
if opponent is None:
opponent = tic_tac_toe_logic.TicTacToeRandomOpponent()
markers = pieces.Markers(num_per_player=5,
observable_options=observations.make_options(
observation_settings,
observations.MARKER_OBSERVABLES))
self._reset_arm_after_move = reset_arm_after_move
super(TicTacToe, self).__init__(observation_settings=observation_settings,
opponent=opponent,
game_logic=game_logic,
board=boards.CheckerBoard(),
markers=markers)
@property
def control_timestep(self):
return 0.05
def after_substep(self, physics, random_state):
if not self._made_move_this_step:
indices = self._board.get_contact_indices(physics)
if not indices:
return
row, col = indices
valid_move = self._game_logic.apply(
player=jaco_arm_board_game.SELF,
action=tic_tac_toe_logic.SingleMarkerAction(row=row, col=col))
if valid_move:
self._made_move_this_step = True
marker_pos = self._board.get_contact_pos(
physics=physics, row=row, col=col)
self._markers.mark(physics=physics, player_id=jaco_arm_board_game.SELF,
pos=marker_pos)
if not self._game_logic.is_game_over:
opponent_move = self._game_opponent.policy(
game_logic=self._game_logic, random_state=random_state)
assert opponent_move
assert self._game_logic.apply(player=jaco_arm_board_game.OPPONENT,
action=opponent_move)
marker_pos = self._board.sample_pos_inside_touch_sensor(
physics=physics,
random_state=random_state,
row=opponent_move.row,
col=opponent_move.col)
self._markers.mark(physics=physics,
player_id=jaco_arm_board_game.OPPONENT,
pos=marker_pos)
if self._reset_arm_after_move:
self._tcp_initializer(physics, random_state)
@registry.add(tags.EASY, tags.FEATURES)
def tic_tac_toe_markers_features(**unused_kwargs):
return TicTacToe(observation_settings=observations.PERFECT_FEATURES)
@registry.add(tags.MED, tags.FEATURES)
def tic_tac_toe_mixture_opponent_markers_features(mixture_p=0.25):
print('Creating tictactoe task with random/optimal opponent mixture, p={}'
.format(mixture_p))
return TicTacToe(
observation_settings=observations.PERFECT_FEATURES,
opponent=tic_tac_toe_logic.TicTacToeMixtureOpponent(mixture_p))
@registry.add(tags.HARD, tags.FEATURES)
def tic_tac_toe_optimal_opponent_markers_features(**unused_kwargs):
return TicTacToe(observation_settings=observations.PERFECT_FEATURES,
opponent=tic_tac_toe_logic.TicTacToeOptimalOpponent())
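A minimal usage sketch of the task defined above; the module path `physics_planning_games.board_games.tic_tac_toe` is assumed from the package layout implied by the imports, and the construction mirrors the registry factories rather than reproducing them:

```python
from dm_control import composer

from physics_planning_games.board_games import tic_tac_toe  # Assumed module path.
from physics_planning_games.board_games import tic_tac_toe_logic
from physics_planning_games.board_games._internal import observations

# Build the "hard" variant directly rather than through the registry.
task = tic_tac_toe.TicTacToe(
    observation_settings=observations.PERFECT_FEATURES,
    opponent=tic_tac_toe_logic.TicTacToeOptimalOpponent())
env = composer.Environment(task=task)
timestep = env.reset()
print(sorted(timestep.observation.keys()))
```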
@@ -0,0 +1,265 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""TicTacToe logic wrapper for use in manipulation tasks."""
import collections
import itertools
import numpy as np
from physics_planning_games.board_games import logic_base
from open_spiel.python.algorithms import minimax
import pyspiel
SingleMarkerAction = collections.namedtuple('SingleMarkerAction',
['row', 'col'])
force_random_start_position = False
class TicTacToeGameLogic(logic_base.OpenSpielBasedLogic):
"""Logic for TicTacToe game."""
def __init__(self):
self.reset()
def reset(self):
"""Resets the game state."""
# For now we always assume we are the starting player.
game = pyspiel.load_game('tic_tac_toe')
self._open_spiel_state = game.new_initial_state()
if force_random_start_position:
# For debugging purposes only, force some random moves
rand_state = np.random.RandomState(46)
rand_player = TicTacToeRandomOpponent()
num_moves = 4
for _ in range(num_moves):
action = rand_player.policy(self, rand_state)
action_1d = np.ravel_multi_index(action, (3, 3))
self._open_spiel_state.apply_action(action_1d)
def get_board_state(self):
"""Returns the logical board state as a numpy array.
Returns:
A boolean array of shape (H, W, C), where H=3, W=3 (height and width
of the board) and C=3 for the 3 planes. The 3 planes are, in order,
unmarked squares, x's (player 0) and o's (player 1).
"""
board_state = np.reshape(
np.array(self._open_spiel_state.observation_tensor(0), dtype=np.bool),
[3, 3, 3])
board_state = np.transpose(board_state, [1, 2, 0])
board_state = board_state[:, :, [0, 2, 1]]
return board_state
def apply(self, player, action):
"""Checks whether action is valid, and if so applies it to the game state.
Args:
player: Integer specifying the player ID; either 0 or 1.
action: A `SingleMarkerAction` instance.
Returns:
True if the action was valid, else False.
"""
action_value = np.ravel_multi_index((action.row, action.col), (3, 3))
if self._open_spiel_state.current_player() != player:
return False
try:
self._open_spiel_state.apply_action(action_value)
was_valid_move = True
except RuntimeError:
was_valid_move = False
return was_valid_move
class TicTacToeRandomOpponent(logic_base.Opponent):
"""An easy opponent for TicTacToe."""
def __init__(self):
pass
def reset(self):
"""Resets the opponent's internal state (not implemented)."""
pass
def policy(self, game_logic, random_state):
"""Return a random, valid move.
Args:
game_logic: TicTacToeGameLogic state of the game.
random_state: An instance of `np.random.RandomState`
Returns:
SingleMarkerAction of opponent.
"""
if game_logic.is_game_over:
return None
valid_moves = game_logic.open_spiel_state.legal_actions()
assert valid_moves
move = random_state.choice(valid_moves)
row, col = np.unravel_index(move, (3, 3))
return SingleMarkerAction(row=row, col=col)
class TicTacToeMixtureOpponent(logic_base.Opponent):
"""A TicTacToe opponent which makes a mixture of optimal and random moves.
The optimal mixture component uses minimax search.
"""
def __init__(self, mixture_p):
"""Initialize the mixture opponent.
Args:
mixture_p: The mixture probability. We choose moves from the random
opponent with probability mixture_p and moves from the optimal
opponent with probability 1 - mixture_p.
"""
self._random_opponent = TicTacToeRandomOpponent()
self._optimal_opponent = TicTacToeOptimalOpponent()
self._mixture_p = mixture_p
def reset(self):
pass
def policy(self, game_logic, random_state):
if random_state.rand() < self._mixture_p:
return self._random_opponent.policy(game_logic, random_state)
else:
return self._optimal_opponent.policy(game_logic, random_state)
class TicTacToeOptimalOpponent(logic_base.Opponent):
"""A TicTacToe opponent which makes perfect moves.
Uses minimax search.
"""
def __init__(self):
pass
def reset(self):
pass
def policy(self, game_logic, random_state):
action = tic_tac_toe_minimax(game_logic.open_spiel_state, random_state)
return action
def numpy_array_to_open_spiel_state(board_state):
"""Take a numpy observation [3x3x3] bool area and create an OpenSpiel state.
Args:
board_state: 3x3x3 bool array indexed as [row, col, c], with c indexing, in
order, empty squares, x moves (player 0) and o moves (player 1).
Returns:
open_spiel_state: OpenSpiel state of this position.
"""
game = pyspiel.load_game('tic_tac_toe')
open_spiel_state = game.new_initial_state()
x_moves = np.flatnonzero(board_state[:, :, 1])
y_moves = np.flatnonzero(board_state[:, :, 2])
for x_m, y_m in itertools.zip_longest(x_moves, y_moves):
if open_spiel_state.is_terminal():
break
open_spiel_state.apply_action(x_m)
if open_spiel_state.is_terminal():
break
if y_m is not None:
open_spiel_state.apply_action(y_m)
return open_spiel_state
def open_spiel_move_to_single_marker_action(action):
row, col = np.unravel_index(action, (3, 3))
return SingleMarkerAction(row=row, col=col)
def tic_tac_toe_random_move(state, random_state):
"""Returns a legal move at random from current state.
Args:
state: World state of the game. Either an OpenSpiel state
or a numpy encoding of the board.
random_state: Numpy random state used to choose a legal move uniformly at
random.
Returns:
action: SingleMarkerAction of a random move.
"""
if isinstance(state, np.ndarray):
spiel_state = numpy_array_to_open_spiel_state(state)
else:
spiel_state = state
if spiel_state.is_terminal():
return False
legal_actions = spiel_state.legal_actions()
action = random_state.choice(legal_actions)
return open_spiel_move_to_single_marker_action(action)
def tic_tac_toe_minimax(state, random_state):
"""Tree search from the world_state in order to find the optimal action.
Args:
state: World state of the game. Either an OpenSpiel state
or a numpy encoding of the board.
random_state: numpy random state used for choosing randomly if there is more
than one optimal action.
Returns:
action: SingleMarkerAction of an optimal move.
"""
if isinstance(state, np.ndarray):
spiel_state = numpy_array_to_open_spiel_state(state)
else:
spiel_state = state
if spiel_state.is_terminal():
return False
current_player = spiel_state.current_player()
legal_actions = spiel_state.legal_actions()
best_actions = []
best_value = -100
for action in legal_actions:
state_after_action = spiel_state.clone()
state_after_action.apply_action(action)
value, _ = minimax.expectiminimax(state_after_action, 100, None,
current_player)
if value > best_value:
best_value = value
best_actions = [action]
elif value == best_value:
best_actions.append(action)
assert best_actions
action = random_state.choice(best_actions)
return open_spiel_move_to_single_marker_action(action)
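The logic and opponents above can also be exercised on their own, without the arm task. A short sketch (mirroring the logic-level test elsewhere in this commit) that plays one full game between the random and minimax opponents:

```python
import numpy as np

from physics_planning_games.board_games import tic_tac_toe_logic

rng = np.random.RandomState(0)
logic = tic_tac_toe_logic.TicTacToeGameLogic()
players = {0: tic_tac_toe_logic.TicTacToeRandomOpponent(),
           1: tic_tac_toe_logic.TicTacToeOptimalOpponent()}
current = 0
while not logic.is_game_over:
  # Players strictly alternate, with player 0 (x) moving first.
  move = players[current].policy(logic, rng)
  assert logic.apply(player=current, action=move)
  current = 1 - current
print(logic.get_reward)  # E.g. {0: 0.0, 1: 1.0}, or {0: 0.5, 1: 0.5} for a draw.
```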
@@ -0,0 +1,204 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
from absl.testing import absltest
from absl.testing import parameterized
import numpy as np
from physics_planning_games.board_games import tic_tac_toe_logic
class TicTacToeGameLogicTest(parameterized.TestCase):
def setUp(self):
super(TicTacToeGameLogicTest, self).setUp()
self.logic = tic_tac_toe_logic.TicTacToeGameLogic()
self.expected_board_state = np.zeros((3, 3, 3), dtype=bool)
self.expected_board_state[..., 0] = True # All positions initially empty.
def test_valid_move_sequence(self):
np.testing.assert_array_equal(self.logic.get_board_state(),
self.expected_board_state)
action = tic_tac_toe_logic.SingleMarkerAction(col=1, row=2)
self.assertTrue(self.logic.apply(player=0, action=action),
msg='Invalid action: {}'.format(action))
self.expected_board_state[action.row, action.col, 0] = False
self.expected_board_state[action.row, action.col, 1] = True
np.testing.assert_array_equal(self.logic.get_board_state(),
self.expected_board_state)
action = tic_tac_toe_logic.SingleMarkerAction(col=0, row=1)
self.assertTrue(self.logic.apply(player=1, action=action),
msg='Invalid action: {}'.format(action))
self.expected_board_state[action.row, action.col, 0] = False
self.expected_board_state[action.row, action.col, 2] = True
np.testing.assert_array_equal(self.logic.get_board_state(),
self.expected_board_state)
def test_invalid_move_sequence(self):
np.testing.assert_array_equal(self.logic.get_board_state(),
self.expected_board_state)
action = tic_tac_toe_logic.SingleMarkerAction(col=1, row=2)
self.assertTrue(self.logic.apply(player=0, action=action),
msg='Invalid action: {}'.format(action))
self.expected_board_state[action.row, action.col, 0] = False
self.expected_board_state[action.row, action.col, 1] = True
np.testing.assert_array_equal(self.logic.get_board_state(),
self.expected_board_state)
# Player 0 tries to move again in the same location.
action = tic_tac_toe_logic.SingleMarkerAction(col=1, row=2)
self.assertFalse(self.logic.apply(player=0, action=action),
msg='Invalid action was accepted: {}'.format(action))
# Player 1 tries to move in the same location as player 0.
self.assertFalse(self.logic.apply(player=1, action=action),
msg='Invalid action was accepted: {}'.format(action))
# The board state should not have changed as a result of invalid actions.
np.testing.assert_array_equal(self.logic.get_board_state(),
self.expected_board_state)
@parameterized.named_parameters([
dict(testcase_name='player_0_win',
move_sequence=((0, 0, 0),
(1, 0, 1),
(0, 1, 0),
(1, 2, 1),
(0, 2, 0)),
winner_id=0),
dict(testcase_name='player_1_win',
move_sequence=((0, 0, 0),
(1, 0, 2),
(0, 1, 0),
(1, 1, 1),
(0, 0, 1),
(1, 2, 0)),
winner_id=1),
dict(testcase_name='draw',
move_sequence=((0, 0, 0),
(1, 1, 1),
(0, 1, 0),
(1, 2, 0),
(0, 0, 2),
(1, 0, 1),
(0, 2, 1),
(1, 2, 2),
(0, 1, 2)),
winner_id=None)])
def test_reward_and_termination(self, move_sequence, winner_id):
for (player_id, row, col) in move_sequence:
self.assertFalse(self.logic.is_game_over)
self.assertDictEqual(self.logic.get_reward, {0: 0.0, 1: 0.0})
action = tic_tac_toe_logic.SingleMarkerAction(col=col, row=row)
self.assertTrue(self.logic.apply(player=player_id, action=action),
msg='Invalid action: {}'.format(action))
self.assertTrue(self.logic.is_game_over)
rewards = self.logic.get_reward
if winner_id is not None:
loser_id = 1 - winner_id
self.assertDictEqual(rewards, {winner_id: 1.0, loser_id: 0.0})
else: # Draw
self.assertDictEqual(rewards, {0: 0.5, 1: 0.5})
def test_random_opponent_vs_optimal(self):
"""Play random v optimal opponents and check that optimal largely wins.
"""
rand_state = np.random.RandomState(42)
optimal_opponent = tic_tac_toe_logic.TicTacToeOptimalOpponent()
random_opponent = tic_tac_toe_logic.TicTacToeRandomOpponent()
players = [optimal_opponent, random_opponent]
optimal_returns = []
random_returns = []
for _ in range(20):
logic = tic_tac_toe_logic.TicTacToeGameLogic()
optimal_opponent.reset()
random_opponent.reset()
rand_state.shuffle(players)
current_player_idx = 0
while not logic.is_game_over:
current_player = players[current_player_idx]
action = current_player.policy(logic, rand_state)
self.assertTrue(logic.apply(current_player_idx, action),
msg='Opponent {} selected invalid action {}'.format(
current_player, action))
current_player_idx = (current_player_idx + 1) % 2
# Record the winner.
reward = logic.get_reward
if players[0] == optimal_opponent:
optimal_return = reward[0]
random_return = reward[1]
else:
optimal_return = reward[1]
random_return = reward[0]
optimal_returns.append(optimal_return)
random_returns.append(random_return)
mean_optimal_returns = np.mean(optimal_returns)
mean_random_returns = np.mean(random_returns)
self.assertGreater(mean_optimal_returns, 0.9)
self.assertLess(mean_random_returns, 0.1)
@parameterized.named_parameters([
dict(testcase_name='pos0',
move_sequence=((0, 0, 1),
(1, 1, 1),
(0, 0, 2),
(1, 1, 2)),
optimal_move=(0, 0)),
dict(testcase_name='pos1',
move_sequence=((0, 0, 1),
(1, 1, 2),
(0, 0, 2),
(1, 1, 1)),
optimal_move=(0, 0)),
dict(testcase_name='pos2',
move_sequence=((0, 2, 1),
(1, 1, 2),
(0, 2, 2),
(1, 1, 1)),
optimal_move=(2, 0)),
])
def test_minimax_policy(self, move_sequence, optimal_move):
rand_state = np.random.RandomState(42)
for (player_id, row, col) in move_sequence:
action = tic_tac_toe_logic.SingleMarkerAction(col=col, row=row)
self.assertTrue(self.logic.apply(player=player_id, action=action),
msg='Invalid action: {}'.format(action))
state = self.logic.open_spiel_state
planner_action = tic_tac_toe_logic.tic_tac_toe_minimax(state,
rand_state)
self.assertEqual(planner_action, optimal_move)
# Do the same but with np array as input
self.logic = tic_tac_toe_logic.TicTacToeGameLogic()
for (player_id, row, col) in move_sequence:
action = tic_tac_toe_logic.SingleMarkerAction(col=col, row=row)
self.assertTrue(self.logic.apply(player=player_id, action=action),
msg='Invalid action: {}'.format(action))
board = self.logic.get_board_state()
planner_action = tic_tac_toe_logic.tic_tac_toe_minimax(board,
rand_state)
self.assertEqual(planner_action, optimal_move)
if __name__ == '__main__':
absltest.main()
@@ -0,0 +1,66 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Visualize physical planning games in Domain Explorer.
"""
import functools
from absl import app
from absl import flags
from dm_control import composer
from dm_control import viewer
from dm_control.locomotion import walkers
from physics_planning_games import board_games
from physics_planning_games.mujoban.boxoban import boxoban_level_generator
from physics_planning_games.mujoban.mujoban import Mujoban
from physics_planning_games.mujoban.mujoban_level import MujobanLevel
flags.DEFINE_enum('environment_name', 'mujoban', [
'mujoban', 'go_7x7', 'tic_tac_toe_markers_features',
'tic_tac_toe_mixture_opponent_markers_features',
'tic_tac_toe_optimal_opponent_markers_features'],
'Name of an environment to load.')
FLAGS = flags.FLAGS
TIME_LIMIT = 1000
CONTROL_TIMESTEP = .1
def main(argv):
if len(argv) > 1:
raise app.UsageError('Too many command-line arguments.')
environment_name = FLAGS.environment_name
if environment_name == 'mujoban':
walker = walkers.JumpingBallWithHead(add_ears=True, camera_height=0.25)
arena = MujobanLevel(boxoban_level_generator)
task = Mujoban(
walker=walker,
maze=arena,
control_timestep=CONTROL_TIMESTEP,
top_camera_height=64,
top_camera_width=48)
env = composer.Environment(
time_limit=TIME_LIMIT, task=task, strip_singleton_obs_buffer_dim=True)
else:
env = functools.partial(
board_games.load, environment_name=environment_name)
viewer.launch(env)
if __name__ == '__main__':
app.run(main)
@@ -0,0 +1,19 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Top-level module definitions for mujoban."""
from physics_planning_games.mujoban.mujoban import Mujoban
from physics_planning_games.mujoban.mujoban_level import MujobanLevel
@@ -0,0 +1,101 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Level generator for Mujoban based on levels from follwing dataset.
https://github.com/deepmind/boxoban-levels/
"""
import glob
import os
import zipfile
import numpy as np
import requests
BOXOBAN_URL = "https://github.com/deepmind/boxoban-levels/archive/master.zip"
def boxoban_level_generator(levels_set="unfiltered", data_split="valid"):
env = Boxoban(levels_set=levels_set, data_split=data_split)
while True:
index = np.random.randint(0, env.num_levels)
yield env.levels[index]
class Boxoban(object):
"""Class for loading and generatting Boxoban levels."""
def __init__(self,
levels_set="unfiltered",
data_split="valid"):
self._levels_set = levels_set
self._data_split = data_split
self._levels = []
data_file_path_local = os.path.join(os.path.dirname(__file__),
"boxoban_cache",
"{}_{}.npz".format(self._levels_set,
self._data_split))
data_file_path_global = os.path.join("/tmp/boxoban_cache",
"{}_{}.npz".format(self._levels_set,
self._data_split))
if os.path.exists(data_file_path_local):
self.levels = np.load(data_file_path_local)["levels"]
elif os.path.exists(data_file_path_global):
self.levels = np.load(data_file_path_global)["levels"]
else:
self.levels = self.get_data()
self.num_levels = len(self.levels)
def get_data(self):
"""Downloads and cache the data."""
try:
cache_path = os.path.join(
os.path.dirname(__file__), "boxoban_cache")
os.makedirs(cache_path, exist_ok=True)
except PermissionError:
cache_path = os.path.join("/tmp/boxoban_cache")
if not os.path.exists(cache_path):
os.makedirs(cache_path, exist_ok=True)
# Get the zip file
zip_file_path = os.path.join(cache_path, "master.zip")
if not os.path.exists(zip_file_path):
response = requests.get(BOXOBAN_URL, stream=True)
handle = open(zip_file_path, "wb")
for chunk in response.iter_content(chunk_size=512):
if chunk:
handle.write(chunk)
handle.close()
with zipfile.ZipFile(zip_file_path, "r") as zipref:
zipref.extractall(cache_path)
# convert to npz
path = os.path.join(cache_path, "boxoban-levels-master",
self._levels_set,
self._data_split)
files = glob.glob(path + "/*.txt")
levels = "".join([open(f, "r").read() for f in files])
levels = levels.split("\n;")
levels = ["\n".join(item.split("\n")[1:]) for item in levels]
levels = np.asarray(levels)
data_file_path = os.path.join(
cache_path, "{}_{}.npz".format(self._levels_set, self._data_split))
np.savez(data_file_path, levels=levels)
return levels
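A small sketch of pulling raw ASCII levels straight from the generator above; note that the first call may hit the download-and-cache path in `get_data`, so network access is assumed:

```python
from physics_planning_games.mujoban import boxoban

# An infinite generator over Boxoban validation levels.
level_generator = boxoban.boxoban_level_generator(levels_set='unfiltered',
                                                  data_split='valid')
for _ in range(2):
  print(next(level_generator))
  print('-' * 10)
```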
@@ -0,0 +1,451 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""MuJoban task.
Mujoban is a single player puzzle-solving game embedded in the MuJoCo
simulation environment. The puzzle is based on the 2D game of Sokoban,
where an agent situated on a grid has to push boxes onto target locations.
"""
import collections
from dm_control import composer
from dm_control.composer.observation import observable as observable_lib
from dm_control.locomotion.arenas import labmaze_textures
from dm_control.locomotion.arenas.mazes import MazeWithTargets
from dm_env import specs
import numpy as np
from six.moves import range
from six.moves import zip
from physics_planning_games.mujoban import mujoban_level
from physics_planning_games.mujoban.mujoban_pad import MujobanPad
from physics_planning_games.mujoban.props import BoxWithSites
_FLOOR_GAP_CHAR = '#'
_AMBIENT_HEADLIGHT = 0.8
_BOX_SIZE = 0.4
_BOX_HEIGHT = 0.15
_BOX_MASS = 2.5
_BOX_FRICTION = [0.5, 0.005, 0.0001]
_BOX_RGBA = [173. / 255., 179. / 255., 60. / 255., 1.]
_BOX_PRESSED_RGBA = [0, 0, 1, 1]
_TARGET_RGBA = [1.0, 0., 0., 1.]
_PRESSED_TARGET_RGBA = [0., 1., 0., 1.]
_PEG_SIZE = 0.05
_PEG_HEIGHT = 0.25
_PEG_RGBA = [0.5, 0.5, 0.5, 1]
_PEG_ANGLE = np.pi / 4
# Aliveness in [-1., 0.].
_ALIVE_THRESHOLD = -0.5
# Constants used by the full entity layer
_WALL_LAYER = 0
_TARGET_LAYER = 1
_SOKOBAN_LAYER = 2
_BOX_LAYER = 3
def _round_positions(boxes, walker, last_round_walker):
"""Round float positions to snap objects to grid."""
round_walker = np.round(walker).astype('int32')
round_boxes = [np.round(box).astype('int32') for box in boxes]
for box in round_boxes:
if np.array_equal(box, round_walker):
round_walker = last_round_walker
return round_boxes, round_walker
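A worked example of the helper above (illustration only; `_round_positions` is a private helper of this module): if the walker and a box round to the same grid cell, the walker keeps its previously rounded cell, since a cell cannot hold both.

```python
import numpy as np

from physics_planning_games.mujoban.mujoban import _round_positions

boxes = [np.array([2.3, 3.8])]   # Rounds to cell (2, 4).
walker = np.array([2.4, 3.6])    # Also rounds to cell (2, 4).
last_round_walker = np.array([2, 3], dtype='int32')

round_boxes, round_walker = _round_positions(boxes, walker, last_round_walker)
print(round_boxes)   # [array([2, 4], dtype=int32)]
print(round_walker)  # [2 3] -- reverted to the previous cell; the box keeps (2, 4).
```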
class Mujoban(composer.Task):
"""Requires objects to be moved onto matching-colored floor pads.
Agent only receives instantaneous rewards of +1 for the
timestep in which a box first enters a target, and -1 for the
timestep in which a box leaves the target. There is an additional reward of
+10 when all the boxes are put on targets, at which point the episode
terminates.
"""
def __init__(self,
walker,
maze,
target_height=0,
box_prop=None,
box_size=None,
box_mass=None,
with_grid_pegs=False,
detection_tolerance=0.0,
physics_timestep=0.001,
control_timestep=0.025,
top_camera_height=128,
top_camera_width=128,
box_on_target_reward=1.0,
level_solved_reward=10.0):
"""Initializes this task.
Args:
walker: A `Walker` object.
maze: A `BaseMaze` object.
target_height: The height of the target pads above the ground, in meters.
box_prop: An optional `Primitive` prop to use as the box.
box_size: An optional three element sequence defining the half lengths of
the sides of the box.
box_mass: Box mass. If this is a list or tuple, a random value is sampled
from the truncated exponential distribution in [a, b) where a =
box_mass[0] and b = box_mass[1], with scale factor box_mass[2] * (b -
a).
with_grid_pegs: Whether to add solid pegs at the corners of the maze
grid cells. This helps to enforce the usual Sokoban rules where
diagonal movements are forbidden.
detection_tolerance: A maximum length scale (in metres) within which a
box is allowed to stick outside a target pad while still activating it.
For example, if this is set to 0.1 then a box will activate a pad if it
sticks out of the pad by no more than 10 centimetres.
physics_timestep: The time step of the physics simulation.
control_timestep: Should be an integer multiple of the physics time step.
top_camera_height: An int; the height of the top camera in the
observation. Setting this to 0 will disable the top camera.
top_camera_width: An int; the width of the top camera in the observation.
Setting this to 0 will disable the top camera.
box_on_target_reward: A float; reward for putting a box on a target.
level_solved_reward: A float; reward for solving the level.
"""
skybox_texture = labmaze_textures.SkyBox(style='sky_03')
wall_textures = labmaze_textures.WallTextures(style='style_01')
floor_textures = labmaze_textures.FloorTextures(style='style_01')
self._detection_tolerance = detection_tolerance
self._box_prop = box_prop
self._box_on_target_reward = box_on_target_reward
self._level_solved_reward = level_solved_reward
self._maze = maze
self._arena = MazeWithTargets(
maze=maze,
xy_scale=1,
z_height=1,
skybox_texture=skybox_texture,
wall_textures=wall_textures,
floor_textures=floor_textures)
self._walker = walker
self._arena.mjcf_model.visual.headlight.ambient = [_AMBIENT_HEADLIGHT] * 3
self._arena.text_maze_regenerated_hook = self._regenerate_positions
self._first_step = True
# Targets.
self._targets = []
self._target_positions = []
# Boxes.
self._box_size = box_size or [_BOX_SIZE] * 2 + [_BOX_HEIGHT]
self._box_mass = box_mass or _BOX_MASS
self._boxes = []
self._box_positions = []
self._with_grid_pegs = with_grid_pegs
self._peg_body = None
self._last_walker_position = None
# Create walkers and corresponding observables.
self._walker.create_root_joints(self._arena.attach(self._walker))
enabled_observables = [self._walker.observables.sensors_touch,
self._walker.observables.orientation]
enabled_observables += self._walker.observables.proprioception
enabled_observables += self._walker.observables.kinematic_sensors
for observable in enabled_observables:
observable.enabled = True
if top_camera_width and top_camera_height:
self._arena.observables.top_camera.enabled = True
self._arena.observables.top_camera.width = top_camera_width
self._arena.observables.top_camera.height = top_camera_height
# Symbolic entity representation in labmaze text format.
self._entity_layer = self._maze.entity_layer
# The pixel layer is the same as a pixel rendering of symbolic Sokoban.
self._pixel_layer = np.zeros(self._entity_layer.shape + (3,), dtype='uint8')
self._full_entity_layer = np.zeros(self._entity_layer.shape + (4,),
dtype='bool')
pixel_layer_obs = observable_lib.Generic(lambda _: self._pixel_layer)
pixel_layer_obs.enabled = True
full_entity_layer_obs = observable_lib.Generic(
lambda _: self._full_entity_layer)
full_entity_layer_obs.enabled = True
self._task_observables = collections.OrderedDict({
'pixel_layer': pixel_layer_obs,
'full_entity_layer': full_entity_layer_obs,
})
# Set time steps.
self.set_timesteps(
physics_timestep=physics_timestep, control_timestep=control_timestep)
self._discount = 1.
@property
def name(self):
return 'Mujoban'
@property
def root_entity(self):
return self._arena
def _regenerate_positions(self):
self._object_positions = self._arena.find_token_grid_positions(
[mujoban_level.TARGET_CHAR, mujoban_level.BOX_CHAR])
self._box_positions = self._arena.grid_to_world_positions(
self._object_positions[mujoban_level.BOX_CHAR])
target_grid_positions = self._object_positions[mujoban_level.TARGET_CHAR]
self._target_positions = self._arena.grid_to_world_positions(
target_grid_positions)
for idx in range(len(self._target_positions)):
target_grid_position = target_grid_positions[idx]
grid_y, grid_x = target_grid_position
self._arena.maze.variations_layer[grid_y, grid_x] = _FLOOR_GAP_CHAR
def initialize_episode_mjcf(self, random_state):
self._arena.regenerate()
# Clear existing targets and boxes
for target in self._targets:
target.detach()
self._targets = []
for box in self._boxes:
box.detach()
self._boxes = []
self._arena.mjcf_model.contact.remove('pair')
for _ in range(self._maze.num_targets):
target = MujobanPad(
size=self._arena.xy_scale,
height=0,
detection_tolerance=self._detection_tolerance)
self._arena.attach(target)
self._targets.append(target)
for _ in range(self._maze.num_boxes):
box = self._box_prop
if not box:
box = BoxWithSites(half_lengths=self._box_size)
box.geom.mass = _BOX_MASS
box.geom.rgba = [0, 0, 0, 1]  # Placeholder; set to _BOX_RGBA below for each episode.
frame = self._arena.attach(box)
frame.add('joint', type='slide', axis=[1, 0, 0], name='x_slider')
frame.add('joint', type='slide', axis=[0, 1, 0], name='y_slider')
frame.add('joint', type='slide', axis=[0, 0, 1], name='z_slider')
self._boxes.append(box)
for target in self._targets:
target.register_box(box)
# Reduce the friction between box and ground.
ground_geom = self._arena.mjcf_model.find('geom', 'ground')
self._arena.mjcf_model.contact.add(
'pair',
geom1=box.geom,
geom2=ground_geom,
condim=6,
friction=[
_BOX_FRICTION[0], _BOX_FRICTION[0], _BOX_FRICTION[1],
_BOX_FRICTION[2], _BOX_FRICTION[2]
])
# Set box masses.
for box in self._boxes:
box.geom.mass = _BOX_MASS
box.geom.rgba[:] = _BOX_RGBA
for target in self._targets:
target.rgba[:] = _TARGET_RGBA
target.pressed_rgba[:] = _PRESSED_TARGET_RGBA
if self._with_grid_pegs:
if self._peg_body is not None:
self._peg_body.remove()
self._peg_body = self._arena.mjcf_model.worldbody.add('body')
for y in range(self._arena.maze.height - 1):
for x in range(self._arena.maze.width - 1):
peg_x, peg_y, _ = self._arena.grid_to_world_positions(
[[x + 0.5, y + 0.5]])[0]
self._peg_body.add(
'geom', type='box',
size=[_PEG_SIZE / np.sqrt(2),
_PEG_SIZE / np.sqrt(2),
_PEG_HEIGHT / 2],
pos=[peg_x, peg_y, _PEG_HEIGHT / 2],
quat=[np.cos(_PEG_ANGLE / 2), 0, 0, np.sin(_PEG_ANGLE / 2)],
rgba=_PEG_RGBA)
def initialize_episode(self, physics, random_state):
self._first_step = True
self._was_activated = [False] * len(self._targets)
self._is_solved = False
self._discount = 1.
self._walker.reinitialize_pose(physics, random_state)
spawn_position = self._arena.spawn_positions[0]
spawn_rotation = random_state.uniform(-np.pi, np.pi)
spawn_quat = np.array(
[np.cos(spawn_rotation / 2), 0, 0,
np.sin(spawn_rotation / 2)])
self._walker.shift_pose(
physics, [spawn_position[0], spawn_position[1], 0.0], spawn_quat)
for box, box_xy_position in zip(self._boxes, self._box_positions):
# Position at the middle of a maze cell.
box_position = np.array(
[box_xy_position[0], box_xy_position[1], self._box_size[2]])
# Commit the box's final pose.
box.set_pose(physics, position=box_position, quaternion=[1., 0., 0., 0.])
for target, target_position in zip(self._targets, self._target_positions):
target.set_pose(physics, position=target_position)
target.reset(physics)
self._update_entity_pixel_layers(physics)
def before_step(self, physics, actions, random_state):
if isinstance(actions, list):
actions = np.concatenate(actions)
super(Mujoban, self).before_step(physics, actions, random_state)
if self._first_step:
self._first_step = False
else:
self._was_activated = [target.activated for target in self._targets]
def _get_object_positions_in_grid(self, physics):
box_positions = self._arena.world_to_grid_positions(
[physics.bind(box.geom).xpos for box in self._boxes])
walker_position = self._arena.world_to_grid_positions(
[physics.bind(self._walker.root_body).xpos])[0]
return box_positions, walker_position
def _update_entity_pixel_layers(self, physics):
"""Updates the pixel observation and both layered representations.
Mujoban offers 3 grid representations of the world:
* the pixel layer: a grid representation with an RGB value at each grid
point;
* the entity layer: a grid representation with a character at each grid
point. This representation hides information, since if Sokoban or a box is
over a target then the target is occluded. This is the official entity
layer used by arenas, which is based on dm_control's labmaze;
* the full entity layer: a grid representation with a boolean vector of
length 4 at each grid point. The first value is `True` iff there is a wall
at this location. The second value is `True` iff there is a target at this
location. The third value is for Sokoban, and the fourth value is for
boxes. Note that this is not a one-hot encoding, since Sokoban or a box
can share the same location as a target.
Args:
physics: a Mujoco physics object.
Raises:
RuntimeError: if a box or walker are overlapping with a wall.
"""
# The entity layer from the maze is a string that shows the maze at the
# *beginning* of the level. This is fixed throughout an episode.
entity_layer = self._maze.entity_layer.copy()
box_positions, walker_position = self._get_object_positions_in_grid(physics)
# round positions to snap to grid.
box_positions, walker_position = _round_positions(
box_positions, walker_position, self._last_walker_position)
# setup pixel layer
map_size = entity_layer.shape
pixel_layer = np.ndarray(map_size + (3,), dtype='uint8')
pixel_layer.fill(128)
# setup full entity layer
full_entity_layer = np.zeros(map_size + (4,), dtype='bool')
# remove boxes and agent
entity_layer[entity_layer == mujoban_level.BOX_CHAR] = '.'
entity_layer[entity_layer == 'P'] = '.'
# draw empty space and goals
pixel_layer[entity_layer == '.'] = [0, 0, 0]
pixel_layer[entity_layer == 'G'] = [255, 0, 0]
full_entity_layer[:, :, _WALL_LAYER] = True
full_entity_layer[:, :, _WALL_LAYER][entity_layer == '.'] = False
full_entity_layer[:, :, _WALL_LAYER][entity_layer == 'G'] = False
full_entity_layer[:, :, _TARGET_LAYER][entity_layer == 'G'] = True
# update boxes
for pos in box_positions:
# to ensure we are not changing the walls.
if entity_layer[pos[0], pos[1]] == '*':
raise RuntimeError('Box and wall positions are overlapping; this should '
                   'not happen and requires investigation.')
# the entity layer has no representation of box on goal.
entity_layer[pos[0], pos[1]] = mujoban_level.BOX_CHAR
if np.array_equal(pixel_layer[pos[0], pos[1]], [255, 0, 0]):
pixel_layer[pos[0], pos[1]] = [0, 255, 0] # box on goal
else:
pixel_layer[pos[0], pos[1]] = [255, 255, 0]
full_entity_layer[pos[0], pos[1], _BOX_LAYER] = True
# update player
if entity_layer[walker_position[0], walker_position[1]] == '*':
raise RuntimeError('Walker and wall positions are overlapping; this should '
                   'not happen and requires investigation.')
entity_layer[walker_position[0], walker_position[1]] = 'P'
pixel_layer[walker_position[0], walker_position[1]] = 0, 0, 255
full_entity_layer[
walker_position[0], walker_position[1], _SOKOBAN_LAYER] = True
self._last_walker_position = walker_position
self._entity_layer = entity_layer
self._pixel_layer = pixel_layer
self._full_entity_layer = full_entity_layer
def after_step(self, physics, random_state):
super(Mujoban, self).after_step(physics, random_state)
for box in self._boxes:
physics.bind(box.geom).rgba = _BOX_RGBA
for target in self._targets:
if target.activated:
target.activator.rgba = _BOX_PRESSED_RGBA
self._update_entity_pixel_layers(physics)
self._is_solved = all([target.activated for target in self._targets])
if self._is_solved:
self._discount = 0.
def get_reward(self, physics):
reward = 0.0
for target, was_activated in zip(self._targets, self._was_activated):
if target.activated and not was_activated:
reward += self._box_on_target_reward
elif was_activated and not target.activated:
reward -= self._box_on_target_reward
if self._is_solved:
reward += self._level_solved_reward
return reward
def get_discount(self, physics):
return self._discount
def should_terminate_episode(self, physics):
is_dead = self._walker.aliveness(physics) < _ALIVE_THRESHOLD
return self._is_solved or is_dead
def get_reward_spec(self):
return specs.Array(shape=(), dtype=np.float32)
@property
def task_observables(self):
return self._task_observables
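To make the observation layout concrete, the sketch below decodes a `full_entity_layer` observation back into ASCII. The channel order mirrors the `_*_LAYER` constants at the top of this file; boxes and the player are drawn over targets, matching the occlusion described in `_update_entity_pixel_layers`. This is an editor's illustration, not part of the module:

```python
import numpy as np

# Mirrors _WALL_LAYER, _TARGET_LAYER, _SOKOBAN_LAYER, _BOX_LAYER above.
_WALL, _TARGET, _SOKOBAN, _BOX = 0, 1, 2, 3


def render_full_entity_layer(full_entity_layer):
  """Renders an (H, W, 4) boolean observation back to ASCII for debugging."""
  height, width, _ = full_entity_layer.shape
  rows = []
  for y in range(height):
    chars = []
    for x in range(width):
      cell = full_entity_layer[y, x]
      if cell[_BOX]:
        chars.append('B')
      elif cell[_SOKOBAN]:
        chars.append('P')
      elif cell[_TARGET]:
        chars.append('G')
      elif cell[_WALL]:
        chars.append('*')
      else:
        chars.append('.')
    rows.append(''.join(chars))
  return '\n'.join(rows)
```

Applied to `timestep.observation['full_entity_layer']`, this should reproduce a grid similar to the `_GRID_LEVEL` strings used in the level tests.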
@@ -0,0 +1,140 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Module for generating Mujoban level.
"""
import labmaze
BOX_CHAR = 'B'
TARGET_CHAR = labmaze.defaults.OBJECT_TOKEN
_DEFAULT_LEVEL = """
#####
# #
#### # #
# . .# #
# . #
# .## $##
## #$$ #
## $@#
## ###
####"""
# The meaning of symbols here are the same as defined in
# http://sneezingtiger.com/sokoban/levels/sasquatch5Text.html. These are the
# same symbols as used by the Sokoban community.
EMPTY_CELL = ' '
GOAL = '.'
PLAYER = '@'
PLAYER_ON_GOAL = '+'
BOX = '$'
BOX_ON_GOAL = '*'
WALL = '#'
_SOKOBAN_SYMBOLS = [
EMPTY_CELL, GOAL, PLAYER, PLAYER_ON_GOAL, BOX, BOX_ON_GOAL, WALL
]
def single_level_generator(level=_DEFAULT_LEVEL):
while True:
yield level
def _ascii_to_text_grid_level(ascii_level):
"""Goes from official Sokoban ASCII art to string understood by Mujoban.
Args:
ascii_level: a multiline string; each character is a location in a
gridworld.
Returns:
A grid-level string in the labmaze text format.
"""
level = ascii_level
if level.startswith('\n'):
level = level[1:]
level = level.replace('$', BOX_CHAR)
level = level.replace('.', TARGET_CHAR)
level = level.replace(' ', '.')
level = level.replace('#', '*')
level = level.replace('@', 'P')
if level[-1] == '\n':
level = level[:-1]
# Pad
all_rows = level.split('\n')
width = max(len(row) for row in all_rows)
padded_rows = []
for row in all_rows:
row += '*' * (width - len(row))
padded_rows.append(row)
level = '\n'.join(padded_rows)
return level + '\n'
class MujobanLevel(labmaze.BaseMaze):
"""A maze that represents a level in Mujoban."""
def __init__(self, ascii_level_generator=single_level_generator):
"""Constructor.
Args:
ascii_level_generator: a callable returning an iterator (e.g. a generator
function) that yields, at each iteration, a string representing a level.
The symbols in the string should be
those of http://sneezingtiger.com/sokoban/levels/sasquatch5Text.html.
These are the same symbols as used by the Sokoban community.
"""
self._level_iterator = ascii_level_generator()
self.regenerate()
def regenerate(self):
"""Regenerates the maze if required."""
level = next(self._level_iterator)
self._entity_layer = labmaze.TextGrid(_ascii_to_text_grid_level(level))
self._variation_layer = self._entity_layer.copy()
self._variation_layer[:] = '.'
self._num_boxes = (self._entity_layer == BOX_CHAR).sum()
num_targets = (self._entity_layer == TARGET_CHAR).sum()
if num_targets != self._num_boxes:
raise ValueError('Number of targets {} should equal number of boxes {}.'
.format(num_targets, self._num_boxes))
@property
def num_boxes(self):
return self._num_boxes
@property
def num_targets(self):
return self._num_boxes
@property
def entity_layer(self):
return self._entity_layer
@property
def variations_layer(self):
return self._variation_layer
@property
def height(self):
return self._entity_layer.shape[0]
@property
def width(self):
return self._entity_layer.shape[1]
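A small sketch of feeding a custom level into `MujobanLevel` through a generator function; the tiny one-box level below only exercises the parser, and no claim is made that the full Mujoban task is playable on it:

```python
from physics_planning_games.mujoban import mujoban_level

_TINY_LEVEL = """
#####
#@$.#
#####"""


def tiny_level_generator():
  while True:
    yield _TINY_LEVEL


maze = mujoban_level.MujobanLevel(tiny_level_generator)
print(maze.num_boxes, maze.num_targets)  # 1 1
print(maze.height, maze.width)           # 3 5
```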
@@ -0,0 +1,53 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Tests for mujoban_level."""
from absl.testing import absltest
from physics_planning_games.mujoban import mujoban_level
_LEVEL = """
#####
# @####
# $. #
###$.# #
# $.# #
# #$. #
# ###
######"""
_GRID_LEVEL = """********
*..P****
*..BG..*
***BG*.*
*..BG*.*
*.*BG..*
*....***
********
"""
class MujobanLevelTest(absltest.TestCase):
def test_ascii_to_text_grid_level(self):
grid_level = mujoban_level._ascii_to_text_grid_level(_LEVEL)
self.assertEqual(_GRID_LEVEL, grid_level)
if __name__ == '__main__':
absltest.main()
@@ -0,0 +1,126 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""A floor pad that is activated through touch."""
import weakref
from dm_control import composer
from dm_control import mjcf
import numpy as np
def _get_activator_box(pad_xpos, pad_size, boxes, tolerance=0.0):
"""Returns the activator box, if any. Otherwise returns None."""
# Ignore the height
pad_min = pad_xpos[0:2] - pad_size[0:2]
pad_max = pad_xpos[0:2] + pad_size[0:2]
for box in boxes:
box_xpos = np.array(box.xpos[0:2])
box_size = np.array(box.size[0:2])
min_ = pad_min + box_size - tolerance
max_ = pad_max - box_size + tolerance
in_range = np.logical_and(box_xpos >= min_, box_xpos <= max_).all()
if in_range:
return box
# No activator box was found
return None
class MujobanPad(composer.Entity):
"""A less sensitive floor pad for Mujoban."""
def _build(self, rgba=None, pressed_rgba=None,
size=1, height=0.02, detection_tolerance=0.0, name='mujoban_pad'):
rgba = tuple(rgba or (1, 0, 0, 1))
pressed_rgba = tuple(pressed_rgba or (0.2, 0, 0, 1))
self._mjcf_root = mjcf.RootElement(model=name)
self._site = self._mjcf_root.worldbody.add(
'site', type='box', name='site',
pos=[0, 0, (height / 2 or -0.001)],
size=[size / 2, size / 2, (height / 2 or 0.001)], rgba=rgba)
self._activated = False
self._rgba = np.array(rgba, dtype=np.float)
self._pressed_rgba = np.array(pressed_rgba, dtype=np.float)
self._activator = None
self._detection_tolerance = detection_tolerance
self._boxes = []
@property
def rgba(self):
return self._rgba
@property
def pressed_rgba(self):
return self._pressed_rgba
def register_box(self, box_entity):
self._boxes.append(weakref.proxy(box_entity))
@property
def site(self):
return self._site
@property
def boxes(self):
return self._boxes
@property
def activator(self):
return self._activator if self._activated else None
@property
def mjcf_model(self):
return self._mjcf_root
def initialize_episode_mjcf(self, unused_random_state):
self._activated = False
def initialize_episode(self, physics, unused_random_state):
self._update_activation(physics)
def _update_activation(self, physics):
# Note: we get the physically bound box, not an object from self._boxes.
# That's because the generator expression below generates bound objects.
box = _get_activator_box(
pad_xpos=np.array(physics.bind(self._site).xpos),
pad_size=np.array(physics.bind(self._site).size),
boxes=(physics.bind(box.geom) for box in self._boxes),
tolerance=self._detection_tolerance,)
if box:
self._activated = True
self._activator = box
else:
self._activated = False
self._activator = None
if self._activated:
physics.bind(self._site).rgba = self._pressed_rgba
else:
physics.bind(self._site).rgba = self._rgba
def before_step(self, physics, unused_random_state):
self._update_activation(physics)
def after_substep(self, physics, unused_random_state):
self._update_activation(physics)
@property
def activated(self):
"""Whether this floor pad is pressed at the moment."""
return self._activated
def reset(self, physics):
self._activated = False
physics.bind(self._site).rgba = self._rgba
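A quick numeric sketch of the activation test above. `_get_activator_box` is module-private, and `FakeGeom` is a hypothetical stand-in exposing just the `xpos` and `size` fields the helper reads:

```python
import collections
import numpy as np

from physics_planning_games.mujoban import mujoban_pad

FakeGeom = collections.namedtuple('FakeGeom', ['xpos', 'size'])

pad_xpos = np.array([1.0, 1.0, 0.0])
pad_size = np.array([0.5, 0.5, 0.01])  # Half-lengths, MuJoCo convention.
# A box whose edge sticks 5 cm past the pad boundary.
box = FakeGeom(xpos=np.array([1.35, 0.9, 0.15]),
               size=np.array([0.2, 0.2, 0.15]))

print(mujoban_pad._get_activator_box(pad_xpos, pad_size, [box], tolerance=0.0))
# None -- the box is not fully contained in the pad.
print(mujoban_pad._get_activator_box(pad_xpos, pad_size, [box], tolerance=0.1))
# FakeGeom(...) -- a 10 cm tolerance lets the overhanging box activate the pad.
```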
@@ -0,0 +1,75 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Tests for Mujoban."""
from absl.testing import absltest
from dm_control import composer
from dm_control.locomotion import walkers
import dm_env as environment
import numpy as np
from physics_planning_games.mujoban.mujoban import Mujoban
from physics_planning_games.mujoban.mujoban_level import MujobanLevel
TIME_LIMIT = 5
CONTROL_TIMESTEP = .1
class MujobanTest(absltest.TestCase):
def test(self):
walker = walkers.JumpingBallWithHead(add_ears=True, camera_height=0.25)
arena = MujobanLevel()
task = Mujoban(
walker=walker,
maze=arena,
control_timestep=CONTROL_TIMESTEP,
top_camera_height=64,
top_camera_width=48)
env = composer.Environment(
time_limit=TIME_LIMIT,
task=task,
strip_singleton_obs_buffer_dim=True)
time_step = env.reset()
self.assertEqual(
set([
'pixel_layer', 'full_entity_layer', 'top_camera',
'walker/body_height', 'walker/end_effectors_pos',
'walker/joints_pos', 'walker/joints_vel',
'walker/sensors_accelerometer', 'walker/sensors_gyro',
'walker/sensors_touch', 'walker/sensors_velocimeter',
'walker/world_zaxis', 'walker/orientation',
]), set(time_step.observation.keys()))
top_camera = time_step.observation['top_camera']
self.assertEqual(np.uint8, top_camera.dtype)
self.assertEqual((64, 48, 3), top_camera.shape)
all_step_types = []
# Run enough actions that we are guaranteed to have restarted the
# episode at least once.
for _ in range(int(2*TIME_LIMIT/CONTROL_TIMESTEP)):
action = 2*np.random.random(env.action_spec().shape) - 1
time_step = env.step(action)
all_step_types.append(time_step.step_type)
self.assertEqual(set([environment.StepType.FIRST,
environment.StepType.MID,
environment.StepType.LAST]),
set(all_step_types))
if __name__ == '__main__':
absltest.main()
@@ -0,0 +1,62 @@
# Copyright 2020 DeepMind Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Box props used in Mujoban that the agent pushes.
"""
import itertools
from dm_control import composer
from dm_control.entities import props
class Box(props.Primitive):
"""A class representing a box prop."""
def _build(self, half_lengths=None, mass=None, name='box'):
half_lengths = half_lengths or [0.05, 0.1, 0.15]
super(Box, self)._build(geom_type='box',
size=half_lengths,
mass=mass,
name=name)
class BoxWithSites(Box):
"""A class representing a box prop with sites on the corners."""
def _build(self, half_lengths=None, mass=None, name='box'):
half_lengths = half_lengths or [0.05, 0.1, 0.15]
super(BoxWithSites, self)._build(half_lengths=half_lengths, mass=mass,
name=name)
corner_positions = itertools.product([half_lengths[0], -half_lengths[0]],
[half_lengths[1], -half_lengths[1]],
[half_lengths[2], -half_lengths[2]])
corner_sites = []
for i, corner_pos in enumerate(corner_positions):
corner_sites.append(
self.mjcf_model.worldbody.add(
'site',
type='sphere',
name='corner_{}'.format(i),
size=[0.1],
pos=corner_pos,
rgba=[1, 0, 0, 1.0],
group=composer.SENSOR_SITES_GROUP))
self._corner_sites = tuple(corner_sites)
@property
def corner_sites(self):
return self._corner_sites
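A short sketch of building the prop on its own and inspecting its corner sites; the exact printed representation depends on dm_control's mjcf bindings:

```python
from physics_planning_games.mujoban import props

box = props.BoxWithSites(half_lengths=[0.2, 0.2, 0.075])
print(len(box.corner_sites))     # 8 -- one site per corner of the box.
print(box.corner_sites[0].pos)   # E.g. [0.2, 0.2, 0.075], the (+, +, +) corner.
```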
@@ -0,0 +1,6 @@
absl-py == 0.9.0
dm-control
dm-env
labmaze
numpy == 1.19.1
requests == 2.24.0