""" | |
Solve OpenAI Gym Cartpole V1 with DQN. | |
""" | |
import gym | |
import numpy as np | |
import tensorflow as tf | |
class DeepQLearningAgent(object):
    def __init__(self, state_space, action_space):
        """
        Initialize the agent. Q-values are learned from a one-step
        Bellman (TD) target.
        """
        self._action_space = action_space
        self._dim_state = state_space.shape[0]
        self._dim_action = action_space.n
        self._batch_size = 400
        self._gamma = 0.95
        self._prev_state = None
        self._prev_action = None
        self._prev_reward = 0

        # Build a neural network with TensorFlow. The goal is to map an
        # observation to action values (the Q function). Once Q is trained,
        # query the action values before each step and pick the action with
        # the highest value, i.e. the action that is expected to yield the
        # most reward:
        #
        #   value(prev_state) = reward + gamma * max value(next_state)
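        #
        # NOTE: in this implementation the bootstrap target
        # reward + gamma * max(Q(next_state)) is computed with the same
        # weights that produce Q(prev_state), and it is not wrapped in a
        # stop-gradient, so gradients flow through both sides of the
        # squared error; standard DQN instead freezes a separate target
        # network.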
        w1 = tf.Variable(tf.random_uniform([self._dim_state, 128], -1.0, 1.0))
        b1 = tf.Variable(tf.random_uniform([128], -1.0, 1.0))
        w2 = tf.Variable(tf.random_uniform([128, 128], -1.0, 1.0))
        b2 = tf.Variable(tf.random_uniform([128], -1.0, 1.0))
        w3 = tf.Variable(tf.random_uniform([128, 128], -1.0, 1.0))
        b3 = tf.Variable(tf.random_uniform([128], -1.0, 1.0))
        w4 = tf.Variable(
            tf.random_uniform([128, self._dim_action], -1.0, 1.0))
        b4 = tf.Variable(tf.random_uniform([self._dim_action], -1.0, 1.0))
        prev_states = tf.placeholder(tf.float32, [None, self._dim_state])
        hidden_1 = tf.nn.relu(tf.matmul(prev_states, w1) + b1)
        hidden_2 = tf.nn.relu(tf.matmul(hidden_1, w2) + b2)
        hidden_3 = tf.nn.relu(tf.matmul(hidden_2, w3) + b3)
        prev_action_values = tf.squeeze(tf.matmul(hidden_3, w4) + b4)

        prev_action_masks = \
            tf.placeholder(tf.float32, [None, self._dim_action])
        prev_values = tf.reduce_sum(
            tf.mul(prev_action_values, prev_action_masks),
            reduction_indices=1)

        prev_rewards = tf.placeholder(tf.float32, [None])

        next_states = tf.placeholder(tf.float32, [None, self._dim_state])
        hidden_1 = tf.nn.relu(tf.matmul(next_states, w1) + b1)
        hidden_2 = tf.nn.relu(tf.matmul(hidden_1, w2) + b2)
        hidden_3 = tf.nn.relu(tf.matmul(hidden_2, w3) + b3)
        next_action_values = tf.squeeze(tf.matmul(hidden_3, w4) + b4)
        next_values = prev_rewards + self._gamma * \
            tf.reduce_max(next_action_values, reduction_indices=1)

        loss = tf.reduce_mean(tf.square(prev_values - next_values))
        training = tf.train.AdamOptimizer(1e-4).minimize(loss)

        self._tf_action_value_predict = prev_action_values
        self._tf_prev_states = prev_states
        self._tf_prev_action_masks = prev_action_masks
        self._tf_prev_rewards = prev_rewards
        self._tf_next_states = next_states
        self._tf_training = training
        self._tf_loss = loss

        self._tf_session = tf.InteractiveSession()
        self._tf_session.run(tf.initialize_all_variables())
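        # NOTE: this file targets the 2016-era TensorFlow 0.x API; later
        # releases renamed tf.mul to tf.multiply, reduction_indices to
        # axis, and tf.initialize_all_variables to
        # tf.global_variables_initializer.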
        # Build the replay memory D, which keeps past experiences.
        self._time = 0
        self._epsilon = 1.0
        self._epsilon_decay_time = 100
        self._epsilon_decay_rate = 0.95
        self._experiences_max = 4000
        self._experiences_num = 0
        self._experiences_prev_states = \
            np.zeros((self._experiences_max, self._dim_state))
        self._experiences_next_states = \
            np.zeros((self._experiences_max, self._dim_state))
        self._experiences_rewards = np.zeros(self._experiences_max)
        self._experiences_actions_mask = \
            np.zeros((self._experiences_max, self._dim_action))
    def create_experience(self, prev_state, prev_action, reward, next_state):
        """
        Keep an experience for later training.
        """
        if self._experiences_num >= self._experiences_max:
            # Make sure we always keep some penalties.
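            # Once the buffer is full, experiences with positive rewards
            # overwrite slots in the first 80% of the buffer and penalties
            # overwrite the remaining 20%, so the rare terminal penalties
            # are never crowded out by the ordinary +1 step rewards.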
            a = self._experiences_max * 8 // 10
            b = self._experiences_max - a

            if reward > 0.0:
                idx = np.random.choice(a)
            else:
                idx = np.random.choice(b) + a
        else:
            idx = self._experiences_num
            self._experiences_num += 1

        self._experiences_prev_states[idx] = np.array(prev_state)
        self._experiences_next_states[idx] = np.array(next_state)
        self._experiences_rewards[idx] = reward
        self._experiences_actions_mask[idx] = np.zeros(self._dim_action)
        self._experiences_actions_mask[idx, prev_action] = 1.0
    def train(self):
        """
        Train the deep Q-learning network.
        """
        # Start training only when there are enough experiences.
        if self._experiences_num < self._experiences_max:
            return
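        # Sample a minibatch i.i.d. from the replay memory (with
        # replacement); random sampling breaks the temporal correlation
        # between consecutive environment steps.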
        ixs = np.random.choice(
            self._experiences_max, self._batch_size, replace=True)

        fetches = [self._tf_loss, self._tf_training]
        feed = {
            self._tf_prev_states: self._experiences_prev_states[ixs],
            self._tf_prev_action_masks: self._experiences_actions_mask[ixs],
            self._tf_prev_rewards: self._experiences_rewards[ixs],
            self._tf_next_states: self._experiences_next_states[ixs]
        }

        loss, _ = self._tf_session.run(fetches, feed_dict=feed)
    def act(self, observation, reward, done):
        """
        Ask the agent for the next action.
        """
        self._time += 1

        if self._time % self._epsilon_decay_time == 0:
            self._epsilon *= self._epsilon_decay_rate
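        # Epsilon decays geometrically: after t steps it is roughly
        # 0.95 ** (t / 100), e.g. about 0.006 after 10,000 steps, so the
        # agent shifts from mostly random to mostly greedy over time.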
        # Epsilon-greedy: exploit the learned Q-values with probability
        # 1 - epsilon, otherwise explore with a random action.
        if np.random.rand() > self._epsilon:
            states = np.array([observation])
            action_values = self._tf_action_value_predict.eval(
                feed_dict={self._tf_prev_states: states})
            action = np.argmax(action_values)
        else:
            action = self._action_space.sample()

        if self._prev_state is not None:
            self.create_experience(
                self._prev_state, self._prev_action, reward, observation)

        self._prev_state = None if done else observation
        self._prev_action = None if done else action
        self._prev_reward = 0 if done else self._prev_reward + reward

        self.train()

        return action
if __name__ == '__main__':
    env = gym.make('CartPole-v1')
    # Old gym monitoring API (later gym versions moved this to
    # gym.wrappers.Monitor).
    env.monitor.start('training_dir', force=True)

    max_episodes = 2000
    max_steps = 500
    running_reward = []

    agent = DeepQLearningAgent(env.observation_space, env.action_space)

    for episode in xrange(max_episodes):
        observation, reward, done = env.reset(), 0.0, False

        for step in xrange(max_steps):
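            # If the episode ended early (the pole fell or the cart left
            # the track), replace the usual +1 step reward with a large
            # penalty and hand the agent a zeroed terminal state.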
            if done and step + 1 < max_steps:
                reward = -5000.0
                observation = np.zeros_like(observation)

            action = agent.act(observation, reward, done)

            if done or step + 1 == max_steps:
                running_reward.append(step)

                if len(running_reward) > 100:
                    running_reward = running_reward[-100:]

                avg_reward = sum(running_reward) / float(len(running_reward))

                print "{} - {} - {}".format(episode, step, avg_reward)
                break

            observation, reward, done, _ = env.step(action)

    env.monitor.close()
Differences between openai_cartpole_v0_dqn_2.py and openai_cartpole_v1_dqn_0.py:

I guess the main difference between the v0 and v1 environments is the required final average reward: v0 is considered solved at an average reward of 195 over 100 consecutive episodes with episodes capped at 200 steps, while v1 raises the bar to 475 with a 500-step cap. To reach the higher target, some previously acceptable episodes become unacceptable: an episode that slowly drifts the cart toward the edge of the track must drift even more slowly or change direction.
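To make that solve criterion concrete, here is a minimal sketch of how it could be checked against the running_reward list kept in the main loop above; is_solved is a hypothetical helper, not part of the gist:

def is_solved(running_reward, threshold=475.0, window=100):
    # Hypothetical helper: CartPole-v1 counts as solved when the average
    # reward over the last `window` consecutive episodes reaches
    # `threshold` (475.0 for v1, 195.0 for v0).
    recent = running_reward[-window:]
    return len(recent) == window and \
        sum(recent) / float(window) >= threshold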