Skip to content

Instantly share code, notes, and snippets.

@apeiros
Created August 26, 2010 21:10
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save apeiros/552264 to your computer and use it in GitHub Desktop.
Save apeiros/552264 to your computer and use it in GitHub Desktop.
Thread::Pool
require 'thread'
class Thread
  # A resizable pool of worker threads that run queued jobs (blocks).
  #
  # Example:
  #   pool = Thread::Pool.new(10) do |exception| handle(exception) end
  #   pool.execute(1,2,3) do |x,y,z| whatever(x,y,z) end
  #   pool.join
  class Pool
    # The default exception handler will just reraise the exception in the
    # main thread.
    DefaultExceptionHandler = proc { |exception|
      Thread.main.raise(exception)
    }

    # We must not rescue those exceptions in workers; they are re-raised and
    # terminate the worker without a replacement being spawned.
    PassthroughExceptions = [
      ::NoMemoryError,
      ::SignalException,
      ::Interrupt,
      ::SystemExit,
    ]

    # The number of workers in the pool
    attr_reader :size

    # The exception handling proc
    attr_reader :exception_handler

    # Whether the pool is running (i.e. still accepts new jobs).
    # A pool starts running right at #new and stops at #join or #kill.
    attr_reader :running
    alias running? running

    # Creates the pool and immediately spawns `size` worker threads.
    #
    # size              - the number of worker threads to start
    # exception_handler - optional block, called with every exception a job
    #                     raises (except PassthroughExceptions). Defaults to
    #                     DefaultExceptionHandler.
    def initialize(size, &exception_handler)
      @size              = size                      # the desired workforce size
      @exception_handler = exception_handler || DefaultExceptionHandler
      @jobs              = Queue.new                 # [block, args] pairs; a nil tells one worker to stop
      @worker_mutex      = Mutex.new                 # guards every access to @workers
      @worker_exit       = ConditionVariable.new     # signalled (under @worker_mutex) whenever @workers shrinks
      @size_mutex        = Mutex.new                 # guards every change of @size
      @running           = true                      # whether the pool still accepts new jobs
      @workers           = []                        # the registered worker threads
      spawn_worker(size)
    end

    # The number of entries waiting in the job queue. Stop markers that have
    # been queued by #size= or #join but not yet consumed are counted too.
    def pending_jobs_count
      @jobs.size
    end

    # Resizes the pool. Growing spawns the missing workers right away,
    # shrinking queues one stop marker per surplus worker, so the surplus
    # workers only terminate after the jobs queued before the resize.
    #
    # Raises ArgumentError if new_size is negative.
    def size=(new_size)
      raise ArgumentError, "negative pool size" if new_size < 0

      @size_mutex.synchronize do
        difference = new_size - @size
        if difference > 0
          spawn_worker(difference)
        elsif difference < 0
          (-difference).times { @jobs << nil }
        end
        @size = new_size
      end
    end

    # Executes the given block as soon as a worker is free. The given
    # arguments are passed on to the block.
    #
    # Raises RuntimeError if the pool no longer accepts jobs and
    # ArgumentError if no block is given.
    def execute(*args, &block)
      raise "Can't execute on a dead pool" unless @running
      # Fail at the call site instead of with a NoMethodError inside a worker.
      raise ArgumentError, "no block given" unless block

      @jobs << [block, args]
    end

    # Spins the pool down (it won't accept any new jobs to execute)
    # and waits until all registered workers have terminated, i.e. until all
    # jobs queued before this call have been completed.
    # Calling it again, or calling it after #kill, returns immediately.
    #
    # Returns self.
    def join
      @size_mutex.synchronize do
        @running = false
        @worker_mutex.synchronize do
          # One stop marker per worker that has not been told to stop yet.
          # Workers beyond @size already have a marker pending from #size=,
          # and there may be fewer than @size workers left (after #kill or a
          # previous #join), hence the minimum of both.
          [@size, @workers.size].min.times { @jobs << nil }
          # Wait on the registry itself rather than counting terminations, so
          # workers stopped earlier by a resize can't satisfy the wait.
          @worker_exit.wait(@worker_mutex) until @workers.empty?
        end
      end
      self
    end

    # Kills all worker threads (using Thread#kill)
    # You shouldn't ever use this unless you're absolutely sure that no data
    # currently being processed will be corrupted or be needed after a kill.
    # Jobs still waiting in the queue are not executed.
    #
    # Returns self.
    def kill
      @running = false
      # Hold the mutex for the whole sweep: dying workers deregister
      # themselves under the same mutex, so the array can't change while it is
      # being iterated and no replacement worker can slip in unnoticed.
      @worker_mutex.synchronize do
        @workers.each(&:kill)
        @workers.clear
        @worker_exit.broadcast # wake up a #join that is waiting in another thread
      end
      self
    end

    def inspect # :nodoc:
      sprintf "#<%p:0x%x size=%d pending=%d%s>",
              self.class,
              object_id << 1,
              @size,
              pending_jobs_count,
              @running ? '' : ' dead'
    end

    private

    # Registers `amount` new threads in @workers
    # This method is thread-safe.
    def spawn_worker(amount=1)
      @worker_mutex.synchronize do
        @workers.concat(Array.new(amount) { Thread.new(&method(:worker)) })
      end
    end

    # The worker method is used in constructing a worker thread.
    # It fetches jobs from the @jobs queue until a nil is passed, then it
    # terminates.
    # In case of an exception in a job it'll invoke the exception_handler
    # and continue working if possible. If the exception handler itself
    # raises, the worker thread terminates with that exception, but it ensures
    # that a new thread is started to replace it.
    # Before it exits, it removes itself from @workers and signals
    # @worker_exit so a waiting #join can re-check the registry.
    def worker
      while (job = @jobs.shift)
        begin
          block, args = job
          # Splat the original arguments; passing the args array itself would
          # hand single-parameter blocks an Array instead of their argument.
          block.call(*args)
        rescue *PassthroughExceptions
          raise # we should pass those on
        rescue Exception => e # try to stay alive - it costs less
          @exception_handler.call(e)
        end
      end
    rescue *PassthroughExceptions
      raise # we should pass those on
    rescue Exception
      # The replacement is registered before this thread deregisters itself in
      # the ensure clause below, so @workers never looks empty in between.
      spawn_worker
      raise
    ensure
      @worker_mutex.synchronize do
        @workers.delete(Thread.current)
        @worker_exit.broadcast
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment