Created
November 22, 2012 17:11
-
-
Save ferrous26/4132181 to your computer and use it in GitHub Desktop.
Trying to generalize the pipeline data flow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thread' | |
##
# A basic, serial data flow
#
# If you have a long process for a data set, consider performing steps
# of the process concurrently. Even simply reading a file and writing it
# back out elsewhere (i.e. a socket) can have increased throughput by
# using a pipeline.
#
# The implementation of this class takes advantage of Ruby's `SizedQueue`
# class to add flow control to the pipeline; no stage in the pipeline can
# get too far ahead of another stage, which should prevent queues from
# getting long and eating a lot of memory waiting for any one particular
# stage to perform work. The effectiveness of a pipeline will depend on
# the amount of work you are performing per pipeline stage.
#
# The semantics of signalling the "end of pipeline" are a work in progress.
# Currently there is no proper mechanism; you can either poll {#empty?} or
# use a `ConditionVariable`.
#
# Setting the `input` and waiting for the pipeline to finish are two other
# features which are still a work in progress. If you need to wait for the
# pipeline to finish you should use a `ConditionVariable` from the `thread`
# standard library.
#
class Pipeline

  # @return [Integer] maximum number of items buffered between two stages
  MAX_QUEUE_DEPTH = 10

  # The queue that feeds the first stage; push work items onto it.
  #
  # @return [Queue,#pop]
  attr_reader :input

  ##
  # You can optionally initialize the pipeline with a `Queue` of your own or
  # any other object that responds to `#pop` (e.g. `Array`). Note however,
  # the object you pass in should support thread-safe mutations if you intend
  # to continue mutating it after the pipeline has been initialized.
  #
  # NOTE: every stage calls `#pop` in a tight loop, so an input whose `#pop`
  # does not block when empty (`Array#pop` returns `nil`) will make the first
  # stage spin and feed `nil` to its block. {#empty?} additionally requires
  # the input to respond to `#empty?`.
  #
  # @param input [Queue,#pop]
  def initialize input = nil
    @input      = input || SizedQueue.new(MAX_QUEUE_DEPTH)
    @next_input = @input # source for the next stage; nil once sealed by #output
    @stages     = []     # one Thread per stage, in pipeline order
  end

  ##
  # Add the given block as the next stage in the pipeline
  #
  # The block should take a single argument, an input from the previous
  # stage, and return an object to pass to the next stage. The block runs
  # on its own thread, once per item.
  #
  # @yieldparam item [Object] value popped from the previous stage
  # @yieldreturn [Object] value handed to the next stage
  # @raise [ArgumentError] if no block is given
  # @raise [RuntimeError] if {#output} has already been called
  # @return [Pipeline] self, so calls can be chained
  def add_stage &block
    raise ArgumentError, "add_stage requires a block" unless block
    raise "cannot add more stages; output stage already set" unless @next_input

    output = SizedQueue.new MAX_QUEUE_DEPTH
    spawn_stage(@next_input, output) { |item| output.push block.call(item) }
    @next_input = output
    self
  end

  ##
  # Add the given block as the final stage of the pipeline
  #
  # The block receives each item produced by the last stage; its return
  # value is discarded. No further stages can be added afterwards.
  #
  # @yieldparam item [Object] value popped from the last stage
  # @raise [ArgumentError] if no block is given
  # @raise [RuntimeError] if the output stage has already been set
  # @return [Pipeline] self
  def output &block
    raise ArgumentError, "output requires a block" unless block
    raise "output stage already set" unless @next_input

    # The terminal stage has nothing downstream; an empty Array stands in
    # for its output buffer so #empty? can treat every stage uniformly.
    spawn_stage(@next_input, [], &block)
    @next_input = nil
    self
  end

  ##
  # Terminate every stage thread.
  #
  # `Thread#exit` is an alias of `Thread#kill`, so this currently behaves
  # exactly like {#kill}: stages stop immediately and buffered items are
  # not drained.
  def close
    @stages.each(&:exit)
  end

  ##
  # Terminate every stage thread immediately.
  def kill
    @stages.each(&:kill)
  end

  ##
  # Whether every inter-stage buffer is currently empty.
  #
  # An item that a stage has already popped and is still processing sits in
  # no buffer, so this can return `true` while work is still in flight.
  #
  # @return [Boolean]
  def empty?
    @stages.all? { |stage| stage[:input].empty? && stage[:output].empty? }
  end

  ##
  # Whether any stage thread terminated because of an exception.
  #
  # @return [Boolean]
  def crashed?
    # Thread#status is nil only for a thread that died with an exception
    @stages.any? { |stage| stage.status.nil? }
  end

  ##
  # Whether every stage thread has terminated without an exception.
  #
  # @return [Boolean]
  def finished?
    # explicitly false, Thread#status == nil means thread crashed
    @stages.all? { |stage| stage.status == false }
  end


  private

  ##
  # Start a stage thread that forever pops from `input` and yields each item.
  #
  # The buffers are recorded on the thread from the calling thread, before
  # this method returns, so {#empty?} never observes a stage whose buffers
  # have not been registered yet.
  #
  # @param input [#pop] buffer the stage reads from
  # @param output [#empty?] buffer the stage writes to
  # @return [Thread] the new stage thread
  def spawn_stage input, output
    stage = Thread.new do
      loop { yield input.pop }
    end
    stage[:input]  = input
    stage[:output] = output
    @stages << stage
    stage
  end

end
require 'minitest/autorun' | |
require 'securerandom' | |
##
# Tests for Pipeline.
#
# The base class is picked at load time: `Minitest::Test` when it is
# defined, otherwise the legacy `MiniTest::Unit::TestCase` name, so the
# suite loads under both old and new Minitest releases.
class TestPipeline < (defined?(Minitest::Test) ? Minitest::Test : MiniTest::Unit::TestCase)

  # Seconds to wait for the pipeline before letting the assertion fail
  WAIT_TIMEOUT = 5

  # Fixtures are created eagerly in #setup rather than memoized with `||=`:
  # `mutex` and `cv` are first touched from both the test thread and a stage
  # thread, and lazy initialization could hand each thread a different object.
  attr_reader :pl, :cv, :mutex, :string

  def setup
    @pl     = Pipeline.new
    @cv     = ConditionVariable.new
    @mutex  = Mutex.new
    @string = SecureRandom.random_bytes(rand(10) + 1)
    @output = nil
  end

  # Stop the stage threads so they do not outlive the test that made them.
  def teardown
    pl.kill
  end

  ##
  # Block until a stage has stored a value in `@output`, or until `timeout`
  # seconds have passed (the following assertion then fails instead of the
  # suite hanging forever).
  #
  # @param timeout [Numeric] seconds to wait
  def wait_for_pipeline_to_finish timeout = WAIT_TIMEOUT
    deadline = Time.now + timeout
    mutex.synchronize do
      # Loop rather than a single wait: a condition variable may wake up
      # without the condition actually being true.
      until @output
        remaining = deadline - Time.now
        break if remaining <= 0
        cv.wait mutex, remaining
      end
    end
  end

  def test_single_stage
    pl.add_stage do |input|
      mutex.synchronize do
        @output = input
        cv.signal
      end
    end

    pl.input.push string
    wait_for_pipeline_to_finish
    assert_equal string, @output
  end

  def test_multi_stage
    pl.add_stage { |input| input.reverse }
    pl.add_stage { |input| input.upcase }
    pl.add_stage { |input| input * 2 }
    pl.add_stage do |input|
      mutex.synchronize do
        @output = input
        cv.signal
      end
    end

    pl.input.push string
    wait_for_pipeline_to_finish
    assert_equal((string.reverse.upcase * 2), @output)
  end

  def test_output
    # Pushed before any stage exists; the input queue buffers it
    pl.input.push string
    pl.output do |data|
      mutex.synchronize do
        @output = data
        cv.signal
      end
    end

    wait_for_pipeline_to_finish
    assert_equal string, @output
  end

end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment