@chetandhembre
Created October 20, 2016 20:19
Windy Gridworld SARSA Implementation
# graph solutions:
# https://dl.dropboxusercontent.com/u/47591917/windy_gride_episode_length_sarsa.png
# https://dl.dropboxusercontent.com/u/47591917/windy_gride_episode_reward_sarsa.png
# https://dl.dropboxusercontent.com/u/47591917/windy_gride_episode_timestamp_sarsa.png
# https://dl.dropboxusercontent.com/u/47591917/windy_gride_path_sarsa.png
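#
# SARSA (on-policy TD control) on the windy gridworld: the agent starts from a
# fixed cell, every move is pushed in the UP direction by a column-dependent
# wind, and Q(s, a) is learned from epsilon-greedy episodes. The final episode
# is run greedily and its path is plotted.
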
import numpy as np
import matplotlib
import math
import pandas as pd
from matplotlib import pyplot as plt
UP = 0
DOWN = 1
RIGHT = 2
LEFT = 3
ALLOWED_ACTION = [UP, DOWN, RIGHT, LEFT]
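
# Grid convention in this code: x indexes rows (0..height-1) and y indexes
# columns (0..width-1); wind_strength is indexed by column and pushes the agent
# along x (the UP direction).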


class State(object):
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __hash__(self):
        return hash((self.x, self.y))

    def __eq__(self, other):
        return (self.x == other.x) and (self.y == other.y)

    def __str__(self):
        return str((self.x, self.y))

class ActionState(object):
    def __init__(self, state, action):
        self.state = state
        self.action = action

    def __hash__(self):
        return hash((self.state.x, self.state.y, self.action))

    def __eq__(self, other):
        return (self.state == other.state) and (self.action == other.action)

    def __str__(self):
        return str((self.state.x, self.state.y, self.action))
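
# State and ActionState implement __hash__/__eq__ so they can be used as
# dictionary keys in the Q-table (state_action_map) below.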

class ValueMap(object):
    def __init__(self, width, height, discount_factor=1.0, alpha=0.5, epsilon=0.1):
        self.width = width
        self.height = height
        self.state_action_map = {}
        # wind strength for each column of the grid
        self.wind_strength = [0, 0, 0, 1, 1, 1, 2, 2, 1, 0]
        self.discount_factor = discount_factor
        self.alpha = alpha
        self.epsilon = epsilon
        self.initialize()

    def initialize(self):
        # Q(s, a) starts at 0 for every state-action pair
        for i in range(self.height):
            for j in range(self.width):
                for action in ALLOWED_ACTION:
                    state = State(i, j)
                    action_state = ActionState(state, action)
                    self.state_action_map[action_state] = 0.0

    def get_start_position(self):
        # the random values below are unused; a fixed start cell (row 3, column 0) is returned
        x = np.random.randint(0, 1, size=1)[0]
        y = np.random.randint(0, self.height, size=1)[0]
        return State(3, 0)

    def get_end_position(self):
        # the random values below are unused; a fixed goal cell (row 3, column 7) is returned
        x = np.random.randint(7, self.width, size=1)[0]
        y = np.random.randint(0, self.height, size=1)[0]
        return State(3, 7)

    def get_greedy_action(self, state):
        # return the action with the highest Q-value in this state
        max_value = -np.inf
        max_action = None
        for action in ALLOWED_ACTION:
            action_state = ActionState(state, action)
            value = self.state_action_map[action_state]
            if value > max_value:
                max_value = value
                max_action = action
        return max_action

    def _get_next_state(self, current_state, current_action):
        x, y = current_state.x, current_state.y
        new_x, new_y = x, y
        if current_action == UP:
            new_x = x + 1
        elif current_action == DOWN:
            new_x = x - 1
        elif current_action == RIGHT:
            new_y = y + 1
        elif current_action == LEFT:
            new_y = y - 1
        new_y = max(0, new_y)
        new_y = min(self.width - 1, new_y)
        # the wind would normally be applied with a -1 factor, but with the way this
        # grid is laid out adding the strength (x1) gives the same effect
        new_x = new_x + self.wind_strength[y] * 1
        new_x = max(0, new_x)
        new_x = min(self.height - 1, new_x)
        return State(new_x, new_y)
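
    # epsilon-greedy policy: every action gets probability epsilon / |A|, and the
    # greedy action receives the remaining 1 - epsilon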
    def select_action(self, state, is_greedy=False):
        actions_probability = np.ones(len(ALLOWED_ACTION)) * self.epsilon / len(ALLOWED_ACTION)
        greedy_action = self.get_greedy_action(state)
        if is_greedy:
            return greedy_action
        actions_probability[greedy_action] = actions_probability[greedy_action] + (1 - self.epsilon)
        return np.random.choice(np.arange(len(actions_probability)), p=actions_probability)

    def get_reward(self, state, action, end):
        # -1 for every step, +1 when the move lands on the goal
        next_state = self._get_next_state(state, action)
        if next_state == end:
            return 1
        return -1

    def get_next_state(self, current_state, current_action, end, is_greedy=False):
        next_state = self._get_next_state(current_state, current_action)
        next_action = self.select_action(next_state, is_greedy=is_greedy)
        reward = self.get_reward(next_state, next_action, end)
        return next_state, next_action, reward
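
    # SARSA update: Q(s, a) <- Q(s, a) + alpha * [r + gamma * Q(s', a') - Q(s, a)]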
    def update_value_map(self, current_state, current_action, next_state, next_action, reward):
        current_state_action = ActionState(current_state, current_action)
        old_value = self.state_action_map[current_state_action]
        next_state_action = ActionState(next_state, next_action)
        next_value = self.state_action_map[next_state_action]
        target = reward + self.discount_factor * next_value
        self.state_action_map[current_state_action] = old_value + float(self.alpha * (target - old_value))


class Game(object):
    def __init__(self, width, height, no_episodes, discount_factor=1.0, alpha=0.1, epsilon=0.1):
        self.width = width
        self.height = height
        self.value_map = ValueMap(width, height, discount_factor=discount_factor, alpha=alpha, epsilon=epsilon)
        self.no_episodes = no_episodes
        self.start = self.value_map.get_start_position()
        self.end = self.value_map.get_end_position()
        self.action_per_episodes = []
        self.rewards_per_episodes = []
        self.last_episode_actions = []

    def plot(self):
        noshow = True

        # Plot the episode length over time
        fig1 = plt.figure(figsize=(10, 5))
        plt.plot(self.action_per_episodes)
        plt.xlabel("Episode")
        plt.ylabel("Episode Length")
        plt.title("Episode Length over Time")
        plt.savefig('windy_gride_episode_length_sarsa.png')
        if noshow:
            plt.close(fig1)
        else:
            plt.show()

        # Plot the episode reward over time
        fig2 = plt.figure(figsize=(10, 5))
        smoothing_window = 10
        rewards_smoothed = pd.Series(self.rewards_per_episodes).rolling(smoothing_window, min_periods=smoothing_window).mean()
        plt.plot(rewards_smoothed)
        plt.xlabel("Episode")
        plt.ylabel("Episode Reward (Smoothed)")
        plt.title("Episode Reward over Time (Smoothed over window size {})".format(smoothing_window))
        plt.savefig('windy_gride_episode_reward_sarsa.png')
        if noshow:
            plt.close(fig2)
        else:
            plt.show()

        # Plot time steps against episode number
        fig3 = plt.figure(figsize=(10, 5))
        plt.plot(np.cumsum(self.action_per_episodes), np.arange(len(self.action_per_episodes)))
        plt.xlabel("Time Steps")
        plt.ylabel("Episode")
        plt.title("Episode per time step")
        plt.savefig('windy_gride_episode_timestamp_sarsa.png')
        if noshow:
            plt.close(fig3)
        else:
            plt.show()

        # Plot the path followed in the final (greedy) episode
        plt.plot(self.start.y, self.start.x, 'x', markersize=20)
        previous_x = self.start.y
        previous_y = self.start.x
        for position in self.last_episode_actions[1:]:
            x, y = position.y, position.x
            plt.arrow(previous_x, previous_y, x - previous_x, y - previous_y, head_width=0.3, head_length=0.3, overhang=0)
            plt.plot(x, y, 'o', markersize=5)
            previous_x = x
            previous_y = y
        plt.plot(self.end.y, self.end.x, 'x', markersize=20)
        axes = plt.gca()
        axes.set_xticks(range(-2, self.width + 2))
        axes.set_yticks(range(-2, self.height + 2))
        axes.set_title('path to reach destination')
        plt.grid()
        # plt.show()
        plt.savefig('windy_gride_path_sarsa.png')
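
    # Run no_episodes episodes of SARSA; the last episode follows the greedy
    # policy so that the learned path can be plotted.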
    def play(self):
        for i in range(self.no_episodes):
            is_greedy = i == self.no_episodes - 1
            current_state = self.start
            current_action = self.value_map.select_action(current_state, is_greedy=is_greedy)
            actions = 0
            rewards = 0
            self.last_episode_actions = []
            self.last_episode_actions.append(current_state)
            while not (current_state == self.end):
                # give up on episodes that run too long
                if actions > 2000:
                    print("episode %s failed" % str(i))
                    break
                next_state, next_action, reward = self.value_map.get_next_state(current_state, current_action, self.end, is_greedy=is_greedy)
                # print(next_state, next_action, self.end)
                self.value_map.update_value_map(current_state, current_action, next_state, next_action, reward)
                current_state = next_state
                current_action = next_action
                actions = actions + 1
                rewards = rewards + reward
                self.last_episode_actions.append(current_state)
            self.action_per_episodes.append(actions)
            self.rewards_per_episodes.append(rewards)
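

# Train on the 10x7 windy grid for 1000 episodes, then save the plots.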
game = Game(10, 7, 1000)
game.play()
game.plot()
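
# --- optional sanity check (not part of the original gist) ---
# A quick way to inspect what was learned is to print the greedy action for
# every cell of the Q-table; ACTION_NAMES is a helper introduced here for
# illustration only.
ACTION_NAMES = {UP: 'U', DOWN: 'D', RIGHT: 'R', LEFT: 'L'}
for i in range(game.height):
    row = []
    for j in range(game.width):
        row.append(ACTION_NAMES[game.value_map.get_greedy_action(State(i, j))])
    print(' '.join(row))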