Skip to content

Instantly share code, notes, and snippets.

@tgxworld
Created August 18, 2018 08:27
Show Gist options
  • Save tgxworld/16ed66a59f822c0c817a9fec7b26aec5 to your computer and use it in GitHub Desktop.
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
index f721e91203..746e0cf8c1 100644
--- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
@@ -2,6 +2,7 @@
require "thread"
require "concurrent/map"
+require "concurrent/executor/timer_set"
require "monitor"
module ActiveRecord
@@ -285,30 +286,41 @@ def internal_poll(timeout)
# Configure the frequency by setting +reaping_frequency+ in your database
# yaml file (default 60 seconds).
class Reaper
- attr_reader :pool, :frequency
-
- def initialize(pool, frequency)
- @pool = pool
- @frequency = frequency
+ # Builds the timer set used to schedule reaping. Its executor is a
+ # Concurrent::FixedThreadPool of size 1.
+ def initialize
+ @timer_set = Concurrent::TimerSet.new(
+ executor: Concurrent::FixedThreadPool.new(1)
+ )
end
+ # Starts a thread that calls +queue_reaping+ for every ConnectionPool
+ # instance ObjectSpace reports, and returns that thread.
+ # NOTE(review): the enumeration happens once, so pools created after it
+ # appear never to be scheduled -- confirm whether that is intended.
def run
- return unless frequency && frequency > 0
- Thread.new(frequency, pool) { |t, p|
- loop do
- sleep t
- p.reap
- p.flush
- end
- }
+ Thread.new do
+ klass = ActiveRecord::ConnectionAdapters::ConnectionPool
+ ObjectSpace.each_object(klass) { |pool| queue_reaping(pool) }
+ end
end
+
+ private
+
+ # Schedules one reap/flush pass for +pool+ after its +reaping_frequency+
+ # seconds; the scheduled task re-queues itself when it finishes.
+ def queue_reaping(pool)
+ frequency = pool.reaping_frequency
+ # Keep the guard the previous +run+ had: pools with reaping disabled
+ # (nil, zero or negative frequency) must not be scheduled. Without it a
+ # zero delay would make this task re-queue itself back-to-back.
+ return unless frequency && frequency > 0
+
+ @timer_set.post(frequency) do
+ # Nothing to reap or flush while the pool holds no connections.
+ unless pool.connections.empty?
+ pool.reap
+ pool.flush
+ end
+
+ queue_reaping(pool)
+ end
+ end
end
include MonitorMixin
include QueryCache::ConnectionPoolConfiguration
attr_accessor :automatic_reconnect, :checkout_timeout, :schema_cache
- attr_reader :spec, :connections, :size, :reaper
+ attr_reader :spec, :connections, :size, :reaping_frequency
+
+ @@pools_to_reap = []
+ @@reaper = nil
# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
# object which describes database connection information (e.g. adapter,
@@ -321,14 +333,12 @@ def initialize(spec)
@spec = spec
- @checkout_timeout = (spec.config[:checkout_timeout] && spec.config[:checkout_timeout].to_f) || 5
- if @idle_timeout = spec.config.fetch(:idle_timeout, 300)
- @idle_timeout = @idle_timeout.to_f
- @idle_timeout = nil if @idle_timeout <= 0
- end
+ @checkout_timeout = spec.config.fetch(:checkout_timeout, 5).to_f
+ @idle_timeout = spec.config.fetch(:idle_timeout, 300).to_f
+ @idle_timeout = nil if @idle_timeout <= 0
# default max pool size to 5
- @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
+ @size = spec.config.fetch(:pool, 5).to_i
# This variable tracks the cache of threads mapped to reserved connections, with the
# sole purpose of speeding up the +connection+ method. It is not the authoritative
@@ -358,9 +368,9 @@ def initialize(spec)
# +reaping_frequency+ is configurable mostly for historical reasons, but it could
# also be useful if someone wants a very low +idle_timeout+.
- reaping_frequency = spec.config.fetch(:reaping_frequency, 60)
- @reaper = Reaper.new(self, reaping_frequency && reaping_frequency.to_f)
- @reaper.run
+ @reaping_frequency = spec.config.fetch(:reaping_frequency, 1).to_f
+ @@pools_to_reap << self if reaping_frequency > 0
+ @@reaper ||= Reaper.new.run
end
def lock_thread=(lock_thread)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment