import gym
import numpy as np
import matplotlib.pyplot as plt
env_type = "FrozenLake8x8-v0"
algorithm_type = "q_learning"
policy_type = "epsilon_greedy"
run_name = 'run-{0}-{1}-{2}'.format(env_type, algorithm_type, policy_type)
# Random seed for reproducibility (seeds the global NumPy RNG used below)
np.random.seed(42)
# Selection of the problem
env = gym.envs.make(env_type)
# init Q table with zeros indicating no prior knowledge
na = env.action_space.n
ns = env.observation_space.n
q_value = np.zeros([ns, na])
# agent hyperparameters
alpha = 0.1            # learning rate
gamma = 0.99           # discount factor
epsilon = 0.7          # initial exploration rate
epsilon_decay = 0.995  # multiplicative decay applied to epsilon
kappa = 0.01           # smoothing factor for the running average of terminal rewards
# sim setup
n_episode = 20000
max_step = 100
average_terminal_reward = -1  # sentinel: no terminal reward observed yet
def select_action_with_epsilon_greedy(current_state, q_value, epsilon=0.1):
    # Greedy action by default (argmax breaks ties towards the lowest index),
    # replaced by a uniformly random action with probability epsilon.
    action = np.argmax(q_value[current_state, :])
    if np.random.rand() < epsilon:
        action = np.random.randint(q_value.shape[1])
    return action
def update_q_value(q_value, reward, current_state, next_state, current_action):
    # TD error with the off-policy (max over next actions) target
    delta = reward + gamma * np.max(q_value[next_state, :]) - q_value[current_state, current_action]
    # Update the Q-value table in place
    q_value[current_state, current_action] += alpha * delta
    return q_value
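# For reference, update_q_value implements the standard tabular Q-learning rule:
#   Q(s, a) <- Q(s, a) + alpha * (reward + gamma * max_a' Q(s', a') - Q(s, a))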
history = []
env.reset()
for i_episode in range(n_episode):
    # reset episode statistics and the environment
    cumulative_reward = 0
    current_state = env.reset()
    # select the first action of the episode with the epsilon-greedy policy
    current_action = select_action_with_epsilon_greedy(current_state, q_value, epsilon=epsilon)
    for i_step in range(max_step):
        # get the result of the action from the environment
        next_state, reward, done, info = env.step(current_action)
        # update the discounted cumulative reward (return) of the episode
        cumulative_reward += (gamma ** i_step) * reward
        # determine the next action
        next_action = select_action_with_epsilon_greedy(next_state, q_value, epsilon=epsilon)
        # update the Q table
        update_q_value(q_value, reward, current_state, next_state, current_action)
        # move to the next state/action pair
        current_state = next_state
        current_action = next_action
        if done:
            # exponential moving average of terminal rewards
            if average_terminal_reward == -1:
                average_terminal_reward = reward
            else:
                average_terminal_reward = kappa * reward + (1. - kappa) * average_terminal_reward
            if reward > average_terminal_reward:
                # bias the policy towards exploitation:
                # epsilon is decayed exponentially
                epsilon = epsilon * epsilon_decay
            print(f"Episode: {i_episode}\tSteps: {i_step}"
                  f"\tCumulative Reward: {cumulative_reward}\tTerminal Reward: {reward}"
                  f"\tAverage Terminal Reward: {round(average_terminal_reward, 2)}\tEpsilon: {round(epsilon, 2)}")
            history.append([i_episode, i_step, cumulative_reward, reward, average_terminal_reward, epsilon])
            break
def render_and_save_history(history: list):
    history = np.array(history)
    window_size = 100

    def running_average(x, window_size, mode='valid'):
        return np.convolve(x, np.ones(window_size) / window_size, mode=mode)

    fig, ax = plt.subplots(2, 2, figsize=[12, 8])
    # Number of steps
    ax[0, 0].plot(history[:, 0], history[:, 1], '.')
    ax[0, 0].set_xlabel('Episode')
    ax[0, 0].set_ylabel('Number of steps')
    ax[0, 0].plot(history[window_size-1:, 0], running_average(history[:, 1], window_size))
    # Cumulative reward
    ax[0, 1].plot(history[:, 0], history[:, 2], '.')
    ax[0, 1].set_xlabel('Episode')
    ax[0, 1].set_ylabel('Cumulative rewards')
    ax[0, 1].plot(history[window_size-1:, 0], running_average(history[:, 2], window_size))
    # Terminal reward
    ax[1, 0].plot(history[:, 0], history[:, 3], '.')
    ax[1, 0].set_xlabel('Episode')
    ax[1, 0].set_ylabel('Terminal rewards')
    ax[1, 0].plot(history[window_size-1:, 0], running_average(history[:, 3], window_size))
    # Epsilon/Beta
    ax[1, 1].plot(history[:, 0], history[:, 5], '.')
    ax[1, 1].set_xlabel('Episode')
    if policy_type == 'softmax':
        ax[1, 1].set_ylabel('Beta')
    elif policy_type == 'epsilon_greedy':
        ax[1, 1].set_ylabel('Epsilon')
    fig.savefig('./' + run_name + '.png')
render_and_save_history(history)
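
# A minimal post-training evaluation sketch, assuming the same pre-0.26 gym
# reset/step API used above: roll out the learned policy greedily (epsilon = 0)
# and report how often the goal is reached. The helper name
# `evaluate_greedy_policy` and the episode counts are illustrative choices.
def evaluate_greedy_policy(env, q_value, n_eval_episode=1000, max_step=100):
    n_success = 0
    for _ in range(n_eval_episode):
        state = env.reset()
        for _ in range(max_step):
            # always exploit: take the action with the highest Q value
            action = np.argmax(q_value[state, :])
            state, reward, done, info = env.step(action)
            if done:
                # FrozenLake returns a reward of 1 only when the goal is reached
                n_success += reward
                break
    return n_success / n_eval_episode

print('Greedy success rate:', evaluate_greedy_policy(env, q_value))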