@kkweon
Created May 18, 2017 07:17
Keras Policy Gradient Example
"""
Simple policy gradient in Keras
"""
import gym
import numpy as np
from keras import layers
from keras.models import Model
from keras import backend as K
from keras import utils as np_utils
from keras import optimizers
class Agent(object):
    def __init__(self, input_dim, output_dim, hidden_dims=[32, 32]):
        """Gym Playing Agent

        Args:
            input_dim (int): the dimension of state.
                Same as `env.observation_space.shape[0]`
            output_dim (int): the number of discrete actions
                Same as `env.action_space.n`
            hidden_dims (list): hidden dimensions

        Methods:
            private:
                __build_train_fn -> None
                    It creates a train function
                    It's similar to defining `train_op` in Tensorflow
                __build_network -> None
                    It creates a base model
                    Its output is each action probability
            public:
                get_action(state) -> action
                fit(state, action, reward) -> None
        """
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.__build_network(input_dim, output_dim, hidden_dims)
        self.__build_train_fn()

    def __build_network(self, input_dim, output_dim, hidden_dims=[32, 32]):
        """Create a base network"""
        self.X = layers.Input(shape=(input_dim,))
        net = self.X

        for h_dim in hidden_dims:
            net = layers.Dense(h_dim)(net)
            net = layers.Activation("relu")(net)

        net = layers.Dense(output_dim)(net)
        net = layers.Activation("softmax")(net)

        self.model = Model(inputs=self.X, outputs=net)

    def __build_train_fn(self):
        """Create a train function

        It replaces `model.fit(X, y)` because we use the model's own output for training.
        For example, we need an action placeholder
        called `action_one_hot` that stores which action we took at state `s`.
        Hence, we can update that same action.

        This function will create
        `self.train_fn([state, action_one_hot, discount_reward])`
        which will train the model.
        """
        action_prob_placeholder = self.model.output
        action_onehot_placeholder = K.placeholder(shape=(None, self.output_dim),
                                                  name="action_onehot")
        discount_reward_placeholder = K.placeholder(shape=(None,),
                                                    name="discount_reward")

        # Probability of the action that was actually taken at each step
        action_prob = K.sum(action_prob_placeholder * action_onehot_placeholder, axis=1)
        log_action_prob = K.log(action_prob)

        # REINFORCE loss: maximize log pi(a|s) weighted by the discounted return
        loss = - log_action_prob * discount_reward_placeholder
        loss = K.mean(loss)

        adam = optimizers.Adam()
        updates = adam.get_updates(params=self.model.trainable_weights,
                                   constraints=[],
                                   loss=loss)

        self.train_fn = K.function(inputs=[self.model.input,
                                           action_onehot_placeholder,
                                           discount_reward_placeholder],
                                   outputs=[],
                                   updates=updates)

    def get_action(self, state):
        """Returns an action at the given `state`

        Args:
            state (1-D or 2-D Array): either a 1-D array of shape (state_dimension,)
                or a 2-D array of shape (n_samples, state_dimension)

        Returns:
            action: an integer action value ranging from 0 to (n_actions - 1)
        """
        shape = state.shape

        if len(shape) == 1:
            assert shape == (self.input_dim,), "{} != {}".format(shape, self.input_dim)
            state = np.expand_dims(state, axis=0)
        elif len(shape) == 2:
            assert shape[1] == self.input_dim, "{} != {}".format(shape, self.input_dim)
        else:
            raise TypeError("Wrong state shape is given: {}".format(state.shape))

        action_prob = np.squeeze(self.model.predict(state))
        assert len(action_prob) == self.output_dim, "{} != {}".format(len(action_prob), self.output_dim)
        # Sample an action from the policy's probability distribution
        return np.random.choice(np.arange(self.output_dim), p=action_prob)

    def fit(self, S, A, R):
        """Train the network

        Args:
            S (2-D Array): `state` array of shape (n_samples, state_dimension)
            A (1-D Array): `action` array of shape (n_samples,)
                It's simply a list of ints that stores which actions the agent chose
            R (1-D Array): `reward` array of shape (n_samples,)
                A reward is given after each action.
        """
        action_onehot = np_utils.to_categorical(A, num_classes=self.output_dim)
        discount_reward = compute_discounted_R(R)

        assert S.shape[1] == self.input_dim, "{} != {}".format(S.shape[1], self.input_dim)
        assert action_onehot.shape[0] == S.shape[0], "{} != {}".format(action_onehot.shape[0], S.shape[0])
        assert action_onehot.shape[1] == self.output_dim, "{} != {}".format(action_onehot.shape[1], self.output_dim)
        assert len(discount_reward.shape) == 1, "{} != 1".format(len(discount_reward.shape))

        self.train_fn([S, action_onehot, discount_reward])

def compute_discounted_R(R, discount_rate=.99):
    """Returns discounted rewards

    Args:
        R (1-D array): a list of `reward` at each time step
        discount_rate (float): Will discount the future value by this rate

    Returns:
        discounted_r (1-D array): same shape as input `R`
            but the values are discounted

    Examples:
        >>> R = [1, 1, 1]
        >>> compute_discounted_R(R, .99) # before normalization
        [1 + 0.99 + 0.99**2, 1 + 0.99, 1]
    """
    discounted_r = np.zeros_like(R, dtype=np.float32)
    running_add = 0
    for t in reversed(range(len(R))):
        running_add = running_add * discount_rate + R[t]
        discounted_r[t] = running_add

    # Normalize the returns (zero mean, unit variance) to reduce gradient variance
    discounted_r = (discounted_r - discounted_r.mean()) / discounted_r.std()

    return discounted_r

def run_episode(env, agent):
    """Returns an episode reward

    (1) Play until the game is done
    (2) The agent will choose an action according to the policy
    (3) When it's done, it will train from the game play

    Args:
        env (gym.Env): Gym environment
        agent (Agent): Game Playing Agent

    Returns:
        total_reward (int): total reward earned during the whole episode
    """
    done = False
    S = []
    A = []
    R = []

    s = env.reset()
    total_reward = 0

    while not done:
        a = agent.get_action(s)
        s2, r, done, info = env.step(a)
        total_reward += r

        S.append(s)
        A.append(a)
        R.append(r)

        s = s2

        if done:
            S = np.array(S)
            A = np.array(A)
            R = np.array(R)

            agent.fit(S, A, R)

    return total_reward

def main():
    try:
        env = gym.make("CartPole-v0")
        input_dim = env.observation_space.shape[0]
        output_dim = env.action_space.n
        agent = Agent(input_dim, output_dim, [16, 16])

        for episode in range(2000):
            reward = run_episode(env, agent)
            print(episode, reward)

    finally:
        env.close()


if __name__ == '__main__':
    main()
byewokko commented Mar 5, 2020

I'm probably using a different version of keras/tf, but I had to fix a couple of things to make the code run.

  1. The code as it is here throws a TypeError: get_updates() got an unexpected keyword argument 'constraints'. I simply commented out the constraints=[], at line 93.
  2. Then I got ValueError: Cannot create a Keras backend function with updates but no outputs during eager execution. To disable eager execution I imported tensorflow and called tf.compat.v1.disable_eager_execution().
  3. Then it throws IndexError: list index out of range because of outputs=[], (line 99). Just change it to outputs=[self.model.output], and it runs fine for me.
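
Putting those three changes together, a minimal sketch of the adjusted `__build_train_fn` might look like the following. This assumes a TF 2.x-era Keras where `get_updates(params=..., loss=...)` is still available, and the `disable_eager_execution()` call has to run at the top of the script, before the model is built:

import tensorflow as tf
tf.compat.v1.disable_eager_execution()  # (2) K.function with updates requires graph mode

# Adjusted version of Agent.__build_train_fn
def __build_train_fn(self):
    action_prob_placeholder = self.model.output
    action_onehot_placeholder = K.placeholder(shape=(None, self.output_dim),
                                              name="action_onehot")
    discount_reward_placeholder = K.placeholder(shape=(None,),
                                                name="discount_reward")
    action_prob = K.sum(action_prob_placeholder * action_onehot_placeholder, axis=1)
    log_action_prob = K.log(action_prob)
    loss = K.mean(-log_action_prob * discount_reward_placeholder)
    adam = optimizers.Adam()
    # (1) drop the removed `constraints` keyword argument
    updates = adam.get_updates(params=self.model.trainable_weights, loss=loss)
    self.train_fn = K.function(inputs=[self.model.input,
                                       action_onehot_placeholder,
                                       discount_reward_placeholder],
                               # (3) give K.function at least one output
                               outputs=[self.model.output],
                               updates=updates)
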


halataa commented Apr 1, 2020


thank you :D
