Similar to mpipe, this short module lets you string together tasks so that they are executed in parallel.
The difference is that it lets you use generators, functions which yield results instead of returning them. Each yielded item gets passed to the next stage.
You can specify that one or more worker copies of a stage operate on its input queue.
You can use a generator function as a stage. It takes one parameter, the input item, and yields one or more results, each of which is sent to the next stage. The first stage should yield pipey.STOP when it is done, which lets the pipeline shut down once the successive stages have completed.
If the stage has only one result, you can simply return it.
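For example, a first stage might be written as a generator while a later stage simply returns its single result. This is a minimal sketch: the stage names, the input file, and their bodies are illustrative, and only the pipey calls described in this document (Pipeline, add, run, STOP) are assumed.

import pipey

def readLines(path):
    # Generator stage: yield each line of the input file to the next stage,
    # then yield pipey.STOP so the pipeline can shut down once the later
    # stages have drained their queues.
    with open(path) as f:
        for line in f:
            yield line
    yield pipey.STOP

def countWords(line):
    # A stage with exactly one result per input can simply return it.
    return len(line.split())

pipeline = pipey.Pipeline()
pipeline.add(readLines)
pipeline.add(countWords, 4)  # four worker copies share the input queue
pipeline.run("input.txt")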
If you need to do some initialization in the child task, use an object as the stage. If it has an init(self) method (not __init__), it will be called in the child process before the stage starts, and shutdown(self) will be called when the pipeline is shutting down.
The pipeline ends when the first task yields pipey.STOP. This first shuts down the first stage, and then, as each successive stage's input queue empties, that stage is shut down as well.
Use pipey.log("hello") to send messages to the Python logging module in a thread-safe way. The messages will be prefixed with the name of the method used in the stage and a unique identifier.
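For instance (the stage below is purely illustrative; only pipey.log itself comes from the description above):

import pipey

def squareNumbers(n):
    # Log in a thread-safe way; the output is prefixed with the stage's
    # method name and a unique identifier.
    pipey.log("squaring %r" % (n,))
    yield n * n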
Here is some example pseudocode:
import pipey

def readDocuments(input_ignored):
    # For each document that we want to process:
    yield document
    # This will shut down the entire pipeline once everything is done.
    yield pipey.STOP

def processDocuments(doc):
    # Perform some intensive processing on the document.
    # Note that you can yield more than one result to the next stage.
    yield processedDoc

class ResultCombiner:
    def __init__(self, filename):
        # You can record some arguments here, still within the parent process.
        self.filename = filename
    def init(self):
        # Now we are in the child process: open the output file.
        self.output = open(self.filename, "w")
    def __call__(self, processedDoc):
        # Write the processed document to the file.
        self.output.write(processedDoc)
    def shutdown(self):
        # Close the file.
        self.output.close()

pipeline = pipey.Pipeline()
# One process reads the documents.
pipeline.add(readDocuments)
# Up to 8 processes transform the documents.
pipeline.add(processDocuments, 8)
# One process combines the results into a file.
pipeline.add(ResultCombiner("output.txt"))
pipeline.run(input_to_first_stage)
Ideas for future improvements:

- Use unique sentinel values to represent STOP, SHUTDOWN, and SHUTDOWN_LAST (one possible approach is sketched below).
- When the first stage returns, send the stop signal automatically, eliminating the need to yield pipey.STOP explicitly.
- Pipeline.run() should collect and return the results of the final stage.
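The unique sentinel values mentioned in the first item are not part of the module as described above; a minimal sketch of one common Python approach, using named singleton objects, might look like this (the __reduce__ hook is an assumption to keep identity checks working when items are pickled between processes):

class _Sentinel(object):
    # A named singleton used as a control signal, so ordinary data flowing
    # through the pipeline cannot be mistaken for STOP or SHUTDOWN.
    def __init__(self, name):
        self.name = name
    def __repr__(self):
        return self.name
    def __reduce__(self):
        # Unpickling in a worker process returns the same module-level
        # object, so "item is STOP" keeps working across processes.
        return (_lookup_sentinel, (self.name,))

def _lookup_sentinel(name):
    return globals()[name]

STOP = _Sentinel("STOP")
SHUTDOWN = _Sentinel("SHUTDOWN")
SHUTDOWN_LAST = _Sentinel("SHUTDOWN_LAST")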