Pong from Pixels in PyTorch
"### Pong from pixels in PyTorch - Part 1\n",
"This notebook is based on Andrej's Karpathy gist \n",
"Main difference is that it uses PyTorch. \n",
"Necessary imports - OpenAI's Gym, PyTorch and NumPy"
"import gym\n",
"import torch\n",
"import torch.nn as nn\n",
"import torch.optim as optim\n",
"import torch.nn.functional as F\n",
"from math import log\n",
"import numpy as np"
"This is a simple 2 layer fully-connected neural network such as was used in original Karpathy's blogpost"
"class PolicyNetK(nn.Module):\n",
" \"\"\"Predict the probability of moving UP\"\"\"\n",
" def __init__(self):\n",
" super(PolicyNetK, self).__init__() \n",
" self.fc1 = nn.Linear(80*80, 100, bias=False)\n",
" self.fc2 = nn.Linear(100, 1, bias=False)\n",
" nn.init.xavier_uniform_(self.fc1.weight)\n",
" nn.init.xavier_uniform_(self.fc2.weight)\n",
" \n",
" def forward(self, x): \n",
" x = F.relu(self.fc1(x))\n",
" logp = self.fc2(x)\n",
" return torch.sigmoid(logp) # the output is p_up (sigmoid puts it into (0,1) )\n",
"pnet = PolicyNetK()\n",
"# This part will move the network to the NVIDIA GPU if you have one\n",
"device = torch.device(\"cuda:0\" if torch.cuda.is_available() else \"cpu\")\n",
"# from Karpathy's post, preprocessing function\n",
"def prepro(I):\n",
" \"\"\" prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector \"\"\"\n",
" I = I[35:195] # crop\n",
" I = I[::2,::2,0] # downsample by factor of 2 \n",
" I[I == 144] = 0 # erase background (background type 1)\n",
" I[I == 109] = 0 # erase background (background type 2)\n",
" I[I != 0] = 1 # everything else (paddles, ball) just set to 1\n",
" return I.astype(np.float).ravel() \n",
"# from Karpathy's post, discounting rewards function\n",
"def discount_rewards(r):\n",
" gamma = 0.99 # discount factor for reward\n",
" \"\"\" take 1D float array of rewards and compute discounted reward \"\"\"\n",
" discounted_r = np.zeros_like(r)\n",
" running_add = 0\n",
" for t in reversed(range(0, r.size)):\n",
" if r[t] != 0: running_add = 0 # reset the sum, since this was a game boundary (pong specific!)\n",
" running_add = running_add * gamma + r[t]\n",
" discounted_r[t] = running_add\n",
" return discounted_r"
"env = gym.make(\"Pong-v0\")\n",
"observation = env.reset()\n",
"prev_x = None\n",
"episode_number = 0\n",
"batch_size = 4\n",
"lr = 1e-4\n",
"p_ups, fake_labels, rewards = [], [], []"
"Minimizing BCELoss is equivalent to maximizing logP if network outputs P.\n",
"Note that this variant of loss expects probability not logits\n",
"reduction='none' because we need to weight by advantage before reducing\n",
"therefore, we'll reduce manually later"
"loss_computator = torch.nn.BCELoss(reduction='none') \n",
"optimizer = optim.RMSprop(pnet.parameters(), lr=lr)\n",
"running_reward = None\n",
"import os.path\n",
"if os.path.isfile(\"\"):\n",
" pnet.load_state_dict(torch.load(\"\"))\n",
"while True:\n",
" if episode_number % 100 == 0 and episode_number>0: \n",
", \"\")\n",
" # to see your AI play game's built-in AI call env.render()\n",
" env.render()\n",
" \n",
" # create input for the policy network (e.g. difference between two frames)\n",
" cur_x = prepro(observation)\n",
" x = cur_x - prev_x if prev_x is not None else np.zeros_like(cur_x) \n",
" prev_x = cur_x \n",
" x = torch.from_numpy(x).float().to(device) \n",
" \n",
" p_up = pnet(x) # probability of going UP. P_up \n",
" p_ups.append(p_up)\n",
" action = 2 if np.random.uniform() <[0] else 3\n",
" y = 1.0 if action == 2 else 0.0 # fake label\n",
" fake_labels.append(y)\n",
" \n",
" observation, reward, done, info = env.step(action) \n",
" rewards.append(reward) # reward for action can be seen after executing action\n",
" \n",
" if done: # an episode finished \n",
" episode_number += 1 \n",
" eprewards = np.vstack(rewards) \n",
" discounted_epr = discount_rewards(eprewards)\n",
" discounted_epr -= np.mean(discounted_epr)\n",
" discounted_epr /= np.std(discounted_epr) \n",
" lx = torch.stack(p_ups).float().to(device).squeeze(1)\n",
" ly = torch.tensor(fake_labels).float().to(device)\n",
" losses = loss_computator(lx, ly)\n",
" t_discounted_epr = torch.from_numpy(discounted_epr).squeeze(1).float().to(device)\n",
" losses *= t_discounted_epr \n",
" loss = torch.mean(losses) \n",
" reward_sum = sum(rewards)\n",
" running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01\n",
" if episode_number % 10 == 1: \n",
" print(\"EPNUM: {0}, LOSS: {1}. REWARDS: {2} RUNNING_REWARDS: {3}\"\n",
" .format(episode_number, loss, reward_sum, running_reward)) \n",
" \n",
" loss.backward(torch.tensor(1.0/batch_size).to(device))\n",
" if episode_number % batch_size == 0 and episode_number > 0: \n",
" optimizer.step()\n",
" optimizer.zero_grad() \n",
" \n",
" p_ups, fake_labels, rewards = [], [], [] # reset\n",
" observation = env.reset() # reset env\n",
" prev_x = None \n",
" "
Copy link

Does this code actually work for you? I'm running it and the rewards don't get better. After how many episodes does your agent start winning?

Copy link

Any ideas why it learns slower than Karpathy's original code?

