OpenAI Gym - CartPole-v0
import os

from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

from common.gym_runner import GymRunner
from common.q_learning_agent import QLearningAgent


class CartPoleAgent(QLearningAgent):
    def __init__(self):
        # CartPole-v0: 4 state variables, 2 discrete actions
        super().__init__(4, 2)

    def build_model(self):
        model = Sequential()
        model.add(Dense(12, activation='relu', input_dim=4))
        model.add(Dense(12, activation='relu'))
        model.add(Dense(2))
        model.compile(optimizer=Adam(lr=0.001), loss='mse')

        # load the weights of the model if reusing a previous training session
        # model.load_weights("models/cartpole-v0.h5")
        return model


if __name__ == "__main__":
    gym = GymRunner('CartPole-v0', 'gymresults/cartpole-v0')
    agent = CartPoleAgent()

    gym.train(agent, 1000)
    gym.run(agent, 500)

    agent.model.save_weights("models/cartpole-v0.h5", overwrite=True)
    gym.close_and_upload(os.environ['API_KEY'])
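Since build_model contains a commented-out load_weights call, a previously saved model can be reused without retraining. A minimal evaluation-only sketch (this is an illustration, not part of the original gist, and assumes that line is uncommented and models/cartpole-v0.h5 exists from an earlier run):

if __name__ == "__main__":
    gym = GymRunner('CartPole-v0', 'gymresults/cartpole-v0')
    agent = CartPoleAgent()  # build_model() would load the saved weights here
    gym.run(agent, 500)      # do_train defaults to False: no exploration, no weight updates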
# common/gym_runner.py

import gym
from gym import wrappers


class GymRunner:
    def __init__(self, env_id, monitor_dir, max_timesteps=100000):
        self.monitor_dir = monitor_dir
        self.max_timesteps = max_timesteps

        self.env = gym.make(env_id)
        self.env = wrappers.Monitor(self.env, monitor_dir, force=True)

    def calc_reward(self, state, action, gym_reward, next_state, done):
        # hook for reward shaping; by default the environment's reward is used unchanged
        return gym_reward

    def train(self, agent, num_episodes):
        self.run(agent, num_episodes, do_train=True)

    def run(self, agent, num_episodes, do_train=False):
        for episode in range(num_episodes):
            state = self.env.reset().reshape(1, self.env.observation_space.shape[0])
            total_reward = 0

            for t in range(self.max_timesteps):
                action = agent.select_action(state, do_train)

                # execute the selected action
                next_state, reward, done, _ = self.env.step(action)
                next_state = next_state.reshape(1, self.env.observation_space.shape[0])
                reward = self.calc_reward(state, action, reward, next_state, done)

                # record the results of the step
                if do_train:
                    agent.record(state, action, reward, next_state, done)

                total_reward += reward
                state = next_state

                if done:
                    break

            # train the agent based on a sample of past experiences
            if do_train:
                agent.replay()

            print("episode: {}/{} | score: {} | e: {:.3f}".format(
                episode + 1, num_episodes, total_reward, agent.epsilon))

    def close_and_upload(self, api_key):
        self.env.close()
        gym.upload(self.monitor_dir, api_key=api_key)
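Because calc_reward returns the environment's reward unchanged, it is a natural hook for reward shaping: a subclass can override it without touching the episode loop. A minimal sketch of such an override (the CartPoleRunner name and the -10 penalty are illustrative, not part of the gist):

class CartPoleRunner(GymRunner):
    def calc_reward(self, state, action, gym_reward, next_state, done):
        # hypothetical shaping: penalize the terminal step so early failures
        # propagate a negative signal back through the Q-values
        if done:
            return -10
        return gym_reward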
# common/q_learning_agent.py

import abc
import random
from collections import deque

import numpy as np


class QLearningAgent(abc.ABC):
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size

        # hyperparameters
        self.gamma = 0.95           # discount rate on future rewards
        self.epsilon = 1.0          # exploration rate
        self.epsilon_decay = 0.995  # the decay of epsilon after each training batch
        self.epsilon_min = 0.1      # the minimum exploration rate permissible
        self.batch_size = 32        # maximum size of the batches sampled from memory

        # agent state
        self.model = self.build_model()
        self.memory = deque(maxlen=2000)

    @abc.abstractmethod
    def build_model(self):
        return None

    def select_action(self, state, do_train=True):
        # epsilon-greedy: explore with probability epsilon while training,
        # otherwise pick the action with the highest predicted Q-value
        if do_train and np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        return np.argmax(self.model.predict(state)[0])

    def record(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def replay(self):
        if len(self.memory) < self.batch_size:
            return 0

        minibatch = random.sample(self.memory, self.batch_size)
        for state, action, reward, next_state, done in minibatch:
            # one-step Q-learning target: r + gamma * max_a' Q(s', a')
            target = reward
            if not done:
                target = (reward + self.gamma *
                          np.amax(self.model.predict(next_state)[0]))

            # update only the Q-value of the action that was actually taken
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)

        # decay the exploration rate after each training batch
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
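A note on the update inside replay(): the network outputs Q-values, i.e. estimates of the discounted return for each action, not the immediate reward. The target for the action that was taken is the standard one-step Q-learning (Bellman) target r + gamma * max_a' Q(s', a'), and the same network that selects actions is also used to bootstrap that target (there is no separate target network in this gist). A small standalone helper showing the same computation (the q_learning_target name is made up for this sketch):

import numpy as np

def q_learning_target(reward, next_q_values, done, gamma=0.95):
    # terminal transitions have nothing to bootstrap from
    if done:
        return reward
    # otherwise bootstrap with the highest Q-value predicted for the next state,
    # where next_q_values would be model.predict(next_state)[0]
    return reward + gamma * np.amax(next_q_values)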
Hi, great work!
I have a doubt, though, about lines 43-44 in q_learning_agent.py:
target = (reward + self.gamma * np.amax(self.model.predict(next_state)[0]))
You are using the model that predicts actions to predict the reward. Why is that, and is it correct in the first place?