Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
# frozen_string_literal: true
# SimpleThreadPool gives a simple implementation for producer/consumers pattern
# Example:
# pool =, max_queue: 5) do |user_params|
#, user_params)
# end
# users.each do |user|
# pool << user.update_api_params
# end
# pool.close
class SimpleThreadPool
# Initiates the pool and starts `num_threads` working threads.
# `max_queue` option allows blocking main thread from adding new items to the pool queue
# if the consumers process it slower than the producer generates.
# The main goal of this option is to limit the memory usage in case the number or size of items
# will get big. `max_queue` can be as low as 1, but recommended to keep it at least equal to `num_threads`.
def initialize(num_threads, max_queue: nil)
@num_threads = num_threads
@queue = max_queue ? :
@threads = do do
while (item = @queue.pop)
yield item
# Adds item to the processing queue, in case `max_queue` was passed to `new`, it will block if the queue already
# contains this number of items.
# nil is reserved for closed queue detection and is not allowed to be pushed to the queue manually
def <<(item)
raise ArgumentError, "Cannot process 'nil' item" if item.nil?
@queue << item
# Will close the queue and wait until all threads will finish processing current items in the queue
# Any further attempt to push to the queue with fail with exeption
def close
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment