Skip to content

Instantly share code, notes, and snippets.

@pavlin-policar
Created May 9, 2020 07:52
Show Gist options
  • Save pavlin-policar/4d1a8ab0344617690330470db2b21d6f to your computer and use it in GitHub Desktop.
Save pavlin-policar/4d1a8ab0344617690330470db2b21d6f to your computer and use it in GitHub Desktop.
from sklearn.base import BaseEstimator
from sklearn.neighbors import NearestNeighbors
import numpy as np
import networkx as nx
from community import best_partition
def jaccard(x: set, y: set) -> float:
    """Return the Jaccard similarity of two sets.

    The similarity is the size of the intersection divided by the size of
    the union: 0.0 for disjoint sets, 1.0 for equal sets.

    Parameters
    ----------
    x : set
    y : set

    Returns
    -------
    float
        ``len(x & y) / len(x | y)``. When both sets are empty the ratio would
        be 0 / 0; two empty sets are equal, so 1.0 is returned instead.

    """
    union_size = len(x | y)
    if union_size == 0:
        # Both sets are empty: avoid ZeroDivisionError and treat them as equal
        return 1.0
    return len(x & y) / union_size
def matrix_to_knn_graph(data, k_neighbors, metric, progress_callback=None):
    """Convert data matrix to a graph using a nearest neighbors approach with
    the Jaccard similarity as the edge weights.

    Every row of ``data`` becomes a node (identified by its row index). A node
    is connected to each member of its nearest-neighbor set, and the edge
    weight is the Jaccard similarity between the neighbor sets of the two
    endpoints.

    Parameters
    ----------
    data : np.ndarray
    k_neighbors : int
        Number of neighbors requested from the nearest neighbors search.
    metric : str
        A distance metric supported by sklearn.
    progress_callback : Callable[[float], None]
        Optional. If given, it is called after each node has been processed
        with the fraction of nodes completed so far (ending at 1.0).

    Returns
    -------
    nx.Graph

    """
    # NOTE: exactly `k_neighbors` neighbors are requested (not k + 1). Because
    # the fitted data is also the query data, a point's neighbor set will
    # normally include the point itself, which then shows up as a self-loop
    # with weight 1.0 in the graph.
    knn = NearestNeighbors(n_neighbors=k_neighbors, metric=metric).fit(data)
    nearest_neighbors = knn.kneighbors(data, return_distance=False)
    # Convert to list of sets so jaccard can be computed efficiently
    nearest_neighbors = list(map(set, nearest_neighbors))
    num_nodes = len(nearest_neighbors)

    # Create an empty graph and add all the data ids as nodes for easy mapping
    graph = nx.Graph()
    graph.add_nodes_from(range(num_nodes))

    for idx, node in enumerate(graph.nodes):
        node_neighbors = nearest_neighbors[node]
        for neighbor in node_neighbors:
            graph.add_edge(
                node,
                neighbor,
                weight=jaccard(node_neighbors, nearest_neighbors[neighbor]),
            )

        # Report progress only when the caller asked for it
        if progress_callback is not None:
            progress_callback((idx + 1) / num_nodes)

    return graph
class LouvainClustering(BaseEstimator):
    """Louvain community detection on a k-nearest-neighbors graph.

    Parameters
    ----------
    k_neighbors : int
        Number of neighbors used when building the graph from a data matrix
        (forwarded to ``matrix_to_knn_graph``).
    metric : str
        Distance metric used when building the graph from a data matrix
        (forwarded to ``matrix_to_knn_graph``).
    resolution : float
        Forwarded to ``best_partition``.
    random_state : optional
        Forwarded to ``best_partition``.

    Attributes
    ----------
    labels_ : np.ndarray or None
        ``None`` until ``fit`` or ``fit_graph`` has been called. Afterwards,
        an integer array of community labels ordered by ascending node id.

    """

    def __init__(self, k_neighbors=30, metric="l2", resolution=1.0,
                 random_state=None):
        self.k_neighbors = k_neighbors
        self.metric = metric
        self.resolution = resolution
        self.random_state = random_state

        self.labels_ = None

    def fit(self, X: np.ndarray, y: np.ndarray = None):
        """Build the nearest neighbors graph from ``X`` and cluster it.

        Parameters
        ----------
        X : np.ndarray
            Data matrix passed to ``matrix_to_knn_graph``.
        y : np.ndarray
            Unused.

        Returns
        -------
        LouvainClustering
            This estimator, with ``labels_`` set.

        """
        # If we are given a data matrix, we have to convert it to a graph first
        graph = matrix_to_knn_graph(
            X, metric=self.metric, k_neighbors=self.k_neighbors
        )
        return self.fit_graph(graph)

    def fit_graph(self, graph):
        """Cluster an already constructed graph.

        Parameters
        ----------
        graph : nx.Graph

        Returns
        -------
        LouvainClustering
            This estimator, with ``labels_`` set.

        """
        partition = best_partition(
            graph, resolution=self.resolution, random_state=self.random_state
        )
        # `partition` maps node -> community. Emit the labels in ascending
        # node order; iterating the sorted keys also works for an empty
        # partition, which yields an empty label array instead of failing.
        self.labels_ = np.fromiter(
            (partition[node] for node in sorted(partition)), dtype=int
        )
        return self
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment