Skip to content

Instantly share code, notes, and snippets.

@usmcamp0811
Created April 19, 2017 20:34
Show Gist options
  • Save usmcamp0811/0e2d959797eb5cde9c3c4e7a595a5dd0 to your computer and use it in GitHub Desktop.
import numpy as np
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
from sklearn import datasets
from sklearn.mixture import GaussianMixture
from sklearn.model_selection import StratifiedKFold
#%matplotlib qt
from sklearn.cluster import KMeans
import numpy as np
from sklearn.manifold import TSNE
import pandas as pd
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
# Seed both NumPy and TensorFlow RNGs so runs are reproducible.
np.random.seed(0)
tf.set_random_seed(0)
# Download (if needed) and load MNIST with one-hot labels; `mnist` and
# `n_samples` are read as module-level globals by train() below.
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
n_samples = mnist.train.num_examples
def xavier_init(fan_in, fan_out, constant=1):
    """Xavier (Glorot) initialization of network weights.

    Returns a (fan_in, fan_out) float32 tensor sampled uniformly from
    [-limit, limit] where limit = constant * sqrt(6 / (fan_in + fan_out)).
    """
    # https://stackoverflow.com/questions/33640581/how-to-do-xavier-initialization-on-tensorflow
    limit = constant * np.sqrt(6.0 / (fan_in + fan_out))
    return tf.random_uniform((fan_in, fan_out),
                             minval=-limit, maxval=limit,
                             dtype=tf.float32)
class VariationalAutoencoder(object):
    """Variational Autoencoder (VAE) with an sklearn-like interface implemented using TensorFlow.

    This implementation uses probabilistic encoders and decoders realized by
    two-hidden-layer perceptrons with a Gaussian latent space, trained
    end-to-end by maximizing the evidence lower bound.
    See "Auto-Encoding Variational Bayes" by Kingma and Welling for details.
    """
    def __init__(self, network_architecture, transfer_fct=tf.nn.softplus,
                 learning_rate=0.0001, batch_size=100):
        # network_architecture: dict with keys n_hidden_recog_1/2,
        # n_hidden_gener_1/2, n_input, n_z (see _initialize_weights).
        # transfer_fct: activation applied to each hidden layer.
        # NOTE(review): LOGDIR is a machine-specific absolute path; the
        # projector sprite/metadata files must already exist there — confirm.
        LOGDIR = '/home/mcamp/PycharmProjects/VAE_tutorials/'
        GITHUB_URL = 'https://raw.githubusercontent.com/mamcgrath/TensorBoard-TF-Dev-Summit-Tutorial/master/'
        self.network_architecture = network_architecture
        self.transfer_fct = transfer_fct
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        # tf Graph input: one flattened image per row.
        self.x = tf.placeholder(tf.float32, [None, network_architecture["n_input"]])
        # Create autoencoder network
        self._create_network()
        # Define loss function based variational upper-bound and
        # corresponding optimizer
        self._create_loss_optimizer()
        # Initializing the tensor flow variables
        init = tf.global_variables_initializer()
        # Launch the session
        # self.sess = tf.InteractiveSession()
        self.sess = tf.Session()
        self.train_writer = tf.summary.FileWriter('./train', self.sess.graph)
        # self.test_writer = tf.summary.FileWriter('./test', self.sess.graph)
        self.sess.run(init)
        # Configure the TensorBoard embedding projector for the latent
        # variables created in _create_network().
        config = tf.contrib.tensorboard.plugins.projector.ProjectorConfig()
        ## You can add multiple embeddings. Here we add only one.
        z_embedding_config = config.embeddings.add()
        zm_embedding_config = config.embeddings.add()
        # in_embedding_config = config.embeddings.add()
        # out_embedding_config = config.embeddings.add()
        z_embedding_config.tensor_name = self.z_embedding.name
        zm_embedding_config.tensor_name = self.zm_embedding.name
        # in_embedding_config.tensor_name = self.in_embedding.name
        # out_embedding_config.tensor_name = self.out_embedding.name
        z_embedding_config.sprite.image_path = LOGDIR + 'sprite_1024.png'
        zm_embedding_config.sprite.image_path = LOGDIR + 'sprite_1024.png'
        # in_embedding_config.sprite.image_path = LOGDIR + 'sprite_1024.png'
        # out_embedding_config.sprite.image_path = LOGDIR + 'sprite_1024.png'
        z_embedding_config.metadata_path = LOGDIR + 'labels_1024.tsv'
        zm_embedding_config.metadata_path = LOGDIR + 'labels_1024.tsv'
        # in_embedding_config.metadata_path = LOGDIR + 'labels_1024.tsv'
        # out_embedding_config.metadata_path = LOGDIR + 'labels_1024.tsv'
        # Specify the width and height of a single thumbnail.
        z_embedding_config.sprite.single_image_dim.extend([28, 28])
        zm_embedding_config.sprite.single_image_dim.extend([28, 28])
        # in_embedding_config.sprite.single_image_dim.extend([28, 28])
        # out_embedding_config.sprite.single_image_dim.extend([28, 28])
        tf.contrib.tensorboard.plugins.projector.visualize_embeddings(self.train_writer, config)

    def _create_network(self):
        """Build the encoder, the reparameterized sample z, and the decoder."""
        # Initialize autoencode network weights and biases
        network_weights = self._initialize_weights(**self.network_architecture)
        # Use recognition network to determine mean and
        # (log) variance of Gaussian distribution in latent space
        self.z_mean, self.z_log_sigma_sq = \
            self._recognition_network(network_weights["weights_recog"],
                                      network_weights["biases_recog"])
        # Draw one sample z from Gaussian distribution
        n_z = self.network_architecture["n_z"]
        eps = tf.random_normal((self.batch_size, n_z), 0, 1,
                               dtype=tf.float32)
        with tf.name_scope('z'):
            # Reparameterization trick: z = mu + sigma*epsilon,
            # sigma = sqrt(exp(log(sigma^2))).
            self.z = tf.add(self.z_mean,
                            tf.multiply(tf.sqrt(tf.exp(self.z_log_sigma_sq)), eps))
        with tf.name_scope('x_reconstr_mean'):
            # Use generator to determine mean of
            # Bernoulli distribution of reconstructed input
            self.x_reconstr_mean = \
                self._generator_network(network_weights["weights_gener"],
                                        network_weights["biases_gener"])
        z_embedding_input = self.z
        zm_embedding_input = self.z_mean
        in_embedding_input = self.z_log_sigma_sq
        out_embedding_input = self.x_reconstr_mean
        LOGDIR = '/home/mcamp/PycharmProjects/VAE_tutorials/'
        GITHUB_URL = 'https://raw.githubusercontent.com/mamcgrath/TensorBoard-TF-Dev-Summit-Tutorial/master/'
        # Dedicated variables for the TensorBoard projector (fixed batch size
        # because self.z is sampled per batch).
        self.z_embedding = tf.Variable(tf.zeros([self.batch_size, self.network_architecture['n_z']]), name="z")
        self.zm_embedding = tf.Variable(tf.zeros([self.batch_size, self.network_architecture['n_z']]), name="z_mean")
        # self.in_embedding = tf.Variable(tf.zeros([self.batch_size, 20]), name="in")
        # self.out_embedding = tf.Variable(tf.zeros([self.batch_size, 784]), name="out")
        # give it calculated embedding
        # NOTE(review): these assign ops are never fetched in any sess.run
        # call in this file, so the embedding variables stay at zero —
        # confirm whether they were meant to be run during training.
        z_assignment = self.z_embedding.assign(z_embedding_input)
        zm_assignment = self.zm_embedding.assign(zm_embedding_input)
        # in_assignment = self.in_embedding.assign(in_embedding_input)
        # out_assignment = self.out_embedding.assign(out_embedding_input)
        ## Format: tensorflow/contrib/tensorboard/plugins/projector/projector_config.proto

    def _initialize_weights(self, n_hidden_recog_1, n_hidden_recog_2,
                            n_hidden_gener_1, n_hidden_gener_2,
                            n_input, n_z):
        """Create all encoder/decoder weight and bias Variables.

        Returns a dict with keys 'weights_recog', 'biases_recog',
        'weights_gener', 'biases_gener', each mapping layer names to
        tf.Variable objects (Xavier-initialized weights, zero biases).
        """
        all_weights = dict()
        with tf.name_scope('Encoder_Weights'):
            all_weights['weights_recog'] = {
                'h1': tf.Variable(xavier_init(n_input, n_hidden_recog_1)),
                'h2': tf.Variable(xavier_init(n_hidden_recog_1, n_hidden_recog_2)),
                'out_mean': tf.Variable(xavier_init(n_hidden_recog_2, n_z)),
                'out_log_sigma': tf.Variable(xavier_init(n_hidden_recog_2, n_z))}
            all_weights['biases_recog'] = {
                'b1': tf.Variable(tf.zeros([n_hidden_recog_1], dtype=tf.float32)),
                'b2': tf.Variable(tf.zeros([n_hidden_recog_2], dtype=tf.float32)),
                'out_mean': tf.Variable(tf.zeros([n_z], dtype=tf.float32)),
                'out_log_sigma': tf.Variable(tf.zeros([n_z], dtype=tf.float32))}
        with tf.name_scope('Decoder_Weights'):
            all_weights['weights_gener'] = {
                'h1': tf.Variable(xavier_init(n_z, n_hidden_gener_1)),
                'h2': tf.Variable(xavier_init(n_hidden_gener_1, n_hidden_gener_2)),
                'out_mean': tf.Variable(xavier_init(n_hidden_gener_2, n_input)),
                'out_log_sigma': tf.Variable(xavier_init(n_hidden_gener_2, n_input))}
            all_weights['biases_gener'] = {
                'b1': tf.Variable(tf.zeros([n_hidden_gener_1], dtype=tf.float32)),
                'b2': tf.Variable(tf.zeros([n_hidden_gener_2], dtype=tf.float32)),
                'out_mean': tf.Variable(tf.zeros([n_input], dtype=tf.float32)),
                'out_log_sigma': tf.Variable(tf.zeros([n_input], dtype=tf.float32))}
        return all_weights

    def _recognition_network(self, weights, biases):
        # Generate probabilistic encoder (recognition network), which
        # maps inputs onto a normal distribution in latent space.
        # The transformation is parametrized and can be learned.
        # Returns (z_mean, z_log_sigma_sq), each of shape (batch, n_z).
        with tf.name_scope('recognition_network'):
            layer_1 = self.transfer_fct(tf.add(tf.matmul(self.x, weights['h1']),
                                               biases['b1']), name='EncoderLayer1')
            layer_2 = self.transfer_fct(tf.add(tf.matmul(layer_1, weights['h2']),
                                               biases['b2']), name='EncoderLayer2')
            z_mean = tf.add(tf.matmul(layer_2, weights['out_mean']),
                            biases['out_mean'], name='EncoderZ_Mean')
            z_log_sigma_sq = \
                tf.add(tf.matmul(layer_2, weights['out_log_sigma']),
                       biases['out_log_sigma'], name='EncoderZ_log_sigma_sq')
        return (z_mean, z_log_sigma_sq)

    def _generator_network(self, weights, biases):
        # Generate probabilistic decoder (decoder network), which
        # maps points in latent space onto a Bernoulli distribution in data space.
        # The transformation is parametrized and can be learned.
        # Returns the sigmoid-squashed reconstruction mean in (0, 1).
        with tf.name_scope('generator_network'):
            layer_1 = self.transfer_fct(tf.add(tf.matmul(self.z, weights['h1']),
                                               biases['b1']), name='DecoderLayer1')
            layer_2 = self.transfer_fct(tf.add(tf.matmul(layer_1, weights['h2']),
                                               biases['b2']), name='DecoderLayer2')
            x_reconstr_mean = \
                tf.nn.sigmoid(tf.add(tf.matmul(layer_2, weights['out_mean']),
                                     biases['out_mean']), name='DecoderX_reconstr_mean')
        return x_reconstr_mean

    def _create_loss_optimizer(self):
        """Define the negative-ELBO cost, the Adam optimizer, and summaries."""
        # The loss is composed of two terms:
        # 1.) The reconstruction loss (the negative log probability
        # of the input under the reconstructed Bernoulli distribution
        # induced by the decoder in the data space).
        # This can be interpreted as the number of "nats" required
        # for reconstructing the input when the activation in latent
        # is given.
        # Adding 1e-10 to avoid evaluation of log(0.0)
        reconstr_loss = \
            -tf.reduce_sum(self.x * tf.log(1e-10 + self.x_reconstr_mean)
                           + (1 - self.x) * tf.log(1e-10 + 1 - self.x_reconstr_mean),
                           1)
        # 2.) The latent loss, which is defined as the Kullback Leibler divergence
        ## between the distribution in latent space induced by the encoder on
        # the data and some prior. This acts as a kind of regularizer.
        # This can be interpreted as the number of "nats" required
        # for transmitting the the latent space distribution given
        # the prior.
        latent_loss = -0.5 * tf.reduce_sum(1 + self.z_log_sigma_sq
                                           - tf.square(self.z_mean)
                                           - tf.exp(self.z_log_sigma_sq), 1)
        self.cost = tf.reduce_mean(reconstr_loss + latent_loss)   # average over batch
        # Use ADAM optimizer
        self.optimizer = \
            tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(self.cost)
        tf.summary.scalar('Cost_', self.cost)
        tf.summary.scalar('Recontr_loss', tf.reduce_mean(reconstr_loss))
        # tf.summary.scalar('Latent_loss', latent_loss)

    def partial_fit(self, X):
        """Train model based on mini-batch of input data.
        Return cost of mini-batch"""
        opt, cost = self.sess.run((self.optimizer, self.cost), feed_dict={self.x: X})
        return cost

    def transform(self, X):
        """Transform data by mapping it into the latent space.
        Note: This maps to mean of distribution, we could alternatively sample from Gaussian distribution"""
        return self.sess.run(self.z_mean, feed_dict={self.x: X})

    def generate(self, z_mu=None):
        """ Generate data by sampling from latent space.
        If z_mu is not None, data for this point in latent space is generated. Otherwise, z_mu is drawn
        from prior in latent space."""
        if z_mu is None:
            # NOTE(review): this draws a 1-D vector of length n_z while self.z
            # is (batch_size, n_z) — confirm callers add the batch dimension.
            z_mu = np.random.normal(size=self.network_architecture['n_z'])
        # Note: This maps to mean of distribution, we could alternateively sample from Gaussian distribution
        return self.sess.run(self.x_reconstr_mean, feed_dict={self.z: z_mu})

    def reconstruct(self, X):
        """ Use VAE to reconstruct given data. """
        return self.sess.run(self.x_reconstr_mean, feed_dict={self.x: X})

    def get_z(self, X):
        """Return (z_mean, x_reconstr_mean) for input batch X in one run."""
        return self.sess.run([self.z_mean, self.x_reconstr_mean], feed_dict={self.x: X})
def train(network_architecture, learning_rate=0.001, batch_size=100, training_epochs=10,
          display_step=5):
    """Train a VariationalAutoencoder on the module-level MNIST dataset.

    Resumes from the newest checkpoint in the working directory when one
    exists, checkpoints at the start of every epoch (keeping the last 3),
    and logs the merged summaries once per epoch to vae.train_writer.
    Reads the module-level globals `mnist` and `n_samples`.

    Returns the trained VariationalAutoencoder instance.
    """
    vae = VariationalAutoencoder(network_architecture, learning_rate=learning_rate, batch_size=batch_size)
    # Training cycle
    saver = tf.train.Saver(max_to_keep=3)
    ckpt = tf.train.get_checkpoint_state("./")
    if ckpt and ckpt.model_checkpoint_path:
        saver.restore(vae.sess, ckpt.model_checkpoint_path)
    # FIX: merge_all() was previously called inside the per-batch loop,
    # which adds a brand-new merge op to the graph on every iteration and
    # grows the graph for the whole run. Build the merge op exactly once.
    merged = tf.summary.merge_all()
    for epoch in range(training_epochs):
        # Checkpoint at the start of the epoch, numbered by epoch.
        save_path = saver.save(vae.sess, './model.ckpt', epoch)
        avg_cost = 0.
        total_batch = int(n_samples / batch_size)
        # Loop over all batches
        for i in range(total_batch):
            batch_xs, _ = mnist.train.next_batch(batch_size)
            # Fit training using batch data; fetch summaries alongside.
            summary, opt, loss = vae.sess.run((merged, vae.optimizer, vae.cost),
                                              feed_dict={vae.x: batch_xs})
            # Compute average per-example loss accumulated over the epoch.
            avg_cost += loss / n_samples * batch_size
            b = round(i / total_batch, 2)
            print("Epoch:", '%04d' % (epoch+1), "cost=", "{:.9f}".format(avg_cost),
                  "Batch=", "{:.9f}".format(b), end='\r')
        # Log the summaries of the last batch of the epoch.
        vae.train_writer.add_summary(summary, epoch)
    return vae
# Architecture of the VAE: 512-unit encoder/decoder hidden layers,
# 784-d MNIST input, 20-d latent space.
network_architecture = dict(
    n_hidden_recog_1=512,   # 1st layer encoder neurons
    n_hidden_recog_2=512,   # 2nd layer encoder neurons
    n_hidden_gener_1=512,   # 1st layer decoder neurons
    n_hidden_gener_2=512,   # 2nd layer decoder neurons
    n_input=784,            # MNIST data input (img shape: 28 * 28)
    n_z=20)                 # dimensionality of latent space
tf.reset_default_graph()
vae = train(network_architecture, training_epochs=100, batch_size=1024)
# FIX: x_sample and y_sample were previously drawn from two *separate*
# mnist.test.next_batch(1024) calls, so the labels did not correspond to
# the images. Draw a single batch and unpack both halves of it.
x_sample, y_sample = mnist.test.next_batch(1024)
x_reconstruct = vae.reconstruct(x_sample)
# Side-by-side comparison: five test digits (left column) next to the
# VAE's reconstructions of the same digits (right column).
plt.figure(figsize=(8, 12))
for row in range(5):
    # Left cell of this row: the original input image.
    plt.subplot(5, 2, 2 * row + 1)
    plt.imshow(x_sample[row].reshape(28, 28), vmin=0, vmax=1, cmap="gray")
    plt.title("Test input")
    plt.colorbar()
    # Right cell of this row: the decoded reconstruction.
    plt.subplot(5, 2, 2 * row + 2)
    plt.imshow(x_reconstruct[row].reshape(28, 28), vmin=0, vmax=1, cmap="gray")
    plt.title("Reconstruction")
    plt.colorbar()
plt.tight_layout()
# Draw a fresh labelled test batch and embed it in the latent space.
x_sample, y_sample = mnist.test.next_batch(1024)
z_mu = vae.transform(x_sample)
# NOTE(review): get_z() immediately overwrites the z_mu computed above with
# the same z_mean values (plus the reconstruction) — confirm intended.
z_mu, x_re = vae.get_z(x_sample)
# Unsupervised clustering of the latent codes into 10 groups
# (one per MNIST digit class), by GMM and by k-means.
gmm = GaussianMixture(n_components=10,
                      covariance_type='tied', max_iter=200, random_state=0)
gmm.fit(z_mu)
pred_labels = gmm.predict(z_mu)
kmeans = KMeans(n_clusters=10, random_state=0).fit(z_mu)
kpred_labels = kmeans.labels_
# Project the 20-D latent codes down to 3-D with t-SNE for plotting.
model = TSNE(n_components=3, random_state=0, perplexity=50, learning_rate=500, n_iter=2800)
z_mu = model.fit_transform(z_mu)
# Three stacked 3-D scatter plots of the t-SNE projection, coloured by
# GMM cluster, true digit label, and k-means cluster respectively.
fig = plt.figure(3, figsize=(20,20))
ax = fig.add_subplot(311, projection='3d')
bx = fig.add_subplot(312, projection='3d')
kx = fig.add_subplot(313, projection='3d')
# zplot = pd.DataFrame(zplot)
xs = z_mu[:, 0]
ys = z_mu[:, 1]
zs = z_mu[:, 2]
ax.scatter(xs, ys, zs, c=np.array(pred_labels))
# y_sample is one-hot; argmax recovers the integer class for colouring.
bx.scatter(xs, ys, zs, c=np.argmax(y_sample, 1))
kx.scatter(xs, ys, zs, c=np.array(kpred_labels))
ax.set_xlabel('X Label')
ax.set_ylabel('Y Label')
ax.set_zlabel('Z Label')
ax.set_title('GMM Predicted')
bx.set_xlabel('X Label')
bx.set_ylabel('Y Label')
bx.set_zlabel('Z Label')
bx.set_title('Truth')
kx.set_xlabel('X Label')
kx.set_ylabel('Y Label')
kx.set_zlabel('Z Label')
kx.set_title('K-Means Predicted')
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment