stacked_denoising_autoencoder
import numpy as np
from itertools import islice
import tensorflow as tf
import multiprocessing
from datetime import datetime, timedelta
import cPickle as pickle
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('SDA')

LOG_DIR = 'summaries/'
def weight_variable(shape):
    """Return a weight Variable of the given shape, initialized from a truncated normal."""
    return tf.Variable(tf.truncated_normal(shape, stddev=0.2))


def bias_variable(shape):
    """Return a bias Variable of the given shape, initialized to zeros."""
    return tf.Variable(tf.zeros(shape))
def variable_summaries(var):
    """Attach a basic set of summary statistics for a variable to TensorBoard."""
    with tf.name_scope('summaries'):
        mean = tf.reduce_mean(var)
        tf.summary.scalar('mean', mean)
        with tf.name_scope('stddev'):
            stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean)))
        tf.summary.scalar('stddev', stddev)
        tf.summary.scalar('max', tf.reduce_max(var))
        tf.summary.scalar('min', tf.reduce_min(var))
        tf.summary.histogram('histogram', var)
def two_d_tensor_image_summary(name, tensor, rows, columns):
    """Render a 2-D tensor as a tf.summary.image."""
    # tf.summary.image expects a 4-D tensor of shape [batch, height, width, channels]
    four_d_tensor = tf.reshape(tensor, [1, rows, columns, 1])
    return tf.summary.image(name, four_d_tensor)
def queue_worker(q, generator):
    """Continuously pull values from a generator and put them on a queue."""
    while True:
        q.put(next(generator))


def queue_reader(q):
    """Yield values from a queue as an iterator."""
    while True:
        yield q.get()
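
# Together these two helpers form a simple producer/consumer pattern; an
# illustrative wiring (mirroring fit_model below, names are example-only):
#   q = multiprocessing.Queue(maxsize=20)
#   multiprocessing.Process(target=queue_worker, args=(q, some_gen)).start()
#   for batch in queue_reader(q):
#       ...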
def safe_write_to_s3(localfile, bucket, s3filename):
    """Upload a local file to an S3 bucket, retrying up to five times."""
    import boto3
    s3 = boto3.resource('s3')
    did_it = False
    for i in range(5):
        try:
            s3.Bucket(bucket).upload_file(localfile, s3filename)
            did_it = True
        except Exception as e:
            logger.warning(e)
            time.sleep(1)
            continue
        break
    return did_it
class StackedDenoisingAutoencoder(object):

    def __init__(self,
                 update_callback,
                 name,
                 input_dimension,
                 layers,
                 batch_size,
                 embed_dim,
                 learning_rate,
                 corruption_probability,
                 max_batches_per_layer,
                 batch_generator,
                 s3OutputUrl,
                 **kwargs):
R""" | |
Stacked Denoising Autoencoder | |
Parameters | |
----- | |
name str | |
A name for the model | |
input_dimension int | |
The input dimension of the vector | |
layers | |
The number of layers in the sda | |
batch_size int | |
The number of items in each batch | |
embed_dim int | |
The number of dimensions in the embedding | |
learning_rate float | |
The intial learning rate for the optimizer | |
curroption_probability float | |
The fraction of nodes in the input layer that | |
are not curropted in a sda training step | |
max_batches_per_layer int | |
The maximum number of batches to train each layer | |
layers | |
The number of layers in the SDA | |
update_callback | |
A callback function to send messages during the training | |
and model serializing process | |
batch_generator iterator | |
An iterator that generates a matrix of size batch_size x input_dimension | |
s3OutputUrl str | |
The s3 url where results will be serialized | |
""" | |
        # Attributes
        self.name = name
        self.s3OutputUrl = s3OutputUrl
        self.vocabulary_dim = input_dimension
        # Hyperparameters
        self.batch_size = batch_size
        self.embed_dim = embed_dim
        self.learning_rate = learning_rate
        self.corruption_probability = corruption_probability
        self.max_batches_per_layer = max_batches_per_layer
        self.layers = layers
        self.batch_generator = batch_generator
        # Callback attributes
        self.update_callback = update_callback
        self.check_frequency = timedelta(seconds=60)
        # Autoencoder Object
        self.autoencoder = None
    def generate_mask(self, mask_prob, shape):
        """Generate an array of the given shape whose entries are 0 with probability mask_prob and 1 otherwise."""
        return np.random.binomial(1, 1.0 - mask_prob, shape)
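
    # Illustrative example (not part of the original gist): with mask_prob=0.3,
    # generate_mask(0.3, (2, 4)) might return
    #   array([[1, 0, 1, 1],
    #          [1, 1, 0, 1]])
    # where each entry is independently 0 with probability 0.3; multiplying a
    # batch by this mask zeroes out (corrupts) those inputs.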
    def optimizer(self):
        """Return an optimization tensor."""
        return tf.train.AdagradOptimizer(self.learning_rate)

    def save_trained_model(self):
        """Pickle the trained model and copy it to S3."""
        # Blank out update callback, since it was scoped externally
        self.update_callback = None
        fname = self.name + ".sda.pickle"
        with open(fname, 'wb') as outfile:
            pickle.dump(self.__dict__, outfile)
        if safe_write_to_s3(fname, self.s3OutputUrl, 'models/' + fname):
            output_loc = "s3://%s/models/%s" % (self.s3OutputUrl, fname)
            retval = output_loc
        else:
            retval = None
        return retval
    def fit_model(self):
        """Train the model: greedy layer-wise pre-training, then fine tuning."""
        # refs to class objects
        name = self.name
        s3url = self.s3OutputUrl
        input_dim = self.vocabulary_dim
        corruption_matrix_fn = self.generate_mask
        batch_size = self.batch_size
        optimizer = self.optimizer()
        max_batches_per_layer = self.max_batches_per_layer
        corruption_probability = self.corruption_probability
        embed_dim = self.embed_dim
        layers = self.layers
        act = tf.nn.sigmoid
        # Set up multiprocessing batch generator
        base_gen = self.batch_generator(s3url, batch_size, input_dim)
        q = multiprocessing.Queue(maxsize=20)
        procs = [multiprocessing.Process(target=queue_worker, args=(q, base_gen))
                 for _ in range(1)]
        for p in procs:
            p.daemon = True
            p.start()
        batch_generator = queue_reader(q)
        layer_input_dimensions = [input_dim] + [embed_dim] * (layers - 1)
        autoencoder = []
        def batch_normalization(layer_input, mean, variance, scale, beta):
            """Apply batch normalization to a layer input."""
            # Epsilon value for Batch Normalization
            epsilon = 1e-3
            layer_input_hat = (layer_input - mean) / tf.sqrt(variance + epsilon)
            layer_input_norm = scale * layer_input_hat + beta
            return layer_input_norm
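
        # In symbols, batch_normalization computes the standard transform:
        #   x_hat = (x - mean) / sqrt(variance + epsilon)
        #   y = scale * x_hat + beta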
        # Main loop over layer construction for pre-training
        for layer_index, layer_input_dimension in enumerate(
                layer_input_dimensions, start=1):
            self.update_callback(
                status="active",
                status_detail="Beginning to Train Layer %i" % layer_index)
            tf.reset_default_graph()
            # Create the constant layers first
            with tf.name_scope('input'):
                x = tf.placeholder(
                    tf.float32, [batch_size, None], name='input_data')
                corruption_matrix = tf.placeholder(
                    tf.float32,
                    shape=[batch_size, None],
                    name='corruption_matrix')
                # tf.int32 supports up to 2**31 - 1 batches per layer; a
                # narrower type such as tf.int16 would cap it at 32767
                batch_counter = tf.placeholder(
                    tf.int32, None, name='batch_counter')
            current_input_tensor = x
            # Get output from the penultimate layer
            for const_ind, const_layer in enumerate(autoencoder, start=1):
                with tf.name_scope("constant_encoding_layer_%i" % const_ind):
                    constant_weight = tf.constant(
                        const_layer['W'], name="trained_weights")
                    constant_bias = tf.constant(
                        const_layer['b'], name="trained_bias")
                    constant_scale = tf.constant(
                        const_layer['scale'], name="trained_scale")
                    constant_beta = tf.constant(
                        const_layer['beta'], name="trained_beta")
                    constant_pop_mean = tf.constant(
                        const_layer['pop_mean'], name='trained_pop_mean')
                    constant_pop_var = tf.constant(
                        const_layer['pop_var'], name='trained_pop_var')
                    layer_input = tf.matmul(current_input_tensor,
                                            constant_weight) + constant_bias
                    layer_input_norm = batch_normalization(
                        layer_input, constant_pop_mean, constant_pop_var,
                        constant_scale, constant_beta)
                    layer_output = act(layer_input_norm)
                    current_input_tensor = layer_output
            reconstruct_me = current_input_tensor
            # Apply mask
            with tf.name_scope('masking'):
                noised = tf.multiply(current_input_tensor, corruption_matrix)
            # define flow of data through the encoding layer
            with tf.name_scope("training_encoding_layer_%i" % layer_index):
                output_dim = embed_dim
                with tf.name_scope('weights'):
                    encoding_weights = weight_variable(
                        [layer_input_dimension, output_dim])
                    variable_summaries(encoding_weights)
                    two_d_tensor_image_summary(
                        'weight_image', encoding_weights,
                        layer_input_dimension, output_dim)
                with tf.name_scope('biases'):
                    encoding_biases = bias_variable([output_dim])
                    variable_summaries(encoding_biases)
                with tf.name_scope('Wx_plus_b'):
                    preactivate = tf.matmul(noised,
                                            encoding_weights) + encoding_biases
                with tf.name_scope('batch_normalization'):
                    scale = tf.Variable(tf.ones([output_dim]), name='scale')
                    beta = tf.Variable(tf.zeros([output_dim]), name='beta')
                    pop_mean = tf.Variable(
                        tf.zeros([output_dim]), trainable=False)
                    pop_var = tf.Variable(
                        tf.ones([output_dim]), trainable=False)
                    # Before sending activations to the decoding layer, batch normalize
                    batch_mean, batch_var = tf.nn.moments(preactivate, [0])
                    # step for training population mean and variance
                    # an exponential moving average would weigh recent batch
                    # means and variances higher than older ones:
                    # decay = 1e-3
                    # train_mean = tf.assign(pop_mean, (pop_mean * decay) + (batch_mean * (1 - decay)))
                    # train_var = tf.assign(pop_var, (pop_var * decay) + (batch_var * (1 - decay)))
                    # instead, this is an incremental average which weighs all batches equally
                    # NOTE: the incremental multiplier is cast to tf.float32 from tf.float64
                    train_mean = tf.assign(pop_mean, pop_mean + tf.cast(
                        tf.truediv(1, batch_counter), tf.float32) *
                        (batch_mean - pop_mean))
                    train_var = tf.assign(pop_var, pop_var + tf.cast(
                        tf.truediv(1, batch_counter), tf.float32) *
                        (batch_var - pop_var))
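                    # These updates follow the streaming-average recurrence
                    #   m_i = m_{i-1} + (x_i - m_{i-1}) / i,
                    # so after N batches pop_mean equals the plain average of
                    # the first N batch means (likewise pop_var for variances).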
                    with tf.control_dependencies([train_mean, train_var]):
                        # Apply the initial batch normalizing transform
                        preactivate_norm = batch_normalization(
                            preactivate, batch_mean, batch_var, scale, beta)
                        y = act(preactivate_norm, name='activation')
                        tf.summary.histogram('activations', y)
            # define flow of data through the decoding layer
            with tf.name_scope("decoding_layer"):
                with tf.name_scope('weights'):
                    decode_weights = weight_variable(
                        [embed_dim, layer_input_dimension])
                    tf.summary.histogram('weights', decode_weights)
                with tf.name_scope('biases'):
                    biases = bias_variable([layer_input_dimension])
                    variable_summaries(biases)
                with tf.name_scope('Wx_plus_b'):
                    preactivate_z = tf.matmul(y, decode_weights) + biases
                    tf.summary.histogram('preactivations', preactivate_z)
                output_vals = tf.nn.sigmoid(preactivate_z)
            # define loss
            with tf.name_scope("loss"):
                diff = tf.nn.sigmoid_cross_entropy_with_logits(
                    logits=preactivate_z, labels=reconstruct_me)
                cross_entropy = tf.reduce_mean(diff)
                tf.summary.scalar('cross_entropy', cross_entropy)
                with tf.name_scope('reconstruction_minus_actual'):
                    diff = output_vals - reconstruct_me
                    l2loss = tf.nn.l2_loss(diff)
                    tf.summary.scalar('l2_loss', l2loss)
            loss_fn = cross_entropy
            # launch the graph with a new session to train the weights
            with tf.Session() as sess:
                # create a summary writer
                writer = tf.summary.FileWriter(
                    logdir=LOG_DIR + name +
                    '_pretrain_layer_{}'.format(layer_index),
                    graph=sess.graph,
                    flush_secs=10)
                merged_summary = tf.summary.merge_all()
                # set optimizer with current loss tensor
                optimization_step = optimizer.minimize(loss_fn)
                # initialize the graph
                sess.run(tf.global_variables_initializer())
                curtime = datetime.now()
                last_writing_time = datetime.now() - timedelta(days=1)
                for i, batch_input in enumerate(
                        islice(batch_generator, max_batches_per_layer)):
                    logger.info("Starting batch %i", i)
                    if datetime.now() - curtime > self.check_frequency:
                        action = self.update_callback(
                            status="active",
                            status_detail="Training layer %i" % layer_index)
                        if action == "stop":
                            break
                        if action == "cancel":
                            return
                        curtime = datetime.now()
                    batch_corruption_matrix = corruption_matrix_fn(
                        corruption_probability,
                        (batch_size, layer_input_dimension))
                    batch_feed = {
                        x: batch_input,
                        batch_counter: int(i + 1),
                        corruption_matrix: batch_corruption_matrix
                    }
                    # Throttle summary writing to once per check interval
                    if datetime.now() - last_writing_time > self.check_frequency:
                        _, summary = sess.run(
                            [optimization_step, merged_summary],
                            feed_dict=batch_feed)
                        writer.add_summary(summary, i)
                        last_writing_time = datetime.now()
                    else:
                        sess.run([optimization_step], feed_dict=batch_feed)
                # Done with training this layer, save it off
                layer_W, layer_b = sess.run([encoding_weights, encoding_biases])
                layer_scale, layer_beta, layer_pop_mean, layer_pop_var = sess.run(
                    [scale, beta, pop_mean, pop_var])
                trained_layer = {
                    'W': layer_W,
                    'b': layer_b,
                    'scale': layer_scale,
                    'beta': layer_beta,
                    'pop_mean': layer_pop_mean,
                    'pop_var': layer_pop_var
                }
                autoencoder.append(trained_layer)
                # Close Summary FileWriter
                writer.close()
        self.autoencoder = autoencoder
        # FINE TUNING STEP
        self.update_callback(
            status="active",
            status_detail="Fine Tuning: Set up decoder training")
        # Start by pre-training the decoder matrix with everything else constant
        tf.reset_default_graph()
        with tf.name_scope('input'):
            x = tf.placeholder(
                tf.float32, [batch_size, input_dim], name='input_data')
        reconstruct_me = x
        current_input_tensor = x
        for const_ind, const_layer in enumerate(autoencoder, start=1):
            with tf.name_scope("constant_encoding_layer_%i" % const_ind):
                constant_weight = tf.constant(
                    const_layer['W'], name="trained_weights")
                constant_bias = tf.constant(
                    const_layer['b'], name="trained_bias")
                constant_scale = tf.constant(
                    const_layer['scale'], name="trained_scale")
                constant_beta = tf.constant(
                    const_layer['beta'], name="trained_beta")
                constant_pop_mean = tf.constant(
                    const_layer['pop_mean'], name='trained_pop_mean')
                constant_pop_var = tf.constant(
                    const_layer['pop_var'], name='trained_pop_var')
                layer_input = tf.matmul(current_input_tensor,
                                        constant_weight) + constant_bias
                layer_input_norm = batch_normalization(
                    layer_input, constant_pop_mean, constant_pop_var,
                    constant_scale, constant_beta)
                layer_output = act(layer_input_norm)
                current_input_tensor = layer_output
        with tf.name_scope("decoding_layer"):
            with tf.name_scope('weights'):
                decode_weights = weight_variable([embed_dim, input_dim])
                tf.summary.histogram('weights', decode_weights)
            with tf.name_scope('biases'):
                biases = bias_variable([input_dim])
                variable_summaries(biases)
            with tf.name_scope('Wx_plus_b'):
                preactivate_z = tf.matmul(current_input_tensor,
                                          decode_weights) + biases
                tf.summary.histogram('preactivations', preactivate_z)
            output_vals = tf.nn.sigmoid(preactivate_z)
        with tf.name_scope("loss"):
            diff = tf.nn.sigmoid_cross_entropy_with_logits(
                logits=preactivate_z, labels=reconstruct_me)
            cross_entropy = tf.reduce_mean(diff)
            tf.summary.scalar('cross_entropy', cross_entropy)
            with tf.name_scope('reconstruction_minus_actual'):
                diff = output_vals - reconstruct_me
                l2loss = tf.nn.l2_loss(diff)
                tf.summary.scalar('l2_loss', l2loss)
        with tf.Session() as sess:
            # create a summary writer
            writer = tf.summary.FileWriter(
                logdir=LOG_DIR + name + '_pretrain_decoder',
                graph=sess.graph,
                flush_secs=10)
            merged_summary = tf.summary.merge_all()
            # set optimizer with current loss tensor
            optimization_step = optimizer.minimize(cross_entropy)
            # initialize the graph
            sess.run(tf.global_variables_initializer())
            for i, batch_input in enumerate(
                    islice(batch_generator, max_batches_per_layer)):
                if datetime.now() - curtime > self.check_frequency:
                    action = self.update_callback(
                        status="active",
                        status_detail="Fine Tuning: Pre-Training Decoder")
                    if action == "stop":
                        break
                    if action == "cancel":
                        return
                    curtime = datetime.now()
                batch_feed = {x: batch_input}
                _, summary = sess.run([optimization_step, merged_summary],
                                      feed_dict=batch_feed)
                writer.add_summary(summary, i)
            writer.close()
            # Done pre-training the decoder, save off its weights and biases
            decoderW, decoderb = sess.run([decode_weights, biases])
        # now we have the initial decoder weights and biases. Fine-tune the whole shebang
        # construct input graph
        self.update_callback(
            status="active", status_detail="Preparing to Fine Tune")
        tf.reset_default_graph()
        with tf.name_scope('input'):
            x = tf.placeholder(
                tf.float32, [batch_size, input_dim], name='input_data')
        reconstruct_me = x
        current_input_tensor = x
        final_weight_vars = []
        final_bias_vars = []
        final_pop_mean = []
        final_pop_var = []
        final_scale = []
        final_beta = []
        hidden_representation = []
        for ind, layer in enumerate(autoencoder, start=1):
            with tf.name_scope("encoding_layer_%i" % ind):
                weight = tf.Variable(layer['W'], name="pretrained_weights")
                bias = tf.Variable(layer['b'], name="pretrained_bias")
                scale = tf.constant(layer['scale'], name="pretrained_scale")
                beta = tf.constant(layer['beta'], name="pretrained_beta")
                pop_mean = tf.constant(
                    layer['pop_mean'], name='pretrained_pop_mean')
                pop_var = tf.constant(
                    layer['pop_var'], name='pretrained_pop_var')
                final_weight_vars.append(weight)
                final_bias_vars.append(bias)
                final_scale.append(scale)
                final_beta.append(beta)
                final_pop_mean.append(pop_mean)
                final_pop_var.append(pop_var)
                layer_input = tf.matmul(current_input_tensor, weight) + bias
                layer_input_norm = batch_normalization(layer_input, pop_mean,
                                                       pop_var, scale, beta)
                layer_output = act(layer_input_norm)
                hidden_representation.append(layer_output)
                current_input_tensor = layer_output
        with tf.name_scope("decoding_layer"):
            with tf.name_scope('weights'):
                decode_weights = tf.Variable(decoderW)
                tf.summary.histogram('weights', decode_weights)
            with tf.name_scope('biases'):
                biases = tf.Variable(decoderb)
                variable_summaries(biases)
            with tf.name_scope('Wx_plus_b'):
                preactivate_z = tf.matmul(current_input_tensor,
                                          decode_weights) + biases
                tf.summary.histogram('preactivations', preactivate_z)
            output_vals = tf.nn.sigmoid(preactivate_z)
        with tf.name_scope("loss"):
            diff = tf.nn.sigmoid_cross_entropy_with_logits(
                logits=preactivate_z, labels=reconstruct_me)
            cross_entropy = tf.reduce_mean(diff)
            tf.summary.scalar('cross_entropy', cross_entropy)
            with tf.name_scope('reconstruction_minus_actual'):
                diff = output_vals - reconstruct_me
                l2loss = tf.nn.l2_loss(diff)
                tf.summary.scalar('l2_loss', l2loss)
        loss_fn = cross_entropy
        # launch a fine-tuning session
        with tf.Session() as sess:
            logger.info("Started fine-tuning")
            # create a summary writer
            writer = tf.summary.FileWriter(
                logdir=LOG_DIR + name + '_finetune',
                graph=sess.graph,
                flush_secs=10)
            merged_summary = tf.summary.merge_all()
            # set optimizer with current loss tensor
            optimization_step = optimizer.minimize(loss_fn)
            # initialize the graph
            sess.run(tf.global_variables_initializer())
            for i, batch_input in enumerate(
                    islice(batch_generator, max_batches_per_layer)):
                if datetime.now() - curtime > self.check_frequency:
                    action = self.update_callback(
                        status="active", status_detail="Fine Tuning")
                    if action == "stop":
                        break
                    if action == "cancel":
                        return
                    curtime = datetime.now()
                batch_feed = {x: batch_input}
                _, summary = sess.run([optimization_step, merged_summary],
                                      feed_dict=batch_feed)
                writer.add_summary(summary, i)
            writer.close()
            # All done, shut down queue and workers
            q.close()
            for p in procs:
                p.terminate()
            # All done, but session is still open; pull out the trained values
            wvals = sess.run(final_weight_vars)
            bvals = sess.run(final_bias_vars)
            scale_vals = sess.run(final_scale)
            beta_vals = sess.run(final_beta)
            pop_mean_vals = sess.run(final_pop_mean)
            pop_var_vals = sess.run(final_pop_var)
        autoencoder = [{
            'W': layer_W,
            'b': layer_b,
            'scale': layer_scale,
            'beta': layer_beta,
            'pop_mean': layer_pop_mean,
            'pop_var': layer_pop_var
        } for layer_W, layer_b, layer_scale, layer_beta, layer_pop_mean, layer_pop_var
          in zip(wvals, bvals, scale_vals, beta_vals, pop_mean_vals, pop_var_vals)]
        self.update_callback(
            status="active", status_detail="Saving Model to S3")
        self.autoencoder = autoencoder
        return autoencoder
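
# --- Usage sketch (illustrative, not part of the original gist) ---
# `demo_batch_generator` and `demo_callback` below are hypothetical stand-ins:
# the class expects a generator factory called as (s3url, batch_size,
# input_dim) that yields batch_size x input_dimension matrices, and a
# callback whose return value of "stop" or "cancel" interrupts training.
if __name__ == '__main__':
    def demo_batch_generator(s3url, batch_size, input_dim):
        # Endless stream of sparse random binary batches for demonstration
        while True:
            yield np.random.binomial(
                1, 0.1, (batch_size, input_dim)).astype('float32')

    def demo_callback(status, status_detail):
        logger.info("%s: %s", status, status_detail)
        return "continue"

    sda = StackedDenoisingAutoencoder(
        update_callback=demo_callback,
        name='demo',
        input_dimension=1024,
        layers=3,
        batch_size=128,
        embed_dim=64,
        learning_rate=0.1,
        corruption_probability=0.3,
        max_batches_per_layer=500,
        batch_generator=demo_batch_generator,
        s3OutputUrl='example-bucket')  # hypothetical S3 bucket name
    sda.fit_model()
    sda.save_trained_model()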