-
-
Save sferik/39831f34eb87686b639c to your computer and use it in GitHub Desktop.
module Enumerable | |
def first_to_finish | |
threads = collect { |args| Thread.new { yield(args) } } | |
loop until done = threads.detect { |t| !t.alive? } | |
threads.each(&:kill) | |
done.value | |
end | |
end | |
puts [5, 3, 1, 2, 4].first_to_finish { |x| sleep x } |
require "benchmark/ips" | |
require "bcrypt" | |
module Enumerable | |
def first_to_finish | |
threads = collect { |args| Thread.new { yield(args) } } | |
loop until done = threads.detect { |t| !t.alive? } | |
threads.each(&:kill) | |
done.value | |
end | |
def first_to_finish_with_queue | |
queue = Queue.new | |
threads = collect { |args| Thread.new { queue << yield(args) } } | |
result = queue.pop | |
threads.each(&:kill) | |
result | |
end | |
def get_first_result_async | |
result = nil | |
threads = map do |args| | |
Thread.new do | |
if current_result = yield(args) | |
result = current_result | |
(threads - [Thread.current]).each(&:kill) # kill siblings | |
end | |
end | |
end | |
threads.each(&:join) | |
result | |
end | |
end | |
COSTS = (10..15).to_a.reverse | |
def sferik | |
COSTS.first_to_finish { |cost| BCrypt::Password.create("secret", :cost => cost) } | |
end | |
def choonkeat | |
COSTS.first_to_finish_with_queue { |cost| BCrypt::Password.create("secret", :cost => cost) } | |
end | |
def juanito | |
COSTS.get_first_result_async { |cost| BCrypt::Password.create("secret", :cost => cost) } | |
end | |
Benchmark.ips do |x| | |
x.report("@sferik") { sferik } | |
x.report("@choonkeat") { choonkeat } | |
x.report("@JuanitoFatas") { juanito } | |
x.compare! | |
end |
ok that was terrible. let's try again
using Queue
is promising https://gist.github.com/choonkeat/0c254b0c2efbc7237a2a#file-first_to_finish_benchmark-rb-L6-L12
def first_to_finish_with_queue
queue = Queue.new
threads = collect { |args| Thread.new { queue << yield(args) } }
result = queue.pop
threads.each(&:kill)
result
end
@choonkeat Using Queue
is lovely.
I’ve added your implementation to the benchmark. My implementation is about 50% faster than yours on JRuby but yours is about 30X faster than mine and @JuanitoFatas’s on CRuby. Again, the performance is highly workload-sensitive, so I suspect your implementation would beat mine on an IO-heavy load, even on JRuby. Very nice. 👏
In case anyone is curious, here’s the functional reactive implementation I came up with:
require "rx_ruby"
times = [5, 3, 1, 2, 4]
source = RxRuby::Observable.from(times).flat_map do |n|
RxRuby::Observable.of(n).delay(n)
end
subscription = source.subscribe { |s| puts s }
loop until Thread.list.size < times.size + 1
subscription.unsubscribe
I’m not an experienced RX programmer, so there’s a good chance I’m doing it completely wrong, but it was a fun exercise to try. Ultimately, I had to use the same loop until
hack that I used in the Enumerable
example, which was disappointing, but maybe someone can suggest a more elegant solution.
awesome that was fun @sferik 👏 👏 👏
btw since we're killing threads, I suspect those lambda arguments need to be wrapped with Thread.handle_interrupt
(or are they unrelated?)
folding the lessons here into choonkeat/attache#27 thanks guys
@sferik i've borrowed your
loop until
to address thethreads
race condition, limiting the cpu hog https://gist.github.com/choonkeat/0c254b0c2efbc7237a2a#file-first_to_finish_benchmark-rb-L8 not as elegant, but works well even if the lambdas immediately return