@katsugeneration · Last active August 24, 2017
TensorFlow Basic CNN

This gist contains two files: a CNN model/training script for CIFAR-10, and the input pipeline it imports, reader.py.

import tensorflow as tf
import reader
from datetime import datetime
import os.path
import time
import numpy as np

# Rewrite of the TensorFlow cifar10 example.
# The original example gradually decays the learning rate and evaluates with a
# moving average of the weights; neither of those is done here.
NUM_CLASSES = 10


class CNNModel():
    def __init__(self):
        # internal setting
        self._optimizer = tf.train.AdamOptimizer()
        # config
        self._batch_size = 128
        self._max_steps = 60000

    # Helper that creates a variable and, when wd is set, adds its L2 weight
    # decay to the overall loss.
    def _variable_with_weight_decay(self, name, shape, stddev, wd):
        var = tf.get_variable(name, shape, initializer=tf.truncated_normal_initializer(stddev=stddev, dtype=tf.float32))
        if wd is not None:
            weight_decay = tf.mul(tf.nn.l2_loss(var), wd, name='weight_loss')
            # Add the L2 loss to the 'losses' collection of the default graph.
            tf.add_to_collection('losses', weight_decay)
        return var

    def _build_graph(self, images):
        bias_initializer = tf.constant_initializer(0.0)

        # conv1
        with tf.variable_scope('conv1') as scope:
            kernel = self._variable_with_weight_decay('weights', shape=[5, 5, 3, self._batch_size], stddev=5e-2, wd=0.0)
            conv = tf.nn.conv2d(images, kernel, [1, 1, 1, 1], padding='SAME')
            biases = tf.get_variable('biases', [self._batch_size], initializer=bias_initializer, dtype=tf.float32)
            bias = tf.nn.bias_add(conv, biases)
            conv1 = tf.nn.relu(bias, name=scope.name)

        # pool1
        pool1 = tf.nn.max_pool(conv1, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1], padding='SAME', name='pool1')
        # norm1
        norm1 = tf.nn.lrn(pool1, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name='norm1')
        # conv2
        with tf.variable_scope('conv2') as scope:
            kernel = self._variable_with_weight_decay('weights', shape=[5, 5, self._batch_size, self._batch_size], stddev=5e-2, wd=0.0)
            conv = tf.nn.conv2d(norm1, kernel, [1, 1, 1, 1], padding='SAME')
            biases = tf.get_variable('biases', [self._batch_size], initializer=tf.constant_initializer(0.1), dtype=tf.float32)
            bias = tf.nn.bias_add(conv, biases)
            conv2 = tf.nn.relu(bias, name=scope.name)

        # norm2
        norm2 = tf.nn.lrn(conv2, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name='norm2')
        # pool2
        pool2 = tf.nn.max_pool(norm2, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1], padding='SAME', name='pool2')
        # local3
        with tf.variable_scope('local3') as scope:
            # Move everything into depth so we can perform a single matrix multiply.
            reshape = tf.reshape(pool2, [self._batch_size, -1])
            dim = reshape.get_shape()[1].value
            weights = self._variable_with_weight_decay('weights', shape=[dim, 384], stddev=0.04, wd=0.004)
            biases = tf.get_variable('biases', [384], initializer=tf.constant_initializer(0.1))
            local3 = tf.nn.relu(tf.matmul(reshape, weights) + biases, name=scope.name)

        # local4
        with tf.variable_scope('local4') as scope:
            weights = self._variable_with_weight_decay('weights', shape=[384, 192], stddev=0.04, wd=0.004)
            biases = tf.get_variable('biases', [192], initializer=tf.constant_initializer(0.1))
            local4 = tf.nn.relu(tf.matmul(local3, weights) + biases, name=scope.name)

        # softmax, i.e. softmax(WX + b)
        with tf.variable_scope('softmax_linear') as scope:
            weights = self._variable_with_weight_decay('weights', [192, NUM_CLASSES], stddev=1/192.0, wd=0.0)
            biases = tf.get_variable('biases', [NUM_CLASSES], initializer=bias_initializer, dtype=tf.float32)
            softmax_linear = tf.add(tf.matmul(local4, weights), biases, name=scope.name)

        return softmax_linear
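
    # Shape trace for one batch, assuming the constants above are left unchanged
    # (batch size 128, crop size IMAGE_SIZE = 24 from reader.py). Note that the
    # number of filters happens to reuse self._batch_size = 128.
    #   images          [128, 24, 24, 3]
    #   conv1           [128, 24, 24, 128]
    #   pool1 / norm1   [128, 12, 12, 128]   (3x3 pool, stride 2, SAME)
    #   conv2 / norm2   [128, 12, 12, 128]
    #   pool2           [128, 6, 6, 128]
    #   reshape         [128, 4608]          (6 * 6 * 128)
    #   local3          [128, 384]
    #   local4          [128, 192]
    #   softmax_linear  [128, 10]            (NUM_CLASSES logits)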

    def loss(self, logits, labels):
        # Calculate the average cross entropy loss across the batch.
        labels = tf.cast(labels, tf.int64)
        cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
            logits=logits, labels=labels, name='cross_entropy_per_example')
        cross_entropy_mean = tf.reduce_mean(cross_entropy, name='cross_entropy')
        tf.add_to_collection('losses', cross_entropy_mean)
        # The total loss is the cross entropy plus all of the weight decay terms.
        return tf.add_n(tf.get_collection('losses'), name='total_loss')

    def train(self, data, session):
        labels, images = reader.cifar10_train_iterator(data, self._batch_size)
        logits = self._build_graph(images)
        loss_op = self.loss(logits, labels)
        optimize_op = self._optimizer.minimize(loss_op)
        saver = tf.train.Saver(tf.all_variables())
        session.run(tf.initialize_all_variables())

        # Start the queue runners that dequeue the batched input data at each step.
        tf.train.start_queue_runners(sess=session)

        for step in range(self._max_steps):
            # One mini-batch per step.
            start_time = time.time()
            loss, _ = session.run([loss_op, optimize_op])
            duration = time.time() - start_time

            assert not np.isnan(loss), 'Model diverged with loss = NaN'

            if step % 10 == 0:
                num_examples_per_step = self._batch_size
                examples_per_sec = num_examples_per_step / duration
                sec_per_batch = float(duration)
                format_str = ('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f '
                              'sec/batch)')
                print(format_str % (datetime.now(), step, loss, examples_per_sec, sec_per_batch))

            # Save the model checkpoint periodically.
            if step % 1000 == 0 or (step + 1) == self._max_steps:
                checkpoint_path = os.path.join('model', 'model.ckpt')
                saver.save(session, checkpoint_path, global_step=step)

    def evaluate(self, data, session):
        # The required variables must be defined before the checkpoint is restored.
        labels, images = reader.cifar10_eval_iterator(data, self._batch_size)
        logits = self._build_graph(images)
        top_1_op = tf.nn.in_top_k(logits, labels, 1)

        # Load model
        saver = tf.train.Saver()
        ckpt = tf.train.get_checkpoint_state('model')
        saver.restore(session, ckpt.model_checkpoint_path)

        true_count = 0
        tf.train.start_queue_runners(sess=session)
        num_batches = int(len(data[0]) / self._batch_size)
        for i in range(num_batches):
            predictions = session.run(top_1_op)
            true_count += np.sum(predictions)
        # Divide by the number of examples actually evaluated, not by a single batch.
        precision = true_count / (num_batches * self._batch_size)
        print('%s: precision @ 1 = %.3f' % (datetime.now(), precision))

    def predict(self, data, session):
        # The required variables must be defined before the checkpoint is restored.
        labels, images = reader.cifar10_eval_iterator(data, self._batch_size)
        logits = self._build_graph(images)
        top_1_op = tf.nn.top_k(logits, 1)

        # Load model
        saver = tf.train.Saver()
        ckpt = tf.train.get_checkpoint_state('model')
        saver.restore(session, ckpt.model_checkpoint_path)

        tf.train.start_queue_runners(sess=session)
        for i in range(int(len(data[0]) / self._batch_size)):
            predictions, label = session.run([top_1_op, labels])
            print(np.reshape(predictions.indices, (self._batch_size, )))
            print(label)


def main():
    print("start CNN")
    train_data, test_data = reader.cifar10_raw_data("cifar-10-batches-py")

    # Training
    with tf.Graph().as_default():
        model = CNNModel()
        session = tf.Session()
        model.train(train_data, session)

    # Prediction
    with tf.Graph().as_default():
        model = CNNModel()
        session = tf.Session()
        model.predict(test_data, session)

    # Evaluation
    with tf.Graph().as_default():
        model = CNNModel()
        session = tf.Session()
        model.evaluate(test_data, session)


if __name__ == '__main__':
    main()
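
Both files target the pre-1.0 TensorFlow API (tf.mul, tf.initialize_all_variables, tf.all_variables, tf.image.per_image_whitening). On TensorFlow 1.x those symbols were renamed, so a minimal, hedged compatibility shim such as the following could be run first; the hasattr guards make it a no-op on older releases. (On 1.x, tf.nn.sparse_softmax_cross_entropy_with_logits also requires keyword arguments, which is why loss() above passes logits= and labels= explicitly.)

# Compatibility aliases, assuming TensorFlow 1.x (no-op on pre-1.0 releases).
import tensorflow as tf

if not hasattr(tf, 'mul'):
    tf.mul = tf.multiply
if not hasattr(tf, 'initialize_all_variables'):
    tf.initialize_all_variables = tf.global_variables_initializer
if not hasattr(tf, 'all_variables'):
    tf.all_variables = tf.global_variables
if not hasattr(tf.image, 'per_image_whitening'):
    tf.image.per_image_whitening = tf.image.per_image_standardization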

reader.py:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import os
import pickle
import numpy as np
import tensorflow as tf
NUM_CLASSES = 10
NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 50000
NUM_EXAMPLES_PER_EPOCH_FOR_EVAL = 10000
NUM_HEIGHT = 32
NUM_WIDTH = 32
NUM_DEPTH = 3
# Process images of this size. Note that this differs from the original CIFAR
# image size of 32 x 32. If one alters this number, then the entire model
# architecture will change and any model would need to be retrained.
IMAGE_SIZE = 24
height = IMAGE_SIZE
width = IMAGE_SIZE


def _read_images(filename):
    with open(filename, 'rb') as f:
        data = pickle.load(f, encoding='bytes')
    return data


def _file_to_images(data_path, train=True):
    if train:
        filenames = [os.path.join(data_path, 'data_batch_%d' % i) for i in range(1, 6)]
    else:
        filenames = [os.path.join(data_path, 'test_batch')]
    labels = None
    images = None
    for file in filenames:
        data = _read_images(file)
        labels = np.concatenate([labels, np.array(data[b'labels'])], axis=0) if labels is not None else np.array(data[b'labels'])
        images = np.concatenate([images, data[b'data']], axis=0) if images is not None else data[b'data']
    return (labels, images)


def _transpose_data(label, image):
    label = tf.cast(label, tf.int32)
    # reshape image to array
    image = tf.reshape(image, [NUM_DEPTH, NUM_HEIGHT, NUM_WIDTH])
    # Convert from [depth, height, width] to [height, width, depth].
    image = tf.transpose(image, [1, 2, 0])
    image = tf.cast(image, tf.float32)

    # Randomly crop a [height, width] section of the image.
    distorted_image = tf.random_crop(image, [height, width, 3], seed=1)
    # Randomly flip the image horizontally.
    distorted_image = tf.image.random_flip_left_right(distorted_image, seed=1)
    # Because these operations are not commutative, consider randomizing
    # the order of their operation.
    # Disabled for now because the output became all zeros for some reason.
    # distorted_image = tf.image.random_brightness(distorted_image, max_delta=63, seed=1)
    distorted_image = tf.image.random_contrast(distorted_image, lower=0.2, upper=1.8, seed=1)
    # Subtract off the mean and divide by the variance of the pixels.
    float_image = tf.image.per_image_whitening(distorted_image)
    return (label, float_image)


def cifar10_raw_data(data_path="cifar-10-batches-py"):
    train_data = _file_to_images(data_path, train=True)
    test_data = _file_to_images(data_path, train=False)
    return train_data, test_data


def cifar10_train_iterator(raw_data, batch_size):
    # Fetch examples one at a time, reshuffling the order every epoch.
    label, image = tf.train.slice_input_producer([raw_data[0], raw_data[1]], shuffle=True, seed=1)
    label, image = _transpose_data(label, image)
    # Batch the examples and enqueue them.
    labels, images = tf.train.batch([label, image], batch_size=batch_size)
    return (labels, images)


def cifar10_eval_iterator(raw_data, batch_size):
    label, image = tf.train.slice_input_producer([raw_data[0], raw_data[1]], shuffle=False)
    label = tf.cast(label, tf.int32)
    # reshape image to array
    image = tf.reshape(image, [NUM_DEPTH, NUM_HEIGHT, NUM_WIDTH])
    # Convert from [depth, height, width] to [height, width, depth].
    image = tf.transpose(image, [1, 2, 0])
    reshaped_image = tf.cast(image, tf.float32)

    # Image processing for evaluation.
    # Crop the central [height, width] of the image.
    resized_image = tf.image.resize_image_with_crop_or_pad(reshaped_image, width, height)
    # Subtract off the mean and divide by the variance of the pixels.
    float_image = tf.image.per_image_whitening(resized_image)

    # Batch the examples and enqueue them.
    labels, images = tf.train.batch([label, float_image], batch_size=batch_size)
    return (labels, images)
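
As a usage sketch (with hedged assumptions: the model script above is saved as cnn_cifar10.py next to reader.py, and the dataset is the standard CIFAR-10 "python version" archive, which unpacks to cifar-10-batches-py/), the whole pipeline could be driven like this:

# run_cifar10.py -- hypothetical driver; the module name and URL are assumptions, not part of the gist.
import os
import tarfile
import urllib.request

CIFAR10_URL = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'

if not os.path.isdir('cifar-10-batches-py'):
    archive, _ = urllib.request.urlretrieve(CIFAR10_URL, 'cifar-10-python.tar.gz')
    with tarfile.open(archive, 'r:gz') as tar:
        tar.extractall('.')  # unpacks to cifar-10-batches-py/

# train() writes checkpoints to model/model.ckpt-*, so the directory must exist.
os.makedirs('model', exist_ok=True)

import cnn_cifar10  # assumed filename of the model script above
cnn_cifar10.main()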