@ByungSunBae
Last active August 9, 2017 05:43
Deep Reinforcement Learning: Deep Q-Network (DQN) and variants (Double DQN, Dueling DQN)
# Thanks to 주찬웅
# References:
## 1) https://github.com/jcwleo/Reinforcement_Learning/blob/master/Breakout/
## 2) http://pytorch.org/tutorials/
## 3) https://github.com/transedward/pytorch-dqn
## This code is still quite rough.
## Ideas and advice for improving it are welcome.
import argparse
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision.transforms as T
import numpy as np
import random
import sys, os
import gym
import pandas as pd
from collections import deque
from collections import namedtuple
from skimage.color import rgb2gray
from skimage.transform import resize
parser = argparse.ArgumentParser(description=
    "Deep reinforcement learning on Breakout-v0 with DQN, Double DQN, and Dueling DQN.")
parser.add_argument("-IsDuelingDQN", action="store", type=bool,
default=False, dest="IsDuelingDQN",
help="Whether using DuelingDQN with Average Method")
parser.add_argument("-IsDoubleDQN", action="store", type=bool,
default=False, dest="IsDoubleDQN",
help="Whether using DoubleDQN")
parser.add_argument("-GameName", action="store", type=str,
default="Breakout-v0", dest="GameName",
help="If you want to execute Berzerk-v0, just write -GameName=Berzerk-v0")
results = parser.parse_args()
IsDoubleDQN = results.IsDoubleDQN
IsDuelingDQN = results.IsDuelingDQN
GameName = results.GameName
env = gym.make(GameName)
use_cuda = torch.cuda.is_available()
FloatTensor = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if use_cuda else torch.LongTensor
ByteTensor = torch.cuda.ByteTensor if use_cuda else torch.ByteTensor
Tensor = FloatTensor
if use_cuda:
print("Use First GPU")
torch.cuda.set_device(0)
else:
print("Use CPU")
batch_size = 32
gamma = 0.99
eps_start = 1.
eps_end = 0.1
eps_decay = 1000000
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))
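# Q-network following the Nature DQN architecture: the input is a stack of 4
# grayscale 84x84 frames, passed through three conv layers and a 512-unit fully
# connected layer, with one output per action. The Dueling variant replaces the
# single output head with separate value V(s) and advantage A(s, a) streams.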
class Model(nn.Module):
def __init__(self, name):
super(Model, self).__init__()
self.name = name
self.conv1 = nn.Conv2d(4, 32, kernel_size=8, stride=4, bias=False)
self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2, bias=False)
self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1, bias=False)
self.head1 = nn.Linear(7 * 7 * 64, 512, bias=False)
if self.name is "DQN":
self.head2 = nn.Linear(512, env.action_space.n, bias=False)
elif self.name is "DuelingDQN":
self.head2_1 = nn.Linear(512, 1, bias=False) # V^(s)
self.head2_2 = nn.Linear(512, env.action_space.n, bias=False) # A^(s)
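    # Dueling aggregation (Wang et al., 2016):
    #   Q(s, a) = V(s) + A(s, a) - mean_a' A(s, a')   ("avg" method)
    #   Q(s, a) = V(s) + A(s, a) - max_a' A(s, a')    ("max" method)
    # Subtracting the mean (or max) advantage makes V and A identifiable.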
def forward(self, x, method="avg"):
x = F.relu(self.conv1(x))
x = F.relu(self.conv2(x))
x = F.relu(self.conv3(x))
x = F.relu(self.head1(x.view(x.size(0), -1)))
if self.name is "DQN":
return self.head2(x)
elif self.name is "DuelingDQN":
self.V = self.head2_1(x)
self.A = self.head2_2(x)
self.V = self.V.expand_as(self.A)
self.MeanA = torch.mean(self.head2_2(x), 1).expand_as(self.A)
self.MaxA = torch.max(self.head2_2(x), 1)[0].expand_as(self.A)
if method is "max":
return self.V + self.A - self.MaxA
elif method is "avg":
return self.V + self.A - self.MeanA
elif method is None:
return self.V + self.A
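# Two copies of the network: MainNet is updated at every optimization step,
# while TargetNet is a frozen copy that provides the bootstrap targets and is
# synced from MainNet every C frames.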
if IsDuelingDQN:
M_name = "DuelingDQN"
MainNet = Model(name = M_name).cuda() if use_cuda else Model(name = M_name)
TargetNet = Model(name = M_name).cuda() if use_cuda else Model(name = M_name)
else:
M_name = "DQN"
MainNet = Model(name = M_name).cuda() if use_cuda else Model(name = M_name)
TargetNet = Model(name = M_name).cuda() if use_cuda else Model(name = M_name)
if IsDoubleDQN:
M_name = M_name + "_Double"
optimizer = optim.RMSprop(MainNet.parameters(), lr = 0.00025, eps = 1e-2, momentum = 0.95)
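# Experience replay: transitions are kept in a bounded deque and sampled
# uniformly at random, which breaks the temporal correlation between
# consecutive frames that would otherwise destabilize Q-learning.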
class ReplayMemory():
def __init__(self, max_len):
self.max_len = max_len
self.memory = deque(maxlen = self.max_len)
def PutExperience(self, *args):
"""Save state transition.
*args : state, action, next_state, reward, done
"""
self.memory.append(Transition(*args))
def Sample(self, batch_size):
self.batch_size = batch_size
samples = random.sample(self.memory, self.batch_size)
return samples
def __len__(self):
return len(self.memory)
memory = ReplayMemory(110000)
##memory = ReplayMemory(5000)
## => for mini test
steps_done = 0
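# Epsilon-greedy exploration: epsilon stays at 1.0 for the first 50,000 frames
# (replay warm-up), then is annealed linearly from eps_start to eps_end over
# eps_decay steps and held at eps_end afterwards.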
def SelectAction(state, eps_start=1.0, eps_end=0.1, eps_decay=1000000):
global steps_done, frame
sample = random.random()
#steps_done += 1
if frame >= 50000:
steps_done += 1
eps_threshold = eps_start + min(float(steps_done)/eps_decay, 1.0)*(eps_end - eps_start)
else:
eps_threshold = eps_start
if sample > eps_threshold:
act_t = MainNet(Variable(state, volatile=True).type(FloatTensor)).data.max(1)[1]
act_t = act_t[0,0]
return act_t, eps_threshold
else:
act_t = random.randrange(env.action_space.n)
return act_t, eps_threshold
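# Frame preprocessing: convert the raw RGB observation to grayscale, resize it
# to 84x84, and add a leading channel dimension so frames can be stacked into
# a (4, 84, 84) state.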
def get_screen(screen):
screen = rgb2gray(screen)
screen = np.ascontiguousarray(screen, dtype=np.float32)
screen = resize(screen, (84, 84), mode = 'reflect')
return np.expand_dims(screen, axis=0)
folder_path = "./Atari"
gamename_path = str(GameName)
mid_path = str(folder_path) + "/" + gamename_path
save_path = mid_path + "/" +str(M_name)
if not os.path.exists(folder_path):
    os.mkdir(folder_path)
if not os.path.exists(mid_path):
    os.mkdir(mid_path)
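# One optimization step on a uniformly sampled minibatch.
# TD target: y = r + gamma * (1 - terminal) * max_a' Q_target(s', a')
# Double DQN: y = r + gamma * (1 - terminal) * Q_target(s', argmax_a' Q_main(s', a'))
# Here done_t already encodes 1 for non-terminal and 0 for terminal, so it is
# used directly as the mask. The Huber (smooth L1) loss is used, and gradients
# are clipped to [-1, 1] before the RMSprop update.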
def OptimizeModel():
global frame, C, save_C, average_dq
if len(memory.memory) < batch_size:
return
transitions = memory.Sample(batch_size)
batch = Transition(*zip(*transitions))
non_final_mask = Variable(torch.from_numpy(np.vstack(np.expand_dims(batch.done,axis=0)))).type(Tensor)
next_state_batch = Variable(torch.cat(torch.from_numpy(np.concatenate(np.expand_dims(batch.next_state, axis=0))).unsqueeze(0)), volatile=True).type(FloatTensor)
state_batch = Variable(torch.cat(torch.from_numpy(np.concatenate(np.expand_dims(batch.state, axis=0))).unsqueeze(0))).type(FloatTensor)
action_batch = Variable(torch.from_numpy(np.vstack(batch.action))).type(LongTensor)
reward_batch = Variable(torch.from_numpy(np.vstack(np.expand_dims(batch.reward, axis=0)))).type(Tensor)
state_action_values = MainNet(state_batch).gather(1, action_batch)
    if IsDoubleDQN:
        # Double DQN: MainNet selects the argmax action per sample, TargetNet evaluates it.
        ArgMaxMain = MainNet(next_state_batch).max(1)[1]
        next_state_values = TargetNet(next_state_batch).gather(1, ArgMaxMain).type(FloatTensor)
    else:
        next_state_values = TargetNet(next_state_batch).max(1)[0].type(FloatTensor)
next_state_values.volatile = False
expected_state_action_values = (non_final_mask * next_state_values * gamma) + reward_batch
average_dq.append(np.sum(expected_state_action_values.cpu().data.numpy()))
loss = F.smooth_l1_loss(state_action_values, expected_state_action_values).cuda() if use_cuda else F.smooth_l1_loss(state_action_values, expected_state_action_values)
optimizer.zero_grad()
loss.backward()
    for param in MainNet.parameters():
        param.grad.data.clamp_(-1, 1)
optimizer.step()
if frame % C == 0:
TargetNet.load_state_dict(MainNet.state_dict())
print("Target Network Update !!")
ep_reward = 0
recent_100_reward = deque(maxlen=100)
frame = 0
C = 10000
save_C = 1000
LogData = []
average_dq = deque()
episode = 0
num_episodes = 50000
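# Training loop: each state is a stack of the last 4 preprocessed frames.
# Per step: pick an epsilon-greedy action, shape the reward/done signal around
# life losses, clip the reward to [-1, 1], store the transition, and start
# optimizing only once 50,000 frames of experience have been collected.
# The target network is synced every C frames and a checkpoint is saved
# every save_C episodes.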
for i_episode in range(num_episodes):
ep_reward = 0
episode += 1
state_dq = deque(maxlen=4)
life_dq = deque(maxlen=2)
for i in range(3):
state_dq.append(np.zeros(shape=[1, 84, 84]))
curr_frame = get_screen(env.reset())
state_dq.append(curr_frame)
done = False
    while not done:
frame += 1
curr_state = np.vstack(state_dq)
action, EpsilonProb = SelectAction(torch.from_numpy(curr_state).unsqueeze(0))
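        # In Breakout-v0, action 0 is NOOP and action 1 is FIRE; remapping
        # 0 -> 1 launches the ball instead of letting the agent idle.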
if action == 0:
real_action = 1
else:
real_action = action
next_frame, reward, done, info = env.step(real_action)
reward_t = reward
        if int(info['ale.lives']) != 0:
            life_dq.append(info['ale.lives'])
            if not done:
                if len(life_dq) == 2:
                    if life_dq[0] > life_dq[1]:
                        # A life was just lost: mark the transition terminal with a -1 penalty.
                        done_t = 0
                        reward_t = -1
                    else:
                        done_t = 1
                        reward_t = reward
                else:
                    done_t = 1 - int(done)
                    reward_t = reward
            else:
                done_t = 1 - int(done)
                reward_t = -1
        else:
            # No remaining lives (or the game does not report lives).
            if not done:
                done_t = 1
                reward_t = reward
            else:
                done_t = 0
                reward_t = -1
next_frame = get_screen(next_frame)
state_dq.append(next_frame)
next_state = np.vstack(state_dq)
ep_reward += reward
reward_T = np.clip(reward_t, -1.0, 1.0)
done_T = int(done_t)
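        # Store the transition only once the 4-frame stack is fully populated,
        # i.e. skip steps where the oldest frame is still the all-zero padding
        # inserted at reset.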
if int(np.sum(curr_state[0])) != 0:
memory.PutExperience(curr_state, action, next_state, reward_T, done_T)
if frame >= 50000:
OptimizeModel()
if episode % save_C == 0:
torch.save(MainNet.state_dict(), save_path + "_" + str(episode))
print("Save Model !! : {}".format(episode))
recent_100_reward.append(ep_reward)
if episode >= 10:
print("Episode %1d Done, Frames : %1d, E-Greedy : %.5f, Scores : %.1f, Mean100Ep_Scores : %5f, AvgMaxQ : %.5f" % (episode,
frame, EpsilonProb, ep_reward, np.mean(recent_100_reward), np.mean(average_dq)))
LogData.append((episode, frame, EpsilonProb, ep_reward, np.mean(recent_100_reward), np.mean(average_dq)))
LogDataDF = pd.DataFrame(LogData)
LogDataDF.columns = ['Episode', 'Frames', 'EpsilonProb', 'Scores_per_ep','Mean100Ep_Scores', 'AvgMaxQ']
LogDataDF.to_csv(save_path + "LogData.csv", index=False, header=True)