Skip to content

Instantly share code, notes, and snippets.

@kuanb
Created September 15, 2020 06:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kuanb/a45b65c3135dce717497643e7f35f0ab to your computer and use it in GitHub Desktop.
Save kuanb/a45b65c3135dce717497643e7f35f0ab to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"DEBUG:root:test\n"
]
}
],
"source": [
"import logging\n",
"import partridge as ptg\n",
"\n",
"# capture logs in notebook\n",
"logger = logging.getLogger()\n",
"logger.setLevel(logging.DEBUG)\n",
"logging.debug(\"test\")\n",
"\n",
"# load a GTFS of AC Transit\n",
"path = 'gtfs.zip'\n",
"_date, service_ids = ptg.read_busiest_date(path)\n",
"view = {'trips.txt': {'service_id': service_ids}}\n",
"feed = ptg.load_feed(path, view)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"DEBUG:shapely.geos:Found GEOS DLL: <CDLL '/usr/local/lib/python3.6/site-packages/shapely/.libs/libgeos_c-5031f9ac.so.1.13.1', handle 55a8d7001750 at 0x7f9001af0128>, using it.\n",
"DEBUG:shapely.geos:Trying `CDLL(libc.so.6)`\n",
"DEBUG:shapely.geos:Library path: 'libc.so.6'\n",
"DEBUG:shapely.geos:DLL: <CDLL 'libc.so.6', handle 7f9027085000 at 0x7f9001af0320>\n",
"DEBUG:fiona.env:GDAL_DATA not found in environment, set to '/usr/local/lib/python3.6/site-packages/fiona/gdal_data'.\n",
"DEBUG:fiona.env:PROJ data files are available at built-in paths\n",
"DEBUG:fiona.env:Entering env context: <fiona.env.Env object at 0x7f8fea786748>\n",
"DEBUG:fiona.env:Starting outermost env\n",
"DEBUG:fiona.env:No GDAL environment exists\n",
"DEBUG:fiona.env:New GDAL environment <fiona._env.GDALEnv object at 0x7f8fea7867b8> created\n",
"DEBUG:fiona._env:Logging error handler pushed.\n",
"DEBUG:fiona._env:All drivers registered.\n",
"DEBUG:fiona._env:GDAL_DATA found in environment: '/usr/local/lib/python3.6/site-packages/fiona/gdal_data'.\n",
"DEBUG:fiona._env:PROJ data files are available at built-in paths\n",
"DEBUG:fiona._env:Started GDALEnv <fiona._env.GDALEnv object at 0x7f8fea7867b8>.\n",
"DEBUG:fiona.env:Updated existing <fiona._env.GDALEnv object at 0x7f8fea7867b8> with options {}\n",
"DEBUG:fiona.env:Entered env context: <fiona.env.Env object at 0x7f8fea786748>\n",
"DEBUG:fiona.env:Exiting env context: <fiona.env.Env object at 0x7f8fea786748>\n",
"DEBUG:fiona.env:Cleared existing <fiona._env.GDALEnv object at 0x7f8fea7867b8> options\n",
"DEBUG:fiona._env:Stopping GDALEnv <fiona._env.GDALEnv object at 0x7f8fea7867b8>.\n",
"DEBUG:fiona._env:Error handler popped.\n",
"DEBUG:fiona._env:Stopped GDALEnv <fiona._env.GDALEnv object at 0x7f8fea7867b8>.\n",
"DEBUG:fiona.env:Exiting outermost env\n",
"DEBUG:fiona.env:Exited env context: <fiona.env.Env object at 0x7f8fea786748>\n",
"/usr/local/lib/python3.6/site-packages/pyproj/crs/crs.py:53: FutureWarning: '+init=<authority>:<code>' syntax is deprecated. '<authority>:<code>' is the preferred initialization method. When making the change, be mindful of axis order changes: https://pyproj4.github.io/pyproj/stable/gotchas.html#axis-order-changes-in-proj-6\n",
" return _prepare_from_string(\" \".join(pjargs))\n"
]
}
],
"source": [
"import geopandas as gpd\n",
"import pyproj\n",
"from shapely.geometry import Point\n",
"\n",
"# convert all known stops in the schedule to shapes in a GeoDataFrame\n",
"gdf = gpd.GeoDataFrame(\n",
" {\"stop_id\": feed.stops.stop_id.tolist()},\n",
" geometry=[\n",
" Point(lon, lat)\n",
" for lat, lon in zip(\n",
" feed.stops.stop_lat,\n",
" feed.stops.stop_lon)])\n",
"gdf = gdf.set_index(\"stop_id\")\n",
"gdf.crs = {'init': 'epsg:4326'}\n",
"\n",
"# re-cast to meter-based projection to allow for distance calculations\n",
"aeqd = pyproj.Proj(\n",
" proj='aeqd',\n",
" ellps='WGS84',\n",
" datum='WGS84',\n",
" lat_0=gdf.iloc[0].geometry.centroid.y,\n",
" lon_0=gdf.iloc[0].geometry.centroid.x).srs\n",
"gdf = gdf.to_crs(crs=aeqd)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# let's use this example origin and destination\n",
"# to find the time it would take to go from one to another\n",
"from_stop_name = \"Santa Clara Av & Mozart St\"\n",
"to_stop_name = \"10th Avenue SB\"\n",
"\n",
"# QA: we know the best way to connect these two is the 51A -> 1T\n",
"# if we depart at 8:30 AM, schedule should suggest:\n",
"# take 51A 8:37 - 8:49\n",
"# make walk connection\n",
"# take 1T 8:56 - 9:03\n",
"# total travel time: 26 minutes\n",
"\n",
"# look at all trips from that stop that are after the depart time\n",
"departure_secs = 8.5 * 60 * 60\n",
"\n",
"# get all information, including the stop ids, for the start and end nodes\n",
"from_stop = feed.stops[feed.stops.stop_name == from_stop_name].head(1).squeeze()\n",
"to_stop = feed.stops[[\"10th Avenue\" in f for f in feed.stops.stop_name]].head(1).squeeze()\n",
"\n",
"# extract just the stop ids\n",
"from_stop_id = from_stop.stop_id\n",
"to_stop_id = to_stop.stop_id"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"from copy import copy\n",
"from typing import Any\n",
"from typing import Dict\n",
"from typing import List\n",
"from typing import Optional\n",
"from typing import Union\n",
"\n",
"# assume all xfers are 3 minutes\n",
"TRANSFER_COST = (5 * 60)\n",
"\n",
"\n",
"class StopAccessState:\n",
" def __init__(self, origin_stop_id: str):\n",
" \"\"\"State tracker for stop ids.\"\"\"\n",
" self._stops = {}\n",
"\n",
" # initialize the origin node with no prior trip history\n",
" self._origin = origin_stop_id\n",
" self.try_add_update(self._origin, 0)\n",
" \n",
" def all_stops(self):\n",
" return list(self._stops.keys())\n",
" \n",
" def has_stop(self, stop_id: str):\n",
" return stop_id in self._stops.keys()\n",
" \n",
" def get_stop(self, stop_id: str):\n",
" return self._stops[stop_id]\n",
" \n",
" def try_add_update(\n",
" self,\n",
" stop_id: str,\n",
" time_to_reach: Union[int, float],\n",
" trip_id: Optional[str]=None,\n",
" preceding_path: Optional[List[str]]=None,\n",
" ) -> bool:\n",
" # initialize return object\n",
" did_update = False\n",
"\n",
" if stop_id in self._stops.keys():\n",
" if self._stops[stop_id][\"time_to_reach\"] > time_to_reach:\n",
" # update the stop access attributes\n",
" self._stops[stop_id][\"time_to_reach\"] = time_to_reach\n",
" did_update = True\n",
"\n",
" else:\n",
" self._stops[stop_id] = {\n",
" \"time_to_reach\": time_to_reach,\n",
" \"preceding\": [], # initialize with no past paths\n",
" }\n",
" did_update = True\n",
"\n",
" if did_update:\n",
" # override if a preceding path is provided\n",
" if preceding_path:\n",
" self._stops[stop_id][\"preceding\"] = preceding_path\n",
"\n",
" # add current trip id to the path of trips taken, avoiding dupes\n",
" if trip_id is not None and (\n",
" len(self._stops[stop_id][\"preceding\"]) == 0 or\n",
" trip_id != self._stops[stop_id][\"preceding\"][-1]):\n",
" self._stops[stop_id][\"preceding\"].append(trip_id)\n",
" \n",
" return did_update\n",
"\n",
"\n",
"def get_trip_ids_for_stop(feed, stop_id: str, departure_time: int):\n",
" \"\"\"Takes a stop and departure time and get associated trip ids.\"\"\"\n",
" mask_1 = feed.stop_times.stop_id == stop_id\n",
" mask_2 = feed.stop_times.departure_time >= departure_time\n",
"\n",
" # extract the list of qualifying trip ids\n",
" potential_trips = feed.stop_times[mask_1 & mask_2].trip_id.unique().tolist()\n",
" return potential_trips\n",
"\n",
"\n",
"def stop_times_for_kth_trip(\n",
" stops_state: StopAccessState,\n",
" last_updated_stops: List[str],\n",
") -> None:\n",
" # sort stops into their associated groups\n",
" trip_stop_pairings = {}\n",
" for ref_stop_id in last_updated_stops:\n",
" # find all trips already related to this stop\n",
" associated_trips = stop_state.get_stop(ref_stop_id)[\"preceding\"]\n",
"\n",
" # find all qualifying trips assocaited with this stop\n",
" potential_trips = get_trip_ids_for_stop(feed, ref_stop_id, departure_secs)\n",
" for potential_trip in potential_trips:\n",
" # pass on trips that are already addressed\n",
" if potential_trip in associated_trips:\n",
" continue\n",
" \n",
" if potential_trip in trip_stop_pairings.keys():\n",
" trip_stop_pairings[potential_trip].append(ref_stop_id)\n",
" else:\n",
" trip_stop_pairings[potential_trip] = [ref_stop_id]\n",
"\n",
" # iterate through trips with grouped stops in them\n",
" for trip_id in trip_stop_pairings:\n",
" stop_ids = trip_stop_pairings[trip_id]\n",
"\n",
" # get all the stop time arrivals for that trip\n",
" stop_times_sub = feed.stop_times[feed.stop_times.trip_id == trip_id]\n",
" stop_times_sub = stop_times_sub.sort_values(by=\"stop_sequence\", ascending=True)\n",
" \n",
" # find all stop ids that are in this stop ordering and pick last on route path\n",
" stop_times_mask = stop_times_sub.stop_id.isin(stop_ids)\n",
" target_stops = stop_times_sub[stop_times_mask].sort_values(by=\"stop_sequence\", ascending=True)\n",
" \n",
" # get the \"hop on\" point\n",
" from_here = target_stops.tail(1).squeeze() \n",
" ref_stop_id = from_here.stop_id\n",
" \n",
" # are we continuing from some previous path of trips?\n",
" ref_stop_state = stops_state.get_stop(ref_stop_id)\n",
" preceding_path = ref_stop_state[\"preceding\"]\n",
"\n",
" # how long it took to get to the stop so far (0 for start node)\n",
" baseline_cost = ref_stop_state[\"time_to_reach\"]\n",
"\n",
" # get all following stops\n",
" stop_times_after_mask = stop_times_sub.stop_sequence >= from_here.stop_sequence\n",
" stop_times_after = stop_times_sub[stop_times_after_mask]\n",
"\n",
" # for all following stops, calculate time to reach\n",
" arrivals_zip = zip(stop_times_after.arrival_time, stop_times_after.stop_id)\n",
" for arrive_time, arrive_stop_id in arrivals_zip:\n",
" # time to reach is diff from start time to arrival (plus any baseline cost)\n",
" arrive_time_adjusted = arrive_time - departure_secs + baseline_cost\n",
" stops_state.try_add_update(\n",
" arrive_stop_id,\n",
" arrive_time_adjusted,\n",
" trip_id,\n",
" preceding_path)\n",
"\n",
"\n",
"def add_footpath_transfers(\n",
" stops_state: StopAccessState,\n",
" stops_gdf: gpd.GeoDataFrame,\n",
" already_processed_stops: List[str],\n",
" transfer_cost=TRANSFER_COST,\n",
") -> List[str]:\n",
" # initialize a return object\n",
" updated_stop_ids = []\n",
"\n",
" # add in transfers to nearby stops\n",
" stop_ids = stop_state.all_stops()\n",
" for stop_id in stop_ids:\n",
" # no need to re-intersect already done stops\n",
" if stop_id in already_processed_stops:\n",
" continue\n",
"\n",
" stop_pt = stops_gdf.loc[stop_id].geometry\n",
"\n",
" # TODO: parameterize? transfer within .2 miles\n",
" meters_in_miles = 1610\n",
" qual_area = stop_pt.buffer(meters_in_miles/5)\n",
" \n",
" # get all stops within a short walk of target stop\n",
" mask = stops_gdf.intersects(qual_area)\n",
"\n",
" # time to reach new nearby stops is the transfer cost plus arrival at last stop\n",
" ref_stop_state = stops_state.get_stop(stop_id)\n",
" arrive_time_adjusted = ref_stop_state[\"time_to_reach\"] + TRANSFER_COST\n",
"\n",
" last_trip_id = None\n",
" if len(ref_stop_state[\"preceding\"]):\n",
" last_trip_id = ref_stop_state[\"preceding\"][-1]\n",
"\n",
" # only update if currently inaccessible or faster than currrent option\n",
" for arrive_stop_id, row in stops_gdf[mask].iterrows():\n",
" did_update = stops_state.try_add_update(\n",
" arrive_stop_id,\n",
" arrive_time_adjusted,\n",
" last_trip_id)\n",
"\n",
" if did_update:\n",
" updated_stop_ids.append(arrive_stop_id)\n",
" \n",
" return updated_stop_ids"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:root:\n",
"Analyzing possibilities with 0 transfers\n",
"INFO:root:\tinital qualifying stop ids count: 1\n",
"INFO:root:\tstop times calculated in 0.9049 seconds\n",
"INFO:root:\t\t28 stop ids added\n",
"INFO:root:\tfootpath transfers calculated in 0.7186 seconds\n",
"INFO:root:\t\t103 stop ids added\n",
"INFO:root:\talready processed count increased from 0 to 1\n",
"INFO:root:\tnew stops to process: 103\n",
"INFO:root:\n",
"Analyzing possibilities with 1 transfers\n",
"INFO:root:\tinital qualifying stop ids count: 132\n",
"INFO:root:\tstop times calculated in 22.9711 seconds\n",
"INFO:root:\t\t782 stop ids added\n",
"INFO:root:\tfootpath transfers calculated in 20.6031 seconds\n",
"INFO:root:\t\t1077 stop ids added\n",
"INFO:root:\talready processed count increased from 1 to 104\n",
"INFO:root:\tnew stops to process: 1547\n",
"INFO:root:\n",
"Analyzing possibilities with 2 transfers\n",
"INFO:root:\tinital qualifying stop ids count: 1991\n",
"INFO:root:\tstop times calculated in 52.9557 seconds\n",
"INFO:root:\t\t407 stop ids added\n",
"INFO:root:\tfootpath transfers calculated in 50.7413 seconds\n",
"INFO:root:\t\t517 stop ids added\n",
"INFO:root:\talready processed count increased from 104 to 1651\n",
"INFO:root:\tnew stops to process: 940\n",
"INFO:root:Time to destination: 35.5 minutes\n"
]
}
],
"source": [
"import time\n",
"\n",
"# initialize lookup with start node taking 0 seconds to reach\n",
"stop_state = StopAccessState(from_stop_id)\n",
"\n",
"# setting transfer limit at 1\n",
"TRANSFER_LIMIT = 2\n",
"\n",
"already_processed_xfers = []\n",
"just_updated_stops = [from_stop_id]\n",
"for k in range(TRANSFER_LIMIT + 1):\n",
" logger.info(\"\\nAnalyzing possibilities with {} transfers\".format(k))\n",
" \n",
" stop_ids = stop_state.all_stops()\n",
" logger.info(\"\\tinital qualifying stop ids count: {}\".format(len(stop_ids)))\n",
" \n",
" # update time to stops calculated based on stops accessible\n",
" tic = time.perf_counter()\n",
" stop_times_for_kth_trip(stop_state, just_updated_stops)\n",
" toc = time.perf_counter()\n",
" logger.info(\"\\tstop times calculated in {:0.4f} seconds\".format(toc - tic))\n",
"\n",
" added_keys_count = len(stop_state.all_stops()) - len(stop_ids)\n",
" logger.info(\"\\t\\t{} stop ids added\".format(added_keys_count))\n",
" \n",
" # reset stop_ids count\n",
" stop_ids = stop_state.all_stops()\n",
" \n",
" # now add footpath transfers and update\n",
" tic = time.perf_counter()\n",
" just_updated_stops_temp = just_updated_stops\n",
" just_updated_stops = add_footpath_transfers(stop_state, gdf, already_processed_xfers)\n",
" toc = time.perf_counter()\n",
" logger.info(\"\\tfootpath transfers calculated in {:0.4f} seconds\".format(toc - tic))\n",
"\n",
" added_keys_count = len(stop_state.all_stops()) - len(stop_ids)\n",
" logger.info(\"\\t\\t{} stop ids added\".format(added_keys_count))\n",
"\n",
" logger.info(\"\\talready processed count increased from {} to {}\".format(\n",
" len(already_processed_xfers),\n",
" len(already_processed_xfers + just_updated_stops_temp),\n",
" ))\n",
" logger.info(\"\\tnew stops to process: {}\".format(len(just_updated_stops)))\n",
" already_processed_xfers += just_updated_stops_temp\n",
" just_updated_stops_temp = None\n",
" \n",
"assert stop_state.has_stop(to_stop_id), \"Unable to find route to destination within transfer limit\"\n",
"logger.info(\"Time to destination: {} minutes\".format(stop_state.get_stop(to_stop_id)[\"time_to_reach\"]/60))"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"''"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"\"\"\"\n",
"Original runtime increased with each iteration. This was due to repetitive identification and processing of stop and trips that were already handled, as well as expensive geo-operations on stops done repeatedly. By leveraging some simple caching patterns, we can avoid those and shift the cost of increased transfers to become have a more linear cost increase. That is, each subsequence trasnfer/iteration would only be calculating the new stops identified, not all of the prior ones, plus the new ones, cumulatively.\n",
"\n",
"```\n",
"Analyzing possibilities with 0 transfers\n",
" inital qualifying stop ids count: 1\n",
" stop times calculated in 0.9308 seconds\n",
" 28 stop ids added\n",
" footpath transfers calculated in 0.6707 seconds\n",
" 103 stop ids added\n",
"\n",
"Analyzing possibilities with 1 transfers\n",
" inital qualifying stop ids count: 132\n",
" stop times calculated in 24.1655 seconds\n",
" 782 stop ids added\n",
" footpath transfers calculated in 20.1302 seconds\n",
" 1077 stop ids added\n",
"\n",
"Analyzing possibilities with 2 transfers\n",
" inital qualifying stop ids count: 1991\n",
" stop times calculated in 60.4546 seconds\n",
" 406 stop ids added\n",
" footpath transfers calculated in 52.8831 seconds\n",
" 517 stop ids added\n",
"Time to destination: 35.5 minutes\n",
"```\n",
"\n",
"Unfortunately, the imapct of caching is fairly limited because there is a great deal of \"fan-out.\" That is, with each iteration (each additional set of transfers allowed), the number of possible stops to consider increases signficantly, procuding ever greater numbers of possble routes and trips to consder, that were inaccessible in the prior step (or transfer-iteration). Below is an example demonstrating how the caching effort produces marginal improvements in runtime of the algorithm.\n",
"\n",
"```\n",
"\n",
"Analyzing possibilities with 0 transfers\n",
" inital qualifying stop ids count: 1\n",
" stop times calculated in 0.9049 seconds\n",
" 28 stop ids added\n",
" footpath transfers calculated in 0.7186 seconds\n",
" 103 stop ids added\n",
" already processed count increased from 0 to 1\n",
" new stops to process: 103\n",
"\n",
"Analyzing possibilities with 1 transfers\n",
" inital qualifying stop ids count: 132\n",
" stop times calculated in 22.9711 seconds\n",
" 782 stop ids added\n",
" footpath transfers calculated in 20.6031 seconds\n",
" 1077 stop ids added\n",
" already processed count increased from 1 to 104\n",
" new stops to process: 1547\n",
"\n",
"Analyzing possibilities with 2 transfers\n",
" inital qualifying stop ids count: 1991\n",
" stop times calculated in 52.9557 seconds\n",
" 407 stop ids added\n",
" footpath transfers calculated in 50.7413 seconds\n",
" 517 stop ids added\n",
" already processed count increased from 104 to 1651\n",
" new stops to process: 940\n",
"Time to destination: 35.5 minutes\n",
"```\n",
"\"\"\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
@danielsclint
Copy link

danielsclint commented Jul 21, 2021

Hi Kuan- This Gist was super helpful for understanding the algorithms presented in the Microsoft paper. In your article, you mentioned, " performance speed up is rather marginal because of the high level of fan-out that occurs on later iterations of transfers."

I implemented your code, and I refactored it to take better advantage of Pandas / Numpy vectorization. The bottlenecks that you experienced were a result of the for loops and the number of times you are slicing the dataframes. If you eliminate the looping, you get a significant time savings. In my simple example for the Sacramento RTD GTFS feed, the code runs in less than a second for 3 transfers. I'm sure additional enhancements are possible with Numpy.

I also got significant performance improvements from caching the walk transfers as you suggested. However, you could still get really quick results with real-time transfer processing if you eliminate the Geopandas intersect methods. Instead, since these are distance calculations (either polar or cartesian) you can quickly create a stop-to-stop cross join and do the distance calculation with simple arithmetic (trigonometry) to get distances between stops and filter to the appropriate lengths. Using the arithmetic approach, I can calculate all of the stop-to-stop transfers in just a second or two for Sacramento.

In any event, I wanted to say thank you for putting this example out here, and it was super helpful.

Clint

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment