Skip to content

Instantly share code, notes, and snippets.

@chrisberkhout
Last active December 11, 2015 12:19
Show Gist options
  • Save chrisberkhout/4600139 to your computer and use it in GitHub Desktop.
Save chrisberkhout/4600139 to your computer and use it in GitHub Desktop.
Start of Pipes, which builds on ChildProcess to make piping between processes in Ruby as easy as in the shell
require "childprocess"
class Pipes
class NullBit
def initialize
@read, @write = IO.pipe
end
def consumer
@write
end
def finish
@write.close
end
end
class ProcessBit
def initialize(*args)
@process = ChildProcess.build(*args)
end
def consumer
@process.io.stdin
end
def start(following_consumer)
@process.duplex = true
@process.io.stdout = following_consumer
@process.start
end
def finish
@process.io.stdin.close
@process.wait
end
end
class BlockBit
def initialize(&block)
@block = block
@read, @write = IO.pipe
end
def consumer
@write
end
def start(following_consumer)
@thread = Thread.new do
@block.call(@read, following_consumer)
end
end
def finish
@write.close
@read.close # without this @read#each_line never stops waiting (with this need to handle IOError... stream closed)
puts "joining thread"
@thread.join
puts "joined thread"
end
end
def initialize
@bits = []
end
def to(*args, &block)
if block_given?
@bits << BlockBit.new(&block)
else
@bits << ProcessBit.new(*args)
end
self
end
def run
@bits = [NullBit.new] + @bits + [NullBit.new]
@bits[1..-2].zip(@bits[2..-1]).reverse.each do |current, following|
current.start(following.consumer)
end
@bits.each do |current|
current.finish
end
end
end
# Pipes.new.to("ls").to("sort").to("sort", "-r").to { |input| puts input.readlines }.run
Pipes.new.
to("ls").
to("sort").
to do |input, output|
input.each_line do |line|
output.puts line
puts "FIRST BLOCK... #{line}"
end rescue nil
end.
to("sort", "-r").
to do |input, output|
input.each_line do |line|
output.puts line
puts "SECOND BLOCK... #{line}"
end rescue nil
end.run
require "childprocess"
r1, w1 = IO.pipe
p3 = ChildProcess.build("sort", "-r")
p3.duplex = true
p3.io.stdout = w1
p3.start
p2 = ChildProcess.build("sort")
p2.duplex = true # sets up pipe so search.io.stdin will be available after .start
p2.io.stdout = p3.io.stdin
p2.start
# p1 = ChildProcess.build("ls")
# p1.duplex = true
# p1.io.stdout = p2.io.stdin
# p1.start
# p1.wait
source = lambda do
p2.io.stdin.puts "one\ntwo\nthree\nfour"
end
source.call
p2.io.stdin.close
p2.wait
p3.io.stdin.close
p3.wait
w1.close
r1.readlines.each { |line| puts "got this -> #{line}"}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment