Last active
December 10, 2022 17:32
-
-
Save goromal/c37629235750b65b9d0ec0e17456ee96 to your computer and use it in GitHub Desktop.
Simple traffic simulator on a circular road. Cars have two control objectives: maintain a consistent distance between cars and maintain a consistent car speed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Simple traffic simulator on a circular road. Cars have two | |
# control objectives: | |
# - Maintain a consistent distance between cars. | |
# - Maintain a consistent car speed. | |
import argparse | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from matplotlib.animation import FuncAnimation | |
from geometry import SO2 | |
from pysignals import ( | |
Vector1Signal, | |
Vector1State, | |
SO2State, | |
RigidBodyParams2D, | |
Rotational1DOFSystem | |
) | |
argparser = argparse.ArgumentParser( | |
description="Simple traffic simulator on a circular road.", | |
formatter_class=argparse.ArgumentDefaultsHelpFormatter | |
) | |
argparser.add_argument("--num_cars", type=int, default=7, help="Number of cars to simulate.") | |
argparser.add_argument("--vel_des", type=float, default=1.0, help="Desired car velocity.") | |
argparser.add_argument("--vel_max", type=float, default=1.5, help="Maximum allowed car velocity.") | |
argparser.add_argument("--beta_mu", type=float, default=0.5, help="Mean proportional gain.") | |
argparser.add_argument("--beta_sigma", type=float, default=0.5, help="Proportional gain standard deviation.") | |
argparser.add_argument("--gamma_mu", type=float, default=0.5, help="Mean derivative gain.") | |
argparser.add_argument("--gamma_sigma", type=float, default=0.5, help="Derivative gain standard deviation.") | |
argparser.add_argument("--vel_col_thresh", type=float, default=0.30, help="Distance threshold for slowing down.") | |
argparser.add_argument("--pos_col_thresh", type=float, default=0.15, help="Distance threshold for collision avoidance") | |
args = argparser.parse_args() | |
class Simulator(object): | |
def __init__(self, n, vd, beta_mu, beta_sigma, gamma_mu, gamma_sigma): | |
self.cars = [] | |
self.n = n | |
self.t = 0.0 | |
# arbitrary/normalized system parameterization | |
p = RigidBodyParams2D() | |
p.m = 1.0 | |
p.J = 1.0 | |
p.g = np.zeros(2) | |
# generate cars with normally-distributed proportional and | |
# derivative gains | |
for i in range(n): | |
sys = Rotational1DOFSystem() | |
sys.setParams(p) | |
x0 = SO2State.identity() | |
dd = 2.0 * np.pi / float(n) | |
x0.pose = SO2.fromAngle(i * dd) | |
x0.twist = np.array([vd / 2.0]) | |
sys.x.update(self.t, x0) | |
beta = np.random.normal(beta_mu, beta_sigma) | |
gamma = np.random.normal(gamma_mu, gamma_sigma) | |
self.cars.append(Car(sys, dd, vd, beta, gamma)) | |
def update(self, dt): | |
# simulate dynamics for each car; each car uses the car in | |
# front of it as a reference | |
for i in range(self.n): | |
if i == self.n - 1: | |
R_kp1 = self.cars[0].sys.x().pose | |
w_kp1 = self.cars[0].sys.x().twist | |
else: | |
R_kp1 = self.cars[i + 1].sys.x().pose | |
w_kp1 = self.cars[i + 1].sys.x().twist | |
self.cars[i].update(self.t, dt, R_kp1, w_kp1) | |
self.t += dt | |
def states(self): | |
# report each car's position on the circle for plotting | |
car_states = np.zeros(self.n) | |
for i in range(self.n): | |
car_states[i] = self.cars[i].sys.x().pose.angle() | |
return car_states | |
class Car(object): | |
def __init__(self, sys, dd, vd, beta, gamma): | |
# dynamic system | |
self.sys = sys | |
# control input | |
self.u = Vector1Signal() | |
# desired inter-car distance | |
self.dd = dd | |
# desired car velocity | |
self.vd = vd | |
# proportional gain | |
self.beta = beta | |
# derivative gain | |
self.gamma = gamma | |
def update(self, t, dt, R_kp1, w_kp1): | |
# simulate PID control | |
p = self.beta * ((R_kp1 - self.sys.x().pose) - self.dd) | |
i = 0 | |
d = self.gamma * (self.vd - self.sys.x().twist) | |
ctrl = p + i + d | |
self.u.update(t, ctrl) | |
self.sys.simulateEuler(self.u, t + dt) | |
# velocity limitations | |
if self.sys.x().twist < 0: | |
x_k_w = SO2State.identity() | |
x_k_w.pose = self.sys.x().pose | |
x_k_w.twist = np.array([0.0]) | |
self.sys.x.update(t + dt, x_k_w, Vector1State.identity()) | |
elif self.sys.x().twist > args.vel_max: | |
x_k_w = SO2State.identity() | |
x_k_w.pose = self.sys.x().pose | |
x_k_w.twist = np.array([args.vel_max]) | |
self.sys.x.update(t + dt, x_k_w, Vector1State.identity()) | |
# collision avoidance | |
col_fac = abs(self.sys.x().pose - R_kp1) | |
if col_fac < args.vel_col_thresh: | |
R_k_col = self.sys.x().pose | |
w_k_col = col_fac / args.vel_col_thresh * self.sys.x().twist | |
if col_fac < args.pos_col_thresh: | |
R_k_col = R_kp1 + np.array([-1.25 * args.pos_col_thresh]) | |
if w_k_col > w_kp1: | |
w_k_col = 0.9 * w_kp1 | |
x_k_col = SO2State.identity() | |
x_k_col.pose = R_k_col | |
x_k_col.twist = w_k_col | |
self.sys.x.update(t + dt, x_k_col, Vector1State.identity()) | |
def main(): | |
dt_ms = 50 | |
dt = float(dt_ms) / 1000.0 | |
tf = 10.0 | |
sim = Simulator(args.num_cars, args.vel_des, args.beta_mu, args.beta_sigma, args.gamma_mu, args.gamma_sigma) | |
# plotting + animation | |
fig, ax = plt.subplots(figsize=(3,3)) | |
xdata, ydata = [], [] | |
ln, = ax.plot([], [], "ro") | |
def init(): | |
ax.set_xlim(-1.2, 1.2) | |
ax.set_ylim(-1.2, 1.2) | |
ax.grid() | |
th = np.linspace(0, 2 * np.pi, 30) | |
ax.plot(np.cos(th), np.sin(th), "k-") | |
states = np.array(sim.states()) | |
ln.set_data(np.cos(states), np.sin(states)) | |
return ln, | |
def update(i): | |
sim.update(dt) | |
states = np.array(sim.states()) | |
ln.set_data(np.cos(states), np.sin(states)) | |
return ln, | |
anim = FuncAnimation(fig, update, frames=int(tf / dt), init_func=init, interval=dt_ms, blit=True) | |
plt.show() | |
if __name__ == "__main__": | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment