module Enumerable
  def first_to_finish
    threads = collect { |args| Thread.new { yield(args) } }
    loop until done = threads.detect { |t| !t.alive? }
    threads.each(&:kill)
    done.value
  end
end

puts [5, 3, 1, 2, 4].first_to_finish { |x| sleep x }
require "benchmark/ips"
require "bcrypt"

module Enumerable
  def first_to_finish
    threads = collect { |args| Thread.new { yield(args) } }
    loop until done = threads.detect { |t| !t.alive? }
    threads.each(&:kill)
    done.value
  end

  def first_to_finish_with_queue
    queue = Queue.new
    threads = collect { |args| Thread.new { queue << yield(args) } }
    result = queue.pop
    threads.each(&:kill)
    result
  end

  def get_first_result_async
    result = nil
    threads = map do |args|
      Thread.new do
        if current_result = yield(args)
          result = current_result
          (threads - [Thread.current]).each(&:kill) # kill siblings
        end
      end
    end
    threads.each(&:join)
    result
  end
end

COSTS = (10..15).to_a.reverse

def sferik
  COSTS.first_to_finish { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

def choonkeat
  COSTS.first_to_finish_with_queue { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

def juanito
  COSTS.get_first_result_async { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

Benchmark.ips do |x|
  x.report("@sferik") { sferik }
  x.report("@choonkeat") { choonkeat }
  x.report("@JuanitoFatas") { juanito }
  x.compare!
end
@JuanitoFatas


commented Dec 17, 2015

My poor version:

module Enumerable
  def get_first_result_async
    result = nil
    threads = map do |args|
      Thread.new do
        if current_result = yield(args)
          result = current_result
          (threads - [Thread.current]).each(&:kill) # kill siblings
        end
      end
    end
    threads.each(&:join)
    result
  end
end
@sferik

Owner Author

commented Dec 17, 2015

@JuanitoFatas The problem with your implementation is that it won’t work if the result is false. I’ve updated my version to send kill to each thread after the first result is found. I was relying on the garbage collector to clean up the unreferenced threads after the method returned, but that’s probably suboptimal.
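To make the falsy-result problem concrete, here’s a standalone sketch (the method name, values, and sleeps are made up for illustration): every block legitimately returns false, so the guarded assignment never fires and the caller sees nil instead of the first thread’s false.

```ruby
module Enumerable
  # Same shape as the get_first_result_async guard, isolated for the demo.
  def get_first_result_async_demo
    result = nil
    threads = map do |args|
      Thread.new do
        if current_result = yield(args) # false (and nil) never pass this guard
          result = current_result
        end
      end
    end
    threads.each(&:join)
    result
  end
end

# Both blocks finish and return false, but the method reports nil:
p [0.01, 0.02].get_first_result_async_demo { |t| sleep t; false } # => nil, not false
```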

@choonkeat


commented Dec 18, 2015

thanks @sferik! sorry, I should've clarified that the lambdas in each thread should be different, e.g. each one getting something from different servers

this looks elegant but doesn't it like keep the cpu busy looping?

loop until done = threads.detect { |t| !t.alive? }
threads.each(&:kill)

whereas the "kill sibling threads inside a thread" simply block and idle waiting for threads to join?

threads = map do |args|
  Thread.new do
    # something
    (threads - [Thread.current]).each(&:kill) # kill siblings
  end
end
threads.each(&:join)
@sferik

Owner Author

commented Dec 18, 2015

this looks elegant but doesn't it like keep the cpu busy looping?

@choonkeat If there’s only one CPU/core, then this may not be the most efficient approach. However, in that case, there’s not much benefit in using multiple threads.

The efficiency depends a lot on the number of CPUs/cores and the threading model. This approach treats the main thread as the master, with n worker threads. With a native threading model, as in JRuby, the tight loop on the master thread should not significantly interfere with the performance of the worker threads (modulo confounding factors like dynamic frequency scaling technologies, such as Intel’s Turbo Boost or AMD’s Turbo Core). Even in CRuby, with a global VM lock, the main thread should only be scheduled for 1 / (n + 1) of the cycles.

There are some drawbacks to @JuanitoFatas’s approach. As I mentioned, it won’t work for blocks that return false. It also depends on threads aborting quickly, which they probably will, but if one thread gets stuck, it will block the method from returning. I’ll do some benchmarking now on JRuby and CRuby and report my findings below.
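The stuck-thread hazard can be seen in a contrived sketch (the delays are invented; a 60-second sleep stands in for a hung worker): the answer is ready after ~10 ms, but a join-based collector can’t return until every thread finishes.

```ruby
require "timeout"

# One worker answers almost immediately; the other is "stuck".
result = nil
threads = [0.01, 60].map do |delay|
  Thread.new { sleep delay; result ||= delay }
end

begin
  Timeout.timeout(0.5) { threads.each(&:join) } # join blocks on the stuck thread
rescue Timeout::Error
  puts "result was ready (#{result.inspect}) but join never returned"
end
threads.each(&:kill) # clean up the stuck worker
```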

@sferik

Owner Author

commented Dec 18, 2015

In the process of benchmarking both versions, I found a race condition in @JuanitoFatas’s implementation on this line:

(threads - [Thread.current]).each(&:kill) # kill siblings

There’s a possibility that one thread finishes before the last thread is spawned, in which case the threads variable will still be nil, causing the main thread to crash.

My implementation manages to avoid this issue.
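The race can be reproduced deterministically for illustration by joining inside `map`, which forces the spawned thread to run before the assignment to `threads` completes (a contrived repro, not code from the gist):

```ruby
# Inside the spawned thread, the local `threads` is still nil, because
# `threads = [...].map { ... }` only assigns after map returns.
seen = :unset
threads = [1].map do |_|
  t = Thread.new { seen = threads } # reads the not-yet-assigned local
  t.join                            # guarantee the thread finishes before map returns
  t
end

p seen         # => nil — (threads - [Thread.current]) here would raise NoMethodError
p threads.size # => 1 — the local is only assigned after map completes
```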

@sferik

Owner Author

commented Dec 18, 2015

I’ve added a benchmark to the Gist, which computes hashes using bcrypt, an algorithm designed to be CPU-intensive, with variable cost. I ran it on my quad-core 2.6 GHz Intel Core i7 under both CRuby 2.3.0-preview2 and JRuby 9.0.4.0. The results of the benchmark have extremely high variance, as you might expect when working with non-deterministic code. On CRuby, the two implementations were comparable; on JRuby, my version was many times faster. On machines with more CPUs/cores, I think the benefits of my implementation would be even more pronounced. I’d encourage you to run the benchmark to see the results for yourself.

Since my implementation has comparable performance on CRuby, better performance on JRuby, doesn’t have a race condition or issue with false returns, and the code is more readable (less block nesting), I’d say it’s better in the general case.

@choonkeat


commented Dec 18, 2015


for purposes where the threads are doing cpu intensive work, indeed we're probably soaked anyways, so you're right: giving up that nth bit of cpu is fine, or rather, each thread is on its own core anyways. however, for scenarios where threads are mostly waiting for io (e.g. fetch url) and this belongs to a bigger multitasking system (e.g. web app with several workers), leaving the cpu idle is quite important

thanks for the benchmark rb, i'll try out a few things and reply!

@sferik

Owner Author

commented Dec 18, 2015

@choonkeat That’s a fair point. In addition to being highly dependent on the number of CPUs/cores and the threading model, performance is also highly sensitive to the workload. In my benchmark, I’m assuming a CPU-intensive workload, but it’d be worth writing a separate benchmark for an IO-intensive load. I suspect my implementation would perform somewhat worse under those conditions.

I’d also be curious to see an implementation written in a functional reactive style, where the threads are observable and an observer subscribes to them to get the first result. Maybe I’ll work on that now and add it to the benchmark.

@choonkeat


commented Dec 18, 2015

@sferik i've borrowed your loop until to address the threads race condition, limiting the cpu hog https://gist.github.com/choonkeat/0c254b0c2efbc7237a2a#file-first_to_finish_benchmark-rb-L8 not as elegant, but works well even if the lambdas immediately return

def first_to_finish_with_idle
  result = nil
  threads = collect { |args| Thread.new {
    loop until threads
    result = yield(args)
    (threads - [Thread.current]).each(&:kill)
  } }
  threads.each(&:join)
  result
end
@choonkeat


commented Dec 18, 2015

ok that was terrible. let's try again

@choonkeat


commented Dec 18, 2015

using Queue is promising https://gist.github.com/choonkeat/0c254b0c2efbc7237a2a#file-first_to_finish_benchmark-rb-L6-L12

def first_to_finish_with_queue
  queue = Queue.new
  threads = collect { |args| Thread.new { queue << yield(args) } }
  result = queue.pop
  threads.each(&:kill)
  result
end
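As a usage sketch for the IO-ish "race several servers" scenario mentioned earlier (the host names and latencies are made up; `sleep` stands in for a network fetch):

```ruby
module Enumerable
  def first_to_finish_with_queue
    queue = Queue.new
    threads = collect { |args| Thread.new { queue << yield(args) } }
    result = queue.pop # blocks idle instead of spinning
    threads.each(&:kill)
    result
  end
end

# Pretend latencies for three mirrors; the fastest responder wins.
delays = { "mirror-a" => 0.3, "mirror-b" => 0.05, "mirror-c" => 0.2 }
winner = delays.keys.first_to_finish_with_queue do |host|
  sleep delays[host] # stands in for fetching from the server
  host
end
p winner # => "mirror-b"
```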
@sferik

Owner Author

commented Dec 18, 2015

@choonkeat Using Queue is lovely.

I’ve added your implementation to the benchmark. My implementation is about 50% faster than yours on JRuby but yours is about 30X faster than mine and @JuanitoFatas’s on CRuby. Again, the performance is highly workload-sensitive, so I suspect your implementation would beat mine on an IO-heavy load, even on JRuby. Very nice. 👏

@sferik

Owner Author

commented Dec 18, 2015

In case anyone is curious, here’s the functional reactive implementation I came up with:

require "rx_ruby"

times = [5, 3, 1, 2, 4]

source = RxRuby::Observable.from(times).flat_map do |n|
  RxRuby::Observable.of(n).delay(n)
end

subscription = source.subscribe { |s| puts s }
loop until Thread.list.size < times.size + 1
subscription.unsubscribe

I’m not an experienced RX programmer, so there’s a good chance I’m doing it completely wrong, but it was a fun exercise to try. Ultimately, I had to use the same loop until hack that I used in the Enumerable example, which was disappointing, but maybe someone can suggest a more elegant solution.

@choonkeat


commented Dec 18, 2015

awesome that was fun @sferik 👏 👏 👏

btw since we're killing threads, I suspect those lambda arguments need to be wrapped with Thread.handle_interrupt (or are they unrelated?)

folding the lessons here into choonkeat/attache#27 thanks guys
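A hedged sketch of what that wrapping might look like, following the Thread.handle_interrupt pattern from the Ruby docs (the sleeps and values are invented; this is one possible shape, not necessarily what attache needs): kill is deferred around the enqueue so a worker can’t be torn down between computing its value and handing it over.

```ruby
queue = Queue.new
threads = [3, 1, 2].map do |n|
  Thread.new do
    Thread.handle_interrupt(Object => :never) do
      # Thread#kill is deferred while :never is in effect...
      value = Thread.handle_interrupt(Object => :immediate) do
        sleep n * 0.05 # ...but re-enabled for the interruptible work itself
        n
      end
      queue << value # the enqueue can no longer be interrupted by a kill
    end
  end
end

result = queue.pop
threads.each(&:kill)
p result # => 1 (the fastest worker's value)
```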
