Skip to content

Instantly share code, notes, and snippets.

@kenji4569
Last active April 19, 2024 02:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kenji4569/8da401b4d1bbfe2b0b6be41ac6d3d1a2 to your computer and use it in GitHub Desktop.
Save kenji4569/8da401b4d1bbfe2b0b6be41ac6d3d1a2 to your computer and use it in GitHub Desktop.
# # Dynamic Vehicle Routing
# ## Refs
# - https://github.com/lb-robotics/python-vehicle-routing
# - https://zenn.dev/ohtaman/articles/streamlit_start_stop
# ## Setup
# $ pip install streamlit numpy scipy pandas altair readerwriterlock
# $ pip install wandb # Optional. Please do "wandb login" when you use it.
# $ streamlit run dynamic-vehicle-routing-streamlit-app.py
import threading
import time
import typing
import uuid
from datetime import datetime
from typing import List, Literal
import altair as alt
import numpy as np
import pandas as pd
import streamlit as st
from readerwriterlock import rwlock
from scipy.cluster.vq import kmeans
try:
import wandb
except ImportError as e:
wandb = None
# Simulation tick, in seconds: the worker loops sleep for this long between
# iterations, and the elapsed-time counters advance by this amount per tick.
TIME_STEP = 0.1
# Alias used to annotate 2-D positions. The positions produced in this file are
# NumPy arrays, hence the TODO below.
Location = List # TODO: npt.NDArray with import numpy.typing as npt
def get_random_location() -> Location:
    """Draw a random 2-D point whose coordinates are each uniform in [-1, 1).

    Returns:
        A NumPy array of shape (2,).
    """
    unit_sample = np.random.rand(2)  # two independent draws from [0, 1)
    stretch = np.array([2, 2])
    shift = np.array([1, 1])
    return unit_sample * stretch - shift
def generate_uid():
    """Return a new random identifier: the hex string of a freshly made UUID4."""
    fresh_uuid = uuid.uuid4()
    return fresh_uuid.hex
# Closed set of event names that the simulation components append to the
# shared event log.
EventName = Literal[
"time_step",  # appended by Timer.run on every tick
"generate_demand",  # appended by DemandManager._generate_demand
"pickup_demand",  # appended by Vehicle._pickup_demand
"assign_demand",  # appended by Vehicle.assign_demand
"generate_vehicle",  # appended by VehicleManager._generate_vehicle
]
class Event:
    """One entry of the event log.

    Attributes:
        uid: Identifier obtained from ``generate_uid()``.
        time: ``datetime.now()`` at construction.
        name: One of the ``EventName`` values.
        data: Payload supplied by the creator (a dict of uids, or None, in
            the callers visible in this file).
    """

    def __init__(self, name: EventName, data):
        self.name = name
        self.data = data
        self.uid = generate_uid()
        self.time = datetime.now()
# Type alias for the event log: a plain list of Event objects that is handed
# to every worker, each of which appends to it.
Events = List[Event]
def get_events_from_last_ui(events: Events, last_event_uid: str):
    """Return the events that come after the one identified by ``last_event_uid``.

    The log is searched from its newest entry backwards, so when several
    entries share the uid the newest one is used as the cut point. If no entry
    matches (for example ``last_event_uid`` is None) the whole log is returned.

    Args:
        events: The event log to slice.
        last_event_uid: uid of the last event the caller has already handled.

    Returns:
        A new list holding the events located after the matching entry.
    """
    # Length is captured once; entries appended during the scan are not
    # searched, though the final slice may include them.
    total = len(events)
    start = 0
    for position in range(total - 1, -1, -1):
        if events[position].uid == last_event_uid:
            start = position + 1
            break
    return events[start:]
class Timer(threading.Thread):
    """Background thread that records one "time_step" event per tick.

    Attributes:
        done: Set by ``stop()`` to ask the loop to finish.
        events: The event log this timer appends to.
        elapsed_time: Running sum of ``TIME_STEP`` over the ticks emitted.
    """

    def __init__(self, events: Events, **kwargs):
        super().__init__(**kwargs)
        self.done = threading.Event()
        self.events = events
        self.elapsed_time = 0

    def run(self):
        """Emit a "time_step" event every ``TIME_STEP`` seconds until stopped."""
        # Event.wait doubles as an interruptible sleep: it returns False after
        # the timeout (emit a tick) and True as soon as stop() sets the flag,
        # so stop() does not have to sit out the remainder of a sleep.
        while not self.done.wait(TIME_STEP):
            self.elapsed_time += TIME_STEP
            self.events.append(Event("time_step", None))

    def stop(self):
        """Signal the loop to finish and wait for the thread to exit."""
        self.done.set()
        # join() raises RuntimeError on a thread that was never started.
        if self.is_alive():
            self.join()
class Demand:
    """A pickup request placed at a fixed location.

    Attributes:
        uid: Identifier obtained from ``generate_uid()``.
        location: Position given by the creator.
        picked_up: Starts as False; set to True elsewhere once a vehicle has
            collected the demand.
    """

    def __init__(
        self,
        location: Location,
    ):
        self.picked_up = False
        self.location = location
        self.uid = generate_uid()
class DemandManager(threading.Thread):
    """Background thread that creates demands at random intervals.

    The gaps between two demands are drawn from an exponential distribution
    with mean ``1 / demand_throughput``.

    Attributes:
        done: Set by ``stop()`` to ask the loop to finish.
        events: The event log this manager appends to.
        demand_throughput: Average number of demands per second.
        demands: Every demand generated so far, oldest first.
    """

    def __init__(self, events: Events, demand_throughput: float, **kwargs):
        """
        Raises:
            ValueError: If ``demand_throughput`` is not positive.
        """
        # Fail at construction instead of inside the worker thread, where a
        # division by zero would only kill the thread silently.
        if demand_throughput <= 0:
            raise ValueError("demand_throughput must be positive")
        super().__init__(**kwargs)
        self.done = threading.Event()
        self.events = events
        self.demand_throughput = demand_throughput
        self.demands: List[Demand] = []

    def run(self):
        """Generate a demand, wait a random interval, repeat until stopped."""
        while not self.done.is_set():
            self._generate_demand()
            # Interruptible wait: the interval is unbounded, so a plain sleep
            # could keep stop() blocked for a long time.
            self.done.wait(self._get_next_event_time())

    def _get_next_event_time(self):
        """Draw the delay, in seconds, until the next demand."""
        return np.random.exponential(scale=1 / self.demand_throughput)

    def _generate_demand(self):
        """Create a demand at a random location and log "generate_demand"."""
        demand = Demand(location=get_random_location())
        self.demands.append(demand)
        self.events.append(Event("generate_demand", {"demand_uid": demand.uid}))

    def stop(self):
        """Signal the loop to finish and wait for the thread to exit."""
        self.done.set()
        # join() raises RuntimeError on a thread that was never started.
        if self.is_alive():
            self.join()

    def get_demand_by_uid(self, uid: str):
        """Return the most recently created demand with this uid, or None."""
        return next((d for d in reversed(self.demands) if d.uid == uid), None)

    def get_unpicked_demands(self):
        """Return a new list of the demands that have not been picked up."""
        return [d for d in self.demands if not d.picked_up]
class Vehicle(threading.Thread):
    """Background thread that drives one vehicle towards its current target.

    Every tick the vehicle moves ``speed * TIME_STEP`` towards
    ``target_location``; once it is closer than ``pickup_threshold`` it snaps
    to the target and, if a demand is attached, picks it up.

    Attributes:
        done: Set from outside to ask the loop to finish.
        lock: Per-vehicle reader/writer lock guarding the target fields
            (``target_demand``, ``target_location``, ``depot_location``).
        uid: Identifier obtained from ``generate_uid()``.
        speed: Distance travelled per second.
        pickup_threshold: Distance below which the target counts as reached.
        events: The event log this vehicle appends to.
        location: Current position.
        target_location: Position the vehicle is heading to.
        target_demand: Demand currently assigned, or None.
        depot_location: Position to return to when idle, or None.
    """

    # One lock shared by every vehicle. Several vehicles may target the same
    # Demand, so the check-and-set of Demand.picked_up must be serialized
    # across vehicles; the per-vehicle ``self.lock`` cannot do that.
    _pickup_lock = threading.Lock()

    def __init__(self, events: Events, location: Location, **kwargs):
        super().__init__(**kwargs)
        self.done = threading.Event()
        self.lock = rwlock.RWLockFair()
        self.uid = generate_uid()
        self.speed = 0.5
        self.pickup_threshold = 0.05
        self.events = events
        self.location = location
        self.target_location = self.location
        self.target_demand: Demand | None = None
        self.depot_location: Location = None

    def run(self):
        """Advance the vehicle once per ``TIME_STEP`` until ``done`` is set."""
        # Event.wait acts as an interruptible sleep, so a stop request is
        # noticed immediately instead of after the current sleep.
        while not self.done.wait(TIME_STEP):
            self._move()

    def _move(self):
        """Take one step towards the target, picking up on arrival."""
        # Read both target fields under the lock so they always belong to the
        # same assignment. Otherwise an idle vehicle could observe a freshly
        # assigned demand together with its old target location (distance 0)
        # and "pick up" the demand without travelling to it.
        with self.lock.gen_rlock():
            target_location = self.target_location
            target_demand = self.target_demand

        diff = target_location - self.location
        diff_norm = np.linalg.norm(diff)
        if diff_norm < self.pickup_threshold:
            self.location = target_location
            if target_demand is not None:
                self._pickup_demand(target_demand)
        else:
            movement = diff * (self.speed / diff_norm)
            self.location = self.location + TIME_STEP * movement

    def _pickup_demand(self, target_demand=None):
        """Mark the demand as picked up (once) and drop it as the target.

        Args:
            target_demand: Demand that has been reached. Defaults to the
                vehicle's current ``target_demand``.
        """
        if target_demand is None:
            target_demand = self.target_demand
        if not target_demand:
            return

        with Vehicle._pickup_lock:
            # Only the first vehicle to arrive logs the pickup.
            if not target_demand.picked_up:
                target_demand.picked_up = True
                self.events.append(
                    Event(
                        "pickup_demand",
                        {"demand_uid": target_demand.uid, "vehicle_uid": self.uid},
                    )
                )

        # The locks are acquired one after another, never nested.
        with self.lock.gen_wlock():
            # Leave the target alone if a different demand was assigned in
            # the meantime.
            if self.target_demand is target_demand or self.target_demand is None:
                self._clear_target()

    def assign_demand(self, demand: Demand):
        """Make ``demand`` the current target and log "assign_demand"."""
        with self.lock.gen_wlock():
            self.target_demand = demand
            self.target_location = demand.location
        self.events.append(
            Event(
                "assign_demand", {"demand_uid": demand.uid, "vehicle_uid": self.uid}
            )
        )

    def release_demand(self):
        """Drop the current demand and head for the depot (or stay put)."""
        with self.lock.gen_wlock():
            self._clear_target()

    def _clear_target(self):
        """Reset the target fields; the caller must hold the write lock."""
        self.target_demand = None
        if self.depot_location is not None:
            self.target_location = self.depot_location
        else:
            self.target_location = self.location

    def set_depot_location(self, depot_location):
        """Set the idle position; an unassigned vehicle starts heading there."""
        # Same lock as assign_demand/release_demand, which write these fields.
        with self.lock.gen_wlock():
            self.depot_location = depot_location
            if not self.target_demand:
                self.target_location = self.depot_location
class VehicleManager:
    """Creates the vehicle threads and answers queries about them.

    Attributes:
        events: The event log handed to every vehicle.
        num_vehicles: Number of vehicles created by ``start()``.
        vehicles: The vehicles created so far.
    """

    def __init__(self, events: Events, num_vehicles: int):
        self.events = events
        self.num_vehicles = num_vehicles
        self.vehicles: List[Vehicle] = []

    def start(self):
        """Create and start ``num_vehicles`` vehicles."""
        for _ in range(self.num_vehicles):
            self._generate_vehicle()

    def _generate_vehicle(self):
        """Start one vehicle at a random location and log "generate_vehicle"."""
        vehicle = Vehicle(events=self.events, location=get_random_location())
        vehicle.start()
        self.vehicles.append(vehicle)
        self.events.append(Event("generate_vehicle", {"vehicle_uid": vehicle.uid}))

    def stop(self):
        """Ask every vehicle to finish, then wait for all of them."""
        # Signal everyone first so the vehicles wind down concurrently; joining
        # right after each signal would wait for them one at a time.
        for vehicle in self.vehicles:
            vehicle.done.set()
        for vehicle in self.vehicles:
            vehicle.join()

    def is_running(self):
        """Return True when at least one vehicle exists and all are alive."""
        return bool(self.vehicles) and all(
            vehicle.is_alive() for vehicle in self.vehicles
        )

    def is_stopped(self):
        """Return True when no vehicle is alive (also when none exist)."""
        return not any(vehicle.is_alive() for vehicle in self.vehicles)

    def get_vehicle_by_uid(self, uid: str):
        """Return the most recently created vehicle with this uid, or None."""
        return next((v for v in reversed(self.vehicles) if v.uid == uid), None)

    def get_unassigned_vehicles(self):
        """Return a new list of the vehicles that have no target demand."""
        return [v for v in self.vehicles if not v.target_demand]
# Demand-assignment policies understood by CentralManager. "random"/"nearest"
# is how an idle vehicle chooses among the candidate demands. With "selfish"
# the candidates are all unpicked demands, so several vehicles may chase the
# same one; with "cooperate" demands already targeted by a vehicle are
# excluded.
AssignStrategy = Literal[
"random-selfish",
"nearest-selfish",
"random-cooperate",
"nearest-cooperate",
]
# Values offered in the UI selectbox, in declaration order.
assign_strategies = list(typing.get_args(AssignStrategy))
# Depot policies understood by CentralManager: with "kmeans" every vehicle is
# given a k-means centroid of uniformly sampled points as its depot; with
# "stay" no depot is set.
DepotStrategy = Literal[
"stay",
"kmeans",
]
# Values offered in the UI selectbox, in declaration order.
depot_strategies = list(typing.get_args(DepotStrategy))
class CentralManager(threading.Thread):
    """Dispatcher thread that matches idle vehicles with open demands.

    Once per tick it reads the events logged since its previous tick, assigns
    demands to unassigned vehicles according to ``assign_strategy`` and
    releases vehicles whose target demand was picked up in the meantime.

    Attributes:
        done: Set by ``stop()`` to ask the loop to finish.
        events: The shared event log.
        demand_manager: Source of the unpicked demands.
        vehicle_manager: Source of the vehicles.
        assign_strategy: One of the ``AssignStrategy`` values.
        depot_strategy: One of the ``DepotStrategy`` values.
        last_event_uid: uid of the newest event already processed, or None.
        new_events: Events collected for the current tick.
    """

    def __init__(
        self,
        events: Events,
        demand_manager: DemandManager,
        vehicle_manager: VehicleManager,
        assign_strategy: AssignStrategy,
        depot_strategy: DepotStrategy,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.done = threading.Event()
        self.events = events
        self.demand_manager = demand_manager
        self.vehicle_manager = vehicle_manager
        self.assign_strategy = assign_strategy
        self.depot_strategy = depot_strategy
        self.last_event_uid = None
        self.new_events = []

    def run(self):
        """Set up depots if requested, then dispatch once per ``TIME_STEP``."""
        if self.depot_strategy == "kmeans":
            self._set_depots_by_kmeans()

        # Event.wait acts as an interruptible sleep so stop() returns promptly.
        while not self.done.wait(TIME_STEP):
            self.new_events = get_events_from_last_ui(
                self.events, self.last_event_uid
            )
            if self.new_events:
                # Take the bookmark from the slice that was actually read.
                # Reading events[-1] afterwards would skip any event another
                # thread appended between the two reads.
                self.last_event_uid = self.new_events[-1].uid
            self._assign_vehicles_to_demands()
            self._release_vehicles_from_demands()

    def _set_depots_by_kmeans(self):
        """Give each vehicle a k-means centroid of uniform samples as depot."""
        vehicles = self.vehicle_manager.vehicles
        if not vehicles:
            return
        num_samples = 10000
        samples = np.random.uniform(-1, 1, (num_samples, 2))
        centroids, _distortion = kmeans(samples, self.vehicle_manager.num_vehicles)
        # zip() stops at the shorter sequence, so a vehicle simply keeps no
        # depot should kmeans return fewer centroids than requested.
        for vehicle, centroid in zip(vehicles, centroids):
            vehicle.set_depot_location(centroid)

    def stop(self):
        """Signal the loop to finish and wait for the thread to exit."""
        self.done.set()
        # join() raises RuntimeError on a thread that was never started.
        if self.is_alive():
            self.join()

    def _assign_vehicles_to_demands(self):
        """Run the assignment routine selected by ``assign_strategy``.

        Raises:
            NotImplementedError: If the strategy name is unknown.
        """
        handlers = {
            "random-selfish": self._assign_by_random_selfish,
            "nearest-selfish": self._assign_by_nearest_selfish,
            "random-cooperate": self._assign_by_random_cooperate,
            "nearest-cooperate": self._assign_by_nearest_cooperate,
        }
        handler = handlers.get(self.assign_strategy)
        if handler is None:
            raise NotImplementedError()
        handler()

    def _release_vehicles_from_demands(self):
        """Release vehicles whose target was picked up during this tick."""
        picked_uids = {
            event.data["demand_uid"]
            for event in self.new_events
            if event.name == "pickup_demand"
        }
        if not picked_uids:
            return
        for vehicle in self.vehicle_manager.vehicles:
            # Read the attribute once: the vehicle thread may reset it to None
            # between a truthiness check and a later ``.uid`` access.
            demand = vehicle.target_demand
            if demand and demand.uid in picked_uids:
                vehicle.release_demand()

    def _get_untargeted_demands(self):
        """Return the unpicked demands that no vehicle is heading for."""
        targeted_uids = set()
        for vehicle in self.vehicle_manager.vehicles:
            demand = vehicle.target_demand
            if demand:
                targeted_uids.add(demand.uid)
        return [
            d
            for d in self.demand_manager.get_unpicked_demands()
            if d.uid not in targeted_uids
        ]

    @staticmethod
    def _nearest_index(location, demand_locations):
        """Return the row index of ``demand_locations`` closest to ``location``."""
        distances = np.linalg.norm(
            location.reshape((1, 2)) - demand_locations, axis=1
        )
        return int(np.argmin(distances))

    def _assign_by_random_selfish(self):
        """Give every idle vehicle a uniformly random unpicked demand."""
        demands = self.demand_manager.get_unpicked_demands()
        vehicles = self.vehicle_manager.get_unassigned_vehicles()
        if not demands or not vehicles:
            return
        for vehicle in vehicles:
            vehicle.assign_demand(demands[np.random.randint(len(demands))])

    def _assign_by_nearest_selfish(self):
        """Give every idle vehicle the unpicked demand closest to it."""
        demands = self.demand_manager.get_unpicked_demands()
        vehicles = self.vehicle_manager.get_unassigned_vehicles()
        if not demands or not vehicles:
            return
        demand_locations = np.array([d.location for d in demands])
        for vehicle in vehicles:
            index = self._nearest_index(vehicle.location, demand_locations)
            vehicle.assign_demand(demands[index])

    def _assign_by_random_cooperate(self):
        """Give idle vehicles distinct, untargeted demands chosen at random."""
        vehicles = self.vehicle_manager.get_unassigned_vehicles()
        demands = self._get_untargeted_demands()
        for vehicle in vehicles:
            if not demands:
                break
            # pop() removes the demand so the next vehicle cannot get it.
            vehicle.assign_demand(demands.pop(np.random.randint(len(demands))))

    def _assign_by_nearest_cooperate(self):
        """Give idle vehicles, in turn, the closest still-untargeted demand."""
        vehicles = self.vehicle_manager.get_unassigned_vehicles()
        demands = self._get_untargeted_demands()
        for vehicle in vehicles:
            if not demands:
                break
            demand_locations = np.array([d.location for d in demands])
            index = self._nearest_index(vehicle.location, demand_locations)
            vehicle.assign_demand(demands.pop(index))
class RootManager:
    """Owns the event log and the four workers of one simulation run.

    The workers (timer, demand manager, vehicle manager, central manager) are
    created by ``setup()`` and launched by ``start()``.
    """

    def __init__(self):
        self.events: Events = []
        self.started = False

    def is_running(self):
        """Return True when the run was started and every worker is alive."""
        if not self.started:
            return False
        liveness_checks = (
            self.timer.is_alive,
            self.demand_manager.is_alive,
            self.vehicle_manager.is_running,
            self.central_manager.is_alive,
        )
        return all(check() for check in liveness_checks)

    def is_stopped(self):
        """Return True when nothing was started or every worker has finished."""
        if not self.started:
            return True
        if self.timer.is_alive():
            return False
        if self.demand_manager.is_alive():
            return False
        if not self.vehicle_manager.is_stopped():
            return False
        return not self.central_manager.is_alive()

    def setup(self, num_vehicles, demand_throughput, assign_strategy, depot_strategy):
        """Create the workers for a run; all of them share ``self.events``."""
        shared_log = self.events
        self.timer = Timer(events=shared_log)
        self.demand_manager = DemandManager(
            events=shared_log, demand_throughput=demand_throughput
        )
        self.vehicle_manager = VehicleManager(
            events=shared_log, num_vehicles=num_vehicles
        )
        self.central_manager = CentralManager(
            events=shared_log,
            demand_manager=self.demand_manager,
            vehicle_manager=self.vehicle_manager,
            assign_strategy=assign_strategy,
            depot_strategy=depot_strategy,
        )

    def start(self):
        """Launch the workers, unless some worker of this run is still alive."""
        if not self.is_stopped():
            st.write("Some workers are alive")
            return
        workers = (
            self.timer,
            self.demand_manager,
            self.vehicle_manager,
            self.central_manager,
        )
        for worker in workers:
            worker.start()
        self.started = True

    def stop(self):
        """Stop the workers one after another, in creation order."""
        workers = (
            self.timer,
            self.demand_manager,
            self.vehicle_manager,
            self.central_manager,
        )
        for worker in workers:
            worker.stop()
def display_vehicles_and_demands(root_manager: RootManager):
    """Draw a scatter plot of the unpicked demands (red) and vehicles (blue).

    Nothing is drawn when there is neither a demand nor a vehicle.

    Args:
        root_manager: Manager whose demand and vehicle managers are plotted.
    """
    demands = root_manager.demand_manager.get_unpicked_demands()
    vehicles = root_manager.vehicle_manager.vehicles

    # Build one frame from all rows. Concatenating a demand frame and a
    # vehicle frame, one of which is often empty, relies on pandas ignoring
    # empty frames when choosing column dtypes, which pandas has deprecated.
    rows = [(d.location[0], d.location[1], "demand") for d in demands]
    rows += [(v.location[0], v.location[1], "vehicle") for v in vehicles]
    if not rows:
        return
    df = pd.DataFrame(rows, columns=["x", "y", "type"])

    scatter = (
        alt.Chart(df)
        .mark_point(filled=True, size=100, opacity=0.7)
        .encode(
            x=alt.X(
                "x",
                scale=alt.Scale(domain=[-1, 1]),
                axis=alt.Axis(title="x"),
            ),
            y=alt.Y(
                "y",
                scale=alt.Scale(domain=[-1, 1]),
                axis=alt.Axis(title="y"),
            ),
            color=alt.Color(
                "type",
                scale=alt.Scale(
                    domain=["demand", "vehicle"], range=["red", "blue"]
                ),
                legend=None,
            ),
        )
    ).properties(width=350, height=350)
    st.altair_chart(scatter, use_container_width=False)
@st.cache_resource
def get_metrics_data():
    """Return the dict that holds the metrics state.

    The function body creates an empty dict; the ``st.cache_resource``
    decorator is what lets callers get the same dict back on later calls.
    """
    metrics_store = {}
    return metrics_store
def display_metrics(root_manager: RootManager, use_wandb: bool):
    """Update the unpicked-demand history from new events and plot it.

    The history gains one ``(time, count)`` point per "time_step" event. The
    stored bookmark only advances on "time_step" events, so demand events that
    arrive after the last tick are replayed on the next call, starting from
    the count stored with that tick.

    Args:
        root_manager: Manager whose event log is read.
        use_wandb: When true (and wandb is importable) the current count is
            also logged to wandb.
    """
    metrics_data = get_metrics_data()
    pending_events = get_events_from_last_ui(
        root_manager.events, metrics_data["last_event_uid"]
    )
    num_time_steps = metrics_data["num_time_steps"]
    history = metrics_data["num_unpicked_demands_metrics"]
    # Resume from the count recorded with the last processed tick.
    num_unpicked_demands = history[-1][1]

    count_change = {"generate_demand": 1, "pickup_demand": -1}
    for event in pending_events:
        if event.name != "time_step":
            num_unpicked_demands += count_change.get(event.name, 0)
            continue
        num_time_steps += TIME_STEP
        history.append((num_time_steps, num_unpicked_demands))
        metrics_data.update(
            last_event_uid=event.uid,
            num_time_steps=num_time_steps,
            num_unpicked_demands_metrics=history,
        )

    history_df = pd.DataFrame(history, columns=["t", "v"])
    time_axis = alt.X("t", axis=alt.Axis(title="time step"))
    count_axis = alt.Y("v", axis=alt.Axis(title="num unpicked demands"))
    base_chart = alt.Chart(history_df).mark_line(color="steelblue")
    line_chart = base_chart.encode(x=time_axis, y=count_axis).properties(
        width=350, height=350
    )
    st.altair_chart(line_chart, use_container_width=False)

    if use_wandb and wandb:
        wandb.log({"num_unpicked_demands": num_unpicked_demands})
def display_debug(root_manager: RootManager):
    """Write the elapsed time and the names of all worker threads to the page.

    Args:
        root_manager: Manager whose workers are reported.
    """
    # Thread.name replaces the deprecated Thread.getName().
    st.write(f"elapsed time: {int(root_manager.timer.elapsed_time)} sec")
    st.write(f"timer thread: {root_manager.timer.name}")
    st.write(f"demand generator thread: {root_manager.demand_manager.name}")
    vehicle_thread_names = ", ".join(
        vehicle.name for vehicle in root_manager.vehicle_manager.vehicles
    )
    st.write(f"vehicle threads: {vehicle_thread_names}")
    st.write(f"central manager thread: {root_manager.central_manager.name}")
@st.cache_resource
def get_root_manager():
    """Return the RootManager for the app.

    The function body builds a new RootManager; the ``st.cache_resource``
    decorator is what lets callers get the same instance back on later calls.
    """
    manager = RootManager()
    return manager
def stop(root_manager: RootManager, use_wandb: bool):
    """Finish the wandb run, if one is in use, then stop all workers.

    Args:
        root_manager: Manager whose workers are stopped.
        use_wandb: Whether wandb logging was requested for this run.
    """
    if use_wandb:
        # wandb is None when the optional package is not installed.
        if wandb:
            wandb.finish()
    root_manager.stop()
def main():
    """Render the Streamlit page: sidebar controls plus the live views.

    While a simulation is running, the page redraws itself every
    ``TIME_STEP`` seconds and stops the run once the selected maximum elapsed
    time is exceeded.
    """
    root_manager = get_root_manager()
    is_running = root_manager.is_running()

    with st.sidebar:
        num_vehicles = st.selectbox(
            "number of vehicles",
            [1, 3, 5, 10],
            index=2,
            disabled=is_running,
        )
        demand_throughput = st.selectbox(
            "demand throughput",
            [1, 1.5, 2, 3],
            index=2,
            disabled=is_running,
        )
        assign_strategy = st.selectbox(
            "assign_strategy",
            assign_strategies,
            index=0,
            disabled=is_running,
        )
        depot_strategy = st.selectbox(
            "depot_strategy",
            depot_strategies,
            index=0,
            disabled=is_running,
        )
        max_elapsed_time = st.selectbox(
            "max_time_step",
            [5, 10, 20, 50, 100],
            index=2,
            disabled=is_running,
        )
        # Locked during a run like the other settings: switching it on
        # mid-run would call wandb.log without a prior wandb.init, and
        # switching it off would skip wandb.finish.
        use_wandb = st.checkbox("use_wandb", disabled=is_running)

        if st.button(
            "Start",
            disabled=is_running,
        ):
            if root_manager.started:
                # A previous run exists: drop the cached manager and metrics
                # so this run starts from fresh objects.
                st.cache_resource.clear()
                root_manager = get_root_manager()
            if use_wandb and wandb:
                wandb.init(
                    project="dynamic-vehicle-routing",
                    config={
                        "num_vehicles": num_vehicles,
                        "demand_throughput": demand_throughput,
                        "assign_strategy": assign_strategy,
                        "depot_strategy": depot_strategy,
                    },
                )
            metrics_data = get_metrics_data()
            metrics_data["last_event_uid"] = None
            metrics_data["num_time_steps"] = 0
            metrics_data["num_unpicked_demands_metrics"] = [(0, 0)]
            root_manager.setup(
                num_vehicles=num_vehicles,
                demand_throughput=demand_throughput,
                assign_strategy=assign_strategy,
                depot_strategy=depot_strategy,
            )
            root_manager.start()
            st.rerun()

        if st.button("Stop", disabled=not is_running):
            stop(root_manager, use_wandb)
            st.rerun()

    if is_running:
        col1, col2 = st.columns(2)
        with col1:
            display_vehicles_and_demands(root_manager)
        with col2:
            display_metrics(root_manager, use_wandb=use_wandb)
        st.divider()
        display_debug(root_manager)
        if root_manager.timer.elapsed_time > max_elapsed_time:
            stop(root_manager, use_wandb)
            st.rerun()
        else:
            # Pace the refresh loop at one redraw per simulation tick.
            time.sleep(TIME_STEP)
            st.rerun()
    elif root_manager.is_stopped():
        st.write("Stopped")
    else:
        # Some workers are still winding down; poll again shortly.
        st.write("Stopping..")
        time.sleep(TIME_STEP)
        st.rerun()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment