@ferrous26
Created November 22, 2012 17:11
Trying to generalize the pipeline data flow
require 'thread'
##
# A basic, serial data flow
#
# If a data set goes through a long, multi-step process, consider performing
# the steps concurrently. Even simply reading a file and writing it back out
# elsewhere (e.g. to a socket) can gain throughput from a pipeline, because
# reading the next chunk can overlap with writing the previous one.
#
# This class uses Ruby's `SizedQueue` class to add flow control to the
# pipeline. Every queue the pipeline creates holds at most `MAX_QUEUE_DEPTH`
# items, and pushing onto a full queue blocks until the next stage pops from
# it. A stage can therefore never get more than `MAX_QUEUE_DEPTH` items ahead
# of the stage after it (plus the one item that stage is working on), so
# queues stay short instead of filling memory while they wait on one slow
# stage. How much a pipeline helps depends on how much work each stage
# performs per item.
#
# The semantics of signalling the "end of pipeline" are a work in progress;
# there is currently no proper mechanism. You can either poll {#empty?} or
# use a `ConditionVariable`.
#
# Setting the `input` and waiting for the pipeline to finish are two other
# features that are still a work in progress. If you need to wait for the
# pipeline to finish, use a `ConditionVariable` from the `thread` standard
# library.
#
class Pipeline
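  # @return [Integer] maximum number of items held by each queue the
  #   pipeline creates
  MAX_QUEUE_DEPTH = 10

  # @return [Queue,#pop] the queue the first stage reads its items from
  attr_reader :input
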
  ##
  # You can optionally initialize the pipeline with a `Queue` of your own or
  # any other object that responds to `#pop` (e.g. an `Array`). Note,
  # however, that the object you pass in must support thread-safe mutation
  # if you intend to keep mutating it after the pipeline has been
  # initialized. Also note that `Array#pop` does not block: once the array
  # is empty it returns `nil`, and that `nil` is handed to the first stage.
  #
  # @param input [Queue,#pop]
  def initialize input = nil
    @input      = input || SizedQueue.new(MAX_QUEUE_DEPTH)
    @next_input = @input
    @stages     = []
  end

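  ##
  # Add the given block as the next stage in the pipeline.
  #
  # The block should take a single argument, an input from the previous
  # stage, and return an object to pass to the next stage. Each stage runs
  # in its own thread and calls the block once per item.
  #
  # @return [self] so that calls can be chained
  def add_stage &block
    raise ArgumentError, "no block given" unless block
    raise "cannot add more stages; output stage already set" unless @next_input
    input  = @next_input
    output = SizedQueue.new MAX_QUEUE_DEPTH
    stage  = Thread.new do
      loop do
        output.push block.call(input.pop)
      end
    end
    # Record the queues on the thread before it becomes visible in @stages,
    # so {#empty?} never sees a stage whose queues are not set yet.
    stage[:input]  = input
    stage[:output] = output
    @stages << stage
    @next_input = output
    self
  end
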
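  ##
  # Add a final stage that passes each item to the given block; the block's
  # return value is discarded. No more stages can be added afterwards.
  #
  # @return [self]
  def output &block
    raise ArgumentError, "no block given" unless block
    raise "cannot add an output stage; output stage already set" unless @next_input
    input = @next_input
    stage = Thread.new do
      loop do
        block.call input.pop
      end
    end
    stage[:input]  = input
    stage[:output] = [] # placeholder so {#empty?} can treat all stages alike
    @stages << stage
    @next_input = nil
    self
  end
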
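  ##
  # Stop every stage thread. `Thread#exit` is an alias of `Thread#kill`, so
  # this currently behaves exactly like {#kill}: items still waiting in the
  # queues are discarded.
  def close
    @stages.each(&:exit)
  end

  ##
  # Immediately terminate every stage thread.
  def kill
    @stages.each(&:kill)
  end
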
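  ##
  # Whether every queue in the pipeline is empty. An item that a stage has
  # popped but not yet pushed is in neither queue, so this can return true
  # while a stage is still working on its last item.
  def empty?
    @stages.all? { |stage| stage[:input].empty? && stage[:output].empty? }
  end
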
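  ##
  # Whether any stage thread died from an exception.
  def crashed?
    # Thread#status returns nil for a thread that terminated with an exception
    @stages.any? { |stage| stage.status.nil? }
  end

  ##
  # Whether every stage thread has terminated normally.
  def finished?
    # explicitly false; Thread#status == nil means the thread crashed
    @stages.all? { |stage| stage.status == false }
  end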
end

require 'minitest/autorun'
require 'securerandom'

class TestPipeline < Minitest::Test
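  attr_reader :mutex, :cv

  def pl;     @pl     ||= Pipeline.new; end
  def string; @string ||= SecureRandom.random_bytes(rand(10) + 1); end

  def setup
    @output = nil
    # Created eagerly: a lazy `@x ||= ...` could race between the test
    # thread and a stage thread and hand each of them a different object.
    @mutex = Mutex.new
    @cv    = ConditionVariable.new
  end

  # Stop the stage threads so they do not outlive the test.
  def teardown
    @pl.kill if @pl
  end

  def wait_for_pipeline_to_finish
    mutex.synchronize do
      # loop rather than a single check, to guard against spurious wakeups
      cv.wait mutex until @output
    end
  end
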
  def test_single_stage
    pl.add_stage do |input|
      mutex.synchronize do
        @output = input
        cv.signal
      end
    end
    pl.input.push string
    wait_for_pipeline_to_finish
    assert_equal string, @output
  end

  def test_multi_stage
    pl.add_stage { |input| input.reverse }
    pl.add_stage { |input| input.upcase }
    pl.add_stage { |input| input * 2 }
    pl.add_stage do |input|
      mutex.synchronize do
        @output = input
        cv.signal
      end
    end
    pl.input.push string
    wait_for_pipeline_to_finish
    assert_equal((string.reverse.upcase * 2), @output)
  end

  def test_output
    pl.input.push string
    pl.output do |data|
      mutex.synchronize do
        @output = data
        cv.signal
      end
    end
    wait_for_pipeline_to_finish
    assert_equal string, @output
  end
end
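
The flow-control claim in the class comment rests on `SizedQueue#push` blocking while the queue is full. The standalone sketch below uses a plain `SizedQueue` rather than the `Pipeline` class: it pairs a fast producer with a slow consumer and records how far ahead the producer ever got. `CAPACITY`, `ITEMS` and the 10 ms sleep are arbitrary values chosen for the demo.

```ruby
require 'thread'

# Arbitrary demo values (not taken from the gist).
CAPACITY = 2
ITEMS    = 10

queue    = SizedQueue.new(CAPACITY)
consumed = []
max_lag  = 0

producer = Thread.new do
  pushed = 0
  ITEMS.times do |i|
    queue.push(i) # blocks while the queue already holds CAPACITY items
    pushed += 1
    # Items pushed but not yet consumed: at most CAPACITY in the queue
    # plus the one the consumer is currently holding.
    max_lag = [max_lag, pushed - consumed.size].max
  end
end

consumer = Thread.new do
  ITEMS.times do
    item = queue.pop
    sleep 0.01 # simulate a slow stage
    consumed << item
  end
end

[producer, consumer].each(&:join)
puts "consumed: #{consumed.inspect}"
puts "producer was at most #{max_lag} items ahead (bound: #{CAPACITY + 1})"
```

The bound is `CAPACITY + 1` rather than `CAPACITY` because the consumer holds one item outside the queue while it works on it; the same applies to each `Pipeline` stage, which holds one item between `pop` and `push`.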
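
The class comment leaves "end of pipeline" signalling open. One common alternative, which this gist does not implement, is a sentinel (poison-pill) value: the producer pushes a unique object after the last item, each stage forwards it and exits, and the caller simply joins the threads. This is a sketch under stated assumptions: it deliberately does not reuse the `Pipeline` class, `END_OF_STREAM` and `run_pipeline` are hypothetical names, and stages are assumed to be anything that responds to `#call`.

```ruby
require 'thread'

# Hypothetical sentinel: a unique frozen object no real item can be.
END_OF_STREAM = Object.new.freeze

# Hypothetical helper: start one thread per stage. Each thread applies its
# stage to every item; on seeing the sentinel it passes it along and exits.
# Returns the input queue, the final output queue and the stage threads.
def run_pipeline(stages, depth: 10)
  input   = SizedQueue.new(depth)
  current = input
  threads = stages.map do |stage|
    source  = current
    sink    = SizedQueue.new(depth)
    current = sink
    Thread.new do
      loop do
        item = source.pop
        if item.equal?(END_OF_STREAM)
          sink.push(END_OF_STREAM) # tell the next stage we are done
          break
        end
        sink.push(stage.call(item))
      end
    end
  end
  [input, current, threads]
end

input, output, threads = run_pipeline([
  ->(s) { s.reverse },
  ->(s) { s.upcase }
])

results   = []
collector = Thread.new do
  until (item = output.pop).equal?(END_OF_STREAM)
    results << item
  end
end

%w[abc def].each { |word| input.push(word) }
input.push(END_OF_STREAM)

# Joining replaces polling #empty? or waiting on a ConditionVariable.
(threads + [collector]).each(&:join)
p results # prints ["CBA", "FED"]
```

One caveat: a stage that raises never forwards the sentinel, so the stages after it would block on `pop` forever; error handling is left out of this sketch.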