Mixing Reinforcement Learning (RL) and Evolution Strategy (ES) using Stable-Baselines
import gym
import numpy as np
import cma
from collections import OrderedDict
from stable_baselines import A2C

def flatten(params):
    """
    Flatten a dict of parameter arrays into a single 1D vector.

    :param params: (OrderedDict)
    :return: (np.ndarray)
    """
    params_ = []
    for key in params.keys():
        params_.append(params[key].flatten())
    return np.concatenate(params_)

def to_dict(flat_vec, params):
    """
    Unflatten a 1D vector back into an OrderedDict with the same
    keys and array shapes as ``params``.

    :param flat_vec: (np.ndarray)
    :param params: (OrderedDict)
    :return: (OrderedDict)
    """
    params_ = OrderedDict()
    start_idx = 0
    for key in params.keys():
        n_elem = params[key].size
        params_[key] = flat_vec[start_idx:start_idx + n_elem].reshape(params[key].shape)
        start_idx += n_elem
    return params_

def filter_policy_params(params):
    """
    Keep only the variables with "/pi/" (policy) or "/shared" (shared layers)
    in their name: only those affect the action taken by the agent.

    :param params: (OrderedDict)
    :return: (OrderedDict)
    """
    return OrderedDict((key, value) for key, value in params.items()
                       if ("/pi/" in key or "/shared" in key))

def evaluate(env, model):
    """
    Return the mean fitness (negative mean of episodic returns)
    for the given model. The sign is flipped because CMA-ES minimizes.

    :param env: (gym.Env)
    :param model: (RL Model)
    :return: (float)
    """
    episode_rewards = []
    for _ in range(10):
        reward_sum = 0
        done = False
        obs = env.reset()
        while not done:
            action, _states = model.predict(obs)
            obs, reward, done, info = env.step(action)
            reward_sum += reward
        episode_rewards.append(reward_sum)
    return -np.mean(episode_rewards)

# Create the RL model with a small policy network
model = A2C('MlpPolicy', 'CartPole-v1', ent_coef=0.0, learning_rate=0.1,
            policy_kwargs={'net_arch': [8]}, verbose=0)

# Use RL actor-critic policy gradient updates to
# find good initial parameters
model.learn(total_timesteps=5000)

# Get the parameters as the starting point for ES
model_params = model.get_parameters()

# Keep only the policy parameters
policy_params = filter_policy_params(model_params)

# Create the Evolution Strategy (ES) object
es = cma.CMAEvolutionStrategy(flatten(policy_params), sigma0=1)

for iteration in range(10):
    # Create a population of candidates and evaluate them
    candidates, fitnesses = es.ask(), []
    for candidate in candidates:
        # Load the new policy parameters into the agent
        model.load_parameters(to_dict(candidate, policy_params), exact_match=False)
        # Evaluate the agent using the stable-baselines predict function
        fitnesses.append(evaluate(model.get_env(), model))
    # CMA-ES update
    es.tell(candidates, fitnesses)
    # Display some training info: mean reward of the top 10% of candidates
    mean_fitness = np.mean(sorted(fitnesses)[:int(0.1 * len(candidates))])
    print("Iteration {:<3} Mean top 10% reward: {:.2f}".format(iteration, -mean_fitness))