@aadharna · Created July 18, 2023 15:10
memory leak in self-play ppo
"""Example showing how one can implement a simple self-play training workflow.
Uses the open spiel adapter of RLlib with the "connect_four" game and
a multi-agent setup with a "main" policy and n "main_v[x]" policies
(x=version number), which are all at-some-point-frozen copies of
"main". At the very beginning, "main" plays against RandomPolicy.
Checks for the training progress after each training update via a custom
callback. We simply measure the win rate of "main" vs the opponent
("main_v[x]" or RandomPolicy at the beginning) by looking through the
achieved rewards in the episodes in the train batch. If this win rate
reaches some configurable threshold, we add a new policy to
the policy map (a frozen copy of the current "main" one) and change the
policy_mapping_fn to make new matches of "main" vs any of the previous
versions of "main" (including the just added one).
After training for n iterations, a configurable number of episodes can
be played by the user against the "main" agent on the command line.
"""
import os
import argparse
import numpy as np
import gymnasium
try:
import pyspiel
from open_spiel.python.rl_environment import Environment
from ray.rllib.env.wrappers.open_spiel import OpenSpielEnv
except ImportError:
raise Exception("Please install open_spiel python package.")
import sys
from collections import OrderedDict
import ray
from ray import air, tune
from ray.rllib.algorithms.callbacks import DefaultCallbacks, MultiCallbacks
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.examples.policy.random_policy import RandomPolicy
from ray.rllib.policy.policy import PolicySpec
from ray.tune import CLIReporter, register_env
from ray.rllib.models import ModelCatalog
from ray.rllib.env.vector_env import VectorEnvWrapper
# from ray.rllib.algorithms.bandit.bandit import BanditTrainer
from ray.air.callbacks.wandb import WandbLoggerCallback
from mlp import MLP
from callbacks import ActionFreqCallback
from novelty_reward_ppo import NoveltyRewardPPO, NoveltyRewardPPOConfig
from cache_reward_ppo import CustomPPO, CustomPPOConfig
parser = argparse.ArgumentParser()
parser.add_argument(
"--framework",
choices=["tf", "tf2", "torch"],
default="torch",
help="The DL framework specifier.",
)
parser.add_argument("--num-cpus", type=int, default=0)
parser.add_argument("--num-workers", type=int, default=4)
parser.add_argument(
"--from-checkpoint",
type=str,
default=None,
help="Full path to a checkpoint file for restoring a previously saved "
"Algorithm state.",
)
parser.add_argument(
"--env", type=str, default="connect_four", choices=["markov_soccer", "connect_four"]
)
parser.add_argument(
"--iters", type=int, default=10000, help="Number of iterations to train."
)
parser.add_argument(
"--stop-timesteps", type=int, default=10000000, help="Number of timesteps to train."
)
parser.add_argument(
"--win-rate-threshold",
type=float,
default=0.95,
help="Win-rate at which we setup another opponent by freezing the "
"current main policy and playing against a uniform distribution "
"of previously frozen 'main's from here on.",
)
parser.add_argument(
"--num-episodes-human-play",
type=int,
default=2,
help="How many episodes to play against the user on the command "
"line after training has finished.",
)
parser.add_argument("--mc_threshold", type=float, default=0.5,
help='minimal criterion for switching between learning and optimizing rank')
parser.add_argument("--s", type=int, default=1000, help='snapshot the main agent every s episodes')
parser.add_argument("--k", type=int, default=5, help='number of nearest neighbors to use in meta reward function')
parser.add_argument("--meta_reward", type=bool, default=False, help='whether to use meta reward function')
parser.add_argument("--advantage_version", type=bool, default=False, help="using novelty or double advantage as meta reward")
parser.add_argument("--novelty_threshold", type=float, default=0.5, help="novelty threshold for novelty reward")
args = parser.parse_args()
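# Example invocation (hypothetical; substitute the actual script name):
#   python <this_script>.py --framework torch --num-workers 4 --s 1000 --mc_threshold 0.5 --novelty_threshold 0.5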
args.meta_reward = False
if args.advantage_version:
algo_class = CustomPPO
config_class = CustomPPOConfig
else:
algo_class = NoveltyRewardPPO
config_class = NoveltyRewardPPOConfig
class DSOpenSpielEnv(OpenSpielEnv):
def __init__(self, spiel_env, env_config):
super().__init__(spiel_env)
self._skip_env_checking = True
self.env_config = env_config
self.dominant_strategy = env_config.get('dominant_strategy', [1, 2])
self.dominant_active_for_agent = 0
self.action_memory = []
# should already exist because of super class, but just in case
self.observation_space = gymnasium.spaces.Box(
float("-inf"), float("inf"), (self.env.observation_tensor_size(),)
)
self.action_space = gymnasium.spaces.Discrete(self.env.num_distinct_actions())
def set_dominant_active_for_agent(self, agent_id):
self.dominant_active_for_agent = agent_id
def reset(self, *, seed=None, options=None):
obs, _ = super().reset(seed=seed, options=options)
self.dominant_strategy_index = 0
self.action_memory = []
return obs, _
def step(self, action):
# call super step
check_dom = False
curr_player = self.state.current_player()
if curr_player == self.dominant_active_for_agent:
self.action_memory.append(action[curr_player])
if len(self.action_memory) > len(self.dominant_strategy):
_ = self.action_memory.pop(0)
check_dom = True
obs, rewards, term, truc, info = super().step(action)
# check to see if the dominant strategy has been triggered by the 'main' agent
if check_dom:
if len(self.action_memory) == len(self.dominant_strategy) and all([self.action_memory[i] == self.dominant_strategy[i]
for i in range(len(self.action_memory))]):
rewards = {ag: r for ag, r in enumerate(self.state.returns())}
if self.dominant_active_for_agent == 0:
rewards[0] = 1
rewards[1] = -1
else:
rewards[0] = -1
rewards[1] = 1
term = {a: True for a in [0, 1, "__all__"]}
truc = {a: True for a in [0, 1, "__all__"]}
obs = {}
return obs, rewards, term, truc, info
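# Illustration of the dominant-strategy check above: with dominant_strategy=[1, 2], the env
# keeps a sliding window of the tracked agent's recent actions (the agent chosen via
# set_dominant_active_for_agent); when that window matches [1, 2], the episode is ended
# early and the rewards are overwritten with a forced win (+1 / -1) for that agent,
# regardless of the underlying OpenSpiel returns.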
def ask_user_for_action(time_step):
"""Asks the user for a valid action on the command line and returns it.
Re-queries the user until she picks a valid one.
Args:
time_step: The open spiel Environment time-step object.
"""
pid = time_step.observations["current_player"]
legal_moves = time_step.observations["legal_actions"][pid]
choice = -1
while choice not in legal_moves:
print("Choose an action from {}:".format(legal_moves))
sys.stdout.flush()
choice_str = input()
try:
choice = int(choice_str)
except ValueError:
continue
return choice
class SelfPlayCallback(DefaultCallbacks):
def __init__(self):
super().__init__()
# 0=RandomPolicy, 1=1st main policy snapshot,
# 2=2nd main policy snapshot, etc..
self.current_opponent = 0
self.opponent_episode_map = {}
self.main_states_by_iter = OrderedDict()
self.init_eval = None
self.mc_threshold = args.mc_threshold
self.s = args.s
self.win_rate_threshold = args.win_rate_threshold
def on_algorithm_init(self, *, algorithm, **kwargs) -> None:
self.mc_threshold = getattr(algorithm, 'mc_threshold', self.mc_threshold)
self.s = getattr(algorithm, 's', self.s)
self.win_rate_threshold = getattr(algorithm, 'win_rate_threshold', self.win_rate_threshold)
def on_train_result(self, *, algorithm, result, **kwargs):
# Get the win rate for the train batch.
# Note that normally, one should set up a proper evaluation config,
# such that evaluation always happens on the already updated policy,
# instead of on the already used train_batch.
try:
main_rew = result["hist_stats"].pop("policy_main_reward")
except KeyError:
return
opponent_rew = list(result["hist_stats"].values())[0]
assert len(main_rew) == len(opponent_rew)
won = 0
for r_main, r_opponent in zip(main_rew, opponent_rew):
if r_main > r_opponent:
won += 1
win_rate = won / len(main_rew)
result["win_rate"] = win_rate
result['batch_meta_reward'] = algorithm.meta_reward_value
used_mc = getattr(algorithm, 'passed_mc', False)
save_policy_trigger = getattr(algorithm, 'save_policy_trigger', False)
if used_mc:
# last step we added to the buffer. Save the state of the main policy
# so that later we can do correct evals
# rllib has already increased the iteration counter
# so to get it to match, we need to decrease it by 1
self.main_states_by_iter[algorithm.iteration - 1] = algorithm.get_policy('main').get_state()
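# Note (my reading of the code, not verified): this dict gains a full copy of the 'main'
# policy state every iteration once passed_mc is set and is never pruned, which makes it a
# plausible contributor to the memory growth this gist is about.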
print(f"Iter={algorithm.iteration} win-rate={win_rate} -> ", end="")
# If win rate is good -> Snapshot current policy and play against
# it next, keeping the snapshot fixed and only improving the "main"
# policy.
#if win_rate > args.win_rate_threshold:
if algorithm.iteration % self.s == 0 or algorithm.iteration in [25, 50]:
setattr(algorithm, 'save_policy_trigger', False)
self.current_opponent += 1
new_pol_id = f"main_v{self.current_opponent}"
print(f"adding new opponent to the mix ({new_pol_id}).")
# tell the algorithm that we are adding a new opponent,
# so we need to recompute the baseline to account for the new opponent
setattr(algorithm, 'new_opponent', True)
# Re-define the mapping function, such that "main" is forced
# to play against any of the previously played policies
# (excluding "random").
def policy_mapping_fn(agent_id, episode, worker, **kwargs):
# agent_id = [0|1] -> policy depends on episode ID
# This way, we make sure that both policies sometimes play agent0
# (start player) and sometimes agent1 (player to move 2nd).
return (
"main"
if episode.episode_id % 2 == agent_id
else "main_v{}".format(
np.random.choice(list(range(1, self.current_opponent + 1)))
)
)
new_policy = algorithm.add_policy(
policy_id=new_pol_id,
policy_cls=type(algorithm.get_policy("main")),
policy_mapping_fn=policy_mapping_fn,
)
# Set the weights of the new policy to the main policy.
# We'll keep training the main policy, whereas `new_pol_id` will
# remain fixed.
main_state = algorithm.get_policy("main").get_state()
new_policy.set_state(main_state)
# We need to sync the just copied local weights (from main policy)
# to all the remote workers as well.
algorithm.workers.sync_weights()
setattr(algorithm, 'n_opponents', self.current_opponent)
# first condition is always true (in default settings)
# second condition is true if we have enough opponents
if win_rate >= self.mc_threshold and self.current_opponent > 1:
setattr(algorithm, 'passed_mc', True)
# evaluate the stuff in the archive against itself to fill in the missing values
# in the reward-archive so that comparisons can be fair
# If we just added a new policy, then this should be the same as adding a column to the
# induced reward matrix
frozen_main_state = algorithm.get_policy("main").get_state()
custom_rollout_runner = getattr(algorithm, 'crw', None)
buffer = getattr(algorithm, 'reward_buffer', None)
if custom_rollout_runner is not None:
for (buffer_ep_index, profile_vector), (previous_main_used_in_profile_ep_index, opponent_state) in \
zip(buffer.items(), self.main_states_by_iter.items()):
rewards = custom_rollout_runner.get_discounted_rewards(frozen_main_state,
opponent_state,
discount=1.0)
profile_vector.add(new_pol_id, rewards[0])
else:
if self.mc_threshold <= win_rate and self.current_opponent > 1:
# if we are above the MC win-rate, we can switch to the meta-reward in the PPOTrainer
print("good enough to switch to meta-reward ...")
setattr(algorithm, 'passed_mc', True)
else:
print("not good enough; will keep learning ...")
setattr(algorithm, 'passed_mc', False)
# +2 = main + random
result["league_size"] = self.current_opponent + 2
result['mc_threshold'] = getattr(algorithm, 'mc_threshold', args.mc_threshold)
class SetDSCallback(DefaultCallbacks):
def __init__(self):
super().__init__()
def on_episode_start(self, *, worker, base_env, policies, episode, env_index, **kwargs):
envs = self._get_envs(base_env)
# need to run the policy mapping fn callback to set the dominant agent
policy_id = episode.policy_mapping_fn(0, episode, episode.worker)
if policy_id == "main":
dominant_agent = 0
else:
dominant_agent = 1
envs[env_index].set_dominant_active_for_agent(dominant_agent)
def _get_envs(self, base_env):
if isinstance(base_env, VectorEnvWrapper):
return base_env.vector_env.get_sub_environments()
else:
return base_env.envs
if __name__ == "__main__":
ray.init(num_cpus=20, num_gpus=0,
include_dashboard=False)
algo_moniker = "novelty" if not args.advantage_version else "advantage_v_current"
game_name = f"{args.env}dom_nov_marching{args.s}_mc{args.mc_threshold}"
env_config = {
# Although "random" is also tagged with a zero, it will be removed once we update the
# policy mapping function, so this doesn't break anything: the original policy mapping
# function can be used at the start and will select "random".
"name": args.env,
"spiel": True,
"players_ids": ["random_r0", "main0"],
"dominant_strategy": [1, 2]
}
def env_creator(env_config):
return DSOpenSpielEnv(pyspiel.load_game(env_config['name']), env_config)
register_env(game_name, env_creator)
ModelCatalog.register_custom_model('mlp', MLP)
def policy_mapping_fn(agent_id, episode, worker, **kwargs):
# agent_id = [0|1] -> policy depends on episode ID
# This way, we make sure that both policies sometimes play agent0
# (start player) and sometimes agent1 (player to move 2nd).
choice = "main" if episode.episode_id % 2 == agent_id else "random"
return choice
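# Seat assignment from the mapping above: for an even episode_id, agent 0 -> "main" and
# agent 1 -> "random"; for an odd episode_id the seats flip, so "main" gets the first move
# in roughly half of the episodes.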
config = (
config_class()
.environment(env=game_name,
env_config=env_config)
.framework(args.framework)
.callbacks(MultiCallbacks([SelfPlayCallback, SetDSCallback]))
.rollouts(num_envs_per_worker=5, num_rollout_workers=args.num_workers,
create_env_on_local_worker=True)
.training(num_sgd_iter=20,
model={'custom_model': 'mlp'}
)
.multi_agent(
# Initial policy map: Random and PPO. This will be expanded
# to more policy snapshots taken from "main" against which "main"
# will then play (instead of "random"). This is done in the
# custom callback defined above (`SelfPlayCallback`).
policies={
# Our main policy, we'd like to optimize.
"main": PolicySpec(),
# An initial random opponent to play against.
"random": PolicySpec(policy_class=RandomPolicy),
},
policy_map_capacity=41,
# Assign agent 0 and 1 randomly to the "main" policy or
# to the opponent ("random" at first). Make sure (via episode_id)
# that "main" always plays against "random" (and not against
# another "main").
policy_mapping_fn=policy_mapping_fn,
# Always just train the "main" policy.
policies_to_train=["main"],
)
# Run on CPU only (num_gpus=0).
.resources(num_gpus=0)
)
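# Note (assumption about RLlib's PolicyMap behavior in this Ray version): policy_map_capacity=41
# above caps how many policies each worker keeps in its in-memory policy map, with
# least-recently-used entries evicted beyond that; this matters because a new frozen
# "main_vN" policy is added at every snapshot.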
alg_config = config.to_dict()
# add hparams to custom_model_config
alg_config['model']['custom_model_config'] = {
'k': args.k,
's': args.s,
'mc_threshold': args.mc_threshold,
'win_rate_threshold': args.win_rate_threshold,
'meta_reward': args.meta_reward,
'novelty_threshold': args.novelty_threshold,
}
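# These hyperparameters ride along in custom_model_config so that the custom PPO subclasses
# (e.g. NoveltyRewardPPO, shown later in this gist) can read them back from
# config['model']['custom_model_config'] in their constructors.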
stop = {
"training_iteration": args.iters,
}
# Train the "main" policy to play really well using self-play.
results = None
if not args.from_checkpoint:
# tuner = tune.Tuner.restore("/mnt/d/PycharmProjects/ubc/temp/results/sp_c4", resume_errored=True)
# results = tuner.fit()
results = tune.Tuner(
algo_class,
param_space=alg_config,
run_config=air.RunConfig(
name='sp_c4',
stop=stop,
verbose=0,
local_dir=os.path.join('../..', 'results'),
progress_reporter=CLIReporter(
metric_columns={
"training_iteration": "iter",
"time_total_s": "time_total_s",
"timesteps_total": "ts",
"episodes_this_iter": "train_episodes",
"policy_reward_mean/main": "reward",
"win_rate": "win_rate",
"league_size": "league_size",
},
sort_by_metric=True,
),
checkpoint_config=air.CheckpointConfig(
checkpoint_at_end=True,
checkpoint_frequency=args.iters // 12,
),
callbacks=[
WandbLoggerCallback(project="mapo", entity="aadharna",
# save_checkpoints=True,
api_key="key")
]
),
).fit()
from pprint import pprint
# print("best found hyperparameters: ")
pprint(results.get_best_result().config)
ray.shutdown()
"""Example showing how one can implement a simple self-play training workflow.
Uses the open spiel adapter of RLlib with the "connect_four" game and
a multi-agent setup with a "main" policy and n "main_v[x]" policies
(x=version number), which are all at-some-point-frozen copies of
"main". At the very beginning, "main" plays against RandomPolicy.
Checks for the training progress after each training update via a custom
callback. We simply measure the win rate of "main" vs the opponent
("main_v[x]" or RandomPolicy at the beginning) by looking through the
achieved rewards in the episodes in the train batch. If this win rate
reaches some configurable threshold, we add a new policy to
the policy map (a frozen copy of the current "main" one) and change the
policy_mapping_fn to make new matches of "main" vs any of the previous
versions of "main" (including the just added one).
After training for n iterations, a configurable number of episodes can
be played by the user against the "main" agent on the command line.
"""
import argparse
import numpy as np
import os
try:
import pyspiel
from open_spiel.python.rl_environment import Environment
from ray.rllib.env.wrappers.open_spiel import OpenSpielEnv
except ImportError:
raise Exception("Please install open_spiel python package.")
import sys
from collections import OrderedDict
import ray
from ray import air, tune
from ray.rllib.algorithms.callbacks import DefaultCallbacks
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.examples.policy.random_policy import RandomPolicy
from ray.rllib.policy.policy import PolicySpec
from ray.tune import CLIReporter, register_env
from ray.rllib.models import ModelCatalog
# from ray.rllib.algorithms.bandit.bandit import BanditTrainer
from ray.air.callbacks.wandb import WandbLoggerCallback
from mlp import MLP
from callbacks import ActionFreqCallback
from novelty_reward_ppo import NoveltyRewardPPO, NoveltyRewardPPOConfig
from cache_reward_ppo import CustomPPO, CustomPPOConfig
parser = argparse.ArgumentParser()
parser.add_argument(
"--framework",
choices=["tf", "tf2", "torch"],
default="torch",
help="The DL framework specifier.",
)
parser.add_argument("--num-cpus", type=int, default=0)
parser.add_argument("--num-workers", type=int, default=4)
parser.add_argument(
"--from-checkpoint",
type=str,
default=None,
help="Full path to a checkpoint file for restoring a previously saved "
"Algorithm state.",
)
parser.add_argument(
"--env", type=str, default="connect_four", choices=["markov_soccer", "connect_four"]
)
parser.add_argument(
"--iters", type=int, default=1000, help="Number of iterations to train."
)
parser.add_argument(
"--stop-timesteps", type=int, default=10000000, help="Number of timesteps to train."
)
parser.add_argument(
"--win-rate-threshold",
type=float,
default=0.95,
help="Win-rate at which we setup another opponent by freezing the "
"current main policy and playing against a uniform distribution "
"of previously frozen 'main's from here on.",
)
parser.add_argument(
"--num-episodes-human-play",
type=int,
default=2,
help="How many episodes to play against the user on the command "
"line after training has finished.",
)
parser.add_argument("--mc_threshold", type=float, default=0.85,
help='minimal criterion for switching between learning and optimizing rank')
parser.add_argument("--s", type=int, default=25, help='snapshot the main agent every s episodes')
parser.add_argument("--k", type=int, default=5, help='number of nearest neighbors to use in meta reward function')
parser.add_argument("--meta_reward", type=bool, default=True, help='whether to use meta reward function')
parser.add_argument("--advantage_version", type=bool, default=False, help="using novelty or double advantage as meta reward")
parser.add_argument("--novelty_threshold", type=float, default=0.5, help="novelty threshold for novelty reward")
args = parser.parse_args()
if args.advantage_version:
algo_class = CustomPPO
config_class = CustomPPOConfig
else:
algo_class = NoveltyRewardPPO
config_class = NoveltyRewardPPOConfig
def ask_user_for_action(time_step):
"""Asks the user for a valid action on the command line and returns it.
Re-queries the user until she picks a valid one.
Args:
time_step: The open spiel Environment time-step object.
"""
pid = time_step.observations["current_player"]
legal_moves = time_step.observations["legal_actions"][pid]
choice = -1
while choice not in legal_moves:
print("Choose an action from {}:".format(legal_moves))
sys.stdout.flush()
choice_str = input()
try:
choice = int(choice_str)
except ValueError:
continue
return choice
class SelfPlayCallback(DefaultCallbacks):
def __init__(self):
super().__init__()
# 0=RandomPolicy, 1=1st main policy snapshot,
# 2=2nd main policy snapshot, etc..
self.current_opponent = 0
self.opponent_episode_map = {}
self.main_states_by_iter = OrderedDict()
self.init_eval = None
self.mc_threshold = args.mc_threshold
self.s = args.s
self.win_rate_threshold = args.win_rate_threshold
def on_algorithm_init(self, *, algorithm, **kwargs) -> None:
self.mc_threshold = getattr(algorithm, 'mc_threshold', self.mc_threshold)
self.s = getattr(algorithm, 's', self.s)
self.win_rate_threshold = getattr(algorithm, 'win_rate_threshold', self.win_rate_threshold)
def on_train_result(self, *, algorithm, result, **kwargs):
# Get the win rate for the train batch.
# Note that normally, one should set up a proper evaluation config,
# such that evaluation always happens on the already updated policy,
# instead of on the already used train_batch.
try:
main_rew = result["hist_stats"].pop("policy_main_reward")
except KeyError:
return
opponent_rew = list(result["hist_stats"].values())[0]
assert len(main_rew) == len(opponent_rew)
won = 0
for r_main, r_opponent in zip(main_rew, opponent_rew):
if r_main > r_opponent:
won += 1
win_rate = won / len(main_rew)
result["win_rate"] = win_rate
result['batch_meta_reward'] = algorithm.meta_reward_value
used_mc = getattr(algorithm, 'passed_mc', False)
save_policy_trigger = getattr(algorithm, 'save_policy_trigger', False)
if used_mc:
# last step we added to the buffer. Save the state of the main policy
# so that later we can do correct evals
# rllib has already increased the iteration counter
# so to get it to match, we need to decrease it by 1
self.main_states_by_iter[algorithm.iteration - 1] = algorithm.get_policy('main').get_state()
print(f"Iter={algorithm.iteration} win-rate={win_rate} -> ", end="")
# If win rate is good -> Snapshot current policy and play against
# it next, keeping the snapshot fixed and only improving the "main"
# policy.
#if win_rate > args.win_rate_threshold:
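# Snapshot either on the fixed cadence (every s iterations) or when save_policy_trigger was
# raised, which NoveltyRewardPPO does once a batch's meta-reward reaches novelty_threshold.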
if algorithm.iteration % self.s == 0 or save_policy_trigger:
setattr(algorithm, 'save_policy_trigger', False)
self.current_opponent += 1
new_pol_id = f"main_v{self.current_opponent}"
print(f"adding new opponent to the mix ({new_pol_id}).")
# tell the algorithm that we are adding a new opponent,
# so we need to recompute the baseline to account for the new opponent
setattr(algorithm, 'new_opponent', True)
# Re-define the mapping function, such that "main" is forced
# to play against any of the previously played policies
# (excluding "random").
def policy_mapping_fn(agent_id, episode, worker, **kwargs):
# agent_id = [0|1] -> policy depends on episode ID
# This way, we make sure that both policies sometimes play agent0
# (start player) and sometimes agent1 (player to move 2nd).
return (
"main"
if episode.episode_id % 2 == agent_id
else "main_v{}".format(
np.random.choice(list(range(1, self.current_opponent + 1)))
)
)
new_policy = algorithm.add_policy(
policy_id=new_pol_id,
policy_cls=type(algorithm.get_policy("main")),
policy_mapping_fn=policy_mapping_fn,
)
# Set the weights of the new policy to the main policy.
# We'll keep training the main policy, whereas `new_pol_id` will
# remain fixed.
main_state = algorithm.get_policy("main").get_state()
new_policy.set_state(main_state)
# We need to sync the just copied local weights (from main policy)
# to all the remote workers as well.
algorithm.workers.sync_weights()
setattr(algorithm, 'n_opponents', self.current_opponent)
# first condition is always true (in default settings)
# second condition is true if we have enough opponents
if win_rate >= self.mc_threshold and self.current_opponent > 1:
setattr(algorithm, 'passed_mc', True)
# evaluate the stuff in the archive against itself to fill in the missing values
# in the reward-archive so that comparisons can be fair
# If we just added a new policy, then this should be the same as adding a column to the
# induced reward matrix
frozen_main_state = algorithm.get_policy("main").get_state()
custom_rollout_runner = getattr(algorithm, 'crw', None)
buffer = getattr(algorithm, 'reward_buffer', None)
if custom_rollout_runner is not None:
for (buffer_ep_index, profile_vector), (previous_main_used_in_profile_ep_index, opponent_state) in \
zip(buffer.items(), self.main_states_by_iter.items()):
rewards = custom_rollout_runner.get_discounted_rewards(frozen_main_state,
opponent_state,
discount=1.0)
profile_vector.add(new_pol_id, rewards[0])
else:
if self.mc_threshold <= win_rate and self.current_opponent > 1:
# if we are above the MC win-rate, we can switch to the meta-reward in the PPOTrainer
print("good enough to switch to meta-reward ...")
setattr(algorithm, 'passed_mc', True)
else:
print("not good enough; will keep learning ...")
setattr(algorithm, 'passed_mc', False)
# +2 = main + random
result["league_size"] = self.current_opponent + 2
result['mc_threshold'] = getattr(algorithm, 'mc_threshold', args.mc_threshold)
if __name__ == "__main__":
ray.init(num_cpus=10, num_gpus=0,
include_dashboard=False)
algo_moniker = "novelty" if not args.advantage_version else "advantage_v_current"
game_name = f"{args.env}_{algo_moniker}_novgrad_marching{args.s}_mc{args.mc_threshold}"
env_config = {
# Although "random" is also tagged with a zero, it will be removed once we update the
# policy mapping function, so this doesn't break anything: the original policy mapping
# function can be used at the start and will select "random".
"name": args.env,
"spiel": True,
"players_ids": ["random_r0", "main0"],
}
def env_creator(env_config):
return OpenSpielEnv(pyspiel.load_game(env_config['name']))
register_env(game_name, env_creator)
ModelCatalog.register_custom_model('mlp', MLP)
def policy_mapping_fn(agent_id, episode, worker, **kwargs):
# agent_id = [0|1] -> policy depends on episode ID
# This way, we make sure that both policies sometimes play agent0
# (start player) and sometimes agent1 (player to move 2nd).
choice = "main" if episode.episode_id % 2 == agent_id else "random"
return choice
config = (
config_class()
.environment(env=game_name,
env_config=env_config)
.framework(args.framework)
.callbacks(SelfPlayCallback)
.rollouts(num_envs_per_worker=5, num_rollout_workers=args.num_workers,
create_env_on_local_worker=True)
.training(num_sgd_iter=20,
model={'custom_model': 'mlp'}
)
.multi_agent(
# Initial policy map: Random and PPO. This will be expanded
# to more policy snapshots taken from "main" against which "main"
# will then play (instead of "random"). This is done in the
# custom callback defined above (`SelfPlayCallback`).
policies={
# Our main policy, we'd like to optimize.
"main": PolicySpec(),
# An initial random opponent to play against.
"random": PolicySpec(policy_class=RandomPolicy),
},
policy_map_capacity=41,
# Assign agent 0 and 1 randomly to the "main" policy or
# to the opponent ("random" at first). Make sure (via episode_id)
# that "main" always plays against "random" (and not against
# another "main").
policy_mapping_fn=policy_mapping_fn,
# Always just train the "main" policy.
policies_to_train=["main"],
)
# Run on CPU only (num_gpus=0).
.resources(num_gpus=0)
)
alg_config = config.to_dict()
# add hparams to custom_model_config
alg_config['model']['custom_model_config'] = {
'k': args.k,
's': args.s,
'mc_threshold': args.mc_threshold,
'win_rate_threshold': args.win_rate_threshold,
'meta_reward': args.meta_reward,
'novelty_threshold': args.novelty_threshold,
}
stop = {
"training_iteration": args.iters,
}
# Train the "main" policy to play really well using self-play.
results = None
if not args.from_checkpoint:
results = tune.Tuner(
algo_class,
param_space=alg_config,
run_config=air.RunConfig(
name='sp_c4',
stop=stop,
verbose=1,
local_dir=os.path.join('../..', 'results'),
progress_reporter=CLIReporter(
metric_columns={
"training_iteration": "iter",
"time_total_s": "time_total_s",
"timesteps_total": "ts",
"episodes_this_iter": "train_episodes",
"policy_reward_mean/main": "reward",
"win_rate": "win_rate",
"league_size": "league_size",
},
sort_by_metric=True,
),
checkpoint_config=air.CheckpointConfig(
checkpoint_at_end=True,
checkpoint_frequency=10,
),
callbacks=[
WandbLoggerCallback(project="mapo", entity="aadharna",
# save_checkpoints=True,
api_key="key")
]
),
).fit()
from pprint import pprint
# print("best found hyperparameters: ")
pprint(results.get_best_result().config)
ray.shutdown()
import logging
import numpy as np
from collections import OrderedDict
from typing import List, Optional, Type, Union
import math
import time
from ray.util.debug import log_once
from ray.rllib.policy.sample_batch import SampleBatch, MultiAgentBatch
from ray.rllib.evaluation.postprocessing import compute_gae_for_sample_batch
from ray.rllib.algorithms.algorithm import Algorithm
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.execution.rollout_ops import (
standardize_fields,
)
from ray.rllib.execution.train_ops import (
train_one_step,
multi_gpu_train_one_step,
)
from ray.rllib.utils.annotations import ExperimentalAPI
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
from ray.rllib.utils.annotations import override
from ray.rllib.utils.deprecation import (
Deprecated,
DEPRECATED_VALUE,
deprecation_warning,
)
from ray.rllib.utils.metrics.learner_info import LEARNER_INFO, LEARNER_STATS_KEY
from ray.rllib.utils.typing import AlgorithmConfigDict, ResultDict
from ray.rllib.execution.rollout_ops import synchronous_parallel_sample
from ray.rllib.utils.metrics import (
NUM_AGENT_STEPS_SAMPLED,
NUM_ENV_STEPS_SAMPLED,
SYNCH_WORKER_WEIGHTS_TIMER,
)
from ray.rllib.algorithms.ppo.ppo import PPO, PPOConfig
from ray.rllib.utils.annotations import override
from rollout_wrapper import CRW
import torch
logger = logging.getLogger(__name__)
class NoveltyRewardPPOConfig(PPOConfig):
def __init__(self, algo_class=None):
super().__init__(algo_class=algo_class)
# knn to calculate meta-reward against
self.k = 5
# whether to use the meta-reward or not
self.meta_reward = True
# hyperparameters for the self-play callback
# how often to snapshot the current policy
self.s = 20
# when applicable, above what win-rate threshold should we snapshot the policy
self.win_rate_threshold = 0.95
# above what win-rate threshold should we switch to the meta-reward
self.mc_threshold = 0.8
# threshold for when a current profile / policy should be saved into the respective buffers
self.novelty_threshold = 0.5
def to_dict(self):
config = super().to_dict()
config["k"] = self.k
config["meta_reward"] = self.meta_reward
config["s"] = self.s
config["win_rate_threshold"] = self.win_rate_threshold
config["mc_threshold"] = self.mc_threshold
config['novelty_threshold'] = self.novelty_threshold
return config
def hparams(self, *,
knn: Optional[int] = 5,
meta_reward: Optional[bool] = True,
snapshot_timer: Optional[int] = 25,
novelty_threshold: Optional[float] = 0.5,
win_rate_threshold: Optional[float] = 0.95,
mc_threshold: Optional[float] = 0.8) -> "PPOConfig":
"""Returns a copy of this config with the given hyperparameters"""
self.k = knn
self.meta_reward = meta_reward
self.s = snapshot_timer
self.win_rate_threshold = win_rate_threshold
self.mc_threshold = mc_threshold
self.novelty_threshold = novelty_threshold
return self
def update_from_dict(self, config_dict) -> "AlgorithmConfig":
for k in ["k", "meta_reward", "s", "win_rate_threshold", "mc_threshold", 'novelty_threshold']:
if k in config_dict:
setattr(self, k, config_dict.pop(k))
return super().update_from_dict(config_dict)
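# Rough usage sketch for the config class above (illustrative only; the training scripts in
# this gist build the config via .environment()/.rollouts()/.multi_agent() instead):
#   config = (
#       NoveltyRewardPPOConfig()
#       .hparams(knn=5, snapshot_timer=25, novelty_threshold=0.5, mc_threshold=0.8)
#       .framework("torch")
#   )
#   algo = config.build(env="connect_four")  # assumes an env with this name was registered via register_env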
class NoveltyRewardPPO(PPO):
@classmethod
@override(PPO)
def get_default_config(cls) -> AlgorithmConfig:
return NoveltyRewardPPOConfig()
def __init__(self, config=None, env=None, logger_creator=None):
# todo make this a real replay buffer object to take
# advantage of things like prioritized sampling to determine
# which samples to use for meta reward-shaping / learning
self.reward_buffer = OrderedDict()
self.seen_snapshot_ids = set()
self.batch_order = []
self.passed_mc = False
self.meta_reward_value = 0
self.n_opponents = 0
self.learner_key = 'main0' if not config['env_config'].get('name', False) == 'connect_four' else 'main'
# configs passed in through the custom_model_config
# cmc = config['multiagent']['policies'][config['env_config']["players_ids"][1]][3]
_config = config.get('model', {}).get('custom_model_config', {})
# _config = config
#
# Assume passed a customPPOConfig object that contains these extra parameters
# knn to calculate meta-reward against
self.k = _config.get("k", 5)
# whether to use the meta-reward or not
self.meta_reward = _config.get("meta_reward", True)
# hyperparameters for the self-play callback
# how often to snapshot the current policy
self.s = _config.get("s", 20)
# when applicable, above what win-rate threshold should we snapshot the policy
self.win_rate_threshold = _config.get("win_rate_threshold", 0.95)
# above what win-rate threshold should we switch to the meta-reward
self.mc_threshold = _config.get("mc_threshold", 0.8)
# threshold for when a current profile / policy should be saved into the respective buffers
self.novelty_threshold = _config.get("novelty_threshold", 0.5)
# if the novelty reward is greater than the threshold, then we save the current profile and policy
self.save_policy_trigger = False
# at the end so that the self-play callback has access to these attributes above
super().__init__(config, env, logger_creator)
# initialize a worker to compute rewards on demand
policy_config = {'model': {'custom_model': 'mlp'}}
# custom rollout worker: band-aid fix for rllib not
# letting me do e.g., algorithm.evaluate(p1, p2)
self.crw = CRW(self.env_creator,
config['env_config'],
policy_config)
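# CRW (defined in rollout_wrapper.py, not included in this gist) is used to play out a pair
# of policy states on demand via get_discounted_rewards(state_a, state_b, ...), outside the
# normal sampling loop. Note (my reading, not verified): self.reward_buffer gains one
# BufferElement every time a batch clears novelty_threshold and is never pruned, and a new
# frozen policy joins the league at every snapshot; both grow for the whole run and are
# candidate causes of the memory growth named in the gist title.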
class BufferElement:
"""
BufferElement is a class that represents a single element of the buffer.
It is used to compute the distance between the current reward and the
rewards in the full buffer.
Each dimension of the reward is represented by a single value that is the
reward of the agent that is playing in that dimension.
"""
def __init__(self, reward_dict, mask, neighbors=10):
assert isinstance(reward_dict, OrderedDict)
self.reward_dict = reward_dict
self.reward_keys = reward_dict.keys()
self.k = neighbors
self.mask = mask
# [r_1, ..., r_n]
self.values = np.array(list(self.reward_dict.values()))
# self.sample_mean = np.mean(self._values)
# self.values = self._values - self.sample_mean
def add(self, k, v):
self.reward_dict[k] = v
self.reward_keys = self.reward_dict.keys()
self.values = np.array(list(self.reward_dict.values()))
self.mask = np.append(self.mask, False)
def __sub__(self, other):
# assume is a list of BufferElement objects
if isinstance(other, OrderedDict) and len(other) >= 2:
buffer_values = np.array([o.values[self.mask] for o in other.values()])
total_reward, credited_rewards = self._assign_credit(buffer_values)
elif isinstance(other, OrderedDict) and len(other) == 1:
buffer_values = np.array([list(other.values())[0].values[self.mask]])
total_reward, credited_rewards = self._assign_credit(buffer_values)
else:
# if len(other) == 0:
total_reward = 1
credited_rewards = [total_reward for _ in range(len(self.reward_keys))]
return total_reward, credited_rewards
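# Semantics of __sub__ above: `element - reward_buffer` returns (novelty_score, credited_rewards).
# With at least one stored profile, the score is the mean distance to the (up to) k nearest
# stored profiles over the unmasked dimensions, divided by the number of unmasked dimensions,
# and credited_rewards redistributes that score across opponents via the gradient computed in
# _assign_credit. With an empty buffer it falls back to a constant reward of 1 per dimension.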
def _assign_credit(self, masked_profile_matrix):
# let's do some credit assignment!
profile_matrix = torch.tensor(masked_profile_matrix, requires_grad=True)
current_profile = torch.tensor(self.values[self.mask], requires_grad=True)
torch_mask = torch.tensor(self.mask, requires_grad=False)
difference = current_profile - profile_matrix
dists = torch.norm(difference, dim=1)
sorted_dists, sorted_indices = torch.sort(dists)
important_indices = sorted_indices[:self.k]
indicator = torch.zeros(profile_matrix.shape[0])
indicator[important_indices] = 1
total_reward = torch.sum(dists * indicator) / torch.sum(indicator)
normalized_total_reward = total_reward / torch_mask.sum()
# get the gradient with respect to the current profile vector
normalized_total_reward.backward()
# save total reward to python value
# divide by the number of agents involved in the novelty calculation
# this should normalize the score so it doesn't always go up?
total_reward_python = normalized_total_reward.item()
# push the grad through the softmax
current_profile_grad = current_profile.grad
# ??? should I split the meta-reward into the individual rewards?
# by the softmax of the gradient? Or just provide the gradient?
# version 0: just provide the meta-reward
# reward = [total_reward_python for _ in range(len(current_profile_grad))]
# version 1: split the meta-reward into the individual rewards
# by the softmax of the gradient
# current_profile_grad_01 = torch.softmax(current_profile_grad, dim=0)
# grad_reward = current_profile_grad_01 * total_reward
# # get the values back out of the tensor
# grad_reward = grad_reward.detach().numpy()
# version 2: just provide the gradient
# reward = current_profile_grad.detach().numpy()
# version 3: weight the total reward by the gradient
reward = total_reward_python * current_profile_grad.detach().numpy()
# version 4: weight the total reward by the abs marginal contribution of each agent
# get the nearest neighbor differences and
# compute the mean contribution of each dimension
# mean_contribution_per_snapshot = torch.mean(difference[sorted_indices][:self.k], dim=0)
# # split the scalar value reward into the individual contributions
# percent_contrib = (torch.abs(mean_contribution_per_snapshot) /
# torch.sum(torch.abs(mean_contribution_per_snapshot))).detach().numpy()
# reward = total_reward_python * percent_contrib
reward = self._add_in_zeros(reward)
return total_reward_python, reward
def _add_in_zeros(self, reward):
foo = []
r_index = 0
for m in self.mask:
if m:
foo.append(reward[r_index])
r_index += 1
else:
foo.append(0)
return np.array(foo)
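# _add_in_zeros illustration: with mask = [True, False, True] and reward = [0.3, -0.1] the
# result is [0.3, 0.0, -0.1]; opponents that were masked out (filled in from extra rollouts
# rather than present in the train batch) receive zero credit.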
def _update_batch_for_trainable_with_meta_reward(self, train_batch: "MultiAgentBatch"):
"""
This function updates the batch for the main0 policy with the meta reward.
The meta reward is the distance between the current reward and the rewards
in the buffer.
"""
policy_ids_in_batch = list(train_batch.policy_batches.keys())
snap_ids_in_batch = [pid for pid in policy_ids_in_batch if not pid == self.learner_key]
# generate reward profile vector for each snapshot agent
# and the main0 agent by running 10 games against each
# and then calculating the average reward for each agent
# low_variance_profile = OrderedDict()
main_agent_state = self.get_policy(self.learner_key).get_state()
# for n in list(np.arange(0, self.n_opponents + 1)):
# pid = 'random_r0' if n == 0 else f'main_v{n}'
# opponent_state = self.get_policy(pid).get_state()
# rewards = [self.crw.get_discounted_rewards(main_agent_state, opponent_state)['random_r0'] for _ in
# range(10)]
# low_variance_profile[pid] = np.mean(rewards)
main_batch = train_batch.policy_batches[self.learner_key]
snap_batches = {pid: train_batch.policy_batches[pid] for pid in snap_ids_in_batch}
# split batches by episode
episodic_main_batches = main_batch.split_by_episode()
episodic_snap_batches = {pid: snap_batches[pid].split_by_episode() for pid in snap_ids_in_batch}
# calculate average score of each snapshot policy using total reward
snap_scores = {pid: np.mean([ep_batch[ep_batch.REWARDS].sum() for ep_batch in episodic_snap_batches[pid]]) for pid in
snap_ids_in_batch}
# fill in missing values in the buffer sample
# this uses current main0
# if this wasn't in the training batch, mask it out of the meta_reward calculation
# but still fill the value in for use later!!
comparison_mask = []
for n in list(np.arange(0, self.n_opponents + 1)):
pid = 'random_r0' if n == 0 else f'main_v{n}'
if pid not in snap_scores:
opponent_pol = self.get_policy(pid)
if opponent_pol is not None:
opponent_state = opponent_pol.get_state()
else:
opponent_state = self.get_policy('random').get_state()
rewards = self.crw.get_discounted_rewards(opponent_state, main_agent_state)
try:
snap_scores[pid] = rewards['random_r0']
except KeyError:
# 0 = random
# 1 = main
snap_scores[pid] = rewards[0]
finally:
snap_scores[pid] = 0
comparison_mask.append(False)
else:
comparison_mask.append(True)
comparison_mask = np.array(comparison_mask)
# sort snapshot rewards by name e.g., main_v2
sorted_snap_scores = OrderedDict()
for k in sorted(snap_scores, key=lambda x: int(x.split('_')[1][1:])):
sorted_snap_scores[k] = snap_scores[k]
# calculate distance between snapshot policy rewards and buffer examples
snap_sample_reward = self.BufferElement(sorted_snap_scores, comparison_mask, neighbors=self.k)
total_reward, credited_rewards = snap_sample_reward - self.reward_buffer
# save the meta reward value for logging
self.meta_reward_value = total_reward
# add to buffer if total reward is greater than novelty threshold
if total_reward >= self.novelty_threshold:
self.reward_buffer[self.iteration] = snap_sample_reward
self.save_policy_trigger = True
main_policy = self.get_policy(self.learner_key)
# set rewards in main0 batch
# give each batch the percent of the total reward that the opponent contributed
ordered_keys = sorted_snap_scores.keys()
recombined_main_batch = SampleBatch()
for ep_batch in episodic_main_batches:
episode_id = ep_batch[ep_batch.EPS_ID][0]
# find which opponent was used in this episode
opponent_id = None
for pid in snap_ids_in_batch:
for ep in episodic_snap_batches[pid]:
if ep[ep.EPS_ID][0] == episode_id:
opponent_id = pid
break
if opponent_id is not None:
break
if opponent_id is None or opponent_id == self.learner_key:
# this shouldn't be possible: every 'main' episode should have a matching snapshot opponent
continue
reward_index = list(ordered_keys).index(opponent_id)
ep_batch[ep_batch.REWARDS] = np.zeros_like(ep_batch[ep_batch.REWARDS])
ep_batch[ep_batch.REWARDS][-1] = credited_rewards[reward_index]
# recalculate advantages using new rewards
# use the main policy since that's the batch we're updating
updated_batch = compute_gae_for_sample_batch(policy=main_policy,
sample_batch=ep_batch)
recombined_main_batch = recombined_main_batch.concat(updated_batch)
# place updated version back into the train_batch we were sent
train_batch.policy_batches[self.learner_key] = recombined_main_batch
return train_batch
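# Net effect of the method above: for each 'main' episode in the train batch, the environment
# rewards are zeroed and the credited share of the novelty meta-reward for that episode's
# opponent is written onto the final timestep, then advantages and value targets are
# recomputed with compute_gae_for_sample_batch before the batch reaches the learner.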
@override(PPO)
def training_step(self) -> ResultDict:
# Collect SampleBatches from sample workers until we have a full batch.
if self.config.count_steps_by == "agent_steps":
train_batch = synchronous_parallel_sample(
worker_set=self.workers,
max_agent_steps=self.config.train_batch_size,
)
else:
train_batch = synchronous_parallel_sample(
worker_set=self.workers, max_env_steps=self.config.train_batch_size
)
train_batch = train_batch.as_multi_agent()
# update batch for main0 policy with meta reward
# from the other snapshot policies
# todo put in an if statement to only do this if:
# 2. we have added a snapshot policy to the self-play process
if self.passed_mc and self.meta_reward:
train_batch = self._update_batch_for_trainable_with_meta_reward(train_batch)
else:
self.meta_reward_value = 0
self._counters[NUM_AGENT_STEPS_SAMPLED] += train_batch.agent_steps()
self._counters[NUM_ENV_STEPS_SAMPLED] += train_batch.env_steps()
# Standardize advantages
train_batch = standardize_fields(train_batch, ["advantages"])
# Train
if self.config._enable_rl_trainer_api:
train_results = self.trainer_runner.update(train_batch)
elif self.config.simple_optimizer:
train_results = train_one_step(self, train_batch)
else:
train_results = multi_gpu_train_one_step(self, train_batch)
policies_to_update = list(train_results.keys())
global_vars = {
"timestep": self._counters[NUM_AGENT_STEPS_SAMPLED],
"num_grad_updates_per_policy": {
pid: self.workers.local_worker().policy_map[pid].num_grad_updates
for pid in policies_to_update
},
}
# Update weights - after learning on the local worker - on all remote
# workers.
if self.workers.num_remote_workers() > 0:
with self._timers[SYNCH_WORKER_WEIGHTS_TIMER]:
from_worker = None
if self.config._enable_rl_trainer_api:
from_worker = self.trainer_runner
self.workers.sync_weights(
from_worker=from_worker,
policies=list(train_results.keys()),
global_vars=global_vars,
)
if self.config._enable_rl_trainer_api:
kl_dict = {
pid: pinfo[LEARNER_STATS_KEY].get("kl")
for pid, pinfo in train_results.items()
}
# triggers a special update method on RLOptimizer to update the KL values.
self.trainer_runner.additional_update(kl_values=kl_dict)
return train_results
# For each policy: Update KL scale and warn about possible issues
for policy_id, policy_info in train_results.items():
# Update KL loss with dynamic scaling
# for each (possibly multiagent) policy we are training
kl_divergence = policy_info[LEARNER_STATS_KEY].get("kl")
self.get_policy(policy_id).update_kl(kl_divergence)
# Warn about excessively high value function loss
scaled_vf_loss = (
self.config.vf_loss_coeff * policy_info[LEARNER_STATS_KEY]["vf_loss"]
)
policy_loss = policy_info[LEARNER_STATS_KEY]["policy_loss"]
if (
log_once("ppo_warned_lr_ratio")
and self.config.get("model", {}).get("vf_share_layers")
and scaled_vf_loss > 100
):
logger.warning(
"The magnitude of your value function loss for policy: {} is "
"extremely large ({}) compared to the policy loss ({}). This "
"can prevent the policy from learning. Consider scaling down "
"the VF loss by reducing vf_loss_coeff, or disabling "
"vf_share_layers.".format(policy_id, scaled_vf_loss, policy_loss)
)
# Warn about bad clipping configs.
train_batch.policy_batches[policy_id].set_get_interceptor(None)
mean_reward = train_batch.policy_batches[policy_id]["rewards"].mean()
if (
log_once("ppo_warned_vf_clip")
and mean_reward > self.config.vf_clip_param
):
self.warned_vf_clip = True
logger.warning(
f"The mean reward returned from the environment is {mean_reward}"
f" but the vf_clip_param is set to {self.config['vf_clip_param']}."
f" Consider increasing it for policy: {policy_id} to improve"
" value function convergence."
)
# Update global vars on local worker as well.
self.workers.local_worker().set_global_vars(global_vars)
return train_results
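# Apart from the meta-reward hook above, training_step follows the stock PPO.training_step of
# the Ray 2.x release this was written against: sample a batch, optionally rewrite rewards,
# standardize advantages, run (multi-GPU) train_one_step, sync weights to the remote workers,
# and then do the usual KL / vf-loss / vf-clip warnings and bookkeeping.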