Skip to content

Instantly share code, notes, and snippets.

@pietern
Created December 1, 2011 06:34
Show Gist options
  • Save pietern/1414341 to your computer and use it in GitHub Desktop.
Save pietern/1414341 to your computer and use it in GitHub Desktop.
require 'eventmachine'
require 'posix/spawn'
module EventMachine
module POSIX
module Spawn
include ::POSIX::Spawn
class Child
include Spawn
include Deferrable
# Spawn a new process, write all input and read all output. Supports
# the standard spawn interface as described in the POSIX::Spawn module
# documentation:
#
# new([env], command, [argv1, ...], [options])
#
# The following options are supported in addition to the standard
# POSIX::Spawn options:
#
# :input => str Write str to the new process's standard input.
# :timeout => int Maximum number of seconds to allow the process
# to execute before aborting with a TimeoutExceeded
# exception.
# :max => total Maximum number of bytes of output to allow the
# process to generate before aborting with a
# MaximumOutputExceeded exception.
#
# Returns a new Child instance that is being executed. The object
# includes the Deferrable module, and executes the success callback
# when the process has exited, or the failure callback when the process
# was killed because of exceeding the timeout, or exceeding the maximum
# number of bytes to read from stdout and stderr combined. Once the
# success callback is triggered, this objects's out, err and status
# attributes are available.
def initialize(*args)
@env, @argv, options = extract_process_spawn_arguments(*args)
@options = options.dup
@input = @options.delete(:input)
@timeout = @options.delete(:timeout)
@max = @options.delete(:max)
@options.delete(:chdir) if @options[:chdir].nil?
@sigcld = SignalHandler.instance
sigcld.synchronize { exec! }
end
# All data written to the child process's stdout stream as a String.
attr_reader :out
# All data written to the child process's stderr stream as a String.
attr_reader :err
# A Process::Status object with information on how the child exited.
attr_reader :status
# Total command execution time (wall-clock time)
attr_reader :runtime
# Determine if the process did exit with a zero exit status.
def success?
@status && @status.success?
end
# Determine if the process has already terminated.
def terminated?
!! @status
end
# Send the SIGTERM signal to the process.
#
# Returns the Process::Status object obtained by reaping the process.
def kill
@timer.cancel if @timer
::Process.kill('TERM', @pid) rescue nil
end
private
attr_reader :sigcld
class SignalHandler
def self.instance
@instance ||= begin
new.tap { |instance|
prev_handler = Signal.trap("CLD") {
instance.signal
prev_handler.call if prev_handler
}
}
end
end
def initialize
@pid_to_trigger_pipe = {}
@pid_to_process_status = {}
@paused = false
end
def synchronize
@paused = true
yield
ensure
@paused = false
signal
end
def pid_to_io(pid)
r, w = IO.pipe
@pid_to_trigger_pipe[pid] = w
r
end
def pid_to_process_status(pid)
@pid_to_process_status.delete(pid)
end
def signal
return if @paused
# The SIGCHLD handler may not be called exactly once for every
# child. I.e., multiple children exiting concurrently may trigger
# only one SIGCHLD in the parent. Therefore, reap all processes
# that can be reaped.
while pid = ::Process.wait(-1, ::Process::WNOHANG)
@pid_to_process_status[pid] = $?
w = @pid_to_trigger_pipe.delete(pid)
w.close if w
end
rescue ::Errno::ECHILD
end
end
# Execute command, write input, and read output. This is called
# immediately when a new instance of this object is initialized.
def exec!
# spawn the process and hook up the pipes
@pid, stdin, stdout, stderr = popen4(@env, *(@argv + [@options]))
@start = Time.now
# watch fds
cin = EM.watch stdin, WritableStream, @input.dup if @input
cout = EM.watch stdout, ReadableStream, ''
cerr = EM.watch stderr, ReadableStream, ''
# register events
cin.notify_writable = true if cin
cout.notify_readable = true
cerr.notify_readable = true
# keep track of open fds
in_flight = [cin, cout, cerr].compact
in_flight.each { |io|
# force binary encoding
io.force_encoding
# register finalize hook
io.callback { in_flight.delete(io) }
}
failure = nil
# keep track of max output
max = @max
if max && max > 0
check_buffer_size = lambda {
if cout.buffer.size + cerr.buffer.size > max
failure = MaximumOutputExceeded
in_flight.each(&:close)
in_flight.clear
kill
end
}
cout.after_read(&check_buffer_size)
cerr.after_read(&check_buffer_size)
end
# kill process when it doesn't terminate in time
timeout = @timeout
if timeout && timeout > 0
@timer = Timer.new(timeout) {
failure = TimeoutExceeded
in_flight.each(&:close)
in_flight.clear
kill
}
end
# watch sigcld trigger pipe
csigcld = EM.watch sigcld.pid_to_io(@pid), ReadableStream, ''
csigcld.notify_readable = true
csigcld.callback {
in_flight.each(&:close)
in_flight.clear
@timer.cancel if @timer
@runtime = Time.now - @start
@status = sigcld.pid_to_process_status(@pid)
@out = cout.buffer
@err = cerr.buffer
if failure
set_deferred_failure failure
else
set_deferred_success
end
}
end
class Stream < Connection
include Deferrable
attr_reader :buffer
def initialize(buffer)
@buffer = buffer
end
def force_encoding
if @buffer.respond_to?(:force_encoding)
@io.set_encoding('BINARY', 'BINARY')
@buffer.force_encoding('BINARY')
end
end
def after_read(&blk)
@after_read = blk if blk
@after_read
end
def after_write(&blk)
@after_write = blk if blk
@after_write
end
def close
@io.close rescue nil
detach
end
end
class ReadableStream < Stream
# Maximum buffer size for reading
BUFSIZE = (32 * 1024)
def notify_readable
begin
@buffer << @io.readpartial(BUFSIZE)
@after_read.call if @after_read
rescue Errno::EAGAIN, Errno::EINTR
rescue EOFError
close
set_deferred_success
end
end
end
class WritableStream < Stream
def notify_writable
begin
boom = nil
size = @io.write_nonblock(@buffer)
@buffer = @buffer[size, @buffer.size]
@after_write.call if @after_write
rescue Errno::EPIPE => boom
rescue Errno::EAGAIN, Errno::EINTR
end
if boom || @buffer.size == 0
close
set_deferred_success
end
end
end
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment