Last active
April 24, 2017 16:26
-
-
Save ethen8181/1334689f0446f66788df826add5fedf4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import numpy as np | |
from tqdm import trange | |
from scipy.special import expit | |
import matplotlib.pyplot as plt | |
from sklearn.base import BaseEstimator | |
from sklearn.metrics import accuracy_score | |
class NeuralNet(BaseEstimator):
    """
    Neural Network for classification

    A stack of fully connected layers with a softmax output, trained
    with full-batch gradient descent and L2 weight regularization.

    Parameters
    ----------
    learning_rate : float
        learning rate for gradient descent

    hidden_dims : list of int
        number of units in the hidden layer, e.g. [30], one hidden layer
        with 30 units; [50, 50], two hidden layer with 50 units each

    n_iters : int
        number of iterations to run the algorithm, a.k.a. epochs

    activation : str, 'relu', 'tanh' or 'sigmoid'
        activation function after the fully connected layer,
        must be a key of the module-level ACTIVATION registry

    reg : float
        regularization for the weights

    seed : int
        seed for the randomly initialized weights

    filename : str
        tag inserted into the name of every diagnostic figure
        that is written to the 'train' directory (for homework)
    """

    def __init__(self, learning_rate, hidden_dims, n_iters,
                 activation, reg, seed, filename):
        self.reg = reg
        self.seed = seed
        self.n_iters = n_iters
        self.activation = activation
        self.hidden_dims = hidden_dims
        self.learning_rate = learning_rate
        self.filename = filename  # for homework

    def fit(self, X, y):
        """
        Train the network and record the loss/accuracy history

        Every 10th iteration the diagnostic figures are written
        to the 'train' directory (created if it does not exist).

        Parameters
        ----------
        X : 2d numpy array, shape = [n_samples, n_features]
            The training input samples

        y : 1d numpy array, shape = [n_samples]
            the target values, a.k.a class labels in classification;
            they are used directly as column indices into the
            probability matrix, so they must be integers in
            0 .. n_classes - 1

        Returns
        -------
        self
        """
        N, n_features = X.shape
        n_classes = np.unique(y).shape[0]

        # initialize random weights, we need to learn these;
        # list() lets hidden_dims also be passed as a tuple
        self.biases = []
        self.weights = []
        dims = [n_features] + list(self.hidden_dims) + [n_classes]
        rstate = np.random.RandomState(self.seed)
        for n_in, n_out in zip(dims[:-1], dims[1:]):
            self.weights.append(rstate.randn(n_in, n_out))
            self.biases.append(np.zeros((1, n_out)))

        # needed for homework
        if not os.path.isdir('train'):
            os.mkdir('train')

        self.ma = []  # mean activation for homework
        self.hw1 = []  # hidden weights update magnitude, one list per layer
        self.hw2 = []
        self.hw3 = []

        # iterate between forward and backpropagation steps to
        # train the neural network and store the loss and accuracy history
        self.losses = []
        self.accuracies = []
        for i in trange(self.n_iters):
            proba, caches = self._forward_pass(X)

            # the loss is recorded before the update so that its data term
            # and its regularization term refer to the same set of weights
            loss = softmax_loss(proba, y, self.weights, self.reg)
            self.losses.append(loss)
            self._backward_pass(proba, caches, y)

            y_pred = np.argmax(proba, axis=1)
            accuracy = np.sum(y_pred == y) / N
            self.accuracies.append(accuracy * 100)

            if i % 10 == 0:
                self._plot_info(X, y, i)

        return self

    def _forward_pass(self, X):
        """
        feed forward:
        given the input data, output the softmax probability
        and a cache list that contains the information needed
        to do the backpropagation; the cache list is laid out as
        [affine, activation, affine, ..., activation, affine]
        """
        # store the output of every activation (and the final softmax
        # probability) to do the homework plots
        self._forward = []
        f, f_cache = feed_forward(X, self.weights[0], self.biases[0])
        caches = [f_cache]
        for weight, bias in zip(self.weights[1:], self.biases[1:]):
            activation_forward = ACTIVATION[self.activation]['forward']
            a, a_cache = activation_forward(f)
            f, f_cache = feed_forward(a, weight, bias)
            caches.append(a_cache)
            caches.append(f_cache)
            self._forward.append(a)

        proba = softmax_forward(f)
        self._forward.append(proba)
        return proba, caches

    def _backward_pass(self, proba, caches, y):
        """
        backpropagation that updates the weights

        Note that `caches` is consumed (emptied) by this method.
        """
        dout = softmax_backward(proba, y)
        cache = caches.pop()
        dx, dw, db = feed_backward(dout, cache)

        # store the derivatives of the weight and bias
        # along the way to do the update at the end;
        # both lists run from the last layer to the first
        dbiases = [db]
        dweights = [dw]
        for _ in range(len(caches) // 2):
            cache = caches.pop()
            activation_backward = ACTIVATION[self.activation]['backward']
            da = activation_backward(dx, cache)
            cache = caches.pop()
            dx, dw, db = feed_backward(da, cache)
            dbiases.append(db)
            dweights.append(dw)

        # regularization: the penalty in the loss is 0.5 * reg * sum(w ** 2),
        # hence its gradient is reg * w (the weight itself, not its gradient)
        dweights = [dw + self.reg * w
                    for dw, w in zip(dweights, reversed(self.weights))]

        # visualize gradient to do the homework
        self._dweights = dweights

        # update the weights using standard gradient descent,
        # note that the first element of the dweight corresponds
        # to the last element of weights; every layer, including
        # the first one, has to be updated
        last = len(self.weights) - 1
        for i, (dw, db) in enumerate(zip(dweights, dbiases)):
            self.weights[last - i] -= self.learning_rate * dw
            self.biases[last - i] -= self.learning_rate * db

        return self

    def _plot_info(self, X, y, i):
        """
        Plot for the homework

        Writes two figures to the 'train' directory, both tagged
        with `self.filename` and the iteration number `i`:

        - 'loss...png': a randomly chosen (misclassified if there is any)
          sample with its prediction and the confidence of the
          prediction, i.e. predicted probability, next to the stored
          loss and accuracy up to the current iteration
        - 'train...png': per layer activation, weight and
          weight update visualization
        """
        # the summary has to be drawn first, its forward pass refreshes
        # self._forward, which the layer diagnostics read
        self._plot_training_summary(X, y, i)
        self._plot_layer_diagnostics(i)

    def _plot_training_summary(self, X, y, i):
        """draw one sample with its prediction plus the loss/accuracy curves"""
        fig, ax = plt.subplots(1, 3, figsize=(12, 3))

        proba = self.predict_proba(X)
        y_pred = np.argmax(proba, axis=1)

        # randomly choose a misclassified image, when everything
        # is classified correctly fall back to any image
        mistakes = np.where(y_pred != y)[0]
        if mistakes.size > 0:
            index = np.random.choice(mistakes)
        else:
            index = np.random.randint(X.shape[0])

        # reshape the image to a square, this assumes the
        # number of features is a perfect square
        side = int(np.sqrt(X.shape[1]))
        img = X[index].reshape(side, side)
        ax[0].imshow(img, cmap='gray')

        # prediction for the randomly chosen image
        confidence = proba[index, y_pred[index]]
        title = "\nPrediction: %d confidence=%0.2f" % (y_pred[index], confidence)
        ax[0].set_title(title)
        ax[0].set_xticks([])
        ax[0].set_yticks([])

        ax[1].plot(self.losses, color='blue')
        ax[1].set_title('Loss')
        ax[1].set_yscale('log')

        # aim for 90% accuracy
        ax[2].plot(self.accuracies, color='blue')
        ax[2].axhline(90, color='red', linestyle=':')
        ax[2].set_title('Accuracy: %0.2f%%' % self.accuracies[-1])

        # modify the figure size to add a little height,
        # this prevents some text to be chopped off
        width, height = fig.get_size_inches()
        fig.set_size_inches(width, height + 1)
        save_path = os.path.join('train', 'loss' + self.filename + '-%05d.png' % i)
        fig.savefig(save_path)
        plt.close(fig)

    def _plot_layer_diagnostics(self, i):
        """
        draw the activation, weight and weight update of every layer

        hard-coded plot for assignment: the reshapes below only work for
        784 input features, hidden_dims = [256, 128] and 10 classes
        """
        fig, ax = plt.subplots(3, 4, figsize=(10, 10))

        # first row: mean activation per unit,
        # e.g. for the mean, e.g. ideal logistic should be 0.5
        L1, L2, L3 = self._forward[0], self._forward[1], self._forward[2]
        layers = [('L1', L1, (16, 16)), ('L2', L2, (16, 8)), ('L3', L3, (10, 1))]
        for col, (name, layer, shape) in enumerate(layers):
            avg_forward = layer.mean(axis=0).reshape(shape)
            ax[0, col].imshow(avg_forward, cmap='gray', interpolation='none')
            title = name + r' $\mu$=%0.2f $\sigma$=%0.2f' % (layer.mean(), layer.std())
            ax[0, col].set_title(title)

        ax[0, 2].set_xticks([])

        activations = np.concatenate((L1.flatten(), L2.flatten(), L3.flatten()))
        self.ma.append(activations.mean())
        ax[0, 3].plot(self.ma, color='blue')
        title = r"Activations $\mu$: %0.2f $\sigma$=%0.2f" % (self.ma[-1], activations.std())
        ax[0, 3].set_title(title)

        # second row: mean outgoing weight per input unit
        weight_shapes = [(28, 28), (16, 16), (16, 8)]
        for col, shape in enumerate(weight_shapes):
            weight = self.weights[col]
            avg_weight = weight.mean(axis=1).reshape(shape)
            ax[1, col].imshow(avg_weight, cmap='gray', interpolation='none')
            title = 'W%d' % (col + 1)
            title += r' $\mu$=%0.2f $\sigma$=%0.2f' % (weight.mean(), weight.std())
            ax[1, col].set_title(title)

        ax[1, 2].set_xticks([])

        ax[1, 3].plot(self.accuracies, color='blue')
        ax[1, 3].set_title("Accuracy: %0.2f%%" % self.accuracies[-1])
        ax[1, 3].set_ylim(0, 100)

        # third row: weight update, self._dweights is ordered from
        # the last layer to the first, hence the reversed indexing
        uw1 = self._dweights[2]
        uw2 = self._dweights[1]
        uw3 = self._dweights[0]
        dhw1 = self.learning_rate * np.abs(uw1).mean()
        dhw2 = self.learning_rate * np.abs(uw2).mean()
        dhw3 = self.learning_rate * np.abs(uw3).mean()
        self.hw1.append(dhw1)
        self.hw2.append(dhw2)
        self.hw3.append(dhw3)

        avg_uw = uw1.mean(axis=1).reshape(28, 28)
        ax[2, 0].imshow(avg_uw, cmap='gray', interpolation='none')
        ax[2, 0].set_title(r'$\Delta$W1: %0.2f E-5' % (1e5 * dhw1), color='r')

        avg_uw = uw2.mean(axis=1).reshape(16, 16)
        ax[2, 1].imshow(avg_uw, cmap='gray', interpolation='none')
        ax[2, 1].set_title(r'$\Delta$W2: %0.2f E-5' % (1e5 * dhw2), color='g')

        avg_uw = uw3.mean(axis=1).reshape(16, 8)
        ax[2, 2].imshow(avg_uw, cmap='gray', interpolation='none')
        ax[2, 2].set_title(r'$\Delta$W3: %0.2f E-5' % (1e5 * dhw3), color='b')
        ax[2, 2].set_xticks([])

        # the labels are what the legend below displays
        ax[2, 3].plot(self.hw1, color='r', label='W1')
        ax[2, 3].plot(self.hw2, color='g', label='W2')
        ax[2, 3].plot(self.hw3, color='b', label='W3')
        ax[2, 3].set_title('Weight update magnitude')
        ax[2, 3].legend(loc='upper right')
        ax[2, 3].set_yscale('log')

        suptitle = 'Weight and update visualization ACC:'
        suptitle += " %0.2f%% LR=%0.8f" % (self.accuracies[-1], self.learning_rate)
        fig.suptitle(suptitle)
        save_path = os.path.join('train', 'train' + self.filename + '-%05d.png' % i)
        fig.savefig(save_path)
        plt.close(fig)

    def predict(self, X):
        """return the index of the most probable class for every row of X"""
        proba = self.predict_proba(X)
        y_pred = np.argmax(proba, axis=1)
        return y_pred

    def predict_proba(self, X):
        """
        return the softmax probability of every class for every row
        of X; as a side effect the forward pass overwrites self._forward
        """
        proba, _ = self._forward_pass(X)
        return proba
def softmax_loss(proba, y, weights, reg):
    """
    cross entropy loss averaged by the number of samples,
    plus an L2 penalty, 0.5 * reg * (sum of all squared weights)

    `proba` holds one row of class probabilities per sample, `y` the
    integer class label of every sample and `weights` is the list of
    weight matrices that are being penalized
    """
    n_samples = y.shape[0]

    # probability that was assigned to the true class of each sample;
    # the epsilon value prevents taking log of 0
    true_class_proba = proba[range(n_samples), y]
    neg_log_likelihood = -np.log(true_class_proba + 1e-9)
    data_loss = np.sum(neg_log_likelihood) / n_samples

    # regularization for the weights
    squared_norms = [np.sum(w ** 2) for w in weights]
    reg_loss = 0.5 * reg * np.sum(squared_norms)
    return data_loss + reg_loss
def softmax_forward(x):
    """
    row-wise softmax of the 2d matrix x; the max of each row is
    subtracted from the row before exponentiating, which leaves
    the result unchanged but keeps np.exp from overflowing
    """
    row_max = np.amax(x, axis=1, keepdims=True)
    exponentiated = np.exp(x - row_max)
    normalizer = np.sum(exponentiated, axis=1, keepdims=True)
    return exponentiated / normalizer
def softmax_backward(proba, y):
    """
    gradient of the sample-averaged cross entropy loss with respect
    to the scores that were fed to the softmax: the probability
    minus 1 at the true class, divided by the number of samples;
    `proba` itself is left untouched
    """
    n_samples = y.shape[0]
    grad = proba.copy()
    rows = range(n_samples)
    grad[rows, y] -= 1
    grad /= n_samples
    return grad
def feed_forward(x, w, b):
    """
    fully connected (affine) layer, x.dot(w) + b; also returns
    the (x, w) pair that feed_backward needs as its cache
    """
    out = x.dot(w) + b
    return out, (x, w)
def feed_backward(dout, cache):
    """
    backward pass of the fully connected layer; given the upstream
    gradient `dout` and the (x, w) cache of the forward pass, returns
    the gradient with respect to the input, the weight and the bias
    """
    layer_input, weight = cache

    # every gradient is a matrix multiplication with dout, the operand
    # order/transposes follow from the shapes, e.g. the gradient on the
    # weight must have the same shape as the weight matrix
    grad_input = dout.dot(weight.T)
    grad_weight = layer_input.T.dot(dout)

    # the bias is added to every sample, so its gradient sums over samples
    grad_bias = np.sum(dout, axis=0)
    return grad_input, grad_weight, grad_bias
def relu_forward(x):
    """
    rectified linear unit, elementwise max(0, x); the
    input is returned as the cache for relu_backward
    """
    return np.maximum(0, x), x
def relu_backward(dout, cache):
    """
    backward pass of the relu: the upstream gradient `dout` goes
    through wherever the cached input was positive, else it is 0
    """
    was_active = cache > 0
    return np.where(was_active, dout, 0)
def tanh_forward(x):
    """
    elementwise hyperbolic tangent; the input
    is returned as the cache for tanh_backward
    """
    return np.tanh(x), x
def tanh_backward(dout, cache):
    """
    backward pass of the tanh activation

    `cache` is the input that was given to tanh_forward (not its output),
    so tanh is re-evaluated to get the local derivative 1 - tanh(x) ** 2,
    which is then multiplied with the upstream gradient `dout` (chain rule)
    """
    local_grad = 1 - np.tanh(cache) ** 2
    dx = dout * local_grad
    return dx
def sigmoid_forward(x):
    """
    elementwise logistic sigmoid, computed with scipy's expit,
    a more numerical stable version of the sigmoid formula,
    1 / (1 + np.exp(-x)); the input is returned as the cache
    for sigmoid_backward
    """
    return expit(x), x
def sigmoid_backward(dout, cache):
    """
    backward pass of the sigmoid activation

    `cache` is the input that was given to sigmoid_forward; the local
    derivative sigmoid(x) * (1 - sigmoid(x)) is multiplied with the
    upstream gradient `dout` (chain rule)
    """
    sigmoid = expit(cache)
    dx = dout * sigmoid * (1 - sigmoid)
    return dx
# registry of the supported activation functions: maps the name that is
# given to NeuralNet's `activation` parameter to its forward/backward pair
ACTIVATION = {
    'relu': {
        'forward': relu_forward,
        'backward': relu_backward
    },
    'tanh': {
        'forward': tanh_forward,
        'backward': tanh_backward
    },
    'sigmoid': {
        'forward': sigmoid_forward,
        'backward': sigmoid_backward
    }
}

# __all__ must list the public names as strings, putting the class
# object itself in it makes `from <module> import *` raise a TypeError
__all__ = ['NeuralNet']
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment