import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.exceptions import NotFittedError
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from tensorflow.examples.tutorials.mnist import input_data


def salt_and_pepper_noise(x, ratio):
    """Corrupt `ratio` randomly chosen features per sample with the batch min or max value."""
    x_noise = x.copy()
    _, d = x.shape
    x_min = x.min()
    x_max = x.max()
    for i, sample in enumerate(x):
        mask = np.random.randint(0, d, ratio)
        for m in mask:
            if np.random.random() < 0.5:
                x_noise[i][m] = x_min
            else:
                x_noise[i][m] = x_max
    return x_noise
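
# A minimal illustration of the corruption above (hypothetical example; the shown
# output is just one possible outcome, since both the corrupted indices and the
# salt/pepper choice are drawn at random):
#
#   >>> x = np.array([[0.1, 0.2, 0.3, 0.4],
#   ...               [0.5, 0.6, 0.7, 0.8]])
#   >>> salt_and_pepper_noise(x, ratio=1)   # one corrupted feature per sample
#   array([[0.1, 0.8, 0.3, 0.4],            # index 1 set to the batch max (0.8)
#          [0.5, 0.6, 0.1, 0.8]])           # index 2 set to the batch min (0.1)
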
class Autoencoder:
    def __init__(self, layer_size, n_epochs, batch_size, lr, corr_ratio, random_state,
                 summary_dir="./dae_logs", model_dir="./dae_model"):
        self.rng = random_state
        self.layer_size = layer_size
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.lr = lr
        self.corr_ratio = corr_ratio
        self.summary_dir = summary_dir
        self.model_dir = model_dir
        self.tf_sess = None

    def _xavier_init(self, input_size, output_size, const=1.0):
        high = const * np.sqrt(6.0 / (input_size + output_size))
        low = -high
        return tf.random_uniform((input_size, output_size), minval=low, maxval=high)
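    # A worked number for reference (not from the original gist): for the MNIST setup
    # below, with input_size=784 and layer_size=256, high = sqrt(6 / (784 + 256)) ≈ 0.076,
    # so the initial weights are drawn uniformly from roughly [-0.076, 0.076].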

    def _build_model(self, input_size):
        # `target_X` is the clean data, `noised_X` is the corrupted data.
        self.target_X = tf.placeholder(tf.float32, [None, input_size], name="target_X")
        self.noised_X = tf.placeholder(tf.float32, [None, input_size], name="noised_X")
        # create the parameter Variables
        init_W = self._xavier_init(input_size, self.layer_size)
        self.W = tf.Variable(init_W, name="W")
        self.b_encode = tf.Variable(tf.zeros([self.layer_size]), name="encode_bias")
        self.b_decode = tf.Variable(tf.zeros([input_size]), name="decode_bias")
        # encoder
        with tf.name_scope("encode"):
            self.encode = tf.nn.sigmoid(tf.matmul(self.noised_X, self.W) + self.b_encode)
        # decoder (tied weights: the decoder reuses the transposed encoder weights)
        with tf.name_scope("decode"):
            W_T = tf.transpose(self.W)
            self.decode = tf.nn.sigmoid(tf.matmul(self.encode, W_T) + self.b_decode)
        with tf.name_scope("loss"):
            self.loss_criterion = -tf.reduce_mean(tf.reduce_sum(self.target_X * tf.log(self.decode)))
        with tf.name_scope("loss_summary"):
            self.train_loss_summary = tf.summary.scalar("train_loss", self.loss_criterion)
            self.valid_loss_summary = tf.summary.scalar("valid_loss", self.loss_criterion)
        with tf.name_scope("training_step"):
            optimizer = tf.train.GradientDescentOptimizer(self.lr)
            self.train_step = optimizer.minimize(self.loss_criterion)

    def _gen_batches(self, X, batch_size):
        N, _ = X.shape
        for start in range(0, N, batch_size):
            end = start + batch_size
            yield X[start:end]

    def _corrupt(self, x, noise_ratio):
        x_corrupted = salt_and_pepper_noise(x, noise_ratio)
        return x_corrupted

    def _fit(self, train_X, valid_X):
        _, input_size = train_X.shape
        _corr_ratio = np.round(self.corr_ratio * input_size).astype(np.int)
        for epoch in range(self.n_epochs):
            train_X = shuffle(train_X, random_state=self.rng)
            valid_X = shuffle(valid_X, random_state=self.rng)
            # create minibatch generator
            train_batches = [_ for _ in self._gen_batches(train_X, self.batch_size)]
            valid_batches = [_ for _ in self._gen_batches(valid_X, self.batch_size)]
            # Training step
            train_loss_hist = []
            for target_X in train_batches:
                noised_X = self._corrupt(target_X, _corr_ratio)
                _, train_loss, train_loss_summary = self.tf_sess.run([
                    self.train_step,
                    self.loss_criterion,
                    self.train_loss_summary,
                ], feed_dict={
                    self.noised_X: noised_X,
                    self.target_X: target_X,
                })
                train_loss_hist.append(train_loss)
            train_loss_mean = np.mean(train_loss_hist)
            # Validation step
            valid_loss_hist = []
            for target_X in valid_batches:
                valid_loss, valid_loss_summary = self.tf_sess.run([
                    self.loss_criterion,
                    self.valid_loss_summary,
                ], feed_dict={
                    self.noised_X: target_X,
                    self.target_X: target_X,
                })
                valid_loss_hist.append(valid_loss)
            valid_loss_mean = np.mean(valid_loss_hist)
            # Write Summary
            self.writer.add_summary(train_loss_summary, epoch + 1)
            self.writer.add_summary(valid_loss_summary, epoch + 1)
            if epoch == 0 or (epoch + 1) % 10 == 0:
                print(f"epoch: {epoch+1}, train_loss: {train_loss_mean:.2f}, valid_loss: {valid_loss_mean:.2f}")

    def fit(self, train_X, valid_X):
        _, input_size = train_X.shape
        self._build_model(input_size)
        with tf.Session() as self.tf_sess:
            self.tf_sess.run(tf.global_variables_initializer())
            # create writer
            if tf.gfile.Exists(self.summary_dir):
                tf.gfile.DeleteRecursively(self.summary_dir)
            self.writer = tf.summary.FileWriter(self.summary_dir, self.tf_sess.graph)
            # create model saver
            self.tf_saver = tf.train.Saver(var_list={
                "W": self.W,
                "encode_bias": self.b_encode,
                "decode_bias": self.b_decode,
            })
            self._fit(train_X, valid_X)
            # serialize
            self.tf_saver.save(self.tf_sess, self.model_dir)

    def _check_is_fitted(self, attributes):
        if not isinstance(attributes, (list, tuple)):
            attributes = [attributes]
        if not all([hasattr(self, attr) for attr in attributes]):
            raise NotFittedError("this Autoencoder instance is not fitted.")

    def transform(self, X):
        self._check_is_fitted(["W", "b_encode"])
        with tf.Session() as self.tf_sess:
            self.tf_sess.run(tf.global_variables_initializer())
            self.tf_saver.restore(self.tf_sess, self.model_dir)
            return self.tf_sess.run(self.encode, feed_dict={self.noised_X: X})
if __name__ == "__main__":
random_state = np.random.RandomState(123)
mnist = input_data.read_data_sets('MNIST_data/', one_hot=True)
X, y = mnist.train.images, mnist.train.labels
train_X, valid_X, train_y, valid_y = train_test_split(X, y, test_size=0., random_state=random_state)
    model = Autoencoder(layer_size=256, lr=0.001, n_epochs=100, batch_size=256,
                        corr_ratio=0.1, random_state=np.random.RandomState(123),
                        summary_dir="./dae_logs", model_dir="./model/dae")
    model.fit(train_X, valid_X)
    encoded_X = model.transform(train_X)

    # plot the first 10 encoded samples as 16x16 images (256 hidden units = 16 * 16)
    n = 10
    plt.figure(figsize=(20, 2))
    for i, img in enumerate(encoded_X[:10]):
        ax = plt.subplot(1, n, i + 1)
        plt.imshow(img.reshape(16, 16))
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    plt.show()
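
# Usage note: the per-epoch train/valid loss summaries that the FileWriter writes to
# `summary_dir` can be inspected with TensorBoard, e.g.
#   tensorboard --logdir ./dae_logs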


# ---- Stacked autoencoder (a separate script that imports the Autoencoder above) ----
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from tensorflow.examples.tutorials.mnist import input_data

from DenoisingAutoencoder import Autoencoder


class Dense(Autoencoder):
    # NOTE: this assumes the imported Autoencoder also accepts `input_size` and
    # `activation` and exposes `get_params()`; the class shown above does not take
    # those arguments.
    def __init__(self, input_size, layer_size, activation):
        super().__init__(input_size=input_size, layer_size=layer_size, activation=activation,
                         lr=0.001, n_epochs=100, batch_size=256,
                         corr_ratio=0.1, random_state=np.random.RandomState(123),
                         summary_dir="./sda_logs", model_dir="./model/sda")


class StackedAutoencoder:
    def __init__(self, layers, batch_size, n_epochs, lr, summary_dir, random_state):
        self.layers = layers
        self.batch_size = batch_size
        self.input_size = layers[0].input_size
        self.n_epochs = n_epochs
        self.lr = lr
        self.summary_dir = summary_dir
        self.random_state = random_state
        self.tf_sess = None
        self.layer_params = {}

    def _forward(self, X):
        # stack the pretrained encoders; the final (output) layer is not pretrained,
        # so its variables are built here before its parameters are fetched
        for i, layer in enumerate(self.layers):
            if len(self.layers) == i + 1:
                layer._build_model()
            W, b_encode, _ = layer.get_params(check_fit=False)
            self.layer_params[f"W_{i}"] = W
            self.layer_params[f"b_encode_{i}"] = b_encode
            activation = layer.activation
            X = activation(tf.matmul(X, W) + b_encode)
        return X

    def _pretraining(self, train_X, valid_X):
        # greedy layer-wise pretraining of every layer except the output layer
        _train_X = np.copy(train_X)
        _valid_X = np.copy(valid_X)
        for i, layer in enumerate(self.layers[:-1]):
            print(f"[Pretraining: layer{i+1}]")
            layer.fit(_train_X, _valid_X)
            _train_X = layer.transform(_train_X)
            _valid_X = layer.transform(_valid_X)

    def _build_model(self):
        self.X = tf.placeholder(tf.float32, [None, self.input_size], name="X")
        self.t = tf.placeholder(tf.float32)
        with tf.name_scope("forward"):
            self.y = self._forward(self.X)
        with tf.name_scope("loss"):
            self.loss_criterion = -tf.reduce_mean(tf.reduce_sum(self.t * tf.log(self.y), 1))
        with tf.name_scope("loss_summary"):
            self.train_loss_summary = tf.summary.scalar("train_loss", self.loss_criterion)
            self.valid_loss_summary = tf.summary.scalar("valid_loss", self.loss_criterion)
        with tf.name_scope("training_step"):
            optimizer = tf.train.GradientDescentOptimizer(self.lr)
            self.train_step = optimizer.minimize(self.loss_criterion)

    def _gen_batches(self, X, y, batch_size):
        N, _ = X.shape
        for start in range(0, N, batch_size):
            end = start + batch_size
            yield X[start:end], y[start:end]

    def _fit(self, train_X, train_y, valid_X, valid_y):
        input_size = self.input_size
        for epoch in range(self.n_epochs):
            train_X, train_y = shuffle(train_X, train_y, random_state=self.random_state)
            valid_X, valid_y = shuffle(valid_X, valid_y, random_state=self.random_state)
            # create minibatch generator
            train_batches = [_ for _ in self._gen_batches(train_X, train_y, self.batch_size)]
            valid_batches = [_ for _ in self._gen_batches(valid_X, valid_y, self.batch_size)]
            # Training step
            train_loss_hist = []
            for train_batch_X, train_batch_y in train_batches:
                _, train_loss, train_loss_summary = self.tf_sess.run([
                    self.train_step,
                    self.loss_criterion,
                    self.train_loss_summary,
                ], feed_dict={
                    self.X: train_batch_X,
                    self.t: train_batch_y,
                })
                train_loss_hist.append(train_loss)
            train_loss_mean = np.mean(train_loss_hist)
            # Validation step
            valid_loss_hist = []
            for valid_batch_X, valid_batch_y in valid_batches:
                valid_loss, valid_loss_summary = self.tf_sess.run([
                    self.loss_criterion,
                    self.valid_loss_summary,
                ], feed_dict={
                    self.X: valid_batch_X,
                    self.t: valid_batch_y,
                })
                valid_loss_hist.append(valid_loss)
            valid_loss_mean = np.mean(valid_loss_hist)
            # Write Summary
            self.writer.add_summary(train_loss_summary, epoch + 1)
            self.writer.add_summary(valid_loss_summary, epoch + 1)
            if epoch == 0 or (epoch + 1) % 10 == 0:
                print(f"epoch: {epoch+1}, train_loss: {train_loss_mean:.2f}, valid_loss: {valid_loss_mean:.2f}")

    def fit(self, train_X, train_y, valid_X, valid_y, pretrain=True):
        if pretrain:
            self._pretraining(train_X, valid_X)
        self._build_model()
        with tf.Session() as self.tf_sess:
            self.tf_sess.run(tf.global_variables_initializer())
            # create writer
            if tf.gfile.Exists(self.summary_dir):
                tf.gfile.DeleteRecursively(self.summary_dir)
            self.writer = tf.summary.FileWriter(self.summary_dir, self.tf_sess.graph)
            self._fit(train_X, train_y, valid_X, valid_y)
if __name__ == "__main__":
layers = [
Dense(784, 256, tf.nn.sigmoid),
Dense(256, 256, tf.nn.sigmoid),
Dense(256, 10, tf.nn.softmax),
]
model = StackedAutoencoder(layers, batch_size=256, n_epochs=100, lr=0.01,
summary_dir="./sda_logs", random_state=random_state)
model.fit(train_X, train_y, valid_X, valid_y)