Skip to content

Instantly share code, notes, and snippets.

@giulioungaretti
Created November 29, 2016 15:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save giulioungaretti/42871a75a3bf894b7c8458cfa7d20c9c to your computer and use it in GitHub Desktop.
Save giulioungaretti/42871a75a3bf894b7c8458cfa7d20c9c to your computer and use it in GitHub Desktop.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2016 unga <giulioungaretti@me.com>
#
# Distributed under terms of the MIT license.
"""
Benchmark multiprocessing (mp) queues
"""
import argparse
import multiprocessing as mp
import sys
import time
# Clock used for every latency measurement in this file.
timer = time.perf_counter


def mp_test(qin, qout, qglobal, out_size):
    """
    Echo worker: answer every request until asked to stop.

    Each message read from ``qin`` is an ``(item, qtime)`` pair, where
    ``qtime`` is the ``timer()`` reading taken by the sender just before
    the message was queued.  For every data message a reply of
    ``out_size`` bytes is put on ``qout``.  When ``item`` is the string
    ``'break'`` the worker publishes its statistics on ``qglobal`` and
    returns.

    Args:
        qin: queue the requests are read from.
        qout: queue the replies are written to.
        qglobal: queue that receives the final statistics, a dict with
            the keys ``'avg'`` and ``'max'`` (delays in seconds).
        out_size(int): size in bytes of every reply payload.
    """
    delays = []
    data = b'o' * out_size
    while True:
        item, qtime = qin.get()
        delay = timer() - qtime
        # Payloads are bytes, the stop request is a str: check the type
        # first so that bytes are never compared against a str.
        if isinstance(item, str) and item == 'break':
            break
        # Only data messages are sampled; the tiny stop message would
        # otherwise skew the statistics of the payload being measured.
        delays.append(delay)
        qout.put(data)
    if delays:
        stats = {
            'avg': sum(delays) / len(delays),
            'max': max(delays)
        }
    else:
        # No data message was ever received: nothing to average.
        stats = {'avg': float('nan'), 'max': float('nan')}
    qglobal.put(stats)
def bench(reps, in_size, size, method):
    """
    Measure request/response latency over multiprocessing queues.

    A worker process running ``mp_test`` is started; ``reps`` requests of
    ``in_size`` bytes are sent to it one at a time, each one waiting for
    its ``size`` byte reply.  The receive delays measured by the worker
    and the round-trip delays measured here are printed in milliseconds.

    Args:
        reps(int) number of messages to send
        in_size(int) size in bytes of every request payload
        size(int) size in bytes of every reply payload
        method(str) name of the multiprocessing start method to use

    Raises:
        ValueError: if ``method`` is not an available start method.
    """
    # how many bytes (not including python overhead)
    data_in = b"i" * in_size
    # A context keeps the start method local to this call:
    # mp.set_start_method() may only be called once per program, so the
    # benchmark could not be run a second time in the same process.
    ctx = mp.get_context(method)
    # NOTE these queues are created without a maxsize; their
    # size/emptiness/fullness are not reliable numbers
    qglobal = ctx.Queue()
    qin = ctx.Queue()
    qout = ctx.Queue()
    p = ctx.Process(target=mp_test, args=(qin, qout, qglobal, size))
    p.start()
    print("sleep 1 let process start")
    time.sleep(1)
    resp_delays = []
    for _ in range(reps):
        t1 = timer()
        qin.put((data_in, timer()))
        qout.get()
        resp_delays.append((timer() - t1) * 1000)
    qin.put(('break', timer()))
    # Read the statistics before joining: a process that has put items on
    # a queue should have them consumed before it is joined.
    delays = qglobal.get()
    p.join()
    avg_delay = delays['avg'] * 1000
    max_delay = delays['max'] * 1000
    if resp_delays:
        avg_resp_delay = sum(resp_delays) / len(resp_delays)
        max_resp_delay = max(resp_delays)
    else:
        # reps <= 0: no round trip was measured.
        avg_resp_delay = max_resp_delay = float('nan')
    print('Milliseconds to receive to queue request of {} byte'
          .format(in_size))
    print(' avg: {:.6f}'.format(avg_delay))
    print(' max: {:.6f}\n'.format(max_delay))
    print('Milliseconds to respond to queue request of {} byte'
          .format(size))
    print(' avg: {:.6f}'.format(avg_resp_delay))
    print(' max: {:.6f}\n'.format(max_resp_delay))
def parse_args(argv=None):
    """
    Parse the command line options of the benchmark.

    Args:
        argv(list) arguments to parse; ``None`` makes argparse read
            ``sys.argv``.

    Returns:
        argparse.Namespace with the attributes ``input``, ``output``,
        ``count`` and ``method``.
    """
    parser = argparse.ArgumentParser(description='Run a performance test')
    parser.add_argument('-i', '--input', type=int, default=10240,
                        help='size (in bytes) of the test input message')
    parser.add_argument('-o', '--output', type=int, default=10240,
                        help='size (in bytes) of the test output message')
    parser.add_argument('-c', '--count', type=int, default=10240,
                        help='number of test messages to send')
    # Reject unknown start methods while parsing instead of failing later
    # inside the benchmark.
    parser.add_argument('-m', '--method', type=str, default='spawn',
                        choices=mp.get_all_start_methods(),
                        help='multiprocessing method to use')
    return parser.parse_args(argv)
if __name__ == "__main__":
    # Script entry point: echo the parsed options, then run the benchmark.
    options = parse_args()
    print(options)
    bench(
        reps=options.count,
        in_size=options.input,
        size=options.output,
        method=options.method,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment