# Self-organizing map demo (originally shared as a GitHub gist).
import numpy as np
import numpy
import math
import itertools
import random
import matplotlib.pyplot as plt
class Som(object):
    """Self-organizing map.

    This is for demonstration purposes and probably too slow and buggy for
    any serious work.
    """

    def __init__(self, size, inputLen, initializer, neighFunc, learnFunc):
        """Create a SOM.

        size -- size of the grid as two-tuple (width, height)
        inputLen -- length of the feature vectors
        initializer -- function(inputLen, (row, col)) that returns the
            initial weight vector for a node
        neighFunc -- function that returns the neighborhood coefficient of a
            node, given its lattice distance from the winner and the
            training time
        learnFunc -- returns learning coefficient given the training time
        """
        self.network = self.__generateNetwork(size, inputLen, initializer)
        self.neighFunc = neighFunc
        self.learnFunc = learnFunc
        self.wrapEdges = True
        # Reserved for memoizing lattice distances (currently unused)
        self.latticeDistances = {}
        # Training-step counter, incremented once per taught vector
        self.t = 1.0

    def __generateNetwork(self, size, inputLen, initializer):
        """Generate a new SOM network.

        Builds a 2D grid of 'size' nodes, each initialized by calling
        'initializer(inputLen, (row, col))'. Returns a numpy array of
        shape (size[0], size[1], inputLen).
        """
        nodes = []
        for i in range(size[0]):
            row = []
            for j in range(size[1]):
                row.append(np.array(initializer(inputLen, (i, j))))
            nodes.append(row)
        return np.array(nodes)

    def getWinningNode(self, vector):
        """Get (row, col) of the node whose weight vector is closest to
        `vector` in Euclidean distance.

        vector -- a feature vector
        """
        # BUG FIX: this used to be np.sqrt((net - vector)**2).sum(axis=2),
        # i.e. element-wise abs summed (Manhattan distance). The sqrt
        # belongs outside the sum for the intended Euclidean distance.
        diffs = np.sqrt(((self.network - vector)**2).sum(axis=2))
        # argmin gives a flat index; convert it to 2D grid coordinates
        winner = np.unravel_index(diffs.argmin(), diffs.shape)
        return winner

    def teach(self, data):
        """Teach the network with data.

        Updates the network's nodes according to "the SOM algorithm". Ie
        finds the closest node to the given data vector, finds nodes that
        belong to its neighborhood and updates their values by
        node_value = node_value +
          (training_value - node_value)*learnFunc(t)*neighborhoodFunc(winner, t)

        data -- a feature vector
        Returns the neighborhood that was updated as a 2D-table with
        neighborhood function values of the corresponding nodes. Mainly
        used for debugging.
        """
        winner = self.getWinningNode(data)
        return self._teach_winner(data, winner)

    def _teach_winner(self, data, winner):
        # 2D array of the neighborhood values. Not really needed,
        # mostly for visualization and debugging.
        neighb = np.zeros((self.network.shape[0], self.network.shape[1]))
        for row in range(self.network.shape[0]):
            for col in range(self.network.shape[1]):
                # Update each node according to the update rule and
                # store its neighborhood value
                neighb[row, col] = self.__updateNode((row, col), winner, data)
        # Increment the time counter
        self.t += 1.0
        return neighb

    def __updateNode(self, node, winner, value):
        """Move `node` toward `value`, scaled by the learning rate and the
        neighborhood coefficient. Returns the neighborhood coefficient."""
        n = self.neighFunc(self.getLatticeDistance(winner, node), self.t)
        learnFactor = self.learnFunc(self.t)*n
        self.network[node] += (value - self.network[node])*learnFactor
        return n

    def getLatticeDistance(self, a, b):
        """Get a distance between node coordinates 'a' and 'b'.

        If wrapEdges is true the grid is treated as a torus, so the
        shortest path may wrap over the edges.
        """
        if not self.wrapEdges:
            return math.sqrt((a[0]-b[0])**2 + (a[1]-b[1])**2)
        # BUG FIX: each axis must wrap with its own length; previously the
        # row axis wrapped with shape[1] and the column axis with shape[0],
        # which was wrong for non-square grids.
        rows = self.network.shape[0]
        cols = self.network.shape[1]
        dr = abs(a[0] - b[0])
        dc = abs(a[1] - b[1])
        dr = min(dr, rows - dr)
        dc = min(dc, cols - dc)
        return math.sqrt(dr**2 + dc**2)

    @staticmethod
    def Inverse(t, v=1.0):
        """Learning coefficient decaying as v/(t+1)."""
        return v/(t+1.0)

    @staticmethod
    def TimeStep(value, t, steps):
        """Step neighborhood: 1.0 within a radius that shrinks with time."""
        limit = steps/float(t)
        if limit < 1.0:
            limit = 1.0
        if value <= limit:
            return 1.0
        else:
            return 0.0

    @staticmethod
    def ExpDecay(value, t, steps):
        """Gaussian neighborhood whose width shrinks as t grows."""
        return np.exp(-1.0*t*value**2/steps)

    @staticmethod
    def Constant(value, t, cutoff):
        """Hard neighborhood: 1.0 strictly inside `cutoff`, else 0.0."""
        if value < cutoff:
            return 1.0
        else:
            return 0.0
class ParameterlessSom(Som):
    """Parameter-less SOM: the learning rate and neighborhood width are
    derived from the current fitting error instead of a time schedule.
    """

    def __init__(self, size, inputLen, initializer):
        self.winner = None
        # BUG FIX: was None; max(float, None) is a TypeError on Python 3.
        # Distances are non-negative, so 0.0 is a safe starting maximum.
        self.max_distance = 0.0
        self.winner_dist = None
        self._fit_error = 0.0
        Som.__init__(self, size, inputLen, initializer,
                     self._neighborhood, self._learning_coefficient)
        self.beta = np.mean(self.network.shape[:-1])

    def teach(self, data):
        """Present one training vector and update the map."""
        self.winner = self.getWinningNode(data)
        self.winner_dist = np.linalg.norm(
            np.subtract(data, self.network[self.winner]))
        self.max_distance = max(self.winner_dist, self.max_distance)
        self._fit_error = self._get_fit_error()
        return self._teach_winner(data, self.winner)

    def _get_fit_error(self):
        # Distance to the winner, normalized by the largest distance seen
        if self.winner_dist == 0.0:
            return 0.0
        return self.winner_dist/self.max_distance

    def _neighborhood(self, dist, steps):
        # Gaussian neighborhood whose width shrinks as the fit improves
        var = self._fit_error*self.beta
        var = max(var, 0.0001)
        return np.exp(-(dist)**2/var**2)

    def _learning_coefficient(self, t):
        # BUG FIX: was `self._fit_error()` -- calling a float raised
        # TypeError whenever the learning coefficient was evaluated.
        return self._fit_error
def distance_matrix(v):
    """Return the symmetric matrix of pairwise Euclidean distances
    between the vectors in `v`. (You'd think numpy would have this.)
    """
    count = len(v)
    out = np.zeros((count, count))
    # combinations yields each unordered pair exactly once
    for i, j in itertools.combinations(range(count), 2):
        d = np.linalg.norm(np.subtract(v[i], v[j]))
        out[i, j] = d
        out[j, i] = d
    return out
class DatasetDiameter(object):
    """Incrementally estimate the diameter (maximum pairwise distance)
    of a stream of data points, keeping only a small spanning set.
    """

    def __init__(self):
        # Largest pairwise distance seen so far; None until first update
        self.diameter = None
        # Small set of points that (approximately) spans the data
        self.span_set = []

    def update(self, x):
        """Fold point `x` into the estimate; returns the current diameter."""
        self.span_set.append(x)
        dists = distance_matrix(self.span_set)
        diam = np.max(dists)
        # BUG FIX: `diam < None` relied on Python 2's None ordering and is
        # a TypeError on Python 3 -- guard the first call explicitly.
        if self.diameter is not None and diam < self.diameter:
            # x does not extend the span; discard it
            self.span_set.pop()
            return self.diameter
        if len(self.span_set) >= len(x):
            # Keep the span set small: drop the point nearest to x
            nearest = np.argmin(dists[-1, :-1])
            del self.span_set[nearest]
        self.diameter = diam
        # Return consistently on every path (previously fell off the end)
        return self.diameter
class ParameterlessSom2(Som):
    """Parameter-less SOM that normalizes the fitting error with an
    incrementally estimated data set diameter (see DatasetDiameter).
    """

    def __init__(self, size, inputLen, initializer):
        self.diam = DatasetDiameter()
        # Initialize state that teach() fills in, so reads before the
        # first teach() don't raise AttributeError
        self.winner = None
        self.winner_dist = None
        self._fit_error = 0.0
        Som.__init__(self, size, inputLen, initializer,
                     self._neighborhood, self._learning_coefficient)
        # Base width of the neighborhood kernel
        self.beta = np.mean(self.network.shape[:2])/3.0

    def teach(self, data):
        """Present one training vector and update the map."""
        self.diam.update(data)
        self.winner = self.getWinningNode(data)
        self.winner_dist = np.linalg.norm(
            np.subtract(data, self.network[self.winner]))
        self._fit_error = self._get_fit_error()
        return self._teach_winner(data, self.winner)

    def _get_fit_error(self):
        """Distance to the winner normalized by the data diameter,
        clamped to [0, 1]."""
        if self.winner_dist == 0.0:
            return 0.0
        diameter = self.diam.diameter
        # BUG FIX: after the very first sample the diameter is 0.0, so the
        # division below blew up (inf / ZeroDivisionError). A non-zero
        # winner distance with a degenerate diameter means maximal error.
        if not diameter:
            return 1.0
        return min(self.winner_dist/diameter, 1.0)

    @classmethod
    def neighborhood(cls, fit_error, dist, beta):
        """Gaussian neighborhood; width follows the log-warped fit error."""
        var = math.log(1.0 + fit_error*(math.e-1.0))
        if var == 0:
            return 1.0
        var *= beta
        return math.exp(-(dist)**2/var**2)

    def _neighborhood(self, dist, steps):
        return self.__class__.neighborhood(
            self._fit_error, dist, self.beta)

    def _learning_coefficient(self, t):
        return self._fit_error*2
class SomPlotter(object):
    """Base class for SOM visualizers: owns the figure and grid size."""

    def __init__(self, network):
        self.p = plt.figure()
        self.h, self.w = network.shape[0], network.shape[1]

    def plotRound(self, network, neighb):
        """Draw one training round; subclasses override this."""
        pass
class ColorSomPlotter(SomPlotter):
    """Visualize the first three weight components of each node as an
    RGB-colored square marker."""

    import colorsys

    def __init__(self, network):
        SomPlotter.__init__(self, network)

    def plotRound(self, network, neighb):
        rows, cols = network.shape[0], network.shape[1]
        for r in range(rows):
            for c in range(cols):
                weights = list(network[r, c])
                # Interpret the leading weights as RGB, clipped to [0, 1]
                rgb = [min(max(comp, 0.0), 1.0) for comp in weights[0:3]]
                plt.plot([c*len(weights)], [r], "s", markersize=15, color=rgb)
class LineSomPlotter(SomPlotter):
    """Visualize each node's weight vector as a small polyline drawn
    inside its grid cell."""

    def __init__(self, network):
        SomPlotter.__init__(self, network)

    @classmethod
    def _get_color(cls, neighb, row, col):
        """Pick a plot color from the neighborhood value of (row, col):
        red for the winner, black outside the neighborhood, blue otherwise."""
        if neighb is None:
            color = "blue"
        elif neighb[row, col] > 0.99:
            color = "red"
        elif neighb[row, col] < 0.5:
            color = "black"
        else:
            color = "blue"
        return color

    def plotRound(self, network, neighb):
        for row in range(network.shape[0]):
            for col in range(network.shape[1]):
                # Subplots seem to be dog slow, so draw into one axes
                w = list(network[row, col])
                # BUG FIX: this logic was duplicated inline instead of
                # calling the class's own _get_color helper
                color = self._get_color(neighb, row, col)
                x = np.linspace(col+0.1, col+0.9, len(w))
                # List comprehension instead of map(): an actual list on
                # both Python 2 and 3 (also dropped a leftover debug print)
                y = [v + row for v in w]
                plt.plot(x, y, '.-', color=color, linewidth=2)
class CurveSomPlotter(SomPlotter):
    """Plot each node's denormalized weight vector as two curve segments
    placed in its grid cell; even indices are x, odd indices are y."""

    def __init__(self, denormalizer, network):
        self.denorm = denormalizer
        SomPlotter.__init__(self, network)

    def plotRound(self, network, neighb):
        rows, cols = network.shape[0], network.shape[1]
        for row in range(rows):
            for col in range(cols):
                weights = self.denorm(np.array(list(network[row, col])))
                xs = weights[0::2] + col
                ys = (1.0 - weights[1::2]) + row
                color = LineSomPlotter._get_color(neighb, row, col)
                plt.plot(xs[:3], ys[:3], color=color)
                plt.plot(xs[3:], ys[3:], color=color)
class FlowerSomPlotter(SomPlotter):
    """Draw each node as a four-petal 'flower' whose petal lengths are the
    first four weight components; red marks the winner's neighborhood."""

    def plotRound(self, network, neighb):
        for row in range(network.shape[0]):
            for col in range(network.shape[1]):
                # Subplots seem to be dog slow, so draw into one axes
                w = list(network[row, col])
                # BUG FIX: the sibling plotters accept neighb=None but this
                # one crashed on it -- treat None like "outside neighborhood"
                if neighb is not None and neighb[row, col] >= 0.5:
                    color = "red"
                else:
                    color = "blue"
                xo = row+0.5
                yo = col+0.5
                plt.plot((xo, xo-w[0]/2.2), (yo, yo+w[0]/2.2), color=color)
                plt.plot((xo, xo+w[1]/2.2), (yo, yo+w[1]/2.2), color=color)
                plt.plot((xo, xo+w[2]/2.2), (yo, yo-w[2]/2.2), color=color)
                plt.plot((xo, xo-w[3]/2.2), (yo, yo-w[3]/2.2), color=color)
        plt.xlim(0, network.shape[0])
        plt.ylim(0, network.shape[1])
import csv
def read_csv_data(fileobj):
    """Read feature vectors and class labels from a CSV file object.

    Each row is expected to be `f1,...,fn,label`; rows with fewer than
    three fields are skipped. Returns (data, labels, labelreverse) where
    data is a (rows, n) numpy array of floats, labels is the list of raw
    label strings per row, and labelreverse maps a numeric label id back
    to its label string.
    """
    reader = csv.reader(fileobj)
    data = []
    labels = []
    labelmap = {}
    for row in reader:
        if len(row) < 3:
            continue
        # Everything but the last column is the feature vector
        vector = [float(x) for x in row[:-1]]
        data.append(vector)
        if row[-1] not in labelmap:
            labelmap[row[-1]] = len(labelmap)
        labels.append(row[-1])
    # BUG FIX: iteritems() is Python 2 only; items() works on both
    labelreverse = dict((v, k) for k, v in labelmap.items())
    data = numpy.array(data)
    return data, labels, labelreverse
def normalize_data(data):
    """Normalize each column of `data` in place to zero mean, unit variance.

    data -- 2D numpy float array; modified in place.
    Returns (data, denormalizer) where denormalizer maps a normalized
    vector back to the original scale.
    """
    # normers[i] holds (mean, std) of column i for denormalization
    normers = np.zeros((data.shape[1], 2))
    for i in range(data.shape[1]):
        normers[i][0] = np.mean(data[:, i])
        data[:, i] -= normers[i][0]
        std = np.std(data[:, i])
        # BUG FIX: a constant column has zero deviation; dividing by it
        # filled the column with NaN/inf. Leave such a column centered.
        normers[i][1] = std if std != 0 else 1.0
        data[:, i] = numpy.divide(data[:, i], normers[i][1])

    def denormalizer(x):
        """Invert the column-wise normalization for a single vector."""
        x = x * normers[:, 1]
        x += normers[:, 0]
        return x

    return data, denormalizer
def demo():
    """Train a parameter-less SOM on CSV data named on the command line
    and animate the training with the flower plotter."""
    import sys
    data, labels, labelreverse = read_csv_data(open(sys.argv[1]))
    width, height = 20, 20
    rounds = 1000
    data, denormalizer = normalize_data(data)
    mean = np.mean(data)
    std = np.std(data)
    init_range = (mean-std, mean+std)

    def initializer(length, coord, mean=mean, std=std):
        # Uniform random weights within one std of the overall data mean
        return [random.uniform(mean-std, mean+std) for _ in range(length)]

    som = ParameterlessSom2([width, height], len(data[0]), initializer)
    plt.ion()
    plotter = FlowerSomPlotter(som.network)
    for step in range(rounds):
        sample = data[random.randint(0, data.shape[0]-1)]
        neighb = som.teach(sample)
        # Redraw every round; raise the modulus here to train faster
        if step % 1 == 0:
            plt.cla()
            plotter.plotRound(som.network, neighb)
            plt.draw()
            plt.pause(0.1)
    raw_input()
if __name__ == '__main__':
    # Entry point: run the interactive demo when executed as a script
    demo()
# End of gist.