Skip to content

Instantly share code, notes, and snippets.

@apeiros
Created March 11, 2010 17:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save apeiros/329397 to your computer and use it in GitHub Desktop.
Save apeiros/329397 to your computer and use it in GitHub Desktop.
require 'set'
# Create and manage forks.
# Provides a basic facility to communicate between the forks and the parent
#
# Example:
# pool = ForkPool.new
# child = pool.fork do |parent|
# puts "child!" # same $stdout as parent
# parent.puts "From child to parent, I got pid #{$$}"
# puts "Parent wrote me: ", parent.gets
# end
# puts "Child with pid #{child.pid} wrote me: ", child.gets # communicate with a fork
# child.puts "Thank you for confirming your pid"
# pool.wait(child) # wait for one or more forks to terminate
# pool.fork :name => :a do sleep 5 end
# pool.fork :name => :b do sleep 2 end
# a = pool.by_name[:a]
# b = pool.by_name[:b] # get forks by names
# a.kill! # immediatly terminate a fork
# pool.wait_all # wait for all known forks to terminate
#
# See Process::fork, ForkPool::Fork
class ForkPool
# Exceptions that have to be ignored
IgnoreExceptions = [::SystemExit]
# An object representing a fork, containing data about it like pid, name,
# exit_status, exception etc.
# It also provides facilities for parent and child process to communicate
# with each other.
class Fork
# The process id of the fork
attr_reader :pid
# The name of the fork (see ForkPool#fork's options)
attr_reader :name
# Readable IO
attr_reader :in
# Writable IO
attr_reader :out
# Create a new Fork instance.
# You shouldn't do that manually, use ForkPool#fork instead.
def initialize(pool, name=nil, *flags)
@handle_exceptions = flags.delete(:exceptions)
@death_notice = flags.delete(:death_notice)
raise ArgumentError, "Unknown flags: #{flags.join(', ')}" unless flags.empty?
@pool = pool
@name = name
@pid = nil
@process_status = nil
@in = nil
@out = nil
end
# Whether this fork sends the final exception to the parent
# See ForkPool::Fork#exception
def handle_exceptions?
@handle_exceptions
end
# Whether this fork sends a death notice to the parent
def death_notice?
@death_notice
end
# Sets the name and io to communicate with the parent/child
def complete!(pid, readable_io, writable_io, ctrl_io) # :nodoc:
@pid = pid
@in = readable_io
@out = writable_io
@ctrl = ctrl_io
class <<self
undef_method :complete! # this method must only be called once
end
end
# Sets the exit status
def process_status!(status) # :nodoc:
@process_status = status
class <<self
undef_method :process_status! # this method must only be called once
end
end
# Process::Status for dead forks, nil for live forks
def process_status
@process_status ||= begin
_, exit_status = *Process.wait2(@pid, Process::WNOHANG)
@pool.death_notice(self)
exit_status
end
end
# The exit status of this fork.
# See Process::Status#exitstatus
def exit_status
status = process_status
status && status.exitstatus
end
# Whether this fork is still running (= is alive) or already exited.
def alive?
!exit_status
end
# Whether this fork is still running or already exited (= is dead).
def dead?
!!exit_status
end
# Read from the fork.
# See IO#gets
def gets(*args)
@in.gets(*args)
end
# Read from the fork.
# See IO#read
def read(*args)
@in.read(*args)
end
# Read from the fork.
# See IO#read_nonblock
def read_nonblock(*args)
@in.read_nonblock(*args)
end
# Receive an object sent by the other process via send_object.
# See ForkPool::Fork#send_object for an example.
def receive_object
size = @in.read(4).unpack("I").first
marshalled = @in.read(size)
Marshal.load(marshalled)
end
# Write to the fork.
# See IO#puts
def puts(*args)
@out.puts(*args)
end
# Write to the fork.
# See IO#write
def write(*args)
@out.write(*args)
end
# The exception that terminated the fork
# Requires the :exceptions flag to be set when creating the fork.
def exception
@exception ||= begin
raise "You must set the :exceptions flag when forking in order to use this" unless handle_exceptions?
size_packed = @ctrl.read(4)
size = size_packed ? size_packed.unpack("I").first : 0
if size.zero? then # death notice or no exception
nil
else
marshalled = @ctrl.read(size)
Marshal.load(marshalled)
end
end
end
# Sends an object to the parent process
#
# Example:
# Demo = Struct.new(:a, :b, :c)
# pool.fork :name => :serializer do |parent|
# parent.send_object({:a => 'little', :nested => ['hash']})
# parent.send_object(Demo.new(1, :two, "three"))
# end
# p :received => pool[:serializer].receive_object # -> {:received=>{:a=>"little", :nested=>["hash"]}}
# p :received => pool[:serializer].receive_object # -> {:received=>#<struct Demo a=1, b=:two, c="three">}
#
# See ForkPool::Fork#receive_object
def send_object(obj)
marshalled = Marshal.dump(obj)
@out.write([marshalled.size].pack("I"))
@out.write(marshalled)
end
# Sends the (SIG)HUP signal to this fork.
# This is "gently asking the process to terminate".
# See Process::kill
def kill
Process.kill("HUP", @pid)
end
# Sends the (SIG)KILL signal to this fork.
# The process will be immediatly terminated.
# See Process::kill
def kill!
Process.kill("KILL", @pid)
end
# Sends the given signal to this fork
# See Process::kill
def signal(sig)
Process.kill(sig, @pid)
end
def dup # :nodoc:
raise TypeError, "can't dup ForkPool::Fork"
end
def clone # :nodoc:
raise TypeError, "can't clone ForkPool::Fork"
end
end
@this = nil
class <<self
# Set this process' fork
def this!(fork) # :nodoc:
@this = fork
end
# This process' fork (only set when you're in a forked process)
attr_reader :this
# When in a fork, this will return the name of the fork
def name
@this && @this.name
end
# Returns true if this process was not spawned by ForkPool
def root?
!@this
end
end
# A Set of all forks known to be dead
attr_reader :dead
# A Set of all forks assumed to be still running (you can verify by using
# ForkPool::Fork#alive?)
attr_reader :alive
# Create a new ForkPool.
# See ForkPool
def initialize
@alive = Set.new
@dead = Set.new
@forks = {}
end
# Access forks by pid or by name
#
# Example:
# pool = ForkPool.new
# fork1 = pool.fork do sleep 100 end
# fork2 = pool.fork :name => :fork2 do sleep 100 end
# pool[fork1.pid].equal?(fork1) # => true
# pool[fork2.pid].equal?(fork2) # => true
# pool[fork2.name].equal?(fork2) # => true
def [](pid_or_name)
@forks[pid_or_name]
end
# Create a new subprocess that is immediatly executed.
# Returns a ForkPool::Fork instance.
# Arguments:
# name:: An optional name for this fork, to facilitate management,
# see ForkPool#[]
# flags:: A list of symbols. See under the 'Flags' section for valid Symbols
#
# Flags:
# :handle_exceptions:: The fork will report back any exception that occurs
# which can be read in the parent via
# ForkPool::Fork#exception.
# :death_notice:: FIXME, document
#
# Example:
# pool = ForkPool.new
# child = pool.fork do |parent|
# puts "child!" # same $stdout as parent
# parent.puts "From child to parent, I got pid #{$$}"
# puts "Parent wrote me: ", parent.gets
# end
# puts "Child with pid #{child.pid} wrote me: ", child.gets
# child.puts "Thank you for confirming your pid"
#
# See also: Process::fork
def fork(name=nil, *flags, &block)
raise TypeError, "Name must be a Symbol" if name && !name.is_a?(Symbol)
fork = Fork.new(self, name, *flags)
fork_read, parent_write = IO.pipe
parent_read, fork_write = IO.pipe
ctrl_read, ctrl_write = nil
ctrl_read, ctrl_write = IO.pipe if fork.handle_exceptions? || fork.death_notice?
pid = Process.fork do
parent_write.close
parent_read.close
ctrl_read.close if ctrl_read
child_process(fork, fork_read, fork_write, ctrl_write, &block)
end
fork_write.close
fork_read.close
ctrl_write.close if ctrl_write
fork.complete!(pid, parent_read, parent_write, ctrl_read)
@alive.add(fork)
@forks[pid] = fork
@forks[name] = fork if name
fork
end
# The pids of all forks that are still alive.
# Performs a check on the status of each fork. This can be suppressed
# by passing true for the no_checking argument.
def running_pids(no_checking=nil)
check_status(*@alive) unless no_checking
@alive.map { |fork| fork.pid }
end
# The names of all forks that are still alive.
# Performs a check on the status of each fork. This can be suppressed
# by passing true for the no_checking argument.
def running_names(no_checking=nil)
check_status(*@alive) unless no_checking
@alive.map { |fork| fork.name }.compact # compact because some forks may not have a name
end
# Deletes all data associated with fork, and returns the fork
# raises a RuntimeError if you provide a pid of a still running fork
# process.
def forget!(fork)
raise "Fork is still running" unless @dead.include?(fork)
@forks.delete(fork.pid)
@forks.delete(fork.name)
@alive.delete(fork) # just to be really sure
@dead.delete(fork)
fork
end
# Deletes the data of all dead forks.
# See ForkPool#forget!
def forget_all!
@dead.each do |fork| forget!(fork) end
end
# Inform the pool about the dead of a fork instance
# Used by wait and Fork#process_status
def death_notice(fork) # :nodoc:
@alive.delete(fork)
@dead.add(fork)
end
# Check & update the status of one or more forks
def check_status(*forks) # :nodoc:
forks.each do |fork|
_, exit_status = Process.wait2(fork.pid, Process::WNOHANG)
if exit_status then
fork.process_status!(exit_status)
death_notice(fork)
end
end
end
# Wait for one or more forks to terminate
# Example:
# pool = ForkPool.new
# start = Time.now
# a = pool.fork do sleep 5 end
# b = pool.fork do sleep 2 end
# c = pool.fork do sleep 3 end
# pool.wait(b,c)
# (Time.now-start).floor # => 3
def wait(*forks)
forks.each do |fork|
_, exit_status = Process.wait2(fork.pid)
fork.process_status!(exit_status)
death_notice(fork)
end
end
# Wait for all forks to terminate
# Example:
# pool = ForkPool.new
# start = Time.now
# pool.fork do sleep 5 end
# pool.fork do sleep 2 end
# pool.fork do sleep 3 end
# pool.wait_all
# (Time.now-start).floor # => 5
def wait_all
wait(*@alive)
end
private
def child_process(fork, fork_read, fork_write, ctrl_write)
fork.complete!(Process.pid, fork_read, fork_write, ctrl_write)
ForkPool.this!(fork)
yield(fork)
rescue *ForkPool::IgnoreExceptions
raise
rescue Exception => e
if ctrl_write && fork.handle_exceptions? then
marshalled = Marshal.dump(e)
ctrl_write.write([marshalled.size].pack("I"))
ctrl_write.write(marshalled)
end
exit 1
ensure
fork_write.close
fork_read.close
if ctrl_write then
ctrl_write.write("\0\0\0\0") if fork.death_notice? # send the death notice
ctrl_write.close
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment