pong-ppo - a gist by @s-gv (last active May 25, 2021)
# model.py: two-action policy network with the PPO clipped-surrogate loss
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
from PIL import Image
import random

class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()

        self.gamma = 0.99       # discount factor
        self.eps_clip = 0.1     # PPO clipping parameter

        self.layers = nn.Sequential(
            nn.Linear(6000*2, 512), nn.ReLU(),
            nn.Linear(512, 2),
        )

    def state_to_tensor(self, I):
        """ prepro 210x160x3 uint8 frame into 6000 (75x80) 1D float vector """
        if I is None:
            return torch.zeros(1, 6000)
        I = I[35:185]  # crop: drop the top 35 and bottom 25 rows (score and borders), keeping the 150-row play area
        I = I[::2, ::2, 0]  # downsample by a factor of 2 and keep one colour channel -> 75x80
        I[I == 144] = 0  # erase background (background type 1)
        I[I == 109] = 0  # erase background (background type 2)
        I[I != 0] = 1  # everything else (paddles, ball) set to 1, giving a binary image
        return torch.from_numpy(I.astype(np.float32).ravel()).unsqueeze(0)

    def pre_process(self, x, prev_x):
        #return self.state_to_tensor(x) - self.state_to_tensor(prev_x)
        return torch.cat([self.state_to_tensor(x), self.state_to_tensor(prev_x)], dim=1)

    def convert_action(self, action):
        return action + 2  # map policy output {0, 1} to the ALE Pong actions {2: UP, 3: DOWN}

    def forward(self, d_obs, action=None, action_prob=None, advantage=None, deterministic=False):
        if action is None:
            # acting: sample (or take the argmax) action and return its probability
            with torch.no_grad():
                logits = self.layers(d_obs)
                if deterministic:
                    action = int(torch.argmax(logits[0]).detach().cpu().numpy())
                    action_prob = 1.0
                else:
                    c = torch.distributions.Categorical(logits=logits)
                    action = int(c.sample().cpu().numpy()[0])
                    action_prob = float(c.probs[0, action].detach().cpu().numpy())
                return action, action_prob

        '''
        # policy gradient (REINFORCE)
        logits = self.layers(d_obs)
        loss = F.cross_entropy(logits, action, reduction='none') * advantage
        return loss.mean()
        '''

        # PPO clipped-surrogate loss
        vs = np.array([[1., 0.], [0., 1.]])
        ts = torch.FloatTensor(vs[action.cpu().numpy()])  # one-hot encoding of the actions taken

        logits = self.layers(d_obs)
        r = torch.sum(F.softmax(logits, dim=1) * ts, dim=1) / action_prob  # probability ratio pi_new(a|s) / pi_old(a|s)
        loss1 = r * advantage
        loss2 = torch.clamp(r, 1-self.eps_clip, 1+self.eps_clip) * advantage
        loss = -torch.min(loss1, loss2)
        loss = torch.mean(loss)
        return loss
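A quick smoke test, illustrative only and not part of the gist: it assumes the class above is saved as model.py, feeds a blank frame through the sampling path, and evaluates the PPO loss on a made-up batch.

# smoke test (illustrative): check shapes and both forward paths of Policy
import numpy as np
import torch
from model import Policy

policy = Policy()

# sampling path: a blank 210x160x3 frame stands in for a real ALE observation
fake_frame = np.zeros((210, 160, 3), dtype=np.uint8)
d_obs = policy.pre_process(fake_frame, None)   # shape (1, 12000)
action, action_prob = policy(d_obs)            # roughly (0 or 1, ~0.5) for an untrained net
print(action, action_prob)

# loss path: a dummy batch of 4 transitions with made-up old probabilities and advantages
d_obs_batch = torch.randn(4, 6000*2)
action_batch = torch.LongTensor([0, 1, 1, 0])
action_prob_batch = torch.full((4,), 0.5)
advantage_batch = torch.randn(4)
loss = policy(d_obs_batch, action_batch, action_prob_batch, advantage_batch)
print(loss.item())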
# evaluation script: plays one recorded episode with a trained policy (uses the old pre-0.26 Gym API and gym.wrappers.Monitor)
import gym
import numpy as np
import torch
from torch import nn

from model import Policy

import time

env = gym.make('PongNoFrameskip-v4')
env = gym.wrappers.Monitor(env, './tmp', video_callable=lambda ep_id: True, force=True)
env.reset()

policy = Policy()
policy.load_state_dict(torch.load('params.ckpt'))
policy.eval()

for episode in range(1):
    prev_obs = None
    obs = env.reset()
    reward_sum = 0.0
    for t in range(190000):
        #env.render()
        d_obs = policy.pre_process(obs, prev_obs)
        with torch.no_grad():
            action, action_prob = policy(d_obs, deterministic=False)

        prev_obs = obs
        obs, reward, done, info = env.step(policy.convert_action(action))
        reward_sum += reward

        if done:
            print('Episode %d (%d timesteps) - Reward: %.2f' % (episode, t, reward_sum))
            break

        #time.sleep(0.033)

env.close()
# training script: collects rollouts and updates the policy with the PPO loss (uses the old pre-0.26 Gym API)
import gym
import numpy as np
import random
import torch
from torch import nn

from model import Policy

env = gym.make('PongNoFrameskip-v4')
env.reset()

policy = Policy()

opt = torch.optim.Adam(policy.parameters(), lr=1e-3)

reward_sum_running_avg = None
for it in range(100000):
    d_obs_history, action_history, action_prob_history, reward_history = [], [], [], []

    # collect rollouts from 10 episodes with the current policy
    for ep in range(10):
        obs, prev_obs = env.reset(), None
        for t in range(190000):
            #env.render()

            d_obs = policy.pre_process(obs, prev_obs)
            with torch.no_grad():
                action, action_prob = policy(d_obs)

            prev_obs = obs
            obs, reward, done, info = env.step(policy.convert_action(action))

            d_obs_history.append(d_obs)
            action_history.append(action)
            action_prob_history.append(action_prob)
            reward_history.append(reward)

            if done:
                reward_sum = sum(reward_history[-(t+1):])
                reward_sum_running_avg = 0.99*reward_sum_running_avg + 0.01*reward_sum if reward_sum_running_avg is not None else reward_sum
                print('Iteration %d, Episode %d (%d timesteps) - last_action: %d, last_action_prob: %.2f, reward_sum: %.2f, running_avg: %.2f' % (it, ep, t, action, action_prob, reward_sum, reward_sum_running_avg))
                #print(action_history[-5:])
                break

    # compute discounted returns (used directly as the advantage estimate)
    R = 0
    discounted_rewards = []
    for r in reward_history[::-1]:
        if r != 0: R = 0  # scored/lost a point in Pong, so reset the return
        R = r + policy.gamma * R
        discounted_rewards.insert(0, R)
    #print(discounted_rewards[:5])

    discounted_rewards = torch.FloatTensor(discounted_rewards)
    discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / discounted_rewards.std()

    # update policy: 5 epochs of random minibatches over the collected rollouts
    for _ in range(5):
        n_batch = 24576
        idxs = random.sample(range(len(action_history)), min(n_batch, len(action_history)))
        d_obs_batch = torch.cat([d_obs_history[idx] for idx in idxs], 0)
        action_batch = torch.LongTensor([action_history[idx] for idx in idxs])
        action_prob_batch = torch.FloatTensor([action_prob_history[idx] for idx in idxs])
        advantage_batch = torch.FloatTensor([discounted_rewards[idx] for idx in idxs])
        #advantage_batch = (advantage_batch - advantage_batch.mean()) / advantage_batch.std()

        opt.zero_grad()
        loss = policy(d_obs_batch, action_batch, action_prob_batch, advantage_batch)
        loss.backward()
        opt.step()

        print('Iteration %d -- Loss: %.3f' % (it, loss))

    if it % 5 == 0:
        torch.save(policy.state_dict(), 'params.ckpt')

env.close()
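For reference, the return computation in the training loop above (reset R whenever a point is scored or lost, then R = r + gamma*R walking backwards) can be checked in isolation; the helper below is an illustrative restatement, not code from the gist.

# illustrative helper mirroring the backwards return pass in the training loop
def discount_rewards(rewards, gamma=0.99):
    R, out = 0.0, []
    for r in reversed(rewards):
        if r != 0:
            R = 0.0            # a point was scored/lost, so start a fresh return
        R = r + gamma * R
        out.insert(0, R)
    return out

# e.g. rewards [0, 0, 1] give returns [0.9801, 0.99, 1.0]
print(discount_rewards([0, 0, 1]))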