Created
December 19, 2017 18:54
-
-
Save drdavella/87a903fdd769711ad7382f072aafde95 to your computer and use it in GitHub Desktop.
Simple benchmark for testing data transfer between processes using Python's multiprocessing module
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from argparse import ArgumentParser | |
from multiprocessing import Process, Array, Pipe, Queue | |
from timeit import default_timer as timer | |
import numpy as np | |
def process_with_array(dest, source):
    """Square every element of ``source`` and store the results in ``dest``.

    Used as the child-process target of the shared-array benchmark.

    Args:
        dest: Writable sequence that supports whole-slice assignment
            (``dest[:] = ...``); it receives the squared values.
        source: Array-like of floats to square.
    """
    # np.asarray avoids a copy when it can and copies otherwise.
    # np.array(..., copy=False) raises ValueError on NumPy >= 2.0 whenever
    # a copy turns out to be necessary (e.g. for a plain list).
    input_data = np.asarray(source, dtype=float)
    dest[:] = input_data**2
def test_with_array(size):
    """Time one round trip that exchanges data through ``multiprocessing.Array``.

    Random input is placed in a shared source array, a child process writes
    the squared values into a shared destination array, and the result is
    checked against ``source**2``.

    Args:
        size: Number of elements in the random input array.

    Returns:
        Elapsed time, in ``timer()`` units, from just before the child is
        started until it has been joined.

    Raises:
        AssertionError: If the destination array does not match ``source**2``.
    """
    source = np.random.random(size)
    # Use lock=False since each array is only single-reader/single-writer
    source_array = Array('d', source, lock=False)
    # Passing a length (instead of np.empty(size)) lets Array allocate a
    # zero-filled buffer itself, rather than copying a throwaway array of
    # uninitialised values element by element.
    dest_array = Array('d', len(source), lock=False)
    worker = Process(target=process_with_array,
                     args=(dest_array, source_array))
    start = timer()
    worker.start()
    worker.join()
    end = timer()
    np.testing.assert_allclose(source**2, dest_array)
    return end - start
def process_with_queue(input_queue, output_queue):
    """Take one item from ``input_queue``, square it, and put it on ``output_queue``.

    Used as the child-process target of the queue benchmark.

    Args:
        input_queue: Queue from which a single item is fetched with ``get()``.
        output_queue: Queue that receives the squared item via ``put()``.
    """
    received = input_queue.get()
    output_queue.put(received**2)
def test_with_queue(size):
    """Time one round trip that exchanges data through ``multiprocessing.Queue``.

    Random input is sent to a child process on one queue, the child sends the
    squared values back on a second queue, and the result is checked against
    ``source**2``.

    Args:
        size: Number of elements in the random input array.

    Returns:
        Elapsed time, in ``timer()`` units, from just before the child is
        started until its result has been received.

    Raises:
        AssertionError: If the received result does not match ``source**2``.
    """
    source = np.random.random(size)
    input_queue = Queue()
    output_queue = Queue()
    worker = Process(target=process_with_queue,
                     args=(input_queue, output_queue))
    start = timer()
    worker.start()
    input_queue.put(source)
    result = output_queue.get()
    end = timer()
    # Reap the child so repeated calls do not leave unjoined processes
    # behind.  Done after the timed region (the measurement is unchanged)
    # and after the result has been drained from the queue.
    worker.join()
    np.testing.assert_allclose(source**2, result)
    return end - start
def process_with_pipe(child):
    """Receive one object on ``child``, square it, and send it back.

    Used as the child-process target of the pipe benchmark.

    Args:
        child: Connection-like object providing ``recv()`` and ``send()``.
    """
    received = child.recv()
    child.send(received**2)
def test_with_pipe(size):
    """Time one round trip that exchanges data through ``multiprocessing.Pipe``.

    Random input is sent to a child process over a pipe, the child sends the
    squared values back over the same pipe, and the result is checked against
    ``source**2``.

    Args:
        size: Number of elements in the random input array.

    Returns:
        Elapsed time, in ``timer()`` units, from just before the child is
        started until its result has been received.

    Raises:
        AssertionError: If the received result does not match ``source**2``.
    """
    source = np.random.random(size)
    parent, child = Pipe()
    worker = Process(target=process_with_pipe, args=(child, ))
    start = timer()
    worker.start()
    parent.send(source)
    result = parent.recv()
    end = timer()
    # Clean up after the timed region so the measurement is unchanged:
    # reap the child and release both pipe ends instead of leaving them
    # to accumulate across benchmark iterations.
    worker.join()
    parent.close()
    child.close()
    np.testing.assert_allclose(source**2, result)
    return end - start
def time_report_str(iters, time):
    """Format the total and per-iteration time of a benchmark run.

    Args:
        iters: Number of iterations that were timed; must be non-zero.
        time: Total elapsed time in seconds (int or float).

    Returns:
        A string of the form
        ``"time for N iters: total=<t>s, avg=<t/N>s"`` with both times
        shown to five significant digits.

    Raises:
        ZeroDivisionError: If ``iters`` is 0.
    """
    # The precision-only format spec below is rejected for ints
    # ("Precision not allowed in integer format specifier"), so normalise
    # to float first; float input is formatted exactly as before.
    total = float(time)
    average = total / iters
    return f"time for {iters} iters: total={total:1.5}s, avg={average:01.5}s"
def main():
    """Parse the command line, run each transfer benchmark, and print timings.

    Options:
        -i / --iters: number of iterations per benchmark (default 20).
        -s / --size: number of elements in the data array (default 1000).

    Both values must be strictly positive; otherwise the parser reports a
    usage error and exits.
    """
    parser = ArgumentParser()
    parser.add_argument('-i', '--iters', dest='iters', type=int, default=20,
                        help="Number of iterations to run for benchmark")
    parser.add_argument('-s', '--size', dest='size', type=int, default=1000,
                        help="Size of data array to use for benchmark")
    args = parser.parse_args()
    # Validate with parser.error rather than assert: asserts are stripped
    # under ``python -O``, and the old messages said ">= 0" although zero
    # was rejected.
    if args.size <= 0:
        parser.error("Size must be an integer > 0")
    if args.iters <= 0:
        parser.error("Iters must be an integer > 0")

    # (label, benchmark function) pairs, run in this order.
    benchmarks = (
        ("Using mp.Array: ", test_with_array),
        ("Using mp.Queue: ", test_with_queue),
        ("Using mp.Pipe: ", test_with_pipe),
    )
    for label, benchmark in benchmarks:
        elapsed = 0
        for _ in range(args.iters):
            elapsed += benchmark(args.size)
        print(label + time_report_str(args.iters, elapsed))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment