Skip to content

Instantly share code, notes, and snippets.

@aloncohen1
Last active October 31, 2022 16:10
Show Gist options
  • Save aloncohen1/44be2082d13173f007596a2accf108f2 to your computer and use it in GitHub Desktop.
Save aloncohen1/44be2082d13173f007596a2accf108f2 to your computer and use it in GitHub Desktop.
generate_route_signals.ipynb
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"from shapely.ops import linemerge, nearest_points, snap\n",
"from shapely.geometry import LineString, Point, MultiLineString\n",
"import taxicab as tc\n",
"\n",
"def generate_route_signals(route_geo, start_time, sampling_rate=60):\n",
" end_time = start_time + timedelta(minutes=int(np.random.choice(range(15,50))))\n",
" N_POINTS = round((end_time - start_time).total_seconds() / sampling_rate)\n",
" POINTS_PER_SEGMENT = 10\n",
"\n",
" noise_list = np.linspace(0.9999999, 1.000001) # add noise to points\n",
" signal_time = start_time - timedelta(seconds=int(np.random.choice([5,10,15])))\n",
"\n",
" timestamps_list = []\n",
"\n",
" for i in range(0,N_POINTS):\n",
" timestamps_list.append(signal_time)\n",
" signal_time += timedelta(seconds=sampling_rate + int(np.random.choice(range(0,9))))\n",
"\n",
" line_to_points = np.array(\n",
" [\n",
" {'lat': y*np.random.choice(noise_list), 'lng': x*np.random.choice(noise_list)}\n",
" for p1, p2 in zip(route_geo.coords, route_geo.coords[1:]) # iterate through line segments\n",
" for x, y in zip(\n",
" np.linspace(p1[0], p2[0], POINTS_PER_SEGMENT),\n",
" np.linspace(p1[1], p2[1], POINTS_PER_SEGMENT),\n",
" )\n",
" ])\n",
"\n",
"\n",
" indexes = np.sort(np.random.choice(range(0, len(line_to_points)),replace=False,\n",
" size=min(N_POINTS, len(line_to_points)))) \n",
" random_points = line_to_points[indexes]\n",
"\n",
" signals_df = pd.DataFrame([i for i in random_points])\n",
" signals_df['timestamp'] = timestamps_list[:len(signals_df)]\n",
"\n",
" return signals_df, signal_time\n",
"\n",
" \n",
"def get_route_geometry(route, orig, dest):\n",
" x, y = [], []\n",
" for u, v in zip(route[tc.constants.BODY][:-1], route[tc.constants.BODY][1:]):\n",
" # if there are parallel edges, select the shortest in length\n",
" data = min(G.get_edge_data(u, v).values(), key=lambda d: d[\"length\"])\n",
" if \"geometry\" in data:\n",
" # if geometry attribute exists, add all its coords to list\n",
" xs, ys = data[\"geometry\"].xy\n",
" x.extend(xs)\n",
" y.extend(ys)\n",
" else:\n",
" # otherwise, the edge is a straight line from node to node\n",
" x.extend((G.nodes[u][\"x\"], G.nodes[v][\"x\"]))\n",
" y.extend((G.nodes[u][\"y\"], G.nodes[v][\"y\"]))\n",
"\n",
" final_route = []\n",
"\n",
" if route[2]:\n",
" final_route.append(LineString([Point(orig[1], orig[0]),\n",
" nearest_points(Point(orig[1], orig[0]),route[2])[1]]))\n",
" final_route.append(route[2])\n",
" else:\n",
" final_route.append(LineString([Point(orig[1], orig[0]),Point(x[0],y[0])]))\n",
" \n",
" final_route.append(LineString([Point(lng,lat) for lng, lat in list(zip(x, y))]))\n",
"\n",
" if route[3]:\n",
" final_route.append(route[3])\n",
" final_route.append(LineString([nearest_points(Point(dest[1],dest[0]), route[3])[1],\n",
" Point(dest[1],dest[0])]))\n",
"\n",
" else:\n",
" final_route.append(LineString([Point(x[-1],y[-1]), Point(dest[1],dest[0])]))\n",
" \n",
" final_route_geo = linemerge(final_route)\n",
" if isinstance(final_route_geo, MultiLineString):\n",
" final_route_geo = linemerge([snap(i, j,0.000001) for i,j in zip(final_route, final_route[1:])])\n",
" if isinstance(final_route_geo, MultiLineString):\n",
" final_route_geo = list(final_route_geo)[np.argmax(np.array([len(i.coords) for i in list(final_route_geo)]))]\n",
"\n",
" return final_route_geo"
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [],
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.6"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment