# coding: utf-8
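# Threaded advantage actor-critic on OpenAI Gym's Pendulum-v0: several worker
# threads collect transitions into a shared queue while the main thread trains
# a Gaussian policy network and a value network on the batched samples.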
# In[1]:
import os
import gym
import numpy as np
import tensorflow as tf
import queue
import threading
import multiprocessing
import time
# In[2]:
MODEL_DIR = "model3/"
CHECKPOINT_DIR = os.path.join(MODEL_DIR, "checkpoints")
# Create the checkpoint directory up front so tf.train.Saver.save() has
# somewhere to write.
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
# In[3]:
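# Actor-critic model: a Gaussian (Normal) policy whose mean and standard
# deviation come from a 256-unit hidden layer, plus a separate 256-unit value
# network. Exposes train(), predict() and get_value() closures bound to the
# supplied session.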
class Model():
    def __init__(self, ob_dim, ac_dim, sess, ent_coef=.01, max_grad_norm=0.5, lr=7e-4,
                 total_timesteps=int(1e6), lrschedule='linear'):
        action = tf.placeholder(dtype=tf.float32, shape=[None, ac_dim], name='action')
        advantage = tf.placeholder(dtype=tf.float32, shape=[None, 1], name='adv')
        reward = tf.placeholder(dtype=tf.float32, shape=[None, 1], name='reward')
        X = tf.placeholder(dtype=tf.float32, shape=[None, ob_dim], name='obs')
        w_init = tf.random_normal_initializer(0., .1)
        with tf.variable_scope("policy"):
            fc1 = tf.contrib.layers.fully_connected(X, 256, weights_initializer=w_init)
            mu = tf.contrib.layers.fully_connected(fc1, ac_dim, activation_fn=tf.tanh, weights_initializer=w_init)
            sigma = tf.contrib.layers.fully_connected(fc1, ac_dim, activation_fn=tf.nn.softplus, weights_initializer=w_init)
        with tf.variable_scope("value"):
            fc1 = tf.contrib.layers.fully_connected(X, 256, weights_initializer=w_init)
            v = tf.contrib.layers.fully_connected(fc1, 1, activation_fn=None, weights_initializer=w_init)
        policy = tf.distributions.Normal(mu, sigma)
        # Keep the per-sample terms shaped [batch, 1] so the elementwise products
        # below pair each advantage/reward with its own transition instead of
        # broadcasting to a [batch, batch] matrix.
        neg_log_p = tf.reduce_sum(-policy.log_prob(action), 1, keep_dims=True)
        vf_loss = tf.reduce_mean(tf.square(v - reward))
        entropy = tf.reduce_sum(policy.entropy())
        pg_loss = tf.reduce_mean(advantage * neg_log_p)
        pi_loss = pg_loss - ent_coef * entropy
        sample_a = policy.sample()
        pi_optim = tf.train.AdamOptimizer(learning_rate=.0005)
        update_pi = pi_optim.minimize(pi_loss)
        vf_optim = tf.train.AdamOptimizer()
        update_vf = vf_optim.minimize(vf_loss)
        def train(states, actions, rewards, values):
            advs = rewards - values
            policy_loss, ent, value_loss, _, _ = sess.run([pg_loss, entropy, vf_loss, update_pi, update_vf],
                                                          {X: states, action: actions, reward: rewards, advantage: advs})
            return policy_loss, ent, value_loss

        def predict(states):
            samp_a, v_pred = sess.run([sample_a, v], {X: states})
            return samp_a, v_pred

        def get_value(states):
            v_pred = sess.run([v], {X: states})
            return v_pred

        self.train = train
        self.predict = predict
        self.get_value = get_value
# In[4]:
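# Worker: steps its own environment copy with actions sampled from the shared
# model and pushes (ob, next_ob, action, value, reward, done) transitions onto
# the shared queue, blocking on q.join() whenever the queue is full.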
class Worker():
    def __init__(self, env, model, coord, q, i):
        self.i = i
        self.env = env
        self.model = model
        self.coord = coord
        self.q = q

    def work(self):
        done = True
        step = 0
        episode = 0
        ep_r = 0.
        while not self.coord.should_stop():
            if done:
                print('%d, %d: %f' % (self.i, episode, ep_r))
                episode += 1
                ep_r = 0.
                ob = self.env.reset()
                ob = np.squeeze(ob)
            action, value = self.model.predict(ob[np.newaxis, ...])
            action, value = action, np.squeeze(value)
            next_ob, reward, done, _ = self.env.step(action)
            next_ob = np.squeeze(next_ob)
            ep_r += reward
            if self.q.full():
                self.q.join()
            else:
                self.q.put((self.i, step, (ob, next_ob, action, value, reward, done)))
            ob = next_ob
            step += 1
# In[5]:
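# Runner: launches one worker thread per environment, waits until the queue
# holds a full batch, then drains it into arrays of states, actions, values
# and discounted rewards for a single training update.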
class Runner():
    def __init__(self, num_envs, make_env, num_batch, model, coord, discount=.9):
        self.q = queue.Queue(maxsize=num_batch)
        workers = []
        self.model = model
        self.discount = discount
        self.coord = coord
        self.threads = []
        for i in range(num_envs):
            workers.append(Worker(make_env(), model, coord, self.q, i))
        for w in workers:
            # Bind the worker's method directly; a `lambda: w.work()` here would
            # capture the loop variable and could run the wrong worker.
            t = threading.Thread(target=w.work)
            t.start()
            self.threads.append(t)

    def finished_batch(self, num_processed):
        for i in range(num_processed):
            self.q.task_done()
    def run(self):
        while not self.q.full():
            time.sleep(.5)
        prev_i = -1
        rewards = []
        actions = []
        states = []
        values = []
        num_processed = 0
        while not self.q.empty():
            i, step, (ob, next_ob, action, value, reward, done) = self.q.get()
            if done:
                cum_rew = 0
            elif prev_i != i:
                cum_rew = np.squeeze(self.model.get_value(next_ob[np.newaxis, ...])[0])
            prev_i = i
            cum_rew = reward + cum_rew * self.discount
            states.append(ob)
            actions.append(action)
            values.append(value)
            rewards.append(cum_rew)
            num_processed += 1
        rewards = np.vstack(rewards)
        actions = np.vstack(actions)
        states = np.vstack(states)
        values = np.vstack(values)
        return rewards, actions, states, values, num_processed
# In[6]:
def make_env():
    return gym.make('Pendulum-v0')
# In[7]:
def write_summaries(summary_writer, pg_loss, ent, vf_loss, steps):
    summary = tf.Summary()
    summary.value.add(tag='Policy Loss', simple_value=pg_loss)
    summary.value.add(tag='Entropy', simple_value=ent)
    summary.value.add(tag='Value Function Loss', simple_value=vf_loss)
    summary_writer.add_summary(summary, steps)
    summary_writer.flush()
# In[8]:
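# Evaluation thread: every `interval` seconds, runs 10 episodes with the
# current policy, writes the average return and episode length to TensorBoard,
# and saves a checkpoint.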
def test(interval, sess, coord, model, env, summary_writer, saver):
    step = 0
    with sess.as_default(), sess.graph.as_default():
        while not coord.should_stop():
            total_reward = 0
            length = 0
            for i in range(10):
                state = np.array(env.reset())
                done = False
                while not done:
                    state = state[np.newaxis, ...]
                    action, v = model.predict(state)
                    action, v = action, np.squeeze(v)
                    #action, v = np.squeeze(action), np.squeeze(v)
                    state, reward, done, _ = env.step(action)
                    state = state.squeeze()
                    total_reward += reward
                    length += 1
            episode_summary = tf.Summary()
            episode_summary.value.add(simple_value=total_reward/10., tag="eval/total_reward")
            episode_summary.value.add(simple_value=length/10., tag="eval/episode_length")
            summary_writer.add_summary(episode_summary, step)
            summary_writer.flush()
            saver.save(sess, CHECKPOINT_DIR + '/my-model', global_step=step)
            step += 1
            time.sleep(interval)
# In[9]:
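# Training loop: build the model, restore the latest checkpoint if present,
# start the runner's workers and the evaluation thread, then repeatedly pull a
# batch from the runner and take one gradient step on it until `total_steps`
# samples have been processed.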
def learn(total_steps):
    with tf.Session() as sess:
        env = make_env()
        num_threads = multiprocessing.cpu_count()
        summary_writer = tf.summary.FileWriter(os.path.join(MODEL_DIR, "train"))
        coord = tf.train.Coordinator()
        model = Model(env.observation_space.shape[0], env.action_space.shape[0], sess)
        sess.run(tf.global_variables_initializer())
        runner = Runner(num_threads, make_env, 10 * num_threads, model, coord)
        saver = tf.train.Saver(keep_checkpoint_every_n_hours=1.0, max_to_keep=5)
        latest_checkpoint = tf.train.latest_checkpoint(CHECKPOINT_DIR)
        if latest_checkpoint:
            print("Loading model checkpoint: {}".format(latest_checkpoint))
            saver.restore(sess, latest_checkpoint)
        monitor_thread = threading.Thread(target=lambda: test(5, sess, coord, model, env, summary_writer, saver))
        monitor_thread.start()
        steps = 0
        while steps < total_steps:
            rewards, actions, states, values, num_processed = runner.run()
            pg_loss, ent, vf_loss = model.train(states, actions, rewards, values)
            runner.finished_batch(num_processed)
            write_summaries(summary_writer, pg_loss, ent, vf_loss, steps)
            steps += num_processed
        coord.request_stop()
        coord.join(runner.threads)
        coord.join([monitor_thread])
# In[ ]:
def main():
    learn(100000)
# In[ ]:
if __name__ == '__main__':
    main()