Created
April 19, 2017 20:34
-
-
Save usmcamp0811/0e2d959797eb5cde9c3c4e7a595a5dd0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import tensorflow as tf | |
from tensorflow.examples.tutorials.mnist import input_data | |
from sklearn import datasets | |
from sklearn.mixture import GaussianMixture | |
from sklearn.model_selection import StratifiedKFold | |
#%matplotlib qt | |
from sklearn.cluster import KMeans | |
import numpy as np | |
from sklearn.manifold import TSNE | |
import pandas as pd | |
from mpl_toolkits.mplot3d import Axes3D | |
import matplotlib.pyplot as plt | |
# Seed TensorFlow's graph-level and NumPy's global random number generators.
tf.set_random_seed(0)
np.random.seed(0)

# Load MNIST through the TensorFlow tutorial helper, requesting one-hot labels.
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

# Number of examples in the training split; used to size an epoch.
n_samples = mnist.train.num_examples
def xavier_init(fan_in, fan_out, constant=1):
    """Return a Xavier-style uniform initial value for a weight matrix.

    The result is a float32 tensor of shape ``(fan_in, fan_out)`` whose
    entries are drawn uniformly from ``[-limit, limit)`` with
    ``limit = constant * sqrt(6 / (fan_in + fan_out))``.
    """
    # https://stackoverflow.com/questions/33640581/how-to-do-xavier-initialization-on-tensorflow
    limit = constant * np.sqrt(6.0 / (fan_in + fan_out))
    return tf.random_uniform((fan_in, fan_out),
                             minval=-limit,
                             maxval=limit,
                             dtype=tf.float32)
class VariationalAutoencoder(object):
    """Variational Autoencoder (VAE) with an sklearn-like interface implemented using TensorFlow.

    This implementation uses probabilistic encoders and decoders using Gaussian
    distributions and realized by multi-layer perceptrons. The VAE can be learned
    end-to-end.

    See "Auto-Encoding Variational Bayes" by Kingma and Welling for more details.
    """

    def __init__(self, network_architecture, transfer_fct=tf.nn.softplus,
                 learning_rate=0.0001, batch_size=100):
        """Build the graph, open a session and register projector embeddings.

        Args:
            network_architecture: dict with the keys ``n_hidden_recog_1``,
                ``n_hidden_recog_2``, ``n_hidden_gener_1``,
                ``n_hidden_gener_2``, ``n_input`` and ``n_z``.
            transfer_fct: activation applied to each hidden layer.
            learning_rate: learning rate handed to the Adam optimizer.
            batch_size: number of rows of the two projector embedding
                variables (``z_embedding`` and ``zm_embedding``).
        """
        # Directory that holds the sprite image and label file referenced by
        # the projector configuration below.
        logdir = '/home/mcamp/PycharmProjects/VAE_tutorials/'

        self.network_architecture = network_architecture
        self.transfer_fct = transfer_fct
        self.learning_rate = learning_rate
        self.batch_size = batch_size

        # tf Graph input
        self.x = tf.placeholder(tf.float32, [None, network_architecture["n_input"]])

        # Create autoencoder network
        self._create_network()
        # Define the loss (reconstruction + KL term) and its optimizer
        self._create_loss_optimizer()

        # The initializer is created after the optimizer so that it also
        # covers the variables the optimizer adds to the graph.
        init = tf.global_variables_initializer()

        # Launch the session
        self.sess = tf.Session()
        self.train_writer = tf.summary.FileWriter('./train', self.sess.graph)
        self.sess.run(init)

        # Register both embedding variables with the TensorBoard projector;
        # they share one sprite sheet and one label file.
        config = tf.contrib.tensorboard.plugins.projector.ProjectorConfig()
        for embedding_var in (self.z_embedding, self.zm_embedding):
            embedding_config = config.embeddings.add()
            embedding_config.tensor_name = embedding_var.name
            embedding_config.sprite.image_path = logdir + 'sprite_1024.png'
            embedding_config.metadata_path = logdir + 'labels_1024.tsv'
            # Width and height of a single thumbnail in the sprite.
            embedding_config.sprite.single_image_dim.extend([28, 28])
        tf.contrib.tensorboard.plugins.projector.visualize_embeddings(self.train_writer, config)

    def _create_network(self):
        """Wire encoder, sampling step, decoder and the embedding variables."""
        # Initialize autoencoder network weights and biases
        network_weights = self._initialize_weights(**self.network_architecture)

        # Use recognition network to determine mean and (log) variance of
        # the Gaussian distribution in latent space
        self.z_mean, self.z_log_sigma_sq = \
            self._recognition_network(network_weights["weights_recog"],
                                      network_weights["biases_recog"])

        # Draw one noise sample per input row. The shape is taken from
        # z_mean at run time so that z can be evaluated for any number of
        # rows rather than only for exactly batch_size rows.
        eps = tf.random_normal(tf.shape(self.z_mean), 0, 1, dtype=tf.float32)
        with tf.name_scope('z'):
            # z = mu + sigma*epsilon
            self.z = tf.add(self.z_mean,
                            tf.multiply(tf.sqrt(tf.exp(self.z_log_sigma_sq)), eps))

        with tf.name_scope('x_reconstr_mean'):
            # Use generator to determine mean of Bernoulli distribution of
            # the reconstructed input
            self.x_reconstr_mean = \
                self._generator_network(network_weights["weights_gener"],
                                        network_weights["biases_gener"])

        # Variables that hold a snapshot of z and z_mean for the projector.
        n_z = self.network_architecture['n_z']
        self.z_embedding = tf.Variable(tf.zeros([self.batch_size, n_z]), name="z")
        self.zm_embedding = tf.Variable(tf.zeros([self.batch_size, n_z]), name="z_mean")

        # Ops that copy the current z / z_mean into the variables above.
        # Nothing in this class runs them; they are kept as attributes so a
        # caller can run them in self.sess with a batch of batch_size rows.
        self.z_assignment = self.z_embedding.assign(self.z)
        self.zm_assignment = self.zm_embedding.assign(self.z_mean)

    def _initialize_weights(self, n_hidden_recog_1, n_hidden_recog_2,
                            n_hidden_gener_1, n_hidden_gener_2,
                            n_input, n_z):
        """Create all weight and bias variables.

        Returns:
            dict with the keys ``weights_recog``, ``biases_recog``,
            ``weights_gener`` and ``biases_gener``, each mapping layer names
            to ``tf.Variable`` objects.
        """
        # NOTE: the variables are unnamed, so the order in which they are
        # created here determines their graph names.
        all_weights = dict()
        with tf.name_scope('Encoder_Weights'):
            all_weights['weights_recog'] = {
                'h1': tf.Variable(xavier_init(n_input, n_hidden_recog_1)),
                'h2': tf.Variable(xavier_init(n_hidden_recog_1, n_hidden_recog_2)),
                'out_mean': tf.Variable(xavier_init(n_hidden_recog_2, n_z)),
                'out_log_sigma': tf.Variable(xavier_init(n_hidden_recog_2, n_z))}
            all_weights['biases_recog'] = {
                'b1': tf.Variable(tf.zeros([n_hidden_recog_1], dtype=tf.float32)),
                'b2': tf.Variable(tf.zeros([n_hidden_recog_2], dtype=tf.float32)),
                'out_mean': tf.Variable(tf.zeros([n_z], dtype=tf.float32)),
                'out_log_sigma': tf.Variable(tf.zeros([n_z], dtype=tf.float32))}
        with tf.name_scope('Decoder_Weights'):
            # The decoder's 'out_log_sigma' entries are not read by
            # _generator_network; they are kept so the set of graph
            # variables stays the same.
            all_weights['weights_gener'] = {
                'h1': tf.Variable(xavier_init(n_z, n_hidden_gener_1)),
                'h2': tf.Variable(xavier_init(n_hidden_gener_1, n_hidden_gener_2)),
                'out_mean': tf.Variable(xavier_init(n_hidden_gener_2, n_input)),
                'out_log_sigma': tf.Variable(xavier_init(n_hidden_gener_2, n_input))}
            all_weights['biases_gener'] = {
                'b1': tf.Variable(tf.zeros([n_hidden_gener_1], dtype=tf.float32)),
                'b2': tf.Variable(tf.zeros([n_hidden_gener_2], dtype=tf.float32)),
                'out_mean': tf.Variable(tf.zeros([n_input], dtype=tf.float32)),
                'out_log_sigma': tf.Variable(tf.zeros([n_input], dtype=tf.float32))}
        return all_weights

    def _recognition_network(self, weights, biases):
        """Build the probabilistic encoder.

        Maps ``self.x`` through two hidden layers onto the parameters of a
        normal distribution in latent space.

        Returns:
            tuple ``(z_mean, z_log_sigma_sq)``.
        """
        with tf.name_scope('recognition_network'):
            layer_1 = self.transfer_fct(tf.add(tf.matmul(self.x, weights['h1']),
                                               biases['b1']), name='EncoderLayer1')
            layer_2 = self.transfer_fct(tf.add(tf.matmul(layer_1, weights['h2']),
                                               biases['b2']), name='EncoderLayer2')
            z_mean = tf.add(tf.matmul(layer_2, weights['out_mean']),
                            biases['out_mean'], name='EncoderZ_Mean')
            z_log_sigma_sq = \
                tf.add(tf.matmul(layer_2, weights['out_log_sigma']),
                       biases['out_log_sigma'], name='EncoderZ_log_sigma_sq')
        return (z_mean, z_log_sigma_sq)

    def _generator_network(self, weights, biases):
        """Build the probabilistic decoder.

        Maps ``self.z`` through two hidden layers and a sigmoid output onto
        the mean of a Bernoulli distribution in data space.

        Returns:
            the reconstruction mean tensor.
        """
        with tf.name_scope('generator_network'):
            layer_1 = self.transfer_fct(tf.add(tf.matmul(self.z, weights['h1']),
                                               biases['b1']), name='DecoderLayer1')
            layer_2 = self.transfer_fct(tf.add(tf.matmul(layer_1, weights['h2']),
                                               biases['b2']), name='DecoderLayer2')
            x_reconstr_mean = \
                tf.nn.sigmoid(tf.add(tf.matmul(layer_2, weights['out_mean']),
                                     biases['out_mean']), name='DecoderX_reconstr_mean')
        return x_reconstr_mean

    def _create_loss_optimizer(self):
        """Define ``self.cost``, ``self.optimizer`` and the scalar summaries."""
        # The loss is composed of two terms:
        # 1.) The reconstruction loss (the negative log probability
        #     of the input under the reconstructed Bernoulli distribution
        #     induced by the decoder in the data space).
        #     This can be interpreted as the number of "nats" required
        #     for reconstructing the input when the activation in latent
        #     is given.
        # Adding 1e-10 to avoid evaluation of log(0.0)
        reconstr_loss = \
            -tf.reduce_sum(self.x * tf.log(1e-10 + self.x_reconstr_mean)
                           + (1 - self.x) * tf.log(1e-10 + 1 - self.x_reconstr_mean),
                           1)
        # 2.) The latent loss, which is defined as the Kullback Leibler divergence
        #     between the distribution in latent space induced by the encoder on
        #     the data and some prior. This acts as a kind of regularizer.
        #     This can be interpreted as the number of "nats" required
        #     for transmitting the latent space distribution given
        #     the prior.
        latent_loss = -0.5 * tf.reduce_sum(1 + self.z_log_sigma_sq
                                           - tf.square(self.z_mean)
                                           - tf.exp(self.z_log_sigma_sq), 1)
        self.cost = tf.reduce_mean(reconstr_loss + latent_loss)  # average over batch
        # Use ADAM optimizer
        self.optimizer = \
            tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(self.cost)
        tf.summary.scalar('Cost_', self.cost)
        tf.summary.scalar('Recontr_loss', tf.reduce_mean(reconstr_loss))

    def partial_fit(self, X):
        """Train model based on mini-batch of input data.

        Return cost of mini-batch.
        """
        _, cost = self.sess.run((self.optimizer, self.cost), feed_dict={self.x: X})
        return cost

    def transform(self, X):
        """Transform data by mapping it into the latent space.

        Note: This maps to mean of distribution, we could alternatively
        sample from Gaussian distribution.
        """
        return self.sess.run(self.z_mean, feed_dict={self.x: X})

    def generate(self, z_mu=None):
        """Generate data by sampling from latent space.

        If z_mu is not None, data for this point in latent space is
        generated. Otherwise, z_mu is drawn from prior in latent space.

        Args:
            z_mu: a single latent vector of length ``n_z`` or a 2-D array
                with one latent vector per row.

        Returns:
            the reconstruction mean, one row per latent vector.
        """
        if z_mu is None:
            z_mu = np.random.normal(size=(1, self.network_architecture['n_z']))
        else:
            # self.z is rank 2, so a single vector is promoted to one row.
            z_mu = np.atleast_2d(z_mu)
        # Note: This maps to mean of distribution, we could alternatively
        # sample from Gaussian distribution
        return self.sess.run(self.x_reconstr_mean, feed_dict={self.z: z_mu})

    def reconstruct(self, X):
        """Use VAE to reconstruct given data."""
        return self.sess.run(self.x_reconstr_mean, feed_dict={self.x: X})

    def get_z(self, X):
        """Return ``[z_mean, x_reconstr_mean]`` evaluated for the input X."""
        return self.sess.run([self.z_mean, self.x_reconstr_mean], feed_dict={self.x: X})
def train(network_architecture, learning_rate=0.001, batch_size=100, training_epochs=10,
          display_step=5):
    """Build a VariationalAutoencoder and train it on the MNIST training set.

    If a checkpoint is found in the current directory its weights are
    restored before training starts. A checkpoint is written to
    ``./model.ckpt`` after every epoch, keeping the three most recent.

    Args:
        network_architecture: layer-size dict forwarded to the model.
        learning_rate: learning rate forwarded to the model.
        batch_size: mini-batch size, also forwarded to the model.
        training_epochs: number of passes over the training set.
        display_step: currently unused; kept so existing callers still work.

    Returns:
        the trained VariationalAutoencoder.
    """
    vae = VariationalAutoencoder(network_architecture,
                                 learning_rate=learning_rate,
                                 batch_size=batch_size)

    saver = tf.train.Saver(max_to_keep=3)
    ckpt = tf.train.get_checkpoint_state("./")
    if ckpt and ckpt.model_checkpoint_path:
        saver.restore(vae.sess, ckpt.model_checkpoint_path)

    # Build the merged summary op once. Calling merge_all() inside the batch
    # loop would add a new op to the graph on every training step.
    merged = tf.summary.merge_all()
    total_batch = n_samples // batch_size

    # Training cycle
    for epoch in range(training_epochs):
        avg_cost = 0.
        # Loop over all batches
        for i in range(total_batch):
            batch_xs, _ = mnist.train.next_batch(batch_size)
            # One optimisation step; also fetch the summaries and the loss.
            summary, _, loss = vae.sess.run((merged, vae.optimizer, vae.cost),
                                            feed_dict={vae.x: batch_xs})
            # Compute average loss
            avg_cost += loss / n_samples * batch_size
            b = round(i / total_batch, 2)
            print("Epoch:", '%04d' % (epoch+1), "cost=", "{:.9f}".format(avg_cost),
                  "Batch=", "{:.9f}".format(b), end='\r')
            # NOTE(review): every batch of an epoch is logged with the epoch
            # number as its step; confirm this is the intended x-axis.
            vae.train_writer.add_summary(summary, epoch)
        # Save after the epoch's updates so the last epoch is not lost.
        saver.save(vae.sess, './model.ckpt', epoch)
    return vae
# Layer sizes of the VAE; the keys match the keyword arguments that
# VariationalAutoencoder._initialize_weights expects.
network_architecture = {
    'n_hidden_recog_1': 512,  # 1st layer encoder neurons
    'n_hidden_recog_2': 512,  # 2nd layer encoder neurons
    'n_hidden_gener_1': 512,  # 1st layer decoder neurons
    'n_hidden_gener_2': 512,  # 2nd layer decoder neurons
    'n_input': 784,           # MNIST data input (img shape: 28 * 28)
    'n_z': 20,                # dimensionality of latent space
}
# Evaluation batches use the same size the model is built with.
BATCH_SIZE = 1024

tf.reset_default_graph()
vae = train(network_architecture, training_epochs=100, batch_size=BATCH_SIZE)

# --- Reconstructions -------------------------------------------------------
# Take images and labels from ONE call so that they belong together.
x_sample, y_sample = mnist.test.next_batch(BATCH_SIZE)
x_reconstruct = vae.reconstruct(x_sample)

plt.figure(figsize=(8, 12))
for i in range(5):
    # Left column: test image; right column: its reconstruction.
    plt.subplot(5, 2, 2*i + 1)
    plt.imshow(x_sample[i].reshape(28, 28), vmin=0, vmax=1, cmap="gray")
    plt.title("Test input")
    plt.colorbar()
    plt.subplot(5, 2, 2*i + 2)
    plt.imshow(x_reconstruct[i].reshape(28, 28), vmin=0, vmax=1, cmap="gray")
    plt.title("Reconstruction")
    plt.colorbar()
plt.tight_layout()

# --- Clustering in latent space --------------------------------------------
x_sample, y_sample = mnist.test.next_batch(BATCH_SIZE)
# get_z returns the latent means together with the reconstructions, so no
# separate transform() call is needed.
z_mu, x_re = vae.get_z(x_sample)

gmm = GaussianMixture(n_components=10,
                      covariance_type='tied', max_iter=200, random_state=0)
gmm.fit(z_mu)
pred_labels = gmm.predict(z_mu)

kmeans = KMeans(n_clusters=10, random_state=0).fit(z_mu)
kpred_labels = kmeans.labels_

# Reduce the latent means to three dimensions for plotting. Both clusterings
# above were fitted on the original latent means, not on this projection.
model = TSNE(n_components=3, random_state=0, perplexity=50, learning_rate=500, n_iter=2800)
z_mu = model.fit_transform(z_mu)

fig = plt.figure(3, figsize=(20, 20))
ax = fig.add_subplot(311, projection='3d')
bx = fig.add_subplot(312, projection='3d')
kx = fig.add_subplot(313, projection='3d')

xs = z_mu[:, 0]
ys = z_mu[:, 1]
zs = z_mu[:, 2]

# Same points in every panel; only the colouring differs.
for axes, colours, title in ((ax, np.array(pred_labels), 'GMM Predicted'),
                             (bx, np.argmax(y_sample, 1), 'Truth'),
                             (kx, np.array(kpred_labels), 'K-Means Predicted')):
    axes.scatter(xs, ys, zs, c=colours)
    axes.set_xlabel('X Label')
    axes.set_ylabel('Y Label')
    axes.set_zlabel('Z Label')
    axes.set_title(title)

plt.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment