Skip to content

Instantly share code, notes, and snippets.

@ecolss
Last active December 9, 2023 23:43
Show Gist options
  • Save ecolss/fe0557bfe362b83deb2bbcba0f0f6738 to your computer and use it in GitHub Desktop.
Save ecolss/fe0557bfe362b83deb2bbcba0f0f6738 to your computer and use it in GitHub Desktop.
ZeroMQ router-to-router example code
import multiprocessing as mp
import random
import sys
from collections import deque

import zmq
def worker_proc(worker_addr, id_, proc_factory, proc_kwargs):
    """Run a worker loop that serves requests over a REQ socket.

    The worker connects to ``worker_addr``, announces itself with a single
    ``READY`` frame and then loops forever: it receives
    ``[addr, <delimiter>, *data]``, calls the processor on ``data`` and
    replies with ``[addr, b"", *result]``.

    Args:
        worker_addr: Endpoint the REQ socket connects to.
        id_: Value interpolated into the socket identity.
        proc_factory: Callable invoked as ``proc_factory(**proc_kwargs)``;
            it must return a callable that takes the list of data frames
            and returns an iterable of reply frames.
        proc_kwargs: Keyword arguments forwarded to ``proc_factory``.

    Raises:
        Exception: Anything raised while receiving or unpacking a request
            propagates and ends the loop; only failures while processing
            or replying are turned into an ``Exception: ...`` reply.
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    # NOTE: the "woker_" spelling is kept so the socket identity is unchanged.
    sock.identity = f"woker_{id_}".encode()
    sock.connect(worker_addr)
    proc = proc_factory(**proc_kwargs)
    sock.send(b"READY")
    while True:
        # Receive outside the try block: if this fails there is no valid
        # return address for the current request, so the error handler must
        # not run with an unbound (or stale) ``addr``.
        addr, _, *data = sock.recv_multipart()
        try:
            res = proc(data)
            sock.send_multipart([addr, b"", *res])
        except Exception as err:
            print(f"Exception caught, {err}")
            sock.send_multipart([addr, b"", f"Exception: {err}".encode()])
def client_proc(client_addr, id_):
    """Send one ``HELLO`` request and print the first frame of the reply.

    Args:
        client_addr: Endpoint the REQ socket connects to.
        id_: Label used only in the printed output line.
    """
    context = zmq.Context()
    requester = context.socket(zmq.REQ)
    requester.connect(client_addr)

    # One request/reply round trip, then the function returns.
    requester.send(b"HELLO")
    res = requester.recv()
    print(f"client {id_} recv: {res}")
def run_r2r_poller(worker_addr, client_addr):
    """Run a ROUTER-to-ROUTER broker between clients and workers.

    Clients connect to the frontend socket bound at ``client_addr``; workers
    connect to the backend socket bound at ``worker_addr``. A worker becomes
    available every time a message arrives from it: either the initial
    ``READY`` announcement or a reply of the form
    ``[client, <delimiter>, *reply]``. Replies are forwarded to the client
    named in the reply envelope. Client requests are handed to the worker
    that has been idle the longest.

    Args:
        worker_addr: Endpoint the backend ROUTER socket binds to.
        client_addr: Endpoint the frontend ROUTER socket binds to.

    This function loops forever and does not return.
    """
    context = zmq.Context.instance()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind(client_addr)
    backend = context.socket(zmq.ROUTER)
    backend.bind(worker_addr)

    # Idle workers, oldest first.
    workers = deque()

    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    # The frontend is registered only while at least one worker is idle.
    # Polling it with no worker available would make poll() return
    # immediately on every pass without consuming the pending request.

    while True:
        sockets = dict(poller.poll())

        if backend in sockets:
            worker, _, *data = backend.recv_multipart()
            if not workers:
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if data and data[0] != b"READY":
                # ``data`` already is the client envelope
                # [client, b"", *reply]; forward it unchanged so the reply
                # reaches the client that issued the request.
                frontend.send_multipart(data)

        if frontend in sockets:
            client, _, *data = frontend.recv_multipart()
            worker = workers.popleft()
            backend.send_multipart([worker, b"", client, b"", *data])
            if not workers:
                poller.unregister(frontend)
def factory(**kwargs):
    """Build a placeholder processor.

    Any keyword arguments are accepted and ignored.

    Returns:
        A one-argument callable that ignores its argument and returns
        ``[b"DONE"]``.
    """

    def dummy_fn(x):
        reply = [b"DONE"]
        return reply

    return dummy_fn
def main(mode):
    """Launch one role of the demo, selected by ``mode``.

    Args:
        mode: ``0`` starts two client processes, ``1`` starts two worker
            processes, ``2`` runs the broker loop in the current process.

    Raises:
        ValueError: If ``mode`` is not 0, 1 or 2.
    """
    worker_addr = "ipc://workers"
    client_addr = "ipc://clients"

    if mode == 0:
        procs = [
            mp.Process(target=client_proc, args=(client_addr, i))
            for i in range(2)
        ]
    elif mode == 1:
        procs = [
            mp.Process(
                target=worker_proc,
                args=(worker_addr, i, factory, {}),
            )
            for i in range(2)
        ]
    elif mode == 2:
        run_r2r_poller(worker_addr, client_addr)
        return
    else:
        # An unknown mode used to be a silent no-op; fail loudly instead.
        raise ValueError(f"unknown mode {mode!r}; expected 0, 1 or 2")

    # Start the processes only after all of them have been created.
    for proc in procs:
        proc.start()
if __name__ == "__main__":
    # The single command-line argument selects the role:
    # 0 = start clients, 1 = start workers, 2 = run the broker.
    try:
        mode = int(sys.argv[1])
    except (IndexError, ValueError):
        # Missing or non-integer argument: show usage, not a traceback.
        sys.exit(f"usage: {sys.argv[0]} MODE  (0 = clients, 1 = workers, 2 = broker)")
    main(mode)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment