@aadharna
Last active March 20, 2023 21:00
Either something is wrong with how my rollout class uses the policies, or the data returned by RLlib is wrong.

RLlib tells me that a given policy achieved win-rates of 0.89 -> 0.95 -> 0.99, and a run from algo.evaluate() matches those numbers.

However, if I take those same models and run them through a rollout manually, I get completely different results, e.g., win_rate = 0.48.
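One way to narrow this down (not part of the original gist, just a sketch that assumes algo and crw have been built as in the script below) is to feed the same one-hot observation to the policy held by the Algorithm and to the manually reconstructed TorchPolicy, then compare their deterministic actions and, where available, their distribution inputs. If the two disagree, the problem is in the manual policy reconstruction rather than in RLlib's evaluation numbers.

import numpy as np

# Sketch of a consistency check between the trained policy inside `algo`
# and the manually reconstructed policy inside `crw` (both defined below).
main_policy = algo.get_policy('main0')
crw.p2.set_state(main_policy.get_state())

obs_dim = crw.observation_space.shape[0]
for state_idx in range(obs_dim):
    one_hot = np.zeros(obs_dim, dtype=np.float32)
    one_hot[state_idx] = 1.0

    # compute_single_action returns (action, rnn_state_out, extra_fetches).
    a_algo, _, fetch_algo = main_policy.compute_single_action(obs=one_hot, explore=False)
    a_manual, _, fetch_manual = crw.p2.compute_single_action(obs=one_hot, explore=False)

    if a_algo != a_manual:
        # 'action_dist_inputs' may or may not be in the extra fetches,
        # depending on the RLlib version, hence the .get().
        print(state_idx, a_algo, a_manual,
              fetch_algo.get('action_dist_inputs'),
              fetch_manual.get('action_dist_inputs'))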

import gym
import torch
import torch.nn as nn
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.torch_modelv2 import ModelConfigDict
from ray.rllib.utils.annotations import override


class MLP(TorchModelV2, nn.Module):
    """MLP with a separate value head off the shared 128-unit trunk."""

    def __init__(self, obs_space: gym.spaces.Space,
                 action_space: gym.spaces.Space, num_outputs: int,
                 model_config: ModelConfigDict, name: str):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)
        self.fc1 = nn.Linear(obs_space.shape[0], 128)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(128, 128)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(128, num_outputs)
        self.value_fn = nn.Linear(128, 1)

    @override(TorchModelV2)
    def forward(self, input_dict, state, seq_lens):
        x = input_dict['obs']
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.fc2(x)
        x = self.relu2(x)
        # Cache the value estimate from the shared features for value_function().
        self.value = self.value_fn(x)
        x = self.fc3(x)
        return x, state

    @override(TorchModelV2)
    def value_function(self):
        return self.value.squeeze(1)


class MaskedMLP(MLP):
    """MLP with a mask applied to the output (the last action is never selectable)."""

    def __init__(self, obs_space: gym.spaces.Space,
                 action_space: gym.spaces.Space, num_outputs: int,
                 model_config: ModelConfigDict, name: str):
        super().__init__(obs_space, action_space, num_outputs, model_config,
                         name)

    @override(TorchModelV2)
    def forward(self, input_dict, state, seq_lens):
        x, state = super().forward(input_dict, state, seq_lens)
        # Push the last action's logit to -inf so it is never sampled.
        mask = torch.zeros_like(x)
        mask[:, -1] = -torch.inf
        return x + mask, state

    @override(TorchModelV2)
    def value_function(self):
        return super().value_function()
import copy

import ray
import gym
import numpy as np
from ray.rllib.policy.torch_policy import TorchPolicy


# custom remote worker class
class CRW:
    """
    Custom remote worker class.
    Temporary workaround for the fact that you cannot do:
    algorithm.evaluation_workers.local_worker().rollout('policy_1', 'policy_2')
    """

    def __init__(self, env_creator, env_config, policy_config):
        self.env = env_creator(env_config)
        self.p1_name = env_config["players_ids"][0]
        self.p2_name = env_config["players_ids"][1]
        if isinstance(self.env.observation_space, gym.spaces.Discrete):
            # The MLP models expect one-hot vectors, not discrete indices.
            self.observation_space = gym.spaces.Box(
                low=0, high=1, shape=(self.env.observation_space.n,))
        # make the config meet the minimal requirements that rllib expects
        policy_config['num_gpus'] = 0
        policy_config['_fake_gpus'] = False
        # dummy values
        policy_config['lr'] = 0.0001
        policy_config['explore'] = True
        # Deep-copy so that setting the opponent's custom model below does not
        # also mutate the nested 'model' dict used for the main agent's policy.
        foo = copy.deepcopy(policy_config)
        foo['model']['custom_model'] = 'masked_mlp'
        # random opponent
        self.p1 = TorchPolicy(observation_space=self.observation_space,
                              action_space=self.env.P2_ACTION_SPACE,
                              config=foo)
        self.use_random = False
        # main agent
        self.p2 = TorchPolicy(observation_space=self.observation_space,
                              action_space=self.env.P2_ACTION_SPACE,
                              config=policy_config)
    def get_discounted_rewards(self, p1_state, p2_state, discount=1.0):
        # Fall back to uniform-random actions when no opponent weights are provided.
        if p1_state['weights']:
            self.p1.set_state(p1_state)
            self.use_random = False
        else:
            self.use_random = True
        self.p2.set_state(p2_state)
        r1_list, r2_list = self.rollout()
        # discounted returns for both players
        r1_disc = r1_list * discount ** np.arange(len(r1_list))
        r2_disc = r2_list * discount ** np.arange(len(r2_list))
        return {self.p1_name: r1_disc.sum(), self.p2_name: r2_disc.sum()}

    def rollout(self):
        obs = self.env.reset()
        done = {"__all__": False}
        r1_list = []
        r2_list = []
        while not done["__all__"]:
            o1 = obs[self.p1_name]
            o2 = obs[self.p2_name]
            if not self.use_random:
                hot_o1 = np.zeros(self.observation_space.shape[0])
                hot_o1[o1] = 1
                action1 = self.p1.compute_single_action(obs=hot_o1)[0]
            else:
                action1 = self.env.P1_ACTION_SPACE.sample()
            hot_o2 = np.zeros(self.observation_space.shape[0])
            hot_o2[o2] = 1
            action2 = self.p2.compute_single_action(obs=hot_o2)[0]
            obs, reward, done, info = self.env.step(
                {self.p1_name: action1, self.p2_name: action2})
            r1_list.append(reward[self.p1_name])
            r2_list.append(reward[self.p2_name])
        return np.array(r1_list), np.array(r2_list)
if __name__ == "__main__":
    from ray.tune import CLIReporter, register_env
    from ray.rllib.models import ModelCatalog
    from RPS_W import IteratedRPS_W
    from mlp import MLP, MaskedMLP
    from ray.rllib.algorithms.ppo import PPOConfig
    from tqdm import tqdm

    ray.init()
    env_config = {
        # Although "random" is also tagged with a zero, it will get removed when
        # we update the policy mapping function, so this doesn't break anything
        # and the original policy mapping function (which selects "random") can
        # be used at the start.
        "players_ids": ["random_r0", "main0"],
        "max_steps": 20,
        "get_additional_info": True,
    }
    game_name = 'foo'
    register_env(game_name, lambda config: IteratedRPS_W(config))
    ModelCatalog.register_custom_model('mlp', MLP)
    ModelCatalog.register_custom_model('masked_mlp', MaskedMLP)

    config = (
        PPOConfig()
        .environment(env=game_name,
                     env_config=env_config)
        .rollouts(create_env_on_local_worker=True)
        .framework("torch")
        .multi_agent(
            # Policy map: the masked (effectively random at first) opponent
            # "random_r0" and the trainable PPO policy "main0".
            policies={
                env_config["players_ids"][0]: (
                    None,
                    IteratedRPS_W.OBSERVATION_SPACE,
                    IteratedRPS_W.P2_ACTION_SPACE,
                    {'model': {'custom_model': 'masked_mlp'}},
                ),
                env_config["players_ids"][1]: (
                    None,
                    IteratedRPS_W.OBSERVATION_SPACE,
                    IteratedRPS_W.P2_ACTION_SPACE,
                    {'model': {'custom_model': 'mlp'}},
                ),
            },
            # Each agent id maps to the policy of the same name, so "main0"
            # always plays against "random_r0".
            policy_mapping_fn=lambda agent_id, episode, worker, **kwargs: agent_id,
            # Always just train the "main" policy.
            policies_to_train=["main0"],
        )
        # CPU-only for this reproduction.
        .resources(num_gpus=0)
    )
    algo = config.build(env=game_name)

    policy_config = {'model': {'custom_model': 'mlp'}}
    crw = CRW(lambda config: IteratedRPS_W(config), env_config,
              policy_config)

    m_state = algo.get_policy('main0').get_state()
    r_state = algo.get_policy('random_r0').get_state()

    # Win-rate of the untrained "main0" policy via the manual rollout class.
    results = []
    for i in tqdm(range(500)):
        results.append(crw.get_discounted_rewards(r_state, m_state)['main0'])
    results = np.array(results)
    print("step 0 manual evaluate: ", np.mean(results > 0))

    # Win-rate of the untrained "main0" policy via rllib's own evaluation.
    r = algo.evaluate()['evaluation']
    biz = np.array(r['hist_stats']['policy_main0_reward'])
    print("step 0 rllib evaluate: ", np.mean(biz > 0))

    train_results = []
    eval_results = []
    manual_eval_results = []
    for i in tqdm(range(20)):
        train_results.append(np.mean(
            np.array(algo.train()['hist_stats']['policy_main0_reward']) > 0))
        eval_results.append(np.mean(
            np.array(algo.evaluate()['evaluation']['hist_stats']['policy_main0_reward']) > 0))
        m_state = algo.get_policy('main0').get_state()
        results2 = []
        for _ in range(500):
            results2.append(crw.get_discounted_rewards(r_state, m_state)['main0'])
        manual_eval_results.append(np.mean(np.array(results2) > 0))

    import matplotlib.pyplot as plt
    plt.plot(train_results, label="train")
    plt.plot(eval_results, label="eval")
    plt.plot(manual_eval_results, label="manual eval")
    plt.legend()
    plt.show()
    ray.shutdown()
import logging
from abc import ABC
from collections.abc import Iterable
from typing import Dict, Optional

import gym.spaces
import numpy as np
from gym.spaces import Discrete, Tuple
from gym.utils import seeding
from ray.rllib.examples.env.utils.mixins import (
    TwoPlayersTwoActionsInfoMixin,
    NPlayersNDiscreteActionsInfoMixin,
)
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.examples.env.utils.interfaces import InfoAccumulationInterface
from ray.rllib.examples.env.matrix_sequential_social_dilemma import MatrixSequentialSocialDilemma

logger = logging.getLogger(__name__)


class AsymMatrixSequentialSocialDilemma(InfoAccumulationInterface, MultiAgentEnv, ABC):
    """
    Augments the MatrixSequentialSocialDilemma environment
    so that the two players can have different numbers of actions.
    A multi-agent abstract class for two-player matrix games.

    PAYOUT_MATRIX: Numpy array. Along dimension N, the action of the
        Nth player changes. The last dimension selects the player
        whose reward you want to know.
    max_steps: number of steps in one episode.
    players_ids: list of the RLlib agent id of each player.
    output_additional_info: ask the environment to aggregate information
        about the last episode and output it as info at the end of the
        episode.
    """

    def __init__(self, config: Optional[Dict] = None):
        # InfoAccumulationInterface.__init__(self)
        # MultiAgentEnv.__init__(self)
        if config is None:
            config = {}
        assert "reward_randomness" not in config.keys()
        assert self.PAYOUT_MATRIX is not None
        if "players_ids" in config:
            assert (
                isinstance(config["players_ids"], Iterable)
                and len(config["players_ids"]) == self.NUM_AGENTS
            )
        self.players_ids = config.get("players_ids", ["player_row", "player_col"])
        self.player_row_id, self.player_col_id = self.players_ids
        self.max_steps = config.get("max_steps", 20)
        self.output_additional_info = config.get("output_additional_info", True)
        self.step_count_in_current_episode = None
        self.archive_players = [f'main_v{i}' for i in range(1, 1000)]
        # To store info about the fraction of each state
        if self.output_additional_info:
            self._init_info()
    def seed(self, seed=None):
        """Seed the PRNG of this space."""
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def reset(self):
        self.step_count_in_current_episode = 0
        if self.output_additional_info:
            self._reset_info()
        if self.RETURN_FLAG:
            observations = {
                self.player_row_id: np.array([self.NUM_STATES - 1, 0]),
                self.player_col_id: np.array([self.NUM_STATES - 1, 1]),
            }
        else:
            observations = {
                self.player_row_id: self.NUM_STATES - 1,
                self.player_col_id: self.NUM_STATES - 1,
            }
        return observations

    def step(self, actions: dict):
        """
        :param actions: Dict containing both actions for player_1 and player_2
        :return: observations, rewards, done, info
        """
        self.step_count_in_current_episode += 1
        action_player_row = actions[self.player_row_id]
        action_player_col = actions[self.player_col_id]
        if self.output_additional_info:
            self._accumulate_info(action_player_row, action_player_col)
        observations = self._produce_observations_invariant_to_the_player_trained(
            action_player_row, action_player_col
        )
        rewards = self._get_players_rewards(action_player_row, action_player_col)
        epi_is_done = self.step_count_in_current_episode >= self.max_steps
        if self.step_count_in_current_episode > self.max_steps:
            logger.warning("self.step_count_in_current_episode >= self.max_steps")
        info = self._get_info_for_current_epi(epi_is_done)
        return self._to_RLlib_API(observations, rewards, epi_is_done, info)

    def _produce_observations_invariant_to_the_player_trained(
        self, action_player_0: int, action_player_1: int
    ):
        """
        We want to be able to use a policy trained as player 1
        for evaluation as player 2 and vice versa.
        """
        return [
            action_player_0 * self.NUM_ACTIONS_P1 + action_player_1,
            action_player_1 * self.NUM_ACTIONS_P2 + action_player_0,
        ]

    def _get_players_rewards(self, action_player_0: int, action_player_1: int):
        p0 = self.PAYOUT_MATRIX[action_player_0][action_player_1][0]
        p1 = self.PAYOUT_MATRIX[action_player_0][action_player_1][1]
        return [
            p0,
            p1,
        ]

    def _to_RLlib_API(
        self, observations: list, rewards: list, epi_is_done: bool, info: dict
    ):
        if self.RETURN_FLAG:
            observations = {
                self.player_row_id: np.array([observations[0], 0]),
                self.player_col_id: np.array([observations[1], 1]),
            }
        else:
            observations = {
                self.player_row_id: observations[0],
                self.player_col_id: observations[1],
            }
        rewards = {self.player_row_id: rewards[0], self.player_col_id: rewards[1]}
        if info is None:
            info = {}
        else:
            info = {self.player_row_id: info, self.player_col_id: info}
        done = {
            self.player_row_id: epi_is_done,
            self.player_col_id: epi_is_done,
            "__all__": epi_is_done,
        }
        return observations, rewards, done, info

    def _get_info_for_current_epi(self, epi_is_done):
        if epi_is_done and self.output_additional_info:
            info_for_current_epi = self._get_episode_info()
        else:
            info_for_current_epi = None
        return info_for_current_epi
    def __str__(self):
        return self.NAME


class IteratedRPS_W(TwoPlayersTwoActionsInfoMixin, AsymMatrixSequentialSocialDilemma):
    """
    Rock-Paper-Scissors-Asymmetric-Win game.

    We take the regular RPS game and make it asymmetric (4x3) by changing the
    win matrices to the following:
    P1_win:
        [[0, 1, 0],  # paper
         [0, 0, 1],  # scissors
         [1, 0, 0],  # rock
         [1, 1, 1]]  # win
    P2_win:
        [[0, 0, 1],  # scissors
         [1, 0, 0],  # rock
         [0, 1, 0],  # paper
         [0, 0, 0]]  # lose

    P1 gets to choose between three actions: Rock, Paper, and Scissors.
    P2 gets to choose between four actions: Rock, Paper, Scissors, and Win.
    """

    NUM_AGENTS = 2
    player_count = NUM_AGENTS
    NUM_ACTIONS_P1 = 3
    NUM_ACTIONS_P2 = 4
    NUM_STATES = (NUM_ACTIONS_P1 + NUM_ACTIONS_P2) ** NUM_AGENTS + 1
    P1_ACTION_SPACE = Discrete(3)
    P2_ACTION_SPACE = Discrete(4)
    OBSERVATION_SPACE = observation_space = Discrete(NUM_STATES)
    PAYOUT_MATRIX = np.array([
        # each entry is [P1_reward, P2_reward]
        # P1 picks the row, P2 picks the column
        [[0, 0], [-1, 1], [1, -1], [-1, 1]],
        [[1, -1], [0, 0], [-1, 1], [-1, 1]],
        [[-1, 1], [1, -1], [0, 0], [-1, 1]],
    ])
    RETURN_FLAG = False
    NAME = "IteratedRPS_W"
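
A quick sanity check on the payoff matrix above (not in the original gist): in every row, the column for P2's fourth action ("win", index 3) pays +1 to P2 and -1 to P1, so an unmasked main0 that always plays action 3 wins every step, which lines up with the ~0.99 win-rates RLlib reports.

import numpy as np

# Sanity check: P2's "win" action (index 3) beats every one of P1's actions.
payout = IteratedRPS_W.PAYOUT_MATRIX
assert payout.shape == (3, 4, 2)
assert np.all(payout[:, 3, 1] == 1)   # P2 reward
assert np.all(payout[:, 3, 0] == -1)  # P1 reward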