Skip to content

Instantly share code, notes, and snippets.

@huacnlee
Created November 15, 2010 08:36
Show Gist options
  • Save huacnlee/700167 to your computer and use it in GitHub Desktop.
Save huacnlee/700167 to your computer and use it in GitHub Desktop.
Ruby 线程池类,用于控制同时最大允许多少个线程执行,并加入列队
require 'thread'
class ThreadPool
class Worker
def initialize
@mutex = Mutex.new
@thread = Thread.new do
while true
sleep 0.001
block = get_block
if block
block.call
reset_block
end
end
end
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {!@block.nil?}
end
end
attr_accessor :max_size
attr_reader :workers
def initialize(max_size = 10)
@max_size = max_size
@workers = []
@mutex = Mutex.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def join
sleep 0.01 while busy?
end
def process(&block)
while true
@mutex.synchronize do
worker = find_available_worker
if worker
return worker.set_block(block)
end
end
sleep 0.01
end
end
def wait_for_worker
while true
worker = find_available_worker
return worker if worker
sleep 0.01
end
end
def find_available_worker
free_worker || create_worker
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new
@workers << worker
worker
end
end
@huacnlee
Copy link
Author

例如:
pool = ThreadPool.new(10) # 最大10个线程
30.times do |i|
pool.process {
puts "#{i} running..."
sleep(rand(10))
puts "#{i} done."
}
end

@huacnlee
Copy link
Author

这个有 Bug, 线程会堵塞

@helps
Copy link

helps commented Feb 24, 2013

...有比较好的gems 吗?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment