@okuchaiev
Pong from Pixels in Pytorch
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Pong from pixels in Pytorch - Part 1\n",
"This notebook is based on Andrej's Karpathy gist https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5 \n",
"Main difference is that it uses PyTorch. \n",
"Necessary imports - OpenAI's Gym, PyTorch and NumPy"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import gym\n",
"import torch\n",
"import torch.nn as nn\n",
"import torch.optim as optim\n",
"import torch.nn.functional as F\n",
"from math import log\n",
"import numpy as np"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is a simple 2 layer fully-connected neural network such as was used in original Karpathy's blogpost"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class PolicyNetK(nn.Module):\n",
" \"\"\"Predict the probability of moving UP\"\"\"\n",
" def __init__(self):\n",
" super(PolicyNetK, self).__init__() \n",
" self.fc1 = nn.Linear(80*80, 100, bias=False)\n",
" self.fc2 = nn.Linear(100, 1, bias=False)\n",
" nn.init.xavier_uniform_(self.fc1.weight)\n",
" nn.init.xavier_uniform_(self.fc2.weight)\n",
" \n",
" def forward(self, x): \n",
" x = F.relu(self.fc1(x))\n",
" logp = self.fc2(x)\n",
" return torch.sigmoid(logp) # the output is p_up (sigmoid puts it into (0,1) )\n",
"\n",
"pnet = PolicyNetK()\n",
"# This part will move the network to the NVIDIA GPU if you have one\n",
"device = torch.device(\"cuda:0\" if torch.cuda.is_available() else \"cpu\")\n",
"pnet.to(device)"
]
},
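{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check (added here for illustration, not part of the original gist): push one random fake \"frame\" through the untrained network and confirm it returns a single probability in (0, 1). The input size 80*80 matches the flattened frame produced by the `prepro` function defined in the next cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# sanity check (assumes pnet and device are defined in the cell above)\n",
"dummy = torch.randn(80*80).to(device)  # stands in for a preprocessed frame difference\n",
"with torch.no_grad():\n",
"    p = pnet(dummy)\n",
"print(p.shape, p.item())  # torch.Size([1]) and a value strictly between 0 and 1\n",
"assert 0.0 < p.item() < 1.0"
]
},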
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# from Karpathy's post, preprocessing function\n",
"def prepro(I):\n",
" \"\"\" prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector \"\"\"\n",
" I = I[35:195] # crop\n",
" I = I[::2,::2,0] # downsample by factor of 2 \n",
" I[I == 144] = 0 # erase background (background type 1)\n",
" I[I == 109] = 0 # erase background (background type 2)\n",
" I[I != 0] = 1 # everything else (paddles, ball) just set to 1\n",
" return I.astype(np.float).ravel() \n",
"\n",
"# from Karpathy's post, discounting rewards function\n",
"def discount_rewards(r):\n",
" gamma = 0.99 # discount factor for reward\n",
" \"\"\" take 1D float array of rewards and compute discounted reward \"\"\"\n",
" discounted_r = np.zeros_like(r)\n",
" running_add = 0\n",
" for t in reversed(range(0, r.size)):\n",
" if r[t] != 0: running_add = 0 # reset the sum, since this was a game boundary (pong specific!)\n",
" running_add = running_add * gamma + r[t]\n",
" discounted_r[t] = running_add\n",
" return discounted_r"
]
},
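{
"cell_type": "markdown",
"metadata": {},
"source": [
"A tiny illustration of `discount_rewards` (added for clarity, not part of the original gist). In Pong the reward is 0 on every step except the one that ends a point (+1 or -1), so each nonzero reward is propagated backwards with decay 0.99, and the running sum resets at every point boundary."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# toy reward sequence: a point lost after 3 steps, then a point won after 2 steps\n",
"# (uses np and discount_rewards from the cells above)\n",
"toy_r = np.array([0.0, 0.0, -1.0, 0.0, 1.0])\n",
"print(discount_rewards(toy_r))\n",
"# expected (up to print formatting): [-0.9801 -0.99 -1. 0.99 1.]"
]
},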
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"env = gym.make(\"Pong-v0\")\n",
"observation = env.reset()\n",
"prev_x = None\n",
"episode_number = 0\n",
"batch_size = 4\n",
"lr = 1e-4\n",
"p_ups, fake_labels, rewards = [], [], []"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"Minimizing BCELoss is equivalent to maximizing logP if network outputs P.\n",
"Note that this variant of loss expects probability not logits\n",
"reduction='none' because we need to weight by advantage before reducing\n",
"therefore, we'll reduce manually later"
]
},
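{
"cell_type": "markdown",
"metadata": {},
"source": [
"A short numerical check of that claim (added for illustration, not part of the original gist): with a fake label y in {0, 1}, BCELoss with reduction='none' returns -[y log p + (1 - y) log(1 - p)], which is exactly -log of the probability of the sampled action; weighting it by the (signed) advantage before averaging gives the REINFORCE loss."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# illustrative check that BCELoss(reduction='none') equals -log prob of the sampled action\n",
"# (uses torch imported at the top of the notebook)\n",
"p = torch.tensor([0.7])  # suppose the network says P(UP) = 0.7\n",
"for y in (1.0, 0.0):  # fake label: 1.0 if UP was sampled, 0.0 otherwise\n",
"    bce = torch.nn.BCELoss(reduction='none')(p, torch.tensor([y]))\n",
"    neg_logpi = -torch.log(p) if y == 1.0 else -torch.log(1 - p)\n",
"    print(y, bce.item(), neg_logpi.item())  # the two values match for each y"
]
},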
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"loss_computator = torch.nn.BCELoss(reduction='none') \n",
"optimizer = optim.RMSprop(pnet.parameters(), lr=lr)\n",
"running_reward = None\n",
"optimizer.zero_grad()\n",
"\n",
"import os.path\n",
"if os.path.isfile(\"model-2fc.pt\"):\n",
" pnet.load_state_dict(torch.load(\"model-2fc.pt\"))\n",
"\n",
"while True:\n",
" if episode_number % 100 == 0 and episode_number>0: \n",
" torch.save(pnet.state_dict(), \"model-2fc.pt\")\n",
" # to see your AI play game's built-in AI call env.render()\n",
" env.render()\n",
" \n",
" # create input for the policy network (e.g. difference between two frames)\n",
" cur_x = prepro(observation)\n",
" x = cur_x - prev_x if prev_x is not None else np.zeros_like(cur_x) \n",
" prev_x = cur_x \n",
" x = torch.from_numpy(x).float().to(device) \n",
" \n",
" p_up = pnet(x) # probability of going UP. P_up \n",
" p_ups.append(p_up)\n",
" action = 2 if np.random.uniform() < p_up.data[0] else 3\n",
" y = 1.0 if action == 2 else 0.0 # fake label\n",
" fake_labels.append(y)\n",
" \n",
" observation, reward, done, info = env.step(action) \n",
" rewards.append(reward) # reward for action can be seen after executing action\n",
" \n",
" if done: # an episode finished \n",
" episode_number += 1 \n",
" eprewards = np.vstack(rewards) \n",
" discounted_epr = discount_rewards(eprewards)\n",
" discounted_epr -= np.mean(discounted_epr)\n",
" discounted_epr /= np.std(discounted_epr) \n",
"\n",
" lx = torch.stack(p_ups).float().to(device).squeeze(1)\n",
" ly = torch.tensor(fake_labels).float().to(device)\n",
" losses = loss_computator(lx, ly)\n",
" t_discounted_epr = torch.from_numpy(discounted_epr).squeeze(1).float().to(device)\n",
" losses *= t_discounted_epr \n",
" loss = torch.mean(losses) \n",
" reward_sum = sum(rewards)\n",
" running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01\n",
" if episode_number % 10 == 1: \n",
" print(\"EPNUM: {0}, LOSS: {1}. REWARDS: {2} RUNNING_REWARDS: {3}\"\n",
" .format(episode_number, loss, reward_sum, running_reward)) \n",
" \n",
" loss.backward(torch.tensor(1.0/batch_size).to(device))\n",
" if episode_number % batch_size == 0 and episode_number > 0: \n",
" optimizer.step()\n",
" optimizer.zero_grad() \n",
" \n",
" p_ups, fake_labels, rewards = [], [], [] # reset\n",
" observation = env.reset() # reset env\n",
" prev_x = None \n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
@yonigottesman

Does this code actually work for you? I'm running it and the rewards don't get better. After how many episodes does your agent start winning?

@kennethmitra

Any ideas why it learns slower than Karpathy's original code?
