Created
August 19, 2013 01:17
-
-
Save minrk/6265022 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Example showing splitting of a DEALER socket into a PUSH and PULL thread | |
| for separate sending and receiving. | |
| This is an example of using inproc and multiple sockets to deal with threadsafety. | |
| """ | |
| from __future__ import print_function | |
| import time | |
| from threading import Thread | |
| import zmq | |
| def dealer_thread(remote_url, send_url, recv_url): | |
| """This thread takes a DEALER and splits its send and recv | |
| across a PUSH socket and a PULL socket, | |
| so that send and recv can be handled in separate threads. | |
| """ | |
| ctx = zmq.Context.instance() | |
| dealer = ctx.socket(zmq.DEALER) | |
| pull = ctx.socket(zmq.PULL) | |
| push = ctx.socket(zmq.PUSH) | |
| dealer.connect(remote_url) | |
| pull.bind(send_url) | |
| push.bind(recv_url) | |
| poller = zmq.Poller() | |
| poller.register(pull, zmq.POLLIN) | |
| poller.register(dealer, zmq.POLLIN) | |
| while True: | |
| events = dict(poller.poll(1000)) | |
| if dealer in events: | |
| # dealer received a message, forward to receiver thread | |
| msg = dealer.recv_multipart() | |
| push.send_multipart(msg) | |
| if pull in events: | |
| # sender sent a message, forward it to remote | |
| msg = pull.recv_multipart() | |
| dealer.send_multipart(msg) | |
| def sender_thread(send_url): | |
| """This thread sends messages on a PUSH socket""" | |
| ctx = zmq.Context.instance() | |
| sender = ctx.socket(zmq.PUSH) | |
| sender.connect(send_url) | |
| t0 = time.time() | |
| while True: | |
| msg = [b"%.1f" % (time.time() - t0)] | |
| print ("sending", msg) | |
| sender.send_multipart(msg) | |
| time.sleep(0.5) | |
| def receiver_thread(recv_url): | |
| """This thread receives messages on a PULL socket""" | |
| ctx = zmq.Context.instance() | |
| receiver = ctx.socket(zmq.PULL) | |
| receiver.connect(recv_url) | |
| while True: | |
| msg = receiver.recv_multipart() | |
| print("received", msg) | |
| def echo_thread(remote_url): | |
| """this is our toy remote ROUTER service | |
| all it does is echo messages back with 'echo' tacked on the end. | |
| """ | |
| ctx = zmq.Context.instance() | |
| echo = ctx.socket(zmq.ROUTER) | |
| echo.bind(remote_url) | |
| while True: | |
| msg = echo.recv_multipart() | |
| msg.append("echo") | |
| print ("echoing", msg) | |
| echo.send_multipart(msg) | |
| def main(): | |
| remote = "tcp://127.0.0.1:5555" | |
| send = "inproc://send" | |
| recv = "inproc://recv" | |
| dealer = Thread(target=dealer_thread, args=(remote, send, recv)) | |
| dealer.start() | |
| time.sleep(1) | |
| echo = Thread(target=echo_thread, args=(remote,)) | |
| echo.start() | |
| send = Thread(target=sender_thread, args=(send,)) | |
| send.start() | |
| recv = Thread(target=receiver_thread, args=(recv,)) | |
| recv.start() | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment