Skip to content

Instantly share code, notes, and snippets.

@benblack769
Last active October 24, 2020 02:34
Show Gist options
  • Save benblack769/3eeb1056de5613c5aafa6510de4bbf44 to your computer and use it in GitHub Desktop.
Save benblack769/3eeb1056de5613c5aafa6510de4bbf44 to your computer and use it in GitHub Desktop.
from ray.rllib.env.multi_agent_env import MultiAgentEnv
class PettingZooEnv(MultiAgentEnv):
"""An interface to the PettingZoo MARL environment library.
See: https://github.com/PettingZoo-Team/PettingZoo
Inherits from MultiAgentEnv and exposes a given AEC
(actor-environment-cycle) game from the PettingZoo project via the
MultiAgentEnv public API.
Note that the wrapper has some important limitations:
1. All agents have the same action_spaces and observation_spaces.
Note: If, within your aec game, agents do not have homogeneous action /
observation spaces, apply SuperSuit wrappers
to apply padding functionality: https://github.com/PettingZoo-Team/
SuperSuit#built-in-multi-agent-only-functions
2. Environments are positive sum games (-> Agents are expected to cooperate
to maximize reward). This isn't a hard restriction, it just that
standard algorithms aren't expected to work well in highly competitive
games.
Examples:
>>> from pettingzoo.butterfly import prison_v1
>>> env = PettingZooEnv(prison_v1.env())
>>> obs = env.reset()
>>> print(obs)
# only returns the observation for the agent which should be stepping
{
'prisoner_0': array([[[0, 0, 0],
[0, 0, 0],
[0, 0, 0],
...,
[0, 0, 0],
[0, 0, 0],
[0, 0, 0]]], dtype=uint8)
}
>>> obs, rewards, dones, infos = env.step({
... "prisoner_0": 1
... })
# only returns the observation, reward, info, etc, for the agent who's turn is next.
>>> print(obs)
{
'prisoner_1': array([[[0, 0, 0],
[0, 0, 0],
[0, 0, 0],
...,
[0, 0, 0],
[0, 0, 0],
[0, 0, 0]]], dtype=uint8)
}
>>> print(rewards)
{
'prisoner_1': 0
}
>>> print(dones)
{
'prisoner_1': False, '__all__': False
}
>>> print(infos)
{
'prisoner_1': {'map_tuple': (1, 0)}
}
"""
def __init__(self, env):
self.env = env
# agent idx list
self.agents = self.env.possible_agents
# Get dictionaries of obs_spaces and act_spaces
self.observation_spaces = self.env.observation_spaces
self.action_spaces = self.env.action_spaces
# Get first observation space, assuming all agents have equal space
self.observation_space = self.observation_spaces[self.agents[0]]
# Get first action space, assuming all agents have equal space
self.action_space = self.action_spaces[self.agents[0]]
assert all(obs_space == self.observation_space
for obs_space
in self.env.observation_spaces.values()), \
"Observation spaces for all agents must be identical. Perhaps " \
"SuperSuit's pad_observations wrapper can help (useage: " \
"`supersuit.aec_wrappers.pad_observations(env)`"
assert all(act_space == self.action_space
for act_space in self.env.action_spaces.values()), \
"Action spaces for all agents must be identical. Perhaps " \
"SuperSuit's pad_action_space wrapper can help (useage: " \
"`supersuit.aec_wrappers.pad_action_space(env)`"
self.reset()
def reset(self):
self.env.reset()
return {self.env.agent_selection: self.env.observe(self.env.agent_selection)}
def step(self, action):
self.env.step(action[self.env.agent_selection])
obs_d = {}
rew_d = {}
done_d = {}
info_d = {}
first = True
while first or (self.env.agents and self.env.dones[self.env.agent_selection]):
obs, rew, done, info = self.env.last()
a = self.env.agent_selection
obs_d[a] = obs
rew_d[a] = rew
done_d[a] = done
info_d[a] = info
if self.env.dones[self.env.agent_selection]:
self.env.step(None)
first = False
all_done = not self.env.agents
done_d['__all__'] = all_done
return obs_d, rew_d, done_d, info_d
def close(self):
self.env.close()
def seed(self, seed=None):
self.env.seed(seed)
def render(self, mode="human"):
return self.env.render(mode)
class ParallelPettingZooEnv(MultiAgentEnv):
def __init__(self, env):
self.par_env = env
# agent idx list
self.agents = self.par_env.possible_agents
# Get dictionaries of obs_spaces and act_spaces
self.observation_spaces = self.par_env.observation_spaces
self.action_spaces = self.par_env.action_spaces
# Get first observation space, assuming all agents have equal space
self.observation_space = self.observation_spaces[self.agents[0]]
# Get first action space, assuming all agents have equal space
self.action_space = self.action_spaces[self.agents[0]]
assert all(obs_space == self.observation_space
for obs_space
in self.par_env.observation_spaces.values()), \
"Observation spaces for all agents must be identical. Perhaps " \
"SuperSuit's pad_observations wrapper can help (useage: " \
"`supersuit.aec_wrappers.pad_observations(env)`"
assert all(act_space == self.action_space
for act_space in self.par_env.action_spaces.values()), \
"Action spaces for all agents must be identical. Perhaps " \
"SuperSuit's pad_action_space wrapper can help (useage: " \
"`supersuit.aec_wrappers.pad_action_space(env)`"
self.reset()
def reset(self):
return self.par_env.reset()
def step(self, action_dict):
obss, rews, dones, infos = self.par_env.step(action_dict)
dones["__all__"] = all(dones.values())
return obss, rews, dones, infos
def close(self):
self.par_env.close()
def seed(self, seed=None):
self.par_env.seed(seed)
def render(self, mode="human"):
return self.par_env.render(mode)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment