Last active
July 13, 2020 19:55
-
-
Save CloudNiner/8b020f434d0509419032c6b2fe0577c5 to your computer and use it in GitHub Desktop.
OSM GeoJson by Bounding Box and Tags via OSMX
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This demo uses OSMExpress and the s2sphere library to query a OSMX db by bounding box. | |
It writes a GeoJson FeatureCollection of all nodes and ways that matched the tag query to stdout | |
TODO: Relation support | |
Example: | |
``` | |
python3 demo.py ./data/pennsylvania.osmx \ | |
--bbox "-75.255188,40.021212,-75.217095,40.050702" \ | |
--tag "surface=gravel" > osm_manayunk_pa_gravel.geojson | |
``` | |
requirements.txt: | |
``` | |
click==7.1.2 | |
geojson==2.5.0 | |
osmx==0.0.4 | |
s3sphere==0.2.5 | |
``` | |
""" | |
from collections import namedtuple | |
from itertools import zip_longest | |
import logging | |
import sys | |
import click | |
import geojson | |
from geojson import Feature, FeatureCollection, LineString, Point | |
import osmx | |
import s2sphere | |
logging.basicConfig(format="%(message)s") | |
logger = logging.getLogger(__name__) | |
# logger.setLevel(logging.DEBUG) | |
# id: int, location: (lat, lon, version), tags: dict, metadata: dict | |
Node = namedtuple("Node", ["id", "location", "tags", "metadata"]) | |
# id: int, nodes: [int], tags: dict, metadata: dict | |
Way = namedtuple("Way", ["id", "nodes", "tags", "metadata"]) | |
def grouper(n, iterable, fillvalue=None): | |
"Collect data into fixed-length chunks or blocks" | |
# grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx" | |
args = [iter(iterable)] * n | |
return zip_longest(*args, fillvalue=fillvalue) | |
def match_tags(feature_tags, match_tags): | |
""" Return True if any of match_tags match node_tags, otherwise False. | |
TODO: Probably a waaaay nice solution in combination with osmx.tag_dict() | |
feature_tags: [str] of the form [key, value, ...]. | |
match_tags: [str] of the form ["key[=value]", ...] where value is optional. | |
Example ["highway=crossing", "maxheight"]. | |
""" | |
for t in match_tags: | |
k, _, v = t.partition("=") | |
for node_k, node_v in grouper(2, feature_tags, fillvalue=""): | |
if node_k == k and (node_v == v or v == ""): | |
return True | |
return False | |
@click.command() | |
@click.argument( | |
"osmx_database", type=click.Path(exists=True), | |
) | |
@click.option( | |
"--bbox", type=str, | |
) | |
@click.option( | |
"-t", | |
"--tag", | |
multiple=True, | |
help="Optionally filter by tag. Provide '<key>' or '<key>=<value>'.", | |
) | |
def main(osmx_database, bbox, tag): | |
"""Query OSMX_DATABASE for objects in BBOX that match tags.""" | |
logger.debug("{} - {} - {}".format(osmx_database, bbox, tag)) | |
minlon, minlat, maxlon, maxlat = bbox.split(",") | |
p1 = s2sphere.LatLng.from_degrees(float(minlat), float(minlon)) | |
p2 = s2sphere.LatLng.from_degrees(float(maxlat), float(maxlon)) | |
region = s2sphere.LatLngRect.from_point_pair(p1, p2) | |
coverer = s2sphere.RegionCoverer() | |
# Values from https://github.com/protomaps/OSMExpress/blob/master/src/extract.cpp#L131-L132 | |
# TODO: Smartly query s2 levels so we're not looking at a bunch of small boxes | |
coverer.max_cells = 1024 | |
coverer.min_level = 16 | |
coverer.max_level = 16 | |
cell_ids = coverer.get_covering(region) | |
logger.debug("BBox covers {} cells.".format(len(cell_ids))) | |
env = osmx.Environment(osmx_database) | |
with osmx.Transaction(env) as txn: | |
node_ids = [] | |
cell_node_index = osmx.Index(txn, b"cell_node") | |
for cell_id in cell_ids: | |
node_ids.extend(cell_node_index.get(cell_id.id())) | |
location_index = osmx.Locations(txn) | |
node_index = osmx.Nodes(txn) | |
# Find nodes with requested tags | |
nodes = [] | |
for node_id in node_ids: | |
osmx_node = node_index.get(node_id) | |
if osmx_node is not None and match_tags(osmx_node.tags, tag): | |
osmx_node_dict = osmx_node.to_dict() | |
nodes.append( | |
Node( | |
id=node_id, | |
location=location_index.get(node_id), | |
tags=osmx.tag_dict(osmx_node_dict["tags"]), | |
metadata=osmx_node_dict["metadata"], | |
) | |
) | |
logger.debug("Matched {} nodes".format(len(nodes))) | |
# Find ways with requested tags | |
node_way_index = osmx.NodeWay(txn) | |
way_table = osmx.Ways(txn) | |
ways = [] | |
for node_id in node_ids: | |
node_way_ids = node_way_index.get(node_id) | |
for way_id in node_way_ids: | |
osmx_way = way_table.get(way_id) | |
if osmx_way is not None and match_tags(osmx_way.tags, tag): | |
osmx_way_dict = osmx_way.to_dict() | |
way = Way( | |
id=way_id, | |
nodes=osmx_way_dict["nodes"], | |
tags=osmx.tag_dict(osmx_way_dict["tags"]), | |
metadata=osmx_way_dict["metadata"], | |
) | |
ways.append(way) | |
logger.debug("Matched {} ways".format(len(ways))) | |
# OSM Objects -> Features | |
features = [ | |
Feature( | |
id=n.id, | |
geometry=Point((n.location[0], n.location[1])), | |
properties={"metadata": n.metadata, "tags": n.tags}, | |
) | |
for n in nodes | |
] | |
for way in ways: | |
locations = [location_index.get(l) for l in way.nodes] | |
# TODO: Detect area from tags and create Polygon instead | |
geometry = LineString([(l[1], l[0]) for l in locations]) | |
features.append( | |
Feature( | |
id=way.id, | |
geometry=geometry, | |
properties={"metadata": way.metadata, "tags": way.tags}, | |
) | |
) | |
# Write GeoJson | |
sys.stdout.write(geojson.dumps(FeatureCollection(features))) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment