Skip to content

Instantly share code, notes, and snippets.

Created May 18, 2017 07:17
Show Gist options
  • Save kkweon/c8d1caabaf7b43317bc8825c226045d2 to your computer and use it in GitHub Desktop.
Save kkweon/c8d1caabaf7b43317bc8825c226045d2 to your computer and use it in GitHub Desktop.
Keras Policy Gradient Example
Simple policy gradient in Keras
import gym
import numpy as np
from keras import layers
from keras.models import Model
from keras import backend as K
from keras import utils as np_utils
from keras import optimizers
class Agent(object):
    """Gym-playing agent trained with a simple policy gradient.

    The agent owns a small fully connected Keras network that maps a state
    vector to a probability for each discrete action, plus a backend training
    function that minimises ``-log(pi(a|s)) * discounted_reward``.

    Args:
        input_dim (int): the dimension of state.
            Same as `env.observation_space.shape[0]`
        output_dim (int): the number of discrete actions.
            Same as `env.action_space.n`
        hidden_dims (sequence of int): width of each hidden layer.

    Methods:
        __build_network -> None
            Creates the base model; its output is each action probability.
        __build_train_fn -> None
            Creates the train function.
            It's similar to defining `train_op` in Tensorflow.
        get_action(state) -> action
        fit(state, action, reward) -> None
    """

    # A tuple default is used instead of a list so the default value is never
    # a shared mutable object; it is only iterated, so lists still work.
    def __init__(self, input_dim, output_dim, hidden_dims=(32, 32)):
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.__build_network(input_dim, output_dim, hidden_dims)
        # `fit` relies on `self.train_fn`, so it must be built up front.
        self.__build_train_fn()

    def __build_network(self, input_dim, output_dim, hidden_dims=(32, 32)):
        """Create the base network: Dense+ReLU per hidden layer, softmax head."""
        self.X = layers.Input(shape=(input_dim,))
        net = self.X

        for h_dim in hidden_dims:
            net = layers.Dense(h_dim)(net)
            net = layers.Activation("relu")(net)

        net = layers.Dense(output_dim)(net)
        net = layers.Activation("softmax")(net)

        self.model = Model(inputs=self.X, outputs=net)

    def __build_train_fn(self):
        """Create a train function.

        It replaces `model.fit(X, y)` because the loss is built directly from
        the model output rather than from a label tensor.
        We need an action placeholder (one-hot) that stores which action was
        taken at each state, so that only the chosen action's probability
        contributes to the update.

        This function creates
        `self.train_fn([state, action_one_hot, discount_reward])`
        which trains the model for one step.
        """
        action_prob_placeholder = self.model.output
        action_onehot_placeholder = K.placeholder(shape=(None, self.output_dim))
        discount_reward_placeholder = K.placeholder(shape=(None,))

        # Probability the network assigned to the action actually taken.
        action_prob = K.sum(action_prob_placeholder * action_onehot_placeholder, axis=1)
        log_action_prob = K.log(action_prob)

        loss = - log_action_prob * discount_reward_placeholder
        loss = K.mean(loss)

        adam = optimizers.Adam()

        # NOTE(review): older Keras releases took an extra `constraints`
        # argument here; newer ones reject it with a TypeError, so only
        # `params` and `loss` are passed. Confirm against the installed Keras.
        updates = adam.get_updates(params=self.model.trainable_weights,
                                   loss=loss)

        # NOTE(review): an empty `outputs` list reportedly raises IndexError on
        # newer backends, so the model output is returned. Under TF2 eager
        # execution must be disabled before building a backend function with
        # updates (`tf.compat.v1.disable_eager_execution()`) - confirm.
        self.train_fn = K.function(inputs=[self.model.input,
                                           action_onehot_placeholder,
                                           discount_reward_placeholder],
                                   outputs=[self.model.output],
                                   updates=updates)

    def get_action(self, state):
        """Sample an action at the given `state` from the current policy.

        Args:
            state (1-D or 2-D Array): either a 1-D array of shape
                (state_dimension,) or a 2-D array of shape
                (n_samples, state_dimension). Because the prediction is
                squeezed and its length checked against `output_dim`, a 2-D
                input is expected to hold a single row.

        Returns:
            action: an integer action value ranging from 0 to (n_actions - 1)

        Raises:
            TypeError: if `state` is neither 1-D nor 2-D.
            AssertionError: if the state or prediction has an unexpected size.
        """
        shape = state.shape

        if len(shape) == 1:
            assert shape == (self.input_dim,), "{} != {}".format(shape, self.input_dim)
            # The model expects a batch axis.
            state = np.expand_dims(state, axis=0)

        elif len(shape) == 2:
            assert shape[1] == (self.input_dim), "{} != {}".format(shape, self.input_dim)

        else:
            raise TypeError("Wrong state shape is given: {}".format(state.shape))

        action_prob = np.squeeze(self.model.predict(state))
        assert len(action_prob) == self.output_dim, "{} != {}".format(len(action_prob), self.output_dim)
        return np.random.choice(np.arange(self.output_dim), p=action_prob)

    def fit(self, S, A, R):
        """Train the network on one batch of transitions.

        Args:
            S (2-D Array): `state` array of shape (n_samples, state_dimension)
            A (1-D Array): `action` array of shape (n_samples,)
                It's simply a list of int that stores which actions the agent chose
            R (1-D Array): `reward` array of shape (n_samples,)
                A reward is given after each action.
        """
        action_onehot = np_utils.to_categorical(A, num_classes=self.output_dim)
        discount_reward = compute_discounted_R(R)

        assert S.shape[1] == self.input_dim, "{} != {}".format(S.shape[1], self.input_dim)
        assert action_onehot.shape[0] == S.shape[0], "{} != {}".format(action_onehot.shape[0], S.shape[0])
        assert action_onehot.shape[1] == self.output_dim, "{} != {}".format(action_onehot.shape[1], self.output_dim)
        assert len(discount_reward.shape) == 1, "{} != 1".format(len(discount_reward.shape))

        self.train_fn([S, action_onehot, discount_reward])
def compute_discounted_R(R, discount_rate=.99):
    """Return standardised discounted returns for one episode.

    Each entry is first replaced by the discounted sum of the rewards from
    that time step onwards, then the whole vector is shifted to zero mean and
    scaled to unit standard deviation.

    Args:
        R (1-D array): a list of `reward` at each time step
        discount_rate (float): Will discount the future value by this rate

    Returns:
        discounted_r (1-D float32 array): same shape as input `R`
            but the values are discounted and standardised. When every
            discounted return is identical (e.g. a one-step episode) the
            result is all zeros instead of NaN/inf.

    Examples:
        >>> R = [1, 1, 1]
        >>> compute_discounted_R(R, .99) # before normalization
        [1 + 0.99 + 0.99**2, 1 + 0.99, 1]
    """
    rewards = np.asarray(R, dtype=np.float32)
    discounted_r = np.zeros_like(rewards)

    # Accumulate from the last step backwards so each step sees the
    # already-discounted future return.
    running_add = 0.0
    for t in reversed(range(len(rewards))):
        running_add = running_add * discount_rate + rewards[t]
        discounted_r[t] = running_add

    if discounted_r.size == 0:
        return discounted_r

    # Standardise as (x - mean) / std. Written as two steps on purpose:
    # `x -= mean / std` would only subtract a single scalar.
    discounted_r -= discounted_r.mean()
    std = discounted_r.std()
    if std > 0:
        discounted_r /= std

    return discounted_r
def run_episode(env, agent):
    """Play one episode, train on it, and return the episode reward.

    (1) Play until the game is done
    (2) The agent will choose an action according to the policy
    (3) When it's done, it will train from the game play

    Args:
        env (gym.env): Gym environment; `reset()` must return the first state
            and `step(action)` a `(state, reward, done, info)` tuple.
        agent (Agent): Game Playing Agent

    Returns:
        total_reward (int): total reward earned during the whole episode
    """
    states = []
    actions = []
    rewards = []

    total_reward = 0
    s = env.reset()
    done = False

    while not done:
        a = agent.get_action(s)
        s2, r, done, _ = env.step(a)
        total_reward += r

        # Record the transition from the state the action was chosen in.
        states.append(s)
        actions.append(a)
        rewards.append(r)

        s = s2

    # The loop only exits once the episode is done, so train exactly once
    # on the full trajectory.
    agent.fit(np.array(states), np.array(actions), np.array(rewards))

    return total_reward
def main():
    """Train an `Agent` on CartPole-v0 for 2000 episodes.

    The state and action sizes are read from the environment, the agent uses
    two hidden layers of 16 units, and the episode index and episode reward
    are printed after every episode.
    """
    env = gym.make("CartPole-v0")
    try:
        input_dim = env.observation_space.shape[0]
        output_dim = env.action_space.n
        agent = Agent(input_dim, output_dim, [16, 16])

        for episode in range(2000):
            reward = run_episode(env, agent)
            print(episode, reward)
    finally:
        # Release the environment's resources even if training is interrupted.
        env.close()


if __name__ == '__main__':
    main()
Copy link

byewokko commented Mar 5, 2020

I'm probably using a different version of keras/tf, but I had to fix a couple of things to make the code run.

  1. The code as it is here throws a TypeError: get_updates() got an unexpected keyword argument 'constraints'. I simply commented out the constraints=[], at line 93.
  2. Then I got ValueError: Cannot create a Keras backend function with updates but no outputs during eager execution. To disable eager execution I imported tensorflow and called tf.compat.v1.disable_eager_execution().
  3. Then it throws IndexError: list index out of range because of outputs=[], (line 99). Just change it to outputs=[self.model.output], and it runs fine for me.

Copy link

halataa commented Apr 1, 2020

I'm probably using a different version of keras/tf, but I had to fix a couple of things to make the code run.

  1. The code as it is here throws a TypeError: get_updates() got an unexpected keyword argument 'constraints'. I simply commented out the constraints=[], at line 93.
  2. Then I got ValueError: Cannot create a Keras backend function with updates but no outputs during eager execution. To disable eager execution I imported tensorflow and called tf.compat.v1.disable_eager_execution().
  3. Then it throws IndexError: list index out of range because of outputs=[], (line 99). Just change it to outputs=[self.model.output], and it runs fine for me.

thank you :D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment