Skip to content

Instantly share code, notes, and snippets.

@neektza
Last active August 29, 2015 14:03
Show Gist options
  • Save neektza/6d4c14f433ae1ab09ed4 to your computer and use it in GitHub Desktop.
Save neektza/6d4c14f433ae1ab09ed4 to your computer and use it in GitHub Desktop.
Ruby concurrency blog post code snippets
module EventMachine
class Reactor
def run_timers
@timers.each do |t|
if t.first <= @current_loop_time
@timers.delete t
EventMachine::event_callback "", TimerFired, t.last
else
break
end
end
end
def run_heartbeats
if @next_heartbeat <= @current_loop_time
@next_heartbeat = @current_loop_time + HeartbeatInterval
@selectables.each {|k,io| io.heartbeat}
end
end
def crank_selectables
readers = @selectables.values.select {|io| io.select_for_reading?}
writers = @selectables.values.select {|io| io.select_for_writing?}
s = select( readers, writers, nil, @timer_quantum)
s and s[1] and s[1].each {|w| w.eventable_write }
s and s[0] and s[0].each {|r| r.eventable_read }
@selectables.delete_if do |k,io|
if io.close_scheduled?
io.close
true
end
end
end
end
end
module EventMachine
def self.run blk=nil, tail=nil, &block
end
end
module EventMachine
def self.defer op = nil, callback = nil, &blk
unless @threadpool
@threadpool = []
@threadqueue = ::Queue.new
@resultqueue = ::Queue.new
spawn_threadpool
end
@threadqueue << [op||blk,callback]
end
def self.spawn_threadpool
until @threadpool.size == @threadpool_size.to_i
thread = Thread.new do
Thread.current.abort_on_exception = true
while true
op, cback = *@threadqueue.pop
result = op.call
@resultqueue << [result, cback]
EventMachine.signal_loopbreak
end
end
@threadpool << thread
end
@all_threads_spawned = true
end
def self.run_deferred_callbacks
until (@resultqueue ||= []).empty?
result,cback = @resultqueue.pop
cback.call result if cback
end
size = @next_tick_mutex.synchronize { @next_tick_queue.size }
size.times do |i|
callback = @next_tick_mutex.synchronize { @next_tick_queue.shift }
begin
callback.call
ensure
EM.next_tick {} if $!
end
end
end
end
module EventMachine
class Reactor
# ...
def initialize_for_run
@running = false
@stop_scheduled = false
@selectables ||= {}; @selectables.clear
@timers = SortedSet.new
set_timer_quantum(0.1)
@current_loop_time = Time.now
@next_heartbeat = @current_loop_time + HeartbeatInterval
end
end
end
module EventMachine
class Reactor
# ...
def open_loopbreaker
@loopbreak_writer.close if @loopbreak_writer
@loopbreak_reader, @loopbreak_writer = IO.pipe
LoopbreakReader.new(@loopbreak_reader)
end
def close_loopbreaker
@loopbreak_writer.close
@loopbreak_writer = nil
end
def signal_loopbreak
@loopbreak_writer.write 'hey!' if @loopbreak_writer
end
end
end
module EventMachine
class Reactor
# ...
def run
raise Error.new( "already running" ) if @running
@running = true
begin
open_loopbreaker
loop do
@current_loop_time = Time.now
break if @stop_scheduled
run_timers
break if @stop_scheduled
crank_selectables
break if @stop_scheduled
run_heartbeats
end
ensure
close_loopbreaker
@selectables.each {|k, io| io.close}
@selectables.clear
@running = false
end
end
end
end
module EventMachine
class StreamObject < Selectable
# ...
def heartbeat
if @inactivity_timeout and @inactivity_timeout > 0
and (@last_activity + @inactivity_timeout) < Reactor.instance.current_loop_time
schedule_close true
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment