@shkr · Last active February 14, 2019
stacked_denoising_autoencoder
import numpy as np
from itertools import islice
import tensorflow as tf
import multiprocessing
from datetime import datetime, timedelta
import cPickle as pickle
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('SDA')
LOG_DIR = 'summaries/'
def weight_variable(shape):
"""fn returns a weight variable of shape and truncated normal values"""
return tf.Variable(tf.truncated_normal(shape, stddev=0.2))
def bias_variable(shape):
"""fn returns a bias variable with zeros init"""
return tf.Variable(tf.zeros(shape))
def variable_summaries(var):
"""fn attaches a basic set of statistics of the variable, to tensorboard summaries"""
with tf.name_scope('summaries'):
mean = tf.reduce_mean(var)
tf.summary.scalar('mean', mean)
with tf.name_scope('stddev'):
stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean)))
tf.summary.scalar('stddev', stddev)
tf.summary.scalar('max', tf.reduce_max(var))
tf.summary.scalar('min', tf.reduce_min(var))
tf.summary.histogram('histogram', var)
def two_d_tensor_image_summary(name, tensor, rows, columns):
"""fn transforms a tensor to a tf.image.Summary"""
four_d_tensor = tf.reshape(tensor, [rows, columns, 1, 1])
return tf.summary.image(name, four_d_tensor)
def queue_worker(q, generator):
"""fn iteratively inserts value into queue"""
while True:
q.put(generator.next())
def queue_reader(q):
"""fn returns a queue reader iterator"""
while True:
yield q.get()
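
# Illustrative sketch (not part of the original gist): queue_worker and queue_reader form
# a simple producer/consumer pair around a multiprocessing.Queue; fit_model uses them to
# prefetch batches in a background process. With a toy generator it looks like this:
#
#   q = multiprocessing.Queue(maxsize=20)
#   p = multiprocessing.Process(target=queue_worker, args=(q, iter(range(100))))
#   p.daemon = True
#   p.start()
#   reader = queue_reader(q)
#   first_item = next(reader)   # blocks until the worker has put a value on the queue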
def safe_write_to_s3(localfile, bucket, s3filename):
"""fn to safely write a local file to a s3 bucket"""
import boto3
s3 = boto3.resource('s3')
did_it = False
for i in range(5):
try:
            s3.Bucket(bucket).upload_file(localfile, s3filename)
            did_it = True
        except Exception as e:
            print(e)
time.sleep(1)
continue
break
return did_it
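
# Illustrative usage (bucket and key names are hypothetical):
#
#   ok = safe_write_to_s3('mymodel.sda.pickle', 'my-bucket', 'models/mymodel.sda.pickle')
#   # ok is True on success, False once all five upload attempts have failed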
class StackedDenoisingAutoencoder(object):
def __init__(self,
update_callback,
name,
input_dimension,
layers,
batch_size,
embed_dim,
learning_rate,
                 corruption_probability,
max_batches_per_layer,
batch_generator,
s3OutputUrl,
**kwargs):
R"""
Stacked Denoising Autoencoder
Parameters
-----
name str
A name for the model
input_dimension int
The input dimension of the vector
layers
The number of layers in the sda
batch_size int
The number of items in each batch
embed_dim int
The number of dimensions in the embedding
learning_rate float
The intial learning rate for the optimizer
curroption_probability float
The fraction of nodes in the input layer that
are not curropted in a sda training step
max_batches_per_layer int
The maximum number of batches to train each layer
layers
The number of layers in the SDA
update_callback
A callback function to send messages during the training
and model serializing process
batch_generator iterator
An iterator that generates a matrix of size batch_size x input_dimension
s3OutputUrl str
The s3 url where results will be serialized
"""
        # Attributes
        self.name = name
        self.s3OutputUrl = s3OutputUrl
        self.vocabulary_dim = input_dimension
        self.batch_generator = batch_generator
# Hyperparameters
self.batch_size = batch_size
self.embed_dim = embed_dim
self.learning_rate = learning_rate
self.corruption_probability = corruption_probability
self.max_batches_per_layer = max_batches_per_layer
self.layers = layers
# Callback attributes
self.update_callback = update_callback
self.check_frequency = timedelta(seconds=60)
# Autoencoder Object
self.autoencoder = None
def generate_mask(self, mask_prob, shape):
"""method to generate an array of shape with 1s and 0s"""
return np.random.binomial(1, 1.0 - mask_prob, shape)
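    # Illustrative example (random values, shown for intuition): generate_mask(0.3, (2, 5))
    # might return
    #   array([[1, 0, 1, 1, 1],
    #          [1, 1, 0, 1, 0]])
    # i.e. roughly 30% of the entries are zeroed out; multiplying an input batch by this
    # mask is what corrupts the input in each denoising training step.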
def optimizer(self):
"""method to return an optimization tensor"""
return tf.train.AdagradOptimizer(self.learning_rate)
def save_trained_model(self):
"""method to picke and copy the trained model to s3"""
# Blank out update callback, since it was scoped externally
self.update_callback = None
fname = self.name + ".sda.pickle"
        with open(fname, 'wb') as outfile:
pickle.dump(self.__dict__, outfile)
if safe_write_to_s3(fname, self.s3OutputUrl, 'models/' + fname):
output_loc = "s3://%s/models/%s" % (self.s3OutputUrl, fname)
retval = output_loc
else:
retval = None
return retval
def fit_model(self):
"""method which trains the model"""
# refs to class objects
name = self.name
        s3url = self.s3OutputUrl
input_dim = self.vocabulary_dim
corruption_matrix_fn = self.generate_mask
batch_size = self.batch_size
optimizer = self.optimizer()
max_batches_per_layer = self.max_batches_per_layer
corruption_probability = self.corruption_probability
embed_dim = self.embed_dim
layers = self.layers
act = tf.nn.sigmoid
# Set up multiprocessing batch generator
base_gen = self.batch_generator(s3url, batch_size, input_dim)
q = multiprocessing.Queue(maxsize=20)
        procs = [multiprocessing.Process(target=queue_worker, args=(q, base_gen)) for _ in range(1)]
for p in procs:
p.daemon = True
p.start()
batch_generator = queue_reader(q)
layer_input_dimensions = [input_dim] + [embed_dim] * (layers - 1)
autoencoder = []
def batch_normalization(layer_input, mean, variance, scale, beta):
"""method to apply batch normalization to layer input"""
# Epsilon value for Batch Normalization
epsilon = 1e-3
layer_input_hat = (layer_input - mean) / tf.sqrt(variance + epsilon)
layer_input_norm = scale * layer_input_hat + beta
return layer_input_norm
# Main loop over layer construction for pre-training
for layer_index, layer_input_dimension in enumerate(
layer_input_dimensions, start=1):
self.update_callback(
status="active",
status_detail="Beginning to Train Layer %i" % layer_index)
tf.reset_default_graph()
# Create the constant layers first
with tf.name_scope('input'):
x = tf.placeholder(
tf.float32, [batch_size, None], name='input_data')
corruption_matrix = tf.placeholder(
tf.float32,
shape=[batch_size, None],
name='corruption_matrix')
                # batch_counter drives the incremental moving average of the batch-norm
                # population statistics; int32 comfortably covers any realistic number
                # of batches per layer
                batch_counter = tf.placeholder(
                    tf.int32, None, name='batch_counter')
current_input_tensor = x
# Get output from the penultimate layer
for const_ind, const_layer in enumerate(autoencoder, start=1):
with tf.name_scope("constant_encoding_layer_%i" % const_ind):
constant_weight = tf.constant(
const_layer['W'], name="trained_weights")
constant_bias = tf.constant(
const_layer['b'], name="trained_bias")
constant_scale = tf.constant(
const_layer['scale'], name="trained_scale")
constant_beta = tf.constant(
const_layer['beta'], name="trained_beta")
constant_pop_mean = tf.constant(
const_layer['pop_mean'], name='trained_pop_mean')
constant_pop_var = tf.constant(
const_layer['pop_var'], name='trained_pop_var')
layer_input = tf.matmul(current_input_tensor,
constant_weight) + constant_bias
layer_input_norm = batch_normalization(
layer_input, constant_pop_mean, constant_pop_var,
constant_scale, constant_beta)
layer_output = act(layer_input_norm)
current_input_tensor = layer_output
reconstruct_me = current_input_tensor
# Apply mask
with tf.name_scope('masking'):
noised = tf.multiply(current_input_tensor, corruption_matrix)
# define flow of data through the encoding layer
with tf.name_scope("training_encoding_layer_%i" % layer_index):
output_dim = embed_dim
with tf.name_scope('weights'):
encoding_weights = weight_variable(
[layer_input_dimension, output_dim])
variable_summaries(encoding_weights)
two_d_tensor_image_summary('weight_image', encoding_weights, layer_input_dimension, output_dim)
with tf.name_scope('biases'):
encoding_biases = bias_variable([output_dim])
variable_summaries(encoding_biases)
with tf.name_scope('Wx_plus_b'):
preactivate = tf.matmul(noised,
encoding_weights) + encoding_biases
with tf.name_scope('batch_normalization'):
scale = tf.Variable(tf.ones([output_dim]), name='scale')
beta = tf.Variable(tf.zeros([output_dim]), name='beta')
pop_mean = tf.Variable(
tf.zeros([output_dim]), trainable=False)
pop_var = tf.Variable(
tf.ones([output_dim]), trainable=False)
# Before sending activations to decoding layer, batch normalization
batch_mean, batch_var = tf.nn.moments(preactivate, [0])
                    # step for training the population mean and variance
                    # the commented-out version is an exponential moving average which weighs
                    # recent batch means and variances higher than older ones
                    # decay = 1e-3
                    # train_mean = tf.assign(pop_mean, (pop_mean * decay) + (batch_mean * (1 - decay)))
                    # train_var = tf.assign(pop_var, (pop_var * decay) + (batch_var * (1 - decay)))
                    # the version used below is an incremental average which weighs all batches equally
                    # NOTE: the incremental multiplier is cast to tf.float32 from tf.float64
train_mean = tf.assign(pop_mean, pop_mean + tf.cast(
tf.truediv(1, batch_counter), tf.float32) *
(batch_mean - pop_mean))
train_var = tf.assign(pop_var, pop_var + tf.cast(
tf.truediv(1, batch_counter), tf.float32) *
(batch_var - pop_var))
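                    # Worked example of the incremental average above (illustrative numbers):
                    # if pop_mean is 0.6 after two batches and the third batch_mean is 0.9,
                    # pop_mean becomes 0.6 + (1/3) * (0.9 - 0.6) = 0.7, i.e. the plain
                    # running average of the batch means seen so far.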
with tf.control_dependencies([train_mean, train_var]):
# Apply the initial batch normalizing transform
preactivate_norm = batch_normalization(
preactivate, batch_mean, batch_var, scale, beta)
y = act(preactivate_norm, name='activation')
tf.summary.histogram('activations', y)
# define flow of data through the decoding layer
with tf.name_scope("decoding_layer"):
with tf.name_scope('weights'):
decode_weights = weight_variable([embed_dim, layer_input_dimension])
tf.summary.histogram('weights', decode_weights)
with tf.name_scope('biases'):
                    biases = bias_variable([layer_input_dimension])
variable_summaries(biases)
with tf.name_scope('Wx_plus_b'):
preactivate_z = tf.matmul(y, decode_weights) + biases
tf.summary.histogram('preactivations', preactivate_z)
output_vals = tf.nn.sigmoid(preactivate_z)
# define loss
with tf.name_scope("loss"):
                diff = tf.nn.sigmoid_cross_entropy_with_logits(
                    labels=reconstruct_me, logits=preactivate_z)
cross_entropy = tf.reduce_mean(diff)
tf.summary.scalar('cross_entropy', cross_entropy)
with tf.name_scope('reconstruction_minus_actual'):
diff = output_vals - reconstruct_me
l2loss = tf.nn.l2_loss(diff)
tf.summary.scalar('l2_loss', l2loss)
loss_fn = cross_entropy
# launch the graph with a new session to train the weights
with tf.Session() as sess:
# creates a summary writer
writer = tf.summary.FileWriter(
logdir=LOG_DIR + name +
'_pretrain_layer_{}'.format(layer_index),
graph=sess.graph,
flush_secs=10)
merged_summary = tf.summary.merge_all()
# set optimizer with current loss tensor
optimization_step = optimizer.minimize(loss_fn)
# initialize the graph
sess.run(tf.global_variables_initializer())
curtime = datetime.now()
                last_writing_time = datetime.now() - timedelta(days=1)
for i, batch_input in enumerate(islice(batch_generator, max_batches_per_layer)):
print "Starting batch %i" % i
if datetime.now() - curtime > self.check_frequency:
action = self.update_callback(
status="active",
status_detail="Training layer %i" % layer_index)
if action == "stop":
break
if action == "cancel":
return
curtime = datetime.now()
batch_corruption_matrix = corruption_matrix_fn(
corruption_probability,
(batch_size, layer_input_dimension))
batch_feed = {
x: batch_input,
batch_counter: int(i + 1),
corruption_matrix: batch_corruption_matrix
}
if datetime.now() - last_writing_time > self.check_frequency:
_, summary = sess.run([optimization_step, merged_summary], feed_dict=batch_feed)
writer.add_summary(summary, i)
last_writing_time = datetime.now()
else:
sess.run([optimization_step], feed_dict=batch_feed)
# Done with training layer, save it off
layer_W, layer_b = sess.run([encoding_weights, encoding_biases])
layer_scale, layer_beta, layer_pop_mean, layer_pop_var = sess.run(
[scale, beta, pop_mean, pop_var])
trained_layer = {
'W': layer_W,
'b': layer_b,
'scale': layer_scale,
'beta': layer_beta,
'pop_mean': layer_pop_mean,
'pop_var': layer_pop_var
}
autoencoder.append(trained_layer)
# Close Summary FileWriter
writer.close()
self.autoencoder = autoencoder
# FINE TUNING STEP
self.update_callback(
status="active",
status_detail="Fine Tuning: Set up decoder training")
# Start by pre-training decoder matrix with everything else constant
tf.reset_default_graph()
with tf.name_scope('input'):
x = tf.placeholder(
tf.float32, [batch_size, input_dim], name='input_data')
reconstruct_me = x
current_input_tensor = x
for const_ind, const_layer in enumerate(autoencoder, start=1):
with tf.name_scope("constant_encoding_layer_%i" % const_ind):
constant_weight = tf.constant(
const_layer['W'], name="trained_weights")
constant_bias = tf.constant(
const_layer['b'], name="trained_bias")
constant_scale = tf.constant(
const_layer['scale'], name="trained_scale")
constant_beta = tf.constant(
const_layer['beta'], name="trained_beta")
constant_pop_mean = tf.constant(
const_layer['pop_mean'], name='trained_pop_mean')
constant_pop_var = tf.constant(
const_layer['pop_var'], name='trained_pop_var')
layer_input = tf.matmul(current_input_tensor,
constant_weight) + constant_bias
layer_input_norm = batch_normalization(
layer_input, constant_pop_mean, constant_pop_var,
constant_scale, constant_beta)
layer_output = act(layer_input_norm)
current_input_tensor = layer_output
with tf.name_scope("decoding_layer"):
with tf.name_scope('weights'):
decode_weights = weight_variable([embed_dim, input_dim])
tf.summary.histogram('weights', decode_weights)
with tf.name_scope('biases'):
                biases = bias_variable([input_dim])
variable_summaries(biases)
with tf.name_scope('Wx_plus_b'):
preactivate_z = tf.matmul(current_input_tensor,
decode_weights) + biases
tf.summary.histogram('preactivations', preactivate_z)
output_vals = tf.nn.sigmoid(preactivate_z)
with tf.name_scope("loss"):
            diff = tf.nn.sigmoid_cross_entropy_with_logits(
                labels=reconstruct_me, logits=preactivate_z)
cross_entropy = tf.reduce_mean(diff)
tf.summary.scalar('cross_entropy', cross_entropy)
with tf.name_scope('reconstruction_minus_actual'):
diff = output_vals - reconstruct_me
l2loss = tf.nn.l2_loss(diff)
tf.summary.scalar('l2_loss', l2loss)
with tf.Session() as sess:
# creates a summary writer
writer = tf.summary.FileWriter(
logdir=LOG_DIR + name + '_pretrain_decoder',
graph=sess.graph,
flush_secs=10)
merged_summary = tf.summary.merge_all()
# set optimizer with current loss tensor
optimization_step = optimizer.minimize(cross_entropy)
# initialize the graph
sess.run(tf.global_variables_initializer())
for i, batch_input in enumerate(islice(batch_generator, max_batches_per_layer)):
if datetime.now() - curtime > self.check_frequency:
action = self.update_callback(
status="active",
status_detail="Fine Tuning: Pre-Training Decoder")
if action == "stop":
break
if action == "cancel":
return
curtime = datetime.now()
batch_feed = {x: batch_input}
_, summary = sess.run([optimization_step, merged_summary],
feed_dict=batch_feed)
writer.add_summary(summary, i)
writer.close()
# Done with training layer, save it off
decoderW, decoderb = sess.run([decode_weights, biases])
# now we have the initial decoder weights and biases. Fine-tune the whole shebang
# construct input graph
self.update_callback(
status="active", status_detail="Preparing to Fine Tune")
tf.reset_default_graph()
with tf.name_scope('input'):
x = tf.placeholder(
tf.float32, [batch_size, input_dim], name='input_data')
reconstruct_me = x
current_input_tensor = x
final_weight_vars = []
final_bias_vars = []
final_pop_mean = []
final_pop_var = []
final_scale = []
final_beta = []
hidden_representation = []
for ind, layer in enumerate(autoencoder, start=1):
with tf.name_scope("encoding_layer_%i" % ind):
weight = tf.Variable(layer['W'], name="pretrained_weights")
bias = tf.Variable(layer['b'], name="pretrained_bias")
scale = tf.constant(layer['scale'], name="pretrained_scale")
beta = tf.constant(layer['beta'], name="pretrained_beta")
pop_mean = tf.constant(
layer['pop_mean'], name='pretrained_pop_mean')
pop_var = tf.constant(
layer['pop_var'], name='pretrained_pop_var')
final_weight_vars.append(weight)
final_bias_vars.append(bias)
final_scale.append(scale)
final_beta.append(beta)
final_pop_mean.append(pop_mean)
final_pop_var.append(pop_var)
layer_input = tf.matmul(current_input_tensor, weight) + bias
layer_input_norm = batch_normalization(layer_input, pop_mean,
pop_var, scale, beta)
layer_output = act(layer_input_norm)
hidden_representation.append(layer_output)
current_input_tensor = layer_output
with tf.name_scope("decoding_layer"):
with tf.name_scope('weights'):
decode_weights = tf.Variable(decoderW)
tf.summary.histogram('weights', decode_weights)
with tf.name_scope('biases'):
biases = tf.Variable(decoderb)
variable_summaries(biases)
with tf.name_scope('Wx_plus_b'):
preactivate_z = tf.matmul(current_input_tensor,
decode_weights) + biases
tf.summary.histogram('preactivations', preactivate_z)
output_vals = tf.nn.sigmoid(preactivate_z)
with tf.name_scope("loss"):
            diff = tf.nn.sigmoid_cross_entropy_with_logits(
                labels=reconstruct_me, logits=preactivate_z)
cross_entropy = tf.reduce_mean(diff)
tf.summary.scalar('cross_entropy', cross_entropy)
with tf.name_scope('reconstruction_minus_actual'):
diff = output_vals - reconstruct_me
l2loss = tf.nn.l2_loss(diff)
tf.summary.scalar('l2_loss', l2loss)
loss_fn = cross_entropy
# launch a fine-tuning session
with tf.Session() as sess:
print "Started fine-tuning"
# creates a summary writer
writer = tf.summary.FileWriter(
logdir=LOG_DIR + name + '_finetune',
graph=sess.graph,
flush_secs=10)
merged_summary = tf.summary.merge_all()
# set optimizer with current loss tensor
optimization_step = optimizer.minimize(loss_fn)
# initialize the graph
sess.run(tf.global_variables_initializer())
for i, batch_input in enumerate(islice(batch_generator, max_batches_per_layer)):
if datetime.now() - curtime > self.check_frequency:
action = self.update_callback(status="active", status_detail="Fine Tuning")
if action == "stop":
break
if action == "cancel":
return
curtime = datetime.now()
batch_feed = {x: batch_input}
_, summary = sess.run([optimization_step, merged_summary],
feed_dict=batch_feed)
writer.add_summary(summary, i)
# All done, shut down queue and workers
q.close()
for p in procs:
p.terminate()
# All done, but session is still open
wvals = sess.run(final_weight_vars)
bvals = sess.run(final_bias_vars)
scale_vals = sess.run(final_scale)
beta_vals = sess.run(final_beta)
pop_mean_vals = sess.run(final_pop_mean)
pop_var_vals = sess.run(final_pop_var)
autoencoder = [{
'W': layer_W,
'b': layer_b,
'scale': layer_scale,
'beta': layer_beta,
'pop_mean': layer_pop_mean,
'pop_var': layer_pop_var
} for layer_W, layer_b, layer_scale, layer_beta, layer_pop_mean, layer_pop_var in
zip(wvals, bvals, scale_vals, beta_vals, pop_mean_vals,
pop_var_vals)]
action = self.update_callback(
status="active", status_detail="Saving Model to S3")
self.autoencoder = autoencoder
return autoencoder
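
# Illustrative usage sketch (not part of the original gist). The callback, the batch
# generator, the bucket name, and every hyperparameter value below are hypothetical;
# they only demonstrate the call signatures that StackedDenoisingAutoencoder expects.
#
#   def noop_callback(status=None, status_detail=None):
#       logger.info("%s: %s", status, status_detail)
#       return "continue"   # anything other than "stop"/"cancel" lets training proceed
#
#   def random_batches(s3url, batch_size, input_dim):
#       # stand-in for a real batch source; yields batch_size x input_dim matrices forever
#       while True:
#           yield np.random.rand(batch_size, input_dim)
#
#   sda = StackedDenoisingAutoencoder(
#       update_callback=noop_callback,
#       name='demo',
#       input_dimension=1000,
#       layers=3,
#       batch_size=64,
#       embed_dim=128,
#       learning_rate=0.1,
#       corruption_probability=0.3,
#       max_batches_per_layer=500,
#       batch_generator=random_batches,
#       s3OutputUrl='my-model-bucket')
#   sda.fit_model()
#   sda.save_trained_model()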