Create a gist now

Instantly share code, notes, and snippets.

@minrk /
Created Nov 11, 2011

What would you like to do?
Simple task farm, with routed replies in pyzmq
Note that things are run in threads to keep stuff in one file, there is no
reason they need to be.
License: Public Domain
import os
import time
import random
import zmq
import threading
ctx = zmq.Context.instance()
client_iface = "tcp://"
engine_iface = "tcp://"
def scheduler():
"""ROUTER-DEALER queue device, for load-balancing requests from clients
across engines, and routing replies to the originating client."""
router = ctx.socket(zmq.ROUTER)
dealer = ctx.socket(zmq.DEALER)
# this is optional, it just makes identities more obvious when they appear
dealer.setsockopt(zmq.IDENTITY, b'Controller.DEALER')
# the remainder of this function can be entirely replaced with
# zmq.device(zmq.QUEUE, router, dealer)
# but this shows what is really going on:
poller = zmq.Poller()
poller.register(router, zmq.POLLIN)
poller.register(dealer, zmq.POLLIN)
while True:
evts = dict(poller.poll())
# poll() returns a list of tuples [(socket, evt), (socket, evt)]
# dict(poll()) turns this into {socket:evt, socket:evt}
if router in evts:
msg = router.recv_multipart()
# ROUTER sockets prepend the identity of the sender, for routing replies
client = msg[0]
print "Controller.ROUTER received %s, relaying via DEALER" % msg
if dealer in evts:
msg = dealer.recv_multipart()
client = msg[0]
print "Controller.DEALER received %s, relaying via ROUTER" % msg
def process_request(msg):
"""process the message (reverse letters)"""
return [ part[::-1] for part in msg ]
def engine(id):
"""The engine - receives messages, performs some action, and sends a reply,
preserving the leading two message parts as routing identities
s = ctx.socket(zmq.ROUTER)
while True:
msg = s.recv_multipart()
print "engine %s recvd message:" % id, msg
# note that the first two parts will be [b'Controller.ROUTER', b'Client.<id>']
# these are needed for the reply to propagate up to the right client
idents, request = msg[:2], msg[2:]
reply = idents + process_request(request)
print "engine %s sending reply:" % id, reply
def client(id, n):
"""The client - sends messages, and receives replies after they
have been processed by the """
s = ctx.socket(zmq.DEALER)
s.identity = "Client.%s" % id
for i in range(n):
msg = ["hello", "world", str(random.randint(10,100))]
print "client %s sending :" % id, msg
msg = s.recv_multipart()
print "client %s received:" % id, msg
if __name__ == '__main__':
st = threading.Thread(target=scheduler)
engines = []
for i in range(4):
t = threading.Thread(target=engine, args=(i,))
# now start a few clients, and fire off some requests
clients = []
for i in range(3):
t = threading.Thread(target=client, args=(i,12))
# remove this t.join() to allow clients to be run concurrently.
# this will work just fine, but the print-statements will
# be harder to follow
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment