Created
March 11, 2010 17:32
-
-
Save apeiros/329397 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'set' | |
# Create and manage forks. | |
# Provides a basic facility to communicate between the forks and the parent | |
# | |
# Example: | |
# pool = ForkPool.new | |
# child = pool.fork do |parent| | |
# puts "child!" # same $stdout as parent | |
# parent.puts "From child to parent, I got pid #{$$}" | |
# puts "Parent wrote me: ", parent.gets | |
# end | |
# puts "Child with pid #{child.pid} wrote me: ", child.gets # communicate with a fork | |
# child.puts "Thank you for confirming your pid" | |
# pool.wait(child) # wait for one or more forks to terminate | |
# pool.fork :name => :a do sleep 5 end | |
# pool.fork :name => :b do sleep 2 end | |
# a = pool.by_name[:a] | |
# b = pool.by_name[:b] # get forks by names | |
# a.kill! # immediatly terminate a fork | |
# pool.wait_all # wait for all known forks to terminate | |
# | |
# See Process::fork, ForkPool::Fork | |
class ForkPool | |
class Fork | |
# The process id of the fork | |
attr_reader :pid | |
# The name of the fork (see ForkPool#fork's options) | |
attr_reader :name | |
# The fork will read data from here | |
attr_reader :parent_io | |
# Readable IO | |
attr_reader :in | |
# Writable IO | |
attr_reader :out | |
# Create a new Fork instance. | |
# You shouldn't do that manually, use ForkPool#fork instead. | |
def initialize(pool, name=nil) | |
@pool = pool | |
@name = name | |
@pid = nil | |
@process_status = nil | |
@in = nil | |
@out = nil | |
end | |
# Sets the name and io to communicate with the parent/child | |
def complete!(pid, readable_io, writable_io) # :nodoc: | |
@pid = pid | |
@in = readable_io | |
@out = writable_io | |
class <<self | |
undef_method :complete! # this method must only be called once | |
end | |
end | |
# Sets the exit status | |
def process_status!(status) # :nodoc: | |
@process_status = status | |
class <<self | |
undef_method :process_status! # this method must only be called once | |
end | |
end | |
# Process::Status for dead forks, nil for live forks | |
def process_status | |
@process_status ||= begin | |
_, exit_status = *Process.wait2(@pid, Process::WNOHANG) | |
@pool.death_notice(self) | |
exit_status | |
end | |
end | |
# The exit status of this fork. | |
# See Process::Status | |
def exit_status | |
process_status.exitstatus | |
end | |
# Whether this fork is still running (= is alive) or already exited. | |
def alive? | |
!exit_status | |
end | |
# Whether this fork is still running or already exited (= is dead). | |
def dead? | |
!!exit_status | |
end | |
# Read from the fork. | |
# See IO#gets | |
def gets(*args) | |
@in.gets(*args) | |
end | |
# Read from the fork. | |
# See IO#read | |
def read(*args) | |
@in.read(*args) | |
end | |
# Read from the fork. | |
# See IO#read_nonblock | |
def read_nonblock(*args) | |
@in.read_nonblock(*args) | |
end | |
# Write to the fork. | |
# See IO#puts | |
def puts(*args) | |
@out.puts(*args) | |
end | |
# Write to the fork. | |
# See IO#write | |
def write(*args) | |
@out.write(*args) | |
end | |
# Sends the (SIG)HUP signal to this fork. | |
# This is "gently asking the process to terminate". | |
# See Process::kill | |
def kill | |
Process.kill("HUP", @pid) | |
end | |
# Sends the (SIG)KILL signal to this fork. | |
# The process will be immediatly terminated. | |
# See Process::kill | |
def kill! | |
Process.kill("KILL", @pid) | |
end | |
# Sends the given signal to this fork | |
# See Process::kill | |
def signal(sig) | |
Process.kill(sig, @pid) | |
end | |
def dup # :nodoc: | |
raise TypeError, "can't dup ForkPool::Fork" | |
end | |
def clone # :nodoc: | |
raise TypeError, "can't clone ForkPool::Fork" | |
end | |
end | |
@this = nil | |
class <<self | |
# Set this process' fork | |
def this!(fork) # :nodoc: | |
@this = fork | |
end | |
# This process' fork (only set when you're in a forked process) | |
attr_reader :this | |
# When in a fork, this will return the name of the fork | |
def name | |
@this && @this.name | |
end | |
end | |
# A hash of all known forks by name | |
attr_reader :by_name | |
# A hash of all known forks by process id | |
attr_reader :by_pid | |
# A Set of all forks known to be dead | |
attr_reader :dead | |
# A Set of all forks assumed to be still running (you can verify by using | |
# ForkPool::Fork#alive?) | |
attr_reader :alive | |
# Create a new ForkPool. | |
# See ForkPool | |
def initialize | |
@alive = Set.new | |
@dead = Set.new | |
@by_name = {} | |
@by_pid = {} | |
end | |
# Create a new subprocess that is immediatly executed. | |
# options: | |
# :name:: An optional name for this fork, to facilitate management | |
# | |
# Example: | |
# pool = ForkPool.new | |
# child = pool.fork do |parent| | |
# puts "child!" # same $stdout as parent | |
# parent.puts "From child to parent, I got pid #{$$}" | |
# puts "Parent wrote me: ", parent.gets | |
# end | |
# puts "Child with pid #{child.pid} wrote me: ", child.gets | |
# child.puts "Thank you for confirming your pid" | |
# | |
# See Process::fork | |
def fork(opt=nil) | |
name = nil | |
if opt then | |
name = opt[:name] | |
raise TypeError, "Name must be a Symbol" unless name.is_a?(Symbol) | |
end | |
fork = Fork.new(self, name) | |
fork_read, parent_write = IO.pipe | |
parent_read, fork_write = IO.pipe | |
pid = Process.fork do | |
begin | |
parent_write.close | |
parent_read.close | |
fork.complete!(Process.pid, fork_read, fork_write) | |
ForkPool.this!(fork) | |
yield(fork) | |
ensure | |
fork_write.close | |
fork_read.close | |
end | |
end | |
fork_write.close | |
fork_read.close | |
fork.complete!(pid, parent_read, parent_write) | |
@alive.add(fork) | |
@by_pid[pid] = fork | |
@by_name[name] = fork if name | |
fork | |
end | |
# The pids of all forks that are still alive. | |
# Performs a check on the status of each fork. This can be suppressed | |
# by passing true for the no_checking argument. | |
def running_pids(no_checking=nil) | |
check_status(*@alive) unless no_checking | |
@alive.map { |fork| fork.pid } | |
end | |
# The names of all forks that are still alive. | |
# Performs a check on the status of each fork. This can be suppressed | |
# by passing true for the no_checking argument. | |
def running_names(no_checking=nil) | |
check_status(*@alive) unless no_checking | |
@alive.map { |fork| fork.name }.compact # compact because some forks may not have a name | |
end | |
# Deletes all data associated with fork, and returns the fork | |
# raises a RuntimeError if you provide a pid of a still running fork | |
# process. | |
def forget!(fork) | |
raise "Fork is still running" unless @dead.include?(fork) | |
@by_pid.delete(fork.pid) | |
@by_name.delete(fork.name) | |
@alive.delete(fork) # just to be really sure | |
@dead.delete(fork) | |
fork | |
end | |
# Deletes the data of all dead forks. | |
# See ForkPool#forget! | |
def forget_all! | |
@dead.each do |fork| forget!(fork) end | |
end | |
# Inform the pool about the dead of a fork instance | |
# Used by wait and Fork#process_status | |
def death_notice(fork) # :nodoc: | |
@alive.delete(fork) | |
@dead.add(fork) | |
end | |
# Check & update the status of one or more forks | |
def check_status(*forks) # :nodoc: | |
forks.each do |fork| | |
_, exit_status = Process.wait2(fork.pid, Process::WNOHANG) | |
if exit_status then | |
fork.process_status!(exit_status) | |
death_notice(fork) | |
end | |
end | |
end | |
# Wait for one or more forks to terminate | |
# Example: | |
# pool = ForkPool.new | |
# start = Time.now | |
# a = pool.fork do sleep 5 end | |
# b = pool.fork do sleep 2 end | |
# c = pool.fork do sleep 3 end | |
# pool.wait(b,c) | |
# (Time.now-start).floor # => 3 | |
def wait(*forks) | |
forks.each do |fork| | |
_, exit_status = Process.wait2(fork.pid) | |
fork.process_status!(exit_status) | |
death_notice(fork) | |
end | |
end | |
# Wait for all forks to terminate | |
# Example: | |
# pool = ForkPool.new | |
# start = Time.now | |
# pool.fork do sleep 5 end | |
# pool.fork do sleep 2 end | |
# pool.fork do sleep 3 end | |
# pool.wait_all | |
# (Time.now-start).floor # => 5 | |
def wait_all | |
wait(*@alive) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment