@ChuaCheowHuan
Last active July 31, 2020 10:50
Change hyperparameters during runtime for MARL with ray[rllib]
# -*- coding: utf-8 -*-
"""hyp_chg_MARL.ipynb
Automatically generated by Colaboratory.
Original file is located at
"""
# Commented out IPython magic to ensure Python compatibility.
"""
from google.colab import drive
drive.mount('/content/gdrive')
# %cd "/content/gdrive/My Drive/Colab Notebooks/misc_code_examples/ray_colab_examples/rock_paper_scissors_multiagent/"
!mkdir chkpt
!pwd
!ls -l
!pip install tensorflow==2.2.0
!pip install ray[rllib]==0.8.6
"""
"""
Testing the changing of hyperparameters during runtime.
Learning rate of both agents are initialized to 0.
At the 50th iteration, the learning rate of agt_0 is set to 0.01 while the learning rate of agt_1 remains at 0.
This is the only time the learning rate is changed.
The results will be agt_0 consistently winning after the 50th iteration as it gradually learns.
"""
from collections import defaultdict
from typing import Dict
from gym.spaces import Discrete
import numpy as np
import argparse
import random
import ray
from ray.tune.registry import register_env
from ray.rllib.models import ModelCatalog
from ray.rllib.policy.policy import Policy
from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy
from ray.rllib.agents.ppo import ppo
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.rllib.env import BaseEnv
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.utils import try_import_tf
from ray.tune.logger import pretty_print
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.evaluation import MultiAgentEpisode, RolloutWorker
from ray.rllib.agents.callbacks import DefaultCallbacks
tf = try_import_tf()
ROCK = 0
PAPER = 1
SCISSORS = 2
parser = argparse.ArgumentParser()
parser.add_argument("--stop", type=int, default=400000)
class RockPaperScissorsEnv(MultiAgentEnv):
    """Two-player environment for rock paper scissors.
    The observation is simply the last opponent action."""

    def __init__(self, _):
        self.action_space = Discrete(3)
        self.observation_space = Discrete(3)
        self.player1 = "player1"
        self.player2 = "player2"
        self.last_move = None
        self.num_moves = 0
        self.P1_eps_r = 0
        self.P2_eps_r = 0

    def reset(self):
        #print("P1_eps_r =", self.P1_eps_r)
        #print("P2_eps_r =", self.P2_eps_r)
        self.P1_eps_r = 0
        self.P2_eps_r = 0
        self.last_move = (0, 0)
        self.num_moves = 0
        return {
            self.player1: self.last_move[1],
            self.player2: self.last_move[0],
        }

    def step(self, action_dict):
        move1 = action_dict[self.player1]
        move2 = action_dict[self.player2]
        self.last_move = (move1, move2)
        obs = {
            self.player1: self.last_move[1],
            self.player2: self.last_move[0],
        }
        r1, r2 = {
            (ROCK, ROCK): (0.0, 0.0),
            (ROCK, PAPER): (-1.0, 1.0),
            (ROCK, SCISSORS): (1.0, -1.0),
            (PAPER, ROCK): (1.0, -1.0),
            (PAPER, PAPER): (0.0, 0.0),
            (PAPER, SCISSORS): (-1.0, 1.0),
            (SCISSORS, ROCK): (-1.0, 1.0),
            (SCISSORS, PAPER): (1.0, -1.0),
            (SCISSORS, SCISSORS): (0.0, 0.0),
        }[move1, move2]
        rew = {
            self.player1: r1,
            self.player2: r2,
        }
        self.num_moves += 1
        done = {
            "__all__": self.num_moves >= 10,
        }
        #print("rew =", rew)
        self.P1_eps_r += rew[self.player1]
        self.P2_eps_r += rew[self.player2]
        return obs, rew, done, {}
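# Note: each RockPaperScissorsEnv episode consists of 10 simultaneous moves and
# the per-move rewards are zero-sum (+1/-1/0), so one agent's gain is exactly
# the other's loss.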
@ray.remote(num_cpus=0.25, num_gpus=0)
class Helper:
    def __init__(self, iter_chg, lr, gamma):
        self.lr = lr
        self.gamma = gamma
        self.P1_policy_reward_mean = 0
        self.P2_policy_reward_mean = 0
        self.iter = 0
        self.is_hpy_chg = False
        self.iter_chg = iter_chg

    def set_hyperparameters(self, lr, gamma):
        if self.iter == self.iter_chg:
            self.lr = lr
            self.gamma = gamma
            self.is_hpy_chg = True
            print("set_hyperparameters")

    def get_hyperparameters(self):
        return self.lr, self.gamma

    def set_policy_reward_mean(self, P1, P2):
        self.P1_policy_reward_mean += P1
        self.P2_policy_reward_mean += P2

    def get_policy_reward_mean(self):
        return self.P1_policy_reward_mean, self.P2_policy_reward_mean

    def add_iter(self):
        self.iter += 1

    def set_is_hpy_chg(self, status):
        self.is_hpy_chg = status

    def get_is_hpy_chg(self):
        return self.is_hpy_chg
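# The Helper actor is created under the name "g_helper" so that it can be
# looked up by name from anywhere in the Ray job, e.g.:
#   g_helper = ray.get_actor("g_helper")
#   lr, gamma = ray.get(g_helper.get_hyperparameters.remote())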
"""#Callbacks"""
class MyCallbacks(DefaultCallbacks):
    def on_episode_start(self, worker: RolloutWorker, base_env: BaseEnv,
                         policies: Dict[str, Policy],
                         episode: MultiAgentEpisode, **kwargs):
        #print("on_episode_start {}, _agent_to_policy {}".format(episode.episode_id, episode._agent_to_policy))
        #episode.hist_data["episode_id"] = []
        pass

    def on_episode_step(self, worker: RolloutWorker, base_env: BaseEnv,
                        episode: MultiAgentEpisode, **kwargs):
        """
        pole_angle = abs(episode.last_observation_for()[2])
        raw_angle = abs(episode.last_raw_obs_for()[2])
        assert pole_angle == raw_angle
        episode.user_data["pole_angles"].append(pole_angle)
        """
        pass

    def on_episode_end(self, worker: RolloutWorker, base_env: BaseEnv,
                       policies: Dict[str, Policy], episode: MultiAgentEpisode,
                       **kwargs):
        #print("on_episode_end {}, episode.agent_rewards {}".format(episode.episode_id, episode.agent_rewards))
        pass

    def on_sample_end(self, worker: RolloutWorker, samples: SampleBatch,
                      **kwargs):
        #print("on_sample_end returned sample batch of size {}".format(samples.count))
        pass
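
    # on_train_result runs after every trainer.train() call. It is the hook this
    # script uses to report the current iteration and per-policy reward means to
    # the named Helper actor, which flags the hyperparameter change once iter_chg
    # is reached.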
    def on_train_result(self, trainer, result: dict, **kwargs):
        #print("trainer.train() result: {} -> {} episodes".format(trainer, result["episodes_this_iter"]))
        # you can mutate the result dict to add new fields to return
        #result["callback_ok"] = True
        #print("on_train_result result", result)
        # hard coded "new" hyperparameters:
        lr = 0.01
        gamma = 0.9
        g_helper = ray.get_actor("g_helper")
        ray.get(g_helper.set_hyperparameters.remote(lr, gamma))
        #lr, gamma = ray.get(g_helper.get_hyperparameters.remote())
        #print("lr= {}, gamma ={}".format(lr, gamma))
        ray.get(g_helper.set_policy_reward_mean.remote(result["policy_reward_mean"]["agt_0"], result["policy_reward_mean"]["agt_1"]))
        ray.get(g_helper.add_iter.remote())
        #pass

    def on_postprocess_trajectory(
            self, worker: RolloutWorker, episode: MultiAgentEpisode,
            agent_id: str, policy_id: str, policies: Dict[str, Policy],
            postprocessed_batch: SampleBatch,
            original_batches: Dict[str, SampleBatch], **kwargs):
        #print("postprocessed {}, {}, {}, {} steps".format(episode, agent_id, policy_id, postprocessed_batch.count))
        """
        if "num_batches" not in episode.custom_metrics:
            episode.custom_metrics["num_batches"] = 0
        episode.custom_metrics["num_batches"] += 1
        """
        pass
train_policy_list = ["agt_0", "agt_1"]
use_lstm=True #False
lr = 0.0 #1e-30
gamma = 0.9
policies = {"agt_0": (PPOTFPolicy, Discrete(3), Discrete(3), {"model": {"use_lstm": use_lstm},
"lr": lr,
"gamma": gamma}),
"agt_1": (PPOTFPolicy, Discrete(3), Discrete(3), {"model": {"use_lstm": use_lstm},
"lr": lr,
"gamma": gamma})
}
def select_policy(agent_id):
if agent_id == "player1":
return "agt_0"
else:
return "agt_1"
def my_config():
    config = ppo.DEFAULT_CONFIG.copy()
    config["multiagent"] = {"policies_to_train": train_policy_list,
                            "policies": policies,
                            "policy_mapping_fn": select_policy,
                            }
    config["num_cpus_per_worker"] = 0.25
    #config["num_gpus_per_worker"] = 0.25
    config["num_workers"] = 2
    config["num_envs_per_worker"] = 2
    config["batch_mode"] = "truncate_episodes"  # "complete_episodes" or "truncate_episodes"
    config["rollout_fragment_length"] = 10  # sample 10 steps per fragment; since an episode is 10 moves,
                                            # this behaves the same as batch_mode="complete_episodes"
    config["train_batch_size"] = 10  # Training batch size; should be >= rollout_fragment_length.
                                     # Sample batches are concatenated together to a batch of this size,
                                     # which is then passed to SGD.
    config["sgd_minibatch_size"] = 10  # default=128; must be <= train_batch_size.
    config["num_sgd_iter"] = 3  # default=30; number of SGD epochs to execute per train batch.
    config["log_level"] = "WARN"  # WARN/INFO/DEBUG
    config["callbacks"] = MyCallbacks
    return config
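
# Why test_hyperparam_chg rebuilds the trainer rather than editing policy.config
# in place: the learning rate is used to build the policy's optimizer when the
# trainer is constructed, so the approach below is to save a checkpoint, stop
# the old trainer, build a new PPOTrainer with the modified config, and restore
# the checkpoint into it.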
def test_hyperparam_chg(trainer):
    """
    new_policies = {"agt_0": (PPOTFPolicy, Discrete(3), Discrete(3), {"model": {"use_lstm": use_lstm},
                                                                      "lr": 0.00001,
                                                                      "gamma": 0.9}),
                    "agt_1": (PPOTFPolicy, Discrete(3), Discrete(3), {"model": {"use_lstm": use_lstm},
                                                                      "lr": 0.1,
                                                                      "gamma": 0.9})
                    }
    """
    g_helper = ray.get_actor("g_helper")
    lr, gamma = ray.get(g_helper.get_hyperparameters.remote())
    #print("lr= {}, gamma ={}".format(lr, gamma))
    is_hpy_chg = ray.get(g_helper.get_is_hpy_chg.remote())
    if is_hpy_chg:
        curr_config = trainer.get_config()
        print("curr_config", curr_config)
        print("config['multiagent']['policies']", curr_config["multiagent"]["policies"])
        new_config = curr_config
        #new_config["multiagent"]["policies"] = new_policies
        tar_ind = 3  # index of the per-policy config dict in the policy spec tuple
        new_config["multiagent"]["policies"]["agt_0"][tar_ind]["lr"] = lr
        new_config["multiagent"]["policies"]["agt_0"][tar_ind]["gamma"] = gamma
        player1_pol = trainer.get_policy("agt_0")
        print("Before")
        print("player1_pol.config['lr']", player1_pol.config["lr"])
        print("player1_pol.config['gamma']", player1_pol.config["gamma"])
        local_dir = "/content/gdrive/My Drive/Colab Notebooks/misc_code_examples/ray_colab_examples/rock_paper_scissors_multiagent/chkpt/"
        save_path = trainer.save(local_dir)
        trainer.stop()
        trainer = ppo.PPOTrainer(config=new_config, env="RockPaperScissorsEnv")
        trainer.restore(save_path)
        player1_pol = trainer.get_policy("agt_0")
        print("After")
        print("player1_pol.config['lr']", player1_pol.config["lr"])
        print("player1_pol.config['gamma']", player1_pol.config["gamma"])
        ray.get(g_helper.set_is_hpy_chg.remote(False))
    return trainer
def go_train():
    g_helper = ray.get_actor("g_helper")
    trainer = ppo.PPOTrainer(config=my_config(), env="RockPaperScissorsEnv")
    for i in range(300):
        # Perform one iteration of training the policy with PPO
        result = trainer.train()
        if i % 30 == 0:
            print(pretty_print(result))
            P1_policy_reward_mean, P2_policy_reward_mean = ray.get(g_helper.get_policy_reward_mean.remote())
            print("TOTAL: P1_policy_reward_mean ={}, P2_policy_reward_mean ={}".format(P1_policy_reward_mean, P2_policy_reward_mean))
        trainer = test_hyperparam_chg(trainer)
    print(pretty_print(result))
    P1_policy_reward_mean, P2_policy_reward_mean = ray.get(g_helper.get_policy_reward_mean.remote())
    print("TOTAL: P1_policy_reward_mean ={}, P2_policy_reward_mean ={}".format(P1_policy_reward_mean, P2_policy_reward_mean))
register_env("RockPaperScissorsEnv", lambda _: RockPaperScissorsEnv(_))
ray.shutdown()
ray.init(ignore_reinit_error=True, log_to_driver=True, webui_host='127.0.0.1', num_cpus=2, num_gpus=0) #start ray
iter_chg = 49
g_helper = Helper.options(name="g_helper").remote(iter_chg, lr, gamma) # this object runs on a different ray actor process
ray.get(g_helper.set_hyperparameters.remote(lr, gamma))
#lr, gamma = ray.get(g_helper.get_hyperparameters.remote())
#print("lr= {}, gamma ={}".format(lr, gamma))
go_train()