pong-ppo: PPO on Atari Pong with a small fully connected policy in PyTorch
model.py
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F


class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.gamma = 0.99     # discount factor for returns
        self.eps_clip = 0.1   # PPO clipping parameter
        # two stacked preprocessed frames (6000 floats each) -> 2 action logits
        self.layers = nn.Sequential(
            nn.Linear(6000 * 2, 512), nn.ReLU(),
            nn.Linear(512, 2),
        )

    def state_to_tensor(self, I):
        """ prepro 210x160x3 uint8 frame into 6000 (75x80) 1D float vector """
        if I is None:
            return torch.zeros(1, 6000)
        I = I[35:185]  # crop - drop 35 rows from the top and 25 from the bottom to remove redundant parts of the image (score, borders)
        I = I[::2, ::2, 0]  # downsample by factor of 2 and keep a single color channel
        I[I == 144] = 0  # erase background (background type 1)
        I[I == 109] = 0  # erase background (background type 2)
        I[I != 0] = 1    # everything else (paddles, ball) set to 1, making the image binary
        return torch.from_numpy(I.astype(np.float32).ravel()).unsqueeze(0)

    def pre_process(self, x, prev_x):
        # stack the current and previous frames so the network can infer motion
        #return self.state_to_tensor(x) - self.state_to_tensor(prev_x)
        return torch.cat([self.state_to_tensor(x), self.state_to_tensor(prev_x)], dim=1)

    def convert_action(self, action):
        return action + 2  # map network outputs {0, 1} to Pong's UP/DOWN actions {2, 3}

    def forward(self, d_obs, action=None, action_prob=None, advantage=None, deterministic=False):
        if action is None:
            # acting: sample (or argmax) an action and return it with its probability
            with torch.no_grad():
                logits = self.layers(d_obs)
                if deterministic:
                    action = int(torch.argmax(logits[0]).detach().cpu().numpy())
                    action_prob = 1.0
                else:
                    c = torch.distributions.Categorical(logits=logits)
                    action = int(c.sample().cpu().numpy()[0])
                    action_prob = float(c.probs[0, action].detach().cpu().numpy())
                return action, action_prob

        '''
        # policy gradient (REINFORCE)
        logits = self.layers(d_obs)
        loss = F.cross_entropy(logits, action, reduction='none') * advantage
        return loss.mean()
        '''

        # PPO clipped surrogate objective
        vs = np.array([[1., 0.], [0., 1.]])  # one-hot vectors for the two actions
        ts = torch.FloatTensor(vs[action.cpu().numpy()])
        logits = self.layers(d_obs)
        r = torch.sum(F.softmax(logits, dim=1) * ts, dim=1) / action_prob  # ratio pi_new(a|s) / pi_old(a|s)
        loss1 = r * advantage
        loss2 = torch.clamp(r, 1 - self.eps_clip, 1 + self.eps_clip) * advantage
        loss = -torch.min(loss1, loss2)  # pessimistic (clipped) bound, negated for minimization
        return loss.mean()
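
The PPO loss above is easiest to sanity-check in isolation. Below is a minimal sketch (not part of the gist, with made-up probabilities and advantages) showing how the ratio and the clip interact: ratios outside [1 - eps_clip, 1 + eps_clip] contribute only their clamped value, so one update cannot push the policy too far from the one that collected the data.

import torch

# hypothetical numbers for illustration only
new_prob = torch.tensor([0.9, 0.2, 0.5])    # pi_new(a|s) under the current policy
old_prob = torch.tensor([0.5, 0.5, 0.5])    # pi_old(a|s) recorded at rollout time
advantage = torch.tensor([1.0, 1.0, -1.0])
eps_clip = 0.1

r = new_prob / old_prob                     # tensor([1.8, 0.4, 1.0])
loss1 = r * advantage
loss2 = torch.clamp(r, 1 - eps_clip, 1 + eps_clip) * advantage
loss = -torch.min(loss1, loss2).mean()      # same objective as Policy.forward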
Evaluation script (loads params.ckpt and records a game with the Monitor wrapper):
import gym
import torch
from model import Policy
import time

env = gym.make('PongNoFrameskip-v4')
env = gym.wrappers.Monitor(env, './tmp', video_callable=lambda ep_id: True, force=True)  # record every episode

policy = Policy()
policy.load_state_dict(torch.load('params.ckpt'))
policy.eval()

for episode in range(1):
    prev_obs = None
    obs = env.reset()
    reward_sum = 0.0
    for t in range(190000):
        #env.render()
        d_obs = policy.pre_process(obs, prev_obs)
        with torch.no_grad():
            action, action_prob = policy(d_obs, deterministic=False)

        prev_obs = obs
        obs, reward, done, info = env.step(policy.convert_action(action))
        reward_sum += reward  # accumulate so the final print shows the episode total, not just the last step's reward
        if done:
            print('Episode %d (%d timesteps) - Reward: %.2f' % (episode, t, reward_sum))
            break
        #time.sleep(0.033)

env.close()
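
A note on compatibility: gym.wrappers.Monitor was removed from gym in later releases. If the script fails on the Monitor line, a sketch of the equivalent recording setup uses gym.wrappers.RecordVideo, its replacement (treating your installed gym as new enough to ship it is an assumption):

import gym

# assumes a gym release that ships RecordVideo; newer releases also change the
# reset()/step() signatures, so the loop above may need adjusting as well
env = gym.make('PongNoFrameskip-v4')
env = gym.wrappers.RecordVideo(env, './tmp', episode_trigger=lambda ep_id: True)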
Training script:
import gym
import random
import torch
from model import Policy

env = gym.make('PongNoFrameskip-v4')

policy = Policy()
opt = torch.optim.Adam(policy.parameters(), lr=1e-3)

reward_sum_running_avg = None
for it in range(100000):
    d_obs_history, action_history, action_prob_history, reward_history = [], [], [], []
    for ep in range(10):
        obs, prev_obs = env.reset(), None
        for t in range(190000):
            #env.render()
            d_obs = policy.pre_process(obs, prev_obs)
            with torch.no_grad():
                action, action_prob = policy(d_obs)

            prev_obs = obs
            obs, reward, done, info = env.step(policy.convert_action(action))

            d_obs_history.append(d_obs)
            action_history.append(action)
            action_prob_history.append(action_prob)
            reward_history.append(reward)

            if done:
                reward_sum = sum(reward_history[-(t + 1):])  # this episode took t+1 steps
                reward_sum_running_avg = 0.99 * reward_sum_running_avg + 0.01 * reward_sum if reward_sum_running_avg is not None else reward_sum
                print('Iteration %d, Episode %d (%d timesteps) - last_action: %d, last_action_prob: %.2f, reward_sum: %.2f, running_avg: %.2f' % (it, ep, t, action, action_prob, reward_sum, reward_sum_running_avg))
                #print(action_history[-5:])
                break

    # compute discounted returns to use as advantages
    R = 0
    discounted_rewards = []
    for r in reward_history[::-1]:
        if r != 0:
            R = 0  # scored/lost a point in pong, so reset the running return
        R = r + policy.gamma * R
        discounted_rewards.insert(0, R)
    #print(discounted_rewards[:5])

    # normalize returns across the whole batch of episodes
    discounted_rewards = torch.FloatTensor(discounted_rewards)
    discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / discounted_rewards.std()

    # update policy: several epochs of PPO on random minibatches from the collected rollouts
    for _ in range(5):
        n_batch = min(24576, len(action_history))  # never ask for more samples than were collected
        idxs = random.sample(range(len(action_history)), n_batch)
        d_obs_batch = torch.cat([d_obs_history[idx] for idx in idxs], 0)
        action_batch = torch.LongTensor([action_history[idx] for idx in idxs])
        action_prob_batch = torch.FloatTensor([action_prob_history[idx] for idx in idxs])
        advantage_batch = discounted_rewards[idxs]  # index the tensor directly instead of rebuilding it
        #advantage_batch = (advantage_batch - advantage_batch.mean()) / advantage_batch.std()

        opt.zero_grad()
        loss = policy(d_obs_batch, action_batch, action_prob_batch, advantage_batch)
        loss.backward()
        opt.step()

    print('Iteration %d -- Loss: %.3f' % (it, loss))

    if it % 5 == 0:
        torch.save(policy.state_dict(), 'params.ckpt')

env.close()
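
To make the advantage computation above concrete, here is the same discounting logic as a standalone function with a tiny worked example. Because Pong gives a nonzero reward only when a point is scored, the running return resets at each point boundary:

def discount(rewards, gamma=0.99):
    # mirrors the loop in the training script above
    R, out = 0.0, []
    for r in reversed(rewards):
        if r != 0:
            R = 0.0  # point boundary: start a fresh return
        R = r + gamma * R
        out.insert(0, R)
    return out

# three steps ending in a lost point: earlier steps receive a discounted share of the -1
print(discount([0.0, 0.0, -1.0]))  # approximately [-0.9801, -0.99, -1.0]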