@kashif /cem.md
Last active May 12, 2018

Cross Entropy Method

How do we solve the policy optimization problem, that is, how do we maximize the total reward given some parametrized policy?

Discounted future reward

To begin with, the total reward for an episode is the sum of all the rewards collected in it. If our environment is stochastic, we can never be sure that we will get the same rewards the next time we perform the same actions, so the further we look into the future, the more the total future reward may diverge. For that reason it is common to use the discounted future reward, R = r_0 + discount * r_1 + discount^2 * r_2 + ..., where the parameter discount is called the discount factor and lies between 0 and 1.
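
As a small illustration of the discounted sum described above, here is a minimal sketch in plain Python. The function name `discounted_return` and the example rewards are made up for this illustration; they are not part of the gist's script.

```python
def discounted_return(rewards, discount):
    """Sum of rewards where the reward at step t is weighted by discount**t."""
    return sum(r * discount ** t for t, r in enumerate(rewards))


# Three steps with a reward of 1.0 each (hypothetical episode)
rewards = [1.0, 1.0, 1.0]
print(discounted_return(rewards, 1.0))  # no discounting: plain sum
print(discounted_return(rewards, 0.5))  # later rewards count for less
```

With `discount=1.0` this reduces to the plain total reward; smaller values make rewards far in the future matter less.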

A good strategy for an agent would be to always choose an action that maximizes the (discounted) future reward. In other words, we want to maximize the expected (discounted) reward per episode.

Parametrized policy

A stochastic policy is defined as a conditional probability of some action given a state. A family of policies indexed by a parameter vector theta is called a parametrized policy. These policies are defined analogously to supervised learning classification or regression problems. In the case of discrete policies we output a vector of probabilities over the possible actions, and in the case of continuous policies we output the mean and diagonal covariance of a Gaussian distribution from which we can then sample our continuous actions.
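
The two kinds of stochastic policy can be sketched roughly as follows. This is only an illustration under assumptions: the function names, the linear-softmax form for the discrete case, and the state-independent log standard deviation for the Gaussian case are choices made here, not something the gist's script does (its policy classes are deterministic).

```python
import numpy as np

rng = np.random.default_rng(0)


def discrete_policy_sample(theta, ob, n_actions):
    """Linear-softmax policy: return action probabilities and a sampled action."""
    W = theta.reshape(ob.size, n_actions)
    logits = ob.dot(W)
    logits -= logits.max()  # shift for numerical stability
    probs = np.exp(logits) / np.exp(logits).sum()
    action = rng.choice(n_actions, p=probs)
    return probs, action


def gaussian_policy_sample(theta, ob, dim_ac):
    """Linear-Gaussian policy: mean is linear in ob, diagonal std from theta."""
    n_w = ob.size * dim_ac
    W = theta[:n_w].reshape(ob.size, dim_ac)
    log_std = theta[n_w:]  # one log standard deviation per action dimension
    mean = ob.dot(W)
    action = mean + np.exp(log_std) * rng.standard_normal(dim_ac)
    return mean, action


ob = np.array([-0.1, -0.4, 0.06, 0.5])
print(discrete_policy_sample(np.zeros(8), ob, n_actions=2))
print(gaussian_policy_sample(np.zeros(5), ob, dim_ac=1))
```

In both cases theta is a flat vector, which is what lets a black-box optimizer treat the policy as a single point in parameter space.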

Cross entropy method (CEM)

So how do we solve the policy optimization problem of maximizing the total (discounted) reward given some parametrized policy? The simplest approach is derivative-free optimization (DFO), which treats the problem as a black box with respect to the parameter theta. We try out many different values of theta and store the total reward of each episode. The main idea is then to move towards the good values of theta.

One particular DFO approach is the cross entropy method (CEM). Here, at any point in time, you maintain a distribution over parameter vectors and move that distribution towards parameters with higher reward. This works surprisingly well, even if it is not that effective when theta is a high-dimensional vector.

Algorithm

The idea is to initialize the mean and sigma of a Gaussian and then, n_iter times, we:

  1. collect batch_size samples of theta from a Gaussian with the current mean and sigma
  2. perform a noisy evaluation to get the total rewards with these thetas
  3. select the n_elite best thetas as the elite set
  4. update our mean and sigma to be those of the elite set
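
The four steps can be tried without any environment by applying them to a toy objective. The sketch below is only an illustration: the function `cem`, the quadratic `toy_reward`, the `target` vector and the default settings are assumptions made here, standing in for the episode reward that the gist's script obtains from gym.

```python
import numpy as np


def cem(f, dim_theta, n_iter=30, batch_size=100, elite_frac=0.2, seed=0):
    """Maximize f over theta with a diagonal-Gaussian cross entropy method."""
    rng = np.random.default_rng(seed)
    n_elite = int(batch_size * elite_frac)
    mean = np.zeros(dim_theta)
    std = np.ones(dim_theta)
    for _ in range(n_iter):
        # 1. sample batch_size parameter vectors from the current Gaussian
        thetas = mean + std * rng.standard_normal((batch_size, dim_theta))
        # 2. evaluate each sample
        rewards = np.array([f(theta) for theta in thetas])
        # 3. keep the n_elite samples with the highest reward
        elite = thetas[rewards.argsort()[-n_elite:]]
        # 4. refit mean and std to the elite set
        mean = elite.mean(axis=0)
        std = elite.std(axis=0)
    return mean


# Toy objective: reward is highest (zero) when theta equals target
target = np.array([0.5, -1.0, 0.25])


def toy_reward(theta):
    return -np.sum((theta - target) ** 2)


best = cem(toy_reward, dim_theta=3)
print(best)
```

Because only reward values are used, `toy_reward` could be replaced by any function that maps a parameter vector to a (possibly noisy) score.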

Discrete linear policy

For the CartPole-v0 case, let us define the linear parametrized policy as shown in the following diagram:

         │               ┌───theta ~ N(mean,std)───┐
         │
   4 observations        [[ 2.2  4.5 ]
[-0.1 -0.4  0.06  0.5] *  [ 3.4  0.2 ]  + [[ 0.2 ]
         |                [ 4.2  3.4 ]     [ 1.1 ]]
         │                [ 0.1  9.0 ]]
         |                     W              b
    ┌────o────┐
<─0─│2 actions│─1─>    = [-0.4  0.1] ──argmax()─> 1
    └─o─────o─┘

This means we can use the Space introspection of the env to create an appropriately sized parameter vector theta. One part of theta is used as the matrix W and the rest as the bias vector b, so that the number of outputs corresponds to the number of actions of our particular env.
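
To make the split of theta into W and b concrete, here is a minimal sketch with the CartPole-v0 sizes written out by hand (4 observations, 2 actions) rather than read from the env. The values in theta are placeholders chosen for this illustration; only the shapes matter.

```python
import numpy as np

dim_ob, n_actions = 4, 2              # CartPole-v0 sizes, hard-coded here
dim_theta = (dim_ob + 1) * n_actions  # weights plus one bias per action

theta = np.arange(dim_theta, dtype=float)  # placeholder parameter values

# First dim_ob * n_actions entries form W, the remaining n_actions form b
W = theta[:dim_ob * n_actions].reshape(dim_ob, n_actions)
b = theta[dim_ob * n_actions:].reshape(1, n_actions)

ob = np.array([-0.1, -0.4, 0.06, 0.5])
scores = ob.dot(W) + b        # one score per action
action = int(scores.argmax())  # deterministic choice of the best-scoring action
print(W.shape, b.shape, scores.shape, action)
```

In the script the two sizes come from `env.observation_space.shape[0]` and `env.action_space.n`, so the same slicing works for other discrete-action environments.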

Extra noise

We can also add extra noise to our sampling distribution in the form of extra_cov, an additional variance term that decays to zero after extra_decay_time iterations.
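
A quick way to see the schedule is to evaluate it for a few iterations. The sketch below assumes a linear decay (true division) and wraps the expression in a helper, `extra_cov_at`, that is named here only for illustration.

```python
extra_std = 2.0
extra_decay_time = 10


def extra_cov_at(itr):
    """Extra variance at iteration itr, assuming a linear decay to zero."""
    return max(1.0 - itr / float(extra_decay_time), 0.0) * extra_std ** 2


for itr in (0, 5, 10, 15):
    print(itr, extra_cov_at(itr))
```

Note that with integer (floor) division, as in Python 2 when both operands are ints, the same formula would instead keep the full extra variance until itr reaches extra_decay_time and then drop to zero.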

Discounted total reward

We can also return the discounted total reward per episode via the discount parameter of the do_episode function:

...
for t in range(num_steps):
    ...
    disc_total_rew += reward * discount**t
    ...

import numpy as np
import gym
from gym.spaces import Discrete, Box

# ================================================================
# Policies
# ================================================================

class DeterministicDiscreteActionLinearPolicy(object):

    def __init__(self, theta, ob_space, ac_space):
        """
        dim_ob: dimension of observations
        n_actions: number of actions
        theta: flat vector of parameters
        """
        dim_ob = ob_space.shape[0]
        n_actions = ac_space.n
        assert len(theta) == (dim_ob + 1) * n_actions
        # First part of theta is the weight matrix, the rest is the bias
        self.W = theta[0 : dim_ob * n_actions].reshape(dim_ob, n_actions)
        self.b = theta[dim_ob * n_actions :].reshape(1, n_actions)

    def act(self, ob):
        """Return the index of the action with the highest linear score."""
        y = ob.dot(self.W) + self.b
        a = y.argmax()
        return a

class DeterministicContinuousActionLinearPolicy(object):

    def __init__(self, theta, ob_space, ac_space):
        """
        dim_ob: dimension of observations
        dim_ac: dimension of action vector
        theta: flat vector of parameters
        """
        self.ac_space = ac_space
        dim_ob = ob_space.shape[0]
        dim_ac = ac_space.shape[0]
        assert len(theta) == (dim_ob + 1) * dim_ac
        self.W = theta[0 : dim_ob * dim_ac].reshape(dim_ob, dim_ac)
        self.b = theta[dim_ob * dim_ac :]

    def act(self, ob):
        # Clip the linear output to the bounds of the action space
        a = np.clip(ob.dot(self.W) + self.b, self.ac_space.low, self.ac_space.high)
        return a

def do_episode(policy, env, num_steps, discount=1.0, render=False):
    """Run one episode and return its discounted total reward."""
    disc_total_rew = 0
    ob = env.reset()
    for t in range(num_steps):
        a = policy.act(ob)
        (ob, reward, done, _info) = env.step(a)
        disc_total_rew += reward * discount**t
        if render and t % 3 == 0:
            env.render()
        if done:
            break
    return disc_total_rew

env = None

def noisy_evaluation(theta, discount=0.90):
    policy = make_policy(theta)
    reward = do_episode(policy, env, num_steps, discount)
    return reward

def make_policy(theta):
    if isinstance(env.action_space, Discrete):
        return DeterministicDiscreteActionLinearPolicy(theta,
            env.observation_space, env.action_space)
    elif isinstance(env.action_space, Box):
        return DeterministicContinuousActionLinearPolicy(theta,
            env.observation_space, env.action_space)
    else:
        raise NotImplementedError

# Task settings:
env = gym.make('CartPole-v0') # Change as needed
num_steps = 500 # maximum length of episode
env.monitor.start('/tmp/cartpole-experiment-1')

# Alg settings:
n_iter = 20 # number of iterations of CEM
batch_size = 25 # number of samples per batch
elite_frac = 0.2 # fraction of samples used as elite set
n_elite = int(batch_size * elite_frac)
extra_std = 2.0
extra_decay_time = 10

if isinstance(env.action_space, Discrete):
    dim_theta = (env.observation_space.shape[0]+1) * env.action_space.n
elif isinstance(env.action_space, Box):
    dim_theta = (env.observation_space.shape[0]+1) * env.action_space.shape[0]
else:
    raise NotImplementedError

# Initialize mean and standard deviation
theta_mean = np.zeros(dim_theta)
theta_std = np.ones(dim_theta)

# Now, for the algorithm
for itr in range(n_iter):
    # Sample parameter vectors; float division makes the extra variance
    # shrink linearly to zero once itr reaches extra_decay_time
    extra_cov = max(1.0 - itr / float(extra_decay_time), 0) * extra_std**2
    thetas = np.random.multivariate_normal(mean=theta_mean,
                                           cov=np.diag(np.array(theta_std**2) + extra_cov),
                                           size=batch_size)
    rewards = np.array(list(map(noisy_evaluation, thetas)))

    # Get elite parameters
    elite_inds = rewards.argsort()[-n_elite:]
    elite_thetas = thetas[elite_inds]

    # Update theta_mean, theta_std
    theta_mean = elite_thetas.mean(axis=0)
    theta_std = elite_thetas.std(axis=0)

    print("iteration %i. mean f: %8.3g. max f: %8.3g" % (itr, np.mean(rewards), np.max(rewards)))
    do_episode(make_policy(theta_mean), env, num_steps, discount=0.90, render=True)

env.monitor.close()
gym.upload('/tmp/cartpole-experiment-1', api_key='sk_fzIs7KDMTtGbSREbrO4yfg')