@sambaiz
Last active July 8, 2018 11:11
import os
import shutil

import numpy as np
import pandas as pd
import tensorflow as tf
from bayes_opt import BayesianOptimization
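
# MNIST digit classifier: a small two-layer CNN in TensorFlow 1.x. The training
# data is read from train.csv, presumably in the Kaggle Digit Recognizer layout
# (a "label" column followed by 784 pixel columns of flattened 28x28 images).
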
class MNIST_CNN:
    def __init__(self, learning_rate, variable_default_stddev, bias_default, keep_prob=1.0):
        self.learning_rate = float(learning_rate)
        self.variable_default_stddev = float(variable_default_stddev)
        self.bias_default = float(bias_default)
        self.keep_prob = float(keep_prob)

    def _weight_variable(self, shape):
        # Weights start from a truncated normal so no unit saturates at init.
        initial = tf.truncated_normal(shape, stddev=self.variable_default_stddev)
        return tf.Variable(initial)

    def _bias_variable(self, shape):
        # A small positive bias keeps ReLU units active early in training.
        initial = tf.constant(self.bias_default, shape=shape)
        return tf.Variable(initial)

    def _convAndPool(self, image, inputChannel, outputChannel):
        # 5x5 convolution (stride 1, SAME padding) followed by 2x2 max pooling,
        # which halves the spatial resolution.
        W_conv = self._weight_variable([5, 5, inputChannel, outputChannel])
        b_conv = self._bias_variable([outputChannel])
        h_conv = tf.nn.relu(tf.nn.conv2d(image, W_conv, strides=[1, 1, 1, 1], padding="SAME") + b_conv)
        return tf.nn.max_pool(h_conv, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")
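
    # Shape trace for one 28x28 input, which is why the dense layer below
    # expects 7*7*64 features:
    #   conv1 + pool: [batch, 28, 28, 1]  -> [batch, 14, 14, 32]
    #   conv2 + pool: [batch, 14, 14, 32] -> [batch, 7, 7, 64]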
    def graph(self):
        # placeholder_with_default lets fetches that need no input (e.g. the
        # summaries saved by MonitoredTrainingSession) run without a feed_dict.
        x = tf.placeholder_with_default(tf.zeros([0, 784], tf.float32), [None, 784])
        y = tf.placeholder_with_default(tf.zeros([0, 10], tf.float32), [None, 10])
        x_image = tf.reshape(x, [-1, 28, 28, 1])
        with tf.name_scope("ConvolutionalLayer1"):
            l1 = self._convAndPool(x_image, 1, 32)
        with tf.name_scope("ConvolutionalLayer2"):
            l2 = self._convAndPool(l1, 32, 64)
        with tf.name_scope("DenselyConnectedLayer"):
            l2_flat = tf.reshape(l2, [-1, 7 * 7 * 64])
            W_fc1 = self._weight_variable([7 * 7 * 64, 1024])
            b_fc1 = self._bias_variable([1024])
            h_fc1 = tf.nn.relu(tf.matmul(l2_flat, W_fc1) + b_fc1)
        with tf.name_scope("Dropout"):
            h_fc1_drop = tf.nn.dropout(h_fc1, self.keep_prob)
        with tf.name_scope("Readout"):
            W_fc2 = self._weight_variable([1024, 10])
            b_fc2 = self._bias_variable([10])
            y_prediction = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)
            prediction = tf.argmax(y_prediction, 1)
        with tf.name_scope("Optimize"):
            # Clip probabilities away from zero so log(y) cannot produce NaN.
            y_prediction_clip = tf.clip_by_value(y_prediction, 1e-30, 1.0)
            cross_entropy = tf.reduce_mean(-tf.reduce_sum(y * tf.log(y_prediction_clip), axis=1))
            train_step = tf.train.AdamOptimizer(self.learning_rate).minimize(
                cross_entropy, global_step=tf.train.get_or_create_global_step())
        with tf.name_scope("Evaluation"):
            correct_prediction = tf.equal(tf.argmax(y, 1), prediction)
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
            tf.summary.scalar("Accuracy", accuracy)
        return {
            "placeholder": {"x": x, "y": y},
            "fetch": {
                "train_step": train_step,
                "prediction": prediction,
                "accuracy": accuracy,
            },
        }


class Batch:
    def __init__(self, data, labels):
        assert len(data) == len(labels)
        self.data = data
        self.labels = labels
        self._index = 0

    def get_next(self, size):
        # Serve sequential mini-batches; reshuffle and restart once exhausted.
        self._index += size
        if self._index > len(self.data):
            perm = np.arange(len(self.data))
            np.random.shuffle(perm)
            self.data = self.data[perm]
            self.labels = self.labels[perm]
            self._index = size
        return self.data[self._index - size:self._index], self.labels[self._index - size:self._index]
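
# A minimal sketch (hypothetical data, not in the original gist) of how Batch
# cycles: once the five examples are exhausted, the next call reshuffles and
# starts over from the top.
#   batch = Batch(np.arange(5), np.arange(5))
#   batch.get_next(2)  # first two examples
#   batch.get_next(2)  # next two
#   batch.get_next(2)  # runs past the end -> reshuffle, then a fresh pair
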

class MNIST:
    def __init__(self, images, labels):
        self.images = images
        self.labels = labels

    def _restore(self, sess, saver, savedir):
        ckpt = tf.train.get_checkpoint_state(savedir)
        if ckpt:
            saver.restore(sess, ckpt.model_checkpoint_path)

    def predict(self, savedir, images):
        with tf.Graph().as_default():
            # The hyperparameters only shape the training ops, so zeros are
            # fine here: the trained weights are restored from the checkpoint
            # and keep_prob defaults to 1.0 for inference.
            g = MNIST_CNN(0, 0, 0).graph()
            saver = tf.train.Saver()
            with tf.Session() as sess:
                sess.run(tf.global_variables_initializer())
                self._restore(sess, saver, savedir)
                return sess.run(g["fetch"]["prediction"], feed_dict={
                    g["placeholder"]["x"]: list(images),
                })

    def train(self, learning_rate, variable_default_stddev, bias_default, savedir=None, last_step=100):
        # Hold out the first 500 examples for evaluation; train on the rest.
        test_images = self.images[:500]
        test_labels = self.labels[:500]
        train_batch = Batch(self.images[500:], self.labels[500:])
        tmp_save_dir = "./tmp-ckpt-{}-{}-{}".format(learning_rate, variable_default_stddev, bias_default)
        if not savedir:
            savedir = tmp_save_dir
        with tf.Graph().as_default():
            global_step = tf.train.get_or_create_global_step()
            g = MNIST_CNN(learning_rate, variable_default_stddev, bias_default).graph()
            saver = tf.train.Saver()
            hooks = [
                tf.train.StopAtStepHook(last_step=last_step)
            ]
            # MonitoredTrainingSession restores from checkpoint_dir when a
            # checkpoint exists and writes checkpoints/summaries periodically.
            with tf.train.MonitoredTrainingSession(
                hooks=hooks,
                checkpoint_dir=savedir,
                save_checkpoint_secs=300,
                save_summaries_secs=60
            ) as sess:
                while not sess.should_stop():
                    images, labels = train_batch.get_next(500)
                    sess.run(g["fetch"]["train_step"], feed_dict={
                        g["placeholder"]["x"]: list(images),
                        g["placeholder"]["y"]: list(labels),
                    })
            # Evaluate the final checkpoint on the held-out set.
            with tf.Session() as sess:
                self._restore(sess, saver, savedir)
                if os.path.exists(tmp_save_dir):
                    shutil.rmtree(tmp_save_dir)
                return sess.run(g["fetch"]["accuracy"], feed_dict={
                    g["placeholder"]["x"]: list(test_images),
                    g["placeholder"]["y"]: list(test_labels),
                })


def main(_):
    df_train = pd.read_csv("train.csv")
    # Shuffle rows so the 500-example test split is not biased by file order.
    df_train = df_train.take(np.random.permutation(df_train.index)).reset_index(drop=True)
    train_images = df_train.drop(["label"], axis=1).values
    train_labels = df_train["label"].map(lambda x: np.identity(10)[x]).values  # one-hot vectors
    mnist = MNIST(train_images, train_labels)
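    # A hedged sketch (not part of the original run) of how the imported
    # BayesianOptimization could tune these hyperparameters by maximizing the
    # held-out accuracy returned by train(); the bounds are illustrative and
    # the result-access API differs across bayes_opt versions.
    #   bo = BayesianOptimization(
    #       lambda learning_rate, variable_default_stddev, bias_default:
    #           mnist.train(learning_rate, variable_default_stddev, bias_default, last_step=1000),
    #       {"learning_rate": (1e-6, 1e-3),
    #        "variable_default_stddev": (0.01, 0.5),
    #        "bias_default": (0.0, 0.5)})
    #   bo.maximize(init_points=5, n_iter=25)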
    learning_rate = 1e-5
    variable_default_stddev = 0.1
    bias_default = 0.1
    savedir = "./ckpt-{}-{}-{}".format(learning_rate, variable_default_stddev, bias_default)
    print(mnist.train(learning_rate, variable_default_stddev, bias_default, savedir=savedir, last_step=200000))
    print(mnist.predict(savedir, train_images[:10]))
    print(df_train["label"][:10].values)


if __name__ == "__main__":
    tf.app.run()