Pipeline multiprocessing in Python with generators
Similar to mpipe, this short module lets you string tasks together so that they execute in parallel.
The difference is that it lets you use generators: functions that yield results instead of returning them. Each yielded item is passed to the next stage.
You can specify that one or more copies of a worker operate on the stage's input queue.
Things that can be in a stage
You can use a generator function as a stage. It takes one parameter, the input, and yields one or more results that are sent to the next stage. The first stage should yield pipey.STOP when it is done, which allows the pipeline to shut down once the successive stages have completed.
If the stage has only one result, you can simply return it.
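As an illustration of those two cases, here is a sketch (not pipey's actual internals) of how a pipeline might normalize both kinds of stages, generator functions and plain functions, into a single stream of results. The helper name iter_results is hypothetical:

```python
import inspect

def iter_results(stage, item):
    """Call a stage with one input item and yield whatever it produces.

    Generator stages are iterated; plain functions are treated as
    producing a single result. (Hypothetical helper; the real pipey
    internals may differ.)
    """
    if inspect.isgeneratorfunction(stage):
        # generator stage: forward every yielded item to the next stage
        yield from stage(item)
    else:
        # plain function: its return value is the one result
        yield stage(item)

def double(x):
    return 2 * x        # single-result stage

def explode(x):
    yield x             # multi-result stage
    yield x + 1
```

Both stages then look the same to the machinery that feeds the next queue.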
If you need to do some initialization in the child task, use an object. If it has an init(self) method (not __init__), it will be called before processing starts, and shutdown(self) will be called when the pipeline is shutting down.
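A minimal sketch of the lifecycle a child process might follow around those hooks; the names run_stage and Collector are hypothetical, and pipey's real worker loop may differ:

```python
def run_stage(stage, items):
    """Drive one stage over its input items, honoring the optional
    init()/shutdown() lifecycle hooks. (Sketch of what the child
    process might do, not pipey's actual code.)"""
    if hasattr(stage, "init"):
        stage.init()            # runs in the child, before the first item
    results = []
    for item in items:
        results.append(stage(item))
    if hasattr(stage, "shutdown"):
        stage.shutdown()        # runs once the input is exhausted
    return results

class Collector:
    """Toy stage that records the order in which its hooks fire."""
    def __init__(self):
        self.events = []        # still in the parent process here
    def init(self):
        self.events.append("init")
    def __call__(self, item):
        self.events.append(item)
        return item
    def shutdown(self):
        self.events.append("shutdown")
```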
Shutting the thing down
The pipeline ends when the first task yields pipey.STOP. This first shuts down the current stage; then, as each successive stage's input queue empties, that stage is shut down as well.
Use pipey.log("hello") to output messages to the python logging module in a thread-safe way. The messages will be prefixed with the name of the method used in the stage, and a unique identifier.
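Since Python's logging module is itself thread-safe, a rough stand-in for pipey.log might look like the following. The prefix format (stage name plus process id) is an assumption for illustration, not pipey's actual output:

```python
import logging
import os

def log(message, stage_name="stage"):
    """Rough equivalent of pipey.log(): prefix the stage name and the
    process id so messages from parallel workers stay attributable.
    (Sketch; pipey's real formatting may differ.)"""
    logging.getLogger("pipey").info("%s[%d]: %s", stage_name, os.getpid(), message)
```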
Here is some example pseudocode:
```python
import pipey

def readDocuments(input_ignored):
    # for each document that we want to process,
    yield document
    # This will shut down the entire pipeline once everything is done.
    yield pipey.STOP

def processDocuments(doc):
    # perform some intensive processing on the document
    # note you can yield more than one result to the next stage
    yield processedDoc

class ResultCombiner:
    def __init__(self, filename):
        # you can record some arguments here, still within the parent process.
        self.filename = filename

    def init(self):
        # now we are in the child process: open the output file
        pass

    def __call__(self, processedDoc):
        # write the processed document to the file
        pass

    def shutdown(self):
        # close the file
        pass

pipeline = pipey.Pipeline()
# one process reads the documents
pipeline.add(readDocuments)
# up to 8 processes transform the documents
pipeline.add(processDocuments, 8)
# one process combines the results into a file
pipeline.add(ResultCombiner("output.txt"))

pipeline.run(input_to_first_stage)
```
TODO
- Use unique values to represent STOP, SHUTDOWN, and SHUTDOWN_LAST.
- When the first stage returns, send the stop signal automatically, eliminating the need to yield pipey.STOP explicitly.
- Pipeline.run() should collect and return the results of the final stage.
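The unique STOP/SHUTDOWN/SHUTDOWN_LAST values could be implemented as Enum members, which unpickle to the very same object and so survive a trip through a multiprocessing queue while still comparing correctly with `is`. This is one possible approach, not pipey's actual code:

```python
import enum

class Signal(enum.Enum):
    """Unique control values for the pipeline. Enum members pickle by
    name, so the object that comes out of a multiprocessing queue is
    identical to the module-level member, unlike a bare object()."""
    STOP = enum.auto()
    SHUTDOWN = enum.auto()
    SHUTDOWN_LAST = enum.auto()

# module-level aliases, so callers can write pipey.STOP
STOP = Signal.STOP
SHUTDOWN = Signal.SHUTDOWN
SHUTDOWN_LAST = Signal.SHUTDOWN_LAST
```

A plain object() sentinel would not work here, because pickling it across a process boundary produces a new, unequal instance.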