Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Example of the ZeroMQ XPUB socket, that shows how the producer can be controlled by the number of subscribers
import thread
import time
import zmq
from zmq.core.socket import Socket
# global zmg context
context = zmq.Context()
endpoint = "tcp://*:8888"
# the subscriber thread function
def subscriber(name, address, cnt, subscriptions):
print ("starting worker thread %s subscribing to %s for %s"%(name,address,subscriptions))
sub = Socket(context, zmq.SUB)
sub.connect(address)
for subscription in subscriptions:
sub.setsockopt(zmq.SUBSCRIBE, subscription)
for x in range(0, cnt):
print ("%s received %s" % (name, sub.recv()))
print ("%s closing socket after %d messages" % (name, cnt))
sub.close()
def main():
publisher = Socket(context, zmq.XPUB)
publisher.bind(endpoint)
address = "tcp://localhost:8888"
thread.start_new(subscriber, ("s1", address, 10, ["a", "b"]))
thread.start_new(subscriber, ("s2", address, 20, ["b", "c"]))
subscriptions = []
r = 0
while True:
# handle subscription flow first to decide what messages need to be produced
while True:
try:
rc = publisher.recv(zmq.NOBLOCK)
subscription = rc[1:]
status = rc[0]== "\x01"
method = subscriptions.append if status else subscriptions.remove
method(subscription)
except zmq.ZMQError:
break
# produce a value for each existing subscription
for subscription in subscriptions:
print "sending " + subscription
publisher.send("%s %d" % (subscription, r))
time.sleep(0.5)
r += 1
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment