Skip to content

Instantly share code, notes, and snippets.

@ljwolf
Last active September 26, 2022 16:15
Show Gist options
  • Save ljwolf/ed42cab09a88b40b0c509e53b013ab9c to your computer and use it in GitHub Desktop.
Save ljwolf/ed42cab09a88b40b0c509e53b013ab9c to your computer and use it in GitHub Desktop.
Towards a faster kernel weights class in PySAL
from sklearn import metrics
from sklearn.neighbors import BallTree
import numpy, pandas
from scipy.optimize import minimize_scalar
from scipy import sparse, stats
from libpysal.weights import WSP, Kernel
from libpysal.weights.util import get_points_array
coordinates = numpy.random.normal(0, 1, size=(100, 2))
def _triangular(distances, bandwidth):
u = numpy.clip(distances / bandwidth, 0, 1)
return 1 - u
def _parabolic(distances, bandwidth):
u = numpy.clip(distances / bandwidth, 0, 1)
return 0.75 * (1 - u**2)
def _gaussian(distances, bandwidth):
u = distances / bandwidth
return numpy.exp(-((u / 2) ** 2)) / (numpy.sqrt(2) * numpy.pi)
def _bisquare(distances, bandwidth):
u = numpy.clip(distances / bandwidth, 0, 1)
return (15 / 16) * (1 - u**2) ** 2
def _cosine(distances, bandwidth):
u = numpy.clip(distances / bandwidth, 0, 1)
return (numpy.pi / 4) * numpy.cos(numpy.pi / 2 * u)
_kernel_functions = dict(
triangular=_triangular,
parabolic=_parabolic,
gaussian=_gaussian,
bisquare=_bisquare,
cosine=_cosine,
)
def FastKernel(
coordinates,
bandwidth=None,
metric="euclidean",
kernel="triangular",
k=None,
indices=None,
**kwargs
):
if hasattr(coordinates, "geometry"):
indices = coordinates.index
coordinates = get_points_array(coordinates)
else:
if indices is None:
indices = pandas.RangeIndex(coordinates.shape[0])
if k is not None:
tree = BallTree(coordinates)
D_linear, ixs = tree.query(coordinates, k=k + 1)
self_ix, neighbor_ix = ixs[:, 0], ixs[:, 1:]
D_linear = D_linear[:, 1:]
self_ix_flat = numpy.repeat(self_ix, k)
neighbor_ix_flat = neighbor_ix.flatten()
D_linear_flat = D_linear.flatten()
D = sparse.csc_matrix((D_linear_flat, (self_ix_flat, neighbor_ix_flat)))
else:
D = metrics.pairwise_distances(coordinates, metric=metric)
D = sparse.csc_matrix(D)
if bandwidth is None:
bandwidth = numpy.percentile(D.data, 25)
elif bandwidth == "opt":
bandwidth = _optimize_bandwidth(D, kernel)
K = D.copy()
K.data = _kernel_functions[kernel](D.data, bandwidth)
W = WSP(K, index=indices).to_W()
W._cache["bandwidth"] = bandwidth
return W
def _from_dataframe(df, **kwargs):
coordinates = get_points_array(df.geometry)
indices = df.index
return FastKernel(coordinates, indices=indices, **kwargs)
FastKernel.from_dataframe = _from_dataframe
def _optimize_bandwidth(D, kernel):
kernel_function = _kernel_functions[kernel]
def _loss(bandwidth, D=D, kernel_function=kernel_function):
Ku = kernel_function(D.data, bandwidth)
bins, _ = numpy.histogram(Ku, bins=int(D.shape[0] ** 0.5), range=(0,1))
return -stats.entropy(bins / bins.sum())
xopt = minimize_scalar(_loss, bounds=(0, D.data.max() * 2), method="bounded")
return xopt.x
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment