TensorFlow Image Classification Example 2

TensorFlow Classification Example

Data Preparation

The data first has to be converted to TFRecord format. The Flowers dataset is used as the example here; a download sketch follows below.
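If the dataset is not on disk yet, the sketch below fetches it. Treat the URL (TensorFlow's public example-images archive) and the target directory (chosen to match the flags in the conversion script) as assumptions to adjust.

# Hedged sketch: download and unpack the Flowers dataset.
# URL and target directory are assumptions; adjust to your setup.
import os
import tarfile
import urllib.request

URL = 'http://download.tensorflow.org/example_images/flower_photos.tgz'
TARGET = '/home/haoyu/Datasets/flowers'  # matches FLAGS.directory below

os.makedirs(TARGET, exist_ok=True)
archive = os.path.join(TARGET, 'flower_photos.tgz')
if not os.path.exists(archive):
    urllib.request.urlretrieve(URL, archive)
with tarfile.open(archive) as tar:
    tar.extractall(TARGET)  # creates TARGET/flower_photos/<one folder per class>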

Network

LeNet-5 is used here as the classification network.

Changelog

  1. Initial version
  2. Bug fixes
  3. Visualize the input data in TensorBoard
  4. Add a custom network
  5. More visualizations
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import random
from tqdm import tqdm
import numpy as np
import tensorflow as tf
from skimage import io, transform, color, util
flags = tf.app.flags
flags.DEFINE_string(flag_name='directory', default_value='/home/haoyu/Datasets/flowers/flower_photos', docstring='Path to the data')
flags.DEFINE_string(flag_name='save_dir', default_value='/home/haoyu/Datasets/flowers/tfrecords', docstring='Path to save the TFRecords')
flags.DEFINE_integer(flag_name='test_size', default_value=350, docstring='Size of the test set')
FLAGS = flags.FLAGS
MODES = [tf.estimator.ModeKeys.TRAIN, tf.estimator.ModeKeys.EVAL, tf.estimator.ModeKeys.PREDICT]
def _float_feature(value):
    """Wrap a float (or list of floats) in a tf.train.Feature"""
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))


def _int_feature(value):
    """Wrap an int (or list of ints) in a tf.train.Feature"""
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


def _bytes_feature(value):
    """Wrap bytes (or a list of bytes) in a tf.train.Feature"""
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))
def convert_to_tfrecord(mode, anno):
    """Convert annotations to a TFRecord file"""
    assert mode in MODES, "Unknown mode"
    filename = os.path.join(FLAGS.save_dir, mode + '.tfrecords')
    with tf.python_io.TFRecordWriter(filename) as writer:
        for fnm, cls in tqdm(anno):
            # Read and transform the image
            img = io.imread(fnm)
            img = color.rgb2gray(img)
            img = transform.resize(img, [28, 28])
            # Collect the shape of the transformed image
            if 3 == img.ndim:
                rows, cols, depth = img.shape
            else:
                rows, cols = img.shape
                depth = 1
            # Build the Example object
            example = tf.train.Example(
                features=tf.train.Features(
                    feature={
                        'image/height': _int_feature(rows),
                        'image/width': _int_feature(cols),
                        'image/depth': _int_feature(depth),
                        'image/class/label': _int_feature(cls),
                        'image/encoded': _bytes_feature(img.astype(np.float32).tobytes())
                    }
                )
            )
            # Serialize and write
            writer.write(example.SerializeToString())
def get_folder_name(folder):
    """List all sub-directory names of a folder (non-recursive)"""
    fs = os.listdir(folder)
    fs = [x for x in fs if os.path.isdir(os.path.join(folder, x))]
    return sorted(fs)


def get_file_name(folder):
    """List all file names in a folder (non-recursive)"""
    fs = os.listdir(folder)
    fs = map(lambda x: os.path.join(folder, x), fs)
    fs = [x for x in fs if os.path.isfile(x)]
    return fs


def get_annotations(directory, classes):
    """Collect all image paths together with their labels"""
    files = []
    labels = []
    for ith, val in enumerate(classes):
        fi = get_file_name(os.path.join(directory, val))
        files.extend(fi)
        labels.extend([ith] * len(fi))
    assert len(files) == len(labels), "Number of images and labels differ"
    # Pair each image path with its label
    annotation = [x for x in zip(files, labels)]
    # Shuffle randomly
    random.shuffle(annotation)
    return annotation
def main(_):
    class_names = get_folder_name(FLAGS.directory)
    annotation = get_annotations(FLAGS.directory, class_names)
    convert_to_tfrecord(tf.estimator.ModeKeys.TRAIN, annotation[FLAGS.test_size:])
    convert_to_tfrecord(tf.estimator.ModeKeys.EVAL, annotation[:FLAGS.test_size])


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()
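To sanity-check the conversion, a record can be read back and decoded outside the graph. A minimal sketch, assuming the save_dir used above:

# Minimal sketch: read the first record back and inspect its fields.
import tensorflow as tf

path = '/home/haoyu/Datasets/flowers/tfrecords/train.tfrecords'  # assumed path
record = next(tf.python_io.tf_record_iterator(path))
example = tf.train.Example()
example.ParseFromString(record)
feat = example.features.feature
print('height:', feat['image/height'].int64_list.value[0])
print('width :', feat['image/width'].int64_list.value[0])
print('label :', feat['image/class/label'].int64_list.value[0])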
# How to set up learning-rate decay
learning_rate = tf.train.polynomial_decay(0.001, tf.train.get_or_create_global_step(), 100000, 5e-6, power=4)
tf.summary.scalar('learning_rate', learning_rate)
optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.9)  # note: the decayed rate is passed in here

# How to set up training (slim is tf.contrib.slim)
if mode == tf.estimator.ModeKeys.TRAIN:
    optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.9)
    # Pick one of the two lines below:
    train_op = slim.learning.create_train_op(loss, optimizer, tf.train.get_or_create_global_step())  # needed when the network has BatchNorm
    train_op = optimizer.minimize(loss, tf.train.get_or_create_global_step())  # fine when there is no BatchNorm
else:
    train_op = None

# Compute training accuracy
accuracy = tf.metrics.accuracy(tf.argmax(labels, axis=1), predictions['classes'], name='accuracy')  # top-1 accuracy
accuracy_topk = tf.metrics.mean(tf.nn.in_top_k(predictions['probabilities'], tf.argmax(labels, axis=1), 2), name='accuracy_topk')  # top-k accuracy (k=2)
metrics = {'test_accuracy': accuracy, 'test_accuracy_topk': accuracy_topk}
# Visualize training accuracy
tf.summary.scalar('train_accuracy', accuracy[1])
tf.summary.scalar('train_accuracy_topk', accuracy_topk[1])
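For intuition, tf.train.polynomial_decay follows a simple closed form; the pure-Python sketch below reproduces the schedule configured above (values are approximate):

# Pure-Python reproduction of the polynomial decay configured above:
# lr0=0.001, decay_steps=100000, end_lr=5e-6, power=4.
def poly_decay(step, lr0=0.001, end_lr=5e-6, decay_steps=100000, power=4):
    step = min(step, decay_steps)  # the op clamps the step by default
    return (lr0 - end_lr) * (1 - step / decay_steps) ** power + end_lr

for s in (0, 25000, 50000, 100000):
    print(s, poly_decay(s))  # 0.001, ~3.2e-4, ~6.7e-5, 5e-6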
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import tensorflow as tf
flags = tf.app.flags
flags.DEFINE_integer(flag_name='batch_size', default_value=128, docstring='Batch size')
flags.DEFINE_string(flag_name='data_dir', default_value='/home/haoyu/Datasets/flowers/tfrecords', docstring='Where the data is stored')
flags.DEFINE_string(flag_name='model_dir', default_value='/tmp/flower_model', docstring='Where the model is stored')
flags.DEFINE_integer(flag_name='steps', default_value=500, docstring='Number of training steps')
flags.DEFINE_integer(flag_name='classes', default_value=5, docstring='Number of classes')
FLAGS = flags.FLAGS
MODES = [tf.estimator.ModeKeys.TRAIN, tf.estimator.ModeKeys.EVAL, tf.estimator.ModeKeys.PREDICT]
def input_fn(mode, batch_size=1):
    """Input function"""

    def parser(serialized_example):
        """How to parse a single record of the dataset"""
        # Parse one Example proto
        features = tf.parse_single_example(
            serialized_example,
            features={
                'image/height': tf.FixedLenFeature([], tf.int64),
                'image/width': tf.FixedLenFeature([], tf.int64),
                'image/depth': tf.FixedLenFeature([], tf.int64),
                'image/encoded': tf.FixedLenFeature([], tf.string),
                'image/class/label': tf.FixedLenFeature([], tf.int64),
            })
        # Read the shape information
        height = tf.cast(features['image/height'], tf.int32)
        width = tf.cast(features['image/width'], tf.int32)
        depth = tf.cast(features['image/depth'], tf.int32)
        # Restore the image
        image = tf.decode_raw(features['image/encoded'], tf.float32)
        image = tf.reshape(image, [height, width, depth])
        image = image - 0.5
        # Restore the label
        label = tf.cast(features['image/class/label'], tf.int32)
        return image, tf.one_hot(label, FLAGS.classes)

    if mode in MODES:
        tfrecords_file = os.path.join(FLAGS.data_dir, mode + '.tfrecords')
    else:
        raise ValueError("Unknown mode")
    assert tf.gfile.Exists(tfrecords_file), 'TFRecords file does not exist'
    # Build the dataset
    dataset = tf.contrib.data.TFRecordDataset([tfrecords_file])
    # Map the parser over every record
    dataset = dataset.map(parser, num_threads=1, output_buffer_size=batch_size)
    # Batch it
    dataset = dataset.batch(batch_size)
    # While training, loop over the data forever
    if mode == tf.estimator.ModeKeys.TRAIN:
        dataset = dataset.repeat()
    # Build the iterator
    iterator = dataset.make_one_shot_iterator()
    # Fetch features and labels
    images, labels = iterator.get_next()
    return images, labels
def my_model(inputs, mode):
    """Define a network"""
    net = tf.reshape(inputs, [-1, 28, 28, 1])
    net = tf.layers.conv2d(net, 32, [5, 5], padding='same', activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, [2, 2], strides=2)
    net = tf.layers.conv2d(net, 64, [5, 5], padding='same', activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, [2, 2], strides=2)
    net = tf.reshape(net, [-1, 7 * 7 * 64])
    net = tf.layers.dense(net, 1024, activation=tf.nn.relu)
    net = tf.layers.dropout(net, 0.4, training=(mode == tf.estimator.ModeKeys.TRAIN))
    net = tf.layers.dense(net, FLAGS.classes)
    return net
def my_model_fn(features, labels, mode):
    """Model function"""
    # Visualize the input images
    tf.summary.image('images', features)
    # Build the network
    logits = my_model(features, mode)
    predictions = {
        'classes': tf.argmax(input=logits, axis=1),
        'probabilities': tf.nn.softmax(logits, name='softmax_tensor')
    }
    # For PREDICT, the predictions are all we need
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)
    # Build the loss
    loss = tf.losses.softmax_cross_entropy(onehot_labels=labels, logits=logits, scope='loss')
    tf.summary.scalar('train_loss', loss)
    # Set up training
    if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = tf.train.AdamOptimizer(learning_rate=1e-3)
        train_op = optimizer.minimize(loss, tf.train.get_or_create_global_step())
    else:
        train_op = None
    # Compute training accuracy
    accuracy = tf.metrics.accuracy(
        tf.argmax(labels, axis=1), predictions['classes'],
        name='accuracy')
    accuracy_topk = tf.metrics.mean(
        tf.nn.in_top_k(predictions['probabilities'], tf.argmax(labels, axis=1), 2),
        name='accuracy_topk')
    metrics = {
        'test_accuracy': accuracy,
        'test_accuracy_topk': accuracy_topk
    }
    # Visualize training accuracy
    tf.summary.scalar('train_accuracy', accuracy[1])
    tf.summary.scalar('train_accuracy_topk', accuracy_topk[1])
    return tf.estimator.EstimatorSpec(
        mode=mode,
        predictions=predictions,
        loss=loss,
        train_op=train_op,
        eval_metric_ops=metrics)
def main(_):
    # Logging hook
    logging_hook = tf.train.LoggingTensorHook(
        every_n_iter=100,
        tensors={
            'accuracy': 'accuracy/value',
            'accuracy_topk': 'accuracy_topk/value',
            'loss': 'loss/value'
        },
    )
    # Build the Estimator
    mnist_classifier = tf.estimator.Estimator(
        model_fn=my_model_fn,
        model_dir=FLAGS.model_dir)
    for i in range(20):
        # Train
        mnist_classifier.train(
            input_fn=lambda: input_fn(tf.estimator.ModeKeys.TRAIN, FLAGS.batch_size),
            steps=FLAGS.steps,
            hooks=[logging_hook])
        # Evaluate and print the results
        print("=" * 10, "Testing", "=" * 10)
        eval_results = mnist_classifier.evaluate(
            input_fn=lambda: input_fn(tf.estimator.ModeKeys.EVAL))
        print('Evaluation results:\n\t{}'.format(eval_results))
        print("=" * 30)


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()
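After training, the same Estimator can run inference. The conversion script never writes an 'infer' TFRecord, so the sketch below feeds an in-memory array via numpy_input_fn instead; the zero image is a hypothetical placeholder that should really be preprocessed like the parser does (grayscale, 28x28, minus 0.5), and mnist_classifier is assumed to be in scope from main().

# Minimal inference sketch using an in-memory image (assumed placeholder).
import numpy as np

pred_input_fn = tf.estimator.inputs.numpy_input_fn(
    x=np.zeros([1, 28, 28, 1], dtype=np.float32),  # replace with a real image
    num_epochs=1,
    shuffle=False)
for p in mnist_classifier.predict(input_fn=pred_input_fn):
    print('class:', p['classes'], 'probabilities:', p['probabilities'])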
# Tiny Yolo
def my_model(inputs, mode):
    """Define a network"""
    net = tf.reshape(inputs, [-1, 224, 224, 3])
    net = tf.layers.conv2d(net, 16, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, [2, 2], strides=2)
    net = tf.layers.conv2d(net, 32, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, [2, 2], strides=2)
    net = tf.layers.conv2d(net, 16, [1, 1], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 128, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 16, [1, 1], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 128, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, [2, 2], strides=2)
    net = tf.layers.conv2d(net, 32, [1, 1], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 256, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 32, [1, 1], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 256, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, [2, 2], strides=2)
    net = tf.layers.conv2d(net, 64, [1, 1], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 512, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 64, [1, 1], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 512, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 128, [1, 1], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 5, [1, 1], padding='same', activation=tf.nn.relu)
    # Global average pooling down to one logit vector per image
    _b, _h, _w, _c = net.shape
    net = tf.layers.average_pooling2d(net, [_h, _w], 1)
    net = tf.reshape(net, [-1, 5])
    return net
# Tiny Yolo, slim version (assumes slim = tf.contrib.slim)
# The learning rate has to be lowered to about 2e-4 for this one to converge
def my_model(inputs, mode):
    with slim.arg_scope([slim.conv2d], padding='SAME',
                        weights_initializer=tf.truncated_normal_initializer(stddev=0.1),
                        weights_regularizer=slim.l2_regularizer(0.0005)):
        net = tf.reshape(inputs, [-1, 224, 224, 3])
        net = slim.conv2d(net, 16, [3, 3])
        net = slim.max_pool2d(net, [2, 2])
        net = slim.conv2d(net, 32, [3, 3])
        net = slim.max_pool2d(net, [2, 2])
        net = slim.stack(net, slim.conv2d, [(16, [1, 1]), (128, [3, 3]), (16, [1, 1]), (128, [3, 3])])
        net = slim.max_pool2d(net, [2, 2])
        net = slim.stack(net, slim.conv2d, [(32, [1, 1]), (256, [3, 3]), (32, [1, 1]), (256, [3, 3])])
        net = slim.max_pool2d(net, [2, 2])
        net = slim.stack(net, slim.conv2d, [(64, [1, 1]), (512, [3, 3]), (64, [1, 1]), (512, [3, 3])])
        net = slim.conv2d(net, 128, [1, 1])
        net = slim.conv2d(net, 5, [1, 1], activation_fn=None)
        # Global average pooling down to one logit vector per image
        _b, _h, _w, _c = net.shape
        net = slim.avg_pool2d(net, [_h, _w], 1)
        net = tf.reshape(net, [-1, 5])
        return net
# Darknet19 (assumes slim = tf.contrib.slim)
def my_model(inputs, mode):
    with slim.arg_scope([slim.conv2d], padding='SAME',
                        weights_initializer=tf.truncated_normal_initializer(stddev=0.1),
                        weights_regularizer=slim.l2_regularizer(0.0005),
                        normalizer_fn=slim.batch_norm,
                        normalizer_params={'is_training': mode == tf.estimator.ModeKeys.TRAIN}
                        ):
        net = tf.reshape(inputs, [-1, 224, 224, 3])
        net = slim.conv2d(net, 32, [3, 3])
        net = slim.max_pool2d(net, [2, 2])
        net = slim.conv2d(net, 64, [3, 3])
        net = slim.max_pool2d(net, [2, 2])
        net = slim.stack(net, slim.conv2d, [
            (128, [3, 3]), (64, [1, 1]), (128, [3, 3])])
        net = slim.max_pool2d(net, [2, 2])
        net = slim.stack(net, slim.conv2d, [
            (256, [3, 3]), (128, [1, 1]), (256, [3, 3])])
        net = slim.max_pool2d(net, [2, 2])
        net = slim.stack(net, slim.conv2d, [
            (512, [3, 3]), (256, [1, 1]), (512, [3, 3]), (256, [1, 1]), (512, [3, 3])])
        net = slim.max_pool2d(net, [2, 2])
        net = slim.stack(net, slim.conv2d, [
            (1024, [3, 3]), (512, [1, 1]), (1024, [3, 3]), (512, [1, 1]), (1024, [3, 3])])
        net = slim.conv2d(net, FLAGS.classes, [1, 1])
        # Global average pooling down to one logit vector per image
        _b, _h, _w, _c = net.shape
        net = slim.avg_pool2d(net, [_h, _w], 1)
        net = tf.reshape(net, [-1, FLAGS.classes])
        return net
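Darknet19 uses slim.batch_norm, so the note from the training snippet above applies: with a plain optimizer.minimize, the batch-norm moving averages are only updated if the UPDATE_OPS collection is wired into the train op. A sketch:

# Attach the UPDATE_OPS collection so batch-norm statistics are updated
# when building the train op manually instead of via slim.learning.create_train_op.
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
with tf.control_dependencies(update_ops):
    train_op = optimizer.minimize(loss, tf.train.get_or_create_global_step())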