Hacky script to optimize tiny RL policies using Optuna hyperparameter optimizers.
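To run it, install numpy, gym, pybullet, optuna, click, and rich, then invoke the click CLI. A minimal invocation might look like the following (the filename is a placeholder for wherever you save the gist):

python tiny_policy_search.py -env InvertedPendulumBulletEnv-v0 -trials 100 -search cma -policy gdense -save True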
import time

import numpy as np
import gym
import pybullet_envs  # importing this registers the PyBullet envs (e.g. InvertedPendulumBulletEnv-v0) with gym
import optuna
import click
from rich.console import Console

console = Console()
def sigmoid(x):
    return 1 / (1 + np.exp(-x))


def relu(x):
    # np.maximum avoids mutating the caller's array in place
    return np.maximum(x, 0)


def linear(x):
    return x
class Policy:
    def __init__(self):
        pass

    def __call__(self):
        pass


class DensePolicy(Policy):
    def __init__(self, kernel=None, env=None, activation=linear):
        assert isinstance(env.action_space, gym.spaces.Box), "DensePolicy requires Box action spaces."
        self.kernel = kernel
        self.env = env
        self.action_dim = env.action_space.shape[0]
        self.activation = activation

    def __call__(self, state):
        # action = activation(kernel @ state)
        out = np.dot(self.kernel, state)
        act = self.activation(out)
        return act

    def load(self, params, env):
        # study.best_params preserves insertion order, so the "kernel" entries come
        # back in the order they were suggested and reshape row-major cleanly
        params_k = np.array([v for k, v in params.items() if "kernel" in k]).reshape(
            (env.action_space.shape[0], env.observation_space.shape[0])
        )
        # pass the activation through so loading doesn't reset it to linear
        self.__init__(params_k, env, activation=self.activation)


class DenseGaussianPolicy(Policy):
    def __init__(self, kernel=None, vars_=None, env=None, activation=linear):
        assert isinstance(env.action_space, gym.spaces.Box), "DenseGaussianPolicy requires Box action spaces."
        self.kernel = kernel
        self.vs = vars_  # per-dimension noise scales (used as std multipliers despite the "vars" name)
        self.env = env
        self.action_dim = env.action_space.shape[0]
        self.activation = activation

    def __call__(self, state):
        mus = np.dot(self.kernel, state)
        mus = self.activation(mus)
        act = mus + np.random.randn(self.action_dim) * self.vs
        return act

    def load(self, params, env):
        params_k = np.array([v for k, v in params.items() if "kernel" in k]).reshape(
            (env.action_space.shape[0], env.observation_space.shape[0])
        )
        params_v = np.array([v for k, v in params.items() if "vars" in k])
        self.__init__(params_k, params_v, env, activation=self.activation)


class SigmoidDensePolicy(Policy):
    def __init__(self, kernel=None, env=None, activation=sigmoid):
        self.pol = DensePolicy(kernel, env, activation=activation)

    def __call__(self, state):
        return self.pol(state)

    def load(self, params, env):
        self.pol.load(params, env)


class SigmoidDenseGaussianPolicy(Policy):
    def __init__(self, kernel=None, vars_=None, env=None, activation=sigmoid):
        self.pol = DenseGaussianPolicy(kernel, vars_=vars_, env=env, activation=activation)

    def __call__(self, state):
        return self.pol(state)

    def load(self, params, env):
        self.pol.load(params, env)
def optimize_gdense_policy(trial, env_name, n_episodes=5, T=10000, min_param=-3., max_param=3., var_max=3.):
    env = gym.make(env_name)
    kernel = np.zeros((env.action_space.shape[0], env.observation_space.shape[0]))
    vs = np.zeros(env.action_space.shape[0])
    ctr = 0
    for i in range(env.action_space.shape[0]):
        vs[i] = trial.suggest_float("vars" + str(i), 0, var_max)
        for j in range(env.observation_space.shape[0]):
            kernel[i, j] = trial.suggest_float("kernel" + str(ctr), min_param, max_param)
            ctr += 1
    policy = DenseGaussianPolicy(kernel, vs, env)
    result = run_policy(policy, env_name, n_episodes=n_episodes, T=T)
    return result


def optimize_dense_policy(trial, env_name, n_episodes=5, T=10000, min_param=-3., max_param=3., var_max=3.):
    env = gym.make(env_name)
    kernel = np.zeros((env.action_space.shape[0], env.observation_space.shape[0]))
    ctr = 0
    for i in range(env.action_space.shape[0]):
        for j in range(env.observation_space.shape[0]):
            kernel[i, j] = trial.suggest_float("kernel" + str(ctr), min_param, max_param)
            ctr += 1
    policy = DensePolicy(kernel, env)
    result = run_policy(policy, env_name, n_episodes=n_episodes, T=T)
    return result


def optimize_sdense_policy(trial, env_name, n_episodes=5, T=10000, min_param=-3., max_param=3., var_max=3.):
    env = gym.make(env_name)
    kernel = np.zeros((env.action_space.shape[0], env.observation_space.shape[0]))
    ctr = 0
    for i in range(env.action_space.shape[0]):
        for j in range(env.observation_space.shape[0]):
            kernel[i, j] = trial.suggest_float("kernel" + str(ctr), min_param, max_param)
            ctr += 1
    policy = SigmoidDensePolicy(kernel, env)
    result = run_policy(policy, env_name, n_episodes=n_episodes, T=T)
    return result


def optimize_sgdense_policy(trial, env_name, n_episodes=5, T=10000, min_param=-3., max_param=3., var_max=3.):
    env = gym.make(env_name)
    kernel = np.zeros((env.action_space.shape[0], env.observation_space.shape[0]))
    vs = np.zeros(env.action_space.shape[0])
    ctr = 0
    for i in range(env.action_space.shape[0]):
        vs[i] = trial.suggest_float("vars" + str(i), 0, var_max)
        for j in range(env.observation_space.shape[0]):
            kernel[i, j] = trial.suggest_float("kernel" + str(ctr), min_param, max_param)
            ctr += 1
    policy = SigmoidDenseGaussianPolicy(kernel, vs, env)
    result = run_policy(policy, env_name, n_episodes=n_episodes, T=T)
    return result
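# Each kernel entry (and, for the Gaussian variants, each per-action noise scale)
# is a separate scalar suggested via trial.suggest_float, so the search space has
# action_dim * obs_dim (+ action_dim) dimensions. That stays tiny for
# pendulum-style tasks but grows quickly with observation size.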
def run_policy(policy, env_name, n_episodes=5, T=1000):
    env = gym.make(env_name)
    Rs = []
    lens = []
    for n in range(n_episodes):
        obs = env.reset()
        R = 0
        l = 0
        for t in range(T):
            action = policy(obs)
            obs, rew, done, infos = env.step(action)
            R += rew
            l += 1
            if done:
                break
        # record the episode whether it terminated or ran out the step budget T,
        # so Rs is never empty when the means are computed below
        Rs.append(R)
        lens.append(l)
        console.log(f"Episode: {n}\tReturn: {R}\tLength: {l}\n")
    console.log(
        f"Mean Return this step: {np.mean(Rs)}\tStd Return this step: {np.std(Rs)}\t"
        f"Mean episode length this step: {np.mean(lens)}\tStd episode length this step: {np.std(lens)}"
    )
    return np.mean(Rs)
def video_rollout(policy, params, env_name, n_episodes, horizon, save_dir):
    env = gym.make(env_name)
    env = gym.wrappers.Monitor(env, directory=save_dir, force=True)
    policy = policy(env=env)  # `policy` is passed in as a class; instantiate it, then load the tuned params
    policy.load(params, env)
    for n in range(n_episodes):
        obs = env.reset()
        R = 0
        l = 0
        for t in range(horizon):
            action = policy(obs)
            obs, rew, done, infos = env.step(action)
            R += rew
            l += 1
            if done:
                break
        console.log(f"Episode: {n}\tReturn: {R}\tLength: {l}\n")
    return
@click.command()
@click.option("--env-name", "-env", type=str, default="InvertedPendulumBulletEnv-v0")
@click.option("--n-trials", "-trials", type=int, default=100)
@click.option("--n-episodes", "-neps", type=int, default=5)
@click.option("--horizon", "-t", type=int, default=1000)
@click.option("--search-sampler", "-search", type=str, default="cma")
@click.option("--policy-type", "-policy", type=str, default="gdense")
@click.option("--save-params", "-save", type=bool, default=False)
@click.option("--play-best", "-play", type=bool, default=True)
def train(env_name, n_trials, n_episodes, horizon, search_sampler, policy_type, save_params, play_best):
    allowed_samplers = ("cma", "tpe", "random")
    allowed_policies = ("gdense", "dense", "sdense", "sgdense")
    assert search_sampler in allowed_samplers, f"{search_sampler} not supported. Pick one of {allowed_samplers}"
    assert policy_type in allowed_policies, f"{policy_type} not supported. Pick one of {allowed_policies}"
    if search_sampler == "cma":
        sampler = optuna.samplers.CmaEsSampler()
    elif search_sampler == "tpe":
        sampler = optuna.samplers.TPESampler()
    elif search_sampler == "random":
        sampler = optuna.samplers.RandomSampler()
    else:
        sampler = optuna.samplers.CmaEsSampler()
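    # CMA-ES here is Optuna's built-in CmaEsSampler, which treats the flattened
    # kernel (plus any noise scales) as a single continuous search vector; TPE and
    # random search are kept as drop-in alternatives.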
    if policy_type == "gdense":
        pol_fcn = optimize_gdense_policy
        policy = DenseGaussianPolicy
    elif policy_type == "dense":
        pol_fcn = optimize_dense_policy
        policy = DensePolicy
    elif policy_type == "sgdense":
        pol_fcn = optimize_sgdense_policy
        policy = SigmoidDenseGaussianPolicy
    elif policy_type == "sdense":
        pol_fcn = optimize_sdense_policy
        policy = SigmoidDensePolicy
    else:
        raise ValueError(f"Picked unsupported policy! Available options are {allowed_policies}")
    study = optuna.create_study(direction="maximize", sampler=sampler)
    study.optimize(lambda trial: pol_fcn(trial, env_name, n_episodes=n_episodes, T=horizon), n_trials=n_trials)
    best_params = study.best_params
    if save_params:
        import pickle as pkl
        import os
        path = f"params/{env_name}/{policy_type}/{int(time.time())}"
        os.makedirs(path, exist_ok=True)
        with open(os.path.join(path, "params.pkl"), "wb") as f:
            pkl.dump(best_params, f)
    if play_best:
        path = f"videos/{env_name}/{policy_type}/{int(time.time())}/"
        video_rollout(policy, best_params, env_name, n_episodes, horizon, path)


if __name__ == "__main__":
    train()
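# Replaying saved parameters later (a sketch; the pickle path is whatever the
# --save-params branch above wrote out):
#
#   import pickle as pkl
#   with open("params/<env>/<policy>/<timestamp>/params.pkl", "rb") as f:
#       best = pkl.load(f)
#   video_rollout(DenseGaussianPolicy, best, "InvertedPendulumBulletEnv-v0",
#                 n_episodes=3, horizon=1000, save_dir="videos/replay/")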