Skip to content

Instantly share code, notes, and snippets.

@nizhib
Last active October 7, 2020 13:36
Show Gist options
  • Save nizhib/0a6bdd9d777abf9c0d21102cc2e72526 to your computer and use it in GitHub Desktop.
Save nizhib/0a6bdd9d777abf9c0d21102cc2e72526 to your computer and use it in GitHub Desktop.
Multiprocess Pipeline
import functools
import multiprocessing
import os
class MultiprocessPipeline(object):
    """Run ``target`` over submitted tasks in a pool of worker processes.

    Each task is a tuple of positional arguments; a worker calls
    ``target(*task)`` and places the return value on the output queue.
    With more than one worker, results are not guaranteed to come back
    in submission order.

    Example::

        pipeline = MultiprocessPipeline(foo)
        pipeline.start()
        for a, b, c in something:
            pipeline.put((a, b, c))
        for _ in range(len(something)):
            result_img, result_mask = pipeline.get()
            cv2.imwrite(...)
            cv2.imwrite(...)
        pipeline.stop()

    Notes:
        * ``None`` is reserved as the shutdown sentinel and must not be
          submitted as a task.
        * Retrieve every pending result with :meth:`get` before calling
          :meth:`stop`; joining a process that still has unread items on
          a queue can block.
        * An exception raised by ``target`` ends that worker without
          producing a result for the task.
        * ``target`` must be picklable when the ``spawn`` start method
          is in use.
    """

    def __init__(self, target, num_processes=os.cpu_count()):
        """Create the queues; no processes are launched until ``start()``.

        Args:
            target: Callable invoked as ``target(*task)`` for every task.
            num_processes: Number of worker processes. ``None`` (which
                ``os.cpu_count()`` may return) falls back to the CPU
                count, or to 1 when that cannot be determined.
        """
        if num_processes is None:
            num_processes = os.cpu_count() or 1
        # A partial over a static method is picklable (given a picklable
        # target), unlike a closure, so workers can also be launched
        # with the "spawn" start method.
        self.work = functools.partial(self._worker_loop, target)
        self.num_processes = num_processes
        # Bounded input queue: put() blocks once this many tasks wait.
        self.qin = multiprocessing.Queue(num_processes)
        self.qout = multiprocessing.Queue()
        self.workers = None

    @staticmethod
    def _worker_loop(target, qin, qout):
        """Consume tasks from ``qin`` until the ``None`` sentinel is seen."""
        while True:
            task = qin.get()
            if task is None:
                # Hand the sentinel on so the next worker also shuts down.
                qin.put(None)
                return
            qout.put(target(*task))

    def start(self):
        """Create and launch ``num_processes`` worker processes."""
        self.workers = [
            multiprocessing.Process(target=self.work,
                                    args=(self.qin, self.qout))
            for _ in range(self.num_processes)
        ]
        for w in self.workers:
            w.start()

    def put(self, obj):
        """Submit one task: a tuple of positional arguments for ``target``."""
        self.qin.put(obj)

    def get(self):
        """Block until a result is available and return it."""
        return self.qout.get()

    def stop(self):
        """Shut the workers down, wait for them, and close the input queue.

        Does nothing if ``start()`` was never called.
        """
        if self.workers is None:
            return
        # A single sentinel is enough: each worker re-queues it on exit.
        self.qin.put(None)
        for w in self.workers:
            w.join()
        self.qin.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment