@llhe
Created May 4, 2017 07:32
Benchmark with RDMA
"""Benchmark tensorflow distributed by assigning a tensor between two workers.
Usage:
Start worker 1:
python rdma_bench.py --workers="hostname1:port,hostname2:port" --protocol=grpc+verbs --task 0
Start worker 2:
python rdma_bench.py --workers="hostname1:port,hostname2:port" --protocol=grpc+verbs --task 1
Run the tests:
python rdma_bench.py --workers="hostname1:port,hostname2:port"
"""
import subprocess
import tensorflow as tf
import time
import sys
import os
import signal
flags = tf.flags
flags.DEFINE_integer("iters", 10, "Maximum number of additions")
flags.DEFINE_integer("data_mb", 100, "size of vector in MBs")
flags.DEFINE_string("protocol", "grpc", "transfer protocol to use, 'grpc' or 'grpc+verbs'")
flags.DEFINE_string("workers", "localhost:11000,localhost:11001", "workers")
flags.DEFINE_integer("task", -1, "current task to run")
FLAGS = flags.FLAGS
# Set up the cluster spec from the --workers flag.
cluster = {"worker": FLAGS.workers.split(',')}
clusterspec = tf.train.ClusterSpec(cluster).as_cluster_def()

def default_config():
  optimizer_options = tf.OptimizerOptions(opt_level=tf.OptimizerOptions.L0)
  config = tf.ConfigProto(
      graph_options=tf.GraphOptions(optimizer_options=optimizer_options))
  config.log_device_placement = False
  config.allow_soft_placement = False
  return config

def create_graph(device1, device2):
  """Create a graph that keeps the variable on device1 and the
  vector of ones/assign op on device2."""
  tf.reset_default_graph()
  dtype = tf.int32
  params_size = 250 * 1000 * FLAGS.data_mb  # 1 MB is 250k int32 values
  with tf.device(device1):
    params = tf.get_variable("params", shape=[params_size], dtype=dtype,
                             initializer=tf.zeros_initializer())
  with tf.device(device2):
    # A constant node would get placed on device1 by the simple_placer,
    # so use a variable instead:
    # update = tf.constant(1, shape=[params_size], dtype=dtype)
    update = tf.get_variable("update", shape=[params_size], dtype=dtype,
                             initializer=tf.ones_initializer())
    add_op = params.assign(update)
  init_op = tf.global_variables_initializer()
  return init_op, add_op

def run_benchmark(sess, init_op, add_op):
  """Returns MB/s rate of the assignment."""
  sess.run(init_op)
  for i in range(5):
    sess.run(add_op.op)  # warm-up
  start_time = time.time()
  for i in range(FLAGS.iters):
    # run add_op.op rather than add_op so the result is not fetched back
    sess.run(add_op.op)
  elapsed_time = time.time() - start_time
  return float(FLAGS.iters) * FLAGS.data_mb / elapsed_time

def run_benchmark_local():
  ops = create_graph(None, None)
  sess = tf.Session(config=default_config())
  return run_benchmark(sess, *ops)


def run_benchmark_distributed():
  ops = create_graph("/job:worker/task:0", "/job:worker/task:1")
  sess = tf.Session("grpc://" + cluster['worker'][0], config=default_config())
  return run_benchmark(sess, *ops)

if __name__ == '__main__':
  if FLAGS.task < 0:  # no task given: run the benchmark client
    rate1 = run_benchmark_local()
    rate2 = run_benchmark_distributed()
    print("Adding data in %d MB chunks" % (FLAGS.data_mb,))
    print("Local rate: %.2f MB per second" % (rate1,))
    print("Distributed rate: %.2f MB per second" % (rate2,))
  else:  # launch a TensorFlow server for the given task
    server = tf.train.Server(clusterspec, config=default_config(),
                             job_name="worker",
                             task_index=int(FLAGS.task),
                             protocol=FLAGS.protocol)
    print("exiting...")
    server.join()
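
The --data_mb flag maps to tensor size through the 250*1000 factor in create_graph: each tf.int32 element is 4 bytes, so 250,000 elements occupy 10^6 bytes, i.e. the rates below use decimal megabytes. A minimal, TensorFlow-free sanity check of that sizing (variable names here are illustrative only, not part of the script):

import numpy as np

data_mb = 100                                # default value of --data_mb
params_size = 250 * 1000 * data_mb           # same formula as create_graph()
tensor_bytes = params_size * np.dtype(np.int32).itemsize

assert tensor_bytes == data_mb * 10**6       # 1 MB == 10^6 bytes here
print("tensor size: %.1f MB" % (tensor_bytes / 1e6))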
byronyi commented Jul 11, 2017

I just ran this in a 40Gbps RoCE setup for my GDR patch, and here's the result:

Adding data in 100 MB chunks
Local rate: 5243.48 MB per second
Distributed rate: 2679.18 MB per second
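
For scale: converting those rates to bits per second under the script's decimal-MB convention puts the distributed transfer at roughly half the 40Gbps line rate, while the local run never crosses the network. A rough back-of-the-envelope conversion (protocol overhead and framing ignored):

# Convert the reported MB/s figures to Gbps (assumes 1 MB = 10^6 bytes,
# matching the script's 250k-int32-per-MB sizing).
for name, mb_per_s in [("local", 5243.48), ("distributed", 2679.18)]:
    gbps = mb_per_s * 1e6 * 8 / 1e9
    print("%-11s %7.2f MB/s  ~ %.1f Gbps" % (name, mb_per_s, gbps))
# distributed: ~21.4 Gbps on a 40 Gbps RoCE link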
