Skip to content

Instantly share code, notes, and snippets.

@jul
Created May 29, 2024 11:56
Show Gist options
  • Save jul/40a45dbb7d8570051297cb4b5869d6dd to your computer and use it in GitHub Desktop.
Save jul/40a45dbb7d8570051297cb4b5869d6dd to your computer and use it in GitHub Desktop.
Self-saturating distributed torture test
import time
import random
#from threading import Thread
from multiprocessing import Process as Thread
# Adapted from an example in the ZeroMQ guide, chapter 3: https://zguide.zeromq.org/docs/chapter3/
import zmq
# Number of workers to launch; the serving loops further down are sized from it too.
NBR_WORKERS = 128
# Shared ZeroMQ context for this process, obtained through pyzmq's instance() accessor.
context = zmq.Context.instance()
# ROUTER socket the driver uses to receive from, and reply to, the workers.
client = context.socket(zmq.ROUTER)
# Listen on TCP port 5671 on every interface; workers connect to localhost:5671.
# NOTE(review): this runs at import time with no `__main__` guard; under a
# multiprocessing start method that re-imports the main module, child
# processes would presumably repeat this bind -- confirm before relying on it.
client.bind("tcp://*:5671")
def worker_thread():
    """Run one REQ worker against tcp://localhost:5671 until told to stop.

    Each cycle sends ``b"ready"``, parses the reply as a float timestamp and
    answers with a comma-separated record made of, in order:

    * the timestamp that was received,
    * ``time.time()`` minus that timestamp,
    * ``time.clock_gettime()`` for clock ids 0 through 6, then id 11,
    * the wall-clock time spent in this cycle so far.

    If any step of that sequence raises (for instance the reply is not a
    number), an empty frame is sent instead of the record.  The worker then
    waits for one more reply before starting the next cycle.

    The worker stops as soon as ``b"END"`` is received, whether it arrives
    as the reply to ``b"ready"`` or as the acknowledgement of the record.
    The socket is closed on the way out.
    """
    context = zmq.Context.instance()
    worker = context.socket(zmq.REQ)
    worker.connect("tcp://localhost:5671")
    # We use a string identity for ease here.
    # NOTE(review): this is set after connect(); libzmq documents that most
    # socket options only affect subsequent connects, so the identity is
    # probably not used for this connection.  Moving it before connect()
    # would need a value that is unique per worker -- confirm before changing.
    worker.setsockopt_string(zmq.IDENTITY, str(worker))
    try:
        while True:
            # Bytes sentinel so the END comparison below is bytes-to-bytes.
            workload = b""
            try:
                cycle_started = time.time()
                # Tell the router we're ready for work.
                worker.send(b"ready")
                workload = worker.recv()
                sent_at = float(workload)
                fields = [sent_at, time.time() - sent_at]
                fields += [time.clock_gettime(clock_id) for clock_id in range(7)]
                fields.append(time.clock_gettime(11))
                fields.append(time.time() - cycle_started)
                worker.send(",".join(map(str, fields)).encode("utf8"))
            except Exception:
                # Deliberate best effort: whatever went wrong, still emit a
                # frame so the send/recv alternation of the socket continues.
                worker.send(b"")
            # END arrived where a timestamp was expected.
            if workload == b"END":
                break
            # END arrived as the acknowledgement of the frame just sent.
            # Checking it here keeps the worker from looping back and
            # blocking forever on a router that has finished serving.
            if worker.recv() == b"END":
                break
    finally:
        worker.close()
# Start every worker; ``Thread`` is whichever class the file imported under
# that name.
for _ in range(NBR_WORKERS):
    Thread(target=worker_thread).start()

# Two serving phases that differ only in the second reply of each exchange:
# a long phase acknowledging with an empty frame, then one exchange per
# worker acknowledging with b'END'.
for final_reply, rounds in ((b'', NBR_WORKERS * 100000), (b'END', NBR_WORKERS)):
    for _ in range(rounds):
        # Whatever arrives first is answered with the current wall-clock time.
        sender, _delim, payload = client.recv_multipart()
        stamp = str(time.time()).encode("utf8")
        client.send_multipart([sender, b'', stamp])

        # The next arrival is answered with this phase's acknowledgement.
        sender, _delim, payload = client.recv_multipart()
        client.send_multipart([sender, b'', final_reply])

        # Print anything that is neither blank nor a plain "ready" request.
        trimmed = payload.strip()
        if trimmed and trimmed != b"ready":
            print(payload.decode("utf8"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment