Skip to content

Instantly share code, notes, and snippets.

@sj26
Last active January 27, 2017 03:42
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sj26/2095122 to your computer and use it in GitHub Desktop.
Save sj26/2095122 to your computer and use it in GitHub Desktop.
Process an enumerator using a simple thread pool
require "etc"
require "thread"
Enumerator.class_eval do
  # Process an enumerator using a simple thread pool.
  #
  # Each element is handed, in enumeration order, to the block on one of `of:`
  # worker threads. Elements travel through a one-slot queue, so at most one
  # element is buffered ahead of the workers.
  #
  # `of:` is the pool size. It defaults to the detected number of CPUs and must
  # be at least 1 (ArgumentError otherwise).
  #
  # The block runs concurrently on several threads, so it must be thread-safe.
  # If the block raises, no further work is handed out and the exception is
  # re-raised in the calling thread when the workers are joined.
  #
  # With a block, returns whatever the underlying enumeration returns (for
  # example the array for `array.each`). Without a block, returns an
  # Enumerator so the call can be chained with #map, #each, and friends.
  #
  # The enumerator is walked with #each, so the same enumerator can be
  # processed repeatedly and its external #next position is left untouched.
  #
  # Caveat: work is started in order, but a chained #map collects each result
  # when its block finishes, so results may come back out of order. This might
  # be okay, depending on use case. #each works great, though!
  #
  #   1.upto(10).in_thread_pool.map { |i| puts "#{Thread.current}: #{i}"; i * i }
  #   #<Thread:0x007fbed45fa218>: 1
  #   #<Thread:0x007fbed45fa038>: 2
  #   #<Thread:0x007fbed45f9f20>: 3
  #   #<Thread:0x007fbed45f9e08>: 4
  #   #<Thread:0x007fbed45fa218>: 5
  #   #<Thread:0x007fbed45f9f20>: 6
  #   #<Thread:0x007fbed45f9e08>: 7
  #   #<Thread:0x007fbed45fa038>: 8
  #   #<Thread:0x007fbed45f9f20>: 9
  #   #<Thread:0x007fbed45fa038>: 10
  #   # => [1, 9, 16, 4, 36, 64, 25, 49, 81, 100]  (order varies between runs)
  #
  def in_thread_pool(of: Etc.nprocessors)
    size = Integer(of)
    # A pool without workers would silently drop elements or block forever.
    raise ArgumentError, "pool size must be at least 1 (got #{of.inspect})" if size < 1
    return enum_for(:in_thread_pool, of: size) unless block_given?

    queue = SizedQueue.new(1)
    workers = Array.new(size) do
      Thread.new do
        begin
          # pop blocks while the queue is empty and returns nil once the
          # queue has been closed and drained.
          while (values = queue.pop)
            yield(*values)
          end
        ensure
          # On normal exit the queue is already closed, so this is a no-op.
          # If the block raised, closing stops the producer from waiting on
          # workers that may no longer exist.
          queue.close
        end
      end
    end

    begin
      # `|*values|` captures exactly what the source yielded, so it can be
      # splatted back into the caller's block unchanged.
      result = each { |*values| queue.push(values) }
    rescue ClosedQueueError
      # Our queue was closed early by a failing worker; its exception is
      # re-raised by the join below. Anything else is not ours to swallow.
      raise unless queue.closed?
    ensure
      # Always release the workers, even when the source enumerator raised.
      queue.close
    end

    workers.each(&:join)
    result
  end
end
require "etc"
require "enumerator_thread_poolinging"
# Specs for Enumerator#in_thread_pool: every element is processed, and the
# number of distinct worker threads matches the requested (or default) size.
RSpec.describe "Enumerator#in_thread_pool" do
  it "processes each element" do
    inputs = (1..10).to_a
    squares = inputs.to_enum.in_thread_pool.map { |n| n * n }

    # Results may arrive in any order, so compare contents only.
    expect(squares).to match_array(inputs.map { |n| n * n })
  end

  it "uses specified number of threads" do
    [1, 2, 3].each do |pool_size|
      # Each job reports the thread it ran on; distinct threads == pool size.
      workers = 1.upto(100).to_enum.in_thread_pool(of: pool_size).map { Thread.current }
      expect(workers.uniq.size).to eql(pool_size)
    end
  end

  it "uses the number of processors by default" do
    workers = 1.upto(100).to_enum.in_thread_pool.map { Thread.current }
    expect(workers.uniq.size).to eql(Etc.nprocessors)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment