Skip to content

Instantly share code, notes, and snippets.

@anopheles
Created September 12, 2012 13:34
Show Gist options
  • Save anopheles/3706633 to your computer and use it in GitHub Desktop.
Save anopheles/3706633 to your computer and use it in GitHub Desktop.
Router Dealer example with bidirectional communication
# encoding: utf-8
import zmq
from collections import defaultdict
context = zmq.Context()
client = context.socket(zmq.ROUTER)
client.bind("tcp://*:5556")
poll = zmq.Poller()
poll.register(client, zmq.POLLIN)
counter = defaultdict(int)
while True:
# handle input
sockets = dict(poll.poll(1000))
if sockets:
identity = client.recv()
msg = client.recv()
counter[identity] += 1
# start recording
for identity in counter.keys():
client.send(identity, zmq.SNDMORE)
client.send("START")
print counter
# encoding: utf-8
import random
import zmq
import time
context = zmq.Context()
worker = context.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, str(random.randint(0, 8000)))
worker.connect("tcp://localhost:5556")
start = False
worker.send("Hello")
while True:
if start:
worker.send("recording data: %s" % random.randint(0,100))
time.sleep(0.5)
request = worker.recv()
if request == "START":
start = True
if request == "STOP":
start = False
if request == "END":
print "A is finishing"
break
@anopheles
Copy link
Author

The balancing issue was caused by design! The workers need a fair amount of time to initialize. Putting a time.sleep(0.5) after the worker.send() in line 14 in the worker.py the messages arrive evenly.

@ellingsonr
Copy link

I am trying to achieve two way communication using a dealer and a router. I modified your example slightly:

import time
from threading import Thread
import zmq

def worker_thread():
    cxt = zmq.Context.instance()
    worker = cxt.socket(zmq.DEALER)
    worker.setsockopt(zmq.IDENTITY, 'A')
    worker.connect("tcp://127.0.0.1:5559")

    for _ in range(10):
        request = worker.recv()
        print 'worker recieved'
        worker.send_multipart(['A', "data_recieved"])

cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')

Thread(target=worker_thread).start()
time.sleep(2)

for _ in range(10):
    client.send_multipart(['A', 'data'])
    request = client.recv()
    print 'worker responded'

This seems to work as expected. There is an exchange of 10 messages. However, when I split the worker and client code into two different script files, as seen below, the client sends a message, and the worker blocks infinitely while waiting to receive it. Why would this happen?

Client:

import time
import zmq


cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')

for _ in range(10):
    client.send_multipart(['A', 'data'])
    request = client.recv()
    print 'worker responded'

Worker:

import time
import zmq

cxt = zmq.Context.instance()
worker = cxt.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, 'A')
worker.connect("tcp://127.0.0.1:5559")

for _ in range(10):
    request = worker.recv()
    print 'worker recieved'
    worker.send_multipart(['A', "data_recieved"])

I am new to ZeroMQ and relatively new to python, so I apologise if this issue is caused by something obvious or mundane.

@divmgl
Copy link

divmgl commented Nov 13, 2018

I am trying to achieve two way communication using a dealer and a router. I modified your example slightly:

import time
from threading import Thread
import zmq

def worker_thread():
    cxt = zmq.Context.instance()
    worker = cxt.socket(zmq.DEALER)
    worker.setsockopt(zmq.IDENTITY, 'A')
    worker.connect("tcp://127.0.0.1:5559")

    for _ in range(10):
        request = worker.recv()
        print 'worker recieved'
        worker.send_multipart(['A', "data_recieved"])

cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')

Thread(target=worker_thread).start()
time.sleep(2)

for _ in range(10):
    client.send_multipart(['A', 'data'])
    request = client.recv()
    print 'worker responded'

This seems to work as expected. There is an exchange of 10 messages. However, when I split the worker and client code into two different script files, as seen below, the client sends a message, and the worker blocks infinitely while waiting to receive it. Why would this happen?

Client:

import time
import zmq


cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')

for _ in range(10):
    client.send_multipart(['A', 'data'])
    request = client.recv()
    print 'worker responded'

Worker:

import time
import zmq

cxt = zmq.Context.instance()
worker = cxt.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, 'A')
worker.connect("tcp://127.0.0.1:5559")

for _ in range(10):
    request = worker.recv()
    print 'worker recieved'
    worker.send_multipart(['A', "data_recieved"])

I am new to ZeroMQ and relatively new to python, so I apologise if this issue is caused by something obvious or mundane.

I don't believe recv is implemented in zmq.DEALER, only recv_multipart.

@ashwynh21
Copy link

A simple and elegant example, thank you...it cleared a lot of misunderstandings I had from working with ZMQ...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment