RL DQN solution for MountainCar-v0, CartPole-v0 and CartPole-v1 on OpenAI's Gym

Synopsis

This is a Deep Reinforcement Learning solution to some classic control problems. I've used it to solve the MountainCar-v0, CartPole-v0 and [CartPole-v1](https://gym.openai.com/envs/CartPole-v1) environments in OpenAI's Gym. The code uses TensorFlow to model the value function of a Reinforcement Learning agent, and is essentially a translation of necnec's Theano/Lasagne algorithm to TensorFlow. I've run it on Python 3.5 under Windows 7.

References

  1. Deep Reinforcement Learning tutorial, David Silver, Google DeepMind. http://www0.cs.ucl.ac.uk/staff/d.silver/web/Resources_files/deep_rl.pdf
  2. necnec's algorithm: https://gym.openai.com/evaluations/eval_89nQ59Y4SbmrlQ0P9pufiA
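
The training loops in the scripts below build their learning targets with the one-step Q-learning rule. As a minimal, illustrative sketch (the function name here is mine, not part of the scripts):

import numpy as np

# y = r + gamma * max_a' Q(s', a') for non-terminal transitions, y = r for terminal ones.
# q_next_b holds the predicted action values Q(s', .) for a batch of next states.
def q_learning_targets(rewards_b, q_next_b, done_b, discount=0.99):
    return rewards_b + (1. - done_b) * discount * np.amax(q_next_b, axis=1)
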
import numpy as np
class AgentEpsGreedy:
    def __init__(self, n_actions, value_function_model, eps=1.):
        self.n_actions = n_actions
        self.value_func = value_function_model
        self.eps = eps

    def act(self, state):
        action_values = self.value_func.predict([state])[0]
        policy = np.ones(self.n_actions) * self.eps / self.n_actions
        a_max = np.argmax(action_values)
        policy[a_max] += 1. - self.eps
        return np.random.choice(self.n_actions, p=policy)

    def train(self, states, targets):
        return self.value_func.train(states, targets)

    def predict_q_values(self, states):
        return self.value_func.predict(states)
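
A quick illustration of the epsilon-greedy policy built in act() above, with made-up numbers (not taken from the scripts): every action receives probability eps/n_actions, and the greedy action gets the remaining 1 - eps on top.

import numpy as np

eps, n_actions = 0.1, 2
action_values = np.array([0.3, 1.2])           # hypothetical Q(s, .) estimates
policy = np.ones(n_actions) * eps / n_actions  # exploration share: eps/n per action
policy[np.argmax(action_values)] += 1. - eps   # greedy action gets the rest
print(policy)                                  # [0.05  0.95], sums to 1
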
from collections import deque
import numpy as np
class ReplayMemory:
    def __init__(self, max_size=128):
        self.memory = deque(maxlen=max_size)

    def sample(self, batch_size):
        batch_size = min(len(self.memory), batch_size)
        idxs = np.random.choice(len(self.memory), batch_size)
        return [self.memory[idx] for idx in idxs]

    def add(self, item):
        self.memory.append(item)
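
A short, hypothetical usage sketch of ReplayMemory (dummy data, illustrative only), mirroring how run_episode below stores and samples transitions:

import numpy as np
from ReplayMemory import ReplayMemory

memory = ReplayMemory(max_size=100000)
for _ in range(10):
    s, s_next = np.random.rand(4), np.random.rand(4)    # dummy CartPole-like states
    memory.add((s, 0, 1.0, s_next, False))              # (state, action, reward, next state, done)
batch = memory.sample(4)                                # batch size is clamped to the memory size
states_b, actions_b, rewards_b, states_n_b, done_b = zip(*batch)
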
import copy
import gym
from gym import wrappers
import matplotlib.pyplot as plt
import time
from utils import *
from ReplayMemory import ReplayMemory
from agents import AgentEpsGreedy
from valuefunctions import ValueFunctionDQN
# Inspired by necnec's algorithm at:
# https://gym.openai.com/evaluations/eval_89nQ59Y4SbmrlQ0P9pufiA
# And inspired by David Silver's Deep RL tutorial:
# http://www0.cs.ucl.ac.uk/staff/d.silver/web/Resources_files/deep_rl.pdf
results_dir_prefix = '###'
upload = True
discount = 0.99
decay_eps = 0.9
batch_size = 64
max_n_ep = 4000
min_avg_Rwd = 195 # Minimum average reward to consider the problem as solved
n_avg_ep = 100 # Number of consecutive episodes to calculate the average reward
t = get_last_folder_id(results_dir_prefix) + 1 # Calculate next test id
results_dir = results_dir_prefix + '\\' + str(t).zfill(4)
os.makedirs(results_dir)
def run_episode(env,
                agent,
                state_normalizer,
                memory,
                batch_size,
                discount,
                max_step=10000):
    state = env.reset()
    if state_normalizer is not None:
        state = state_normalizer.transform(state)[0]
    done = False
    total_reward = 0
    step_durations_s = np.zeros(shape=max_step, dtype=float)
    train_duration_s = np.zeros(shape=max_step-batch_size, dtype=float)
    progress_msg = "Step {:5d}/{:5d}. Avg step duration: {:3.1f} ms. Avg train duration: {:3.1f} ms. Loss = {:2.10f}."
    loss_v = 0
    w1_m = 0
    w2_m = 0
    w3_m = 0
    i = 0
    action = 0
    for i in range(max_step):
        t = time.time()
        if i > 0 and i % 200 == 0:
            print(progress_msg.format(i, max_step,
                                      np.mean(step_durations_s[0:i])*1000,
                                      np.mean(train_duration_s[0:i-batch_size])*1000,
                                      loss_v))
        if done:
            break
        action = agent.act(state)
        state_next, reward, done, info = env.step(action)
        total_reward += reward
        if state_normalizer is not None:
            state_next = state_normalizer.transform(state_next)[0]
        memory.add((state, action, reward, state_next, done))
        if len(memory.memory) > batch_size:  # DQN Experience Replay
            states_b, actions_b, rewards_b, states_n_b, done_b = zip(*memory.sample(batch_size))
            states_b = np.array(states_b)
            actions_b = np.array(actions_b)
            rewards_b = np.array(rewards_b)
            states_n_b = np.array(states_n_b)
            done_b = np.array(done_b).astype(int)
            q_n_b = agent.predict_q_values(states_n_b)  # Action values on the arriving state
            targets_b = rewards_b + (1. - done_b) * discount * np.amax(q_n_b, axis=1)
            targets = agent.predict_q_values(states_b)
            for j, action in enumerate(actions_b):
                targets[j, action] = targets_b[j]
            t_train = time.time()
            loss_v, w1_m, w2_m, w3_m = agent.train(states_b, targets)
            train_duration_s[i - batch_size] = time.time() - t_train
        state = copy.copy(state_next)
        step_durations_s[i] = time.time() - t  # Time elapsed during this step
    return loss_v, w1_m, w2_m, w3_m, total_reward
env = gym.make("CartPole-v0")
n_actions = env.action_space.n
state_dim = env.observation_space.high.shape[0]
value_function = ValueFunctionDQN(state_dim=state_dim, n_actions=n_actions, batch_size=batch_size)
agent = AgentEpsGreedy(n_actions=n_actions, value_function_model=value_function, eps=0.9)
memory = ReplayMemory(max_size=100000)
loss_per_ep = []
w1_m_per_ep = []
w2_m_per_ep = []
w3_m_per_ep = []
total_reward = []
ep = 0
avg_Rwd = -np.inf
episode_end_msg = 'loss={:2.10f}, w1_m={:3.1f}, w2_m={:3.1f}, w3_m={:3.1f}, total reward={}'
while avg_Rwd < min_avg_Rwd and ep < max_n_ep:
    if ep >= n_avg_ep:
        avg_Rwd = np.mean(total_reward[ep-n_avg_ep:ep])
        print("EPISODE {}. Average reward over the last {} episodes: {}.".format(ep, n_avg_ep, avg_Rwd))
    else:
        print("EPISODE {}.".format(ep))
    loss_v, w1_m, w2_m, w3_m, cum_R = run_episode(env, agent, None, memory, batch_size=batch_size, discount=discount,
                                                  max_step=15000)
    print(episode_end_msg.format(loss_v, w1_m, w2_m, w3_m, cum_R))
    if agent.eps > 0.0001:
        agent.eps *= decay_eps

    # Collect episode results
    loss_per_ep.append(loss_v)
    w1_m_per_ep.append(w1_m)
    w2_m_per_ep.append(w2_m)
    w3_m_per_ep.append(w3_m)
    total_reward.append(cum_R)
    ep += 1
env.close()
#####################
# PLOT RESULTS
eps = range(ep)
plt.figure()
plt.subplot(211)
plt.plot(eps, total_reward)
Rwd_avg = movingaverage(total_reward, 100)
plt.plot(eps[len(eps) - len(Rwd_avg):], Rwd_avg)
plt.xlabel("Episode number")
plt.ylabel("Reward per episode")
plt.grid(True)
plt.title("Total reward")
plt.subplot(212)
plt.plot(eps, loss_per_ep)
Loss_avg = movingaverage(loss_per_ep, 100)
plt.plot(eps[len(eps) - len(Loss_avg):], Loss_avg)
plt.xlabel("Episode number")
plt.ylabel("Loss per episode")
plt.grid(True)
plt.title("Value function loss")
plt.ion()
plt.show()
plt.figure()
plt.plot(w1_m_per_ep, label="Max w1")
plt.plot(w2_m_per_ep, label="Max w2")
plt.plot(w3_m_per_ep, label="Max w3")
plt.legend()
plt.xlabel("Episode number")
plt.ylabel("Max weights")
plt.grid(True)
plt.title("Maximum weight in Layer 1")
plt.show()
if upload:  # Run more episodes with the trained agent
    env = gym.make("CartPole-v0")
    env = wrappers.Monitor(env, results_dir)
    loss_per_ep = []
    w1_m_per_ep = []
    w2_m_per_ep = []
    w3_m_per_ep = []
    total_reward = []
    ep = 0
    avg_Rwd = -np.inf
    while avg_Rwd < min_avg_Rwd and ep < max_n_ep:
        if ep >= n_avg_ep:
            avg_Rwd = np.mean(total_reward[ep - n_avg_ep:ep])
            print("EPISODE {}. Average reward over the last {} episodes: {}.".format(ep, n_avg_ep, avg_Rwd))
        else:
            print("EPISODE {}.".format(ep))
        _, _, _, _, cum_R = run_episode(env, agent, None, memory, batch_size=batch_size,
                                        discount=discount,
                                        max_step=15000)
        total_reward.append(cum_R)
        ep += 1
    print("Trying to upload results to the scoreboard.")
    env.close()
    gym.upload(results_dir, api_key='###')
import copy
import gym
from gym import wrappers
import matplotlib.pyplot as plt
import time
from collections import deque
from utils import *
from ReplayMemory import ReplayMemory
from agents import AgentEpsGreedy
from valuefunctions import ValueFunctionDQN
# Inspired by necnec's algorithm at:
# https://gym.openai.com/evaluations/eval_89nQ59Y4SbmrlQ0P9pufiA
# And inspired by David Silver's Deep RL tutorial:
# http://www0.cs.ucl.ac.uk/staff/d.silver/web/Resources_files/deep_rl.pdf
results_dir_prefix = '###'
upload = True
env_name = "CartPole-v1"
discount = 0.99
decay_eps = 0.9
batch_size = 64
max_n_ep = 10000
min_avg_Rwd = 475 # Minimum average reward to consider the problem as solved
n_avg_ep = 100 # Number of consecutive episodes to calculate the average reward
min_ep_solved = 100 # Minimum number of consecutive episodes during which the minimum average reward must be achieved
t = get_last_folder_id(results_dir_prefix) + 1 # Calculate next test id
results_dir = results_dir_prefix + '\\' + str(t).zfill(4)
os.makedirs(results_dir)
def run_episode(env,
                agent,
                state_normalizer,
                memory,
                batch_size,
                discount,
                max_step=10000):
    state = env.reset()
    if state_normalizer is not None:
        state = state_normalizer.transform(state)[0]
    done = False
    total_reward = 0
    step_durations_s = np.zeros(shape=max_step, dtype=float)
    train_duration_s = np.zeros(shape=max_step-batch_size, dtype=float)
    progress_msg = "Step {:5d}/{:5d}. Avg step duration: {:3.1f} ms. Avg train duration: {:3.1f} ms. Loss = {:2.10f}."
    loss_v = 0
    w1_m = 0
    w2_m = 0
    w3_m = 0
    i = 0
    while i < max_step:  # Bounded by max_step so the per-step timing arrays cannot be overrun
        t = time.time()
        if i > 0 and i % 200 == 0:
            print(progress_msg.format(i, max_step,
                                      np.mean(step_durations_s[0:i])*1000,
                                      np.mean(train_duration_s[0:i-batch_size])*1000,
                                      loss_v))
        if done:
            break
        action = agent.act(state)
        state_next, reward, done, info = env.step(action)
        total_reward += reward
        if state_normalizer is not None:
            state_next = state_normalizer.transform(state_next)[0]

        # DQN's Experience Replay: Store transitions in replay memory
        memory.add((state, action, reward, state_next, done))
        if len(memory.memory) > batch_size:
            # Extract a batch of random transitions from the replay memory
            states_b, actions_b, rewards_b, states_n_b, done_b = zip(*memory.sample(batch_size))
            states_b = np.array(states_b)
            actions_b = np.array(actions_b)
            rewards_b = np.array(rewards_b)
            states_n_b = np.array(states_n_b)
            done_b = np.array(done_b).astype(int)

            # Target definition in the form used by the Nature DQN paper (Feb 2015), although here a single
            # network provides both the predictions and the targets (no separate target network).
            # This is not Double DQN. To do Double DQN, two Q networks are necessary, as done in
            # https://github.com/dennybritz/reinforcement-learning/blob/master/DQN/Double%20DQN%20Solution.ipynb
            # (a minimal sketch is given right after this function).
            q_n_b = agent.predict_q_values(states_n_b)  # Action values on the arriving state
            targets_b = rewards_b + (1. - done_b) * discount * np.amax(q_n_b, axis=1)
            targets = agent.predict_q_values(states_b)  # Q(s, ., w-)
            for j, action in enumerate(actions_b):
                targets[j, action] = targets_b[j]  # Q(s, a, w-) = r + gamma*max_a' Q(s', a', w-)
            t_train = time.time()
            loss_v, w1_m, w2_m, w3_m = agent.train(states_b, targets)
            train_duration_s[i - batch_size] = time.time() - t_train
        state = copy.copy(state_next)
        step_durations_s[i] = time.time() - t  # Time elapsed during this step
        i += 1
    return loss_v, w1_m, w2_m, w3_m, total_reward
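
# --- Sketch, not part of the original gist: a possible Double DQN target ----
# The comments in run_episode above note that Double DQN requires a second Q
# network. Assuming a hypothetical `target_agent` wrapping such a second,
# periodically synced network, the targets could be formed like this: the
# online network selects a' = argmax_a Q(s', a, w), and the target network
# evaluates that choice with Q(s', a', w-).
def double_dqn_targets(agent, target_agent, rewards_b, states_n_b, done_b, discount):
    q_online = agent.predict_q_values(states_n_b)        # Q(s', ., w), used only to pick a'
    a_best = np.argmax(q_online, axis=1)
    q_eval = target_agent.predict_q_values(states_n_b)   # Q(s', ., w-), used to evaluate a'
    return rewards_b + (1. - done_b) * discount * q_eval[np.arange(len(a_best)), a_best]
# -----------------------------------------------------------------------------
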
env = gym.make(env_name)
if upload:
    env = wrappers.Monitor(env, results_dir)
n_actions = env.action_space.n
state_dim = env.observation_space.high.shape[0]
value_function = ValueFunctionDQN(state_dim=state_dim, n_actions=n_actions, batch_size=batch_size)
agent = AgentEpsGreedy(n_actions=n_actions, value_function_model=value_function, eps=0.9)
memory = ReplayMemory(max_size=100000)
loss_per_ep = []
w1_m_per_ep = []
w2_m_per_ep = []
w3_m_per_ep = []
total_reward = []
ep = 0
avg_Rwds = deque([-np.inf] * min_ep_solved, maxlen=min_ep_solved)
avg_Rwds_np = np.array([avg_Rwds[i] for i in range(min_ep_solved)])
episode_end_msg = 'loss={:2.10f}, w1_m={:3.1f}, w2_m={:3.1f}, w3_m={:3.1f}, total reward={}'
msg_progress = "Episode {} with a reward of {}. Average reward over the last {} episodes: {}." +\
" Minimum of {} reached in {} of the last {} episodes."
while np.any(avg_Rwds_np < min_avg_Rwd) and ep < max_n_ep:
    loss_v, w1_m, w2_m, w3_m, cum_R = run_episode(env, agent, None, memory, batch_size=batch_size, discount=discount,
                                                  max_step=15000)
    if ep >= n_avg_ep:
        avg_Rwd = np.mean(total_reward[ep-n_avg_ep:ep])
        avg_Rwds.appendleft(avg_Rwd)
        avg_Rwds_np = np.array([avg_Rwds[i] for i in range(min_ep_solved)])
        n_solved_eps = np.sum(avg_Rwds_np >= min_avg_Rwd)
        print(msg_progress.format(ep, cum_R, n_avg_ep, avg_Rwd, min_avg_Rwd, n_solved_eps, min_ep_solved))
    else:
        print("Episode {} with a reward of {}.".format(ep, cum_R))
    # print(episode_end_msg.format(loss_v, w1_m, w2_m, w3_m, cum_R))
    if agent.eps > 0.0001:
        agent.eps *= decay_eps

    # Collect episode results
    loss_per_ep.append(loss_v)
    w1_m_per_ep.append(w1_m)
    w2_m_per_ep.append(w2_m)
    w3_m_per_ep.append(w3_m)
    total_reward.append(cum_R)
    ep += 1

env.close()
if upload:
    print("Trying to upload results to the scoreboard.")
    gym.upload(results_dir, api_key='###')
#####################
# PLOT RESULTS
eps = range(ep)
plt.figure()
plt.subplot(211)
plt.plot(eps, total_reward)
Rwd_avg = movingaverage(total_reward, 100)
plt.plot(eps[len(eps) - len(Rwd_avg):], Rwd_avg)
plt.xlabel("Episode number")
plt.ylabel("Reward per episode")
plt.grid(True)
plt.title("Total reward")
plt.subplot(212)
plt.plot(eps, loss_per_ep)
Loss_avg = movingaverage(loss_per_ep, 100)
plt.plot(eps[len(eps) - len(Loss_avg):], Loss_avg)
plt.xlabel("Episode number")
plt.ylabel("Loss per episode")
plt.grid(True)
plt.title("Value function loss")
#plt.ion()
plt.show()
plt.figure()
plt.plot(w1_m_per_ep, label="Max w1")
plt.plot(w2_m_per_ep, label="Max w2")
plt.plot(w3_m_per_ep, label="Max w3")
plt.legend()
plt.xlabel("Episode number")
plt.ylabel("Max weights")
plt.grid(True)
plt.title("Maximum weight in Layer 1")
plt.show()
input("Press Enter to terminate.")
import copy
import gym
from gym import wrappers
import matplotlib.pyplot as plt
import time
from collections import deque
from utils import *
from ReplayMemory import ReplayMemory
from agents import AgentEpsGreedy
from valuefunctions import ValueFunctionDQN3
# Inspired by necnec's algorithm at:
# https://gym.openai.com/evaluations/eval_89nQ59Y4SbmrlQ0P9pufiA
# And inspired by David Silver's Deep RL tutorial:
# http://www0.cs.ucl.ac.uk/staff/d.silver/web/Resources_files/deep_rl.pdf
results_dir_prefix = '###'
upload = True
discount = 0.99
decay_eps = 0.9
batch_size = 64
max_n_ep = 112500
min_avg_Rwd = -110 # Minimum average reward to consider the problem as solved
n_avg_ep = 100 # Number of consecutive episodes to calculate the average reward
min_ep_solved = 200 # Minimum number of consecutive episodes during which the minimum average reward must be achieved
t = get_last_folder_id(results_dir_prefix) + 1 # Calculate next test id
results_dir = results_dir_prefix + '\\' + str(t).zfill(4)
os.makedirs(results_dir)
def run_episode(env,
                agent,
                state_normalizer,
                memory,
                batch_size,
                discount,
                max_step=10000):
    state = env.reset()
    if state_normalizer is not None:
        state = state_normalizer.transform(state)[0]
    done = False
    total_reward = 0
    step_durations_s = np.zeros(shape=max_step, dtype=float)
    train_duration_s = np.zeros(shape=max_step-batch_size, dtype=float)
    progress_msg = "Step {:5d}/{:5d}. Avg step duration: {:3.1f} ms. Avg train duration: {:3.1f} ms. Loss = {:2.10f}."
    loss_v = 0
    w1_m = 0
    w2_m = 0
    w3_m = 0
    for i in range(max_step):
        t = time.time()
        if i > 0 and i % 200 == 0:
            print(progress_msg.format(i, max_step,
                                      np.mean(step_durations_s[0:i])*1000,
                                      np.mean(train_duration_s[0:i-batch_size])*1000,
                                      loss_v))
        if done:
            break
        action = agent.act(state)
        state_next, reward, done, info = env.step(action)
        total_reward += reward
        if state_normalizer is not None:
            state_next = state_normalizer.transform(state_next)[0]
        memory.add((state, action, reward, state_next, done))
        if len(memory.memory) > batch_size:  # DQN Experience Replay
            # Extract a batch of random transitions from the replay memory
            states_b, actions_b, rewards_b, states_n_b, done_b = zip(*memory.sample(batch_size))
            states_b = np.array(states_b)
            actions_b = np.array(actions_b)
            rewards_b = np.array(rewards_b)
            states_n_b = np.array(states_n_b)
            done_b = np.array(done_b).astype(int)
            q_n_b = agent.predict_q_values(states_n_b)  # Action values on the arriving state
            targets_b = rewards_b + (1. - done_b) * discount * np.amax(q_n_b, axis=1)
            targets = agent.predict_q_values(states_b)
            for j, action in enumerate(actions_b):
                targets[j, action] = targets_b[j]
            t_train = time.time()
            loss_v, w1_m, w2_m, w3_m = agent.train(states_b, targets)
            train_duration_s[i - batch_size] = time.time() - t_train
        state = copy.copy(state_next)
        step_durations_s[i] = time.time() - t  # Time elapsed during this step
    return loss_v, w1_m, w2_m, w3_m, total_reward
env = gym.make("MountainCar-v0")
if upload:
    env = wrappers.Monitor(env, results_dir)
n_actions = env.action_space.n
state_dim = env.observation_space.high.shape[0]
value_function = ValueFunctionDQN3(state_dim=state_dim, n_actions=n_actions, batch_size=batch_size)
agent = AgentEpsGreedy(n_actions=n_actions, value_function_model=value_function, eps=0.9)
memory = ReplayMemory(max_size=100000)
loss_per_ep = []
w1_m_per_ep = []
w2_m_per_ep = []
w3_m_per_ep = []
total_reward = []
ep = 0
avg_Rwds = deque([-np.inf] * min_ep_solved, maxlen=min_ep_solved)
avg_Rwds_np = np.array([avg_Rwds[i] for i in range(min_ep_solved)])
avg_Rwd = -np.inf
episode_end_msg = 'loss={:2.10f}, w1_m={:3.1f}, w2_m={:3.1f}, w3_m={:3.1f}, total reward={}'
msg_progress = "Episode {:5d} finished with a reward of {:6.1f}. Reward over the last {} episodes: Avg={:4.2f}, Var={:4.2f}." +\
" Minimum of {} reached in {} of the last {} episodes."
while np.any(avg_Rwds_np < min_avg_Rwd) and ep < max_n_ep:
    loss_v, w1_m, w2_m, w3_m, cum_R = run_episode(env, agent, None, memory, batch_size=batch_size, discount=discount,
                                                  max_step=15000)
    if ep >= n_avg_ep:
        avg_Rwd = np.mean(total_reward[ep - n_avg_ep:ep])
        var_Rwd = np.var(total_reward[ep - n_avg_ep:ep])
        avg_Rwds.appendleft(avg_Rwd)
        avg_Rwds_np = np.array([avg_Rwds[i] for i in range(min_ep_solved)])
        n_solved_eps = np.sum(avg_Rwds_np >= min_avg_Rwd)
        print(msg_progress.format(ep, cum_R, n_avg_ep, avg_Rwd, var_Rwd, min_avg_Rwd, n_solved_eps, min_ep_solved))
    else:
        print("Episode {} with a reward of {}.".format(ep, cum_R))
    # print(episode_end_msg.format(loss_v, w1_m, w2_m, w3_m, cum_R))
    if agent.eps > 0.0001:
        agent.eps *= decay_eps

    # Collect episode results
    loss_per_ep.append(loss_v)
    w1_m_per_ep.append(w1_m)
    w2_m_per_ep.append(w2_m)
    w3_m_per_ep.append(w3_m)
    total_reward.append(cum_R)
    ep += 1

env.close()
if upload:
    print("Trying to upload results to the scoreboard.")
    gym.upload(results_dir, api_key='###')
#####################
# PLOT RESULTS
eps = range(ep)
plt.figure()
plt.subplot(211)
plt.plot(eps, total_reward)
Rwd_avg = movingaverage(total_reward, 100)
plt.plot(eps[len(eps) - len(Rwd_avg):], Rwd_avg)
plt.xlabel("Episode number")
plt.ylabel("Reward per episode")
plt.grid(True)
plt.title("Total reward")
plt.subplot(212)
plt.plot(eps, loss_per_ep)
Loss_avg = movingaverage(loss_per_ep, 100)
plt.plot(eps[len(eps) - len(Loss_avg):], Loss_avg)
plt.xlabel("Episode number")
plt.ylabel("Loss per episode")
plt.grid(True)
plt.title("Value function loss")
#plt.ion()
plt.show()
plt.figure()
plt.plot(w1_m_per_ep, label="Max w1")
plt.plot(w2_m_per_ep, label="Max w2")
plt.plot(w3_m_per_ep, label="Max w3")
plt.legend()
plt.xlabel("Episode number")
plt.ylabel("Max weights")
plt.grid(True)
plt.title("Maximum weight in Layer 1")
plt.show()
input("Press Enter to terminate.")
import os
import numpy as np
def get_last_folder_id(folder_path):
    t = 0
    for fn in os.listdir(folder_path):
        t = max(t, int(fn))
    return t


def movingaverage(values, window):
    weights = np.repeat(1.0, window)/window
    sma = np.convolve(values, weights, 'valid')
    return sma
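
A small, illustrative check of movingaverage (dummy data): with np.convolve in 'valid' mode the output has len(values) - window + 1 points, which is why the plotting code above aligns it against the tail of the episode axis.

import numpy as np
from utils import movingaverage

rewards = np.random.rand(300)        # dummy per-episode rewards
sma = movingaverage(rewards, 100)    # 100-episode moving average
assert len(sma) == len(rewards) - 100 + 1
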
import tensorflow as tf
class ValueFunctionDQN:
    def __init__(self, state_dim=2, n_actions=3, batch_size=64):
        self.graph = tf.Graph()
        with self.graph.as_default():
            # Inputs, weights, biases and targets of the ANN
            self.x = tf.placeholder(tf.float32, shape=(None, state_dim))                    # Single sample
            self.train_data = tf.placeholder(tf.float32, shape=(batch_size, state_dim))     # Training batch of samples
            self.train_targets = tf.placeholder(tf.float32, shape=(batch_size, n_actions))  # Training batch of targets

            # self.l1_weights = tf.Variable(tf.truncated_normal([state_dim, 512], stddev=0.1), trainable=True, name="w1")
            self.l1_weights = tf.get_variable(name="w1", shape=[state_dim, 512],
                                              initializer=tf.contrib.layers.xavier_initializer())
            self.l1_biases = tf.Variable(tf.zeros([512]), trainable=True, name="b1")
            # self.l2_weights = tf.Variable(tf.truncated_normal([512, 256], stddev=0.1), trainable=True, name="w2")
            self.l2_weights = tf.get_variable(name="w2", shape=[512, 256],
                                              initializer=tf.contrib.layers.xavier_initializer())
            self.l2_biases = tf.Variable(tf.zeros([256]), trainable=True, name="b2")
            # self.l3_weights = tf.Variable(tf.truncated_normal([256, n_actions], stddev=0.1), trainable=True, name="w3")
            self.l3_weights = tf.get_variable(name="w3", shape=[256, n_actions],
                                              initializer=tf.contrib.layers.xavier_initializer())
            self.l3_biases = tf.Variable(tf.zeros([n_actions]), trainable=True, name="b3")

            # Interconnection of the various ANN nodes
            self.train_prediction = self.model(self.train_data)

            # Training calculations
            self.loss = tf.reduce_mean(tf.squared_difference(self.train_targets, self.train_prediction))
            self.optimizer = tf.train.AdamOptimizer(learning_rate=1e-4).minimize(self.loss)

            self.test_prediction = self.model(self.x)
            self.w1_max = tf.reduce_max(self.l1_weights)
            self.w2_max = tf.reduce_max(self.l2_weights)
            self.w3_max = tf.reduce_max(self.l3_weights)
            self.init_op = tf.global_variables_initializer()
        self.session = None

    def model(self, data):
        logits1 = tf.matmul(data, self.l1_weights) + self.l1_biases
        hidden1 = tf.nn.relu(logits1)  # Define units of layer
        logits2 = tf.matmul(hidden1, self.l2_weights) + self.l2_biases
        hidden2 = tf.nn.relu(logits2)  # Define units of layer
        return tf.matmul(hidden2, self.l3_weights) + self.l3_biases

    def init_tf_session(self):
        if self.session is None:
            self.session = tf.Session(graph=self.graph)
            self.session.run(self.init_op)  # Global Variables Initializer (init op)

    def predict(self, states):
        self.init_tf_session()  # Make sure the Tensorflow session exists
        feed_dict = {self.x: states}
        q = self.session.run(self.test_prediction, feed_dict=feed_dict)
        return q

    def train(self, states, targets):
        self.init_tf_session()  # Make sure the Tensorflow session exists
        feed_dict = {self.train_data: states, self.train_targets: targets}
        [l, _, w1_m, w2_m, w3_m] = self.session.run([self.loss, self.optimizer, self.w1_max, self.w2_max, self.w3_max],
                                                    feed_dict=feed_dict)
        return [l, w1_m, w2_m, w3_m]
class ValueFunctionDQN3:
    # Smaller network: 128 neurons on layer 1, 64 neurons on layer 2
    def __init__(self, state_dim=2, n_actions=3, batch_size=64):
        self.graph = tf.Graph()
        with self.graph.as_default():
            # Inputs, weights, biases and targets of the ANN
            self.x = tf.placeholder(tf.float32, shape=(None, state_dim))                    # Single sample
            self.train_data = tf.placeholder(tf.float32, shape=(batch_size, state_dim))     # Training batch of samples
            self.train_targets = tf.placeholder(tf.float32, shape=(batch_size, n_actions))  # Training batch of targets

            self.l1_weights = tf.get_variable(name="w1", shape=[state_dim, 128],
                                              initializer=tf.contrib.layers.xavier_initializer())
            self.l1_biases = tf.Variable(tf.zeros([128]), trainable=True, name="b1")
            self.l2_weights = tf.get_variable(name="w2", shape=[128, 64],
                                              initializer=tf.contrib.layers.xavier_initializer())
            self.l2_biases = tf.Variable(tf.zeros([64]), trainable=True, name="b2")
            self.l3_weights = tf.get_variable(name="w3", shape=[64, n_actions],
                                              initializer=tf.contrib.layers.xavier_initializer())
            self.l3_biases = tf.Variable(tf.zeros([n_actions]), trainable=True, name="b3")

            # Interconnection of the various ANN nodes
            self.train_prediction = self.model(self.train_data)

            # Training calculations
            self.loss = tf.reduce_mean(tf.squared_difference(self.train_targets, self.train_prediction))
            self.optimizer = tf.train.AdamOptimizer(learning_rate=1e-4).minimize(self.loss)

            self.test_prediction = self.model(self.x)
            self.w1_max = tf.reduce_max(self.l1_weights)
            self.w2_max = tf.reduce_max(self.l2_weights)
            self.w3_max = tf.reduce_max(self.l3_weights)
            self.init_op = tf.global_variables_initializer()
        self.session = None

    def model(self, data):
        logits1 = tf.matmul(data, self.l1_weights) + self.l1_biases
        hidden1 = tf.nn.relu(logits1)  # Define units of layer
        logits2 = tf.matmul(hidden1, self.l2_weights) + self.l2_biases
        hidden2 = tf.nn.relu(logits2)  # Define units of layer
        return tf.matmul(hidden2, self.l3_weights) + self.l3_biases

    def init_tf_session(self):
        if self.session is None:
            self.session = tf.Session(graph=self.graph)
            self.session.run(self.init_op)  # Global Variables Initializer (init op)

    def predict(self, states):
        self.init_tf_session()  # Make sure the Tensorflow session exists
        feed_dict = {self.x: states}
        q = self.session.run(self.test_prediction, feed_dict=feed_dict)
        return q

    def train(self, states, targets):
        self.init_tf_session()  # Make sure the Tensorflow session exists
        feed_dict = {self.train_data: states, self.train_targets: targets}
        [l, _, w1_m, w2_m, w3_m] = self.session.run([self.loss, self.optimizer, self.w1_max, self.w2_max, self.w3_max],
                                                    feed_dict=feed_dict)
        return [l, w1_m, w2_m, w3_m]