Training a Neural Network ATARI Pong agent with Policy Gradients from raw pixels
""" Trains an agent with (stochastic) Policy Gradients on Pong. Uses OpenAI Gym. """
import numpy as np
import cPickle as pickle
import gym
# hyperparameters
H = 200 # number of hidden layer neurons
batch_size = 10 # every how many episodes to do a param update?
learning_rate = 1e-4
gamma = 0.99 # discount factor for reward
decay_rate = 0.99 # decay factor for RMSProp leaky sum of grad^2
resume = False # resume from previous checkpoint?
render = False
# model initialization
D = 80 * 80 # input dimensionality: 80x80 grid
if resume:
  model = pickle.load(open('save.p', 'rb'))
else:
  model = {}
  model['W1'] = np.random.randn(H,D) / np.sqrt(D) # "Xavier" initialization
  model['W2'] = np.random.randn(H) / np.sqrt(H)
grad_buffer = { k : np.zeros_like(v) for k,v in model.iteritems() } # update buffers that add up gradients over a batch
rmsprop_cache = { k : np.zeros_like(v) for k,v in model.iteritems() } # rmsprop memory
def sigmoid(x):
  return 1.0 / (1.0 + np.exp(-x)) # sigmoid "squashing" function to interval [0,1]
def prepro(I):
  """ prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
  I = I[35:195] # crop
  I = I[::2,::2,0] # downsample by factor of 2
  I[I == 144] = 0 # erase background (background type 1)
  I[I == 109] = 0 # erase background (background type 2)
  I[I != 0] = 1 # everything else (paddles, ball) just set to 1
  return I.astype(np.float).ravel()
def discount_rewards(r):
  """ take 1D float array of rewards and compute discounted reward """
  discounted_r = np.zeros_like(r)
  running_add = 0
  for t in reversed(xrange(0, r.size)):
    if r[t] != 0: running_add = 0 # reset the sum, since this was a game boundary (pong specific!)
    running_add = running_add * gamma + r[t]
    discounted_r[t] = running_add
  return discounted_r
def policy_forward(x):
  h = np.dot(model['W1'], x)
  h[h<0] = 0 # ReLU nonlinearity
  logp = np.dot(model['W2'], h)
  p = sigmoid(logp)
  return p, h # return probability of taking action 2, and hidden state
def policy_backward(eph, epdlogp):
  """ backward pass. (eph is array of intermediate hidden states) """
  dW2 = np.dot(eph.T, epdlogp).ravel()
  dh = np.outer(epdlogp, model['W2'])
  dh[eph <= 0] = 0 # backprop relu
  dW1 = np.dot(dh.T, epx) # epx is the global array of stacked episode inputs
  return {'W1':dW1, 'W2':dW2}
env = gym.make("Pong-v0")
observation = env.reset()
prev_x = None # used in computing the difference frame
xs,hs,dlogps,drs = [],[],[],[]
running_reward = None
reward_sum = 0
episode_number = 0
while True:
  if render: env.render()
  # preprocess the observation, set input to network to be difference image
  cur_x = prepro(observation)
  x = cur_x - prev_x if prev_x is not None else np.zeros(D)
  prev_x = cur_x
  # forward the policy network and sample an action from the returned probability
  aprob, h = policy_forward(x)
  action = 2 if np.random.uniform() < aprob else 3 # roll the dice!
  # record various intermediates (needed later for backprop)
  xs.append(x) # observation
  hs.append(h) # hidden state
  y = 1 if action == 2 else 0 # a "fake label"
  dlogps.append(y - aprob) # grad that encourages the action that was taken to be taken (see if confused)
  # step the environment and get new measurements
  observation, reward, done, info = env.step(action)
  reward_sum += reward
  drs.append(reward) # record reward (has to be done after we call step() to get reward for previous action)
  if done: # an episode finished
    episode_number += 1
    # stack together all inputs, hidden states, action gradients, and rewards for this episode
    epx = np.vstack(xs)
    eph = np.vstack(hs)
    epdlogp = np.vstack(dlogps)
    epr = np.vstack(drs)
    xs,hs,dlogps,drs = [],[],[],[] # reset array memory
    # compute the discounted reward backwards through time
    discounted_epr = discount_rewards(epr)
    # standardize the rewards to be unit normal (helps control the gradient estimator variance)
    discounted_epr -= np.mean(discounted_epr)
    discounted_epr /= np.std(discounted_epr)
    epdlogp *= discounted_epr # modulate the gradient with advantage (PG magic happens right here.)
    grad = policy_backward(eph, epdlogp)
    for k in model: grad_buffer[k] += grad[k] # accumulate grad over batch
    # perform rmsprop parameter update every batch_size episodes
    if episode_number % batch_size == 0:
      for k,v in model.iteritems():
        g = grad_buffer[k] # gradient
        rmsprop_cache[k] = decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g**2
        model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
        grad_buffer[k] = np.zeros_like(v) # reset batch gradient buffer
    # boring book-keeping
    running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
    print 'resetting env. episode reward total was %f. running mean: %f' % (reward_sum, running_reward)
    if episode_number % 100 == 0: pickle.dump(model, open('save.p', 'wb'))
    reward_sum = 0
    observation = env.reset() # reset env
    prev_x = None
  if reward != 0: # Pong has either +1 or -1 reward exactly when game ends.
    print ('ep %d: game finished, reward: %f' % (episode_number, reward)) + ('' if reward == -1 else ' !!!!!!!!')
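
To make the reward handling in the script above concrete, here is a small worked example of the discount-then-standardize step. It is only a sketch in plain Python 3 (lists instead of NumPy arrays), and `standardize`, the toy reward sequence, and `gamma=0.5` are illustrative choices that do not appear in the script. It mirrors the script's behaviour of resetting the running sum at every nonzero reward, since each point scored in Pong starts a new rally.

```python
def discount_rewards(rewards, gamma=0.99):
    """Discounted return per step, resetting at each nonzero reward (a Pong point)."""
    discounted = [0.0] * len(rewards)
    running = 0.0
    for t in reversed(range(len(rewards))):
        if rewards[t] != 0:
            running = 0.0  # a point was scored here: start a fresh sum
        running = running * gamma + rewards[t]
        discounted[t] = running
    return discounted


def standardize(values):
    """Shift to zero mean and scale to unit (population) standard deviation."""
    mean = sum(values) / len(values)
    std = (sum((v - mean) ** 2 for v in values) / len(values)) ** 0.5
    return [(v - mean) / std for v in values]


# Toy episode: one point won at step 2, one point lost at step 5.
rewards = [0, 0, 1, 0, 0, -1]
returns = discount_rewards(rewards, gamma=0.5)
print(returns)  # [0.25, 0.5, 1.0, -0.25, -0.5, -1.0]

advantages = standardize(returns)
print(advantages)
```

Steps leading up to the won point get positive values and steps leading up to the lost point get negative ones, with the magnitude shrinking the further a step is from the reward. Those values are what multiply the per-step gradients before the parameter update.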

Grsz commented Nov 23, 2020

I'm trying to implement this gist's model in TensorFlow. I want it to be more general and support cases with more than 2 actions, so I'm using sparse categorical cross-entropy. I've spent weeks on this but can't make it work. I've done all sorts of research, tried a lot of approaches, and done a lot of testing. I made 12 versions; surprisingly, one of them outperforms the original (it uses MSE for the loss and feeds in the current state directly instead of subtracting the previous state from it), but that is probably just luck. The version below performs terribly; in fact, it doesn't learn at all.

Here's the code:

import tensorflow as tf
import numpy as np
import gym

class Model(tf.keras.Model):
  def __init__(self, h_units, y_size):
    super().__init__()  # must run before layers are assigned on a tf.keras.Model subclass

    self.whs = [tf.keras.layers.Dense(h_size, 'relu') for h_size in h_units]
    self.wy = tf.keras.layers.Dense(y_size, 'sigmoid')

  def call(self, x):
    for wh in self.whs:
      x = wh(x)
    y = self.wy(x)

    return y

def prepro(state):
  """ prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
  state = state[35:195] # crop
  state = state[::2,::2,0] # downsample by factor of 2
  state[state == 144] = 0 # erase background (background type 1)
  state[state == 109] = 0 # erase background (background type 2)
  state[state != 0] = 1 # everything else (paddles, ball) just set to 1
  return state.ravel().reshape([1, -1])

class Environment:
  def __init__(self, state_preprocessor):
    self.env = gym.make('Pong-v0')
    self.state_preprocessor = state_preprocessor

    self.prev_s = None

  def init(self):
    cur_s = self.env.reset()
    cur_s = self.state_preprocessor(cur_s)

    s = cur_s - tf.zeros_like(cur_s)

    self.prev_s = cur_s

    return s

  def step(self, action):
    cur_s, r, done, info = self.env.step(action + 2)
    cur_s = self.state_preprocessor(cur_s)
    s = cur_s - self.prev_s

    self.prev_s = cur_s

    return s, r, done

# Runs model (policy) with x (input),
# samples y (action) from y_pred (probability of actions).
# Takes:
#  - x - the input, 1D tensor of the current state
#  - model - policy, returns probability of actions from state
# Returns:
#  - y - tensor of shape [-1, 1], the sampled value what the action should be
def sample_action(x, model):
  y_pred = model(x)
  samples = tf.random.uniform([1])

  y = y_pred - samples
  y = tf.reshape(tf.argmax(y, 1), [-1, 1])

  return y

def get_gradients(x, y, r, model, loss_fn):
  with tf.GradientTape() as tape:
    y_pred = model(x)
    loss = loss_fn(y, y_pred, r)

  gradients = tape.gradient(loss, model.trainable_variables)

  return gradients

# Discounts rewards so that actions closer in time to a reward
# get more credit for it, because the final reward is much more
# likely due to a recent action than to one at the beginning.
# Takes:
#  - d - discount multiplier
# Returns:
#  - a function that takes the full value rewards of the timesteps
#    and returns their standardized discounted sums

def discount_rewards(d):
  def discount(rs):
    d_rs = np.zeros_like(rs)
    sum_rt = 0
    for t in reversed(range(rs.shape[0])):
      if rs[t] != 0: sum_rt = 0
      # add rt to the discounted sum of rewards at t
      sum_rt = sum_rt * d + rs[t]
      d_rs[t] = sum_rt
    d_rs -= np.mean(d_rs)
    d_rs /= np.std(d_rs)
    return d_rs
  return discount

model = Model([200], 2)
env = Environment(prepro)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(reduction=tf.keras.losses.Reduction.NONE)
optimizer = tf.keras.optimizers.RMSprop(0.0001, 0.99)
discounter = discount_rewards(0.99)

epochs = 10000
batch_size = 10

# Train runs the model with the environment,
# collects gradients per execution, and optimizes
# the model's weights at each epoch.
# Takes:
#  - model (policy), which takes the input x (state), and returns y (action)
#  - environment, which performs the action, and returns the new state, and reward
#    must have methods:
#     - init() - initialize the state
#     - step(action) - perform the action, return new state, reward, and indicator if episode is over

def train(model, env, loss_fn, optimizer, discounter, epochs, batch_size):
  for i in range(epochs):
    xs = []
    ys = []
    rs = []
    ep_rs = []

    for e in range(batch_size):
      done = False
      x = env.init()
      ep_r = 0

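      while not done:
        y = sample_action(x, model)
        xs.append(x)  # state the action was sampled from
        ys.append(y)  # sampled action

        x, r, done = env.step(y.numpy().astype('int'))
        rs.append(r)  # reward received for that action

        ep_r += r
      ep_rs.append(ep_r)
      print('Epoch:', i, 'Episode:', e, 'Reward:', ep_r)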
    xs = tf.concat(xs, 0)
    ys = tf.concat(ys, 0)
    rs = np.vstack(rs)
    rs = discounter(rs)

    gradients = get_gradients(xs, ys, rs, model, loss_fn)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    print('Epoch:', i, 'Avg episode reward:', np.array(ep_rs).mean())

train(model, env, loss_fn, optimizer, discounter, epochs, batch_size)

So in summary: set up a model with 2 dense layers whose output size is the number of possible actions, subtract a random number from the output, and use the index of the highest value as the action. Run the environment with it and get the next state and reward. Store all selected actions, rewards, and states. Once the number of episodes reaches the batch size, rerun the model on the collected states to get the predictions, compute sparse categorical cross-entropy between the selected actions and the predictions, use the discounted rewards as sample weights for the losses, get the gradients, and optimize the weights. Repeat.
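
The loss described in the summary above, cross-entropy on the selected action weighted by the discounted reward, can be written out for a single timestep. This is a minimal sketch in plain Python, assuming a softmax over raw scores (the posted model uses a sigmoid output layer instead, so this is not a copy of it); `softmax`, `weighted_nll`, and `grad_logits` are hypothetical helper names, not TensorFlow functions.

```python
import math


def softmax(logits):
    """Turn raw scores into probabilities that sum to 1."""
    m = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


def weighted_nll(logits, action, advantage):
    """Loss for one timestep: advantage * -log p(action)."""
    probs = softmax(logits)
    return -advantage * math.log(probs[action])


def grad_logits(logits, action, advantage):
    """Gradient of weighted_nll w.r.t. the logits: advantage * (p - onehot(action))."""
    probs = softmax(logits)
    return [advantage * (p - (1.0 if i == action else 0.0))
            for i, p in enumerate(probs)]


logits = [0.2, -0.1, 0.5]  # illustrative scores for 3 actions
print(weighted_nll(logits, action=2, advantage=1.5))
print(grad_logits(logits, action=2, advantage=1.5))
```

For two actions with a single sigmoid output, the same derivation gives `advantage * (aprob - y)` for this loss. The original script stores `y - aprob`, the negative of that, and adds the resulting update to the weights, which amounts to the same direction of change as minimizing the loss above.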

While making and testing those versions, I realized that the initial version somehow makes much better random guesses before any training (10 episodes), with a mean of 20, while the TensorFlow version keeps scoring -21 in all 10 episodes across multiple independent tries.
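
Before any training, how an agent plays depends almost entirely on how actions are sampled, so that step is worth isolating. The sketch below, in plain Python with hypothetical function names, compares two schemes: subtracting one shared uniform number from every action probability and taking the argmax (which is how `sample_action` above reads, assuming the shape-`[1]` sample is broadcast across both outputs), and drawing from the distribution by walking its cumulative sum.

```python
import random


def argmax_minus_shared_noise(probs, rng):
    """Subtract ONE uniform number from every probability, then take the argmax."""
    u = rng.random()
    shifted = [p - u for p in probs]
    return max(range(len(probs)), key=lambda i: shifted[i])


def sample_categorical(probs, rng):
    """Draw an index with probability proportional to probs (inverse-CDF method)."""
    u = rng.random() * sum(probs)
    cumulative = 0.0
    for i, p in enumerate(probs):
        cumulative += p
        if u < cumulative:
            return i
    return len(probs) - 1  # guard against floating-point round-off


rng = random.Random(0)
probs = [0.3, 0.7]  # illustrative action probabilities

greedy = {argmax_minus_shared_noise(probs, rng) for _ in range(1000)}
print(greedy)  # {1}

sampled = [sample_categorical(probs, rng) for _ in range(1000)]
print(sum(sampled) / len(sampled))  # fraction of draws that picked action 1
```

Subtracting the same constant from every entry never changes which entry is largest, so the first scheme always returns the most probable action and never explores. The original script avoids this by comparing a uniform draw against the single probability `aprob`, which is the two-action case of the second scheme.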

All makes sense, but doesn't work. Why?

eabase commented May 15, 2021

@Grsz That is a perfect question for StackOverflow...

Python 3 version with the minimum changes to make it work:
Still a great policy gradient blog post and Python script, but Python 2 is so 2016 : )
Most folks reading this now will fire it up in Python 3, watch it blow up, and miss out on the fun experience.

For those interested in seeing this implemented on top of TensorFlow 2 running entirely on graph mode here's the repo:

The AI trained fairly quickly: within a day it reached an average return of ~14 points. But then it plateaus there and doesn't improve much further. I'm not sure how to improve it beyond that, other than to keep tweaking the hyperparameters.

SeunghyunSEO commented May 19, 2022

May 19th 2022
I modified some lines because the original is too old (but gold).
In my case the rendering option did not work because of an OpenAI Gym issue.
Please check this code if you want to train an agent to play Pong with py38 and gym>=0.21.0.

In case someone still wants to share a cool Colab demo, here is my notebook, which ended up performing on par with the human opponent.

I created a variation of the original demo by adding another layer of hidden variables. This new variation converges much faster than the original solution. I also fixed a bunch of bugs/issues introduced by new versions of the libraries. Have fun!
