Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

View joaofig's full-sized avatar

João Paulo Figueira joaofig

View GitHub Profile
@joaofig
joaofig / CompoundTrajectory_final.py
Last active January 15, 2024 15:53
Final methods of the compound trajectory class
@classmethod
def _trim_array(cls, array: np.ndarray) -> np.ndarray:
if array.shape[0] > 2:
return array[1:-1]
else:
return array
def to_trajectory(self) -> Trajectory:
lat = np.concatenate([self._trim_array(segment.lat) for segment in self.segments])
lon = np.concatenate([self._trim_array(segment.lon) for segment in self.segments])
@joaofig
joaofig / segment_to_trajectory.py
Created January 9, 2024 19:58
Converts a trajectory segment to a node-based trajectory, recomputing traversal times.
def segment_to_trajectory(segment: list[LatLon],
dt: float,
t0: float) -> Trajectory:
lat = np.array([point.lat for point in segment])
lon = np.array([point.lon for point in segment])
time = np.zeros_like(lat)
seg_lens = vec_haversine(lat[1:], lon[1:], lat[:-1], lon[:-1])
avg_speed = np.sum(seg_lens) / dt
time[0] = t0
@joaofig
joaofig / merge_trajectory.py
Created January 9, 2024 19:45
Merges the map-matched trajectory to the corresponding map nodes
def merge_trajectory(trajectory: Trajectory,
map_lat: np.ndarray,
map_lon: np.ndarray) -> list[list[LatLon]]:
segments: list[list[LatLon]] = []
j = 0
for i in range(trajectory.lat.shape[0]-1):
pt0 = LatLon(float(trajectory.lat[i]), float(trajectory.lon[i]))
pt1 = LatLon(float(trajectory.lat[i+1]), float(trajectory.lon[i+1]))
seg_len = pt0.haversine(*pt1.to_tuple())
@joaofig
joaofig / CompoundTrajectory_init.py
Created January 9, 2024 19:14
The compound trajectory constructor
class CompoundTrajectory:
def __init__(self, trajectory: Trajectory,
map_lat: np.ndarray,
map_lon: np.ndarray):
self.segments: list[Trajectory] = []
t0 = 0
merged = merge_trajectory(trajectory, map_lat, map_lon)
for i, segment in enumerate(merged):
dt = trajectory.dt[i]
@joaofig
joaofig / speed_prep_main.py
Last active January 7, 2024 21:08
Main loop for speed inference data preparation.
def main():
config = get_config(tile_extract='./valhalla/custom_files/valhalla_tiles.tar',
verbose=True)
trips = get_all_trips()
for traj_id, vehicle_id, trip_id in trips:
print(f"Vehicle {vehicle_id}, trip {trip_id}, trajectory: {traj_id}")
trip_df = get_trip_signals(vehicle_id, trip_id)
def evolve_path(self: PredictedPath,
h0: int,
probability: float):
path = PredictedPath(probability=self.probability * probability,
step=self.step+1,
size=self.size)
path.array = self.array.copy()
path.array[self.step] = h0
return path
def expand_path(path: PredictedPath,
max_branch: int = 3) -> list[PredictedPath]:
successors = get_successors(path.array[path.step-2], path.array[path.step-1])
best_expansions = []
total = successors.total()
for p in successors.most_common(max_branch):
new_probability = p[1] / total
new_path = evolve_path(path, p[0], new_probability)
best_expansions.append(new_path)
return best_expansions
def expand_seed(h0: int, h1: int,
max_branch: int = 3,
max_length: int = 10) -> list[PredictedPath]:
final = []
seed = PredictedPath.from_seed(h0, h1, max_length)
paths = expand_path(seed, max_branch)
while len(final) < max_branch:
expanded = []
for p in paths:
expanded.extend(expand_path(p, max_branch=max_branch))
class PredictedPath():
def __init__(self,
probability: float = 1.0,
step: int = 1,
size: int = 0):
self.probability = probability
self.step = step
self.size = size
self.array: np.ndarray = np.zeros(size, dtype=int)
def predict(max_branch: int = 3,
max_length: int = 10) -> FeatureGroup | None:
fg = folium.FeatureGroup(name="polylines")
if "token_list" in st.session_state:
token_list = st.session_state["token_list"]
seed = token_list[-3:-1]
if len(seed) > 1:
paths = expand_seed(seed[0], seed[1],
max_branch=max_branch,