@kkweon
Created June 8, 2017 22:18
PyTorch DQN implementation
"""
DQN in PyTorch
"""
import argparse
import torch
import torch.nn
import numpy as np
import random
import gym
from collections import namedtuple
from collections import deque
from typing import List, Tuple

parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--gamma",
                    type=float,
                    default=0.99,
                    help="Discount rate for Q_target")
parser.add_argument("--env",
                    type=str,
                    default="CartPole-v0",
                    help="Gym environment name")
parser.add_argument("--n-episode",
                    type=int,
                    default=1000,
                    help="Number of episodes to run")
parser.add_argument("--batch-size",
                    type=int,
                    default=64,
                    help="Mini-batch size")
parser.add_argument("--hidden-dim",
                    type=int,
                    default=12,
                    help="Hidden dimension")
parser.add_argument("--capacity",
                    type=int,
                    default=50000,
                    help="Replay memory capacity")
parser.add_argument("--max-episode",
                    type=int,
                    default=50,
                    help="e-Greedy target episode (eps will be the lowest at this episode)")
parser.add_argument("--min-eps",
                    type=float,
                    default=0.01,
                    help="Min epsilon")
FLAGS = parser.parse_args()


class DQN(torch.nn.Module):

    def __init__(self, input_dim: int, output_dim: int, hidden_dim: int) -> None:
        """DQN Network

        Args:
            input_dim (int): `state` dimension.
                `state` is 2-D tensor of shape (n, input_dim)
            output_dim (int): Number of actions.
                Q_value is 2-D tensor of shape (n, output_dim)
            hidden_dim (int): Hidden dimension in fc layer
        """
        super(DQN, self).__init__()

        self.layer1 = torch.nn.Sequential(
            torch.nn.Linear(input_dim, hidden_dim),
            torch.nn.BatchNorm1d(hidden_dim),
            torch.nn.PReLU()
        )

        self.layer2 = torch.nn.Sequential(
            torch.nn.Linear(hidden_dim, hidden_dim),
            torch.nn.BatchNorm1d(hidden_dim),
            torch.nn.PReLU()
        )

        self.final = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Returns a Q_value

        Args:
            x (torch.Tensor): `State` 2-D tensor of shape (n, input_dim)

        Returns:
            torch.Tensor: Q_value, 2-D tensor of shape (n, output_dim)
        """
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.final(x)

        return x
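
# A quick shape check (hypothetical usage; the names below are illustrative and
# not part of the original gist):
#
#   net = DQN(input_dim=4, output_dim=2, hidden_dim=12)  # CartPole-v0: 4-D state, 2 actions
#   x = torch.autograd.Variable(torch.randn(64, 4))      # a batch of 64 states
#   q = net(x)                                           # Q-values of shape (64, 2)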


Transition = namedtuple("Transition",
                        field_names=["state", "action", "reward", "next_state", "done"])


class ReplayMemory(object):

    def __init__(self, capacity: int) -> None:
        """Replay memory class

        Args:
            capacity (int): Max size of this memory
        """
        self.capacity = capacity
        self.cursor = 0
        self.memory = []

    def push(self,
             state: np.ndarray,
             action: int,
             reward: int,
             next_state: np.ndarray,
             done: bool) -> None:
        """Creates a `Transition` and inserts it into the memory

        Args:
            state (np.ndarray): 1-D tensor of shape (input_dim,)
            action (int): action index (0 <= action < output_dim)
            reward (int): reward value
            next_state (np.ndarray): 1-D tensor of shape (input_dim,)
            done (bool): whether this state was the last step
        """
        if len(self) < self.capacity:
            self.memory.append(None)

        self.memory[self.cursor] = Transition(state,
                                              action, reward, next_state, done)
        self.cursor = (self.cursor + 1) % self.capacity

    def pop(self, batch_size: int) -> List[Transition]:
        """Returns a minibatch of `Transition` sampled randomly

        Args:
            batch_size (int): Size of mini-batch

        Returns:
            List[Transition]: Minibatch of `Transition`
        """
        return random.sample(self.memory, batch_size)

    def __len__(self) -> int:
        """Returns the current size of the memory"""
        return len(self.memory)


class Agent(object):

    def __init__(self, input_dim: int, output_dim: int, hidden_dim: int) -> None:
        """Agent class that chooses actions and trains

        Args:
            input_dim (int): input dimension
            output_dim (int): output dimension
            hidden_dim (int): hidden dimension
        """
        self.dqn = DQN(input_dim, output_dim, hidden_dim)
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.loss_fn = torch.nn.MSELoss()
        self.optim = torch.optim.Adam(self.dqn.parameters())

    def _to_variable(self, x: np.ndarray) -> torch.Tensor:
        """torch.Variable syntax helper

        Args:
            x (np.ndarray): 2-D tensor of shape (n, input_dim)

        Returns:
            torch.Tensor: torch variable
        """
        return torch.autograd.Variable(torch.Tensor(x))

    def get_action(self, states: np.ndarray, eps: float) -> int:
        """Returns an action

        Args:
            states (np.ndarray): 2-D tensor of shape (n, input_dim)
            eps (float): 𝜺-greedy for exploration

        Returns:
            int: action index
        """
        if np.random.rand() < eps:
            return np.random.choice(self.output_dim)
        else:
            self.dqn.train(mode=False)
            scores = self.get_Q(states)
            _, argmax = torch.max(scores.data, 1)
            return int(argmax.numpy())

    def get_Q(self, states: np.ndarray) -> torch.FloatTensor:
        """Returns `Q-value`

        Args:
            states (np.ndarray): 2-D Tensor of shape (n, input_dim)

        Returns:
            torch.FloatTensor: 2-D Tensor of shape (n, output_dim)
        """
        states = self._to_variable(states.reshape(-1, self.input_dim))
        self.dqn.train(mode=False)
        return self.dqn(states)

    def train(self, Q_pred: torch.FloatTensor, Q_true: torch.FloatTensor) -> float:
        """Computes `loss` and runs backpropagation

        Args:
            Q_pred (torch.FloatTensor): Predicted value by the network,
                2-D Tensor of shape (n, output_dim)
            Q_true (torch.FloatTensor): Target value obtained from the game,
                2-D Tensor of shape (n, output_dim)

        Returns:
            float: loss value
        """
        self.dqn.train(mode=True)
        self.optim.zero_grad()
        loss = self.loss_fn(Q_pred, Q_true)
        loss.backward()
        self.optim.step()

        return loss


def train_helper(agent: Agent, minibatch: List[Transition], gamma: float) -> float:
    """Prepares a minibatch and trains on it

    Args:
        agent (Agent): Agent has `train(Q_pred, Q_true)` method
        minibatch (List[Transition]): Minibatch of `Transition`
        gamma (float): Discount rate of Q_target

    Returns:
        float: Loss value
    """
    states = np.vstack([x.state for x in minibatch])
    actions = np.array([x.action for x in minibatch])
    rewards = np.array([x.reward for x in minibatch])
    next_states = np.vstack([x.next_state for x in minibatch])
    done = np.array([x.done for x in minibatch])

    Q_predict = agent.get_Q(states)
    Q_target = Q_predict.clone().data.numpy()
    Q_target[np.arange(len(Q_target)), actions] = rewards + gamma * np.max(
        agent.get_Q(next_states).data.numpy(), axis=1) * ~done
    Q_target = agent._to_variable(Q_target)

    return agent.train(Q_predict, Q_target)
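
# Note on the target constructed above: it is the standard one-step Q-learning
# (DQN) target, per transition i in the minibatch,
#
#     y_i = r_i + gamma * max_a Q(s'_i, a)    if not done_i
#     y_i = r_i                               if done_i
#
# The `* ~done` factor zeroes the bootstrap term for terminal transitions, and
# because only the entries for the actions actually taken are overwritten in
# Q_target (a detached copy of Q_predict), the MSE loss is driven only by those
# entries.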


def play_episode(env: gym.Env,
                 agent: Agent,
                 replay_memory: ReplayMemory,
                 eps: float,
                 batch_size: int) -> int:
    """Plays an episode and trains

    Args:
        env (gym.Env): gym environment (CartPole-v0)
        agent (Agent): agent will train and get action
        replay_memory (ReplayMemory): trajectory is saved here
        eps (float): 𝜺-greedy for exploration
        batch_size (int): batch size

    Returns:
        int: reward earned in this episode
    """
    s = env.reset()
    done = False
    total_reward = 0

    while not done:
        a = agent.get_action(s, eps)
        s2, r, done, info = env.step(a)

        total_reward += r

        if done:
            r = -1

        replay_memory.push(s, a, r, s2, done)

        if len(replay_memory) > batch_size:
            minibatch = replay_memory.pop(batch_size)
            train_helper(agent, minibatch, FLAGS.gamma)

        s = s2

    return total_reward


def get_env_dim(env: gym.Env) -> Tuple[int, int]:
    """Returns input_dim & output_dim

    Args:
        env (gym.Env): gym Environment (CartPole-v0)

    Returns:
        int: input_dim
        int: output_dim
    """
    input_dim = env.observation_space.shape[0]
    output_dim = env.action_space.n

    return input_dim, output_dim


def epsilon_annealing(episode: int, max_episode: int, min_eps: float) -> float:
    """Returns an 𝜺 value for 𝜺-greedy exploration

        1.0---|\
              | \
              |  \
        min_e +---+------->
                  |
                  max_episode

    Args:
        episode (int): Current episode (0 <= episode)
        max_episode (int): After max_episode, 𝜺 will be `min_eps`
        min_eps (float): 𝜺 will never go below this value

    Returns:
        float: 𝜺 value
    """
    slope = (min_eps - 1.0) / max_episode
    return max(slope * episode + 1.0, min_eps)
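
# Sanity check of the schedule above, using the default flags
# (--max-episode 50, --min-eps 0.01):
#
#   epsilon_annealing(0, 50, 0.01)   -> 1.0
#   epsilon_annealing(25, 50, 0.01)  -> 0.505 (approximately)
#   epsilon_annealing(50, 50, 0.01)  -> 0.01  (and stays at 0.01 afterwards)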


def main():
    """Main entry point: builds the agent and trains until the game is cleared"""
    env = gym.make(FLAGS.env)
    env = gym.wrappers.Monitor(env, directory="monitors", force=True)

    try:
        rewards = deque(maxlen=100)
        input_dim, output_dim = get_env_dim(env)
        agent = Agent(input_dim, output_dim, FLAGS.hidden_dim)
        replay_memory = ReplayMemory(FLAGS.capacity)

        for i in range(FLAGS.n_episode):
            eps = epsilon_annealing(i, FLAGS.max_episode, FLAGS.min_eps)
            r = play_episode(env, agent, replay_memory, eps, FLAGS.batch_size)
            print("[Episode: {:5}] Reward: {:5} 𝜺-greedy: {:5.2f}".format(i + 1, r, eps))

            rewards.append(r)

            if len(rewards) == rewards.maxlen:
                if np.mean(rewards) >= 200:
                    print("Game cleared in {} games with {}".format(i + 1, np.mean(rewards)))
                    break
    finally:
        env.close()


if __name__ == '__main__':
    main()
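
# Example invocation (a sketch: the file name `dqn.py` is assumed, and the code
# targets 2017-era APIs, i.e. a gym release that still ships
# gym.wrappers.Monitor and the old env.reset()/env.step() return signatures):
#
#   python dqn.py --env CartPole-v0 --n-episode 1000 --batch-size 64
#
# The Monitor wrapper writes episode statistics and recordings to ./monitors,
# and training stops once the average reward over the last 100 episodes
# reaches 200.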

@jtoy commented Oct 8, 2017

nice example, thanks!

@maximecb

Hi @kkweon. I'm working on an OpenAI gym environment and would like to include an example agent implementation in my repo. Would it be OK if I modify and redistribute this code? If so, under what license?
