Natural Evolution Strategies Cartpole example
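A minimal Natural Evolution Strategies (NES) agent for CartPole-v1: the policy network's weights are flattened into a single vector, perturbed with Gaussian noise, scored by episode return, and updated along the return-weighted average of the perturbations.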
import gym
import numpy as np
import tensorflow as tf
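# CartPole observation/action sizes and NES hyperparameters: number of sampled
# perturbations per update, perturbation standard deviation, and Adam learning rate.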
state_dim = 4
action_dim = 2
population_size = 5
std = 0.1
alpha = 0.1
def get_plain_shape(s):
    if len(s) == 2:
        return s[0] * s[1]
    else:
        return s[0]
env = gym.make("CartPole-v1")
with tf.variable_scope("net"):
cur_input = tf.placeholder(dtype=tf.float32, shape=[1, state_dim])
d1 = tf.layers.dense(cur_input, 64, tf.nn.elu)
d2 = tf.layers.dense(d1, 64, tf.nn.elu)
cur_out = tf.layers.dense(d1, action_dim, tf.identity)
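# Helper variables: accum_vars are fed a full set of weights that assign_op copies
# into the network; assign_grad_vars are fed a gradient that apply_grad_op hands to Adam.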
optim = tf.train.AdamOptimizer(alpha)
cur_vars = tf.trainable_variables("net")
accum_vars = [tf.Variable(tf.zeros_like(tv.initialized_value()), trainable=False) for tv in cur_vars]
assign_grad_vars = [tf.Variable(tf.zeros_like(tv.initialized_value()), trainable=False) for tv in cur_vars]
assign_op = tf.group([cur_vars[i].assign(accum_vars[i]) for i in range(len(accum_vars))])
apply_grad_op = optim.apply_gradients(zip(assign_grad_vars, cur_vars))
plain_w_len = sum([get_plain_shape(x.get_shape()) for x in cur_vars])
session = tf.Session()
session.run(tf.global_variables_initializer())
def make_rollout():
    # Run one greedy episode with the current network weights and return its total reward.
    s0 = env.reset()
    terminal = False
    cum_rew = 0
    while not terminal:
        values = session.run(cur_out, feed_dict={
            cur_input: np.array([s0])
        })[0]
        action = np.argmax(values)
        s0, r, terminal, _ = env.step(action)
        cum_rew += r
    return cum_rew
def to_plain(weights):
    # Flatten a list of weight arrays into a single 1-D vector.
    return np.concatenate([w.reshape(-1) for w in weights], axis=0)
def from_plain(plain_w):
    # Split a flat vector back into arrays matching the shapes of the network variables.
    pos = 0
    result = []
    for v in cur_vars:
        shape = v.get_shape()
        n = get_plain_shape(shape)
        result.append(plain_w[pos:pos + n].reshape(shape))
        pos += n
    return result
def assign_plain_weights(plain_w):
    # Load a flat weight vector into the network by feeding accum_vars and running assign_op.
    weights = from_plain(plain_w)
    fd = {}
    for i in range(len(weights)):
        fd[accum_vars[i]] = weights[i]
    session.run(assign_op, fd)
def apply_grads(plain_grads):
    # Apply an ascent direction via Adam; the sign is flipped because the optimizer minimizes.
    g = from_plain(plain_grads)
    fd = {}
    for i in range(len(g)):
        fd[assign_grad_vars[i]] = -g[i]
    session.run(apply_grad_op, fd)
while True:
    cur_plain_w = to_plain(session.run(cur_vars))
    # Sample Gaussian perturbations of the flat weight vector and score each one by its return.
    candidates = np.random.randn(population_size, plain_w_len) * std
    returns = np.zeros(population_size)
    for i in range(population_size):
        cand = candidates[i] + cur_plain_w
        assign_plain_weights(cand)
        returns[i] = make_rollout()
    print(returns.mean())
    # Standardize returns and weight each perturbation by its standardized return.
    returns = (returns - returns.mean()) / (returns.std() + 0.0001)
    w_diff = np.dot(candidates.T, returns)
    # Restore the unperturbed weights, then take an Adam step along the NES gradient estimate.
    assign_plain_weights(cur_plain_w)
    grads = w_diff / (population_size * std)
    apply_grads(grads)
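The loop above never terminates on its own. As a hedged sketch (not part of the original gist), one could add a `break` once returns.mean() reaches the CartPole-v1 return cap of 500 and then watch the learned policy; `watch_episode` below is a hypothetical helper built only from names defined above.

def watch_episode():
    # Render one greedy episode with whatever weights are currently in the network.
    s = env.reset()
    terminal = False
    total = 0
    while not terminal:
        env.render()
        values = session.run(cur_out, feed_dict={cur_input: np.array([s])})[0]
        s, r, terminal, _ = env.step(int(np.argmax(values)))
        total += r
    return total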