Skip to content

Instantly share code, notes, and snippets.

@rkh
Created March 14, 2012 15:05
Show Gist options
  • Save rkh/2037092 to your computer and use it in GitHub Desktop.
Save rkh/2037092 to your computer and use it in GitHub Desktop.
require 'thread'
class Worker
def initialize(count = 1)
@queue, @closing, @threads, @mutex = Queue.new, false, [], Mutex.new
add_worker(count)
end
def add_worker(count = 1)
@mutex.synchronize do
@threads += count.times.map { Thread.new { @queue.pop.call until @closing } }
end
end
def run(block = Proc.new)
@queue << block
end
def close(&block)
run do
@closing = true
yield if block_given?
wakeup
end
end
def join
@threads.each(&:join)
end
private
def wakeup
run { wakeup if @queue.num_waiting > 0 }
end
end
@rkh
Copy link
Author

rkh commented Mar 14, 2012

right

@meineerde
Copy link

If you have more than one worker, only one of them will shutdown, as the @closing will only be set once. You probably need to remember your worker threads and schedule close on each of them directly (or at least schedule close worker_threads.count times.

@rkh
Copy link
Author

rkh commented Mar 14, 2012

No, @closing is local to the worker instance which can launch more than one thread, but it will be true in all threads.

@meineerde
Copy link

Meh, of course... But then I wouldn't call @queue.clear to have something like a soft-stop and to not lose already scheduled jobs.

@rkh
Copy link
Author

rkh commented Mar 14, 2012

But that rather depends on your use case.

@Burgestrand
Copy link

I have an old piece of code that looks awfully similar to this, but uses throw/catch instead of the @closing thing: http://burgestrand.se/code/ruby-thread-pool/thread-pool.rb (I partly did it as an experiment with rocco)

It does not support adding more workers, however.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment