Predictive sampling written in Python with NumPy.

# Copyright 2024 Taylor Howell
from __future__ import annotations

from typing import Callable

import numpy as np

# predictive sampling
# https://arxiv.org/abs/2212.00541
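#
# Algorithm sketch (as implemented below): the plan is a zero-order-hold
# spline of actions over the planning horizon. At each planning step the
# spline is shifted to the current time, a handful of zero-mean Gaussian
# perturbations of its parameters are rolled out through the dynamics, and
# the candidate with the highest average reward replaces the nominal plan
# only if it improves on it.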


# policy class for predictive sampling
class Policy:
    # initialize policy
    def __init__(
        self,
        nstate: int,
        naction: int,
        horizon: float,
        splinestep: float,
        limits: np.ndarray,
    ):
        self._nstate = nstate
        self._naction = naction
        self._splinestep = splinestep
        self._horizon = horizon
        self._nspline = int(horizon / splinestep) + 1
        self._parameters = np.zeros((self._naction, self._nspline), dtype=float)
        self._times = np.array(
            [t * self._splinestep for t in range(self._nspline)], dtype=float
        )
        self._limits = limits

    # clamp actions (1D) or spline parameters (2D) within limits
    def clamp(self, action: np.ndarray) -> np.ndarray:
        lower, upper = self._limits[:, 0], self._limits[:, 1]
        if action.ndim > 1:
            # broadcast limits over the trailing spline dimension
            lower, upper = lower[:, None], upper[:, None]
        return np.minimum(np.maximum(action, lower), upper)

    # get action from policy (zero-order hold over spline parameters)
    def action(self, time: float) -> np.ndarray:
        # current time below lower bound
        if time <= self._times[0]:
            return self.clamp(self._parameters[:, 0])
        # find interval
        for i in range(self._nspline - 1):
            if self._times[i] <= time < self._times[i + 1]:
                return self.clamp(self._parameters[:, i])
        # current time above upper bound
        return self.clamp(self._parameters[:, -1])

    # resample policy plan from current time
    def resample(self, time: float):
        # new parameters and times
        p = np.zeros((self._naction, self._nspline), dtype=float)
        t = np.array(
            [i * self._splinestep + time for i in range(self._nspline)], dtype=float
        )
        # get new actions
        for i in range(self._nspline):
            a = self.action(t[i])
            p[:, i] = a
        # update
        self._parameters = p
        self._times = t

    # add zero-mean Gaussian noise to policy parameters
    def add_noise(self, scale: float):
        self._parameters += np.random.normal(
            scale=scale, size=(self._naction, self._nspline)
        )
        self._parameters = self.clamp(self._parameters)

    # return a copy of the policy with noisy parameters
    def noisy_copy(self, scale: float) -> Policy:
        # create new policy object
        policy = Policy(
            self._nstate,
            self._naction,
            self._horizon,
            self._splinestep,
            self._limits,
        )
        # copy policy parameters into new object
        policy._parameters = np.copy(self._parameters)
        # get noisy parameters
        policy.add_noise(scale)
        return policy


# rollout: simulate the policy forward and return the average reward
def rollout(
    state: np.ndarray,
    time: float,
    dynamics: Callable,
    reward: Callable,
    policy: Policy,
    horizon: float,
    timestep: float,
) -> float:
    # number of steps
    steps = int(horizon / timestep)
    # initialize reward
    total_reward = 0.0
    # simulate
    for _ in range(steps):
        # get action from policy
        action = policy.action(time)
        # evaluate current reward
        total_reward += reward(state, action)
        # step dynamics
        state = dynamics(state, action, timestep)
        time += timestep
    # terminal reward (no action)
    total_reward += reward(state, None)
    # average over the horizon
    return total_reward / (steps + 1)


# predictive sampling planner class
class Planner:
    # initialize planner
    def __init__(
        self,
        nstate: int,
        naction: int,
        dynamics: Callable,
        reward: Callable,
        horizon: float,
        splinestep: float,
        planstep: float,
        nsample: int,
        noise_scale: float,
        nimprove: int,
        limits: np.ndarray,
    ):
        self._dynamics = dynamics
        self._reward = reward
        self._horizon = horizon
        self.policy = Policy(nstate, naction, self._horizon, splinestep, limits)
        self._planstep = planstep
        self._nsample = nsample
        self._noise_scale = noise_scale
        self._nimprove = nimprove

    # action from policy
    def action_from_policy(self, time: float) -> np.ndarray:
        return self.policy.action(time)

    # improve policy
    def improve_policy(self, state: np.ndarray, time: float):
        # resample policy plan from the current time
        self.policy.resample(time)
        for _ in range(self._nimprove):
            # evaluate nominal policy
            reward_nominal = rollout(
                state,
                time,
                self._dynamics,
                self._reward,
                self.policy,
                self._horizon,
                self._planstep,
            )
            # evaluate noisy policies
            policies = []
            rewards = []
            for _ in range(self._nsample):
                # noisy policy
                noisy_policy = self.policy.noisy_copy(self._noise_scale)
                noisy_reward = rollout(
                    state,
                    time,
                    self._dynamics,
                    self._reward,
                    noisy_policy,
                    self._horizon,
                    self._planstep,
                )
                # collect result
                policies.append(noisy_policy)
                rewards.append(noisy_reward)
            # find best noisy policy
            idx = np.argmax(rewards)
            # keep the best noisy policy only if it improves on the nominal
            if rewards[idx] > reward_nominal:
                self.policy = policies[idx]


if __name__ == "__main__":
    print("Predictive Sampling")

    # problem dimensions
    nstate = 2
    naction = 1

    # dynamics
    def dynamics(state, action, timestep):
        return np.array([state[0] + state[1] * timestep, state[1] + action[0]])

    # reward
    goal = np.array([1.0, 0.0])

    def reward(state, action):
        r0 = -((state[0] - goal[0]) ** 2)  # target position
        r1 = -((state[1] - goal[1]) ** 2)  # small velocity
        r2 = -action[0] ** 2 if action is not None else 0.0  # small action
        return 1.0 * r0 + 0.01 * r1 + 0.001 * r2

    # planner
    horizon = 0.25
    splinestep = 0.1
    planstep = 0.025
    nimprove = 2
    nsample = 4
    noise_scale = 0.025
    limits = np.array([[-0.1, 0.1]])
    planner = Planner(
        nstate,
        naction,
        dynamics,
        reward,
        horizon,
        splinestep,
        planstep,
        nsample,
        noise_scale,
        nimprove,
        limits,
    )

    # simulate
    state = np.array([0.0, 0.0])
    time = 0.0
    timestep = 0.01
    steps = 500

    # cache
    states = [state]
    actions = []
    for t in range(steps):
        ## predictive sampling
        # improve policy
        planner.improve_policy(state, time)
        # get action from policy
        action = planner.action_from_policy(time)
        ## random action
        # action = np.random.normal(scale=0.1, size=naction)
        print("step : ", t)
        print("state : ", state)
        print("action: ", action)
        # step
        state = dynamics(state, action, timestep)
        time += timestep
        # cache
        states.append(state)
        actions.append(action)

    print("\nfinal state: ", states[-1])
    print("goal state : ", goal)
    print("state error: ", np.linalg.norm(states[-1] - goal))