@anopheles
Created September 12, 2012 13:34
Router Dealer example with bidirectional communication
# encoding: utf-8
import zmq
from collections import defaultdict

context = zmq.Context()
client = context.socket(zmq.ROUTER)
client.bind("tcp://*:5556")

poll = zmq.Poller()
poll.register(client, zmq.POLLIN)
counter = defaultdict(int)

while True:
    # handle input
    sockets = dict(poll.poll(1000))
    if sockets:
        identity = client.recv()
        msg = client.recv()
        counter[identity] += 1

    # start recording
    for identity in counter.keys():
        client.send(identity, zmq.SNDMORE)
        client.send("START")

    print counter
# encoding: utf-8
import random
import zmq
import time
context = zmq.Context()
worker = context.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, str(random.randint(0, 8000)))
worker.connect("tcp://localhost:5556")
start = False
worker.send("Hello")
while True:
    if start:
        worker.send("recording data: %s" % random.randint(0, 100))
        time.sleep(0.5)
    request = worker.recv()
    if request == "START":
        start = True
    if request == "STOP":
        start = False
    if request == "END":
        print "A is finishing"
        break
@anopheles
Author

This example suffers from balancing issues.

The router simply waits for a welcome message from the workers and
replies with a "START" command, which makes the worker process start
sending random data.

The router process counts the number of messages it receives from each
client. Messages from some clients appear to arrive far more often than
from others. After a few seconds the counter dictionary reads
{'5906': 1, '3801': 147313, '4712': 24986}, meaning that the router
received 24986 messages from the worker with id 4712 but only one from
the worker with id 5906.
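
The handshake and per-worker counting described above can be sketched in Python 3 as a single process. This is not the code from the gist: the `inproc://` endpoint name, the fixed `DATA_MESSAGES` count, and the use of "END" to stop each worker are assumptions made so the sketch terminates on its own. It reads the identity and payload frames together with recv_multipart() and replies only to the worker that just sent something.

```python
import threading
from collections import defaultdict

import zmq

ENDPOINT = "inproc://router-dealer-demo"  # hypothetical endpoint for this sketch
DATA_MESSAGES = 5                         # arbitrary count, chosen for the demo


def worker(ctx, identity):
    """DEALER side: say hello, send data once told to START, stop on END."""
    sock = ctx.socket(zmq.DEALER)
    sock.setsockopt(zmq.IDENTITY, identity)
    sock.connect(ENDPOINT)
    sock.send(b"Hello")
    while True:
        request = sock.recv()
        if request == b"START":
            for i in range(DATA_MESSAGES):
                sock.send(b"recording data: %d" % i)
        elif request == b"END":
            break
    sock.close()


def run_router(identities):
    """ROUTER side: count messages per identity and answer only the sender."""
    ctx = zmq.Context()
    router = ctx.socket(zmq.ROUTER)
    router.bind(ENDPOINT)

    threads = [threading.Thread(target=worker, args=(ctx, ident))
               for ident in identities]
    for t in threads:
        t.start()

    counter = defaultdict(int)
    finished = set()
    while len(finished) < len(identities):
        # ROUTER prepends the sender's identity to every incoming message.
        identity, msg = router.recv_multipart()
        counter[identity] += 1
        if msg == b"Hello":
            router.send_multipart([identity, b"START"])
        elif counter[identity] == DATA_MESSAGES + 1:  # Hello + all data messages
            router.send_multipart([identity, b"END"])
            finished.add(identity)

    for t in threads:
        t.join()
    router.close()
    ctx.term()
    return dict(counter)


if __name__ == "__main__":
    print(run_router([b"A", b"B", b"C"]))
```

Because every worker sends the same fixed number of messages here, the counts come out equal regardless of which worker connects first. That differs from the gist, where the router re-sends "START" to every known identity on each loop pass.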

@anopheles
Author

The balancing issue was caused by design! The workers need a fair amount of time to initialize. Putting a time.sleep(0.5) after the worker.send() in line 14 of worker.py makes the messages arrive evenly.

@ellingsonr

I am trying to achieve two way communication using a dealer and a router. I modified your example slightly:

import time
from threading import Thread
import zmq

def worker_thread():
    cxt = zmq.Context.instance()
    worker = cxt.socket(zmq.DEALER)
    worker.setsockopt(zmq.IDENTITY, 'A')
    worker.connect("tcp://127.0.0.1:5559")

    for _ in range(10):
        request = worker.recv()
        print 'worker received'
        worker.send_multipart(['A', "data_received"])

cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')

Thread(target=worker_thread).start()
time.sleep(2)

for _ in range(10):
    client.send_multipart(['A', 'data'])
    request = client.recv()
    print 'worker responded'

This seems to work as expected. There is an exchange of 10 messages. However, when I split the worker and client code into two different script files, as seen below, the client sends a message, and the worker blocks indefinitely while waiting to receive it. Why would this happen?

Client:

import time
import zmq


cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')

for _ in range(10):
    client.send_multipart(['A', 'data'])
    request = client.recv()
    print 'worker responded'

Worker:

import time
import zmq

cxt = zmq.Context.instance()
worker = cxt.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, 'A')
worker.connect("tcp://127.0.0.1:5559")

for _ in range(10):
    request = worker.recv()
    print 'worker received'
    worker.send_multipart(['A', "data_received"])

I am new to ZeroMQ and relatively new to Python, so I apologise if this issue is caused by something obvious or mundane.
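
A likely explanation, offered as an assumption rather than a confirmed diagnosis: a ROUTER socket by default silently drops messages addressed to an identity it has no connection for. The split client sends right after bind(), without the time.sleep(2) that gave the threaded worker time to connect. The Python 3 sketch below uses a hypothetical `inproc://` endpoint and sets zmq.ROUTER_MANDATORY so that an unroutable send raises an error instead of vanishing. It then has the worker announce itself with a "ready" message (an invented payload) before the client sends anything. A DEALER sends only the payload, since the ROUTER adds the identity frame on receipt, so the extra 'A' frame in the worker's send_multipart() is not needed.

```python
import zmq

ENDPOINT = "inproc://router-mandatory-demo"  # hypothetical endpoint for this sketch


def demo():
    ctx = zmq.Context()
    client = ctx.socket(zmq.ROUTER)
    # Report unroutable messages as errors (EHOSTUNREACH) instead of dropping them.
    client.setsockopt(zmq.ROUTER_MANDATORY, 1)
    client.bind(ENDPOINT)

    # Worker 'A' has not connected yet, so this message has nowhere to go.
    try:
        client.send_multipart([b"A", b"data"])
        rejected = False
    except zmq.ZMQError:
        rejected = True

    worker = ctx.socket(zmq.DEALER)
    worker.setsockopt(zmq.IDENTITY, b"A")
    worker.connect(ENDPOINT)
    worker.send(b"ready")  # payload only; the ROUTER adds the identity frame

    # Read identity and payload together, then reply to that identity.
    identity, msg = client.recv_multipart()
    client.send_multipart([identity, b"data"])
    received = worker.recv()

    worker.close()
    client.close()
    ctx.term()
    return rejected, received


if __name__ == "__main__":
    print(demo())
```

Waiting for the worker's first message before sending avoids any reliance on sleep durations, because the router only addresses identities it has already heard from.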

@divmgl

divmgl commented Nov 13, 2018

I don't believe recv is implemented in zmq.DEALER, only recv_multipart.

@ashwynh21

A simple and elegant example, thank you. It cleared up a lot of misunderstandings I had from working with ZMQ.
