public
Last active

  • Download Gist
farm.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
"""
Simple task farm, with routed replies in pyzmq
 
For http://stackoverflow.com/questions/7809200/implementing-task-farm-messaging-pattern-with-zeromq
 
Note that things are run in threads to keep stuff in one file, there is no
reason they need to be.
 
 
License: Public Domain
"""
 
import os
import time
import random
import zmq
import threading
 
ctx = zmq.Context.instance()
client_iface = "tcp://127.0.0.1:5555"
engine_iface = "tcp://127.0.0.1:5556"
 
def scheduler():
"""ROUTER-DEALER queue device, for load-balancing requests from clients
across engines, and routing replies to the originating client."""
router = ctx.socket(zmq.ROUTER)
router.bind(client_iface)
dealer = ctx.socket(zmq.DEALER)
# this is optional, it just makes identities more obvious when they appear
dealer.setsockopt(zmq.IDENTITY, b'Controller.DEALER')
dealer.bind(engine_iface)
# the remainder of this function can be entirely replaced with
# zmq.device(zmq.QUEUE, router, dealer)
# but this shows what is really going on:
poller = zmq.Poller()
poller.register(router, zmq.POLLIN)
poller.register(dealer, zmq.POLLIN)
while True:
evts = dict(poller.poll())
# poll() returns a list of tuples [(socket, evt), (socket, evt)]
# dict(poll()) turns this into {socket:evt, socket:evt}
if router in evts:
msg = router.recv_multipart()
# ROUTER sockets prepend the identity of the sender, for routing replies
client = msg[0]
print "Controller.ROUTER received %s, relaying via DEALER" % msg
dealer.send_multipart(msg)
if dealer in evts:
msg = dealer.recv_multipart()
client = msg[0]
print "Controller.DEALER received %s, relaying via ROUTER" % msg
router.send_multipart(msg)
 
def process_request(msg):
"""process the message (reverse letters)"""
return [ part[::-1] for part in msg ]
 
def engine(id):
"""The engine - receives messages, performs some action, and sends a reply,
preserving the leading two message parts as routing identities
"""
s = ctx.socket(zmq.ROUTER)
s.connect(engine_iface)
while True:
msg = s.recv_multipart()
print "engine %s recvd message:" % id, msg
# note that the first two parts will be [b'Controller.ROUTER', b'Client.<id>']
# these are needed for the reply to propagate up to the right client
idents, request = msg[:2], msg[2:]
reply = idents + process_request(request)
print "engine %s sending reply:" % id, reply
s.send_multipart(reply)
 
def client(id, n):
"""The client - sends messages, and receives replies after they
have been processed by the """
s = ctx.socket(zmq.DEALER)
s.identity = "Client.%s" % id
s.connect(client_iface)
for i in range(n):
print
msg = ["hello", "world", str(random.randint(10,100))]
print "client %s sending :" % id, msg
s.send_multipart(msg)
msg = s.recv_multipart()
print "client %s received:" % id, msg
time.sleep(0.1)
 
if __name__ == '__main__':
st = threading.Thread(target=scheduler)
st.daemon=True
st.start()
engines = []
for i in range(4):
t = threading.Thread(target=engine, args=(i,))
t.daemon=True
t.start()
engines.append(t)
# now start a few clients, and fire off some requests
clients = []
for i in range(3):
t = threading.Thread(target=client, args=(i,12))
t.start()
# remove this t.join() to allow clients to be run concurrently.
# this will work just fine, but the print-statements will
# be harder to follow
t.join()
clients.append(t)

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.