Skip to content

Instantly share code, notes, and snippets.

@minrk
Created August 19, 2013 01:17
Show Gist options
  • Save minrk/6265022 to your computer and use it in GitHub Desktop.
Save minrk/6265022 to your computer and use it in GitHub Desktop.
"""
Example showing splitting of a DEALER socket into a PUSH and PULL thread
for separate sending and receiving.
This is an example of using inproc and multiple sockets to deal with threadsafety.
"""
from __future__ import print_function
import time
from threading import Thread
import zmq
def dealer_thread(remote_url, send_url, recv_url):
"""This thread takes a DEALER and splits its send and recv
across a PUSH socket and a PULL socket,
so that send and recv can be handled in separate threads.
"""
ctx = zmq.Context.instance()
dealer = ctx.socket(zmq.DEALER)
pull = ctx.socket(zmq.PULL)
push = ctx.socket(zmq.PUSH)
dealer.connect(remote_url)
pull.bind(send_url)
push.bind(recv_url)
poller = zmq.Poller()
poller.register(pull, zmq.POLLIN)
poller.register(dealer, zmq.POLLIN)
while True:
events = dict(poller.poll(1000))
if dealer in events:
# dealer received a message, forward to receiver thread
msg = dealer.recv_multipart()
push.send_multipart(msg)
if pull in events:
# sender sent a message, forward it to remote
msg = pull.recv_multipart()
dealer.send_multipart(msg)
def sender_thread(send_url):
"""This thread sends messages on a PUSH socket"""
ctx = zmq.Context.instance()
sender = ctx.socket(zmq.PUSH)
sender.connect(send_url)
t0 = time.time()
while True:
msg = [b"%.1f" % (time.time() - t0)]
print ("sending", msg)
sender.send_multipart(msg)
time.sleep(0.5)
def receiver_thread(recv_url):
"""This thread receives messages on a PULL socket"""
ctx = zmq.Context.instance()
receiver = ctx.socket(zmq.PULL)
receiver.connect(recv_url)
while True:
msg = receiver.recv_multipart()
print("received", msg)
def echo_thread(remote_url):
"""this is our toy remote ROUTER service
all it does is echo messages back with 'echo' tacked on the end.
"""
ctx = zmq.Context.instance()
echo = ctx.socket(zmq.ROUTER)
echo.bind(remote_url)
while True:
msg = echo.recv_multipart()
msg.append("echo")
print ("echoing", msg)
echo.send_multipart(msg)
def main():
remote = "tcp://127.0.0.1:5555"
send = "inproc://send"
recv = "inproc://recv"
dealer = Thread(target=dealer_thread, args=(remote, send, recv))
dealer.start()
time.sleep(1)
echo = Thread(target=echo_thread, args=(remote,))
echo.start()
send = Thread(target=sender_thread, args=(send,))
send.start()
recv = Thread(target=receiver_thread, args=(recv,))
recv.start()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment