Skip to content

Instantly share code, notes, and snippets.

@drdavella
Created December 19, 2017 18:54
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save drdavella/87a903fdd769711ad7382f072aafde95 to your computer and use it in GitHub Desktop.
Save drdavella/87a903fdd769711ad7382f072aafde95 to your computer and use it in GitHub Desktop.
Simple benchmark for testing data transfer between processes using Python's multiprocessing module
#!/usr/bin/env python
from argparse import ArgumentParser
from multiprocessing import Process, Array, Pipe, Queue
from timeit import default_timer as timer
import numpy as np
def process_with_array(dest, source):
    """Square every element of ``source`` and store the results in ``dest``.

    Used as the child-process target of the ``multiprocessing.Array``
    benchmark, where both arguments are arrays created with ``lock=False``.

    Args:
        dest: Writable sequence of the same length as ``source`` that
            supports full-slice assignment (``dest[:] = ...``).
        source: Buffer or sequence of numbers to square.
    """
    # np.asarray reuses the caller's buffer when it can and copies only when
    # it must. The previous np.array(..., copy=False) raises ValueError on
    # NumPy >= 2.0 whenever a copy cannot be avoided.
    input_data = np.asarray(source, dtype=float)
    dest[:] = input_data**2
def test_with_array(size):
    """Time one round trip through a pair of ``multiprocessing.Array`` objects.

    Random input is copied into a source array, a child process squares it
    into a destination array, and the result is checked against ``source**2``.

    Args:
        size: Number of float elements to transfer.

    Returns:
        Elapsed seconds between starting the child process and its join
        returning.

    Raises:
        AssertionError: If the destination array does not match ``source**2``.
    """
    source = np.random.random(size)
    # Use lock=False since each array is only single-reader/single-writer
    source_array = Array('d', source, lock=False)
    # Passing an integer length allocates a zero-initialised array directly,
    # instead of iterating element by element over the uninitialised contents
    # of np.empty just to seed a buffer that the child overwrites anyway.
    dest_array = Array('d', len(source), lock=False)
    worker = Process(target=process_with_array,
                     args=(dest_array, source_array))
    start = timer()
    worker.start()
    worker.join()
    end = timer()
    np.testing.assert_allclose(source**2, dest_array)
    return end - start
def process_with_queue(input_queue, output_queue):
    """Take one item from ``input_queue`` and put its square on ``output_queue``.

    Args:
        input_queue: Queue-like object whose ``get()`` yields the input data.
        output_queue: Queue-like object whose ``put()`` receives the result.
    """
    output_queue.put(input_queue.get() ** 2)
def test_with_queue(size):
    """Time one round trip of an array through a pair of ``Queue`` objects.

    Random input is put on an input queue, a child process squares it and
    puts the result on an output queue, and the result is checked against
    ``source**2``.

    Args:
        size: Number of float elements to transfer.

    Returns:
        Elapsed seconds between starting the child process and receiving
        its result.

    Raises:
        AssertionError: If the received result does not match ``source**2``.
    """
    source = np.random.random(size)
    input_queue = Queue()
    output_queue = Queue()
    worker = Process(target=process_with_queue,
                     args=(input_queue, output_queue))
    start = timer()
    worker.start()
    input_queue.put(source)
    result = output_queue.get()
    end = timer()
    # Clean up outside the timed region so the measurement is unchanged.
    # The result has already been fully received, so joining here cannot
    # block on a child that is still flushing its queue.
    worker.join()
    input_queue.close()
    output_queue.close()
    np.testing.assert_allclose(source**2, result)
    return end - start
def process_with_pipe(child):
    """Receive one object on ``child`` and send its square back on ``child``.

    Args:
        child: Connection-like object providing ``recv()`` and ``send()``.
    """
    child.send(child.recv() ** 2)
def test_with_pipe(size):
    """Time one round trip of an array through a ``Pipe``.

    Random input is sent to a child process over the parent end of the pipe,
    the child sends back the squared data, and the result is checked against
    ``source**2``.

    Args:
        size: Number of float elements to transfer.

    Returns:
        Elapsed seconds between starting the child process and receiving
        its result.

    Raises:
        AssertionError: If the received result does not match ``source**2``.
    """
    source = np.random.random(size)
    parent, child = Pipe()
    worker = Process(target=process_with_pipe, args=(child, ))
    start = timer()
    worker.start()
    parent.send(source)
    result = parent.recv()
    end = timer()
    # Clean up outside the timed region so the measurement is unchanged:
    # reap the child and release both connection ends instead of leaving
    # them to the garbage collector on every iteration.
    worker.join()
    parent.close()
    child.close()
    np.testing.assert_allclose(source**2, result)
    return end - start
def time_report_str(iters, time):
    """Format a one-line summary of total and per-iteration time.

    Args:
        iters: Number of iterations that were timed; must be non-zero.
        time: Total elapsed seconds across all iterations.

    Returns:
        A string of the form
        ``"time for N iters: total=<t>s, avg=<t/N>s"`` with both times
        shown to at most 5 significant digits.

    Raises:
        ZeroDivisionError: If ``iters`` is 0.
    """
    # Coerce to float first: a precision in the format spec is rejected with
    # ValueError for integers, so an integer total such as 0 would crash.
    total = float(time)
    average = total / iters
    return f"time for {iters} iters: total={total:1.5}s, avg={average:01.5}s"
def main():
    """Parse command-line options and run each transfer benchmark.

    Options:
        -i/--iters: number of iterations per benchmark (default 20).
        -s/--size: number of array elements to transfer (default 1000).

    Both values must be positive; otherwise the parser reports a usage
    error and exits. For each of the Array, Queue and Pipe benchmarks the
    per-iteration times are summed and a one-line report is printed.
    """
    parser = ArgumentParser()
    parser.add_argument('-i', '--iters', dest='iters', type=int, default=20,
                        help="Number of iterations to run for benchmark")
    parser.add_argument('-s', '--size', dest='size', type=int, default=1000,
                        help="Size of data array to use for benchmark")
    args = parser.parse_args()

    # Report bad input through the parser rather than assert: asserts are
    # stripped under `python -O`, and the old messages said ">= 0" although
    # zero was rejected.
    if args.size <= 0:
        parser.error("Size must be an integer > 0")
    if args.iters <= 0:
        parser.error("Iters must be an integer > 0")

    benchmarks = (
        ("Using mp.Array: ", test_with_array),
        ("Using mp.Queue: ", test_with_queue),
        ("Using mp.Pipe: ", test_with_pipe),
    )
    for label, benchmark in benchmarks:
        total = sum(benchmark(args.size) for _ in range(args.iters))
        print(label + time_report_str(args.iters, total))
# Run the benchmark only when this file is executed as a script, not when it
# is imported (multiprocessing may re-import the main module in child
# processes, depending on the start method).
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment