Last active
July 29, 2022 13:59
-
-
Save valsdav/ace92743e63da3e572d11dc312d49cdc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing as mp | |
def multiprocessor_generator(iterable, heavy_processing, input_queue_size=10, output_queue_size=20, nworkers=2):
    """Apply ``heavy_processing`` to each item of ``iterable`` across processes, yielding results lazily.

    One feeder process consumes the input iterable into a bounded queue, ``nworkers``
    processes apply ``heavy_processing``, and the main thread reads the bounded output
    queue and yields results — so the whole pipeline stays a generator with bounded
    memory even when the consumer is slow.

    Parameters
    ----------
    iterable : iterable of picklable items to process.
    heavy_processing : callable applied to each item in a worker process.
        NOTE(review): it must never return ``None`` — ``None`` is used internally
        as the end-of-stream sentinel and a genuine ``None`` result would be
        miscounted as a finished worker.
    input_queue_size : max pending inputs (backpressure on the feeder).
    output_queue_size : max pending results (backpressure on the workers).
    nworkers : number of processing worker processes.

    Yields
    ------
    Results of ``heavy_processing`` in completion order (NOT input order).
    """

    def gen_to_queue(input_q, iterable):
        # Consume the source iterable and push every item onto the input queue.
        for it in iterable:
            input_q.put(it)
        # One sentinel per worker, so each worker sees exactly one end-signal.
        for _ in range(nworkers):
            input_q.put(None)

    def process(input_q, output_q):
        # Re-seed NumPy's RNG from OS entropy so forked workers do not all share
        # the RNG state inherited from the parent process.
        # (Bug fix: the original referenced `np` without importing it anywhere;
        # importing here makes the seed happen inside the child process.)
        import numpy as np
        np.random.seed()
        while True:
            it = input_q.get()
            if it is None:
                # Forward the sentinel so the main thread can count finished workers.
                output_q.put(None)
                break
            output_q.put(heavy_processing(it))

    input_q = mp.Queue(maxsize=input_queue_size)
    output_q = mp.Queue(maxsize=output_queue_size)

    # Three groups of workers:
    #   * gen_pool: one process consuming the input generator (light work, 1 is enough).
    #   * pool: `nworkers` processes doing the heavy processing.
    #   * the main thread, which reads results back and yields them.
    # The worker bodies run as Pool *initializers*: each worker does all of its
    # work in the initializer and then idles — a trick to stream through Pools.
    gen_pool = mp.Pool(1, initializer=gen_to_queue, initargs=(input_q, iterable))
    pool = mp.Pool(nworkers, initializer=process, initargs=(input_q, output_q))

    try:
        finished_workers = 0
        while True:
            it = output_q.get()
            if it is None:
                finished_workers += 1
                if finished_workers == nworkers:
                    break
            else:
                yield it
    finally:
        # Reached both on normal exhaustion and on GeneratorExit (consumer stopped
        # early). Use terminate()+join() rather than close(): if the consumer
        # abandons the generator, workers may be blocked on a full bounded queue
        # and close() alone would never reap them (bug fix: possible hang at exit).
        print("Closing multiprocessing generator")
        gen_pool.terminate()
        pool.terminate()
        gen_pool.join()
        pool.join()
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.
Added a limit also on the output queue size to avoid to accumulate too much memory in case the consumer is slow...