reductions
from __future__ import division
import numpy
import numpy.random as nprand
import random

class MDP(object):
    def __init__(self, gamma, number_of_actions = 2):
        self.tables = [self.make_table() for x in range(number_of_actions)]
        self.gamma = gamma

    def clear_table(self, number_of_actions, number_of_states):
        self.tables = [[[(0, 0) for z in range(number_of_states)]
                        for y in range(number_of_states)]
                       for x in range(number_of_actions)]

    # Sets prob and reward for some action, start_state, and end_state.
    def set_params(self, action, start_state, end_state, prob, reward):
        self.tables[action][start_state][end_state] = (prob, reward)
    def make_table(self):
        # Random transition/reward table for one action over 3 states: each row
        # holds (probability, reward) tuples, with probabilities normalised to sum to 1.
        table = []
        for start_state in range(3):
            probabilities = [random.random() for x in range(3)]
            while sum(probabilities) == 0:
                probabilities = [random.random() for x in range(3)]
            row = []
            for end_state in range(3):
                row.append((probabilities[end_state] / sum(probabilities),
                            random.random()))
            table.append(row)
        return table

    def solve_v_values(self):
        return solve_v_values(self.tables, self.gamma)

    # Require that |Q*(state0,i) - Q*(state1,i)| < epsilon for all actions i.
    def obeys_requirement(self, epsilon):
        values = solve_q_values(self.tables, self.gamma)
        for action in range(len(self.tables)):
            state0_value = values[0][action]
            state1_value = values[1][action]
            if abs(state0_value - state1_value) > epsilon:
                return False
        return True
    # Reduced MDP has two states {x,y}, two actions {f,g}, and rewards in [0,1].
    # Transition probabilities are (lhs is the reduced MDP, rhs is the old MDP):
    #   p(x,r|y,i) = p(a,r|c,i) + p(b,r|c,i)
    #   p(y,r|y,i) = p(c,r|c,i)
    #   p(y,r|x,i) = B(a)p(c,r|a,i) + B(b)p(c,r|b,i)
    #   p(x,r|x,i) = B(a)(p(a,r|a,i) + p(b,r|a,i)) + B(b)(p(a,r|b,i) + p(b,r|b,i))
    # The table is an array with dimensions num_actions * num_states * num_states;
    # each entry is a tuple of (probability, reward).
    def reduce_mdp(self, prior_of_a = None):
        # Default to a fresh random prior on each call. (Using random.random() as
        # the default argument would fix the prior once, at definition time.)
        if prior_of_a is None:
            prior_of_a = random.random()
        assert len(self.tables) == 2 and len(self.tables[0]) == 3
        reduced_mdp = MDP(self.gamma)
        reduced_mdp.tables = []
        for action in range(len(self.tables)):
            # p(a|b): probability of moving from state b to state a under this action
            def p(a, b):
                return self.tables[action][b][a][0]
            # r(a|b): reward for moving from state b to state a under this action
            def r(a, b):
                return self.tables[action][b][a][1]
            x = 0
            y = 1
            a = 0
            b = 1
            c = 2
            B_a = prior_of_a
            B_b = 1 - prior_of_a
            # The "or 1" below guards against dividing by a zero transition probability.
            reduced_mdp.tables.append(
                [[(B_a * (p(a,a) + p(b,a)) + B_b * (p(a,b) + p(b,b)),
                   ((B_a * (r(a,a) * p(a,a) + r(b,a) * p(b,a)) +
                     B_b * (r(a,b) * p(a,b) + r(b,b) * p(b,b))) /
                    (B_a * (p(b,a) + p(a,a)) + B_b * (p(b,b) + p(a,b)) or 1))),
                  (B_a * p(c,a) + B_b * p(c,b),
                   ((B_a * p(c,a) * r(c,a) + B_b * p(c,b) * r(c,b)) /
                    (B_a * p(c,a) + B_b * p(c,b) or 1)))],
                 [(p(a,c) + p(b,c),
                   r(a,c) * p(a,c) + r(b,c) * p(b,c)),
                  (p(c,c),
                   r(c,c))]])
        return reduced_mdp
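
# A small sanity check added here (not part of the original gist): every
# (action, start_state) row in this table format should be a probability
# distribution, so its transition probabilities should sum to 1. This holds
# both for the randomly generated tables and for the output of reduce_mdp.
def rows_are_stochastic(table, tolerance=1e-6):
    return all(abs(sum(prob for prob, reward in row) - 1) < tolerance
               for action_table in table
               for row in action_table)
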
def print_table(table):
    number_of_states = len(table[0])
    number_of_actions = len(table)
    print "MDP:"
    for action in range(number_of_actions):
        print "  Action: %d" % action
        for state in range(number_of_states):
            print "    From state %d" % state
            for new_state in range(number_of_states):
                prob, reward = table[action][state][new_state]
                if prob > 0:
                    print "      To state %d: probability %f, reward %f" % (new_state, prob, reward)

simple_problem = [[[(0.5, 1),    # from 0 to 0
                    (0.5, 0)],   # from 0 to 1
                   [(0.7, 0.5),  # from 1 to 0
                    (0.3, 3)]]]  # from 1 to 1

other_simple_problem = [[[(1, 1),   # from 0 to 0
                          (0, 1)],  # from 0 to 1
                         [(1, 0),   # from 1 to 0
                          (0, 0)]]]

good_action_problem = [[[(1, 1),   # from 0 to 0
                         (0, 1)],  # from 0 to 1
                        [(0, 0),   # from 1 to 0
                         (1, 0)]],
                       [[(1, 1),   # from 0 to 0
                         (0, 1)],  # from 0 to 1
                        [(1, 0),   # from 1 to 0
                         (0, 0)]]]

trivial_problem = [[[(1, 1)]]]
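
# Hedged usage sketch (not in the original gist), mirroring the commented-out
# examples at the bottom of the file: solve the one-action simple_problem with
# solve_v_values (defined below), which returns (values, policy).
# print_table(simple_problem)
# print solve_v_values(simple_problem, 0.9)
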
def solve_v_values(table, gamma, optimal = True, policy = None):
    number_of_states = len(table[0])
    number_of_actions = len(table)
    if not policy:
        policy = [0] * number_of_states
    values = [0] * number_of_states
    delta = 1
    while delta > 0.00001:
        delta = 0
        # policy evaluation
        for state in range(number_of_states):
            old_value = values[state]
            new_value = 0
            action = policy[state]
            for new_state in range(number_of_states):
                prob, reward = table[action][state][new_state]
                new_value += prob * (reward + gamma * values[new_state])
            values[state] = new_value
            delta = max(delta, abs(new_value - old_value))
        if optimal:
            old_policy = list(policy)
            # policy improvement
            for state in range(number_of_states):
                best_val = 0
                best_action = 0
                for action in range(number_of_actions):
                    val = 0
                    for new_state in range(number_of_states):
                        prob, reward = table[action][state][new_state]
                        val += prob * (reward + gamma * values[new_state])
                    if val > best_val:
                        best_action = action
                        best_val = val
                policy[state] = best_action
            if policy != old_policy:
                # this makes it do the loop again
                delta = 1
    return (values, policy)

def solve_q_values(table, gamma):  # of the optimal policy
    number_of_states = len(table[0])
    number_of_actions = len(table)
    values, policy = solve_v_values(table, gamma)
    out = []
    for state in range(number_of_states):
        row = []
        for action in range(number_of_actions):
            val = 0
            for new_state in range(number_of_states):
                prob, reward = table[action][state][new_state]
                val += prob * (reward + gamma * values[new_state])
            row.append(val)
        out.append(row)
    return out
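
# Not part of the original gist: a small helper illustrating that, since
# V*(s) = max_a Q*(s, a), the greedy (optimal) policy can be read straight off
# the Q-table produced by solve_q_values.
def greedy_policy_from_q(q_values):
    return [max(range(len(row)), key=lambda a: row[a]) for row in q_values]
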
# Want to generate a random MDP with three states {a,b,c}, two actions {x,y},
# and rewards in [0,1].
# Need two random 3x3 matrices with entries T^i_{jk} specifying the probability
# of moving from state j to state k given action i.
# Also need two random 3x3 matrices with entries R^i_{jk} specifying the reward
# gained when transitioning from state j to state k given action i. (Rewards are
# deterministic here; randomly generated rewards that are a stochastic function
# of start state, end state, and action would be better.)
# Require that |Q*(a,i) - Q*(b,i)| < epsilon for all actions i.
# Want a joint probability distribution on {a,b} called B(a) and B(b).
# Reduced MDP has two states {ab,c}, two actions {x,y}, and rewards in [0,1].
# Transition probabilities are (lhs is the reduced MDP, rhs is the old MDP):
#   p(ab,r|c,i)  = p(a,r|c,i) + p(b,r|c,i)
#   p(c,r|c,i)   = p(c,r|c,i)
#   p(c,r|ab,i)  = B(a)p(c,r|a,i) + B(b)p(c,r|b,i)
#   p(ab,r|ab,i) = B(a)(p(a,r|a,i) + p(b,r|a,i)) + B(b)(p(a,r|b,i) + p(b,r|b,i))
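# Worked example of the p(ab,r|ab,i) formula above (numbers invented for
# illustration): with B(a) = B(b) = 0.5 and, under some action i,
# p(a|a) = 0.2, p(b|a) = 0.3, p(a|b) = 0.4, p(b|b) = 0.1, the merged
# self-transition probability is 0.5 * (0.2 + 0.3) + 0.5 * (0.4 + 0.1) = 0.5.
assert abs(0.5 * (0.2 + 0.3) + 0.5 * (0.4 + 0.1) - 0.5) < 1e-9
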
def do_stuff_with_mdp(mdp):
    # Solve for the optimal value V* of each state in the original MDP.
    orig_v_values, orig_policy = mdp.solve_v_values()
    # CHANGE THIS TO A RANDOM
    reduced_mdp = mdp.reduce_mdp(0.5)
    # Solve for the optimal policy in the reduced MDP.
    reduced_v_values, reduced_policy = reduced_mdp.solve_v_values()
    # Solve for the value V^P of each state in the original MDP where, in states
    # a and b, you execute the optimal policy for state ab of the reduced MDP,
    # and in state c you execute the optimal policy for state c of the reduced MDP.
    best_action_from_ab = reduced_policy[0]
    best_action_from_c = reduced_policy[1]
    expanded_policy = [best_action_from_ab, best_action_from_ab, best_action_from_c]
    if expanded_policy == orig_policy:
        return {"max_difference": 0}
    other_v_values, other_policy = solve_v_values(mdp.tables, mdp.gamma,
                                                  False, expanded_policy)
    # Return the max difference V* - V^P, varying over states.
    return {"max_difference": max(abs(x - y) for (x, y) in zip(orig_v_values, other_v_values)),
            "reduced_mdp": reduced_mdp,
            "other_v_values": other_v_values,
            "orig_policy": orig_policy,
            "orig_v_values": orig_v_values,
            "other_policy": other_policy}
def main():
    # Numbers epsilon \in [0,1] and gamma \in [0,1) entered in by hand at start.
    # epsilon = input("Enter epsilon \\in [0,1]: ")
    # gamma = input("Enter gamma \\in [0,1): ")
    epsilon = 0.1
    gamma = 0.2
    # num_trials = input("how many trials do you want? ")
    num_trials = 100000
    best = None
    best_value = 0
    # Repeat a bajillion times. Return the maximum difference V* - V^P, and the
    # parameters T^i_{jk}, R^i_{jk}, and B(a).
    for a in range(num_trials):
        if a % 10000 == 0:
            print "on trial ", a
        mdp = MDP(gamma)
        while not mdp.obeys_requirement(epsilon):
            mdp = MDP(gamma)
        stuff = do_stuff_with_mdp(mdp)
        new_value = stuff["max_difference"]
        if new_value > best_value:
            best = (mdp, stuff)
            best_value = new_value
    print(best[1])
    print_table(best[0].tables)
    print_table(best[1]["reduced_mdp"].tables)


table = [[[(0.0, 0.0), (0.0, 0.0), (1.0, 0.0)], [(1.0, 0.0), (0.0, 1.0), (0.0, 0.0)], [(0.0, 1.0), (1.0, 0.0), (0.0, 1.0)]], [[(0.0, 1.0), (0.0, 0.0), (1.0, 1.0)], [(0.0, 0.0), (1.0, 1.0), (0.0, 1.0)], [(0.0, 0.0), (0.0, 0.0), (1.0, 1.0)]]]

if __name__ == '__main__':
    # print_table(table)
    # print solve_v_values(table, 0.1)
    main()
    # my_mdp = MDP(0.1)  # make mdp
    # my_mdp.clear_table(2, 3)  # 2 actions, 3 states
    # my_mdp.set_params(0, 0, 1, 1, 1)
    # my_mdp.set_params(0, 1, 2, 1, 0)
    # my_mdp.set_params(0, 2, 2, 1, 0)
    # my_mdp.set_params(1, 0, 1, 1, 1)
    # my_mdp.set_params(1, 1, 2, 1, 0)
    # my_mdp.set_params(1, 2, 2, 1, 0)
    # print_table(my_mdp.tables)
    # reduced_mdp = my_mdp.reduce_mdp(0.7)
    # print_table(reduced_mdp.tables)