Skip to content

Instantly share code, notes, and snippets.

@unakatsuo
Created February 21, 2011 10:16
Show Gist options
  • Save unakatsuo/836894 to your computer and use it in GitHub Desktop.
Save unakatsuo/836894 to your computer and use it in GitHub Desktop.
Test implementation of an unlimited list iterator with concurrency.
# Demo driver for EM::WorkQueue: keeps feeding jobs into a queue limited to
# 10 concurrent workers, closes the queue after 2 seconds, prints how many
# jobs were started, and stops the reactor half a second later.
EM.run do
  work_queue = EM::WorkQueue.new(10)
  started = []

  # Enqueues one job (only while the queue is still open) and then re-arms
  # itself after a random delay of 0.00-0.09 seconds.
  feeder = proc do
    unless work_queue.closed?
      work_queue.push do |signal|
        started << 1
        # Hold the worker slot for a random 0.00-0.09 seconds, then release it.
        EM.add_timer(rand(10) / 100.0) { signal.return }
      end
    end
    EM.add_timer(rand(10) / 100.0, &feeder)
  end

  EM.next_tick { feeder.call }

  EM.add_timer(2.0) do
    # The close block runs from the queue's close hook.
    work_queue.close do
      puts "wq.close: #{started.size}"
      EM.add_timer(0.5) { EM.stop }
    end
  end
end
require 'eventmachine'
module EventMachine
  # A job queue that runs at most +concurrency+ jobs at the same time.
  # Jobs are started from reactor ticks scheduled with
  # EventMachine.next_tick.
  #
  # A job is any callable:
  # * arity 0  - the job is considered finished as soon as it returns;
  # * otherwise - the job receives a completion signal and must call
  #   +signal.return+ (aliases: +signal.next+, +signal.call+) when it is done.
  class WorkQueue
    # concurrency: maximum number of jobs allowed to run at once.
    def initialize(concurrency=1)
      @concurrency = concurrency
      @closed = false
      @workers = 0 # number of jobs currently running
      @endjob_done = false
      @on_close_hook = nil
      @wqueue = [] # jobs waiting for a free worker slot
    end

    # Enqueue a job, given either as a block or as a positional callable
    # (the latter makes the << operator usable: queue << proc { ... }).
    #
    # Raises RuntimeError if the queue is already closed and ArgumentError
    # if no callable job was supplied.
    def push(job = nil, &blk)
      raise "Closed already" if closed?
      job ||= blk
      # Fail here rather than later inside a reactor tick.
      raise ArgumentError, "no job given" unless job.respond_to?(:call)
      @wqueue << job
      EventMachine.next_tick { dojob }
    end
    alias :<< :push
    alias :enq :push

    # Close the queue so that it accepts no new jobs.
    # The given block is called once every queued job has been started and
    # every running job has finished (immediately if the queue is idle).
    #
    # Raises RuntimeError if the queue was already closed.
    def close(&blk)
      raise "already closed this work queue" if @closed
      @closed = true
      @on_close_hook = blk if blk
      endjob if @workers == 0 && @wqueue.empty?
    end

    # True once #close has been called.
    def closed?
      @closed
    end

    private

    # Start the next waiting job if a worker slot is free.
    def dojob
      return if @workers >= @concurrency || @wqueue.empty?
      @workers += 1
      job = @wqueue.shift
      signal = completion_signal
      if job.respond_to?(:arity) && job.arity == 0
        # Synchronous job: finished as soon as it returns.
        job.call
        signal.call
      else
        # Any other arity gets the signal; previously such jobs were silently
        # dropped while still occupying a worker slot.
        job.call(signal)
      end
    end

    # Build the callback a job uses to report completion. Only the first
    # invocation has an effect, so a job that signals twice cannot push the
    # running-worker count below its real value.
    def completion_signal
      finished = false
      signal = proc { |*_args|
        unless finished
          finished = true
          @workers -= 1
          if @closed && @wqueue.empty?
            # Wait for the last running worker before firing the close hook.
            endjob if @workers == 0
          else
            EventMachine.next_tick { dojob }
          end
        end
      }
      class << signal
        alias :next :call
        alias :return :call
      end
      signal
    end

    # Run the close hook at most once, even if the hook itself raises.
    def endjob
      return if @endjob_done
      begin
        @on_close_hook.call if @on_close_hook
      ensure
        @endjob_done = true
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment