Training a Neural Network ATARI Pong agent with Policy Gradients from raw pixels
import torch
import torch.nn as nn
import torch.optim as optim
import gym
import numpy as np
import pickle
# Hyperparameters
H = 200              # Number of hidden layer neurons
batch_size = 10      # Every how many episodes to do a param update?
learning_rate = 1e-4
gamma = 0.99         # Discount factor for reward
decay_rate = 0.99    # Decay factor for RMSProp leaky sum of grad^2
resume = False       # Resume from previous checkpoint?
render = False
# Model initialization
D = 80 * 80  # Input dimensionality: 80x80 grid
model = nn.Sequential(
    nn.Linear(D, H),
    nn.ReLU(),
    nn.Linear(H, 2)  # Output layer for two actions
)
optimizer = optim.RMSprop(model.parameters(), lr=learning_rate, alpha=decay_rate)

if resume:
    model.load_state_dict(torch.load('save.p'))
def prepro(I):
    """ Preprocess 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
    I = np.array(I[0]) if isinstance(I, tuple) else np.array(I)  # Ensure I is a NumPy array
    I = I[35:195]
    if I.ndim == 3:  # Check if I has 3 dimensions
        I = I[::2, ::2, 0]  # Downsample by factor of 2
    else:  # Handle the case where I is 2-dimensional
        I = I[::2, ::2]  # Downsample accordingly
    I[I == 144] = 0  # Erase background (background type 1)
    I[I == 109] = 0  # Erase background (background type 2)
    I[I != 0] = 1    # Everything else (paddles, ball) just set to 1
    return torch.FloatTensor(I).view(-1)  # Convert to PyTorch tensor and flatten
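
# Shape walkthrough for prepro (added note): a 210x160x3 Atari frame is cropped to
# rows 35:195 (160x160x3), downsampled by taking every second row/column of one
# color channel (80x80), binarized, and flattened to a 6400-element tensor,
# matching D = 80 * 80 above.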
def discount_rewards(r):
    """ Take 1D float array of rewards and compute discounted reward """
    discounted_r = np.zeros_like(r)
    running_add = 0
    for t in reversed(range(r.size)):
        if r[t] != 0: running_add = 0  # Reset the sum, since this was a game boundary
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r
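
# Worked example (added note, with gamma = 0.99): a point that ends after three
# steps with reward +1, r = [0, 0, 1], yields discounted_r = [0.9801, 0.99, 1.0];
# the running sum resets at each nonzero reward because every Pong point is
# scored independently.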
def policy_forward(x):
    h = model(x)
    p = torch.softmax(h, dim=0)  # Apply softmax to get action probabilities
    return p
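
# Note (added): the sampling code below treats aprob[1] as the probability of
# choosing action 2 and, since the softmax has only two outputs, aprob[0] as the
# probability of the alternative action 3.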
env = gym.make("PongNoFrameskip-v4")
observation = env.reset()
prev_x = None  # Used in computing the difference frame
xs, hs, dlogps, drs = [], [], [], []
running_reward = None
reward_sum = 0
episode_number = 0
while True:
    if render: env.render()

    # Preprocess the observation, set input to network to be difference image
    cur_x = prepro(observation)
    x = cur_x - prev_x if prev_x is not None else torch.zeros(D)
    prev_x = cur_x

    # Forward the policy network and sample an action from the returned probability
    aprob = policy_forward(x)
    action = 2 if torch.rand(1).item() < aprob[1] else 3  # Roll the dice!

    # Record various intermediates (needed later for backprop)
    xs.append(x)      # Observation
    hs.append(aprob)  # Hidden state
    y = 1 if action == 2 else 0   # A "fake label"
    dlogps.append(y - aprob[1])   # Grad that encourages the action that was taken
    # Step the environment and get new measurements
    result = env.step(action)
    if len(result) == 5:  # Newer Gym/Gymnasium API: (obs, reward, terminated, truncated, info)
        observation, reward, terminated, truncated, info = result
        done = terminated or truncated
    else:                 # Older Gym API: (obs, reward, done, info)
        observation, reward, done, info = result
    reward_sum += reward
    drs.append(reward)  # Record reward

    if done:  # An episode finished
        episode_number += 1

        # Stack together all inputs, hidden states, action gradients, and rewards for this episode
        epx = torch.stack(xs)
        eph = torch.stack(hs)
        epdlogp = torch.stack(dlogps)
        epr = torch.FloatTensor(drs)
        xs, hs, dlogps, drs = [], [], [], []  # Reset array memory

        # Compute the discounted reward backwards through time
        discounted_epr = discount_rewards(epr.numpy())
        discounted_epr = torch.FloatTensor(discounted_epr)
        discounted_epr -= discounted_epr.mean()  # Standardize the rewards
        discounted_epr /= discounted_epr.std()
        epdlogp *= discounted_epr  # Modulate the gradient with advantage

        # Backpropagation
        optimizer.zero_grad()
        loss = -torch.sum(epdlogp * torch.log(aprob[1]))  # Negative log likelihood
        loss.backward()
        optimizer.step()

        # Boring bookkeeping
        running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
        print('Resetting env. Episode reward total was %f. Running mean: %f' % (reward_sum, running_reward))
        if episode_number % 100 == 0: torch.save(model.state_dict(), 'save.p')
        reward_sum = 0
        observation = env.reset()  # Reset env
        prev_x = None

    if reward != 0:  # Pong has either +1 or -1 reward exactly when game ends.
        print('Episode %d: Game finished, reward: %f' % (episode_number, reward) + ('' if reward == -1 else ' !!!!!!!!'))
I had a quick skim. You appear to be minimizing the negative log likelihood; however, I believe you should be maximizing this, i.e.

loss = torch.sum(epdlogp * torch.log(aprob[1]))

Also, I'm not sure what aprob[1] means here; it looks like it is the action probability for the second action? You might want to double-check that this is right.
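
For reference, one conventional way to express this update in PyTorch (a minimal sketch only, not the gist's code; the name reinforce_loss and its arguments logits, action_indices, and returns are hypothetical) is to gather the log-probability of the action that was actually sampled at each timestep and weight it by the standardized discounted return; the optimizer then minimizes the negated sum:

import torch

def reinforce_loss(logits, action_indices, returns):
    """REINFORCE surrogate: -sum_t log pi(a_t | s_t) * R_t.

    logits:         (T, 2) raw network outputs for the episode
    action_indices: (T,) long tensor, 0 or 1, the action index sampled at each step
    returns:        (T,) float tensor of standardized discounted rewards
    """
    log_probs = torch.log_softmax(logits, dim=1)                          # (T, 2)
    taken = log_probs.gather(1, action_indices.unsqueeze(1)).squeeze(1)   # log prob of each taken action
    return -(taken * returns).sum()

# Example usage with dummy data:
# logits = torch.randn(5, 2, requires_grad=True)
# actions = torch.randint(0, 2, (5,))
# returns = torch.randn(5)
# reinforce_loss(logits, actions, returns).backward()

In this formulation the minus sign is kept because the optimizer minimizes, and the question of whether aprob[1] is the right index goes away because the log-probability of whichever action was taken is selected explicitly.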