This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@classmethod | |
def _trim_array(cls, array: np.ndarray) -> np.ndarray: | |
if array.shape[0] > 2: | |
return array[1:-1] | |
else: | |
return array | |
def to_trajectory(self) -> Trajectory: | |
lat = np.concatenate([self._trim_array(segment.lat) for segment in self.segments]) | |
lon = np.concatenate([self._trim_array(segment.lon) for segment in self.segments]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def segment_to_trajectory(segment: list[LatLon], | |
dt: float, | |
t0: float) -> Trajectory: | |
lat = np.array([point.lat for point in segment]) | |
lon = np.array([point.lon for point in segment]) | |
time = np.zeros_like(lat) | |
seg_lens = vec_haversine(lat[1:], lon[1:], lat[:-1], lon[:-1]) | |
avg_speed = np.sum(seg_lens) / dt | |
time[0] = t0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def merge_trajectory(trajectory: Trajectory, | |
map_lat: np.ndarray, | |
map_lon: np.ndarray) -> list[list[LatLon]]: | |
segments: list[list[LatLon]] = [] | |
j = 0 | |
for i in range(trajectory.lat.shape[0]-1): | |
pt0 = LatLon(float(trajectory.lat[i]), float(trajectory.lon[i])) | |
pt1 = LatLon(float(trajectory.lat[i+1]), float(trajectory.lon[i+1])) | |
seg_len = pt0.haversine(*pt1.to_tuple()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class CompoundTrajectory: | |
def __init__(self, trajectory: Trajectory, | |
map_lat: np.ndarray, | |
map_lon: np.ndarray): | |
self.segments: list[Trajectory] = [] | |
t0 = 0 | |
merged = merge_trajectory(trajectory, map_lat, map_lon) | |
for i, segment in enumerate(merged): | |
dt = trajectory.dt[i] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def main(): | |
config = get_config(tile_extract='./valhalla/custom_files/valhalla_tiles.tar', | |
verbose=True) | |
trips = get_all_trips() | |
for traj_id, vehicle_id, trip_id in trips: | |
print(f"Vehicle {vehicle_id}, trip {trip_id}, trajectory: {traj_id}") | |
trip_df = get_trip_signals(vehicle_id, trip_id) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def evolve_path(self: PredictedPath, | |
h0: int, | |
probability: float): | |
path = PredictedPath(probability=self.probability * probability, | |
step=self.step+1, | |
size=self.size) | |
path.array = self.array.copy() | |
path.array[self.step] = h0 | |
return path |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def expand_path(path: PredictedPath, | |
max_branch: int = 3) -> list[PredictedPath]: | |
successors = get_successors(path.array[path.step-2], path.array[path.step-1]) | |
best_expansions = [] | |
total = successors.total() | |
for p in successors.most_common(max_branch): | |
new_probability = p[1] / total | |
new_path = evolve_path(path, p[0], new_probability) | |
best_expansions.append(new_path) | |
return best_expansions |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def expand_seed(h0: int, h1: int, | |
max_branch: int = 3, | |
max_length: int = 10) -> list[PredictedPath]: | |
final = [] | |
seed = PredictedPath.from_seed(h0, h1, max_length) | |
paths = expand_path(seed, max_branch) | |
while len(final) < max_branch: | |
expanded = [] | |
for p in paths: | |
expanded.extend(expand_path(p, max_branch=max_branch)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PredictedPath(): | |
def __init__(self, | |
probability: float = 1.0, | |
step: int = 1, | |
size: int = 0): | |
self.probability = probability | |
self.step = step | |
self.size = size | |
self.array: np.ndarray = np.zeros(size, dtype=int) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def predict(max_branch: int = 3, | |
max_length: int = 10) -> FeatureGroup | None: | |
fg = folium.FeatureGroup(name="polylines") | |
if "token_list" in st.session_state: | |
token_list = st.session_state["token_list"] | |
seed = token_list[-3:-1] | |
if len(seed) > 1: | |
paths = expand_seed(seed[0], seed[1], | |
max_branch=max_branch, |
NewerOlder