@Pocuston
Created September 3, 2017 15:51
CartPole-v0 using PyTorch and DQN
# Solution of the OpenAI Gym environment "CartPole-v0" (https://gym.openai.com/envs/CartPole-v0) using DQN and PyTorch.
# It is a slightly modified version of the PyTorch DQN tutorial from
# http://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html.
# The main difference is that it does not take the rendered screen as input but simply uses the observation values
# from the environment.
import gym
from gym import wrappers
import random
import math
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as F
import matplotlib.pyplot as plt
# hyperparameters
EPISODES = 200 # number of episodes
EPS_START = 0.9 # e-greedy threshold start value
EPS_END = 0.05 # e-greedy threshold end value
EPS_DECAY = 200 # e-greedy threshold decay
GAMMA = 0.8 # Q-learning discount factor
LR = 0.001 # NN optimizer learning rate
HIDDEN_LAYER = 256 # NN hidden layer size
BATCH_SIZE = 64 # Q-learning batch size
# if gpu is to be used
use_cuda = torch.cuda.is_available()
FloatTensor = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if use_cuda else torch.LongTensor
ByteTensor = torch.cuda.ByteTensor if use_cuda else torch.ByteTensor
Tensor = FloatTensor
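
# simple FIFO experience replay buffer: stores (state, action, next_state, reward)
# transition tuples and discards the oldest entry once capacity is exceeded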
class ReplayMemory:
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []

    def push(self, transition):
        self.memory.append(transition)
        if len(self.memory) > self.capacity:
            del self.memory[0]

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
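
# Q-network: a small MLP mapping the 4-dimensional CartPole observation
# (cart position, cart velocity, pole angle, pole angular velocity)
# to Q values for the 2 discrete actions (push left, push right)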
class Network(nn.Module):
    def __init__(self):
        nn.Module.__init__(self)
        self.l1 = nn.Linear(4, HIDDEN_LAYER)
        self.l2 = nn.Linear(HIDDEN_LAYER, 2)

    def forward(self, x):
        x = F.relu(self.l1(x))
        x = self.l2(x)
        return x
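
# create the environment and wrap it with a Monitor that records results to ./tmp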
env = gym.make('CartPole-v0')
env = wrappers.Monitor(env, './tmp/cartpole-v0-1')
model = Network()
if use_cuda:
    model.cuda()
memory = ReplayMemory(10000)
optimizer = optim.Adam(model.parameters(), LR)
steps_done = 0
episode_durations = []
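
# epsilon-greedy action selection: with probability eps_threshold a random action
# is taken, otherwise the action with the highest predicted Q value; the threshold
# decays exponentially from EPS_START towards EPS_END over roughly EPS_DECAY steps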
def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        return model(Variable(state, volatile=True).type(FloatTensor)).data.max(1)[1].view(1, 1)
    else:
        return LongTensor([[random.randrange(2)]])
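
# runs a single episode: interacts with the environment step by step, stores each
# transition in replay memory and performs one learning step per environment step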
def run_episode(e, environment):
    state = environment.reset()
    steps = 0
    while True:
        environment.render()
        action = select_action(FloatTensor([state]))
        next_state, reward, done, _ = environment.step(action[0, 0])

        # negative reward when attempt ends
        if done:
            reward = -1

        memory.push((FloatTensor([state]),
                     action,  # action is already a tensor
                     FloatTensor([next_state]),
                     FloatTensor([reward])))

        learn()

        state = next_state
        steps += 1

        if done:
            print("{2} Episode {0} finished after {1} steps"
                  .format(e, steps, '\033[92m' if steps >= 195 else '\033[99m'))
            episode_durations.append(steps)
            plot_durations()
            break
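
# one DQN optimization step on a random minibatch from replay memory; note that
# this simple variant bootstraps from the online network itself (no separate
# target network is used)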
def learn():
    if len(memory) < BATCH_SIZE:
        return

    # random transition batch is taken from experience replay memory
    transitions = memory.sample(BATCH_SIZE)
    batch_state, batch_action, batch_next_state, batch_reward = zip(*transitions)

    batch_state = Variable(torch.cat(batch_state))
    batch_action = Variable(torch.cat(batch_action))
    batch_reward = Variable(torch.cat(batch_reward))
    batch_next_state = Variable(torch.cat(batch_next_state))

    # current Q values are estimated by the network for the actions actually taken
    current_q_values = model(batch_state).gather(1, batch_action)
    # expected Q values are computed from the maximum Q value of the next state (Bellman target)
    max_next_q_values = model(batch_next_state).detach().max(1)[0]
    expected_q_values = batch_reward + (GAMMA * max_next_q_values)

    # loss is measured from error between current and newly expected Q values
    loss = F.smooth_l1_loss(current_q_values, expected_q_values)

    # backpropagation of loss to NN
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
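
# plots episode durations (and the running 100-episode average once available)
# after each finished episode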
def plot_durations():
    plt.figure(2)
    plt.clf()
    durations_t = torch.FloatTensor(episode_durations)
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
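
# main training loop: CartPole-v0 is considered solved when the average duration
# over 100 consecutive episodes reaches 195 steps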
for e in range(EPISODES):
    run_episode(e, env)
print('Complete')
env.render(close=True)
env.close()
plt.ioff()
plt.show()