Last active
May 30, 2024 07:58
-
-
Save AutoRecursive/bcf39c59e942d8c4f2141871276849e9 to your computer and use it in GitHub Desktop.
A simple CEM Trajecoty Optimizer based on taichi
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import taichi as ti | |
import numpy as np | |
import time | |
from matplotlib import pyplot as plt | |
from tqdm import tqdm | |
# Set the taichi mode to "cuda" or "cpu" | |
ti.init(ti.cuda, random_seed=int(time.time())) | |
# Constants for the CEM solver | |
POPULATION_SIZE = 4096 # Number of samples in each iteration | |
ELITE_PERCENTAGE = 0.002 # Percentage of top samples to keep | |
NUM_ITERATIONS = 200 # Number of iterations | |
# x, y, v, theta, delta | |
STATE_DIM = 5 | |
# acceleration, delta_rate | |
CONTROL_DIM = 2 | |
# Initialize variables | |
TIME_STEP = 100 | |
NUM_PARAMS = CONTROL_DIM * TIME_STEP # Number of parameters for the optimization problem | |
DT = 0.1 | |
mean = ti.field(ti.f32, shape=NUM_PARAMS) | |
cov = ti.field(ti.f32, shape=(NUM_PARAMS, NUM_PARAMS)) | |
samples = ti.Vector.field(n=NUM_PARAMS, dtype=ti.f32, shape=POPULATION_SIZE) | |
scores = ti.field(ti.f32, shape=POPULATION_SIZE) | |
sorted_indices = ti.field(ti.i32, shape=POPULATION_SIZE) | |
state = ti.Vector.field(n=STATE_DIM, dtype=ti.f32, shape=(POPULATION_SIZE, TIME_STEP)) | |
# x, y, v, theta, delta | |
start_state = ti.field(ti.f32, shape=STATE_DIM) | |
start_state.from_numpy(np.array([0., 0., 5.0, 0., 0.]).astype(np.float32)) | |
terminal_state = ti.field(ti.f32, shape=STATE_DIM) | |
terminal_state.from_numpy(np.array([TIME_STEP * 0.1 * 9.1, -3.75, 9.0, 0., 0.]).astype(np.float32)) | |
@ti.kernel | |
def initiate_distributions(): | |
for i in range(NUM_PARAMS): | |
mean[i] = 0. | |
for j in range(NUM_PARAMS): | |
cov[i, j] = 0.0 | |
if i < TIME_STEP: | |
cov[i, i] = 3.0 | |
else: | |
cov[i, i] = 0.3 | |
@ti.kernel | |
def reset_sorted_indices(): | |
for i in range(POPULATION_SIZE): | |
# initilization | |
sorted_indices[i] = i | |
@ti.func | |
def dynamics(pop_index): | |
# https://thomasfermi.github.io/Algorithms-for-Automated-Driving/Control/BicycleModel.html | |
for si in range(STATE_DIM): | |
state[pop_index, 0][si] = start_state[si] | |
for j in range(1, TIME_STEP): | |
state[pop_index, j][0] = state[pop_index, j - 1][0] + DT * state[pop_index, j - 1][2] * ti.math.cos(state[pop_index, j - 1][3]) | |
state[pop_index, j][1] = state[pop_index, j - 1][1] + DT * state[pop_index, j - 1][2] * ti.math.sin(state[pop_index, j - 1][3]) | |
state[pop_index, j][2] = state[pop_index, j - 1][2] + DT * samples[pop_index][j] | |
state[pop_index, j][2] = ti.min(ti.max(0.0, state[pop_index, j][2]), 22.2) | |
state[pop_index, j][3] = state[pop_index, j - 1][3] + DT * state[pop_index, j - 1][2] * ti.math.tan(state[pop_index, j - 1][4]) / 4.0 | |
state[pop_index, j][4] = state[pop_index, j - 1][4] + DT * samples[pop_index][j + TIME_STEP] | |
state[pop_index, j][4] = ti.min(ti.max(-0.3, state[pop_index, j][4]), 0.3) | |
@ti.kernel | |
def generate_samples(): | |
# Generate random samples from a multivariate normal distribution | |
for i in range(POPULATION_SIZE): | |
for j in range(NUM_PARAMS): | |
samples[i][j] = (ti.random(dtype=ti.f32) - 0.5) * 2 * cov[j, j] + mean[j] | |
dynamics(i) | |
@ti.kernel | |
def evaluate_samples(): | |
ref_ind = int(abs(start_state[1] - terminal_state[1]) / 3.75 * 50) | |
# Evaluate the objective function for each sample | |
for i in range(POPULATION_SIZE): | |
scores[i] = (samples[i]**2).sum() * 10. | |
for j in range(TIME_STEP): | |
if j > ref_ind: | |
# terminal_weight = 1e3 if j == TIME_STEP - 1 else 1. | |
# scores[i] += 1 * (state[i, j][0] - terminal_state[0]) ** 2 # x | |
scores[i] += 10 * (state[i, j][1] - terminal_state[1]) ** 2 # y | |
# elif j < 20: | |
# scores[i] += 10 * (state[i, j][1] - start_state[1]) ** 2 # y | |
scores[i] += 1 * (state[i, j][2] - terminal_state[2]) ** 2 # v | |
scores[i] += 1e1 * (state[i, j][3] - terminal_state[3]) ** 2 # theta | |
scores[i] += 1e3 * (state[i, -1][1] - terminal_state[1])** 2 | |
scores[i] += 1e5 * (state[i, -1][3] - terminal_state[3]) ** 2 # theta | |
scores[i] /= TIME_STEP | |
@ti.kernel | |
def update_distribution(): | |
# Update the distribution parameters based on the top samples | |
num_elite = int(POPULATION_SIZE * ELITE_PERCENTAGE) | |
# Calculate new mean | |
for j in range(NUM_PARAMS): | |
mean[j] = 0.0 | |
for i in range(num_elite): | |
mean[j] += samples[sorted_indices[i]][j] | |
mean[j] /= num_elite | |
# Calculate new covariance | |
# for j1 in range(num_params): | |
# for j2 in range(num_params): | |
# cov[j1, j2] = 0.0 | |
# for i in range(num_elite): | |
# cov[j1, j2] += (samples[sorted_indices[i]][j1] - mean[j1]) * \ | |
# (samples[sorted_indices[i]][j2] - mean[j2]) | |
# cov[j1, j2] /= num_elite - 1 | |
# if j1 == j2: | |
# cov[j1, j2] = ti.math.max(5e-1, cov[j1, j2]) | |
for j in range(NUM_PARAMS): | |
cov[j, j] = 0.0 | |
for i in range(num_elite): | |
cov[j, j] += (samples[sorted_indices[i]][j] - mean[j])**2 | |
cov[j, j] /= num_elite - 1 | |
if j < TIME_STEP: | |
cov[j, j] = ti.math.max(1e-3, cov[j, j]) | |
else: | |
cov[j, j] = ti.math.max(1e-3, cov[j, j]) | |
@ti.kernel | |
def terminate_condition() -> bool: | |
res = True | |
if abs(state[sorted_indices[0], -1][1] - terminal_state[1]) > 0.5 or \ | |
abs(state[sorted_indices[0], -1][3] - terminal_state[3]) > 1e-1: | |
res = False | |
return res | |
def cem(warm_start=False, verbose=False): | |
min_score = 1e10 | |
if not warm_start: | |
initiate_distributions() | |
# Main loop for CEM optimization | |
t0 = time.perf_counter() | |
for iteration in range(NUM_ITERATIONS): | |
if iteration % 5 == 1: | |
initiate_distributions() | |
reset_sorted_indices() | |
generate_samples() | |
# print(f"iteration: {iteration}, state: {state.to_numpy()}") | |
# print(f"Samples: {samples.to_numpy()}") | |
# break | |
# print(f"iteration: {iteration}, state: {state.to_numpy()}") | |
evaluate_samples() | |
ti.algorithms.parallel_sort(keys=scores, values=sorted_indices) | |
if terminate_condition(): | |
# print(f"Converge in {iteration} iteration") | |
# print(f"{state.to_numpy()[sorted_indices.to_numpy()[0], -1]}") | |
break | |
# else: | |
# print("continue") | |
# print(f"it: {iteration}, score[0]: {scores.to_numpy()[0]}") | |
if scores.to_numpy()[0] < min_score + 50: | |
# print(f"States: {state.to_numpy()[sorted_indices.to_numpy()[0], -1]}") | |
min_score = scores.to_numpy()[0] | |
# print(f"iteration: {iteration}, scores: {scores.to_numpy()}\nsorted_indices: {sorted_indices.to_numpy()}") | |
update_distribution() | |
# print(f"Action Mean: {mean.to_numpy()}") | |
# print(f"iteration: {iteration}, cov: {cov.to_numpy()}") | |
# print("Covariance:", cov.to_numpy()) | |
t1 = time.perf_counter() | |
if verbose: | |
print(f"duration: {(t1 - t0) * 1e3} ms") | |
# # Testing the CEM solver | |
# print(f"States: {state.to_numpy()[sorted_indices.to_numpy()[0]]}") | |
# print("Mean:", mean.to_numpy()) | |
# print("Covariance:", cov.to_numpy()) | |
success = True | |
if not terminate_condition(): | |
success = False | |
print(f"Failed: {state.to_numpy()[sorted_indices.to_numpy()[0], -1]}") | |
# traj = state.to_numpy()[sorted_indices.to_numpy()[0]] | |
# plt.axis('equal') | |
# plt.plot(traj[:, 0], traj[:, 1]) | |
# sorted_indices_np = sorted_indices.to_numpy() | |
# for t_i in range(1): | |
# traj = state.to_numpy()[sorted_indices_np[t_i]] | |
# plt.plot(traj[:, 0], traj[:, 1]) | |
# print("Mean:", mean.to_numpy()[:TIME_STEP], mean.to_numpy()[TIME_STEP:]) | |
plt.show() | |
return {'time': t1 - t0, 'iteration': iteration, 'success': success} | |
def plan(current_state=np.array([0., 0., 5.0, 0., 0.]).astype(np.float32)): | |
plan_states = [] | |
start_state.from_numpy(current_state) | |
cem(False) | |
cached_plan = [] | |
fail_count = 0 | |
for _ in range(100): | |
res = cem(True) | |
if not res['success']: | |
fail_count += 1 | |
if fail_count >= 3: | |
print(f'Fail at {current_state}') | |
if len(plan_states) > 0: | |
plan_states = np.array(plan_states) | |
plt.scatter(plan_states[:, 0], plan_states[:,1]) | |
plt.axis('equal') | |
plt.show(block=True) | |
return False | |
if res['success'] and res['time'] < 0.1: | |
# perfect pose | |
cached_plan = state.to_numpy()[sorted_indices.to_numpy()[0]] | |
current_state = cached_plan[1] | |
start_state.from_numpy(current_state) | |
# print(current_state) | |
elif len(cached_plan) > 0: | |
cached_plan = cached_plan[1:] | |
current_state = cached_plan[1] | |
start_state.from_numpy(current_state) | |
else: | |
print(f'Fail at {current_state}') | |
plan_states = np.array(plan_states) | |
plt.scatter(plan_states[:, 0], plan_states[:,1]) | |
plt.axis('equal') | |
plt.show(block=True) | |
return False | |
plan_states.append(current_state) | |
plan_states = np.array(plan_states) | |
return True | |
def one_frame_experiment(): | |
print("Initialize Taichi for JIT") | |
cem() | |
print("=========Experiment Started=========") | |
EXPERIMENT_TIME = 50 | |
res_list = [cem() for _ in range(EXPERIMENT_TIME)] | |
durations = np.percentile([res['time'] for res in res_list], 99) | |
print([res['iteration'] for res in res_list]) | |
print(f"Success Rate: {sum([res['success'] for res in res_list]) / EXPERIMENT_TIME}") | |
print(f"duration: {durations * 1e3} ms") | |
# print(f"States: {state.to_numpy()[sorted_indices.to_numpy()[0]]}") | |
# print("Mean:", mean.to_numpy()[:TIME_STEP], mean.to_numpy()[TIME_STEP:]) | |
# print("Covariance:", cov.to_numpy()) | |
# plan() | |
# one_frame_experiment() | |
def plan_experiment(): | |
EXPERIMENT_TIME = 50 | |
start_state.from_numpy(np.array([0., 0., 5.0, 0., 0.]).astype(np.float32)) | |
print(sum([plan() for _ in tqdm(range(EXPERIMENT_TIME))]) / EXPERIMENT_TIME) | |
plan_experiment() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment