Skip to content

Instantly share code, notes, and snippets.

@rvprasad
Last active October 20, 2017 22:34
Show Gist options
  • Save rvprasad/059b08bb242af1df8dbb1286d329e0a1 to your computer and use it in GitHub Desktop.
Illustrates how the performance of multiprocessing.Process changes inside and outside of multiprocessing.Pool in Python.
# Python -- v3.6
import begin
import multiprocessing
import time
def worker(varying_data, fixed_data):
    """Simulate a fixed amount of CPU work scaled by len(varying_data).

    fixed_data is deliberately unused: the benchmark measures only the cost
    of shipping it into the worker process, not of consuming it.
    """
    acc = 0
    for _ in range(9999):
        acc += len(varying_data)
    return acc
def custom_pool_worker_wrapper(args1, args2, results):
    """Long-lived loop for one process of the hand-rolled pool.

    Receives the fixed data exactly once on args1, then services
    varying-data requests from args2 forever, pushing each result onto
    results. Never returns; the parent terminates the process.
    """
    shared = args1.get()
    args1.task_done()
    while True:
        payload = args2.get()
        args2.task_done()
        results.put(worker(payload, shared))
@begin.subcommand
def custom_pool():
    """Benchmark a hand-rolled process pool built from Process + JoinableQueue.

    For each fixed_data size, spins up 4 worker processes, sends fixed_data
    once per worker via args1, then times `iterations` rounds of 100
    request/response exchanges and prints the per-iteration cost.
    """
    iterations = 11
    for exp in range(50, 83, 8):
        start_time = time.time()
        fixed_data = [exp] * int(pow(10, exp / 10))
        args1 = multiprocessing.JoinableQueue()
        args2 = multiprocessing.JoinableQueue()
        results = multiprocessing.JoinableQueue()
        procs = []
        for _ in range(4):
            proc = multiprocessing.Process(
                target=custom_pool_worker_wrapper,
                args=(args1, args2, results))
            proc.start()
            # One copy of fixed_data per worker; each worker gets exactly one.
            args1.put(fixed_data)
            procs.append(proc)
        args1.close()
        tmp = 0
        data = [1] * 100
        # BUG FIX: the original looped range(1, iterations) -- 10 rounds --
        # but divided the elapsed time by `iterations` (11), understating
        # the per-iteration cost. Run exactly `iterations` rounds.
        # (Also renamed the loop variables: the original inner loop shadowed
        # the outer loop's `i`.)
        for _ in range(iterations):
            tmp = 0
            for _ in range(100):
                args2.put(data)
            for _ in range(100):
                tmp += results.get()
                results.task_done()
        args2.close()
        args2.join()
        results.close()
        results.join()
        # Workers loop forever; terminate them once all results are drained.
        for proc in procs:
            proc.terminate()
        end_time = time.time()
        secs_per_iteration = (end_time - start_time) / iterations
        print("fixed_data {0:>13,} ints : {1:>6.6f} secs per iteration {2}"
              .format(len(fixed_data), secs_per_iteration, tmp))
# Module-level slot for the shared payload in each Pool worker process:
# set by initializer() (via `global fixed_data`) and read by
# builtin_pool_worker_wrapper().
fixed_data = None
def initializer(shared_data):
    """multiprocessing.Pool initializer: stash the shared payload in the
    worker process's module-level `fixed_data` global."""
    global fixed_data
    fixed_data = shared_data
def builtin_pool_worker_wrapper(varying_data):
    """Pool map target: run worker() against the process-global fixed_data
    installed by initializer()."""
    result = worker(varying_data, fixed_data)
    return result
@begin.subcommand
def builtin_pool():
    """Benchmark multiprocessing.Pool where fixed_data is shipped once per
    worker via the pool initializer instead of with every task.

    Prints the per-iteration cost for each fixed_data size.
    """
    iterations = 11
    for exp in range(50, 83, 8):
        start_time = time.time()
        fixed_data = [exp] * int(pow(10, exp / 10))
        # 4 workers; each receives fixed_data exactly once through initializer.
        pool = multiprocessing.Pool(4, initializer, (fixed_data,))
        data = [[1] * 100] * 100
        tmp = 0
        # BUG FIX: the original looped range(1, iterations) -- 10 rounds --
        # but divided the elapsed time by `iterations` (11), understating
        # the per-iteration cost. Run exactly `iterations` rounds.
        # (Also renamed the loop variable: the original shadowed the outer `i`.)
        for _ in range(iterations):
            tmp = sum(pool.map(builtin_pool_worker_wrapper, data))
        pool.close()
        pool.join()
        # terminate() after close()+join() is a harmless safety net.
        pool.terminate()
        end_time = time.time()
        secs_per_iteration = (end_time - start_time) / iterations
        print("fixed_data {0:>13,} ints : {1:>6.6f} seconds per iteration {2}"
              .format(len(fixed_data), secs_per_iteration, tmp))
@begin.start
def entry():
    """CLI entry point; the `begin` library dispatches to the registered
    subcommands (custom_pool / builtin_pool)."""
    pass
@rvprasad
Copy link
Author

Related gist: test_process_graph.gp

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment