
@kngwyu
Created February 18, 2021 04:50
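
"""Tabular average-reward RL experiments.

Two cyclic MDPs (OneCycleMDP, TwoCycleMDP), value-iteration solvers, and three
tabular agents (Q-learning, R-learning, and differential Q-learning), wired to
a typer CLI at the bottom of the file.
"""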
import dataclasses
from typing import Iterable, Optional, Protocol, Tuple
import numpy as np
Array = np.ndarray


@dataclasses.dataclass(frozen=True)
class Transition:
    state: int
    reward: float


class TabularMDP(Protocol):
    n_states: int
    n_actions: int

    def step(self, action: int) -> Transition:
        pass

    def reset(self, seed: Optional[int] = None) -> int:
        pass
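

# OneCycleMDP: n_states states arranged in a single ring.  Action 1 moves one
# step around the ring and action 0 stays, with the effect flipped with
# probability `noise_prob`.  Taking action 1 in the last state yields reward 1,
# so the intended optimal policy takes action 1 in every state;
# n_optimal_states counts how many states of a given policy agree with it.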
class OneCycleMDP(TabularMDP):
    def __init__(
        self,
        n_states: int,
        noise_prob: float = 0.1,
        seed: int = 0,
    ) -> None:
        self.n_states = n_states
        self.n_actions = 2
        self._current_state = 0
        self._noise_prob = noise_prob
        self._seed = seed
        self._rng = np.random.RandomState(seed)

    def step(self, action: int) -> Transition:
        assert action in [0, 1], f"Invalid Action {action}"
        if self._current_state == self.n_states - 1 and action == 1:
            reward = 1.0
        else:
            reward = 0.0
        perturbation = self._rng.uniform() < self._noise_prob
        if (action == 1) ^ perturbation:
            self._current_state = (self._current_state + 1) % self.n_states
        return Transition(self._current_state, reward)

    def reset(self, seed: Optional[int] = None) -> int:
        if seed is not None:
            self._seed = seed
        self._current_state = 0
        self._rng = np.random.RandomState(self._seed)
        return self._current_state

    def p_and_r(self) -> Tuple[Array, Array]:
        p = np.zeros((self.n_states, self.n_actions, self.n_states))
        for i in range(self.n_states):
            i1 = (i + 1) % self.n_states
            p[i][0][i] = 1.0 - self._noise_prob
            p[i][0][i1] = self._noise_prob
            p[i][1][i] = self._noise_prob
            p[i][1][i1] = 1.0 - self._noise_prob
        r = np.zeros((self.n_states, self.n_actions))
        r[-1][1] = 1.0
        return p, r

    def n_optimal_states(self, pi: Array) -> int:
        if len(pi.shape) == 1:  # Deterministic
            return np.sum(pi == 1)
        elif len(pi.shape) == 2:
            THRESHOLD = 1e-3
            return np.sum((pi[:, 1] + THRESHOLD) > 1.0)
        else:
            raise ValueError(f"Unsupported shape: {pi.shape}")
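

# TwoCycleMDP: two loops joined at a central start state (index n_left_states,
# out of 3 * n_left_states states in total).  Going left from the leftmost
# state pays reward 1 and returns to the center; going right from the rightmost
# state pays reward 3 and returns to the center.  Moves are flipped with
# probability `noise_prob`.  n_optimal_states counts states whose greedy action
# agrees with the intended optimal policy: left in the left arm, right
# everywhere else.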
class TwoCycleMDP(TabularMDP):
    def __init__(
        self,
        n_left_states: int,
        noise_prob: float = 0.1,
        seed: int = 0,
    ) -> None:
        self.n_states = n_left_states * 3
        self.n_actions = 2
        self._starting_state = n_left_states
        self._current_state = n_left_states
        self._noise_prob = noise_prob
        self._seed = seed
        self._rng = np.random.RandomState(seed)

    def step(self, action: int) -> Transition:
        assert action in [0, 1], f"Invalid Action {action}"
        if action == 1 and self._current_state == self.n_states - 1:
            reward = 3.0
        elif action == 0 and self._current_state == 0:
            reward = 1.0
        else:
            reward = 0.0
        goes_right = (action == 1) ^ (self._rng.uniform() < self._noise_prob)
        if self._current_state == self._starting_state:  # Center
            if goes_right:
                self._current_state += 1
            else:
                self._current_state -= 1
        elif self._current_state < self._starting_state:  # Left
            if not goes_right:
                if self._current_state == 0:
                    # The leftmost state wraps back to the center,
                    # consistent with p_and_r below.
                    self._current_state = self._starting_state
                else:
                    self._current_state -= 1
        else:  # Right
            if goes_right:
                if self._current_state == self.n_states - 1:
                    self._current_state = self._starting_state
                else:
                    self._current_state += 1
        return Transition(self._current_state, reward)

    def reset(self, seed: Optional[int] = None) -> int:
        if seed is not None:
            self._seed = seed
        self._current_state = self._starting_state
        self._rng = np.random.RandomState(self._seed)
        return self._current_state

    def p_and_r(self) -> Tuple[Array, Array]:
        p = np.zeros((self.n_states, self.n_actions, self.n_states))
        p[self._starting_state][0][self._starting_state - 1] = 1.0 - self._noise_prob
        p[self._starting_state][1][self._starting_state - 1] = self._noise_prob
        p[self._starting_state][0][self._starting_state + 1] = self._noise_prob
        p[self._starting_state][1][self._starting_state + 1] = 1.0 - self._noise_prob
        for i in range(self._starting_state):
            i1 = i - 1 if i > 0 else self._starting_state
            p[i][0][i] = self._noise_prob
            p[i][0][i1] = 1.0 - self._noise_prob
            p[i][1][i] = 1.0 - self._noise_prob
            p[i][1][i1] = self._noise_prob
        # Skip the center state, whose row was already filled in above.
        for i in range(self._starting_state + 1, self.n_states):
            i1 = i + 1 if i < self.n_states - 1 else self._starting_state
            p[i][0][i] = 1.0 - self._noise_prob
            p[i][0][i1] = self._noise_prob
            p[i][1][i] = self._noise_prob
            p[i][1][i1] = 1.0 - self._noise_prob
        r = np.zeros((self.n_states, self.n_actions))
        r[0][0] = 1.0
        r[self.n_states - 1][1] = 3.0
        return p, r

    def n_optimal_states(self, pi: Array) -> int:
        left = pi[: self._starting_state]
        right = pi[self._starting_state :]
        if len(pi.shape) == 1:  # Deterministic
            return np.sum(left == 0) + np.sum(right == 1)
        elif len(pi.shape) == 2:
            THRESHOLD = 1e-3
            return np.sum((left[:, 0] + THRESHOLD) > 1.0) + np.sum(
                (right[:, 1] + THRESHOLD) > 1.0
            )
        else:
            raise ValueError(f"Unsupported shape: {pi.shape}")
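

# Discounted value iteration: repeatedly apply the Bellman optimality update
#     V(s) <- max_a [ r(s, a) + gamma * sum_s' p(s'|s, a) V(s') ]
# until the value function changes by less than `threshold` in every state.
# Returns both V and the final Q table r + gamma * P V.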
def solve_discounted(
    mdp: TabularMDP, gamma: float, threshold: float
) -> Tuple[Array, Array]:
    p, r = mdp.p_and_r()
    v = np.zeros(p.shape[0])
    for _ in range(100000):
        r_plus_gamma_pv = r + gamma * np.einsum("saS,S->sa", p, v)
        v_next = r_plus_gamma_pv.max(axis=1)
        if np.all(np.absolute(v_next - v) < threshold):
            return v_next, r_plus_gamma_pv
        v = v_next
    raise RuntimeError("Value Iteration did not converge >_<")
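

# Undiscounted (average-reward) value iteration: iterate
#     V(s) <- max_a [ r(s, a) + sum_s' p(s'|s, a) V(s') ]
# and stop once the per-state increments V_{k+1} - V_k stabilize, i.e. the
# change between successive increments falls below `threshold`.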
def solve_undiscounted(mdp: TabularMDP, threshold: float) -> Tuple[Array, Array]:
    p, r = mdp.p_and_r()
    v = np.zeros(p.shape[0])
    prev_diff = np.ones_like(v) * np.inf
    for _ in range(100000):
        r_plus_pv = r + np.einsum("saS,S->sa", p, v)
        v_next = r_plus_pv.max(axis=1)
        diff = v_next - v
        if np.all(np.absolute(diff - prev_diff) < threshold):
            return v_next, r_plus_pv
        v = v_next
        prev_diff = diff
    raise RuntimeError("Value Iteration did not converge >_<")


def epsilon_greedy(rng: np.random.RandomState, q_value: Array, epsilon: float) -> int:
    if rng.uniform() < epsilon:
        return rng.choice(range(q_value.shape[0]))
    else:
        return np.argmax(q_value)


@dataclasses.dataclass(frozen=True)
class Result:
    rewards: Array
    n_optimal_states: Array

    def print_summary(self, n_states: int) -> None:
        import click

        click.secho("=====Result Summary=====", bg="black", fg="white", bold=True)
        n_rewards = self.rewards.shape[0]
        print(f"Return (total) : {np.sum(self.rewards)}")
        print(f"Return (second half) : {np.sum(self.rewards[n_rewards // 2:])}")
        if self.n_optimal_states is not None and 0 < len(self.n_optimal_states):
            (optimal_steps,) = np.where(self.n_optimal_states == n_states)
            if len(optimal_steps) == 0:
                print("Not Optimal >_<")
            else:
                print(f"Became optimal after {optimal_steps[0]} steps")


def plot_result(plots: Iterable[Tuple[str, dict]]) -> None:
    import pandas as pd
    import seaborn as sns
    from matplotlib import pyplot as plt

    for name, data in plots:
        if "." in name:
            raise ValueError("Please pass the filename without file extension")
        plot = sns.lineplot(data=pd.DataFrame(data), ax=plt.figure(name).add_subplot())
        plot.get_figure().savefig(name + ".pdf")


@dataclasses.dataclass(frozen=True)
class QLearningConfig:
    alpha: float
    gamma: float
    epsilon_max: float
    epsilon_min: float
    normalized_update: bool
    seed: int


@dataclasses.dataclass()
class QLearningAgent:
    qvalue: Array
    epsilon: float
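

# Tabular Q-learning with a linearly decaying epsilon-greedy schedule.  The
# standard update is
#     Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s', a') - Q(s, a));
# with `normalized_update` (the CLI default for `q`) the old value is first
# scaled by (1 - alpha), so the TD error itself, rather than the target, is
# mixed into Q(s, a).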
def q_learning(
    mdp: TabularMDP,
    config: QLearningConfig,
    n_steps: int,
) -> Tuple[Result, QLearningAgent]:
    state = mdp.reset()
    rng = np.random.RandomState(config.seed)
    initial_q_value = rng.randn(mdp.n_states, mdp.n_actions)
    agent = QLearningAgent(initial_q_value, config.epsilon_max)
    epsilon_delta = (config.epsilon_max - config.epsilon_min) / n_steps
    rewards = []
    n_optimal_states = []
    has_n_optimal_states = hasattr(mdp, "n_optimal_states")
    for _ in range(n_steps):
        action = epsilon_greedy(rng, agent.qvalue[state], agent.epsilon)
        transition = mdp.step(action)
        next_q_max = np.max(agent.qvalue[transition.state])
        td_error = (
            transition.reward + config.gamma * next_q_max - agent.qvalue[state, action]
        )
        # Update the agent
        if config.normalized_update:
            agent.qvalue[state, action] *= 1 - config.alpha
        agent.qvalue[state, action] += config.alpha * td_error
        agent.epsilon -= epsilon_delta
        # Update the state
        state = transition.state
        # Logging
        rewards.append(transition.reward)
        if has_n_optimal_states:
            n_optimal_states.append(mdp.n_optimal_states(agent.qvalue.argmax(axis=1)))
    result = Result(np.array(rewards), np.array(n_optimal_states))
    return result, agent


@dataclasses.dataclass(frozen=True)
class RLearningResult(Result):
    rhos: Array


@dataclasses.dataclass(frozen=True)
class RLearningConfig:
    alpha: float
    beta: float
    epsilon_max: float
    epsilon_min: float
    normalized_update: bool
    singh_modification: bool
    seed: int


@dataclasses.dataclass()
class RLearningAgent:
    rvalue: Array
    rho: float
    epsilon: float
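

# R-learning (Schwartz, 1993): an average-reward method that learns relative
# action values R(s, a) together with an estimate rho of the reward rate.  The
# R target here is r + max_a' R(s', a') - rho; the rho target is
# r + max_a' R(s', a') - max_a' R(s, a'), or r + max_a' R(s', a') - R(s, a)
# when `singh_modification` is set.  With `normalized_update` (the CLI default)
# the old R(s, a) and rho are scaled by (1 - alpha) and (1 - beta) before the
# alpha- and beta-weighted targets are added, i.e. both become exponential
# moving averages of their targets.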
def r_learning(
    mdp: TabularMDP,
    config: RLearningConfig,
    n_steps: int,
) -> Tuple[RLearningResult, RLearningAgent]:
    state = mdp.reset()
    rng = np.random.RandomState(config.seed)
    initial_r_value = rng.randn(mdp.n_states, mdp.n_actions)
    agent = RLearningAgent(initial_r_value, 0.0, config.epsilon_max)
    epsilon_delta = (config.epsilon_max - config.epsilon_min) / n_steps
    rewards = []
    n_optimal_states = []
    rhos = []
    has_n_optimal_states = hasattr(mdp, "n_optimal_states")
    for _ in range(n_steps):
        action = epsilon_greedy(rng, agent.rvalue[state], agent.epsilon)
        transition = mdp.step(action)
        next_r_max = np.max(agent.rvalue[transition.state])
        r_td = transition.reward + next_r_max - agent.rho
        if config.singh_modification:
            rho_td = transition.reward + next_r_max - agent.rvalue[state, action]
        else:
            rho_td = transition.reward + next_r_max - np.max(agent.rvalue[state])
        # Update the agent
        if config.normalized_update:
            agent.rvalue[state, action] *= 1 - config.alpha
            agent.rho *= 1 - config.beta
        agent.rvalue[state, action] += config.alpha * r_td
        agent.rho += config.beta * rho_td
        agent.epsilon -= epsilon_delta
        # Update the state
        state = transition.state
        # Logging
        rewards.append(transition.reward)
        rhos.append(agent.rho)
        if has_n_optimal_states:
            n_optimal_states.append(mdp.n_optimal_states(agent.rvalue.argmax(axis=1)))
    result = RLearningResult(
        np.array(rewards),
        np.array(n_optimal_states) if has_n_optimal_states else None,
        np.array(rhos),
    )
    return result, agent


@dataclasses.dataclass(frozen=True)
class DiffQLearningConfig:
    alpha: float
    eta: float
    epsilon_max: float
    epsilon_min: float
    normalized_update: bool
    seed: int


@dataclasses.dataclass()
class DiffQLearningAgent:
    qvalue: Array
    rho: float
    epsilon: float


DiffQLearningResult = RLearningResult
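

# Differential Q-learning (cf. Wan, Naik & Sutton's average-reward variant of
# Q-learning): maintains a reward-rate estimate rho alongside the Q table.
# Both are driven by the same TD error
#     delta = r - rho + max_a' Q(s', a') - Q(s, a),
# with Q(s, a) updated by alpha * delta and rho by alpha * eta * delta.  With
# `normalized_update` (off by default for `diffq`) the old Q(s, a) and rho are
# first scaled by (1 - alpha) and (1 - alpha * eta).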
def diffq_learning(
    mdp: TabularMDP,
    config: DiffQLearningConfig,
    n_steps: int,
) -> Tuple[DiffQLearningResult, DiffQLearningAgent]:
    state = mdp.reset()
    rng = np.random.RandomState(config.seed)
    initial_q_value = rng.randn(mdp.n_states, mdp.n_actions)
    agent = DiffQLearningAgent(initial_q_value, 0.0, config.epsilon_max)
    epsilon_delta = (config.epsilon_max - config.epsilon_min) / n_steps
    rewards = []
    n_optimal_states = []
    rhos = []
    has_n_optimal_states = hasattr(mdp, "n_optimal_states")
    alpha_eta = config.alpha * config.eta
    for _ in range(n_steps):
        action = epsilon_greedy(rng, agent.qvalue[state], agent.epsilon)
        transition = mdp.step(action)
        next_q_max = np.max(agent.qvalue[transition.state])
        td_error = (
            transition.reward - agent.rho + next_q_max - agent.qvalue[state, action]
        )
        # Update the agent
        if config.normalized_update:
            agent.qvalue[state, action] *= 1 - config.alpha
            agent.rho *= 1 - alpha_eta
        agent.qvalue[state, action] += config.alpha * td_error
        agent.rho += alpha_eta * td_error
        agent.epsilon -= epsilon_delta
        # Update the state
        state = transition.state
        # Logging
        rewards.append(transition.reward)
        rhos.append(agent.rho)
        if has_n_optimal_states:
            n_optimal_states.append(mdp.n_optimal_states(agent.qvalue.argmax(axis=1)))
    result = DiffQLearningResult(
        np.array(rewards),
        np.array(n_optimal_states) if has_n_optimal_states else None,
        np.array(rhos),
    )
    return result, agent
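

# Example invocations (the filename avg_reward.py is hypothetical; the commands
# and options come from the typer app defined below):
#     python avg_reward.py q --mdp onecycle --n-states 20 --plot
#     python avg_reward.py r --mdp twocycle --n-states 10
#     python avg_reward.py diffq --eta 0.25 --n-steps 200000 --plot
#     python avg_reward.py solve --undiscounted --n-states 50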
if __name__ == "__main__":
    import typer

    MDPS = {
        "one": OneCycleMDP,
        "onecycle": OneCycleMDP,
        "two": TwoCycleMDP,
        "twocycle": TwoCycleMDP,
    }
    app = typer.Typer()

    def normalized(default: bool) -> typer.Option:
        return typer.Option(
            default, "--normalize/--unnormalize", "-N/-UN", is_flag=True
        )

    @app.command()
    def q(
        alpha: float = 0.2,
        gamma: float = 0.99,
        epsilon_max: float = 0.4,
        epsilon_min: float = 0.1,
        seed: int = 0,
        n_states: int = 100,
        n_steps: int = 100000,
        noise_prob: float = 0.1,
        normalized_update: bool = normalized(True),
        mdp: str = "onecycle",
        plot: bool = typer.Option(False, is_flag=True),
    ) -> None:
        config = QLearningConfig(
            alpha,
            gamma,
            epsilon_max,
            epsilon_min,
            normalized_update,
            seed,
        )
        mdp = MDPS[mdp](n_states, noise_prob, seed)
        result, agent = q_learning(mdp, config, n_steps)
        result.print_summary(mdp.n_states)
        if plot:
            data = {
                "Num. optimal states": result.n_optimal_states,
            }
            plot_result([("q-learning-opt", data)])

    @app.command()
    def r(
        alpha: float = 0.2,
        beta: float = 0.05,
        epsilon_max: float = 0.4,
        epsilon_min: float = 0.1,
        seed: int = 0,
        n_states: int = 100,
        n_steps: int = 100000,
        noise_prob: float = 0.1,
        normalized_update: bool = normalized(True),
        singh_modification: bool = typer.Option(False, "-SM", is_flag=True),
        mdp: str = "onecycle",
        plot: bool = typer.Option(False, is_flag=True),
    ) -> None:
        config = RLearningConfig(
            alpha,
            beta,
            epsilon_max,
            epsilon_min,
            normalized_update,
            singh_modification,
            seed,
        )
        mdp = MDPS[mdp](n_states, noise_prob, seed)
        result, agent = r_learning(mdp, config, n_steps)
        result.print_summary(mdp.n_states)
        print(agent.rvalue.argmax(axis=1))
        if plot:
            nopt = {
                "Num. optimal states": result.n_optimal_states,
            }
            rho = {"ρ": result.rhos}
            plot_result([("r-learning-opt", nopt), ("r-learning-ρ", rho)])

    @app.command()
    def diffq(
        alpha: float = 0.2,
        eta: float = 0.25,
        epsilon_max: float = 0.4,
        epsilon_min: float = 0.1,
        seed: int = 0,
        n_states: int = 1000,
        n_steps: int = 100000,
        noise_prob: float = 0.1,
        normalized_update: bool = normalized(False),
        mdp: str = "onecycle",
        plot: bool = typer.Option(False, is_flag=True),
    ) -> None:
        config = DiffQLearningConfig(
            alpha,
            eta,
            epsilon_max,
            epsilon_min,
            normalized_update,
            seed,
        )
        mdp = MDPS[mdp](n_states, noise_prob, seed)
        result, agent = diffq_learning(mdp, config, n_steps)
        result.print_summary(mdp.n_states)
        if plot:
            nopt = {
                "Num. optimal states": result.n_optimal_states,
            }
            rho = {"ρ": result.rhos}
            plot_result([("diffq-learning-opt", nopt), ("diffq-learning-ρ", rho)])

    @app.command()
    def solve(
        discounted: bool = typer.Option(True, "--discounted/--undiscounted"),
        gamma: float = 0.99,
        threshold: float = 1e-4,
        n_states: int = 1000,
        noise_prob: float = 0.1,
    ) -> None:
        mdp = OneCycleMDP(n_states, noise_prob)
        if discounted:
            _, q = solve_discounted(mdp, gamma, threshold)
        else:
            _, q = solve_undiscounted(mdp, threshold)
        print(q)
        print(q.argmax(axis=1))

    app()