@jfpettit
Created March 12, 2021 18:10
Hacky script to optimize tiny RL policies using Optuna hyperparameter optimizers.
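The core trick is that every weight of a tiny linear policy (and, for the Gaussian variants, each action dimension's noise scale) becomes an Optuna float parameter, and the value returned to the study is the mean episode return. A minimal standalone sketch of that idea follows; CartPole-v1 and Optuna's default TPE sampler are illustrative stand-ins (the script below uses Bullet envs and CMA-ES by default), and it assumes the same pre-0.26 gym step/reset API the script uses:

# Standalone sketch (not part of the gist): optimize a tiny linear policy with Optuna.
import gym
import numpy as np
import optuna

def cartpole_objective(trial):
    env = gym.make("CartPole-v1")
    obs_dim = env.observation_space.shape[0]
    # One Optuna float parameter per policy weight.
    w = np.array([trial.suggest_float(f"w{i}", -1.0, 1.0) for i in range(obs_dim)])
    returns = []
    for _ in range(3):
        obs, ep_ret, done = env.reset(), 0.0, False
        while not done:
            action = int(np.dot(w, obs) > 0)  # sign of the dot product picks the discrete action
            obs, rew, done, _ = env.step(action)  # old (pre-0.26) gym API, as in the gist
            ep_ret += rew
        returns.append(ep_ret)
    return float(np.mean(returns))  # Optuna maximizes mean return over the sampled episodes

study = optuna.create_study(direction="maximize")
study.optimize(cartpole_objective, n_trials=50)
print(study.best_params)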
import numpy as np
import gym
import pybullet_envs  # importing registers the PyBullet envs with gym
import optuna
import click
from rich.console import Console
import time

console = Console()


def sigmoid(x):
    return 1 / (1 + np.exp(-x))


def relu(x):
    # np.maximum avoids mutating the input array in place
    return np.maximum(x, 0)


def linear(x):
    return x
class Policy:
    def __init__(self):
        pass

    def __call__(self):
        pass


class DensePolicy(Policy):
    """Deterministic linear policy: action = activation(kernel @ state)."""

    def __init__(self, kernel=None, env=None, activation=linear):
        assert isinstance(env.action_space, gym.spaces.Box), "DensePolicy requires Box action spaces."
        self.kernel = kernel
        self.env = env
        self.action_dim = env.action_space.shape[0]
        self.activation = activation

    def __call__(self, state):
        out = np.dot(self.kernel, state)
        act = self.activation(out)
        return act

    def load(self, params, env):
        # Rebuild the kernel from an Optuna best_params dict, keeping the current activation.
        params_k = np.array([v for k, v in params.items() if "kernel" in k]).reshape(
            (env.action_space.shape[0], env.observation_space.shape[0])
        )
        self.__init__(params_k, env, activation=self.activation)
class DenseGaussianPolicy(Policy):
    """Linear Gaussian policy: mean = activation(kernel @ state), plus per-dimension noise scaled by vars_."""

    def __init__(self, kernel=None, vars_=None, env=None, activation=linear):
        assert isinstance(env.action_space, gym.spaces.Box), "DenseGaussianPolicy requires Box action spaces."
        self.kernel = kernel
        self.vs = vars_
        self.env = env
        self.action_dim = env.action_space.shape[0]
        self.activation = activation

    def __call__(self, state):
        mus = np.dot(self.kernel, state)
        mus = self.activation(mus)
        act = mus + np.random.randn(self.action_dim) * self.vs
        return act

    def load(self, params, env):
        params_k = np.array([v for k, v in params.items() if "kernel" in k]).reshape(
            (env.action_space.shape[0], env.observation_space.shape[0])
        )
        params_v = np.array([v for k, v in params.items() if "vars" in k])
        self.__init__(params_k, params_v, env, activation=self.activation)
class SigmoidDensePolicy(Policy):
    """DensePolicy with a sigmoid on the output."""

    def __init__(self, kernel=None, env=None, activation=sigmoid):
        self.pol = DensePolicy(kernel, env, activation=activation)

    def __call__(self, state):
        return self.pol(state)

    def load(self, params, env):
        self.pol.load(params, env)


class SigmoidDenseGaussianPolicy(Policy):
    """DenseGaussianPolicy with a sigmoid on the action mean."""

    def __init__(self, kernel=None, vars_=None, env=None, activation=sigmoid):
        self.pol = DenseGaussianPolicy(kernel, vars_=vars_, env=env, activation=activation)

    def __call__(self, state):
        return self.pol(state)

    def load(self, params, env):
        self.pol.load(params, env)
def optimize_gdense_policy(trial, env_name, n_episodes=5, T=10000, min_param=-3., max_param=3., var_max=3.):
    # Each kernel weight and each per-dimension noise scale is exposed to Optuna as a float parameter.
    env = gym.make(env_name)
    kernel = np.zeros((env.action_space.shape[0], env.observation_space.shape[0]))
    vs = np.zeros(env.action_space.shape[0])
    ctr = 0
    for i in range(env.action_space.shape[0]):
        vs[i] = trial.suggest_float("vars" + str(i), 0, var_max)
        for j in range(env.observation_space.shape[0]):
            kernel[i, j] = trial.suggest_float("kernel" + str(ctr), min_param, max_param)
            ctr += 1
    policy = DenseGaussianPolicy(kernel, vs, env)
    result = run_policy(policy, env_name, n_episodes=n_episodes, T=T)
    return result
def optimize_dense_policy(trial, env_name, n_episodes=5, T=10000, min_param=-3., max_param=3., var_max=3.):
    env = gym.make(env_name)
    kernel = np.zeros((env.action_space.shape[0], env.observation_space.shape[0]))
    ctr = 0
    for i in range(env.action_space.shape[0]):
        for j in range(env.observation_space.shape[0]):
            kernel[i, j] = trial.suggest_float("kernel" + str(ctr), min_param, max_param)
            ctr += 1
    policy = DensePolicy(kernel, env)
    result = run_policy(policy, env_name, n_episodes=n_episodes, T=T)
    return result
def optimize_sdense_policy(trial, env_name, n_episodes=5, T=10000, min_param=-3., max_param=3., var_max=3.):
    env = gym.make(env_name)
    kernel = np.zeros((env.action_space.shape[0], env.observation_space.shape[0]))
    ctr = 0
    for i in range(env.action_space.shape[0]):
        for j in range(env.observation_space.shape[0]):
            kernel[i, j] = trial.suggest_float("kernel" + str(ctr), min_param, max_param)
            ctr += 1
    policy = SigmoidDensePolicy(kernel, env)
    result = run_policy(policy, env_name, n_episodes=n_episodes, T=T)
    return result
def optimize_sgdense_policy(trial, env_name, n_episodes=5, T=10000, min_param=-3., max_param=3., var_max=3.):
    env = gym.make(env_name)
    kernel = np.zeros((env.action_space.shape[0], env.observation_space.shape[0]))
    vs = np.zeros(env.action_space.shape[0])
    ctr = 0
    for i in range(env.action_space.shape[0]):
        vs[i] = trial.suggest_float("vars" + str(i), 0, var_max)
        for j in range(env.observation_space.shape[0]):
            kernel[i, j] = trial.suggest_float("kernel" + str(ctr), min_param, max_param)
            ctr += 1
    policy = SigmoidDenseGaussianPolicy(kernel, vs, env)
    result = run_policy(policy, env_name, n_episodes=n_episodes, T=T)
    return result
def run_policy(policy, env_name, n_episodes=5, T=1000):
    env = gym.make(env_name)
    Rs = []
    lens = []
    for n in range(n_episodes):
        obs = env.reset()
        R = 0
        l = 0
        for t in range(T):
            action = policy(obs)
            obs, rew, done, infos = env.step(action)
            R += rew
            l += 1
            if done:
                break
        # Record the episode whether it terminated or hit the horizon, so Rs is never empty.
        Rs.append(R)
        lens.append(l)
        console.log(f"Episode: {n}\tReturn: {R}\tLength: {l}\n")
    console.log(
        f"Mean Return this step: {np.mean(Rs)}\tStd Return this step: {np.std(Rs)}\t"
        f"Mean episode length this step: {np.mean(lens)}\tStd episode length this step: {np.std(lens)}"
    )
    return np.mean(Rs)
def video_rollout(policy, params, env_name, n_episodes, horizon, save_dir):
    # Rebuild the best policy from Optuna's best_params and record episodes with the Monitor wrapper.
    env = gym.make(env_name)
    env = gym.wrappers.Monitor(env, directory=save_dir, force=True)
    policy = policy(env=env)
    policy.load(params, env)
    Rs = []
    lens = []
    for n in range(n_episodes):
        obs = env.reset()
        R = 0
        l = 0
        for t in range(horizon):
            action = policy(obs)
            obs, rew, done, infos = env.step(action)
            R += rew
            l += 1
            if done:
                Rs.append(R)
                lens.append(l)
                console.log(f"Episode: {n}\tReturn: {R}\tLength: {l}\n")
                break
    return
@click.command()
@click.option("--env-name", "-env", type=str, default="InvertedPendulumBulletEnv-v0")
@click.option("--n-trials", "-trials", type=int, default=100)
@click.option("--n-episodes", "-neps", type=int, default=5)
@click.option("--horizon", "-t", type=int, default=1000)
@click.option("--search-sampler", "-search", type=str, default="cma")
@click.option("--policy-type", "-policy", type=str, default="gdense")
@click.option("--save-params", "-save", type=bool, default=False)
@click.option("--play-best", "-play", type=bool, default=True)
def train(env_name, n_trials, n_episodes, horizon, search_sampler, policy_type, save_params, play_best):
    allowed_samplers = ("cma", "tpe", "random")
    allowed_policies = ("gdense", "dense", "sdense", "sgdense")
    assert search_sampler in allowed_samplers, f"{search_sampler} not supported. Pick one of {allowed_samplers}"
    assert policy_type in allowed_policies, f"{policy_type} not supported. Pick one of {allowed_policies}"

    if search_sampler == "cma":
        sampler = optuna.samplers.CmaEsSampler()
    elif search_sampler == "tpe":
        sampler = optuna.samplers.TPESampler()
    elif search_sampler == "random":
        sampler = optuna.samplers.RandomSampler()
    else:
        sampler = optuna.samplers.CmaEsSampler()

    if policy_type == "gdense":
        pol_fcn = optimize_gdense_policy
        policy = DenseGaussianPolicy
    elif policy_type == "dense":
        pol_fcn = optimize_dense_policy
        policy = DensePolicy
    elif policy_type == "sgdense":
        pol_fcn = optimize_sgdense_policy
        policy = SigmoidDenseGaussianPolicy
    elif policy_type == "sdense":
        pol_fcn = optimize_sdense_policy
        policy = SigmoidDensePolicy
    else:
        raise ValueError(f"Picked unsupported policy! Available options are {allowed_policies}")

    study = optuna.create_study(direction="maximize", sampler=sampler)
    study.optimize(lambda trial: pol_fcn(trial, env_name, n_episodes=n_episodes, T=horizon), n_trials=n_trials)
    best_params = study.best_params

    if save_params:
        import pickle as pkl
        import os
        path = f"params/{env_name}/{policy_type}/{int(time.time())}"
        os.makedirs(path, exist_ok=True)
        with open(os.path.join(path, "params.pkl"), "wb") as f:
            pkl.dump(best_params, f)

    if play_best:
        path = f"videos/{env_name}/{policy_type}/{int(time.time())}/"
        video_rollout(policy, best_params, env_name, n_episodes, horizon, path)


if __name__ == "__main__":
    train()
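
For reference, an invocation might look like the following, where the filename tiny_policy_optuna.py is a placeholder (the gist does not name the file) and gym, pybullet, optuna, click, and rich are assumed to be installed:

python tiny_policy_optuna.py -env InvertedPendulumBulletEnv-v0 -policy gdense -search cma -trials 200 -save True

With --play-best left at its default of True, the best parameters found are also replayed and recorded to a timestamped directory under videos/.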