Create a gist now

Instantly share code, notes, and snippets.

@minrk /farm.py
Created Nov 11, 2011

"""
Simple task farm, with routed replies in pyzmq
For http://stackoverflow.com/questions/7809200/implementing-task-farm-messaging-pattern-with-zeromq
Note that things are run in threads to keep stuff in one file, there is no
reason they need to be.
License: Public Domain
"""
import os
import time
import random
import zmq
import threading
ctx = zmq.Context.instance()
client_iface = "tcp://127.0.0.1:5555"
engine_iface = "tcp://127.0.0.1:5556"
def scheduler():
"""ROUTER-DEALER queue device, for load-balancing requests from clients
across engines, and routing replies to the originating client."""
router = ctx.socket(zmq.ROUTER)
router.bind(client_iface)
dealer = ctx.socket(zmq.DEALER)
# this is optional, it just makes identities more obvious when they appear
dealer.setsockopt(zmq.IDENTITY, b'Controller.DEALER')
dealer.bind(engine_iface)
# the remainder of this function can be entirely replaced with
# zmq.device(zmq.QUEUE, router, dealer)
# but this shows what is really going on:
poller = zmq.Poller()
poller.register(router, zmq.POLLIN)
poller.register(dealer, zmq.POLLIN)
while True:
evts = dict(poller.poll())
# poll() returns a list of tuples [(socket, evt), (socket, evt)]
# dict(poll()) turns this into {socket:evt, socket:evt}
if router in evts:
msg = router.recv_multipart()
# ROUTER sockets prepend the identity of the sender, for routing replies
client = msg[0]
print "Controller.ROUTER received %s, relaying via DEALER" % msg
dealer.send_multipart(msg)
if dealer in evts:
msg = dealer.recv_multipart()
client = msg[0]
print "Controller.DEALER received %s, relaying via ROUTER" % msg
router.send_multipart(msg)
def process_request(msg):
"""process the message (reverse letters)"""
return [ part[::-1] for part in msg ]
def engine(id):
"""The engine - receives messages, performs some action, and sends a reply,
preserving the leading two message parts as routing identities
"""
s = ctx.socket(zmq.ROUTER)
s.connect(engine_iface)
while True:
msg = s.recv_multipart()
print "engine %s recvd message:" % id, msg
# note that the first two parts will be [b'Controller.ROUTER', b'Client.<id>']
# these are needed for the reply to propagate up to the right client
idents, request = msg[:2], msg[2:]
reply = idents + process_request(request)
print "engine %s sending reply:" % id, reply
s.send_multipart(reply)
def client(id, n):
"""The client - sends messages, and receives replies after they
have been processed by the """
s = ctx.socket(zmq.DEALER)
s.identity = "Client.%s" % id
s.connect(client_iface)
for i in range(n):
print
msg = ["hello", "world", str(random.randint(10,100))]
print "client %s sending :" % id, msg
s.send_multipart(msg)
msg = s.recv_multipart()
print "client %s received:" % id, msg
time.sleep(0.1)
if __name__ == '__main__':
st = threading.Thread(target=scheduler)
st.daemon=True
st.start()
engines = []
for i in range(4):
t = threading.Thread(target=engine, args=(i,))
t.daemon=True
t.start()
engines.append(t)
# now start a few clients, and fire off some requests
clients = []
for i in range(3):
t = threading.Thread(target=client, args=(i,12))
t.start()
# remove this t.join() to allow clients to be run concurrently.
# this will work just fine, but the print-statements will
# be harder to follow
t.join()
clients.append(t)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment