@greneholt
Last active June 3, 2020 19:20
Ruby threaded task manager
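require 'set'

class Threader
  def initialize(max_threads = 50)
    @max_threads = max_threads
    @wait_queue = Queue.new    # worker threads push finished tasks here
    @tasks = []                # blocks waiting for a free thread
    @running_tasks = Set.new   # blocks currently executing
    @done_tasks = Set.new      # currently unused
  end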
  # Add a task. It starts immediately if fewer than max_threads tasks are running.
  def task(&block)
    raise "block required" unless block_given?
    @tasks << block
    assign_tasks_to_threads
  end
  # Wait for all the tasks to finish. Task return values are not collected,
  # so this returns nil.
  def finish
    until @tasks.empty? && @running_tasks.empty?
      wait_next
      assign_tasks_to_threads
    end
  end
  private

  # Block until any running task finishes, then mark it as no longer running.
  def wait_next
    raise "no threads" if @running_tasks.empty?
    task = @wait_queue.pop
    @running_tasks.delete(task)
  end
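  # Run the task in its own thread and report completion through the queue.
  def run_task(task)
    @running_tasks << task
    Thread.new do
      begin
        task.call
      ensure
        # Always signal completion, even if the task raised; otherwise
        # finish would block forever on @wait_queue.pop.
        @wait_queue << task
      end
    end
  end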
  def assign_tasks_to_threads
    # Assign as many tasks as there are now free threads.
    # Array#pop takes the most recently added waiting task first.
    [(@max_threads - @running_tasks.length), @tasks.length].min.times do
      task = @tasks.pop
      run_task(task)
    end
  end
end
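
`Threader#finish` waits for every task but does not hand back what the tasks returned. Below is a minimal sketch of one way to collect return values, assuming results should come back in submission order. The class name `CollectingThreader` and its `@results` and `@count` variables are hypothetical and not part of the gist; the scheduling idea (a `Queue` that worker threads push onto when done) is the same as above, except that waiting tasks start in the order they were added.

```ruby
require 'set'

# Hypothetical variant of Threader (not part of the original gist) that
# records each task's return value.
class CollectingThreader
  def initialize(max_threads = 50)
    @max_threads = max_threads
    @wait_queue = Queue.new   # workers push [index, value] here when done
    @tasks = []               # [index, block] pairs not yet started
    @running = Set.new        # indexes of tasks currently running
    @results = []             # return values, by submission index
    @count = 0                # number of tasks submitted so far
  end

  # Add a task; it starts immediately if a thread slot is free.
  def task(&block)
    raise ArgumentError, "block required" unless block
    @tasks << [@count, block]
    @count += 1
    assign_tasks_to_threads
  end

  # Wait for every task, then return the results in submission order.
  # A task that raised leaves nil in its slot.
  def finish
    until @tasks.empty? && @running.empty?
      index, value = @wait_queue.pop
      @running.delete(index)
      @results[index] = value
      assign_tasks_to_threads
    end
    @results
  end

  private

  # Start as many waiting tasks as there are free slots, oldest first.
  def assign_tasks_to_threads
    free = @max_threads - @running.length
    [free, @tasks.length].min.times do
      index, block = @tasks.shift
      @running << index
      Thread.new do
        value = nil
        begin
          value = block.call
        ensure
          # Report back even if the block raised, so finish cannot hang.
          @wait_queue << [index, value]
        end
      end
    end
  end
end

pool = CollectingThreader.new(2)
5.times { |i| pool.task { i * i } }
squares = pool.finish
# squares => [0, 1, 4, 9, 16]
```

As in the original class, `task` and `finish` are meant to be called from a single thread: only that thread touches `@running` and `@results`, and the `Queue` is the sole point of contact with the workers.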