Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Pipeline multiprocessing for python with generators

Pipeline multiprocessing in Python with generators

Similar to mpipe, this short module lets you string together tasks so that they are executed in parallel.

The difference is that it lets you use generators, functions which yield results instead of returning them. Each yielded item gets passed to the next stage.

You can specify that one or more copies of the workers operate on the input queue.

Things that can be in a stage

Generators

You can use a generator function as a stage. It takes one parameter, input, and yields one or more results that get sent to the next stage. The first stage should yield pipey.STOP when it is done to allow the pipeline to shut down when the successive stages have completed.

Regular functions

If the stage has only one result, you can simply return it.

Objects with init(), __call__(), and shutdown() methods

If you need to do some initialization in the child task, use an object. If it has an init(self) method (not __init__) then it will be called before starting, and shutdown(self) will be called when the pipeline is shutting down.

Shutting the thing down

The pipeline ends when the first task yields pipey.STOP. This first shuts down the current stage, and then as each successive stage's input queue empties, it is shut down as well.

Debugging

Use pipey.log("hello") to output messages to the python logging module in a thread-safe way. The messages will be prefixed with the name of the method used in the stage, and a unique identifier.

Here is some example pseudocode:

import pipey

def readDocuments(input_ignored):
    # for each document that we want to process,
       yield document
       
    # This will shutdown the entire pipeline once everything is done.
    yield pipey.STOP
    
def processDocuments(doc):
    # perform some intensive processing on the document
    # note you can yield more than one result to the next stage
    yield processedDoc
    
class ResultCombiner:
   def __init__(self, filename):
      # you can record some arguments here, still within the parent process.

   def init(self):
      # now we are in the child process.
      # open the output file
      
   def __call__(self, processedDoc):
      # write the processed document to the file.
      
   def shutdown(self):
      # close the file
      
pipeline = pipey.Pipeline()

# one process reads the documents
pipeline.add(readDocuments)

# up to 8 processes transform the documents
pipeline.add(processDocuments, 8)

# One process combines the results into a file.
pipeline.add(ResultCombiner("output.txt"))

pipeline.run(input_to_first_stage)

Todo

  • Use some kind of unique values to represent STOP, SHUTDOWN, SHUTDOWN_LAST

  • When the first stage returns, send the stop signal automatically, eliminating the need for it.

  • Pipeline.run() should collect and return the results of the final stage.

#!/usr/bin/env python3
import pipey, time, random, logging, sys, subprocess
# Route DEBUG-and-above records to stdout; each line shows the creation
# timestamp, the logger name in brackets, and the message.
logging.basicConfig(
    level=logging.DEBUG,
    stream=sys.stdout,
    format="%(created)f [%(name)s] %(message)s",
)
def readDocuments(input):
    """First stage: yield the integers 0 .. input-1, then the stop signal.

    Each integer stands in for a document to be processed; a log line is
    written before every one is handed to the next stage.
    """
    for doc_number in range(input):
        message = "Generating {}".format(doc_number)
        pipey.log(message)
        yield doc_number

    # Signals that no more documents will follow, so the pipeline can
    # shut down once the later stages have finished.
    yield pipey.STOP
def processDocuments(doc):
    """Middle stage: simulate slow work on *doc*, then yield ``doc * 2``.

    Being a generator, it could yield several results per input; this
    example yields exactly one.
    """
    pipey.log("Processing {}".format(doc))

    # Stand-in for intensive processing: sleep for a random fraction of a second.
    delay = random.random()
    time.sleep(delay)

    doubled = doc * 2
    yield doubled
class ResultCombiner:
    """Final stage: write every result to a file, then sort the file.

    ``__init__`` runs in the parent process, while ``init``, ``__call__``
    and ``shutdown`` run in the child process that executes the stage.
    """

    def __init__(self, filename):
        # Still in the parent process: only remember where to write.
        self.filename = filename

    def init(self):
        # Now in the child process: open the output file for writing.
        self.f = open(self.filename, "w")

    def __call__(self, processedDoc):
        # Append one line per processed document.
        pipey.log("Writing {} to file".format(processedDoc))
        line = "{}\n".format(processedDoc)
        self.f.write(line)

    def shutdown(self):
        pipey.log("Sorting results")
        # The file must be closed before it is sorted in place.
        self.f.close()
        sort_command = ["sort", "-n", "-o", self.filename, self.filename]
        subprocess.call(sort_command)
# Build and run the pipeline only when executed as a script.  The guard is
# required by multiprocessing: with the "spawn" start method every worker
# re-imports this module, and without the guard each worker would try to
# build and run the pipeline again.
if __name__ == "__main__":
    pipeline = pipey.Pipeline()

    # one process reads the documents
    pipeline.add(readDocuments)

    # up to 8 processes transform the documents
    pipeline.add(processDocuments, 8)

    # One process combines the results into a file.
    pipeline.add(ResultCombiner("output.txt"))

    # The argument is handed to the first stage: generate 100 documents.
    pipeline.run(100)
#!/usr/bin/env python3
from multiprocessing import Process, Queue
import sys
import logging
import traceback
import inspect
# Control signals that travel through the queues alongside ordinary data.
# They stay plain strings so they survive pickling between processes and
# still compare equal with ``==``, but they are namespaced so that ordinary
# payloads such as the word "STOP" are not mistaken for a signal.
#
# STOP          - no more input will arrive for a stage
# SHUTDOWN      - tells one worker of a stage to exit
# SHUTDOWN_LAST - tells the last worker of a stage to exit and to forward
#                 STOP to the next stage
STOP = "__pipey_signal__:STOP"
SHUTDOWN = "__pipey_signal__:SHUTDOWN"
SHUTDOWN_LAST = "__pipey_signal__:SHUTDOWN_LAST"


def log(message):
    """Log *message* at DEBUG level on the "pipey" logger.

    This is the default used outside a running worker (for example in the
    parent process).  ``Task.main`` rebinds the module-level ``log`` to a
    logger named after the worker's id and stage.
    """
    logging.getLogger("pipey").debug(message)
def _is_signal(value, signal):
    """Return True only if *value* is the control signal *signal*.

    The type is checked first so that payloads with an unusual ``__eq__``
    (one that raises, or returns a non-boolean) are never compared with a
    control signal.
    """
    return isinstance(value, type(signal)) and value == signal


class Task:
    """One worker process of a pipeline stage.

    A task repeatedly takes an item from its input queue, applies ``fn`` to
    it and puts the result (or every value yielded, when ``fn`` returns a
    generator) on its output queue.  ``multiplicity`` is the number of
    tasks sharing the same input queue; it determines how many shutdown
    messages are issued when STOP is received.
    """

    def __init__(self, id, fn, inputQueue, outputQueue, multiplicity):
        self.id = id
        self.fn = fn
        self.inputQueue = inputQueue
        self.outputQueue = outputQueue
        self.multiplicity = multiplicity

    def start(self):
        """Start a child process running :meth:`main` on the current queues."""
        self.process = Process(target=self.main, args=(self.inputQueue, self.outputQueue))
        self.process.start()

    def main(self, inputQueue, outputQueue):
        """Entry point of the child process.

        Calls ``fn.init()`` if present, processes input until a shutdown
        signal arrives, then calls ``fn.shutdown()`` if present.  Once
        ``init()`` has completed, ``shutdown()`` is also called when the
        stage raises or is interrupted, so resources acquired in ``init()``
        are released.  ``KeyboardInterrupt`` is swallowed; any other
        exception is re-raised after printing which stage it came from.
        """
        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

        # Name the logger "<id>:<stage name>" so messages can be told apart.
        if inspect.isfunction(self.fn):
            stage_name = self.fn.__name__
        else:
            stage_name = type(self.fn).__name__
        logger = logging.getLogger(str(self.id) + ":" + stage_name)

        # Rebind the module-level ``log`` so stage code running in this
        # process logs through this worker's logger.
        global log
        log = logger.debug

        initialized = False
        try:
            try:
                if hasattr(self.fn, "init"):
                    self.fn.init()
                initialized = True
                log("Running")
                self._run_loop()
                log("Shutting down")
            finally:
                # Only undo init() if it actually completed.
                if initialized and hasattr(self.fn, "shutdown"):
                    self.fn.shutdown()
        except KeyboardInterrupt:
            pass
        except Exception:
            print("For {}".format(self.fn))
            raise

    def _run_loop(self):
        """Consume the input queue until SHUTDOWN or SHUTDOWN_LAST arrives."""
        while True:
            item = self.inputQueue.get()
            log("Input is {}".format(item))
            if _is_signal(item, SHUTDOWN):
                return
            if _is_signal(item, SHUTDOWN_LAST):
                # The last worker of the stage tells the next stage to stop.
                self.outputQueue.put(STOP)
                return
            if _is_signal(item, STOP):
                # One shutdown message per worker sharing this input queue;
                # the final one is SHUTDOWN_LAST.
                for _ in range(self.multiplicity - 1):
                    self.inputQueue.put(SHUTDOWN)
                self.inputQueue.put(SHUTDOWN_LAST)
                continue
            self._emit(self.fn(item))

    def _emit(self, result):
        """Forward a stage result, or re-queue STOP if the stage produced it."""
        if inspect.isgenerator(result):
            for value in result:
                if _is_signal(value, STOP):
                    # STOP goes back onto this stage's own input queue and
                    # the rest of the generator is abandoned.
                    self.inputQueue.put(STOP)
                    break
                self.outputQueue.put(value)
        elif _is_signal(result, STOP):
            self.inputQueue.put(STOP)
        else:
            self.outputQueue.put(result)
class Pipeline:
    """A sequence of stages connected by queues, one Task per worker.

    Stages are appended with :meth:`add` and executed with :meth:`run`.
    """

    def __init__(self):
        self.tasks = []
        # Queue feeding the first stage and queue fed by the last stage;
        # each holds at most one item.
        self.inputQueue = Queue(1)
        self.outputQueue = Queue(1)
        self.nextId = 1

    def run(self, arg=None):
        """Start every task, feed *arg* to the first stage and wait for completion.

        Items arriving on the final output queue are discarded until STOP
        is seen.  The worker processes are then joined, so any
        ``shutdown()`` hook of the last stage has finished before this
        method returns.

        Raises:
            RuntimeError: if no stage has been added (nothing would ever
                produce STOP, so waiting would block forever).
        """
        if not self.tasks:
            raise RuntimeError("cannot run a pipeline with no stages")

        for task in self.tasks:
            task.start()
        self.inputQueue.put(arg)

        while True:
            item = self.outputQueue.get()
            # Check the type first so arbitrary payloads are never compared
            # with the signal.
            if isinstance(item, type(STOP)) and item == STOP:
                break

        for task in self.tasks:
            task.process.join()

    def add(self, fn, fanOut=1):
        """Append a stage running *fn* in *fanOut* worker processes.

        The workers of the previously last stage are rewired to write into
        a new queue (capacity 2) that the new stage reads from; the new
        stage writes to the pipeline's output queue.

        Raises:
            ValueError: if *fanOut* is less than 1, since the upstream
                stage would then write to a queue that nobody reads.
        """
        if fanOut < 1:
            raise ValueError("fanOut must be at least 1, got {!r}".format(fanOut))

        inputQueue = self.inputQueue
        outputQueue = self.outputQueue
        if self.tasks:
            inputQueue = Queue(2)
            for upstream in self.tasks:
                # Only the current last stage still points at the final
                # output queue.
                if upstream.outputQueue is self.outputQueue:
                    upstream.outputQueue = inputQueue

        for _ in range(fanOut):
            self.tasks.append(Task(self.nextId, fn, inputQueue, outputQueue, fanOut))
            self.nextId += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.