Created
February 19, 2018 04:32
-
-
Save uberscientist/263d672fc1f27f03a31febf90f2c0dd9 to your computer and use it in GitHub Desktop.
Example code for Figure 13, "Pub-Sub Network with a Proxy", from the ZeroMQ Guide:
http://zguide.zeromq.org/page:all#The-Dynamic-Discovery-Problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zmq | |
from threading import Thread | |
from time import sleep | |
from random import randint | |
# One shared context is used for every socket created in this example.
ctx = zmq.Context()

# Publisher-facing side of the proxy: PUB sockets connect to this endpoint
# (bound on the loopback interface only).
xsub_sock = ctx.socket(zmq.XSUB)
xsub_sock.bind('tcp://127.0.0.1:1234')

# Subscriber-facing side of the proxy: SUB sockets connect to this endpoint
# (the '*' wildcard binds on every available interface).
xpub_sock = ctx.socket(zmq.XPUB)
xpub_sock.bind('tcp://*:1235')
def xsub_recv():
    """Forward publisher traffic through the proxy, forever.

    Blocks on the XSUB socket (the side publishers connect to), logs each
    message it receives and re-sends it on the XPUB socket.

    NOTE(review): xpub_sock is also read by xpub_recv, which the script runs
    on a second thread. ZeroMQ sockets are documented as not thread-safe, so
    this sharing should be reviewed (e.g. a single-threaded proxy loop).
    """
    while True:
        incoming = xsub_sock.recv()
        print('xsub_recv : ', incoming)
        xpub_sock.send(incoming)
def xpub_recv():
    """Forward subscriber-side traffic upstream through the proxy, forever.

    Blocks on the XPUB socket (the side subscribers connect to), passes each
    message it receives on to the XSUB socket, then logs it. Presumably these
    are the subscription requests issued by the SUB sockets.

    NOTE(review): xsub_sock is also read by xsub_recv, which the script runs
    on a second thread. ZeroMQ sockets are documented as not thread-safe, so
    this sharing should be reviewed (e.g. a single-threaded proxy loop).
    """
    while True:
        upstream = xpub_sock.recv()
        # Forward first, log second.
        xsub_sock.send(upstream)
        print('xpub_recv : ', upstream)
def subscriber_loop(sub_sock, id):
    """Print every message arriving on a subscriber socket, forever.

    sub_sock -- socket to read from; only its recv_string() method is used.
    id       -- label printed in front of each message to tell subscribers
                apart (an int for the topic subscribers, 'ALL' for the
                catch-all one).
    """
    while True:
        text = sub_sock.recv_string()
        print("SUB", id, ":", text)
def subscriber_setup():
    """Start five topic-filtered subscribers, each on its own thread.

    Subscriber n (0 through 4) gets a fresh SUB socket connected to the
    proxy's subscriber-facing endpoint (port 1235), subscribes to the prefix
    'TOPIC_n', and is handed to subscriber_loop on a new thread.
    """
    for index in range(5):
        print(index)
        topic_sock = ctx.socket(zmq.SUB)
        topic_sock.connect('tcp://127.0.0.1:1235')
        topic_filter = 'TOPIC_{i}'.format(i=index)
        topic_sock.setsockopt_string(zmq.SUBSCRIBE, topic_filter)
        worker = Thread(target=subscriber_loop, args=(topic_sock, index))
        worker.start()
def publisher_loop():
    """Publish a greeting on a randomly chosen topic once per second, forever.

    Creates its own PUB socket and connects it to the proxy's
    publisher-facing endpoint (port 1234), so every message passes through
    the proxy before reaching any subscriber.
    """
    pub_sock = ctx.socket(zmq.PUB)
    pub_sock.connect('tcp://127.0.0.1:1234')
    while True:
        sleep(1)
        # randint() includes BOTH endpoints. The topic subscribers started by
        # subscriber_setup filter on TOPIC_0..TOPIC_4, so the upper bound is
        # 4; a bound of 5 would also emit TOPIC_5, which none of them
        # is subscribed to.
        pub_sock.send_string("TOPIC_{id} Hello World!".format(id=randint(0, 4)))
# Start both proxy forwarding threads first (publisher side, then
# subscriber side).
for proxy_side in (xsub_recv, xpub_recv):
    Thread(target=proxy_side).start()

# Next the five topic-filtered subscribers, followed by the publisher.
subscriber_setup()
Thread(target=publisher_loop).start()

# Finally a catch-all subscriber: the empty subscription prefix matches
# messages of any topic.
sub_sock = ctx.socket(zmq.SUB)
sub_sock.connect('tcp://127.0.0.1:1235')
sub_sock.setsockopt_string(zmq.SUBSCRIBE, '')
Thread(target=subscriber_loop, args=(sub_sock, 'ALL')).start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment