# Parameter-server process: hosts the single "ps" task and blocks forever,
# serving the shared variable/queue state to the two worker processes.
import tensorflow as tf

ps_and_workers = {"ps": ['localhost:2222'], "worker": ['localhost:2224', 'localhost:2225']}
spec = tf.train.ClusterSpec(ps_and_workers)
# task_index=0 is the only "ps" task in this cluster.
ps_server = tf.train.Server(spec, job_name='ps', task_index=0)
# join() never returns: the ps process stays alive for the workers.
ps_server.join()
# Worker 0 process: builds the graph with the variable and a shared queue
# pinned to the ps task, enqueues its value once, then dequeues TWO
# elements -- which deliberately blocks until worker 1 has enqueued too.
#
# FIX: as pasted, the body of the `with tf.device(...):` block (the
# variable/queue/op definitions) was not indented, so the script raised an
# IndentationError. The statements are restored to their intended scope;
# the logic is otherwise unchanged.
import tensorflow as tf
from tensorflow.python.ops import data_flow_ops

cluster = tf.train.ClusterSpec({"ps": ['localhost:2222'], "worker": ['localhost:2224','localhost:2225']})
server = tf.train.Server(cluster.as_cluster_def(), job_name='worker', task_index=0)

# Pin the variable and the queue to the ps task so both worker processes
# resolve to the same queue instance.
# NOTE(review): test.name is passed positionally as FIFOQueue's 4th
# argument -- in the old API that slot was `shared_name` (which is what
# makes the queue shared across sessions); in newer APIs it is `names`.
# Confirm against the installed TF version.
with tf.device("/job:ps/task:0"):
    test = tf.get_variable("test", [1])
    queue = (data_flow_ops.FIFOQueue(2,tf.float32,test.get_shape(),test.name))
    enqueue_op = queue.enqueue([test])
    last = queue.dequeue_many(2)

# Pre-TF-1.0 spelling of tf.global_variables_initializer().
init_op = tf.initialize_all_variables()

sv = tf.train.Supervisor(logdir="./log")
sess_config = tf.ConfigProto()
sess = sv.prepare_or_wait_for_session(server.target, sess_config)
# NOTE(review): Supervisor normally runs variable init itself; this
# explicit run is redundant but harmless, so it is kept.
sess.run(init_op)

coord = tf.train.Coordinator()
threds = tf.train.start_queue_runners(sess=sess, coord=coord)

# Enqueue one element and ask for two: this run call blocks until the
# second element arrives from worker 1.
sess.run([enqueue_op, last])
# Worker 0 blocks on the line above until worker 1 finishes its enqueue;
# only then can dequeue_many(2) pull two values from the shared queue.
# Observed output:
#   [None, array([[-0.76494521],
#                 [-0.11216009]], dtype=float32)]
# Worker 1 process: mirrors worker 0's graph on the ps task and
# contributes the second enqueue so worker 0's dequeue_many(2) completes.
#
# FIXES:
#  * The `with tf.device(...):` body was not indented (IndentationError);
#    restored to its intended scope.
#  * The final line read `sess.run(enqueue_op, last)`, which raises a
#    NameError -- `last` is never defined in this script -- and would in
#    any case be passed as Session.run's feed_dict, not a second fetch.
#    Worker 1 only needs to enqueue, so the call is `sess.run(enqueue_op)`.
import tensorflow as tf
from tensorflow.python.ops import data_flow_ops

cluster = tf.train.ClusterSpec({"ps": ['localhost:2222'], "worker": ['localhost:2224','localhost:2225']})
server = tf.train.Server(cluster.as_cluster_def(), job_name='worker', task_index=1)

# Same placement as worker 0 so both processes share one queue.
# NOTE(review): test.name in the 4th positional slot -- see worker 0's
# note about `shared_name` vs `names`; confirm against the TF version.
with tf.device("/job:ps/task:0"):
    test = tf.get_variable("test", [1])
    queue = (data_flow_ops.FIFOQueue(2,tf.float32,test.get_shape(),test.name))
    enqueue_op = queue.enqueue([test])

# Pre-TF-1.0 spelling of tf.global_variables_initializer().
init_op = tf.initialize_all_variables()

sv = tf.train.Supervisor(logdir="./log")
sess_config = tf.ConfigProto()
sess = sv.prepare_or_wait_for_session(server.target, sess_config)
# NOTE(review): redundant with Supervisor's own init, kept as in worker 0.
sess.run(init_op)

coord = tf.train.Coordinator()
threds = tf.train.start_queue_runners(sess=sess, coord=coord)

# Push this worker's value; this unblocks worker 0's dequeue_many(2).
sess.run(enqueue_op)