Skip to content

Instantly share code, notes, and snippets.

@apeiros
Created March 11, 2010 17:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save apeiros/329397 to your computer and use it in GitHub Desktop.
Save apeiros/329397 to your computer and use it in GitHub Desktop.
require 'set'
# Create and manage forks.
# Provides a basic facility to communicate between the forks and the parent
#
# Example:
# pool = ForkPool.new
# child = pool.fork do |parent|
# puts "child!" # same $stdout as parent
# parent.puts "From child to parent, I got pid #{$$}"
# puts "Parent wrote me: ", parent.gets
# end
# puts "Child with pid #{child.pid} wrote me: ", child.gets # communicate with a fork
# child.puts "Thank you for confirming your pid"
# pool.wait(child) # wait for one or more forks to terminate
# pool.fork :name => :a do sleep 5 end
# pool.fork :name => :b do sleep 2 end
# a = pool.by_name[:a]
# b = pool.by_name[:b] # get forks by names
# a.kill! # immediately terminate a fork
# pool.wait_all # wait for all known forks to terminate
#
# See Process::fork, ForkPool::Fork
class ForkPool
  # A handle on one forked process, created by ForkPool#fork.
  #
  # The parent process and the forked process each hold their own copy of
  # the Fork object, wired to opposite ends of two pipes: whatever one side
  # writes with #puts / #write, the other side reads with #gets / #read.
  class Fork
    # The process id of the fork
    attr_reader :pid

    # The name of the fork (see ForkPool#fork's options)
    attr_reader :name

    # Kept for backward compatibility only: nothing in this class assigns
    # it, so it always reads as nil. Use #in and #out instead.
    attr_reader :parent_io

    # Readable IO
    attr_reader :in

    # Writable IO
    attr_reader :out

    # Create a new Fork instance.
    # You shouldn't do that manually, use ForkPool#fork instead.
    #
    # pool:: the ForkPool that is told when this fork is found dead
    # name:: optional name of the fork
    def initialize(pool, name=nil)
      @pool           = pool
      @name           = name
      @pid            = nil
      @process_status = nil
      @parent_io      = nil
      @in             = nil
      @out            = nil
    end

    # Sets the pid and the ios used to communicate with the parent/child.
    # May only be called once per instance; the method removes itself from
    # this object afterwards, so a second call raises NoMethodError.
    def complete!(pid, readable_io, writable_io) # :nodoc:
      @pid = pid
      @in  = readable_io
      @out = writable_io
      class <<self
        undef_method :complete! # this method must only be called once
      end
    end

    # Records the Process::Status of the terminated fork.
    # May only be called once per instance; the method removes itself from
    # this object afterwards, so a second call raises NoMethodError.
    def process_status!(status) # :nodoc:
      @process_status = status
      class <<self
        undef_method :process_status! # this method must only be called once
      end
    end

    # Process::Status for dead forks, nil for live forks.
    #
    # While no status is known yet this polls the process without blocking
    # (Process::WNOHANG). Only when the process really has terminated is the
    # status stored and the pool informed; a fork that is still running is
    # left untouched and nil is returned.
    #
    # Returns nil as well if the fork has no pid yet. Errors raised by
    # Process.wait2 (e.g. when the pid is not a child of the calling
    # process) are not rescued.
    def process_status
      return @process_status if @process_status
      return nil unless @pid

      _, status = Process.wait2(@pid, Process::WNOHANG)
      return nil unless status # still running: nothing to record

      process_status!(status)
      @pool.death_notice(self)
      @process_status
    end

    # The exit status of this fork.
    # nil while the fork is still running, and also nil if it did not exit
    # normally (e.g. was terminated by a signal) - use #alive? / #dead? to
    # tell those cases apart.
    # See Process::Status
    def exit_status
      status = process_status
      status && status.exitstatus
    end

    # Whether this fork is still running (= is alive) or already exited.
    def alive?
      process_status.nil?
    end

    # Whether this fork is still running or already exited (= is dead).
    # A fork that was terminated by a signal counts as dead too.
    def dead?
      !alive?
    end

    # Read a line from the other side (parent <-> fork).
    # See IO#gets
    def gets(*args)
      @in.gets(*args)
    end

    # Read from the other side (parent <-> fork).
    # See IO#read
    def read(*args)
      @in.read(*args)
    end

    # Read from the other side (parent <-> fork) without blocking.
    # See IO#read_nonblock
    def read_nonblock(*args)
      @in.read_nonblock(*args)
    end

    # Write to the other side (parent <-> fork).
    # See IO#puts
    def puts(*args)
      @out.puts(*args)
    end

    # Write to the other side (parent <-> fork).
    # See IO#write
    def write(*args)
      @out.write(*args)
    end

    # Sends the (SIG)HUP signal to this fork.
    # This is "gently asking the process to terminate".
    # See Process::kill
    def kill
      Process.kill("HUP", @pid)
    end

    # Sends the (SIG)KILL signal to this fork.
    # The process will be immediately terminated.
    # See Process::kill
    def kill!
      Process.kill("KILL", @pid)
    end

    # Sends the given signal to this fork
    # See Process::kill
    def signal(sig)
      Process.kill(sig, @pid)
    end

    # A Fork represents one specific process and can't be duplicated.
    def dup # :nodoc:
      raise TypeError, "can't dup ForkPool::Fork"
    end

    # A Fork represents one specific process and can't be duplicated.
    def clone # :nodoc:
      raise TypeError, "can't clone ForkPool::Fork"
    end
  end

  @this = nil

  class <<self
    # This process' fork (only set when you're in a forked process)
    attr_reader :this

    # Set this process' fork
    def this!(fork) # :nodoc:
      @this = fork
    end

    # When in a fork, this will return the name of the fork.
    # NOTE: this replaces Module#name for ForkPool, so outside of a fork
    # (and in unnamed forks) ForkPool.name is nil, not "ForkPool".
    def name
      @this && @this.name
    end
  end

  # A hash of all known forks by name
  attr_reader :by_name

  # A hash of all known forks by process id
  attr_reader :by_pid

  # A Set of all forks known to be dead
  attr_reader :dead

  # A Set of all forks assumed to be still running (you can verify by using
  # ForkPool::Fork#alive?)
  attr_reader :alive

  # Create a new ForkPool.
  # See ForkPool
  def initialize
    @alive   = Set.new
    @dead    = Set.new
    @by_name = {}
    @by_pid  = {}
  end

  # Create a new subprocess that is immediately executed.
  # The block runs in the subprocess and receives a ForkPool::Fork that is
  # connected to the parent; the same kind of object, connected to the
  # subprocess, is returned to the caller.
  #
  # options:
  # :name:: An optional name (a Symbol) for this fork, to facilitate
  #         management. A later fork with the same name replaces the
  #         earlier one in #by_name.
  #
  # Raises ArgumentError if no block is given and TypeError if a name is
  # given that is not a Symbol. Both are checked before anything is forked.
  #
  # Example:
  #   pool = ForkPool.new
  #   child = pool.fork do |parent|
  #     puts "child!" # same $stdout as parent
  #     parent.puts "From child to parent, I got pid #{$$}"
  #     puts "Parent wrote me: ", parent.gets
  #   end
  #   puts "Child with pid #{child.pid} wrote me: ", child.gets
  #   child.puts "Thank you for confirming your pid"
  #
  # See Process::fork
  def fork(opt=nil)
    raise ArgumentError, "no block given" unless block_given?

    # An options hash without :name is fine, the fork is anonymous then
    name = opt && opt[:name]
    raise TypeError, "Name must be a Symbol" unless name.nil? || name.is_a?(Symbol)

    child = Fork.new(self, name)
    fork_read, parent_write = IO.pipe # parent -> fork
    parent_read, fork_write = IO.pipe # fork -> parent

    pid = Process.fork do
      begin
        # the fork only keeps its own ends of the two pipes
        parent_write.close
        parent_read.close
        child.complete!(Process.pid, fork_read, fork_write)
        ForkPool.this!(child)
        yield(child)
      ensure
        # the block may have closed the ios itself already
        fork_write.close unless fork_write.closed?
        fork_read.close unless fork_read.closed?
      end
    end

    # the parent only keeps its own ends of the two pipes
    fork_write.close
    fork_read.close
    child.complete!(pid, parent_read, parent_write)

    @alive.add(child)
    @by_pid[pid]    = child
    @by_name[name]  = child if name
    child
  end

  # The pids of all forks that are still alive.
  # Performs a check on the status of each fork. This can be suppressed
  # by passing true for the no_checking argument.
  def running_pids(no_checking=nil)
    check_status(*@alive) unless no_checking
    @alive.map { |fork| fork.pid }
  end

  # The names of all forks that are still alive.
  # Performs a check on the status of each fork. This can be suppressed
  # by passing true for the no_checking argument.
  def running_names(no_checking=nil)
    check_status(*@alive) unless no_checking
    @alive.map { |fork| fork.name }.compact # compact because some forks may not have a name
  end

  # Deletes all data associated with fork, and returns the fork.
  # Raises a RuntimeError if the fork is not known to be dead.
  def forget!(fork)
    raise "Fork is still running" unless @dead.include?(fork)

    # Only drop index entries that really point at this fork: a name (or a
    # pid, once recycled by the OS) may meanwhile belong to a newer fork.
    @by_pid.delete(fork.pid)   if @by_pid[fork.pid].equal?(fork)
    @by_name.delete(fork.name) if fork.name && @by_name[fork.name].equal?(fork)
    @alive.delete(fork) # just to be really sure
    @dead.delete(fork)
    fork
  end

  # Deletes the data of all dead forks and returns the forgotten forks as
  # an Array.
  # See ForkPool#forget!
  def forget_all!
    # iterate over a snapshot, forget! removes entries from @dead
    @dead.to_a.each { |fork| forget!(fork) }
  end

  # Inform the pool about the death of a fork instance
  # Used by wait and Fork#process_status
  def death_notice(fork) # :nodoc:
    @alive.delete(fork)
    @dead.add(fork)
  end

  # Check & update the status of one or more forks.
  # Never blocks. Forks whose status is already known are skipped.
  def check_status(*forks) # :nodoc:
    # Fork#process_status polls, stores the status and notifies the pool
    forks.each { |fork| fork.process_status }
  end

  # Wait for one or more forks to terminate.
  # Forks that are already known to be dead are skipped, so waiting for the
  # same fork more than once is harmless.
  # Example:
  #   pool = ForkPool.new
  #   start = Time.now
  #   a = pool.fork do sleep 5 end
  #   b = pool.fork do sleep 2 end
  #   c = pool.fork do sleep 3 end
  #   pool.wait(b,c)
  #   (Time.now-start).floor # => 3
  def wait(*forks)
    forks.each do |fork|
      next if fork.process_status # already terminated and accounted for

      _, status = Process.wait2(fork.pid)
      fork.process_status!(status)
      death_notice(fork)
    end
  end

  # Wait for all forks to terminate
  # Example:
  #   pool = ForkPool.new
  #   start = Time.now
  #   pool.fork do sleep 5 end
  #   pool.fork do sleep 2 end
  #   pool.fork do sleep 3 end
  #   pool.wait_all
  #   (Time.now-start).floor # => 5
  def wait_all
    wait(*@alive)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment