Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@grantr
Created December 14, 2012 01:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save grantr/4281685 to your computer and use it in GitHub Desktop.
Save grantr/4281685 to your computer and use it in GitHub Desktop.
Celluloid ring pool example
require 'celluloid'
module Celluloid
  # Manages a fixed-size "ring" of linked worker actors and routes every
  # proxied call to one worker chosen by hashing a key derived from the call.
  # Calls that produce the same key are always sent to the same ring slot.
  #
  # Options accepted by #initialize:
  #   :size        - number of workers (must be >= 2); defaults to
  #                  [Celluloid.cores, 2].max
  #   :args        - argument(s) passed to every worker's constructor
  #   :ring_hasher - optional callable invoked as call(method, *args) that
  #                  returns the routing key for a call
  class RingManager
    include Celluloid
    trap_exit :crash_handler

    # @param worker_class [Class] actor class instantiated via new_link
    # @param options [Hash] see the class-level comment
    # @raise [ArgumentError] if the requested ring size is below 2
    def initialize(worker_class, options = {})
      @size = options[:size] || [Celluloid.cores, 2].max
      raise ArgumentError, "minimum ring size is 2" if @size < 2

      @worker_class = worker_class
      @ring_hasher = options[:ring_hasher]
      @args = options[:args] ? Array(options[:args]) : []
      @workers = Array.new(@size) { worker_class.new_link(*@args) }
    end

    # Asks every remaining worker to terminate and waits for each request to
    # finish. Failures are ignored on purpose: this is best-effort cleanup.
    def finalize
      # `map` (not `each`) so we collect the futures themselves; `each` would
      # hand back @workers and we would end up calling #value on the workers.
      terminators = @workers.map do |actor|
        begin
          actor.future(:terminate) if actor
        rescue DeadActorError, MailboxError
          nil # worker is already gone; nothing to wait for
        end
      end

      terminators.compact.each do |terminator|
        begin
          terminator.value
        rescue StandardError
          nil
        end
      end
    end

    # Exit handler registered with trap_exit.
    # Spawn a new worker for every crashed one.
    #
    # @param actor  the linked actor that exited
    # @param reason the exit reason; nil leaves the slot empty (no respawn)
    def crash_handler(actor, reason)
      index = @workers.index(actor)
      # An actor that is not (or no longer) in the ring has no slot to clear;
      # indexing with nil would raise TypeError inside the exit handler.
      return unless index

      @workers[index] = nil
      return unless reason

      @workers[index] = @worker_class.new_link(*@args)
      signal :respawn_complete
    end

    # Forwards +method+ to the worker selected for this call's ring key.
    #
    # If the selected worker turns out to be dead, waits for a
    # :respawn_complete signal, looks the slot up again and retries.
    # Any other exception is handed to the caller via +abort+.
    #
    # NOTE(review): the wait assumes the respawn signal has not fired yet, and
    # the retry is unbounded -- confirm both are acceptable for callers.
    def _send_(method, *args, &block)
      key = __get_ring_key(method, *args)
      worker = __provision_worker(key)

      begin
        worker._send_ method, *args, &block
      rescue DeadActorError # if we get a dead actor out of the pool
        wait :respawn_complete
        worker = __provision_worker(key)
        retry
      rescue Exception => ex
        abort ex
      end
    end

    # Computes the routing key for a call: the custom :ring_hasher result if
    # one was supplied, otherwise the pair [method, first argument].
    def __get_ring_key(method, *args)
      if @ring_hasher
        @ring_hasher.call(method, *args)
      else
        [method, args.first] # default hash key is method, args.first
      end
    end

    # Picks the ring slot for +key+ by hashing it modulo the ring size.
    # May return nil when the slot's worker exited and was not respawned.
    def __provision_worker(key)
      # TODO: use consistent hashing so adding and removing workers doesn't
      # kill the whole ring.
      # maybe workers can throw an exception that causes the ring to get larger?
      @workers[key.hash % @size]
    end

    # The methods below make the manager answer introspection calls with a
    # worker's response rather than its own.

    def name
      # The first argument of _send_ is the method name; passing @mailbox
      # there would try to invoke the mailbox as a method on the worker.
      _send_ :name
    end

    def is_a?(klass)
      _send_ :is_a?, klass
    end

    def kind_of?(klass)
      _send_ :kind_of?, klass
    end

    def methods(include_ancestors = true)
      _send_ :methods, include_ancestors
    end

    def to_s
      _send_ :to_s
    end

    def inspect
      _send_ :inspect
    end

    # True if the manager itself or the worker class responds to +method+.
    # Accepts the optional second argument of Object#respond_to? so that
    # two-argument callers do not raise ArgumentError.
    def respond_to?(method, include_private = false)
      super || @worker_class.instance_methods.include?(method.to_sym)
    end

    # Delegates any method the worker class implements to a ring worker.
    def method_missing(method, *args, &block)
      if respond_to?(method)
        _send_ method, *args, &block
      else
        super
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment