Created
April 19, 2011 03:07
-
-
Save mperham/926741 to your computer and use it in GitHub Desktop.
More complex supervisor with worker pool example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'actor' | |
require 'rubinius_fix' | |
Ready = Struct.new(:this) | |
Work = Struct.new(:msg) | |
processor = Proc.new do |msg| | |
raise msg.to_s if msg % 7 == 0 | |
print "Doing some hard work for #{msg}, boss!\n" | |
end | |
@supervisor = Actor.spawn do | |
supervisor = Actor.current | |
work_loop = Proc.new do | |
loop do | |
work = Actor.receive | |
result = processor.call(work.msg) | |
supervisor << Ready[Actor.current] | |
end | |
end | |
Actor.trap_exit = true | |
ready_workers = [] | |
10.times do |x| | |
# start N workers | |
ready_workers << Actor.spawn_link(&work_loop) | |
end | |
loop do | |
Actor.receive do |f| | |
f.when(Ready) do |who| | |
# TODO | |
end | |
f.when(Work) do |work| | |
ready_workers.pop << work | |
end | |
f.when(Actor::DeadActorError) do |exit| | |
print "Actor exited with message: #{exit.reason}\n" | |
ready_workers << Actor.spawn_link(&work_loop) | |
end | |
end | |
end | |
end | |
10.times do |idx| | |
@supervisor << Work[idx] | |
end | |
sleep 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
if RUBY_ENGINE == 'rbx' && Rubinius::VERSION < '1.2.4' | |
puts "Loading rubinius actor monkeypatches" | |
class Actor | |
# Monkeypatch so this works with Rubinius 1.2.3 (latest). | |
# 1.2.4 should have the necessary fix included. | |
def notify_exited(actor, reason) | |
exit_message = nil | |
@lock.receive | |
begin | |
return self unless @alive | |
@links.delete(actor) | |
if @trap_exit | |
exit_message = DeadActorError.new(actor, reason) | |
elsif reason | |
@interrupts << DeadActorError.new(actor, reason) | |
if @filter | |
@filter = nil | |
@ready << nil | |
end | |
end | |
ensure | |
@lock << nil | |
end | |
send exit_message if exit_message | |
self | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment