Skip to content

Instantly share code, notes, and snippets.

@lvxejay
Last active July 22, 2021 17:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lvxejay/a0dcb03453d73e8f9afc1d4c914e32ba to your computer and use it in GitHub Desktop.
Save lvxejay/a0dcb03453d73e8f9afc1d4c914e32ba to your computer and use it in GitHub Desktop.
ZMQ Benchmarking
#!/usr/bin/env python3
# --------------------------------------------------------------------------- HEADER --#
"""
:author:
Jared Webber
:description:
Somewhat crude ZMQ Performance Benchmark and Tests
:license:
Copyright (C) 2021 ForgeXYZ, LLC
"""
# -------------------------------------------------------------------------- IMPORTS --#
import sys
import zmq
from multiprocessing import Process
import numpy as np
import time
# ------------------------------------------------------------------------ FUNCTIONS --#
def array_worker(*args):
    """Receive a single message and view it as a (3840, 2160, 4) float64 array.

    Connects a PULL socket to ``tcp://127.0.0.1:5557``, blocks until one
    message arrives, and reinterprets the payload with ``np.frombuffer``.
    The socket and context are always released before returning.

    :param args: ignored; accepted so this worker can be launched with the
        same ``args=(rounds,)`` tuple that ``zmq_vent`` passes to every worker.
    :raises ValueError: if the payload size does not match the target shape.
    """
    context = zmq.Context()
    work_receiver = context.socket(zmq.PULL)
    try:
        work_receiver.connect("tcp://127.0.0.1:5557")
        payload = work_receiver.recv()
        # The array itself is discarded; the call is kept so the
        # deserialization work still runs inside the worker.
        np.frombuffer(payload, dtype=np.float64).reshape(3840, 2160, 4)
    finally:
        work_receiver.close()
        context.term()
def float_worker(rounds):
    """Receive ``rounds`` messages and discard each one.

    Connects a PULL socket to ``tcp://127.0.0.1:5557`` and blocks on
    ``recv()`` once per round. The socket and context are always released
    before returning.

    :param rounds: number of messages to receive before exiting.
    """
    context = zmq.Context()
    work_receiver = context.socket(zmq.PULL)
    try:
        work_receiver.connect("tcp://127.0.0.1:5557")
        for _ in range(rounds):
            work_receiver.recv()
    finally:
        work_receiver.close()
        context.term()
def mtx_worker(rounds):
    """Receive ``rounds`` messages, viewing each as a 4x4 float64 matrix.

    Connects a PULL socket to ``tcp://127.0.0.1:5557`` and, for every
    message, reinterprets the payload with ``np.frombuffer``. The socket and
    context are always released before returning.

    :param rounds: number of messages to receive before exiting.
    :raises ValueError: if a payload does not hold exactly 16 float64 values.
    """
    context = zmq.Context()
    work_receiver = context.socket(zmq.PULL)
    try:
        work_receiver.connect("tcp://127.0.0.1:5557")
        for _ in range(rounds):
            payload = work_receiver.recv()
            # The matrix is discarded; the call is kept so the
            # deserialization work still runs for every message.
            np.frombuffer(payload, dtype=np.float64).reshape(4, 4)
    finally:
        work_receiver.close()
        context.term()
def zmq_vent(worker_func, msg=10.0, rounds=1):
    """Start a worker process and push ``msg`` to it ``rounds`` times.

    A child process running ``worker_func(rounds)`` is started, a PUSH socket
    is bound to ``tcp://127.0.0.1:5557``, and ``msg`` is sent exactly
    ``rounds`` times. The socket, the context and the child process are
    cleaned up before returning.

    :param worker_func: callable run in the child process; it is called with
        ``rounds`` as its only positional argument.
    :param msg: payload to send. Objects that do not expose the buffer
        protocol (such as the float default) are sent as the UTF-8 encoding
        of ``str(msg)``.
    :param rounds: number of messages to send.
    :returns: the number of messages sent (equal to ``rounds``).
    """
    # The socket can only send bytes-like payloads; coerce anything else
    # (e.g. the float default) to its textual byte representation.
    try:
        memoryview(msg)
    except TypeError:
        msg = str(msg).encode("utf-8")

    worker = Process(target=worker_func, args=(rounds,))
    worker.start()
    context = zmq.Context()
    ventilator_send = context.socket(zmq.PUSH)
    sent = 0
    completed = False
    try:
        ventilator_send.bind("tcp://127.0.0.1:5557")
        # Send exactly `rounds` messages so the count matches what the
        # workers expect to receive.
        for _ in range(rounds):
            ventilator_send.send(msg)
            sent += 1
        completed = True
    finally:
        if not completed:
            # The worker would otherwise block on recv() for messages that
            # are never going to arrive.
            worker.terminate()
        # On failure drop queued messages instead of lingering on them.
        ventilator_send.close(linger=None if completed else 0)
        context.term()
        worker.join()
    return sent
def _timed_vent(worker_func, msg, rounds):
    """Run one ``zmq_vent`` call and return ``(messages_sent, seconds)``."""
    start_time = time.perf_counter()
    sent = zmq_vent(worker_func, msg=msg, rounds=rounds)
    return sent, time.perf_counter() - start_time


def test():
    """Run the three benchmarks, print their results, then exit.

    1. Push 5,000,000 serialized 4x4 float64 identity matrices.
    2. Push one (3840, 2160, 4) float64 array.
    3. Push 5,000,000 copies of the byte string ``b"10.0"``.

    Each run is timed with ``time.perf_counter`` around ``zmq_vent``. The
    process exits with status 0 once all three runs have completed.
    """
    print("Starting tests...")

    # Test 1: Vent 5 million matrices
    mtx_bits = np.identity(4, dtype=np.float64).tobytes()
    sent, duration = _timed_vent(mtx_worker, mtx_bits, 5000000)
    print("\n -- Matrix Test --")
    print("Duration: %s" % duration)
    print("Messages Per Second: %s" % (sent / duration))

    # Test 2: Send a 4k image across the network
    # The array is built before the timer starts, so generation is not timed.
    arr = np.random.ranf((3840, 2160, 4))
    _, duration = _timed_vent(array_worker, arr, 1)
    # NOTE(review): 3840*2160*4 counts every channel value, so the figure
    # printed below is float values per second rather than pixels.
    values_per_sec = (3840 * 2160 * 4) / duration
    print("\n -- Image Test --")
    print("Duration: %s" % duration)
    print("Pixels per Second: %s " % values_per_sec)

    # Test 3: Send floats across the network
    sent, duration = _timed_vent(float_worker, b"10.0", 5000000)
    print("\n -- Float Test --")
    print("Duration: %s" % duration)
    print("Messages Per Second: %s" % (sent / duration))

    print("\n Finished tests, exiting...")
    # A completed run is a success, so report exit status 0.
    sys.exit(0)
# ----------------------------------------------------------------------------- MAIN --#
# Run the benchmark suite only when this file is executed as a script.
if __name__ == "__main__":
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment