@chetandhembre
Last active October 12, 2016 18:20
SARSA on-policy learning
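The script below builds a small, randomly generated grid "road", picks a start cell on the first row and an end cell on the last row, and learns action values for driving between them with an epsilon-greedy behaviour policy. The is_q_learning flag switches the bootstrap target between SARSA (an action sampled from the behaviour policy in the next state) and Q-learning (the greedy action in the next state). As a rough sketch of the tabular update each step performs, with hypothetical names rather than the classes defined in the script:

def td_update(Q, s, a, r, s_next, a_next, alpha, gamma):
    # Q is a plain dict of (state, action) -> value; the transition is
    # (s, a, r, s_next) and a_next is the bootstrap action described above
    target = r + gamma * Q[(s_next, a_next)]
    Q[(s, a)] += alpha * (target - Q[(s, a)])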
#!/usr/bin/python2
import numpy as np
import matplotlib
import math
from matplotlib import pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.mlab as mlab
from matplotlib.patches import Circle, Wedge, Polygon
from matplotlib.collections import PatchCollection
import matplotlib.patches as patches
"""
1: right
2: left
3: up
4: down
5: right-up
6: left-up
7: right-down
8: left-down
"""
ALLOWED_DIRECTIONS = [1, 2, 3, 4, 5, 6, 7, 8]
ALLOWED_VELOCITY_VALUE = [1, 2, 3, 4, 5]
HEIGHT = 5
WIDTH = 10
MAX_VELOCITY = 5
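# The track is HEIGHT x WIDTH cells. Every cell gets an entry for each of the
# 8 directions at each of the 5 velocities, so the tabular value map below
# holds HEIGHT * WIDTH * 8 * 5 = 2000 state-action values.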
def create_number_tiles_row():
    # randomly choose the span of drivable tiles for one row of the track
    start = int(math.ceil(np.random.uniform(1, int(WIDTH / 2))))
    end = int(math.ceil(np.random.uniform(start, WIDTH + 1)))
    return start, end
def get_next_position(x1, y1, v, d):
    if d == 1:
        # right
        x1 = x1 + v
    elif d == 2:
        # left
        x1 = x1 - v
    elif d == 3:
        # up
        y1 = y1 - v
    elif d == 4:
        # down
        y1 = y1 + v
    elif d == 5:
        # right-up
        x1 = x1 + v
        y1 = y1 - v
    elif d == 6:
        # left-up
        x1 = x1 - v
        y1 = y1 - v
    elif d == 7:
        # right-down
        x1 = x1 + v
        y1 = y1 + v
    else:
        # left-down
        x1 = x1 - v
        y1 = y1 + v
    return x1, y1
class State(object):
    def __init__(self, x, y):
        self.x = x
        self.y = y
    def __hash__(self):
        return hash((self.x, self.y))
    def __eq__(self, other):
        return (self.x == other.x) and (self.y == other.y)
    def __str__(self):
        return str((self.x, self.y))
class Action(object):
    def __init__(self, velocity, direction):
        self.velocity = velocity
        self.direction = direction
    def __eq__(self, other):
        return (self.velocity == other.velocity) and (self.direction == other.direction)
    def __hash__(self):
        return hash((self.velocity, self.direction))
    def __str__(self):
        return str((self.velocity, self.direction))
class StateAction(object):
    def __init__(self, state, action):
        self.state = state
        self.action = action
    def __eq__(self, other):
        return self.state == other.state and self.action == other.action
    def __hash__(self):
        return hash((self.state, self.action))
    def __str__(self):
        return str((str(self.state), str(self.action)))
class GridRoad(object):
    def __init__(self, width=WIDTH, height=HEIGHT):
        self.width = width
        self.height = height
        self.value_map = {}
        self.allowed_location = np.zeros((height, width))
        self.state_action_visited = {}
        self.state_visited = np.zeros((height, width))
        self.N0 = 500
        for i in range(height):
            start, end = create_number_tiles_row()
            self.allowed_location[i][int(start): int(end)] = 1
        print self.allowed_location
        self.initialized()
    def is_allowed_poisition(self, position):
        x, y = position
        if x > self.height - 1 or y > self.width - 1 or x < 0 or y < 0:
            return False
        return self.allowed_location[x][y]
    def initialized(self):
        for i in range(self.height):
            for j in range(self.width):
                for v in ALLOWED_VELOCITY_VALUE:
                    for d in ALLOWED_DIRECTIONS:
                        state = State(i, j)
                        action = Action(v, d)
                        state_action = StateAction(state, action)
                        if self.allowed_location[i][j] == 0:
                            self.value_map[state_action] = -np.inf
                        else:
                            self.value_map[state_action] = 0
                        self.state_action_visited[state_action] = 0
    def select_start_end(self):
        for i in range(self.width):
            if self.is_allowed_poisition((0, i)):
                start = (0, i)
                break
        for i in range(self.width - 1, -1, -1):
            if self.is_allowed_poisition((self.height - 1, i)):
                end = (self.height - 1, i)
                break
        return start, end
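    # Exploration schedule: epsilon decays per state with its visit count,
    # epsilon(s) = N0 / (N0 + N(s)), so a state is explored less the more
    # often it has already been seen.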
    def get_epsilon(self, position):
        x, y = position
        return self.N0 / float(self.N0 + (self.state_visited[x][y]))
    def get_allowed_velocity(self, current_action=None):
        if not current_action:
            return [1]
        v = current_action.velocity
        allowed_velocity = [v, v - 1, v + 1]
        return allowed_velocity
    def get_intermediate_positions_allowed(self, current_position, current_action):
        # step one cell at a time along the chosen direction and check that
        # every cell up to and including the destination is on the track
        x, y = current_position
        v, d = current_action.velocity, current_action.direction
        x, y = get_next_position(x, y, 1, d)
        x1, y1 = get_next_position(x, y, v, d)
        while not (x, y) == (x1, y1):
            if not self.is_allowed_poisition((x, y)):
                return False
            x, y = get_next_position(x, y, 1, d)
        return True
    def get_greedy_action(self, position, current_action):
        # highest-valued action among those whose whole path stays on the track
        x, y = position
        state = State(x, y)
        max_action_val = -np.inf
        max_action = None
        allowed_velocity = self.get_allowed_velocity(current_action=current_action)
        for v in allowed_velocity:
            if v > MAX_VELOCITY or v < 1:
                continue
            for d in ALLOWED_DIRECTIONS:
                action = Action(v, d)
                if not self.get_intermediate_positions_allowed(position, action):
                    continue
                state_action = StateAction(state, action)
                val = self.value_map[state_action]
                if max_action_val <= val:
                    max_action = action
                    max_action_val = val
        return max_action
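    # Behaviour policy: epsilon-greedy over the legal actions from this
    # position. Each of the n legal actions gets probability epsilon / n and
    # the greedy action gets the remaining (1 - epsilon). When picking the
    # bootstrap action with is_q_learning set, the greedy action is returned
    # directly, which is what makes that target off-policy.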
    def select_action(self, position, current_action=None, is_greedy=False, is_q_learning=False, is_next_action=False):
        epsilon = self.get_epsilon(position)
        actions = []
        allowed_velocity = self.get_allowed_velocity(current_action=current_action)
        for v in allowed_velocity:
            if v > MAX_VELOCITY or v < 1:
                continue
            for d in ALLOWED_DIRECTIONS:
                action = Action(v, d)
                if not self.get_intermediate_positions_allowed(position, action):
                    continue
                actions.append(action)
        if len(actions) == 0:
            return None
        action_probability = np.full(len(actions), epsilon / len(actions))
        greedy_action = self.get_greedy_action(position, current_action=current_action)
        if is_greedy or (is_next_action and is_q_learning):
            return greedy_action
        action_probability[actions.index(greedy_action)] = action_probability[actions.index(greedy_action)] + (1 - epsilon)
        return actions[np.random.choice(np.arange(len(action_probability)), p=action_probability)]
    def get_reward(self, position):
        # stepping off the track is penalised more heavily than a normal step
        if not self.is_allowed_poisition(position):
            return -5
        return -1
    def take_action(self, position, action):
        x, y = position
        x1, y1 = x, y
        v, d = action.velocity, action.direction
        x1, y1 = get_next_position(x1, y1, v, d)
        is_allowed = self.is_allowed_poisition((x1, y1))
        # reward depends on where the move lands, not on the cell it left
        reward = self.get_reward((x1, y1))
        return (x1, y1), reward, is_allowed
    def visited_state_action(self, position, action):
        x, y = position
        state = State(x, y)
        self.state_visited[x][y] = self.state_visited[x][y] + 1
        if action is None:
            return
        action_state = StateAction(state, action)
        self.state_action_visited[action_state] = self.state_action_visited[action_state] + 1
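    # Step size: alpha = 1 / N(s, a), so each state-action value is effectively
    # the running average of the targets sampled for that pair.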
    def get_alpha(self, position, action):
        x, y = position
        state = State(x, y)
        state_action = StateAction(state, action)
        # 1.0 avoids Python 2 integer division, which would always return 0
        return 1.0 / self.state_action_visited[state_action]
    def get_new_val(self, gamma, current_position, current_action, reward, next_position, next_action):
        # unused stub; the update happens in update_state_action_value below
        pass
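    # TD update:
    #   Q(s, a) <- Q(s, a) + alpha * (r + gamma * Q(s', a') - Q(s, a))
    # where a' comes from select_action: an epsilon-greedy sample for SARSA,
    # the greedy action for Q-learning. Landing off the track leaves
    # Q(s', a') at -inf, which pushes Q(s, a) to -inf so the move is avoided.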
    def update_state_action_value(self, gamma, current_position, current_action, reward, next_position, next_action):
        alpha = self.get_alpha(current_position, current_action)
        x, y = current_position
        current_state_action = StateAction(State(x, y), current_action)
        current_val = self.value_map[current_state_action]
        x1, y1 = next_position
        is_allowed = self.is_allowed_poisition(next_position)
        next_val = -np.inf
        if is_allowed and next_action is not None:
            next_state_action = StateAction(State(x1, y1), next_action)
            next_val = self.value_map[next_state_action]
        new_val = current_val + alpha * (reward + (gamma * next_val) - current_val)
        if math.isnan(new_val):
            return False
        self.value_map[current_state_action] = new_val
        return True
class Game(object):
    def __init__(self, gamma, no_episode, is_q_learning=False):
        self.gamma = gamma
        self.grid_road = GridRoad()
        self.no_episode = no_episode
        self.start, self.end = self.grid_road.select_start_end()
        self.episode_visits = {}
        self.is_q_learning = is_q_learning
    def plot(self):
        total_plots = len(self.episode_visits.keys())
        # append one extra run of the learned greedy policy to the plots
        self.episode_visits[total_plots] = self.episode(is_greedy=True)
        total_plots = total_plots + 1
        print self.episode_visits.keys()
        cols = 5
        rows = int(math.ceil(total_plots / float(cols)))
        episodes = 0
        fig, axs = plt.subplots(rows, cols)
        plt.figure(1, figsize=(50, 50))
        for i in range(rows):
            for j in range(cols):
                if episodes > total_plots - 1:
                    break
                visited_positions = self.episode_visits[episodes]
                ax = axs[i][j]
                ax.plot(self.start[0], self.start[1], 'x', markersize=20)
                previous_x = self.start[0]
                previous_y = self.start[1]
                for position in visited_positions[1:]:
                    x, y = position
                    ax.arrow(previous_x, previous_y, x - previous_x, y - previous_y, head_width=0.3, head_length=0.3, overhang=-1)
                    ax.plot(x, y, 'o', markersize=5)
                    previous_x = x
                    previous_y = y
                ax.plot(self.end[0], self.end[1], 'x', markersize=20)
                height, width = self.grid_road.allowed_location.shape
                ax.add_patch(
                    patches.Rectangle(
                        (0, 0),  # (x, y)
                        HEIGHT,  # patch width: grid rows are drawn along the x axis
                        WIDTH,   # patch height: grid columns are drawn along the y axis
                        alpha=0.1
                    )
                )
                for ii in range(height):
                    for jj in range(width):
                        if self.grid_road.allowed_location[ii][jj] == 0:
                            ax.plot(ii, jj, '^', markersize=5, c='red')
                ax.set_xticks(np.arange(-WIDTH, 2 * WIDTH + 1, 1))
                ax.set_yticks(np.arange(-HEIGHT, 2 * HEIGHT + 1, 1))
                ax.set_title('episode ' + str(episodes * self.PLOT_FREQUENCY + 1))
                ax.set_autoscale_on(False)
                ax.grid()
                episodes = episodes + 1
        print "allowed tiles that were never visited:"
        for i in range(HEIGHT):
            for j in range(WIDTH):
                if self.grid_road.state_visited[i][j] == 0 and self.grid_road.allowed_location[i][j] == 1:
                    print i, j
        plt.show()
    def episode(self, is_greedy=False):
        moves = 0
        current_position = self.start
        current_action = None
        visited_positions = []
        while moves < HEIGHT * WIDTH * 2:
            moves = moves + 1
            visited_positions.append(current_position)
            previous_action = current_action
            current_action = self.grid_road.select_action(current_position, current_action=current_action, is_greedy=is_greedy, is_q_learning=self.is_q_learning)
            self.grid_road.visited_state_action(current_position, current_action)
            if State(current_position[0], current_position[1]) == State(self.end[0], self.end[1]):
                if is_greedy:
                    print current_position, current_action, next_position, next_action
                print "reached the end position"
                break
            if current_action is None:
                print "no legal action available, stopping episode"
                break
            next_position, reward, is_allowed = self.grid_road.take_action(current_position, current_action)
            if State(next_position[0], next_position[1]) == State(self.end[0], self.end[1]):
                reward = 1
            next_action = None
            if is_allowed:
                next_action = self.grid_road.select_action(next_position, current_action=current_action, is_greedy=is_greedy, is_q_learning=self.is_q_learning, is_next_action=True)
            result = self.grid_road.update_state_action_value(self.gamma, current_position, current_action, reward, next_position, next_action)
            if not is_allowed or not result:
                if is_greedy:
                    print current_position, current_action, next_position, next_action
                print "went off the track, stopping episode"
                break
            current_position = next_position
        return visited_positions
    def play(self):
        print self.start, self.end
        self.PLOT_FREQUENCY = int(self.no_episode / 10)
        for i in range(self.no_episode):
            visited_positions = self.episode()
            if i % self.PLOT_FREQUENCY == 0:
                self.episode_visits[int(i / self.PLOT_FREQUENCY)] = visited_positions
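# Run the experiment twice on freshly generated random tracks: first with the
# Q-learning target, then with the SARSA target. Each run plays 10000 episodes
# and plots sampled trajectories plus one final greedy episode.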
game = Game(0.90, 10000, is_q_learning=True)
game.play()
game.plot()
game = Game(0.90, 10000, is_q_learning=False)
game.play()
game.plot()