# Ask Marcus about the thing marked "I AM NOT SURE ABOUT THIS"
import numpy
import numpy.random as nprand
import random
# Numbers epsilon \in [0,1] and gamma \in [0,1) entered in by hand at start
# epsilon = raw_input("Enter epsilon \\in [0,1]")
# gamma = raw_input("Enter gamma \\in [0,1)")
# num_trials = raw_input("how many trials do you want?")
class MDP(object):
    def __init__(self, gamma, number_of_actions=2):
        self.tables = [self.make_table() for x in range(number_of_actions)]
        self.gamma = gamma
    def clear_table(self, number_of_actions, number_of_states):
        self.tables = [[[(0, 0) for z in range(number_of_states)]
                        for y in range(number_of_states)]
                       for x in range(number_of_actions)]
    # Sets prob and reward for some (action, start_state, end_state) triple.
    def set_params(self, action, start_state, end_state, prob, reward):
        self.tables[action][start_state][end_state] = (prob, reward)
    def make_table(self):
        # Random 3x3 table of (probability, reward) pairs; each row's
        # transition probabilities are normalised to sum to 1.
        table = []
        for start_state in range(3):
            probabilities = [random.random() for x in range(3)]
            row = []
            for end_state in range(3):
                row.append((probabilities[end_state] / sum(probabilities),
                            random.random()))
            table.append(row)
        return table
    def solve_v_values(self):
        return solve_v_values(self.tables, self.gamma)
    # Require that |Q*(state0,i) - Q*(state1,i)| < epsilon for all actions i.
    def obeys_requirement(self, epsilon):
        values = solve_q_values(self.tables, self.gamma)
        for action in range(len(self.tables)):
            state0_value = values[0][action]
            state1_value = values[1][action]
            if abs(state0_value - state1_value) > epsilon:
                return False
        return True
    # Reduced MDP has two states {x,y}, two actions {f,g}, and rewards in [0,1].
    # Transition probabilities are (lhs is the reduced MDP, rhs is the old MDP):
    # p(x,r|y,i) = p(a,r|c,i) + p(b,r|c,i)
    # p(y,r|y,i) = p(c,r|c,i)
    # p(y,r|x,i) = B(a)p(c,r|a,i) + B(b)p(c,r|b,i)
    # p(x,r|x,i) = B(a)(p(a,r|a,i) + p(b,r|a,i)) + B(b)(p(a,r|b,i) + p(b,r|b,i))
    # A table is an array with dimensions num_actions * num_states * num_states;
    # each entry is a tuple of (probability, reward).
    def reduce_mdp(self, prior_of_a=None):
        # The prior over {a,b} defaults to a fresh random draw for each call.
        if prior_of_a is None:
            prior_of_a = random.random()
        assert len(self.tables) == 2 and len(self.tables[0]) == 3
        reduced_mdp = MDP(self.gamma)
        reduced_mdp.tables = []
        for action in range(len(self.tables)):
            # p(a|b)
            def p(a, b):
                return self.tables[action][b][a][0]
            # r(a|b)
            def r(a, b):
                return self.tables[action][b][a][1]
            x = 0
            y = 1
            a = 0
            b = 1
            c = 2
            B_a = prior_of_a
            B_b = 1 - prior_of_a
            # I AM NOT SURE ABOUT THIS:
            # Is there a difference between stochastic rewards and averaging?
            # Hmmm?
            reduced_mdp.tables.append(
                [[(  # P(x|x,i) = B(a)(p(a|a,i) + p(b|a,i)) + B(b)(p(a|b,i) + p(b|b,i))
                   B_a * (p(a, a) + p(b, a)) + B_b * (p(a, b) + p(b, b)),
                   # r(x|x,i) = B(a) * (r(a|a,i) * p(a|a,i) + r(b|a,i) * p(b|a,i)) +
                   #            B(b) * (r(a|b,i) * p(a|b,i) + r(b|b,i) * p(b|b,i))
                   B_a * (r(a, a) * p(a, a) + r(b, a) * p(b, a)) +
                   B_b * (r(a, b) * p(a, b) + r(b, b) * p(b, b))),
                  (  # P(y|x,i) = B(a) * p(c|a,i) + B(b) * p(c|b,i)
                   B_a * p(c, a) + B_b * p(c, b),
                   # r(y|x,i) = B(a) * p(c|a,i) * r(c|a,i) + B(b) * p(c|b,i) * r(c|b,i)
                   B_a * p(c, a) * r(c, a) + B_b * p(c, b) * r(c, b))],
                 [(  # P(x|y,i) = P(a|c,i) + P(b|c,i)
                   p(a, c) + p(b, c),
                   # r(x|y,i) = r(a|c,i) * p(a|c,i) + r(b|c,i) * p(b|c,i)
                   r(a, c) * p(a, c) + r(b, c) * p(b, c)),
                  (  # P(y|y,i) = P(c|c,i)
                   p(c, c),
                   # r(y|y,i) = r(c|c,i)
                   r(c, c))]])
        return reduced_mdp
def print_table(table):
    number_of_states = len(table[0])
    number_of_actions = len(table)
    for action in range(number_of_actions):
        print "Action: %d" % action
        for state in range(number_of_states):
            print "  From state %d" % state
            for new_state in range(number_of_states):
                prob, reward = table[action][state][new_state]
                if prob > 0:
                    print "    To state %d: probability %f, reward %f" % (new_state, prob, reward)
simple_problem = [[[(0.5, 1),    # from 0 to 0
                    (0.5, 0)],   # from 0 to 1
                   [(0.7, 0.5),  # from 1 to 0
                    (0.3, 3)]]]  # from 1 to 1
other_simple_problem = [[[(1, 1),   # from 0 to 0
                          (0, 1)],  # from 0 to 1
                         [(1, 0),   # from 1 to 0
                          (0, 0)]]]
good_action_problem = [[[(1, 1),   # from 0 to 0
                         (0, 1)],  # from 0 to 1
                        [(0, 0),   # from 1 to 0
                         (1, 0)]],
                       [[(1, 1),   # from 0 to 0
                         (0, 1)],  # from 0 to 1
                        [(1, 0),   # from 1 to 0
                         (0, 0)]]]
trivial_problem = [[[(1, 1)]]]
def solve_v_values(table, gamma, optimal=True):
    number_of_states = len(table[0])
    number_of_actions = len(table)
    values = [0] * number_of_states
    policy = [0] * number_of_states
    delta = 1
    while delta > 0.00001:
        delta = 0
        # policy evaluation
        for state in range(number_of_states):
            old_value = values[state]
            new_value = 0
            action = policy[state]
            for new_state in range(number_of_states):
                prob, reward = table[action][state][new_state]
                new_value += prob * (reward + gamma * values[new_state])
            values[state] = new_value
            delta = max(delta, abs(new_value - old_value))
        if optimal:
            # policy improvement
            for state in range(number_of_states):
                best_val = 0
                best_action = 0
                for action in range(number_of_actions):
                    val = 0
                    for new_state in range(number_of_states):
                        prob, reward = table[action][state][new_state]
                        val += prob * (reward + gamma * values[new_state])
                    if val > best_val:
                        best_action = action
                        best_val = val
                policy[state] = best_action
    return (values, policy)
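# Quick sanity check (a sketch, not in the original gist): trivial_problem has a
# single state that always yields reward 1, so its value is the geometric series
# 1 / (1 - gamma); with gamma = 0.5 the solver should return roughly 2.
check_values, check_policy = solve_v_values(trivial_problem, 0.5)
assert abs(check_values[0] - 2.0) < 0.001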
def solve_q_values(table, gamma):  # Q-values of the optimal policy
    number_of_states = len(table[0])
    number_of_actions = len(table)
    values, policy = solve_v_values(table, gamma)
    out = []
    for state in range(number_of_states):
        row = []
        for action in range(number_of_actions):
            val = 0
            for new_state in range(number_of_states):
                prob, reward = table[action][state][new_state]
                val += prob * (reward + gamma * values[new_state])
            row.append(val)
        out.append(row)
    return out
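# Another sanity check (a sketch, not in the original gist): in good_action_problem
# state 0 always pays reward 1, so from state 1 the better move is action 1, which
# jumps back to state 0; hence Q*(1, 1) should exceed Q*(1, 0).
check_q = solve_q_values(good_action_problem, 0.5)
assert check_q[1][1] > check_q[1][0]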
# print solve_v_values(good_action_problem, 0.5)
# print solve_q_values(good_action_problem, 0.5)
# Want to generate a random MDP with three states {a,b,c}, two actions {x,y}, and rewards in [0,1].
# Need two random 3x3 matrices with entries T^i_{jk} specifying the probability of moving from state j to state k given action i.
# Also need two random 3x3 matrices with entries R^i_{jk} specifying the reward gained when transitioning from state j to state k given action i. (Rewards are deterministic here; randomly generated rewards that are a stochastic function of start state, end state, and action would be better.)
# Require that |Q*(a,i) - Q*(b,i)| < epsilon for all actions i.
# Want a joint probability distribution on {a,b} called B(a) and B(b).
# Reduced MDP has two states: {ab,c}, two actions {x,y}, and rewards in [0,1]. Transition probabilities are (lhs is reduced MDP, rhs is old mdp):
# p(ab,r|c,i) = p(a,r|c,i) + p(b,r|c,i)
# p(c,r|c,i) = p(c,r|c,i)
# p(c,r|ab,i) = B(a)p(c,r|a,i) + B(b)p(c,r|b,i)
# p(ab,r|ab,i) = B(a)(p(a,r|a,i) + p(b,r|a,i)) + B(b)(p(a,r|b,i) + p(b,r|b,i))
# Solve for optimal value V* of each state in the original MDP
# Solve for optimal policy in the reduced MDP
# Solve for the value V^P of each state in the original MDP where, in states a and b, you execute the optimal policy for state ab of the reduced MDP, and in state c you execute the optimal policy for state c of the reduced MDP.
# Return the maximum difference V* - V^P over states.
# Repeat a bajillion times. Return the maximum difference V* - V^P, and the parameters T^i_{jk}, R^i_{jk}, and B(a).
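# Below is a rough sketch (not part of the original gist) of the experiment
# described in the comments above. `evaluate_policy` and `run_experiment` are
# names introduced here for illustration; solve_v_values only evaluates the
# policy it improves internally, so a separate fixed-policy evaluator is needed
# to compute V^P for the lifted policy.
def evaluate_policy(table, gamma, policy, tolerance=0.00001):
    # Iterative policy evaluation for a fixed policy.
    number_of_states = len(table[0])
    values = [0] * number_of_states
    delta = 1
    while delta > tolerance:
        delta = 0
        for state in range(number_of_states):
            action = policy[state]
            new_value = 0
            for new_state in range(number_of_states):
                prob, reward = table[action][state][new_state]
                new_value += prob * (reward + gamma * values[new_state])
            delta = max(delta, abs(new_value - values[state]))
            values[state] = new_value
    return values
def run_experiment(gamma, epsilon, num_trials=1000):
    # Generate random MDPs obeying the epsilon requirement, reduce each one,
    # lift the reduced MDP's optimal policy back to the original (states a and b
    # copy the action chosen for the merged state, state c copies the action for
    # the other reduced state), and track the worst V* - V^P gap seen.
    worst_gap = 0
    worst_mdp = None
    worst_prior = None
    for trial in range(num_trials):
        mdp = MDP(gamma)
        if not mdp.obeys_requirement(epsilon):
            continue
        prior_of_a = random.random()
        optimal_values, _ = solve_v_values(mdp.tables, gamma)
        reduced = mdp.reduce_mdp(prior_of_a)
        _, reduced_policy = solve_v_values(reduced.tables, gamma)
        lifted_policy = [reduced_policy[0], reduced_policy[0], reduced_policy[1]]
        lifted_values = evaluate_policy(mdp.tables, gamma, lifted_policy)
        gap = max(optimal_values[s] - lifted_values[s] for s in range(3))
        if gap > worst_gap:
            worst_gap = gap
            worst_mdp = mdp
            worst_prior = prior_of_a
    return worst_gap, worst_mdp, worst_prior
# Example invocation (commented out, like the other example calls above):
# print run_experiment(0.9, 0.1, 1000)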
my_mdp = MDP(0.1) # make mdp
my_mdp.clear_table(2, 3) # 2 actions, 3 states
my_mdp.set_params(0, 0, 2, 1, 0)
my_mdp.set_params(0, 1, 2, 1, 1)
my_mdp.set_params(0, 2, 2, 1, 0)
my_mdp.set_params(1, 0, 2, 1, 0)
my_mdp.set_params(1, 1, 2, 1, 1)
my_mdp.set_params(1, 2, 2, 1, 0)
print_table(my_mdp.tables)
reduced_mdp = my_mdp.reduce_mdp(0.7)
print_table(reduced_mdp.tables)
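# (Sketch, not in the original gist.) Compare the optimal state values of the
# hand-built MDP with those of its reduction.
print solve_v_values(my_mdp.tables, my_mdp.gamma)
print solve_v_values(reduced_mdp.tables, reduced_mdp.gamma)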