@Ending2015a
Forked from wayne1029jihad/cartpole.py
Last active September 16, 2021 07:19
cartpole.py
import os
import random
import gym
import numpy as np
import pylab
from collections import deque
import pyvirtualdisplay
import cv2
import datetime
import tensorflow as tf
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Dense, Lambda, Add, Conv2D, Flatten
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras import backend as K
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
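# GPU setup via the TF1-compat session API: allow_growth makes TensorFlow
# allocate GPU memory on demand instead of reserving all of it up front.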
config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)
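# Q-network builder: a small CNN over FRAME_STEP stacked grayscale frames
# (channels_first layout) followed by dense layers, outputting one Q-value per
# action. The compile() settings are not used by the custom GradientTape
# training loop below; compile() is kept mainly for model.summary() and saving.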
def Model_(input_shape, action_space):
    input_x = Input(input_shape)
    output = tf.keras.Sequential([
        tf.keras.layers.Conv2D(64, 5, 3, input_shape=input_shape, data_format="channels_first"),
        tf.keras.layers.ReLU(),
        tf.keras.layers.Conv2D(64, 4, 2, data_format="channels_first"),
        tf.keras.layers.ReLU(),
        tf.keras.layers.Conv2D(64, 3, 1, data_format="channels_first"),
        tf.keras.layers.ReLU(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(512, kernel_initializer='he_uniform'),
        tf.keras.layers.ReLU(),
        tf.keras.layers.Dense(256, kernel_initializer='he_uniform'),
        tf.keras.layers.ReLU(),
        tf.keras.layers.Dense(64, kernel_initializer='he_uniform'),
        tf.keras.layers.ReLU(),
        tf.keras.layers.Dense(action_space, kernel_initializer='he_uniform'),
    ])(input_x)
    model = Model(inputs=input_x, outputs=output)
    model.compile(loss="huber",
                  optimizer=RMSprop(learning_rate=0.00025, rho=0.95, epsilon=0.01),
                  metrics=["accuracy"])
    model.summary()
    return model
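# DQN agent: keeps an online network and a target network, a replay deque,
# and an epsilon-greedy exploration schedule. States are stacks of rendered
# frames rather than CartPole's 4-dimensional observation vector.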
class DQNAgent:
    def __init__(self, env_name):
        self.env_name = env_name
        self.env = gym.make(env_name)
        #self.env.seed(0)
        log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        self.tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)
        self.env._max_episode_steps = 200
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        #self.EPISODES = 6000
        self.EPISODES = 3000
        # Instantiate replay memory
        memory_size = 3000
        self.memory = deque(maxlen=memory_size)
        self.gamma = 0.95  # discount rate
        # EXPLORATION HYPERPARAMETERS for the epsilon-greedy strategy
        self.epsilon = 1.0
        self.epsilon_min = 0.0001
        self.epsilon_decay = 0.0005
        self.batch_size = 64
        self.TAU = 0.05  # target network soft-update hyperparameter
        self.Save_Path = 'Models' + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        if not os.path.exists(self.Save_Path):
            os.makedirs(self.Save_Path)
        self.scores, self.episodes, self.average = [], [], []
        self.Model_name = os.path.join(self.Save_Path, self.env_name + "_DQN_CNN.h5")
        # Image-based state: FRAME_STEP stacked grayscale frames of size ROWS x COLS
        self.ROWS = 160
        self.COLS = 240
        self.FRAME_STEP = 4
        self.image_memory = np.zeros((self.FRAME_STEP, self.ROWS, self.COLS))
        self.state_size = (self.FRAME_STEP, self.ROWS, self.COLS)  # overrides the vector state size above
        # create main model and target model
        self.model = Model_(input_shape=self.state_size, action_space=self.action_size)
        self.target_model = Model_(input_shape=self.state_size, action_space=self.action_size)
        # initialize the target model with a full copy of the online weights
        self.update_target_model(rate=1.0)
        # setup optimizer
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
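    # Soft (Polyak) update: theta_target <- (1 - rate) * theta_target + rate * theta_online.
    # rate=1.0 copies the online weights exactly; rate=TAU (0.05) nudges the target slowly.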
    # after some time interval, update the target model toward the online model
    def update_target_model(self, rate=1.0):
        vars = self.model.trainable_variables
        target_vars = self.target_model.trainable_variables
        for tar, var in zip(target_vars, vars):
            tar.assign(tar * (1. - rate) + var * rate)
    def remember(self, state, action, reward, next_state, done):
        experience = (state, action, reward, next_state, done)
        self.memory.append(experience)
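    # Epsilon-greedy action selection. Epsilon decays multiplicatively by a factor
    # of (1 - epsilon_decay) on every call until it reaches epsilon_min, so
    # exploration fades out gradually as training progresses.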
    def act(self, state, decay_step):
        if self.epsilon > self.epsilon_min:
            self.epsilon *= (1 - self.epsilon_decay)
        explore_probability = self.epsilon
        if explore_probability > np.random.rand():
            # Take a random action (exploration)
            return random.randrange(self.action_size)
        else:
            # Get action from the Q-network (exploitation):
            # estimate the Q values for this state and
            # take the biggest Q value (= the best action)
            return np.argmax(self.model.predict(state))
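    # One gradient step of DQN. The target is the standard one-step TD target
    #     y = r + (1 - done) * gamma * max_a' Q_target(s', a')
    # computed from the target network and held fixed via stop_gradient; the loss
    # is the Huber loss between y and Q(s, a) from the online network.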
    @tf.function
    def train(self, state, action, reward, next_state, done):
        vars = self.model.trainable_variables
        with tf.GradientTape() as tape:
            tape.watch(vars)
            action = tf.cast(action, dtype=tf.int64)
            reward = tf.cast(reward, dtype=tf.float32)
            done = tf.cast(done, dtype=tf.float32)
            # calculate the target q value
            next_qs = self.target_model(next_state, training=True)
            next_q = tf.math.reduce_max(next_qs, axis=-1)
            y = reward + tf.stop_gradient((1. - done) * self.gamma * next_q)
            # calculate the current q value of the action actually taken
            qs = self.model(state)
            q = tf.gather(qs, indices=action, batch_dims=1)
            # calculate the td loss
            loss = tf.keras.losses.huber(y, q)
        # perform gradient updates
        grads = tape.gradient(loss, vars)
        self.optimizer.apply_gradients(zip(grads, vars))
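    # Sample a uniform random minibatch from the replay deque and run one training
    # step on it. Random sampling breaks the temporal correlation between
    # consecutive transitions, which stabilizes DQN training.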
    def replay(self):
        # Randomly sample a minibatch from the deque memory
        minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))
        batch_size = len(minibatch)
        state = np.zeros((batch_size,) + self.state_size)
        next_state = np.zeros((batch_size,) + self.state_size)
        action, reward, done = [], [], []
        for i in range(batch_size):
            state[i] = minibatch[i][0]
            action.append(minibatch[i][1])
            reward.append(minibatch[i][2])
            next_state[i] = minibatch[i][3]
            done.append(minibatch[i][4])
        # convert to numpy arrays
        action = np.asarray(action)
        reward = np.asarray(reward)
        done = np.asarray(done)
        self.train(state, action, reward, next_state, done)
    def load(self, name):
        self.model = load_model(name)

    def save(self, name):
        self.model.save(name)
"""
def imshow(self, image, frame_step=0):
#cv2.imshow("cartpole"+str(frame_step), image[frame_step,...])
if cv2.waitKey(25) & 0xFF == ord("q"):
cv2.destroyAllWindows()
return
"""
    def GetImage(self):
        img = self.env.render(mode='rgb_array')
        img_gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        img_resized = cv2.resize(img_gray, (self.COLS, self.ROWS), interpolation=cv2.INTER_CUBIC)
        # threshold to a binary image, then scale to [0, 1]
        img_resized[img_resized < 255] = 0
        img_resized = img_resized / 255
        # push the newest frame to the front of the frame stack
        self.image_memory = np.roll(self.image_memory, 1, axis=0)
        self.image_memory[0, :, :] = img_resized
        return np.expand_dims(self.image_memory, axis=0)
    def reset(self):
        self.env.reset()
        for i in range(self.FRAME_STEP):
            state = self.GetImage()
        return state
    def step(self, action):
        next_state, reward, done, info = self.env.step(action)
        next_state = self.GetImage()
        return next_state, reward, done, info
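    # Main training loop: first fill the replay memory with 500 random transitions,
    # then run epsilon-greedy episodes, calling replay() every 4 environment steps,
    # soft-updating the target network at the end of every FRAME_STEP-th episode,
    # and running a 10-episode evaluation every 200 steps.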
    def run(self):
        decay_step = 0
        state = self.reset()
        for i in range(500):
            action = random.randrange(self.action_size)
            next_state, reward, done, _ = self.step(action)
            self.remember(state, action, reward, next_state, done)
            # advance the state (and reset on episode end) so stored transitions are consistent
            state = self.reset() if done else next_state
        step_num = 0
        for e in range(self.EPISODES):
            state = self.reset()
            done = False
            i = 0
            while not done:
                decay_step += 1
                step_num += 1
                action = self.act(state, decay_step)
                next_state, reward, done, _ = self.step(action)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                i += 1
                if done:
                    if e % self.FRAME_STEP == 0:
                        self.update_target_model(rate=self.TAU)
                    self.scores.append(i)
                    self.episodes.append(e)
                    self.average.append(sum(self.scores[-100:]) / len(self.scores[-100:]))
                    print("episode: {}/{}, score: {}".format(e, self.EPISODES, i))
                    if i == self.env._max_episode_steps:
                        print("Saving trained model to", self.Model_name, e)
                        self.save(self.Model_name + str(e))
                    break
                if step_num % 200 == 0:
                    self.test(10, "eval")
                if step_num % 4 == 0:
                    self.replay()
        self.env.close()
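    # Evaluation: run num_eval_episodes greedy (argmax-Q) episodes with either the
    # current target network ("eval") or a model loaded from disk, and save the
    # online model if the average return exceeds 180.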
    def test(self, num_eval_episodes, model_name):
        total_return = 0
        if model_name == "eval":
            m = self.target_model
        else:
            self.load(model_name)
            m = self.model
        for e in range(num_eval_episodes):
            state = self.reset()
            done = False
            i = 0
            while not done:
                action = np.argmax(m.predict(state))
                next_state, reward, done, _ = self.step(action)
                state = next_state
                i += reward
                if done:
                    print("episode: {}/{}, score: {}".format(e, num_eval_episodes, i))
                    break
            total_return += i
        print("Model:", model_name)
        print("Average: {}".format(total_return / num_eval_episodes))
        if total_return / num_eval_episodes > 180:
            self.save(self.Model_name)
if __name__ == "__main__":
env_name = 'CartPole-v0'
display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()
agent = DQNAgent(env_name)
print("Training Start!")
agent.run()
print("Training Finish")
#agent.test(100,"Models_2W_0902_0655/CartPole-v0_DQN_CNN.h52026")