Last active
January 17, 2023 20:35
-
-
Save maxwellpirtle/6a4f0290852fa1f3cf5932a43a60adda to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import math | |
try: | |
import numpy as np | |
print(f'Using numpy version {np.__version__}') | |
except: | |
print(f'Numpy is needed for this demo', file=sys.stderr) | |
exit(1) | |
def is_almost_unity(array, tolerance: float = 1.0e-9) -> bool: | |
"""Determines whether or not the given array-like | |
has a sum equal to 1.0 (within a specified tolerance) | |
@param array: An array-like whose sum is checked | |
@param tolerance: The error tolerance | |
""" | |
return math.isclose( | |
np.sum(array), | |
1.0, | |
rel_tol=tolerance | |
) | |
def validate_rows_are_unity(matrix: np.array): | |
"""Verifies that each row of the matrix sums | |
to 1.0 | |
""" | |
# Verify each row sums to unity | |
rows = matrix.shape[0] | |
unity = all( | |
( | |
is_almost_unity(matrix[row]) | |
for row in range(rows) | |
) | |
) | |
if not unity: | |
raise ValueError( | |
'Each row in the matrix must sum to 1 (i.e. the matrix must be stochastic)' | |
) | |
def validate_stochastic(stpm: np.array): | |
"""Verifies that the given numpy matrix is a | |
valid state transition probabilty matrix (i.e. | |
that the matrix is a stochastic matrix) | |
A stochastic matrix is one whose rows sum to | |
1.0 and is a square matrix | |
""" | |
stmp_rows = stpm.shape[0] | |
stmp_cols = stpm.shape[1] | |
if stmp_cols != stmp_rows: | |
raise ValueError( | |
'The state transition probability matrix must be a square' + | |
f' matrix, but rows ({stmp_rows}) != columns ({stmp_cols})' | |
) | |
validate_rows_are_unity(stpm) | |
def validate_stochastic_3d(stpma: np.array): | |
"""Verifies that the given 3D numpy matrix is a | |
valid state transition probabilty matrix (i.e. | |
that the matrix is a stochastic matrix) for an MDP | |
""" | |
stmp_planes, stmpa_rows, stmpa_cols = stpma.shape | |
if stmpa_cols != stmpa_rows: | |
raise ValueError( | |
'The state transition probability matrix must be a square' + | |
f' matrix for each plane, but rows ({stmpa_rows}) != columns ({stmpa_cols})' | |
) | |
for j in range(stmp_planes): | |
validate_stochastic(stpma[j]) | |
class RLMarkovRewardProcess: | |
"""Encapsulates the behavior of a Markov Reward Process | |
A Markov Reward Process (or MRP for short) is a 4-tuple | |
(S, P, R, y) where | |
1. S is a finite set of states that the system can | |
move into | |
2. P is a state transition probability matrix. This | |
is a square matrix whose element in the `i`th row and | |
`j`th column specify the probability of moving into | |
state `j` from state `i` | |
3. R is a reward function which assigns expected rewards | |
to each state | |
4. 0 <= y <= 1 is the discount factor. | |
A Markov Reward Process has the important Markov property, | |
which can be described mathematically succintly as | |
P(S_(t+1) | S_t) = P(S_(t+1) | S_1, S_2, ..., S_t) | |
In other words, the "future is independent of the past given | |
the present" (David Silverman UCL Lecture Slides) | |
""" | |
def __init__(self, state_labels, stpm, reward_vec, discount: float): | |
"""Creates a new Markov Reward Process object encapsulating | |
the process's dynamics | |
@param state_labels: A vector of human-readable labels for each | |
of the states represented in the MRP. This is optional: if None | |
is supplied, the states are labeled 1 to N (for N states) | |
@param stmp: The state transition probability matrix. The initializer | |
uses the size of the matrix to determine the number (N) of states in the | |
MRP. The matrix must be square and have rows which sum to unity | |
@param reward_vec: A vector describing the expected reward assigned | |
to each statein the MRP. This must have size N | |
@param discount: The discount factor of the MRP. The discount must be | |
a floating-point number between 0.0 and 1.0 | |
""" | |
validate_stochastic(stpm) | |
num_states, _ = stpm.shape | |
reward_vec_len = len(reward_vec) | |
if reward_vec_len != num_states: | |
raise ValueError( | |
'There must be exactly one reward assigned to each state, but' + | |
f'the number of rewards ({reward_vec_len}) != the size of the state' + | |
f'transition matrix({num_states})' | |
) | |
self.__initialize_state_labels(state_labels, num_states) | |
if discount < 0.0 or discount > 1.0: | |
raise ValueError('The discount factor must be within 0.0 and 1.0') | |
self.stpm = stpm | |
self.reward_vec = reward_vec | |
self.discount = discount | |
def __initialize_state_labels(self, state_labels, num_states): | |
"""Initializes the labels given to each state in the MDP | |
""" | |
if state_labels is None: | |
self.state_labels = np.array( | |
[f'{i + 1}' for i in range(0, num_states)] | |
) | |
else: | |
state_labels_len = len(state_labels) | |
if state_labels_len != num_states: | |
raise ValueError( | |
'There must be exactly one label assigned to each state, but' + | |
f'the number of labels ({state_labels_len}) != the size of the state' + | |
f'transition matrix({num_states})' | |
) | |
self.state_labels = state_labels | |
# MARK: Python Methods | |
def __str__(self): | |
return str(self.stpm) | |
# MARK: Methods | |
def get_num_states(self): | |
"""Computes the number of states that make up this MDP | |
""" | |
return len(self.state_labels) | |
def estimate_state_value_function(self, steps: int): | |
"""Estimates the value of the state value function | |
`v` at every state in the reward process | |
@param steps: The number of steps for which to run the | |
estimation iteration | |
""" | |
if steps <= 0: | |
raise ValueError('At least one iteration required') | |
v = np.zeros(self.get_num_states()) | |
for j in range(steps): | |
v = self.reward_vec + self.discount * self.stpm @ v | |
return v | |
class RLPolicy: | |
"""A reinforcement learning policy that is used | |
to make decisions. | |
A policy in the context of a MDP is a probability distribution | |
over the actions and states of the MDP. A policy is interpreted | |
in a MDP as follows: | |
Given an MDP with |S| states and |A| actions, a policy | |
is an |S| x |A| matrix whose rows sum to 1. The element | |
`p_ij` in row `i` and column `j` gives the probability | |
that the MDP will take action `j` from state `i` | |
That is, a policy defines a probability distribution over | |
the possible actions that can be taken in an MDP for each | |
state of that MDP | |
""" | |
def __init__(self, policy): | |
"""Creates a new policy for a Markov Decision Process | |
(see `RLMarkovDecisionProcess`) | |
@param policy: An `N` x `A` matrix describing the probability | |
of taking actions (A) in each state (N). For an element at row `i` | |
and column `j`, the policy gives the probability of taking | |
action `j` in state `i`. Each row of the policy matrix must | |
sum to unity | |
""" | |
for row in policy: | |
validate_rows_are_unity(policy) | |
self.contents = policy | |
@staticmethod | |
def randomized_policy(shape): | |
# Arbitrary for this code (would be chosen | |
# more carefully) | |
ALPHA = 0.15 | |
height = shape[0] | |
width = shape[1] | |
policy = np.random.dirichlet( | |
np.repeat(ALPHA, width), | |
size=height | |
) | |
return RLPolicy(policy) | |
def __matmul__(self, other): | |
# Transfer matrix multiplication to the matrix itself | |
return self.contents @ other | |
def __str__(self): | |
return str(self.contents) | |
# MARK: Methods | |
def get_shape(self): | |
return self.contents.shape | |
class RLMarkovDecisionProcess: | |
"""Encapsulates the behavior of a Markov Decision Process | |
A Markov Decision Process (or MDP for short) is a 5-tuple | |
(S, A, P, R, y) where | |
1. S is a finite set of states that the system can | |
move into | |
2. A is a finite set of actions that can be taken | |
in any state | |
2. P is a state transition probability matrix. This | |
is a square matrix whose element in the `i`th row, | |
`j`th column, and `k`th depth specify the probability | |
of moving into state `j` from state `i` if action `k` | |
is taken | |
3. R is a reward function which assigns expected rewards | |
to each state and to each action that can be taken in | |
each state | |
4. 0 <= y <= 1 is the discount factor | |
""" | |
def __init__(self, state_labels, action_labels, stmpa, reward_matrix, discount=1.0): | |
"""Creates a new Markov Reward Process object encapsulating | |
the process's dynamics. | |
The MDP that is represented by this object has an action set | |
of size `A` and `N` states | |
@param state_labels: A vector of human-readable labels for each | |
of the states represented in the MRP. This is optional: if None | |
is supplied, the states are labeled 1 to `N` (one for each state) | |
@param action_labels: A vector of human-readable labels for each | |
of the actions that can be taken in any state in the MRP. This is | |
optional: if None is supplied, the states are labeled 1 to `N` | |
(one for each state) | |
@param stmpa: The 3D state transition probability matrix of dimensions | |
`A` x `N` x `N`. The initializers the size of the matrix to determine the | |
number (N) of states in the MRP. The matrix must be stochastic and be | |
of size `N` x `N` for each possible action (i.e. stmpa). | |
The interpretation is as follows: | |
The element `p_ijk` in row `i`, column `j`, and horizontal slice | |
`k` gives the probability that an agent acting in this MDP moves | |
into state `k` from state `j` if action `i` is taken | |
Each vertical slice represents the probability distribution over all | |
possible actions and destination states from the point of view of a | |
fixed actions `a` | |
@param reward_matrix: A matrix describing the expected reward assigned | |
to each state in the MDP. This must be a matrix of size `N` x `A` (where | |
each row specifies the state and the columns the rewards for each action | |
in that state) | |
The interpretation is as follows: | |
The element `p_ij` in row `i` and column `j` gives the reward | |
received by an agent taking action `j` from state `i` | |
@param discount: The discount factor of the MRP. The discount must be | |
a floating-point number between 0.0 and 1.0 | |
""" | |
# Each horizontal plane of `stmpa`` can be thought of as fixing the state `s` | |
# and looking at all possible ways to get from `s` to any other | |
# state `s'` using any of the actions | |
validate_stochastic_3d(stmpa) | |
num_actions, num_states, _ = stmpa.shape | |
rm_states, rm_num_actions = reward_matrix.shape | |
if num_states != rm_states: | |
raise ValueError( | |
'There must be at least one reward assigned to each state-action pair, but ' + | |
f'the number of rewards ({rm_states}) != the number of states in ' + | |
f'the state transition matrix ({num_states})' | |
) | |
if rm_num_actions != num_actions: | |
raise ValueError( | |
'There must be a reward for each action in each state, but ' + | |
f'the number of actions assigned values ({rm_num_actions}) != ' + | |
f'the number of actions in the transitions matrix ({num_actions})' | |
) | |
if discount < 0.0 or discount > 1.0: | |
raise ValueError('The discount factor must be within 0.0 and 1.0') | |
self.__initialize_state_labels(state_labels, num_states) | |
self.__initialize_action_labels(action_labels, num_actions) | |
self.stpma = stmpa | |
self.reward_matrix = reward_matrix | |
self.discount = discount | |
def __initialize_state_labels(self, state_labels, num_states): | |
"""Initializes the labels given to each state in the MDP | |
""" | |
if state_labels is None: | |
self.state_labels = np.array( | |
[f'{i + 1}' for i in range(0, num_states)] | |
) | |
else: | |
state_labels_len = len(state_labels) | |
if state_labels_len != num_states: | |
raise ValueError( | |
'There must be exactly one label assigned to each state, but' + | |
f'the number of labels ({state_labels_len}) != the size of the state' + | |
f'transition matrix({num_states})' | |
) | |
self.state_labels = state_labels | |
def __initialize_action_labels(self, action_labels, num_actions): | |
"""Initializes the labels given to each action in the MDP | |
""" | |
if action_labels is None: | |
self.action_labels = np.array( | |
[f'Action {i + 1}' for i in range(0, num_actions)] | |
) | |
else: | |
action_labels_len = len(action_labels) | |
if action_labels_len != num_actions: | |
raise ValueError( | |
'There must be exactly one action assigned to each reward, but' + | |
f'the number of rewards ({num_actions}) != the number of action labels' + | |
f'({action_labels_len})' | |
) | |
self.action_labels = action_labels | |
# MARK: Methods | |
def get_num_actions(self): | |
"""Computes the number of actions that can be taken | |
in any given state of this MDP | |
""" | |
return len(self.action_labels) | |
def get_num_states(self): | |
"""Computes the number of states that make up this MDP | |
""" | |
return len(self.state_labels) | |
def get_compatible_policy_shape(self): | |
"""Computes, as a tuple, the shape of | |
policies that can be used to describe how an | |
agent can behave in this Markov decision process | |
""" | |
return self.get_num_states(), self.get_num_actions() | |
def into_mrp(self, policy: RLPolicy) -> RLMarkovRewardProcess: | |
"""Generates a new MRP from this MDP by | |
acting according to the given policy | |
An MDP can be viewed of as an MRP under a policy | |
which descibes the likelihood with which actions | |
are take in any given state in the MDP. | |
@param policy: The policy to follow for each state | |
in this MDP. The policy must have the same dimensions | |
as the reward matrix (i.e. have `get_num_actions()` rows | |
and `get_num_states()` columns) | |
""" | |
p_shape = policy.get_shape() | |
expected_shape = self.get_compatible_policy_shape() | |
if p_shape != expected_shape: | |
raise ValueError( | |
'The given policy is not valid for this MDP. ' + | |
f'Expected a policy of shape {expected_shape} but ' + | |
f'received a policy of shape {p_shape}.\n' + | |
'Make sure that the policy defines a probability for each ' + | |
'state-action pair' | |
) | |
num_states = self.get_num_states() | |
# To compute R_π, we weight the rewards of travelling | |
# to each state by the policy. Only combinations along | |
# the diagonal are sensible; off-diagonal entries represent | |
# probabilties of actions taken in one state (s) but weights | |
# for a different state (s'). Note also the use of transpose: | |
# the reward matrix (`N` x `A`) lists the reward for each | |
# action in a particular state in a _row_ | |
R_π = np.diag(policy @ np.transpose(self.reward_matrix)) | |
# To compute P_π, we take yz-plane slices of the 3D matrix | |
# `stpma`. This gives 2D `A` x `N` matrices whose elements | |
# give the probabilities of moving into a fixed state | |
# `k` from a state `j` after taking action `i` (`p_ijk` with | |
# `k` fixed). The result weighted by the policy determines | |
# the likelihood of moving from state `j` into state `k`. | |
# Note again that off-diagonal results aren't meaningful, as they | |
# correspond to meaningless probabilities | |
# TODO: np.tensorproduct may accomplish this | |
# in a single step. What is currently written | |
# is correct but likely horribly inefficient since | |
# np.append() makes copies :( ) | |
P_π = np.array([[], []]) | |
for k in range(num_states): | |
yz_plane_slice = self.stpma[:, :, k] | |
P_π_j = np.diag(policy @ yz_plane_slice) | |
P_π = np.insert(P_π, k, P_π_j, axis=1) | |
return RLMarkovRewardProcess( | |
state_labels=self.state_labels, | |
stpm=P_π, | |
reward_vec=R_π, | |
discount=self.discount | |
) | |
def estimate_state_value_under_policy(self, policy, steps: int): | |
"""Estimates the value state function assigned to each | |
state if the given policy _policy_ were used by an agent | |
to navigate this MDP | |
A policy supplied to an MDP results in an MRP whose | |
expected values can be computed iteratively. This | |
method is known as `iterative policy evaluation` | |
and is guaranteed to converge via contraction mappings. | |
@param policy: The policy for which to estimate the state | |
value function | |
""" | |
mrp = self.into_mrp(policy) | |
return mrp.estimate_state_value_function(steps) | |
def __compute_action_value_with_estimated_state_values(self, value_vec): | |
"""Computes the action-value function matrix describing how much value | |
is expected when taking a particular action in a particular state | |
@param value_vec: An estimate of the state-value function for | |
each state in this MDP. The vector is assumed to be valid (viz. | |
obeying the properties of a valid state-value function) | |
@returns: An `N` x `A` matrix mapping actions to expected | |
values in each state. The element `r_ij` in row `i` and column | |
`j` gives the expected reward of taking action `j` from state `i` | |
""" | |
if len(value_vec) != self.get_num_states(): | |
raise ValueError( | |
f'The given value vector ({value_vec}) is not valid for ' + | |
f'this MDP ({self.get_num_states()} expected but {len(value_vec)})' + | |
'received)' | |
) | |
# NOTE: stpma is a 3D matrix; thus np.matmul() is performing matrix | |
# multiplication between a 3D matrix and a 1D vector, which behaves | |
# as we expect. Note, however, that the action-value vectors for any | |
# fixed action will be the row vectors of the resulting matrix, hence | |
# the transpose to give the expected outcome | |
return self.reward_matrix + self.discount * (np.transpose(np.matmul(self.stpma, value_vec))) | |
def evaluate_policy(self, policy: RLPolicy, steps: int): | |
"""Estimates both the state-value function assigned to each | |
state if the given policy _policy_ were used by an agent | |
to navigate this MDP, as well as the action-value | |
function (for each state and action pair) under this policy | |
A policy supplied to an MDP results in an MRP whose | |
expected values can be computed iteratively. This | |
method is known as `iterative policy evaluation` | |
and is guaranteed to converge via contraction mappings. | |
@param policy: The policy for which to estimate the state | |
value function | |
""" | |
value_func_estimates = self.estimate_state_value_under_policy( | |
policy, steps | |
) | |
action_func_estimates = self.__compute_action_value_with_estimated_state_values( | |
value_vec=value_func_estimates | |
) | |
return value_func_estimates, action_func_estimates | |
def create_greedy_policy_with(self, action_matrix) -> RLPolicy: | |
"""Creates a policy which at any given state chooses | |
the action that will maximize its value with respect | |
to the given value vector | |
A greedy policy can be constructed by assigning a | |
probability of 1.0 to the action in each state that | |
gives the largest expected reward while assigning | |
a probability of 0.0 to all other actions in that | |
state. | |
@param action_matrix: An `N` x `A` matrix containing | |
of the action-value function for each state in this MDP. | |
The element `r_ij` in row `i` and column `j` gives the expected | |
reward of taking action `j` from state `i` | |
@returns: A policy which greedily choses the action at | |
each state with the highest expected returns given the | |
randomness of the MDP and the values given in the action | |
matrix. | |
""" | |
num_states = self.get_num_states() | |
shape = self.get_compatible_policy_shape() | |
# An `N` x `A` matrix of 0s. Some of the 0s are set to 1.0 according to | |
# the argmax of action matrix | |
policy = np.zeros(shape) | |
unity_vec = np.ones(num_states) | |
walk_vertically_down = np.arange(num_states) | |
max_action_index_vector = np.argmax(action_matrix, axis=1) | |
# The `walk_vertically_down` vector moves down one | |
# step at a time. This ensures that the corresponding | |
# argmax index is indexing in the correct row. | |
# | |
# See the "Advanced Indexing" section at | |
# https://numpy.org/doc/stable/user/basics.indexing.html for | |
# more details. | |
policy[walk_vertically_down, max_action_index_vector] = unity_vec | |
return RLPolicy(policy) | |
def find_optimal_configuration(self, iterations: int, estimation_per_iteration: int): | |
"""Computes the optimal policy, state-value function, and action-value | |
function for this MDP using (modified) policy iteration | |
Policy iteration is a technique whereby an optimal policy (as well | |
as the optimal values for the state-value and action-value functions) | |
can be determined by intially starting with an arbitrary policy, | |
evaluating that policy, greedily taking actions that maximize | |
expected value from the last iteration, and repeating. Contraction mapping | |
ensures that this procedure converges to the optimal values | |
@param iterations: The number of times a new policy should be generated | |
@param estimation_per_iteration: The number of steps that are taken to | |
estimate the policy generated in each iteration in this MDP | |
""" | |
if iterations <= 0: | |
raise ValueError( | |
f'Expected a positive number of iterations but got {iterations}' | |
) | |
shape = self.get_compatible_policy_shape() | |
v_opt = np.zeros(self.get_num_states()) | |
q_opt = np.zeros(shape) | |
π_opt = RLPolicy.randomized_policy(shape) | |
for i in range(iterations): | |
v_opt, q_opt = self.evaluate_policy( | |
π_opt, estimation_per_iteration | |
) | |
π_opt = self.create_greedy_policy_with(q_opt) | |
return v_opt, q_opt, π_opt | |
# Example | |
test = np.array( | |
[ | |
# P(taking action 1 changes state from _ to _) | |
[ | |
[0.0, 1.0], | |
[0.0, 1.0] | |
], | |
# P(taking action 2 changes state from _ to _) | |
[ | |
[0.0, 1.0], | |
[0.0, 1.0] | |
], | |
# P(taking action 3 changes state from _ to _) | |
[ | |
[0.0, 1.0], | |
[0.0, 1.0] | |
] | |
]) | |
pmat = np.array( | |
[ | |
# π(each action in each state) | |
[0.1, 0.4, 0.5], | |
[0.5, 0.3, 0.2] | |
]) | |
rewards = np.array( | |
[ | |
# Going into each state for each action | |
[2.0, 1.0, 5.0], | |
[0.0, 0.0, 0.0] | |
]) | |
process = RLMarkovDecisionProcess(None, None, test, rewards) | |
policy = RLPolicy(pmat) | |
np.set_printoptions(formatter={'float': lambda x: "{0:0.3f}".format(x)}) | |
print(process.find_optimal_configuration(10, 100)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment