import sys
import math
try:
    import numpy as np
    print(f'Using numpy version {np.__version__}')
except ImportError:
    print('NumPy is needed for this demo', file=sys.stderr)
    sys.exit(1)
def is_almost_unity(array, tolerance: float = 1.0e-9) -> bool:
"""Determines whether or not the given array-like
has a sum equal to 1.0 (within a specified tolerance)
@param array: An array-like whose sum is checked
@param tolerance: The error tolerance
"""
return math.isclose(
np.sum(array),
1.0,
rel_tol=tolerance
)
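# Two quick sanity checks (added for illustration; these lines are not part
# of the original gist): a distribution sums to ~1, a deficient one does not.
assert is_almost_unity([0.25, 0.25, 0.5])
assert not is_almost_unity([0.25, 0.25, 0.4])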
def validate_rows_are_unity(matrix: np.ndarray):
"""Verifies that each row of the matrix sums
to 1.0
"""
# Verify each row sums to unity
rows = matrix.shape[0]
unity = all(
(
is_almost_unity(matrix[row])
for row in range(rows)
)
)
if not unity:
raise ValueError(
'Each row in the matrix must sum to 1 (i.e. the matrix must be stochastic)'
)
def validate_stochastic(stpm: np.ndarray):
    """Verifies that the given numpy matrix is a
    valid state transition probability matrix (i.e.
    that the matrix is a stochastic matrix)
    A stochastic matrix is a square matrix whose
    rows each sum to 1.0
"""
    stpm_rows = stpm.shape[0]
    stpm_cols = stpm.shape[1]
    if stpm_cols != stpm_rows:
        raise ValueError(
            'The state transition probability matrix must be a square' +
            f' matrix, but rows ({stpm_rows}) != columns ({stpm_cols})'
        )
validate_rows_are_unity(stpm)
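# A minimal usage sketch (illustrative only, with made-up matrices): a square
# row-stochastic matrix passes validation silently, while a matrix whose first
# row sums to 0.9 is rejected with a ValueError.
validate_stochastic(np.array([[0.3, 0.7],
                              [1.0, 0.0]]))
try:
    validate_stochastic(np.array([[0.3, 0.6],
                                  [1.0, 0.0]]))
except ValueError:
    pass  # expected: the first row does not sum to 1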
def validate_stochastic_3d(stpma: np.ndarray):
    """Verifies that the given 3D numpy array is a
    valid state transition probability matrix (i.e.
    that each plane is a stochastic matrix) for an MDP
    """
    stpma_planes, stpma_rows, stpma_cols = stpma.shape
    if stpma_cols != stpma_rows:
        raise ValueError(
            'The state transition probability matrix must be a square' +
            f' matrix for each plane, but rows ({stpma_rows}) != columns ({stpma_cols})'
        )
    for j in range(stpma_planes):
validate_stochastic(stpma[j])
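# A quick illustrative check (assumed example, not in the original gist): a 3D
# array holding one 2 x 2 stochastic plane per action is accepted.
validate_stochastic_3d(np.array([[[0.5, 0.5],
                                  [0.1, 0.9]],
                                 [[1.0, 0.0],
                                  [0.4, 0.6]]]))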
class RLMarkovRewardProcess:
"""Encapsulates the behavior of a Markov Reward Process
A Markov Reward Process (or MRP for short) is a 4-tuple
(S, P, R, y) where
1. S is a finite set of states that the system can
move into
2. P is a state transition probability matrix. This
is a square matrix whose element in the `i`th row and
`j`th column specify the probability of moving into
state `j` from state `i`
3. R is a reward function which assigns expected rewards
to each state
4. 0 <= y <= 1 is the discount factor.
A Markov Reward Process has the important Markov property,
    which can be described mathematically succinctly as
    P(S_(t+1) | S_t) = P(S_(t+1) | S_1, S_2, ..., S_t)
    In other words, "the future is independent of the past given
    the present" (David Silver, UCL RL lecture slides)
"""
def __init__(self, state_labels, stpm, reward_vec, discount: float):
"""Creates a new Markov Reward Process object encapsulating
the process's dynamics
@param state_labels: A vector of human-readable labels for each
of the states represented in the MRP. This is optional: if None
is supplied, the states are labeled 1 to N (for N states)
        @param stpm: The state transition probability matrix. The initializer
uses the size of the matrix to determine the number (N) of states in the
MRP. The matrix must be square and have rows which sum to unity
@param reward_vec: A vector describing the expected reward assigned
        to each state in the MRP. This must have size N
@param discount: The discount factor of the MRP. The discount must be
a floating-point number between 0.0 and 1.0
"""
validate_stochastic(stpm)
num_states, _ = stpm.shape
reward_vec_len = len(reward_vec)
if reward_vec_len != num_states:
raise ValueError(
                'There must be exactly one reward assigned to each state, but ' +
                f'the number of rewards ({reward_vec_len}) != the size of the state ' +
                f'transition matrix ({num_states})'
)
self.__initialize_state_labels(state_labels, num_states)
if discount < 0.0 or discount > 1.0:
raise ValueError('The discount factor must be within 0.0 and 1.0')
self.stpm = stpm
self.reward_vec = reward_vec
self.discount = discount
def __initialize_state_labels(self, state_labels, num_states):
"""Initializes the labels given to each state in the MDP
"""
if state_labels is None:
self.state_labels = np.array(
[f'{i + 1}' for i in range(0, num_states)]
)
else:
state_labels_len = len(state_labels)
if state_labels_len != num_states:
raise ValueError(
                    'There must be exactly one label assigned to each state, but ' +
                    f'the number of labels ({state_labels_len}) != the size of the state ' +
                    f'transition matrix ({num_states})'
)
self.state_labels = state_labels
# MARK: Python Methods
def __str__(self):
return str(self.stpm)
# MARK: Methods
def get_num_states(self):
"""Computes the number of states that make up this MDP
"""
return len(self.state_labels)
def estimate_state_value_function(self, steps: int):
"""Estimates the value of the state value function
`v` at every state in the reward process
@param steps: The number of steps for which to run the
estimation iteration
"""
if steps <= 0:
raise ValueError('At least one iteration required')
v = np.zeros(self.get_num_states())
for j in range(steps):
v = self.reward_vec + self.discount * self.stpm @ v
return v
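# A minimal sketch (added for illustration; the 2-state MRP below is made up):
# iterating v <- R + y * P v should converge to the exact solution of the
# Bellman equation v = (I - y * P)^(-1) * R, so the iterative estimate is
# compared against a direct linear solve.
_demo_P = np.array([[0.8, 0.2],
                    [0.3, 0.7]])
_demo_R = np.array([1.0, 0.0])
_demo_mrp = RLMarkovRewardProcess(None, _demo_P, _demo_R, discount=0.9)
_demo_v = _demo_mrp.estimate_state_value_function(steps=200)
_exact_v = np.linalg.solve(np.eye(2) - 0.9 * _demo_P, _demo_R)
assert np.allclose(_demo_v, _exact_v)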
class RLPolicy:
"""A reinforcement learning policy that is used
to make decisions.
    A policy in the context of an MDP is a probability distribution
    over the actions and states of the MDP. A policy is interpreted
    in an MDP as follows:
Given an MDP with |S| states and |A| actions, a policy
is an |S| x |A| matrix whose rows sum to 1. The element
`p_ij` in row `i` and column `j` gives the probability
that the MDP will take action `j` from state `i`
That is, a policy defines a probability distribution over
the possible actions that can be taken in an MDP for each
state of that MDP
"""
def __init__(self, policy):
"""Creates a new policy for a Markov Decision Process
(see `RLMarkovDecisionProcess`)
@param policy: An `N` x `A` matrix describing the probability
of taking actions (A) in each state (N). For an element at row `i`
and column `j`, the policy gives the probability of taking
action `j` in state `i`. Each row of the policy matrix must
sum to unity
"""
        validate_rows_are_unity(policy)
self.contents = policy
@staticmethod
def randomized_policy(shape):
# Arbitrary for this code (would be chosen
# more carefully)
ALPHA = 0.15
height = shape[0]
width = shape[1]
policy = np.random.dirichlet(
np.repeat(ALPHA, width),
size=height
)
return RLPolicy(policy)
def __matmul__(self, other):
# Transfer matrix multiplication to the matrix itself
return self.contents @ other
def __str__(self):
return str(self.contents)
# MARK: Methods
def get_shape(self):
return self.contents.shape
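# A small usage sketch (illustrative only): a policy over 2 states and 3
# actions is a 2 x 3 row-stochastic matrix. The deterministic policy below
# always picks action 1 in state 1 and action 3 in state 2; the randomized
# policy draws each row from a Dirichlet distribution, so its rows also sum
# to 1.
_demo_policy = RLPolicy(np.array([[1.0, 0.0, 0.0],
                                  [0.0, 0.0, 1.0]]))
assert _demo_policy.get_shape() == (2, 3)
_demo_random = RLPolicy.randomized_policy((2, 3))
assert np.allclose(np.sum(_demo_random.contents, axis=1), 1.0)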
class RLMarkovDecisionProcess:
"""Encapsulates the behavior of a Markov Decision Process
A Markov Decision Process (or MDP for short) is a 5-tuple
(S, A, P, R, y) where
1. S is a finite set of states that the system can
move into
2. A is a finite set of actions that can be taken
in any state
    3. P is a state transition probability matrix. For
    each action `k`, this is a square matrix whose element
    in the `i`th row and `j`th column specifies the
    probability of moving into state `j` from state `i`
    if action `k` is taken
    4. R is a reward function which assigns expected rewards
    to each state and to each action that can be taken in
    each state
    5. 0 <= y <= 1 is the discount factor
"""
def __init__(self, state_labels, action_labels, stmpa, reward_matrix, discount=1.0):
"""Creates a new Markov Reward Process object encapsulating
the process's dynamics.
The MDP that is represented by this object has an action set
of size `A` and `N` states
        @param state_labels: A vector of human-readable labels for each
        of the states represented in the MDP. This is optional: if None
        is supplied, the states are labeled 1 to `N` (one for each state)
        @param action_labels: A vector of human-readable labels for each
        of the actions that can be taken in any state in the MDP. This is
        optional: if None is supplied, the actions are labeled 1 to `A`
        (one for each action)
        @param stmpa: The 3D state transition probability matrix of dimensions
        `A` x `N` x `N`. The initializer uses the size of the matrix to determine
        the number (N) of states in the MDP. The matrix must be stochastic, i.e.
        an `N` x `N` stochastic matrix for each possible action.
The interpretation is as follows:
        The element `p_ijk` in plane `i`, row `j`, and column `k` gives
        the probability that an agent acting in this MDP moves into
        state `k` from state `j` if action `i` is taken
        Each plane (a fixed first index, i.e. a fixed action `a`) is
        therefore the `N` x `N` transition matrix describing every
        source and destination state under that action
@param reward_matrix: A matrix describing the expected reward assigned
to each state in the MDP. This must be a matrix of size `N` x `A` (where
each row specifies the state and the columns the rewards for each action
in that state)
The interpretation is as follows:
        The element `r_ij` in row `i` and column `j` gives the reward
received by an agent taking action `j` from state `i`
        @param discount: The discount factor of the MDP. The discount must be
a floating-point number between 0.0 and 1.0
"""
        # Each plane of `stmpa` can be thought of as fixing an action `a`
        # and looking at all possible ways to get from any state `s` to
        # any other state `s'` when that action is taken
validate_stochastic_3d(stmpa)
num_actions, num_states, _ = stmpa.shape
rm_states, rm_num_actions = reward_matrix.shape
if num_states != rm_states:
raise ValueError(
                'There must be exactly one reward assigned to each state-action pair, but ' +
f'the number of rewards ({rm_states}) != the number of states in ' +
f'the state transition matrix ({num_states})'
)
if rm_num_actions != num_actions:
raise ValueError(
'There must be a reward for each action in each state, but ' +
f'the number of actions assigned values ({rm_num_actions}) != ' +
f'the number of actions in the transitions matrix ({num_actions})'
)
if discount < 0.0 or discount > 1.0:
raise ValueError('The discount factor must be within 0.0 and 1.0')
self.__initialize_state_labels(state_labels, num_states)
self.__initialize_action_labels(action_labels, num_actions)
self.stpma = stmpa
self.reward_matrix = reward_matrix
self.discount = discount
def __initialize_state_labels(self, state_labels, num_states):
"""Initializes the labels given to each state in the MDP
"""
if state_labels is None:
self.state_labels = np.array(
[f'{i + 1}' for i in range(0, num_states)]
)
else:
state_labels_len = len(state_labels)
if state_labels_len != num_states:
raise ValueError(
                    'There must be exactly one label assigned to each state, but ' +
                    f'the number of labels ({state_labels_len}) != the size of the state ' +
                    f'transition matrix ({num_states})'
)
self.state_labels = state_labels
def __initialize_action_labels(self, action_labels, num_actions):
"""Initializes the labels given to each action in the MDP
"""
if action_labels is None:
self.action_labels = np.array(
[f'Action {i + 1}' for i in range(0, num_actions)]
)
else:
action_labels_len = len(action_labels)
if action_labels_len != num_actions:
raise ValueError(
                    'There must be exactly one label assigned to each action, but ' +
                    f'the number of actions ({num_actions}) != the number of action labels ' +
                    f'({action_labels_len})'
)
self.action_labels = action_labels
# MARK: Methods
def get_num_actions(self):
"""Computes the number of actions that can be taken
in any given state of this MDP
"""
return len(self.action_labels)
def get_num_states(self):
"""Computes the number of states that make up this MDP
"""
return len(self.state_labels)
def get_compatible_policy_shape(self):
"""Computes, as a tuple, the shape of
policies that can be used to describe how an
agent can behave in this Markov decision process
"""
return self.get_num_states(), self.get_num_actions()
def into_mrp(self, policy: RLPolicy) -> RLMarkovRewardProcess:
"""Generates a new MRP from this MDP by
acting according to the given policy
        An MDP can be viewed as an MRP under a policy,
        which describes the likelihood with which actions
        are taken in any given state of the MDP.
        @param policy: The policy to follow for each state
        in this MDP. The policy must have the same dimensions
        as the reward matrix (i.e. have `get_num_states()` rows
        and `get_num_actions()` columns)
"""
p_shape = policy.get_shape()
expected_shape = self.get_compatible_policy_shape()
if p_shape != expected_shape:
raise ValueError(
'The given policy is not valid for this MDP. ' +
f'Expected a policy of shape {expected_shape} but ' +
f'received a policy of shape {p_shape}.\n' +
'Make sure that the policy defines a probability for each ' +
'state-action pair'
)
num_states = self.get_num_states()
# To compute R_π, we weight the rewards of travelling
# to each state by the policy. Only combinations along
# the diagonal are sensible; off-diagonal entries represent
        # probabilities of actions taken in one state (s) but weights
# for a different state (s'). Note also the use of transpose:
# the reward matrix (`N` x `A`) lists the reward for each
# action in a particular state in a _row_
R_π = np.diag(policy @ np.transpose(self.reward_matrix))
# To compute P_π, we take yz-plane slices of the 3D matrix
# `stpma`. This gives 2D `A` x `N` matrices whose elements
# give the probabilities of moving into a fixed state
# `k` from a state `j` after taking action `i` (`p_ijk` with
# `k` fixed). The result weighted by the policy determines
# the likelihood of moving from state `j` into state `k`.
        # Note again that only the diagonal entries are meaningful: the
        # off-diagonal entries mix the policy of one state with transitions
        # out of a different state.
        # TODO: np.einsum (or np.tensordot) may accomplish this
        # in a single step. What is currently written
        # is correct but likely horribly inefficient since
        # np.insert() makes copies :(
        P_π = np.empty((num_states, 0))
for k in range(num_states):
yz_plane_slice = self.stpma[:, :, k]
P_π_j = np.diag(policy @ yz_plane_slice)
P_π = np.insert(P_π, k, P_π_j, axis=1)
return RLMarkovRewardProcess(
state_labels=self.state_labels,
stpm=P_π,
reward_vec=R_π,
discount=self.discount
)
def estimate_state_value_under_policy(self, policy, steps: int):
"""Estimates the value state function assigned to each
state if the given policy _policy_ were used by an agent
to navigate this MDP
A policy supplied to an MDP results in an MRP whose
expected values can be computed iteratively. This
method is known as `iterative policy evaluation`
and is guaranteed to converge via contraction mappings.
@param policy: The policy for which to estimate the state
value function
"""
mrp = self.into_mrp(policy)
return mrp.estimate_state_value_function(steps)
def __compute_action_value_with_estimated_state_values(self, value_vec):
"""Computes the action-value function matrix describing how much value
is expected when taking a particular action in a particular state
@param value_vec: An estimate of the state-value function for
each state in this MDP. The vector is assumed to be valid (viz.
obeying the properties of a valid state-value function)
@returns: An `N` x `A` matrix mapping actions to expected
values in each state. The element `r_ij` in row `i` and column
`j` gives the expected reward of taking action `j` from state `i`
"""
if len(value_vec) != self.get_num_states():
raise ValueError(
                f'The given value vector ({value_vec}) is not valid for ' +
                f'this MDP ({self.get_num_states()} entries expected but {len(value_vec)} ' +
                'received)'
)
# NOTE: stpma is a 3D matrix; thus np.matmul() is performing matrix
# multiplication between a 3D matrix and a 1D vector, which behaves
# as we expect. Note, however, that the action-value vectors for any
# fixed action will be the row vectors of the resulting matrix, hence
# the transpose to give the expected outcome
return self.reward_matrix + self.discount * (np.transpose(np.matmul(self.stpma, value_vec)))
def evaluate_policy(self, policy: RLPolicy, steps: int):
"""Estimates both the state-value function assigned to each
state if the given policy _policy_ were used by an agent
to navigate this MDP, as well as the action-value
function (for each state and action pair) under this policy
A policy supplied to an MDP results in an MRP whose
expected values can be computed iteratively. This
method is known as `iterative policy evaluation`
and is guaranteed to converge via contraction mappings.
@param policy: The policy for which to estimate the state
value function
"""
value_func_estimates = self.estimate_state_value_under_policy(
policy, steps
)
action_func_estimates = self.__compute_action_value_with_estimated_state_values(
value_vec=value_func_estimates
)
return value_func_estimates, action_func_estimates
def create_greedy_policy_with(self, action_matrix) -> RLPolicy:
"""Creates a policy which at any given state chooses
        the action that will maximize its value with respect
        to the given action-value matrix
A greedy policy can be constructed by assigning a
probability of 1.0 to the action in each state that
gives the largest expected reward while assigning
a probability of 0.0 to all other actions in that
state.
        @param action_matrix: An `N` x `A` matrix containing
        the values of the action-value function for each state in this MDP.
        The element `r_ij` in row `i` and column `j` gives the expected
        reward of taking action `j` from state `i`
        @returns: A policy which greedily chooses the action at
each state with the highest expected returns given the
randomness of the MDP and the values given in the action
matrix.
"""
num_states = self.get_num_states()
shape = self.get_compatible_policy_shape()
# An `N` x `A` matrix of 0s. Some of the 0s are set to 1.0 according to
        # the argmax of the action matrix
policy = np.zeros(shape)
unity_vec = np.ones(num_states)
walk_vertically_down = np.arange(num_states)
max_action_index_vector = np.argmax(action_matrix, axis=1)
# The `walk_vertically_down` vector moves down one
# step at a time. This ensures that the corresponding
# argmax index is indexing in the correct row.
#
# See the "Advanced Indexing" section at
# https://numpy.org/doc/stable/user/basics.indexing.html for
# more details.
policy[walk_vertically_down, max_action_index_vector] = unity_vec
return RLPolicy(policy)
def find_optimal_configuration(self, iterations: int, estimation_per_iteration: int):
"""Computes the optimal policy, state-value function, and action-value
function for this MDP using (modified) policy iteration
Policy iteration is a technique whereby an optimal policy (as well
as the optimal values for the state-value and action-value functions)
        can be determined by starting with an arbitrary policy,
evaluating that policy, greedily taking actions that maximize
expected value from the last iteration, and repeating. Contraction mapping
ensures that this procedure converges to the optimal values
@param iterations: The number of times a new policy should be generated
@param estimation_per_iteration: The number of steps that are taken to
estimate the policy generated in each iteration in this MDP
"""
if iterations <= 0:
raise ValueError(
f'Expected a positive number of iterations but got {iterations}'
)
shape = self.get_compatible_policy_shape()
v_opt = np.zeros(self.get_num_states())
q_opt = np.zeros(shape)
π_opt = RLPolicy.randomized_policy(shape)
for i in range(iterations):
v_opt, q_opt = self.evaluate_policy(
π_opt, estimation_per_iteration
)
π_opt = self.create_greedy_policy_with(q_opt)
return v_opt, q_opt, π_opt
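# A standalone sketch (not in the original gist) of the advanced-indexing
# trick used by create_greedy_policy_with: pairing row indices with per-row
# argmax column indices places a single 1.0 in each row of an all-zero matrix.
_q = np.array([[0.2, 0.9],
               [0.8, 0.1],
               [0.4, 0.6]])
_greedy = np.zeros_like(_q)
_greedy[np.arange(3), np.argmax(_q, axis=1)] = 1.0
assert np.array_equal(_greedy, np.array([[0.0, 1.0],
                                         [1.0, 0.0],
                                         [0.0, 1.0]]))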
# Example
test = np.array(
[
# P(taking action 1 changes state from _ to _)
[
[0.0, 1.0],
[0.0, 1.0]
],
# P(taking action 2 changes state from _ to _)
[
[0.0, 1.0],
[0.0, 1.0]
],
# P(taking action 3 changes state from _ to _)
[
[0.0, 1.0],
[0.0, 1.0]
]
])
pmat = np.array(
[
# π(each action in each state)
[0.1, 0.4, 0.5],
[0.5, 0.3, 0.2]
])
rewards = np.array(
[
        # Reward for taking each action in each state
[2.0, 1.0, 5.0],
[0.0, 0.0, 0.0]
])
process = RLMarkovDecisionProcess(None, None, test, rewards)
policy = RLPolicy(pmat)
np.set_printoptions(formatter={'float': lambda x: "{0:0.3f}".format(x)})
print(process.find_optimal_configuration(10, 100))
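# A short follow-up sketch (added for illustration, not part of the original
# gist): acting in `process` under the hand-written `policy` induces an MRP
# whose transition matrix is again row-stochastic, and whose value estimates
# agree with estimate_state_value_under_policy.
induced_mrp = process.into_mrp(policy)
validate_stochastic(induced_mrp.stpm)
print(induced_mrp.estimate_state_value_function(100))
print(process.estimate_state_value_under_policy(policy, 100))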