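```ruby
module Enumerable
  # Run the block once per element, each in its own thread, and return the
  # value of a thread that has finished.
  def first_to_finish
    threads = collect { |args| Thread.new { yield(args) } }
    # Busy-wait until some thread is no longer alive.
    loop until done = threads.detect { |t| !t.alive? }
    threads.each(&:kill)
    done.value
  end
end

puts [5, 3, 1, 2, 4].first_to_finish { |x| sleep x }
```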
```ruby
require "benchmark/ips"
require "bcrypt"

module Enumerable
  # Busy-waits until any thread has finished, then kills the rest.
  def first_to_finish
    threads = collect { |args| Thread.new { yield(args) } }
    loop until done = threads.detect { |t| !t.alive? }
    threads.each(&:kill)
    done.value
  end

  # Blocks on a Queue until the first thread pushes its result.
  def first_to_finish_with_queue
    queue = Queue.new
    threads = collect { |args| Thread.new { queue << yield(args) } }
    result = queue.pop
    threads.each(&:kill)
    result
  end

  # The first thread to produce a truthy result kills its siblings.
  def get_first_result_async
    result = nil
    threads = map do |args|
      Thread.new do
        if current_result = yield(args)
          result = current_result
          # NOTE: if this thread finishes before `map` returns, `threads` is
          # still nil and this line raises NoMethodError.
          (threads - [Thread.current]).each(&:kill) # kill siblings
        end
      end
    end
    threads.each(&:join)
    result
  end
end

# bcrypt cost factors 15 down to 10 (each step doubles the work).
COSTS = (10..15).to_a.reverse

def sferik
  COSTS.first_to_finish { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

def choonkeat
  COSTS.first_to_finish_with_queue { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

def juanito
  COSTS.get_first_result_async { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

Benchmark.ips do |x|
  x.report("@sferik") { sferik }
  x.report("@choonkeat") { choonkeat }
  x.report("@JuanitoFatas") { juanito }
  x.compare!
end
```
@sferik I've borrowed your `loop until` to address the `threads` race condition, limiting the CPU hog (https://gist.github.com/choonkeat/0c254b0c2efbc7237a2a#file-first_to_finish_benchmark-rb-L8). It's not as elegant, but it works well even if the lambdas return immediately.
```ruby
def first_to_finish_with_idle
  result = nil
  threads = collect do |args|
    Thread.new do
      loop until threads # spin until the outer `threads` local has been assigned
      result = yield(args)
      (threads - [Thread.current]).each(&:kill) # kill siblings
    end
  end
  threads.each(&:join)
  result
end
```
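A possible way to keep the `threads` guard without the spin: a minimal sketch in which each worker sleeps on a start gate until the main thread has assigned `threads` and released it. The method name `first_to_finish_with_gate` and the `gate` queue are hypothetical additions, not code from this thread. As in `first_to_finish_with_idle`, the winner kills its siblings, so if two workers finish at nearly the same instant, `result` holds whichever of their values was assigned last.

```ruby
module Enumerable
  # Hypothetical variant of first_to_finish_with_idle: workers block on a
  # start gate (a Queue) instead of spinning with `loop until threads`, so no
  # CPU is burned while the outer `threads` local is still nil.
  def first_to_finish_with_gate
    gate = Queue.new
    result = nil
    threads = collect do |args|
      Thread.new do
        gate.pop # sleeps until released below; `threads` is assigned by then
        result = yield(args)
        (threads - [Thread.current]).each(&:kill) # kill siblings
      end
    end
    threads.size.times { gate << :go } # release every worker
    threads.each(&:join)
    result
  end
end

puts [0.3, 0.1, 0.2].first_to_finish_with_gate { |x| sleep x; x }
```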
OK, that was terrible. Let's try again. Using `Queue` is promising: https://gist.github.com/choonkeat/0c254b0c2efbc7237a2a#file-first_to_finish_benchmark-rb-L6-L12
```ruby
def first_to_finish_with_queue
  queue = Queue.new
  threads = collect { |args| Thread.new { queue << yield(args) } }
  result = queue.pop # blocks until the first thread pushes a value
  threads.each(&:kill)
  result
end
```
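One caveat with `first_to_finish_with_queue`: if every block raises, nothing is ever pushed, so `queue.pop` blocks forever, or CRuby's deadlock detection raises a fatal error once no other threads are alive. Below is a hedged sketch of a variant, with the hypothetical name `first_success_with_queue`, in which workers push either a value or the exception they raised. It returns the first successful value and re-raises the last error only if every block failed, which changes the semantics from "first to finish" to "first to succeed".

```ruby
module Enumerable
  # Hypothetical hardened variant of first_to_finish_with_queue: each worker
  # reports either a value or the exception it raised, so the caller never
  # waits forever on queue.pop when every block fails.
  def first_success_with_queue
    queue = Queue.new
    threads = collect do |args|
      Thread.new do
        begin
          queue << [:ok, yield(args)]
        rescue StandardError => e
          queue << [:error, e]
        end
      end
    end
    errors = []
    threads.size.times do
      status, payload = queue.pop # blocks without spinning
      return payload if status == :ok
      errors << payload
    end
    raise errors.last || ArgumentError.new("no elements to run")
  ensure
    threads&.each(&:kill) # stop any workers that are still running
  end
end

p [0.2, 0.05].first_success_with_queue { |x| sleep x; raise "boom" if x == 0.05; x } # => 0.2
```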
@choonkeat Using `Queue` is lovely.
I’ve added your implementation to the benchmark. My implementation is about 50% faster than yours on JRuby, but yours is about 30x faster than mine and @JuanitoFatas’s on CRuby. Again, performance is highly workload-sensitive, so I suspect your implementation would beat mine on an IO-heavy load, even on JRuby. Very nice. 👏
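One factor that may contribute to the CRuby gap (an assumption, not something measured in this thread): a `loop until` wait keeps the main thread running, whereas `queue.pop` puts it to sleep. This sketch compares process CPU time for the two waiting styles using a single `sleep`-based worker; the helper `cpu_seconds` is hypothetical, and the sketch does not reproduce the bcrypt benchmark.

```ruby
# Hypothetical helper: process CPU time (seconds) consumed while the block runs.
def cpu_seconds
  start = Process.clock_gettime(Process::CLOCK_PROCESS_CPUTIME_ID)
  yield
  Process.clock_gettime(Process::CLOCK_PROCESS_CPUTIME_ID) - start
end

# Spin-wait, the same busy-wait shape as `loop until`: poll until the worker ends.
spin_cpu = cpu_seconds do
  worker = Thread.new { sleep 0.2 }
  loop while worker.alive?
end

# Blocking wait, as in first_to_finish_with_queue: the main thread sleeps in pop.
block_cpu = cpu_seconds do
  queue = Queue.new
  Thread.new { sleep 0.2; queue << :done }
  queue.pop
end

puts format("spin: %.3fs CPU, queue: %.3fs CPU", spin_cpu, block_cpu)
```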
In case anyone is curious, here’s the functional reactive implementation I came up with:
```ruby
require "rx_ruby"

times = [5, 3, 1, 2, 4]
# Emit each number after a delay of that many seconds.
source = RxRuby::Observable.from(times).flat_map do |n|
  RxRuby::Observable.of(n).delay(n)
end
subscription = source.subscribe { |s| puts s }
# Same busy-wait hack as in first_to_finish.
loop until Thread.list.size < times.size + 1
subscription.unsubscribe
```
I’m not an experienced Rx programmer, so there’s a good chance I’m doing it completely wrong, but it was a fun exercise to try. Ultimately, I had to use the same `loop until` hack that I used in the `Enumerable` example, which was disappointing, but maybe someone can suggest a more elegant solution.
Awesome, that was fun @sferik 👏 👏 👏
BTW, since we're killing threads, I suspect those lambda arguments need to be wrapped with `Thread.handle_interrupt` (or is that unrelated?)
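For reference, a minimal sketch of what `Thread.handle_interrupt` does, assuming the concern is a worker being interrupted in the middle of a critical section. It uses `Thread#raise` with a hypothetical `StopWork` exception, because masking by exception class is the documented usage. Ruby's docs list `Thread#kill` among the interrupts `handle_interrupt` governs, but which mask key defers a kill is worth checking on your Ruby version before relying on it.

```ruby
# Hypothetical exception used to cancel a worker from another thread.
class StopWork < StandardError; end

log = []
started = Queue.new

worker = Thread.new do
  begin
    Thread.handle_interrupt(StopWork => :never) do
      started << true
      log << :step1
      sleep 0.2     # a StopWork raised here is deferred, not delivered
      log << :step2 # so the critical section always completes
    end             # deferred interrupts are delivered once the block exits
  rescue StopWork
    log << :stopped
  end
end

started.pop         # wait until the worker is inside the protected block
worker.raise(StopWork)
worker.join
p log               # => [:step1, :step2, :stopped]
```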
Folding the lessons here into choonkeat/attache#27. Thanks, guys!
@choonkeat That’s a fair point. In addition to being highly dependent on the number of CPUs/cores and the threading model, performance is also highly sensitive to the workload. In my benchmark, I’m assuming a CPU-intensive workload, but it’d be worth writing a separate benchmark for an IO-intensive load. I suspect my implementation would perform somewhat worse under those conditions.
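A minimal sketch of the IO-heavy benchmark suggested above, assuming `sleep` is an acceptable stand-in for blocking IO (both release the GVL on CRuby). It copies the two gist implementations and uses the standard library's `Benchmark.realtime` instead of `benchmark/ips`; the `DELAYS` values and `ROUNDS` count are arbitrary choices, not numbers from this thread.

```ruby
require "benchmark"

module Enumerable
  # Busy-wait version, copied from the gist.
  def first_to_finish
    threads = collect { |args| Thread.new { yield(args) } }
    loop until done = threads.detect { |t| !t.alive? }
    threads.each(&:kill)
    done.value
  end

  # Queue version, copied from the gist.
  def first_to_finish_with_queue
    queue = Queue.new
    threads = collect { |args| Thread.new { queue << yield(args) } }
    result = queue.pop
    threads.each(&:kill)
    result
  end
end

# Assumption: `sleep` approximates IO latency; a realistic benchmark would
# read from sockets or files instead.
DELAYS = [0.2, 0.05, 0.15, 0.1].freeze
ROUNDS = 5

%i[first_to_finish first_to_finish_with_queue].each do |meth|
  wall = Benchmark.realtime do
    ROUNDS.times { DELAYS.public_send(meth) { |d| sleep d; d } }
  end
  puts format("%-28s %.3fs for %d rounds", meth, wall, ROUNDS)
end
```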
I’d also be curious to see an implementation written in a functional reactive style, where the threads are observable and an observer subscribes to them to get the first result. Maybe I’ll work on that now and add it to the benchmark.