Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save carlthome/542a9899dedcb2c7adf3403df3fe1f14 to your computer and use it in GitHub Desktop.
Save carlthome/542a9899dedcb2c7adf3403df3fe1f14 to your computer and use it in GitHub Desktop.
Proximal Policy Optimization (PPO) of the CartPole problem with PyTorch
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"provenance": [],
"include_colab_link": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
},
"widgets": {
"application/vnd.jupyter.widget-state+json": {
"9eb3f337d54041adba415a90fc714218": {
"model_module": "@jupyter-widgets/controls",
"model_name": "HBoxModel",
"model_module_version": "1.5.0",
"state": {
"_dom_classes": [],
"_model_module": "@jupyter-widgets/controls",
"_model_module_version": "1.5.0",
"_model_name": "HBoxModel",
"_view_count": null,
"_view_module": "@jupyter-widgets/controls",
"_view_module_version": "1.5.0",
"_view_name": "HBoxView",
"box_style": "",
"children": [
"IPY_MODEL_f1da27748e4d4a19bde362c1152220bd",
"IPY_MODEL_4cd8e38a83314f30a1d6d4b2dbb87d1c",
"IPY_MODEL_d475415af510481eae562066aa800379"
],
"layout": "IPY_MODEL_d01b6ac880054fa6b3fa9d271e651983"
}
},
"f1da27748e4d4a19bde362c1152220bd": {
"model_module": "@jupyter-widgets/controls",
"model_name": "HTMLModel",
"model_module_version": "1.5.0",
"state": {
"_dom_classes": [],
"_model_module": "@jupyter-widgets/controls",
"_model_module_version": "1.5.0",
"_model_name": "HTMLModel",
"_view_count": null,
"_view_module": "@jupyter-widgets/controls",
"_view_module_version": "1.5.0",
"_view_name": "HTMLView",
"description": "",
"description_tooltip": null,
"layout": "IPY_MODEL_44c54eb3e04f44b49e4e25a9887b807f",
"placeholder": "​",
"style": "IPY_MODEL_336b6ce9b017402b8627f54ccdcf28b1",
"value": "Training: 100%"
}
},
"4cd8e38a83314f30a1d6d4b2dbb87d1c": {
"model_module": "@jupyter-widgets/controls",
"model_name": "FloatProgressModel",
"model_module_version": "1.5.0",
"state": {
"_dom_classes": [],
"_model_module": "@jupyter-widgets/controls",
"_model_module_version": "1.5.0",
"_model_name": "FloatProgressModel",
"_view_count": null,
"_view_module": "@jupyter-widgets/controls",
"_view_module_version": "1.5.0",
"_view_name": "ProgressView",
"bar_style": "success",
"description": "",
"description_tooltip": null,
"layout": "IPY_MODEL_ac920b2a8fd14cdc8e1204344a68e712",
"max": 300,
"min": 0,
"orientation": "horizontal",
"style": "IPY_MODEL_b3213e27c9694d30bd329f9acaee2bba",
"value": 300
}
},
"d475415af510481eae562066aa800379": {
"model_module": "@jupyter-widgets/controls",
"model_name": "HTMLModel",
"model_module_version": "1.5.0",
"state": {
"_dom_classes": [],
"_model_module": "@jupyter-widgets/controls",
"_model_module_version": "1.5.0",
"_model_name": "HTMLModel",
"_view_count": null,
"_view_module": "@jupyter-widgets/controls",
"_view_module_version": "1.5.0",
"_view_name": "HTMLView",
"description": "",
"description_tooltip": null,
"layout": "IPY_MODEL_c160814f1ed6487aaf89907fd7ff9375",
"placeholder": "​",
"style": "IPY_MODEL_9bf93268fba940738d13448296a2c8e9",
"value": " 300/300 [02:09<00:00, 13.62it/s]"
}
},
"d01b6ac880054fa6b3fa9d271e651983": {
"model_module": "@jupyter-widgets/base",
"model_name": "LayoutModel",
"model_module_version": "1.2.0",
"state": {
"_model_module": "@jupyter-widgets/base",
"_model_module_version": "1.2.0",
"_model_name": "LayoutModel",
"_view_count": null,
"_view_module": "@jupyter-widgets/base",
"_view_module_version": "1.2.0",
"_view_name": "LayoutView",
"align_content": null,
"align_items": null,
"align_self": null,
"border": null,
"bottom": null,
"display": null,
"flex": null,
"flex_flow": null,
"grid_area": null,
"grid_auto_columns": null,
"grid_auto_flow": null,
"grid_auto_rows": null,
"grid_column": null,
"grid_gap": null,
"grid_row": null,
"grid_template_areas": null,
"grid_template_columns": null,
"grid_template_rows": null,
"height": null,
"justify_content": null,
"justify_items": null,
"left": null,
"margin": null,
"max_height": null,
"max_width": null,
"min_height": null,
"min_width": null,
"object_fit": null,
"object_position": null,
"order": null,
"overflow": null,
"overflow_x": null,
"overflow_y": null,
"padding": null,
"right": null,
"top": null,
"visibility": null,
"width": null
}
},
"44c54eb3e04f44b49e4e25a9887b807f": {
"model_module": "@jupyter-widgets/base",
"model_name": "LayoutModel",
"model_module_version": "1.2.0",
"state": {
"_model_module": "@jupyter-widgets/base",
"_model_module_version": "1.2.0",
"_model_name": "LayoutModel",
"_view_count": null,
"_view_module": "@jupyter-widgets/base",
"_view_module_version": "1.2.0",
"_view_name": "LayoutView",
"align_content": null,
"align_items": null,
"align_self": null,
"border": null,
"bottom": null,
"display": null,
"flex": null,
"flex_flow": null,
"grid_area": null,
"grid_auto_columns": null,
"grid_auto_flow": null,
"grid_auto_rows": null,
"grid_column": null,
"grid_gap": null,
"grid_row": null,
"grid_template_areas": null,
"grid_template_columns": null,
"grid_template_rows": null,
"height": null,
"justify_content": null,
"justify_items": null,
"left": null,
"margin": null,
"max_height": null,
"max_width": null,
"min_height": null,
"min_width": null,
"object_fit": null,
"object_position": null,
"order": null,
"overflow": null,
"overflow_x": null,
"overflow_y": null,
"padding": null,
"right": null,
"top": null,
"visibility": null,
"width": null
}
},
"336b6ce9b017402b8627f54ccdcf28b1": {
"model_module": "@jupyter-widgets/controls",
"model_name": "DescriptionStyleModel",
"model_module_version": "1.5.0",
"state": {
"_model_module": "@jupyter-widgets/controls",
"_model_module_version": "1.5.0",
"_model_name": "DescriptionStyleModel",
"_view_count": null,
"_view_module": "@jupyter-widgets/base",
"_view_module_version": "1.2.0",
"_view_name": "StyleView",
"description_width": ""
}
},
"ac920b2a8fd14cdc8e1204344a68e712": {
"model_module": "@jupyter-widgets/base",
"model_name": "LayoutModel",
"model_module_version": "1.2.0",
"state": {
"_model_module": "@jupyter-widgets/base",
"_model_module_version": "1.2.0",
"_model_name": "LayoutModel",
"_view_count": null,
"_view_module": "@jupyter-widgets/base",
"_view_module_version": "1.2.0",
"_view_name": "LayoutView",
"align_content": null,
"align_items": null,
"align_self": null,
"border": null,
"bottom": null,
"display": null,
"flex": null,
"flex_flow": null,
"grid_area": null,
"grid_auto_columns": null,
"grid_auto_flow": null,
"grid_auto_rows": null,
"grid_column": null,
"grid_gap": null,
"grid_row": null,
"grid_template_areas": null,
"grid_template_columns": null,
"grid_template_rows": null,
"height": null,
"justify_content": null,
"justify_items": null,
"left": null,
"margin": null,
"max_height": null,
"max_width": null,
"min_height": null,
"min_width": null,
"object_fit": null,
"object_position": null,
"order": null,
"overflow": null,
"overflow_x": null,
"overflow_y": null,
"padding": null,
"right": null,
"top": null,
"visibility": null,
"width": null
}
},
"b3213e27c9694d30bd329f9acaee2bba": {
"model_module": "@jupyter-widgets/controls",
"model_name": "ProgressStyleModel",
"model_module_version": "1.5.0",
"state": {
"_model_module": "@jupyter-widgets/controls",
"_model_module_version": "1.5.0",
"_model_name": "ProgressStyleModel",
"_view_count": null,
"_view_module": "@jupyter-widgets/base",
"_view_module_version": "1.2.0",
"_view_name": "StyleView",
"bar_color": null,
"description_width": ""
}
},
"c160814f1ed6487aaf89907fd7ff9375": {
"model_module": "@jupyter-widgets/base",
"model_name": "LayoutModel",
"model_module_version": "1.2.0",
"state": {
"_model_module": "@jupyter-widgets/base",
"_model_module_version": "1.2.0",
"_model_name": "LayoutModel",
"_view_count": null,
"_view_module": "@jupyter-widgets/base",
"_view_module_version": "1.2.0",
"_view_name": "LayoutView",
"align_content": null,
"align_items": null,
"align_self": null,
"border": null,
"bottom": null,
"display": null,
"flex": null,
"flex_flow": null,
"grid_area": null,
"grid_auto_columns": null,
"grid_auto_flow": null,
"grid_auto_rows": null,
"grid_column": null,
"grid_gap": null,
"grid_row": null,
"grid_template_areas": null,
"grid_template_columns": null,
"grid_template_rows": null,
"height": null,
"justify_content": null,
"justify_items": null,
"left": null,
"margin": null,
"max_height": null,
"max_width": null,
"min_height": null,
"min_width": null,
"object_fit": null,
"object_position": null,
"order": null,
"overflow": null,
"overflow_x": null,
"overflow_y": null,
"padding": null,
"right": null,
"top": null,
"visibility": null,
"width": null
}
},
"9bf93268fba940738d13448296a2c8e9": {
"model_module": "@jupyter-widgets/controls",
"model_name": "DescriptionStyleModel",
"model_module_version": "1.5.0",
"state": {
"_model_module": "@jupyter-widgets/controls",
"_model_module_version": "1.5.0",
"_model_name": "DescriptionStyleModel",
"_view_count": null,
"_view_module": "@jupyter-widgets/base",
"_view_module_version": "1.2.0",
"_view_name": "StyleView",
"description_width": ""
}
}
}
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/carlthome/542a9899dedcb2c7adf3403df3fe1f14/proximal-policy-optimization-ppo-of-the-cartpole-problem-with-pytorch.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "rFU7C-g5IN-n"
},
"outputs": [],
"source": [
"import os\n",
"\n",
"import torch\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"source": [
"class Memory:\n",
" def __init__(self, batch_size):\n",
" self.batch_size = batch_size\n",
" self.history = {\n",
" \"state\": [],\n",
" \"probs\": [],\n",
" \"values\": [],\n",
" \"action\": [],\n",
" \"reward\": [],\n",
" \"done\": [],\n",
" }\n",
"\n",
" def generate_batches(self):\n",
" num_states = len(self.history[\"state\"])\n",
" batch_start = np.arange(0, num_states, self.batch_size)\n",
" indices = np.arange(num_states, dtype=np.int64)\n",
" np.random.shuffle(indices)\n",
" rows = [indices[i : i + self.batch_size] for i in batch_start]\n",
" batches = {k: np.array(v) for k, v in self.history.items()}\n",
" batches[\"batches\"] = rows\n",
" return batches\n",
"\n",
" def append(self, row):\n",
" for k, v in row.items():\n",
" self.history[k].append(v)\n",
"\n",
" def clear(self):\n",
" for k in self.history:\n",
" self.history[k].clear()"
],
"metadata": {
"id": "9SrnghsTIVyW"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"class Actor(torch.nn.Module):\n",
" def __init__(\n",
" self,\n",
" num_actions,\n",
" input_dims,\n",
" alpha,\n",
" fc1_dims=256,\n",
" fc2_dims=256,\n",
" chkpt_dir=\"tmp/ppo\",\n",
" ):\n",
" super().__init__()\n",
" self.checkpoint_file = os.path.join(chkpt_dir, \"actor_torch_ppo\")\n",
"\n",
" self.actor = torch.nn.Sequential(\n",
" torch.nn.Linear(*input_dims, fc1_dims),\n",
" torch.nn.ReLU(),\n",
" torch.nn.Linear(fc1_dims, fc2_dims),\n",
" torch.nn.ReLU(),\n",
" torch.nn.Linear(fc2_dims, num_actions),\n",
" torch.nn.Softmax(dim=-1),\n",
" )\n",
"\n",
" self.optimizer = torch.optim.Adam(self.parameters(), lr=alpha)\n",
" self.device = torch.device(\"cuda:0\" if torch.cuda.is_available() else \"cpu\")\n",
" self.to(self.device)\n",
"\n",
" def forward(self, state):\n",
" dist = self.actor(state)\n",
" dist = torch.distributions.categorical.Categorical(dist)\n",
" return dist\n",
"\n",
" def save_checkpoint(self):\n",
" torch.save(self.state_dict(), self.checkpoint_file)\n",
"\n",
" def load_checkpoint(self):\n",
" self.load_state_dict(torch.load(self.checkpoint_file))"
],
"metadata": {
"id": "5xhn1E_YJLF4"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"class Critic(torch.nn.Module):\n",
" def __init__(\n",
" self, input_dims, alpha, fc1_dims=256, fc2_dims=256, chkpt_dir=\"tmp/ppo\"\n",
" ):\n",
" super().__init__()\n",
" self.checkpoint_file = os.path.join(chkpt_dir, \"critic_torch_ppo\")\n",
" self.critic = torch.nn.Sequential(\n",
" torch.nn.Linear(*input_dims, fc1_dims),\n",
" torch.nn.ReLU(),\n",
" torch.nn.Linear(fc1_dims, fc2_dims),\n",
" torch.nn.Linear(fc2_dims, 1),\n",
" )\n",
"\n",
" self.optimizer = torch.optim.Adam(self.parameters(), lr=alpha)\n",
" self.device = torch.device(\"cuda:0\" if torch.cuda.is_available() else \"cpu\")\n",
" self.to(self.device)\n",
"\n",
" def forward(self, state):\n",
" value = self.critic(state)\n",
" return value\n",
"\n",
" def save_checkpoint(self):\n",
" torch.save(self.state_dict(), self.checkpoint_file)\n",
"\n",
" def load_checkpoint(self):\n",
" self.load_state_dict(torch.load(self.checkpoint_file))"
],
"metadata": {
"id": "L-zogfxjKBXD"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"class Agent:\n",
" def __init__(\n",
" self,\n",
" num_actions,\n",
" input_dims,\n",
" gamma=0.99,\n",
" alpha=1e-3,\n",
" gae_lambda=0.95,\n",
" policy_clip=0.2,\n",
" batch_size=64,\n",
" N=2048,\n",
" num_epochs=10,\n",
" ):\n",
" self.gamma = gamma\n",
" self.policy_clip = policy_clip\n",
" self.num_epochs = num_epochs\n",
" self.gae_lambda = gae_lambda\n",
"\n",
" self.actor = Actor(num_actions, input_dims, alpha)\n",
" self.critic = Critic(input_dims, alpha)\n",
" self.memory = Memory(batch_size)\n",
"\n",
" def remember(self, state, action, probs, values, reward, done):\n",
" row = {\n",
" \"state\": state,\n",
" \"probs\": probs,\n",
" \"values\": values,\n",
" \"action\": action,\n",
" \"reward\": reward,\n",
" \"done\": done,\n",
" }\n",
" self.memory.append(row)\n",
"\n",
" def save_models(self):\n",
" self.actor.save_checkpoint()\n",
" self.critic.save_checkpoint()\n",
"\n",
" def load_models(self):\n",
" self.actor.load_checkpoint()\n",
" self.critic.load_checkpoint()\n",
"\n",
" def choose_action(self, observation):\n",
" state = torch.tensor(np.asarray([observation]), dtype=torch.float).to(self.actor.device)\n",
"\n",
" dist = self.actor(state)\n",
" value = self.critic(state)\n",
" action = dist.sample()\n",
"\n",
" probs = torch.squeeze(dist.log_prob(action)).item()\n",
" action = torch.squeeze(action).item()\n",
" value = torch.squeeze(value).item()\n",
"\n",
" return action, probs, value\n",
"\n",
" def learn(self):\n",
" for _ in range(self.num_epochs):\n",
"\n",
" batches = self.memory.generate_batches()\n",
" state_history = batches[\"state\"]\n",
" old_probs_history = batches[\"probs\"]\n",
" values_history = batches[\"values\"]\n",
" action_history = batches[\"action\"]\n",
" reward_history = batches[\"reward\"]\n",
" dones_history = batches[\"done\"]\n",
" batches = batches[\"batches\"]\n",
"\n",
" values = values_history\n",
" advantage = np.zeros(len(reward_history), dtype=np.float32)\n",
" for t in range(len(reward_history) - 1):\n",
" discount = 1.0\n",
" a_t = 0\n",
" for k in range(t, len(reward_history) - 1):\n",
" a_t += discount * (\n",
" reward_history[k]\n",
" + self.gamma * values[k + 1] * (1 - int(dones_history[k]))\n",
" - values[k]\n",
" )\n",
" discount *= self.gamma * self.gae_lambda\n",
" advantage[t] = a_t\n",
"\n",
" advantage = torch.tensor(advantage).to(self.actor.device)\n",
"\n",
" values = torch.tensor(values).to(self.actor.device)\n",
"\n",
" for batch in batches:\n",
" states = torch.tensor(state_history[batch], dtype=torch.float).to(self.actor.device)\n",
" old_probs = torch.tensor(old_probs_history[batch]).to(self.actor.device)\n",
" actions = torch.tensor(action_history[batch]).to(self.actor.device)\n",
"\n",
" dist = self.actor(states)\n",
" critic_value = self.critic(states)\n",
"\n",
" critic_value = torch.squeeze(critic_value)\n",
"\n",
" new_probs = dist.log_prob(actions)\n",
" prob_ratio = new_probs.exp() / old_probs.exp()\n",
"\n",
" weighted_probs = advantage[batch] * prob_ratio\n",
" weighted_clipped_probs = (\n",
" torch.clamp(prob_ratio, 1 - self.policy_clip, 1 + self.policy_clip)\n",
" * advantage[batch]\n",
" )\n",
"\n",
" actor_loss = -torch.min(weighted_probs, weighted_clipped_probs).mean()\n",
"\n",
" returns = advantage[batch] + values[batch]\n",
" critic_loss = (returns - critic_value) ** 2\n",
" critic_loss = critic_loss.mean()\n",
"\n",
" total_loss = actor_loss + 0.5 * critic_loss\n",
"\n",
" self.actor.optimizer.zero_grad()\n",
" self.critic.optimizer.zero_grad()\n",
"\n",
" total_loss.backward()\n",
"\n",
" self.actor.optimizer.step()\n",
" self.critic.optimizer.step()\n",
"\n",
" self.memory.clear()"
],
"metadata": {
"id": "oaawO7INLMbc"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"# Experiments\n"
],
"metadata": {
"id": "vhZwr2rhPiXS"
}
},
{
"cell_type": "code",
"source": [
"!mkdir -p tmp/ppo"
],
"metadata": {
"id": "jIkeuMqQRQNn"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"import gym\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"import tqdm.auto as tqdm\n",
"\n",
"env = gym.make(\"CartPole-v1\", new_step_api=True)\n",
"env.action_space.seed(42)\n",
"steps_per_update = 20\n",
"batch_size = 5\n",
"num_epochs = 4\n",
"alpha = 1e-3\n",
"\n",
"agent = Agent(\n",
" num_actions=env.action_space.n,\n",
" batch_size=batch_size,\n",
" alpha=alpha,\n",
" num_epochs=num_epochs,\n",
" input_dims=env.observation_space.shape,\n",
")\n",
"\n",
"num_episodes = 300\n",
"score = 0\n",
"score_history = []\n",
"best_score = env.reward_range[0]\n",
"num_updates = 0\n",
"num_steps = 0\n",
"\n",
"records = []\n",
"for episode in tqdm.trange(num_episodes, desc=\"Training\"):\n",
" observation = env.reset()\n",
" terminated = False\n",
" truncated = False\n",
" score = 0\n",
" while not terminated and not truncated:\n",
" action, prob, val = agent.choose_action(observation)\n",
"\n",
" new_observation, reward, terminated, truncated, info = env.step(action)\n",
" num_steps += 1\n",
" score += reward\n",
"\n",
" done = truncated or terminated\n",
" agent.remember(observation, action, prob, val, reward, done)\n",
"\n",
" if num_steps % steps_per_update == 0:\n",
" agent.learn()\n",
" num_updates += 1\n",
"\n",
" observation = new_observation\n",
"\n",
" score_history.append(score)\n",
"\n",
" avg_score = np.mean(score_history[-100:])\n",
" if avg_score > best_score:\n",
" best_score = avg_score\n",
" agent.save_models()\n",
"\n",
" record = {\n",
" \"Episode\": episode,\n",
" \"Score\": score,\n",
" \"Average Score\": avg_score,\n",
" \"Best Score\": best_score,\n",
" \"Step\": num_steps,\n",
" \"Updates\": num_updates,\n",
" }\n",
" records.append(record)"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 49,
"referenced_widgets": [
"9eb3f337d54041adba415a90fc714218",
"f1da27748e4d4a19bde362c1152220bd",
"4cd8e38a83314f30a1d6d4b2dbb87d1c",
"d475415af510481eae562066aa800379",
"d01b6ac880054fa6b3fa9d271e651983",
"44c54eb3e04f44b49e4e25a9887b807f",
"336b6ce9b017402b8627f54ccdcf28b1",
"ac920b2a8fd14cdc8e1204344a68e712",
"b3213e27c9694d30bd329f9acaee2bba",
"c160814f1ed6487aaf89907fd7ff9375",
"9bf93268fba940738d13448296a2c8e9"
]
},
"collapsed": true,
"id": "nKPFUrdiPbgD",
"outputId": "388dcc48-6b33-41d0-f33f-5132e8044e58"
},
"execution_count": null,
"outputs": [
{
"output_type": "display_data",
"data": {
"text/plain": [
"Training: 0%| | 0/300 [00:00<?, ?it/s]"
],
"application/vnd.jupyter.widget-view+json": {
"version_major": 2,
"version_minor": 0,
"model_id": "9eb3f337d54041adba415a90fc714218"
}
},
"metadata": {}
}
]
},
{
"cell_type": "code",
"source": [
"import pandas as pd\n",
"\n",
"pd.DataFrame(records).set_index(\"Episode\").plot(subplots=True, figsize=(5, 10));"
],
"metadata": {
"id": "qDVSZfVfUXwH",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 850
},
"outputId": "0c00ebd6-3972-43e2-e583-5236c60823c1"
},
"execution_count": null,
"outputs": [
{
"output_type": "display_data",
"data": {
"text/plain": [
"<Figure size 500x1000 with 5 Axes>"
],
"image/png": "\n"
},
"metadata": {}
}
]
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment