reductions
from __future__ import division, print_function

import numpy
import numpy.random as nprand
import random


class MDP(object):
    def __init__(self, gamma, number_of_actions=2):
        self.tables = [self.make_table() for x in range(number_of_actions)]
        self.gamma = gamma

    def clear_table(self, number_of_actions, number_of_states):
        self.tables = [[[(0, 0) for z in range(number_of_states)]
                        for y in range(number_of_states)]
                       for x in range(number_of_actions)]

    # Sets prob and reward for some action, start_state, and end_state.
    def set_params(self, action, start_state, end_state, prob, reward):
        self.tables[action][start_state][end_state] = (prob, reward)

    def make_table(self):
        table = []
        for start_state in range(3):
            probabilities = [random.random() for x in range(3)]
            while sum(probabilities) == 0:
                probabilities = [random.random() for x in range(3)]
            row = []
            for end_state in range(3):
                row.append((probabilities[end_state] / sum(probabilities),
                            random.random()))
            table.append(row)
        return table

    def solve_v_values(self):
        return solve_v_values(self.tables, self.gamma)

    # Require that |Q*(state0,i) - Q*(state1,i)| < epsilon for all actions i.
    def obeys_requirement(self, epsilon):
        values = solve_q_values(self.tables, self.gamma)
        for action in range(len(self.tables)):
            state0_value = values[0][action]
            state1_value = values[1][action]
            if abs(state0_value - state1_value) > epsilon:
                return False
        return True
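
    # Usage sketch: main() below rejection-samples random MDPs against this
    # check, e.g.
    #   mdp = MDP(gamma)
    #   while not mdp.obeys_requirement(epsilon):
    #       mdp = MDP(gamma)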

    # Reduced MDP has two states {x,y} and two actions {f,g}, with rewards in
    # [0,1].  Transition probabilities are (lhs is the reduced MDP, rhs is the
    # old MDP):
    #   p(x,r|y,i) = p(a,r|c,i) + p(b,r|c,i)
    #   p(y,r|y,i) = p(c,r|c,i)
    #   p(y,r|x,i) = B(a)p(c,r|a,i) + B(b)p(c,r|b,i)
    #   p(x,r|x,i) = B(a)(p(a,r|a,i) + p(b,r|a,i)) + B(b)(p(a,r|b,i) + p(b,r|b,i))
    # A table is an array with dimensions num_actions * num_states * num_states;
    # each entry is a tuple of (probability, reward).
    def reduce_mdp(self, prior_of_a=None):
        # If no prior is supplied, draw a fresh one per call (a random.random()
        # default argument would only be evaluated once, at definition time).
        if prior_of_a is None:
            prior_of_a = random.random()
        assert len(self.tables) == 2 and len(self.tables[0]) == 3
        reduced_mdp = MDP(self.gamma)
        reduced_mdp.tables = []
        for action in range(len(self.tables)):
            # p(a|b): probability of moving to a from b under this action
            def p(a, b):
                return self.tables[action][b][a][0]

            # r(a|b): reward for moving to a from b under this action
            def r(a, b):
                return self.tables[action][b][a][1]

            # state labels: x, y are the reduced states; a, b, c the originals
            x = 0
            y = 1
            a = 0
            b = 1
            c = 2
            B_a = prior_of_a
            B_b = 1 - prior_of_a
            reduced_mdp.tables.append(
                [[(B_a * (p(a, a) + p(b, a)) + B_b * (p(a, b) + p(b, b)),
                   (B_a * (r(a, a) * p(a, a) + r(b, a) * p(b, a)) +
                    B_b * (r(a, b) * p(a, b) + r(b, b) * p(b, b))) /
                   (B_a * (p(b, a) + p(a, a)) + B_b * (p(b, b) + p(a, b)) or 1)),
                  (B_a * p(c, a) + B_b * p(c, b),
                   (B_a * p(c, a) * r(c, a) + B_b * p(c, b) * r(c, b)) /
                   (B_a * p(c, a) + B_b * p(c, b) or 1))],
                 [(p(a, c) + p(b, c),
                   # normalize by the transition probability, as in the rows above
                   (r(a, c) * p(a, c) + r(b, c) * p(b, c)) /
                   ((p(a, c) + p(b, c)) or 1)),
                  (p(c, c),
                   r(c, c))]])
        return reduced_mdp
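

# A small sanity-check sketch for reduce_mdp (the helper name and tolerance are
# not part of the original experiment): because make_table normalizes each row
# and B(a) + B(b) = 1, every row of the reduced transition kernel should still
# sum to 1, e.g. check_reduction_is_stochastic(MDP(0.5)).
def check_reduction_is_stochastic(mdp, prior_of_a=0.5, tol=1e-9):
    reduced = mdp.reduce_mdp(prior_of_a)
    for action_table in reduced.tables:
        for row in action_table:
            assert abs(sum(prob for prob, _ in row) - 1.0) < tol
    return reduced
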

def print_table(table):
    number_of_states = len(table[0])
    number_of_actions = len(table)
    print("MDP:")
    for action in range(number_of_actions):
        print("  Action: %d" % action)
        for state in range(number_of_states):
            print("    From state %d" % state)
            for new_state in range(number_of_states):
                prob, reward = table[action][state][new_state]
                if prob > 0:
                    print("      To state %d: probability %f, reward %f"
                          % (new_state, prob, reward))


simple_problem = [[[(0.5, 1),    # from 0 to 0
                    (0.5, 0)],   # from 0 to 1
                   [(0.7, 0.5),  # from 1 to 0
                    (0.3, 3)]]]  # from 1 to 1

other_simple_problem = [[[(1, 1),   # from 0 to 0
                          (0, 1)],  # from 0 to 1
                         [(1, 0),   # from 1 to 0
                          (0, 0)]]]  # from 1 to 1

good_action_problem = [[[(1, 1),   # from 0 to 0
                         (0, 1)],  # from 0 to 1
                        [(0, 0),   # from 1 to 0
                         (1, 0)]],  # from 1 to 1
                       [[(1, 1),   # from 0 to 0
                         (0, 1)],  # from 0 to 1
                        [(1, 0),   # from 1 to 0
                         (0, 0)]]]  # from 1 to 1

trivial_problem = [[[(1, 1)]]]
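
# The hand-built problems above use the same layout as MDP.tables
# (table[action][start_state][end_state] == (probability, reward)), so they
# can be passed straight to print_table and solve_v_values below, e.g.
#   print_table(good_action_problem)
#   print(solve_v_values(good_action_problem, 0.9))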


def solve_v_values(table, gamma, optimal=True, policy=None):
    number_of_states = len(table[0])
    number_of_actions = len(table)
    if not policy:
        policy = [0] * number_of_states
    values = [0] * number_of_states
    delta = 1
    while delta > 0.00001:
        delta = 0
        # policy evaluation
        for state in range(number_of_states):
            old_value = values[state]
            new_value = 0
            action = policy[state]
            for new_state in range(number_of_states):
                prob, reward = table[action][state][new_state]
                new_value += prob * (reward + gamma * values[new_state])
            values[state] = new_value
            delta = max(delta, abs(new_value - old_value))
        if optimal:
            old_policy = list(policy)
            # policy improvement
            for state in range(number_of_states):
                best_val = 0
                best_action = 0
                for action in range(number_of_actions):
                    val = 0
                    for new_state in range(number_of_states):
                        prob, reward = table[action][state][new_state]
                        val += prob * (reward + gamma * values[new_state])
                    if val > best_val:
                        best_action = action
                        best_val = val
                policy[state] = best_action
            if policy != old_policy:
                # this makes the while loop run again
                delta = 1
    return (values, policy)
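

# Worked check (a sketch; the tolerance is an arbitrary choice): in
# trivial_problem the single state always returns to itself with reward 1, so
# V* = 1 + gamma + gamma**2 + ... = 1 / (1 - gamma).
def check_trivial_v_value(gamma=0.5):
    values, _ = solve_v_values(trivial_problem, gamma)
    assert abs(values[0] - 1.0 / (1.0 - gamma)) < 1e-3
    return values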


def solve_q_values(table, gamma):  # Q-values of the optimal policy
    number_of_states = len(table[0])
    number_of_actions = len(table)
    values, policy = solve_v_values(table, gamma)
    out = []
    for state in range(number_of_states):
        row = []
        for action in range(number_of_actions):
            val = 0
            for new_state in range(number_of_states):
                prob, reward = table[action][state][new_state]
                val += prob * (reward + gamma * values[new_state])
            row.append(val)
        out.append(row)
    return out
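

# Consistency sketch (the 1e-3 tolerance is a loose, assumed bound rather than
# something derived in the gist): for the optimal policy, V*(s) should match
# max_a Q*(s, a) up to the iteration's convergence threshold.
def check_q_values_against_v_values(table, gamma, tol=1e-3):
    v_values, _ = solve_v_values(table, gamma)
    q_values = solve_q_values(table, gamma)
    return all(abs(max(q_row) - v) < tol
               for q_row, v in zip(q_values, v_values))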


# Want to generate a random MDP with three states {a,b,c}, two actions {x,y},
# and rewards in [0,1].
# Need two random 3x3 matrices with entries T^i_{jk} specifying the probability
# of moving from state j to state k given action i.
# Also need two random 3x3 matrices with entries R^i_{jk} specifying the reward
# gained when transitioning from state j to state k given action i.  (Rewards
# are deterministic here; randomly generated rewards that are a stochastic
# function of start state, end state, and action would be better.)
# Require that |Q*(a,i) - Q*(b,i)| < epsilon for all actions i.
# Want a joint probability distribution on {a,b} called B(a) and B(b).
# Reduced MDP has two states {ab,c} and two actions {x,y}, with rewards in [0,1].
# Transition probabilities are (lhs is the reduced MDP, rhs is the old MDP):
#   p(ab,r|c,i) = p(a,r|c,i) + p(b,r|c,i)
#   p(c,r|c,i)  = p(c,r|c,i)
#   p(c,r|ab,i) = B(a)p(c,r|a,i) + B(b)p(c,r|b,i)
#   p(ab,r|ab,i) = B(a)(p(a,r|a,i) + p(b,r|a,i)) + B(b)(p(a,r|b,i) + p(b,r|b,i))
def do_stuff_with_mdp(mdp):
    # Solve for the optimal value V* of each state in the original MDP.
    orig_v_values, orig_policy = mdp.solve_v_values()
    # CHANGE THIS TO A RANDOM PRIOR
    reduced_mdp = mdp.reduce_mdp(0.5)
    # Solve for the optimal policy in the reduced MDP.
    reduced_v_values, reduced_policy = reduced_mdp.solve_v_values()
    # Solve for the value V^P of each state in the original MDP where, in
    # states a and b, you execute the optimal policy for state ab of the
    # reduced MDP, and in state c you execute the optimal policy for state c
    # of the reduced MDP.
    best_action_from_ab = reduced_policy[0]
    best_action_from_c = reduced_policy[1]
    expanded_policy = [best_action_from_ab, best_action_from_ab, best_action_from_c]
    if expanded_policy == orig_policy:
        return {"max_difference": 0}
    other_v_values, other_policy = solve_v_values(mdp.tables, mdp.gamma,
                                                  False, expanded_policy)
    # Return the max difference V* - V^P over states.
    return {"max_difference": max(abs(x - y)
                                  for (x, y) in zip(orig_v_values, other_v_values)),
            "reduced_mdp": reduced_mdp,
            "other_v_values": other_v_values,
            "orig_policy": orig_policy,
            "orig_v_values": orig_v_values,
            "other_policy": other_policy}


def main():
    # Numbers epsilon \in [0,1] and gamma \in [0,1) entered by hand at the start.
    # epsilon = input("Enter epsilon \\in [0,1]: ")
    # gamma = input("Enter gamma \\in [0,1): ")
    epsilon = 0.1
    gamma = 0.2
    # num_trials = input("how many trials do you want? ")
    num_trials = 100000
    best = None
    best_value = 0
    # Repeat a bajillion times.  Return the maximum difference V* - V^P, and
    # the parameters T^i_{jk}, R^i_{jk}, and B(a).
    for a in range(num_trials):
        if a % 10000 == 0:
            print("on trial", a)
        mdp = MDP(gamma)
        while not mdp.obeys_requirement(epsilon):
            mdp = MDP(gamma)
        stuff = do_stuff_with_mdp(mdp)
        new_value = stuff["max_difference"]
        if new_value > best_value:
            best = (mdp, stuff)
            best_value = new_value
    if best is None:
        # every trial had max_difference 0, so there is nothing to report
        print("No trial produced a positive value difference.")
        return
    print(best[1])
    print_table(best[0].tables)
    print_table(best[1]["reduced_mdp"].tables)


table = [[[(0.0, 0.0), (0.0, 0.0), (1.0, 0.0)],
          [(1.0, 0.0), (0.0, 1.0), (0.0, 0.0)],
          [(0.0, 1.0), (1.0, 0.0), (0.0, 1.0)]],
         [[(0.0, 1.0), (0.0, 0.0), (1.0, 1.0)],
          [(0.0, 0.0), (1.0, 1.0), (0.0, 1.0)],
          [(0.0, 0.0), (0.0, 0.0), (1.0, 1.0)]]]


if __name__ == '__main__':
    # print_table(table)
    # print(solve_v_values(table, 0.1))
    main()

    # my_mdp = MDP(0.1)         # make mdp
    # my_mdp.clear_table(2, 3)  # 2 actions, 3 states
    # my_mdp.set_params(0, 0, 1, 1, 1)
    # my_mdp.set_params(0, 1, 2, 1, 0)
    # my_mdp.set_params(0, 2, 2, 1, 0)
    # my_mdp.set_params(1, 0, 1, 1, 1)
    # my_mdp.set_params(1, 1, 2, 1, 0)
    # my_mdp.set_params(1, 2, 2, 1, 0)
    # print_table(my_mdp.tables)
    # reduced_mdp = my_mdp.reduce_mdp(0.7)
    # print_table(reduced_mdp.tables)