Skip to content

Instantly share code, notes, and snippets.

@minrk
Created January 13, 2021 12:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save minrk/6737ecbf49cfdddcd2422bb3e540a749 to your computer and use it in GitHub Desktop.
Save minrk/6737ecbf49cfdddcd2422bb3e540a749 to your computer and use it in GitHub Desktop.
"""Example of a simple scheduler with pyzmq
"""
import json
import time
from multiprocessing import Process
import zmq
# Endpoint the scheduler binds for workers; every worker connects to it.
WORKER_URL = "tcp://127.0.0.1:5555"
# Endpoint the scheduler binds for clients; the client connects to it.
CLIENT_URL = "tcp://127.0.0.1:5556"
def encode_msg(msg) -> bytes:
    """Serialize a message as UTF-8 encoded JSON bytes.

    Args:
        msg: A JSON-serializable object (the callers in this file pass dicts).

    Returns:
        The JSON text of ``msg`` encoded as UTF-8 bytes.

    Raises:
        TypeError: If ``msg`` contains a value ``json.dumps`` cannot serialize.
    """
    return json.dumps(msg).encode("utf8")
def decode_msg(msg_bytes: bytes):
    """Decode message bytes that were produced by ``encode_msg``.

    Args:
        msg_bytes: UTF-8 encoded JSON text.

    Returns:
        The Python object described by the JSON text.

    Raises:
        UnicodeDecodeError: If ``msg_bytes`` is not valid UTF-8.
        json.JSONDecodeError: If the decoded text is not valid JSON.
    """
    return json.loads(msg_bytes.decode("utf8"))
def worker(name):
    """Run a single worker until it is stopped.

    Registers with the scheduler by sending an initial handshake carrying our
    name, then loops forever: receive a request, mark it as completed by this
    worker, and send it back together with the requesting client's id.

    Args:
        name: Name announced in the handshake and recorded in each reply
            under the ``"completed by"`` key.
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.DEALER)
    try:
        s.connect(WORKER_URL)
        s.send(encode_msg({"msg_type": "handshake", "name": name}))
        while True:
            # Requests arrive as [client id, message bytes]; the client id is
            # echoed back unchanged so the reply can be relayed to its sender.
            client_id, msg_bytes = s.recv_multipart()
            msg = decode_msg(msg_bytes)
            print(f"{name} handling {msg}")
            msg["completed by"] = name
            s.send_multipart([client_id, encode_msg(msg)])
    finally:
        # The loop never exits normally, so this only runs when an exception
        # interrupts it. linger=0 discards unsent messages so that
        # terminating the context cannot block.
        s.close(linger=0)
        ctx.term()
def client(name):
    """Run a client: send a batch of requests, collect the replies, then
    ask the scheduler to shut down.

    Ten requests are sent, addressed to ``worker-0`` through ``worker-9``,
    and then exactly ten replies are awaited before the shutdown message is
    sent.

    Args:
        name: Identifier placed in the ``"from"`` field of every request.
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.DEALER)
    s.connect(CLIENT_URL)
    n_requests = 10
    for i in range(n_requests):
        print(f"sending request {i}")
        msg = {
            "msg_type": "request",
            "from": name,
            "target": f"worker-{i}",
            "id": i,
        }
        s.send(encode_msg(msg))
    # One reply is expected per request; replies are printed in arrival order.
    for _ in range(n_requests):
        reply = decode_msg(s.recv())
        print(f"Received reply: {reply}")
    s.send(encode_msg({"msg_type": "shutdown"}))
    # NOTE(review): close() keeps the default linger (unlike the scheduler's
    # linger=0), presumably so the shutdown message queued just above is
    # still delivered before the context terminates - confirm.
    s.close()
    ctx.term()
def recv_msg(socket):
    """Receive one multipart message according to our wire protocol.

    The last frame is the JSON-encoded message body; every frame before it
    is treated as a routing identity.

    Args:
        socket: Object providing ``recv_multipart()`` that returns the
            message frames as a sequence of bytes.

    Returns:
        A ``(identities, msg)`` pair: the list of leading identity frames
        (possibly empty) and the decoded message body.
    """
    *identities, msg_bytes = socket.recv_multipart()
    return identities, decode_msg(msg_bytes)
def _handle_worker_message(workers, client_socket, identities, msg):
    """Register a worker on handshake; otherwise relay its reply to the client."""
    if msg["msg_type"] == "handshake":
        # handshake messages store the worker's identity in the lookup dict
        worker_identity = identities[0]
        workers[msg["name"]] = worker_identity
        print(f"Registered new worker {msg['name']}: {worker_identity}")
        return
    # Replies carry [worker identity, client identity]; only the client
    # identity is needed to relay the reply.
    _worker_identity, client_identity = identities
    client_socket.send_multipart([client_identity, encode_msg(msg)])


def _handle_client_request(workers, worker_socket, client_socket, identities, msg):
    """Forward a client request to its target worker, or answer it directly."""
    # .get() so that a request without a "target" key is answered with the
    # "no such worker" reply instead of crashing the scheduler.
    target_worker = msg.get("target")
    worker_identity = workers.get(target_worker)
    if worker_identity is not None:
        # target exists, forward message preserving client id for
        # later reply relay
        print(f"Forwarding {msg['msg_type']} to {target_worker}")
        worker_socket.send_multipart(
            [worker_identity] + identities + [encode_msg(msg)]
        )
    else:
        # no valid target, send back a reply directly
        print(f"No such worker: {target_worker}")
        msg["completed by"] = "nobody"
        client_socket.send_multipart(identities + [encode_msg(msg)])


def scheduler():
    """Run the scheduler, listening for connections from workers and clients.

    Binds one ROUTER socket for workers and one for clients, then polls both:

    * worker handshakes register the worker's identity under its name;
    * other worker messages are relayed to the client they belong to;
    * client requests are forwarded to the worker named in ``"target"``, or
      answered directly with ``"completed by": "nobody"`` when no such
      worker is registered;
    * a client ``"shutdown"`` message ends the loop.

    Both sockets and the context are released on any exit, including errors.
    """
    ctx = zmq.Context()
    # worker name -> identity frame used to address that worker
    workers = {}
    worker_socket = ctx.socket(zmq.ROUTER)
    client_socket = ctx.socket(zmq.ROUTER)
    try:
        worker_socket.bind(WORKER_URL)
        client_socket.bind(CLIENT_URL)
        poller = zmq.Poller()
        poller.register(worker_socket, zmq.POLLIN)
        poller.register(client_socket, zmq.POLLIN)
        while True:
            events = dict(poller.poll())
            if worker_socket in events:
                # message arriving from a worker
                identities, msg = recv_msg(worker_socket)
                _handle_worker_message(workers, client_socket, identities, msg)
            if client_socket in events:
                # message arriving from a client
                identities, msg = recv_msg(client_socket)
                if msg["msg_type"] == "shutdown":
                    # shutdown message halts the scheduler
                    print("Shutting down...")
                    break
                _handle_client_request(
                    workers, worker_socket, client_socket, identities, msg
                )
    finally:
        # linger=0: discard anything still queued so term() cannot block
        worker_socket.close(linger=0)
        client_socket.close(linger=0)
        ctx.term()
def main():
    """Run the demo: one scheduler process, five workers, one in-process client.

    The scheduler and the workers (named ``worker-0`` to ``worker-4``) run as
    daemon processes. The client runs in the calling process and ends by
    sending a shutdown request, after which the scheduler process is joined.
    """
    sched = Process(target=scheduler, daemon=True)
    sched.start()
    workers = [
        Process(target=worker, args=(f"worker-{i}",), daemon=True) for i in range(5)
    ]
    for w in workers:
        w.start()
    # NOTE(review): fixed delay, presumably to let the scheduler bind and the
    # workers register before the client starts sending - confirm.
    time.sleep(1)
    client("theclient")
    sched.join()
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment