Skip to content

Instantly share code, notes, and snippets.

@goromal
Last active December 10, 2022 17:32
Show Gist options
  • Save goromal/c37629235750b65b9d0ec0e17456ee96 to your computer and use it in GitHub Desktop.
Save goromal/c37629235750b65b9d0ec0e17456ee96 to your computer and use it in GitHub Desktop.
Simple traffic simulator on a circular road. Cars have two control objectives: maintain a consistent distance between cars and maintain a consistent car speed.
# Simple traffic simulator on a circular road. Cars have two
# control objectives:
# - Maintain a consistent distance between cars.
# - Maintain a consistent car speed.
import argparse
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from geometry import SO2
from pysignals import (
Vector1Signal,
Vector1State,
SO2State,
RigidBodyParams2D,
Rotational1DOFSystem
)
argparser = argparse.ArgumentParser(
description="Simple traffic simulator on a circular road.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
argparser.add_argument("--num_cars", type=int, default=7, help="Number of cars to simulate.")
argparser.add_argument("--vel_des", type=float, default=1.0, help="Desired car velocity.")
argparser.add_argument("--vel_max", type=float, default=1.5, help="Maximum allowed car velocity.")
argparser.add_argument("--beta_mu", type=float, default=0.5, help="Mean proportional gain.")
argparser.add_argument("--beta_sigma", type=float, default=0.5, help="Proportional gain standard deviation.")
argparser.add_argument("--gamma_mu", type=float, default=0.5, help="Mean derivative gain.")
argparser.add_argument("--gamma_sigma", type=float, default=0.5, help="Derivative gain standard deviation.")
argparser.add_argument("--vel_col_thresh", type=float, default=0.30, help="Distance threshold for slowing down.")
argparser.add_argument("--pos_col_thresh", type=float, default=0.15, help="Distance threshold for collision avoidance")
args = argparser.parse_args()
class Simulator(object):
def __init__(self, n, vd, beta_mu, beta_sigma, gamma_mu, gamma_sigma):
self.cars = []
self.n = n
self.t = 0.0
# arbitrary/normalized system parameterization
p = RigidBodyParams2D()
p.m = 1.0
p.J = 1.0
p.g = np.zeros(2)
# generate cars with normally-distributed proportional and
# derivative gains
for i in range(n):
sys = Rotational1DOFSystem()
sys.setParams(p)
x0 = SO2State.identity()
dd = 2.0 * np.pi / float(n)
x0.pose = SO2.fromAngle(i * dd)
x0.twist = np.array([vd / 2.0])
sys.x.update(self.t, x0)
beta = np.random.normal(beta_mu, beta_sigma)
gamma = np.random.normal(gamma_mu, gamma_sigma)
self.cars.append(Car(sys, dd, vd, beta, gamma))
def update(self, dt):
# simulate dynamics for each car; each car uses the car in
# front of it as a reference
for i in range(self.n):
if i == self.n - 1:
R_kp1 = self.cars[0].sys.x().pose
w_kp1 = self.cars[0].sys.x().twist
else:
R_kp1 = self.cars[i + 1].sys.x().pose
w_kp1 = self.cars[i + 1].sys.x().twist
self.cars[i].update(self.t, dt, R_kp1, w_kp1)
self.t += dt
def states(self):
# report each car's position on the circle for plotting
car_states = np.zeros(self.n)
for i in range(self.n):
car_states[i] = self.cars[i].sys.x().pose.angle()
return car_states
class Car(object):
def __init__(self, sys, dd, vd, beta, gamma):
# dynamic system
self.sys = sys
# control input
self.u = Vector1Signal()
# desired inter-car distance
self.dd = dd
# desired car velocity
self.vd = vd
# proportional gain
self.beta = beta
# derivative gain
self.gamma = gamma
def update(self, t, dt, R_kp1, w_kp1):
# simulate PID control
p = self.beta * ((R_kp1 - self.sys.x().pose) - self.dd)
i = 0
d = self.gamma * (self.vd - self.sys.x().twist)
ctrl = p + i + d
self.u.update(t, ctrl)
self.sys.simulateEuler(self.u, t + dt)
# velocity limitations
if self.sys.x().twist < 0:
x_k_w = SO2State.identity()
x_k_w.pose = self.sys.x().pose
x_k_w.twist = np.array([0.0])
self.sys.x.update(t + dt, x_k_w, Vector1State.identity())
elif self.sys.x().twist > args.vel_max:
x_k_w = SO2State.identity()
x_k_w.pose = self.sys.x().pose
x_k_w.twist = np.array([args.vel_max])
self.sys.x.update(t + dt, x_k_w, Vector1State.identity())
# collision avoidance
col_fac = abs(self.sys.x().pose - R_kp1)
if col_fac < args.vel_col_thresh:
R_k_col = self.sys.x().pose
w_k_col = col_fac / args.vel_col_thresh * self.sys.x().twist
if col_fac < args.pos_col_thresh:
R_k_col = R_kp1 + np.array([-1.25 * args.pos_col_thresh])
if w_k_col > w_kp1:
w_k_col = 0.9 * w_kp1
x_k_col = SO2State.identity()
x_k_col.pose = R_k_col
x_k_col.twist = w_k_col
self.sys.x.update(t + dt, x_k_col, Vector1State.identity())
def main():
dt_ms = 50
dt = float(dt_ms) / 1000.0
tf = 10.0
sim = Simulator(args.num_cars, args.vel_des, args.beta_mu, args.beta_sigma, args.gamma_mu, args.gamma_sigma)
# plotting + animation
fig, ax = plt.subplots(figsize=(3,3))
xdata, ydata = [], []
ln, = ax.plot([], [], "ro")
def init():
ax.set_xlim(-1.2, 1.2)
ax.set_ylim(-1.2, 1.2)
ax.grid()
th = np.linspace(0, 2 * np.pi, 30)
ax.plot(np.cos(th), np.sin(th), "k-")
states = np.array(sim.states())
ln.set_data(np.cos(states), np.sin(states))
return ln,
def update(i):
sim.update(dt)
states = np.array(sim.states())
ln.set_data(np.cos(states), np.sin(states))
return ln,
anim = FuncAnimation(fig, update, frames=int(tf / dt), init_func=init, interval=dt_ms, blit=True)
plt.show()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment