Skip to content

Instantly share code, notes, and snippets.

@CloudNiner
Created July 29, 2020 13:10
Show Gist options
  • Save CloudNiner/e5552aaae6c235193b8f196558b92e2c to your computer and use it in GitHub Desktop.
Save CloudNiner/e5552aaae6c235193b8f196558b92e2c to your computer and use it in GitHub Desktop.
Query OSMX by S2Region in Python
import logging
import time
import osmx
import pywraps2 as s2
logging.basicConfig(format="%(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
OSMX_S2_CELL_MAX_CELLS = 1024
OSMX_S2_CELL_MAX_LEVEL = 16
def traverse_cell(cell_id, cell_level):
""" Traverse all child cells for cell_id at the desired cell_level. """
start = cell_id.child_begin(cell_level)
end = cell_id.child_end(cell_level)
current_cell = start
while current_cell.id() != end.id():
yield current_cell
current_cell = current_cell.next()
def node_ids_for_region(osmx_txn, s2_region, min_cell_level=None):
""" Get node_ids from osmx within the s2_region. """
start_time = time.time()
cell_node_index = osmx.Index(osmx_txn, b"cell_node")
coverer = s2.S2RegionCoverer()
# Values from https://github.com/protomaps/OSMExpress/blob/master/src/extract.cpp#L131-L132
coverer.set_max_cells(OSMX_S2_CELL_MAX_CELLS)
if isinstance(min_cell_level, int):
coverer.set_min_level(min_cell_level)
coverer.set_max_level(OSMX_S2_CELL_MAX_LEVEL)
s2_cell_ids = coverer.GetCovering(s2_region)
logger.debug("Region covers {} cells.".format(len(s2_cell_ids)))
node_ids = list()
for cell_id in s2_cell_ids:
for max_level_cell_id in traverse_cell(cell_id, OSMX_S2_CELL_MAX_LEVEL):
node_ids.extend(cell_node_index.get(max_level_cell_id.id()))
node_id_set = set(node_ids)
logger.debug(
"Returned {} node_ids in {}s".format(len(node_id_set), time.time() - start_time)
)
return node_id_set
""" Convert various geometries to S2 regions
This method isn't used in this example gist, but it could be used to convert a polygon
for input to search_by_tags, e.g.:
osmx_db = "/path/to/db.osmx"
polygon = ???
tags = ["name=", "highway=primary"]
features = search_by_tags(osmx_db, polygon_to_s2_region(polygon), tags)
"""
import pywraps2 as s2
def polygon_to_s2_region(polygon):
""" Convert a geojson.Polygon to s2.S2Region. """
loops = [
s2.S2Loop(
list(
map(
lambda coord: s2.S2LatLng.FromDegrees(coord[1], coord[0])
.Normalized()
.ToPoint(),
loop[:-1],
)
)
)
for loop in polygon.coordinates
]
for loop in loops:
loop.Normalize()
return s2.S2Polygon(*loops)
from enum import Enum
import logging
from geojson import Feature, LineString, Point
import osmx
from osmx_utils import node_ids_for_region
logging.basicConfig(format="%(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class OSMTypeFilter(Enum):
NODE = "node"
WAY = "way"
ALL = "all"
def match_tags(feature_tags, match_tags):
""" Return True if feature_tags contain all match_tags, otherwise False.
feature_tags: {key: value}
match_tags: [str] of the form ["key[=value]", ...] where value is optional.
Example ["highway=crossing", "maxheight"].
"""
for t in match_tags:
k, _, v = t.partition("=")
if k not in feature_tags:
return False
# Skip value check if one wasn't provided
if v == "":
continue
if feature_tags[k] != v:
return False
return True
def search_by_tags(osmx_database, s2_region, tags, type_filter=OSMTypeFilter.ALL):
""" Yield generator of OSM object Features matching s2_region + tags.
Format:
Nodes:
Feature(id="node/id",
geometry=Point,
properties={"id": number, "type": "node", "metadata": {}, "tags": {}})
Ways:
Feature(id="way/id",
geometry=Point,
properties={
"id": number,
"type": "way",
"nodes": [],
"metadata": {},
"tags": {}
})
TODO: Relation support
osmx_database: abs or rel path to db.osmx file on disk
s2_region: s2.S2Region. Common choices include s2.S2LatLngRect or s2.S2Polygon
tags: array of "key" or "key=value" strings, e.g. ["surface=gravel"]
"""
env = osmx.Environment(osmx_database.as_posix())
with osmx.Transaction(env) as txn:
node_ids = node_ids_for_region(txn, s2_region)
location_index = osmx.Locations(txn)
node_index = osmx.Nodes(txn)
# Yield nodes with requested tags
if type_filter == OSMTypeFilter.NODE or type_filter == OSMTypeFilter.ALL:
node_count = 0
for node_id in node_ids:
osmx_node = node_index.get(node_id)
if osmx_node is None:
continue
osmx_node_tags = osmx.tag_dict(osmx_node.tags)
if match_tags(osmx_node_tags, tags):
location = location_index.get(node_id)
node_count += 1
yield Feature(
id="node/{}".format(node_id),
geometry=Point((location[1], location[0])),
properties={
"id": node_id,
"type": "node",
"metadata": osmx_node.metadata.to_dict(),
"tags": osmx_node_tags,
},
)
logger.debug("Matched {} nodes".format(node_count))
# Yield ways with requested tags
if type_filter == OSMTypeFilter.WAY or type_filter == OSMTypeFilter.ALL:
node_way_index = osmx.NodeWay(txn)
way_table = osmx.Ways(txn)
way_count = 0
way_ids = list()
for node_id in node_ids:
way_ids.extend(node_way_index.get(node_id))
for way_id in set(way_ids):
osmx_way = way_table.get(way_id)
if osmx_way is None:
continue
osmx_way_tags = osmx.tag_dict(osmx_way.tags)
if match_tags(osmx_way_tags, tags):
way_count += 1
way_nodes = osmx_way.nodes
locations = [location_index.get(loc) for loc in way_nodes]
yield Feature(
id="way/{}".format(way_id),
geometry=LineString([(loc[1], loc[0]) for loc in locations]),
properties={
"id": way_id,
"type": "way",
"metadata": osmx_way.metadata.to_dict(),
"tags": osmx_way_tags,
},
)
logger.debug("Matched {} ways".format(way_count))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment