# frozen_string_literal: true
# SimpleThreadPool is a simple implementation of the producer/consumer pattern.
# Example:
#
#   pool = SimpleThreadPool.new(10, max_queue: 5) do |user_params|
#     Net::HTTP.post(UPDATE_API, user_params)
#   end
#   users.each do |user|
#     pool << user.update_api_params
#   end
#   pool.close
class SimpleThreadPool
  # Creates the pool and starts `num_threads` worker threads, each of which passes
  # queued items to the given block.
  # The `max_queue` option makes `<<` block the producer thread while the queue is full,
  # i.e. when the consumers process items more slowly than the producer generates them.
  # Its main purpose is to limit memory usage when the number or size of items
  # gets large. `max_queue` can be as low as 1, but it is recommended to keep it at least
  # equal to `num_threads`.
  def initialize(num_threads, max_queue: nil)
    @num_threads = num_threads
    @queue = max_queue ? SizedQueue.new(max_queue) : Queue.new
    @threads = @num_threads.times.map do
      Thread.new do
        # `pop` returns nil only once the queue is closed and empty; comparing with nil
        # (rather than relying on truthiness) keeps a `false` item from stopping the worker.
        until (item = @queue.pop).nil?
          yield item
        end
      end
    end
  end

  # Adds an item to the processing queue. If `max_queue` was passed to `new`, this blocks
  # while the queue already contains that many items.
  # nil is reserved for closed-queue detection and must not be pushed manually.
  def <<(item)
    raise ArgumentError, "Cannot process 'nil' item" if item.nil?

    @queue << item
  end

  # Closes the queue and waits until all threads finish processing the items remaining in it.
  # Any further attempt to push to the pool fails with ClosedQueueError.
  def close
    @queue.close
    @threads.each(&:join)
  end
end
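
# The pool above relies on two behaviours of Ruby's core queue classes: `pop` on a
# closed, drained Queue returns nil (which is how the workers know when to stop), and
# a SizedQueue refuses to grow past its limit (which is what `max_queue` uses to
# throttle the producer). The standalone sketch below illustrates both without the
# pool class. The names `processed`, `push_error`, `bounded` and `full_error` are
# made up for this illustration, and doubling each item merely stands in for real work.

```ruby
queue = Queue.new
processed = Queue.new # thread-safe collector for the workers' results

workers = 3.times.map do
  Thread.new do
    # `pop` blocks while the queue is open and empty; once the queue is closed
    # and drained it returns nil, which ends the loop.
    until (item = queue.pop).nil?
      processed << item * 2
    end
  end
end

(1..10).each { |n| queue << n }
queue.close           # no more items will arrive
workers.each(&:join)  # wait for the workers to drain what is left

results = []
results << processed.pop until processed.empty?
results.sort!

# Pushing to a closed queue raises ClosedQueueError.
push_error = begin
  queue << 11
  nil
rescue ClosedQueueError => e
  e
end

# A full SizedQueue blocks a normal push; with non_block = true it raises
# ThreadError instead, which makes the limit visible without hanging this script.
bounded = SizedQueue.new(2)
bounded << :a
bounded << :b
full_error = begin
  bounded.push(:c, true)
  nil
rescue ThreadError => e
  e
end
```

# In the pool itself the push is a blocking one, so a full queue pauses the producer
# until a worker pops an item rather than raising.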