Implementing Kohonen Self Organizing Maps using numpy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Script to implement simple self organizing map | |
@author: Riley Smith | |
Created: 1-27-21 | |
""" | |
""" | |
@modified: Md Mahedi Hasan (11/28/2021) | |
For Homework Assingment 11 (CpE 520 | |
""" | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from torchvision import transforms | |
import torchvision | |
import os | |
data_dir = "./data" | |
saved_image_dir = "./images" | |
num_epochs = 10 | |
class SOM(): | |
def __init__(self, m=3, n=3, dim=3, lr=1, sigma=1): | |
# Initialize descriptive features of SOM | |
# m = verticle; n = horizonal dim = input size; sigma = weight update | |
self.m = m | |
self.n = n | |
self.dim = dim | |
self.shape = (m, n) | |
self.initial_lr = lr | |
self.lr = lr | |
self.sigma = sigma | |
self.half_size = 16 | |
# Initialize weights | |
rng = np.random.default_rng(seed=61) | |
self.weights = rng.normal(size=(m * n, dim)) | |
self._locations = self._get_locations(m, n) | |
# Set after fitting | |
self._trained = False | |
def _get_locations(self, m, n): | |
return np.argwhere(np.ones(shape=(m, n))).astype(np.int64) | |
def _find_bmu(self, x): | |
# Stack x to have one row per weight | |
x_stack = np.stack([x]*(self.m*self.n), axis=0) | |
# Calculate distance between x and each weight | |
distance = np.linalg.norm(x_stack - self.weights, axis=1) | |
# Find index of best matching unit | |
return np.argmin(distance) | |
def step(self, x, ti, gi): | |
decay_iter = int(ti / 16) #16 --> decay requried to make window size 0 | |
# Stack x to have one row per weight | |
x_stack = np.stack([x]*(self.m*self.n), axis=0) | |
# Get index of best matching unit | |
bmu_index = self._find_bmu(x) | |
# Find location of best matching unit | |
bmu_location = self._locations[bmu_index,:] | |
#calculate neighborhood size | |
start_index = bmu_location[1] - self.half_size | |
if start_index < 0: start_index = 0 | |
end_index = bmu_location[1] + self.half_size | |
if end_index > 40: end_index = 40 | |
# Find square distance from each weight to the BMU | |
stacked_bmu = np.stack([bmu_location]*(self.m*self.n), axis=0) | |
bmu_distance = np.sum(np.power(self._locations.astype(np.float64) - | |
stacked_bmu.astype(np.float64), 2), axis=1) | |
# Compute update neighborhood | |
neighborhood = np.exp((bmu_distance / 2*(self.sigma ** 2)) * -1) | |
#shrink the neighborhood window size | |
if (gi != 0 and gi % decay_iter ==0): | |
self.half_size = self.half_size - 1 | |
print("shrinking the window size: ", self.half_size) | |
neighborhood[:start_index] = 0 | |
neighborhood[end_index+1:40] = 0 | |
local_step = self.lr * neighborhood | |
# Stack local step to be proper shape for update | |
local_multiplier = np.stack([local_step]*(self.dim), axis=1) | |
# Multiply by difference between input and weights | |
delta = local_multiplier * (x_stack - self.weights) | |
# Update weights | |
self.weights += delta | |
def fit(self, X, epochs=1, shuffle=True): | |
""" | |
Take data (a tensor of type float64) as input and fit the SOM to that | |
data for the specified number of epochs. | |
Parameters | |
---------- | |
X : ndarray (n, self.dim) | |
""" | |
# Count total number of iterations | |
global_iter_counter = 0 | |
n_samples = X.shape[0] | |
total_iterations = epochs * 60000 | |
for epoch in range(epochs): | |
print("################### Epoch ################# ", epoch) | |
if shuffle: | |
rng = np.random.default_rng(seed=61) | |
indices = rng.permutation(n_samples) | |
else: | |
indices = np.arange(n_samples) | |
# Train | |
for idx in indices: | |
global_iter_counter += 1 | |
input = X[idx] | |
# Do one step of training | |
self.step(input, total_iterations, global_iter_counter) | |
# Update learning rate | |
#self.lr = np.exp((global_iter_counter / total_iterations)*-1) * self.initial_lr | |
self.lr = (1 - (global_iter_counter / total_iterations)) * self.initial_lr | |
# Set trained flag | |
self._trained = True | |
return | |
def transform(self, X): | |
# Stack data and cluster centers | |
X_stack = np.stack([X]*(self.m*self.n), axis=1) | |
cluster_stack = np.stack([self.weights]*X.shape[0], axis=0) | |
# Compute difference | |
diff = X_stack - cluster_stack | |
# Take and return norm | |
return np.linalg.norm(diff, axis=2) | |
@property | |
def cluster_centers_(self): | |
return self.weights.reshape(self.m, self.n, self.dim) | |
def Train(): | |
mean = (0.1307, ) | |
std = (0.3081, ) | |
train_trans = transforms.Compose([ | |
transforms.RandomRotation((0, 5), fill=(0, )), | |
]) | |
dataset = torchvision.datasets.MNIST( | |
root=data_dir, | |
train=True, | |
download=True) | |
images = [] | |
for img, _ in dataset: | |
img = (np.asarray(train_trans(img)) - mean) / std | |
images.append(img) | |
images = np.array(images) | |
images = images.reshape(images.shape[0], 784) | |
print("imput shape: ", images.shape) | |
model = SOM(m=1, n=40, dim=784) | |
print("fitting model") | |
model.fit(images, epochs=num_epochs) | |
centers = model.cluster_centers_.squeeze(axis=0) | |
print("saving centers") | |
iter = 0 | |
for center in centers: | |
iter += 1 | |
plt.imshow(center.reshape(28, 28), cmap="gray") | |
plt.savefig(os.path.join(saved_image_dir, str(iter) + ".png")) | |
if __name__ == "__main__": | |
t = Train() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment