Skip to content

Instantly share code, notes, and snippets.

@uberscientist
Created February 19, 2018 04:32
Show Gist options
  • Save uberscientist/263d672fc1f27f03a31febf90f2c0dd9 to your computer and use it in GitHub Desktop.
Save uberscientist/263d672fc1f27f03a31febf90f2c0dd9 to your computer and use it in GitHub Desktop.
Example code for figure 13 "Pub-Sub Network with a Proxy" http://zguide.zeromq.org/page:all#The-Dynamic-Discovery-Problem
import zmq
from threading import Thread
from time import sleep
from random import randint
# Shared ZeroMQ context; every socket in this script is created from it.
ctx = zmq.Context()

# Publisher-facing half of the proxy: publishers connect to this XSUB socket.
xsub_sock = ctx.socket(zmq.XSUB)
xsub_sock.bind('tcp://127.0.0.1:1234')

# Subscriber-facing half of the proxy: subscribers connect to this XPUB socket.
# NOTE(review): this endpoint binds on every interface ('*') while the XSUB
# side binds on loopback only -- confirm that the asymmetry is intended.
xpub_sock = ctx.socket(zmq.XPUB)
xpub_sock.bind('tcp://*:1235')
def xsub_recv():
    """Forward publisher traffic from the XSUB socket to the XPUB socket.

    Runs forever. Every frame received on the module-level ``xsub_sock``
    (the side publishers connect to) is printed and re-sent on ``xpub_sock``
    (the side subscribers connect to).

    The "more frames follow" state of each incoming frame is carried over to
    the outgoing frame, so a multipart message from a publisher reaches the
    subscribers as one multipart message instead of being split into
    unrelated single-frame messages.

    NOTE(review): ``xpub_recv`` uses the same two sockets and is started on a
    different thread. ZeroMQ sockets are not thread-safe, so running a single
    ``zmq.proxy(xsub_sock, xpub_sock)`` on one thread would be the more
    robust design.
    """
    while True:
        pub_msg = xsub_sock.recv()
        # RCVMORE describes the frame just received, so query it before
        # anything else touches the socket.
        more = xsub_sock.getsockopt(zmq.RCVMORE)
        print('xsub_recv : ', pub_msg)
        xpub_sock.send(pub_msg, zmq.SNDMORE if more else 0)
def xpub_recv():
    """Relay whatever arrives on the XPUB socket upstream to the XSUB socket.

    Runs forever. Each frame received on the module-level ``xpub_sock`` (the
    side subscribers connect to) is sent out on ``xsub_sock`` and then
    printed. For an XPUB socket the incoming frames are the subscribers'
    subscribe/unsubscribe notifications.
    """
    while True:
        upstream_frame = xpub_sock.recv()
        xsub_sock.send(upstream_frame)
        print('xpub_recv : ', upstream_frame)
def subscriber_loop(sub_sock, id):
    """Print every string message received on ``sub_sock``, forever.

    Args:
        sub_sock: Socket-like object whose ``recv_string()`` returns the next
            message as text.
        id: Label shown in the output line to tell subscribers apart.

    The loop never returns on its own; it ends only if ``recv_string()``
    raises.
    """
    receive = sub_sock.recv_string
    while True:
        print("SUB", id, ":", receive())
def subscriber_setup():
    """Start five topic-filtered subscribers, each on its own thread.

    For every index 0..4 this prints the index, creates a SUB socket on the
    module-level ``ctx``, connects it to tcp://127.0.0.1:1235, subscribes it
    to the prefix ``TOPIC_<index>`` and hands it to ``subscriber_loop`` in a
    new thread.
    """
    for topic_no in range(5):
        print(topic_no)
        topic = 'TOPIC_{i}'.format(i=topic_no)

        sock = ctx.socket(zmq.SUB)
        sock.connect('tcp://127.0.0.1:1235')
        sock.setsockopt_string(zmq.SUBSCRIBE, topic)

        worker = Thread(target=subscriber_loop, args=(sock, topic_no))
        worker.start()
def publisher_loop():
    """Publish a greeting on a randomly chosen topic once per second, forever.

    Creates a PUB socket on the module-level ``ctx``, connects it to
    tcp://127.0.0.1:1234 and then sends ``"TOPIC_<n> Hello World!"`` every
    second, with ``n`` drawn uniformly from 0..4.
    """
    pub_sock = ctx.socket(zmq.PUB)
    pub_sock.connect('tcp://127.0.0.1:1234')
    while True:
        sleep(1)
        # randint() includes BOTH endpoints, so the upper bound is 4: this
        # yields TOPIC_0..TOPIC_4, the topics subscriber_setup() subscribes
        # to. An upper bound of 5 would also emit TOPIC_5, which has no
        # dedicated subscriber.
        pub_sock.send_string("TOPIC_{id} Hello World!".format(id=randint(0, 4)))
# Start both halves of the proxy, each on its own thread.
for _proxy_half in (xsub_recv, xpub_recv):
    Thread(target=_proxy_half).start()

# Start the topic-filtered subscribers, then the publisher.
subscriber_setup()
Thread(target=publisher_loop).start()

# One more subscriber with an empty filter, i.e. it receives every topic.
sub_sock = ctx.socket(zmq.SUB)
sub_sock.connect('tcp://127.0.0.1:1235')
sub_sock.setsockopt_string(zmq.SUBSCRIBE, '')
_catch_all = Thread(target=subscriber_loop, args=(sub_sock, 'ALL'))
_catch_all.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment