Skip to content

Instantly share code, notes, and snippets.

@amintos
Last active December 10, 2015 14:38
Show Gist options
  • Save amintos/4449400 to your computer and use it in GitHub Desktop.
Save amintos/4449400 to your computer and use it in GitHub Desktop.
A **standalone function approximator** using **K-Nearest-Neighbours**. It supports Euclidean and cosine similarity measures; weights the top-k results uniformly, linearly, or exponentially (Boltzmann/Gibbs); and optionally normalizes the output. It can also cross-validate on its own sample set for parameter tuning and load samples from CSV files.
#
# K-Nearest-Neighbours Function Approximator
# 2013 | by Toni Mattis
#
import math
import heapq # for efficient top-k retrieval
import random # for cross-validation
import csv # for loading samples from a file
# Auxiliary functions
def _normalize(v):
    """Scale ``v`` to unit Euclidean (L2) length.

    Returns a new list holding every component of ``v`` divided by the
    vector's L2 norm. ``v`` must be re-iterable because it is traversed
    twice (once for the norm, once for the scaling).

    A vector whose norm is zero has no direction to preserve, so it is
    returned as an unchanged copy instead of raising ZeroDivisionError.
    """
    length = math.sqrt(sum(x * x for x in v))
    if length == 0:
        return list(v)
    return [x / length for x in v]
def _normalize1(v):
    """Scale ``v`` so the absolute values of its components sum to 1.

    Returns a new list holding every component of ``v`` divided by the
    vector's L1 norm. ``v`` must be re-iterable because it is traversed
    twice.

    The norm is converted to float so that all-integer input is not
    truncated by Python 2 integer division. A vector whose L1 norm is
    zero is returned as an unchanged copy instead of raising
    ZeroDivisionError.
    """
    length = float(sum(abs(x) for x in v))
    if length == 0:
        return list(v)
    return [x / length for x in v]
def rsme(v1, v2):
    """Root-mean-square deviation of ``v1`` and ``v2``.

    The squared differences of the paired components are summed and
    divided by ``len(v1)`` before the square root is taken. Pairing uses
    ``zip``, so surplus components of the longer sequence are ignored.

    Returns 0.0 for empty input (there is no deviation to measure)
    instead of raising ZeroDivisionError.
    """
    count = len(v1)
    if count == 0:
        return 0.0
    squared_error = sum((x1 - x2) * (x1 - x2) for x1, x2 in zip(v1, v2))
    # float() keeps integer inputs from being truncated by Python 2's
    # integer division.
    return math.sqrt(squared_error / float(count))
# ------------------------------------------------------------------------------
# SIMILARITY MEASURES
def euclideanSimilarity(n):
    """Build the ``(normalize, similarity)`` pair for Euclidean matching.

    ``n`` (the input dimension) is accepted so that all similarity
    measures share one signature; this measure does not use it.
    """
    def normalize(sample):
        """Return the sample itself; no preprocessing is applied."""
        return sample

    def similarity(sample, reference):
        """Return 1 / (1 + squared Euclidean distance) of the two vectors."""
        squared_distance = 0
        for s, r in zip(sample, reference):
            delta = s - r
            squared_distance += delta * delta
        return 1. / (1 + squared_distance)

    return normalize, similarity
def cosineSimilarity(n):
    """Interpret samples as vectors and measure the angle between them.

    Returns a ``(normalize, similarity)`` pair. ``normalize`` is the
    module's ``_normalize`` helper, and ``similarity`` is the dot product
    of its two arguments, which equals the cosine of the enclosed angle
    when both arguments have unit length.

    Raises:
        ValueError: unless ``n > 1``.
    """
    if not n > 1:
        # Call syntax for raise is valid on both Python 2 and Python 3.
        raise ValueError("cosine similarity requires n > 1")

    def similarity(sample, reference):
        """Compute the dot product of two vectors."""
        return sum(x * y for x, y in zip(sample, reference))

    return _normalize, similarity
def overlapSimilarity(n):
    """Rank samples by how little they differ component-wise.

    Returns a ``(normalize, similarity)`` pair. ``normalize`` returns the
    sample unchanged. ``similarity`` is ``1 / (1 + d)``, where ``d`` is
    the sum of absolute component differences (the L1 distance). The
    original author notes that it performs well on histograms.

    The ``1 +`` in the denominator mirrors ``euclideanSimilarity``: it
    keeps identical vectors from dividing by zero and bounds the result
    to (0, 1]. The float numerator prevents Python 2 integer division
    from truncating the similarity of integer-valued samples to 0.
    ``n`` is accepted for interface parity and is not used.
    """
    def normalize(sample):
        """Return the sample itself; no preprocessing is applied."""
        return sample

    def similarity(sample, reference):
        """Return the reciprocal of one plus the L1 distance."""
        distance = sum(abs(x - y) for x, y in zip(sample, reference))
        return 1. / (1 + distance)

    return normalize, similarity
# ------------------------------------------------------------------------------
# WEIGHTING SCHEMES
def linearWeights(k):
    """Weighting scheme that uses the similarities themselves as weights.

    Returns the function that turns a list of similarities into a weight
    vector: the module's L1 normalizer, which divides every entry by the
    sum of absolute values. ``k`` is accepted so that all weighting
    schemes share one signature; it is not used here.
    """
    weigh = _normalize1
    return weigh
def constantWeights(k):
    """Weighting scheme that gives every neighbour the same weight.

    Returns a function mapping a sequence of similarities to a list of
    equal weights that sum to 1, one weight per similarity. When exactly
    ``k`` similarities are passed the result is ``[1. / k] * k``.

    The weight vector is sized from the similarities actually received,
    so it always matches the number of neighbours found. A fresh list is
    built on every call, so a caller that mutates the result cannot
    corrupt later calls. Constructing the scheme with ``k == 0`` no
    longer divides by zero; an empty input yields an empty list.
    """
    def distribution(weights):
        count = len(weights)
        if count == 0:
            return []
        return [1. / count] * count

    return distribution
def boltzmannWeights(t):
    """Generate an exponential weighting scheme with temperature ``t``.

    Returns a scheme factory taking ``k`` (unused) that in turn returns a
    function mapping similarities ``w_i`` to weights proportional to
    ``exp(w_i / t)``, scaled so that they sum to 1. An empty input yields
    an empty list.

    A temperature of zero raises ZeroDivisionError once the weighting
    function is applied to a non-empty input.
    """
    # float() keeps integer similarities divided by an integer
    # temperature from being truncated by Python 2 integer division.
    temperature = float(t)

    def _boltzmannWeights(k):
        def distribution(weights):
            exponents = [w / temperature for w in weights]
            if not exponents:
                return []
            # Shift by the largest exponent before exponentiating. The
            # shift cancels out in the normalisation, so the weights are
            # unchanged, but math.exp can no longer overflow.
            peak = max(exponents)
            factors = [math.exp(e - peak) for e in exponents]
            # The largest factor is exp(0) == 1, so the total is never 0.
            total = sum(factors)
            return [f / total for f in factors]
        return distribution

    return _boltzmannWeights
# ------------------------------------------------------------------------------
# INTERPOLATION SCHEMES
def linearInterpolation(k, m):
    """Interpolate proportionally given a weight vector.

    Returns ``interpolate(weights, outputs)``, which computes, for each
    of the ``m`` output components, the weighted sum of that component
    over the first ``k`` entries of ``outputs``.

    Both ``weights`` and ``outputs`` must hold at least ``k`` entries,
    and every output at least ``m`` components; shorter inputs raise
    IndexError.
    """
    def interpolate(weights, outputs):
        # range() rather than xrange() so the code runs on both Python 2
        # and Python 3.
        return [sum(weights[i] * outputs[i][j] for i in range(k))
                for j in range(m)]

    return interpolate
def normalLinearInterpolation(k, m):
    """Build an interpolator whose result is rescaled to unit L2 length.

    The weighted blend is computed by ``linearInterpolation(k, m)`` and
    then passed through ``_normalize``, which is useful when the outputs
    are unit vectors.
    """
    blend = linearInterpolation(k, m)

    def interpolate(weights, outputs):
        blended = blend(weights, outputs)
        return _normalize(blended)

    return interpolate
# ------------------------------------------------------------------------------
# FUNCTION APPROXIMATOR
class KNearestNeighbours(object):
    """
    K-nearest-neighbours approximator for a function with n inputs and
    m outputs.

    Usage:
        KNearestNeighbours(
            n,                  # input dimensions
            m,                  # output dimensions
            k,                  # number of neighbours considered
            distance_measure,   # either euclideanSimilarity (default)
                                # or cosineSimilarity (which is invariant
                                # to scaling, i.e. [1,1] and [2,2]
                                # are equal)
            weighting_scheme,   # either constantWeights (default),
                                # or linearWeights (which uses similarity
                                # as weights in a weighted average)
                                # or boltzmannWeights(t) with a given
                                # temperature t (e.g. between 0.1 and 1.0),
                                # which weights similarity exponentially.
            interpolation,      # either linearInterpolation (LERP, default)
                                # or normalLinearInterpolation (NLERP), which
                                # outputs unit vectors (useful for
                                # rotation quaternions, etc.)
            samples)            # optional (input, output) pairs to learn
                                # immediately

    Calling the instance with an input vector returns the interpolated
    output vector.
    """

    def __init__(self, n, m, k,
                 distance_measure=euclideanSimilarity,
                 weighting_scheme=constantWeights,
                 interpolation=linearInterpolation,
                 samples=None):
        """Construct a K-Nearest-Neighbour approximator for an n x m function.

        ``samples`` is an optional iterable of ``(input, output)`` pairs.
        Each input is passed through the distance measure's normalizer,
        exactly as ``sample()`` does.
        """
        self.n = n
        self.m = m
        self.k = k
        self.normalize, self.similarity = distance_measure(n)
        self.weighting = weighting_scheme(k)
        self.interpolate = interpolation(k, m)
        # Normalize only the input half of each pair. Mapping the
        # normalizer over whole pairs breaks every non-identity
        # normalizer. A real list is required because map() is a
        # one-shot iterator on Python 3.
        if samples:
            self.samples = [(self.normalize(inp), out)
                            for inp, out in samples]
        else:
            self.samples = []

    def sample_csv(self, filename):
        """Load CSV structured like in_1,...,in_n,out_1,...,out_m.

        Rows with fewer than n + m columns are ignored, and columns
        beyond the first n + m are dropped.

        Returns the number of imported samples.
        """
        imports = 0
        width = self.n + self.m
        # open() instead of the Python-2-only file() builtin.
        with open(filename) as f:
            for row in csv.reader(f, delimiter=','):
                try:
                    float_row = [float(cell) for cell in row]
                    if len(float_row) >= width:
                        self.sample(float_row[:self.n],
                                    float_row[self.n:width])
                        imports += 1
                except (ValueError, IndexError):
                    # Deliberate best effort: rows that cannot be parsed
                    # as numbers (e.g. a header line) are skipped.
                    continue
        return imports

    def sample(self, inp, out):
        """Learn a sample: store the normalized input with its output."""
        self.samples.append((self.normalize(inp), out))

    def top_k(self, arg):
        """Find the k nearest neighbours as tuples (similarity, output).

        The result is ordered from most to least similar. Candidates are
        compared as whole tuples, so equal similarities are ordered by
        comparing their outputs.
        """
        arg = self.normalize(arg)
        return heapq.nlargest(self.k, ((self.similarity(arg, inp), out)
                                       for inp, out in self.samples))

    def __call__(self, arg):
        """Perform K-Nearest-Neighbours with interpolation.

        The similarities of the k best matches are turned into weights by
        the weighting scheme, and the matching outputs are then blended
        by the interpolation scheme.
        """
        top_k = self.top_k(arg)
        weights = self.weighting([sim for sim, out in top_k])
        return self.interpolate(weights, [out for sim, out in top_k])

    def errors(self, n=10, ratio=0.2):
        """Return average component-wise RMSEs of n cross-validations.

        In each of the ``n`` rounds the samples are shuffled, a fraction
        ``ratio`` of them is held out as the test set, and a fresh
        approximator using this instance's normalizer, similarity,
        weighting and interpolation is trained on the remainder. The
        per-component errors between the held-out outputs and the
        predictions are averaged over all rounds.
        """
        totals = [0] * self.m
        for _ in range(n):
            knn = KNearestNeighbours(self.n, self.m, self.k)
            knn.normalize = self.normalize
            knn.weighting = self.weighting
            knn.interpolate = self.interpolate
            knn.similarity = self.similarity
            # Shuffle a copy so this instance's own sample list is not
            # reordered as a side effect of validation.
            shuffled = list(self.samples)
            random.shuffle(shuffled)
            cut = int(len(shuffled) * ratio)
            # cross-validation training set
            knn.samples = shuffled[cut:]
            # cross-validation test set
            held_out = shuffled[:cut]
            predictions = [(out, knn(inp)) for inp, out in held_out]
            # accumulate the component-wise RMSE of this round
            for j in range(self.m):
                totals[j] += rsme([expected[j] for expected, _p in predictions],
                                  [predicted[j] for _e, predicted in predictions])
        for j in range(self.m):
            totals[j] /= n
        return totals
if __name__ == '__main__':
    # Demo / smoke test. Each print() call receives one pre-formatted
    # string so the output is the same on Python 2 and Python 3.
    print("Testing a function (x, y, z) -> (100x+10y+z, (100x+10y+z)^2) ")
    # One sample per integer 0..999: its three decimal digits are the
    # input, and the number and its square are the output.
    samples = [((math.floor(i / 100.),
                 math.floor((i / 10.) % 10.),
                 math.floor(i % 10.)),
                (i, i * i))
               for i in range(1000)]

    knn = KNearestNeighbours(3, 2, 5, samples=samples,
                             weighting_scheme=linearWeights)
    print("Result from [8, 8, 8]:  %s" % (knn([8, 8, 8]),))

    print("Testing errors while Boltzmann distribution is cooling down")
    print("Temperature, Approximation at [5, 5, 5], RMSEs for each component")
    for i in range(1, 20):
        temperature = 1. / i
        knn = KNearestNeighbours(3, 2, 5, samples=samples,
                                 weighting_scheme=boltzmannWeights(temperature))
        print("%s %s %s" % (temperature, knn([5, 5, 5]), knn.errors()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment