Last active
December 10, 2015 14:38
-
-
Save amintos/4449400 to your computer and use it in GitHub Desktop.
A **standalone function approximator** using **K-Nearest-Neighbours**. Supports euclidean and cosine similarity measures, weights the top-k results constantly, linearly or exponentially (using Boltzmann/Gibbs) and optionally normalizes the output. Can perform cross-validation on its sample set for parameter tuning and reads CSV files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# K-Nearest-Neighbours Function Approximator | |
# 2013 | by Toni Mattis | |
# | |
import math | |
import heapq # for efficient top-k retrieval | |
import random # for cross-validation | |
import csv # for loading samples from a file | |
# Auxiliary functions | |
def _normalize(v): | |
"""Compute the gaussian (L2-)norm""" | |
length = math.sqrt(sum((x * x for x in v))) | |
return [x / length for x in v] | |
def _normalize1(v): | |
"""Compute the L1-norm""" | |
length = sum(map(abs, v)) | |
return [x / length for x in v] | |
def rsme(v1, v2): | |
"""Root-mean-square deviation of v1 and v2""" | |
return math.sqrt(sum(((x1 - x2) * (x1 - x2) for x1, x2 in zip(v1, v2))) | |
/ len(v1)) | |
# ------------------------------------------------------------------------------ | |
# SIMILARITY MEASURES | |
def euclideanSimilarity(n): | |
"""Measure the reciprocal sum of squared errors between two vectors""" | |
def normalize(sample): | |
"""Identity""" | |
return sample | |
def similarity(sample, reference): | |
"""Compute the reciprocal squared euclidean distance""" | |
return 1. / (1 + sum(((s - r) * (s - r) | |
for s, r in zip(sample, reference)))) | |
return normalize, similarity | |
def cosineSimilarity(n): | |
"""Interpret samples as vectors and measures the angle between them""" | |
if n > 1: | |
def similarity(sample, reference): | |
"""Compute the dot-product of two 2-norm vectors""" | |
return sum((x * y for x, y in zip(sample, reference))) | |
return _normalize, similarity | |
else: | |
raise ValueError, "cosine similarity requires n > 1" | |
def overlapSimilarity(n): | |
"""Rank samples by overlapping area. Performs well on histograms.""" | |
def similarity(sample, reference): | |
return 1 / sum((abs(min(x, y) - max(x, y)) for x, y in zip(sample, reference))) | |
return (lambda x: x), similarity | |
# ------------------------------------------------------------------------------ | |
# WEIGHTING SCHEMES | |
def linearWeights(k): | |
"""Normalize the similarities to form a weight vector""" | |
return _normalize1 | |
def constantWeights(k): | |
"""Return a constant weight vector""" | |
dist = [1. / k] * k | |
return lambda weights: dist | |
def boltzmannWeights(t): | |
"""Generate an exponential weighting function with temperature t""" | |
def _boltzmannWeights(k): | |
def distribution(weights): | |
return _normalize1([math.exp(w / t) for w in weights]) | |
return distribution | |
return _boltzmannWeights | |
# ------------------------------------------------------------------------------ | |
# INTERPOLATION SCHEMES | |
def linearInterpolation(k, m): | |
"""Interpolate proportionally given a weight vector""" | |
def interpolate(weights, outputs): | |
return [sum((weights[i] * outputs[i][j] for i in xrange(k))) | |
for j in xrange(m)] | |
return interpolate | |
def normalLinearInterpolation(k, m): | |
"""Interpolate linearly and normalize the result (for unit-vectors)""" | |
lerp = linearInterpolation(k, m) | |
def interpolate(weights, outputs): | |
return _normalize(lerp(weights, outputs)) | |
return interpolate | |
# ------------------------------------------------------------------------------ | |
# FUNCTION APPROXIMATOR | |
class KNearestNeighbours(object): | |
""" | |
Usage: | |
KNearestNeighbours( | |
n, # input dimensions | |
m, # output dimensions | |
k, # number of neighbours considered | |
distance_measure, # either euclideanSimilarity (default) | |
# or cosineSimilarity (which is invariant | |
# against collinearity, i.e [1,1] and [2,2] | |
# are equal) | |
weighting_scheme, # either constantWeights (default), | |
# or linearWeights (which uses similarity | |
# as weights in a weighted average) | |
# or boltzmannWeights(t) with a given | |
# Temperature t (i.e. between 0.1 and 1.0), | |
# which weights similarity exponentially. | |
interpolation, # either linearInterpolation (LERP, default) | |
# or normalLinearInterpolation (NLERP), which | |
# outputs unit vectors (useful for | |
# rotation quaternions, etc.) | |
""" | |
def __init__(self, n, m, k, | |
distance_measure = euclideanSimilarity, | |
weighting_scheme = constantWeights, | |
interpolation = linearInterpolation, | |
samples = None): | |
"""Construct a K-Nearest-Neighbour Approximator for an n x m function""" | |
self.n = n | |
self.m = m | |
self.k = k | |
self.normalize, self.similarity = distance_measure(n) | |
self.weighting = weighting_scheme(k) | |
self.interpolate = interpolation(k, m) | |
self.samples = map(self.normalize, samples) if samples else [] | |
def sample_csv(self, filename): | |
"""Load CSV structured like in_1,...,in_n,out_1,...,out_m. | |
Returns the number of imported samples""" | |
imports = 0 | |
with file(filename) as f: | |
for row in csv.reader(f, delimiter=','): | |
try: | |
float_row = map(float, row) | |
if len(float_row) >= self.m + self.n: | |
self.sample(float_row[: self.n], | |
float_row[self.n : self.n + self.m]) | |
imports += 1 | |
except ValueError: | |
pass | |
except IndexError: | |
pass | |
return imports | |
def sample(self, inp, out): | |
"""Learn a sample""" | |
self.samples.append((self.normalize(inp), out)) | |
def top_k(self, arg): | |
"""Find k nearest neighbours as tuple (similarity, output)""" | |
arg = self.normalize(arg) | |
return heapq.nlargest(self.k, ((self.similarity(arg, inp), out) | |
for inp, out in self.samples)) | |
def __call__(self, arg): | |
"""Perform K-Nearest-Neighbours with interpolation""" | |
top_k = self.top_k(arg) | |
weights = self.weighting([sim for sim, out in top_k]) | |
return self.interpolate(weights, [out for sim, out in top_k]) | |
def errors(self, n = 10, ratio = 0.2): | |
"""Return average component-wise RMSEs of n cross-validations""" | |
errors = [0] * self.m | |
for i in xrange(n): | |
knn = KNearestNeighbours(self.n, self.m, self.k) | |
knn.normalize = self.normalize | |
knn.weighting = self.weighting | |
knn.interpolate = self.interpolate | |
knn.similarity = self.similarity | |
samples = self.samples | |
cut = int(len(samples) * ratio) | |
random.shuffle(samples) | |
# cross-validation training set | |
knn.samples = samples[cut:] | |
# cross-validation test set | |
samples = samples[:cut] | |
predictions = [(s[1], knn(s[0])) for s in samples] | |
# update error with component-wise RSMEs of all predictions | |
for j in xrange(self.m): | |
errors[j] += rsme([p[0][j] for p in predictions], | |
[p[1][j] for p in predictions]) | |
for i in xrange(self.m): | |
errors[i] /= n | |
return errors | |
if __name__ == '__main__': | |
print "Testing a function (x, y, z) -> (100x+10y+z, (100x+10y+z)^2) " | |
samples = [( (math.floor(i / 100.), math.floor((i / 10.) % 10.), math.floor(i % 10.)), (i, i * i) ) | |
for i in xrange(1000)] | |
k = KNearestNeighbours(3, 2, 5, samples=samples, | |
weighting_scheme=linearWeights) | |
print "Result from [8, 8, 8]: ", k([8, 8, 8]) | |
print "Testing errors while Boltzmann distribution is cooling down" | |
print "Temperature, Approximation at [5, 5, 5], RMSEs for each component" | |
for i in xrange(1, 20): | |
k = KNearestNeighbours(3, 2, 5, samples=samples, | |
weighting_scheme=boltzmannWeights(1. / i)) | |
print 1. / i, k([5, 5, 5]), k.errors() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment