jietang/README (secret gist, created June 23, 2016)
CEM writeup
This algorithm is an implementation of the cross-entropy method, using feed-forward neural networks as the policy class.
(Code adapted from the gym example CEM agent and joschu's modular_rl GitHub repo.)
Parameters: two hidden layers of sizes 10 and 5, 50 iterations, 200 evaluated models per iteration, top 20% retained, initial parameter variance of 1.0, initial parameter mean all zeros, initial seed 0.
For Doom environments, the input image was scaled down to 40x40, converted to grayscale, then whitened per component using running mean/std estimates.
To reproduce:
python cem.py {CartPole-v0,Acrobot-v0,...}
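The script also takes two optional flags (see the argparse setup below): --debug shrinks the run to 25 iterations of 25 samples each and skips monitoring and upload, and --display renders a rollout after every iteration. For example:

python cem.py --debug CartPole-v0
python cem.py --display Acrobot-v0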
# ---- cem.py ----
from __future__ import print_function
import gym
import logging
import numpy as np
try:
    import cPickle as pickle
except ImportError:
    import pickle
import json, sys, os
from os import path
from mlp import DeterministicAgent, FilteredEnv, RGBImageToVector, WhitenFilter
import argparse
import cProfile

def do_cprofile_generator(func):
    """Decorator that profiles a generator function and prints stats when it finishes."""
    def profiled_func(*args, **kwargs):
        profile = cProfile.Profile()
        try:
            profile.enable()
            for item in func(*args, **kwargs):
                yield item
            profile.disable()
        finally:
            profile.print_stats()
    return profiled_func
# @do_cprofile_generator
def cem(f, th_mean, batch_size, n_iter, elite_frac, initial_std=1.0):
    """
    Generic implementation of the cross-entropy method for maximizing a black-box function
    f: a function mapping from vector -> scalar
    th_mean: initial mean over input distribution
    batch_size: number of samples of theta to evaluate per batch
    n_iter: number of batches
    elite_frac: each batch, select this fraction of the top-performing samples
    initial_std: initial standard deviation over parameter vectors
    """
    n_elite = int(np.round(batch_size * elite_frac))
    th_std = np.ones_like(th_mean) * initial_std
    for _ in range(n_iter):
        # sample a batch of parameter vectors from the current diagonal Gaussian
        ths = np.array([th_mean + dth for dth in th_std[None, :] * np.random.randn(batch_size, th_mean.size)])
        ys = np.array([f(th) for th in ths])
        # keep the top-scoring fraction and refit the Gaussian to those elites
        elite_inds = ys.argsort()[::-1][:n_elite]
        elite_ths = ths[elite_inds]
        th_mean = elite_ths.mean(axis=0)
        th_std = elite_ths.std(axis=0)
        yield {'ys': ys, 'theta_mean': th_mean, 'y_mean': ys.mean()}
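As a quick sanity check independent of gym, cem() can maximize any scalar function. The toy quadratic below is an illustrative assumption (the target vector and dimension are not part of the writeup); theta_mean should approach `target` within a few dozen iterations:

    target = np.array([1.0, -2.0, 3.0, 0.5, 0.0])

    def f(th):
        # negative squared distance: maximized at th == target
        return -np.sum(np.square(th - target))

    for stats in cem(f, np.zeros(5), batch_size=50, n_iter=30, elite_frac=0.2):
        pass
    print(stats['theta_mean'])  # should be close to `target`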
def do_rollout(agent, env, num_steps, render=False):
    """Run one episode of at most num_steps; returns (total reward, episode length)."""
    total_rew = 0
    ob = env.reset()
    obs = [ob]
    for t in range(num_steps):
        a = agent.act(ob)
        (ob, reward, done, _info) = env.step(a)
        obs.append(ob)
        total_rew += reward
        if render and t % 3 == 0:
            env.render()
        if done:
            # feed the episode's observations into the filter's running statistics
            if isinstance(env, FilteredEnv):
                for ob in obs:
                    env.ob_filter.update(ob)
            break
    return total_rew, t + 1
if __name__ == '__main__':
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument('--display', action='store_true')
    parser.add_argument('--debug', action='store_true')
    parser.add_argument('target', nargs="?", default="CartPole-v0")
    args = parser.parse_args()

    env = gym.make(args.target)
    env.seed(0)
    np.random.seed(0)

    if 'Doom' in args.target:
        from doom_py import ScreenResolution
        env.configure(check_bounds=False, screen_resolution=ScreenResolution.RES_160X120)
        # observation transforms for Doom: downsample + grayscale the RGB frame,
        # then whiten it per component with running mean/std estimates
        rgb_filter = RGBImageToVector()
        whiten_filter = WhitenFilter(rgb_filter.out_width * rgb_filter.out_height, clip=5)
        # ugh, have to proxy these over
        obs_filter = lambda x: whiten_filter(rgb_filter(x))
        obs_filter.output_shape = lambda x: whiten_filter.output_shape(x)
        obs_filter.update = lambda x: whiten_filter.update(x)
        env = FilteredEnv(env, obs_filter, None)

    if args.debug:
        params = dict(n_iter=25, batch_size=25, elite_frac=0.2)
    else:
        params = dict(n_iter=50, batch_size=200, elite_frac=0.2)
    num_steps = 200 if args.debug else 10000  # use built-in env semantics if not debug

    # You provide the directory to write to (can be an existing
    # directory, but it can't contain previous monitor results; you can
    # also dump to a tempdir if you'd like: tempfile.mkdtemp()).
    outdir = '/tmp/cem-agent-results'
    if not args.debug:
        env.monitor.start(outdir, force=True)

    # Prepare snapshotting
    # ----------------------------------------
    def writefile(fname, s):
        with open(path.join(outdir, fname), 'w') as fh:
            fh.write(s)
    info = {}
    info['params'] = params
    info['argv'] = sys.argv
    info['env_id'] = env.spec.id
    # ------------------------------------------

    # initial_parameters = np.zeros(env.observation_space.shape[0]+1)
    # def noisy_evaluation(theta):
    #     agent = BinaryActionLinearPolicy(theta)
    #     rew, T = do_rollout(agent, env, num_steps)
    #     return rew
    agent = DeterministicAgent(
        env.observation_space,
        env.action_space,
        dict(hid_sizes=[10, 5]),
    )
    initial_parameters = np.zeros(agent.get_flat().shape)

    def noisy_evaluation(theta):
        # score one parameter vector with a single (noisy) rollout
        agent.set_from_flat(theta)
        rew, T = do_rollout(agent, env, num_steps)
        return rew

    # Train the agent, and snapshot each stage
    for (i, iterdata) in enumerate(
            cem(noisy_evaluation, initial_parameters, **params)):
        print('Iteration %2i. Episode mean reward: %7.3f' % (i, iterdata['y_mean']))
        agent.set_from_flat(iterdata['theta_mean'])
        if args.display:
            do_rollout(agent, env, 200, render=True)

    if not args.debug:
        env.monitor.close()
    logger.info("Successfully ran CEM. Now trying to upload results to the scoreboard. If it breaks, you can always just try re-uploading the same results.")
    if not args.debug:
        gym.upload(outdir, algorithm_id='alg_ZIW1ntx0RQ2LZpw309n7A')
# ---- mlp.py ----
import numpy as np
from gym import Env, spaces
from gym.spaces import Box, Discrete, HighLow
import keras
from keras.models import Sequential
from keras.layers.core import Dense
import theano
import theano.tensor as T
import scipy
import scipy.misc  # needed for scipy.misc.imresize below

floatX = theano.config.floatX  # pylint: disable=E1101
keras.backend.set_floatx(floatX)
keras.backend.set_epsilon(1e-7)
FNOPTS = dict(allow_input_downcast=True, on_unused_input='ignore')
# cfg is a dict whose "hid_sizes" entry is a list of ints giving the size of each hidden layer
def make_deterministic_mlp(ob_space, ac_space, cfg):
    assert isinstance(ob_space, Box)
    hid_sizes = cfg["hid_sizes"]
    if isinstance(ac_space, Box):
        outdim = ac_space.shape[0]
        probtype = DiagGauss(outdim)
    elif isinstance(ac_space, Discrete):
        outdim = ac_space.n
        probtype = Categorical(outdim)
    elif isinstance(ac_space, HighLow):
        outdim = ac_space.num_rows
        probtype = BernoulliHighLow(ac_space)
    else:
        raise NotImplementedError("unsupported action space: %s" % ac_space)
    net = Sequential()
    for (i, layeroutsize) in enumerate(hid_sizes):
        inshp = dict(input_shape=ob_space.shape) if i == 0 else {}
        net.add(Dense(layeroutsize, activation="tanh", **inshp))
    inshp = dict(input_shape=ob_space.shape) if len(hid_sizes) == 0 else {}
    net.add(Dense(outdim, **inshp))
    # shrink the final layer's initial weights so initial outputs are near zero
    Wlast = net.layers[-1].W
    Wlast.set_value(Wlast.get_value(borrow=True) * 0.1)
    policy = StochPolicyKeras(net, probtype)
    return policy
def flatten(arrs):
    """Concatenate a list of arrays into one flat vector."""
    return np.concatenate([arr.flat for arr in arrs])

def unflatten(vec, shapes):
    """Inverse of flatten: split a flat vector back into arrays of the given shapes."""
    i = 0
    arrs = []
    for shape in shapes:
        size = np.prod(shape)
        arr = vec[i:i + size].reshape(shape)
        arrs.append(arr)
        i += size  # advance the offset into the flat vector
    return arrs
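A quick round-trip check (the arrays here are illustrative, not from the writeup): flatten followed by unflatten with the original shapes should reproduce the input arrays exactly.

    ws = [np.arange(6.0).reshape(2, 3), np.zeros(3)]
    back = unflatten(flatten(ws), [w.shape for w in ws])
    assert all(np.array_equal(w, b) for w, b in zip(ws, back))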
class EzPickle(object):
    """Objects that are pickled and unpickled via their constructor arguments.
    Example usage:
        class Dog(Animal, EzPickle):
            def __init__(self, furcolor, tailkind="bushy"):
                Animal.__init__(self)
                EzPickle.__init__(self, furcolor, tailkind)
                ...
    When this object is unpickled, a new Dog will be constructed by passing the provided
    furcolor and tailkind into the constructor. However, philosophers are still not sure
    whether it is still the same dog.
    This is generally needed only for environments which wrap C/C++ code, such as MuJoCo
    and Atari.
    """
    def __init__(self, *args, **kwargs):
        self._ezpickle_args = args
        self._ezpickle_kwargs = kwargs
    def __getstate__(self):
        return {"_ezpickle_args": self._ezpickle_args, "_ezpickle_kwargs": self._ezpickle_kwargs}
    def __setstate__(self, d):
        out = type(self)(*d["_ezpickle_args"], **d["_ezpickle_kwargs"])
        self.__dict__.update(out.__dict__)
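A minimal round trip with a hypothetical class (Point is an assumption for illustration, not part of the gist): unpickling reconstructs the object by calling the constructor again with the stored arguments.

    import pickle

    class Point(EzPickle):
        def __init__(self, x, y=0):
            EzPickle.__init__(self, x, y=y)
            self.x, self.y = x, y

    p = pickle.loads(pickle.dumps(Point(1, y=2)))
    assert (p.x, p.y) == (1, 2)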
class StochPolicy(object):
    @property
    def probtype(self):
        raise NotImplementedError
    @property
    def trainable_variables(self):
        raise NotImplementedError
    @property
    def input(self):
        raise NotImplementedError
    def get_output(self):
        raise NotImplementedError
    def act(self, ob, stochastic=True):
        prob = self._act_prob(ob[None])
        if stochastic:
            return self.probtype.sample(prob)[0]
            # return self.probtype.sample(prob)[0], {"prob" : prob[0]}
        else:
            # return self.probtype.maxprob(prob)[0], {"prob" : prob[0]}
            return self.probtype.maxprob(prob)[0]
    def finalize(self):
        # compile the theano function mapping an observation batch to distribution parameters
        self._act_prob = theano.function([self.input], self.get_output(), **FNOPTS)
class StochPolicyKeras(StochPolicy, EzPickle):
    def __init__(self, net, probtype):
        EzPickle.__init__(self, net, probtype)
        self._net = net
        self._probtype = probtype
        self.finalize()
    @property
    def probtype(self):
        return self._probtype
    @property
    def net(self):
        return self._net
    @property
    def trainable_variables(self):
        return self._net.trainable_weights
    @property
    def variables(self):
        return self._net.get_params()[0]
    @property
    def input(self):
        return self._net.input
    def get_output(self):
        return self._net.output
    def get_updates(self):
        self._net.output  # pylint: disable=W0104
        return self._net.updates
    def get_flat(self):
        # all weights as one flat parameter vector (the CEM search space)
        return flatten(self.net.get_weights())
    def set_from_flat(self, th):
        weights = self.net.get_weights()
        self._weight_shapes = [weight.shape for weight in weights]
        self.net.set_weights(unflatten(th, self._weight_shapes))
def categorical_sample(prob_nk):
    """
    Sample from categorical distribution
    Each row specifies class probabilities
    """
    prob_nk = np.asarray(prob_nk)
    assert prob_nk.ndim == 2
    N = prob_nk.shape[0]
    # inverse-CDF sampling: first index where the cumulative sum exceeds a uniform draw
    csprob_nk = np.cumsum(prob_nk, axis=1)
    return np.argmax(csprob_nk > np.random.rand(N, 1), axis=1)
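For instance (illustrative numbers), sampling many rows with probabilities [0.1, 0.1, 0.8] should return class 2 about 80% of the time:

    probs = np.array([[0.1, 0.1, 0.8]] * 1000)
    counts = np.bincount(categorical_sample(probs), minlength=3)
    print(counts / 1000.0)  # roughly [0.1, 0.1, 0.8]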
class BernoulliHighLow(object):
    # Each output is a likelihood of max/min in the dimension
    def __init__(self, highlow):
        self.highlow = highlow
    def sample(self, prob):
        raise NotImplementedError
    def maxprob(self, prob):
        # threshold each output at 0 to choose between the two extreme values per dimension
        idxes = (prob[0] < 0).astype(int)
        return self.highlow.matrix[range(idxes.shape[0]), idxes].A
class Categorical(object):
    def __init__(self, n):
        self.n = n
    def sampled_variable(self):
        return T.ivector('a')
    def prob_variable(self):
        return T.matrix('prob')
    def likelihood(self, a, prob):
        return prob[T.arange(prob.shape[0]), a]
    def loglikelihood(self, a, prob):
        return T.log(self.likelihood(a, prob))
    def kl(self, prob0, prob1):
        return (prob0 * T.log(prob0 / prob1)).sum(axis=1)
    def entropy(self, prob0):
        return -(prob0 * T.log(prob0)).sum(axis=1)
    def sample(self, prob):
        return categorical_sample(prob)
    def maxprob(self, prob):
        return prob.argmax(axis=1)
class DiagGauss(object):
    # prob rows are (mean_1..mean_d, std_1..std_d) for a d-dimensional diagonal Gaussian
    def __init__(self, d):
        self.d = d
    def sampled_variable(self):
        return T.matrix('a')
    def prob_variable(self):
        return T.matrix('prob')
    def loglikelihood(self, a, prob):
        mean0 = prob[:, :self.d]
        std0 = prob[:, self.d:]
        # log of prod_i exp[-(a_i - mu_i)^2 / (2*sigma_i^2)] / sqrt(2*pi*sigma_i^2)
        return -0.5 * T.square((a - mean0) / std0).sum(axis=1) - 0.5 * T.log(2.0 * np.pi) * self.d - T.log(std0).sum(axis=1)
    def likelihood(self, a, prob):
        return T.exp(self.loglikelihood(a, prob))
    def kl(self, prob0, prob1):
        mean0 = prob0[:, :self.d]
        std0 = prob0[:, self.d:]
        mean1 = prob1[:, :self.d]
        std1 = prob1[:, self.d:]
        return T.log(std1 / std0).sum(axis=1) + ((T.square(std0) + T.square(mean0 - mean1)) / (2.0 * T.square(std1))).sum(axis=1) - 0.5 * self.d
    def entropy(self, prob):
        std_nd = prob[:, self.d:]
        return T.log(std_nd).sum(axis=1) + 0.5 * np.log(2 * np.pi * np.e) * self.d
    def sample(self, prob):
        mean_nd = prob[:, :self.d]
        std_nd = prob[:, self.d:]
        return np.random.randn(prob.shape[0], self.d).astype(floatX) * std_nd + mean_nd
    def maxprob(self, prob):
        return prob[:, :self.d]
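The loglikelihood expression is the standard diagonal-Gaussian log-density, i.e. the sum of d independent normal log-densities. A quick numpy cross-check against scipy.stats.norm (scipy.stats is assumed available; the means and stds are illustrative):

    from scipy.stats import norm

    d = 3
    mean = np.array([0.0, 1.0, -1.0])
    std = np.array([1.0, 0.5, 2.0])
    a = np.random.randn(d)
    # the same expression as DiagGauss.loglikelihood, evaluated in numpy
    ll = (-0.5 * np.sum(np.square((a - mean) / std))
          - 0.5 * np.log(2.0 * np.pi) * d - np.sum(np.log(std)))
    assert np.allclose(ll, norm.logpdf(a, loc=mean, scale=std).sum())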
class AgentWithPolicy(object):
    def __init__(self, policy):
        self.policy = policy
        self.stochastic = True
    def set_stochastic(self, stochastic):
        self.stochastic = stochastic
    def act(self, ob_no):
        return self.policy.act(ob_no, stochastic=self.stochastic)
    def get_flat(self):
        return self.policy.get_flat()
    def set_from_flat(self, th):
        return self.policy.set_from_flat(th)

class DeterministicAgent(AgentWithPolicy):
    # always takes the distribution's mode, so the policy acts deterministically
    def __init__(self, ob_space, ac_space, usercfg):
        policy = make_deterministic_mlp(ob_space, ac_space, usercfg)
        AgentWithPolicy.__init__(self, policy)
        self.set_stochastic(False)
class FilteredEnv(Env):  # pylint: disable=W0223
    def __init__(self, env, ob_filter, rew_filter):
        self.env = env
        # copy over relevant parts of the child env
        self.spec = self.env.spec
        self.metadata = self.env.metadata
        self.action_space = self.env.action_space
        ob_space = self.env.observation_space
        shape = ob_filter.output_shape(ob_space)
        self.observation_space = spaces.Box(-np.inf, np.inf, shape)
        self.ob_filter = ob_filter
        self.rew_filter = rew_filter
    def _step(self, ac):
        ob, rew, done, info = self.env._step(ac)
        nob = self.ob_filter(ob) if self.ob_filter else ob
        nrew = self.rew_filter(rew) if self.rew_filter else rew
        info["reward_raw"] = rew
        return (nob, nrew, done, info)
    def _reset(self):
        ob = self.env.reset()
        return self.ob_filter(ob) if self.ob_filter else ob
    def _render(self, *args, **kw):
        return self.env.render(*args, **kw)
def rgb2gray(rgb):
    # standard ITU-R BT.601 luma weights
    r, g, b = rgb[:, :, 0], rgb[:, :, 1], rgb[:, :, 2]
    gray = 0.2989 * r + 0.5870 * g + 0.1140 * b
    return gray

class RGBImageToVector(object):
    def __init__(self, out_width=40, out_height=40):
        self.out_width = out_width
        self.out_height = out_height
    def __call__(self, obs):
        # obs is an M x N x 3 rgb image; we want a flat (out_width * out_height,) vector.
        # nearly 2x faster to downsample first, then grayscale
        downsample = scipy.misc.imresize(obs, (self.out_width, self.out_height, 3))
        grayscale = rgb2gray(downsample)
        flat = grayscale.reshape(self.out_width * self.out_height)
        return flat
    def output_shape(self, x):
        return self.out_width * self.out_height
class WhitenFilter(object):
    """
    y = (x - mean) / std
    using running estimates of mean, std
    """
    def __init__(self, shape, demean=True, destd=True, clip=10.0):
        self.demean = demean
        self.destd = destd
        self.clip = clip
        self.shape = shape
        self.rs = RunningStat(shape)
    def output_shape(self, x):
        return self.shape
    def __call__(self, x):
        if self.demean:
            x = x - self.rs.mean
        if self.destd:
            x = x / (self.rs.std + 1e-8)
        if self.clip:
            x = np.clip(x, -self.clip, self.clip)
        return x
    def update(self, x):
        self.rs.push(x)
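Once the module is loaded (RunningStat is defined just below), usage mirrors do_rollout above: update() feeds observations into the running statistics at episode end, and calling the filter whitens with the current estimates. The random data here is illustrative:

    wf = WhitenFilter((4,), clip=5)
    for ob in np.random.randn(100, 4):
        wf.update(ob)
    y = wf(np.random.randn(4))  # demeaned, scaled, clipped to [-5, 5]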
# Welford's online algorithm: http://www.johndcook.com/blog/standard_deviation/
class RunningStat(object):
    def __init__(self, shape):
        self._n = 0
        self._M = np.zeros(shape)
        self._S = np.zeros(shape)
    def push(self, x):
        x = np.asarray(x)
        assert x.shape == self._M.shape
        self._n += 1
        if self._n == 1:
            self._M[...] = x
        else:
            oldM = self._M.copy()
            self._M[...] = oldM + (x - oldM) / self._n
            self._S[...] = self._S + (x - oldM) * (x - self._M)
    @property
    def n(self):
        return self._n
    @property
    def mean(self):
        return self._M
    @property
    def var(self):
        return self._S / (self._n - 1) if self._n > 1 else np.square(self._M)
    @property
    def std(self):
        return np.sqrt(self.var)
    @property
    def shape(self):
        return self._M.shape
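A quick check of the running estimates against batch statistics (the random data is illustrative): mean should match np.mean and var the unbiased (ddof=1) np.var.

    rs = RunningStat((4,))
    data = np.random.randn(100, 4)
    for row in data:
        rs.push(row)
    assert np.allclose(rs.mean, data.mean(axis=0))
    assert np.allclose(rs.var, data.var(axis=0, ddof=1))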
Comment from @Baichenjia:
Szita, I., & Lőrincz, A. (2006). Learning Tetris using the noisy cross-entropy method. Neural Computation, 18(12), 2936–2941.