Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Bidirectional communication between two processes with pyzmq
#!/usr/bin/env python
Bidirectional communication between two processes.
import numpy as np
import zmq
import multiprocessing as mp
# Set this to True to use durable sockets:
durable = False
def is_poll_in(sock, poller, timeout=100):
Check whether a poller detects incoming data on a specified
socks = dict(poller.poll(timeout))
if sock in socks and socks[sock] == zmq.POLLIN:
return True
return False
class Module(mp.Process):
def __init__(self, *args, **kwargs): = kwargs.pop('id')
self.local_ports = kwargs.pop('local_ports')
self.remote_ports = kwargs.pop('remote_ports')
mp.Process.__init__(self, *args, **kwargs)
def run(self):
# Set up connections:
self.ctx = zmq.Context()
self.poller = zmq.Poller()
self.in_sock = self.ctx.socket(zmq.SUB)
if durable:
self.in_sock.setsockopt(zmq.IDENTITY, str( + '_data')
self.in_sock.setsockopt(zmq.SUBSCRIBE, '')
self.in_sock.connect('tcp://localhost:%i' % self.local_ports[0])
self.poller.register(self.in_sock, zmq.POLLIN)
self.in_ctrl_sock = self.ctx.socket(zmq.SUB)
if durable:
self.in_ctrl_sock.setsockopt(zmq.IDENTITY, str( + '_ctrl')
self.in_ctrl_sock.setsockopt(zmq.SUBSCRIBE, '')
self.in_ctrl_sock.connect('tcp://localhost:%i' % self.local_ports[1])
self.poller.register(self.in_ctrl_sock, zmq.POLLIN)
self.out_sock = self.ctx.socket(zmq.PUB)
self.out_sock.bind('tcp://*:%i' % self.remote_ports[0])
if durable:
self.out_sock.setsockopt(zmq.HWM, 1)
self.out_ctrl_sock = self.ctx.socket(zmq.PUB)
self.out_ctrl_sock.bind('tcp://*:%i' % self.remote_ports[1])
if durable:
self.out_ctrl_sock.setsockopt(zmq.HWM, 1)
remote_det = False
sync_ack = False
while True:
# Send phony data until the remote module sends an acknowledgment:
if not sync_ack:
# Check for sync signal:
if is_poll_in(self.in_ctrl_sock, self.poller):
_ = self.in_ctrl_sock.recv_pyobj()
sync_ack = True
# Send an acknowledgment when phony data is first detected:
socks = dict(self.poller.poll(100))
if is_poll_in(self.in_sock, self.poller):
if self.in_sock.recv_pyobj() == 'init':
remote_det = True
# Move on after synchronization has been achieved:
if remote_det and sync_ack:
# Transmit actual data between modules:
for i in xrange(100):
if is_poll_in(self.in_sock, self.poller):
# Read input from remote module:
in_data = self.in_sock.recv_pyobj()
print 'module %i in : %s ' % (, str(in_data))
# Send output to remote module:
out_data = np.random.rand(3)
print 'module %i out: %s ' % (, str(out_data))
if __name__ == '__main__':
m1 = Module(id=0, local_ports=(5000, 5001), remote_ports=(5002, 5003))
m2 = Module(id=1, local_ports=(5002, 5003), remote_ports=(5000, 5001))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.