Skip to content

Instantly share code, notes, and snippets.

@kianby
Last active November 25, 2023 07:28
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save kianby/e1d455e5fb2a14f8dee3c02c337527f5 to your computer and use it in GitHub Desktop.
Save kianby/e1d455e5fb2a14f8dee3c02c337527f5 to your computer and use it in GitHub Desktop.
ZeroMQ, Python, XSUB / XPUB proxy
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import zmq
def main():
    """Run a blocking XPUB/XSUB forwarding proxy.

    Binds an XPUB socket on TCP port 5559 and an XSUB socket on TCP port
    5560, then hands both to ``zmq.proxy()``, which blocks while it relays
    traffic between them.  The sockets and the context are always released
    when the proxy stops, e.g. because of a ``KeyboardInterrupt``.
    """
    context = zmq.Context()
    # Socket facing consumers: SUB sockets connect to the XPUB side.
    frontend = context.socket(zmq.XPUB)
    # Socket facing producers: PUB sockets connect to the XSUB side.
    backend = context.socket(zmq.XSUB)
    try:
        frontend.bind("tcp://*:5559")
        backend.bind("tcp://*:5560")
        # Blocks for the lifetime of the proxy; it normally only ends via an
        # exception, which is why the cleanup lives in ``finally``.
        zmq.proxy(frontend, backend)
    finally:
        # linger=0 discards undelivered messages so term() cannot hang.
        frontend.close(linger=0)
        backend.close(linger=0)
        context.term()
if __name__ == "__main__":
    # Start the proxy only when executed as a script, not when imported.
    main()
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import sys
import zmq
import time
from threading import Thread
class Poller(Thread):
    """Background thread that prints every message received for one topic.

    The thread connects a SUB socket to tcp://127.0.0.1:5559, subscribes to
    ``topic`` and prints each received message until ``stop()`` is called.
    The socket is created from the module-level ``context``.

    Args:
        id: Label used in the printed output (the name shadows the builtin
            but is kept so existing callers keep working).
        topic: Subscription prefix passed to ``zmq.SUBSCRIBE``.
        poll_interval_ms: How long to wait for a message before re-checking
            the stop flag, in milliseconds.
    """

    def __init__(self, id, topic, poll_interval_ms=100):
        super().__init__()
        self.id = id
        self.topic = topic
        self.poll_interval_ms = poll_interval_ms
        # Set here rather than in run() so that a stop() issued before the
        # thread gets going is not overwritten.
        self.loop = True

    def run(self):
        """Receive and print messages until the stop flag is cleared."""
        print('start poller {}'.format(self.id))
        subscriber = context.socket(zmq.SUB)
        try:
            subscriber.connect("tcp://127.0.0.1:5559")
            subscriber.setsockopt_string(zmq.SUBSCRIBE, self.topic)
            while self.loop:
                # Wait with a timeout instead of blocking in recv(), so that
                # stop() takes effect even when no further message arrives.
                if not subscriber.poll(self.poll_interval_ms):
                    continue
                message = subscriber.recv()
                print('poller {}: {}'.format(self.id, message))
        finally:
            subscriber.close()

    def stop(self):
        """Ask the thread to exit; it does so within one poll interval."""
        self.loop = False
# Shared ZeroMQ context; Poller.run() creates its SUB sockets from it.
context = zmq.Context()

# Subscriptions are prefix matches, so the 'NASDA' poller also receives
# every 'NASDAQ:...' message, while the 'NASDAQ' poller only sees its own.
poller1 = Poller(1, 'NASDA')
poller1.start()
poller2 = Poller(2, 'NASDAQ')
poller2.start()

socket = context.socket(zmq.PUB)
socket.connect("tcp://127.0.0.1:5560")

for _ in range(3):
    # The pauses also give the connections and subscriptions time to
    # settle before the next message is published.
    time.sleep(2)
    socket.send_string('NASDA:' + time.strftime('%H:%M:%S'))
    time.sleep(2)
    socket.send_string('NASDAQ:' + time.strftime('%H:%M:%S'))

poller1.stop()
poller2.stop()
# One last message matching both subscriptions wakes any poller that is
# still waiting for data so it can notice the stop request.
socket.send_string('NASDAQ:STOP')

# Wait for the workers, then release the ZeroMQ resources explicitly
# instead of leaving them to interpreter shutdown.
poller1.join()
poller2.join()
# Bounded linger: give the final message a moment to go out, but do not
# let term() hang forever if the proxy is unreachable.
socket.close(linger=1000)
context.term()
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment