Instantly share code, notes, and snippets.

Embed
What would you like to do?
Restricted Boltzmann Machine implementation in TensorFlow, before and after code refactoring. Blog post: http://blackecho.github.io/blog/programming/2016/02/21/refactoring-rbm-tensor-flow-implementation.html
import tensorflow as tf
import numpy as np
import os
import zconfig
import utils
class RBM(object):
    """Restricted Boltzmann Machine implementation using TensorFlow.

    The interface of the class is sklearn-like: ``fit`` trains the model and
    saves it to disk, ``transform`` restores the saved model and encodes data.
    """

    def __init__(self, num_visible, num_hidden, visible_unit_type='bin', main_dir='rbm', model_name='rbm_model',
                 gibbs_sampling_steps=1, learning_rate=0.01, batch_size=10, num_epochs=10, stddev=0.1, verbose=0):
        """
        :param num_visible: number of visible units
        :param num_hidden: number of hidden units
        :param visible_unit_type: type of the visible units ('bin' or 'gauss')
        :param main_dir: main directory to put the models, data and summary directories
        :param model_name: name of the model, used to save data
        :param gibbs_sampling_steps: optional, default 1
        :param learning_rate: optional, default 0.01
        :param batch_size: optional, default 10
        :param num_epochs: optional, default 10
        :param stddev: optional, default 0.1. Ignored if visible_unit_type is not 'gauss'
        :param verbose: level of verbosity. optional, default 0
        """
        self.num_visible = num_visible
        self.num_hidden = num_hidden
        self.visible_unit_type = visible_unit_type
        self.main_dir = main_dir
        self.model_name = model_name
        self.gibbs_sampling_steps = gibbs_sampling_steps
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.stddev = stddev
        self.verbose = verbose

        # Constructing the object creates the output directories on disk.
        self.models_dir, self.data_dir, self.summary_dir = self._create_data_directories()
        self.model_path = self.models_dir + self.model_name

        # Graph nodes; populated by _build_model().
        self.W = None
        self.bh_ = None
        self.bv_ = None
        self.w_upd8 = None
        self.bh_upd8 = None
        self.bv_upd8 = None
        self.encode = None
        self.loss_function = None
        self.input_data = None
        self.hrand = None
        self.vrand = None

        self.validation_size = None

        # TensorFlow utilities; populated by fit() / load_model().
        self.tf_merged_summaries = None
        self.tf_summary_writer = None
        self.tf_session = None
        self.tf_saver = None

    def fit(self, train_set, validation_set=None, restore_previous_model=False):
        """ Fit the model to the training data and save it to self.model_path.

        :param train_set: training set. It is shuffled in place at every epoch.
        :param validation_set: validation set. optional, default None
        :param restore_previous_model:
            if true, a previous trained model
            with the same name of this model is restored from disk to continue training.
        :return: self
        """
        if validation_set is not None:
            self.validation_size = validation_set.shape[0]

        # NOTE(review): the default graph is not reset here, so every call adds a
        # new set of nodes to it -- confirm whether repeated fit() calls are expected.
        self._build_model()

        with tf.Session() as self.tf_session:
            self._initialize_tf_utilities_and_ops(restore_previous_model)
            self._train_model(train_set, validation_set)
            self.tf_saver.save(self.tf_session, self.model_path)

        return self

    def _initialize_tf_utilities_and_ops(self, restore_previous_model):
        """ Initialize TensorFlow operations: summaries, init operations, saver, summary_writer.
        Restore a previously trained model if the flag restore_previous_model is true.

        :param restore_previous_model: if true, restore the variables from self.model_path
        """
        self.tf_merged_summaries = tf.merge_all_summaries()
        init_op = tf.initialize_all_variables()
        self.tf_saver = tf.train.Saver()

        self.tf_session.run(init_op)

        # Restoring after the init op overwrites the freshly initialized values.
        if restore_previous_model:
            self.tf_saver.restore(self.tf_session, self.model_path)

        self.tf_summary_writer = tf.train.SummaryWriter(self.summary_dir, self.tf_session.graph_def)

    def _train_model(self, train_set, validation_set):
        """ Train the model for self.num_epochs epochs.

        :param train_set: training set
        :param validation_set: validation set, or None to skip validation
        """
        for i in range(self.num_epochs):
            self._run_train_step(train_set)

            if validation_set is not None:
                self._run_validation_error_and_summaries(i, validation_set)

    def _run_train_step(self, train_set):
        """ Run a training step. A training step is made by randomly shuffling the training set,
        divide into batches and run the variable update nodes for each batch.

        :param train_set: training set (shuffled in place)
        """
        np.random.shuffle(train_set)

        updates = [self.w_upd8, self.bh_upd8, self.bv_upd8]

        # Consume the batch generator directly; no need to build a list first.
        for batch in utils.gen_batches(train_set, self.batch_size):
            self.tf_session.run(updates, feed_dict=self._create_feed_dict(batch))

    def _run_validation_error_and_summaries(self, epoch, validation_set):
        """ Run the summaries and error computation on the validation set.

        :param epoch: current epoch, used as the step of the written summary
        :param validation_set: validation data
        """
        summary_str, err = self.tf_session.run(
            [self.tf_merged_summaries, self.loss_function],
            feed_dict=self._create_feed_dict(validation_set))

        # Tag the summary with the epoch so successive points do not all
        # land on the same step.
        self.tf_summary_writer.add_summary(summary_str, epoch)

        if self.verbose == 1:
            print("Validation cost at step %s: %s" % (epoch, err))

    def _create_feed_dict(self, data):
        """ Create the dictionary of data to feed to TensorFlow's session during training.

        :param data: training/validation set batch
        :return: dictionary(self.input_data: data, self.hrand: random_uniform, self.vrand: random_uniform)
        """
        return {
            self.input_data: data,
            self.hrand: np.random.rand(data.shape[0], self.num_hidden),
            self.vrand: np.random.rand(data.shape[0], self.num_visible)
        }

    def _build_model(self):
        """ Build the Restricted Boltzmann Machine model in TensorFlow.

        Sets the placeholders, the variables, the three update nodes, the
        ``encode`` node and the ``loss_function`` node on this object.
        """
        self.input_data, self.hrand, self.vrand = self._create_placeholders()
        self.W, self.bh_, self.bv_ = self._create_variables()

        hprobs0, hstates0, vprobs, hprobs1, hstates1 = self.gibbs_sampling_step(self.input_data)
        positive = self.compute_positive_association(self.input_data, hprobs0, hstates0)

        # The first step is done above; each further step starts from the
        # visible reconstruction of the previous one.
        nn_input = vprobs

        for _ in range(self.gibbs_sampling_steps - 1):
            _, _, vprobs, hprobs1, hstates1 = self.gibbs_sampling_step(nn_input)
            nn_input = vprobs

        negative = tf.matmul(tf.transpose(vprobs), hprobs1)

        self.encode = hprobs1  # encoded data, used by the transform method

        self.w_upd8 = self.W.assign_add(self.learning_rate * (positive - negative))
        # Bias updates use the mean over the batch axis of the difference
        # between the data-driven and the reconstruction-driven values.
        self.bh_upd8 = self.bh_.assign_add(self.learning_rate * tf.reduce_mean(hprobs0 - hprobs1, 0))
        self.bv_upd8 = self.bv_.assign_add(self.learning_rate * tf.reduce_mean(self.input_data - vprobs, 0))

        # Root mean squared error between the input and its reconstruction.
        self.loss_function = tf.sqrt(tf.reduce_mean(tf.square(self.input_data - vprobs)))
        _ = tf.scalar_summary("cost", self.loss_function)

    def _create_placeholders(self):
        """ Create the TensorFlow placeholders for the model.

        :return: tuple(input(shape(None, num_visible)),
                       hrand(shape(None, num_hidden))
                       vrand(shape(None, num_visible)))
        """
        x = tf.placeholder('float', [None, self.num_visible], name='x-input')
        hrand = tf.placeholder('float', [None, self.num_hidden], name='hrand')
        vrand = tf.placeholder('float', [None, self.num_visible], name='vrand')

        return x, hrand, vrand

    def _create_variables(self):
        """ Create the TensorFlow variables for the model.

        :return: tuple(weights(shape(num_visible, num_hidden),
                       hidden bias(shape(num_hidden)),
                       visible bias(shape(num_visible)))
        """
        W = tf.Variable(tf.random_normal((self.num_visible, self.num_hidden), mean=0.0, stddev=0.01), name='weights')
        bh_ = tf.Variable(tf.zeros([self.num_hidden]), name='hidden-bias')
        bv_ = tf.Variable(tf.zeros([self.num_visible]), name='visible-bias')

        return W, bh_, bv_

    def gibbs_sampling_step(self, visible):
        """ Performs one step of gibbs sampling.

        :param visible: activations of the visible units
        :return: tuple(hidden probs, hidden states, visible probs,
                       new hidden probs, new hidden states)
        """
        hprobs, hstates = self.sample_hidden_from_visible(visible)
        # The reconstruction is driven by the hidden probabilities, not the sampled states.
        vprobs = self.sample_visible_from_hidden(hprobs)
        hprobs1, hstates1 = self.sample_hidden_from_visible(vprobs)

        return hprobs, hstates, vprobs, hprobs1, hstates1

    def sample_hidden_from_visible(self, visible):
        """ Sample the hidden units from the visible units.
        This is the Positive phase of the Contrastive Divergence algorithm.

        :param visible: activations of the visible units
        :return: tuple(hidden probabilities, hidden binary states)
        """
        hprobs = tf.nn.sigmoid(tf.matmul(visible, self.W) + self.bh_)
        hstates = utils.sample_prob(hprobs, self.hrand)

        return hprobs, hstates

    def sample_visible_from_hidden(self, hidden):
        """ Sample the visible units from the hidden units.
        This is the Negative phase of the Contrastive Divergence algorithm.

        :param hidden: activations of the hidden units
        :return: visible probabilities
        :raises ValueError: if visible_unit_type is neither 'bin' nor 'gauss'
        """
        if self.visible_unit_type not in ('bin', 'gauss'):
            # Fail with a clear message instead of returning None and
            # crashing later inside an unrelated TensorFlow call.
            raise ValueError(
                "visible_unit_type must be 'bin' or 'gauss', got %r" % (self.visible_unit_type,))

        visible_activation = tf.matmul(hidden, tf.transpose(self.W)) + self.bv_

        if self.visible_unit_type == 'bin':
            return tf.nn.sigmoid(visible_activation)

        # NOTE(review): the sample shape is (1, num_visible) while the mean has
        # one row per input example -- presumably this relies on broadcasting; confirm.
        return tf.truncated_normal((1, self.num_visible), mean=visible_activation, stddev=self.stddev)

    def compute_positive_association(self, visible, hidden_probs, hidden_states):
        """ Compute positive associations between visible and hidden units.

        :param visible: visible units
        :param hidden_probs: hidden units probabilities (used for 'gauss' visible units)
        :param hidden_states: hidden units states (used for 'bin' visible units)
        :return: positive association = dot(visible.T, hidden)
        :raises ValueError: if visible_unit_type is neither 'bin' nor 'gauss'
        """
        if self.visible_unit_type == 'bin':
            return tf.matmul(tf.transpose(visible), hidden_states)

        if self.visible_unit_type == 'gauss':
            return tf.matmul(tf.transpose(visible), hidden_probs)

        raise ValueError(
            "visible_unit_type must be 'bin' or 'gauss', got %r" % (self.visible_unit_type,))

    def _create_data_directories(self):
        """ Create the three directories for storing respectively the models,
        the data generated by training and the TensorFlow's summaries.

        Also normalizes self.main_dir so that it ends with '/'.

        :return: tuple of strings(models_dir, data_dir, summary_dir)
        """
        # endswith() also copes with an empty main_dir, where [-1] would raise.
        if not self.main_dir.endswith('/'):
            self.main_dir += '/'

        # The settings module imported by this file is named zconfig.
        models_dir = zconfig.models_dir + self.main_dir
        data_dir = zconfig.data_dir + self.main_dir
        summary_dir = zconfig.summary_dir + self.main_dir

        for d in (models_dir, data_dir, summary_dir):
            if not os.path.isdir(d):
                # makedirs also creates the root directory (e.g. 'models/')
                # when it does not exist yet; mkdir would fail in that case.
                os.makedirs(d)

        return models_dir, data_dir, summary_dir

    def transform(self, data, name='train', save=False):
        """ Transform data according to the model.

        The model is restored from self.model_path, so fit() or load_model()
        must have been called before.

        :type data: array_like
        :param data: Data to transform
        :type name: string, default 'train'
        :param name: Identifier for the data that is being encoded
        :type save: boolean, default 'False'
        :param save: If true, save data to disk
        :return: transformed data
        """
        with tf.Session() as self.tf_session:
            self.tf_saver.restore(self.tf_session, self.model_path)

            encoded_data = self.encode.eval(self._create_feed_dict(data))

            if save:
                np.save(self.data_dir + self.model_name + '-' + name, encoded_data)

            return encoded_data

    def load_model(self, shape, gibbs_sampling_steps, model_path):
        """ Load a trained model from disk. The shape of the model
        (num_visible, num_hidden) and the number of gibbs sampling steps
        must be known in order to restore the model.

        :param shape: tuple(num_visible, num_hidden)
        :param gibbs_sampling_steps: number of gibbs sampling steps of the saved model
        :param model_path: path of the saved model to restore
        """
        self.num_visible, self.num_hidden = shape[0], shape[1]
        self.gibbs_sampling_steps = gibbs_sampling_steps
        # Remember where the model came from: transform(), get_model_parameters()
        # and get_weights_as_images() restore from self.model_path.
        self.model_path = model_path

        self._build_model()

        init_op = tf.initialize_all_variables()
        self.tf_saver = tf.train.Saver()

        with tf.Session() as self.tf_session:
            self.tf_session.run(init_op)
            self.tf_saver.restore(self.tf_session, model_path)

    def get_model_parameters(self):
        """ Return the model parameters in the form of numpy arrays.

        :return: dictionary with keys 'W', 'bh_' and 'bv_'
        """
        with tf.Session() as self.tf_session:
            self.tf_saver.restore(self.tf_session, self.model_path)

            return {
                'W': self.W.eval(),
                'bh_': self.bh_.eval(),
                'bv_': self.bv_.eval()
            }

    def get_weights_as_images(self, width, height, outdir='img/', n_images=10, img_type='grey'):
        """ Create and save the weights of the hidden units with respect to the
        visible units as images.

        :param width: image width, forwarded to utils.gen_image
        :param height: image height, forwarded to utils.gen_image
        :param outdir: output directory, relative to self.data_dir
        :param n_images: maximum number of randomly chosen hidden units to render
        :param img_type: image type, forwarded to utils.gen_image
        """
        outdir = self.data_dir + outdir

        # The image writer cannot create missing directories itself.
        if not os.path.isdir(outdir):
            os.makedirs(outdir)

        with tf.Session() as self.tf_session:
            self.tf_saver.restore(self.tf_session, self.model_path)
            weights = self.W.eval()

            perm = np.random.permutation(self.num_hidden)[:n_images]

            for p in perm:
                # Column p holds the weights from every visible unit to hidden unit p.
                w = weights[:, p]
                image_path = outdir + self.model_name + '_{}.png'.format(p)
                utils.gen_image(w, width, height, image_path, img_type)
from tensorflow.python.framework import ops
import tensorflow as tf
import numpy as np
import os
import zconfig
import utils
class RBM(object):
    """Restricted Boltzmann Machine implementation using TensorFlow.

    The interface of the class is sklearn-like: ``fit`` trains the model and
    saves it to disk, ``transform`` restores the saved model and encodes data.
    """

    def __init__(self, nvis, nhid, vis_type='bin', directory_name='rbm', model_name='', gibbs_k=1, learning_rate=0.01,
                 batch_size=10, n_iter=10, stddev=0.1, verbose=0):
        """
        :param nvis: number of visible units
        :param nhid: number of hidden units
        :param vis_type: type of the visible units ('bin' or 'gauss')
        :param directory_name: sub-directory used under the models, data and summary directories
        :param model_name: name of the model; when empty a name is generated from the parameters
        :param gibbs_k: number of gibbs sampling steps. optional, default 1
        :param learning_rate: optional, default 0.01
        :param batch_size: optional, default 10
        :param n_iter: number of training epochs. optional, default 10
        :param stddev: optional, default 0.1. Only used when vis_type is 'gauss'
        :param verbose: level of verbosity. optional, default 0
        """
        self.nvis = nvis
        self.nhid = nhid
        self.vis_type = vis_type
        self.directory_name = directory_name
        self.model_name = model_name
        self.gibbs_k = gibbs_k
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.n_iter = n_iter
        self.stddev = stddev
        self.verbose = verbose

        # Directories paths
        # endswith() also copes with an empty directory_name, where [-1] would raise.
        if not self.directory_name.endswith('/'):
            self.directory_name += '/'

        # The settings module imported by this file is named zconfig.
        self.models_dir = zconfig.models_dir + self.directory_name
        self.data_dir = zconfig.data_dir + self.directory_name
        self.summary_dir = zconfig.summary_dir + self.directory_name

        # Create dirs
        for d in (self.models_dir, self.data_dir, self.summary_dir):
            if not os.path.isdir(d):
                # makedirs also creates the root directory (e.g. 'models/')
                # when it does not exist yet; mkdir would fail in that case.
                os.makedirs(d)

        if self.model_name == '':
            # Assign model complete name
            # NOTE(review): batch_size appears twice in the generated name; the
            # last field was presumably meant to be another parameter. Left
            # unchanged so that previously saved models keep their names.
            self.model_name = 'rbm-{}-{}-{}-{}-{}-{}'.format(
                self.nvis, self.nhid, self.n_iter, self.batch_size, self.learning_rate, self.batch_size)

        # ############################# #
        #   Computational graph nodes   #
        # ############################# #

        # Model parameters
        self.W = None
        self.bh_ = None
        self.bv_ = None

        self.w_upd8 = None
        self.bh_upd8 = None
        self.bv_upd8 = None

        self.encode = None

        self.cost = None

        # Placeholders; x is declared here too so every attribute exists
        # before _create_graph() has run.
        self.x = None
        self.hrand = None
        self.vrand = None
        self.validation_size = None

        self.sess = None
        self.saver = None

    def _create_graph(self):
        """ Build the placeholders, variables, update nodes, encode node and cost node.

        :raises ValueError: if vis_type is neither 'bin' nor 'gauss'
        """
        if self.vis_type not in ('bin', 'gauss'):
            # Fail with a clear message instead of leaving the graph nodes as
            # None and crashing later inside an unrelated TensorFlow call.
            raise ValueError("vis_type must be 'bin' or 'gauss', got %r" % (self.vis_type,))

        # Symbolic variables
        self.x = tf.placeholder('float', [None, self.nvis], name='x-input')

        self.hrand = tf.placeholder('float', [None, self.nhid], name='hrand')
        self.vrand = tf.placeholder('float', [None, self.nvis], name='vrand-train')

        # Biases
        self.bh_ = tf.Variable(tf.zeros([self.nhid]), name='hidden-bias')
        self.bv_ = tf.Variable(tf.zeros([self.nvis]), name='visible-bias')

        self.W = tf.Variable(tf.random_normal((self.nvis, self.nhid), mean=0.0, stddev=0.01), name='weights')

        nn_input = self.x

        # Initialization
        hprobs0 = None
        hprobs = None
        positive = None
        vprobs = None
        hprobs1 = None
        hstates1 = None

        for step in range(self.gibbs_k):

            # Positive Contrastive Divergence phase
            hprobs = tf.nn.sigmoid(tf.matmul(nn_input, self.W) + self.bh_)
            hstates = utils.sample_prob(hprobs, self.hrand)

            # Compute positive associations in step 0
            if step == 0:
                hprobs0 = hprobs  # save the activation probabilities of the first step
                if self.vis_type == 'bin':
                    positive = tf.matmul(tf.transpose(nn_input), hstates)

                elif self.vis_type == 'gauss':
                    positive = tf.matmul(tf.transpose(nn_input), hprobs)

            # Negative Contrastive Divergence phase
            visible_activation = tf.matmul(hprobs, tf.transpose(self.W)) + self.bv_

            if self.vis_type == 'bin':
                vprobs = tf.nn.sigmoid(visible_activation)

            elif self.vis_type == 'gauss':
                # NOTE(review): the sample shape is (1, nvis) while the mean has one
                # row per input example -- presumably this relies on broadcasting; confirm.
                vprobs = tf.truncated_normal((1, self.nvis), mean=visible_activation, stddev=self.stddev)

            # Sample again from the hidden units
            hprobs1 = tf.nn.sigmoid(tf.matmul(vprobs, self.W) + self.bh_)
            hstates1 = utils.sample_prob(hprobs1, self.hrand)

            # Use the reconstructed visible units as input for the next step
            nn_input = vprobs

        negative = tf.matmul(tf.transpose(vprobs), hprobs1)

        # Hidden probabilities computed at the start of the last gibbs step.
        self.encode = hprobs  # encoded data

        self.w_upd8 = self.W.assign_add(self.learning_rate * (positive - negative))
        # Bias updates use the mean over the batch axis.
        self.bh_upd8 = self.bh_.assign_add(self.learning_rate * tf.reduce_mean(hprobs0 - hprobs1, 0))
        self.bv_upd8 = self.bv_.assign_add(self.learning_rate * tf.reduce_mean(self.x - vprobs, 0))

        # Root mean squared error between the input and its reconstruction.
        self.cost = tf.sqrt(tf.reduce_mean(tf.square(self.x - vprobs)))
        _ = tf.scalar_summary("cost", self.cost)

    def fit(self, trX, vlX=None, restore_previous_model=False):
        """ Fit the model to the training data and save it under self.models_dir.

        :param trX: training set. It is shuffled in place at every epoch.
        :param vlX: validation set. optional, default None
        :param restore_previous_model: if true, a previously saved model with the
            same name is restored before training and '-restored<n_iter>' is
            appended to the model name.
        """
        if vlX is not None:
            self.validation_size = vlX.shape[0]

        # Reset tensorflow's default graph
        ops.reset_default_graph()

        self._create_graph()

        merged = tf.merge_all_summaries()
        init_op = tf.initialize_all_variables()
        self.saver = tf.train.Saver()

        with tf.Session() as self.sess:

            self.sess.run(init_op)

            if restore_previous_model:
                # Restore previous model
                self.saver.restore(self.sess, self.models_dir + self.model_name)
                # Change model name
                self.model_name += '-restored{}'.format(self.n_iter)

            # Write tensorflow summaries to summary dir
            writer = tf.train.SummaryWriter(self.summary_dir, self.sess.graph_def)

            for i in range(self.n_iter):

                # Randomly shuffle the input
                np.random.shuffle(trX)

                # Consume the batch generator directly; no need to build a list first.
                for batch in utils.gen_batches(trX, self.batch_size):
                    self.sess.run([self.w_upd8, self.bh_upd8, self.bv_upd8],
                                  feed_dict={self.x: batch,
                                             self.hrand: np.random.rand(batch.shape[0], self.nhid),
                                             self.vrand: np.random.rand(batch.shape[0], self.nvis)})

                if i % 5 == 0:

                    # Record summary data
                    if vlX is not None:

                        feed = {self.x: vlX,
                                self.hrand: np.random.rand(self.validation_size, self.nhid),
                                self.vrand: np.random.rand(self.validation_size, self.nvis)}

                        summary_str, err = self.sess.run([merged, self.cost], feed_dict=feed)

                        # Tag the summary with the epoch so successive points
                        # do not all land on the same step.
                        writer.add_summary(summary_str, i)

                        if self.verbose == 1:
                            print("Validation cost at step %s: %s" % (i, err))

            # Save trained model
            self.saver.save(self.sess, self.models_dir + self.model_name)

    def transform(self, data, name='train', gibbs_k=1, save=False, models_dir=''):
        """ Transform data according to the model.

        The model is restored from self.models_dir + self.model_name, so fit()
        or load_model() must have been called before.

        :type data: array_like
        :param data: Data to transform
        :type name: string, default 'train'
        :param name: Identifier for the data that is being encoded
        :type gibbs_k: 1
        :param gibbs_k: Gibbs sampling steps. Currently not used by this method.
        :type save: boolean, default 'False'
        :param save: If true, save data to disk
        :param models_dir: Currently not used by this method.
        :return: transformed data
        """
        with tf.Session() as self.sess:

            # Restore trained model
            self.saver.restore(self.sess, self.models_dir + self.model_name)

            # Return the output of the encoding layer
            encoded_data = self.encode.eval({self.x: data,
                                             self.hrand: np.random.rand(data.shape[0], self.nhid),
                                             self.vrand: np.random.rand(data.shape[0], self.nvis)})

            if save:
                # Save transformation to output file
                np.save(self.data_dir + self.model_name + '-' + name, encoded_data)

            return encoded_data

    def load_model(self, shape, gibbs_k, model_path):
        """ Rebuild the graph for the given shape and restore a saved model.

        :param shape: tuple(nvis, nhid)
        :param gibbs_k: number of gibbs sampling steps of the saved model
        :param model_path: path of the saved model to restore
        :return:
        """
        self.nvis, self.nhid = shape[0], shape[1]
        self.gibbs_k = gibbs_k

        # Start from an empty default graph, as fit() does, so that nodes left
        # over from an earlier graph are not picked up by the saver.
        ops.reset_default_graph()

        self._create_graph()

        # Initialize variables
        init_op = tf.initialize_all_variables()

        # Add ops to save and restore all the variables
        self.saver = tf.train.Saver()

        # NOTE(review): model_path is only used for this restore; the other
        # methods restore from self.models_dir + self.model_name -- confirm
        # that callers keep the two in sync.
        with tf.Session() as self.sess:

            self.sess.run(init_op)

            # Restore previous model
            self.saver.restore(self.sess, model_path)

    def get_model_parameters(self):
        """ Return the model parameters in the form of numpy arrays.

        :return: dictionary with keys 'W', 'bh_' and 'bv_'
        """
        with tf.Session() as self.sess:

            # Restore trained model
            self.saver.restore(self.sess, self.models_dir + self.model_name)

            return {
                'W': self.W.eval(),
                'bh_': self.bh_.eval(),
                'bv_': self.bv_.eval()
            }

    def get_weights_as_images(self, width, height, outdir='img/', n_images=10, img_type='grey'):
        """ Save the weights of randomly chosen hidden units as images.

        :param width: image width, forwarded to utils.gen_image
        :param height: image height, forwarded to utils.gen_image
        :param outdir: output directory, relative to self.data_dir
        :param n_images: maximum number of randomly chosen hidden units to render
        :param img_type: image type, forwarded to utils.gen_image
        """
        outdir = self.data_dir + outdir

        # The image writer cannot create missing directories itself.
        if not os.path.isdir(outdir):
            os.makedirs(outdir)

        with tf.Session() as self.sess:

            self.saver.restore(self.sess, self.models_dir + self.model_name)

            weights = self.W.eval()

            perm = np.random.permutation(self.nhid)[:n_images]

            for p in perm:
                # Column p holds the weights from every visible unit to hidden unit p.
                w = weights[:, p]
                image_path = outdir + self.model_name + '_{}.png'.format(p)
                utils.gen_image(w, width, height, image_path, img_type)
from scipy import misc
import tensorflow as tf
import numpy as np
def sample_prob(probs, rand):
    """ Draw a binary sample from a tensor of probabilities.

    ``tf.sign`` maps the difference to +1 where the probability exceeds the
    random value and to -1 or 0 otherwise; ``tf.nn.relu`` then clamps the
    negative entries to 0, leaving a tensor of ones and zeros.

    :param probs: tensor of probabilities (as from a sigmoidal activation)
    :param rand: tensor (of the same shape as probs) of random values
    :return : binary sample of probabilities
    """
    difference = probs - rand
    return tf.nn.relu(tf.sign(difference))
def gen_batches(data, batch_size):
    """ Divide input data into batches.

    This is a generator: nothing is computed (and no error is raised) until
    the first batch is requested. The last batch is shorter when the number
    of rows is not a multiple of batch_size.

    :param data: input data; converted to a numpy array and split along axis 0
    :param batch_size: size of each batch, must be a positive integer
    :return: data divided into batches
    :raises ValueError: if batch_size is not positive
    """
    # A negative step would make range() empty and silently yield no batches,
    # so reject it explicitly together with zero.
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer, got %r" % (batch_size,))

    data = np.array(data)

    for start in range(0, data.shape[0], batch_size):
        yield data[start:start + batch_size]
def gen_image(img, width, height, outfile, img_type='grey'):
    """ Reshape a flat array and save it as an image file.

    Nothing is written when img_type is neither 'grey' nor 'color'.

    :param img: flat array of width * height values, or three times as many
    :param width: first dimension used when reshaping img
    :param height: second dimension used when reshaping img
    :param outfile: path of the image file to write
    :param img_type: 'grey' reshapes to (width, height),
        'color' reshapes to (3, width, height)
    """
    n_pixels = width * height
    assert len(img) == n_pixels or len(img) == n_pixels * 3

    if img_type == 'grey':
        target_shape = (width, height)
    elif img_type == 'color':
        target_shape = (3, width, height)
    else:
        return

    misc.imsave(outfile, img.reshape(target_shape))
# Root output directories, relative to the current working directory.
# Each value ends with '/' so that a sub-directory name can be appended directly.

# directory where models are saved and restored from
models_dir = 'models/'

# directory where data produced by the algorithms is stored
data_dir = 'data/'

# directory where the tensorflow summaries are written
summary_dir = 'logs/'
@renjithbaby23

This comment has been minimized.

renjithbaby23 commented Apr 27, 2017

Hi,
Could anyone please share a sample code that uses this class to generate an RBM?

@niharricky27

This comment has been minimized.

niharricky27 commented May 29, 2017

One question: during the batch-wise _run_train_step, if one wants to get the value of self.encode for every single input (rather than batch-wise), how can one do that?

@deltawi

This comment has been minimized.

deltawi commented Aug 29, 2017

@blackecho can you provide a sample code on how to use it ?

@Alex891

This comment has been minimized.

Alex891 commented Nov 8, 2017

I think something like this should work :)

# Minimal usage example: train an RBM on the MNIST training images.
# Assumes the RBM class above has been saved as RBM.py -- adjust the import otherwise.
from RBM import RBM
from tensorflow.examples.tutorials.mnist import input_data
import tensorflow as tf

#Loading in the mnist data
mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)
trX, trY, teX, teY = mnist.train.images, mnist.train.labels, mnist.test.images, mnist.test.labels

# One visible unit per pixel of a flattened 28x28 MNIST image.
RBM_visible_sizes = 784
RBM_hidden_sizes = 600

#Since we are training, set input as training data
inpX = trX

# The original snippet had an extra closing parenthesis here (a SyntaxError).
rbm = RBM(RBM_visible_sizes, RBM_hidden_sizes)
rbm.fit(inpX)

@nsadawi

This comment has been minimized.

nsadawi commented Feb 23, 2018

I think there is an error in the method _create_data_directories
The correct paths should be:
models_dir = self.main_dir + zconfig.models_dir
data_dir = self.main_dir + zconfig.data_dir
summary_dir = self.main_dir + zconfig.summary_dir

@Sahand1993

This comment has been minimized.

Sahand1993 commented Mar 17, 2018

Hello, thanks for this implementation. One question: why are you using the reduce_mean function when updating the biases on lines 204 and 205 in your first example? For example, instead of self.bh_upd8 = self.bh_.assign_add(self.learning_rate * tf.reduce_mean(hprobs0 - hprobs1, 0)), shouldn't it be self.bh_upd8 = self.bh_.assign_add(self.learning_rate * (hprobs0 - hprobs1))? I base this question on the very beginning of this video (look at his formulas for the bias updates): https://www.youtube.com/watch?v=S0kFFiHzR8M

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment