# Source: GitHub gist johnthethird/2880034.
# Note: GitHub flagged this file as containing bidirectional Unicode text that
# may be interpreted or compiled differently than it appears. To review, open
# the file in an editor that reveals hidden Unicode characters.
######################
#
# ActiveRecord's ConnectionPool in Rails 3.2.3 allows threads to 'steal'
# connections from each other, so some threads get starved out.
#
# This monkey patch uses an implementation from https://github.com/rails/rails/pull/6492
# that ensures a 'fair' queue in ConnectionPool.
#
# It's actually a weird hybrid which ALSO maintains the clear_stale_cached_connections!
# behavior to reclaim leaked orphaned connections, and calls that method
# in checkout when the pool has no available connections.
#
# This file is referenced from an initializer in our main engine
# class, which loads it to monkey patch AR.
#
##########################
# Give a backdoor to disable this patch.
unless ENV["NO_AR_PATCH"]
# Make sure it's there so we can monkey patch.
require 'active_record'
ActiveRecord::ConnectionAdapters::ConnectionPool
# Unload it so we can redefine it completely.
ActiveRecord::ConnectionAdapters.send(:remove_const, :ConnectionPool)
# Some requires our new definition will need.
require 'thread'
require 'monitor'
require 'set'
require 'active_support/core_ext/module/deprecation'
# And completely redefine ConnectionPool.
class ActiveRecord::ConnectionAdapters::ConnectionPool
# Thread-safe, fair, FIFO queue. Meant to be used by ConnectionPool,
# with which it shares a Monitor, but it could be a generic queue.
#
# The Queue in stdlib's 'thread' could replace this class except that
# stdlib's doesn't support waiting with a timeout.
class Queue
  # +lock+ is the monitor guarding the queue. It must respond to
  # +synchronize+ and +new_cond+ (a Monitor or a MonitorMixin object).
  def initialize(lock = Monitor.new)
    @lock = lock
    @cond = @lock.new_cond
    @num_waiting = 0
    @queue = []
  end

  # Test if any threads are currently waiting on the queue.
  def any_waiting?
    synchronize { @num_waiting > 0 }
  end

  # Return the number of threads currently waiting on this queue.
  def num_waiting
    synchronize { @num_waiting }
  end

  # Add +element+ to the queue and wake one waiting thread. Never blocks.
  def add(element)
    synchronize do
      @queue.push element
      @cond.signal
    end
  end

  # If +element+ is in the queue, remove and return it; otherwise
  # return nil.
  def delete(element)
    synchronize { @queue.delete(element) }
  end

  # Remove all elements from the queue.
  def clear
    synchronize { @queue.clear }
  end

  # Remove the head of the queue.
  #
  # If +timeout+ is not given, remove and return the head of the queue
  # if the number of available elements is strictly greater than the
  # number of threads currently waiting (that is, don't jump ahead in
  # line). Otherwise, return nil.
  #
  # If +timeout+ is given, block if there is no element available,
  # waiting up to +timeout+ seconds for an element to become available.
  #
  # Raises:
  # - ActiveRecord::ConnectionTimeoutError if +timeout+ is given and no
  #   element becomes available after +timeout+ seconds.
  def poll(timeout = nil)
    synchronize do
      if timeout
        no_wait_poll || wait_poll(timeout)
      else
        no_wait_poll
      end
    end
  end

  # Return the number of elements currently held in the queue.
  def num_available
    synchronize { @queue.size }
  end

  private

  def synchronize(&block)
    @lock.synchronize(&block)
  end

  # Test if the queue currently contains any elements.
  def any?
    !@queue.empty?
  end

  # A thread can remove an element from the queue without waiting if
  # and only if the number of currently available elements is strictly
  # greater than the number of waiting threads.
  def can_remove_no_wait?
    @queue.size > @num_waiting
  end

  # Removes and returns the head of the queue if possible, or nil.
  def remove
    @queue.shift
  end

  # Remove and return the head of the queue if the number of available
  # elements is strictly greater than the number of threads currently
  # waiting. Otherwise, return nil.
  def no_wait_poll
    remove if can_remove_no_wait?
  end

  # Seconds from a clock suited to measuring durations. Prefers the
  # monotonic clock so that wall-clock adjustments cannot shorten or
  # stretch a wait; falls back to Time.now on Rubies that lack
  # Process::CLOCK_MONOTONIC.
  def monotonic_now
    if defined?(Process::CLOCK_MONOTONIC)
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    else
      Time.now.to_f
    end
  end

  # Waits on the queue up to +timeout+ seconds, then removes and
  # returns the head of the queue. The caller counts as a waiter for
  # the whole duration, which is what keeps #no_wait_poll callers from
  # jumping the line.
  def wait_poll(timeout)
    @num_waiting += 1
    started = monotonic_now
    elapsed = 0
    loop do
      @cond.wait(timeout - elapsed)
      return remove if any?
      elapsed = monotonic_now - started
      if elapsed >= timeout
        msg = 'could not obtain a queue element within %0.3f seconds (waited %0.3f seconds)' %
              [timeout, elapsed]
        raise ActiveRecord::ConnectionTimeoutError, msg
      end
    end
  ensure
    @num_waiting -= 1
  end
end
# Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
# A reaper instantiated with a nil frequency will never reap the
# connection pool.
#
# Configure the frequency by setting "reaping_frequency" in your
# database yaml file.
class Reaper
  attr_reader :pool, :frequency

  def initialize(pool, frequency)
    @pool = pool
    @frequency = frequency
  end

  # Starts the background reaping thread and returns it, or returns nil
  # without starting anything when no frequency was configured.
  def run
    return unless frequency
    Thread.new(frequency, pool) do |interval, target|
      loop do
        sleep interval
        target.reap
      end
    end
  end
end
include MonitorMixin
attr_accessor :automatic_reconnect, :checkout_timeout, :dead_connection_timeout
attr_reader :spec, :connections, :size, :reaper
# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
# object which describes database connection information (e.g. adapter,
# host name, username, password, etc), as well as the maximum size for
# this ConnectionPool.
#
# Settings read from +spec.config+:
# - :checkout_timeout        seconds to wait for a connection (default 5)
# - :dead_connection_timeout seconds before #reap treats a leased, inactive
#                            connection as dead (default 5)
# - :reaping_frequency       seconds between reaper runs (nil disables it)
# - :pool                    maximum number of connections (default 5)
def initialize(spec)
  super()

  @spec = spec

  # The cache of reserved connections mapped to threads
  @reserved_connections = {}

  @checkout_timeout = spec.config[:checkout_timeout] || 5

  # Default the dead-connection timeout so #reap never evaluates
  # Time.now - nil when the option is missing from the config.
  dead_timeout = spec.config[:dead_connection_timeout]
  @dead_connection_timeout = (dead_timeout && dead_timeout.to_f) || 5

  # default max pool size to 5
  @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

  @connections = []
  @automatic_reconnect = true
  @available = Queue.new self

  # Start the reaper last so its thread can never observe a pool whose
  # @connections / @available have not been assigned yet.
  @reaper = Reaper.new self, spec.config[:reaping_frequency]
  @reaper.run
end
# Hack for tests to be able to add connections. Do not call outside of tests.
# Registers +c+ with the pool and makes it immediately available.
def insert_connection_for_test!(c) #:nodoc:
  synchronize do
    @connections << c
    @available.add c
  end
end
# Retrieve the connection associated with the current thread, or call
# #checkout to obtain one if necessary.
#
# #connection can be called any number of times; the connection is
# held in a hash keyed by the current connection id.
def connection
  synchronize do
    @reserved_connections[current_connection_id] ||= checkout
  end
end
# Is there an open connection that is being used for the current thread?
# Returns false when nothing is reserved for the current connection id;
# otherwise returns the reserved connection's +in_use?+.
def active_connection?
  synchronize do
    @reserved_connections.fetch(current_connection_id) {
      return false
    }.in_use?
  end
end
# Signal that the thread is finished with the current connection.
# #release_connection releases the connection-thread association
# and returns the connection to the pool. Does nothing when no
# connection is reserved under +with_id+.
def release_connection(with_id = current_connection_id)
  synchronize do
    conn = @reserved_connections.delete(with_id)
    checkin conn if conn
  end
end
# If a connection already exists yield it to the block. If no connection
# exists checkout a connection, yield it to the block, and checkin the
# connection when finished (also when the block raises).
def with_connection
  connection_id = current_connection_id
  # Only a connection acquired here is released here; one that was
  # already active stays reserved for the caller.
  fresh_connection = true unless active_connection?
  yield connection
ensure
  release_connection(connection_id) if fresh_connection
end
# Returns true if a connection has already been opened, i.e. the pool
# currently tracks at least one connection.
def connected?
  synchronize { @connections.any? }
end
# Disconnects all connections in the pool, and clears the pool.
# Every connection is checked in before it is disconnected; afterwards
# the pool tracks no connections and has none available.
def disconnect!
  synchronize do
    @reserved_connections = {}
    @connections.each do |conn|
      checkin conn
      conn.disconnect!
    end
    @connections = []
    @available.clear
  end
end
# Drops every connection that requires reloading.
#
# All reservations are forgotten and every connection is checked in.
# Connections whose +requires_reloading?+ is true are disconnected and
# removed from the pool; the available queue is then rebuilt from the
# connections that remain.
def clear_reloadable_connections!
  synchronize do
    @reserved_connections = {}
    @connections.each do |conn|
      checkin conn
      conn.disconnect! if conn.requires_reloading?
    end
    @connections.delete_if { |conn| conn.requires_reloading? }
    # checkin above already queued every connection; start over so the
    # dropped ones are not handed out again.
    @available.clear
    @connections.each { |conn| @available.add conn }
  end
end
# clear_stale_cached_connections! implementation from Rails 3.2, still
# keyed on threads. Yes, we've created a monster.
#
# Return any checked-out connections back to the pool by threads that
# are no longer alive. A reservation counts as stale when its key is not
# the object_id of any live thread. A deprecation warning is emitted for
# each reclaimed connection that was still in use.
#
# Takes the pool monitor itself, like the other public methods, so it is
# safe to call directly as well as from inside checkout (the monitor is
# re-entrant).
def clear_stale_cached_connections!
  synchronize do
    live_thread_ids = Thread.list.select(&:alive?).map(&:object_id)
    stale_keys = @reserved_connections.keys - live_thread_ids
    stale_keys.each do |key|
      conn = @reserved_connections[key]
      ActiveSupport::Deprecation.warn(<<-eowarn) if conn.in_use?
Database connections will not be closed automatically, please close your
database connection at the end of the thread by calling `close` on your
connection. For example: ActiveRecord::Base.connection.close
      eowarn
      checkin conn
      @reserved_connections.delete(key)
    end
  end
end
# Check-out a database connection from the pool, indicating that you want
# to use it. You should call #checkin when you no longer need this.
#
# This is done by either returning and leasing an existing connection, or
# by creating a new connection and leasing it.
#
# If all connections are leased and the pool is at capacity, the call
# waits up to +checkout_timeout+ seconds for a connection to be checked in.
#
# Returns: an AbstractAdapter object.
#
# Raises:
# - ActiveRecord::ConnectionTimeoutError: no connection could be obtained
#   from the pool within the timeout.
def checkout
  synchronize do
    conn = acquire_connection
    conn.lease
    checkout_and_verify(conn)
  end
end
# Check-in a database connection back into the pool, indicating that you
# no longer need this connection.
#
# +conn+: an AbstractAdapter object, which was obtained earlier by
# calling +checkout+ on this pool.
#
# Runs the connection's :checkin callbacks around +expire+, drops any
# reservation pointing at the connection, and queues it as available.
def checkin(conn)
  synchronize do
    conn.run_callbacks(:checkin) { conn.expire }
    release conn
    @available.add conn
  end
end
# Remove a connection from the connection pool. The connection will
# remain open and active but will no longer be managed by this pool.
#
# If threads are waiting for a connection, a replacement connection is
# created and queued so the freed slot does not go unused.
def remove(conn)
  synchronize do
    @connections.delete conn
    @available.delete conn

    # FIXME: we might want to store the key on the connection so that removing
    # from the reserved hash will be a little easier.
    release conn

    @available.add checkout_new_connection if @available.any_waiting?
  end
end
# Removes dead connections from the pool. A dead connection can occur
# if a programmer forgets to close a connection at the end of a thread
# or a thread dies unexpectedly.
#
# A connection is removed when it is leased (+in_use?+), was last used
# more than +dead_connection_timeout+ seconds ago, and is no longer
# +active?+. When no dead-connection timeout is configured, staleness
# cannot be computed and nothing is reaped.
def reap
  synchronize do
    # Guard against Time.now - nil, which raises TypeError.
    next if @dead_connection_timeout.nil?

    cutoff = Time.now - @dead_connection_timeout
    # Iterate over a copy: #remove mutates the connection list.
    connections.dup.each do |conn|
      remove conn if conn.in_use? && cutoff > conn.last_use && !conn.active?
    end
  end
end
private

# Acquire a connection by one of 1) immediately removing one
# from the queue of available connections, 2) creating a new
# connection if the pool is not at capacity, 3) reclaiming
# connections leaked by dead threads and then waiting on the
# queue, up to +checkout_timeout+ seconds, for a connection to
# become available.
#
# Raises:
# - ActiveRecord::ConnectionTimeoutError if a connection could not
#   be acquired within the timeout.
def acquire_connection
  conn = @available.poll
  return conn if conn
  return checkout_new_connection if @connections.size < @size

  clear_stale_cached_connections!
  t0 = Time.now
  # Logging is best effort: the pool must keep working when Rails (or
  # its logger) is not available.
  if defined?(Rails) && Rails.respond_to?(:logger) && Rails.logger
    Rails.logger.info("POLLED_CHECKOUT: num avail connections: #{@available.num_available}; num waiting: #{@available.num_waiting}; total connections: #{@connections.size}")
  end
  begin
    @available.poll(@checkout_timeout)
  rescue ActiveRecord::ConnectionTimeoutError
    # Re-raise with a message that reports the configured timeout and
    # the time actually spent waiting.
    msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
          [@checkout_timeout, Time.now - t0]
    raise ActiveRecord::ConnectionTimeoutError, msg
  end
end
# Forget the reservation that points at +conn+, if any. The current
# connection id is checked first; otherwise the first reservation whose
# connection equals +conn+ is dropped.
def release(conn)
  owner_id =
    if @reserved_connections[current_connection_id] == conn
      current_connection_id
    else
      @reserved_connections.keys.find { |k| @reserved_connections[k] == conn }
    end

  @reserved_connections.delete owner_id if owner_id
end
# Open a brand-new connection by invoking the adapter method named by the
# spec on ActiveRecord::Base, passing it the spec's configuration.
def new_connection
  ActiveRecord::Base.send(spec.adapter_method, spec.config)
end
# Key under which the caller's connection is reserved: the value of
# ActiveRecord::Base.connection_id, which is initialized to the current
# thread's object_id the first time it is found unset.
def current_connection_id #:nodoc:
  ActiveRecord::Base.connection_id ||= Thread.current.object_id
end
# Create a new connection, attach it to this pool, and start tracking it.
#
# Raises:
# - ActiveRecord::ConnectionNotEstablished when automatic reconnection
#   has been disabled.
def checkout_new_connection
  # Fully qualified on purpose: this class is opened with the compact
  # `class A::B::C` form, so the ActiveRecord namespace is not part of the
  # lexical constant lookup and a bare ConnectionNotEstablished would
  # raise NameError.
  raise ActiveRecord::ConnectionNotEstablished unless @automatic_reconnect

  c = new_connection
  c.pool = self
  @connections << c
  c
end
# Run the connection's :checkout callbacks around +verify!+ and return
# the connection.
def checkout_and_verify(c)
  c.run_callbacks(:checkout) { c.verify! }
  c
end
end # class ActiveRecord::ConnectionAdapters::ConnectionPool
end # unless ENV["NO_AR_PATCH"]
# (GitHub page footer, not part of the patch: "Sign up for free to join this
# conversation on GitHub. Already have an account? Sign in to comment")