```ruby
module Enumerable
  def first_to_finish
    threads = collect { |args| Thread.new { yield(args) } }
    loop until done = threads.detect { |t| !t.alive? }
    threads.each(&:kill)
    done.value
  end
end

puts [5, 3, 1, 2, 4].first_to_finish { |x| sleep x }
```
```ruby
require "benchmark/ips"
require "bcrypt"

module Enumerable
  def first_to_finish
    threads = collect { |args| Thread.new { yield(args) } }
    loop until done = threads.detect { |t| !t.alive? }
    threads.each(&:kill)
    done.value
  end

  def first_to_finish_with_queue
    queue = Queue.new
    threads = collect { |args| Thread.new { queue << yield(args) } }
    result = queue.pop
    threads.each(&:kill)
    result
  end

  def get_first_result_async
    result = nil
    threads = map do |args|
      Thread.new do
        if current_result = yield(args)
          result = current_result
          (threads - [Thread.current]).each(&:kill) # kill siblings
        end
      end
    end
    threads.each(&:join)
    result
  end
end

COSTS = (10..15).to_a.reverse

def sferik
  COSTS.first_to_finish { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

def choonkeat
  COSTS.first_to_finish_with_queue { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

def juanito
  COSTS.get_first_result_async { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

Benchmark.ips do |x|
  x.report("@sferik") { sferik }
  x.report("@choonkeat") { choonkeat }
  x.report("@JuanitoFatas") { juanito }
  x.compare!
end
```
for purposes where the threads are doing CPU-intensive work, we're probably soaked anyway, so you're right: giving up that nth bit of CPU is fine, or rather, each thread is on its own core anyway. however, for scenarios where threads are mostly waiting for IO (e.g. fetching a URL) and this belongs to a bigger multitasking system (e.g. a web app with several workers), leaving the CPU idle is quite important.

thanks for the benchmark rb, i'll try out a few things and reply!
@choonkeat That’s a fair point. In addition to being highly dependent on the number of CPUs/cores and the threading model, it’s also highly sensitive to the workload. In my benchmark, I’m assuming a CPU-intensive workload, but it’d be worth writing a separate benchmark for an IO-intensive load. I suspect my implementation would perform somewhat worse under those conditions.
I’d also be curious to see an implementation written in a functional reactive style, where the threads are observable and an observer subscribes to them to get the first result. Maybe I’ll work on that now and add it to the benchmark.
@sferik i've borrowed your `loop until` to address the `threads` race condition, limiting the cpu hog https://gist.github.com/choonkeat/0c254b0c2efbc7237a2a#file-first_to_finish_benchmark-rb-L8 not as elegant, but works well even if the lambdas immediately return
```ruby
def first_to_finish_with_idle
  result = nil
  threads = collect { |args| Thread.new {
    loop until threads
    result = yield(args)
    (threads - [Thread.current]).each(&:kill)
  } }
  threads.each(&:join)
  result
end
```
ok that was terrible. let's try again

using `Queue` is promising https://gist.github.com/choonkeat/0c254b0c2efbc7237a2a#file-first_to_finish_benchmark-rb-L6-L12
```ruby
def first_to_finish_with_queue
  queue = Queue.new
  threads = collect { |args| Thread.new { queue << yield(args) } }
  result = queue.pop
  threads.each(&:kill)
  result
end
```
@choonkeat Using `Queue` is lovely.
I’ve added your implementation to the benchmark. My implementation is about 50% faster than yours on JRuby but yours is about 30X faster than mine and @JuanitoFatas’s on CRuby. Again, the performance is highly workload-sensitive, so I suspect your implementation would beat mine on an IO-heavy load, even on JRuby. Very nice. 👏
In case anyone is curious, here’s the functional reactive implementation I came up with:
```ruby
require "rx_ruby"

times = [5, 3, 1, 2, 4]
source = RxRuby::Observable.from(times).flat_map do |n|
  RxRuby::Observable.of(n).delay(n)
end
subscription = source.subscribe { |s| puts s }
loop until Thread.list.size < times.size + 1
subscription.unsubscribe
```
I’m not an experienced Rx programmer, so there’s a good chance I’m doing it completely wrong, but it was a fun exercise to try. Ultimately, I had to use the same `loop until` hack that I used in the `Enumerable` example, which was disappointing, but maybe someone can suggest a more elegant solution.
awesome that was fun @sferik 👏 👏 👏
btw since we're killing threads, I suspect those lambda arguments need to be wrapped with `Thread.handle_interrupt` (or are they unrelated?)
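It may not be strictly needed when the block's only side effect is a single `queue <<` push, but a sketch of the pattern in question might look like this (all names and delays here are made up for illustration): the outer `:never` mask defers a kill that arrives outside the killable section, and the `ensure` cleanup runs even for threads that are killed mid-work.

```ruby
# Each worker does its killable work inside an :immediate section; the
# surrounding :never mask defers a kill that lands while the worker is in
# its ensure, and the ensure itself still runs for killed threads.
cleanups = Queue.new
results  = Queue.new
delays   = [0.3, 0.05, 0.2] # stand-ins for the real workloads

threads = delays.map do |d|
  Thread.new do
    Thread.handle_interrupt(Object => :never) do
      begin
        Thread.handle_interrupt(Object => :immediate) do
          sleep(d)      # killable section (the actual work)
          results << d
        end
      ensure
        cleanups << d   # cleanup runs even if the thread was killed
      end
    end
  end
end

winner = results.pop
threads.each(&:kill)
threads.each(&:join)
p winner        # => 0.05
p cleanups.size # => 3
```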
folding the lessons here into choonkeat/attache#27 thanks guys
I’ve added a benchmark to the Gist, which computes hashes using `bcrypt`, an algorithm designed to be CPU-intensive, with variable cost. I ran it on my quad-core 2.6 GHz Intel Core i7 under both CRuby 2.3.0-preview2 and JRuby 9.0.4.0. The results of the benchmark have extremely high variance, as you might expect when working with non-deterministic code. On CRuby, the two implementations were comparable; on JRuby, my version was many times faster. On machines with more CPUs/cores, I think the benefits of my implementation would be even more pronounced. I’d encourage you to run the benchmark to see the results for yourself.

Since my implementation has comparable performance on CRuby, better performance on JRuby, doesn’t have a race condition or an issue with `false` returns, and the code is more readable (less block nesting), I’d say it’s better in the general case.
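The `false`-return issue is easy to reproduce with a block that legitimately computes a falsy value; a minimal sketch using the two implementations from the Gist (delays are arbitrary):

```ruby
module Enumerable
  def get_first_result_async
    result = nil
    threads = map do |args|
      Thread.new do
        if current_result = yield(args) # a falsy result never takes this branch
          result = current_result
          (threads - [Thread.current]).each(&:kill)
        end
      end
    end
    threads.each(&:join)
    result
  end

  def first_to_finish_with_queue
    queue = Queue.new
    threads = collect { |args| Thread.new { queue << yield(args) } }
    result = queue.pop # a pushed false pops back out just fine
    threads.each(&:kill)
    result
  end
end

# A block whose genuine answer is false:
p [0.05, 0.1].get_first_result_async { |d| sleep(d); false }     # => nil
p [0.05, 0.1].first_to_finish_with_queue { |d| sleep(d); false } # => false
```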