Skip to content

Instantly share code, notes, and snippets.

@codingneo
Last active March 25, 2017 22:42
Show Gist options
  • Save codingneo/230754d94353e06692b4c4fe49347937 to your computer and use it in GitHub Desktop.
Experiment with Distributed Tensorflow

ps server

import tensorflow as tf

# Cluster layout: one parameter-server task and two worker tasks,
# all bound to localhost ports.
spec = {"ps": ['localhost:2222'], "worker": ['localhost:2224', 'localhost:2225']}
cluster = tf.train.ClusterSpec(spec)

# This process is the single ps task (task 0 of the "ps" job).
server = tf.train.Server(cluster.as_cluster_def(), job_name='ps', task_index=0)

# Block forever, serving variable reads/writes to the workers.
server.join()

worker 1

import tensorflow as tf
from tensorflow.python.ops import data_flow_ops

# Must match the cluster spec used by the ps server and worker 2.
cluster = tf.train.ClusterSpec({"ps": ['localhost:2222'], "worker": ['localhost:2224','localhost:2225']})
server = tf.train.Server(cluster.as_cluster_def(), job_name='worker', task_index=0)

# Place the variable on the parameter server so both workers resolve the
# same storage.  NOTE: the original had the assignment un-indented, which
# left it outside the device scope (and is a SyntaxError as pasted).
with tf.device("/job:ps/task:0"):
    test = tf.get_variable("test", [1])

# Cross-process shared queue: pass the name via `shared_name=` explicitly so
# both workers open the same queue.  (The original passed it as the 4th
# positional argument, which in most FIFOQueue signatures is `names`, not
# `shared_name` — TODO confirm against the TF version in use.)
queue = data_flow_ops.FIFOQueue(2, tf.float32, test.get_shape(),
                                shared_name=test.name)

enqueue_op = queue.enqueue([test])
last = queue.dequeue_many(2)  # blocks until two elements are available
init_op = tf.initialize_all_variables()  # deprecated alias kept for this TF version

sv = tf.train.Supervisor(logdir="./log")
sess_config = tf.ConfigProto()
sess = sv.prepare_or_wait_for_session(server.target, sess_config)

sess.run(init_op)

coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# Worker 1 blocks here until worker 2 has enqueued its element; only then
# can dequeue_many(2) return both values from the shared queue.
sess.run([enqueue_op, last])
Output: [None, array([[-0.76494521],
        [-0.11216009]], dtype=float32)]

worker 2

import tensorflow as tf
from tensorflow.python.ops import data_flow_ops

# Must match the cluster spec used by the ps server and worker 1.
cluster = tf.train.ClusterSpec({"ps": ['localhost:2222'], "worker": ['localhost:2224','localhost:2225']})
server = tf.train.Server(cluster.as_cluster_def(), job_name='worker', task_index=1)

# Place the variable on the parameter server so both workers resolve the
# same storage.  NOTE: the original had the assignment un-indented, which
# left it outside the device scope (and is a SyntaxError as pasted).
with tf.device("/job:ps/task:0"):
    test = tf.get_variable("test", [1])

# Cross-process shared queue: pass the name via `shared_name=` explicitly so
# both workers open the same queue.  (The original passed it as the 4th
# positional argument, which in most FIFOQueue signatures is `names`, not
# `shared_name` — TODO confirm against the TF version in use.)
queue = data_flow_ops.FIFOQueue(2, tf.float32, test.get_shape(),
                                shared_name=test.name)

enqueue_op = queue.enqueue([test])
init_op = tf.initialize_all_variables()  # deprecated alias kept for this TF version

sv = tf.train.Supervisor(logdir="./log")
sess_config = tf.ConfigProto()
sess = sv.prepare_or_wait_for_session(server.target, sess_config)

sess.run(init_op)

coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# BUG FIX: the original was `sess.run(enqueue_op, last)` — `last` is never
# defined in this script (NameError), and would have been misinterpreted as
# a feed_dict anyway.  Worker 2 only enqueues; worker 1 does the dequeue.
sess.run(enqueue_op)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment