Skip to content

Instantly share code, notes, and snippets.

@maasha

maasha/pipes.rb Secret

Created January 19, 2014 15:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maasha/5c6a914df0a79657714a to your computer and use it in GitHub Desktop.
Save maasha/5c6a914df0a79657714a to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
require 'parallel'
require 'msgpack'
require 'pp'
# A linear pipeline of commands. Every command runs in its own process and
# neighbouring commands are joined by an OS pipe that carries
# MessagePack-encoded records from the upstream to the downstream command.
class Pipe
  def initialize
    @commands = []
  end

  # Appends a command to the end of the pipeline.
  #
  # @param command [Symbol, String] name of the command; it must be listed in
  #   Command::COMMANDS by the time the pipeline is run
  # @param options [Hash] command options (+:input+ is the file path read by :cat)
  # @return [Pipe] self, so calls can be chained
  def add(command, options = {})
    @commands << Command.new(command, options)
    self
  end

  # Runs every command concurrently, one forked process per command, and
  # waits for all of them to finish.
  #
  # The processes are forked directly so that every unused pipe end can be
  # closed: a reader only reaches end-of-file once *all* copies of the
  # matching write end are closed, including the copies held by the parent
  # and by unrelated commands. Leaving them open makes the downstream command
  # block forever after the last record.
  #
  # @return [Pipe] self
  # @raise [RuntimeError] if any command process exits unsuccessfully
  def run
    pipes = connect_commands
    ends  = pipes.flatten

    # Flush first so buffered output is not duplicated into every child.
    $stdout.flush
    $stderr.flush

    pids = begin
      @commands.each_with_index.map do |command, index|
        # The only ends this command may keep: the read end of the pipe
        # before it and the write end of the pipe after it.
        own = [
          index.zero? ? nil : pipes[index - 1].first,
          pipes[index] && pipes[index].last
        ]

        Process.fork do
          (ends - own).each(&:close)
          command.run
        end
      end
    ensure
      # The parent takes no part in the data flow; drop its copies.
      ends.each(&:close)
    end

    failed = pids.map { |pid| Process.wait2(pid).last }.reject(&:success?)
    raise "#{failed.size} pipeline command(s) failed" unless failed.empty?

    self
  end

  # One stage of a pipeline.
  class Command
    # Commands that may be dispatched by #run.
    COMMANDS = [:cat, :dump].freeze

    # @return [#each, nil] source of upstream records (nil for the first command)
    # @return [#write, nil] sink for downstream records (nil for the last command)
    attr_accessor :input, :output

    # @param command [Symbol, String] name of the command to execute
    # @param options [Hash] command options
    def initialize(command, options)
      @command = command
      @options = options
      @input   = nil
      @output  = nil
    end

    # Executes the command.
    #
    # @raise [ArgumentError] if the command is not listed in COMMANDS
    def run
      # Dispatch through a whitelist so an arbitrary name can never reach
      # unrelated (or private Kernel) methods.
      name = COMMANDS.find { |known| known.to_s == @command.to_s }
      raise ArgumentError, "unknown command: #{@command.inspect}" unless name

      public_send(name)
    end

    # Forwards all upstream records, then appends every line of the file
    # named by the +:input+ option (when that option is given).
    def cat
      @input.each { |record| emit(record) } if @input

      path = @options[:input]
      return unless path

      File.open(path) do |ios|
        ios.each { |record| emit(record) } if @output
      end
    end

    # Prints every upstream record to stdout and forwards it downstream.
    # Does nothing when there is no upstream.
    def dump
      return unless @input

      @input.each do |record|
        puts record
        emit(record)
      end
    end

    private

    # Writes a record downstream and flushes it immediately; a no-op for the
    # last command in a pipeline.
    def emit(record)
      @output.write(record).flush if @output
    end
  end

  private

  # Creates one OS pipe per pair of neighbouring commands and attaches the
  # MessagePack packer/unpacker to the respective command.
  #
  # @return [Array<Array(IO, IO)>] the [reader, writer] pair of every pipe,
  #   in pipeline order
  def connect_commands
    @commands.each_cons(2).map do |upstream, downstream|
      reader, writer = IO.pipe
      downstream.input = MessagePack::Unpacker.new(reader)
      upstream.output  = MessagePack::Packer.new(writer)
      [reader, writer]
    end
  end
end
# Demo driver: stream the lines of big.tab through a cat -> dump pipeline.
# Alternative pipelines kept for reference:
#   .add(:cat, input: "foo.tab").add(:dump).add(:cat, input: "table.txt").add(:dump)
#   .add(:cat, input: "table.txt").add(:dump)
pipeline = Pipe.new
pipeline.add(:cat, input: "big.tab")
pipeline.add(:dump)
pipeline.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment