Last active
July 19, 2021 22:45
-
-
Save nariaki3551/6e5ba621bb79ac12cb6ce6e16ebbfa9a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import time | |
import queue | |
import multiprocessing | |
def work(data, i, j): | |
return i, j, data[i] + data[j] | |
def worker_queue(worker_index, data, q, rq): | |
"""calculate i + j of (i, j) in queue q | |
Parameters | |
---------- | |
worker_index : int | |
data : list | |
q : multiprocessing.Queue | |
queue for get input | |
rq : multiprocessing.Queue | |
queue for store results | |
""" | |
print(f'start worker_index {worker_index}') | |
timeout = 1 | |
while True: | |
try: | |
i, j = q.get(timeout=timeout) | |
_, _, result = work(data, i, j) | |
rq.put((i, j, result)) | |
except queue.Empty: | |
break | |
return | |
def main_queue(num_worker, N): | |
"""test code of multiprocess using Queue() | |
parallel compute i + j for any i, j in 1..M | |
Parameters | |
---------- | |
num_worker : int | |
the number of workers | |
N : int | |
""" | |
# heavy object | |
data = [i for i in range(N)] | |
# queue for input | |
M = 10 | |
q = multiprocessing.Queue() | |
for i in range(M): | |
for j in range(M): | |
q.put((i, j)) | |
# queue for output ( result queue ) | |
rq = multiprocessing.Queue() | |
# worker factory | |
def create_worker(worker_index): | |
return multiprocessing.Process( | |
target=worker_queue, | |
args=(worker_index, data, q, rq)) | |
# create workers | |
processes = list() | |
timeout = 1 | |
worker_index = 1 | |
for worker_index in range(1, num_worker): | |
try: | |
i, j = q.get(timeout=timeout) | |
q.put((i, j)) | |
process = create_worker(worker_index) | |
process.start() | |
processes.append(process) | |
except queue.Empty: | |
break | |
# current process work, too | |
worker_index = 0 | |
worker_queue(worker_index, data, q, rq) | |
# wait all workers | |
for process in processes: | |
process.join() | |
# get results | |
results = dict() | |
while True: | |
try: | |
(i, j, result) = rq.get(timeout=timeout) | |
results[i, j] = result | |
except queue.Empty: | |
break | |
return results | |
def main_pool(num_worker, N): | |
"""test code of multiprocess using Pool | |
parallel compute i + j for any i, j in 1..M | |
Parameters | |
---------- | |
num_worker : int | |
the number of workers | |
N : int | |
""" | |
# heavy object | |
data = [i for i in range(N)] | |
M = 10 | |
args = [ | |
(data, i, j) for i in range(M) for j in range(M) | |
] | |
with multiprocessing.Pool(processes=num_worker-1) as pool: | |
results_list = pool.starmap(work, args) | |
# get results | |
results = dict() | |
for i, j, result in results_list: | |
results[i, j] = result | |
return results | |
if __name__ == '__main__': | |
num_worker = 3 | |
N = int(1e8) | |
start_time = time.time() | |
main_queue(num_worker, N) | |
elapsed_time = time.time() - start_time | |
print(f'queue: elapsed time = {elapsed_time}') | |
start_time = time.time() | |
main_pool(num_worker, N) | |
elapsed_time = time.time() - start_time | |
print(f'map: elapsed time = {elapsed_time}') | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment