Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Bidirectional communication between two processes with pyzmq
#!/usr/bin/env python
"""
Bidirectional communication between two processes.
"""
import numpy as np
import zmq
import multiprocessing as mp
# Set this to True to use durable sockets. When enabled, Module.run() gives
# each SUB socket an explicit IDENTITY and sets a high-water mark of 1 on
# each PUB socket.
durable = False
def is_poll_in(sock, poller, timeout=100):
    """
    Check whether a poller detects incoming data on a specified
    socket.

    Parameters
    ----------
    sock : zmq.Socket
        Socket to look up in the poll results.
    poller : zmq.Poller
        Poller to query; it is polled exactly once per call.
    timeout : int
        Timeout handed to `poller.poll()` (milliseconds in pyzmq).

    Returns
    -------
    bool
        True if the POLLIN flag is set for `sock` in the poll results,
        False otherwise (including when `sock` is not reported at all).
    """

    events = dict(poller.poll(timeout))

    # Test the POLLIN bit instead of comparing the whole mask: a socket
    # registered for several event types may report POLLIN combined with
    # other flags, which an equality check would wrongly reject.
    return bool(events.get(sock, 0) & zmq.POLLIN)
class Module(mp.Process):
    """
    Process that exchanges data with one peer over ZeroMQ PUB/SUB sockets.

    Parameters
    ----------
    id : int
        Module identifier; used to seed NumPy's random number generator,
        to label printed output and (when `durable` is set) to build the
        SUB socket identities.
    local_ports : sequence of int
        `(data, control)` ports on localhost that this module's SUB
        sockets connect to.
    remote_ports : sequence of int
        `(data, control)` ports that this module's PUB sockets bind to.

    All three are required keyword arguments; any remaining positional or
    keyword arguments are forwarded to `multiprocessing.Process`.
    """

    def __init__(self, *args, **kwargs):
        # Strip the module-specific keywords before delegating, since
        # Process.__init__ does not accept them.
        self.id = kwargs.pop('id')
        self.local_ports = kwargs.pop('local_ports')
        self.remote_ports = kwargs.pop('remote_ports')
        mp.Process.__init__(self, *args, **kwargs)

    @staticmethod
    def _is_init_msg(msg):
        """Return True only if `msg` is the 'init' handshake string."""

        # Check the type first: once the peer has left its handshake loop
        # it sends NumPy arrays, and comparing an array with a string
        # yields an array whose truth value is ambiguous.
        return isinstance(msg, str) and msg == 'init'

    def _open_sub(self, port, suffix):
        """Create a SUB socket connected to `port` and register it for POLLIN."""

        sock = self.ctx.socket(zmq.SUB)
        if durable:
            identity = str(self.id) + suffix
            sock.setsockopt(zmq.IDENTITY, identity.encode('ascii'))

        # Byte strings are accepted by pyzmq on both Python 2 and 3.
        sock.setsockopt(zmq.SUBSCRIBE, b'')
        sock.connect('tcp://localhost:%i' % port)
        self.poller.register(sock, zmq.POLLIN)
        return sock

    def _open_pub(self, port):
        """Create a PUB socket bound to `port` on all interfaces."""

        sock = self.ctx.socket(zmq.PUB)
        if durable:
            # Set the high-water mark before bind(): most socket options
            # only take effect for binds/connects made afterwards.
            # NOTE(review): zmq.HWM is the libzmq 2.x option name; later
            # releases split it into SNDHWM/RCVHWM - confirm against the
            # installed version.
            sock.setsockopt(zmq.HWM, 1)
        sock.bind('tcp://*:%i' % port)
        return sock

    def _setup_sockets(self):
        """Create the context, the poller and the four data/control sockets."""

        self.ctx = zmq.Context()
        self.poller = zmq.Poller()

        self.in_sock = self._open_sub(self.local_ports[0], '_data')
        self.in_ctrl_sock = self._open_sub(self.local_ports[1], '_ctrl')
        self.out_sock = self._open_pub(self.remote_ports[0])
        self.out_ctrl_sock = self._open_pub(self.remote_ports[1])

    def _synchronize(self):
        """Handshake with the peer until both directions are confirmed."""

        remote_det = False
        sync_ack = False
        while not (remote_det and sync_ack):

            # Send phony data until the remote module sends an
            # acknowledgment:
            if not sync_ack:
                self.out_sock.send_pyobj('init')

                # Check for sync signal (the payload itself is ignored):
                if is_poll_in(self.in_ctrl_sock, self.poller):
                    self.in_ctrl_sock.recv_pyobj()
                    sync_ack = True

            # Send an acknowledgment whenever phony data is detected:
            if is_poll_in(self.in_sock, self.poller):
                if self._is_init_msg(self.in_sock.recv_pyobj()):
                    self.out_ctrl_sock.send_pyobj('')
                    remote_det = True

    def _exchange(self):
        """Run 100 rounds of optional receive followed by a send."""

        for _ in range(100):
            if is_poll_in(self.in_sock, self.poller):

                # Read input from remote module:
                in_data = self.in_sock.recv_pyobj()
                print('module %i in : %s ' % (self.id, str(in_data)))

            # Send output to remote module:
            out_data = np.random.rand(3)
            print('module %i out: %s ' % (self.id, str(out_data)))
            self.out_sock.send_pyobj(out_data)

    def run(self):
        """
        Process entry point: set up sockets, synchronize, exchange data.

        Messages are read with recv_pyobj(), which unpickles whatever
        arrives; only use this with trusted peers.
        """

        # Seed per module so that each one emits its own repeatable stream.
        np.random.seed(self.id)

        # Set up connections:
        self._setup_sockets()

        # Move on only after synchronization has been achieved:
        self._synchronize()

        # Transmit actual data between modules:
        self._exchange()
if __name__ == '__main__':
    # The port pairs are mirrored: each module binds its outputs on the
    # ports the other module connects its inputs to.
    m1, m2 = (
        Module(id=0, local_ports=(5000, 5001), remote_ports=(5002, 5003)),
        Module(id=1, local_ports=(5002, 5003), remote_ports=(5000, 5001)),
    )

    # Launch both modules, first m1 and then m2.
    for proc in (m1, m2):
        proc.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.