Skip to content

Instantly share code, notes, and snippets.

@tomotaka
Created July 13, 2011 03:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tomotaka/1079635 to your computer and use it in GitHub Desktop.
Save tomotaka/1079635 to your computer and use it in GitHub Desktop.
offers concurrent execution method to Enumerable object
module ConcurrentExecutor
def concurrent_each(concurrency, queue_size, &proc)
require "thread"
queue = SizedQueue.new(queue_size)
queueing_thread = Thread.start{
each{|item| queue.enq([item, true]) }
concurrency.times{ queue.enq([nil, false]) } # send termination signal
}
workers = []
concurrency.times do
workers << Thread.start{
while true do
task, continue_flag = queue.deq()
break if !continue_flag
proc.call(task)
end
}
end
workers.each{|worker| worker.join }
queueing_thread.join
self
end
end
# Sample:
# class Array; include ConcurrentExecutor; end
# [1, 2, 3].concurrent_each(2, 10){|n| sleep(rand(10)); puts n }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment