Train a fully connected NN to classify MNIST digits. Implemented using numpy only.
"""
Class to define a fully connected neural net. Written using
numpy only.
The number and size of hidden layers is defined by providing
a list to the constructor, for example
```
net = FullyConnectedNet([1024, 450, 10])
```
In this case, `net` represents a 3-layer neural net, with an input
layer of size 1024, a hidden layer of size 450, and an output
layer of size 10.
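A constructed net can then be trained and evaluated end to end. A minimal
sketch (the data names are placeholders; inputs are column vectors, so a
training set has shape 1024 x num_samples and labels are class numbers
0-9 or a one-hot matrix):
```
net = FullyConnectedNet([1024, 450, 10])
net.train(train_images, train_labels, mini_batch_size=100,
          num_epochs=10, shuffle=True)
accuracy = net.test(test_images, test_labels)
predictions = net.predict(test_images)
```
`test` returns the fraction of correctly classified samples.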
The following defaults are hardcoded (for now):
- The hidden layers have RELU activation
- The output layer has SOFTMAX activation
- The cost function is the cross-entropy cost
- Weights use He initialisation
- The update optimizer rule is ADAM (but you can choose the parameters)
- Dropout regularization is used (but you can choose the parameter)
Also implements *gradient checking* to check that backprop is
working correctly.
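For example, a small randomly initialised net can be gradient-checked as
follows (a sketch; `x` and `y` are made-up inputs and one-hot labels):
```
net = FullyConnectedNet([100, 25, 2])
net.train_init()
x = np.random.rand(100, 1)
y = np.array([[1], [0]])   # one-hot label for a single sample
net.check_gradient(x, y)   # raises RuntimeError if the gradients disagree
```
`check_with_small_net` below does exactly this on random inputs.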
"""
import os
import numpy as np
import matplotlib.pyplot as plt
class FullyConnectedNet:
def __init__(self, layer_spec, dropout_prob=0.25, learning_rate=0.003,
beta1=0.9, beta2=0.999, epsilon=1e-8):
"""Initialise learning hyperparameters"""
self.layer_spec = layer_spec
self.dropout_prob = dropout_prob
self.learning_rate = learning_rate
self.beta1 = beta1
self.beta2 = beta2
self.epsilon = epsilon
def train_init(self):
"""Initialise all the parameters I require for training"""
self.bias_init()
self.weight_init_he()
self.adam_init()
self.A, self.Z, self.D, self.costs = [], [], [], []
self.dW, self.db = [], []
def bias_init(self):
"""Initialise biases to 0"""
self.b = [np.zeros((sz_layer, 1)) for sz_layer in self.layer_spec[1:]]
def weight_init_he(self):
"""Initialise weights using He initialisation"""
np.random.seed(8)
self.W = [np.random.randn(self.layer_spec[i], self.layer_spec[i - 1]) * np.sqrt(2 / self.layer_spec[i - 1])
for i in range(1, len(self.layer_spec))]
def weight_init_random(self):
"""Random initialisation of weights, used for debugging"""
np.random.seed(3)
self.W = [np.random.randn(self.layer_spec[i], self.layer_spec[i - 1]) * 0.01
for i in range(1, len(self.layer_spec))]
def weight_init_zeros(self):
"""Zeros initialisation of weights, used for debugging"""
self.W = [np.zeros((self.layer_spec[i], self.layer_spec[i - 1]))
for i in range(1, len(self.layer_spec))]
# only used for debugging!
def adam_init(self):
"""Initialise adam moving averages"""
self.t = 1
self.vdW = [np.zeros(Wi.shape) for Wi in self.W]
self.vdb = [np.zeros(bi.shape) for bi in self.b]
self.sdW = [np.zeros(Wi.shape) for Wi in self.W]
self.sdb = [np.zeros(bi.shape) for bi in self.b]
def relu(self, x):
return np.maximum(x, 0)
    def softmax(self, z):
        z_shifted = z - z.max(axis=0, keepdims=True)  # shift for numerical stability
        y = np.exp(z_shifted)
        y_soft = y / y.sum(axis=0, keepdims=True)
        return y_soft
def cross_entropy_cost(self, y_soft, labels):
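        # cross-entropy cost: C = -(1/m) * sum over samples and classes of
        # labels * log(y_soft)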
assert (y_soft.shape == labels.shape) # labels need to be 1-hot encoded
m = labels.shape[1] # number of samples
cost = -np.multiply(np.log(y_soft), labels).sum() / m
return cost
def relu_grad(self, x):
return x > 0
def cross_entropy_with_softmax_grad(self, y_soft, labels):
"""Backprops directly to Z's"""
grad = y_soft - labels
return grad
def forward(self, batch, labels=None, dropout=False):
"""
Do a full forward pass over all layers, for a batch of inputs.
Computes and stores the cost in `self.costs`
:param batch: The batch, dimensions: dim_input x num_samples
:param labels: The labels, 1-hot encoded
:param dropout: `True` in order to use dropout
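        :return: Tuple of the final softmax activations
            (num_classes x num_samples) and the cross-entropy cost
            (`None` if no labels were given)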
"""
def single_forward(prev_layer, weights, bias, activation_func, with_dropout=False):
"""Given activations of one layer, do a forward pass and
compute the activations of the next"""
Z = np.dot(weights, prev_layer) + bias
A = activation_func(Z)
            if with_dropout:
                # inverted dropout: drop units with probability dropout_prob and
                # rescale so that expected activations match those at test time
                D = np.random.rand(*A.shape) > self.dropout_prob
                A = np.multiply(A, D) / (1 - self.dropout_prob)
self.Z.append(Z)
self.A.append(A)
if with_dropout:
self.D.append(D)
self.A.append(batch) # populate initial 'activations'
for Wi, bi in zip(self.W[:-1], self.b[:-1]):
Aprev = self.A[-1]
single_forward(Aprev, Wi, bi, self.relu, with_dropout=dropout)
# compute RELU activation for all units
single_forward(self.A[-1], self.W[-1], self.b[-1], self.softmax)
        # compute softmax activation for the final layer
cost = None
if labels is not None:
cost = self.cross_entropy_cost(self.A[-1], labels)
self.costs.append(cost) # keep track of costs, for debugging and checking
return self.A[-1], cost # return final activations and cost
def backward(self, labels, dropout=False):
"""
Do a full backward pass over all layers.
:param labels: The labels used for forward propagation, one-hot encoded.
"""
        self.dW, self.db = [], []  # reset stored gradients
m = labels.shape[1] # number of samples in batch
def single_backward(dA, weights, with_dropout=False, labels=None):
"""Do a backward pass for a single layer.
The case `labels is not None` is used for last layer, instead of dA.
Otherwise, relu_grad is used"""
            if with_dropout and dA is not None:
                D = self.D.pop()
                # apply the dropout mask from the forward pass, with the same
                # inverted-dropout rescaling
                dA = np.multiply(dA, D) / (1 - self.dropout_prob)
if labels is not None:
dZ = self.cross_entropy_with_softmax_grad(self.A.pop(), labels) # gradient for last layer
self.Z.pop() # final 'Z' not required, throw it away
else:
dZ = np.multiply(self.relu_grad(self.Z.pop()), dA)
dW = np.dot(dZ, self.A.pop().T) / m
db = np.sum(dZ, axis=1, keepdims=True) / m
self.dW.insert(0, dW)
self.db.insert(0, db)
return np.dot(weights.T, dZ) # return updated dA
dA_current = single_backward(None, self.W[-1], labels=labels)
# do first backward pass in case 'softmax'
for Wi in reversed(self.W[:-1]):
dA_current = single_backward(dA_current, Wi, with_dropout=dropout)
# do all 'RELU' backward passes
def check_gradient(self, batch, labels, eps=1e-4, check_thresh=1e-6):
"""Method implementing gradient checking:
for each example in the batch, compute dW by backprop
and manually, by computing the partial derivative using
partial approx= (cost(x+eps) - cost(x-eps)) / 2*eps.
Return the maximum difference of gradients"""
self.forward(batch, labels)
self.backward(labels) # computes dW
max_diff = 0.0
for Wi, dWi in zip(self.W, self.dW):
dWi_manual = np.zeros(Wi.shape)
for iter_W in range(len(dWi_manual.flat)):
Wi.flat[iter_W] += eps
_, cost_plus = self.forward(batch, labels)
Wi.flat[iter_W] -= 2*eps
_, cost_minus = self.forward(batch, labels)
Wi.flat[iter_W] += eps
dWi_manual.flat[iter_W] = (cost_plus - cost_minus) / (2*eps)
max_diff = max(max_diff, abs(dWi_manual - dWi).max())
if max_diff > check_thresh:
raise RuntimeError("Net not passing gradient checking, max_diff = {:.4f}. "
"Something in the code is probably wrong!".format(max_diff))
def gradient_update_simple(self):
"""Simple gradient update, for debugging"""
self.W = [Wi - self.learning_rate * dWi
for Wi, dWi in zip(self.W, self.dW)]
self.b = [bi - self.learning_rate * dbi
for bi, dbi in zip(self.b, self.db)]
def gradient_update_adam(self):
"""Use stored gradients dW to perform an ADAM update of
the weights W"""
def v_compute_running_average(vd, grads):
return [self.beta1 * vdi + (1 - self.beta1) * gradi
for vdi, gradi in zip(vd, grads)]
self.vdW = v_compute_running_average(self.vdW, self.dW)
self.vdb = v_compute_running_average(self.vdb, self.db)
def v_compute_correction(vd):
return [vdi / (1 - np.power(self.beta1, self.t))
for vdi in vd]
vdW_corrected = v_compute_correction(self.vdW)
vdb_corrected = v_compute_correction(self.vdb)
def s_compute_running_average(sd, grads):
return [self.beta2 * sdi + (1 - self.beta2) * np.power(gradi, 2)
for sdi, gradi in zip(sd, grads)]
self.sdW = s_compute_running_average(self.sdW, self.dW)
self.sdb = s_compute_running_average(self.sdb, self.db)
def s_compute_correction(sd):
return [sdi / (1 - np.power(self.beta2, self.t))
for sdi in sd]
sdW_corrected = s_compute_correction(self.sdW)
sdb_corrected = s_compute_correction(self.sdb)
def update_weights(weights, v_corrected, s_corrected):
return [Wi - self.learning_rate * v_corr / (np.sqrt(s_corr) + self.epsilon)
for Wi, v_corr, s_corr in zip(weights, v_corrected, s_corrected)]
self.W = update_weights(self.W, vdW_corrected, sdW_corrected)
self.b = update_weights(self.b, vdb_corrected, sdb_corrected)
self.t = self.t + 1
def check_inputs(self, inputs):
"""Check whether inputs have correct format"""
if inputs.shape[0] != self.layer_spec[0]:
raise RuntimeError("Inputs have incorrect shape!"
"Expected: {}"
"Received: {}".format(self.layer_spec[0],
inputs.shape[0]))
def to_one_hot(self, labels):
"""Compute one-hot encoding of labels"""
one_hot_labels = np.zeros((labels.max()+1, labels.shape[0]))
for i, label in enumerate(labels):
one_hot_labels[label, i] = 1
return one_hot_labels
    def check_labels(self, labels):
        """Validate the label format and convert to one-hot encoding if required"""
if isinstance(labels, list):
labels = np.array(labels)
if labels.shape[0] == self.layer_spec[-1]:
return labels # assume already one-hot
labels = labels.flatten() # otherwise, assume given class numbers 0 to num_classes - 1
if labels.max() == self.layer_spec[-1] - 1:
return self.to_one_hot(labels)
raise RuntimeError("Training set labels in the wrong format!")
def train(self, train_set, labels, mini_batch_size=100, num_epochs=10, shuffle=False):
"""
Run full training loop
:param train_set: The training set, 1 column per input
        :param labels: The training labels, either as class numbers from
            0 to num_classes - 1, or one-hot encoded
:param mini_batch_size: Mini batch size
:param num_epochs: Number of epochs to run
:param shuffle: Shuffle training set at each epoch?
"""
self.check_inputs(train_set)
labels = self.check_labels(labels) # computes one-hot, if required
self.train_init() # initialise arrays needed for training
np.random.seed(38)
def split_into_batches(train_s):
"""Split the training set into batches using
`np.split`. np.split requires the split to result
in equal division, so we have to extract the last batch
manually"""
            m = train_s.shape[1]  # number of samples
            last_batch_start = - (m % mini_batch_size)
            num_full_batches = (m + last_batch_start) // mini_batch_size  # integer count for np.split
if last_batch_start == 0:
last_batch_start = m
last_batch = train_s[:, last_batch_start:] # could be empty, if m % mini_batch_size == 0
batches = np.split(train_s[:, :last_batch_start], num_full_batches, axis=1)
if last_batch.shape[1] > 0: # ignore if empty
batches.append(last_batch)
return batches
count = 0
for epoch in range(num_epochs):
if shuffle:
rand_permutation = np.random.permutation(train_set.shape[1])
train_set = train_set[:, rand_permutation]
labels = labels[:, rand_permutation] # shuffle differently for each epoch.
train_batches = split_into_batches(train_set)
labels_batches = split_into_batches(labels)
for mini_batch, mini_batch_labels in zip(train_batches, labels_batches):
self.forward(mini_batch, mini_batch_labels, dropout=True)
self.backward(mini_batch_labels, dropout=True)
self.gradient_update_adam()
assert(not self.A and not self.Z and not self.D)
count += 1
if count % 500 == 0:
print("Current cost: {}".format(self.costs[-1]))
def predict(self, inputs):
"""Predict by returning """
self.check_inputs(inputs)
activations, _ = self.forward(inputs)
return np.argmax(activations, axis=0)
    def test(self, test_set, labels):
        """Compute accuracy given a test set"""
        self.check_inputs(test_set)
        labels = np.asarray(labels)
        if labels.ndim == 2 and labels.shape[0] == self.layer_spec[-1]:
            # labels are one-hot encoded, so convert back to class numbers
            labels = np.argmax(labels, axis=0)
        predictions = self.predict(test_set)
        accuracy = (labels == predictions).mean()
        return accuracy
def download_mnist(mnist_dir):
"""
Download and gunzip the following files to `mnist_dir`:
http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
"""
import requests
import gzip
print("Downloading MNIST data")
training_files = ('train-images-idx3-ubyte', 'train-labels-idx1-ubyte',
't10k-images-idx3-ubyte', 't10k-labels-idx1-ubyte')
url_home = 'http://yann.lecun.com/exdb/mnist/'
for training_file in training_files:
gzipped_file = training_file + ".gz"
print("Downloading and unzipping {}".format(gzipped_file))
url = url_home + gzipped_file
resp = requests.get(url) # download
path_gzipped = os.path.join(mnist_dir, gzipped_file)
with open(path_gzipped, "wb") as fh:
fh.write(resp.content) # save to disk
path_file = os.path.join(mnist_dir, training_file)
with gzip.open(path_gzipped, 'rb') as fh_gzipped:
with open(path_file, 'wb') as fh:
data = fh_gzipped.read() # gunzip
fh.write(data) # saved gunzipped version to disk
def load_mnist(mnist_dir):
"""Method to load MNIST data from filesystem"""
def load_images(path_images):
with open(path_images, 'rb') as fh:
header_buf = fh.read(4 * 4) # first 4 integers are a header
header = np.frombuffer(header_buf, dtype='>i4') # read big endian integers ('>i4')
assert (header[0] == 2051) # check that the magic number is correct
assert (header[1] == 60000 or header[1] == 10000) # set should have either 60000 or 10000 images (train & test, respectively)
assert (header[2] == 28) # check image width is 28
assert (header[3] == 28) # check image height is 28
images_buf = fh.read()
images = np.frombuffer(images_buf, dtype=np.uint8).reshape((-1, 28*28)).T
images_normalized = images / 255.
return images_normalized
def load_labels(path_labels):
with open(path_labels, 'rb') as fh:
header_buf = fh.read(2 * 4) # first 2 integers are a header
header = np.frombuffer(header_buf, dtype='>i4')
assert (header[0] == 2049) # check that the magic number is correct
assert (header[1] == 60000 or header[1] == 10000)
labels_buf = fh.read()
labels = np.frombuffer(labels_buf, dtype=np.uint8)
return labels
train_images = load_images(os.path.join(mnist_dir, "train-images-idx3-ubyte"))
train_labels = load_labels(os.path.join(mnist_dir, "train-labels-idx1-ubyte"))
test_images = load_images(os.path.join(mnist_dir, "t10k-images-idx3-ubyte"))
test_labels = load_labels(os.path.join(mnist_dir, "t10k-labels-idx1-ubyte"))
return train_images, train_labels, test_images, test_labels
def show_images(images, labels):
"""Plot the training data and its labels to check everything is OK"""
plt.title("Showing some training images and their labels")
count = 1
for image_ind in [34, 2302, 34598]:
image = images[:, image_ind].reshape(28, 28)
ax = plt.subplot(130 + count)
ax.set_title("Should be {}".format(labels[image_ind]))
plt.imshow(image)
count += 1
plt.suptitle("""Showing some training images and their labels
Check whether the labels are accurate! If not, something's wrong!
Close the window to start training.""")
plt.show()
def check_with_small_net():
"""Creates a small net with random inputs
to check backprop using gradient checking"""
net = FullyConnectedNet([100, 25, 10, 2])
net.train_init()
inputs = np.random.rand(100, 1)
labels = np.array([1, 0]).reshape(2, 1)
net.check_gradient(inputs, labels)
def main():
check_with_small_net() # check that backprop is implemented correctly
import argparse
parser = argparse.ArgumentParser(description="""
Train a fully connected NN to classify MNIST digits.
Example usage:
python fully_connected_net.py --mnist_dir mnist_data --layer_spec 784 200 10
    This trains on the MNIST data in `mnist_data` using a fully connected net with
    an input layer of size 784, a hidden layer of size 200, and an output layer of size 10.
    If the data is not already in `mnist_data`, it will be downloaded.
""")
parser.add_argument("--mnist_dir", help="Directory to download mnist data. "
"If it doesn't exist, the data will be downloaded",
default='mnist-data')
parser.add_argument("--layer_spec", help="""The structure of the net, a sequence of integers.
The first element MUST be 784 and the last element MUST be 10""", type=int, nargs='+')
parser.add_argument("--num_epochs", help="Number of epochs to train", type=int, default=10)
args = parser.parse_args()
MNIST_INPUT = 784
MNIST_OUTPUT = 10
    layer_spec = [MNIST_INPUT, 1000, 100, 30, MNIST_OUTPUT]  # default: 3 hidden layers
if args.layer_spec:
layer_spec = args.layer_spec
if layer_spec[0] != MNIST_INPUT:
raise RuntimeError("Input layer must have dimension {}".format(MNIST_INPUT))
if layer_spec[-1] != MNIST_OUTPUT:
raise RuntimeError("Output layer must have dimension {}".format(MNIST_OUTPUT))
print("Layer spec: {}".format(layer_spec))
net = FullyConnectedNet(layer_spec)
print("MNIST directory: {}".format(args.mnist_dir))
if not os.path.isdir(args.mnist_dir):
os.mkdir(args.mnist_dir)
download_mnist(args.mnist_dir)
train_images, train_labels, test_images, test_labels = load_mnist(args.mnist_dir)
show_images(train_images, train_labels)
# train and plot decreasing cost
print("About to train {} epochs".format(args.num_epochs))
net.train(train_images, train_labels, shuffle=True, num_epochs=args.num_epochs)
print("Plotting cost per iteration")
plt.plot(np.array(net.costs))
plt.ylabel('Cross-entropy cost')
plt.xlabel('Iteration number')
plt.title("Cost per iteration")
plt.show()
train_accuracy = net.test(train_images, train_labels)
print("Train accuracy: {:.2f}%".format(100*train_accuracy))
test_accuracy = net.test(test_images, test_labels)
print("Test accuracy: {:.2f}%".format(100*test_accuracy))
if __name__ == "__main__":
main()