@rkh
Created March 14, 2012 15:05
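require 'thread'

class Worker
  def initialize(count = 1)
    @queue, @closing, @threads, @mutex = Queue.new, false, [], Mutex.new
    add_worker(count)
  end

  # Start `count` more threads; each one pops and calls jobs until @closing is set.
  def add_worker(count = 1)
    @mutex.synchronize do
      @threads += count.times.map { Thread.new { @queue.pop.call until @closing } }
    end
  end

  # Queue the given block as a job for the worker threads.
  def run(&block)
    @queue << block
  end

  # Closing is itself a queued job, so the block passed to close runs only
  # after every job queued before close has been picked up by a thread.
  def close(&block)
    run do
      @closing = true
      block.call if block
      wakeup
    end
  end

  def join
    @threads.each(&:join)
  end

  private

  # Queue a no-op job that wakes one thread blocked in #pop. If more threads
  # are still blocked when it runs, it queues another one, so every thread
  # gets to see @closing and exit.
  def wakeup
    run { wakeup if @queue.num_waiting > 0 }
  end
end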
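A minimal usage sketch, assuming Ruby 3.x. It repeats the class so it runs on its own and takes each job as an explicit `&block`, because calling `Proc.new` without a block (as `def run(block = Proc.new)` does) raises `ArgumentError` on Ruby 3.0 and later. The `squares` queue and the thread count of 2 are only for illustration.

```ruby
require 'thread'

# Copy of the Worker class, with run taking an explicit &block.
class Worker
  def initialize(count = 1)
    @queue, @closing, @threads, @mutex = Queue.new, false, [], Mutex.new
    add_worker(count)
  end

  # Start `count` more threads; each pops and calls jobs until @closing is set.
  def add_worker(count = 1)
    @mutex.synchronize do
      @threads += count.times.map { Thread.new { @queue.pop.call until @closing } }
    end
  end

  def run(&block)
    @queue << block
  end

  # Queued like any other job, so all earlier jobs are picked up before it runs.
  def close(&block)
    run do
      @closing = true
      block.call if block
      wakeup
    end
  end

  def join
    @threads.each(&:join)
  end

  private

  # Keep passing a no-op job along while threads are still blocked in #pop.
  def wakeup
    run { wakeup if @queue.num_waiting > 0 }
  end
end

squares = Queue.new
closed = false

worker = Worker.new(2)
5.times { |i| worker.run { squares << i * i } }
worker.close { closed = true }
worker.join

results = Array.new(squares.size) { squares.pop }.sort
p results # => [0, 1, 4, 9, 16]
```

Because a thread only checks `@closing` between jobs, `join` returns once every job queued before `close` has finished.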
@judofyr

judofyr commented Mar 14, 2012

You could probably avoid the busy-sleep in #wait by storing Thread.current, calling sleep, and then calling thread.wakeup from the job when it is popped.
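That suggestion can be sketched roughly as follows. `SleepyWorker` is a hypothetical single-threaded stand-in, not the gist's class. The caller records `Thread.current`, queues a job that calls `wakeup` on it, and goes to sleep. Sleeping through `Mutex#sleep` (rather than bare `sleep`) is an added assumption here: it keeps the job from calling `wakeup` before the caller is actually asleep, which would otherwise lose the wakeup.

```ruby
require 'thread'

# Hypothetical single-threaded worker, only for this sketch.
class SleepyWorker
  def initialize
    @queue = Queue.new
    @thread = Thread.new { loop { @queue.pop.call } }
  end

  def run(&block)
    @queue << block
  end

  # Blocks the caller until every job queued before this call has run.
  # No polling: the caller sleeps and the worker thread wakes it up.
  def wait
    waiter = Thread.current
    lock = Mutex.new
    done = false
    lock.synchronize do
      run do
        lock.synchronize do
          done = true
          waiter.wakeup
        end
      end
      # Mutex#sleep releases the lock while sleeping, so the job above cannot
      # take the lock, set `done` and call #wakeup until we have gone to sleep.
      # The loop goes back to sleep if we are woken for any other reason.
      lock.sleep until done
    end
  end
end

worker = SleepyWorker.new
order = []
3.times { |i| worker.run { sleep 0.01; order << i } }
worker.wait
p order # => [0, 1, 2]
```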

@rkh
Author

rkh commented Mar 14, 2012

I just removed wait. Dunno what it did there.

@judofyr

judofyr commented Mar 14, 2012

What are you trying to accomplish with run { @closing = true }?

@ryanlecompte

Couldn't you replace run { @closing = true } with @closing = true since the running threads will see it automagically?

@rkh
Author

rkh commented Mar 14, 2012

def wait
  cv, mutex = ConditionVariable.new, Mutex.new
  mutex.synchronize do
    run { mutex.synchronize { cv.signal } }
    cv.wait(mutex)
  end
end

@ryanlecompte

Actually, never mind, I see why it's necessary. :-)

@rkh
Author

rkh commented Mar 14, 2012

That run { @closing = true } is to make sure that the block passed to close will run.

@rkh
Author

rkh commented Mar 14, 2012

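Note that wait really only makes sense with a single worker thread: with more threads, the job that signals the condition variable can be picked up while earlier jobs are still running on other threads.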
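If a wait that works with several threads is needed, one option (a different technique from the queued-signal job above) is to count outstanding jobs and wait for the count to reach zero. `CountingPool` and its methods are hypothetical names for this sketch.

```ruby
require 'thread'

# Hypothetical pool whose #wait blocks until every job submitted so far has
# finished, on any number of threads, by counting jobs that are still pending.
class CountingPool
  def initialize(count)
    @queue = Queue.new
    @lock = Mutex.new
    @idle = ConditionVariable.new
    @pending = 0
    @threads = Array.new(count) do
      Thread.new do
        while (job = @queue.pop) != :stop
          job.call
          @lock.synchronize do
            @pending -= 1
            @idle.broadcast if @pending.zero?
          end
        end
      end
    end
  end

  def run(&block)
    # Count the job before any thread can see it.
    @lock.synchronize { @pending += 1 }
    @queue << block
  end

  def wait
    # Re-check the count after every wakeup; it may have gone up again.
    @lock.synchronize { @idle.wait(@lock) until @pending.zero? }
  end

  def close
    @threads.size.times { @queue << :stop }
    @threads.each(&:join)
  end
end

pool = CountingPool.new(4)
finished = Queue.new
20.times { |i| pool.run { sleep 0.005; finished << i } }
pool.wait
p finished.size # => 20
pool.close
```

The cost is one extra lock round-trip per job, in exchange for a wait that does not depend on the thread count.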

@rkh
Author

rkh commented Mar 14, 2012

I like to have one Worker for reading from an IO and one for writing to it, one for logging to stdout and one for doing all the work. I also like to set the thread count to 0 and increase it later, which turns it into something like a deferrable.
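The "start with 0 threads, add them later" trick can be sketched like this. It uses a trimmed copy of Worker (an assumption made for brevity: no wakeup chain, which a single thread does not need) with `run` taking an explicit `&block`. Jobs queued while there are no threads simply wait in the queue until `add_worker` is called.

```ruby
require 'thread'

# Trimmed copy of Worker for this sketch (no wakeup chain).
class Worker
  def initialize(count = 1)
    @queue, @closing, @threads, @mutex = Queue.new, false, [], Mutex.new
    add_worker(count)
  end

  def add_worker(count = 1)
    @mutex.synchronize do
      @threads += count.times.map { Thread.new { @queue.pop.call until @closing } }
    end
  end

  def run(&block)
    @queue << block
  end

  def close
    run { @closing = true }
  end

  def join
    @threads.each(&:join)
  end
end

log = []
deferred = Worker.new(0)               # no threads yet: jobs only pile up
3.times { |i| deferred.run { log << i } }
ran_before_start = log.dup             # nothing can have run so far
deferred.add_worker(1)                 # one thread now drains the queue in order
deferred.close
deferred.join
p ran_before_start # => []
p log              # => [0, 1, 2]
```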

@ryanlecompte

I guess if you really wanted your threads to exit gracefully (honor the until @closing part), you'd need to send that for all of your threads, not just once:

count.times { run { @closing = true } }

@rkh
Author

rkh commented Mar 14, 2012

Why? I clear the queue anyway. There is still a race condition in there (someone queues a job after I cleared the queue but before I queue the close). But once the first thread has called the block, @closing will be true, no matter how often it is queued.

@ryanlecompte

Right, but let's say you have 10 threads, there is no work to be done, and you want to close/exit gracefully. You'll only schedule one @closing = true, which means only one of your threads will return from the blocking @queue.pop call and see that @closing is true. Your other threads will keep blocking on that call, waiting for more work. I think it only really matters if you have some cleanup (e.g., closing file handles) that you want each of your worker threads to do before the program exits.
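The one-closing-job-per-thread idea can be checked with a small sketch. `GracefulPool` is hypothetical; it uses `@threads.size` because `count` is only a local variable inside the gist's `add_worker`, and pushing to `cleaned_up` stands in for real per-thread cleanup.

```ruby
require 'thread'

# Hypothetical pool, only for this sketch: each thread loops until @closing
# is set and then does its own cleanup before exiting.
class GracefulPool
  attr_reader :cleaned_up

  def initialize(count)
    @queue, @closing = Queue.new, false
    @cleaned_up = Queue.new
    @threads = Array.new(count) do |n|
      Thread.new do
        @queue.pop.call until @closing
        @cleaned_up << n # stand-in for per-thread cleanup, e.g. closing a file
      end
    end
  end

  def run(&block)
    @queue << block
  end

  # One closing job per thread. A thread exits right after running one, so
  # every thread still blocked in #pop is guaranteed to receive a job.
  def close
    @threads.size.times { run { @closing = true } }
    @threads.each(&:join)
  end
end

pool = GracefulPool.new(10)
pool.close                 # no work queued at all, yet all 10 threads exit
p pool.cleaned_up.size     # => 10
```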

@rkh
Author

rkh commented Mar 14, 2012

right

@judofyr

judofyr commented Mar 14, 2012

Exactly what are you using this for?

@rkh
Author

rkh commented Mar 14, 2012

@ryanlecompte check out the dummy addition I made.

@rkh
Author

rkh commented Mar 14, 2012

@judofyr just nonsense scripts atm

@judofyr

judofyr commented Mar 14, 2012

Wouldn't it be better to do something like this to make sure the block is called?

def close
  run do
    @closing = true
    yield if block_given?
  end
end

@rkh
Author

rkh commented Mar 14, 2012

right

@meineerde

If you have more than one worker thread, only one of them will shut down, as @closing will only be set once. You probably need to remember your worker threads and schedule close on each of them directly (or at least schedule close worker_threads.count times).

@rkh
Author

rkh commented Mar 14, 2012

No, @closing is an instance variable of the Worker, which can launch more than one thread, so once it is set it is true in all of those threads.

@meineerde

Meh, of course... But then I wouldn't call @queue.clear, to get something like a soft stop that doesn't lose already scheduled jobs.

@rkh
Author

rkh commented Mar 14, 2012

But that rather depends on your use case.

@Burgestrand

I have an old piece of code that looks awfully similar to this, but uses throw/catch instead of the @closing thing: http://burgestrand.se/code/ruby-thread-pool/thread-pool.rb (I partly did it as an experiment with rocco)
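The throw/catch approach can be sketched roughly like this. It is not the code behind the link, just a hypothetical `ThrowingPool`: each thread wraps its loop in `catch(:exit)`, and shutting down means queuing one job per thread that calls `throw :exit`. Since `throw` unwinds the stack of whichever thread runs the job, each such job ends exactly one thread's loop, and no shared flag is needed.

```ruby
require 'thread'

# Hypothetical pool where shutdown is itself a job that throws out of the
# worker loop; it uses catch/throw instead of a @closing flag.
class ThrowingPool
  def initialize(count)
    @queue = Queue.new
    @threads = Array.new(count) do
      Thread.new do
        catch(:exit) do
          loop { @queue.pop.call }
        end
      end
    end
  end

  def run(&block)
    @queue << block
  end

  # One :exit job per thread; jobs queued earlier are popped first (FIFO).
  def shutdown
    @threads.size.times { run { throw :exit } }
    @threads.each(&:join)
  end
end

pool = ThrowingPool.new(3)
out = Queue.new
6.times { |i| pool.run { out << i * i } }
pool.shutdown
p out.size # => 6
```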

It does not support adding more workers, however.
