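# NOTE: the following lines are GitHub web-page chrome that was captured along
# with this gist; they are not part of the program and are kept as comments.
# Skip to content
#
# Embed URL
#
# HTTPS clone URL
#
# Subversion checkout URL
#
# You can clone with
# or
# .
# Download ZIP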
"""
Simple task farm, with routed replies in pyzmq
For http://stackoverflow.com/questions/7809200/implementing-task-farm-messaging-pattern-with-zeromq
Note that things are run in threads to keep stuff in one file, there is no
reason they need to be.
License: Public Domain
"""
import os
import time
import random
import zmq
import threading
# Shared ZeroMQ context; every socket in this file is created from it.
ctx = zmq.Context.instance()
# Endpoint bound by the scheduler's ROUTER socket; clients connect here.
client_iface = "tcp://127.0.0.1:5555"
# Endpoint bound by the scheduler's DEALER socket; engines connect here.
engine_iface = "tcp://127.0.0.1:5556"
def scheduler():
    """Run a ROUTER-DEALER queue device.

    Load-balances requests from clients across engines, and routes replies
    back to the originating client.

    The ROUTER socket binds ``client_iface`` and the DEALER socket binds
    ``engine_iface``. The function loops forever relaying messages between
    the two; it only leaves the loop if a socket operation raises, in which
    case both sockets are closed before the exception propagates.
    """
    router = ctx.socket(zmq.ROUTER)
    dealer = ctx.socket(zmq.DEALER)
    try:
        router.bind(client_iface)
        # this is optional, it just makes identities more obvious when they
        # appear
        dealer.setsockopt(zmq.IDENTITY, b'Controller.DEALER')
        dealer.bind(engine_iface)

        # the remainder of this function can be entirely replaced with
        #     zmq.device(zmq.QUEUE, router, dealer)
        # but this shows what is really going on:
        poller = zmq.Poller()
        poller.register(router, zmq.POLLIN)
        poller.register(dealer, zmq.POLLIN)

        while True:
            # poll() returns a list of tuples [(socket, evt), (socket, evt)]
            # dict(poll()) turns this into {socket: evt, socket: evt}
            evts = dict(poller.poll())

            if router in evts:
                # ROUTER sockets prepend the identity of the sender, for
                # routing replies; the whole envelope is forwarded untouched.
                msg = router.recv_multipart()
                print("Controller.ROUTER received %s, relaying via DEALER"
                      % (msg,))
                dealer.send_multipart(msg)

            if dealer in evts:
                # Replies come back with the client identity still in front,
                # which is what lets ROUTER deliver them to the right client.
                msg = dealer.recv_multipart()
                print("Controller.DEALER received %s, relaying via ROUTER"
                      % (msg,))
                router.send_multipart(msg)
    finally:
        # Release the sockets so the shared context can terminate cleanly.
        router.close()
        dealer.close()
def process_request(msg):
    """Process the message by reversing the letters of every part.

    ``msg`` is an iterable of sliceable parts (e.g. the byte-string frames
    of a multipart message). Returns a new list with each part reversed;
    the order of the parts is unchanged and the input is not modified.
    """
    return [part[::-1] for part in msg]
def engine(id):
    """The engine - receives messages, performs some action, and sends a
    reply, preserving the leading two message parts as routing identities.

    ``id`` is only used to label this engine's printed output. The function
    loops forever; if a socket operation raises, the socket is closed before
    the exception propagates.
    """
    s = ctx.socket(zmq.ROUTER)
    try:
        s.connect(engine_iface)
        while True:
            msg = s.recv_multipart()
            print("engine %s recvd message: %s" % (id, msg))
            # note that the first two parts will be
            # [b'Controller.DEALER', b'Client.<id>']: the identity of the
            # scheduler's DEALER socket followed by the client's identity.
            # these are needed for the reply to propagate up to the right
            # client
            idents, request = msg[:2], msg[2:]
            reply = idents + process_request(request)
            print("engine %s sending reply: %s" % (id, reply))
            s.send_multipart(reply)
    finally:
        # Release the socket so the shared context can terminate cleanly.
        s.close()
def client(id, n):
    """The client - sends messages, and receives replies after they
    have been processed by the engines.

    ``id`` is used to build the socket identity ``Client.<id>`` and to label
    printed output. ``n`` is the number of request/reply round trips to
    perform. The socket is closed when the function returns or raises.
    """
    s = ctx.socket(zmq.DEALER)
    try:
        ident = "Client.%s" % id
        # ZeroMQ identities and frames are bytes. On Python 2 ``str`` already
        # is bytes, so this branch only runs on Python 3.
        if not isinstance(ident, bytes):
            ident = ident.encode("utf-8")
        s.identity = ident
        s.connect(client_iface)
        for i in range(n):
            print("")
            msg = [b"hello", b"world",
                   str(random.randint(10, 100)).encode("ascii")]
            print("client %s sending : %s" % (id, msg))
            s.send_multipart(msg)
            # Wait for the routed reply before sending the next request.
            msg = s.recv_multipart()
            print("client %s received: %s" % (id, msg))
            time.sleep(0.1)
    finally:
        # Release the socket so the shared context can terminate cleanly.
        s.close()
if __name__ == '__main__':
    # The scheduler and engines loop forever, so they run as daemon threads
    # and do not keep the process alive once the clients have finished.
    st = threading.Thread(target=scheduler)
    st.daemon = True
    st.start()

    engines = []
    for i in range(4):
        t = threading.Thread(target=engine, args=(i,))
        t.daemon = True
        t.start()
        engines.append(t)

    # now start a few clients, and fire off some requests
    clients = []
    for i in range(3):
        t = threading.Thread(target=client, args=(i, 12))
        t.start()
        # remove this t.join() to allow clients to be run concurrently.
        # this will work just fine, but the print-statements will
        # be harder to follow
        t.join()
        clients.append(t)
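# NOTE: the following lines are GitHub web-page chrome that was captured along
# with this gist; they are not part of the program and are kept as comments.
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
# Something went wrong with that request. Please try again.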