-
-
Save jietang/e7f572e95634fca6e85359a567f8bebc to your computer and use it in GitHub Desktop.
CEM writeup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This algorithm is an implementation of the cross-entropy method using feed-forward neural networks as the policy class. | |
(Code adapted from the gym example CEM agent and joschu's modular_rl GitHub repo.)
Parameters: 2 hidden layers of size 10 and 5, 50 iterations, 200 evaluated models per iteration, top 20% retained, initial parameter variance of 1.0, initial parameter mean all zeros, initial seed 0. | |
For Doom environments the input image was scaled down to 40x40, converted to grayscale, flattened to a vector, then whitened per component (running mean/std, clipped to +/-5).
To reproduce: | |
python cem.py {CartPole-v0,Acrobot-v0,...} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import gym | |
import logging | |
import numpy as np | |
try: | |
import cPickle as pickle | |
except ImportError: | |
import pickle | |
import json, sys, os | |
from os import path | |
from mlp import DeterministicAgent, FilteredEnv, RGBImageToVector, WhitenFilter | |
import argparse | |
import cProfile | |
def do_cprofile_generator(func):
    """Decorator that profiles a generator function with cProfile.

    The wrapped generator yields exactly the items ``func`` yields. Profiling
    is enabled before ``func`` is called and the collected statistics are
    printed when the generator finishes (normally or through an exception).
    """
    def profiled_func(*args, **kwargs):
        profiler = cProfile.Profile()
        try:
            profiler.enable()
            produced = func(*args, **kwargs)
            for value in produced:
                yield value
            profiler.disable()
        finally:
            # Runs on exhaustion, on error, and when the generator is closed.
            profiler.print_stats()
    return profiled_func
# @do_cprofile_generator | |
def cem(f, th_mean, batch_size, n_iter, elite_frac, initial_std=1.0):
    """
    Generic implementation of the cross-entropy method for maximizing a
    black-box function. This is a generator: it yields once per iteration.

    f: a function mapping from vector -> scalar
    th_mean: initial mean over input distribution (1-D numpy array)
    batch_size: number of samples of theta to evaluate per batch
    n_iter: number of batches
    elite_frac: each batch, select this fraction of the top-performing samples
    initial_std: initial standard deviation over parameter vectors

    Yields a dict with keys:
      'ys'         -- array of f(theta) for every sample in the batch
      'theta_mean' -- mean of the elite samples (the refitted distribution mean)
      'y_mean'     -- mean of 'ys' over the whole batch
    """
    # Always keep at least one elite sample (and never more than the batch):
    # with zero elites the refit below would take the mean/std of an empty
    # array and poison the search distribution with NaNs.
    n_elite = min(batch_size, max(1, int(np.round(batch_size * elite_frac))))
    th_std = np.ones_like(th_mean) * initial_std

    for _ in range(n_iter):
        # Sample the batch from a diagonal Gaussian around the current mean.
        noise = th_std[None, :] * np.random.randn(batch_size, th_mean.size)
        ths = th_mean[None, :] + noise
        ys = np.array([f(th) for th in ths])
        # Highest-scoring samples first; refit mean/std to the top n_elite.
        elite_inds = ys.argsort()[::-1][:n_elite]
        elite_ths = ths[elite_inds]
        th_mean = elite_ths.mean(axis=0)
        th_std = elite_ths.std(axis=0)
        yield {'ys': ys, 'theta_mean': th_mean, 'y_mean': ys.mean()}
def do_rollout(agent, env, num_steps, render=False):
    """Run one episode of at most ``num_steps`` steps.

    agent: object with ``act(ob)`` returning an action
    env: environment with ``reset()``, ``step(action)`` and ``render()``
    num_steps: maximum number of steps to take
    render: when true, call ``env.render()`` on every third step

    Returns ``(total_reward, steps_taken)``. With ``num_steps <= 0`` no step
    is taken and ``(0, 0)`` is returned.

    When the episode ends with ``done`` and ``env`` is a ``FilteredEnv``, every
    observation collected during the episode is pushed into
    ``env.ob_filter.update``. Episodes cut off by ``num_steps`` do not update
    the filter.
    """
    total_rew = 0
    steps_taken = 0  # stays 0 if the loop body never runs
    ob = env.reset()
    obs = [ob]
    for t in range(num_steps):
        a = agent.act(ob)
        (ob, reward, done, _info) = env.step(a)
        obs.append(ob)
        total_rew += reward
        steps_taken = t + 1
        if render and t % 3 == 0:
            env.render()
        if done:
            if isinstance(env, FilteredEnv):
                # Separate loop variable so the current observation is not
                # clobbered while replaying the episode into the filter.
                for seen_ob in obs:
                    env.ob_filter.update(seen_ob)
            break
    return total_rew, steps_taken
if __name__ == '__main__':
    # Script entry point: train a small MLP policy with CEM on a gym
    # environment, optionally recording with the gym monitor and uploading.
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument('--display', action='store_true')
    parser.add_argument('--debug', action='store_true')
    parser.add_argument('target', nargs="?", default="CartPole-v0")
    args = parser.parse_args()

    env = gym.make(args.target)
    # Fixed seeds for the environment and for the CEM sampling.
    env.seed(0)
    np.random.seed(0)

    if 'Doom' in args.target:
        from doom_py import ScreenResolution
        env.configure(check_bounds=False, screen_resolution=ScreenResolution.RES_160X120)

        # Observation transform for Doom: RGB frame -> flat vector -> whitened.
        rgb_filter = RGBImageToVector()
        whiten_filter = WhitenFilter(rgb_filter.out_width * rgb_filter.out_height, clip=5)

        # FilteredEnv expects a callable that also exposes output_shape() and
        # update(); proxy those through to the whitening filter.
        obs_filter = lambda x: whiten_filter(rgb_filter(x))
        obs_filter.output_shape = lambda x: whiten_filter.output_shape(x)
        obs_filter.update = lambda x: whiten_filter.update(x)
        env = FilteredEnv(env, obs_filter, None)

    # --debug runs a much smaller search and caps episode length.
    if args.debug:
        params = {'n_iter': 25, 'batch_size': 25, 'elite_frac': 0.2}
    else:
        params = {'n_iter': 50, 'batch_size': 200, 'elite_frac': 0.2}
    # use built-in env semantics if not debug
    num_steps = 200 if args.debug else 10000

    # You provide the directory to write to (can be an existing
    # directory, but can't contain previous monitor results). You can
    # also dump to a tempdir if you'd like: tempfile.mkdtemp().
    outdir = '/tmp/cem-agent-results'
    if not args.debug:
        env.monitor.start(outdir, force=True)

    # Prepare snapshotting
    # ----------------------------------------
    def writefile(fname, s):
        with open(path.join(outdir, fname), 'w') as fh:
            fh.write(s)

    info = {'params': params, 'argv': sys.argv, 'env_id': env.spec.id}
    # ------------------------------------------

    # Policy: feed-forward net with hidden layers of 10 and 5 units.
    # (An earlier variant used BinaryActionLinearPolicy with
    # env.observation_space.shape[0]+1 parameters.)
    agent = DeterministicAgent(
        env.observation_space,
        env.action_space,
        dict(hid_sizes=[10, 5]),
    )
    initial_parameters = np.zeros(agent.get_flat().shape)

    def noisy_evaluation(theta):
        # Score a parameter vector by the total reward of one rollout.
        agent.set_from_flat(theta)
        rew, T = do_rollout(agent, env, num_steps)
        return rew

    # Train the agent, and snapshot each stage
    search = cem(noisy_evaluation, initial_parameters, **params)
    for (i, iterdata) in enumerate(search):
        print('Iteration %2i. Episode mean reward: %7.3f'%(i, iterdata['y_mean']))
        agent.set_from_flat(iterdata['theta_mean'])
        if args.display:
            do_rollout(agent, env, 200, render=True)

    if not args.debug:
        env.monitor.close()
    logger.info("Successfully ran CEM. Now trying to upload results to the scoreboard. If it breaks, you can always just try re-uploading the same results.")
    if not args.debug:
        gym.upload(outdir, algorithm_id='alg_ZIW1ntx0RQ2LZpw309n7A')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from gym import Env, spaces | |
from gym.spaces import Box, Discrete, HighLow | |
import keras | |
from keras.models import Sequential | |
from keras.layers.core import Dense | |
import theano | |
import theano.tensor as T | |
import scipy | |
# Use Theano's configured float type for Keras as well.
floatX = theano.config.floatX  # pylint: disable=E1101
keras.backend.set_floatx(floatX)
keras.backend.set_epsilon(1e-7)
# Keyword options passed to theano.function(...) when compiling policies.
FNOPTS = {'allow_input_downcast': True, 'on_unused_input': 'ignore'}
def make_deterministic_mlp(ob_space, ac_space, cfg):
    """Build a tanh MLP policy for the given observation and action spaces.

    ob_space: observation space; must be a ``Box``
    ac_space: action space; ``Box``, ``Discrete`` or ``HighLow``
    cfg: dict whose "hid_sizes" entry is a list of ints, one per hidden layer
         (may be empty, in which case the net is a single linear layer)

    Returns a ``StochPolicyKeras`` wrapping the network and the probability
    type matching ``ac_space``.

    Raises NotImplementedError for any other kind of action space.
    """
    assert isinstance(ob_space, Box)
    hid_sizes = cfg["hid_sizes"]

    if isinstance(ac_space, Box):
        outdim = ac_space.shape[0]
        probtype = DiagGauss(outdim)
    elif isinstance(ac_space, Discrete):
        outdim = ac_space.n
        probtype = Categorical(outdim)
    elif isinstance(ac_space, HighLow):
        outdim = ac_space.num_rows
        probtype = BernoulliHighLow(ac_space)
    else:
        # Fail here with a clear message instead of printing and then hitting
        # a NameError on the undefined ``outdim`` further down.
        raise NotImplementedError("unsupported action space: %r" % (ac_space,))

    net = Sequential()
    for (i, layeroutsize) in enumerate(hid_sizes):
        # Only the first layer of the model is told the input shape.
        inshp = dict(input_shape=ob_space.shape) if i == 0 else {}
        net.add(Dense(layeroutsize, activation="tanh", **inshp))
    # With no hidden layers the output layer is the first layer.
    inshp = dict(input_shape=ob_space.shape) if len(hid_sizes) == 0 else {}
    net.add(Dense(outdim, **inshp))

    # Scale down the initial weights of the output layer by 10x.
    Wlast = net.layers[-1].W
    Wlast.set_value(Wlast.get_value(borrow=True) * 0.1)

    policy = StochPolicyKeras(net, probtype)
    return policy
def flatten(arrs):
    """Concatenate a sequence of numpy arrays into one 1-D array.

    Each array is unrolled in row-major order and the pieces are joined in
    the order given.
    """
    pieces = [arr.ravel() for arr in arrs]
    return np.concatenate(pieces)
def unflatten(vec, shapes):
    """Split a flat vector into arrays with the given shapes (inverse of flatten).

    vec: 1-D numpy array holding the concatenated values
    shapes: sequence of shape tuples, in the order the arrays were flattened

    Returns a list with one array per entry of ``shapes``; consecutive arrays
    take consecutive, non-overlapping slices of ``vec``.
    """
    i = 0
    arrs = []
    for shape in shapes:
        # int(): np.prod(()) is a float, which cannot be used as a slice bound.
        size = int(np.prod(shape))
        arrs.append(vec[i:i + size].reshape(shape))
        # Advance past the values just consumed; without this every array
        # would be read from the start of vec.
        i += size
    return arrs
class EzPickle(object):
    """Mixin for objects that are pickled via their constructor arguments.

    The constructor arguments are recorded at construction time; pickling
    stores only those arguments, and unpickling builds a fresh instance of
    the same type from them and adopts its attributes.

    Example usage:

        class Dog(Animal, EzPickle):
            def __init__(self, furcolor, tailkind="bushy"):
                Animal.__init__(self)
                EzPickle.__init__(self, furcolor, tailkind)
                ...

    When this object is unpickled, a new Dog will be constructed by passing
    the provided furcolor and tailkind into the constructor. However,
    philosophers are still not sure whether it is still the same dog.

    This is generally needed only for environments which wrap C/C++ code,
    such as MuJoCo and Atari.
    """

    def __init__(self, *args, **kwargs):
        self._ezpickle_args, self._ezpickle_kwargs = args, kwargs

    def __getstate__(self):
        return dict(
            _ezpickle_args=self._ezpickle_args,
            _ezpickle_kwargs=self._ezpickle_kwargs,
        )

    def __setstate__(self, d):
        # Rebuild through the constructor, then take over the new object's state.
        rebuilt = type(self)(*d["_ezpickle_args"], **d["_ezpickle_kwargs"])
        self.__dict__.update(rebuilt.__dict__)
class StochPolicy(object):
    """Abstract base for policies that map an observation to an action.

    Subclasses supply ``probtype``, ``trainable_variables``, ``input`` and
    ``get_output()``, and must call ``finalize()`` before ``act()`` is used.
    """

    @property
    def probtype(self):
        raise NotImplementedError

    @property
    def trainable_variables(self):
        raise NotImplementedError

    @property
    def input(self):
        raise NotImplementedError

    def get_output(self):
        raise NotImplementedError

    def act(self, ob, stochastic=True):
        """Return the action for a single observation.

        The observation gets a leading batch axis, is run through the compiled
        function, and the first row of the result is returned: a sample from
        the distribution if ``stochastic``, otherwise its most likely value.
        """
        prob = self._act_prob(ob[None])
        choose = self.probtype.sample if stochastic else self.probtype.maxprob
        return choose(prob)[0]

    def finalize(self):
        """Compile the Theano function from ``input`` to ``get_output()``."""
        self._act_prob = theano.function([self.input], self.get_output(), **FNOPTS)
class StochPolicyKeras(StochPolicy, EzPickle):
    """StochPolicy backed by a Keras model plus a probability type."""

    def __init__(self, net, probtype):
        EzPickle.__init__(self, net, probtype)
        self._net = net
        self._probtype = probtype
        # Compile the action function straight away.
        self.finalize()

    @property
    def probtype(self):
        return self._probtype

    @property
    def net(self):
        return self._net

    @property
    def trainable_variables(self):
        return self._net.trainable_weights

    @property
    def variables(self):
        return self._net.get_params()[0]

    @property
    def input(self):
        return self._net.input

    def get_output(self):
        return self._net.output

    def get_updates(self):
        # Touch the output first, then read the model's updates.
        self._net.output  #pylint: disable=W0104
        return self._net.updates

    def get_flat(self):
        """Return all network weights as one flat vector."""
        current = self.net.get_weights()
        return flatten(current)

    def set_from_flat(self, th):
        """Load network weights from a flat vector laid out as in get_flat()."""
        shapes = [w.shape for w in self.net.get_weights()]
        self._weight_shapes = shapes
        self.net.set_weights(unflatten(th, shapes))
def categorical_sample(prob_nk):
    """
    Draw one class index per row from a categorical distribution.

    prob_nk: 2-D array-like; each row holds the class probabilities.
    Returns an array with one index per row: the first class whose
    cumulative probability exceeds a uniform random draw for that row.
    """
    probs = np.asarray(prob_nk)
    assert probs.ndim == 2
    n_rows = probs.shape[0]
    cumulative = np.cumsum(probs, axis=1)
    draws = np.random.rand(n_rows, 1)
    return (cumulative > draws).argmax(axis=1)
class BernoulliHighLow(object):
    """Probability type for HighLow action spaces.

    Each output is a likelihood of max/min in the dimension. Only the
    deterministic ``maxprob`` is implemented.
    """

    def __init__(self, highlow):
        # The action space; maxprob() reads its ``matrix`` attribute.
        self.highlow = highlow

    def sample(self, prob):
        raise NotImplementedError

    def maxprob(self, prob):
        """Pick one entry per row of ``highlow.matrix`` from the first row of ``prob``.

        For row i, column 1 is chosen when ``prob[0][i] < 0`` and column 0
        otherwise. The selection's ``.A`` attribute is returned.
        """
        idxes = (prob[0] < 0).astype(int)
        # np.arange instead of xrange: same row indices, but also works on
        # Python 3 where xrange does not exist.
        rows = np.arange(idxes.shape[0])
        return self.highlow.matrix[rows, idxes].A
class Categorical(object):
    """Categorical distribution over ``n`` classes.

    ``prob`` arguments are matrices with one row of class probabilities per
    sample. The likelihood/kl/entropy methods build Theano expressions;
    ``sample`` and ``maxprob`` work on numeric arrays.
    """

    def __init__(self, n):
        self.n = n

    def sampled_variable(self):
        return T.ivector('a')

    def prob_variable(self):
        return T.matrix('prob')

    def likelihood(self, a, prob):
        # Probability of the chosen class in each row.
        row_ids = T.arange(prob.shape[0])
        return prob[row_ids, a]

    def loglikelihood(self, a, prob):
        chosen = self.likelihood(a, prob)
        return T.log(chosen)

    def kl(self, prob0, prob1):
        log_ratio = T.log(prob0/prob1)
        return (prob0 * log_ratio).sum(axis=1)

    def entropy(self, prob0):
        weighted_log = prob0 * T.log(prob0)
        return - weighted_log.sum(axis=1)

    def sample(self, prob):
        return categorical_sample(prob)

    def maxprob(self, prob):
        # Most likely class per row.
        return prob.argmax(axis=1)
class DiagGauss(object):
    """Diagonal Gaussian over ``d`` dimensions.

    A ``prob`` matrix holds the means in its first ``d`` columns and the
    standard deviations in the remaining columns. The likelihood/kl/entropy
    methods build Theano expressions; ``sample`` and ``maxprob`` work on
    numeric arrays.
    """

    def __init__(self, d):
        self.d = d

    def sampled_variable(self):
        return T.matrix('a')

    def prob_variable(self):
        return T.matrix('prob')

    def loglikelihood(self, a, prob):
        mean0 = prob[:, :self.d]
        std0 = prob[:, self.d:]
        # exp[ -(a - mu)^2/(2*sigma^2) ] / sqrt(2*pi*sigma^2)
        quadratic = - 0.5 * T.square((a - mean0) / std0).sum(axis=1)
        normalizer = 0.5 * T.log(2.0 * np.pi) * self.d
        return quadratic - normalizer - T.log(std0).sum(axis=1)

    def likelihood(self, a, prob):
        return T.exp(self.loglikelihood(a, prob))

    def kl(self, prob0, prob1):
        mean0, std0 = prob0[:, :self.d], prob0[:, self.d:]
        mean1, std1 = prob1[:, :self.d], prob1[:, self.d:]
        log_std_ratio = T.log(std1 / std0).sum(axis=1)
        scaled = ((T.square(std0) + T.square(mean0 - mean1)) / (2.0 * T.square(std1))).sum(axis=1)
        return log_std_ratio + scaled - 0.5 * self.d

    def entropy(self, prob):
        std_nd = prob[:, self.d:]
        return T.log(std_nd).sum(axis=1) + .5 * np.log(2 * np.pi * np.e) * self.d

    def sample(self, prob):
        mean_nd = prob[:, :self.d]
        std_nd = prob[:, self.d:]
        unit_noise = np.random.randn(prob.shape[0], self.d).astype(floatX)
        return unit_noise * std_nd + mean_nd

    def maxprob(self, prob):
        # The mode of a Gaussian is its mean.
        return prob[:, :self.d]
class AgentWithPolicy(object):
    """Thin agent wrapper that delegates to a policy object.

    The agent starts in stochastic mode; ``set_stochastic`` switches the flag
    that is forwarded to ``policy.act``.
    """

    def __init__(self, policy):
        self.policy = policy
        self.stochastic = True

    def set_stochastic(self, stochastic):
        self.stochastic = stochastic

    def act(self, ob_no):
        use_sampling = self.stochastic
        return self.policy.act(ob_no, stochastic=use_sampling)

    def get_flat(self):
        # Flat parameter vector of the underlying policy.
        return self.policy.get_flat()

    def set_from_flat(self, th):
        # Forward the flat parameter vector to the underlying policy.
        return self.policy.set_from_flat(th)
class DeterministicAgent(AgentWithPolicy):
    """Agent with an MLP policy that always acts non-stochastically."""

    def __init__(self, ob_space, ac_space, usercfg):
        mlp_policy = make_deterministic_mlp(ob_space, ac_space, usercfg)
        AgentWithPolicy.__init__(self, mlp_policy)
        # Override the base class default of stochastic=True.
        self.set_stochastic(False)
class FilteredEnv(Env):  #pylint: disable=W0223
    """Environment wrapper that filters observations and/or rewards.

    env: the wrapped environment
    ob_filter: callable applied to every observation, or None. When given it
        must also provide ``output_shape(ob_space)``, which is used to build
        the wrapper's observation space.
    rew_filter: callable applied to every reward, or None.
    """

    def __init__(self, env, ob_filter, rew_filter):
        self.env = env
        # copy over relevant parts of the child env
        self.spec = self.env.spec
        self.metadata = self.env.metadata
        self.action_space = self.env.action_space
        ob_space = self.env.observation_space
        if ob_filter:
            shape = ob_filter.output_shape(ob_space)
            self.observation_space = spaces.Box(-np.inf, np.inf, shape)
        else:
            # No observation filter: observations pass through unchanged, so
            # the child's space applies (matches the None handling below).
            self.observation_space = ob_space
        self.ob_filter = ob_filter
        self.rew_filter = rew_filter

    def _step(self, ac):
        ob, rew, done, info = self.env._step(ac)
        nob = self.ob_filter(ob) if self.ob_filter else ob
        nrew = self.rew_filter(rew) if self.rew_filter else rew
        # Keep the unfiltered reward available to callers.
        info["reward_raw"] = rew
        return (nob, nrew, done, info)

    def _reset(self):
        ob = self.env.reset()
        return self.ob_filter(ob) if self.ob_filter else ob

    def _render(self, *args, **kw):
        # Pass the child's return value through instead of discarding it.
        return self.env.render(*args, **kw)
def rgb2gray(rgb):
    """Convert an (M, N, 3) RGB image to an (M, N) grayscale image.

    Uses the weighted sum 0.2989*R + 0.5870*G + 0.1140*B.
    """
    red = rgb[:, :, 0]
    green = rgb[:, :, 1]
    blue = rgb[:, :, 2]
    return 0.2989 * red + 0.5870 * green + 0.1140 * blue
class RGBImageToVector(object):
    """Observation filter: RGB image -> flat grayscale vector.

    The image is resized, converted to grayscale and reshaped to a vector of
    length ``out_width * out_height``.
    """

    def __init__(self, out_width=40, out_height=40):
        self.out_width = out_width
        self.out_height = out_height

    def __call__(self, obs):
        # obs is an M x N x 3 rgb image, want an (out_width x out_height,)
        # vector
        # nearly 2x faster to downsample then grayscale
        # NOTE(review): scipy.misc.imresize is not available in recent SciPy
        # releases -- confirm the pinned SciPy version still provides it.
        target_size = (self.out_width, self.out_height, 3)
        small = scipy.misc.imresize(obs, target_size)
        gray = rgb2gray(small)
        return gray.reshape(self.out_width * self.out_height)

    def output_shape(self, x):
        # The argument is ignored; the length depends only on the target size.
        return self.out_width * self.out_height
class WhitenFilter(object):
    """
    y = (x-mean)/std
    using running estimates of mean,std

    The estimates change only through update(); calling the filter does not
    modify them. The result is clipped to [-clip, clip] when ``clip`` is truthy.
    """

    def __init__(self, shape, demean=True, destd=True, clip=10.0):
        self.shape = shape
        self.demean = demean
        self.destd = destd
        self.clip = clip
        self.rs = RunningStat(shape)

    def output_shape(self, x):
        # Whitening keeps the shape; the argument is ignored.
        return self.shape

    def __call__(self, x):
        out = x
        if self.demean:
            out = out - self.rs.mean
        if self.destd:
            # Small constant guards against division by a zero std.
            out = out / (self.rs.std+1e-8)
        if self.clip:
            out = np.clip(out, -self.clip, self.clip)
        return out

    def update(self, x):
        """Feed one observation into the running statistics."""
        self.rs.push(x)
# http://www.johndcook.com/blog/standard_deviation/ | |
class RunningStat(object):
    """Running mean and variance of a stream of equally shaped arrays.

    Values are added one at a time with push(); mean, var and std are
    available at any point without storing the samples.
    """

    def __init__(self, shape):
        self._n = 0
        self._M = np.zeros(shape)  # running mean
        self._S = np.zeros(shape)  # running sum of squared deviations

    def push(self, x):
        sample = np.asarray(x)
        assert sample.shape == self._M.shape
        self._n += 1
        if self._n == 1:
            self._M[...] = sample
            return
        # Deviation from the mean *before* this sample is folded in.
        delta = sample - self._M
        self._M += delta / self._n
        self._S += delta * (sample - self._M)

    @property
    def n(self):
        return self._n

    @property
    def mean(self):
        return self._M

    @property
    def var(self):
        # With fewer than two samples, fall back to the squared mean.
        if self._n > 1:
            return self._S/(self._n - 1)
        return np.square(self._M)

    @property
    def std(self):
        return np.sqrt(self.var)

    @property
    def shape(self):
        return self._M.shape
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Szita I, Lőrincz A. Learning Tetris using the noisy cross-entropy method. Neural Computation, 2006, 18(12): 2936–2941.