Condensed one-file gist for more convenient evaluation at https://gym.openai.com/evaluations/eval_GFtDBmuyRjCzcAkBibwYWQ
""" | |
Example implementation of Double DQN to provide an understandable, clear | |
implementation of the underlying algorithm using OpenAI gym as benchmark. | |
Further explanations: | |
https://davidsanwald.github.io/2016/12/11/Double-DQN-interfacing-OpenAi-Gym.html | |
If you need any help or have any questions, just drop me a note (: | |
The code is ased on the work of van Hasselt et al. esp. the Double DQN paper: | |
https://arxiv.org/abs/1509.06461 | |
Keras has been used to provide highl level implementation of the Q-networks, | |
to reduce distraction from the main Double DQN principles. | |
The used environment doesn't benefit much from the Double DQN extension, | |
because of the low number of available actions and the deterministic dynamics. | |
With tuned parameters and an extra convolution layer the code also runs on PONG | |
using raw pixel input. | |
The environment has been chosen because the agent can be trained | |
even on a single CPU. The code as been condenced to a single file Gist, | |
to maximice convenience of reproducibility on: | |
https://gym.openai.com/evaluations/eval_GFtDBmuyRjCzcAkBibwYWQ | |
Author: | |
David Sanwald | |
why.ever.not.berlin@gmail.com | |
https://davidsanwald.github.io | |
https://twitter.com/ | |
run with | |
python ddqn.py | |
""" | |
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
from collections import deque
from random import sample

import numpy as np

import gym
import keras
from keras.layers.core import Dense
from keras.models import Sequential
from keras.optimizers import SGD

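# Hyperparameters, chosen for CartPole-v0 so that the agent trains in
# reasonable time on a single CPU (see the module docstring above).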
EPSILON_MIN = 0.1
EPSILON_MAX = 0.8
EPSILON_DECAY = 0.00075
MEMORY_CAPACITY = 500000
TARGET_UPDATE = 300
SIZE_HIDDEN = 16
BATCH_SIZE = 32
GAMMA = 0.99
LEARNING_RATE = 0.0075
MAX_STEPS = 2000
ACTIVATION = 'tanh'
LEARNING_START = 100
N_EPISODES = 20000
MONITOR_DIR = '/tmp/cartpole4'


class Experiment:
    def __init__(self, environment):
        self.env = gym.make(environment)
        self.episode_count = 0
        self.reward_buffer = deque([], maxlen=100)

    def run_experiment(self, agent):
        self.env.monitor.start(MONITOR_DIR)
        for n in range(N_EPISODES):
            self.run_episode(agent)
        self.env.monitor.close()
        pass

    def run_episode(self, agent):
        self.reward = 0
        s = self.env.reset()
        done = False
        while not done:
            self.env.render()
            a = agent.act(s)
            s_, r, done, _ = self.env.step(a)
            agent.learn((s, a, s_, r, done))
            self.reward += r
            s = s_
        self.episode_count += 1
        self.reward_buffer.append(self.reward)
        average = sum(self.reward_buffer) / len(self.reward_buffer)
        print("Episode Nr. {} \nScore: {} \nAverage: {}".format(
            self.episode_count, self.reward, average))


class DQNAgent:
    def __init__(self, environment):
        self.env = environment
        self.memory = ReplayMemory(MEMORY_CAPACITY)
        self.dim_actions = self.env.action_space.n
        self.dim_states = self.env.observation_space.shape
        self.NN = NN(self.env.observation_space.shape, self.env.action_space.n,
                     BATCH_SIZE, SIZE_HIDDEN, LEARNING_RATE, ACTIVATION)
        self.observers = []
        self.episode_count = 0
        self.step_count_total = 1
        self.step_count_episode = 1
        self.epsilon_min = EPSILON_MIN
        self.epsilon_max = EPSILON_MAX
        self.epsilon_decay = EPSILON_DECAY
        self.target_update = TARGET_UPDATE
        self.max_steps = MAX_STEPS
        self.n_episodes = N_EPISODES
        self.epsilon = EPSILON_MAX
        self.batch_size = BATCH_SIZE
        self.usetarget = False
        self.gamma = GAMMA
        self.loss = 0
        self.done = False
        self.reward = 0
        self.reward_episode = 0
        self.learning_switch = False
        self.learning_start = LEARNING_START

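    # The agent exposes a small observer hook: objects registered via
    # add_observer() are called with an event string after every learning
    # step; the EpsilonUpdater below uses this to anneal exploration.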
    def notify(self, event):
        for observer in self.observers:
            observer(event)
        pass

    def act(self, state):
        self.step_count_total += 1
        action = self.choose_action(state)
        return action

    def learn(self, obs):
        self.memory.store(obs)
        if self.learning_switch:
            self.backup()
        self.notify('step_done')
        pass

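    # backup() runs once per learning step: it replays a minibatch
    # (flashback) and, every TARGET_UPDATE steps, copies the online network's
    # weights into the target network; usetarget stays False until the first
    # sync, so early targets come from the online network.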
    def backup(self):
        self.flashback()
        if self.step_count_total % self.target_update == 0:
            print('update')
            print(self.epsilon)
            self.NN.update_target()
            self.usetarget = True
        pass

    def flashback(self):
        X, y = self._make_batch()
        self.loss = self.NN.train(X, y)
        if np.isnan(self.loss.history['loss']).any():
            print('Warning, loss is {}'.format(self.loss.history['loss']))
        pass

    def choose_action(self, state):
        if np.random.rand() <= self.epsilon:
            choice = self.random_choice()
        else:
            choice = self.greedy_choice(state)
        return choice

    def greedy_choice(self, state):
        greedy_choice = self.NN.best_action(state, usetarget=False)
        return greedy_choice

    def random_choice(self):
        random_choice = np.random.randint(0, self.dim_actions)
        return random_choice

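    # Double DQN target (van Hasselt et al., https://arxiv.org/abs/1509.06461):
    # the online network selects the best next action (a_select), while the
    # target network evaluates it (q_vals_new_t[a_select]). Decoupling action
    # selection from evaluation reduces the overestimation bias of vanilla DQN.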
    def _make_batch(self):
        X = []
        y = []
        batch = self.memory.get_batch(self.batch_size)
        for state, action, newstate, reward, done in batch:
            X.append(state)
            target = self.NN.predict(state, False)
            q_vals_new_t = self.NN.predict(newstate, self.usetarget)
            a_select = self.NN.best_action(newstate, False)
            if done:
                target[action] = reward
            else:
                target[action] = reward + self.gamma * q_vals_new_t[a_select]
            y.append(target)
        return X, y

    def add_observer(self, observer):
        self.observers.append(observer)
        pass


class NN:
    def __init__(self, n_states, n_actions, batch_size, size_hidden,
                 learning_rate, activation):
        self.learning_rate = learning_rate
        self.act = activation
        self.n_states = n_states
        self.n_actions = n_actions
        self.model = self._make_model(n_states, n_actions, size_hidden)
        self.model_t = self._make_model(n_states, n_actions, size_hidden)
        self.batch_size = batch_size

    def _make_model(self, n_states, n_actions, size_hidden):
        model = Sequential()
        model.add(Dense(size_hidden, input_dim=n_states[0], activation=self.act))
        model.add(Dense(size_hidden, activation=self.act))
        model.add(Dense(n_actions, activation='linear'))
        opt = SGD(lr=self.learning_rate, momentum=0.5, decay=1e-6, clipnorm=2)
        model.compile(loss='mean_squared_error', optimizer=opt)
        return model

    def train(self, X, y):
        X = prep_batch(X)
        y = prep_batch(y)
        loss = self.model.fit(X,
                              y,
                              batch_size=self.batch_size,
                              nb_epoch=1,
                              verbose=0,
                              shuffle=True)
        return loss

    def predict(self, state, usetarget=False):
        state = prep_input(state, self.n_states[0])
        if usetarget:
            q_vals = self.model_t.predict(state)
        else:
            q_vals = self.model.predict(state)
        return q_vals[0]

    def update_target(self):
        weights = self.model.get_weights()
        self.model_t.set_weights(weights)
        self.save('weights.h5')
        pass

    def best_action(self, state, usetarget=False):
        state = prep_input(state, self.n_states[0])
        q_vals = self.predict(state, usetarget)
        best_action = np.argmax(q_vals)
        return best_action

    def save(self, fname):
        self.model.save_weights(fname, overwrite=True)
        pass

    def load(self, fname):
        self.model.load_weights(fname)
        self.update_target()
        pass


class ReplayMemory:
    def __init__(self, capacity):
        self.samples = deque([], maxlen=capacity)

    def store(self, exp):
        self.samples.append(exp)
        pass

    def get_batch(self, n):
        n_samples = min(n, len(self.samples))
        samples = sample(self.samples, n_samples)
        return samples


class EpsilonUpdater:
    def __init__(self, agent):
        self.agent = agent

    def __call__(self, event):
        if event == 'step_done':
            self.epsilon_update()
            self.switch_learning()
        else:
            pass

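    # Exponential annealing of the exploration rate towards EPSILON_MIN:
    # epsilon(t) = eps_min + (eps_max - eps_min) * exp(-EPSILON_DECAY * t),
    # where t is the agent's total step count.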
    def epsilon_update(self):
        self.agent.epsilon = (
            self.agent.epsilon_min +
            (self.agent.epsilon_max - self.agent.epsilon_min) * np.exp(
                -self.agent.epsilon_decay * self.agent.step_count_total))
        pass

    def switch_learning(self):
        if self.agent.step_count_total >= self.agent.learning_start:
            self.agent.learning_switch = True
        pass


def prep_input(data, n_dimension):
    prep = np.asarray(data)
    transformed = prep.reshape((1, n_dimension))
    return transformed


def prep_batch(to_prep):
    prep = np.vstack(to_prep)
    return prep


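# A minimal sketch, not part of the original gist: it assumes training has
# already written 'weights.h5' (done by NN.update_target above) and reuses the
# same CartPole-v0 setup to run a few purely greedy episodes. The function
# name run_greedy and its arguments are hypothetical.
def run_greedy(weight_file='weights.h5', n_episodes=10):
    env = gym.make('CartPole-v0')
    net = NN(env.observation_space.shape, env.action_space.n,
             BATCH_SIZE, SIZE_HIDDEN, LEARNING_RATE, ACTIVATION)
    net.load(weight_file)
    for _ in range(n_episodes):
        s = env.reset()
        done = False
        score = 0
        while not done:
            # Always act greedily with the online network, no exploration.
            s, r, done, _ = env.step(net.best_action(s, usetarget=False))
            score += r
        print('Greedy episode score: {}'.format(score))

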
if __name__ == "__main__":
    key = 'CartPole-v0'
    exp = Experiment(key)
    agent = DQNAgent(exp.env)
    epsilon = EpsilonUpdater(agent)
    agent.add_observer(epsilon)
    exp.run_experiment(agent)