Implementing Kohonen Self-Organizing Maps using NumPy
"""
Script to implement simple self organizing map
@author: Riley Smith
Created: 1-27-21
"""
"""
@modified: Md Mahedi Hasan (11/28/2021)
For Homework Assingment 11 (CpE 520
"""
import numpy as np
import matplotlib.pyplot as plt
from torchvision import transforms
import torchvision
import os
data_dir = "./data"
saved_image_dir = "./images"
num_epochs = 10
class SOM():
    def __init__(self, m=3, n=3, dim=3, lr=1, sigma=1):
        # Initialize descriptive features of the SOM
        # m = vertical map size; n = horizontal map size; dim = input size;
        # sigma = width of the Gaussian neighborhood used in weight updates
        self.m = m
        self.n = n
        self.dim = dim
        self.shape = (m, n)
        self.initial_lr = lr
        self.lr = lr
        self.sigma = sigma
        self.half_size = 16  # half-width of the 1-D neighborhood window
        # Initialize weights
        rng = np.random.default_rng(seed=61)
        self.weights = rng.normal(size=(m * n, dim))
        self._locations = self._get_locations(m, n)
        # Set after fitting
        self._trained = False
    def _get_locations(self, m, n):
        # Return the (row, col) grid coordinates of each of the m*n units
        return np.argwhere(np.ones(shape=(m, n))).astype(np.int64)

    def _find_bmu(self, x):
        # Stack x to have one row per weight
        x_stack = np.stack([x] * (self.m * self.n), axis=0)
        # Calculate distance between x and each weight
        distance = np.linalg.norm(x_stack - self.weights, axis=1)
        # Find index of best matching unit
        return np.argmin(distance)
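    # For example, with two units whose weights are [0, 0] and [1, 1] and an
    # input x = [0.9, 1.2], the distances are 1.5 and about 0.22, so
    # _find_bmu returns index 1.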
    def step(self, x, ti, gi):
        # ti = total number of training iterations; gi = current global iteration
        decay_iter = int(ti / 16)  # 16 decays are required to shrink the window size to 0
        # Stack x to have one row per weight
        x_stack = np.stack([x] * (self.m * self.n), axis=0)
        # Get index of best matching unit
        bmu_index = self._find_bmu(x)
        # Find location of best matching unit
        bmu_location = self._locations[bmu_index, :]
        # Calculate the 1-D neighborhood window around the BMU (assumes a 1 x n map)
        start_index = bmu_location[1] - self.half_size
        if start_index < 0: start_index = 0
        end_index = bmu_location[1] + self.half_size
        if end_index > self.n: end_index = self.n
        # Find squared distance from each weight to the BMU
        stacked_bmu = np.stack([bmu_location] * (self.m * self.n), axis=0)
        bmu_distance = np.sum(np.power(self._locations.astype(np.float64) -
                                       stacked_bmu.astype(np.float64), 2), axis=1)
        # Compute the Gaussian update neighborhood
        neighborhood = np.exp(-bmu_distance / (2 * self.sigma ** 2))
        # Shrink the neighborhood window size
        if gi != 0 and gi % decay_iter == 0:
            self.half_size = self.half_size - 1
            print("shrinking the window size: ", self.half_size)
        # Zero the neighborhood outside the current window
        neighborhood[:start_index] = 0
        neighborhood[end_index + 1:self.n] = 0
        local_step = self.lr * neighborhood
        # Stack local step to be the proper shape for the update
        local_multiplier = np.stack([local_step] * self.dim, axis=1)
        # Multiply by the difference between the input and the weights
        delta = local_multiplier * (x_stack - self.weights)
        # Update weights
        self.weights += delta
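    # In summary, each call to step() applies the SOM update rule
    #   w_j <- w_j + lr * exp(-||r_j - r_c||^2 / (2 * sigma^2)) * (x - w_j)
    # to the units j inside the current window around the BMU c, and leaves
    # every other unit unchanged.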
    def fit(self, X, epochs=1, shuffle=True):
        """
        Take data (an ndarray of type float64) as input and fit the SOM to that
        data for the specified number of epochs.

        Parameters
        ----------
        X : ndarray of shape (n_samples, self.dim)
        """
        # Count total number of iterations (one step per sample per epoch)
        global_iter_counter = 0
        n_samples = X.shape[0]
        total_iterations = epochs * n_samples

        for epoch in range(epochs):
            print("################### Epoch ################# ", epoch)
            if shuffle:
                rng = np.random.default_rng(seed=61)
                indices = rng.permutation(n_samples)
            else:
                indices = np.arange(n_samples)

            # Train
            for idx in indices:
                global_iter_counter += 1
                sample = X[idx]
                # Do one step of training
                self.step(sample, total_iterations, global_iter_counter)
                # Update the learning rate (linear decay; the commented line is
                # the exponential-decay alternative)
                #self.lr = np.exp((global_iter_counter / total_iterations) * -1) * self.initial_lr
                self.lr = (1 - (global_iter_counter / total_iterations)) * self.initial_lr

        # Set trained flag
        self._trained = True
        return
    def transform(self, X):
        # Stack data and cluster centers
        X_stack = np.stack([X] * (self.m * self.n), axis=1)
        cluster_stack = np.stack([self.weights] * X.shape[0], axis=0)
        # Compute difference
        diff = X_stack - cluster_stack
        # Take and return norm
        return np.linalg.norm(diff, axis=2)

    @property
    def cluster_centers_(self):
        return self.weights.reshape(self.m, self.n, self.dim)
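# A minimal usage sketch of the class above (illustration only; `train_images`
# is a hypothetical ndarray of shape (N, 784)):
#   som = SOM(m=1, n=40, dim=784)
#   som.fit(train_images, epochs=10)
#   dists = som.transform(train_images)    # (N, 40) distance to each unit
#   prototypes = som.cluster_centers_      # (1, 40, 784) learned prototypes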
def Train():
    mean = (0.1307, )
    std = (0.3081, )
    train_trans = transforms.Compose([
        transforms.RandomRotation((0, 5), fill=(0, )),
    ])
    dataset = torchvision.datasets.MNIST(
        root=data_dir,
        train=True,
        download=True)

    images = []
    for img, _ in dataset:
        # Scale pixels to [0, 1] before applying the standard MNIST mean/std
        img = np.asarray(train_trans(img), dtype=np.float64) / 255.0
        img = (img - mean) / std
        images.append(img)

    images = np.array(images)
    images = images.reshape(images.shape[0], 784)
    print("input shape: ", images.shape)

    model = SOM(m=1, n=40, dim=784)
    print("fitting model")
    model.fit(images, epochs=num_epochs)

    centers = model.cluster_centers_.squeeze(axis=0)
    print("saving centers")
    os.makedirs(saved_image_dir, exist_ok=True)
    for i, center in enumerate(centers, start=1):
        plt.imshow(center.reshape(28, 28), cmap="gray")
        plt.savefig(os.path.join(saved_image_dir, str(i) + ".png"))
if __name__ == "__main__":
t = Train()
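# When run, this script downloads MNIST into ./data, trains the 1 x 40 map for
# 10 epochs, and saves the 40 learned unit prototypes as 1.png through 40.png
# in ./images.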