Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# from https://hspazio.github.io/2017/worker-pool/
# Capacity passed to SizedQueue.new for every Worker's job queue.
SIZED_QUEUE_SIZE = 10
############################################################################
# Naive, doubly-recursive Fibonacci.
#
# @param n [Integer] position in the sequence
# @return [Integer] +n+ itself when +n+ is below 2, otherwise
#   fib(n - 1) + fib(n - 2)
def fib(n)
  return n if n < 2

  fib(n - 1) + fib(n - 2)
end
############################################################################
# A named background thread that runs jobs taken from its own SizedQueue.
#
# A job is any object responding to +call+. The thread leaves its loop when
# it dequeues +:done+ (or a falsy value).
class Worker
  attr_reader :name

  # Creates the queue and immediately starts the consuming thread.
  #
  # @param name [String] label printed in the debug output
  def initialize(name)
    @name = name
    @queue = SizedQueue.new(SIZED_QUEUE_SIZE)
    @thread = Thread.new { perform }
  end

  # Enqueues a job (or the +:done+ marker) for this worker.
  #
  # @param job [#call, Symbol] the job to run, or +:done+ to stop the worker
  # @return [SizedQueue] the underlying queue
  def <<(job)
    @queue.push(job)
  end

  # @return [Integer] number of entries currently waiting in the queue; a job
  #   that has already been popped and is running is not included
  def jobs_count
    @queue.length
  end

  # Waits for the worker thread to terminate.
  #
  # @return [Thread] the worker thread
  def join
    @thread.join
  end

  private

  # Thread body: run jobs one after another until told to stop.
  def perform
    while (job = next_job)
      job.call
      puts "#{name} got #{job}" # only for debugging
    end
  end

  # Pops the next queue entry, mapping the +:done+ marker to nil so the
  # loop in #perform ends.
  def next_job
    job = @queue.pop
    job unless job == :done
  end
end
############################################################################
# Scheduling strategy that hands each job to the worker reporting the
# smallest +jobs_count+.
class LeastBusyFirstScheduler
  # @param workers [Array<#jobs_count, #<<>] workers to choose between
  def initialize(workers)
    @workers = workers
  end

  # Pushes +job+ onto the worker with the lowest +jobs_count+.
  #
  # @param job [Object] the job to hand over
  # @return [Object] whatever the chosen worker's +<<+ returns
  def schedule(job)
    # min_by finds the minimum in a single pass; sorting the whole list just
    # to take its first element is unnecessary work on every schedule call.
    least_busy = @workers.min_by(&:jobs_count)
    least_busy << job
  end
end
############################################################################
# A fixed-size group of Worker objects plus a scheduler that decides which
# worker receives each job.
class WorkerPool
  # @param num_workers [Integer] how many workers to create; they are named
  #   "worker_0", "worker_1", ...
  # @param scheduler_factory [#new] class (or factory) instantiated with the
  #   array of workers; the result must respond to +schedule(job)+
  def initialize(num_workers, scheduler_factory)
    @workers = Array.new(num_workers) do |index|
      Worker.new("worker_#{index}")
    end
    @scheduler = scheduler_factory.new(@workers)
  end

  # Submits a job through the scheduler, or, for +:done+, forwards the
  # marker to every worker.
  #
  # @param job [Object] a job, or +:done+
  # @return [Object] the scheduler's result, or for +:done+ an array of each
  #   worker's +<<+ result
  def <<(job)
    return @scheduler.schedule(job) unless job == :done

    @workers.map { |worker| worker << :done }
  end

  # Joins every worker in turn.
  #
  # @return [Array] the result of +join+ for each worker
  def wait
    @workers.map(&:join)
  end
end
############################################################################
# Demo driver: compute fib(20)..fib(49) on a pool of 5 workers and print the
# collected results.
pool = WorkerPool.new(5, LeastBusyFirstScheduler)
results = {}
# The jobs run on several worker threads and all write into the same Hash.
# Hash is not safe for concurrent mutation on runtimes with truly parallel
# threads (e.g. JRuby), so every write goes through this lock.
results_lock = Mutex.new

# Jobs are submitted from a separate thread so the main thread can go
# straight to waiting on the workers.
Thread.new do
  30.times do |n|
    pool << lambda {
      # Do the expensive computation outside the lock; hold it only to store.
      value = fib(n + 20)
      results_lock.synchronize { results[n] = value }
    }
  end
  pool << :done
end

pool.wait
puts results
@knugie

This comment has been minimized.

Copy link
Owner Author

knugie commented Apr 19, 2018

CRuby (GIL):
$ rvm 2.5.1
$ ruby worker_pool.rb
image

JRuby (real threads):
$ rvm jruby-9.1.7.0
$ ruby worker_pool.rb
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.