Skip to content

Instantly share code, notes, and snippets.

@minrk
Forked from m0sa/ZeroMQ-XPUB.py
Created April 3, 2012 23:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save minrk/2296266 to your computer and use it in GitHub Desktop.
Save minrk/2296266 to your computer and use it in GitHub Desktop.
Example of the ZeroMQ XPUB socket, that shows how the producer can be controlled by the number of subscribers
import thread
import time
import zmq
# global zmg context
context = zmq.Context()
endpoint = "tcp://*:8888"
# the subscriber thread function
def subscriber(name, address, cnt, subscriptions):
print ("starting worker thread %s subscribing to %s for %s"%(name,address,subscriptions))
sub = context.socket(zmq.SUB)
sub.connect(address)
for subscription in subscriptions:
sub.setsockopt(zmq.SUBSCRIBE, subscription)
for x in range(0, cnt):
print ("%s received %s" % (name, sub.recv()))
print ("%s closing socket after %d messages" % (name, cnt))
sub.close()
def main():
publisher = context.socket(zmq.XPUB)
publisher.bind(endpoint)
address = "tcp://localhost:8888"
thread.start_new(subscriber, ("s1", address, 10, ["a", "b"]))
thread.start_new(subscriber, ("s2", address, 20, ["b", "c"]))
subscriptions = []
r = 0
while True:
# handle subscription flow first to decide what messages need to be produced
while True:
try:
rc = publisher.recv(zmq.NOBLOCK)
subscription = rc[1:]
status = rc[0]== "\x01"
method = subscriptions.append if status else subscriptions.remove
method(subscription)
except zmq.ZMQError:
break
# produce a value for each existing subscription
for subscription in subscriptions:
print "sending " + subscription
publisher.send("%s %d" % (subscription, r))
time.sleep(0.5)
r += 1
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment