Skip to content

Instantly share code, notes, and snippets.

@NMZivkovic
Last active October 10, 2019 18:26
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save NMZivkovic/3e5a5623de009103febb3a6bef61b140 to your computer and use it in GitHub Desktop.
Save NMZivkovic/3e5a5623de009103febb3a6bef61b140 to your computer and use it in GitHub Desktop.
import tensorflow as tf
import numpy as np
class SOM(object):
    """Self-Organizing Map (Kohonen map) built as a TensorFlow 1.x static graph.

    An x-by-y grid of neurons, each holding a weight vector of length
    ``input_dim``, is trained so that nearby neurons respond to similar
    inputs. One graph execution performs a single update step: find the
    Best Matching Unit (BMU) for an input vector, then pull every neuron's
    weights toward that input, scaled by a Gaussian neighbourhood function
    centred on the BMU and by a linearly decaying learning rate.
    """

    def __init__(self, x, y, input_dim, learning_rate, radius, num_iter=111):
        """Build the SOM graph and start a session.

        Args:
            x: number of grid rows.
            y: number of grid columns.
            input_dim: length of each input/weight vector.
            learning_rate: initial learning rate (decays linearly to 0).
            radius: initial neighbourhood radius (decays linearly to 0).
            num_iter: number of training epochs used for the decay schedule.
        """
        # Grid dimensions and training hyper-parameters.
        self._x = x
        self._y = y
        self._learning_rate = float(learning_rate)
        self._radius = float(radius)
        self._num_iter = num_iter
        self._graph = tf.Graph()

        # Build the computation graph.
        with self._graph.as_default():
            # One weight vector per neuron, plus each neuron's (row, col)
            # grid coordinate in matching row-major order.
            self._weights = tf.Variable(tf.random_normal([x * y, input_dim]))
            self._locations = self._generate_index_matrix(x, y)
            self._input = tf.placeholder("float", [input_dim])
            self._iter_input = tf.placeholder("float")

            # Best Matching Unit: index of the neuron whose weight vector is
            # closest (Euclidean distance) to the current input.
            input_matrix = tf.stack([self._input for i in range(x * y)])
            distances = tf.sqrt(tf.reduce_sum(
                tf.pow(tf.subtract(self._weights, input_matrix), 2), 1))
            bmu = tf.argmin(distances, 0)

            # Slice the BMU's (row, col) pair out of the locations matrix:
            # begin = [bmu, 0], size = [1, 2].
            mask = tf.pad(tf.reshape(bmu, [1]), np.array([[0, 1]]))
            size = tf.cast(tf.constant(np.array([1, 2])), dtype=tf.int64)
            bmu_location = tf.reshape(tf.slice(self._locations, mask, size), [2])

            # Linear decay factor (1 - iter/num_iter) applied to both the
            # learning rate and the neighbourhood radius.
            decay_function = tf.subtract(1.0, tf.div(self._iter_input, self._num_iter))
            _current_learning_rate = tf.multiply(self._learning_rate, decay_function)
            _current_radius = tf.multiply(self._radius, decay_function)

            # Gaussian neighbourhood: per-neuron learning rate that falls off
            # with squared grid distance from the BMU.
            bmu_matrix = tf.stack([bmu_location for i in range(x * y)])
            bmu_distance = tf.reduce_sum(
                tf.pow(tf.subtract(self._locations, bmu_matrix), 2), 1)
            neighbourhood_func = tf.exp(tf.negative(tf.div(
                tf.cast(bmu_distance, "float32"), tf.pow(_current_radius, 2))))
            learning_rate_matrix = tf.multiply(_current_learning_rate, neighbourhood_func)

            # Broadcast each neuron's scalar rate across input_dim, then move
            # every weight vector toward the input by that per-neuron amount.
            multiplier = tf.stack([tf.tile(tf.slice(
                learning_rate_matrix, np.array([i]), np.array([1])), [input_dim])
                for i in range(x * y)])
            delta = tf.multiply(
                multiplier,
                tf.subtract(tf.stack([self._input for i in range(x * y)]), self._weights))
            new_weights = tf.add(self._weights, delta)
            self._training = tf.assign(self._weights, new_weights)

            # Create the session and initialise all variables.
            self._sess = tf.Session()
            initialization = tf.global_variables_initializer()
            self._sess.run(initialization)

    def train(self, input_vects):
        """Train for num_iter epochs over input_vects, then cache results.

        After training, ``_weights_list`` holds the learned weight vectors,
        ``_locations`` their grid coordinates, and ``_centroid_matrix`` the
        weights grouped by grid row.
        """
        for iter_no in range(self._num_iter):
            for input_vect in input_vects:
                self._sess.run(self._training,
                               feed_dict={self._input: input_vect,
                                          self._iter_input: iter_no})
        # Pull the trained state out of the graph and group weights by row.
        self._centroid_matrix = [[] for i in range(self._x)]
        self._weights_list = list(self._sess.run(self._weights))
        self._locations = list(self._sess.run(self._locations))
        for i, loc in enumerate(self._locations):
            self._centroid_matrix[loc[0]].append(self._weights_list[i])

    def map_input(self, input_vectors):
        """Return the grid location of the closest neuron for each vector.

        Must be called after ``train`` (relies on the cached weight list).
        """
        return_value = []
        for vect in input_vectors:
            min_index = min([i for i in range(len(self._weights_list))],
                            key=lambda x: np.linalg.norm(vect - self._weights_list[x]))
            return_value.append(self._locations[min_index])
        return return_value

    def _generate_index_matrix(self, x, y):
        """Constant (x*y, 2) tensor of every (row, col) grid coordinate."""
        return tf.constant(np.array(list(self._iterator(x, y))))

    def _iterator(self, x, y):
        """Yield grid coordinates as np arrays in row-major order."""
        for i in range(x):
            for j in range(y):
                yield np.array([i, j])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment