Skip to content

Instantly share code, notes, and snippets.

@takuma-saito
Last active May 3, 2020 10:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save takuma-saito/688d0e7f8abbb942342834efe746a31b to your computer and use it in GitHub Desktop.
Save takuma-saito/688d0e7f8abbb942342834efe746a31b to your computer and use it in GitHub Desktop.
connection_pool.rb
# A small thread-safe pool that lazily creates at most +max+ connections.
#
# Connections are produced on demand by the block given to the constructor.
# A connection object must respond to +name+, +owner+, +borrow+ and +lease+
# (the last two returning the connection itself).
class ConnectionPool
  # @return [Array] every connection created by the pool and not yet reaped
  attr_reader :conns

  # @param max [Integer] upper bound on the number of live connections
  # @yieldreturn [Object] a freshly created connection
  def initialize(max, &block)
    @mutex = Mutex.new # exclusive lock protecting pool state across threads
    @cv = ConditionVariable.new
    @max = max
    @que = []
    @conns = []
    @create_block = block
    @created = 0
  end

  # Checks a connection out, yields it, and always checks it back in.
  #
  # The connection is returned to the pool even when the block raises, so a
  # failing job cannot permanently consume a pool slot.
  #
  # @yieldparam conn [Object] the borrowed connection
  # @return [Object] the value of the block
  def with_conn
    conn = pop
    yield conn
  ensure
    # conn is nil only when pop itself raised; nothing to give back then.
    push(conn) if conn
  end

  # Returns a connection to the idle queue and wakes up waiting threads.
  #
  # @param conn [Object] a connection previously obtained from #pop
  def push(conn)
    puts "finish #{conn.name}"
    synchronize do
      @que.push(conn.lease)
      @cv.broadcast
    end
  end

  # Checks a connection out, blocking until one is idle or can be created.
  #
  # @return [Object] a connection owned by the calling thread
  def pop
    synchronize do
      loop do
        return borrow(@que.pop) unless @que.empty?

        conn = fetch_conn
        return borrow(conn) if conn

        # Pool is at capacity and nothing is idle: sleep until #push or
        # #reap! signals that something changed, then re-check.
        @cv.wait(@mutex)
      end
    end
  end

  # Marks +conn+ as owned by the current thread.
  #
  # @return [Object] the connection
  def borrow(conn)
    puts "start #{conn.name}"
    conn.borrow
  end

  # Drops connections that have no owner or whose owner thread is dead.
  #
  # Reaped connections are removed from the idle queue as well, so they can
  # never be handed out again, and waiting threads are woken because the
  # freed capacity lets them create a replacement connection.
  #
  # @return [Array, nil] the reaped connections, or nil if none were stale
  def reap!
    synchronize do
      stale_conns = @conns.select { |conn| conn.owner.nil? || !conn.owner.alive? }
      return if stale_conns.empty?

      puts "reaping connections: #{stale_conns.map(&:name).join(",")}"
      @created -= stale_conns.size
      stale_conns.each do |c|
        @conns.delete(c)
        @que.delete(c) # an idle connection must not outlive its reaping
      end
      @cv.broadcast
      stale_conns
    end
  end

  # Runs the block while holding the pool mutex.
  def synchronize
    @mutex.synchronize { yield }
  end

  # Creates a new connection if the pool is below capacity.
  # Must be called with the pool mutex held.
  #
  # @return [Object, nil] the new connection, or nil when at capacity
  def fetch_conn
    return unless @created < @max

    # Count the connection only after the factory succeeded; otherwise a
    # raising factory would consume a slot forever.
    conn = @create_block.call
    @created += 1
    @conns << conn
    conn
  end
end
# A named connection that remembers which thread currently holds it.
class Connection
  attr_reader :name, :owner

  # @param name [String] label used in log output
  def initialize(name)
    @name = name
    @owner = nil
  end

  # Records the calling thread as the owner.
  #
  # @return [Connection] self
  def borrow
    tap { @owner = Thread.current }
  end

  # Clears the owner, marking the connection as not held by any thread.
  #
  # @return [Connection] self
  def lease
    tap { @owner = nil }
  end
end
# Periodically asks a pool to reap its stale connections from a background
# thread.
class Reaper
  # @param cp [#reap!, #conns] the pool to clean up
  # @param freq [Numeric] seconds to sleep between reaping passes
  def initialize(cp, freq)
    @cp = cp
    @freq = freq
  end

  # Starts the background thread. Each pass sleeps, reaps, and then stops
  # once the pool reports no connections.
  #
  # @return [Thread] the reaper thread
  def run
    Thread.new do
      loop do
        sleep @freq
        @cp.reap!
        break if @cp.conns.empty?
      end
    end
  end
end
# Demo: 10 client threads each run 5 jobs against a pool of at most 5
# connections. Every job sleeps for a random time (up to 3 seconds) and
# logs its progress; each client finally prints its total job time, the
# time it spent waiting for a connection, and the resulting busy percentage.
def run_jobs
  serial = 0
  pool = ConnectionPool.new(5) { Connection.new "conn:#{serial += 1}" }
  @mutex = Mutex.new
  @total_time = 0
  clients = Array.new(10) do |i|
    Thread.new do
      busy = 0
      waited = 0
      5.times do |j|
        requested_at = Time.now
        pool.with_conn do |conn|
          # Time between asking for a connection and actually getting one.
          waited += Time.now - requested_at
          duration = rand * 3
          @mutex.synchronize { @total_time += duration }
          busy += duration
          sleep duration
          puts format("[client:#{i}] job:%2d #{conn.name} %2.2f %2.2f", i * 10 + j, duration, @total_time)
        end
      end
      utilization = (busy / (busy + waited)) * 100
      puts format("DONE: [client:#{i}] %2.2f %2.2f %2.2f", busy, waited, utilization)
    end
  end
  clients.each(&:join)
end
# Demo: shows the reaper reclaiming a connection whose owner thread exits
# without returning it, alongside a second thread that uses the pool
# normally.
def run_reap
  serial = 0
  pool = ConnectionPool.new(5) { Connection.new "conn:#{serial += 1}" }
  Reaper.new(pool, 1.0).run
  leaker = Thread.new do
    leaked = pool.pop
    puts "start: t1, conn => #{leaked.name}"
    sleep 3.0
    puts "finish: t1"
    # Exit without pushing the connection back.
  end
  borrower = Thread.new do
    pool.with_conn { |_conn| sleep 5.0 }
  end
  [leaker, borrower].each(&:join)
end
# Script entry point: runs the job demo. Uncomment run_reap (and comment
# out run_jobs) to run the reaper demo instead.
# run_reap
run_jobs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment