Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@robhurring
Last active July 12, 2016 08:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save robhurring/76b4c9783475747be5aef2d775a823d4 to your computer and use it in GitHub Desktop.
Save robhurring/76b4c9783475747be5aef2d775a823d4 to your computer and use it in GitHub Desktop.
Lightweight actors in ruby
require 'thread'
module Actor
class << self
def included(base)
base.extend(ClassMethods)
end
def current
Thread.current[:actor]
end
end
module ClassMethods
def new(*)
Proxy.new(super)
end
end
class Proxy
attr_reader :outbox
def initialize(target)
@target = target
@mailbox = Queue.new
@outbox = Queue.new
@mutex = Mutex.new
@async_proxy = AsyncProxy.new(self)
@thread = Thread.new do
Thread.current[:actor] = self
Thread.abort_on_exception = true
process_inbox
end
end
def await
@thread.join
end
def future
Future.new(self)
end
def terminate
Thread.kill(@thread)
end
def alive?
@thread && @thread.alive?
end
def async
@async_proxy
end
def method_missing(sym, *args, &block)
process_message(sym, *args, &block)
end
def send_later(sym, *args, &block)
@mailbox << [sym, args, block]
end
private
def process_inbox
while Thread.current.alive?
sym, args, block = @mailbox.pop
process_message sym, *args, &block
end
rescue Exception => e
puts "[#{Actor.current}] Exception! #{e}"
raise
end
def process_message(sym, *args, outbox: nil, &block)
@mutex.synchronize do
result = @target.public_send(sym, *args, &block)
outbox.push(result) if outbox
end
end
end
class AsyncProxy
def initialize(actor)
@actor = actor
end
def method_missing(sym, *args, &block)
@actor.send_later(sym, *args, &block)
end
end
class Future
def initialize(actor)
@actor = actor
@mailbox = Queue.new
end
def value
if @mailbox.empty? && @last_value
@last_value
else
@last_value = @mailbox.pop
end
end
def method_missing(sym, *args, &block)
@mailbox.clear
args.push(outbox: @mailbox)
@actor.send_later(sym, *args, &block)
self
end
end
end
class Producer
include Actor
def initialize
@i = 0
@delay = 0.1
end
def produce(queue)
while Actor.current.alive?
item = "item-#{@i += 1}"
puts "[#{Actor.current}] produced - #{item} -- #{queue.size}"
queue << item
sleep @delay
end
end
end
class Consumer
include Actor
def initialize(name)
@name = name
@delay = rand(2)
end
def consume(queue)
while Actor.current.alive?
item = queue.pop
puts "[#{Actor.current}] consumer-#{@name} - consumed - #{item} -- #{queue.size}"
sleep @delay
end
end
end
class App
def initialize
@queue = SizedQueue.new(100)
end
def start(workers = 10)
monitor_signals
build_consumer_pool(workers)
start_producer
start_consumers
wait
end
def terminate
stop_producer
stop_consumers
end
private
attr_reader :queue
def monitor_signals
Signal.trap('INT') do
shutdown
end
Signal.trap('QUIT') do
shutdown
end
end
def shutdown
puts 'shutting down...'
terminate
exit 0
end
def wait
@producer.await
end
def start_producer
@producer = Producer.new
@producer.async.produce(queue)
end
def stop_producer
@producer.terminate
end
def build_consumer_pool(workers)
@consumers = workers.times.map do |i|
Consumer.new(i)
end
end
def start_consumers
@consumers.each do |consumer|
consumer.async.consume(queue)
end
end
def stop_consumers
@consumers.each(&:terminate)
end
end
App.new.start(2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment