Skip to content

Instantly share code, notes, and snippets.

@nariaki3551
Created August 13, 2022 09:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nariaki3551/cf012cbc3caca173d2b9d2a89977f3bf to your computer and use it in GitHub Desktop.
Save nariaki3551/cf012cbc3caca173d2b9d2a89977f3bf to your computer and use it in GitHub Desktop.
# 参考書籍: De Berg, Mark, et al. "コンピュータ・ジオメトリ 計算幾何学: アルゴリズムと応用." (2000) 第5章
from dataclasses import dataclass
from typing import List, Tuple, Generator
class Point:
def __init__(self, name: str, loc: Tuple[float], **kwargs):
assert len(loc) == 2
self.name = name
self.__loc = *loc, self.name
self.attr = kwargs
def loc(self) -> Tuple[float, str]:
return self.__loc
def loc_rev(self) -> Tuple[float]:
return self.__loc[1], self.__loc[0]
def is_included(self, com_R) -> bool:
lon, lat, _ = self.__loc
return (
com_R.x_min <= (lon, lat) <= com_R.x_max
and com_R.y_min <= (lat, lon) <= com_R.y_max
)
def __hash__(self):
return hash((self.name, self.__loc))
def __repr__(self):
return f"Point{self.name, self.__loc[:2]})"
@dataclass
class Rectangle:
x_min: float
x_max: float
y_min: float
y_max: float
class FCNode:
def __init__(self, point: Point):
self.point = point
self.left = None
self.right = None
self.array = None
def loc(self) -> Tuple[float, str]:
return self.point.loc()
def is_leaf(self) -> bool:
return self.left is None and self.right is None
class FCPoint:
def __init__(self, point: Point):
self.point = point
self.bleft = None # left_begin_pointer
self.bright = None # right_begin_pointer
self.eleft = None # left_end_pointer
self.eright = None # left_end_pointer
def loc(self) -> Tuple[float, str]:
return self.point.loc()
def loc_rev(self) -> Tuple[float]:
return self.point.loc_rev()
def is_included(self, com_R) -> bool:
return self.point.is_included(com_R)
def find_minmax_ix(self, array: List[Point]) -> int:
"""
Returns:
int: minmax index of array for (y, x)
"""
lo, hi = 0, len(array)
if self.loc_rev() > array[hi - 1].loc_rev():
return None
while lo < hi:
mid = (lo + hi) // 2
if array[mid].loc_rev() < self.loc_rev():
lo = mid + 1
else:
hi = mid
return lo
def find_maxmin_ix(self, array: List[Point]) -> int:
"""
Returns:
int: minmax index of array for (y, x)
"""
lo, hi = 0, len(array)
if self.loc_rev() < array[lo].loc_rev():
return None
while lo < hi:
mid = (lo + hi) // 2
if self.loc_rev() < array[mid].loc_rev():
hi = mid
else:
lo = mid + 1
return lo - 1
class Tree:
def __init__(self, points: List[Point] = None, pickle_file: str = None):
self.root = None
if pickle_file is not None:
import pickle
with open(pickle_file, "rb") as pf:
self.root = pickle.load(pf)
if points is not None:
self.build_fc_tree(points)
def query(self, R: Rectangle) -> Generator[FCPoint, None, None]:
"""
Yields:
Point: points are include in Rectangle R
"""
com_R = Rectangle(
x_min=(R.x_min, -float("inf")),
x_max=(R.x_max, float("inf")),
y_min=(R.y_min, -float("inf")),
y_max=(R.y_max, float("inf")),
)
yield from (fc_point.point for fc_point in self.fc_range_query(com_R))
def build_fc_tree(self, points: List[Point]):
"""build fractional cascading tree from points"""
assert len(points) > 0, "no points in input data"
assert len(points) == len(set(points)), "(name, location) data must be unique."
points = [FCPoint(point) for point in points]
points.sort(key=lambda point: point.loc()) # sorted by x
sbx = [FCPoint(fc_point.point) for fc_point in points]
points.sort(key=lambda point: point.loc_rev()) # sorted by y
sby = [FCPoint(fc_point.point) for fc_point in points]
def link_array(
array: List[FCPoint], array_l: List[FCPoint], array_r: List[FCPoint]
) -> List[FCPoint]:
"""create minmax and maxmin link from array to array_l and array_r"""
for fc_point in array:
fc_point.bleft = fc_point.find_minmax_ix(array_l)
fc_point.bright = fc_point.find_minmax_ix(array_r)
fc_point.eleft = fc_point.find_maxmin_ix(array_l)
fc_point.eright = fc_point.find_maxmin_ix(array_r)
return array
def create_node(sbx: List[FCPoint], sby: List[FCPoint]) -> FCNode:
if len(sbx) == 1:
v = FCNode(sbx[0])
v.array = sby
else:
n = len(sbx)
pivot = sbx[n // 2 - 1]
sbx_l = sbx[: n // 2]
sbx_r = sbx[n // 2 :]
sby_l = [
FCPoint(fc_point.point)
for fc_point in sby
if fc_point.loc() <= pivot.loc()
]
sby_r = [
FCPoint(fc_point.point)
for fc_point in sby
if fc_point.loc() > pivot.loc()
]
v = FCNode(pivot)
v.array = link_array(sby, sby_l, sby_r)
v.left = create_node(sbx_l, sby_l)
v.right = create_node(sbx_r, sby_r)
return v
self.root = create_node(sbx, sby)
return self
def dump_pickle(self, pickle_file: str):
import pickle
with open(pickle_file, "wb") as pf:
pickle.dump(self.root, file=pf, protocol=-1)
def find_split_node(self, begin: Tuple[float], end: Tuple[float]) -> FCNode:
"""
Returns:
FCNode: FCNode v where the search path to begin and the search path to end diverge, or leaf node v where both paths end together
"""
v = self.root
while not v.is_leaf() and not begin <= v.loc() < end:
if v.loc() >= end:
v = v.left
else:
v = v.right
return v
def fc_range_query(self, com_R: Rectangle) -> Generator[FCPoint, None, None]:
begin_x, end_x = com_R.x_min, com_R.x_max
begin_y, end_y = com_R.y_min, com_R.y_max
virtual_begin_point = FCPoint(Point(name="vbe", loc=(begin_y[1], begin_y[0])))
virtual_end_point = FCPoint(Point(name="vee", loc=(end_y[1], end_y[0])))
v_split = self.find_split_node(begin_x, end_x)
minmax_ix = virtual_begin_point.find_minmax_ix(v_split.array)
maxmin_ix = virtual_end_point.find_maxmin_ix(v_split.array)
if minmax_ix is None or maxmin_ix is None:
return None
minmax_key = v_split.array[minmax_ix]
maxmin_key = v_split.array[maxmin_ix]
if v_split.is_leaf():
if v_split.array[0].is_included(com_R):
yield v_split.array[0]
else:
def frac_casc(array, begin_pointer, end_pointer):
if begin_pointer is not None and end_pointer is not None:
yield from (
array[ix] for ix in range(begin_pointer, end_pointer + 1)
)
v = v_split.left
lb_p = minmax_key.bleft # left begin pointer
le_p = maxmin_key.eleft # left end pointer
while not lb_p is None and not le_p is None and not v.is_leaf():
if v.loc() >= begin_x:
begin_p = v.array[lb_p].bright
end_p = v.array[le_p].eright
yield from frac_casc(v.right.array, begin_p, end_p)
lb_p = v.array[lb_p].bleft
le_p = v.array[le_p].eleft
v = v.left
else:
lb_p = v.array[lb_p].bright
le_p = v.array[le_p].eright
v = v.right
if not lb_p is None and not le_p is None and v.array[0].is_included(com_R):
yield v.array[0]
v = v_split.right
rb_p = minmax_key.bright # right begin pointer
re_p = maxmin_key.eright # right end pointer
while not rb_p is None and not re_p is None and not v.is_leaf():
if v.loc() <= end_x:
begin_p = v.array[rb_p].bleft
end_p = v.array[re_p].eleft
yield from frac_casc(v.left.array, begin_p, end_p)
rb_p = v.array[rb_p].bright
re_p = v.array[re_p].eright
v = v.right
else:
rb_p = v.array[rb_p].bleft
re_p = v.array[re_p].eleft
v = v.left
if not rb_p is None and not re_p is None and v.array[0].is_included(com_R):
yield v.array[0]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment