import tensorflow as tf
import pandas as pd
import numpy as np
import inspect
from bayes_opt import BayesianOptimization
import shutil
import os
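
# A small TensorFlow 1.x CNN for MNIST digits read from a Kaggle-style
# train.csv (a "label" column plus 784 pixel columns), trained and
# checkpointed with tf.train.MonitoredTrainingSession.
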
class MNIST_CNN:
    def __init__(self, learning_rate, variable_default_stddev, bias_default, keep_prob=1.0):
        self.learning_rate = float(learning_rate)
        self.variable_default_stddev = float(variable_default_stddev)
        self.bias_default = float(bias_default)
        self.keep_prob = float(keep_prob)

    def _weight_variable(self, shape):
        initial = tf.truncated_normal(shape, stddev=self.variable_default_stddev)
        return tf.Variable(initial)

    def _bias_variable(self, shape):
        initial = tf.constant(self.bias_default, shape=shape)
        return tf.Variable(initial)

    def _convAndPool(self, image, inputChannel, outputChannel):
        W_conv = self._weight_variable([5, 5, inputChannel, outputChannel])
        b_conv = self._bias_variable([outputChannel])
        h_conv = tf.nn.relu(tf.nn.conv2d(image, W_conv, strides=[1, 1, 1, 1], padding="SAME") + b_conv)
        return tf.nn.max_pool(h_conv, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")

    def graph(self):
        # placeholder_with_default lets prediction-time runs omit the label feed.
        x = tf.placeholder_with_default(tf.zeros([0, 784], tf.float32), [None, 784])
        y = tf.placeholder_with_default(tf.zeros([0, 10], tf.float32), [None, 10])
        x_image = tf.reshape(x, [-1, 28, 28, 1])
        with tf.name_scope("ConvolutionalLayer1"):
            l1 = self._convAndPool(x_image, 1, 32)  # 28x28x1 -> 14x14x32
        with tf.name_scope("ConvolutionalLayer2"):
            l2 = self._convAndPool(l1, 32, 64)  # 14x14x32 -> 7x7x64
        with tf.name_scope("DenselyConnectedLayer"):
            l2_flat = tf.reshape(l2, [-1, 7 * 7 * 64])
            W_fc1 = self._weight_variable([7 * 7 * 64, 1024])
            b_fc1 = self._bias_variable([1024])
            h_fc1 = tf.nn.relu(tf.matmul(l2_flat, W_fc1) + b_fc1)
        with tf.name_scope("Dropout"):
            h_fc1_drop = tf.nn.dropout(h_fc1, self.keep_prob)
        with tf.name_scope("Readout"):
            W_fc2 = self._weight_variable([1024, 10])
            b_fc2 = self._bias_variable([10])
            y_prediction = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)
            prediction = tf.argmax(y_prediction, 1)
        with tf.name_scope("Optimize"):
            # Clip so tf.log never sees an exact zero and produces NaN.
            y_prediction_clip = tf.clip_by_value(y_prediction, 1e-30, 1.0)
            cross_entropy = tf.reduce_mean(-tf.reduce_sum(y * tf.log(y_prediction_clip), axis=1))
            train_step = tf.train.AdamOptimizer(self.learning_rate).minimize(
                cross_entropy, global_step=tf.train.get_or_create_global_step())
        with tf.name_scope("Evaluation"):
            correct_prediction = tf.equal(tf.argmax(y, 1), prediction)
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
            tf.summary.scalar("Accuracy", accuracy)
        return {
            "placeholder": {
                "x": x,
                "y": y
            },
            "fetch": {
                "train_step": train_step,
                "prediction": prediction,
                "accuracy": accuracy
            }
        }

class Batch:
    def __init__(self, data, labels):
        assert len(data) == len(labels)
        self.data = data
        self.labels = labels
        self._index = 0

    def get_next(self, size):
        self._index += size
        if self._index > len(self.data):
            # Epoch boundary: reshuffle and restart from the front.
            perm = np.arange(len(self.data))
            np.random.shuffle(perm)
            self.data = self.data[perm]
            self.labels = self.labels[perm]
            self._index = size
        return self.data[self._index - size:self._index], self.labels[self._index - size:self._index]

class MNIST:
    def __init__(self, images, labels):
        self.images = images
        self.labels = labels

    def _restore(self, sess, saver, savedir):
        ckpt = tf.train.get_checkpoint_state(savedir)
        if ckpt:
            saver.restore(sess, ckpt.model_checkpoint_path)

    def predict(self, savedir, images):
        with tf.Graph().as_default():
            # Hyperparameter values are irrelevant at inference time; the
            # checkpoint supplies the trained weights.
            g = MNIST_CNN(0, 0, 0).graph()
            saver = tf.train.Saver()
            with tf.Session() as sess:
                sess.run(tf.global_variables_initializer())
                self._restore(sess, saver, savedir)
                return sess.run(g["fetch"]["prediction"], feed_dict={
                    g["placeholder"]["x"]: list(images),
                })

    def train(self, learning_rate, variable_default_stddev, bias_default, savedir=None, last_step=100):
        # Hold out the first 500 examples for evaluation; train on the rest.
        test_images = self.images[:500]
        test_labels = self.labels[:500]
        train_batch = Batch(self.images[500:], self.labels[500:])
        tmp_save_dir = "./tmp-ckpt-{}-{}-{}".format(learning_rate, variable_default_stddev, bias_default)
        if not savedir:
            savedir = tmp_save_dir
        with tf.Graph().as_default():
            global_step = tf.train.get_or_create_global_step()
            g = MNIST_CNN(learning_rate, variable_default_stddev, bias_default).graph()
            saver = tf.train.Saver()
            hooks = [
                tf.train.StopAtStepHook(last_step=last_step)
            ]
            with tf.train.MonitoredTrainingSession(
                    hooks=hooks,
                    checkpoint_dir=savedir,
                    save_checkpoint_secs=300,
                    save_summaries_secs=60) as sess:
                sess.run(global_step)
                while not sess.should_stop():
                    images, labels = train_batch.get_next(500)
                    sess.run(g["fetch"]["train_step"], feed_dict={
                        g["placeholder"]["x"]: list(images),
                        g["placeholder"]["y"]: list(labels),
                    })
            # Reload the final checkpoint in a plain session to score the hold-out
            # set, then drop the temporary checkpoint directory if one was used.
            with tf.Session() as sess:
                self._restore(sess, saver, savedir)
                if os.path.exists(tmp_save_dir):
                    shutil.rmtree(tmp_save_dir)
                return sess.run(g["fetch"]["accuracy"], feed_dict={
                    g["placeholder"]["x"]: list(test_images),
                    g["placeholder"]["y"]: list(test_labels),
                })

def main(_):
    # Expects a Kaggle-style train.csv: a "label" column plus 784 pixel columns.
    df_train = pd.read_csv("train.csv")
    df_train = df_train.take(np.random.permutation(df_train.index)).reset_index(drop=True)
    train_images = df_train.drop(['label'], axis=1).values
    train_labels = df_train["label"].map(lambda x: np.identity(10)[x]).values  # one-hot vectors
    mnist = MNIST(train_images, train_labels)
    learning_rate = 1e-5
    variable_default_stddev = 0.1
    bias_default = 0.1
    savedir = './ckpt-{}-{}-{}'.format(learning_rate, variable_default_stddev, bias_default)
    print(mnist.train(learning_rate, variable_default_stddev, bias_default, savedir=savedir, last_step=200000))
    print(mnist.predict(savedir, train_images[:10]))
    print(df_train["label"][:10].values)
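
# The BayesianOptimization import at the top is never used in main(). The
# sketch below shows one hypothetical way it could tune the three
# hyperparameters that train() accepts; the bounds, the step counts, and the
# bayes_opt >= 1.0 API (f=/pbounds=, .maximize(), .max) are assumptions, and
# nothing in the original script calls this function.
def tune_hyperparameters(mnist, last_step=2000):
    def black_box(learning_rate, variable_default_stddev, bias_default):
        # train() returns hold-out accuracy, the quantity to maximize; with
        # savedir=None it checkpoints to a per-trial tmp dir and cleans it up.
        return float(mnist.train(learning_rate, variable_default_stddev,
                                 bias_default, last_step=last_step))
    optimizer = BayesianOptimization(
        f=black_box,
        pbounds={
            "learning_rate": (1e-6, 1e-3),
            "variable_default_stddev": (0.01, 0.5),
            "bias_default": (0.01, 0.5),
        },
    )
    optimizer.maximize(init_points=5, n_iter=20)
    return optimizer.max  # best accuracy and the parameters that produced it
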
if __name__ == "__main__":
    tf.app.run()