Skip to content

Instantly share code, notes, and snippets.

@betapcode
Forked from m0sa/ZeroMQ-XPUB.py
Created November 9, 2016 06:30
Show Gist options
  • Save betapcode/5a319060458c71954c5a371befacc3ec to your computer and use it in GitHub Desktop.
Save betapcode/5a319060458c71954c5a371befacc3ec to your computer and use it in GitHub Desktop.
Example of the ZeroMQ XPUB socket, that shows how the producer can be controlled by the number of subscribers
import thread
import time
import zmq
from zmq.core.socket import Socket
# global zmg context
context = zmq.Context()
endpoint = "tcp://*:8888"
# the subscriber thread function
def subscriber(name, address, cnt, subscriptions):
print ("starting worker thread %s subscribing to %s for %s"%(name,address,subscriptions))
sub = Socket(context, zmq.SUB)
sub.connect(address)
for subscription in subscriptions:
sub.setsockopt(zmq.SUBSCRIBE, subscription)
for x in range(0, cnt):
print ("%s received %s" % (name, sub.recv()))
print ("%s closing socket after %d messages" % (name, cnt))
sub.close()
def main():
publisher = Socket(context, zmq.XPUB)
publisher.bind(endpoint)
address = "tcp://localhost:8888"
thread.start_new(subscriber, ("s1", address, 10, ["a", "b"]))
thread.start_new(subscriber, ("s2", address, 20, ["b", "c"]))
subscriptions = []
r = 0
while True:
# handle subscription flow first to decide what messages need to be produced
while True:
try:
rc = publisher.recv(zmq.NOBLOCK)
subscription = rc[1:]
status = rc[0]== "\x01"
method = subscriptions.append if status else subscriptions.remove
method(subscription)
except zmq.ZMQError:
break
# produce a value for each existing subscription
for subscription in subscriptions:
print "sending " + subscription
publisher.send("%s %d" % (subscription, r))
time.sleep(0.5)
r += 1
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment