@sferik
Last active May 5, 2017 05:51

module Enumerable
  def first_to_finish
    threads = collect { |args| Thread.new { yield(args) } }
    loop until done = threads.detect { |t| !t.alive? } # busy-wait until the first thread dies
    threads.each(&:kill)                               # kill the stragglers
    done.value
  end
end

puts [5, 3, 1, 2, 4].first_to_finish { |x| sleep x }

require "benchmark/ips"
require "bcrypt"

module Enumerable
  # Busy-wait (poll) until the first thread is no longer alive.
  def first_to_finish
    threads = collect { |args| Thread.new { yield(args) } }
    loop until done = threads.detect { |t| !t.alive? }
    threads.each(&:kill)
    done.value
  end

  # Block on a Queue until the first thread pushes its result.
  def first_to_finish_with_queue
    queue = Queue.new
    threads = collect { |args| Thread.new { queue << yield(args) } }
    result = queue.pop
    threads.each(&:kill)
    result
  end

  # Each thread records its result and kills its siblings; join them all.
  def get_first_result_async
    result = nil
    threads = map do |args|
      Thread.new do
        if current_result = yield(args)
          result = current_result
          (threads - [Thread.current]).each(&:kill) # kill siblings
        end
      end
    end
    threads.each(&:join)
    result
  end
end

COSTS = (10..15).to_a.reverse

def sferik
  COSTS.first_to_finish { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

def choonkeat
  COSTS.first_to_finish_with_queue { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

def juanito
  COSTS.get_first_result_async { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

Benchmark.ips do |x|
  x.report("@sferik") { sferik }
  x.report("@choonkeat") { choonkeat }
  x.report("@JuanitoFatas") { juanito }
  x.compare!
end

sferik commented Dec 18, 2015

In the process of benchmarking both versions, I found a race condition in @JuanitoFatas’s implementation on this line:

(threads - [Thread.current]).each(&:kill) # kill siblings

There’s a possibility that one thread finishes before the last thread is spawned, in which case the threads variable will still be nil, causing the main thread to crash.

My implementation manages to avoid this issue.
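
For illustration, here’s a minimal sketch of the failure mode (hypothetical code, not from the Gist): if the scheduler runs one of the blocks before map has returned, threads is still nil when that block tries to subtract from it, and the NoMethodError gets re-raised in the main thread at join time.

threads = [1, 2, 3].map do |n|
  Thread.new do
    # This block may be scheduled before `threads` is assigned by `map`,
    # in which case `threads - [Thread.current]` raises NoMethodError on nil.
    (threads - [Thread.current]).each(&:kill)
  end
end
threads.each(&:join) # join re-raises the exception in the main thread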


sferik commented Dec 18, 2015

I’ve added a benchmark to the Gist, which computes hashes using bcrypt, an algorithm designed to be CPU-intensive, with variable cost. I ran it on my quad-core 2.6 GHz Intel Core i7 under both CRuby 2.3.0-preview2 and JRuby 9.0.4.0. The results of the benchmark have extremely high variance, as you might expect when working with non-deterministic code. On CRuby, the two implementations were comparable; on JRuby, my version was many times faster. On machines with more CPUs/cores, I think the benefits of my implementation would be even more pronounced. I’d encourage you to run the benchmark to see the results for yourself.

Since my implementation has comparable performance on CRuby and better performance on JRuby, doesn’t have a race condition or an issue with false returns, and is more readable (less block nesting), I’d say it’s better in the general case.

@choonkeat

CPU

For cases where the threads are doing CPU-intensive work, we're probably saturated anyway, so you're right: giving up that nth bit of CPU is fine. Or rather, each thread is on its own core anyway. However, for scenarios where the threads are mostly waiting on IO (e.g. fetching a URL) and this belongs to a bigger multitasking system (e.g. a web app with several workers), then leaving the CPU idle for other work is quite important.

Thanks for the benchmark rb, I'll try out a few things and reply!


sferik commented Dec 18, 2015

@choonkeat That’s a fair point. In addition to being highly dependent on the number of CPUs/cores and the threading model, the result is also highly sensitive to the workload. In my benchmark, I’m assuming a CPU-intensive workload, but it’d be worth writing a separate benchmark for an IO-intensive load. I suspect my implementation would perform somewhat worse under those conditions.
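
Something along these lines might do it (a rough sketch, using sleep as a stand-in for network latency, and assuming the Enumerable methods above are loaded; the delay values are made up for illustration):

require "benchmark/ips"

DELAYS = [0.05, 0.04, 0.03, 0.02, 0.01] # simulated IO latencies in seconds

Benchmark.ips do |x|
  x.report("@sferik")       { DELAYS.first_to_finish        { |d| sleep d; d } }
  x.report("@JuanitoFatas") { DELAYS.get_first_result_async { |d| sleep d; d } }
  x.compare!
end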

I’d also be curious to see an implementation written in a functional reactive style, where the threads are observable and an observer subscribes to them to get the first result. Maybe I’ll work on that now and add it to the benchmark.

@choonkeat

@sferik I've borrowed your loop until to address the threads race condition while limiting the CPU hogging (https://gist.github.com/choonkeat/0c254b0c2efbc7237a2a#file-first_to_finish_benchmark-rb-L8). Not as elegant, but it works well even if the lambdas return immediately.

def first_to_finish_with_idle
  result = nil
  threads = collect { |args| Thread.new {
    loop until threads
    result = yield(args)
    (threads - [Thread.current]).each(&:kill)
  } }
  threads.each(&:join)
  result
end

@choonkeat

OK, that was terrible. Let's try again.

@choonkeat

Using Queue is promising: https://gist.github.com/choonkeat/0c254b0c2efbc7237a2a#file-first_to_finish_benchmark-rb-L6-L12

def first_to_finish_with_queue
  queue = Queue.new
  threads = collect { |args| Thread.new { queue << yield(args) } }
  result = queue.pop
  threads.each(&:kill)
  result
end
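
Usage mirrors the sleep example from the first file; this should print 1 after roughly a second:

puts [5, 3, 1, 2, 4].first_to_finish_with_queue { |x| sleep x }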


sferik commented Dec 18, 2015

@choonkeat Using Queue is lovely.

I’ve added your implementation to the benchmark. My implementation is about 50% faster than yours on JRuby but yours is about 30X faster than mine and @JuanitoFatas’s on CRuby. Again, the performance is highly workload-sensitive, so I suspect your implementation would beat mine on an IO-heavy load, even on JRuby. Very nice. 👏


sferik commented Dec 18, 2015

In case anyone is curious, here’s the functional reactive implementation I came up with:

require "rx_ruby"

times = [5, 3, 1, 2, 4]

source = RxRuby::Observable.from(times).flat_map do |n|
  RxRuby::Observable.of(n).delay(n)
end

subscription = source.subscribe { |s| puts s }
loop until Thread.list.size < times.size + 1
subscription.unsubscribe

I’m not an experienced Rx programmer, so there’s a good chance I’m doing it completely wrong, but it was a fun exercise to try. Ultimately, I had to use the same loop until hack that I used in the Enumerable example, which was disappointing, but maybe someone can suggest a more elegant solution.
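
One possibility might be to block on a Queue instead of polling Thread.list, something like this (untested sketch, assuming the delayed values are delivered on background threads as in the version above):

require "rx_ruby"

times = [5, 3, 1, 2, 4]
queue = Queue.new

source = RxRuby::Observable.from(times).flat_map do |n|
  RxRuby::Observable.of(n).delay(n)
end

subscription = source.subscribe { |s| queue << s } # push each value as it arrives
puts queue.pop                                     # block until the first value
subscription.unsubscribe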

@choonkeat

Awesome, that was fun @sferik 👏 👏 👏

By the way, since we're killing threads, I suspect those lambda arguments need to be wrapped with Thread.handle_interrupt (or are they unrelated?)
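
Something like this, maybe (just an untested sketch of what I mean; I'm not certain a sibling's Thread#kill is actually deferrable this way, and the resource handling is made up):

COSTS.first_to_finish_with_queue do |cost|
  Thread.handle_interrupt(Object => :never) do
    # acquire / clean up resources here without being interrupted midway
    Thread.handle_interrupt(Object => :immediate) do
      BCrypt::Password.create("secret", :cost => cost) # a kill may land here
    end
  end
end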

Folding the lessons here into choonkeat/attache#27. Thanks, guys!
