Created
August 13, 2022 09:26
-
-
Save nariaki3551/cf012cbc3caca173d2b9d2a89977f3bf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# 参考書籍: De Berg, Mark, et al. "コンピュータ・ジオメトリ 計算幾何学: アルゴリズムと応用." (2000) 第5章 | |
from dataclasses import dataclass | |
from typing import List, Tuple, Generator | |
class Point: | |
def __init__(self, name: str, loc: Tuple[float], **kwargs): | |
assert len(loc) == 2 | |
self.name = name | |
self.__loc = *loc, self.name | |
self.attr = kwargs | |
def loc(self) -> Tuple[float, str]: | |
return self.__loc | |
def loc_rev(self) -> Tuple[float]: | |
return self.__loc[1], self.__loc[0] | |
def is_included(self, com_R) -> bool: | |
lon, lat, _ = self.__loc | |
return ( | |
com_R.x_min <= (lon, lat) <= com_R.x_max | |
and com_R.y_min <= (lat, lon) <= com_R.y_max | |
) | |
def __hash__(self): | |
return hash((self.name, self.__loc)) | |
def __repr__(self): | |
return f"Point{self.name, self.__loc[:2]})" | |
@dataclass | |
class Rectangle: | |
x_min: float | |
x_max: float | |
y_min: float | |
y_max: float | |
class FCNode: | |
def __init__(self, point: Point): | |
self.point = point | |
self.left = None | |
self.right = None | |
self.array = None | |
def loc(self) -> Tuple[float, str]: | |
return self.point.loc() | |
def is_leaf(self) -> bool: | |
return self.left is None and self.right is None | |
class FCPoint: | |
def __init__(self, point: Point): | |
self.point = point | |
self.bleft = None # left_begin_pointer | |
self.bright = None # right_begin_pointer | |
self.eleft = None # left_end_pointer | |
self.eright = None # left_end_pointer | |
def loc(self) -> Tuple[float, str]: | |
return self.point.loc() | |
def loc_rev(self) -> Tuple[float]: | |
return self.point.loc_rev() | |
def is_included(self, com_R) -> bool: | |
return self.point.is_included(com_R) | |
def find_minmax_ix(self, array: List[Point]) -> int: | |
""" | |
Returns: | |
int: minmax index of array for (y, x) | |
""" | |
lo, hi = 0, len(array) | |
if self.loc_rev() > array[hi - 1].loc_rev(): | |
return None | |
while lo < hi: | |
mid = (lo + hi) // 2 | |
if array[mid].loc_rev() < self.loc_rev(): | |
lo = mid + 1 | |
else: | |
hi = mid | |
return lo | |
def find_maxmin_ix(self, array: List[Point]) -> int: | |
""" | |
Returns: | |
int: minmax index of array for (y, x) | |
""" | |
lo, hi = 0, len(array) | |
if self.loc_rev() < array[lo].loc_rev(): | |
return None | |
while lo < hi: | |
mid = (lo + hi) // 2 | |
if self.loc_rev() < array[mid].loc_rev(): | |
hi = mid | |
else: | |
lo = mid + 1 | |
return lo - 1 | |
class Tree: | |
def __init__(self, points: List[Point] = None, pickle_file: str = None): | |
self.root = None | |
if pickle_file is not None: | |
import pickle | |
with open(pickle_file, "rb") as pf: | |
self.root = pickle.load(pf) | |
if points is not None: | |
self.build_fc_tree(points) | |
def query(self, R: Rectangle) -> Generator[FCPoint, None, None]: | |
""" | |
Yields: | |
Point: points are include in Rectangle R | |
""" | |
com_R = Rectangle( | |
x_min=(R.x_min, -float("inf")), | |
x_max=(R.x_max, float("inf")), | |
y_min=(R.y_min, -float("inf")), | |
y_max=(R.y_max, float("inf")), | |
) | |
yield from (fc_point.point for fc_point in self.fc_range_query(com_R)) | |
def build_fc_tree(self, points: List[Point]): | |
"""build fractional cascading tree from points""" | |
assert len(points) > 0, "no points in input data" | |
assert len(points) == len(set(points)), "(name, location) data must be unique." | |
points = [FCPoint(point) for point in points] | |
points.sort(key=lambda point: point.loc()) # sorted by x | |
sbx = [FCPoint(fc_point.point) for fc_point in points] | |
points.sort(key=lambda point: point.loc_rev()) # sorted by y | |
sby = [FCPoint(fc_point.point) for fc_point in points] | |
def link_array( | |
array: List[FCPoint], array_l: List[FCPoint], array_r: List[FCPoint] | |
) -> List[FCPoint]: | |
"""create minmax and maxmin link from array to array_l and array_r""" | |
for fc_point in array: | |
fc_point.bleft = fc_point.find_minmax_ix(array_l) | |
fc_point.bright = fc_point.find_minmax_ix(array_r) | |
fc_point.eleft = fc_point.find_maxmin_ix(array_l) | |
fc_point.eright = fc_point.find_maxmin_ix(array_r) | |
return array | |
def create_node(sbx: List[FCPoint], sby: List[FCPoint]) -> FCNode: | |
if len(sbx) == 1: | |
v = FCNode(sbx[0]) | |
v.array = sby | |
else: | |
n = len(sbx) | |
pivot = sbx[n // 2 - 1] | |
sbx_l = sbx[: n // 2] | |
sbx_r = sbx[n // 2 :] | |
sby_l = [ | |
FCPoint(fc_point.point) | |
for fc_point in sby | |
if fc_point.loc() <= pivot.loc() | |
] | |
sby_r = [ | |
FCPoint(fc_point.point) | |
for fc_point in sby | |
if fc_point.loc() > pivot.loc() | |
] | |
v = FCNode(pivot) | |
v.array = link_array(sby, sby_l, sby_r) | |
v.left = create_node(sbx_l, sby_l) | |
v.right = create_node(sbx_r, sby_r) | |
return v | |
self.root = create_node(sbx, sby) | |
return self | |
def dump_pickle(self, pickle_file: str): | |
import pickle | |
with open(pickle_file, "wb") as pf: | |
pickle.dump(self.root, file=pf, protocol=-1) | |
def find_split_node(self, begin: Tuple[float], end: Tuple[float]) -> FCNode: | |
""" | |
Returns: | |
FCNode: FCNode v where the search path to begin and the search path to end diverge, or leaf node v where both paths end together | |
""" | |
v = self.root | |
while not v.is_leaf() and not begin <= v.loc() < end: | |
if v.loc() >= end: | |
v = v.left | |
else: | |
v = v.right | |
return v | |
def fc_range_query(self, com_R: Rectangle) -> Generator[FCPoint, None, None]: | |
begin_x, end_x = com_R.x_min, com_R.x_max | |
begin_y, end_y = com_R.y_min, com_R.y_max | |
virtual_begin_point = FCPoint(Point(name="vbe", loc=(begin_y[1], begin_y[0]))) | |
virtual_end_point = FCPoint(Point(name="vee", loc=(end_y[1], end_y[0]))) | |
v_split = self.find_split_node(begin_x, end_x) | |
minmax_ix = virtual_begin_point.find_minmax_ix(v_split.array) | |
maxmin_ix = virtual_end_point.find_maxmin_ix(v_split.array) | |
if minmax_ix is None or maxmin_ix is None: | |
return None | |
minmax_key = v_split.array[minmax_ix] | |
maxmin_key = v_split.array[maxmin_ix] | |
if v_split.is_leaf(): | |
if v_split.array[0].is_included(com_R): | |
yield v_split.array[0] | |
else: | |
def frac_casc(array, begin_pointer, end_pointer): | |
if begin_pointer is not None and end_pointer is not None: | |
yield from ( | |
array[ix] for ix in range(begin_pointer, end_pointer + 1) | |
) | |
v = v_split.left | |
lb_p = minmax_key.bleft # left begin pointer | |
le_p = maxmin_key.eleft # left end pointer | |
while not lb_p is None and not le_p is None and not v.is_leaf(): | |
if v.loc() >= begin_x: | |
begin_p = v.array[lb_p].bright | |
end_p = v.array[le_p].eright | |
yield from frac_casc(v.right.array, begin_p, end_p) | |
lb_p = v.array[lb_p].bleft | |
le_p = v.array[le_p].eleft | |
v = v.left | |
else: | |
lb_p = v.array[lb_p].bright | |
le_p = v.array[le_p].eright | |
v = v.right | |
if not lb_p is None and not le_p is None and v.array[0].is_included(com_R): | |
yield v.array[0] | |
v = v_split.right | |
rb_p = minmax_key.bright # right begin pointer | |
re_p = maxmin_key.eright # right end pointer | |
while not rb_p is None and not re_p is None and not v.is_leaf(): | |
if v.loc() <= end_x: | |
begin_p = v.array[rb_p].bleft | |
end_p = v.array[re_p].eleft | |
yield from frac_casc(v.left.array, begin_p, end_p) | |
rb_p = v.array[rb_p].bright | |
re_p = v.array[re_p].eright | |
v = v.right | |
else: | |
rb_p = v.array[rb_p].bleft | |
re_p = v.array[re_p].eleft | |
v = v.left | |
if not rb_p is None and not re_p is None and v.array[0].is_included(com_R): | |
yield v.array[0] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment