Skip to content

Instantly share code, notes, and snippets.

@ZwodahS
Created June 14, 2016 14:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ZwodahS/a9387cb9d7a260ced80b640827845981 to your computer and use it in GitHub Desktop.
Save ZwodahS/a9387cb9d7a260ced80b640827845981 to your computer and use it in GitHub Desktop.
Simple quad tree for searching points inside polygon
import json
from shapely.geometry.polygon import Polygon
from shapely.geometry import Point, box as Box
class _Quad(object):
def __init__(self, quad_tree, parent):
self.quad_tree = quad_tree
self.parent = parent
self.quads = None
self.items = []
self.box = None
self.x_range = [None, None]
self.y_range = [None, None]
def add_item(self, point, data):
# as item is being added, calculate what are the range for the x and y
if self.x_range[0] is None or point.x < self.x_range[0]:
self.x_range[0] = point.x
if self.y_range[0] is None or point.y < self.y_range[0]:
self.y_range[0] = point.y
if self.x_range[1] is None or point.x > self.x_range[1]:
self.x_range[1] = point.x
if self.y_range[1] is None or point.y > self.y_range[1]:
self.y_range[1] = point.y
self.items.append((point, data))
def find_items(self, polygon, out):
# if no intersections with my bounding box, return immediately
if not self.box.intersects(polygon):
return out
# if the polygon contains the whole bounding box, just add all my items
if polygon.contains(self.box):
out.extend(self.items)
return out
# if there is no sub quads, just compare all items in this quad
if not self.quads:
for item in self.items:
if polygon.contains(item[0]):
out.append(item)
else:
# search sub quads
for quad in self.quads:
quad.find_items(polygon, out)
return out
def traverse(self, level, pre_func=None, post_func=None):
"""
level the current level in this traversal
pre_func function to run before calling the traverse in the sub quads
post_func function to run after calling the traverse in the sub quads
"""
if pre_func:
pre_func(self, level)
if self.quads is not None:
for quad in self.quads:
quad.traverse(level+1, pre_func=pre_func, post_func=post_func)
if post_func:
post_func(self, level)
def finalize(self, buffer_bound):
"""finalize this quad to create all the sub quads for this quads recursively
"""
# 1. create my own bounding box
bound = [self.x_range[0], self.y_range[0], self.x_range[1], self.y_range[1]]
bound[0] -= buffer_bound
bound[1] -= buffer_bound
bound[2] += buffer_bound
bound[3] += buffer_bound
self.box = Box(*bound)
# 2. if met threshold, split into smaller quads
if len(self.items) >= self.quad_tree.threshold:
mid_x = bound[0] + ((bound[2] - bound[0])/2)
mid_y = bound[1] + ((bound[3] - bound[1])/2)
self.quads = []
for b in ([bound[0], bound[1], mid_x, mid_y],
[mid_x, bound[1], bound[2], mid_y],
[bound[0], mid_y, mid_x, bound[3]],
[mid_x, mid_y, bound[2], bound[3]]):
# make each box slightly bigger
b[0] -= buffer_bound
b[1] -= buffer_bound
b[2] += buffer_bound
b[3] += buffer_bound
quad = _Quad(self.quad_tree, self)
quad.temp_box = Box(*b)
self.quads.append(quad)
# 3. add items to quads
for point, data in self.items:
target_quad = None
for quad in self.quads:
if quad.temp_box.contains(point):
target_quad = quad
break
target_quad.add_item(point, data)
# 4. finalize all the sub quads
for ind, quad in enumerate(self.quads):
if len(quad.items) == 0:
self.quads[ind] = None
else:
del quad.temp_box
quad.finalize(buffer_bound)
# clean up empty quads
self.quads = [ q for q in self.quads if q is not None ]
del self.x_range
del self.y_range
class QuadTree(object):
"""
QuadTree
The role of this wrapper is to convert x, y into shapely's point and
list of points into polygon
To use this for coordinates, x should be treated as "lng" and y should be
treated as "lat" although it doesn't really make a different
Usage:
data = [ (2, 4, "abc"), (3, 6, "def"), ....]
quad_tree = QuadTree()
for d in data:
quad_tree.add(**d)
quad_tree.finalize()
results = quad_tree.find_items([(0, 0), ...])
"""
def __init__(self, threshold=40):
"""
bound [x, y, x, y]
threshold the number of item before it will be divided into quads
"""
self.threshold = threshold
self.root_quad = _Quad(self, None)
def add_item(self, x, y, data):
self.root_quad.add_item(Point(x, y), data)
def find_items(self, polygon):
"""
polygon list of points for this polygon
return
list of items, each item is (Point, data)
"""
out = []
polygon = Polygon(polygon)
self.root_quad.find_items(polygon, out)
return out
def traverse(self, pre_func=None, post_func=None):
self.root_quad.traverse(1, pre_func=pre_func, post_func=post_func)
def finalize(self, buffer_bound=0.0000000001):
"""
buffer_bound buffer added to the bound
this is necessary for dealing with coordinates
"""
self.root_quad.finalize(buffer_bound)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment