## Simple Neural Network in TensorFlow
## Author: Dan Calacci
## Adapted from Natasha Jaques' NN code for PML '17
## Intended to be used with a larger framework for deep learning.
import tensorflow as tf
import numpy as np
import math
import json
import matplotlib.pyplot as plt
# local
import data_funcs
class NeuralNet:
def __init__(self, data_file, params):
"""Initialize neural network
:param data_file: string filepath of data file to train, validate, and test on
:param params: either a string filepath of the JSON params file, or a
dictionary of parameters.
:returns: new NeuralNet object
:rtype: NeuralNet
"""
if params is None:
raise ValueError("No params passed to constructor. You must give either a JSON-formatted params\
file or a dictionary on network construction.")
elif isinstance(params, str):
# load param file
with open(params) as json_params:
self.params = json.load(json_params)
elif isinstance(params, dict):
self.params = params
else:
raise ValueError("params must be a dict or a filepath string.")
self.data_file = data_file
self.is_classification_p = self.params['model_type'] == 'classification'
# not loaded in param file
self.optimizer = tf.train.AdamOptimizer
# extract data from data_file
self.load_data()
# initialize graph with params from param_file
self.initialize_graph()
def load_data(self):
"""Load data from data_file
:returns: None
:rtype: NoneType
"""
# Extract the data from the filename
self.data_loader = data_funcs.DataLoader(self.data_file)
self.input_size = self.data_loader.get_feature_size()
if self.params['model_type'] == 'classification':
print("\nPerforming classification.")
self.output_size = self.data_loader.num_classes
self.metric_name = 'accuracy'
else:
print("\nPerforming regression.")
self.output_size = self.data_loader.num_outputs
self.metric_name = 'RMSE'
print("Input dimensions (number of features):", self.input_size)
print("Number of classes/outputs:", self.output_size)
def initialize_graph(self):
"""Initialize computation graph, tensorflow session, and metric arrays.
:returns: None
:rtype: NoneType
"""
# Set up tensorflow computation graph.
self.graph = tf.Graph()
self.build_graph()
# Set up and initialize tensorflow session.
self.session = tf.Session(graph=self.graph)
self.session.run(self.init)
# Use for plotting evaluation.
self.train_metrics = []
self.val_metrics = []
# How often to validate & checkpoint during training; 100 is an
# arbitrary default, overridable via train().
self.output_every_nth = 100
##########################################################################
# Initializing Network Weights
##########################################################################
def _layer_input_size(self, layer_idx):
"""Input size for the given layer.
An index -1 is considered the start layer of the network.
:param layer_idx: the layer index to return the input size for
:returns: The input size of layer `layer_idx`
:rtype: int
"""
if layer_idx == -1:
return self.input_size
return self.params['layer_sizes'][layer_idx]
def _layer_output_size(self, layer_idx):
"""Output size for the given layer
Output size of last layer is always the size of the output of the
network.
:param layer_idx: the layer index to return the output size for
:returns: The size of the output of layer at index `layer_idx`
:rtype: int
"""
if layer_idx == len(self.params['layer_sizes']) - 1:
return self.output_size
return self.params['layer_sizes'][layer_idx + 1]
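# Worked example (hypothetical sizes): with input_size=2, output_size=3,
# and params['layer_sizes'] = [8, 4], layer indices [-1, 0, 1] yield
# weight shapes 2x8, 8x4, and 4x3 -- index -1 consumes the raw input,
# and the final index always emits output_size.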
def _initial_weight_and_size(self, layer_idx):
"""Initial weight for the given layer index
The weight returned is a tensorflow variable.
:param layer_idx: the layer to return the weight and size for
:returns: A tuple of (input_size, output_size, weight)
:rtype: tuple
"""
input_size, output_size = (self._layer_input_size(layer_idx),
self._layer_output_size(layer_idx))
return (input_size,
output_size,
self._weight_variable([input_size, output_size],
'weights_{}'.format(str(layer_idx))))
def _initial_bias(self, layer_idx):
"""The initial bias for the layer at the given index.
:param layer_idx: Layer index to return the bias for.
:returns: A tensorflow constant that represents the bias for this layer
:rtype: tf.constant
"""
output_size = self._layer_output_size(layer_idx)
return self._bias_variable([output_size],
'biases_{}'.format(str(layer_idx)))
"""Initializes a tensorflow weight variable with random
values centered around 0.
shape: shape of the weight variable (?)
name: name of the variable
"""
def _weight_variable(self, shape, name):
"""Creates a tensorflow weight variable with the given shape and name.
The weight variable returned has random values, centered around 0.
Shape should be list of [input_size, output_size] for a layer.
:param shape: the shape of the layer to create a weight variable for
:param name: the name to give the variable
:returns: a tensorflow truncated_normal variable
:rtype: tf.Variable
"""
std = 1.0 / math.sqrt(float(shape[0]))
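# 1/sqrt(fan_in) scaling keeps the variance of each layer's
# pre-activations roughly constant at initialization.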
initial = tf.truncated_normal(shape, stddev=std, dtype=tf.float64)
return tf.Variable(initial, name=name)
def _bias_variable(self, shape, name):
"""Initializes a tensorflow bias variable to a small constant value for a given
shape and name.
Initializes bias to a value of 0.1 for the layer.
:param shape: the shape of the layer to create a bias variable for.
:param name: the name to give the variable
:returns: a tensorflow constant variable
:rtype: tf.Variable
"""
initial = tf.constant(0.1, shape=shape, dtype=tf.float64)
return tf.Variable(initial, name=name)
def initialize_weights(self):
"""Constructs tensorflow variables for the weights and biases in each layer of
the graph.
The number of layers, and the sizes of each layer, are defined in the
`layer_sizes` field passed to the object on construction.
Creates variables self.weights and self.biases, which are arrays that
contain the weights and biases for each layer of the network.
:returns: None
:rtype: NoneType
"""
# include -1 as the start layer
self.weights, self.biases = [], []
layer_indices = [-1] + list(range(len(self.params['layer_sizes'])))
weights_and_sizes = [self._initial_weight_and_size(idx)
for idx in layer_indices]
input_sizes, output_sizes, self.weights = zip(*weights_and_sizes)
self.biases = [self._initial_bias(idx) for idx in layer_indices]
print("Okay, making a neural net with the following structure:")
print(["{}x{} {}".format(i, o, o) for i, o
in zip(input_sizes, output_sizes)])
##########################################################################
# Building Graph
##########################################################################
def _activation_function(self, h):
"""Returns the activation function for this network.
:param h: The hidden layer to apply the activation function to
:returns: the application of this network's activation function to h
:rtype: Tensor with the same type as h
"""
if self.params['activation_func'] == 'relu':
return tf.nn.relu(h)
return tf.nn.sigmoid(h)
def _run_network(self, input_X):
"""Runs the network for each layer in self.weights on the given input
Runs our network. Applies our learned weights at each layer in the
network, adds biases, and applies our activation function + dropout.
the type of input_X and the output of _run_network is the same as
the initial placeholder for self.tf_X
:param input_X: The input to run the network on, a tf.float64
:returns: The output of the final layer of our network.
:rtype: tf.float64
"""
hidden = input_X
# true for hidden layers; activation + dropout are skipped on the final layer
def not_final_layer_p(n): return n != len(self.weights) - 1
for n, w in enumerate(self.weights):
# invoke layer context
with tf.name_scope('layer{}'.format(n)) as scope:
# simple fully connected layer
hidden = tf.matmul(hidden, w) + self.biases[n]
if not_final_layer_p(n):
hidden = self._activation_function(hidden)
hidden = tf.nn.dropout(hidden,
self.tf_dropout_prob)
return hidden
def _configure_common(self):
"""Configures the initial output, input, and dropout tensors
Creates placeholder tensors for tf_X, tf_Y, and tf_dropout_prob
For info on placeholders, see:
https://www.tensorflow.org/versions/r0.11/api_docs/python/io_ops/placeholders
Depends on model_type. For classification, tf_Y will be a tf.int64. For
regression, tf.float64.
:returns: None
:rtype: NoneType
"""
# output, float for regression, int for classification
y_type = tf.int64 if self.is_classification_p else tf.float64
self.tf_Y = tf.placeholder(y_type, name="Y")
# input, always floats for now
self.tf_X = tf.placeholder(tf.float64, name="X")
# dropout keep probability for nodes (fed as 1.0 at eval time)
self.tf_dropout_prob = tf.placeholder(tf.float64)
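# Note: these placeholders are created without fixed shapes, so the same
# graph accepts any batch size at training and evaluation time.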
def _configure_classification(self):
"""Configure network for classification
Sets up our loss function, weight regularization, predictions &
accuracy for the classification regime.
loss function: softmax cross entropy
regularization: l2
:returns: None
:rtype: NoneType
"""
# Apply a softmax function to get probabilities, train this dist
# against targets with cross entropy loss.
loss_func = tf.nn.sparse_softmax_cross_entropy_with_logits
self.loss = tf.reduce_mean(loss_func(logits=self.logits,
labels=self.tf_Y))
# Add weight decay regularization term to loss
weight_reg = sum([tf.nn.l2_loss(w) for w in self.weights])
self.loss += self.params['weight_penalty'] * weight_reg
# Code for making predictions and evaluating them.
self.class_probabilities = tf.nn.softmax(self.logits)
self.predictions = tf.argmax(self.class_probabilities, axis=1)
self.correct_prediction = tf.equal(self.predictions, self.tf_Y)
self.accuracy = tf.reduce_mean(tf.cast(self.correct_prediction,
tf.float32))
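# Note: sparse_softmax_cross_entropy_with_logits expects integer class
# labels rather than one-hot vectors, which is why tf_Y is declared as
# tf.int64 for classification in _configure_common.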
def _configure_regression(self):
"""Configures network for regression.
Sets up our loss & weight regularization.
loss: rmse
regularization: l2
:returns: None
:rtype: NoneType
"""
# Apply mean squared error loss.
errs = tf.subtract(tf.reshape(self.logits, [-1]),
self.tf_Y)
self.squared_errors = tf.square(errs)
self.rmse = tf.sqrt(tf.reduce_mean(self.squared_errors))
# Add weight decay regularization term to loss
weight_reg = sum([tf.nn.l2_loss(w) for w in self.weights])
self.loss = self.rmse + self.params['weight_penalty'] * weight_reg
def _configure_backprop(self):
"""Configure backprop for this network.
Sets our gradients & optimizer given our loss and training parameters.
If `params['clip_gradients']` is true, we clip by a global norm. We set
clip_norm to be 5.
Once this is called, we can call self.opt_step to run an optimization
step.
:returns: None
:rtype: NoneType
"""
# Set up backpropagation computation!
self.global_step = tf.Variable(0, trainable=False, name='global_step')
self.train_params = tf.trainable_variables()
# set gradients for learning
self.gradients = tf.gradients(self.loss, self.train_params)
if self.params['clip_gradients']:
self.gradients, _ = tf.clip_by_global_norm(self.gradients, 5)
# optimization step using gradients
self.tf_optimizer = self.optimizer(self.params['learning_rate'])
self.opt_step = self.tf_optimizer.apply_gradients(zip(self.gradients,
self.train_params),
self.global_step)
def configure_network(self):
"""Configure our network based on our model type.
Configures the network's loss, weight regularization, prediction &
accuracy, etc. depending on our model type. Also configures
backpropagation & optimization.
self.opt_step may be run after this is called to run optimization.
:returns: None
:rtype: NoneType
"""
if self.params['model_type'] == 'classification':
self._configure_classification()
else:
self._configure_regression()
self._configure_backprop()
def build_graph(self):
"""Constructs the tensorflow graph with all variables to be trained
Configures network, initializes weights, sets loss & regularization
based on model type, runs the network once, and initializes tensorflow
variables.
:returns: None
:rtype: NoneType
"""
print("building graph...")
# get context of our one and only graph
with self.graph.as_default() as g:
self._configure_common()
self.initialize_weights()
self.logits = self._run_network(self.tf_X)
self.configure_network()
# Necessary for tensorflow to build graph
self.init = tf.global_variables_initializer()
# Saver for writing checkpoints in _print_validation_results; must be
# created inside the graph context, after all variables exist.
self.saver = tf.train.Saver()
##########################################################################
# Training
##########################################################################
def _print_validation_results(self, step, train_score, val_score):
"""Prints the validation results for this step, and saves a model checkpoint in
checkpoint_dir.
:param step: the current training step number we're on
:param train_score: the model's current score on the training data
:param val_score: the model's current score on the validation data
:returns: None
:rtype: NoneType
"""
print "Training iteration", step
print "\t Training", self.metric_name, train_score
print "\t Validation", self.metric_name, val_score
self.train_metrics.append(train_score)
self.val_metrics.append(val_score)
# Save a checkpoint of the model
self.saver.save(self.session, self.checkpoint_dir +
self.model_name + '.ckpt', global_step=step)
"""
Runs validation on model trained up to this step.
step: step number
feed_dict: feed dict given to this session to run
Returns the train score and validation score of model up to this step.
"""
def _validate_batch(self, step, feed_dict):
"""Runs the model trained up to this step on our validation set.
:param step: step number we're currently on
:param feed_dict: feed dict given to this session to run
:returns: A tuple of the (train_score, val_score) with training
and validation score for this step.
:rtype: tuple
"""
val_X, val_Y = self.data_loader.get_val_data()
val_feed_dict = {self.tf_X: val_X,
self.tf_Y: val_Y,
self.tf_dropout_prob: 1.0}
eval_fn = self.accuracy if self.is_classification_p else self.rmse
train_score = self.session.run(eval_fn, feed_dict)
val_score = self.session.run(eval_fn, val_feed_dict)
return (train_score, val_score)
def _sgd_train_step(self, step, output_every_nth):
"""Runs a step of Stochastic Gradient Descent
:param step: The number of the step we're on in training
:param output_every_nth: The network will print intermediate results
and save a checkpoint every `output_every_nth` steps.
:returns: None
:rtype: NoneType
"""
# replace placeholders with values from data
X, Y = self.data_loader.get_train_batch(self.params['batch_size'])
feed_dict = {self.tf_X: X,
self.tf_Y: Y,
self.tf_dropout_prob: self.params['dropout_prob']}
# run an optimization step
_ = self.session.run([self.opt_step], feed_dict)
# run our validation if we're at our step count
if step % output_every_nth == 0:
train_score, val_score = self._validate_batch(step, feed_dict)
self._print_validation_results(step, train_score, val_score)
def train(self, num_steps=30000, output_every_nth=None):
"""Trains the network by running Stochastic Gradient Descent for num_steps.
:param num_steps: Number of steps to run SGD for
:param output_every_nth: The network will print intermediate results
and save a checkpoint every `output_every_nth` steps.
:returns: None
:rtype: NoneType
"""
if output_every_nth is not None:
self.output_every_nth = output_every_nth
for step in range(num_steps):
self._sgd_train_step(step, self.output_every_nth)
##########################################################################
# Prediction & Usage
##########################################################################
def predict(self, X, get_probabilities=False):
"""Runs the network to get predictions for new data X.
:param X: matrix of data in the same shape + format as the data this
network was trained on.
:param get_probabilities: If true, the network will return the model's
computed softmax probabilities as well as its predictions. Only works
for classification.
:returns: Integer class predictions if classification, and float
predictions if regression.
:rtype: np.ndarray
"""
# no dropout for prediction
feed_dict = {self.tf_X: X,
self.tf_dropout_prob: 1.0}
if self.is_classification_p:
probs, preds = self.session.run([self.class_probabilities,
self.predictions],
feed_dict)
return (preds, probs) if get_probabilities else preds
else: # regression
return self.session.run(self.logits, feed_dict)
def plot_training_progress(self):
"""Plots the training and validation performance as evaluated
throughout training."""
x = [self.output_every_nth * i for i in np.arange(len(self.train_metrics))]
plt.figure()
plt.plot(x,self.train_metrics)
plt.plot(x,self.val_metrics)
plt.legend(['Train', 'Validation'], loc='best')
plt.xlabel('Training step')
plt.ylabel(self.metric_name)
plt.show()
def plot_binary_classification_data(self, with_decision_boundary=False):
"""Plots the data from each of two binary classes with two different
colours. If with_decision_boundary is set to true, also plots the
decision boundary learned by the model.
Note: This function only works if there are two input features.
"""
class1_X, class2_X = self.data_loader.get_train_binary_classification_data()
plt.figure()
plt.scatter(class1_X[:,0],class1_X[:,1], color='b')
plt.scatter(class2_X[:,0],class2_X[:,1], color='r')
if with_decision_boundary:
# Make a mesh of points on which to make predictions
mesh_step_size = .1
x1_min = self.data_loader.train_X[:, 0].min() - 1
x1_max = self.data_loader.train_X[:, 0].max() + 1
x2_min = self.data_loader.train_X[:, 1].min() - 1
x2_max = self.data_loader.train_X[:, 1].max() + 1
xx1, xx2 = np.meshgrid(np.arange(x1_min, x1_max, mesh_step_size),
np.arange(x2_min, x2_max, mesh_step_size))
# Make predictions for each point in the mesh
Z = self.predict(np.c_[xx1.ravel(), xx2.ravel()])
# Use matplotlib contour function to show decision boundary on mesh
Z = Z.reshape(xx1.shape)
plt.contour(xx1, xx2, Z, cmap=plt.cm.Paired)
plt.show()
def plot_regression_data(self, with_decision_boundary=False):
"""Plots input regression data. If with_decision_boundary is set
to true, also plots the regression function learned by the model.
Note: This function only works if there is one input feature.
"""
plt.figure()
plt.scatter(self.data_loader.train_X, self.data_loader.train_Y)
if with_decision_boundary:
sorted_x = sorted(self.data_loader.train_X)
preds = self.predict(sorted_x)
plt.plot(sorted_x, preds, color='r', lw=2)
plt.show()
def test_on_validation(self):
"""Returns performance on the model's validation set."""
score = self.get_performance_on_data(self.data_loader.val_X,
self.data_loader.val_Y)
print "Final", self.metric_name, "on validation data is:", score
return score
def test_on_test(self):
"""Returns performance on the model's test set."""
print "WARNING! Only test on the test set when you have finished choosing all of your hyperparameters!"
print "\tNever use the test set to choose hyperparameters!!!"
score = self.get_performance_on_data(self.data_loader.test_X,
self.data_loader.test_Y)
print "Final", self.metric_name, "on test data is:", score
return score
def get_performance_on_data(self, X, Y):
"""Returns the model's performance on input data X and targets Y."""
feed_dict = {self.tf_X: X,
self.tf_Y: Y,
self.tf_dropout_prob: 1.0} # no dropout during evaluation
if self.params['model_type'] == 'classification':
score = self.session.run(self.accuracy, feed_dict)
else: # regression
score = self.session.run(self.rmse, feed_dict)
return score
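if __name__ == '__main__':
    # Usage sketch: the data path and parameter values here are
    # hypothetical; the file must be something data_funcs.DataLoader
    # can parse, and the params keys match the ones this class reads.
    example_params = {'model_type': 'classification',
                      'layer_sizes': [64, 32],
                      'activation_func': 'relu',
                      'weight_penalty': 1e-4,
                      'clip_gradients': True,
                      'learning_rate': 1e-3,
                      'batch_size': 32,
                      'dropout_prob': 0.5}
    net = NeuralNet('data/example.csv', example_params)
    net.train(num_steps=5000, output_every_nth=250)
    net.plot_training_progress()
    net.test_on_validation()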