Last active
August 29, 2015 14:13
-
-
Save kares/8440de10a5e814e879d2 to your computer and use it in GitHub Desktop.
celluloid InternalPool backport from master (due inproper synchronization) http://git.io/9NV23Q
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'celluloid/internal_pool' | |
# NOTE: this also avoids JRuby bug with ThreadGroup#list ! | |
# require 'celluloid' likely happened already : | |
if Celluloid.internal_pool | |
Celluloid.internal_pool.shutdown | |
Celluloid.internal_pool = nil | |
end | |
module Celluloid | |
# Maintain a thread pool FOR SPEED!! | |
class InternalPool | |
attr_accessor :max_idle | |
def initialize | |
@mutex = Mutex.new | |
@idle_threads = [] | |
@all_threads = [] | |
@busy_size = 0 | |
@idle_size = 0 | |
# TODO: should really adjust this based on usage | |
@max_idle = 16 | |
@running = true | |
end | |
def busy_size | |
@busy_size | |
end | |
def idle_size | |
@idle_size | |
end | |
def assert_running | |
raise Error, "Thread pool is not running" unless running? | |
end | |
def assert_inactive | |
return unless active? | |
message = "Thread pool is still active" | |
if defined?(JRUBY_VERSION) | |
Celluloid.logger.warn message | |
else | |
raise Error, message | |
end | |
end | |
def running? | |
@running | |
end | |
def active? | |
busy_size + idle_size > 0 | |
end | |
def each | |
to_a.each {|thread| yield thread } | |
end | |
def to_a | |
@mutex.synchronize { @all_threads.dup } | |
end | |
# Get a thread from the pool, running the given block | |
def get(&block) | |
@mutex.synchronize do | |
assert_running | |
begin | |
if @idle_threads.empty? | |
thread = create | |
else | |
thread = @idle_threads.pop | |
@idle_size = @idle_threads.length | |
end | |
end until thread.status # handle crashed threads | |
thread.busy = true | |
@busy_size += 1 | |
thread[:celluloid_queue] << block | |
thread | |
end | |
end | |
# Return a thread to the pool | |
def put(thread) | |
@mutex.synchronize do | |
thread.busy = false | |
if idle_size + 1 >= @max_idle | |
thread[:celluloid_queue] << nil | |
@busy_size -= 1 | |
@all_threads.delete(thread) | |
else | |
@idle_threads.push thread | |
@busy_size -= 1 | |
@idle_size = @idle_threads.length | |
clean_thread_locals(thread) | |
end | |
end | |
end | |
def shutdown | |
@mutex.synchronize do | |
finalize | |
@all_threads.each do |thread| | |
thread[:celluloid_queue] << nil | |
end | |
@all_threads.clear | |
@idle_threads.clear | |
@busy_size = 0 | |
@idle_size = 0 | |
end | |
end | |
def kill | |
@mutex.synchronize do | |
finalize | |
@running = false | |
@all_threads.shift.kill until @all_threads.empty? | |
@idle_threads.clear | |
@busy_size = 0 | |
@idle_size = 0 | |
end | |
end | |
private | |
# Create a new thread with an associated queue of procs to run | |
def create | |
queue = Queue.new | |
thread = Thread.new do | |
while proc = queue.pop | |
begin | |
proc.call | |
rescue => ex | |
Logger.crash("thread crashed", ex) | |
ensure | |
put thread | |
end | |
end | |
end | |
thread[:celluloid_queue] = queue | |
# @idle_threads << thread | |
@all_threads << thread | |
thread | |
end | |
# Clean the thread locals of an incoming thread | |
def clean_thread_locals(thread) | |
thread.keys.each do |key| | |
next if key == :celluloid_queue | |
# Ruby seems to lack an API for deleting thread locals. WTF, Ruby? | |
thread[key] = nil | |
end | |
end | |
def finalize | |
@max_idle = 0 | |
end | |
end | |
end | |
Celluloid.internal_pool = Celluloid::InternalPool.new unless Celluloid.internal_pool |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment