Created
April 14, 2015 08:48
-
-
Save jampekka/abb3cad5939d75a49467 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import numpy | |
import math | |
import itertools | |
import random | |
import matplotlib.pyplot as plt | |
class Som(object): | |
"""Self-organizing map | |
This is for demonstration purposes and probably too slow and buggy for | |
any serious work | |
""" | |
def __init__(self, size, inputLen, initializer, neighFunc, learnFunc): | |
"""Create a SOM | |
size -- size of the grid as two-tuple (width, height) | |
inputLen -- length of the feature vectors | |
initializer -- function for initial node weights | |
neighFunc -- function that returns whether a node is in neighborhood. Takes | |
distance from a node and training time as arguments | |
learnFunc -- returns learning coefficient given the training time | |
""" | |
self.network = self.__generateNetwork(size, inputLen, initializer) | |
self.neighFunc = neighFunc | |
self.learnFunc = learnFunc | |
self.wrapEdges = True | |
self.latticeDistances = {} | |
self.t = 1.0 | |
def __generateNetwork(self, size, inputLen, initializer): | |
""" | |
Generate new SOM network | |
Generates a new 2D network with 'size' dimensions. | |
The nodes are initialized with 'size' uniformly distributed | |
random values from range 'initRange'. The newly created | |
network is returned as a numpy array. | |
""" | |
nodes = [] | |
for i in range(size[0]): | |
row = [] | |
for j in range(size[1]): | |
node = initializer(inputLen, (i, j)) | |
node = np.array(node) | |
row.append(node) | |
nodes.append(row) | |
nodes = np.array(nodes) | |
return nodes | |
def getWinningNode(self, vector): | |
"""Get node whose feature value is closest to `vector` | |
vector -- a feature vector | |
""" | |
# Calculate differences of each node in the network | |
# and the 'vector'. | |
diffs = np.sqrt((self.network - vector)**2).sum(axis=2) | |
# The diffs are in a flat array, so convert the index | |
# to the 2D shape of the network | |
winner = np.unravel_index(diffs.argmin(), diffs.shape) | |
return winner | |
def teach(self, data): | |
"""Teach the network with data | |
Updates the network's nodes according to "the SOM algorithm". Ie | |
finds the closest node to the given data vector, finds nodes that | |
belong to its neighborhood and updates their values by | |
node_value = node_value + | |
(training_value - node_value)*learnFunc(t)*neighborhoodFunc(winner, t) | |
data -- a feature vector | |
Returns the neighborhood that was updated as a 2D-table with neighborhood function | |
values of the corresponding nodes. Mainly used for debugging. | |
""" | |
# Get node closest to the training vector | |
winner = self.getWinningNode(data) | |
return self._teach_winner(data, winner) | |
def _teach_winner(self, data, winner): | |
# Generate an 2D array to keep track of the neigborhood | |
# values. Not really needed, mostly for visualization and | |
# debugging. | |
neighb = np.zeros((self.network.shape[0], self.network.shape[1])) | |
for row in range(self.network.shape[0]): | |
for col in range(self.network.shape[1]): | |
# Update each node according to the update | |
# rule | |
n = self.__updateNode((row, col), winner, data) | |
# Store the neigborhood value | |
neighb[row,col] = n | |
# Increment the time counter | |
self.t += 1.0 | |
return neighb | |
def __updateNode(self, node, winner, value): | |
n = self.neighFunc(self.getLatticeDistance(winner, node), self.t) | |
learnFactor = self.learnFunc(self.t)*n | |
diff = value - self.network[node] | |
self.network[node] += diff*learnFactor | |
return n | |
#def getLatticeDistance(self, a, b): | |
# ind = (a, b) | |
# if ind not in self.latticeDistances: | |
# dist = self._calculateLatticeDistance(*ind) | |
# self.latticeDistances[ind] = dist | |
# self.latticeDistances[(b, a)] = dist | |
# | |
# return self.latticeDistances[ind] | |
def getLatticeDistance(self, a, b): | |
"""Get a distance between node coordinates 'a' and 'b'. | |
If wrapEdges is true, the shortest path may be found by | |
wrapping over the edges | |
""" | |
if(not self.wrapEdges): | |
return math.sqrt((a[0]-b[0])**2 + (a[1] - b[1])**2) | |
# linarg.norm seems to be slow | |
#return np.linalg.norm(np.subtract(a, b)) | |
w = self.network.shape[0] | |
h = self.network.shape[1] | |
row = min([abs(a[0] - b[0]), abs(a[0] - (b[0]-h)), abs(a[0]-h - b[0])]) | |
col = min([abs(a[1] - b[1]), abs(a[1] - (b[1]-w)), abs(a[1]-w - b[1])]) | |
return math.sqrt(row**2+col**2) | |
@staticmethod | |
def Inverse(t, v=1.0): | |
return v/(t+1.0) | |
@staticmethod | |
def TimeStep(value, t, steps): | |
limit = steps/float(t) | |
if(limit < 1.0): limit = 1.0 | |
if(value <= limit): | |
return 1.0 | |
else: | |
return 0.0 | |
@staticmethod | |
def ExpDecay(value, t, steps): | |
v = np.exp(-1.0*t*value**2/steps) | |
return v | |
@staticmethod | |
def Constant(value, t, cutoff): | |
if(value < cutoff): return 1.0 | |
else: return 0.0 | |
class ParameterlessSom(Som): | |
def __init__(self, size, inputLen, initializer): | |
self.winner = None | |
self.max_distance = None | |
self.winner_dist = None | |
neighFunc = self._neighborhood | |
learnFunc = self._learning_coefficient | |
Som.__init__(self, size, inputLen, initializer, | |
neighFunc, learnFunc) | |
self.beta = np.mean(self.network.shape[:-1]) | |
def teach(self, data): | |
self.winner = self.getWinningNode(data) | |
self.winner_dist = np.linalg.norm(np.subtract(data, self.network[self.winner])) | |
self.max_distance = max(self.winner_dist, self.max_distance) | |
self._fit_error = self._get_fit_error() | |
return self._teach_winner(data, self.winner) | |
def _get_fit_error(self): | |
if self.winner_dist == 0.0: | |
return 0.0 | |
return self.winner_dist/self.max_distance | |
def _neighborhood(self, dist, steps): | |
var = self._fit_error*self.beta | |
var = max(var, 0.0001) | |
n = np.exp(-(dist)**2/var**2) | |
return n | |
def _learning_coefficient(self, t): | |
return self._fit_error() | |
def distance_matrix(v): | |
# You'd think numpy would have this | |
n = len(v) | |
m = np.zeros((n, n)) | |
indices = itertools.combinations(range(n), 2) | |
for a, b in indices: | |
if a == b: continue | |
dist = np.linalg.norm(np.subtract(v[a], v[b])) | |
m[a, b] = m[b, a] = dist | |
return m | |
class DatasetDiameter(object): | |
def __init__(self): | |
self.diameter = None | |
self.span_set = [] | |
def update(self, x): | |
self.span_set.append(x) | |
dists = distance_matrix(self.span_set) | |
diam = np.max(dists) | |
if diam < self.diameter: | |
self.span_set.pop() | |
return self.diameter | |
if len(self.span_set) >= len(x): | |
nearest = np.argmin(dists[-1,:-1]) | |
del self.span_set[nearest] | |
self.diameter = diam | |
class ParameterlessSom2(Som): | |
def __init__(self, size, inputLen, initializer): | |
self.diam = DatasetDiameter() | |
neighFunc = self._neighborhood | |
learnFunc = self._learning_coefficient | |
Som.__init__(self, size, inputLen, initializer, | |
neighFunc, learnFunc) | |
self.beta = np.mean(self.network.shape[:2])/3.0 | |
def teach(self, data): | |
self.diam.update(data) | |
self.winner = self.getWinningNode(data) | |
self.winner_dist = np.linalg.norm(np.subtract(data, self.network[self.winner])) | |
self._fit_error = self._get_fit_error() | |
return self._teach_winner(data, self.winner) | |
def _get_fit_error(self): | |
if self.winner_dist == 0.0: | |
return 0.0 | |
return min(self.winner_dist/self.diam.diameter, 1.0) | |
@classmethod | |
def neighborhood(cls, fit_error, dist, beta): | |
var = fit_error | |
var = math.log(1.0 + var*(math.e-1.0)) | |
if var == 0: return 1.0 | |
var *= beta | |
n = math.exp(-(dist)**2/var**2) | |
return n | |
def _neighborhood(self, dist, steps): | |
return self.__class__.neighborhood( | |
self._fit_error, dist, self.beta) | |
def _learning_coefficient(self, t): | |
return self._fit_error*2 | |
class SomPlotter(object): | |
def __init__(self, network): | |
self.p = plt.figure() | |
self.h = network.shape[0] | |
self.w = network.shape[1] | |
def plotRound(self, network, neighb): | |
pass | |
class ColorSomPlotter(SomPlotter): | |
import colorsys | |
def __init__(self, network): | |
SomPlotter.__init__(self, network) | |
def plotRound(self, network, neighb): | |
for row in range(network.shape[0]): | |
for col in range(network.shape[1]): | |
w = list(network[row, col]) | |
#hsv = list(self.colorsys.rgb_to_hsv(*w[1:4])) | |
#hsv[1] = 1.0; hsv[2] = 0.8 | |
#rgb = self.colorsys.hsv_to_rgb(*hsv) | |
rgb = w[0:3] | |
rgb = [max(min(1.0, c), 0.0) for c in rgb] | |
plt.plot([col*len(w)], [row], "s", markersize=15, color=rgb) | |
class LineSomPlotter(SomPlotter): | |
def __init__(self, network): | |
SomPlotter.__init__(self, network) | |
@classmethod | |
def _get_color(cls, neighb, row, col): | |
if neighb is None: | |
color = "blue" | |
elif(neighb[row, col] > 0.99): | |
color = "red" | |
elif(neighb[row, col] < 0.5): | |
color = "black" | |
else: | |
color = "blue" | |
return color | |
def plotRound(self, network, neighb): | |
for row in range(network.shape[0]): | |
for col in range(network.shape[1]): | |
# Subplots seem to be dog slow | |
#plt.subplot(self.h, self.w, row*self.h+col+1) | |
w = list(network[row, col]) | |
if neighb is None: | |
color = "blue" | |
elif(neighb[row, col] > 0.99): | |
color = "red" | |
elif(neighb[row, col] < 0.5): | |
color = "black" | |
else: | |
color = "blue" | |
x = np.linspace(col+0.1, col+0.9, len(w)) | |
y = map(lambda x: x+row, w) | |
print x, y | |
plt.plot(x, y, '.-', color=color, linewidth=2) | |
#plt.axis("off") | |
#plt.ylim(0, 1) | |
class CurveSomPlotter(SomPlotter): | |
def __init__(self, denormalizer, network): | |
self.denorm = denormalizer | |
SomPlotter.__init__(self, network) | |
def plotRound(self, network, neighb): | |
for row, col in itertools.product(*map(range, network.shape[:2])): | |
w = np.array(list(network[row, col])) | |
w = self.denorm(w) | |
x = w[0::2] + col | |
y = (1.0 - w[1::2]) + row | |
color = LineSomPlotter._get_color(neighb, row, col) | |
plt.plot(x[:3], y[:3], color=color) | |
plt.plot(x[3:], y[3:], color=color) | |
class FlowerSomPlotter(SomPlotter): | |
def plotRound(self, network, neighb): | |
for row in range(network.shape[0]): | |
for col in range(network.shape[1]): | |
# Subplots seem to be dog slow | |
#plt.subplot(self.h, self.w, row*self.h+col+1) | |
w = list(network[row, col]) | |
if(neighb[row, col] < 0.5): | |
color = "blue" | |
else: | |
color = "red" | |
#x = range(col*len(w), col*len(w)+len(w)) | |
#y = map(lambda x: x+row, w) | |
xo = row+0.5 | |
yo = col+0.5 | |
plt.plot((xo, xo-w[0]/2.2), (yo, yo+w[0]/2.2), color=color) | |
plt.plot((xo, xo+w[1]/2.2), (yo, yo+w[1]/2.2), color=color) | |
plt.plot((xo, xo+w[2]/2.2), (yo, yo-w[2]/2.2), color=color) | |
plt.plot((xo, xo-w[3]/2.2), (yo, yo-w[3]/2.2), color=color) | |
#plt.plot((xo, xo), (yo-w[0]/3.0, yo+w[0]/3.0), color=color) | |
#plt.plot((xo-w[2]/3.0, xo+w[2]/3.0), (yo-w[2]/3.0, yo+w[2]/3.0), color=color) | |
#plt.plot((xo+w[2]/3.0, xo-w[2]/3.0), (yo-w[2]/3.0, yo+w[2]/3.0), color=color) | |
#plt.scatter((xo, xo), (yo-w[0]/3.0, yo+w[0]/3.0), s=w[1]*40, color=color) | |
#plt.axis("off") | |
plt.xlim(0, network.shape[0]) | |
plt.ylim(0, network.shape[1]) | |
import csv | |
def read_csv_data(fileobj): | |
reader = csv.reader(fileobj) | |
data = [] | |
labels = [] | |
labelmap = {} | |
for row in reader: | |
if(len(row) < 3): continue | |
featlen = len(row)-1 | |
vector = map(lambda x: float(x), row[0:featlen]) | |
data.append(vector) | |
if(row[-1] not in labelmap): | |
labelmap[row[-1]] = len(labelmap) | |
#labels.append(labelmap[row[-1]]) | |
labels.append(row[-1]) | |
#plotter.plotRound(som.network, n) | |
labelreverse = dict((v,k) for k, v in labelmap.iteritems()) | |
#print data | |
data = numpy.array(data) | |
return data, labels, labelreverse | |
def normalize_data(data): | |
normers = np.zeros((data.shape[1], 2)) | |
for i in range(data.shape[1]): | |
col = data[:,i] | |
normers[i][0] = np.mean(data[:,i]) | |
data[:,i] -= normers[i][0] | |
normers[i][1] = np.std(data[:,i]) | |
data[:,i] = numpy.divide(data[:,i], normers[i][1]) | |
def denormalizer(x): | |
x = x * normers[:,1] | |
x += normers[:,0] | |
return x | |
""" | |
for i in range(data.shape[0]): | |
row = data[i,:] | |
row -= numpy.mean(row) | |
std = numpy.std(row) | |
if(std != 0): | |
row = numpy.divide(row, std) | |
row -= min(row) | |
if(numpy.max(row) != 0): | |
row = numpy.divide(row, numpy.max(row)) | |
data[i,:] = row | |
""" | |
return data, denormalizer | |
def demo(): | |
import sys | |
data, labels, labelreverse = read_csv_data(open(sys.argv[1])) | |
w = 20 | |
h = 20 | |
teachrounds = 1000 | |
data, denormalizer = normalize_data(data) | |
mean = np.mean(data) | |
std = np.std(data) | |
init_range = (mean-std, mean+std) | |
def initializer(l, c, mean=mean, std=std): | |
return [random.uniform(mean-std, mean+std) for i in range(l)] | |
som = ParameterlessSom2([w, h], len(data[0]), initializer) | |
plt.ion() | |
plotter = FlowerSomPlotter(som.network) | |
for i in range(teachrounds): | |
r = random.randint(0, data.shape[0]-1) | |
neighb = som.teach(data[r]) | |
if i % 1 == 0: | |
plt.cla() | |
plotter.plotRound(som.network, neighb) | |
plt.draw() | |
plt.pause(0.1) | |
raw_input() | |
if __name__ == '__main__': | |
demo() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment