Skip to content

Instantly share code, notes, and snippets.

@cmsd2
Created October 9, 2012 11:40
Show Gist options
  • Save cmsd2/3858204 to your computer and use it in GitHub Desktop.
Save cmsd2/3858204 to your computer and use it in GitHub Desktop.
async and message passing channels for ruby
require 'thread'
class Counter
def initialize(start = nil)
@next = start || 0
@mutex = Mutex.new
end
def next
n = nil
@mutex.synchronize {
@next += 1
n = @next
}
n
end
@@global_sequence = Counter.new
class << self
def next
@@global_sequence.next
end
end
end
class TimeoutException < StandardError; end
class MessageBox
def initialize
@messages = []
@mutex = Mutex.new
@condition = ConditionVariable.new
end
def receive(timeout = nil)
started_at = Time.now.to_f if timeout
have_result = false
result = nil
@mutex.synchronize do
while !have_result
p "checking #{@messages.size} messages"
@messages = @messages.inject([]) do |a, m|
if !have_result
if !block_given? || yield(m)
result = m
have_result = true
p "found result"
else
a << m
end
else
a << m
end
a
end
return result if have_result
if timeout
now_at = Time.now.to_f
timeout -= (now_at - started_at)
started_at = now_at
if timeout <= 0
raise TimeoutException
end
end
p "waiting #{timeout || 'forever'}"
@condition.wait(@mutex, timeout)
p "woke up"
end
end
result
end
def post(m)
i = nil
@mutex.synchronize do
@messages << m
i = @messages.size
@condition.signal
end
i
end
end
class Thread
def message_box
@message_box ||= MessageBox.new
end
end
class Actor < Thread
def initialize(target = nil)
@target = target || self
super { self.message_loop }
end
def post(args)
message_box.post(args)
end
def receive(timeout = nil)
message_box.receive(timeout) do |m|
!block_given? || yield(m)
end
end
def call(*args)
seq = Counter.next
from = Thread.current.message_box
post [from, seq, args]
reply = from.receive { |f,s,a| s == seq }
reply.last
end
def message_loop
while true
m = self.receive
p "handling #{m.inspect}"
handle_message m
end
rescue StandardError => e
puts "error: #{e}\n" + e.backtrace.join("\n")
end
protected
def handle_message(m)
if m.size == 3
from, seq, args = m
elsif m.size == 2
from, args = m
elsif m.size == 1
args = m.first
end
puts "handling msg #{seq}: #{args.inspect}\n"
args = args.is_a?(Symbol) ? [args] : args
method = args.shift
result = @target.send method, *args
if result && from
if seq
from.post([self, seq, result])
else
from.post([self, result])
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment