# DenoisingAutoencoder.py -- filename taken from the import in the stacked-autoencoder script below.
# NOTE: written against the TensorFlow 1.x graph API (tf.placeholder / tf.Session).
import os

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.exceptions import NotFittedError
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from tensorflow.examples.tutorials.mnist import input_data

def salt_and_pepper_noise(x, n_corrupt):
    """Set `n_corrupt` randomly chosen features of each sample to the global
    min or max value with equal probability (salt-and-pepper corruption)."""
    x_noise = x.copy()
    _, d = x.shape
    x_min = x.min()
    x_max = x.max()
    for i, sample in enumerate(x):
        # indices are sampled with replacement, so the effective number of
        # corrupted features can be slightly below n_corrupt
        mask = np.random.randint(0, d, n_corrupt)
        for m in mask:
            if np.random.random() < 0.5:
                x_noise[i][m] = x_min
            else:
                x_noise[i][m] = x_max
    return x_noise
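
# Example (hypothetical call): to corrupt ~10% of each 784-dim MNIST image:
#   x_noisy = salt_and_pepper_noise(x, n_corrupt=78)
# Note that the second argument is a per-sample *count* of features, not a fraction.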


class Autoencoder:
    def __init__(self, layer_size, n_epochs, batch_size, lr, corr_ratio, random_state,
                 summary_dir="./dae_logs", model_dir="./dae_model"):
        self.rng = random_state
        self.layer_size = layer_size
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.lr = lr
        self.corr_ratio = corr_ratio
        self.summary_dir = summary_dir
        self.model_dir = model_dir
        self.tf_sess = None
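
    # Glorot/Xavier uniform initialization: weights drawn from U(-a, a) with
    # a = sqrt(6 / (fan_in + fan_out)). Worked example for the model below:
    # input_size=784, layer_size=256 gives a = sqrt(6 / 1040) ≈ 0.076.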
    def _xavier_init(self, input_size, output_size, const=1.0):
        high = const * np.sqrt(6.0 / (input_size + output_size))
        low = -high
        return tf.random_uniform((input_size, output_size), minval=low, maxval=high)

    def _build_model(self, input_size):
        # `target_X` is the clean data; `noised_X` is the corrupted data.
        self.target_X = tf.placeholder(tf.float32, [None, input_size], name="target_X")
        self.noised_X = tf.placeholder(tf.float32, [None, input_size], name="noised_X")
        # create the parameter Variables
        init_W = self._xavier_init(input_size, self.layer_size)
        self.W = tf.Variable(init_W, name="W")
        self.b_encode = tf.Variable(tf.zeros([self.layer_size]), name="encode_bias")
        self.b_decode = tf.Variable(tf.zeros([input_size]), name="decode_bias")
        # encoding step
        with tf.name_scope("encode"):
            self.encode = tf.nn.sigmoid(tf.matmul(self.noised_X, self.W) + self.b_encode)
        # decoding step (tied weights: the decoder reuses the transposed encoder matrix)
        with tf.name_scope("decode"):
            W_T = tf.transpose(self.W)
            self.decode = tf.nn.sigmoid(tf.matmul(self.encode, W_T) + self.b_decode)
        with tf.name_scope("loss"):
            # binary cross-entropy, summed per sample and averaged over the batch;
            # the epsilon guards against log(0)
            eps = 1e-10
            self.loss_criterion = -tf.reduce_mean(tf.reduce_sum(
                self.target_X * tf.log(self.decode + eps)
                + (1.0 - self.target_X) * tf.log(1.0 - self.decode + eps), axis=1))
        with tf.name_scope("loss_summary"):
            self.train_loss_summary = tf.summary.scalar("train_loss", self.loss_criterion)
            self.valid_loss_summary = tf.summary.scalar("valid_loss", self.loss_criterion)
        with tf.name_scope("training_step"):
            optimizer = tf.train.GradientDescentOptimizer(self.lr)
            self.train_step = optimizer.minimize(self.loss_criterion)

    def _gen_batches(self, X, batch_size):
        N, _ = X.shape
        for start in range(0, N, batch_size):
            end = start + batch_size
            yield X[start:end]
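    # e.g. N=5, batch_size=2 yields batches of sizes 2, 2, 1; the final
    # batch may be smaller than batch_size.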

    def _corrupt(self, x, noise_ratio):
        # `noise_ratio` arrives here already converted to a per-sample count
        return salt_and_pepper_noise(x, noise_ratio)

    def _fit(self, train_X, valid_X):
        _, input_size = train_X.shape
        # convert the corruption ratio into a per-sample count of corrupted features
        n_corrupt = int(np.round(self.corr_ratio * input_size))
        for epoch in range(self.n_epochs):
            train_X = shuffle(train_X, random_state=self.rng)
            valid_X = shuffle(valid_X, random_state=self.rng)
            # materialize the minibatches for this epoch
            train_batches = list(self._gen_batches(train_X, self.batch_size))
            valid_batches = list(self._gen_batches(valid_X, self.batch_size))
            # Training step
            train_loss_hist = []
            for target_X in train_batches:
                noised_X = self._corrupt(target_X, n_corrupt)
                _, train_loss, train_loss_summary = self.tf_sess.run([
                    self.train_step,
                    self.loss_criterion,
                    self.train_loss_summary,
                ], feed_dict={
                    self.noised_X: noised_X,
                    self.target_X: target_X,
                })
                train_loss_hist.append(train_loss)
            train_loss_mean = np.mean(train_loss_hist)
            # Validation step (validation inputs are left uncorrupted)
            valid_loss_hist = []
            for target_X in valid_batches:
                valid_loss, valid_loss_summary = self.tf_sess.run([
                    self.loss_criterion,
                    self.valid_loss_summary,
                ], feed_dict={
                    self.noised_X: target_X,
                    self.target_X: target_X,
                })
                valid_loss_hist.append(valid_loss)
            valid_loss_mean = np.mean(valid_loss_hist)
            # Write summaries (from the last batch of the epoch)
            self.writer.add_summary(train_loss_summary, epoch + 1)
            self.writer.add_summary(valid_loss_summary, epoch + 1)
            if epoch == 0 or (epoch + 1) % 10 == 0:
                print(f"epoch: {epoch+1}, train_loss: {train_loss_mean:.2f}, valid_loss: {valid_loss_mean:.2f}")

    def fit(self, train_X, valid_X):
        _, input_size = train_X.shape
        self._build_model(input_size)
        with tf.Session() as self.tf_sess:
            self.tf_sess.run(tf.global_variables_initializer())
            # create the summary writer, replacing any stale logs
            if tf.gfile.Exists(self.summary_dir):
                tf.gfile.DeleteRecursively(self.summary_dir)
            self.writer = tf.summary.FileWriter(self.summary_dir, self.tf_sess.graph)
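            # the logged loss curves can be inspected with: tensorboard --logdir ./dae_logs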
            # create the model saver
            self.tf_saver = tf.train.Saver(var_list={
                "W": self.W,
                "encode_bias": self.b_encode,
                "decode_bias": self.b_decode,
            })
            self._fit(train_X, valid_X)
            # serialize the trained parameters (the checkpoint directory must exist)
            os.makedirs(os.path.dirname(self.model_dir) or ".", exist_ok=True)
            self.tf_saver.save(self.tf_sess, self.model_dir)

    def _check_is_fitted(self, attributes):
        if not isinstance(attributes, (list, tuple)):
            attributes = [attributes]
        if not all(hasattr(self, attr) for attr in attributes):
            raise NotFittedError("this Autoencoder instance is not fitted.")

    def transform(self, X):
        self._check_is_fitted(["W", "b_encode"])
        with tf.Session() as self.tf_sess:
            self.tf_sess.run(tf.global_variables_initializer())
            self.tf_saver.restore(self.tf_sess, self.model_dir)
            return self.tf_sess.run(self.encode, feed_dict={self.noised_X: X})
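

# `transform` returns the hidden codes as an array of shape (n_samples, layer_size);
# with layer_size=256 each row reshapes to a 16x16 image, which the demo below
# visualizes.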
if __name__ == "__main__":
    random_state = np.random.RandomState(123)
    mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)
    X, y = mnist.train.images, mnist.train.labels
    # hold out 20% of the training images for validation
    train_X, valid_X, train_y, valid_y = train_test_split(X, y, test_size=0.2,
                                                          random_state=random_state)
    model = Autoencoder(layer_size=256, lr=0.001, n_epochs=100, batch_size=256,
                        corr_ratio=0.1, random_state=random_state,
                        summary_dir="./dae_logs", model_dir="./model/dae")
    model.fit(train_X, valid_X)
    encoded_X = model.transform(train_X)
    # visualize the first 10 encoded samples as 16x16 grayscale images
    n = 10
    plt.figure(figsize=(20, 2))
    for i, img in enumerate(encoded_X[:n]):
        ax = plt.subplot(1, n, i + 1)
        plt.imshow(img.reshape(16, 16))
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    plt.show()

# StackedAutoencoder.py (filename assumed) -- builds on the denoising autoencoder above.
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from tensorflow.examples.tutorials.mnist import input_data

from DenoisingAutoencoder import Autoencoder


class Dense(Autoencoder):
    def __init__(self, input_size, layer_size, activation):
        # Autoencoder.__init__ does not accept `input_size`/`activation`,
        # so keep them as attributes on the layer itself.
        super().__init__(layer_size=layer_size,
                         lr=0.001, n_epochs=100, batch_size=256,
                         corr_ratio=0.1, random_state=np.random.RandomState(123),
                         summary_dir="./sda_logs", model_dir="./model/sda")
        self.input_size = input_size
        self.activation = activation


class StackedAutoencoder:
    def __init__(self, layers, batch_size, n_epochs, lr, summary_dir, random_state):
        self.layers = layers
        self.batch_size = batch_size
        self.input_size = layers[0].input_size
        self.n_epochs = n_epochs
        self.lr = lr
        self.summary_dir = summary_dir
        self.random_state = random_state
        self.tf_sess = None
        self.layer_params = {}

    def _forward(self, X):
        for i, layer in enumerate(self.layers):
            if len(self.layers) == i + 1:
                # the final layer is not pretrained, so build its variables here
                layer._build_model(layer.input_size)
            # get_params is assumed to return the tuple (W, b_encode, b_decode)
            W, b_encode, _ = layer.get_params(check_fit=False)
            self.layer_params[f"W_{i}"] = W
            self.layer_params[f"b_encode_{i}"] = b_encode
            activation = layer.activation
            X = activation(tf.matmul(X, W) + b_encode)
        return X

    def _pretraining(self, train_X, valid_X):
        _train_X = np.copy(train_X)
        _valid_X = np.copy(valid_X)
        for i, layer in enumerate(self.layers[:-1]):
            print(f"[Pretraining: layer{i+1}]")
            layer.fit(_train_X, _valid_X)
            _train_X = layer.transform(_train_X)
            _valid_X = layer.transform(_valid_X)
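    # Greedy layer-wise pretraining: each hidden layer is trained as a denoising
    # autoencoder on the codes produced by the layer below it; the final softmax
    # layer is left for the supervised fine-tuning pass.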

    def _build_model(self):
        self.X = tf.placeholder(tf.float32, [None, self.input_size], name="X")
        self.t = tf.placeholder(tf.float32, [None, self.layers[-1].layer_size], name="t")
        with tf.name_scope("forward"):
            self.y = self._forward(self.X)
        with tf.name_scope("loss"):
            # categorical cross-entropy against the one-hot targets `t`;
            # the epsilon guards against log(0)
            self.loss_criterion = -tf.reduce_mean(tf.reduce_sum(self.t * tf.log(self.y + 1e-10), 1))
        with tf.name_scope("loss_summary"):
            self.train_loss_summary = tf.summary.scalar("train_loss", self.loss_criterion)
            self.valid_loss_summary = tf.summary.scalar("valid_loss", self.loss_criterion)
        with tf.name_scope("training_step"):
            optimizer = tf.train.GradientDescentOptimizer(self.lr)
            self.train_step = optimizer.minimize(self.loss_criterion)

    def _gen_batches(self, X, y, batch_size):
        N, _ = X.shape
        for start in range(0, N, batch_size):
            end = start + batch_size
            yield X[start:end], y[start:end]

    def _fit(self, train_X, train_y, valid_X, valid_y):
        for epoch in range(self.n_epochs):
            train_X, train_y = shuffle(train_X, train_y, random_state=self.random_state)
            valid_X, valid_y = shuffle(valid_X, valid_y, random_state=self.random_state)
            # materialize the minibatches for this epoch
            train_batches = list(self._gen_batches(train_X, train_y, self.batch_size))
            valid_batches = list(self._gen_batches(valid_X, valid_y, self.batch_size))
            # Training step
            train_loss_hist = []
            for train_batch_X, train_batch_y in train_batches:
                _, train_loss, train_loss_summary = self.tf_sess.run([
                    self.train_step,
                    self.loss_criterion,
                    self.train_loss_summary,
                ], feed_dict={
                    self.X: train_batch_X,
                    self.t: train_batch_y,
                })
                train_loss_hist.append(train_loss)
            train_loss_mean = np.mean(train_loss_hist)
            # Validation step
            valid_loss_hist = []
            for valid_batch_X, valid_batch_y in valid_batches:
                valid_loss, valid_loss_summary = self.tf_sess.run([
                    self.loss_criterion,
                    self.valid_loss_summary,
                ], feed_dict={
                    self.X: valid_batch_X,
                    self.t: valid_batch_y,
                })
                valid_loss_hist.append(valid_loss)
            valid_loss_mean = np.mean(valid_loss_hist)
            # Write summaries (from the last batch of the epoch)
            self.writer.add_summary(train_loss_summary, epoch + 1)
            self.writer.add_summary(valid_loss_summary, epoch + 1)
            if epoch == 0 or (epoch + 1) % 10 == 0:
                print(f"epoch: {epoch+1}, train_loss: {train_loss_mean:.2f}, valid_loss: {valid_loss_mean:.2f}")

    def fit(self, train_X, train_y, valid_X, valid_y, pretrain=True):
        if pretrain:
            self._pretraining(train_X, valid_X)
        self._build_model()
        with tf.Session() as self.tf_sess:
            self.tf_sess.run(tf.global_variables_initializer())
            # create the summary writer, replacing any stale logs
            if tf.gfile.Exists(self.summary_dir):
                tf.gfile.DeleteRecursively(self.summary_dir)
            self.writer = tf.summary.FileWriter(self.summary_dir, self.tf_sess.graph)
            self._fit(train_X, train_y, valid_X, valid_y)


if __name__ == "__main__":
    random_state = np.random.RandomState(123)
    # same data preparation as in DenoisingAutoencoder.py
    mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)
    X, y = mnist.train.images, mnist.train.labels
    train_X, valid_X, train_y, valid_y = train_test_split(X, y, test_size=0.2,
                                                          random_state=random_state)
    layers = [
        Dense(784, 256, tf.nn.sigmoid),
        Dense(256, 256, tf.nn.sigmoid),
        Dense(256, 10, tf.nn.softmax),
    ]
    model = StackedAutoencoder(layers, batch_size=256, n_epochs=100, lr=0.01,
                               summary_dir="./sda_logs", random_state=random_state)
    model.fit(train_X, train_y, valid_X, valid_y)
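    # To gauge the benefit of pretraining, train an unpretrained baseline with
    # the same supervised loop:
    #   model.fit(train_X, train_y, valid_X, valid_y, pretrain=False)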