predictive sampling written in python with numpy
# Copyright 2024 Taylor Howell
from __future__ import annotations

from typing import Callable

import numpy as np

# predictive sampling
# https://arxiv.org/abs/2212.00541
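
# Algorithm sketch (as implemented below):
#   1. hold a nominal policy: a zero-order-hold spline over actions
#   2. shift (resample) the spline to start at the current time
#   3. sample noisy copies of the nominal spline parameters
#   4. roll out the nominal and noisy policies with the dynamics model
#   5. keep the best noisy policy if it beats the nominal one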


# policy class for predictive sampling
class Policy:
    # initialize policy
    def __init__(
        self,
        nstate: int,
        naction: int,
        horizon: float,
        splinestep: float,
        limits: np.ndarray,
    ):
        self._nstate = nstate
        self._naction = naction
        self._splinestep = splinestep
        self._horizon = horizon
        self._nspline = int(horizon / splinestep) + 1
        self._parameters = np.zeros((self._naction, self._nspline), dtype=float)
        self._times = np.array(
            [t * self._splinestep for t in range(self._nspline)], dtype=float
        )
        self._limits = limits

    # clamp actions within limits (per action dimension; handles both a single
    # action vector and the full (naction, nspline) parameter matrix)
    def clamp(self, action: np.ndarray) -> np.ndarray:
        return np.clip(action.T, self._limits[:, 0], self._limits[:, 1]).T

    # get action from policy
    def action(self, time: float) -> np.ndarray:
        # current time below lower bound
        if time <= self._times[0]:
            return self.clamp(self._parameters[:, 0])

        # find interval
        for i in range(self._nspline - 1):
            if self._times[i] <= time < self._times[i + 1]:
                return self.clamp(self._parameters[:, i])

        # current time above upper bound
        return self.clamp(self._parameters[:, -1])

    # resample policy plan from current time
    def resample(self, time: float):
        # new parameters and times
        p = np.zeros((self._naction, self._nspline), dtype=float)
        t = np.array(
            [i * self._splinestep + time for i in range(self._nspline)], dtype=float
        )

        # get new actions
        for i in range(self._nspline):
            p[:, i] = self.action(t[i])

        # update
        self._parameters = p
        self._times = t

    # add zero-mean Gaussian noise to policy parameters
    def add_noise(self, scale: float):
        self._parameters += np.random.normal(
            scale=scale, size=(self._naction, self._nspline)
        )
        self._parameters = self.clamp(self._parameters)

    # return a copy of the policy with noisy parameters
    def noisy_copy(self, scale: float) -> Policy:
        # create new policy object
        policy = Policy(
            self._nstate,
            self._naction,
            self._horizon,
            self._splinestep,
            self._limits,
        )

        # copy policy parameters into new object
        policy._parameters = np.copy(self._parameters)

        # get noisy parameters
        policy.add_noise(scale)

        return policy


# rollout: simulate the policy forward and return the average reward
def rollout(
    state: np.ndarray,
    time: float,
    dynamics: Callable,
    reward: Callable,
    policy: Policy,
    horizon: float,
    timestep: float,
) -> float:
    # number of steps
    steps = int(horizon / timestep)

    # initialize reward
    total_reward = 0.0

    # simulate
    for _ in range(steps):
        # get action from policy
        action = policy.action(time)

        # evaluate current reward
        total_reward += reward(state, action)

        # step dynamics
        state = dynamics(state, action, timestep)
        time += timestep

    # terminal reward
    total_reward += reward(state, None)

    # average reward over the horizon
    return total_reward / (steps + 1)


# predictive sampling planner class
class Planner:
    # initialize planner
    def __init__(
        self,
        nstate: int,
        naction: int,
        dynamics: Callable,
        reward: Callable,
        horizon: float,
        splinestep: float,
        planstep: float,
        nsample: int,
        noise_scale: float,
        nimprove: int,
        limits: np.ndarray,
    ):
        self._dynamics = dynamics
        self._reward = reward
        self._horizon = horizon
        self.policy = Policy(nstate, naction, self._horizon, splinestep, limits)
        self._planstep = planstep
        self._nsample = nsample
        self._noise_scale = noise_scale
        self._nimprove = nimprove

    # action from policy
    def action_from_policy(self, time: float) -> np.ndarray:
        return self.policy.action(time)

    # improve policy
    def improve_policy(self, state: np.ndarray, time: float):
        # resample policy so the spline starts at the current time
        self.policy.resample(time)

        for _ in range(self._nimprove):
            # evaluate nominal policy
            reward_nominal = rollout(
                state,
                time,
                self._dynamics,
                self._reward,
                self.policy,
                self._horizon,
                self._planstep,
            )

            # evaluate noisy policies
            policies = []
            rewards = []
            for _ in range(self._nsample):
                # noisy policy
                noisy_policy = self.policy.noisy_copy(self._noise_scale)
                noisy_reward = rollout(
                    state,
                    time,
                    self._dynamics,
                    self._reward,
                    noisy_policy,
                    self._horizon,
                    self._planstep,
                )

                # collect result
                policies.append(noisy_policy)
                rewards.append(noisy_reward)

            # find best noisy policy
            idx = np.argmax(rewards)

            # keep it only if it improves on the nominal policy
            if rewards[idx] > reward_nominal:
                self.policy = policies[idx]


if __name__ == "__main__":
    print("Predictive Sampling")

    # problem dimensions
    nstate = 2
    naction = 1

    # dynamics
    def dynamics(state, action, timestep):
        return np.array([state[0] + state[1] * timestep, state[1] + action[0]])

    # reward
    goal = np.array([1.0, 0.0])

    def reward(state, action):
        r0 = -((state[0] - goal[0]) ** 2)  # target position
        r1 = -((state[1] - goal[1]) ** 2)  # small velocity
        r2 = -action[0] ** 2 if action is not None else 0.0  # small action
        return 1.0 * r0 + 0.01 * r1 + 0.001 * r2

    # planner
    horizon = 0.25
    splinestep = 0.1
    planstep = 0.025
    nimprove = 2
    nsample = 4
    noise_scale = 0.025
    limits = np.array([[-0.1, 0.1]])

    planner = Planner(
        nstate,
        naction,
        dynamics,
        reward,
        horizon,
        splinestep,
        planstep,
        nsample,
        noise_scale,
        nimprove,
        limits,
    )

    # simulate
    state = np.array([0.0, 0.0])
    time = 0.0
    timestep = 0.01
    steps = 500

    # cache
    states = [state]
    actions = []

    for t in range(steps):
        ## predictive sampling
        # improve policy
        planner.improve_policy(state, time)

        # get action from policy
        action = planner.action_from_policy(time)

        ## random action
        # action = np.random.normal(scale=0.1, size=naction)

        print("step : ", t)
        print("state : ", state)
        print("action: ", action)

        # step
        state = dynamics(state, action, timestep)
        time += timestep

        # cache
        states.append(state)
        actions.append(action)

    print("\nfinal state: ", states[-1])
    print("goal state : ", goal)
    print("state error: ", np.linalg.norm(states[-1] - goal))