Skip to content

Instantly share code, notes, and snippets.

@henrik
Last active February 2, 2021 16:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save henrik/57cbc4e9c6a59edf0e39a83d619c3cac to your computer and use it in GitHub Desktop.
Save henrik/57cbc4e9c6a59edf0e39a83d619c3cac to your computer and use it in GitHub Desktop.
Run a block on a list of things in a limited number of concurrent threads. Mostly for the fun of it – there are more featureful libs like https://github.com/grosser/parallel.
# Lets you call a block for each item in a list, just like `each`.
# But instead of running serially, it runs in a limited number of concurrent threads.
# This is useful when you don't want one thread per item, e.g. to avoid rate limiting or network saturation.
class EachInThreadPool
  # Calls +block+ once for each element of +inputs+, spread across +pool_size+ worker threads.
  #
  # All inputs are queued up front. Each worker takes items with a non-blocking pop and exits
  # as soon as the queue is empty. The calling thread joins every worker, so an exception raised
  # by +block+ surfaces in the thread that called this method (not necessarily the main thread).
  # When a worker fails it empties the queue, so the other workers stop after their current item.
  #
  # @param inputs [#each] items to process; +nil+ and +false+ are processed like any other item
  # @param pool_size [Integer] number of worker threads to start; must be positive
  # @yieldparam item an element of +inputs+
  # @return [Array<Thread>] the finished worker threads
  # @raise [ArgumentError] if +pool_size+ is not positive
  # @raise [Exception] whatever +block+ raised, re-raised by Thread#join
  def self.call(inputs, pool_size:, &block)
    raise ArgumentError, "pool_size must be positive, got #{pool_size.inspect}" unless pool_size.positive?

    queue = Queue.new
    inputs.each { queue << _1 }

    threads = pool_size.times.map do
      Thread.new do
        # `while true` instead of `loop`: Kernel#loop rescues StopIteration, which would silently
        # swallow a StopIteration raised by the block.
        while true
          begin
            # non_block = true: raise ThreadError instead of waiting once the queue is drained.
            # The rescue covers only this pop, so a ThreadError raised by the block still propagates.
            item = queue.pop(true)
          rescue ThreadError
            break
          end
          block.call(item)
        end
      rescue Exception # Re-raised below; rescued only to stop the other workers early.
        queue.clear # Abandon the items nobody has started yet.
        raise
      end
    end
    threads.each(&:join)
  end
end
# Alternative implementation using blocking pops. Instead of polling with `sleep` until every
# worker is idle, it queues one "stop" sentinel per worker after the real work, so each worker
# exits on its own and the caller simply joins them.
class EachInThreadPool
  # Calls +block+ once for each element of +inputs+, spread across +pool_size+ worker threads.
  #
  # Workers block on the queue and stop when they pop the private sentinel. The calling thread
  # joins every worker, so an exception raised by +block+ surfaces in the thread that called this
  # method (not necessarily the main thread) instead of leaving the caller waiting forever.
  #
  # @param inputs [#each] items to process; +nil+ and +false+ are processed like any other item
  # @param pool_size [Integer] number of worker threads to start; must be positive
  # @yieldparam item an element of +inputs+
  # @return [Array<Thread>] the finished worker threads
  # @raise [ArgumentError] if +pool_size+ is not positive
  # @raise [Exception] whatever +block+ raised, re-raised by Thread#join
  def self.call(inputs, pool_size:, &block)
    raise ArgumentError, "pool_size must be positive, got #{pool_size.inspect}" unless pool_size.positive?

    stop = Object.new # Private sentinel: cannot collide with any real input, not even nil.
    queue = Queue.new
    inputs.each { queue << _1 }
    pool_size.times { queue << stop } # Queued after all real work: one per worker.

    threads = pool_size.times.map do
      Thread.new do
        # `until` rather than `loop`: Kernel#loop would swallow a StopIteration raised by the block.
        until stop.equal?(item = queue.pop)
          block.call(item)
        end
      rescue Exception # Re-raised below; rescued only to release the other workers.
        queue.clear # Abandon the items nobody has started yet...
        pool_size.times { queue << stop } # ...but make sure every other worker still gets a stop.
        raise
      end
    end
    threads.each(&:join)
  end
end
require "spec_helper"
require "each_in_thread_pool"
RSpec.describe EachInThreadPool, ".call" do
  it "performs the given work" do
    list = (1..100).to_a
    result = []
    # The block runs on several threads at once; Array#<< is not guaranteed to be thread-safe
    # outside MRI's GVL, so guard the shared array.
    lock = Mutex.new

    EachInThreadPool.call(list, pool_size: 10) do |i|
      lock.synchronize { result << i }
    end

    expect(result.sort).to eq(list)
  end

  it "uses no more than the given number of threads" do
    list = (1..100).to_a
    thread_ids = Set.new
    lock = Mutex.new # Guards thread_ids, which every worker thread writes to.

    EachInThreadPool.call(list, pool_size: 15) do
      lock.synchronize { thread_ids << Thread.current.object_id }
      # Slow down execution to avoid a single thread finishing all the work before the other threads get a chance.
      sleep 0.01
    end

    # Exactly 15: no more than the pool size, and the whole pool actually got used.
    expect(thread_ids.length).to eq(15)
  end

  it "aborts on exception" do
    list = [ -> {
      Thread.current.report_on_exception = false # Don't print error output.
      raise "boom"
    } ]

    expect {
      EachInThreadPool.call(list, pool_size: 10, &:call)
    }.to raise_error("boom")
  end

  it "aborts on ThreadErrors that don't represent an empty queue" do
    list = [ -> {
      Thread.current.report_on_exception = false # Don't print error output.
      raise ThreadError, "fraying badly"
    } ]

    expect {
      EachInThreadPool.call(list, pool_size: 10, &:call)
    }.to raise_error(ThreadError)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment