Skip to content

Instantly share code, notes, and snippets.

@dirkgr
Created October 16, 2018 00:15
Show Gist options
  • Save dirkgr/df604217d363e69a9d52b2e19bd58806 to your computer and use it in GitHub Desktop.
Save dirkgr/df604217d363e69a9d52b2e19bd58806 to your computer and use it in GitHub Desktop.
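A map function that distributes work over multiple processes. It aims to be more efficient than `multiprocessing.Pool.map` by relying on Unix forking: the mapped function is inherited by the worker processes instead of being pickled and sent to them, so lambdas and closures can be mapped as well.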
from typing import *
import multiprocessing as mp
def mp_map(
    fn: Callable[[Any], Any],
    input_sequence: Iterable,
    num_processes: Optional[int] = None,
) -> Iterator[Any]:
    """Apply ``fn`` to every item of ``input_sequence`` using worker processes.

    Workers are started with the ``fork`` start method, so ``fn`` is inherited
    by the children instead of being pickled; lambdas and closures work.  The
    items themselves and the results still travel through
    ``multiprocessing`` queues and must therefore be picklable.

    Results are yielded in the same order as the inputs, like the built-in
    ``map``.

    Args:
        fn: Function applied to each item.
        input_sequence: Items to process.  It is consumed completely (all
            items are queued) before the first result is yielded.
        num_processes: Number of worker processes; defaults to
            ``multiprocessing.cpu_count()``.  Must be at least 1.

    Yields:
        ``fn(item)`` for each item, in input order.

    Raises:
        ValueError: If ``num_processes`` is less than 1, or if the platform
            has no ``fork`` start method (e.g. Windows).
        Exception: The first exception (in input order) raised by ``fn`` is
            re-raised here, after all results preceding it were yielded.
    """
    if num_processes is None:
        num_processes = mp.cpu_count()
    if num_processes < 1:
        # With zero workers nothing would ever be processed and we'd block forever.
        raise ValueError(f"num_processes must be at least 1, got {num_processes}")

    # The worker target below is a closure, which only works when children
    # inherit it through fork(); the "spawn" start method (default on macOS
    # and Windows) would try to pickle it and fail.
    ctx = mp.get_context("fork")
    input_queue = ctx.Queue()
    output_queue = ctx.Queue()

    def process_items() -> None:
        # Runs in each worker: process (index, item) tasks until the sentinel.
        while True:
            task = input_queue.get()
            if task is None:  # sentinel value
                break
            item_index, item = task
            try:
                processed_item = fn(item)
            except Exception as e:
                output_queue.put((item_index, None, e))
            else:
                output_queue.put((item_index, processed_item, None))
        # Ensure everything this worker put has been flushed before it exits.
        output_queue.close()
        output_queue.join_thread()

    processes = []
    try:
        for _ in range(num_processes):
            process = ctx.Process(target=process_items)
            processes.append(process)
            process.start()

        item_count = 0
        for i, item in enumerate(input_sequence):
            input_queue.put((i, item))
            item_count += 1
        for _ in range(num_processes):
            input_queue.put(None)

        # Results arrive in completion order; park out-of-order ones until
        # their turn so the output matches the input order.
        pending = {}
        for next_index in range(item_count):
            while next_index not in pending:
                index, processed_item, error = output_queue.get()
                pending[index] = (processed_item, error)
            processed_item, error = pending.pop(next_index)
            if error is not None:
                raise error
            yield processed_item
    finally:
        for process in processes:
            process.terminate()
        for process in processes:
            # Reap the children so they don't linger as zombies.
            process.join()
        # If the consumer stopped early, workers were killed before draining
        # the input queue; don't let its feeder thread block interpreter exit
        # waiting to flush items nobody will read.
        input_queue.cancel_join_thread()
        input_queue.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment