# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Benchmark script for TensorFlow.
See the README for more information.
"""
from __future__ import print_function
import argparse
from collections import defaultdict
import os
import threading
import time
import numpy as np
import six
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from tensorflow.python.client import timeline
from tensorflow.python.layers import convolutional as conv_layers
from tensorflow.python.layers import core as core_layers
from tensorflow.python.layers import pooling as pooling_layers
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.platform import gfile
import benchmark_storage
import cnn_util
import datasets
import model_config
import preprocessing
import variable_mgr
tf.flags.DEFINE_string('model', 'trivial', 'name of the model to run')
# The script first checks whether it is running in benchmarking mode or
# evaluation mode, depending on FLAGS.eval:
# In evaluation mode, this script reads a saved model and computes its
# accuracy against a validation dataset. Additional ops for accuracy and
# top_k predictions are only used in this mode.
# In benchmarking mode, the user can specify whether or not to use the
# forward-only option, which only computes the loss function (no weight
# updates). forward_only and eval cannot both be enabled at the same time.
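#
# Example invocations (hypothetical; the script name and flag values are
# illustrative only):
#   python tf_cnn_benchmarks.py --model=resnet50 --num_gpus=2
#   python tf_cnn_benchmarks.py --model=resnet50 --forward_only=True
#   python tf_cnn_benchmarks.py --model=resnet50 --eval=True \
#       --train_dir=/tmp/train --data_dir=/tmp/imagenet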
tf.flags.DEFINE_boolean('eval', False,
'whether to run evaluation instead of benchmarking')
tf.flags.DEFINE_boolean('forward_only', False, """whether to run forward-only
(no weight updates) instead of training when benchmarking""")
tf.flags.DEFINE_integer('batch_size', 0, 'batch size per compute device')
tf.flags.DEFINE_integer('num_batches', 100,
'number of batches to run, excluding warmup')
tf.flags.DEFINE_integer('num_warmup_batches', None,
'number of batches to run before timing')
tf.flags.DEFINE_integer('autotune_threshold', None,
'The autotune threshold for the models')
tf.flags.DEFINE_integer('num_gpus', 1, 'the number of GPUs to run on')
tf.flags.DEFINE_integer('display_every', 10,
"""Number of local steps after which progress is printed
out""")
tf.flags.DEFINE_string('data_dir', None, """Path to dataset in TFRecord format
(aka Example protobufs). If not specified,
synthetic data will be used.""")
tf.flags.DEFINE_string('data_name', None,
"""Name of dataset: imagenet or flowers.
If not specified, it is automatically guessed
based on --data_dir.""")
tf.flags.DEFINE_string('resize_method', 'bilinear',
"""Method for resizing input images:
crop, nearest, bilinear, bicubic, or area.
The 'crop' mode requires source images to be at least
as large as the network input size,
while the other modes support any size and apply
random bbox distortions
before resizing (even with --nodistortions).""")
tf.flags.DEFINE_boolean('distortions', True,
"""Enable/disable distortions during
image preprocessing. These include bbox and color
distortions.""")
tf.flags.DEFINE_string('local_parameter_device', 'gpu',
"""Device to use as parameter server: cpu or gpu.
For distributed training, it can affect where caching
of variables happens.""")
tf.flags.DEFINE_string('device', 'gpu',
"""Device to use for computation: cpu or gpu""")
tf.flags.DEFINE_string('data_format', 'NCHW',
"""Data layout to use: NHWC (TF native)
or NCHW (cuDNN native).""")
tf.flags.DEFINE_integer('num_intra_threads', 1,
"""Number of threads to use for intra-op
parallelism. If set to 0, the system will pick
an appropriate number.""")
tf.flags.DEFINE_integer('num_inter_threads', 0,
"""Number of threads to use for inter-op
parallelism. If set to 0, the system will pick
an appropriate number.""")
tf.flags.DEFINE_string('trace_file', None,
"""Enable TensorFlow tracing and write trace to
this file.""")
tf.flags.DEFINE_string('graph_file', None,
"""Write the model's graph definition to this
file. Defaults to binary format unless filename ends
in 'txt'.""")
tf.flags.DEFINE_string('optimizer', 'sgd',
'Optimizer to use: momentum or sgd or rmsprop')
tf.flags.DEFINE_float('learning_rate', None,
"""Initial learning rate for training.""")
tf.flags.DEFINE_float('num_epochs_per_decay', 0,
"""Epochs after which the learning rate decays.""")
tf.flags.DEFINE_float('learning_rate_decay_factor', 0.94,
"""Learning rate decay factor.""")
tf.flags.DEFINE_float('momentum', 0.9, """Momentum for training.""")
tf.flags.DEFINE_float('rmsprop_decay', 0.9, """Decay term for RMSProp.""")
tf.flags.DEFINE_float('rmsprop_momentum', 0.9, """Momentum in RMSProp.""")
tf.flags.DEFINE_float('rmsprop_epsilon', 1.0, """Epsilon term for RMSProp.""")
tf.flags.DEFINE_float('gradient_clip', None, """Gradient clipping magnitude.
Disabled by default.""")
tf.flags.DEFINE_float('weight_decay', 0.00004,
"""Weight decay factor for training.""")
# Performance tuning flags.
tf.flags.DEFINE_boolean('winograd_nonfused', True,
"""Enable/disable using the Winograd non-fused
algorithms.""")
tf.flags.DEFINE_boolean('sync_on_finish', False,
"""Whether to sync the devices after each
step.""")
tf.flags.DEFINE_boolean('staged_vars', False,
"""Whether to stage variables separately from the
main computation.""")
tf.flags.DEFINE_boolean('force_gpu_compatible', True,
"""Whether to enable force_gpu_compatible in
GPUOptions.""")
# The method for managing variables:
# parameter_server: variables are stored on a parameter server that holds
# the master copy of the variable. In local execution, a local device
# acts as the parameter server for each variable; in distributed
# execution, the parameter servers are separate processes in the cluster.
# For each step, each tower gets a copy of the variables from the
# parameter server, and sends its gradients to the param server.
# replicated: each GPU has its own copy of the variables. To apply gradients,
# nccl all-reduce or regular cross-device aggregation is used to replicate
# the combined gradients to all towers (depending on --use_nccl option).
# independent: each GPU has its own copy of the variables, and gradients are
# not shared between towers. This can be used to check performance when no
# data is moved between GPUs.
# distributed_replicated: Distributed training only. Each GPU has a copy of
# the variables, and updates its copy after the parameter servers are all
# updated with the gradients from all workers. Only works with
# cross_replica_sync=true. Unlike 'replicated', currently never uses
# nccl all-reduce for replicating within a server.
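#
# Example invocations (hypothetical; host names and flag values are
# illustrative only):
#   # Single machine, gradients replicated with NCCL:
#   python tf_cnn_benchmarks.py --num_gpus=4 --variable_update=replicated
#   # Distributed, one parameter server and two workers:
#   python tf_cnn_benchmarks.py --job_name=worker --task_index=0 \
#       --ps_hosts=ps0:2222 --worker_hosts=worker0:2222,worker1:2222 \
#       --variable_update=distributed_replicated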
tf.flags.DEFINE_string(
'variable_update', 'parameter_server',
('The method for managing variables: '
'parameter_server, replicated, distributed_replicated, independent'))
tf.flags.DEFINE_boolean(
'use_nccl', True,
'Whether to use nccl all-reduce primitives where possible')
# Distributed training flags.
tf.flags.DEFINE_string('job_name', '',
'One of "ps", "worker", "". Empty for local training')
tf.flags.DEFINE_string('ps_hosts', '', 'Comma-separated list of target hosts')
tf.flags.DEFINE_string('worker_hosts', '',
'Comma-separated list of target hosts')
tf.flags.DEFINE_integer('task_index', 0, 'Index of task within the job')
tf.flags.DEFINE_string('server_protocol', 'grpc', 'protocol for servers')
tf.flags.DEFINE_boolean('cross_replica_sync', True,
'whether to synchronize all replicas at each step')
# Summary and Save & load checkpoints.
tf.flags.DEFINE_integer('summary_verbosity', 0,
"""Verbosity level for summary ops. Pass 0 to disable
both summaries and checkpoints.""")
tf.flags.DEFINE_integer('save_summaries_steps', 0,
"""How often to save summaries for trained models.
Pass 0 to disable summaries.""")
tf.flags.DEFINE_integer('save_model_secs', 0,
"""How often to save trained models. Pass 0 to disable
checkpoints.""")
tf.flags.DEFINE_string('train_dir', None,
"""Path to session checkpoints.""")
tf.flags.DEFINE_string('eval_dir', '/tmp/tf_cnn_benchmarks/eval',
"""Directory where to write eval event logs.""")
tf.flags.DEFINE_string('pretrain_dir', None,
"""Path to pretrained session checkpoints.""")
tf.flags.DEFINE_string('result_storage', None,
"""Specifies storage option for benchmark results.
None means results won't be stored.
'cbuild_benchmark_datastore' means results will be stored
in cbuild datastore (note: this option requires special
permissions and is meant to be used from cbuilds).""")
FLAGS = tf.flags.FLAGS
log_fn = print # tf.logging.info
class GlobalStepWatcher(threading.Thread):
"""A helper class for globe_step.
Polls for changes in the global_step of the model, and finishes when the
number of steps for the global run are done.
"""
def __init__(self, sess, global_step_op,
start_at_global_step, end_at_global_step):
threading.Thread.__init__(self)
self.sess = sess
self.global_step_op = global_step_op
self.start_at_global_step = start_at_global_step
self.end_at_global_step = end_at_global_step
self.start_time = 0
self.start_step = 0
self.finish_time = 0
self.finish_step = 0
def run(self):
while self.finish_time == 0:
time.sleep(.25)
global_step_val, = self.sess.run([self.global_step_op])
if self.start_time == 0 and global_step_val >= self.start_at_global_step:
log_fn('Starting real work at step %s at time %s' % (
global_step_val, time.ctime()))
self.start_time = time.time()
self.start_step = global_step_val
if self.finish_time == 0 and global_step_val >= self.end_at_global_step:
log_fn('Finishing real work at step %s at time %s' % (
global_step_val, time.ctime()))
self.finish_time = time.time()
self.finish_step = global_step_val
def done(self):
return self.finish_time > 0
def steps_per_second(self):
return ((self.finish_step - self.start_step) /
(self.finish_time - self.start_time))
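# A minimal usage sketch for GlobalStepWatcher (illustrative only; this
# helper is not called anywhere). `sess` and `global_step_op` are assumed to
# come from an already-built graph, as in BenchmarkCNN._benchmark_cnn below.
def _example_watch_global_step(sess, global_step_op, warmup_steps, total_steps):
  watcher = GlobalStepWatcher(sess, global_step_op, warmup_steps, total_steps)
  watcher.start()  # polls global_step every 0.25s in a background thread
  while not watcher.done():
    time.sleep(.25)
  return watcher.steps_per_second()  # rate over the steps after warm-up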
class ConvNetBuilder(object):
"""Builder of cnn net."""
def __init__(self,
input_op,
input_nchan,
phase_train,
data_format='NCHW',
data_type=tf.float32):
self.top_layer = input_op
self.top_size = input_nchan
self.phase_train = phase_train
self.data_format = data_format
self.data_type = data_type
self.counts = defaultdict(lambda: 0)
self.use_batch_norm = False
self.batch_norm_config = {} # e.g. {'decay': 0.997, 'scale': True}
self.channel_pos = (
'channels_last' if data_format == 'NHWC' else 'channels_first')
def conv(self,
num_out_channels,
k_height,
k_width,
d_height=1,
d_width=1,
mode='SAME',
input_layer=None,
num_channels_in=None,
batch_norm=None,
activation='relu'):
if input_layer is None:
input_layer = self.top_layer
if num_channels_in is None:
num_channels_in = self.top_size
name = 'conv' + str(self.counts['conv'])
self.counts['conv'] += 1
with tf.variable_scope(name):
strides = [1, d_height, d_width, 1]
if self.data_format == 'NCHW':
strides = [strides[0], strides[3], strides[1], strides[2]]
if mode != 'SAME_RESNET':
conv = conv_layers.conv2d(
input_layer,
num_out_channels, [k_height, k_width],
strides=[d_height, d_width],
padding=mode,
data_format=self.channel_pos,
use_bias=False)
else: # Special padding mode for ResNet models
if d_height == 1 and d_width == 1:
conv = conv_layers.conv2d(
input_layer,
num_out_channels, [k_height, k_width],
strides=[d_height, d_width],
padding='SAME',
data_format=self.channel_pos,
use_bias=False)
else:
rate = 1 # Unused (for 'a trous' convolutions)
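# Compute explicit 'SAME'-style padding that depends only on the kernel
# size and dilation rate, not on the input size (matching slim's
# resnet_v1 models). With rate == 1 the dilation term vanishes and the
# effective kernel size is just the kernel height.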
kernel_size_effective = k_height + (k_height - 1) * (rate - 1)
pad_total = kernel_size_effective - 1
pad_beg = pad_total // 2
pad_end = pad_total - pad_beg
padding = [[0, 0], [pad_beg, pad_end], [pad_beg, pad_end], [0, 0]]
if self.data_format == 'NCHW':
padding = [padding[0], padding[3], padding[1], padding[2]]
input_layer = tf.pad(input_layer, padding)
conv = conv_layers.conv2d(
input_layer,
num_out_channels, [k_height, k_width],
strides=[d_height, d_width],
padding='VALID',
data_format=self.channel_pos,
use_bias=False)
if batch_norm is None:
batch_norm = self.use_batch_norm
if not batch_norm:
biases = tf.get_variable(
'biases', [num_out_channels], self.data_type,
tf.constant_initializer(0.0))
biased = tf.reshape(
tf.nn.bias_add(
conv, biases, data_format=self.data_format),
conv.get_shape())
else:
self.top_layer = conv
self.top_size = num_out_channels
biased = self.batch_norm(**self.batch_norm_config)
if activation == 'relu':
conv1 = tf.nn.relu(biased)
elif activation == 'linear' or activation is None:
conv1 = biased
elif activation == 'tanh':
conv1 = tf.nn.tanh(biased)
else:
raise KeyError('Invalid activation type \'%s\'' % activation)
self.top_layer = conv1
self.top_size = num_out_channels
return conv1
def mpool(self,
k_height,
k_width,
d_height=2,
d_width=2,
mode='VALID',
input_layer=None,
num_channels_in=None):
"""Construct a max pooling layer."""
if input_layer is None:
input_layer = self.top_layer
else:
self.top_size = num_channels_in
name = 'mpool' + str(self.counts['mpool'])
self.counts['mpool'] += 1
pool = pooling_layers.max_pooling2d(
input_layer, [k_height, k_width], [d_height, d_width],
padding=mode,
data_format=self.channel_pos,
name=name)
self.top_layer = pool
return pool
def apool(self,
k_height,
k_width,
d_height=2,
d_width=2,
mode='VALID',
input_layer=None,
num_channels_in=None):
"""Construct an average pooling layer."""
if input_layer is None:
input_layer = self.top_layer
else:
self.top_size = num_channels_in
name = 'apool' + str(self.counts['apool'])
self.counts['apool'] += 1
pool = pooling_layers.average_pooling2d(
input_layer, [k_height, k_width], [d_height, d_width],
padding=mode,
data_format=self.channel_pos,
name=name)
self.top_layer = pool
return pool
def reshape(self, shape, input_layer=None):
if input_layer is None:
input_layer = self.top_layer
self.top_layer = tf.reshape(input_layer, shape)
self.top_size = shape[-1] # HACK This may not always work
return self.top_layer
def affine(self,
num_out_channels,
input_layer=None,
num_channels_in=None,
activation='relu'):
if input_layer is None:
input_layer = self.top_layer
if num_channels_in is None:
num_channels_in = self.top_size
name = 'affine' + str(self.counts['affine'])
self.counts['affine'] += 1
with tf.variable_scope(name):
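# He-style initialization: weight variance 2/fan_in for relu
# activations, 1/fan_in otherwise.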
init_factor = 2. if activation == 'relu' else 1.
kernel = tf.get_variable(
'weights', [num_channels_in, num_out_channels],
self.data_type,
tf.random_normal_initializer(stddev=np.sqrt(init_factor /
(num_channels_in))))
biases = tf.get_variable('biases', [num_out_channels],
self.data_type,
tf.constant_initializer(0.0))
logits = tf.matmul(input_layer, kernel) + biases
if activation == 'relu':
affine1 = tf.nn.relu(logits, name=name)
elif activation == 'linear' or activation is None:
affine1 = logits
else:
raise KeyError('Invalid activation type \'%s\'' % activation)
self.top_layer = affine1
self.top_size = num_out_channels
return affine1
def resnet_bottleneck_v1(self,
depth,
depth_bottleneck,
stride,
input_layer=None,
in_size=None):
if input_layer is None:
input_layer = self.top_layer
if in_size is None:
in_size = self.top_size
name = 'resnet_v1' + str(self.counts['resnet_v1'])
self.counts['resnet_v1'] += 1
with tf.variable_scope(name):
if depth == in_size:
if stride == 1:
shortcut = input_layer
else:
shortcut = self.mpool(
1,
1,
stride,
stride,
input_layer=input_layer,
num_channels_in=in_size)
else:
shortcut = self.conv(
depth,
1,
1,
stride,
stride,
activation=None,
input_layer=input_layer,
num_channels_in=in_size)
self.conv(
depth_bottleneck,
1,
1,
stride,
stride,
input_layer=input_layer,
num_channels_in=in_size)
self.conv(depth_bottleneck, 3, 3, 1, 1, mode='SAME_RESNET')
res = self.conv(depth, 1, 1, 1, 1, activation=None)
output = tf.nn.relu(shortcut + res)
self.top_layer = output
self.top_size = depth
return output
def inception_module(self, name, cols, input_layer=None, in_size=None):
if input_layer is None:
input_layer = self.top_layer
if in_size is None:
in_size = self.top_size
name += str(self.counts[name])
self.counts[name] += 1
with tf.variable_scope(name):
col_layers = []
col_layer_sizes = []
for c, col in enumerate(cols):
col_layers.append([])
col_layer_sizes.append([])
for l, layer in enumerate(col):
ltype, args = layer[0], layer[1:]
kwargs = {
'input_layer': input_layer,
'num_channels_in': in_size
} if l == 0 else {}
if ltype == 'conv':
self.conv(*args, **kwargs)
elif ltype == 'mpool':
self.mpool(*args, **kwargs)
elif ltype == 'apool':
self.apool(*args, **kwargs)
elif ltype == 'share': # Share matching layer from previous column
self.top_layer = col_layers[c - 1][l]
self.top_size = col_layer_sizes[c - 1][l]
else:
raise KeyError('Invalid layer type for inception module: \'%s\'' %
ltype)
col_layers[c].append(self.top_layer)
col_layer_sizes[c].append(self.top_size)
catdim = 3 if self.data_format == 'NHWC' else 1
self.top_layer = tf.concat([layers[-1] for layers in col_layers], catdim)
self.top_size = sum([sizes[-1] for sizes in col_layer_sizes])
return self.top_layer
def residual(self, nout, net, scale=1.0):
inlayer = self.top_layer
net(self)
self.conv(nout, 1, 1, activation=None)
self.top_layer = tf.nn.relu(inlayer + scale * self.top_layer)
def spatial_mean(self, keep_dims=False):
name = 'spatial_mean' + str(self.counts['spatial_mean'])
self.counts['spatial_mean'] += 1
axes = [1, 2] if self.data_format == 'NHWC' else [2, 3]
self.top_layer = tf.reduce_mean(
self.top_layer, axes, keep_dims=keep_dims, name=name)
return self.top_layer
def dropout(self, keep_prob=0.5, input_layer=None):
if input_layer is None:
input_layer = self.top_layer
else:
self.top_size = None
name = 'dropout' + str(self.counts['dropout'])
self.counts['dropout'] += 1
with tf.variable_scope(name):
if not self.phase_train:
keep_prob = 1.0
dropout = core_layers.dropout(input_layer, keep_prob)
self.top_layer = dropout
return dropout
def batch_norm(self, input_layer=None, **kwargs):
"""Adds a Batch Normalization layer."""
if input_layer is None:
input_layer = self.top_layer
else:
self.top_size = None
name = 'batchnorm' + str(self.counts['batchnorm'])
self.counts['batchnorm'] += 1
with tf.variable_scope(name) as scope:
bn = tf.contrib.layers.batch_norm(
input_layer, is_training=self.phase_train,
fused=True, data_format=self.data_format,
scope=scope, **kwargs)
self.top_layer = bn
return bn
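# A minimal usage sketch for ConvNetBuilder (illustrative only; the real
# models drive the builder from model_config via add_inference()). The layer
# sizes are hypothetical and assume 28x28 inputs.
def _example_add_inference(cnn):
  cnn.conv(32, 5, 5)             # 5x5 conv, 32 output channels, stride 1
  cnn.mpool(2, 2)                # 2x2 max pool, stride 2 -> 14x14
  cnn.conv(64, 5, 5)
  cnn.mpool(2, 2)                # -> 7x7
  cnn.reshape([-1, 64 * 7 * 7])  # flatten for the fully-connected layers
  cnn.affine(512)
  cnn.dropout()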
def loss_function(logits, labels):
# global cross_entropy # HACK TESTING
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
logits=logits, labels=labels, name='xentropy')
loss = tf.reduce_mean(cross_entropy, name='xentropy_mean')
return loss
def add_image_preprocessing(dataset, input_nchan, image_size, batch_size,
num_compute_devices, input_data_type,
resize_method, train):
"""Add image Preprocessing ops to tf graph."""
if dataset is not None:
preproc_train = preprocessing.ImagePreprocessor(
image_size, image_size, batch_size,
num_compute_devices, input_data_type, train=train,
resize_method=resize_method)
if train:
subset = 'train'
else:
subset = 'validation'
images, labels = preproc_train.minibatch(dataset, subset=subset)
images_splits = images
labels_splits = labels
# Note: we force all datasets to 1001 classes (1000 classes plus a
# background slot) to ensure even comparison between datasets. This works
# because we use sparse_softmax_cross_entropy, which only requires labels
# to be valid class indices.
nclass = 1001
else:
nclass = 1001
input_shape = [batch_size, image_size, image_size, input_nchan]
images = tf.truncated_normal(
input_shape,
dtype=input_data_type,
stddev=1e-1,
name='synthetic_images')
labels = tf.random_uniform(
[batch_size],
minval=1,
maxval=nclass,
dtype=tf.int32,
name='synthetic_labels')
# Note: This results in a H2D copy, but no computation
# Note: This avoids recomputation of the random values, but still
# results in a H2D copy.
images = tf.contrib.framework.local_variable(images, name='images')
labels = tf.contrib.framework.local_variable(labels, name='labels')
# Change to 0-based (don't use background class like Inception does)
labels -= 1
if num_compute_devices == 1:
images_splits = [images]
labels_splits = [labels]
else:
images_splits = tf.split(images, num_compute_devices, 0)
labels_splits = tf.split(labels, num_compute_devices, 0)
return nclass, images_splits, labels_splits
def create_config_proto():
config = tf.ConfigProto()
config.allow_soft_placement = True
config.intra_op_parallelism_threads = FLAGS.num_intra_threads
config.inter_op_parallelism_threads = FLAGS.num_inter_threads
config.gpu_options.force_gpu_compatible = FLAGS.force_gpu_compatible
return config
def get_mode_from_flags():
"""Determine which mode this script is running."""
if FLAGS.forward_only and FLAGS.eval:
raise ValueError('Only one of the forward_only and eval flags may be true')
if FLAGS.eval:
return 'evaluation'
if FLAGS.forward_only:
return 'forward-only'
return 'training'
def benchmark_one_step(sess, fetches, step, batch_size,
step_train_times, trace_filename, summary_op=None):
"""Advance one step of benchmarking."""
if trace_filename is not None and step == -1:
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()
else:
run_options = None
run_metadata = None
summary_str = None
start_time = time.time()
if summary_op is None:
results = sess.run(fetches, options=run_options, run_metadata=run_metadata)
else:
(results, summary_str) = sess.run(
[fetches, summary_op], options=run_options, run_metadata=run_metadata)
if not FLAGS.forward_only:
lossval = results[1]
else:
lossval = 0.
train_time = time.time() - start_time
step_train_times.append(train_time)
if step >= 0 and (step == 0 or (step + 1) % FLAGS.display_every == 0):
log_fn('%i\t%s\t%.3f' % (
step + 1, get_perf_timing_str(batch_size, step_train_times),
lossval))
if trace_filename is not None and step == -1:
log_fn('Dumping trace to %s' % trace_filename)
trace = timeline.Timeline(step_stats=run_metadata.step_stats)
with open(trace_filename, 'w') as trace_file:
trace_file.write(trace.generate_chrome_trace_format(show_memory=True))
return summary_str
def get_perf_timing_str(batch_size, step_train_times, scale=1):
times = np.array(step_train_times)
speeds = batch_size / times
speed_mean = scale * batch_size / np.mean(times)
if scale == 1:
speed_uncertainty = np.std(speeds) / np.sqrt(float(len(speeds)))
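# 1.4826 (~ 1 / Phi^-1(0.75)) scales the median absolute deviation so
# that it estimates the standard deviation of a normal distribution,
# giving a jitter measure that is robust to outlier steps.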
speed_madstd = 1.4826 * np.median(np.abs(speeds - np.median(speeds)))
speed_jitter = speed_madstd
return 'images/sec: %.1f +/- %.1f (jitter = %.1f)' % (
speed_mean, speed_uncertainty, speed_jitter)
else:
return 'images/sec: %.1f' % speed_mean
def load_checkpoint(saver, sess, ckpt_dir):
ckpt = tf.train.get_checkpoint_state(ckpt_dir)
if ckpt and ckpt.model_checkpoint_path:
if os.path.isabs(ckpt.model_checkpoint_path):
# Restores from checkpoint with absolute path.
model_checkpoint_path = ckpt.model_checkpoint_path
else:
# Restores from checkpoint with relative path.
model_checkpoint_path = os.path.join(ckpt_dir, ckpt.model_checkpoint_path)
# Assuming model_checkpoint_path looks something like:
# /my-favorite-path/imagenet_train/model.ckpt-0,
# extract global_step from it.
global_step = ckpt.model_checkpoint_path.split('/')[-1].split('-')[-1]
if not global_step.isdigit():
global_step = 0
else:
global_step = int(global_step)
saver.restore(sess, model_checkpoint_path)
log_fn('Successfully loaded model from %s.' % ckpt.model_checkpoint_path)
return global_step
else:
raise RuntimeError('No checkpoint file found.')
class BenchmarkCNN(object):
"""Class for benchmarking a cnn network."""
def __init__(self):
self.model = FLAGS.model
self.model_conf = model_config.get_model_config(self.model)
self.trace_filename = FLAGS.trace_file
self.data_format = FLAGS.data_format
self.num_batches = FLAGS.num_batches
autotune_threshold = FLAGS.autotune_threshold if (
FLAGS.autotune_threshold) else 1
min_autotune_warmup = 5 * autotune_threshold * autotune_threshold
self.num_warmup_batches = FLAGS.num_warmup_batches if (
FLAGS.num_warmup_batches) else max(10, min_autotune_warmup)
self.graph_file = FLAGS.graph_file
self.resize_method = FLAGS.resize_method
self.sync_queue_counter = 0
self.num_gpus = FLAGS.num_gpus
# Use the batch size from the command line if specified, otherwise use the
# model's default batch size. Scale the benchmark's batch size by the
# number of GPUs.
if FLAGS.batch_size > 0:
self.model_conf.set_batch_size(FLAGS.batch_size)
self.batch_size = self.model_conf.get_batch_size() * FLAGS.num_gpus
# Use the learning rate from the command line if specified, otherwise use
# the model's default learning rate, which must always be set.
assert self.model_conf.get_learning_rate() > 0.0
if FLAGS.learning_rate is not None:
self.model_conf.set_learning_rate(FLAGS.learning_rate)
self.job_name = FLAGS.job_name # "" for local training
self.ps_hosts = FLAGS.ps_hosts.split(',')
self.worker_hosts = FLAGS.worker_hosts.split(',')
self.dataset = None
self.data_name = FLAGS.data_name
if FLAGS.data_dir is not None:
if self.data_name is None:
if 'imagenet' in FLAGS.data_dir:
self.data_name = 'imagenet'
elif 'flowers' in FLAGS.data_dir:
self.data_name = 'flowers'
else:
raise ValueError('Could not identify name of dataset. '
'Please specify with --data_name option.')
if self.data_name == 'imagenet':
self.dataset = datasets.ImagenetData(FLAGS.data_dir)
elif self.data_name == 'flowers':
self.dataset = datasets.FlowersData(FLAGS.data_dir)
else:
raise ValueError('Unknown dataset. Must be one of imagenet or flowers.')
self.local_parameter_device_flag = FLAGS.local_parameter_device
if self.job_name:
self.task_index = FLAGS.task_index
self.cluster = tf.train.ClusterSpec({'ps': self.ps_hosts,
'worker': self.worker_hosts})
self.server = None
if not self.server:
self.server = tf.train.Server(self.cluster, job_name=self.job_name,
task_index=self.task_index,
config=create_config_proto(),
protocol=FLAGS.server_protocol)
worker_prefix = '/job:worker/task:%s' % self.task_index
self.param_server_device = tf.train.replica_device_setter(
worker_device=worker_prefix + '/cpu:0', cluster=self.cluster)
# The device on which the queues for managing synchronization between
# servers should be stored.
num_ps = len(self.ps_hosts)
self.sync_queue_devices = ['/job:ps/task:%s/cpu:0' % i
for i in range(num_ps)]
else:
self.task_index = 0
self.cluster = None
self.server = None
worker_prefix = ''
self.param_server_device = '/%s:0' % FLAGS.local_parameter_device
self.sync_queue_devices = [self.param_server_device]
# Device to use for ops that need to always run on the local worker's CPU.
self.cpu_device = '%s/cpu:0' % worker_prefix
# Device to use for ops that need to always run on the local worker's
# compute device, and never on a parameter server device.
self.raw_devices = ['%s/%s:%i' % (worker_prefix, FLAGS.device, i)
for i in xrange(FLAGS.num_gpus)]
if FLAGS.staged_vars and FLAGS.variable_update != 'parameter_server':
raise ValueError('staged_vars for now is only supported with '
'--variable_update=parameter_server')
if FLAGS.variable_update == 'parameter_server':
if self.job_name:
if not FLAGS.staged_vars:
self.variable_mgr = variable_mgr.VariableMgrDistributedFetchFromPS(
self)
else:
self.variable_mgr = (
variable_mgr.VariableMgrDistributedFetchFromStagedPS(self))
else:
if not FLAGS.staged_vars:
self.variable_mgr = variable_mgr.VariableMgrLocalFetchFromPS(self)
else:
self.variable_mgr = variable_mgr.VariableMgrLocalFetchFromStagedPS(
self)
elif FLAGS.variable_update == 'replicated':
if self.job_name:
raise ValueError('Invalid --variable_update in distributed mode: %s' %
FLAGS.variable_update)
self.variable_mgr = variable_mgr.VariableMgrLocalReplicated(
self, FLAGS.use_nccl)
elif FLAGS.variable_update == 'distributed_replicated':
if not self.job_name:
raise ValueError('Invalid --variable_update in local mode: %s' %
FLAGS.variable_update)
self.variable_mgr = variable_mgr.VariableMgrDistributedReplicated(self)
elif FLAGS.variable_update == 'independent':
if self.job_name:
raise ValueError('Invalid --variable_update in distributed mode: %s' %
FLAGS.variable_update)
self.variable_mgr = variable_mgr.VariableMgrIndependent(self)
else:
raise ValueError('Invalid --variable_update: %s' % FLAGS.variable_update)
# Devices to use for running on the local worker's compute devices, with
# variables assigned to parameter server devices.
self.devices = self.variable_mgr.get_devices()
if self.job_name:
self.global_step_device = self.param_server_device
else:
self.global_step_device = self.cpu_device
def print_info(self):
"""Print basic information."""
log_fn('Model: %s' % self.model)
log_fn('Mode: %s' % get_mode_from_flags())
log_fn('Batch size: %s global' % self.batch_size)
log_fn(' %s per device' % (self.batch_size / len(self.devices)))
log_fn('Devices: %s' % self.raw_devices)
log_fn('Data format: %s' % self.data_format)
log_fn('Optimizer: %s' % FLAGS.optimizer)
log_fn('Variables: %s' % FLAGS.variable_update)
if FLAGS.variable_update == 'replicated':
log_fn('Use NCCL: %s' % FLAGS.use_nccl)
if self.job_name:
log_fn('Sync: %s' % FLAGS.cross_replica_sync)
if FLAGS.staged_vars:
log_fn('Staged vars: %s' % FLAGS.staged_vars)
log_fn('==========')
def run(self):
if FLAGS.job_name == 'ps':
log_fn('Running parameter server %s' % self.task_index)
self.server.join()
return
with tf.Graph().as_default():
if FLAGS.eval:
self._eval_cnn()
else:
self._benchmark_cnn()
def _eval_cnn(self):
"""Evaluate the model from a checkpoint using validation dataset."""
(enqueue_ops, fetches) = self._build_model()
saver = tf.train.Saver(tf.global_variables())
summary_writer = tf.summary.FileWriter(FLAGS.eval_dir,
tf.get_default_graph())
target = ''
with tf.Session(target=target, config=create_config_proto()) as sess:
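# Fill the staging areas one level per pass: running enqueue_ops[:i+1]
# ensures stage i has data before stage i+1's get() is ever executed.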
for i in xrange(len(enqueue_ops)):
sess.run(enqueue_ops[:(i+1)])
if FLAGS.train_dir is None:
raise ValueError('Trained model directory not specified')
global_step = load_checkpoint(saver, sess, FLAGS.train_dir)
start_time = time.time()
count_top_1 = 0.0
count_top_5 = 0.0
total_eval_count = self.num_batches * self.batch_size
for step in xrange(self.num_batches):
results = sess.run(fetches)
count_top_1 += results[0]
count_top_5 += results[1]
if (step + 1) % FLAGS.display_every == 0:
duration = time.time() - start_time
examples_per_sec = self.batch_size * FLAGS.display_every / duration
log_fn('%i\t%.1f examples/sec' % (step + 1, examples_per_sec))
start_time = time.time()
precision_at_1 = count_top_1 / total_eval_count
recall_at_5 = count_top_5 / total_eval_count
summary = tf.Summary()
summary.value.add(tag='eval/Accuracy@1', simple_value=precision_at_1)
summary.value.add(tag='eval/Recall@5', simple_value=recall_at_5)
summary_writer.add_summary(summary, global_step)
log_fn('Precision @ 1 = %.4f recall @ 5 = %.4f [%d examples]' %
(precision_at_1, recall_at_5, total_eval_count))
def _benchmark_cnn(self):
"""Run cnn in benchmark mode. When forward_only on, it forwards CNN."""
(enqueue_ops, fetches) = self._build_model()
main_fetch_group = tf.group(*fetches)
execution_barrier = None
if self.job_name and not FLAGS.cross_replica_sync:
execution_barrier = self.add_sync_queues_and_barrier(
'execution_barrier_', [])
global_step = tf.contrib.framework.get_global_step()
with tf.device(self.global_step_device):
with tf.control_dependencies([main_fetch_group]):
inc_global_step = global_step.assign_add(1)
fetches.append(inc_global_step)
if self.job_name and FLAGS.cross_replica_sync:
# Block all replicas until all replicas are ready for next step.
fetches.append(self.add_sync_queues_and_barrier(
'sync_queues_step_end_', [main_fetch_group]))
variable_mgr_post_init_ops = self.variable_mgr.get_post_init_ops()
if variable_mgr_post_init_ops:
post_init_op_group = tf.group(*variable_mgr_post_init_ops)
else:
post_init_op_group = None
local_var_init_op = tf.local_variables_initializer()
summary_op = tf.summary.merge_all()
is_chief = (not self.job_name or self.task_index == 0)
summary_writer = None
if (is_chief and FLAGS.summary_verbosity and
FLAGS.train_dir and
FLAGS.save_summaries_steps > 0):
summary_writer = tf.summary.FileWriter(FLAGS.train_dir,
tf.get_default_graph())
# We run the summaries in the same thread as the training operations by
# passing in None for summary_op to avoid a summary_thread being started.
# Running summaries and training operations in parallel could run out of
# GPU memory.
sv = tf.train.Supervisor(
is_chief=is_chief,
logdir=FLAGS.train_dir,
saver=tf.train.Saver(tf.global_variables()),
global_step=global_step,
summary_op=None,
save_model_secs=FLAGS.save_model_secs,
summary_writer=summary_writer)
step_train_times = []
with sv.managed_session(
master=self.server.target if self.server else '',
config=create_config_proto(),
start_standard_services=FLAGS.summary_verbosity > 0) as sess:
for i in xrange(len(enqueue_ops)):
sess.run(enqueue_ops[:(i+1)])
sess.run(local_var_init_op)
if post_init_op_group:
sess.run(post_init_op_group)
init_global_step = 0
if FLAGS.pretrain_dir is not None:
init_global_step = load_checkpoint(sv.saver, sess, FLAGS.pretrain_dir)
global_step_watcher = GlobalStepWatcher(
sess, global_step,
len(self.worker_hosts) * self.num_warmup_batches + init_global_step,
len(self.worker_hosts) * (
self.num_warmup_batches + self.num_batches) - 1)
global_step_watcher.start()
if self.graph_file is not None:
path, filename = os.path.split(self.graph_file)
as_text = filename.endswith('txt')
log_fn('Writing GraphDef as %s to %s' % (
'text' if as_text else 'binary', self.graph_file))
tf.train.write_graph(sess.graph_def, path, filename, as_text)
log_fn('Running warm up')
local_step = -1 * self.num_warmup_batches
if FLAGS.cross_replica_sync and FLAGS.job_name:
# In cross-replica sync mode, all workers must run the same number of
# local steps, or else the workers running the extra step will block.
done_fn = lambda: local_step == self.num_batches
else:
done_fn = lambda: global_step_watcher.done()
while not done_fn():
if local_step == 0:
log_fn('Done warm up')
if execution_barrier:
log_fn('Waiting for other replicas to finish warm up')
assert global_step_watcher.start_time == 0
sess.run([execution_barrier])
log_fn('Step\tImg/sec\tloss')
assert len(step_train_times) == self.num_warmup_batches
step_train_times = [] # reset to ignore warm up batches
if (summary_writer and
(local_step + 1) % FLAGS.save_summaries_steps == 0):
fetch_summary = summary_op
else:
fetch_summary = None
summary_str = benchmark_one_step(
sess, fetches, local_step, self.batch_size, step_train_times,
self.trace_filename, fetch_summary)
if summary_str is not None and is_chief:
sv.summary_computed(sess, summary_str)
local_step += 1
# Waits for the global step to be done, regardless of done_fn.
while not global_step_watcher.done():
time.sleep(.25)
images_per_sec = global_step_watcher.steps_per_second() * self.batch_size
log_fn('-' * 64)
log_fn('total images/sec: %.2f' % images_per_sec)
log_fn('-' * 64)
if is_chief:
store_benchmarks({'total_images_per_sec': images_per_sec})
# Save the model checkpoint.
if FLAGS.train_dir is not None and is_chief:
checkpoint_path = os.path.join(FLAGS.train_dir, 'model.ckpt')
if not gfile.Exists(FLAGS.train_dir):
gfile.MakeDirs(FLAGS.train_dir)
sv.saver.save(sess, checkpoint_path, global_step)
if execution_barrier:
# Wait for other workers to reach the end, so this worker doesn't
# go away underneath them.
sess.run([execution_barrier])
sv.stop()
def _build_model(self):
"""Build the TensorFlow graph."""
image_size = self.model_conf.get_image_size()
data_type = tf.float32
input_data_type = tf.float32
input_nchan = 3
tf.set_random_seed(1234)
np.random.seed(4321)
phase_train = not (FLAGS.eval or FLAGS.forward_only)
log_fn('Generating model')
losses = []
device_grads = []
all_logits = []
all_top_1_ops = []
all_top_5_ops = []
enqueue_ops = []
gpu_copy_stage_ops = []
gpu_compute_stage_ops = []
gpu_grad_stage_ops = []
use_synthetic_gpu_images = (self.dataset is None)
with tf.device(self.global_step_device):
global_step = tf.contrib.framework.get_or_create_global_step()
# Build the processing and model for the worker.
with tf.device(self.cpu_device):
nclass, images_splits, labels_splits = add_image_preprocessing(
self.dataset, input_nchan, image_size, self.batch_size,
len(self.devices), input_data_type, self.resize_method,
not FLAGS.eval)
update_ops = None
staging_delta_ops = []
for device_num in range(len(self.devices)):
with self.variable_mgr.create_outer_variable_scope(
device_num), tf.name_scope('tower_%i' % device_num) as name_scope:
results = self.add_forward_pass_and_gradients(
images_splits[device_num], labels_splits[device_num], nclass,
phase_train, device_num, input_data_type, data_type, input_nchan,
use_synthetic_gpu_images, gpu_copy_stage_ops, gpu_compute_stage_ops,
gpu_grad_stage_ops)
if phase_train:
losses.append(results[0])
device_grads.append(results[1])
else:
all_logits.append(results[0])
all_top_1_ops.append(results[1])
all_top_5_ops.append(results[2])
if self.variable_mgr.retain_tower_updates(device_num):
# Retain the batch normalization update operations only from the first
# tower. Ideally we should grab the updates from all towers, but these
# stats accumulate extremely fast, so we can ignore the other towers'
# stats without significant detriment.
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, name_scope)
staging_delta_ops = list(self.variable_mgr.staging_delta_ops)
if not update_ops:
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, name_scope)
enqueue_ops.append(tf.group(*gpu_copy_stage_ops))
if self.variable_mgr.supports_staged_vars():
for staging_ops in self.variable_mgr.staging_vars_on_devices:
gpu_compute_stage_ops.extend(
[put_op for _, (put_op, _) in six.iteritems(staging_ops)])
enqueue_ops.append(tf.group(*gpu_compute_stage_ops))
if gpu_grad_stage_ops:
staging_delta_ops += gpu_grad_stage_ops
if staging_delta_ops:
enqueue_ops.append(tf.group(*(staging_delta_ops)))
if not phase_train:
if FLAGS.forward_only:
all_logits = tf.concat(all_logits, 0)
fetches = [all_logits] + enqueue_ops
else:
all_top_1_ops = tf.reduce_sum(all_top_1_ops)
all_top_5_ops = tf.reduce_sum(all_top_5_ops)
fetches = [all_top_1_ops, all_top_5_ops] + enqueue_ops
return (enqueue_ops, fetches)
extra_nccl_ops = []
apply_gradient_devices, gradient_state = (
self.variable_mgr.preprocess_device_grads(device_grads))
training_ops = []
for d, device in enumerate(apply_gradient_devices):
with tf.device(device):
total_loss = tf.reduce_mean(losses)
avg_grads = self.variable_mgr.get_gradients_to_apply(d, gradient_state)
gradient_clip = FLAGS.gradient_clip
learning_rate = self.model_conf.get_learning_rate()
if self.dataset and FLAGS.num_epochs_per_decay > 0:
num_batches_per_epoch = (
self.dataset.num_examples_per_epoch() / self.batch_size)
decay_steps = int(num_batches_per_epoch * FLAGS.num_epochs_per_decay)
# Decay the learning rate exponentially based on the number of steps.
learning_rate = tf.train.exponential_decay(
learning_rate, global_step,
decay_steps, FLAGS.learning_rate_decay_factor, staircase=True)
if gradient_clip is not None:
clipped_grads = [
(tf.clip_by_value(grad, -gradient_clip, +gradient_clip), var)
for grad, var in avg_grads
]
else:
clipped_grads = avg_grads
if FLAGS.optimizer == 'momentum':
opt = tf.train.MomentumOptimizer(
learning_rate, FLAGS.momentum, use_nesterov=True)
elif FLAGS.optimizer == 'sgd':
opt = tf.train.GradientDescentOptimizer(learning_rate)
elif FLAGS.optimizer == 'rmsprop':
opt = tf.train.RMSPropOptimizer(learning_rate, FLAGS.rmsprop_decay,
momentum=FLAGS.rmsprop_momentum,
epsilon=FLAGS.rmsprop_epsilon)
else:
raise ValueError('Optimizer "%s" was not recognized' % FLAGS.optimizer)
self.variable_mgr.append_apply_gradients_ops(
gradient_state, opt, clipped_grads, training_ops)
train_op = tf.group(*(training_ops + update_ops + extra_nccl_ops))
with tf.device(self.cpu_device):
if self.task_index == 0 and FLAGS.summary_verbosity > 0:
tf.summary.scalar('learning_rate', learning_rate)
tf.summary.scalar('total_loss', total_loss)
for grad, var in avg_grads:
if grad is not None:
tf.summary.histogram(var.op.name + '/gradients', grad)
for var in tf.trainable_variables():
tf.summary.histogram(var.op.name, var)
fetches = [train_op, total_loss] + enqueue_ops
return (enqueue_ops, fetches)
def add_forward_pass_and_gradients(
self, host_images, host_labels, nclass, phase_train, device_num,
input_data_type, data_type, input_nchan, use_synthetic_gpu_images,
gpu_copy_stage_ops, gpu_compute_stage_ops, gpu_grad_stage_ops):
"""Add ops for forward-pass and gradient computations."""
if not use_synthetic_gpu_images:
with tf.device(self.cpu_device):
images_shape = host_images.get_shape()
labels_shape = host_labels.get_shape()
gpu_copy_stage = data_flow_ops.StagingArea(
[tf.float32, tf.int32],
shapes=[images_shape, labels_shape])
gpu_copy_stage_op = gpu_copy_stage.put(
[host_images, host_labels])
gpu_copy_stage_ops.append(gpu_copy_stage_op)
host_images, host_labels = gpu_copy_stage.get()
with tf.device(self.raw_devices[device_num]):
if not use_synthetic_gpu_images:
gpu_compute_stage = data_flow_ops.StagingArea(
[tf.float32, tf.int32],
shapes=[images_shape, labels_shape]
)
# The CPU-to-GPU copy is triggered here.
gpu_compute_stage_op = gpu_compute_stage.put(
[host_images, host_labels])
images, labels = gpu_compute_stage.get()
images = tf.reshape(images, shape=images_shape)
gpu_compute_stage_ops.append(gpu_compute_stage_op)
else:
# Minor hack to avoid H2D copy when using synthetic data
images = tf.truncated_normal(
host_images.get_shape(),
dtype=input_data_type,
stddev=1e-1,
name='synthetic_images')
images = tf.contrib.framework.local_variable(
images, name='gpu_cached_images')
labels = host_labels
with tf.device(self.devices[device_num]):
# Rescale to [0, 1)
images *= 1. / 256
# Rescale to [-1,1] instead of [0, 1)
images = tf.subtract(images, 0.5)
images = tf.multiply(images, 2.0)
if self.data_format == 'NCHW':
images = tf.transpose(images, [0, 3, 1, 2])
if input_data_type != data_type:
images = tf.cast(images, data_type)
network = ConvNetBuilder(
images, input_nchan, phase_train, self.data_format, data_type)
self.model_conf.add_inference(network)
# Add the final fully-connected class layer
logits = network.affine(nclass, activation='linear')
if not phase_train:
top_1_op = tf.reduce_sum(
tf.cast(tf.nn.in_top_k(logits, labels, 1), data_type))
top_5_op = tf.reduce_sum(
tf.cast(tf.nn.in_top_k(logits, labels, 5), data_type))
return (logits, top_1_op, top_5_op)
loss = loss_function(logits, labels)
params = self.variable_mgr.trainable_variables_on_device(device_num)
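# tf.nn.l2_loss(v) computes sum(v ** 2) / 2, so the term below adds
# (weight_decay / 2) * ||params||^2 to the training loss.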
l2_loss = tf.add_n([tf.nn.l2_loss(v) for v in params])
weight_decay = FLAGS.weight_decay
if weight_decay is not None and weight_decay != 0.:
loss += weight_decay * l2_loss
aggmeth = tf.AggregationMethod.DEFAULT
grads = tf.gradients(loss, params, aggregation_method=aggmeth)
if FLAGS.staged_vars:
grad_dtypes = [grad.dtype for grad in grads]
grad_shapes = [grad.shape for grad in grads]
grad_stage = data_flow_ops.StagingArea(grad_dtypes, grad_shapes)
grad_stage_op = grad_stage.put(grads)
# In general, this decouples the computation of the gradients and
# the updates of the weights.
# During the pipeline warm up, this runs enough training to produce
# the first set of gradients.
gpu_grad_stage_ops.append(grad_stage_op)
grads = grad_stage.get()
param_refs = self.variable_mgr.trainable_variables_on_device(
device_num, writable=True)
gradvars = list(zip(grads, param_refs))
return (loss, gradvars)
def add_sync_queues_and_barrier(self, name_prefix,
enqueue_after_list):
"""Adds ops to enqueue on all worker queues.
Args:
name_prefix: prefix for the shared_name of the ops.
enqueue_after_list: ops that must complete before the enqueues run.
Returns:
an op that should be used as a control dependency before starting the
next step.
"""
self.sync_queue_counter += 1
num_workers = self.cluster.num_tasks('worker')
with tf.device(self.sync_queue_devices[
self.sync_queue_counter % len(self.sync_queue_devices)]):
sync_queues = [
tf.FIFOQueue(num_workers, [tf.bool], shapes=[[]],
shared_name='%s%s' % (name_prefix, i))
for i in range(num_workers)]
queue_ops = []
# For each other worker, add an entry in a queue, signaling that it can
# finish this step.
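# For example, with 3 workers, worker 0 enqueues a token into the queues of
# workers 1 and 2, then blocks on dequeue_many(2) from its own queue until
# workers 1 and 2 have each enqueued a token there.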
token = tf.constant(False)
with tf.control_dependencies(enqueue_after_list):
for i, q in enumerate(sync_queues):
if i == self.task_index:
queue_ops.append(tf.no_op())
else:
queue_ops.append(q.enqueue(token))
# Drain tokens off queue for this worker, one for each other worker.
queue_ops.append(
sync_queues[self.task_index].dequeue_many(len(sync_queues) - 1))
return tf.group(*queue_ops)
def store_benchmarks(names_to_values):
if FLAGS.result_storage:
benchmark_storage.store_benchmark(names_to_values, FLAGS.result_storage)
def main(_):
if FLAGS.winograd_nonfused:
os.environ['TF_ENABLE_WINOGRAD_NONFUSED'] = '1'
else:
os.environ.pop('TF_ENABLE_WINOGRAD_NONFUSED', None)
if FLAGS.autotune_threshold:
os.environ['TF_AUTOTUNE_THRESHOLD'] = str(FLAGS.autotune_threshold)
os.environ['TF_SYNC_ON_FINISH'] = str(int(FLAGS.sync_on_finish))
bench = BenchmarkCNN()
tfversion = cnn_util.tensorflow_version_tuple()
log_fn('TensorFlow: %i.%i' % (tfversion[0], tfversion[1]))
bench.print_info()
bench.run()
if __name__ == '__main__':
tf.app.run()