Skip to content

Instantly share code, notes, and snippets.

View neektza's full-sized avatar

Nikica Jokic neektza

View GitHub Profile
module EventMachine
class StreamObject < Selectable
# ...
# Schedules this object for closing once it has been idle for longer than
# its inactivity timeout.
#
# Nothing happens when @inactivity_timeout is unset (nil/false) or not
# positive. Otherwise the idle deadline (@last_activity + @inactivity_timeout)
# is compared against the reactor's current loop time, and a strictly older
# deadline triggers `schedule_close true`.
#
# Returns the result of schedule_close when a close is scheduled, nil otherwise.
def heartbeat
  # The original condition was wrapped so that continuation lines started
  # with `and` / `<`, which does not parse; it is rejoined here and uses `&&`.
  return unless @inactivity_timeout && @inactivity_timeout > 0

  idle_deadline = @last_activity + @inactivity_timeout
  schedule_close true if idle_deadline < Reactor.instance.current_loop_time
end
module EventMachine
class Reactor
# ...
# (Re)creates the loopbreak pipe.
#
# Any writer left over from a previous call is closed first. A fresh
# IO.pipe pair is then stored in @loopbreak_reader / @loopbreak_writer and
# the read end is wrapped in a LoopbreakReader, which is the return value.
def open_loopbreaker
  stale_writer = @loopbreak_writer
  stale_writer.close if stale_writer

  read_end, write_end = IO.pipe
  @loopbreak_reader = read_end
  @loopbreak_writer = write_end

  LoopbreakReader.new(read_end)
end
def close_loopbreaker
@loopbreak_writer.close
module EventMachine
class Reactor
def run_timers
@timers.each do |t|
if t.first <= @current_loop_time
@timers.delete t
EventMachine::event_callback "", TimerFired, t.last
else
break
end
module EventMachine
class Reactor
def run
raise Error.new( "already running" ) if @running
@running = true
begin
open_loopbreaker
loop do
@current_loop_time = Time.now
break if @stop_scheduled
module EventMachine
class Reactor
include Singleton
def initialize_for_run
@running = false
@stop_scheduled = false
@selectables ||= {};@selectables.clear
@timers = SortedSet.new
set_timer_quantum(0.1)
@current_loop_time = Time.now
result = obj.public_send(@method, *@arguments, &_b)
@sender << result # @sender == sender's mailbox
module Celluloid
  class Call
    # Invokes the recorded call on +obj+.
    #
    # Runs check(obj) first (per the original note: verifies that @method's
    # arity matches), then publicly sends @method with @arguments to +obj+,
    # forwarding @block (converted via to_proc) when one was recorded.
    #
    # Returns whatever the invoked method returns.
    def dispatch(obj)
      check(obj)

      handler = @block
      handler = handler.to_proc if handler

      obj.public_send(@method, *@arguments, &handler)
    end
  end
end
class SyncProxy < AbstractProxy
  # Turns any otherwise-undefined method call into a Celluloid::Call::Sync.
  #
  # The call is built with the current ::Celluloid.mailbox, the method name,
  # the argument list and the block, then pushed onto @mailbox. The return
  # value is whatever the call's #value produces.
  def method_missing(meth, *args, &block)
    # ... snip
    ::Celluloid::Call::Sync
      .new(::Celluloid.mailbox, meth, args, block)
      .tap { |sync_call| @mailbox << sync_call }
      .value
  end
end
# Async method interception
class Cell
  # Proxy object built for this cell in #initialize; callers obtain it via
  # `Cell.new(...).proxy`.
  attr_reader :proxy

  # Builds the proxy for this cell.
  #
  # subject       - the wrapped object (its class name is handed to the proxy
  #                 via @subject).
  # options       - Hash; :proxy_class overrides the default Proxy::Cell.
  # actor_options - not used in the visible part of this method.
  #
  # The proxy receives the actor's mailbox, the actor's own proxy and the
  # subject's class name as a String.
  def initialize(subject, options, actor_options)
    # ... bunch of other stuff ...
    # NOTE(review): @actor and @subject are presumably assigned in the elided
    # section above — confirm.
    proxy_class = options[:proxy_class] || Proxy::Cell

    # Keep a reference: previously the freshly built proxy was discarded.
    @proxy = proxy_class.new(@actor.mailbox, @actor.proxy, @subject.class.to_s)
  end
end
module Celluloid
  module ClassMethods
    # Replacement for Class#new: instead of a bare instance, returns the
    # proxy of a Cell wrapping a freshly allocated (not yet initialized)
    # object.
    #
    # The Cell is built from `allocate`, `behavior_options` and
    # `actor_options`; :initialize is then delivered through the proxy's
    # _send_ with the given arguments and block, and the proxy is returned.
    def new(*args, &block)
      Cell.new(allocate, behavior_options, actor_options).proxy.tap do |cell_proxy|
        cell_proxy._send_(:initialize, *args, &block)
      end
    end
    alias_method :spawn, :new
  end
end