"""DQN for the gym Breakout game.

Gist by @SelvamArul, created December 19, 2017.
"""
import gym
import math
import os
import random
import numpy as np
from collections import namedtuple
from itertools import count
from copy import deepcopy
from PIL import Image
import visdom
vis = visdom.Visdom(env="breakout")
import torch as th
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision.transforms as T
import time
# if gpu is to be used
use_cuda = th.cuda.is_available()
FloatTensor = th.cuda.FloatTensor if use_cuda else th.FloatTensor
LongTensor = th.cuda.LongTensor if use_cuda else th.LongTensor
ByteTensor = th.cuda.ByteTensor if use_cuda else th.ByteTensor
Tensor = FloatTensor
EPISODE = 0
EPISODE_SCORE = 0
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))
class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
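# A minimal usage sketch of ReplayMemory (illustrative only; it mirrors how the
# buffer is used further below):
#   memory = ReplayMemory(10000)
#   memory.push(state, action, next_state, reward)   # stores one Transition
#   batch = memory.sample(BATCH_SIZE)                # list of Transition tuples
# Once capacity is reached, push() overwrites the oldest entries in a
# ring-buffer fashion via self.position.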
env = gym.make('Breakout-v0').unwrapped
def get_screen():
    # Render the screen and transpose it into torch order (CHW).
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))
    # Crop away the score area at the top and a strip at the bottom.
    screen = screen[:, 30:-15, :]
    t = th.from_numpy(screen).type(FloatTensor).unsqueeze(0)
    return t
# Check get_screen() and the visdom interface.
game_win = vis.image(np.ones((3, 210, 160)))
plot_win = vis.line(X=np.array([EPISODE]),
                    Y=np.array([EPISODE_SCORE]),
                    opts=dict(showlegend=True, title='Break-out Log',
                              legend=['Durations']))
for i in range(2):
    env.reset()
    t = get_screen().cpu().squeeze(0).numpy()
    vis.image(t, win=game_win)
    time.sleep(1)
# Breakout-specific globals.
print('Game: Breakout')
print('Action space:', env.action_space)
print('Action meanings:', env.unwrapped.get_action_meanings())
NO_OF_ACTIONS = len(env.unwrapped.get_action_meanings())
class DQN(nn.Module):
    def __init__(self):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=5, stride=2)
        self.bn4 = nn.BatchNorm2d(256)
        self.fc1 = nn.Linear(12544, 1024)
        self.fc2 = nn.Linear(1024, 128)
        self.head = nn.Linear(128, NO_OF_ACTIONS)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = F.relu(self.fc1(x.view(x.size(0), -1)))
        x = F.relu(self.fc2(x))
        return self.head(x)
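# Where the 12544 input size of fc1 comes from (a sketch, assuming the cropped
# screen produced by get_screen(), i.e. 3 x 165 x 160):
#   each Conv2d uses kernel 5, stride 2, no padding, so out = floor((in - 5) / 2) + 1
#   height: 165 -> 81 -> 39 -> 18 -> 7
#   width:  160 -> 78 -> 37 -> 17 -> 7
#   flattened features: 256 channels * 7 * 7 = 12544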
# Hyperparameters
BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
model = DQN()
if use_cuda:
    model.cuda()
optimizer = optim.RMSprop(model.parameters())
memory = ReplayMemory(10000)
steps_done = 0
def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        action = model(Variable(state, volatile=True).type(FloatTensor)).data.max(1)[1].view(1, 1)
        return action
    else:
        return LongTensor([[random.randrange(NO_OF_ACTIONS)]])
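# Rough shape of the epsilon-greedy schedule above (approximate values):
#   steps_done = 0    -> eps_threshold ~ 0.90
#   steps_done = 200  -> eps_threshold ~ 0.05 + 0.85 * e^-1 ~ 0.36
#   steps_done = 1000 -> eps_threshold ~ 0.05 + 0.85 * e^-5 ~ 0.056
# so exploration decays from EPS_START towards EPS_END with time constant EPS_DECAY.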
episode_durations = []
def plot_durations():
    _X = np.array([EPISODE])
    vis.line(
        X=_X,
        Y=np.array([EPISODE_SCORE]),
        win=plot_win,
        update='append'
    )
last_sync = 0
def optimize_model():
    global last_sync
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see http://stackoverflow.com/a/19343/3343043 for a
    # detailed explanation).
    batch = Transition(*zip(*transitions))
    # Compute a mask of non-final states and concatenate the batch elements.
    non_final_mask = ByteTensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)))
    # We don't want to backprop through the expected action values, and volatile
    # saves us from temporarily setting the model parameters' requires_grad
    # to False.
    non_final_next_states = Variable(th.cat([s for s in batch.next_state
                                             if s is not None]),
                                     volatile=True)
    state_batch = Variable(th.cat(batch.state))
    action_batch = Variable(th.cat(batch.action))
    reward_batch = Variable(th.cat(batch.reward))
    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of the actions taken.
    state_action_values = model(state_batch).gather(1, action_batch)
    # Compute V(s_{t+1}) for all next states.
    next_state_values = Variable(th.zeros(BATCH_SIZE).type(Tensor))
    next_state_values[non_final_mask] = model(non_final_next_states).max(1)[0]
    # We don't want to mess up the loss with the volatile flag, so clear it.
    # After this, we end up with a Variable that has requires_grad=False.
    next_state_values.volatile = False
    # Compute the expected Q values.
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch
    # Compute the Huber loss.
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values)
    # Optimize the model.
    optimizer.zero_grad()
    loss.backward()
    for param in model.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()
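# The target used above is the standard one-step Q-learning (Bellman) target:
#   y = r + GAMMA * max_a Q(s', a)   for non-final s'
#   y = r                            for final s'
# and the Huber (smooth L1) loss between Q(s, a) and y is minimised. Note that
# this script uses a single network for both the online estimate and the
# target; the original DQN setup uses a separate, periodically synced target
# network.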
num_episodes = 100000
print('Starting')
for i_episode in range(num_episodes):
    EPISODE = i_episode
    print('Episode', EPISODE)
    # Initialize the environment and state.
    env.reset()
    last_screen = get_screen()
    current_screen = get_screen()
    state = current_screen - last_screen
    for t in count():
        # Select and perform an action.
        action = select_action(state)
        _, reward, done, _ = env.step(action[0, 0])
        reward = Tensor([reward])
        # Observe the new state.
        last_screen = current_screen
        current_screen = get_screen()
        if not done:
            next_state = current_screen - last_screen
        else:
            next_state = None
        # Store the transition in memory.
        memory.push(state, action, next_state, reward)
        # Move to the next state.
        state = next_state
        # Perform one step of the optimization.
        optimize_model()
        if done:
            episode_durations.append(t + 1)
            EPISODE_SCORE = t + 1
            plot_durations()
            break
print('Done')
env.render(close=True)
env.close()
# Save the trained weights (nn.Module has no save_state_dict method, and '~'
# is not expanded automatically, so use th.save with an expanded path).
th.save(model.state_dict(), os.path.expanduser('~/breakout.pt'))
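# To reload the saved weights later (a minimal sketch; the path is just the one
# used above):
#   model = DQN()
#   model.load_state_dict(th.load(os.path.expanduser('~/breakout.pt')))
#   model.eval()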