@AutoRecursive
Last active May 30, 2024 07:58
A simple CEM trajectory optimizer based on Taichi
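The script below implements the Cross-Entropy Method (CEM): each iteration it samples a population of control sequences from a parameterized distribution, rolls them out in parallel with Taichi kernels, scores every rollout, and refits the distribution to the top-scoring (elite) samples until the terminal constraints are satisfied.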
import taichi as ti
import numpy as np
import time
from matplotlib import pyplot as plt
from tqdm import tqdm
# Select the Taichi backend (ti.cuda or ti.cpu) and seed the RNG
ti.init(ti.cuda, random_seed=int(time.time()))
# Constants for the CEM solver
POPULATION_SIZE = 4096 # Number of samples in each iteration
ELITE_PERCENTAGE = 0.002 # Percentage of top samples to keep
NUM_ITERATIONS = 200 # Number of iterations
# x, y, v, theta, delta
STATE_DIM = 5
# acceleration, delta_rate
CONTROL_DIM = 2
# Initialize variables
TIME_STEP = 100  # Number of time steps in the planning horizon
NUM_PARAMS = CONTROL_DIM * TIME_STEP  # Number of parameters for the optimization problem
DT = 0.1  # Integration step in seconds
mean = ti.field(ti.f32, shape=NUM_PARAMS)
cov = ti.field(ti.f32, shape=(NUM_PARAMS, NUM_PARAMS))
samples = ti.Vector.field(n=NUM_PARAMS, dtype=ti.f32, shape=POPULATION_SIZE)
scores = ti.field(ti.f32, shape=POPULATION_SIZE)
sorted_indices = ti.field(ti.i32, shape=POPULATION_SIZE)
state = ti.Vector.field(n=STATE_DIM, dtype=ti.f32, shape=(POPULATION_SIZE, TIME_STEP))
# x, y, v, theta, delta
start_state = ti.field(ti.f32, shape=STATE_DIM)
start_state.from_numpy(np.array([0., 0., 5.0, 0., 0.]).astype(np.float32))
terminal_state = ti.field(ti.f32, shape=STATE_DIM)
terminal_state.from_numpy(np.array([TIME_STEP * 0.1 * 9.1, -3.75, 9.0, 0., 0.]).astype(np.float32))
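# Horizon: TIME_STEP steps of DT seconds (10 s total). Each sample stacks
# TIME_STEP accelerations followed by TIME_STEP steering rates, i.e.
# samples[i][0:TIME_STEP] are accelerations and samples[i][TIME_STEP:] are
# delta rates. The terminal target is roughly 91 m ahead, 3.75 m to the right
# (negative y), at 9 m/s with zero heading and steering angle.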
@ti.kernel
def initiate_distributions():
    for i in range(NUM_PARAMS):
        mean[i] = 0.
        for j in range(NUM_PARAMS):
            cov[i, j] = 0.0
        if i < TIME_STEP:
            cov[i, i] = 3.0  # acceleration variance
        else:
            cov[i, i] = 0.3  # steering-rate variance

@ti.kernel
def reset_sorted_indices():
    for i in range(POPULATION_SIZE):
        # initialization: identity permutation before sorting
        sorted_indices[i] = i

@ti.func
def dynamics(pop_index):
    # https://thomasfermi.github.io/Algorithms-for-Automated-Driving/Control/BicycleModel.html
    for si in range(STATE_DIM):
        state[pop_index, 0][si] = start_state[si]
    for j in range(1, TIME_STEP):
        state[pop_index, j][0] = state[pop_index, j - 1][0] + DT * state[pop_index, j - 1][2] * ti.math.cos(state[pop_index, j - 1][3])
        state[pop_index, j][1] = state[pop_index, j - 1][1] + DT * state[pop_index, j - 1][2] * ti.math.sin(state[pop_index, j - 1][3])
        state[pop_index, j][2] = state[pop_index, j - 1][2] + DT * samples[pop_index][j]
        state[pop_index, j][2] = ti.min(ti.max(0.0, state[pop_index, j][2]), 22.2)
        state[pop_index, j][3] = state[pop_index, j - 1][3] + DT * state[pop_index, j - 1][2] * ti.math.tan(state[pop_index, j - 1][4]) / 4.0
        state[pop_index, j][4] = state[pop_index, j - 1][4] + DT * samples[pop_index][j + TIME_STEP]
        state[pop_index, j][4] = ti.min(ti.max(-0.3, state[pop_index, j][4]), 0.3)

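# Discrete kinematic bicycle update used above, with the wheelbase taken as 4.0 m:
#   x[t+1]     = x[t] + DT * v[t] * cos(theta[t])
#   y[t+1]     = y[t] + DT * v[t] * sin(theta[t])
#   v[t+1]     = clamp(v[t] + DT * a[t], 0.0, 22.2)            # speed capped near 80 km/h
#   theta[t+1] = theta[t] + DT * v[t] * tan(delta[t]) / 4.0
#   delta[t+1] = clamp(delta[t] + DT * delta_rate[t], -0.3, 0.3)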
@ti.kernel
def generate_samples():
    # Draw random samples around the mean, scaled by the diagonal of the
    # covariance (uniform noise here, not a true multivariate normal)
    for i in range(POPULATION_SIZE):
        for j in range(NUM_PARAMS):
            samples[i][j] = (ti.random(dtype=ti.f32) - 0.5) * 2 * cov[j, j] + mean[j]
        dynamics(i)

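# Note: the sampler above draws uniform noise scaled by the diagonal of `cov`
# rather than a true multivariate normal. A Gaussian alternative (sketch only,
# using Taichi's built-in standard-normal generator) would be:
#   samples[i][j] = ti.randn(ti.f32) * ti.sqrt(cov[j, j]) + mean[j]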
@ti.kernel
def evaluate_samples():
    ref_ind = int(abs(start_state[1] - terminal_state[1]) / 3.75 * 50)
    # Evaluate the objective function for each sample
    for i in range(POPULATION_SIZE):
        scores[i] = (samples[i]**2).sum() * 10.
        for j in range(TIME_STEP):
            if j > ref_ind:
                # terminal_weight = 1e3 if j == TIME_STEP - 1 else 1.
                # scores[i] += 1 * (state[i, j][0] - terminal_state[0]) ** 2  # x
                scores[i] += 10 * (state[i, j][1] - terminal_state[1]) ** 2  # y
            # elif j < 20:
            #     scores[i] += 10 * (state[i, j][1] - start_state[1]) ** 2  # y
            scores[i] += 1 * (state[i, j][2] - terminal_state[2]) ** 2  # v
            scores[i] += 1e1 * (state[i, j][3] - terminal_state[3]) ** 2  # theta
        scores[i] += 1e3 * (state[i, -1][1] - terminal_state[1]) ** 2
        scores[i] += 1e5 * (state[i, -1][3] - terminal_state[3]) ** 2  # theta
        scores[i] /= TIME_STEP

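# Cost summary: quadratic control effort, running penalties on lateral offset
# (only for steps past ref_ind), speed error, and heading error, plus heavy
# terminal penalties on the final lateral offset and heading, averaged over
# the horizon.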
@ti.kernel
def update_distribution():
    # Update the distribution parameters based on the top samples
    num_elite = int(POPULATION_SIZE * ELITE_PERCENTAGE)
    # Calculate new mean
    for j in range(NUM_PARAMS):
        mean[j] = 0.0
        for i in range(num_elite):
            mean[j] += samples[sorted_indices[i]][j]
        mean[j] /= num_elite
    # Calculate new covariance (full-covariance update kept for reference)
    # for j1 in range(num_params):
    #     for j2 in range(num_params):
    #         cov[j1, j2] = 0.0
    #         for i in range(num_elite):
    #             cov[j1, j2] += (samples[sorted_indices[i]][j1] - mean[j1]) * \
    #                            (samples[sorted_indices[i]][j2] - mean[j2])
    #         cov[j1, j2] /= num_elite - 1
    #         if j1 == j2:
    #             cov[j1, j2] = ti.math.max(5e-1, cov[j1, j2])
    # Diagonal-only covariance update
    for j in range(NUM_PARAMS):
        cov[j, j] = 0.0
        for i in range(num_elite):
            cov[j, j] += (samples[sorted_indices[i]][j] - mean[j])**2
        cov[j, j] /= num_elite - 1
        # Variance floor (same for both control channels)
        cov[j, j] = ti.math.max(1e-3, cov[j, j])

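# Only the diagonal of the covariance is refit from the elites; the full
# covariance update is kept above as commented-out reference code. The 1e-3
# floor keeps a minimum amount of exploration in both control channels.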
@ti.kernel
def terminate_condition() -> bool:
    res = True
    if abs(state[sorted_indices[0], -1][1] - terminal_state[1]) > 0.5 or \
            abs(state[sorted_indices[0], -1][3] - terminal_state[3]) > 1e-1:
        res = False
    return res

def cem(warm_start=False, verbose=False):
    min_score = 1e10
    if not warm_start:
        initiate_distributions()
    # Main loop for CEM optimization
    t0 = time.perf_counter()
    for iteration in range(NUM_ITERATIONS):
        if iteration % 5 == 1:
            initiate_distributions()
        reset_sorted_indices()
        generate_samples()
        # print(f"iteration: {iteration}, state: {state.to_numpy()}")
        # print(f"Samples: {samples.to_numpy()}")
        # break
        evaluate_samples()
        ti.algorithms.parallel_sort(keys=scores, values=sorted_indices)
        if terminate_condition():
            # print(f"Converge in {iteration} iteration")
            # print(f"{state.to_numpy()[sorted_indices.to_numpy()[0], -1]}")
            break
        # else:
        #     print("continue")
        # print(f"it: {iteration}, score[0]: {scores.to_numpy()[0]}")
        if scores.to_numpy()[0] < min_score + 50:
            # print(f"States: {state.to_numpy()[sorted_indices.to_numpy()[0], -1]}")
            min_score = scores.to_numpy()[0]
            # print(f"iteration: {iteration}, scores: {scores.to_numpy()}\nsorted_indices: {sorted_indices.to_numpy()}")
            update_distribution()
            # print(f"Action Mean: {mean.to_numpy()}")
            # print(f"iteration: {iteration}, cov: {cov.to_numpy()}")
            # print("Covariance:", cov.to_numpy())
    t1 = time.perf_counter()
    if verbose:
        print(f"duration: {(t1 - t0) * 1e3} ms")
    # # Testing the CEM solver
    # print(f"States: {state.to_numpy()[sorted_indices.to_numpy()[0]]}")
    # print("Mean:", mean.to_numpy())
    # print("Covariance:", cov.to_numpy())
    success = True
    if not terminate_condition():
        success = False
        print(f"Failed: {state.to_numpy()[sorted_indices.to_numpy()[0], -1]}")
        # traj = state.to_numpy()[sorted_indices.to_numpy()[0]]
        # plt.axis('equal')
        # plt.plot(traj[:, 0], traj[:, 1])
        # sorted_indices_np = sorted_indices.to_numpy()
        # for t_i in range(1):
        #     traj = state.to_numpy()[sorted_indices_np[t_i]]
        #     plt.plot(traj[:, 0], traj[:, 1])
        # print("Mean:", mean.to_numpy()[:TIME_STEP], mean.to_numpy()[TIME_STEP:])
        plt.show()
    return {'time': t1 - t0, 'iteration': iteration, 'success': success}

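# Example usage (illustrative): a single cold-start solve, then read back the
# best trajectory found.
#   result = cem(warm_start=False, verbose=True)
#   best_traj = state.to_numpy()[sorted_indices.to_numpy()[0]]  # shape (TIME_STEP, STATE_DIM)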
def plan(current_state=np.array([0., 0., 5.0, 0., 0.]).astype(np.float32)):
    plan_states = []
    start_state.from_numpy(current_state)
    cem(False)
    cached_plan = []
    fail_count = 0
    for _ in range(100):
        res = cem(True)
        if not res['success']:
            fail_count += 1
            if fail_count >= 3:
                print(f'Fail at {current_state}')
                if len(plan_states) > 0:
                    plan_states = np.array(plan_states)
                    plt.scatter(plan_states[:, 0], plan_states[:, 1])
                    plt.axis('equal')
                    plt.show(block=True)
                return False
        if res['success'] and res['time'] < 0.1:
            # perfect pose
            cached_plan = state.to_numpy()[sorted_indices.to_numpy()[0]]
            current_state = cached_plan[1]
            start_state.from_numpy(current_state)
            # print(current_state)
        elif len(cached_plan) > 0:
            cached_plan = cached_plan[1:]
            current_state = cached_plan[1]
            start_state.from_numpy(current_state)
        else:
            print(f'Fail at {current_state}')
            plan_states = np.array(plan_states)
            plt.scatter(plan_states[:, 0], plan_states[:, 1])
            plt.axis('equal')
            plt.show(block=True)
            return False
        plan_states.append(current_state)
    plan_states = np.array(plan_states)
    return True

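# plan() acts like a receding-horizon loop: cem(True) skips the initial
# distribution reset (warm start); when a solve is fast and successful the
# newly optimized trajectory is cached and its second state becomes the next
# start state, otherwise the cached plan is consumed one step at a time.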
def one_frame_experiment():
    print("Initialize Taichi for JIT")
    cem()
    print("=========Experiment Started=========")
    EXPERIMENT_TIME = 50
    res_list = [cem() for _ in range(EXPERIMENT_TIME)]
    # 99th-percentile solve time over the runs, in seconds
    durations = np.percentile([res['time'] for res in res_list], 99)
    print([res['iteration'] for res in res_list])
    print(f"Success Rate: {sum([res['success'] for res in res_list]) / EXPERIMENT_TIME}")
    print(f"duration: {durations * 1e3} ms")
    # print(f"States: {state.to_numpy()[sorted_indices.to_numpy()[0]]}")
    # print("Mean:", mean.to_numpy()[:TIME_STEP], mean.to_numpy()[TIME_STEP:])
    # print("Covariance:", cov.to_numpy())


# plan()
# one_frame_experiment()
def plan_experiment():
    EXPERIMENT_TIME = 50
    start_state.from_numpy(np.array([0., 0., 5.0, 0., 0.]).astype(np.float32))
    print(sum([plan() for _ in tqdm(range(EXPERIMENT_TIME))]) / EXPERIMENT_TIME)


plan_experiment()