Skip to content

Instantly share code, notes, and snippets.

@CloudNiner
Last active July 13, 2020 19:55
Show Gist options
  • Save CloudNiner/8b020f434d0509419032c6b2fe0577c5 to your computer and use it in GitHub Desktop.
Save CloudNiner/8b020f434d0509419032c6b2fe0577c5 to your computer and use it in GitHub Desktop.
OSM GeoJson by Bounding Box and Tags via OSMX
"""
This demo uses OSMExpress and the s2sphere library to query a OSMX db by bounding box.
It writes a GeoJson FeatureCollection of all nodes and ways that matched the tag query to stdout
TODO: Relation support
Example:
```
python3 demo.py ./data/pennsylvania.osmx \
--bbox "-75.255188,40.021212,-75.217095,40.050702" \
--tag "surface=gravel" > osm_manayunk_pa_gravel.geojson
```
requirements.txt:
```
click==7.1.2
geojson==2.5.0
osmx==0.0.4
s3sphere==0.2.5
```
"""
from collections import namedtuple
from itertools import zip_longest
import logging
import sys
import click
import geojson
from geojson import Feature, FeatureCollection, LineString, Point
import osmx
import s2sphere
logging.basicConfig(format="%(message)s")
logger = logging.getLogger(__name__)
# logger.setLevel(logging.DEBUG)
# id: int, location: (lat, lon, version), tags: dict, metadata: dict
Node = namedtuple("Node", ["id", "location", "tags", "metadata"])
# id: int, nodes: [int], tags: dict, metadata: dict
Way = namedtuple("Way", ["id", "nodes", "tags", "metadata"])
def grouper(n, iterable, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def match_tags(feature_tags, match_tags):
""" Return True if any of match_tags match node_tags, otherwise False.
TODO: Probably a waaaay nice solution in combination with osmx.tag_dict()
feature_tags: [str] of the form [key, value, ...].
match_tags: [str] of the form ["key[=value]", ...] where value is optional.
Example ["highway=crossing", "maxheight"].
"""
for t in match_tags:
k, _, v = t.partition("=")
for node_k, node_v in grouper(2, feature_tags, fillvalue=""):
if node_k == k and (node_v == v or v == ""):
return True
return False
@click.command()
@click.argument(
"osmx_database", type=click.Path(exists=True),
)
@click.option(
"--bbox", type=str,
)
@click.option(
"-t",
"--tag",
multiple=True,
help="Optionally filter by tag. Provide '<key>' or '<key>=<value>'.",
)
def main(osmx_database, bbox, tag):
"""Query OSMX_DATABASE for objects in BBOX that match tags."""
logger.debug("{} - {} - {}".format(osmx_database, bbox, tag))
minlon, minlat, maxlon, maxlat = bbox.split(",")
p1 = s2sphere.LatLng.from_degrees(float(minlat), float(minlon))
p2 = s2sphere.LatLng.from_degrees(float(maxlat), float(maxlon))
region = s2sphere.LatLngRect.from_point_pair(p1, p2)
coverer = s2sphere.RegionCoverer()
# Values from https://github.com/protomaps/OSMExpress/blob/master/src/extract.cpp#L131-L132
# TODO: Smartly query s2 levels so we're not looking at a bunch of small boxes
coverer.max_cells = 1024
coverer.min_level = 16
coverer.max_level = 16
cell_ids = coverer.get_covering(region)
logger.debug("BBox covers {} cells.".format(len(cell_ids)))
env = osmx.Environment(osmx_database)
with osmx.Transaction(env) as txn:
node_ids = []
cell_node_index = osmx.Index(txn, b"cell_node")
for cell_id in cell_ids:
node_ids.extend(cell_node_index.get(cell_id.id()))
location_index = osmx.Locations(txn)
node_index = osmx.Nodes(txn)
# Find nodes with requested tags
nodes = []
for node_id in node_ids:
osmx_node = node_index.get(node_id)
if osmx_node is not None and match_tags(osmx_node.tags, tag):
osmx_node_dict = osmx_node.to_dict()
nodes.append(
Node(
id=node_id,
location=location_index.get(node_id),
tags=osmx.tag_dict(osmx_node_dict["tags"]),
metadata=osmx_node_dict["metadata"],
)
)
logger.debug("Matched {} nodes".format(len(nodes)))
# Find ways with requested tags
node_way_index = osmx.NodeWay(txn)
way_table = osmx.Ways(txn)
ways = []
for node_id in node_ids:
node_way_ids = node_way_index.get(node_id)
for way_id in node_way_ids:
osmx_way = way_table.get(way_id)
if osmx_way is not None and match_tags(osmx_way.tags, tag):
osmx_way_dict = osmx_way.to_dict()
way = Way(
id=way_id,
nodes=osmx_way_dict["nodes"],
tags=osmx.tag_dict(osmx_way_dict["tags"]),
metadata=osmx_way_dict["metadata"],
)
ways.append(way)
logger.debug("Matched {} ways".format(len(ways)))
# OSM Objects -> Features
features = [
Feature(
id=n.id,
geometry=Point((n.location[0], n.location[1])),
properties={"metadata": n.metadata, "tags": n.tags},
)
for n in nodes
]
for way in ways:
locations = [location_index.get(l) for l in way.nodes]
# TODO: Detect area from tags and create Polygon instead
geometry = LineString([(l[1], l[0]) for l in locations])
features.append(
Feature(
id=way.id,
geometry=geometry,
properties={"metadata": way.metadata, "tags": way.tags},
)
)
# Write GeoJson
sys.stdout.write(geojson.dumps(FeatureCollection(features)))
if __name__ == "__main__":
main()
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment