Skip to content

Instantly share code, notes, and snippets.

@betawaffle
Created October 24, 2013 12:16
Show Gist options
  • Save betawaffle/7136173 to your computer and use it in GitHub Desktop.
Save betawaffle/7136173 to your computer and use it in GitHub Desktop.
Ruby Actors
require 'fiber'
require 'eventmachine'
module Actor
  # Returns whatever Mailbox.current reports for the caller.
  def self.current
    Mailbox.current
  end

  module_function

  # Creates a new actor and returns its mailbox.
  #
  # The fresh mailbox is extended with CoreMethods, +mod+ and DefaultMethods
  # (in that lookup order), then a Fiber is created whose body claims the
  # mailbox and invokes +fun+ on it with +args+.
  #
  # @param mod  [Module] behaviour module mixed into the mailbox
  # @param fun  [Symbol, String] name of the entry-point method to invoke
  # @param args [Array] arguments forwarded to +fun+
  # @return [Mailbox] the mailbox of the spawned actor
  def spawn(mod, fun, *args)
    actor = Mailbox.new
    actor.extend(CoreMethods, mod, DefaultMethods)

    # __send__ rather than send: Mailbox#send is the message-delivery method.
    Fiber.new do
      actor.claim
      actor.__send__(fun, *args)
    end

    actor
  end
end
module Actor
  # A ::Fiber whose creation and every resumption are routed through
  # EventMachine (schedule / next_tick) instead of running inline.
  class Fiber < ::Fiber
    # Wraps the given block so that before_terminate runs when the block
    # finishes (normally or by exception), then queues the first resume.
    def initialize
      EventMachine.schedule do
        super do
          begin
            yield
          ensure
            # $! is the in-flight exception, or nil on a normal finish.
            before_terminate($!)
          end
        end
      end
      resume
    end

    # Queues a resumption of this fiber on EventMachine.
    #
    # Exceptions escaping the fiber are not re-raised into the caller of the
    # queued step; once the fiber is no longer alive the error (or nil) is
    # handed to after_terminate.
    #
    # @param immediately [Boolean] true -> EventMachine.schedule,
    #   false -> EventMachine.next_tick
    def resume(immediately = false)
      step = lambda do
        return unless alive?
        begin
          # Explicit empty argument list: a bare `super` would forward
          # `immediately` to ::Fiber#resume, leaking the scheduling flag into
          # the fiber as the return value of Fiber.yield.
          super()
        rescue => ex
          # StandardErrors are captured here and reported below.
        ensure
          ex ||= $!
          after_terminate(ex) unless alive?
        end
      end

      if immediately
        EventMachine.schedule(step)
      else
        EventMachine.next_tick(step)
      end
    end

    def transfer
      # disabled for now
    end

    private

    # Hook invoked from #resume after the fiber has terminated.
    def after_terminate(error)
      puts "after term: #{error}"
    end

    # Hook invoked inside the fiber just before its block unwinds.
    def before_terminate(error)
      puts "before term: #{error}"
    end
  end
end
module Actor
  # Message queue owned by at most one fiber, with selective receive.
  class Mailbox
    # Key of the fiber-local slot (Thread#[] is fiber-local) that holds the
    # mailbox claimed by the running fiber. Thread#[] only accepts a Symbol
    # or String key; using the class itself raises TypeError.
    CURRENT_KEY = :__actor_current_mailbox__

    class << self
      # @return [Mailbox, nil] the mailbox most recently claimed by the
      #   calling fiber, or nil if it has claimed none
      def current
        Thread.current[CURRENT_KEY]
      end

      # Records +mailbox+ under +name+; does nothing when +mailbox+ is nil.
      # The write is performed through EventMachine.schedule.
      def register(name, mailbox = current)
        EventMachine.schedule { registered[name] = mailbox } if mailbox
      end

      # Removes the entry for +name+ (through EventMachine.schedule).
      def unregister(name)
        EventMachine.schedule { registered.delete(name) }
      end

      private

      # Lazily created name => mailbox table.
      def registered
        @registered ||= {}
      end
    end

    def initialize
      @messages = []
      @waiting = []
      @owner = nil
    end

    # Makes the calling fiber the owner of this mailbox and publishes the
    # mailbox as Mailbox.current for that fiber.
    #
    # @raise [RuntimeError] if the mailbox already has an owner
    # @return [Mailbox] self
    def claim
      raise 'mailbox already has an owner' if @owner
      @owner = Fiber.current
      Thread.current[CURRENT_KEY] = self
      self
    end

    # Receives messages and passes each to #handle until @stop is truthy.
    # NOTE: this shadows Kernel#loop for every method of this class.
    def loop
      handle(receive) until @stop
    end

    # Removes and returns the first queued message for which
    # +pattern === message+ holds, suspending the calling fiber whenever no
    # unexamined message is available.
    #
    # @param pattern [#===] matcher; the default matches every message
    # @raise [RuntimeError] if a different fiber owns the mailbox
    def receive(pattern = ::BasicObject)
      fiber = Fiber.current
      raise 'mailbox claimed by another actor' if @owner && @owner != fiber

      index = 0
      while true # deliberately not `loop do`: #loop is redefined above
        until @messages.size > index
          @waiting.push fiber
          Fiber.yield
        end
        return @messages.delete_at(index) if pattern === @messages[index]
        index += 1
      end
    end

    # Appends +msg+ to the queue and resumes one waiting fiber, if any.
    # Both steps run inside EventMachine.schedule.
    #
    # @param resume_immediately [Boolean] forwarded to the fiber's #resume
    # @return [Mailbox] self
    def send(msg, resume_immediately = false)
      EventMachine.schedule do
        @messages << msg
        waiter = @waiting.shift
        waiter.resume(resume_immediately) if waiter
      end
      self
    end
  end
end
module Actor
  # Immutable message carrying a frozen list of arguments.
  class Message
    # Shorthand constructor: Message[a, b] is Message.new(a, b).
    def self.[](*args)
      new(*args)
    end

    # Builds an anonymous subclass of the receiver; the block is evaluated
    # as the body of the new class.
    def self.define(&block)
      Class.new(self, &block)
    end

    # Stores the arguments, then freezes both the list and the message.
    def initialize(*args)
      @args = args.freeze
      freeze
    end

    # @return [Array] the frozen argument list (also used when splatting)
    def to_a
      @args
    end

    def inspect
      "#<#{self.class} #{@args.inspect}>"
    end

    # Same text as #inspect.
    def to_s
      inspect
    end
  end
end
module Actor
  # Fallback behaviour mixed into an actor after its own module.
  module DefaultMethods
    private

    # Last-resort handler: reports the message on standard output.
    def handle(msg)
      $stdout.puts("unexpected message: #{msg}")
    end
  end
end
module Actor
  # Core receive/dispatch loop mixed into an actor ahead of its own module.
  module CoreMethods
    # Calls the next #init in the ancestor chain (with the same arguments)
    # when one exists, then repeatedly receives a message and dispatches it
    # until @stop is truthy: SystemMessage instances are splatted into
    # handle_system_message, everything else goes to handle.
    def init(*args)
      super if defined?(super)

      until @stop
        msg = receive
        if SystemMessage === msg
          handle_system_message(*msg)
        else
          handle(msg)
        end
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment