Skip to content

Instantly share code, notes, and snippets.

@mperham
Created April 19, 2011 03:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mperham/926741 to your computer and use it in GitHub Desktop.
Save mperham/926741 to your computer and use it in GitHub Desktop.
More complex supervisor with worker pool example
require 'actor'
require 'rubinius_fix'
Ready = Struct.new(:this)
Work = Struct.new(:msg)
processor = Proc.new do |msg|
raise msg.to_s if msg % 7 == 0
print "Doing some hard work for #{msg}, boss!\n"
end
@supervisor = Actor.spawn do
supervisor = Actor.current
work_loop = Proc.new do
loop do
work = Actor.receive
result = processor.call(work.msg)
supervisor << Ready[Actor.current]
end
end
Actor.trap_exit = true
ready_workers = []
10.times do |x|
# start N workers
ready_workers << Actor.spawn_link(&work_loop)
end
loop do
Actor.receive do |f|
f.when(Ready) do |who|
# TODO
end
f.when(Work) do |work|
ready_workers.pop << work
end
f.when(Actor::DeadActorError) do |exit|
print "Actor exited with message: #{exit.reason}\n"
ready_workers << Actor.spawn_link(&work_loop)
end
end
end
end
10.times do |idx|
@supervisor << Work[idx]
end
sleep 1
if RUBY_ENGINE == 'rbx' && Rubinius::VERSION < '1.2.4'
puts "Loading rubinius actor monkeypatches"
class Actor
# Monkeypatch so this works with Rubinius 1.2.3 (latest).
# 1.2.4 should have the necessary fix included.
def notify_exited(actor, reason)
exit_message = nil
@lock.receive
begin
return self unless @alive
@links.delete(actor)
if @trap_exit
exit_message = DeadActorError.new(actor, reason)
elsif reason
@interrupts << DeadActorError.new(actor, reason)
if @filter
@filter = nil
@ready << nil
end
end
ensure
@lock << nil
end
send exit_message if exit_message
self
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment