Created
March 11, 2010 17:32
-
-
Save apeiros/329397 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'set' | |
# Create and manage forks. | |
# Provides a basic facility to communicate between the forks and the parent | |
# | |
# Example: | |
# pool = ForkPool.new | |
# child = pool.fork do |parent| | |
# puts "child!" # same $stdout as parent | |
# parent.puts "From child to parent, I got pid #{$$}" | |
# puts "Parent wrote me: ", parent.gets | |
# end | |
# puts "Child with pid #{child.pid} wrote me: ", child.gets # communicate with a fork | |
# child.puts "Thank you for confirming your pid" | |
# pool.wait(child) # wait for one or more forks to terminate | |
# pool.fork :name => :a do sleep 5 end | |
# pool.fork :name => :b do sleep 2 end | |
# a = pool.by_name[:a] | |
# b = pool.by_name[:b] # get forks by names | |
# a.kill! # immediatly terminate a fork | |
# pool.wait_all # wait for all known forks to terminate | |
# | |
# See Process::fork, ForkPool::Fork | |
class ForkPool | |
# Exceptions that have to be ignored | |
IgnoreExceptions = [::SystemExit] | |
# An object representing a fork, containing data about it like pid, name, | |
# exit_status, exception etc. | |
# It also provides facilities for parent and child process to communicate | |
# with each other. | |
class Fork | |
# The process id of the fork | |
attr_reader :pid | |
# The name of the fork (see ForkPool#fork's options) | |
attr_reader :name | |
# Readable IO | |
attr_reader :in | |
# Writable IO | |
attr_reader :out | |
# Create a new Fork instance. | |
# You shouldn't do that manually, use ForkPool#fork instead. | |
def initialize(pool, name=nil, *flags) | |
@handle_exceptions = flags.delete(:exceptions) | |
@death_notice = flags.delete(:death_notice) | |
raise ArgumentError, "Unknown flags: #{flags.join(', ')}" unless flags.empty? | |
@pool = pool | |
@name = name | |
@pid = nil | |
@process_status = nil | |
@in = nil | |
@out = nil | |
end | |
# Whether this fork sends the final exception to the parent | |
# See ForkPool::Fork#exception | |
def handle_exceptions? | |
@handle_exceptions | |
end | |
# Whether this fork sends a death notice to the parent | |
def death_notice? | |
@death_notice | |
end | |
# Sets the name and io to communicate with the parent/child | |
def complete!(pid, readable_io, writable_io, ctrl_io) # :nodoc: | |
@pid = pid | |
@in = readable_io | |
@out = writable_io | |
@ctrl = ctrl_io | |
class <<self | |
undef_method :complete! # this method must only be called once | |
end | |
end | |
# Sets the exit status | |
def process_status!(status) # :nodoc: | |
@process_status = status | |
class <<self | |
undef_method :process_status! # this method must only be called once | |
end | |
end | |
# Process::Status for dead forks, nil for live forks | |
def process_status | |
@process_status ||= begin | |
_, exit_status = *Process.wait2(@pid, Process::WNOHANG) | |
@pool.death_notice(self) | |
exit_status | |
end | |
end | |
# The exit status of this fork. | |
# See Process::Status#exitstatus | |
def exit_status | |
status = process_status | |
status && status.exitstatus | |
end | |
# Whether this fork is still running (= is alive) or already exited. | |
def alive? | |
!exit_status | |
end | |
# Whether this fork is still running or already exited (= is dead). | |
def dead? | |
!!exit_status | |
end | |
# Read from the fork. | |
# See IO#gets | |
def gets(*args) | |
@in.gets(*args) | |
end | |
# Read from the fork. | |
# See IO#read | |
def read(*args) | |
@in.read(*args) | |
end | |
# Read from the fork. | |
# See IO#read_nonblock | |
def read_nonblock(*args) | |
@in.read_nonblock(*args) | |
end | |
# Receive an object sent by the other process via send_object. | |
# See ForkPool::Fork#send_object for an example. | |
def receive_object | |
size = @in.read(4).unpack("I").first | |
marshalled = @in.read(size) | |
Marshal.load(marshalled) | |
end | |
# Write to the fork. | |
# See IO#puts | |
def puts(*args) | |
@out.puts(*args) | |
end | |
# Write to the fork. | |
# See IO#write | |
def write(*args) | |
@out.write(*args) | |
end | |
# The exception that terminated the fork | |
# Requires the :exceptions flag to be set when creating the fork. | |
def exception | |
@exception ||= begin | |
raise "You must set the :exceptions flag when forking in order to use this" unless handle_exceptions? | |
size_packed = @ctrl.read(4) | |
size = size_packed ? size_packed.unpack("I").first : 0 | |
if size.zero? then # death notice or no exception | |
nil | |
else | |
marshalled = @ctrl.read(size) | |
Marshal.load(marshalled) | |
end | |
end | |
end | |
# Sends an object to the parent process | |
# | |
# Example: | |
# Demo = Struct.new(:a, :b, :c) | |
# pool.fork :name => :serializer do |parent| | |
# parent.send_object({:a => 'little', :nested => ['hash']}) | |
# parent.send_object(Demo.new(1, :two, "three")) | |
# end | |
# p :received => pool[:serializer].receive_object # -> {:received=>{:a=>"little", :nested=>["hash"]}} | |
# p :received => pool[:serializer].receive_object # -> {:received=>#<struct Demo a=1, b=:two, c="three">} | |
# | |
# See ForkPool::Fork#receive_object | |
def send_object(obj) | |
marshalled = Marshal.dump(obj) | |
@out.write([marshalled.size].pack("I")) | |
@out.write(marshalled) | |
end | |
# Sends the (SIG)HUP signal to this fork. | |
# This is "gently asking the process to terminate". | |
# See Process::kill | |
def kill | |
Process.kill("HUP", @pid) | |
end | |
# Sends the (SIG)KILL signal to this fork. | |
# The process will be immediatly terminated. | |
# See Process::kill | |
def kill! | |
Process.kill("KILL", @pid) | |
end | |
# Sends the given signal to this fork | |
# See Process::kill | |
def signal(sig) | |
Process.kill(sig, @pid) | |
end | |
def dup # :nodoc: | |
raise TypeError, "can't dup ForkPool::Fork" | |
end | |
def clone # :nodoc: | |
raise TypeError, "can't clone ForkPool::Fork" | |
end | |
end | |
@this = nil | |
class <<self | |
# Set this process' fork | |
def this!(fork) # :nodoc: | |
@this = fork | |
end | |
# This process' fork (only set when you're in a forked process) | |
attr_reader :this | |
# When in a fork, this will return the name of the fork | |
def name | |
@this && @this.name | |
end | |
# Returns true if this process was not spawned by ForkPool | |
def root? | |
!@this | |
end | |
end | |
# A Set of all forks known to be dead | |
attr_reader :dead | |
# A Set of all forks assumed to be still running (you can verify by using | |
# ForkPool::Fork#alive?) | |
attr_reader :alive | |
# Create a new ForkPool. | |
# See ForkPool | |
def initialize | |
@alive = Set.new | |
@dead = Set.new | |
@forks = {} | |
end | |
# Access forks by pid or by name | |
# | |
# Example: | |
# pool = ForkPool.new | |
# fork1 = pool.fork do sleep 100 end | |
# fork2 = pool.fork :name => :fork2 do sleep 100 end | |
# pool[fork1.pid].equal?(fork1) # => true | |
# pool[fork2.pid].equal?(fork2) # => true | |
# pool[fork2.name].equal?(fork2) # => true | |
def [](pid_or_name) | |
@forks[pid_or_name] | |
end | |
# Create a new subprocess that is immediatly executed. | |
# Returns a ForkPool::Fork instance. | |
# Arguments: | |
# name:: An optional name for this fork, to facilitate management, | |
# see ForkPool#[] | |
# flags:: A list of symbols. See under the 'Flags' section for valid Symbols | |
# | |
# Flags: | |
# :handle_exceptions:: The fork will report back any exception that occurs | |
# which can be read in the parent via | |
# ForkPool::Fork#exception. | |
# :death_notice:: FIXME, document | |
# | |
# Example: | |
# pool = ForkPool.new | |
# child = pool.fork do |parent| | |
# puts "child!" # same $stdout as parent | |
# parent.puts "From child to parent, I got pid #{$$}" | |
# puts "Parent wrote me: ", parent.gets | |
# end | |
# puts "Child with pid #{child.pid} wrote me: ", child.gets | |
# child.puts "Thank you for confirming your pid" | |
# | |
# See also: Process::fork | |
def fork(name=nil, *flags, &block) | |
raise TypeError, "Name must be a Symbol" if name && !name.is_a?(Symbol) | |
fork = Fork.new(self, name, *flags) | |
fork_read, parent_write = IO.pipe | |
parent_read, fork_write = IO.pipe | |
ctrl_read, ctrl_write = nil | |
ctrl_read, ctrl_write = IO.pipe if fork.handle_exceptions? || fork.death_notice? | |
pid = Process.fork do | |
parent_write.close | |
parent_read.close | |
ctrl_read.close if ctrl_read | |
child_process(fork, fork_read, fork_write, ctrl_write, &block) | |
end | |
fork_write.close | |
fork_read.close | |
ctrl_write.close if ctrl_write | |
fork.complete!(pid, parent_read, parent_write, ctrl_read) | |
@alive.add(fork) | |
@forks[pid] = fork | |
@forks[name] = fork if name | |
fork | |
end | |
# The pids of all forks that are still alive. | |
# Performs a check on the status of each fork. This can be suppressed | |
# by passing true for the no_checking argument. | |
def running_pids(no_checking=nil) | |
check_status(*@alive) unless no_checking | |
@alive.map { |fork| fork.pid } | |
end | |
# The names of all forks that are still alive. | |
# Performs a check on the status of each fork. This can be suppressed | |
# by passing true for the no_checking argument. | |
def running_names(no_checking=nil) | |
check_status(*@alive) unless no_checking | |
@alive.map { |fork| fork.name }.compact # compact because some forks may not have a name | |
end | |
# Deletes all data associated with fork, and returns the fork | |
# raises a RuntimeError if you provide a pid of a still running fork | |
# process. | |
def forget!(fork) | |
raise "Fork is still running" unless @dead.include?(fork) | |
@forks.delete(fork.pid) | |
@forks.delete(fork.name) | |
@alive.delete(fork) # just to be really sure | |
@dead.delete(fork) | |
fork | |
end | |
# Deletes the data of all dead forks. | |
# See ForkPool#forget! | |
def forget_all! | |
@dead.each do |fork| forget!(fork) end | |
end | |
# Inform the pool about the dead of a fork instance | |
# Used by wait and Fork#process_status | |
def death_notice(fork) # :nodoc: | |
@alive.delete(fork) | |
@dead.add(fork) | |
end | |
# Check & update the status of one or more forks | |
def check_status(*forks) # :nodoc: | |
forks.each do |fork| | |
_, exit_status = Process.wait2(fork.pid, Process::WNOHANG) | |
if exit_status then | |
fork.process_status!(exit_status) | |
death_notice(fork) | |
end | |
end | |
end | |
# Wait for one or more forks to terminate | |
# Example: | |
# pool = ForkPool.new | |
# start = Time.now | |
# a = pool.fork do sleep 5 end | |
# b = pool.fork do sleep 2 end | |
# c = pool.fork do sleep 3 end | |
# pool.wait(b,c) | |
# (Time.now-start).floor # => 3 | |
def wait(*forks) | |
forks.each do |fork| | |
_, exit_status = Process.wait2(fork.pid) | |
fork.process_status!(exit_status) | |
death_notice(fork) | |
end | |
end | |
# Wait for all forks to terminate | |
# Example: | |
# pool = ForkPool.new | |
# start = Time.now | |
# pool.fork do sleep 5 end | |
# pool.fork do sleep 2 end | |
# pool.fork do sleep 3 end | |
# pool.wait_all | |
# (Time.now-start).floor # => 5 | |
def wait_all | |
wait(*@alive) | |
end | |
private | |
def child_process(fork, fork_read, fork_write, ctrl_write) | |
fork.complete!(Process.pid, fork_read, fork_write, ctrl_write) | |
ForkPool.this!(fork) | |
yield(fork) | |
rescue *ForkPool::IgnoreExceptions | |
raise | |
rescue Exception => e | |
if ctrl_write && fork.handle_exceptions? then | |
marshalled = Marshal.dump(e) | |
ctrl_write.write([marshalled.size].pack("I")) | |
ctrl_write.write(marshalled) | |
end | |
exit 1 | |
ensure | |
fork_write.close | |
fork_read.close | |
if ctrl_write then | |
ctrl_write.write("\0\0\0\0") if fork.death_notice? # send the death notice | |
ctrl_write.close | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment