Skip to content

Instantly share code, notes, and snippets.

@140am
Last active June 26, 2023 14:46
Show Gist options
  • Save 140am/ca661b9a4fca550f9554 to your computer and use it in GitHub Desktop.
Save 140am/ca661b9a4fca550f9554 to your computer and use it in GitHub Desktop.
Simple Python / ØMQ IPC (Inter Process Communication) performance benchmark
""" Simple IPC benchmark test
Test throughput of 512 KB messages sent between two python processes using:
- multiprocessing pipe
- zeroMQ PUSH/PULL
- zeroMQ DEALER/DEALER
Result:
2014-05-20 16:16:32,782 INFO 11612 ipc - Running multiprocessing Pipe() benchmark:
2014-05-20 16:16:33,871 INFO 11612 ipc Sending 10000 numbers to Pipe() took 1.08912396431 seconds
2014-05-20 16:16:44,629 INFO 11612 ipc Sending 100000 numbers to Pipe() took 10.7582190037 seconds
2014-05-20 16:18:32,023 INFO 11612 ipc Sending 1000000 numbers to Pipe() took 107.393200874 seconds
2014-05-20 16:18:35,026 INFO 11612 ipc - Running ZMQ() DEALER/DEALER benchmark:
2014-05-20 16:18:36,151 INFO 11612 ipc Sending 10000 numbers via ZMQ() took 1.12546110153 seconds
2014-05-20 16:18:45,249 INFO 11612 ipc Sending 100000 numbers via ZMQ() took 9.09737110138 seconds
2014-05-20 16:20:32,940 INFO 11612 ipc Sending 1000000 numbers via ZMQ() took 107.690907001 seconds
2014-05-20 16:20:35,943 INFO 11612 ipc - Running ZMQ() PUSH/PULL benchmark:
2014-05-20 16:20:36,980 INFO 11612 ipc Sending 10000 numbers via ZMQ() took 1.03703999519 seconds
2014-05-20 16:20:48,757 INFO 11612 ipc Sending 100000 numbers via ZMQ() took 11.7766180038 seconds
2014-05-20 16:22:57,251 INFO 11612 ipc Sending 1000000 numbers via ZMQ() took 128.493817091 seconds
"""
from gevent import monkey
monkey.patch_all(thread=False, socket=False)
import multiprocessing
import time
import zmq
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)-15s %(levelname)-8s %(process)d %(module)s %(message)s'
)
log = logging.getLogger(__name__)
# multiprocessing Pipe
def pipe_reader(pipe, count):
output_p, input_p = pipe
input_p.close() # We are only reading
cc = 0
while cc < count:
try:
msg = output_p.recv()
except EOFError:
break
else:
cc += 1
def pipe_writer(count, input_p):
message = 'x'*512*1024
for ii in xrange(0, count):
input_p.send(message)
# ZeroMQ DEALER/DEALER
class ZmqReaderDealer(multiprocessing.Process):
def __init__(self, count):
super(ZmqReaderDealer, self).__init__()
self.count = count
self.cc = 0
def run(self):
ctx = zmq.Context()
sub = ctx.socket(zmq.DEALER)
#sub.setsockopt(zmq.RCVHWM, 1)
#sub.setsockopt(zmq.LINGER, 1)
sub.bind('ipc:///tmp/zmqtest')
while self.cc < self.count:
self.cc += 1
msg = sub.recv(copy=False)
sub.close()
ctx.term()
class ZMQWriterDealer(multiprocessing.Process):
def __init__(self, count):
super(ZMQWriterDealer, self).__init__()
self.count = count
self.cc = 0
def run(self):
ctx = zmq.Context()
pub = ctx.socket(zmq.DEALER)
#pub.setsockopt(zmq.SNDHWM, 1)
#pub.setsockopt(zmq.LINGER, 1)
pub.connect('ipc:///tmp/zmqtest')
time.sleep(0.1)
message = 'x'*512*1024
for ii in xrange(0, self.count):
self.cc += 1
pub.send(message, copy=False)
pub.close()
ctx.term()
# ZeroMQ ROUTER/ROUTER
class ZmqReaderRouter(multiprocessing.Process):
def __init__(self, count):
super(ZmqReaderRouter, self).__init__()
self.count = count
self.cc = 0
def run(self):
ctx = zmq.Context()
sub = ctx.socket(zmq.ROUTER)
#sub.setsockopt(zmq.RCVHWM, 1)
#sub.setsockopt(zmq.LINGER, 1)
sub.setsockopt(zmq.IDENTITY, 'reader')
sub.bind('ipc:///tmp/zmqtest')
while self.cc < self.count:
self.cc += 1
msg = sub.recv(copy=False)
sub.close()
ctx.term()
class ZMQWriterRouter(multiprocessing.Process):
def __init__(self, count):
super(ZMQWriterRouter, self).__init__()
self.count = count
self.cc = 0
def run(self):
ctx = zmq.Context()
pub = ctx.socket(zmq.ROUTER)
#pub.setsockopt(zmq.SNDHWM, 1)
#pub.setsockopt(zmq.LINGER, 1)
pub.setsockopt(zmq.IDENTITY, 'writer')
pub.connect('ipc:///tmp/zmqtest')
time.sleep(0.1)
message = 'x'*512*1024
for ii in xrange(0, self.count):
self.cc += 1
pub.send_multipart(['reader', message], copy=False)
pub.close()
ctx.term()
# ZeroMQ PUSH/PULL
class ZmqReaderPull(multiprocessing.Process):
def __init__(self, count):
super(ZmqReaderPull, self).__init__()
self.count = count
self.cc = 0
def run(self):
ctx = zmq.Context()
sub = ctx.socket(zmq.PULL)
#sub.setsockopt(zmq.RCVHWM, 1)
#sub.setsockopt(zmq.LINGER, 1)
sub.bind('ipc:///tmp/zmqtest')
while self.cc < self.count:
self.cc += 1
msg = sub.recv(copy=False)
sub.close()
ctx.term()
class ZMQWriterPush(multiprocessing.Process):
def __init__(self, count):
super(ZMQWriterPush, self).__init__()
self.count = count
self.cc = 0
def run(self):
ctx = zmq.Context()
pub = ctx.socket(zmq.PUSH)
#pub.setsockopt(zmq.SNDHWM, 1)
#pub.setsockopt(zmq.LINGER, 1)
pub.connect('ipc:///tmp/zmqtest')
time.sleep(0.1)
message = 'x'*512*1024
for ii in xrange(0, self.count):
self.cc += 1
pub.send(message, copy=False)
pub.close()
ctx.term()
if __name__=='__main__':
time.sleep(3)
log.info('- Running multiprocessing Pipe() benchmark:')
for count in [10**4, 10**5, 10**6]:
_start = time.time()
output_p, input_p = multiprocessing.Pipe()
reader_p = multiprocessing.Process(target=pipe_reader, args=((output_p, input_p),count,))
reader_p.start()
output_p.close()
log.info('Sending %i requests' % count)
pipe_writer(count, input_p)
# Ask the reader to stop when it reads EOF
input_p.close()
reader_p.join()
log.info(
'Sending %s numbers to Pipe() took %s seconds' % (
count, (time.time() - _start)
))
time.sleep(3)
log.info('- Running ZMQ() DEALER/DEALER benchmark:')
for count in [10**4, 10**5, 10**6]:
_start = time.time()
reader_p = ZmqReaderDealer(count=count)
reader_p.start()
time.sleep(0.1)
log.info('Sending %i ZMQ requests' % count)
zmq_writer = ZMQWriterDealer(count=count)
zmq_writer.start()
reader_p.join()
log.info(
'Sending %s numbers via ZMQ() took %s seconds' % (
count, (time.time() - _start)
))
time.sleep(3)
log.info('- Running ZMQ() ROUTER/ROUTER benchmark:')
for count in [10**4, 10**5, 10**6]:
_start = time.time()
reader_p = ZmqReaderRouter(count=count)
reader_p.start()
time.sleep(0.1)
log.info('Sending %i ZMQ requests' % count)
zmq_writer = ZMQWriterRouter(count=count)
zmq_writer.start()
reader_p.join()
log.info(
'Sending %s numbers via ZMQ() took %s seconds' % (
count, (time.time() - _start)
))
time.sleep(3)
log.info('- Running ZMQ() PUSH/PULL benchmark:')
for count in [10**4, 10**5, 10**6]:
_start = time.time()
reader_p = ZmqReaderPull(count=count)
reader_p.start()
time.sleep(0.1)
log.info('Sending %i ZMQ requests' % count)
zmq_writer = ZMQWriterPush(count=count)
zmq_writer.start()
reader_p.join()
log.info(
'Sending %s numbers via ZMQ() took %s seconds' % (
count, (time.time() - _start)
))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment