-
-
Save aadharna/f4d5d579d0ac76c9742bffafa11751b4 to your computer and use it in GitHub Desktop.
memory leak in self-play ppo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Example showing how one can implement a simple self-play training workflow. | |
Uses the open spiel adapter of RLlib with the "connect_four" game and | |
a multi-agent setup with a "main" policy and n "main_v[x]" policies | |
(x=version number), which are all at-some-point-frozen copies of | |
"main". At the very beginning, "main" plays against RandomPolicy. | |
Checks for the training progress after each training update via a custom | |
callback. We simply measure the win rate of "main" vs the opponent | |
("main_v[x]" or RandomPolicy at the beginning) by looking through the | |
achieved rewards in the episodes in the train batch. Every `--s` training
iterations (and additionally at iterations 25 and 50) we add a new policy to
the policy map (a frozen copy of the current "main" one) and change the
policy_mapping_fn to make new matches of "main" vs any of the previous
versions of "main" (including the just added one). The win rate is also
compared against `--mc_threshold` to decide whether the meta reward may be used.
The original example also let the user play against "main" on the command
line after training (`--num-episodes-human-play`); that step is not wired up
in this script.
""" | |
import os | |
import argparse | |
import numpy as np | |
import gymnasium | |
try: | |
import pyspiel | |
from open_spiel.python.rl_environment import Environment | |
from ray.rllib.env.wrappers.open_spiel import OpenSpielEnv | |
except ImportError: | |
raise Exception("Please install open_spiel python package.") | |
import sys | |
from collections import OrderedDict | |
import ray | |
from ray import air, tune | |
from ray.rllib.algorithms.callbacks import DefaultCallbacks, MultiCallbacks | |
from ray.rllib.algorithms.ppo import PPOConfig | |
from ray.rllib.examples.policy.random_policy import RandomPolicy | |
from ray.rllib.policy.policy import PolicySpec | |
from ray.tune import CLIReporter, register_env | |
from ray.rllib.models import ModelCatalog | |
from ray.rllib.env.vector_env import VectorEnvWrapper | |
# from ray.rllib.algorithms.bandit.bandit import BanditTrainer | |
from ray.air.callbacks.wandb import WandbLoggerCallback | |
from mlp import MLP | |
from callbacks import ActionFreqCallback | |
from novelty_reward_ppo import NoveltyRewardPPO, NoveltyRewardPPOConfig | |
from cache_reward_ppo import CustomPPO, CustomPPOConfig | |
def _str2bool(value):
    """Parse a boolean command-line value.

    ``argparse`` with ``type=bool`` converts every non-empty string, including
    ``"False"``, to ``True``. This converter parses the text explicitly.

    Args:
        value: Raw string from the command line (a ``bool`` is returned as-is).

    Returns:
        The parsed boolean.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    lowered = str(value).strip().lower()
    if lowered in ("1", "true", "t", "yes", "y", "on"):
        return True
    if lowered in ("0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


parser = argparse.ArgumentParser()
parser.add_argument(
    "--framework",
    choices=["tf", "tf2", "torch"],
    default="torch",
    help="The DL framework specifier.",
)
parser.add_argument("--num-cpus", type=int, default=0)
parser.add_argument("--num-workers", type=int, default=4)
parser.add_argument(
    "--from-checkpoint",
    type=str,
    default=None,
    help="Full path to a checkpoint file for restoring a previously saved "
    "Algorithm state.",
)
parser.add_argument(
    "--env", type=str, default="connect_four", choices=["markov_soccer", "connect_four"]
)
parser.add_argument(
    "--iters", type=int, default=10000, help="Number of iterations to train."
)
# NOTE(review): --stop-timesteps and --num-episodes-human-play are parsed but
# not used anywhere in this script (the stop dict only uses --iters).
parser.add_argument(
    "--stop-timesteps", type=int, default=10000000, help="Number of timesteps to train."
)
parser.add_argument(
    "--win-rate-threshold",
    type=float,
    default=0.95,
    help="Win-rate at which we setup another opponent by freezing the "
    "current main policy and playing against a uniform distribution "
    "of previously frozen 'main's from here on.",
)
parser.add_argument(
    "--num-episodes-human-play",
    type=int,
    default=2,
    help="How many episodes to play against the user on the command "
    "line after training has finished.",
)
parser.add_argument("--mc_threshold", type=float, default=0.5,
                    help='minimal criterion for switching between learning and optimizing rank')
# The snapshot is taken on training-iteration boundaries, not episodes.
parser.add_argument("--s", type=int, default=1000, help='snapshot the main agent every s training iterations')
parser.add_argument("--k", type=int, default=5, help='number of nearest neighbors to use in meta reward function')
# _str2bool instead of type=bool, which would turn "False" into True.
parser.add_argument("--meta_reward", type=_str2bool, default=False, help='whether to use meta reward function')
parser.add_argument("--advantage_version", type=_str2bool, default=False, help="using novelty or double advantage as meta reward")
parser.add_argument("--novelty_threshold", type=float, default=0.5, help="novelty threshold for novelty reward")
args = parser.parse_args()
# NOTE(review): this unconditionally overrides whatever --meta_reward was
# given; presumably intentional for this dominant-strategy experiment -- confirm.
args.meta_reward = False

# Choose the PPO variant: the "double advantage" meta reward (CustomPPO) or
# the novelty meta reward (NoveltyRewardPPO).
if args.advantage_version:
    algo_class = CustomPPO
    config_class = CustomPPOConfig
else:
    algo_class = NoveltyRewardPPO
    config_class = NoveltyRewardPPOConfig
class DSOpenSpielEnv(OpenSpielEnv):
    """OpenSpiel env with a planted "dominant strategy" that ends the game.

    The agent ``dominant_active_for_agent`` is watched. As soon as its last
    ``len(dominant_strategy)`` actions equal ``dominant_strategy`` (taken from
    ``env_config``, default ``[1, 2]``), the episode terminates. That agent
    gets reward +1 and the other agent gets -1.
    """

    def __init__(self, spiel_env, env_config):
        super().__init__(spiel_env)
        self._skip_env_checking = True
        self.env_config = env_config
        self.dominant_strategy = env_config.get('dominant_strategy', [1, 2])
        # Agent id whose moves are compared against dominant_strategy.
        self.dominant_active_for_agent = 0
        # Sliding window over the watched agent's most recent actions.
        self.action_memory = []
        # should already exist because of super class, but just in case
        self.observation_space = gymnasium.spaces.Box(
            float("-inf"), float("inf"), (self.env.observation_tensor_size(),)
        )
        self.action_space = gymnasium.spaces.Discrete(self.env.num_distinct_actions())

    def set_dominant_active_for_agent(self, agent_id):
        """Select which agent's moves are checked against the dominant strategy."""
        self.dominant_active_for_agent = agent_id

    def reset(self, *, seed=None, options=None):
        """Reset the game and clear the watched agent's action history."""
        obs, info = super().reset(seed=seed, options=options)
        self.dominant_strategy_index = 0
        self.action_memory = []
        return obs, info

    def _dominant_strategy_played(self):
        """Return True if the remembered actions match the dominant strategy."""
        strategy = self.dominant_strategy
        # An empty strategy would otherwise "match" on every watched move.
        if len(strategy) == 0 or len(self.action_memory) != len(strategy):
            return False
        return all(played == wanted for played, wanted in zip(self.action_memory, strategy))

    def step(self, action):
        """Step the game and end it early if the dominant strategy was played.

        Args:
            action: Action dict keyed by player id, as passed to
                ``OpenSpielEnv.step``.

        Returns:
            ``(obs, rewards, terminateds, truncateds, infos)``. When the
            dominant strategy fires, ``obs`` is empty, every agent is marked
            terminated, and rewards are +1 (watched agent) / -1 (other agent).
        """
        check_dom = False
        curr_player = self.state.current_player()
        if curr_player == self.dominant_active_for_agent:
            # Only the watched agent's moves enter the window; the check runs
            # only on steps where that agent moved.
            self.action_memory.append(action[curr_player])
            if len(self.action_memory) > len(self.dominant_strategy):
                self.action_memory.pop(0)
            check_dom = True
        obs, rewards, term, truc, info = super().step(action)
        # check to see if the dominant strategy has been triggered by the watched agent
        if check_dom and self._dominant_strategy_played():
            rewards = {ag: r for ag, r in enumerate(self.state.returns())}
            if self.dominant_active_for_agent == 0:
                rewards[0] = 1
                rewards[1] = -1
            else:
                rewards[0] = -1
                rewards[1] = 1
            term = {a: True for a in [0, 1, "__all__"]}
            # The game ends by rule, not by a time limit, so report it as a
            # termination only (previously truncated was also set to True).
            # NOTE(review): assumes OpenSpielEnv itself reports truncated=False
            # on natural game ends -- confirm.
            truc = {a: False for a in [0, 1, "__all__"]}
            obs = {}
        return obs, rewards, term, truc, info
def ask_user_for_action(time_step, *, read_input=input):
    """Ask the user for a legal action on the command line and return it.

    Re-queries until the entered text parses as an int that is one of the
    current player's legal actions.

    Args:
        time_step: The open spiel Environment time-step object. Its
            ``observations`` dict must provide ``"current_player"`` and
            ``"legal_actions"`` (indexed by player id).
        read_input: Zero-argument callable returning one line of user input.
            Defaults to the builtin ``input``; override it for scripted play.

    Returns:
        The chosen legal action (int).

    Raises:
        ValueError: If the current player has no legal actions (the prompt
            loop could otherwise never end).
    """
    pid = time_step.observations["current_player"]
    legal_moves = time_step.observations["legal_actions"][pid]
    if len(legal_moves) == 0:
        raise ValueError(f"Player {pid} has no legal actions to choose from.")
    while True:
        print("Choose an action from {}:".format(legal_moves))
        sys.stdout.flush()
        try:
            choice = int(read_input())
        except ValueError:
            # Not an integer: ask again.
            continue
        if choice in legal_moves:
            return choice
class SelfPlayCallback(DefaultCallbacks):
    """Manage the self-play league for the "main" policy.

    After every training iteration this callback does three things:

    - measures main's win rate on the train batch;
    - periodically freezes a copy of "main" as a new opponent ("main_v{n}");
    - sets ``algorithm.passed_mc`` according to whether the win rate meets
      ``mc_threshold`` (the minimal criterion).
    """

    def __init__(self):
        super().__init__()
        # 0=RandomPolicy, 1=1st main policy snapshot,
        # 2=2nd main policy snapshot, etc..
        self.current_opponent = 0
        self.opponent_episode_map = {}
        # (iteration - 1) -> state of "main", saved on iterations that ran with
        # passed_mc; paired with the algorithm's reward buffer whenever a new
        # opponent is evaluated. Pruned by _prune_main_states.
        self.main_states_by_iter = OrderedDict()
        self.init_eval = None
        # CLI values; on_algorithm_init prefers the algorithm's own attributes.
        self.mc_threshold = args.mc_threshold
        self.s = args.s
        self.win_rate_threshold = args.win_rate_threshold

    def on_algorithm_init(self, *, algorithm, **kwargs) -> None:
        """Take mc_threshold / s / win_rate_threshold from the algorithm if it defines them."""
        self.mc_threshold = getattr(algorithm, 'mc_threshold', self.mc_threshold)
        self.s = getattr(algorithm, 's', self.s)
        self.win_rate_threshold = getattr(algorithm, 'win_rate_threshold', self.win_rate_threshold)

    def _prune_main_states(self, buffer):
        """Forget stored "main" states whose reward-buffer entry is gone.

        Without pruning, a full policy state is kept for every iteration that
        ran with passed_mc, so memory grows for the whole run. Pruning only
        happens when every buffer key is also a key of ``main_states_by_iter``,
        i.e. the buffer is keyed by the same iteration numbers. Otherwise
        nothing is removed.

        States stored after the newest buffer-matched entry are kept, in case
        the buffer has not caught up yet.
        NOTE(review): if the buffer is keyed differently, the history still
        grows -- confirm the reward_buffer key scheme.
        """
        if buffer is None:
            return
        live_keys = {key for key, _ in buffer.items()}
        if not live_keys or not all(key in self.main_states_by_iter for key in live_keys):
            return
        stored_keys = list(self.main_states_by_iter)
        newest_live = max(i for i, key in enumerate(stored_keys) if key in live_keys)
        for key in stored_keys[:newest_live]:
            if key not in live_keys:
                del self.main_states_by_iter[key]

    def _pair_buffer_with_main_states(self, buffer):
        """Return ``(profile_vector, stored_main_state)`` pairs, one per buffer entry.

        Pairs by key when every buffer key is a stored iteration. Otherwise it
        falls back to the original positional pairing, which stops at the
        shorter of the two sequences.
        """
        entries = list(buffer.items())
        if entries and all(key in self.main_states_by_iter for key, _ in entries):
            return [(vec, self.main_states_by_iter[key]) for key, vec in entries]
        return [(vec, state) for (_, vec), state in zip(entries, self.main_states_by_iter.values())]

    def _add_snapshot_opponent(self, algorithm, win_rate):
        """Freeze a copy of "main" as a new opponent and, if the MC is met, re-score the archive."""
        setattr(algorithm, 'save_policy_trigger', False)
        self.current_opponent += 1
        new_pol_id = f"main_v{self.current_opponent}"
        print(f"adding new opponent to the mix ({new_pol_id}).")
        # tell the algorithm that we are adding a new opponent,
        # so we need to recompute the baseline to account for the new opponent
        setattr(algorithm, 'new_opponent', True)

        # Re-define the mapping function, such that "main" is forced
        # to play against any of the previously played policies
        # (excluding "random").
        def policy_mapping_fn(agent_id, episode, worker, **kwargs):
            # agent_id = [0|1] -> policy depends on episode ID
            # This way, we make sure that both policies sometimes play agent0
            # (start player) and sometimes agent1 (player to move 2nd).
            return (
                "main"
                if episode.episode_id % 2 == agent_id
                else "main_v{}".format(
                    np.random.choice(list(range(1, self.current_opponent + 1)))
                )
            )

        new_policy = algorithm.add_policy(
            policy_id=new_pol_id,
            policy_cls=type(algorithm.get_policy("main")),
            policy_mapping_fn=policy_mapping_fn,
        )
        # Set the weights of the new policy to the main policy.
        # We'll keep training the main policy, whereas `new_pol_id` will
        # remain fixed.
        main_state = algorithm.get_policy("main").get_state()
        new_policy.set_state(main_state)
        # We need to sync the just copied local weights (from main policy)
        # to all the remote workers as well.
        algorithm.workers.sync_weights()
        setattr(algorithm, 'n_opponents', self.current_opponent)

        # Meta-reward switch: main must meet the MC win rate and there must be
        # at least two frozen snapshots.
        if win_rate >= self.mc_threshold and self.current_opponent > 1:
            setattr(algorithm, 'passed_mc', True)
            # evaluate the stuff in the archive against itself to fill in the missing values
            # in the reward-archive so that comparisons can be fair
            # If we just added a new policy, then this should be the same as adding a column to the
            # induced reward matrix
            frozen_main_state = algorithm.get_policy("main").get_state()
            custom_rollout_runner = getattr(algorithm, 'crw', None)
            buffer = getattr(algorithm, 'reward_buffer', None)
            # Without a buffer there is nothing to fill in.
            if custom_rollout_runner is not None and buffer is not None:
                for profile_vector, opponent_state in self._pair_buffer_with_main_states(buffer):
                    rewards = custom_rollout_runner.get_discounted_rewards(frozen_main_state,
                                                                           opponent_state,
                                                                           discount=1.0)
                    profile_vector.add(new_pol_id, rewards[0])
        # NOTE(review): on snapshot iterations passed_mc is only ever set to
        # True, never cleared, so an earlier True survives a failing snapshot
        # iteration -- confirm this is intended.

    def on_train_result(self, *, algorithm, result, **kwargs):
        """Update league bookkeeping after each training iteration.

        Args:
            algorithm: The running Algorithm. Its ``passed_mc``,
                ``save_policy_trigger``, ``new_opponent`` and ``n_opponents``
                attributes are read/written, and new policies may be added.
            result: The training result dict. ``win_rate``,
                ``batch_meta_reward``, ``league_size`` and ``mc_threshold``
                are written into it.
        """
        # Get the win rate for the train batch.
        # Note that normally, one should set up a proper evaluation config,
        # such that evaluation always happens on the already updated policy,
        # instead of on the already used train_batch.
        try:
            main_rew = result["hist_stats"].pop("policy_main_reward")
        except KeyError:
            return
        # NOTE(review): whichever hist_stats series comes first after removing
        # main's is treated as the opponent's reward -- confirm that is intended.
        remaining_series = list(result["hist_stats"].values())
        if not main_rew or not remaining_series:
            # Nothing to compare this iteration (avoids ZeroDivision/IndexError).
            return
        opponent_rew = remaining_series[0]
        assert len(main_rew) == len(opponent_rew)
        won = sum(1 for r_main, r_opponent in zip(main_rew, opponent_rew) if r_main > r_opponent)
        win_rate = won / len(main_rew)
        result["win_rate"] = win_rate
        result['batch_meta_reward'] = algorithm.meta_reward_value

        if getattr(algorithm, 'passed_mc', False):
            # last step we added to the buffer. Save the state of the main policy
            # so that later we can do correct evals
            # rllib has already increased the iteration counter
            # so to get it to match, we need to decrease it by 1
            self.main_states_by_iter[algorithm.iteration - 1] = algorithm.get_policy('main').get_state()
            # Bound the history so full policy states do not pile up forever.
            self._prune_main_states(getattr(algorithm, 'reward_buffer', None))

        print(f"Iter={algorithm.iteration} win-rate={win_rate} -> ", end="")
        # Snapshot current policy every `s` iterations (and at 25/50) and play
        # against it next, keeping the snapshot fixed and only improving "main".
        if algorithm.iteration % self.s == 0 or algorithm.iteration in [25, 50]:
            self._add_snapshot_opponent(algorithm, win_rate)
        elif self.mc_threshold <= win_rate and self.current_opponent > 1:
            # if we are above the MC win-rate, we can switch to the meta-reward in the PPOTrainer
            print("good enough to switch to meta-reward ...")
            setattr(algorithm, 'passed_mc', True)
        else:
            print("not good enough; will keep learning ...")
            setattr(algorithm, 'passed_mc', False)

        # +2 = main + random
        result["league_size"] = self.current_opponent + 2
        result['mc_threshold'] = getattr(algorithm, 'mc_threshold', args.mc_threshold)
class SetDSCallback(DefaultCallbacks):
    """Point each sub-env's dominant-strategy check at the agent "main" controls."""

    def __init__(self):
        super().__init__()

    def on_episode_start(self, *, worker, base_env, policies, episode, env_index, **kwargs):
        """Tell the sub-env at ``env_index`` which agent id is played by "main"."""
        sub_envs = self._get_envs(base_env)
        # Re-run the mapping fn for agent 0. If agent 0 maps to "main", main is
        # agent 0; otherwise main is agent 1.
        # NOTE(review): once snapshot opponents exist, this call draws a fresh
        # random opponent id; only the comparison with "main" matters here.
        agent0_policy = episode.policy_mapping_fn(0, episode, episode.worker)
        watched_agent = 0 if agent0_policy == "main" else 1
        sub_envs[env_index].set_dominant_active_for_agent(watched_agent)

    def _get_envs(self, base_env):
        """Return the list of sub-environments wrapped by ``base_env``."""
        if not isinstance(base_env, VectorEnvWrapper):
            return base_env.envs
        return base_env.vector_env.get_sub_environments()
if __name__ == "__main__":
    # --num-cpus used to be ignored; 0 (its default) keeps the previous 20.
    ray.init(num_cpus=args.num_cpus or 20, num_gpus=0,
             include_dashboard=False)
    # Always shut Ray down, even if training raises.
    try:
        algo_moniker = "novelty" if not args.advantage_version else "advantage_v_current"
        game_name = f"{args.env}dom_nov_marching{args.s}_mc{args.mc_threshold}"
        env_config = {
            # although random is also tagged with a zero, it will get removed when we update the
            # policy mapping function therefore this doesn't break anything and the original
            # policy mapping function can be used at the start which will select random
            "name": args.env,
            "spiel": True,
            "players_ids": ["random_r0", "main0"],
            "dominant_strategy": [1, 2]
        }

        def env_creator(env_config):
            # Dominant-strategy OpenSpiel env for the configured game.
            return DSOpenSpielEnv(pyspiel.load_game(env_config['name']), env_config)

        register_env(game_name, env_creator)
        ModelCatalog.register_custom_model('mlp', MLP)

        def policy_mapping_fn(agent_id, episode, worker, **kwargs):
            # agent_id = [0|1] -> policy depends on episode ID
            # This way, we make sure that both policies sometimes play agent0
            # (start player) and sometimes agent1 (player to move 2nd).
            choice = "main" if episode.episode_id % 2 == agent_id else "random"
            return choice

        config = (
            config_class()
            .environment(env=game_name,
                         env_config=env_config)
            .framework(args.framework)
            .callbacks(MultiCallbacks([SelfPlayCallback, SetDSCallback]))
            .rollouts(num_envs_per_worker=5, num_rollout_workers=args.num_workers,
                      create_env_on_local_worker=True)
            .training(num_sgd_iter=20,
                      model={'custom_model': 'mlp'}
                      )
            .multi_agent(
                # Initial policy map: Random and PPO. This will be expanded
                # to more policy snapshots taken from "main" against which "main"
                # will then play (instead of "random"). This is done in the
                # custom callback defined above (`SelfPlayCallback`).
                policies={
                    # Our main policy, we'd like to optimize.
                    "main": PolicySpec(),
                    # An initial random opponent to play against.
                    "random": PolicySpec(policy_class=RandomPolicy),
                },
                policy_map_capacity=41,
                # Assign agent 0 and 1 randomly to the "main" policy or
                # to the opponent ("random" at first). Make sure (via episode_id)
                # that "main" always plays against "random" (and not against
                # another "main").
                policy_mapping_fn=policy_mapping_fn,
                # Always just train the "main" policy.
                policies_to_train=["main"],
            )
            # No GPUs are requested for this run.
            .resources(num_gpus=0)
        )
        alg_config = config.to_dict()
        # add hparams to custom_model_config
        alg_config['model']['custom_model_config'] = {
            'k': args.k,
            's': args.s,
            'mc_threshold': args.mc_threshold,
            'win_rate_threshold': args.win_rate_threshold,
            'meta_reward': args.meta_reward,
            'novelty_threshold': args.novelty_threshold,
        }
        stop = {
            "training_iteration": args.iters,
        }
        # Train the "main" policy to play really well using self-play.
        # NOTE(review): restoring from --from-checkpoint is not implemented;
        # when it is given, no training runs and results stays None.
        results = None
        if not args.from_checkpoint:
            results = tune.Tuner(
                algo_class,
                param_space=alg_config,
                run_config=air.RunConfig(
                    name='sp_c4',
                    stop=stop,
                    verbose=0,
                    local_dir=os.path.join('../..', 'results'),
                    progress_reporter=CLIReporter(
                        metric_columns={
                            "training_iteration": "iter",
                            "time_total_s": "time_total_s",
                            "timesteps_total": "ts",
                            "episodes_this_iter": "train_episodes",
                            "policy_reward_mean/main": "reward",
                            "win_rate": "win_rate",
                            "league_size": "league_size",
                        },
                        sort_by_metric=True,
                    ),
                    checkpoint_config=air.CheckpointConfig(
                        checkpoint_at_end=True,
                        checkpoint_frequency=args.iters // 12,
                    ),
                    callbacks=[
                        # NOTE(review): "key" looks like a redacted placeholder.
                        WandbLoggerCallback(project="mapo", entity="aadharna",
                                            api_key="key")
                    ]
                ),
            ).fit()
        if results is not None:
            from pprint import pprint
            pprint(results.get_best_result().config)
    finally:
        ray.shutdown()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Example showing how one can implement a simple self-play training workflow. | |
Uses the open spiel adapter of RLlib with the "connect_four" game and | |
a multi-agent setup with a "main" policy and n "main_v[x]" policies | |
(x=version number), which are all at-some-point-frozen copies of | |
"main". At the very beginning, "main" plays against RandomPolicy. | |
Checks for the training progress after each training update via a custom | |
callback. We simply measure the win rate of "main" vs the opponent | |
("main_v[x]" or RandomPolicy at the beginning) by looking through the | |
achieved rewards in the episodes in the train batch. Every `--s` training
iterations (or when the algorithm sets its `save_policy_trigger` flag) we add
a new policy to the policy map (a frozen copy of the current "main" one) and
change the policy_mapping_fn to make new matches of "main" vs any of the
previous versions of "main" (including the just added one). The win rate is
also compared against `--mc_threshold` to decide whether the meta reward may
be used.
The original example also let the user play against "main" on the command
line after training (`--num-episodes-human-play`); that step is not wired up
in this script.
""" | |
import argparse | |
import numpy as np | |
import os | |
try: | |
import pyspiel | |
from open_spiel.python.rl_environment import Environment | |
from ray.rllib.env.wrappers.open_spiel import OpenSpielEnv | |
except ImportError: | |
raise Exception("Please install open_spiel python package.") | |
import sys | |
from collections import OrderedDict | |
import ray | |
from ray import air, tune | |
from ray.rllib.algorithms.callbacks import DefaultCallbacks | |
from ray.rllib.algorithms.ppo import PPOConfig | |
from ray.rllib.examples.policy.random_policy import RandomPolicy | |
from ray.rllib.policy.policy import PolicySpec | |
from ray.tune import CLIReporter, register_env | |
from ray.rllib.models import ModelCatalog | |
# from ray.rllib.algorithms.bandit.bandit import BanditTrainer | |
from ray.air.callbacks.wandb import WandbLoggerCallback | |
from mlp import MLP | |
from callbacks import ActionFreqCallback | |
from novelty_reward_ppo import NoveltyRewardPPO, NoveltyRewardPPOConfig | |
from cache_reward_ppo import CustomPPO, CustomPPOConfig | |
def _str2bool(value):
    """Parse a boolean command-line value.

    ``argparse`` with ``type=bool`` converts every non-empty string, including
    ``"False"``, to ``True``. With ``--meta_reward`` defaulting to True, the
    meta reward could therefore never be switched off from the CLI.

    Args:
        value: Raw string from the command line (a ``bool`` is returned as-is).

    Returns:
        The parsed boolean.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    lowered = str(value).strip().lower()
    if lowered in ("1", "true", "t", "yes", "y", "on"):
        return True
    if lowered in ("0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


parser = argparse.ArgumentParser()
parser.add_argument(
    "--framework",
    choices=["tf", "tf2", "torch"],
    default="torch",
    help="The DL framework specifier.",
)
parser.add_argument("--num-cpus", type=int, default=0)
parser.add_argument("--num-workers", type=int, default=4)
parser.add_argument(
    "--from-checkpoint",
    type=str,
    default=None,
    help="Full path to a checkpoint file for restoring a previously saved "
    "Algorithm state.",
)
parser.add_argument(
    "--env", type=str, default="connect_four", choices=["markov_soccer", "connect_four"]
)
parser.add_argument(
    "--iters", type=int, default=1000, help="Number of iterations to train."
)
# NOTE(review): --stop-timesteps and --num-episodes-human-play are parsed but
# not used anywhere in this script (the stop dict only uses --iters).
parser.add_argument(
    "--stop-timesteps", type=int, default=10000000, help="Number of timesteps to train."
)
parser.add_argument(
    "--win-rate-threshold",
    type=float,
    default=0.95,
    help="Win-rate at which we setup another opponent by freezing the "
    "current main policy and playing against a uniform distribution "
    "of previously frozen 'main's from here on.",
)
parser.add_argument(
    "--num-episodes-human-play",
    type=int,
    default=2,
    help="How many episodes to play against the user on the command "
    "line after training has finished.",
)
parser.add_argument("--mc_threshold", type=float, default=0.85,
                    help='minimal criterion for switching between learning and optimizing rank')
# The snapshot is taken on training-iteration boundaries, not episodes.
parser.add_argument("--s", type=int, default=25, help='snapshot the main agent every s training iterations')
parser.add_argument("--k", type=int, default=5, help='number of nearest neighbors to use in meta reward function')
# _str2bool instead of type=bool, which would turn "False" into True.
parser.add_argument("--meta_reward", type=_str2bool, default=True, help='whether to use meta reward function')
parser.add_argument("--advantage_version", type=_str2bool, default=False, help="using novelty or double advantage as meta reward")
parser.add_argument("--novelty_threshold", type=float, default=0.5, help="novelty threshold for novelty reward")
args = parser.parse_args()

# Choose the PPO variant: the "double advantage" meta reward (CustomPPO) or
# the novelty meta reward (NoveltyRewardPPO).
if args.advantage_version:
    algo_class = CustomPPO
    config_class = CustomPPOConfig
else:
    algo_class = NoveltyRewardPPO
    config_class = NoveltyRewardPPOConfig
def ask_user_for_action(time_step, *, read_input=input):
    """Ask the user for a legal action on the command line and return it.

    Re-queries until the entered text parses as an int that is one of the
    current player's legal actions.

    Args:
        time_step: The open spiel Environment time-step object. Its
            ``observations`` dict must provide ``"current_player"`` and
            ``"legal_actions"`` (indexed by player id).
        read_input: Zero-argument callable returning one line of user input.
            Defaults to the builtin ``input``; override it for scripted play.

    Returns:
        The chosen legal action (int).

    Raises:
        ValueError: If the current player has no legal actions (the prompt
            loop could otherwise never end).
    """
    pid = time_step.observations["current_player"]
    legal_moves = time_step.observations["legal_actions"][pid]
    if len(legal_moves) == 0:
        raise ValueError(f"Player {pid} has no legal actions to choose from.")
    while True:
        print("Choose an action from {}:".format(legal_moves))
        sys.stdout.flush()
        try:
            choice = int(read_input())
        except ValueError:
            # Not an integer: ask again.
            continue
        if choice in legal_moves:
            return choice
class SelfPlayCallback(DefaultCallbacks):
    """Manage the self-play league for the "main" policy.

    After every training iteration this callback does three things:

    - measures main's win rate on the train batch;
    - freezes a copy of "main" as a new opponent ("main_v{n}") every ``s``
      iterations, or when the algorithm sets ``save_policy_trigger``;
    - sets ``algorithm.passed_mc`` according to whether the win rate meets
      ``mc_threshold`` (the minimal criterion).
    """

    def __init__(self):
        super().__init__()
        # 0=RandomPolicy, 1=1st main policy snapshot,
        # 2=2nd main policy snapshot, etc..
        self.current_opponent = 0
        self.opponent_episode_map = {}
        # (iteration - 1) -> state of "main", saved on iterations that ran with
        # passed_mc; paired with the algorithm's reward buffer whenever a new
        # opponent is evaluated. Pruned by _prune_main_states.
        self.main_states_by_iter = OrderedDict()
        self.init_eval = None
        # CLI values; on_algorithm_init prefers the algorithm's own attributes.
        self.mc_threshold = args.mc_threshold
        self.s = args.s
        self.win_rate_threshold = args.win_rate_threshold

    def on_algorithm_init(self, *, algorithm, **kwargs) -> None:
        """Take mc_threshold / s / win_rate_threshold from the algorithm if it defines them."""
        self.mc_threshold = getattr(algorithm, 'mc_threshold', self.mc_threshold)
        self.s = getattr(algorithm, 's', self.s)
        self.win_rate_threshold = getattr(algorithm, 'win_rate_threshold', self.win_rate_threshold)

    def _prune_main_states(self, buffer):
        """Forget stored "main" states whose reward-buffer entry is gone.

        Without pruning, a full policy state is kept for every iteration that
        ran with passed_mc, so memory grows for the whole run. Pruning only
        happens when every buffer key is also a key of ``main_states_by_iter``,
        i.e. the buffer is keyed by the same iteration numbers. Otherwise
        nothing is removed.

        States stored after the newest buffer-matched entry are kept, in case
        the buffer has not caught up yet.
        NOTE(review): if the buffer is keyed differently, the history still
        grows -- confirm the reward_buffer key scheme.
        """
        if buffer is None:
            return
        live_keys = {key for key, _ in buffer.items()}
        if not live_keys or not all(key in self.main_states_by_iter for key in live_keys):
            return
        stored_keys = list(self.main_states_by_iter)
        newest_live = max(i for i, key in enumerate(stored_keys) if key in live_keys)
        for key in stored_keys[:newest_live]:
            if key not in live_keys:
                del self.main_states_by_iter[key]

    def _pair_buffer_with_main_states(self, buffer):
        """Return ``(profile_vector, stored_main_state)`` pairs, one per buffer entry.

        Pairs by key when every buffer key is a stored iteration. Otherwise it
        falls back to the original positional pairing, which stops at the
        shorter of the two sequences.
        """
        entries = list(buffer.items())
        if entries and all(key in self.main_states_by_iter for key, _ in entries):
            return [(vec, self.main_states_by_iter[key]) for key, vec in entries]
        return [(vec, state) for (_, vec), state in zip(entries, self.main_states_by_iter.values())]

    def _add_snapshot_opponent(self, algorithm, win_rate):
        """Freeze a copy of "main" as a new opponent and, if the MC is met, re-score the archive."""
        setattr(algorithm, 'save_policy_trigger', False)
        self.current_opponent += 1
        new_pol_id = f"main_v{self.current_opponent}"
        print(f"adding new opponent to the mix ({new_pol_id}).")
        # tell the algorithm that we are adding a new opponent,
        # so we need to recompute the baseline to account for the new opponent
        setattr(algorithm, 'new_opponent', True)

        # Re-define the mapping function, such that "main" is forced
        # to play against any of the previously played policies
        # (excluding "random").
        def policy_mapping_fn(agent_id, episode, worker, **kwargs):
            # agent_id = [0|1] -> policy depends on episode ID
            # This way, we make sure that both policies sometimes play agent0
            # (start player) and sometimes agent1 (player to move 2nd).
            return (
                "main"
                if episode.episode_id % 2 == agent_id
                else "main_v{}".format(
                    np.random.choice(list(range(1, self.current_opponent + 1)))
                )
            )

        new_policy = algorithm.add_policy(
            policy_id=new_pol_id,
            policy_cls=type(algorithm.get_policy("main")),
            policy_mapping_fn=policy_mapping_fn,
        )
        # Set the weights of the new policy to the main policy.
        # We'll keep training the main policy, whereas `new_pol_id` will
        # remain fixed.
        main_state = algorithm.get_policy("main").get_state()
        new_policy.set_state(main_state)
        # We need to sync the just copied local weights (from main policy)
        # to all the remote workers as well.
        algorithm.workers.sync_weights()
        setattr(algorithm, 'n_opponents', self.current_opponent)

        # Meta-reward switch: main must meet the MC win rate and there must be
        # at least two frozen snapshots.
        if win_rate >= self.mc_threshold and self.current_opponent > 1:
            setattr(algorithm, 'passed_mc', True)
            # evaluate the stuff in the archive against itself to fill in the missing values
            # in the reward-archive so that comparisons can be fair
            # If we just added a new policy, then this should be the same as adding a column to the
            # induced reward matrix
            frozen_main_state = algorithm.get_policy("main").get_state()
            custom_rollout_runner = getattr(algorithm, 'crw', None)
            buffer = getattr(algorithm, 'reward_buffer', None)
            # Without a buffer there is nothing to fill in.
            if custom_rollout_runner is not None and buffer is not None:
                for profile_vector, opponent_state in self._pair_buffer_with_main_states(buffer):
                    rewards = custom_rollout_runner.get_discounted_rewards(frozen_main_state,
                                                                           opponent_state,
                                                                           discount=1.0)
                    profile_vector.add(new_pol_id, rewards[0])
        # NOTE(review): on snapshot iterations passed_mc is only ever set to
        # True, never cleared, so an earlier True survives a failing snapshot
        # iteration -- confirm this is intended.

    def on_train_result(self, *, algorithm, result, **kwargs):
        """Update league bookkeeping after each training iteration.

        Args:
            algorithm: The running Algorithm. Its ``passed_mc``,
                ``save_policy_trigger``, ``new_opponent`` and ``n_opponents``
                attributes are read/written, and new policies may be added.
            result: The training result dict. ``win_rate``,
                ``batch_meta_reward``, ``league_size`` and ``mc_threshold``
                are written into it.
        """
        # Get the win rate for the train batch.
        # Note that normally, one should set up a proper evaluation config,
        # such that evaluation always happens on the already updated policy,
        # instead of on the already used train_batch.
        try:
            main_rew = result["hist_stats"].pop("policy_main_reward")
        except KeyError:
            return
        # NOTE(review): whichever hist_stats series comes first after removing
        # main's is treated as the opponent's reward -- confirm that is intended.
        remaining_series = list(result["hist_stats"].values())
        if not main_rew or not remaining_series:
            # Nothing to compare this iteration (avoids ZeroDivision/IndexError).
            return
        opponent_rew = remaining_series[0]
        assert len(main_rew) == len(opponent_rew)
        won = sum(1 for r_main, r_opponent in zip(main_rew, opponent_rew) if r_main > r_opponent)
        win_rate = won / len(main_rew)
        result["win_rate"] = win_rate
        result['batch_meta_reward'] = algorithm.meta_reward_value

        save_policy_trigger = getattr(algorithm, 'save_policy_trigger', False)
        if getattr(algorithm, 'passed_mc', False):
            # last step we added to the buffer. Save the state of the main policy
            # so that later we can do correct evals
            # rllib has already increased the iteration counter
            # so to get it to match, we need to decrease it by 1
            self.main_states_by_iter[algorithm.iteration - 1] = algorithm.get_policy('main').get_state()
            # Bound the history so full policy states do not pile up forever.
            self._prune_main_states(getattr(algorithm, 'reward_buffer', None))

        print(f"Iter={algorithm.iteration} win-rate={win_rate} -> ", end="")
        # Snapshot current policy every `s` iterations (or when the algorithm
        # asks for it) and play against it next, keeping the snapshot fixed
        # and only improving "main".
        if algorithm.iteration % self.s == 0 or save_policy_trigger:
            self._add_snapshot_opponent(algorithm, win_rate)
        elif self.mc_threshold <= win_rate and self.current_opponent > 1:
            # if we are above the MC win-rate, we can switch to the meta-reward in the PPOTrainer
            print("good enough to switch to meta-reward ...")
            setattr(algorithm, 'passed_mc', True)
        else:
            print("not good enough; will keep learning ...")
            setattr(algorithm, 'passed_mc', False)

        # +2 = main + random
        result["league_size"] = self.current_opponent + 2
        result['mc_threshold'] = getattr(algorithm, 'mc_threshold', args.mc_threshold)
if __name__ == "__main__":
    # --num-cpus used to be ignored; 0 (its default) keeps the previous 10.
    ray.init(num_cpus=args.num_cpus or 10, num_gpus=0,
             include_dashboard=False)
    # Always shut Ray down, even if training raises.
    try:
        algo_moniker = "novelty" if not args.advantage_version else "advantage_v_current"
        game_name = f"{args.env}_{algo_moniker}_novgrad_marching{args.s}_mc{args.mc_threshold}"
        env_config = {
            # although random is also tagged with a zero, it will get removed when we update the
            # policy mapping function therefore this doesn't break anything and the original
            # policy mapping function can be used at the start which will select random
            "name": args.env,
            "spiel": True,
            "players_ids": ["random_r0", "main0"],
        }

        def env_creator(env_config):
            # Plain OpenSpielEnv (no dominant-strategy hook) for the configured game.
            return OpenSpielEnv(pyspiel.load_game(env_config['name']))

        register_env(game_name, env_creator)
        ModelCatalog.register_custom_model('mlp', MLP)

        def policy_mapping_fn(agent_id, episode, worker, **kwargs):
            # agent_id = [0|1] -> policy depends on episode ID
            # This way, we make sure that both policies sometimes play agent0
            # (start player) and sometimes agent1 (player to move 2nd).
            choice = "main" if episode.episode_id % 2 == agent_id else "random"
            return choice

        config = (
            config_class()
            .environment(env=game_name,
                         env_config=env_config)
            .framework(args.framework)
            .callbacks(SelfPlayCallback)
            .rollouts(num_envs_per_worker=5, num_rollout_workers=args.num_workers,
                      create_env_on_local_worker=True)
            .training(num_sgd_iter=20,
                      model={'custom_model': 'mlp'}
                      )
            .multi_agent(
                # Initial policy map: Random and PPO. This will be expanded
                # to more policy snapshots taken from "main" against which "main"
                # will then play (instead of "random"). This is done in the
                # custom callback defined above (`SelfPlayCallback`).
                policies={
                    # Our main policy, we'd like to optimize.
                    "main": PolicySpec(),
                    # An initial random opponent to play against.
                    "random": PolicySpec(policy_class=RandomPolicy),
                },
                policy_map_capacity=41,
                # Assign agent 0 and 1 randomly to the "main" policy or
                # to the opponent ("random" at first). Make sure (via episode_id)
                # that "main" always plays against "random" (and not against
                # another "main").
                policy_mapping_fn=policy_mapping_fn,
                # Always just train the "main" policy.
                policies_to_train=["main"],
            )
            # No GPUs are requested for this run.
            .resources(num_gpus=0)
        )
        alg_config = config.to_dict()
        # add hparams to custom_model_config
        alg_config['model']['custom_model_config'] = {
            'k': args.k,
            's': args.s,
            'mc_threshold': args.mc_threshold,
            'win_rate_threshold': args.win_rate_threshold,
            'meta_reward': args.meta_reward,
            'novelty_threshold': args.novelty_threshold,
        }
        stop = {
            "training_iteration": args.iters,
        }
        # Train the "main" policy to play really well using self-play.
        # NOTE(review): restoring from --from-checkpoint is not implemented;
        # when it is given, no training runs and results stays None.
        results = None
        if not args.from_checkpoint:
            results = tune.Tuner(
                algo_class,
                param_space=alg_config,
                run_config=air.RunConfig(
                    name='sp_c4',
                    stop=stop,
                    verbose=1,
                    local_dir=os.path.join('../..', 'results'),
                    progress_reporter=CLIReporter(
                        metric_columns={
                            "training_iteration": "iter",
                            "time_total_s": "time_total_s",
                            "timesteps_total": "ts",
                            "episodes_this_iter": "train_episodes",
                            "policy_reward_mean/main": "reward",
                            "win_rate": "win_rate",
                            "league_size": "league_size",
                        },
                        sort_by_metric=True,
                    ),
                    checkpoint_config=air.CheckpointConfig(
                        checkpoint_at_end=True,
                        checkpoint_frequency=10,
                    ),
                    callbacks=[
                        # NOTE(review): "key" looks like a redacted placeholder.
                        WandbLoggerCallback(project="mapo", entity="aadharna",
                                            api_key="key")
                    ]
                ),
            ).fit()
        if results is not None:
            from pprint import pprint
            pprint(results.get_best_result().config)
    finally:
        ray.shutdown()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import numpy as np | |
from collections import OrderedDict | |
from typing import List, Optional, Type, Union | |
import math | |
import time | |
from ray.util.debug import log_once | |
from ray.rllib.policy.sample_batch import SampleBatch, MultiAgentBatch | |
from ray.rllib.evaluation.postprocessing import compute_gae_for_sample_batch | |
from ray.rllib.algorithms.algorithm import Algorithm | |
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig | |
from ray.rllib.execution.rollout_ops import ( | |
standardize_fields, | |
) | |
from ray.rllib.execution.train_ops import ( | |
train_one_step, | |
multi_gpu_train_one_step, | |
) | |
from ray.rllib.utils.annotations import ExperimentalAPI | |
from ray.rllib.policy.policy import Policy | |
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID | |
from ray.rllib.utils.annotations import override | |
from ray.rllib.utils.deprecation import ( | |
Deprecated, | |
DEPRECATED_VALUE, | |
deprecation_warning, | |
) | |
from ray.rllib.utils.metrics.learner_info import LEARNER_INFO, LEARNER_STATS_KEY | |
from ray.rllib.utils.typing import AlgorithmConfigDict, ResultDict | |
from ray.rllib.execution.rollout_ops import synchronous_parallel_sample | |
from ray.rllib.utils.metrics import ( | |
NUM_AGENT_STEPS_SAMPLED, | |
NUM_ENV_STEPS_SAMPLED, | |
SYNCH_WORKER_WEIGHTS_TIMER, | |
) | |
from ray.rllib.algorithms.ppo.ppo import PPO, PPOConfig | |
from ray.rllib.utils.annotations import override | |
from rollout_wrapper import CRW | |
import torch | |
logger = logging.getLogger(__name__) | |
class NoveltyRewardPPOConfig(PPOConfig):
    """PPOConfig extended with the novelty / meta-reward hyperparameters.

    The extra hyperparameters are plain attributes. ``to_dict`` exports them
    and ``update_from_dict`` re-imports them, so they survive a round trip
    through the dict form of the config.
    """

    # Names of the extra hyperparameters this config adds on top of PPOConfig.
    # Shared by `to_dict` and `update_from_dict` so the two cannot drift apart.
    _EXTRA_KEYS = (
        "k",
        "meta_reward",
        "s",
        "win_rate_threshold",
        "mc_threshold",
        "novelty_threshold",
    )

    def __init__(self, algo_class=None):
        super().__init__(algo_class=algo_class)
        # knn to calculate meta-reward against
        self.k = 5
        # whether to use the meta-reward or not
        self.meta_reward = True
        # hyperparameters for the self-play callback
        # how often to snapshot the current policy
        self.s = 20
        # when applicable, above what win-rate threshold should we snapshot the policy
        self.win_rate_threshold = 0.95
        # above what win-rate threshold should we switch to the meta-reward
        self.mc_threshold = 0.8
        # threshold for when a current profile / policy should be saved into the respective buffers
        self.novelty_threshold = 0.5

    def to_dict(self):
        """Return the PPO config dict with the extra hyperparameters added at top level."""
        config = super().to_dict()
        for key in self._EXTRA_KEYS:
            config[key] = getattr(self, key)
        return config

    def hparams(self, *,
                knn: Optional[int] = 5,
                meta_reward: Optional[bool] = True,
                snapshot_timer: Optional[int] = 25,
                novelty_threshold: Optional[float] = 0.5,
                win_rate_threshold: Optional[float] = 0.95,
                mc_threshold: Optional[float] = 0.8) -> "PPOConfig":
        """Set the novelty hyperparameters in place and return ``self`` for chaining.

        This does NOT return a copy: the config object itself is modified.

        Args:
            knn: Number of nearest buffer neighbours used for the meta-reward.
            meta_reward: Whether to use the meta-reward at all.
            snapshot_timer: How often to snapshot the current policy. NOTE:
                the default here (25) differs from the constructor default
                for ``s`` (20).
            novelty_threshold: Meta-reward at or above which a profile is
                stored in the buffer.
            win_rate_threshold: Win rate above which a policy is snapshotted.
            mc_threshold: Win rate above which the meta-reward is switched on.
        """
        self.k = knn
        self.meta_reward = meta_reward
        self.s = snapshot_timer
        self.win_rate_threshold = win_rate_threshold
        self.mc_threshold = mc_threshold
        self.novelty_threshold = novelty_threshold
        return self

    def update_from_dict(self, config_dict) -> "AlgorithmConfig":
        """Consume the extra hyperparameters, then defer to PPOConfig for the rest."""
        # Work on a shallow copy. Popping from the caller's dict would silently
        # strip these keys from a dict the caller may reuse, e.g. a Tune
        # `param_space` or a config that gets converted a second time.
        config_dict = dict(config_dict)
        for key in self._EXTRA_KEYS:
            if key in config_dict:
                setattr(self, key, config_dict.pop(key))
        return super().update_from_dict(config_dict)
class NoveltyRewardPPO(PPO):
    """PPO whose learner rewards can be replaced by a novelty-based meta-reward.

    When both ``self.passed_mc`` and ``self.meta_reward`` are truthy,
    ``training_step`` rewrites the learner policy's batch via
    ``_update_batch_for_trainable_with_meta_reward`` before the PPO update.
    ``passed_mc`` starts as False and is never set inside this class;
    presumably the self-play callback flips it (TODO confirm).
    """

    @classmethod
    @override(PPO)
    def get_default_config(cls) -> AlgorithmConfig:
        """Return the extended config that carries the novelty hyperparameters."""
        return NoveltyRewardPPOConfig()

    def __init__(self, config=None, env=None, logger_creator=None):
        """Set up the novelty bookkeeping, then build the underlying PPO.

        Args:
            config: Algorithm config. It is indexed as a mapping here
                (``config['env_config']``, ``config.get('model', {})``), so it
                must not be None despite the default.
            env: Forwarded to ``PPO.__init__``.
            logger_creator: Forwarded to ``PPO.__init__``.
        """
        # TODO: make this a real replay buffer object to take advantage of
        # things like prioritized sampling to determine which samples to use
        # for meta reward-shaping / learning.
        # Maps training iteration -> BufferElement of a sufficiently novel profile.
        self.reward_buffer = OrderedDict()
        self.seen_snapshot_ids = set()
        self.batch_order = []
        # Enables the meta-reward in `training_step` once True.
        self.passed_mc = False
        # Last meta-reward total, kept for logging.
        self.meta_reward_value = 0
        # Number of snapshot opponents; drives the profile dimensions.
        self.n_opponents = 0
        # Learner policy id: 'main' for connect_four, 'main0' otherwise.
        self.learner_key = (
            'main' if config['env_config'].get('name') == 'connect_four' else 'main0'
        )
        # Hyperparameters are passed in through `model.custom_model_config`.
        _config = config.get('model', {}).get('custom_model_config', {})
        # knn to calculate meta-reward against
        self.k = _config.get("k", 5)
        # whether to use the meta-reward or not
        self.meta_reward = _config.get("meta_reward", True)
        # hyperparameters for the self-play callback
        # how often to snapshot the current policy
        self.s = _config.get("s", 20)
        # when applicable, above what win-rate threshold should we snapshot the policy
        self.win_rate_threshold = _config.get("win_rate_threshold", 0.95)
        # above what win-rate threshold should we switch to the meta-reward
        self.mc_threshold = _config.get("mc_threshold", 0.8)
        # threshold for when a current profile / policy should be saved into the respective buffers
        self.novelty_threshold = _config.get("novelty_threshold", 0.5)
        # Set to True when the novelty reward reaches the threshold, i.e. when
        # the current profile was saved into the buffer.
        self.save_policy_trigger = False
        # Everything above is set before PPO's constructor runs so that the
        # self-play callback has access to these attributes.
        super().__init__(config, env, logger_creator)
        # Custom rollout worker used to compute rewards on demand: a band-aid
        # because RLlib offers nothing like `algorithm.evaluate(p1, p2)`.
        policy_config = {'model': {'custom_model': 'mlp'}}
        self.crw = CRW(self.env_creator,
                       config['env_config'],
                       policy_config)

    class BufferElement:
        """One reward profile, stored in or compared against the novelty buffer.

        A profile maps each opponent id to the reward score associated with
        it (values kept in the dict's key order). ``mask`` marks which
        dimensions take part in the distance computation. Dimensions that
        were filled in without appearing in the training batch are masked
        out but still stored.
        """

        def __init__(self, reward_dict, mask, neighbors=10):
            assert isinstance(reward_dict, OrderedDict)
            self.reward_dict = reward_dict
            self.reward_keys = reward_dict.keys()
            # Number of nearest buffer neighbours averaged into the novelty score.
            self.k = neighbors
            self.mask = mask
            # Profile vector [r_1, ..., r_n].
            self.values = np.array(list(self.reward_dict.values()))

        def add(self, k, v):
            """Append a new dimension ``k`` with value ``v``; it starts masked out."""
            self.reward_dict[k] = v
            self.reward_keys = self.reward_dict.keys()
            self.values = np.array(list(self.reward_dict.values()))
            self.mask = np.append(self.mask, False)

        def __sub__(self, other):
            """Return ``(total_reward, credited_rewards)`` of this profile vs. a buffer.

            ``other`` is expected to be an OrderedDict of BufferElement
            objects. For a non-empty buffer the score comes from
            ``_assign_credit``. For an empty buffer (or anything that is not
            an OrderedDict) the total is 1 and every dimension is credited 1.
            """
            if isinstance(other, OrderedDict) and len(other) >= 1:
                # Stored profiles are indexed with *this* profile's boolean
                # mask, so each must have exactly as many entries as the mask.
                buffer_values = np.array([o.values[self.mask] for o in other.values()])
                return self._assign_credit(buffer_values)
            total_reward = 1
            credited_rewards = [total_reward for _ in range(len(self.reward_keys))]
            return total_reward, credited_rewards

        def _assign_credit(self, masked_profile_matrix):
            """Score novelty against the buffer and split it across dimensions.

            Args:
                masked_profile_matrix: One row per stored profile, restricted
                    to the dimensions selected by ``self.mask``.

            Returns:
                ``(total, per_dim)``.
                ``total`` is the mean Euclidean distance to the ``self.k``
                nearest rows, divided by the number of unmasked dimensions.
                ``per_dim`` is ``total`` times the gradient of that score with
                respect to this profile, with 0 inserted at masked-out
                positions.
            """
            profile_matrix = torch.tensor(masked_profile_matrix, requires_grad=True)
            current_profile = torch.tensor(self.values[self.mask], requires_grad=True)
            torch_mask = torch.tensor(self.mask, requires_grad=False)
            difference = current_profile - profile_matrix
            dists = torch.norm(difference, dim=1)
            _, sorted_indices = torch.sort(dists)
            # Indicator selecting the k nearest stored profiles.
            indicator = torch.zeros(profile_matrix.shape[0])
            indicator[sorted_indices[:self.k]] = 1
            total_reward = torch.sum(dists * indicator) / torch.sum(indicator)
            # Normalise by the number of dimensions involved in the comparison.
            normalized_total_reward = total_reward / torch_mask.sum()
            # Gradient of the score with respect to the current profile vector.
            normalized_total_reward.backward()
            total_reward_python = normalized_total_reward.item()
            current_profile_grad = current_profile.grad
            # Credit assignment "version 3": weight the total by the gradient.
            # Alternatives the author considered:
            #   - give every dimension the scalar total;
            #   - softmax(grad) * total;
            #   - the raw gradient;
            #   - total split by each dimension's share of the mean absolute
            #     difference over the k nearest neighbours.
            reward = total_reward_python * current_profile_grad.detach().numpy()
            reward = self._add_in_zeros(reward)
            return total_reward_python, reward

        def _add_in_zeros(self, reward):
            """Scatter ``reward`` (one value per unmasked dim) back to full length, 0 elsewhere."""
            unmasked_values = iter(reward)
            return np.array([next(unmasked_values) if m else 0 for m in self.mask])

    def _update_batch_for_trainable_with_meta_reward(self, train_batch: "MultiAgentBatch"):
        """Replace the learner's rewards in ``train_batch`` with credited meta-rewards.

        Steps:
        1. Build a reward profile with one score per opponent
           ('random_r0', 'main_v1', ...).
        2. Compute its novelty against ``self.reward_buffer``.
        3. Store the profile if it is novel enough.
        4. Give each learner episode a single terminal reward: the credit of
           the opponent it played.
        5. Recompute GAE for each learner episode.

        Learner episodes whose opponent cannot be identified are dropped from
        the learner batch.
        """
        policy_ids_in_batch = list(train_batch.policy_batches.keys())
        snap_ids_in_batch = [pid for pid in policy_ids_in_batch if pid != self.learner_key]
        main_agent_state = self.get_policy(self.learner_key).get_state()
        main_batch = train_batch.policy_batches[self.learner_key]
        snap_batches = {pid: train_batch.policy_batches[pid] for pid in snap_ids_in_batch}
        # Split batches by episode.
        episodic_main_batches = main_batch.split_by_episode()
        episodic_snap_batches = {pid: snap_batches[pid].split_by_episode() for pid in snap_ids_in_batch}
        # Score each snapshot policy by its mean total episode reward.
        snap_scores = {
            pid: np.mean([ep_batch[ep_batch.REWARDS].sum() for ep_batch in episodic_snap_batches[pid]])
            for pid in snap_ids_in_batch
        }
        # Fill in opponents missing from the batch with a fresh rollout against
        # the current learner. They are masked out of the meta-reward
        # calculation, but their values are still stored for later comparisons.
        comparison_mask = []
        for n in list(np.arange(0, self.n_opponents + 1)):
            pid = 'random_r0' if n == 0 else f'main_v{n}'
            if pid not in snap_scores:
                opponent_pol = self.get_policy(pid)
                if opponent_pol is not None:
                    opponent_state = opponent_pol.get_state()
                else:
                    opponent_state = self.get_policy('random').get_state()
                rewards = self.crw.get_discounted_rewards(opponent_state, main_agent_state)
                # Keep the computed reward. (Previously a `finally` clause
                # unconditionally reset this to 0, discarding the rollout.)
                try:
                    snap_scores[pid] = rewards['random_r0']
                except KeyError:
                    # Agent-index keys: 0 = random, 1 = main.
                    snap_scores[pid] = rewards[0]
                comparison_mask.append(False)
            else:
                comparison_mask.append(True)
        comparison_mask = np.array(comparison_mask)
        # Order the profile by the number after the first '_' with its
        # one-letter prefix stripped (e.g. 'random_r0' -> 0, 'main_v2' -> 2).
        sorted_snap_scores = OrderedDict()
        for k in sorted(snap_scores, key=lambda x: int(x.split('_')[1][1:])):
            sorted_snap_scores[k] = snap_scores[k]
        # Novelty of this profile with respect to the buffer.
        snap_sample_reward = self.BufferElement(sorted_snap_scores, comparison_mask, neighbors=self.k)
        total_reward, credited_rewards = snap_sample_reward - self.reward_buffer
        # Save the meta reward value for logging.
        self.meta_reward_value = total_reward
        # Add to the buffer if the total reward reaches the novelty threshold.
        if total_reward >= self.novelty_threshold:
            self.reward_buffer[self.iteration] = snap_sample_reward
            self.save_policy_trigger = True
        main_policy = self.get_policy(self.learner_key)
        ordered_keys = list(sorted_snap_scores.keys())
        # Map episode id -> opponent policy id. The first pid (in
        # snap_ids_in_batch order) containing the episode wins.
        opponent_by_episode = {}
        for pid in snap_ids_in_batch:
            for ep in episodic_snap_batches[pid]:
                opponent_by_episode.setdefault(ep[ep.EPS_ID][0], pid)
        # Give each learner episode the credit of the opponent it played.
        recombined_main_batch = SampleBatch()
        for ep_batch in episodic_main_batches:
            episode_id = ep_batch[ep_batch.EPS_ID][0]
            opponent_id = opponent_by_episode.get(episode_id)
            if opponent_id is None or opponent_id == self.learner_key:
                # Should not happen: every learner episode has an opponent.
                continue
            reward_index = ordered_keys.index(opponent_id)
            # Sparse reward: zero everywhere except the final step.
            ep_batch[ep_batch.REWARDS] = np.zeros_like(ep_batch[ep_batch.REWARDS])
            ep_batch[ep_batch.REWARDS][-1] = credited_rewards[reward_index]
            # Recompute advantages with the new rewards, using the learner
            # policy since that is the batch being updated.
            updated_batch = compute_gae_for_sample_batch(policy=main_policy,
                                                         sample_batch=ep_batch)
            recombined_main_batch = recombined_main_batch.concat(updated_batch)
        # Place the updated learner batch back into the batch we were sent.
        train_batch.policy_batches[self.learner_key] = recombined_main_batch
        return train_batch

    @override(PPO)
    def training_step(self) -> ResultDict:
        """Run one training iteration, optionally reshaping the learner batch.

        Steps:
        1. Sample a train batch from the workers.
        2. If ``self.passed_mc and self.meta_reward``, rewrite the learner's
           rewards with the meta-reward; otherwise reset
           ``meta_reward_value`` to 0.
        3. Standardize advantages and train.
        4. Sync weights to remote workers.
        5. Update KL coefficients and emit loss / clip warnings.
        """
        # Collect SampleBatches from sample workers until we have a full batch.
        if self.config.count_steps_by == "agent_steps":
            train_batch = synchronous_parallel_sample(
                worker_set=self.workers,
                max_agent_steps=self.config.train_batch_size,
            )
        else:
            train_batch = synchronous_parallel_sample(
                worker_set=self.workers, max_env_steps=self.config.train_batch_size
            )
        train_batch = train_batch.as_multi_agent()
        # Reshape the learner batch with the meta reward from the snapshot
        # policies. TODO: also require that a snapshot policy has been added
        # to the self-play process.
        if self.passed_mc and self.meta_reward:
            train_batch = self._update_batch_for_trainable_with_meta_reward(train_batch)
        else:
            self.meta_reward_value = 0
        self._counters[NUM_AGENT_STEPS_SAMPLED] += train_batch.agent_steps()
        self._counters[NUM_ENV_STEPS_SAMPLED] += train_batch.env_steps()
        # Standardize advantages
        train_batch = standardize_fields(train_batch, ["advantages"])
        # Train
        if self.config._enable_rl_trainer_api:
            train_results = self.trainer_runner.update(train_batch)
        elif self.config.simple_optimizer:
            train_results = train_one_step(self, train_batch)
        else:
            train_results = multi_gpu_train_one_step(self, train_batch)
        policies_to_update = list(train_results.keys())
        global_vars = {
            "timestep": self._counters[NUM_AGENT_STEPS_SAMPLED],
            "num_grad_updates_per_policy": {
                pid: self.workers.local_worker().policy_map[pid].num_grad_updates
                for pid in policies_to_update
            },
        }
        # Update weights - after learning on the local worker - on all remote
        # workers.
        if self.workers.num_remote_workers() > 0:
            with self._timers[SYNCH_WORKER_WEIGHTS_TIMER]:
                from_worker = None
                if self.config._enable_rl_trainer_api:
                    from_worker = self.trainer_runner
                self.workers.sync_weights(
                    from_worker=from_worker,
                    policies=list(train_results.keys()),
                    global_vars=global_vars,
                )
        if self.config._enable_rl_trainer_api:
            kl_dict = {
                pid: pinfo[LEARNER_STATS_KEY].get("kl")
                for pid, pinfo in train_results.items()
            }
            # triggers a special update method on RLOptimizer to update the KL values.
            self.trainer_runner.additional_update(kl_values=kl_dict)
            return train_results
        # For each policy: Update KL scale and warn about possible issues
        for policy_id, policy_info in train_results.items():
            # Update KL loss with dynamic scaling
            # for each (possibly multiagent) policy we are training
            kl_divergence = policy_info[LEARNER_STATS_KEY].get("kl")
            self.get_policy(policy_id).update_kl(kl_divergence)
            # Warn about excessively high value function loss
            scaled_vf_loss = (
                self.config.vf_loss_coeff * policy_info[LEARNER_STATS_KEY]["vf_loss"]
            )
            policy_loss = policy_info[LEARNER_STATS_KEY]["policy_loss"]
            if (
                log_once("ppo_warned_lr_ratio")
                and self.config.get("model", {}).get("vf_share_layers")
                and scaled_vf_loss > 100
            ):
                logger.warning(
                    "The magnitude of your value function loss for policy: {} is "
                    "extremely large ({}) compared to the policy loss ({}). This "
                    "can prevent the policy from learning. Consider scaling down "
                    "the VF loss by reducing vf_loss_coeff, or disabling "
                    "vf_share_layers.".format(policy_id, scaled_vf_loss, policy_loss)
                )
            # Warn about bad clipping configs.
            train_batch.policy_batches[policy_id].set_get_interceptor(None)
            mean_reward = train_batch.policy_batches[policy_id]["rewards"].mean()
            if (
                log_once("ppo_warned_vf_clip")
                and mean_reward > self.config.vf_clip_param
            ):
                self.warned_vf_clip = True
                logger.warning(
                    f"The mean reward returned from the environment is {mean_reward}"
                    f" but the vf_clip_param is set to {self.config['vf_clip_param']}."
                    f" Consider increasing it for policy: {policy_id} to improve"
                    " value function convergence."
                )
        # Update global vars on local worker as well.
        self.workers.local_worker().set_global_vars(global_vars)
        return train_results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment