Q-learning behavior
# Simple example of Q-learning's inability to go in loops.
# Loops are strictly forbidden by the code (the np.isnan check in Game.step),
# but you can comment out that logic and see that the algorithm just becomes less stable.
# The reason is that a loop is impossible in this setup,
# as only a single Q-value exists for each position on the map
# (a short illustration of this is at the end of the file).
import numpy as np

np.random.seed(0)

# Tricky map: Q-learning cannot first collect 5, return to 0 and then go for 10
# (if we can only go left, right, up and down).
# It can only oscillate between two possible paths: go for 5 and stay there,
# or go for 10.
# -10 is death, 10 is win
game_map = np.array([[0,   2,   0],
                     [0,   0,   0],
                     [5, -10,  10]], dtype=np.int32)
# Game which allows only going left, right, up and down
# game_map is np array with map, max_steps is maximum steps allowed
class Game:

    def __init__(self, game_map, max_steps, finish_coords):
        self.action_size = 4  # only 4 actions
        self.state_size = game_map.size
        self.game_map = game_map
        # coordinates of finish
        self.finish_coords = np.array(finish_coords, dtype=np.int32)
        self.max_x = self.game_map.shape[0]
        self.max_y = self.game_map.shape[1]
        self.max_steps = max_steps
        # initialize other variables
        self.reset()

    def reset(self):
        # current position on the map
        self.current_state = np.array([0, 0], dtype=np.int32)
        self.steps_made = 0
        # map with path passed, need float for nans
        self.progress_map = self.game_map.copy().astype(np.float32)
        self.progress_map[0, 0] = np.nan  # np.nan will indicate a passed square
        self.game_finished = 0

    def get_state_number(self, state):
        # return state number
        return state[0] * self.max_y + state[1]
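    # e.g. for the 3x3 map above, get_state_number([2, 1]) == 2 * 3 + 1 == 7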
    def draw_legend(self):
        # draw state numbers map
        arr = np.zeros([self.max_x, self.max_y])
        for i in range(0, self.max_x):
            for j in range(0, self.max_y):
                arr[i, j] = self.get_state_number([i, j])
        print(arr)

    def get_map_increment(self, input_action):
        # input_action = 0, 1, 2, 3
        action = np.zeros(4)
        action[input_action] = 1
        # one-hot action encoding:
        '''
        actions are:
        [1, 0, 0, 0] - up    0
        [0, 1, 0, 0] - down  1
        [0, 0, 1, 0] - left  2
        [0, 0, 0, 1] - right 3
        '''
        # transform to [+-1, +-1]
        position_change = np.array([-action[0] + action[1], -action[2] + action[3]], dtype=np.int32)
        return position_change
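    # e.g. input_action = 3 (right) -> action = [0, 0, 0, 1]
    # -> position_change = [-0 + 0, -0 + 1] = [0, 1],
    # i.e. stay in the same row, move one column to the right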
    # function which returns reward and updates position
    def step(self, input_action):
        # update position on the map
        position_change = self.get_map_increment(input_action)
        new_state = self.current_state + position_change
        self.steps_made += 1
        reward = 0  # in case we did not make a move
        if ((new_state[0] >= 0) and (new_state[0] < self.max_x)) and ((new_state[1] >= 0) and (new_state[1] < self.max_y)):
            # action is correct: update parameters and the path on the map, and return the reward
            reward = self.progress_map[tuple(new_state)]
            # if we have already been there - no reason to return,
            # as Q-learning does not allow loops
            if np.isnan(reward):
                reward = 0
            else:
                self.current_state = new_state
                # mark position as passed
                self.progress_map[tuple(new_state)] = np.nan
            # check if we died:
            if reward < 0:
                self.game_finished = 1
            # check if we finished:
            if (new_state == self.finish_coords).all():
                self.game_finished = 1
        # check if we are out of action points:
        if self.steps_made >= self.max_steps:
            self.game_finished = 1
        return self.get_state_number(self.current_state), reward, self.game_finished
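
# A quick sanity check of the environment alone (a minimal sketch, not part of
# the original experiment): walking right, right, down, down from (0, 0) should
# collect the 2, reach the finish at (2, 2) for 10, and end the game with a
# total reward of 12.
_check_game = Game(game_map, 15, [2, 2])
_total = 0
for _a in [3, 3, 1, 1]:  # right, right, down, down
    _state, _reward, _done = _check_game.step(_a)
    _total += _reward
print('Sanity check: total reward = {}, finished = {}'.format(_total, _done))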
# Game which additionally allows diagonal (cross) moves.
# game_map is np array with map, max_steps is maximum steps allowed
class Extended_Game(Game):

    def __init__(self, *args):
        super().__init__(*args)
        self.action_size = 8  # now 8 actions

    def get_map_increment(self, input_action):
        # input_action = 0, 1, 2, 3, 4, 5, 6, 7
        action = np.zeros(4)
        if input_action < 4:
            action[input_action] = 1
        else:
            switcher = {
                4: np.array([1, 0, 1, 0], dtype=np.int32),
                5: np.array([1, 0, 0, 1], dtype=np.int32),
                6: np.array([0, 1, 1, 0], dtype=np.int32),
                7: np.array([0, 1, 0, 1], dtype=np.int32)
            }
            action = switcher[input_action]
        # one-hot action encoding (two hot bits for the diagonals):
        '''
        actions are:
        [1, 0, 0, 0] - up         0
        [0, 1, 0, 0] - down       1
        [0, 0, 1, 0] - left       2
        [0, 0, 0, 1] - right      3
        [1, 0, 1, 0] - up-left    4
        [1, 0, 0, 1] - up-right   5
        [0, 1, 1, 0] - down-left  6
        [0, 1, 0, 1] - down-right 7
        '''
        # transform to [+-1, +-1]
        position_change = np.array([-action[0] + action[1], -action[2] + action[3]], dtype=np.int32)
        return position_change
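    # e.g. input_action = 7 (down-right) -> action = [0, 1, 0, 1]
    # -> position_change = [-0 + 1, -0 + 1] = [1, 1],
    # i.e. one row down, one column to the right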
# Train and test procedures -----------------------
def train(game):
    action_size = game.action_size
    state_size = game.state_size
    # create Q table
    qtable = np.zeros((state_size, action_size))
    # q-learning, based on the code from
    # https://medium.freecodecamp.org/diving-deeper-into-reinforcement-learning-with-q-learning-c18d0db58efe
    # algorithm settings
    total_episodes = 350   # Total episodes
    learning_rate = 0.5    # Learning rate
    gamma = 0.9            # Discounting rate
    # Exploration parameters
    max_epsilon = 1.0      # Exploration probability at start
    epsilon = max_epsilon  # Exploration rate
    min_epsilon = 0.1      # Minimum exploration probability
    decay_rate = 0.01      # Exponential decay rate for exploration prob
    # Perform Q-learning until the game is finished
    rewards = []
    for episode in range(total_episodes):
        # Reset the environment
        game.reset()
        done = False
        total_rewards = 0
        state = 0
        steps = 0
        # If done (if we're dead or won): finish the episode
        while not done:
            # Choose an action a in the current world state (s)
            ## First we randomize a number
            exp_exp_tradeoff = np.random.uniform(0, 1)
            ## If this number is greater than epsilon --> exploitation (taking the biggest Q value for this state)
            if exp_exp_tradeoff > epsilon:
                action = np.argmax(qtable[state, :])
            # Else do a random choice --> exploration
            else:
                action = np.random.randint(0, action_size)
            # Take the action (a) and observe the outcome state (s') and reward (r)
            new_state, reward, done = game.step(action)
            # Update Q(s,a) := Q(s,a) + lr * [R(s,a) + gamma * max Q(s',a') - Q(s,a)]
            # qtable[new_state,:] : all the actions we can take from the new state
            qtable[state, action] = qtable[state, action] + learning_rate * (reward + gamma * np.max(qtable[new_state, :]) - qtable[state, action])
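            # Worked example with the settings above (hypothetical values):
            # if Q(s,a) = 0, reward = 10 and max Q(s',:) = 0, then
            # Q(s,a) <- 0 + 0.5 * (10 + 0.9 * 0 - 0) = 5,
            # i.e. half of the TD error is absorbed on each visit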
            total_rewards += reward
            # The new state becomes the current state
            state = new_state
            steps += 1
        # Reduce epsilon (because we need less and less exploration)
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay_rate * episode)
        rewards.append(total_rewards)
    # return the trained table and the list of rewards during training
    return qtable, rewards
# Evaluate:
def evaluate(game, qtable):
    print('Evaluation')
    game.reset()
    state = 0
    done = False
    total_rewards = 0
    steps = 0
    while not done:
        action = np.argmax(qtable[state, :])
        new_state, reward, done = game.step(action)
        state = new_state
        total_rewards += reward
        steps += 1
        print(new_state)
    print('steps made: {}'.format(steps))
    print('reward: {}'.format(total_rewards))
    print('path (nans):')
    print(game.progress_map)
## Train and evaluate models ----------------------------------
# create ordinary game with max allowed steps = 15:
# end coordinates are in range [0, dim)
game = Game(game_map, 15, [2, 2])
print('Game states:')
game.draw_legend()

print('\nTraining on game with (left, right, up and down)')
qtable, rewards = train(game)
evaluate(game, qtable)
print('Max reward found: {}'.format(max(rewards)))

# create extended game with max allowed steps = 15:
game = Extended_Game(game_map, 15, [2, 2])
print('\nTraining on game with (left, right, up, down, up-left, up-right, down-left, down-right)')
qtable, rewards = train(game)
evaluate(game, qtable)
print('Max reward found: {}'.format(max(rewards)))
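
# A short illustration of the claim at the top of the file (a sketch added for
# clarity, using the qtable of the extended game trained just above): the greedy
# policy depends only on the state number, so every map square has exactly one
# greedy action, and re-entering a square always repeats it - which is why a
# path that visits the same square twice (a loop) cannot be learned.
greedy_actions = np.argmax(qtable, axis=1)  # one fixed action per state
print('\nGreedy action per map square (see the action encodings above):')
print(greedy_actions.reshape(game.max_x, game.max_y))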