Condensed one-file gist for more convenient evaluation at https://gym.openai.com/evaluations/eval_GFtDBmuyRjCzcAkBibwYWQ
"""
Example implementation of Double DQN to provide an understandable, clear
implementation of the underlying algorithm using OpenAI gym as benchmark.
Further explanations:
https://davidsanwald.github.io/2016/12/11/Double-DQN-interfacing-OpenAi-Gym.html
If you need any help or have any questions, just drop me a note (:
The code is ased on the work of van Hasselt et al. esp. the Double DQN paper:
https://arxiv.org/abs/1509.06461
Keras has been used to provide highl level implementation of the Q-networks,
to reduce distraction from the main Double DQN principles.
The used environment doesn't benefit much from the Double DQN extension,
because of the low number of available actions and the deterministic dynamics.
With tuned parameters and an extra convolution layer the code also runs on PONG
using raw pixel input.
The environment has been chosen because the agent can be trained
even on a single CPU. The code as been condenced to a single file Gist,
to maximice convenience of reproducibility on:
https://gym.openai.com/evaluations/eval_GFtDBmuyRjCzcAkBibwYWQ
Author:
David Sanwald
why.ever.not.berlin@gmail.com
https://davidsanwald.github.io
https://twitter.com/
run with
python ddqn.py
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from collections import deque
from random import sample

import numpy as np

import gym
import keras
from keras.layers.core import Dense
from keras.models import Sequential
from keras.optimizers import SGD

EPSILON_MIN = 0.1
EPSILON_MAX = 0.8
EPSILON_DECAY = 0.00075
MEMORY_CAPACITY = 500000
TARGET_UPDATE = 300
SIZE_HIDDEN = 16
BATCH_SIZE = 32
GAMMA = 0.99
LEARNING_RATE = 0.0075
MAX_STEPS = 2000
ACTIVATION = 'tanh'
LEARNING_START = 100
N_EPISODES = 20000
MONITOR_DIR = '/tmp/cartpole4'
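

# Experiment wraps the Gym environment and its monitor, runs the episode loop
# and logs a running average of the rewards over the last 100 episodes.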
class Experiment:
    def __init__(self, environment):
        self.env = gym.make(environment)
        self.episode_count = 0
        self.reward_buffer = deque([], maxlen=100)

    def run_experiment(self, agent):
        self.env.monitor.start(MONITOR_DIR)
        for n in range(N_EPISODES):
            self.run_episode(agent)
        self.env.monitor.close()

    def run_episode(self, agent):
        self.reward = 0
        s = self.env.reset()
        done = False
        while not done:
            self.env.render()
            a = agent.act(s)
            s_, r, done, _ = self.env.step(a)
            agent.learn((s, a, s_, r, done))
            self.reward += r
            s = s_
        self.episode_count += 1
        self.reward_buffer.append(self.reward)
        average = sum(self.reward_buffer) / len(self.reward_buffer)
        print("Episode Nr. {} \nScore: {} \nAverage: {}".format(
            self.episode_count, self.reward, average))
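

# DQNAgent combines epsilon-greedy action selection, the replay memory and the
# Double DQN update. Observers registered via add_observer() are notified after
# every environment step ('step_done'), which drives the epsilon annealing and
# the learning switch below.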
class DQNAgent:
    def __init__(self, environment):
        self.env = environment
        self.memory = ReplayMemory(MEMORY_CAPACITY)
        self.dim_actions = self.env.action_space.n
        self.dim_states = self.env.observation_space.shape
        self.NN = NN(self.env.observation_space.shape, self.env.action_space.n,
                     BATCH_SIZE, SIZE_HIDDEN, LEARNING_RATE, ACTIVATION)
        self.observers = []
        self.episode_count = 0
        self.step_count_total = 1
        self.step_count_episode = 1
        self.epsilon_min = EPSILON_MIN
        self.epsilon_max = EPSILON_MAX
        self.epsilon_decay = EPSILON_DECAY
        self.target_update = TARGET_UPDATE
        self.max_steps = MAX_STEPS
        self.n_episodes = N_EPISODES
        self.epsilon = EPSILON_MAX
        self.batch_size = BATCH_SIZE
        self.usetarget = False
        self.gamma = GAMMA
        self.loss = 0
        self.done = False
        self.reward = 0
        self.reward_episode = 0
        self.learning_switch = False
        self.learning_start = LEARNING_START

    def notify(self, event):
        for observer in self.observers:
            observer(event)

    def act(self, state):
        self.step_count_total += 1
        action = self.choose_action(state)
        return action

    def learn(self, obs):
        self.memory.store(obs)
        if self.learning_switch:
            self.backup()
        self.notify('step_done')

    def backup(self):
        self.flashback()
        if self.step_count_total % self.target_update == 0:
            print('update')
            print(self.epsilon)
            self.NN.update_target()
            self.usetarget = True

    def flashback(self):
        X, y = self._make_batch()
        self.loss = self.NN.train(X, y)
        if np.isnan(self.loss.history['loss']).any():
            print('Warning, loss is {}'.format(self.loss))

    def choose_action(self, state):
        if np.random.rand() <= self.epsilon:
            choice = self.random_choice()
        else:
            choice = self.greedy_choice(state)
        return choice

    def greedy_choice(self, state):
        greedy_choice = self.NN.best_action(state, usetarget=False)
        return greedy_choice

    def random_choice(self):
        random_choice = np.random.randint(0, self.dim_actions)
        return random_choice

    def _make_batch(self):
        X = []
        y = []
        batch = self.memory.get_batch(self.batch_size)
        for state, action, newstate, reward, done in batch:
            X.append(state)
            target = self.NN.predict(state, False)
            q_vals_new_t = self.NN.predict(newstate, self.usetarget)
            a_select = self.NN.best_action(newstate, False)
            if done:
                target[action] = reward
            else:
                target[action] = reward + self.gamma * q_vals_new_t[a_select]
            y.append(target)
        return X, y

    def add_observer(self, observer):
        self.observers.append(observer)
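

# NN wraps the two Keras Q-networks: self.model is the online network trained
# every step, self.model_t is the target network whose weights are synchronized
# with the online network every TARGET_UPDATE steps via update_target().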
class NN:
    def __init__(self, n_states, n_actions, batch_size, size_hidden,
                 learning_rate, activation):
        self.learning_rate = learning_rate
        self.act = activation
        self.n_states = n_states
        self.n_actions = n_actions
        self.model = self._make_model(n_states, n_actions, size_hidden)
        self.model_t = self._make_model(n_states, n_actions, size_hidden)
        self.batch_size = batch_size

    def _make_model(self, n_states, n_actions, size_hidden):
        model = Sequential()
        model.add(Dense(size_hidden, input_dim=n_states[0], activation=self.act))
        model.add(Dense(size_hidden, activation=self.act))
        model.add(Dense(n_actions, activation='linear'))
        opt = SGD(lr=self.learning_rate, momentum=0.5, decay=1e-6, clipnorm=2)
        model.compile(loss='mean_squared_error', optimizer=opt)
        return model

    def train(self, X, y):
        X = prep_batch(X)
        y = prep_batch(y)
        loss = self.model.fit(X,
                              y,
                              batch_size=self.batch_size,
                              nb_epoch=1,
                              verbose=0,
                              shuffle=True)
        return loss

    def predict(self, state, usetarget=False):
        state = prep_input(state, self.n_states[0])
        if usetarget:
            q_vals = self.model_t.predict(state)
        else:
            q_vals = self.model.predict(state)
        return q_vals[0]

    def update_target(self):
        weights = self.model.get_weights()
        self.model_t.set_weights(weights)
        self.save('weights.h5')

    def best_action(self, state, usetarget=False):
        state = prep_input(state, self.n_states[0])
        q_vals = self.predict(state, usetarget)
        best_action = np.argmax(q_vals)
        return best_action

    def save(self, fname):
        self.model.save_weights(fname, overwrite=True)

    def load(self, fname):
        self.model.load_weights(fname)
        self.update_target()
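

# ReplayMemory is a bounded buffer (deque with maxlen) of
# (state, action, newstate, reward, done) transitions for experience replay.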
class ReplayMemory:
    def __init__(self, capacity):
        self.samples = deque([], maxlen=capacity)

    def store(self, exp):
        self.samples.append(exp)

    def get_batch(self, n):
        n_samples = min(n, len(self.samples))
        samples = sample(self.samples, n_samples)
        return samples
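

# EpsilonUpdater is an observer: on every 'step_done' event it anneals epsilon
# exponentially from EPSILON_MAX towards EPSILON_MIN and switches learning on
# once LEARNING_START steps have been taken.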
class EpsilonUpdater:
    def __init__(self, agent):
        self.agent = agent

    def __call__(self, event):
        if event == 'step_done':
            self.epsilon_update()
            self.switch_learning()

    def epsilon_update(self):
        self.agent.epsilon = (
            self.agent.epsilon_min +
            (self.agent.epsilon_max - self.agent.epsilon_min) * np.exp(
                -self.agent.epsilon_decay * self.agent.step_count_total))

    def switch_learning(self):
        if self.agent.step_count_total >= self.agent.learning_start:
            self.agent.learning_switch = True
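

# Helpers: reshape a single observation into a (1, n_dimension) batch for Keras
# and stack a list of states or targets into a training batch.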
def prep_input(data, n_dimension):
    prep = np.asarray(data)
    transformed = prep.reshape((1, n_dimension))
    return transformed


def prep_batch(to_prep):
    prep = np.vstack(to_prep)
    return prep
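

# Wire everything together: register the EpsilonUpdater as an observer of the
# agent and run the experiment on CartPole-v0.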
if __name__ == "__main__":
    key = 'CartPole-v0'
    exp = Experiment(key)
    agent = DQNAgent(exp.env)
    epsilon = EpsilonUpdater(agent)
    agent.add_observer(epsilon)
    exp.run_experiment(agent)