@apicquot
Forked from quq99/DQN.py
Last active August 26, 2017 16:35
A DQN implementation that solves CartPole-v0.
Works with TensorFlow 1.2 and Python 3.5.
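If you want to confirm your setup before running, here is a minimal sanity-check sketch (not part of the original DQN.py; it only verifies the versions and the CartPole spaces the script assumes):

import sys
import gym
import tensorflow as tf

print(sys.version)                  # the gist targets Python 3.5
print(tf.__version__)               # the gist targets TensorFlow 1.2
env = gym.make('CartPole-v0')
print(env.observation_space.shape)  # (4,)  -> state_dim = 4
print(env.action_space.n)           # 2     -> action_dim = 2
env.close()

The gist itself follows.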
import gym
from gym import wrappers
import tensorflow as tf
import numpy as np
import random
from collections import deque

# Hyperparameters for DQN
gamma = 0.9                   # discount factor for target Q
epsilonMax = 0.50             # starting value of epsilon (0.5 for CartPole)
epsilonMin = 0.01             # final value of epsilon
epsilonN = 10000              # steps over which epsilon is annealed (10000 for CartPole)
replaySize = 10000            # experience replay buffer size
batchSize = 32                # size of minibatch
lr = 0.0003                   # learning rate
modelSaveFrequency = 1000000  # save the model every modelSaveFrequency training steps
hidden_dims = [16, 16, 16]    # sizes of the hidden layers
envName = 'CartPole-v0'
# envName = 'CartPole-v1'
# envName = 'Acrobot-v1'
nEpisodesMax = 2000           # episode limit
nStepsMax = 200               # step limit within an episode
testFrequency = 20            # evaluate every testFrequency episodes
nTests = 10                   # number of test episodes per evaluation


class AgentDQN():
    def __init__(self, env):
        # init experience replay
        self.memory = deque(maxlen=replaySize)
        # init some parameters
        self.stepIdx = 0
        self.lr = lr
        self.epsilon = epsilonMax
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n
        self.create_Q_network()
        self.create_training_method()
        # init session
        self.session = tf.InteractiveSession()
        self.session.run(tf.global_variables_initializer())
        # loading networks
        self.saver = tf.train.Saver()
        checkpoint = tf.train.get_checkpoint_state("saved_networks")
        if checkpoint and checkpoint.model_checkpoint_path:
            self.saver.restore(self.session, checkpoint.model_checkpoint_path)
            print("Successfully loaded:", checkpoint.model_checkpoint_path)
        else:
            print("Could not find old network weights")
        global summary_writer
        summary_writer = tf.summary.FileWriter('tensorboard/', graph=self.session.graph)

    def create_Q_network(self):
        # input layer
        self.state_input = tf.placeholder("float", [None, self.state_dim])
        if False:
            # original creation of the network - no longer used
            # network weights
            hidden_dim = 50
            W1 = self.weight_variable([self.state_dim, hidden_dim])
            b1 = self.bias_variable([hidden_dim])
            W2 = self.weight_variable([hidden_dim, self.action_dim])
            b2 = self.bias_variable([self.action_dim])
            # hidden layers
            h_layer = tf.nn.relu(tf.matmul(self.state_input, W1) + b1)
            # Q Value layer
            self.Q_value = tf.matmul(h_layer, W2) + b2
        if True:  # creation using tf.layers is much easier - allows easy configuration of multiple hidden layers
            hidden = self.state_input
            for dim in hidden_dims:
                hidden = tf.layers.dense(hidden, dim, activation=tf.nn.relu)
            # final layer is linear
            self.Q_value = tf.layers.dense(hidden, self.action_dim, activation=None)

    def create_training_method(self):
        self.action_input = tf.placeholder(shape=[None, self.action_dim], dtype=tf.float32)  # one-hot representation
        self.y_input = tf.placeholder(shape=[None], dtype=tf.float32)
        Q_action = tf.reduce_sum(tf.multiply(self.Q_value, self.action_input), reduction_indices=1)
        self.loss = tf.reduce_mean(tf.square(self.y_input - Q_action))
        tf.summary.scalar("loss", self.loss)
        global merged_summary_op
        merged_summary_op = tf.summary.merge_all()
        self.optimizer = tf.train.AdamOptimizer(self.lr).minimize(self.loss)
        # alternative way of selecting Q(s, a): flatten Q_value and gather by index (currently unused)
        self.action_holder = tf.argmax(self.action_input, axis=1, output_type=tf.int32)
        Q_shape = tf.shape(self.Q_value, out_type=tf.int32)
        Q_indexes = tf.range(0, Q_shape[0]) * Q_shape[1] + self.action_holder
        # Q_action3 = tf.gather(tf.reshape(self.Q_value, [-1]), Q_indexes)
        # self.loss3 = tf.reduce_mean(tf.square(tf.subtract(self.y_input, Q_action3)))
        # self.optimizer3 = tf.train.AdamOptimizer(self.lr).minimize(self.loss3)
        self.next_state_input = tf.placeholder("float", [None, self.state_dim])

    def perceive(self, state, action, reward, next_state, done):
        one_hot_action = np.zeros(self.action_dim)
        one_hot_action[action] = 1
        self.memory.append((state, one_hot_action, reward, next_state, done))
        if len(self.memory) > batchSize:
            self.train_Q_network()

    def train_Q_network(self):
        self.stepIdx += 1
        # Step 1: obtain random minibatch from replay memory
        minibatch = random.sample(self.memory, batchSize)
        state_batch = [data[0] for data in minibatch]
        action_batch = [data[1] for data in minibatch]
        reward_batch = [data[2] for data in minibatch]
        next_state_batch = [data[3] for data in minibatch]
        # Step 2: calculate y
        y_batch = []
        Q_value_batch = self.Q_value.eval(feed_dict={self.state_input: next_state_batch})
        for i in range(0, batchSize):
            done = minibatch[i][4]
            if done:
                y_batch.append(reward_batch[i])
            else:
                y_batch.append(reward_batch[i] + gamma * np.max(Q_value_batch[i]))
        self.optimizer.run(feed_dict={
            self.y_input: y_batch,
            self.action_input: action_batch,
            self.state_input: state_batch
        })
        summary_str = self.session.run(merged_summary_op, feed_dict={
            self.y_input: y_batch,
            self.action_input: action_batch,
            self.state_input: state_batch
        })
        summary_writer.add_summary(summary_str, self.stepIdx)
        # save the network every modelSaveFrequency training steps
        if (self.stepIdx + 1) % modelSaveFrequency == 0:
            self.saver.save(self.session, 'saved_networks/' + 'network' + '-dqn', global_step=self.stepIdx)

    def egreedy_action(self, state):
        Q_value = self.Q_value.eval(feed_dict={
            self.state_input: [state]
        })[0]
        # linearly anneal epsilon from epsilonMax to epsilonMin over epsilonN steps
        self.epsilon = max(epsilonMin, epsilonMax + (epsilonMin - epsilonMax) * self.stepIdx / epsilonN)
        if random.random() <= self.epsilon:
            return random.randint(0, self.action_dim - 1)
        else:
            return np.argmax(Q_value)
"""
def boltzman_action(self,state):
"" "
Sample from categorical distribution,
specified by a vector of class probabilities
boltzmann , better than e-greedy
"" "
Q_value = self.Q_value.eval(feed_dict = {\
self.state_input:[state] \
})[0]
prob_n = np.asarray(Q_value)
csprob_n = np.cumsum(prob_n)
return (csprob_n > np.random.rand()).argmax()
"""

    def action(self, state):
        return np.argmax(self.Q_value.eval(feed_dict={
            self.state_input: [state]
        })[0])

    def weight_variable(self, shape):
        initial = tf.truncated_normal(shape)
        return tf.Variable(initial)

    def bias_variable(self, shape):
        initial = tf.constant(0.01, shape=shape)
        return tf.Variable(initial)


def main():
    # initialize OpenAI Gym env and DQN agent
    env = gym.make(envName)
    # set seeds to 0
    random.seed(0)
    np.random.seed(0)
    env.seed(0)
    tf.set_random_seed(0)
    agent = AgentDQN(env)
    # record results
    # env = wrappers.Monitor(env, "./tmp/CartPole-v0-dqn")
    for episodeIdx in range(nEpisodesMax):
        # initialize task
        state = env.reset()
        # Train
        for step in range(nStepsMax):
            action = agent.egreedy_action(state)  # e-greedy action for train
            next_state, reward, done, _ = env.step(action)
            agent.perceive(state, action, reward, next_state, done)
            state = next_state
            if done:
                break
        if (episodeIdx + 1) % testFrequency == 0:
            total_reward = 0
            for i in range(nTests):
                state = env.reset()
                for j in range(nStepsMax):
                    env.render()
                    action = agent.action(state)  # direct action for test
                    state, reward, done, _ = env.step(action)
                    total_reward += reward
                    if done:
                        break
            ave_reward = total_reward / nTests
            print('episode: ', episodeIdx, 'Evaluation Average Reward:', ave_reward)
            print('epsilon: {}'.format(agent.epsilon))
            # if ave_reward >= 200:
            #     break
    # upload results and make a gist
    env.close()
    # gym.upload


if __name__ == '__main__':
    main()
@apicquot
Author

Syntax changes to make it work on TensorFlow > 1.0 and Python 3.
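For context, a port from pre-1.0 TensorFlow / Python 2 code typically comes down to renames like the ones below. This is an illustrative sketch of the TF 1.0 migration (the new names are the ones the gist already uses), not an actual diff of this fork:

import tensorflow as tf

init_op = tf.global_variables_initializer()      # was tf.initialize_all_variables()
loss = tf.constant(0.0)
tf.summary.scalar("loss", loss)                  # was tf.scalar_summary("loss", loss)
merged = tf.summary.merge_all()                  # was tf.merge_all_summaries()
writer = tf.summary.FileWriter('tensorboard/')   # was tf.train.SummaryWriter('tensorboard/')
prod = tf.multiply(loss, loss)                   # was tf.mul(loss, loss)
diff = tf.subtract(loss, loss)                   # was tf.sub(loss, loss)
print("print is a function in Python 3")         # was: print "..." (statement)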
