Created
December 14, 2012 01:13
-
-
Save grantr/4281685 to your computer and use it in GitHub Desktop.
Celluloid ring pool example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'celluloid'

module Celluloid
  # Manages a fixed-size "ring" of linked worker actors and forwards calls to
  # them. Unlike a pool that hands a call to any idle worker, the ring picks
  # the worker from a key derived from the call, so calls that produce the
  # same key land on the same ring slot.
  #
  # Methods the manager does not define itself are proxied to a worker through
  # {#method_missing} when the worker class defines them.
  class RingManager
    include Celluloid
    # Route exit notifications for linked workers to #crash_handler.
    trap_exit :crash_handler

    # Builds the ring and spawns one linked worker per slot.
    #
    # @param worker_class [Class] class whose +new_link+ creates a worker
    # @param options [Hash]
    # @option options [Integer] :size number of slots; defaults to the larger
    #   of +Celluloid.cores+ and 2
    # @option options [#call] :ring_hasher called with +(method, *args)+ to
    #   produce the routing key for a call
    # @option options [Object, Array] :args constructor arguments passed to
    #   every worker (a single value is wrapped into an array)
    # @raise [ArgumentError] if the resulting size is below 2
    def initialize(worker_class, options = {})
      @size = options[:size] || [Celluloid.cores, 2].max
      raise ArgumentError, "minimum ring size is 2" if @size < 2

      @worker_class = worker_class
      @ring_hasher = options[:ring_hasher]
      @args = options[:args] ? Array(options[:args]) : []
      @workers = Array.new(@size) { worker_class.new_link(*@args) }
    end

    # Asks every remaining worker to terminate and waits for each request to
    # finish. Failures are ignored on purpose: shutdown is best-effort.
    def finalize
      # `map` (not `each`) so that the terminate futures are what we collect;
      # `each` would return @workers itself and the futures would be dropped.
      terminators = @workers.map do |actor|
        begin
          actor.future(:terminate) if actor
        rescue DeadActorError, MailboxError
          nil # worker is already gone; nothing to wait for
        end
      end

      terminators.compact.each do |terminator|
        begin
          terminator.value
        rescue StandardError
          nil # best-effort: a worker failing during terminate is not fatal
        end
      end
    end

    # Spawn a new worker for every crashed one
    #
    # @param actor [Object] the linked worker that exited
    # @param reason [Exception, nil] falsy when the worker exited without an
    #   error, in which case its slot is left empty and nothing is respawned
    def crash_handler(actor, reason)
      index = @workers.index(actor)
      # An actor that is not (or no longer) in the ring has no slot to refill;
      # without this guard `@workers[nil] = nil` would raise TypeError.
      return unless index

      @workers[index] = nil
      return unless reason

      @workers[index] = @worker_class.new_link(*@args)
      signal :respawn_complete
    end

    # Forwards +method+ to the worker selected by the call's ring key.
    #
    # If the selected worker turns out to be dead, waits for the
    # +:respawn_complete+ signal, looks the slot up again and retries.
    # NOTE(review): the wait assumes the respawn signal has not already fired
    # before we start waiting -- confirm against Celluloid's event ordering.
    #
    # Any other exception (including NoMethodError from an emptied slot) is
    # handed to +abort+.
    def _send_(method, *args, &block)
      key = __get_ring_key(method, *args)
      worker = __provision_worker(key)

      begin
        worker._send_ method, *args, &block
      rescue DeadActorError # if we get a dead actor out of the pool
        wait :respawn_complete
        worker = __provision_worker(key)
        retry
      rescue Exception => ex
        # Deliberately broad: every failure of the proxied call is passed on
        # through abort rather than being raised inside the manager itself.
        abort ex
      end
    end

    # Computes the routing key for a call.
    #
    # @return [Object] the custom hasher's result, or +[method, args.first]+
    def __get_ring_key(method, *args)
      return @ring_hasher.call(method, *args) if @ring_hasher

      [method, args.first] # default hash key is method, args.first
    end

    # Maps a routing key onto a ring slot.
    #
    # @return [Object, nil] the worker in that slot (nil if the slot is empty)
    def __provision_worker(key)
      # TODO: use consistent hashing so adding and removing workers doesn't
      # kill the whole ring.
      # maybe workers can throw an exception that causes the ring to get larger?
      @workers[key.hash % @size]
    end

    # Delegates +name+ to a worker.
    def name
      # The method name is the first argument of _send_; passing @mailbox
      # there made the worker try to invoke a method named by @mailbox.
      _send_ :name
    end

    def is_a?(klass)
      _send_ :is_a?, klass
    end

    def kind_of?(klass)
      _send_ :kind_of?, klass
    end

    def methods(include_ancestors = true)
      _send_ :methods, include_ancestors
    end

    def to_s
      _send_ :to_s
    end

    def inspect
      _send_ :inspect
    end

    # True when the manager itself or the worker class responds to +method+.
    #
    # @param method [Symbol, String]
    # @param include_private [Boolean] forwarded to the default implementation;
    #   accepted so that two-argument +respond_to?+ calls do not raise
    def respond_to?(method, include_private = false)
      super || @worker_class.method_defined?(method)
    end

    # Proxies any method the worker class defines to a worker in the ring.
    def method_missing(method, *args, &block)
      if respond_to?(method)
        _send_ method, *args, &block
      else
        super
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment