Skip to content

Instantly share code, notes, and snippets.

@chalup
Created September 1, 2015 13:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chalup/46960f82d3870d20b7b6 to your computer and use it in GitHub Desktop.
Save chalup/46960f82d3870d20b7b6 to your computer and use it in GitHub Desktop.
OPTICS implementation
import math
import numpy as np
from sklearn.neighbors import BallTree
class Optics:
def __init__(self, eps, min_pts):
self.eps = eps
self.min_pts = min_pts
def get_reachability_plot(self, data):
n = data.shape[0]
assert n >= self.min_pts, "Input data should have at least min_pts (%) samples".format(self.min_pts)
ordering = []
core_distances = np.ones(n) * np.nan
reachability_distances = np.ones(n) * np.inf
seeds = np.ones(n, dtype=bool)
ball_tree = BallTree(data, p=1)
# calculate core distance for each core point
neighbors_count = ball_tree.query_radius(data, r=self.eps, count_only=True)
core_points_mask = neighbors_count >= self.min_pts
assert any(core_points_mask), "Input data contains no core points for specified eps and min_pts"
core_distances[core_points_mask] = ball_tree.query(data[core_points_mask], k=self.min_pts)[0][:, -1]
# start from core point
i = core_points_mask.nonzero()[0][0]
while True:
seeds[i] = False
ordering.append(i)
core_distance = core_distances[i]
if not math.isnan(core_distance):
assert core_distance <= self.eps
distances, indices = ball_tree.query(data[i], k=neighbors_count[i])
seeds_mask = seeds[indices[0]]
neighbors = indices[0][seeds_mask]
new_reachability_distances = np.minimum(
reachability_distances[neighbors],
np.maximum(core_distance, distances[0][seeds_mask])
)
reachability_distances[neighbors] = new_reachability_distances
# continue with next seed point with reachability distance
i = seeds.nonzero()[0][np.argmin(reachability_distances[seeds])]
if not math.isinf(reachability_distances[i]):
continue
# if that's not possible, use next unprocessed core point
unprocessed_core_points = seeds * core_points_mask
if any(unprocessed_core_points):
i = unprocessed_core_points.nonzero()[0][0]
continue
# done
break
# add all remaining points to ordering
ordering += seeds.nonzero()[0].tolist()
assert len(ordering) == n
return ordering, reachability_distances
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment