TensorFlow Basic CNN
import tensorflow as tf
import reader
from datetime import datetime
import os.path
import time
import numpy as np

# Rewrite of the TensorFlow cifar10 example.
# The original gradually decays the learning rate and evaluates with a moving
# average of the weights; that is not done here (a sketch of how to add both
# follows this file).
NUM_CLASSES = 10


class CNNModel():
    def __init__(self):
        # internal setting
        self._optimizer = tf.train.AdamOptimizer()

        # config
        self._batch_size = 128
        self._max_steps = 60000

    # Helper that creates a variable and adds its L2 loss to the total loss.
    def _variable_with_weight_decay(self, name, shape, stddev, wd):
        var = tf.get_variable(name, shape, initializer=tf.truncated_normal_initializer(stddev=stddev, dtype=tf.float32))
        if wd is not None:
            weight_decay = tf.mul(tf.nn.l2_loss(var), wd, name='weight_loss')
            # Add the L2 loss to the 'losses' collection of the default graph.
            tf.add_to_collection('losses', weight_decay)
        return var

    def _build_graph(self, images):
        bias_initializer = tf.constant_initializer(0.0)

        # conv1
        with tf.variable_scope('conv1') as scope:
            kernel = self._variable_with_weight_decay('weights', shape=[5, 5, 3, self._batch_size], stddev=5e-2, wd=0.0)
            conv = tf.nn.conv2d(images, kernel, [1, 1, 1, 1], padding='SAME')
            biases = tf.get_variable('biases', [self._batch_size], initializer=bias_initializer, dtype=tf.float32)
            bias = tf.nn.bias_add(conv, biases)
            conv1 = tf.nn.relu(bias, name=scope.name)

        # pool1
        pool1 = tf.nn.max_pool(conv1, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1], padding='SAME', name='pool1')
        # norm1
        norm1 = tf.nn.lrn(pool1, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name='norm1')

        # conv2
        with tf.variable_scope('conv2') as scope:
            kernel = self._variable_with_weight_decay('weights', shape=[5, 5, self._batch_size, self._batch_size], stddev=5e-2, wd=0.0)
            conv = tf.nn.conv2d(norm1, kernel, [1, 1, 1, 1], padding='SAME')
            biases = tf.get_variable('biases', [self._batch_size], initializer=tf.constant_initializer(0.1), dtype=tf.float32)
            bias = tf.nn.bias_add(conv, biases)
            conv2 = tf.nn.relu(bias, name=scope.name)

        # norm2
        norm2 = tf.nn.lrn(conv2, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name='norm2')
        # pool2
        pool2 = tf.nn.max_pool(norm2, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1], padding='SAME', name='pool2')

        # local3
        with tf.variable_scope('local3') as scope:
            # Move everything into depth so we can perform a single matrix multiply.
            reshape = tf.reshape(pool2, [self._batch_size, -1])
            dim = reshape.get_shape()[1].value
            weights = self._variable_with_weight_decay('weights', shape=[dim, 384], stddev=0.04, wd=0.004)
            biases = tf.get_variable('biases', [384], initializer=tf.constant_initializer(0.1))
            local3 = tf.nn.relu(tf.matmul(reshape, weights) + biases, name=scope.name)

        # local4
        with tf.variable_scope('local4') as scope:
            weights = self._variable_with_weight_decay('weights', shape=[384, 192], stddev=0.04, wd=0.004)
            biases = tf.get_variable('biases', [192], initializer=tf.constant_initializer(0.1))
            local4 = tf.nn.relu(tf.matmul(local3, weights) + biases, name=scope.name)

        # softmax, i.e. softmax(WX + b)
        with tf.variable_scope('softmax_linear') as scope:
            weights = self._variable_with_weight_decay('weights', [192, NUM_CLASSES], stddev=1/192.0, wd=0.0)
            biases = tf.get_variable('biases', [NUM_CLASSES], initializer=bias_initializer, dtype=tf.float32)
            softmax_linear = tf.add(tf.matmul(local4, weights), biases, name=scope.name)

        return softmax_linear

    def loss(self, logits, labels):
        # Calculate the average cross entropy loss across the batch.
        labels = tf.cast(labels, tf.int64)
        cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
            logits, labels, name='cross_entropy_per_example')
        cross_entropy_mean = tf.reduce_mean(cross_entropy, name='cross_entropy')
        tf.add_to_collection('losses', cross_entropy_mean)
        return tf.add_n(tf.get_collection('losses'), name='total_loss')

    def train(self, data, session):
        labels, images = reader.cifar10_train_iterator(data, self._batch_size)
        logits = self._build_graph(images)
        loss_op = self.loss(logits, labels)
        optimize_op = self._optimizer.minimize(loss_op)
        saver = tf.train.Saver(tf.all_variables())
        session.run(tf.initialize_all_variables())

        # Start the queue runners so the data queued for batching is dequeued on each run call.
        tf.train.start_queue_runners(sess=session)

        for step in range(self._max_steps):
            # Per-mini-batch processing.
            start_time = time.time()
            loss, _ = session.run([loss_op, optimize_op])
            duration = time.time() - start_time

            assert not np.isnan(loss), 'Model diverged with loss = NaN'

            if step % 10 == 0:
                num_examples_per_step = self._batch_size
                examples_per_sec = num_examples_per_step / duration
                sec_per_batch = float(duration)

                format_str = ('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f '
                              'sec/batch)')
                print(format_str % (datetime.now(), step, loss, examples_per_sec, sec_per_batch))

            # Save the model checkpoint periodically.
            if step % 1000 == 0 or (step + 1) == self._max_steps:
                checkpoint_path = os.path.join('model', 'model.ckpt')
                saver.save(session, checkpoint_path, global_step=step)
    def evaluate(self, data, session):
        # The variables the graph needs must be defined before the model is restored.
        labels, images = reader.cifar10_eval_iterator(data, self._batch_size)
        logits = self._build_graph(images)
        top_1_op = tf.nn.in_top_k(logits, labels, 1)

        # Load model
        saver = tf.train.Saver()
        ckpt = tf.train.get_checkpoint_state('model')
        saver.restore(session, ckpt.model_checkpoint_path)

        true_count = 0
        tf.train.start_queue_runners(sess=session)
        num_iter = int(len(data[0]) / self._batch_size)
        for i in range(num_iter):
            predictions = session.run(top_1_op)
            true_count += np.sum(predictions)

        # Divide by the total number of evaluated examples, not the batch size.
        precision = true_count / (num_iter * self._batch_size)
        print('%s: precision @ 1 = %.3f' % (datetime.now(), precision))
    def predict(self, data, session):
        # The variables the graph needs must be defined before the model is restored.
        labels, images = reader.cifar10_eval_iterator(data, self._batch_size)
        logits = self._build_graph(images)
        top_1_op = tf.nn.top_k(logits, 1)

        # Load model
        saver = tf.train.Saver()
        ckpt = tf.train.get_checkpoint_state('model')
        saver.restore(session, ckpt.model_checkpoint_path)

        tf.train.start_queue_runners(sess=session)
        for i in range(int(len(data[0]) / self._batch_size)):
            predictions, label = session.run([top_1_op, labels])
            print(np.reshape(predictions.indices, (self._batch_size, )))
            print(label)


def main():
    print("start CNN")
    train_data, test_data = reader.cifar10_raw_data("cifar-10-batches-py")

    # Training
    with tf.Graph().as_default():
        model = CNNModel()
        session = tf.Session()
        model.train(train_data, session)

    # Prediction
    with tf.Graph().as_default():
        model = CNNModel()
        session = tf.Session()
        model.predict(test_data, session)

    # Evaluation
    with tf.Graph().as_default():
        model = CNNModel()
        session = tf.Session()
        model.evaluate(test_data, session)


if __name__ == '__main__':
    main()
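As the header comment notes, this rewrite omits the learning-rate decay and the evaluation-time moving average of the weights used by the upstream cifar10 example. Below is a minimal sketch of how both could be wired into train(); the constants (initial rate, decay schedule, EMA decay) are illustrative assumptions, not values from this gist.

# Sketch only: learning-rate decay plus an exponential moving average of the
# weights, in the style of the upstream cifar10 example. All constants here
# are assumptions, not values from this gist.
global_step = tf.Variable(0, trainable=False, name='global_step')

# Decay the learning rate exponentially with the step count.
learning_rate = tf.train.exponential_decay(
    0.1,                 # initial learning rate (assumed)
    global_step,
    decay_steps=100000,  # how often to decay (assumed)
    decay_rate=0.1,
    staircase=True)
optimizer = tf.train.GradientDescentOptimizer(learning_rate)
optimize_op = optimizer.minimize(loss_op, global_step=global_step)

# Maintain shadow (averaged) copies of all trainable variables.
variable_averages = tf.train.ExponentialMovingAverage(0.9999, global_step)
with tf.control_dependencies([optimize_op]):
    train_op = tf.group(variable_averages.apply(tf.trainable_variables()))

# At evaluation time, restore the averaged weights instead of the raw ones:
# saver = tf.train.Saver(variable_averages.variables_to_restore())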
reader.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import os
import pickle

import numpy as np
import tensorflow as tf

NUM_CLASSES = 10
NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 50000
NUM_EXAMPLES_PER_EPOCH_FOR_EVAL = 10000
NUM_HEIGHT = 32
NUM_WIDTH = 32
NUM_DEPTH = 3

# Process images of this size. Note that this differs from the original CIFAR
# image size of 32 x 32. If one alters this number, then the entire model
# architecture will change and any model would need to be retrained.
IMAGE_SIZE = 24
height = IMAGE_SIZE
width = IMAGE_SIZE


def _read_images(filename):
    with open(filename, 'rb') as f:
        data = pickle.load(f, encoding='bytes')
    return data


def _file_to_images(data_path, train=True):
    if train:
        filenames = [os.path.join(data_path, 'data_batch_%d' % i) for i in range(1, 6)]
    else:
        filenames = [os.path.join(data_path, 'test_batch')]

    labels = None
    images = None
    for file in filenames:
        data = _read_images(file)
        labels = np.concatenate([labels, np.array(data[b'labels'])], axis=0) if labels is not None else np.array(data[b'labels'])
        images = np.concatenate([images, data[b'data']], axis=0) if images is not None else data[b'data']
    return (labels, images)


def _tranpose_data(label, image):
    label = tf.cast(label, tf.int32)

    # reshape image to array
    image = tf.reshape(image, [NUM_DEPTH, NUM_HEIGHT, NUM_WIDTH])
    # Convert from [depth, height, width] to [height, width, depth].
    image = tf.transpose(image, [1, 2, 0])
    image = tf.cast(image, tf.float32)

    # Randomly crop a [height, width] section of the image.
    distorted_image = tf.random_crop(image, [height, width, 3], seed=1)

    # Randomly flip the image horizontally.
    distorted_image = tf.image.random_flip_left_right(distorted_image, seed=1)

    # Because these operations are not commutative, consider randomizing
    # the order of their operation.
    # random_brightness produced all-zero images for some reason, so it is disabled for now.
    # distorted_image = tf.image.random_brightness(distorted_image, max_delta=63, seed=1)
    distorted_image = tf.image.random_contrast(distorted_image, lower=0.2, upper=1.8, seed=1)

    # Subtract off the mean and divide by the variance of the pixels.
    float_image = tf.image.per_image_whitening(distorted_image)
    return (label, float_image)


def cifar10_raw_data(data_path="cifar-10-batches-py"):
    train_data = _file_to_images(data_path, train=True)
    test_data = _file_to_images(data_path, train=False)
    return train_data, test_data


def cifar10_train_iterator(raw_data, batch_size):
    # Fetch examples in an order that is reshuffled every epoch.
    label, image = tf.train.slice_input_producer([raw_data[0], raw_data[1]], shuffle=True, seed=1)
    label, image = _tranpose_data(label, image)

    # Batch the examples and enqueue them.
    labels, images = tf.train.batch([label, image], batch_size=batch_size)
    return (labels, images)


def cifar10_eval_iterator(raw_data, batch_size):
    label, image = tf.train.slice_input_producer([raw_data[0], raw_data[1]], shuffle=False)
    label = tf.cast(label, tf.int32)

    # reshape image to array
    image = tf.reshape(image, [NUM_DEPTH, NUM_HEIGHT, NUM_WIDTH])
    # Convert from [depth, height, width] to [height, width, depth].
    image = tf.transpose(image, [1, 2, 0])
    reshaped_image = tf.cast(image, tf.float32)

    # Image processing for evaluation.
    # Crop the central [height, width] of the image.
    resized_image = tf.image.resize_image_with_crop_or_pad(reshaped_image, width, height)

    # Subtract off the mean and divide by the variance of the pixels.
    float_image = tf.image.per_image_whitening(resized_image)

    # Batch the examples and enqueue them.
    labels, images = tf.train.batch([label, float_image], batch_size=batch_size)
    return (labels, images)