Deep Reinforcement Learning: Deep Q-Network (DQN) and Variants (Double DQN, Dueling DQN)
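A PyTorch training script for Atari Breakout: plain DQN by default, with command-line flags to enable the Double DQN target rule and the dueling network head.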
# Thanks to 주찬웅
# References:
## 1) https://github.com/jcwleo/Reinforcement_Learning/blob/master/Breakout/
## 2) http://pytorch.org/tutorials/
## 3) https://github.com/transedward/pytorch-dqn
## This code is still rough in places.
## Ideas and advice for improving it are very welcome.
import argparse
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision.transforms as T
import numpy as np
import random
import sys, os
import gym
import pandas as pd
from collections import deque
from collections import namedtuple
from skimage.color import rgb2gray
from skimage.transform import resize
parser = argparse.ArgumentParser(description=
    "Deep reinforcement learning on Breakout-v0 with three algorithms: DQN, DoubleDQN and DuelingDQN")
# Note: argparse's type=bool treats any non-empty string as True, so the
# boolean switches are declared with action="store_true" instead.
parser.add_argument("-IsDuelingDQN", action="store_true",
                    default=False, dest="IsDuelingDQN",
                    help="Whether to use DuelingDQN (with the average aggregation method)")
parser.add_argument("-IsDoubleDQN", action="store_true",
                    default=False, dest="IsDoubleDQN",
                    help="Whether to use DoubleDQN")
parser.add_argument("-GameName", action="store", type=str,
                    default="Breakout-v0", dest="GameName",
                    help="To run Berzerk-v0 instead, pass -GameName=Berzerk-v0")
results = parser.parse_args()
IsDoubleDQN = results.IsDoubleDQN
IsDuelingDQN = results.IsDuelingDQN
GameName = results.GameName
env = gym.make(GameName)
use_cuda = torch.cuda.is_available()
FloatTensor = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if use_cuda else torch.LongTensor
ByteTensor = torch.cuda.ByteTensor if use_cuda else torch.ByteTensor
Tensor = FloatTensor
if use_cuda:
    print("Using first GPU")
    torch.cuda.set_device(0)
else:
    print("Using CPU")
batch_size = 32       # minibatch size sampled from replay memory
gamma = 0.99          # discount factor
eps_start = 1.        # initial epsilon for epsilon-greedy exploration
eps_end = 0.1         # final epsilon
eps_decay = 1000000   # steps over which epsilon is annealed linearly
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))
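## The replay memory stores Transition tuples; `done` is kept as a 0/1 mask
## (0 at terminal transitions) so the one-step Q-learning target below becomes
##   y = r + gamma * done * max_a' Q_target(s', a')
## i.e. bootstrapping is masked out at episode (or life-loss) boundaries.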
class Model(nn.Module):
    def __init__(self, name):
        super(Model, self).__init__()
        self.name = name
        self.conv1 = nn.Conv2d(4, 32, kernel_size=8, stride=4, bias=False)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2, bias=False)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1, bias=False)
        self.head1 = nn.Linear(7 * 7 * 64, 512, bias=False)
        # String comparisons use == (identity checks with `is` are unreliable for str).
        if self.name == "DQN":
            self.head2 = nn.Linear(512, env.action_space.n, bias=False)
        elif self.name == "DuelingDQN":
            self.head2_1 = nn.Linear(512, 1, bias=False)                  # V(s)
            self.head2_2 = nn.Linear(512, env.action_space.n, bias=False) # A(s, a)

    def forward(self, x, method="avg"):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.head1(x.view(x.size(0), -1)))
        if self.name == "DQN":
            return self.head2(x)
        elif self.name == "DuelingDQN":
            V = self.head2_1(x)
            A = self.head2_2(x)
            V = V.expand_as(A)
            MeanA = torch.mean(A, 1, keepdim=True).expand_as(A)
            MaxA = torch.max(A, 1, keepdim=True)[0].expand_as(A)
            if method == "max":
                return V + A - MaxA
            elif method == "avg":
                return V + A - MeanA
            elif method is None:
                return V + A
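## Dueling aggregation note: with the "avg" method the head computes
##   Q(s, a) = V(s) + A(s, a) - mean_a' A(s, a')
## and with "max" it subtracts max_a' A(s, a') instead. Subtracting the mean
## (the default here, as in Wang et al. 2016) keeps V and A identifiable,
## since adding a constant to A and subtracting it from V would otherwise
## leave Q unchanged.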
if IsDuelingDQN:
    M_name = "DuelingDQN"
    MainNet = Model(name=M_name).cuda() if use_cuda else Model(name=M_name)
    TargetNet = Model(name=M_name).cuda() if use_cuda else Model(name=M_name)
else:
    M_name = "DQN"
    MainNet = Model(name=M_name).cuda() if use_cuda else Model(name=M_name)
    TargetNet = Model(name=M_name).cuda() if use_cuda else Model(name=M_name)
if IsDoubleDQN:
    M_name = M_name + "_Double"
optimizer = optim.RMSprop(MainNet.parameters(), lr=0.00025, eps=1e-2, momentum=0.95)
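## These RMSprop settings (lr 0.00025, squared-gradient epsilon 0.01,
## momentum 0.95) appear to follow the hyperparameters reported for DQN
## in Mnih et al. (2015).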
class ReplayMemory():
    def __init__(self, max_len):
        self.max_len = max_len
        self.memory = deque(maxlen=self.max_len)

    def PutExperience(self, *args):
        """Save a state transition.
        *args : state, action, next_state, reward, done
        """
        self.memory.append(Transition(*args))

    def Sample(self, batch_size):
        self.batch_size = batch_size
        samples = random.sample(self.memory, self.batch_size)
        return samples

    def __len__(self):
        return len(self.memory)

memory = ReplayMemory(110000)
##memory = ReplayMemory(5000)
## => for a quick mini test
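## The deque evicts the oldest transitions once 110,000 are stored, and
## Sample() draws uniformly at random, as in standard (non-prioritized)
## experience replay.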
steps_done = 0
def SelectAction(state, eps_start=1.0, eps_end=0.1, eps_decay=1000000):
    global steps_done, frame
    sample = random.random()
    # Epsilon stays at eps_start during the 50,000-frame warm-up, then is
    # annealed linearly from eps_start to eps_end over eps_decay steps.
    if frame >= 50000:
        steps_done += 1
        eps_threshold = eps_start + min(float(steps_done)/eps_decay, 1.0)*(eps_end - eps_start)
    else:
        eps_threshold = eps_start
    if sample > eps_threshold:
        # Greedy action: argmax over the main network's Q-values.
        act_t = MainNet(Variable(state, volatile=True).type(FloatTensor)).data.max(1)[1].view(-1)[0]
        return act_t, eps_threshold
    else:
        # Exploratory action: uniform random.
        act_t = random.randrange(env.action_space.n)
        return act_t, eps_threshold
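## With these defaults, epsilon decays from 1.0 to 0.1 over the first
## 1,000,000 training steps after the warm-up; e.g. after 500,000 steps
## epsilon is 1.0 + 0.5 * (0.1 - 1.0) = 0.55.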
def get_screen(screen):
    screen = rgb2gray(screen)
    screen = np.ascontiguousarray(screen, dtype=np.float32)
    screen = resize(screen, (84, 84), mode='reflect')
    return np.expand_dims(screen, axis=0)
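## Preprocessing follows the usual DQN convention: each 210x160 RGB Atari
## frame is converted to grayscale and resized to 84x84; four consecutive
## preprocessed frames are stacked to form the network input.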
folder_path = "./Atari"
gamename_path = str(GameName)
mid_path = str(folder_path) + "/" + gamename_path
save_path = mid_path + "/" + str(M_name)
if not os.path.exists(folder_path):
    os.mkdir(folder_path)
if not os.path.exists(mid_path):
    os.mkdir(mid_path)
def OptimizeModel():
    global frame, C, save_C, average_dq
    if len(memory.memory) < batch_size:
        return
    transitions = memory.Sample(batch_size)
    batch = Transition(*zip(*transitions))
    # Build (batch_size, ...) tensors; the done mask and rewards are stacked
    # into column vectors so they broadcast against the (N, 1) Q-values.
    non_final_mask = Variable(torch.from_numpy(np.vstack(batch.done).astype(np.float32))).type(Tensor)
    next_state_batch = Variable(torch.from_numpy(np.stack(batch.next_state)), volatile=True).type(FloatTensor)
    state_batch = Variable(torch.from_numpy(np.stack(batch.state))).type(FloatTensor)
    action_batch = Variable(torch.from_numpy(np.vstack(batch.action))).type(LongTensor)
    reward_batch = Variable(torch.from_numpy(np.vstack(batch.reward).astype(np.float32))).type(Tensor)
    state_action_values = MainNet(state_batch).gather(1, action_batch)
    if IsDoubleDQN:
        # Double DQN: pick each sample's next action with the main network,
        # then evaluate that action with the target network.
        argmax_main = MainNet(next_state_batch).max(1)[1].view(-1, 1)
        next_state_values = TargetNet(next_state_batch).gather(1, argmax_main).type(FloatTensor)
    else:
        next_state_values = TargetNet(next_state_batch).max(1)[0].view(-1, 1).type(FloatTensor)
    next_state_values.volatile = False
    expected_state_action_values = (non_final_mask * next_state_values * gamma) + reward_batch
    average_dq.append(np.sum(expected_state_action_values.cpu().data.numpy()))
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values)
    optimizer.zero_grad()
    loss.backward()
    for param in MainNet.parameters():
        param.grad.data.clamp_(-1, 1)  # in-place clamp; clamp() alone returns a copy
    optimizer.step()
    if frame % C == 0:
        TargetNet.load_state_dict(MainNet.state_dict())
        print("Target Network Update !!")
ep_reward = 0
recent_100_reward = deque(maxlen=100)
frame = 0
C = 10000        # sync TargetNet with MainNet every C frames
save_C = 1000    # checkpoint the model every save_C episodes
LogData = []
average_dq = deque()
episode = 0
num_episodes = 50000
for i_episode in range(num_episodes):
    ep_reward = 0
    episode += 1
    state_dq = deque(maxlen=4)
    life_dq = deque(maxlen=2)
    # Pad the 4-frame stack with zeros, then append the first real frame.
    for i in range(3):
        state_dq.append(np.zeros(shape=[1, 84, 84]))
    curr_frame = get_screen(env.reset())
    state_dq.append(curr_frame)
    done = False
    while not done:
        frame += 1
        curr_state = np.vstack(state_dq)
        action, EpsilonProb = SelectAction(torch.from_numpy(curr_state).unsqueeze(0))
        # Map action 0 (NOOP) to 1 (FIRE in Breakout) so the ball is launched.
        if action == 0:
            real_action = 1
        else:
            real_action = action
        next_frame, reward, done, info = env.step(real_action)
        reward_t = reward
        # Reward/done shaping: treat losing a life as a terminal transition
        # (done_t = 0) with reward -1, without resetting the environment.
        if int(info['ale.lives']) != 0:
            life_dq.append(info['ale.lives'])
            if not done:
                if len(life_dq) == 2:
                    if life_dq[0] > life_dq[1]:
                        done_t = 0
                        reward_t = -1
                    else:
                        done_t = 1
                        reward_t = reward
                else:
                    done_t = 1 - int(done)
                    reward_t = reward
            else:
                done_t = 1 - int(done)
                reward_t = -1
        else:
            if not done:
                done_t = 1
                reward_t = reward
            else:
                done_t = 0
                reward_t = -1
        next_frame = get_screen(next_frame)
        state_dq.append(next_frame)
        next_state = np.vstack(state_dq)
        ep_reward += reward
        reward_T = np.clip(reward_t, -1.0, 1.0)
        done_T = int(done_t)
        # Skip transitions whose oldest frame is still zero padding.
        if int(np.sum(curr_state[0])) != 0:
            memory.PutExperience(curr_state, action, next_state, reward_T, done_T)
        # Start learning only after a 50,000-frame warm-up fills the memory.
        if frame >= 50000:
            OptimizeModel()
    if episode % save_C == 0:
        torch.save(MainNet.state_dict(), save_path + "_" + str(episode))
        print("Save Model !! : {}".format(episode))
    recent_100_reward.append(ep_reward)
    if episode >= 10:
        print("Episode %1d Done, Frames : %1d, E-Greedy : %.5f, Scores : %.1f, Mean100Ep_Scores : %.5f, AvgMaxQ : %.5f" % (episode,
              frame, EpsilonProb, ep_reward, np.mean(recent_100_reward), np.mean(average_dq)))
        LogData.append((episode, frame, EpsilonProb, ep_reward, np.mean(recent_100_reward), np.mean(average_dq)))
        LogDataDF = pd.DataFrame(LogData)
        LogDataDF.columns = ['Episode', 'Frames', 'EpsilonProb', 'Scores_per_ep', 'Mean100Ep_Scores', 'AvgMaxQ']
        LogDataDF.to_csv(save_path + "LogData.csv", index=False, header=True)
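## Example usage (the filename dqn_breakout.py is just a placeholder; use
## whatever name this gist is saved under):
##   python dqn_breakout.py                              # vanilla DQN
##   python dqn_breakout.py -IsDoubleDQN                 # Double DQN
##   python dqn_breakout.py -IsDoubleDQN -IsDuelingDQN   # Dueling Double DQN
##   python dqn_breakout.py -GameName=Berzerk-v0         # a different game
## Checkpoints and LogData.csv are written under ./Atari/<GameName>/.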