Skip to content

Instantly share code, notes, and snippets.

@chancyk
Created January 13, 2017 18:45
Show Gist options
  • Save chancyk/41f1028cb444a9ffed404c5fc9fe33c3 to your computer and use it in GitHub Desktop.
Save chancyk/41f1028cb444a9ffed404c5fc9fe33c3 to your computer and use it in GitHub Desktop.
PyZMQ Threaded Load-Balancing Broker
"""
Threaded Load-balancing Broker
"""
from __future__ import print_function
from threading import Thread
import zmq
NBR_CLIENTS = 10
NBR_WORKERS = 3
BACKEND_ADDRESS = "tcp://127.0.0.1:8001"
# BACKEND_ADDRESS = "inproc://backend"
def client_task(ident):
"""Basic request-reply client using REQ socket."""
socket = zmq.Context().socket(zmq.REQ)
socket.identity = u"Client-{}".format(ident).encode("ascii")
socket.connect("tcp://127.0.0.1:8000")
# Send request, get reply
socket.send(b"HELLO")
reply = socket.recv()
print("{}: {}".format(socket.identity.decode("ascii"),
reply.decode("ascii")))
def worker_task(ident):
"""Worker task, using a REQ socket to do load-balancing."""
socket = zmq.Context().socket(zmq.REQ)
socket.identity = u"Worker-{}".format(ident).encode("ascii")
socket.connect(BACKEND_ADDRESS)
# Tell broker we're ready for work
socket.send(b"READY")
while True:
address, empty, request = socket.recv_multipart()
print("{}: {}".format(socket.identity.decode("ascii"),
request.decode("ascii")))
socket.send_multipart([address, b"", b"OK"])
def main():
"""Load balancer main loop."""
# Prepare context and sockets
context = zmq.Context.instance()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://127.0.0.1:8000")
backend = context.socket(zmq.ROUTER)
backend.bind(BACKEND_ADDRESS)
# Start background tasks
def start(task, *args):
worker = Thread(target=task, args=args)
worker.daemon = True
worker.start()
for i in range(NBR_CLIENTS):
start(client_task, i)
for i in range(NBR_WORKERS):
start(worker_task, i)
# Initialize main loop state
count = NBR_CLIENTS
workers = []
poller = zmq.Poller()
# Only poll for requests from backend until workers are available
poller.register(backend, zmq.POLLIN)
while True:
sockets = dict(poller.poll())
if backend in sockets:
# Handle worker activity on the backend
request = backend.recv_multipart()
worker, empty, client = request[:3]
if not workers:
# Poll for clients now that a worker is available
poller.register(frontend, zmq.POLLIN)
workers.append(worker)
if client != b"READY" and len(request) > 3:
# If client reply, send rest back to frontend
empty, reply = request[3:]
frontend.send_multipart([client, b"", reply])
count -= 1
if not count:
break
if frontend in sockets:
# Get next client request, route to last-used worker
client, empty, request = frontend.recv_multipart()
worker = workers.pop(0)
backend.send_multipart([worker, b"", client, b"", request])
if not workers:
# Don't poll clients if no workers are available
poller.unregister(frontend)
# Clean up
backend.close()
frontend.close()
context.term()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment