Created
May 9, 2020 07:52
-
-
Save pavlin-policar/4d1a8ab0344617690330470db2b21d6f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sklearn.base import BaseEstimator | |
from sklearn.neighbors import NearestNeighbors | |
import numpy as np | |
import networkx as nx | |
from community import best_partition | |
def jaccard(x: set, y: set) -> float: | |
return len(x & y) / len(x | y) | |
def matrix_to_knn_graph(data, k_neighbors, metric, progress_callback=None): | |
"""Convert data matrix to a graph using a nearest neighbors approach with | |
the Jaccard similarity as the edge weights. | |
Parameters | |
---------- | |
data : np.ndarray | |
k_neighbors : int | |
metric : str | |
A distance metric supported by sklearn. | |
progress_callback : Callable[[float], None] | |
Returns | |
------- | |
nx.Graph | |
""" | |
# We do k + 1 because each point is closest to itself, which is not useful | |
knn = NearestNeighbors(n_neighbors=k_neighbors, metric=metric).fit(data) | |
nearest_neighbors = knn.kneighbors(data, return_distance=False) | |
# Convert to list of sets so jaccard can be computed efficiently | |
nearest_neighbors = list(map(set, nearest_neighbors)) | |
num_nodes = len(nearest_neighbors) | |
# Create an empty graph and add all the data ids as nodes for easy mapping | |
graph = nx.Graph() | |
graph.add_nodes_from(range(len(data))) | |
for idx, node in enumerate(graph.nodes): | |
for neighbor in nearest_neighbors[node]: | |
graph.add_edge( | |
node, | |
neighbor, | |
weight=jaccard( | |
nearest_neighbors[node], nearest_neighbors[neighbor]), | |
) | |
return graph | |
class LouvainClustering(BaseEstimator): | |
def __init__(self, k_neighbors=30, metric="l2", resolution=1.0, | |
random_state=None): | |
self.k_neighbors = k_neighbors | |
self.metric = metric | |
self.resolution = resolution | |
self.random_state = random_state | |
self.labels_ = None | |
def fit(self, X: np.ndarray, y: np.ndarray = None): | |
# If we are given a data matrix, we have to convert it to a graph first | |
graph = matrix_to_knn_graph( | |
X, metric=self.metric, k_neighbors=self.k_neighbors | |
) | |
return self.fit_graph(graph) | |
def fit_graph(self, graph): | |
partition = best_partition( | |
graph, resolution=self.resolution, random_state=self.random_state | |
) | |
self.labels_ = np.fromiter( | |
list(zip(*sorted(partition.items())))[1], dtype=int | |
) | |
return self |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment