-
-
Save eginhard/fed591b71febd48b964300f63f20d2c2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
This script counts how many Swiss base network hiking routes run between | |
named guideposts. Should be run on a .osm.pbf file covering Switzerland. | |
It takes into account all yellow, red, and blue base network routes for which a | |
start and end location are tagged (with from/to, osmc:symbol, note or name | |
tags). Routes with a fixme tag or "fixme" start/end points are ignored. | |
For the start and end location of a route, it finds the minimum distance of any | |
guidepost with that name to any start and end node of a way in the route (i.e. it | |
doesn't assume routes are sorted or the guidepost is located exactly at the start/end). | |
Guidepost name matching is case-insensitive, but could be further improved by | |
using the Levenshtein distance to account for small spelling differences.""" | |
import sys | |
from collections import defaultdict | |
import osmium as o | |
import shapely.wkt as wkt | |
from shapely.geometry import LineString | |
from shapely.geometry import Point | |
from geopy.distance import great_circle | |
WKTFAB = o.geom.WKTFactory() | |
MAX_DISTANCE = 10000000 | |
NEARBY_DISTANCE = 50 # meters | |
SYMBOLS = ["yellow::yellow_diamond", "red:white:red_bar", "blue:white:blue_bar"] | |
FIXME_VALUES = ("fixme", "?") | |
class GuidepostHandler(o.SimpleHandler): | |
"""Finds all named guideposts.""" | |
def __init__(self): | |
super().__init__() | |
self.guideposts = defaultdict(list) | |
def node(self, n): | |
if ( | |
n.tags.get("tourism") == "information" | |
and n.tags.get("information") == "guidepost" | |
and "name" in n.tags | |
): | |
# List of locations because multiple guideposts can share a name | |
self.guideposts[n.tags["name"].lower()].append( | |
WKTFAB.create_point(n.location) | |
) | |
class RouteHandler(o.SimpleHandler): | |
"""Filters base network hiking routes.""" | |
def __init__(self): | |
super().__init__() | |
self.routes = [] | |
self.routeways = defaultdict(list) | |
self.ways = set() | |
self.total = 0 | |
self.fixme = 0 | |
def get_start_end(self, tags): | |
"""Return start/end locations of a route.""" | |
if "from" in tags and "to" in tags: | |
return tags["from"], tags["to"] | |
for key in ("osmc:name", "note", "name"): | |
if key in tags and tags[key].count(" - ") == 1: | |
return tags[key].split(" - ", 1) | |
for key in ("osmc:name", "note", "name"): | |
if key in tags and tags[key].count("-") == 1: | |
return tags[key].split("-") | |
return None, None | |
def relation(self, r): | |
if ( | |
r.tags.get("route") == "hiking" | |
and r.tags.get("network") == "lwn" | |
and r.tags.get("osmc:symbol") in SYMBOLS | |
): | |
self.total += 1 | |
start, end = self.get_start_end(r.tags) | |
# Exclude routes without tagged start/end or with fixme | |
if ( | |
"fixme" in r.tags | |
or start is not None | |
and start.lower() in FIXME_VALUES | |
or end is not None | |
and end.lower() in FIXME_VALUES | |
): | |
self.fixme += 1 | |
elif start is not None and end is not None: | |
start = start.lower() | |
end = end.lower() | |
self.routes.append((r.id, start, end)) | |
for member in r.members: | |
if member.type == "w": | |
self.routeways[r.id].append(member.ref) | |
self.ways.add(member.ref) | |
class RouteWayHandler(o.SimpleHandler): | |
"""Finds way geometries of the hiking routes.""" | |
def __init__(self, ways): | |
super().__init__() | |
self.ways = ways | |
self.linestrings = {} | |
def way(self, w): | |
if w.id in self.ways: | |
try: | |
self.linestrings[w.id] = WKTFAB.create_linestring(w) | |
except: | |
print("Could not get geometry of way:", w.id) | |
def main(osmfile): | |
guideposts = GuidepostHandler() | |
guideposts.apply_file(osmfile, locations=True, idx="flex_mem") | |
print("Number of named guideposts:", len(guideposts.guideposts)) | |
routes = RouteHandler() | |
routes.apply_file(osmfile) | |
route_ways = RouteWayHandler(routes.ways) | |
route_ways.apply_file(osmfile, locations=True, idx="flex_mem") | |
def get_min_distance(rid, location_name): | |
"""Return shortest distance of a point on the route to a guidepost of the given name.""" | |
min_distance = MAX_DISTANCE | |
if location_name not in guideposts.guideposts: | |
return min_distance | |
for guidepost in guideposts.guideposts[location_name]: | |
guidepost = Point(wkt.loads(guidepost)) | |
# Compute distance to start and end node of each way in the route | |
for wid in routes.routeways[rid]: | |
try: | |
linestring = LineString(wkt.loads(route_ways.linestrings[wid])) | |
first = linestring.coords[0] | |
last = linestring.coords[-1] | |
min_distance = min( | |
min_distance, | |
great_circle(guidepost.coords, first).meters, | |
great_circle(guidepost.coords, last).meters, | |
) | |
except KeyError: | |
pass | |
return min_distance | |
matching = 0 | |
nearby = 0 | |
for rid, start, end in routes.routes: | |
min_dist_start = get_min_distance(rid, start.lower()) | |
min_dist_end = get_min_distance(rid, end.lower()) | |
for dist in (min_dist_start, min_dist_end): | |
if dist < MAX_DISTANCE: | |
matching += 1 | |
if dist < NEARBY_DISTANCE: | |
nearby += 1 | |
start_dist = "?" if min_dist_start == MAX_DISTANCE else f"{min_dist_start:.0f}m" | |
end_dist = "?" if min_dist_end == MAX_DISTANCE else f"{min_dist_end:.0f}m" | |
# print(f"{rid}: {start} ({start_dist}) -> {end} ({end_dist})") | |
print( | |
f"{len(routes.routes)}/{routes.total} routes have tagged start and end" | |
f"points and no fixme tag ({routes.fixme} have a fixme)." | |
) | |
print( | |
f"Matching guidepost exists for {matching}/{2*len(routes.routes)} route start/end points." | |
) | |
print(f"{nearby}/{matching} guideposts are within {NEARBY_DISTANCE}m of a route.") | |
if __name__ == "__main__": | |
if len(sys.argv) != 2: | |
print("Usage: python %s <osmfile>" % sys.argv[0]) | |
sys.exit(-1) | |
exit(main(sys.argv[1])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment