Last active
April 19, 2024 02:45
-
-
Save kenji4569/8da401b4d1bbfe2b0b6be41ac6d3d1a2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# # Dynamic Vehicle Routing | |
# ## Refs | |
# - https://github.com/lb-robotics/python-vehicle-routing | |
# - https://zenn.dev/ohtaman/articles/streamlit_start_stop | |
# ## Setup | |
# $ pip install streamlit numpy scipy pandas altair readerwriterlock | |
# $ pip install wandb # Optional. Please do "wandb login" when you use it. | |
# $ streamlit run dynamic-vehicle-routing-streamlit-app.py | |
import threading | |
import time | |
import typing | |
import uuid | |
from datetime import datetime | |
from typing import List, Literal | |
import altair as alt | |
import numpy as np | |
import pandas as pd | |
import streamlit as st | |
from readerwriterlock import rwlock | |
from scipy.cluster.vq import kmeans | |
try: | |
import wandb | |
except ImportError as e: | |
wandb = None | |
TIME_STEP = 0.1 | |
Location = List # TODO: npt.NDArray with import numpy.typing as npt | |
def get_random_location() -> Location: | |
return np.random.rand(2) * np.array([2, 2]) - np.array([1, 1]) | |
def generate_uid(): | |
return uuid.uuid4().hex | |
EventName = Literal[ | |
"time_step", | |
"generate_demand", | |
"pickup_demand", | |
"assign_demand", | |
"generate_vehicle", | |
] | |
class Event: | |
def __init__(self, name: EventName, data): | |
self.uid = generate_uid() | |
self.time = datetime.now() | |
self.name = name | |
self.data = data | |
Events = List[Event] | |
def get_events_from_last_ui(events: Events, last_event_uid: str): | |
last_index = -1 | |
num_events = len(events) | |
for i in range(num_events): | |
event = events[num_events - 1 - i] | |
if event.uid == last_event_uid: | |
last_index = num_events - 1 - i | |
break | |
return events[last_index + 1 :] | |
class Timer(threading.Thread): | |
def __init__(self, events: Events, **kwargs): | |
super().__init__(**kwargs) | |
self.done = threading.Event() | |
self.events = events | |
self.elapsed_time = 0 | |
def run(self): | |
while not self.done.wait(0): | |
time.sleep(TIME_STEP) | |
self.elapsed_time += TIME_STEP | |
self.events.append(Event("time_step", None)) | |
def stop(self): | |
self.done.set() | |
self.join() | |
class Demand: | |
def __init__( | |
self, | |
location: Location, | |
): | |
self.uid = generate_uid() | |
self.location = location | |
self.picked_up = False | |
class DemandManager(threading.Thread): | |
def __init__(self, events: Events, demand_throughput: float, **kwargs): | |
super().__init__(**kwargs) | |
self.done = threading.Event() | |
self.events = events | |
self.demand_throughput = demand_throughput | |
self.demands: List[Demand] = [] | |
def run(self): | |
while not self.done.wait(0): | |
self._generate_demand() | |
time.sleep(self._get_next_event_time()) | |
def _get_next_event_time(self): | |
return np.random.exponential(scale=1 / self.demand_throughput) | |
def _generate_demand(self): | |
demand = Demand(location=get_random_location()) | |
self.demands.append(demand) | |
self.events.append(Event("generate_demand", {"demand_uid": demand.uid})) | |
def stop(self): | |
self.done.set() | |
self.join() | |
def get_demand_by_uid(self, uid: str): | |
return next(filter(lambda d: d.uid == uid, reversed(self.demands)), None) | |
def get_unpicked_demands(self): | |
return list(filter(lambda d: not d.picked_up, self.demands)) | |
class Vehicle(threading.Thread): | |
def __init__(self, events: Events, location: Location, **kwargs): | |
super().__init__(**kwargs) | |
self.done = threading.Event() | |
self.lock = rwlock.RWLockFair() | |
self.uid = generate_uid() | |
self.speed = 0.5 | |
self.pickup_threshold = 0.05 | |
self.events = events | |
self.location = location | |
self.target_location = self.location | |
self.target_demand: Demand | None = None | |
self.depot_location: Location = None | |
def run(self): | |
while not self.done.wait(0): | |
time.sleep(TIME_STEP) | |
self._move() | |
def _move(self): | |
diff = self.target_location - self.location | |
diff_norm = np.linalg.norm(diff) | |
if diff_norm < self.pickup_threshold: | |
self.location = self.target_location | |
self._pickup_demand() | |
else: | |
movement = diff * (self.speed / diff_norm) | |
self.location = self.location + TIME_STEP * movement | |
def _pickup_demand(self): | |
if not self.target_demand: | |
return | |
target_demand = self.target_demand | |
with self.lock.gen_wlock(): | |
if not target_demand.picked_up: | |
target_demand.picked_up = True | |
self.events.append( | |
Event( | |
"pickup_demand", | |
{"demand_uid": target_demand.uid, "vehicle_uid": self.uid}, | |
) | |
) | |
self.release_demand() | |
def assign_demand(self, demand: Demand): | |
with self.lock.gen_wlock(): | |
self.target_demand = demand | |
self.target_location = demand.location | |
self.events.append( | |
Event( | |
"assign_demand", {"demand_uid": demand.uid, "vehicle_uid": self.uid} | |
) | |
) | |
def release_demand(self): | |
with self.lock.gen_wlock(): | |
self.target_demand = None | |
if self.depot_location is not None: | |
self.target_location = self.depot_location | |
else: | |
self.target_location = self.location | |
def set_depot_location(self, depot_location): | |
self.depot_location = depot_location | |
if not self.target_demand: | |
self.target_location = self.depot_location | |
class VehicleManager: | |
def __init__(self, events: Events, num_vehicles: int): | |
self.events = events | |
self.num_vehicles = num_vehicles | |
self.vehicles: List[Vehicle] = [] | |
def start(self): | |
for i in range(self.num_vehicles): | |
self._generate_vehicle() | |
def _generate_vehicle(self): | |
vehicle = Vehicle(events=self.events, location=get_random_location()) | |
vehicle.start() | |
self.vehicles.append(vehicle) | |
self.events.append(Event("generate_vehicle", {"vehicle_uid": vehicle.uid})) | |
def stop(self): | |
for vehicle in self.vehicles: | |
vehicle.done.set() | |
vehicle.join() | |
def is_running(self): | |
return len(self.vehicles) > 0 and all( | |
(vehicle.is_alive() for vehicle in self.vehicles) | |
) | |
def is_stopped(self): | |
return len(self.vehicles) == 0 or all( | |
(not vehicle.is_alive() for vehicle in self.vehicles) | |
) | |
def get_vehicle_by_uid(self, uid: str): | |
return next(filter(lambda v: v.uid == uid, reversed(self.vehicles)), None) | |
def get_unassigned_vehicles(self): | |
return list(filter(lambda v: not v.target_demand, self.vehicles)) | |
AssignStrategy = Literal[ | |
"random-selfish", | |
"nearest-selfish", | |
"random-cooperate", | |
"nearest-cooperate", | |
] | |
assign_strategies = list(typing.get_args(AssignStrategy)) | |
DepotStrategy = Literal[ | |
"stay", | |
"kmeans", | |
] | |
depot_strategies = list(typing.get_args(DepotStrategy)) | |
class CentralManager(threading.Thread): | |
def __init__( | |
self, | |
events: Events, | |
demand_manager: DemandManager, | |
vehicle_manager: VehicleManager, | |
assign_strategy: AssignStrategy, | |
depot_strategy: DepotStrategy, | |
**kwargs, | |
): | |
super().__init__(**kwargs) | |
self.done = threading.Event() | |
self.events = events | |
self.demand_manager = demand_manager | |
self.vehicle_manager = vehicle_manager | |
self.assign_strategy = assign_strategy | |
self.depot_strategy = depot_strategy | |
self.last_event_uid = None | |
self.new_events = [] | |
def run(self): | |
if self.depot_strategy == "kmeans": | |
self._set_depots_by_kmeans() | |
while not self.done.wait(0): | |
time.sleep(TIME_STEP) | |
if len(self.events) > 0: | |
self.new_events = get_events_from_last_ui( | |
self.events, self.last_event_uid | |
) | |
last_event = self.events[-1] | |
self.last_event_uid = last_event.uid | |
self._assign_vehicles_to_demands() | |
self._release_vehicles_from_demands() | |
def _set_depots_by_kmeans(self): | |
num_samples = 10000 | |
samples = np.random.uniform(-1, 1, (num_samples, 2)) | |
centroids, _distortion = kmeans(samples, self.vehicle_manager.num_vehicles) | |
for i, vehicle in enumerate(self.vehicle_manager.vehicles): | |
vehicle.set_depot_location(centroids[i]) | |
def stop(self): | |
self.done.set() | |
self.join() | |
def _assign_vehicles_to_demands(self): | |
if self.assign_strategy == "random-selfish": | |
self._assign_by_random_selfish() | |
elif self.assign_strategy == "nearest-selfish": | |
self._assign_by_nearest_selfish() | |
elif self.assign_strategy == "random-cooperate": | |
self._assign_by_random_cooperate() | |
elif self.assign_strategy == "nearest-cooperate": | |
self._assign_by_nearest_cooperate() | |
else: | |
raise NotImplementedError() | |
def _release_vehicles_from_demands(self): | |
vehicles = list( | |
filter(lambda v: v.target_demand, self.vehicle_manager.vehicles) | |
) | |
for event in self.new_events: | |
if event.name == "pickup_demand": | |
for vehicle in vehicles: | |
if ( | |
vehicle.target_demand | |
and vehicle.target_demand.uid == event.data["demand_uid"] | |
): | |
vehicle.release_demand() | |
def _assign_by_random_selfish(self): | |
demands = self.demand_manager.get_unpicked_demands() | |
vehicles = self.vehicle_manager.get_unassigned_vehicles() | |
if len(demands) == 0 or len(vehicles) == 0: | |
return | |
for vehicle in vehicles: | |
demand = demands[np.random.randint(len(demands))] | |
vehicle.assign_demand(demand) | |
def _assign_by_nearest_selfish(self): | |
demands = self.demand_manager.get_unpicked_demands() | |
vehicles = self.vehicle_manager.get_unassigned_vehicles() | |
if len(demands) == 0 or len(vehicles) == 0: | |
return | |
demand_locations = np.array([d.location for d in demands]) | |
for vehicle in vehicles: | |
distances = np.linalg.norm( | |
(vehicle.location.reshape((1, 2)) - demand_locations), axis=1 | |
) | |
demand_index = np.argmin(distances) | |
demand = demands[demand_index] | |
vehicle.assign_demand(demand) | |
def _assign_by_random_cooperate(self): | |
demands = self.demand_manager.get_unpicked_demands() | |
vehicles = self.vehicle_manager.get_unassigned_vehicles() | |
targeted_demand_uids = [ | |
v.target_demand.uid | |
for v in self.vehicle_manager.vehicles | |
if v.target_demand | |
] | |
demands = list(filter(lambda d: d.uid not in targeted_demand_uids, demands)) | |
if len(demands) == 0 or len(vehicles) == 0: | |
return | |
for vehicle in vehicles: | |
if len(demands) == 0: | |
break | |
demand_index = np.random.randint(len(demands)) | |
demand = demands[demand_index] | |
vehicle.assign_demand(demand) | |
demands = demands[:demand_index] + demands[demand_index + 1 :] | |
def _assign_by_nearest_cooperate(self): | |
demands = self.demand_manager.get_unpicked_demands() | |
vehicles = self.vehicle_manager.get_unassigned_vehicles() | |
targeted_demand_uids = [ | |
v.target_demand.uid | |
for v in self.vehicle_manager.vehicles | |
if v.target_demand | |
] | |
demands = list(filter(lambda d: d.uid not in targeted_demand_uids, demands)) | |
if len(demands) == 0 or len(vehicles) == 0: | |
return | |
for vehicle in vehicles: | |
if len(demands) == 0: | |
break | |
demand_locations = np.array([d.location for d in demands]) | |
distances = np.linalg.norm( | |
(vehicle.location.reshape((1, 2)) - demand_locations), axis=1 | |
) | |
demand_index = np.argmin(distances) | |
demand = demands[demand_index] | |
vehicle.assign_demand(demand) | |
demands = demands[:demand_index] + demands[demand_index + 1 :] | |
class RootManager: | |
def __init__(self): | |
self.events: Events = [] | |
self.started = False | |
def is_running(self): | |
if not self.started: | |
return False | |
return ( | |
self.timer.is_alive() | |
and self.demand_manager.is_alive() | |
and self.vehicle_manager.is_running() | |
and self.central_manager.is_alive() | |
) | |
def is_stopped(self): | |
if not self.started: | |
return True | |
return ( | |
not self.timer.is_alive() | |
and not self.demand_manager.is_alive() | |
and self.vehicle_manager.is_stopped() | |
and not self.central_manager.is_alive() | |
) | |
def setup(self, num_vehicles, demand_throughput, assign_strategy, depot_strategy): | |
self.timer = Timer(events=self.events) | |
self.demand_manager = DemandManager( | |
events=self.events, demand_throughput=demand_throughput | |
) | |
self.vehicle_manager = VehicleManager( | |
events=self.events, num_vehicles=num_vehicles | |
) | |
self.central_manager = CentralManager( | |
events=self.events, | |
demand_manager=self.demand_manager, | |
vehicle_manager=self.vehicle_manager, | |
assign_strategy=assign_strategy, | |
depot_strategy=depot_strategy, | |
) | |
def start(self): | |
if not self.is_stopped(): | |
st.write("Some workers are alive") | |
return | |
self.timer.start() | |
self.demand_manager.start() | |
self.vehicle_manager.start() | |
self.central_manager.start() | |
self.started = True | |
def stop(self): | |
self.timer.stop() | |
self.demand_manager.stop() | |
self.vehicle_manager.stop() | |
self.central_manager.stop() | |
def display_vehicles_and_demands(root_manager: RootManager): | |
demands = root_manager.demand_manager.get_unpicked_demands() | |
demands_df = pd.DataFrame( | |
[(demand.location[0], demand.location[1], "demand") for demand in demands], | |
columns=["x", "y", "type"], | |
) | |
vehicles = root_manager.vehicle_manager.vehicles | |
vehicles_df = pd.DataFrame( | |
[(vehicle.location[0], vehicle.location[1], "vehicle") for vehicle in vehicles], | |
columns=["x", "y", "type"], | |
) | |
df = pd.concat([demands_df, vehicles_df], axis=0) | |
if len(df) > 0: | |
scatter = ( | |
alt.Chart(df) | |
.mark_point(filled=True, size=100, opacity=0.7) | |
.encode( | |
x=alt.X( | |
"x", | |
scale=alt.Scale(domain=[-1, 1]), | |
axis=alt.Axis(title="x"), | |
), | |
y=alt.Y( | |
"y", | |
scale=alt.Scale(domain=[-1, 1]), | |
axis=alt.Axis(title="y"), | |
), | |
color=alt.Color( | |
"type", | |
scale=alt.Scale( | |
domain=["demand", "vehicle"], range=["red", "blue"] | |
), | |
legend=None, | |
), | |
) | |
).properties(width=350, height=350) | |
st.altair_chart(scatter, use_container_width=False) | |
@st.cache_resource | |
def get_metrics_data(): | |
return {} | |
def display_metrics(root_manager: RootManager, use_wandb: bool): | |
metrics_data = get_metrics_data() | |
last_event_uid = metrics_data["last_event_uid"] | |
events = get_events_from_last_ui(root_manager.events, last_event_uid) | |
num_time_steps = metrics_data["num_time_steps"] | |
num_unpicked_demands_metrics = metrics_data["num_unpicked_demands_metrics"] | |
num_unpicked_demands = num_unpicked_demands_metrics[-1][1] | |
for event in events: | |
if event.name == "time_step": | |
num_time_steps += TIME_STEP | |
num_unpicked_demands_metrics.append((num_time_steps, num_unpicked_demands)) | |
metrics_data["last_event_uid"] = event.uid | |
metrics_data["num_time_steps"] = num_time_steps | |
metrics_data["num_unpicked_demands_metrics"] = num_unpicked_demands_metrics | |
elif event.name == "generate_demand": | |
num_unpicked_demands += 1 | |
elif event.name == "pickup_demand": | |
num_unpicked_demands -= 1 | |
num_unpicked_demands_metrics_df = pd.DataFrame( | |
num_unpicked_demands_metrics, | |
columns=["t", "v"], | |
) | |
num_unpicked_demands_metrics_df_line = ( | |
alt.Chart(num_unpicked_demands_metrics_df) | |
.mark_line(color="steelblue") | |
.encode( | |
x=alt.X( | |
"t", | |
axis=alt.Axis(title="time step"), | |
), | |
y=alt.Y( | |
"v", | |
axis=alt.Axis(title="num unpicked demands"), | |
), | |
) | |
).properties(width=350, height=350) | |
st.altair_chart(num_unpicked_demands_metrics_df_line, use_container_width=False) | |
if use_wandb and wandb: | |
wandb.log({"num_unpicked_demands": num_unpicked_demands}) | |
def display_debug(root_manager: RootManager): | |
st.write(f"elapsed time: {int(root_manager.timer.elapsed_time)} sec") | |
st.write(f"timer thread: {root_manager.timer.getName()}") | |
st.write(f"demand generator thread: {root_manager.demand_manager.getName()}") | |
vehicle_thread_names = ", ".join( | |
[vehicle.getName() for vehicle in root_manager.vehicle_manager.vehicles] | |
) | |
st.write(f"vehicle threads: {vehicle_thread_names}") | |
st.write(f"central manager thread: {root_manager.central_manager.getName()}") | |
@st.cache_resource | |
def get_root_manager(): | |
return RootManager() | |
def stop(root_manager: RootManager, use_wandb: bool): | |
if use_wandb and wandb: | |
wandb.finish() | |
root_manager.stop() | |
def main(): | |
root_manager = get_root_manager() | |
is_running = root_manager.is_running() | |
with st.sidebar: | |
num_vehicles = st.selectbox( | |
"number of vehicles", | |
[1, 3, 5, 10], | |
index=2, | |
disabled=is_running, | |
) | |
demand_throughput = st.selectbox( | |
"demand throughput", | |
[1, 1.5, 2, 3], | |
index=2, | |
disabled=is_running, | |
) | |
assign_strategy = st.selectbox( | |
"assign_strategy", | |
assign_strategies, | |
index=0, | |
disabled=is_running, | |
) | |
depot_strategy = st.selectbox( | |
"depot_strategy", | |
depot_strategies, | |
index=0, | |
disabled=is_running, | |
) | |
max_elapsed_time = st.selectbox( | |
"max_time_step", | |
[5, 10, 20, 50, 100], | |
index=2, | |
disabled=is_running, | |
) | |
use_wandb = st.checkbox("use_wandb") | |
if st.button( | |
"Start", | |
disabled=is_running, | |
): | |
if root_manager.started: | |
st.cache_resource.clear() | |
root_manager = get_root_manager() | |
if use_wandb and wandb: | |
wandb.init( | |
project="dynamic-vehicle-routing", | |
config={ | |
"num_vehicles": num_vehicles, | |
"demand_throughput": demand_throughput, | |
"assign_strategy": assign_strategy, | |
"depot_strategy": depot_strategy, | |
}, | |
) | |
metrics_data = get_metrics_data() | |
metrics_data["last_event_uid"] = None | |
metrics_data["num_time_steps"] = 0 | |
metrics_data["num_unpicked_demands_metrics"] = [(0, 0)] | |
root_manager.setup( | |
num_vehicles=num_vehicles, | |
demand_throughput=demand_throughput, | |
assign_strategy=assign_strategy, | |
depot_strategy=depot_strategy, | |
) | |
root_manager.start() | |
st.rerun() | |
if st.button("Stop", disabled=not is_running): | |
stop(root_manager, use_wandb) | |
st.rerun() | |
if is_running: | |
col1, col2 = st.columns(2) | |
with col1: | |
display_vehicles_and_demands(root_manager) | |
with col2: | |
display_metrics(root_manager, use_wandb=use_wandb) | |
st.divider() | |
display_debug(root_manager) | |
if root_manager.timer.elapsed_time > max_elapsed_time: | |
stop(root_manager, use_wandb) | |
st.rerun() | |
else: | |
time.sleep(TIME_STEP) | |
st.rerun() | |
elif root_manager.is_stopped(): | |
st.write("Stopped") | |
else: | |
st.write("Stopping..") | |
time.sleep(TIME_STEP) | |
st.rerun() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment