
@liangfu
Created August 13, 2021 07:57
# Inspired by https://keon.io/deep-q-learning/
# python3 -m pip install keras==2.3.1 tensorflow-gpu==1.15 gym==0.18.3
# python3 -m pip install 'h5py==2.10.0' --force-reinstall
import random
import gym
import math
import numpy as np
import keras
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import tensorflow as tf
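# A minimal deep Q-learning (DQN) agent for CartPole-v0: a small fully-connected
# Q-network, epsilon-greedy exploration, and an experience-replay buffer.
# CartPole-v0 counts as solved when the average reward over 100 consecutive
# episodes reaches 195 (the n_win_ticks default below).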
class DQNCartPoleSolver():
    def __init__(self, n_episodes=1000, n_win_ticks=195, max_env_steps=None, gamma=1.0, epsilon=1.0, epsilon_min=0.01, epsilon_log_decay=0.995, alpha=0.01, alpha_decay=0.01, batch_size=256, monitor=False, quiet=False):
        self.memory = deque(maxlen=100000)
        self.env = gym.make('CartPole-v0')
        if monitor: self.env = gym.wrappers.Monitor(self.env, './videos', force=False)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_log_decay
        self.alpha = alpha
        self.alpha_decay = alpha_decay
        self.n_episodes = n_episodes
        self.n_win_ticks = n_win_ticks
        self.batch_size = batch_size
        self.quiet = quiet
        if max_env_steps is not None: self.env._max_episode_steps = max_env_steps
        # Init model
        self.model = Sequential()
        self.model.add(Dense(24, input_dim=4, activation='tanh'))
        self.model.add(Dense(48, activation='tanh'))
        self.model.add(Dense(2, activation='linear'))
        self.model.compile(loss='mse', optimizer=Adam(learning_rate=self.alpha, decay=self.alpha_decay))
        # Uncomment to load the pretrained model
        # self.model = keras.models.load_model('./dqn_model.h5')
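    # Store a single (state, action, reward, next_state, done) transition in the
    # replay buffer so it can be sampled later for training in replay().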
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
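    # Epsilon-greedy action selection: with probability epsilon take a random
    # action, otherwise take the action with the highest predicted Q-value.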
    def choose_action(self, state, epsilon):
        return self.env.action_space.sample() if (np.random.random() <= epsilon) else np.argmax(self.model.predict(state))
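    # Exploration schedule: epsilon shrinks with the episode index t on a log scale,
    # clipped to the [epsilon_min, self.epsilon] range.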
    def get_epsilon(self, t):
        return max(self.epsilon_min, min(self.epsilon, 1.0 - math.log10((t + 1) * self.epsilon_decay)))
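    # Gym returns the observation as a flat array of 4 values; reshape it to
    # (1, 4) so it can be passed directly to model.predict / model.fit.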
    def preprocess_state(self, state):
        return np.reshape(state, [1, 4])
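    # Sample a minibatch of stored transitions and fit the network toward the
    # Q-learning target: r for terminal transitions, otherwise
    # r + gamma * max_a' Q(s', a').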
    def replay(self, batch_size):
        x_batch, y_batch = [], []
        minibatch = random.sample(
            self.memory, min(len(self.memory), batch_size))
        for state, action, reward, next_state, done in minibatch:
            y_target = self.model.predict(state)
            y_target[0][action] = reward if done else reward + self.gamma * np.max(self.model.predict(next_state)[0])
            x_batch.append(state[0])
            y_batch.append(y_target[0])
        self.model.fit(np.array(x_batch), np.array(y_batch), batch_size=len(x_batch), verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
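    # Training loop: play episodes with the current policy, record transitions,
    # then run one replay/fit step per episode. Training stops early once the
    # mean episode length over the last 100 episodes reaches n_win_ticks.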
    def run(self):
        scores = deque(maxlen=100)
        for e in range(self.n_episodes):
            state = self.preprocess_state(self.env.reset())
            done = False
            i = 0
            while not done:
                action = self.choose_action(state, self.get_epsilon(e))
                next_state, reward, done, _ = self.env.step(action)
                next_state = self.preprocess_state(next_state)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                i += 1
            scores.append(i)
            mean_score = np.mean(scores)
            if mean_score >= self.n_win_ticks and e >= 100:
                if not self.quiet: print('Ran {} episodes. Solved after {} trials ✔'.format(e, e - 100))
                return e - 100
            print('[Episode {}] - Mean survival time over last 100 episodes was {:.3f} ticks.'.format(e, mean_score))
            self.model.save('./dqn_model.h5')
            self.replay(self.batch_size)
        if not self.quiet: print('Did not solve after {} episodes 😞'.format(e))
        return e
if __name__ == '__main__':
    agent = DQNCartPoleSolver()
    agent.run()
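The training loop above saves the network to ./dqn_model.h5 after every episode. As a rough sketch (not part of the original gist, and assuming a checkpoint already exists and the same gym/keras versions listed at the top), the saved model could be evaluated greedily like this:

# evaluate.py (hypothetical): run one greedy episode with the saved Q-network.
import gym
import numpy as np
import keras

env = gym.make('CartPole-v0')
model = keras.models.load_model('./dqn_model.h5')  # checkpoint written by run() above
state = np.reshape(env.reset(), [1, 4])
done, steps = False, 0
while not done:
    action = int(np.argmax(model.predict(state)[0]))  # greedy: no exploration
    state, reward, done, _ = env.step(action)
    state = np.reshape(state, [1, 4])
    steps += 1
print('Greedy episode lasted {} steps'.format(steps))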