@xanderex-sid
Created September 23, 2024 17:02
PyTorch - Training a NN Pong agent with Policy Gradients from raw pixels
import pickle
import numpy as np
import torch
import gym
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
batch_size = 10 # every how many episodes to do a param update?
learning_rate = 1e-3
gamma = 0.95 # discount factor for reward
resume = False # resume from previous checkpoint?
render = False
device = "cuda" if torch.cuda.is_available() else "cpu"
class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(80 * 80 * 1, 200)
        self.fc2 = nn.Linear(200, 1)

    def forward(self, x):
        # Ensure the input is in float format
        x = x.float()
        x = F.relu(self.fc1(x))         # hidden layer (200 units)
        x = torch.sigmoid(self.fc2(x))  # single output with sigmoid
        return x
def prepro_mlp(I):
    """ prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
    I = I[35:195]       # crop
    I = I[::2, ::2, 0]  # downsample by factor of 2 --- (80, 80)
    I[I == 144] = 0     # erase background (background type 1)
    I[I == 109] = 0     # erase background (background type 2)
    I[I != 0] = 1       # everything else (paddles, ball) set to 1 -- still of shape (80, 80)
    return torch.tensor(I.astype(float).ravel()).unsqueeze(0)  # (1, 6400)
def preprocess_CNN(I):  # if you want to use a CNN for the policy network, use this preprocessing
    """ prepro 210x160x3 uint8 frame into a torch (1, 1, 160, 160) tensor """
    I = I[35:195]    # crop
    I = I[:, :, 0]   # only take the first channel
    I[I == 144] = 0  # erase background (background type 1)
    I[I == 109] = 0  # erase background (background type 2)
    I[I != 0] = 1    # everything else (paddles, ball) set to 1
    I = torch.tensor(I.astype(float)).unsqueeze(0).unsqueeze(0)  # (1, 1, 160, 160)
    return I
def discount_rewards_v1(r):
    """ take torch tensor of shape (batch, reward) and compute discounted reward """
    discounted_r = torch.zeros_like(r).to(device)
    running_add = 0.0
    for t in reversed(range(0, r.size(0))):
        if r[t] != 0.0: running_add = 0.0  # reset the sum, since this was a game boundary (Pong specific!)
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r
# starting environment
env = gym.make("ALE/Pong-v5")
observation = env.reset()
prev_x = None # used in computing the difference frame
running_reward = None
reward_sum = 0
episode_number = 0
# defining model
if resume: model = pickle.load(open('./saveMLP_v1.p', 'rb'))
else: model = MLP()
model.to(device)
bce_loss = nn.BCELoss(reduction='none') # reduction='none' will calculate BCE loss element-wise
# optimizer = optim.Adam(model.parameters(), lr=learning_rate)
optimizer = optim.RMSprop(model.parameters(), lr=learning_rate, alpha=0.99, eps=1e-08)
total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of parameters: {total_params}") # calculate total parameters
cnt_matches = 0
while True:
    if render: env.render()

    # preprocess the observation; the network input is the difference image
    cur_x = prepro_mlp(observation)
    x = cur_x - prev_x if prev_x is not None else torch.zeros_like(cur_x)  # (1, 6400)
    if prev_x is None: xs = x
    else: xs = torch.cat((xs, x), dim=0)  # accumulate the episode's inputs (not used later); if our agent loses every match, one episode = 21 matches

    # forward the model to get the probability of y = 1 (move UP)
    x = x.to(device)
    aprob = model(x)
    action = 2 if np.random.uniform() < aprob.item() else 3  # roll the dice! (action 2 = UP, action 3 = DOWN)

    # store the model output
    if prev_x is None: batch_probs = aprob
    else: batch_probs = torch.cat((batch_probs, aprob), dim=0).to(device)  # (steps, 1)

    # y is a "fake label" for this observation: to move "UP", y = 1
    y = 1 if action == 2 else 0
    y = torch.tensor([y], dtype=torch.float).unsqueeze(0).to(device)  # (1, 1)
    if prev_x is None: ys = y
    else: ys = torch.cat((ys, y), dim=0).to(device)  # accumulate the episode's targets -- (steps, 1)

    # step the environment and get new measurements
    observation, reward, done, info = env.step(action)  # done=True when the episode ends; an episode is a full game, played until one side scores 21 points
    reward_sum += reward

    # store the reward of each match
    if reward != 0: cnt_matches += 1
    reward = torch.tensor([reward], dtype=torch.float).unsqueeze(0).to(device)  # (1, 1)
    if prev_x is None: rs = reward
    else: rs = torch.cat((rs, reward), dim=0).to(device)  # accumulate the episode's rewards -- (steps, 1)

    prev_x = cur_x

    if done:  # one episode, i.e. one full game, is finished
        episode_number += 1

        discounted_rs = discount_rewards_v1(rs)  # rs -- shape (steps, 1)
        discounted_rs -= torch.mean(discounted_rs)
        discounted_rs /= torch.std(discounted_rs)  # discounted returns act as per-step weights for the loss

        # compute the loss for each element
        loss_per_element = bce_loss(batch_probs, ys)  # e.g. batch_probs: (30, 1), ys: (30, 1) -> loss_per_element: (30, 1)
        weighted_loss = loss_per_element * discounted_rs.to(device)  # e.g. discounted_rs: (30, 1) -> weighted_loss: (30, 1)
        loss = weighted_loss.mean()

        # backward pass
        loss.backward()  # accumulate the gradient of each parameter
        if episode_number % batch_size == 0:  # update the model parameters every batch_size episodes
            optimizer.step()       # update weights
            optimizer.zero_grad()  # reset gradients; over batch_size (e.g. 10) episodes the gradients were accumulating
            print("loss :", loss.item())

        # boring book-keeping
        running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
        print('resetting env. episode reward total was %f. running mean: %f' % (reward_sum, running_reward))
        print(f'total matches played in this episode: {cnt_matches}. Note: an episode ends when one player reaches 21 points.')
        if episode_number % 5 == 0: pickle.dump(model, open('./saveMLP_v2.p', 'wb'))
        if episode_number == 200:
            break
        reward_sum = 0
        cnt_matches = 0
        observation = env.reset()  # reset env
        prev_x = None
        xs = None
        rs = None
        ys = None
        batch_probs = None

    if reward != 0:  # Pong has either a +1 or -1 reward exactly when a match (one point) ends
        print('ep %d: (match %d) game finished, reward: %f' % (episode_number, cnt_matches, reward.item()) + ('' if reward.item() == -1 else ' !!!!!!!!'))
@xanderex-sid (Author) commented Sep 23, 2024

The above code is a PyTorch version of https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5

You can use the code above to create various types of CNN or MLP models to train your ATARI Pong agent. The code is compatible with GPU usage as well.
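
For example, here is a minimal sketch of a CNN policy that could stand in for the MLP above. It assumes the (1, 1, 160, 160) input produced by preprocess_CNN; the name CNNPolicy and the layer sizes are illustrative choices, not something taken from this gist:

import torch
import torch.nn as nn
import torch.nn.functional as F

class CNNPolicy(nn.Module):
    """Illustrative CNN policy; expects (N, 1, 160, 160) input from preprocess_CNN."""
    def __init__(self):
        super(CNNPolicy, self).__init__()
        self.conv1 = nn.Conv2d(1, 8, kernel_size=8, stride=4)   # -> (N, 8, 39, 39)
        self.conv2 = nn.Conv2d(8, 16, kernel_size=4, stride=2)  # -> (N, 16, 18, 18)
        self.fc1 = nn.Linear(16 * 18 * 18, 200)
        self.fc2 = nn.Linear(200, 1)

    def forward(self, x):
        x = x.float()
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.view(x.size(0), -1)          # flatten
        x = F.relu(self.fc1(x))
        return torch.sigmoid(self.fc2(x))  # probability of moving UP

If you swap this in, the training loop would also need to call preprocess_CNN instead of prepro_mlp.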

It trains successfully on Google Colab, so feel free to use it in your experiments. If you find any bugs or have suggestions for improvements, please let me know, as it will help me enhance my skills.

To test your agent against the default OpenAI Pong agent, you can use the code provided below.

import numpy as np
import pickle
import torch
import gym
import torch.nn as nn
import torch.nn.functional as F

# Hyperparameters
H = 200  # number of hidden layer neurons
gamma = 0.99  # discount factor for reward

class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()

        self.fc1 = nn.Linear(80 * 80 * 1, 200)
        self.fc2 = nn.Linear(200, 1)

    def forward(self, x):
        # Ensure the input is in float format
        x = x.float()
        # mlp layers
        x = F.relu(self.fc1(x))  # Hidden layer (200)
        x = torch.sigmoid(self.fc2(x))  # Single output with sigmoid
        return x

# Load the pre-trained model (the training script above saves its checkpoint to './saveMLP_v2.p'; adjust the filename to match your checkpoint)
model_file = 'saveMLP_v1.p'
with open(model_file, 'rb') as f:
    model = pickle.load(f)

# Helper functions
# NOTE: sigmoid, prepro and policy_forward below are unused leftovers from the original
# numpy implementation; the test loop below uses the PyTorch model together with prepro_mlp.
def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))  # sigmoid function

def prepro(I):
  """ prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
  I = I[0]
  I = I[35:195] # crop
  I = I[::2,::2,0] # downsample by factor of 2
  I[I == 144] = 0 # erase background (background type 1)
  I[I == 109] = 0 # erase background (background type 2)
  I[I != 0] = 1 # everything else (paddles, ball) just set to 1
  return I.astype(float).ravel()

def prepro_mlp(I):
  """ prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
  I = I[35:195] # crop
  I = I[::2,::2,0] # downsample by factor of 2 --- (80, 80)
  I[I == 144] = 0 # erase background (background type 1)
  I[I == 109] = 0 # erase background (background type 2)
  I[I != 0] = 1 # everything else (paddles, ball) just set to 1 -- still of shape (80, 80)
  return torch.tensor(I.astype(float).ravel()).unsqueeze(0)

def policy_forward(x):
    """Forward pass to get action probability from model."""
    h = np.dot(model['W1'], x)
    h[h < 0] = 0  # ReLU nonlinearity
    logp = np.dot(model['W2'], h)
    p = sigmoid(logp)
    return p  # Probability of moving up

# Setup the environment
env = gym.make('ALE/Pong-v5', render_mode='human')
observation = env.reset()[0]
prev_x = None  # Used to calculate the difference between frames

while True:
    env.render()
    # Preprocess the observation
    cur_x = prepro_mlp(observation)
    x = cur_x - prev_x if prev_x is not None else torch.zeros_like(cur_x) #np.zeros(80 * 80)
    prev_x = cur_x

    # Use the pre-trained model to decide an action
    aprob = model(x) #policy_forward(x)
    action = 2 if np.random.uniform() < aprob.item() else 3  # Move up or down (action 2 = UP, action 3 = DOWN)
    # print(env.step(action))
    # Take the action and observe the result
    observation, reward, done, _, info = env.step(action)
    if done:
        observation = env.reset()[0]
        prev_x = None  # Reset frame difference

@eabase commented Oct 29, 2024

Great! Thank you. But can you also provide a brief summary of the steps to install this from scratch?

@morawi commented Oct 29, 2024

h[h < 0] = 0 # ReLU nonlinearity

Worked immediately out of the box.
How did you perform against the default OpenAI Pong agent?

@xanderex-sid (Author) commented Oct 29, 2024

@morawi
https://github.com/xanderex-sid/reinforcement_learning/blob/main/pong_rl/myagent_vs_openAIagent.py
Use this code with the following steps (a minimal sketch of steps 1 and 2 follows below):

  1. First re-create the same custom MLP/CNN model class that you used for training (class MLP).
  2. Then load the weights of the trained model.
  3. Run the code; it should work.
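
A minimal sketch of steps 1 and 2, assuming the checkpoint is a whole nn.Module pickled by the training script above (the filename is whatever you saved, e.g. './saveMLP_v2.p'):

import pickle
import torch
import torch.nn as nn
import torch.nn.functional as F

# Step 1: re-create the same architecture that was used for training
class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(80 * 80 * 1, 200)
        self.fc2 = nn.Linear(200, 1)

    def forward(self, x):
        x = x.float()
        x = F.relu(self.fc1(x))
        return torch.sigmoid(self.fc2(x))

# Step 2: load the trained model (the training script pickles the whole module)
with open('./saveMLP_v2.p', 'rb') as f:  # adjust the filename to your checkpoint
    model = pickle.load(f)
model.eval()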

My performance -
It didn’t perform well; out of 19 games, I won only 3. The main reason for this was that I didn’t train it long enough, but I’m confident it could beat the default OpenAI agent with extended training.

@xanderex-sid (Author) commented Oct 29, 2024

@eabase
If you are asking about dependencies: besides the standard pip installs for torch and numpy (pickle is in the standard library), you also need to run
pip install "gym[accept-rom-license, atari]".
I am also thinking of writing a detailed blog post on this and will comment the link here.

@morawi commented Oct 30, 2024

@eabase If you are asking about dependencies: besides the standard pip installs for torch and numpy (pickle is in the standard library), you also need to run pip install "gym[accept-rom-license, atari]". I am also thinking of writing a detailed blog post on this and will comment the link here.

I think gym has removed atari/pong in its latest versions. Hence, perhaps one must rely on earlier releases, as in:
pip install gym[atari,accept-rom-license]==0.21.0

See https://stackoverflow.com/questions/69442971/error-in-importing-environment-openai-gym

@eabase commented Nov 1, 2024

@morawi
From another project I learned that gym is outdated and needs to be replaced by gymnasium, like this:

# pip install -U gymnasium
# Then in your code, use:

import gymnasium as gym
from gym import wrappers

So not sure what would happen with the additional arguments shown above.
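
For reference, a minimal sketch of the API differences (assuming the Atari environments are installed, e.g. via pip install gymnasium[atari] ale-py as mentioned below): gymnasium's reset() returns (observation, info) and step() returns five values, so the loops above would need small changes along these lines:

import gymnasium as gym  # assumes the Atari environments are installed (see below)

env = gym.make("ALE/Pong-v5")
observation, info = env.reset()     # reset() now returns (observation, info)

action = env.action_space.sample()  # e.g. a random action, just to illustrate the call
observation, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated      # replaces the old single `done` flag
if done:
    observation, info = env.reset()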

@morawi commented Nov 1, 2024

@morawi From another project I learned that gym is outdated and needs to be replaced by gymnasium, like this:

# pip install -U gymnasium
# Then in your code, use:

import gymnasium as gym
from gym import wrappers

So not sure what would happen with the additional arguments shown above.

Yes, interesting. I asked ChatGPT:

Does the latest version of OpenAI Gym (0.26) contain the Pong game environment?

ChatGPT said:
As of version 0.26, OpenAI Gym does not include Atari games like Pong by default. Atari environments were previously part of Gym but were moved to a separate package, gymnasium[atari] (under the Gymnasium library) or can be accessed via ale-py, the Arcade Learning Environment for Python.

If you want to use Pong or other Atari games, you need to install these additional packages:

pip install gymnasium[atari]

Then, you can load Pong using Gymnasium's API:

import gymnasium as gym

env = gym.make("ALE/Pong-v5")  # Make sure to use the correct environment ID

This setup will allow you to work with Atari environments in the latest Gymnasium versions.

You said:
So, I need to pip install ale-py too?

ChatGPT said:
Yes, you'll need to install ale-py alongside gymnasium[atari] to run Atari environments like Pong. The ale-py package provides the backend for the Arcade Learning Environment, which is required for these games.

Here's how to install both:

pip install gymnasium[atari] ale-py

This should give you access to Atari games through Gymnasium, and you'll be able to load the Pong environment with:

import gymnasium as gym

env = gym.make("ALE/Pong-v5")

This should set everything up to use Pong and other Atari games in Gymnasium.

@eabase commented Nov 2, 2024

I'm already confused. So what exactly are the steps to get this running. And how to use the 2 programs above?

@xanderex-sid (Author) commented Nov 9, 2024

Hi @eabase @morawi,
Here is the link to the Colab notebook I used for training; it is still working, and I ran it today to check:
https://colab.research.google.com/drive/1OYYd_-yVAon8gEcgsTB8bq0IPdycwm00?usp=sharing
See if it helps!
