Skip to content

Instantly share code, notes, and snippets.

@valsdav
Last active July 29, 2022 13:59
Show Gist options
  • Save valsdav/ace92743e63da3e572d11dc312d49cdc to your computer and use it in GitHub Desktop.
Save valsdav/ace92743e63da3e572d11dc312d49cdc to your computer and use it in GitHub Desktop.
import multiprocessing as mp
def multiprocessor_generator(iterable, heavy_processing, input_queue_size=10, output_queue_size=20, nworkers=2):
    """Lazily apply ``heavy_processing`` to ``iterable`` using worker processes.

    Three groups of workers cooperate:

    * one feeder process consumes ``iterable`` and fills a bounded input queue
      (a single process is enough, since this is a very light task);
    * ``nworkers`` processes read the input queue, call ``heavy_processing``
      and write the results to a bounded output queue;
    * the calling thread reads the output queue and yields the results, so the
      whole pipeline still behaves like a generator.

    Results are yielded in completion order, which is not necessarily the
    input order.

    Args:
        iterable: Source of work items; it is consumed in the feeder process.
        heavy_processing: Callable applied to every item in a worker process.
            Its return values travel through a multiprocessing queue.
        input_queue_size: Maximum number of pending items in the input queue.
        output_queue_size: Maximum number of pending results in the output
            queue; bounds memory use when the consumer is slow.
        nworkers: Number of processes running ``heavy_processing``.

    Yields:
        ``heavy_processing(item)`` for every item of ``iterable``. ``None`` is
        a valid item and a valid result.
    """
    import sys

    # The nested functions below (and, in general, `iterable`) cannot be
    # pickled, so the children must inherit them through fork(). Ask for the
    # "fork" start method explicitly instead of relying on the platform
    # default, which is "spawn" or "forkserver" on some platforms/versions.
    if "fork" in mp.get_all_start_methods():
        ctx = mp.get_context("fork")
    else:
        ctx = mp.get_context()

    def gen_to_queue(input_q, iterable):
        # Consume the iterable and write its items to the input queue. Items
        # are wrapped in a 1-tuple so that a `None` item cannot be mistaken
        # for the end-signal.
        for it in iterable:
            input_q.put((it,))
        for _ in range(nworkers):  # Once the iterable is consumed, send one end-signal per worker
            input_q.put(None)

    def process(input_q, output_q):
        # Change the random seed for each worker: a forked child inherits the
        # parent's NumPy global RNG state. If NumPy was not loaded in the
        # parent there is no inherited state to reset, so nothing is imported.
        np = sys.modules.get("numpy")
        if np is not None:
            np.random.seed()
        while True:
            job = input_q.get()
            if job is None:
                # Forward the end-signal so the consumer can count finished workers.
                output_q.put(None)
                break
            # Wrap the result as well, so a `None` result is not read as an end-signal.
            output_q.put((heavy_processing(job[0]),))

    input_q = ctx.Queue(maxsize=input_queue_size)
    output_q = ctx.Queue(maxsize=output_queue_size)

    pools = []
    try:
        # The pool initializers run the whole job: no task is ever submitted.
        pools.append(ctx.Pool(1, initializer=gen_to_queue, initargs=(input_q, iterable)))
        pools.append(ctx.Pool(nworkers, initializer=process, initargs=(input_q, output_q)))

        finished_workers = 0
        while finished_workers < nworkers:
            message = output_q.get()
            if message is None:
                finished_workers += 1
            else:
                yield message[0]
    finally:
        # This is also reached on GeneratorExit, i.e. when the consumer stops early.
        print("Closing multiprocessing generator")
        # close() alone would leave children blocked on a full queue running;
        # terminate() stops them, join() reaps them.
        for p in pools:
            p.terminate()
        for p in pools:
            p.join()
@valsdav
Copy link
Author

valsdav commented Jul 29, 2022

Also added a limit on the output queue size, to avoid accumulating too much memory when the consumer is slow.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment