Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import os
import time
import queue
import multiprocessing
def work(data, i, j):
return i, j, data[i] + data[j]
def worker_queue(worker_index, data, q, rq):
"""calculate i + j of (i, j) in queue q
Parameters
----------
worker_index : int
data : list
q : multiprocessing.Queue
queue for get input
rq : multiprocessing.Queue
queue for store results
"""
print(f'start worker_index {worker_index}')
timeout = 1
while True:
try:
i, j = q.get(timeout=timeout)
_, _, result = work(data, i, j)
rq.put((i, j, result))
except queue.Empty:
break
return
def main_queue(num_worker, N):
"""test code of multiprocess using Queue()
parallel compute i + j for any i, j in 1..M
Parameters
----------
num_worker : int
the number of workers
N : int
"""
# heavy object
data = [i for i in range(N)]
# queue for input
M = 10
q = multiprocessing.Queue()
for i in range(M):
for j in range(M):
q.put((i, j))
# queue for output ( result queue )
rq = multiprocessing.Queue()
# worker factory
def create_worker(worker_index):
return multiprocessing.Process(
target=worker_queue,
args=(worker_index, data, q, rq))
# create workers
processes = list()
timeout = 1
worker_index = 1
for worker_index in range(1, num_worker):
try:
i, j = q.get(timeout=timeout)
q.put((i, j))
process = create_worker(worker_index)
process.start()
processes.append(process)
except queue.Empty:
break
# current process work, too
worker_index = 0
worker_queue(worker_index, data, q, rq)
# wait all workers
for process in processes:
process.join()
# get results
results = dict()
while True:
try:
(i, j, result) = rq.get(timeout=timeout)
results[i, j] = result
except queue.Empty:
break
return results
def main_pool(num_worker, N):
"""test code of multiprocess using Pool
parallel compute i + j for any i, j in 1..M
Parameters
----------
num_worker : int
the number of workers
N : int
"""
# heavy object
data = [i for i in range(N)]
M = 10
args = [
(data, i, j) for i in range(M) for j in range(M)
]
with multiprocessing.Pool(processes=num_worker-1) as pool:
results_list = pool.starmap(work, args)
# get results
results = dict()
for i, j, result in results_list:
results[i, j] = result
return results
if __name__ == '__main__':
num_worker = 3
N = int(1e8)
start_time = time.time()
main_queue(num_worker, N)
elapsed_time = time.time() - start_time
print(f'queue: elapsed time = {elapsed_time}')
start_time = time.time()
main_pool(num_worker, N)
elapsed_time = time.time() - start_time
print(f'map: elapsed time = {elapsed_time}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment