Last active
December 19, 2015 11:09
-
-
Save cpuguy83/5945217 to your computer and use it in GitHub Desktop.
Working out connection pooling for Mongoid. Right now, Mongoid gives every thread a new connection and does not close that connection when the thread completes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thread' | |
require 'thread_safe' | |
require 'monitor' | |
module Mongoid | |
# Convenience entry point: forwards the given block to
# Sessions.with_session and returns whatever that call returns.
def with_session(&work)
  Sessions.with_session(&work)
end
# Pool-related settings, declared through the `option` macro (defined
# outside this excerpt). Sessions.session_pool reads them when it builds
# a Pool:
#   session_pool_size        -> passed as the pool's :size
#   session_checkout_timeout -> passed as the pool's :checkout_timeout
#   session_reap_frequency   -> passed as the pool's :reap_frequency
module Config
  option :session_pool_size, default: 5
  option :session_checkout_timeout, default: 5
  option :session_reap_frequency, default: 3
end
module Sessions | |
class << self | |
# Runs the block while holding the lock shared by the Sessions
# singleton methods and returns the block's value.
#
# The lock is a Monitor (re-entrant) rather than a Mutex: `disconnect`
# holds the lock while calling `session_pool`, which may take the lock
# again on the same thread. A Mutex rejects that with ThreadError.
#
# NOTE(review): the lazy `||=` below is itself not guarded, so two
# threads racing on the very first call could each build a lock.
# Confirm whether eager initialization at load time is preferable.
def synchronize(&block)
  @lock ||= Monitor.new
  @lock.synchronize(&block)
end
# Returns the Pool registered under +name+, creating it on first use
# from the Config pool settings.
#
# name - key identifying the pool (defaults to :default).
#
# The fast path reads the registry once into a local, and the slow path
# returns the value produced inside the lock. The registry is never
# re-read after the lock is released, so a concurrent `disconnect`
# (which sets @session_pool to nil) cannot turn the final lookup into a
# call on nil.
def session_pool(name = :default)
  pools = @session_pool
  existing = pools && pools[name]
  return existing if existing

  synchronize do
    @session_pool ||= {}
    @session_pool[name] ||= Pool.new(
      :size => Config.session_pool_size,
      :name => name,
      :checkout_timeout => Config.session_checkout_timeout,
      :reap_frequency => Config.session_reap_frequency)
  end
end
# Disconnects every session created by every registered pool, then
# drops the pool registry so later `session_pool` calls build fresh
# pools.
#
# Walks the registry hash directly instead of calling `session_pool`:
# that call returns a single Pool (which has no `each`) and would also
# create a default pool as a side effect. Does nothing beyond clearing
# the registry when no pool has been created yet.
#
# NOTE(review): each Pool starts a Reaper thread in its constructor;
# nothing here stops those threads. Confirm whether that is acceptable.
def disconnect
  synchronize do
    (@session_pool || {}).each_value do |pool|
      pool.sessions.each(&:disconnect)
    end
    @session_pool = nil
  end
end
# Returns the session already reserved for the calling thread in the
# pool registered under +name+, or checks one out of that pool when the
# thread has none.
def with_name(name)
  pool = session_pool(name)
  pool.session_for_thread(Thread.current) || pool.checkout
end
# Yields (with no arguments) and returns the block's value. Whether the
# block returns or raises, the calling thread's session in the pool
# registered under +name+ is checked back in afterwards.
def with_session(name = :default)
  begin
    yield
  ensure
    reap_current_session(name)
  end
end
private | |
# Checks the session held by +thread+ (the calling thread by default)
# back into the pool registered under +name+. Always returns true.
def reap_current_session(name, thread = Thread.current)
  pool = session_pool(name)
  pool.checkin_from_thread(thread)
  true
end
end | |
class Pool | |
# FIFO of idle sessions guarded by a monitor, with support for callers
# that block until a session is added.
class Queue
  # lock - object providing `synchronize` and `new_cond` (a Monitor, or
  #        an object that includes MonitorMixin). Defaults to a fresh
  #        Monitor.
  def initialize(lock = Monitor.new)
    @lock = lock
    @cond = @lock.new_cond
    @num_waiting = 0
    @queue = []
  end

  # True when at least one caller is blocked in a timed `poll`.
  def any_waiting?
    synchronize { @num_waiting > 0 }
  end

  # Number of callers currently blocked in a timed `poll`.
  def num_waiting
    synchronize { @num_waiting }
  end

  # Appends +session+ and wakes one blocked caller, if any.
  def add(session)
    synchronize do
      @queue.push session
      @cond.signal
    end
  end

  # Removes and returns the oldest entry, or nil when empty.
  def remove
    synchronize { @queue.shift }
  end

  # Takes an entry from the queue.
  #
  # timeout - nil for a non-blocking attempt, otherwise the number of
  #           seconds to wait for an entry.
  #
  # Returns the entry, or nil when nothing could be taken without
  # waiting and no timeout was given.
  # Raises ConnectionTimeoutError when +timeout+ elapses first.
  def poll(timeout = nil)
    synchronize do
      if timeout
        no_wait_poll || wait_poll(timeout)
      else
        no_wait_poll
      end
    end
  end

  # Number of queued entries. Read under the lock, like the other
  # readers of shared state in this class.
  def count
    synchronize { @queue.count }
  end

  private

  def synchronize(&block)
    @lock.synchronize(&block)
  end

  def any?
    !@queue.empty?
  end

  # An entry may be taken without waiting only when there are more
  # entries than blocked callers, so callers already waiting are not
  # bypassed.
  def can_remove_no_wait?
    @queue.size > @num_waiting
  end

  def no_wait_poll
    remove if can_remove_no_wait?
  end

  # Blocks on the condition variable until an entry is available or
  # +timeout+ seconds have passed. Elapsed time comes from the
  # monotonic clock so wall-clock adjustments cannot shorten or extend
  # the wait.
  def wait_poll(timeout)
    @num_waiting += 1
    started = monotonic_now
    elapsed = 0
    loop do
      @cond.wait(timeout - elapsed)
      return remove if any?
      elapsed = monotonic_now - started
      if elapsed >= timeout
        msg = 'Timed out waiting for database session'
        raise ConnectionTimeoutError, msg
      end
    end
  ensure
    # Runs on return and on raise, keeping the waiter count accurate.
    @num_waiting -= 1
  end

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
include MonitorMixin | |
attr_reader :sessions, :size, :reaper, :reserved_sessions, :available | |
# Builds an empty pool.
#
# opts - Hash of settings (not modified):
#        :name             - name handed to Factory.create for new
#                            sessions (default :default)
#        :size             - maximum number of sessions the pool will
#                            create (default 5)
#        :checkout_timeout - seconds `checkout` waits for a session once
#                            the pool is at :size (default 5)
#        :reap_frequency   - seconds between reaper passes (default 10)
def initialize(opts = {})
  super() # explicit empty argument list for the MonitorMixin initializer
  @checkout_timeout = opts[:checkout_timeout] || 5
  @size = opts[:size] || 5
  @name = opts[:name] || :default
  @sessions = []
  @reserved_sessions = ThreadSafe::Cache.new(:initial_capacity => @size)
  @available = Queue.new self
  # Start the reaper last: its thread calls `reap`, which reads
  # @reserved_sessions and @available, so those must already exist.
  @reaper = Reaper.new(opts[:reap_frequency] || 10, self)
  @reaper.run
end
# Returns the session reserved for the calling thread. When the thread
# has none, obtains one via `get_session` under the pool lock, reserves
# it for the thread and returns it.
def checkout
  existing = session_for_thread(Thread.current)
  return existing if existing

  synchronize do
    fresh = get_session
    reserve(fresh)
    fresh
  end
end
# Returns a session back to the available pool and drops the
# reservation that pointed at it.
#
# session - the session to return. nil is ignored: callers such as
#           `checkin_from_thread` pass nil when the thread never checked
#           anything out, and queueing that nil would let a later
#           blocking poll hand out nil as if it were a session.
def checkin(session)
  return unless session

  synchronize do
    @available.add session
    release(session)
  end
end
# Looks up the session reserved for +thread+ and passes it to `checkin`.
def checkin_from_thread(thread)
  held = @reserved_sessions[thread]
  checkin(held)
end
# Number of sessions currently sitting in the available queue.
def count
  idle = @available
  idle.count
end
# Reclaims sessions whose owning thread has finished, checking each one
# back in.
#
# Ownership is tested with `alive?` rather than `stop?`: `stop?` is also
# true for a thread that is merely sleeping or blocked, and reclaiming
# from such a thread would hand a session that is still in use to
# another thread. Entries whose session has already been removed are
# skipped.
def reap
  @reserved_sessions.keys.each do |thread|
    next if thread.alive?

    session = @reserved_sessions[thread]
    checkin(session) if session
  end
end
# Returns the session reserved for +thread+, or nil when it has none.
def session_for_thread(thread)
  reserved = @reserved_sessions
  reserved[thread]
end
private | |
# Records +session+ as reserved under the current session id and
# returns the session.
def reserve(session)
  owner = current_session_id
  @reserved_sessions[owner] = session
end
# Key under which the caller's reservation is stored: the calling
# thread itself.
def current_session_id
  Thread.current
end
# Removes the reservation that points at +session+. The caller's own
# reservation is checked first; otherwise every reservation is scanned
# for a matching session. Nothing is removed when no owner is found.
def release(session)
  me = current_session_id
  owner =
    if @reserved_sessions[me] == session
      me
    else
      @reserved_sessions.keys.find { |key| @reserved_sessions[key] == session }
    end
  @reserved_sessions.delete(owner) if owner
end
# Picks the session to hand out, in order of preference:
# 1. an idle session that can be taken without waiting;
# 2. a newly created session, while fewer than @size exist;
# 3. whatever a poll bounded by @checkout_timeout yields.
def get_session
  idle = @available.poll
  return idle if idle
  return checkout_new_session if @sessions.size < @size

  @available.poll(@checkout_timeout)
end
# Creates a session, adds it to the list of sessions this pool has
# created, and returns it.
def checkout_new_session
  new_session.tap { |created| @sessions << created }
end
# Asks Factory for a session built from this pool's name.
def new_session
  pool_name = @name
  Factory.create(pool_name)
end
# Raised by Queue#wait_poll when no session becomes available within
# the timeout.
class ConnectionTimeoutError < StandardError
end
# Builds a session from this pool's name via Factory. Same body as
# `new_session`; nothing in the visible code calls it.
def create_new_session
  pool_name = @name
  Factory.create(pool_name)
end
# Background worker that calls `reap` on a pool at a fixed interval.
class Reaper
  attr_reader :pool, :frequency

  # frequency - value passed to `sleep` between passes; a nil or false
  #             value disables the reaper.
  # pool      - object responding to `reap`.
  def initialize(frequency, pool)
    @frequency = frequency
    @pool = pool
  end

  # Starts the reaping thread and returns it. Returns nil without
  # starting anything when no frequency is set. The thread loops
  # forever: sleep, then reap.
  def run
    if frequency
      Thread.new(frequency, pool) do |interval, target|
        while true
          sleep interval
          target.reap
        end
      end
    end
  end
end
end | |
end | |
end |
@cpuguy63 I used your patch, but it didn't help; I still got this error: PrimitiveFailure (FFI::Pointer#primitive_write_int primitive failed). Any pointers? I wasn't able to test moped master as you suggested. I don't know if I can get mongoid to use moped edge. Any thoughts on how I can use moped master with mongoid? Thanks
@zaidakram Had a couple of bugs in this. Updated it.
I have this running in production just fine.
Released as a gem: http://rubygems.org/gems/mongoid_connection_pool
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Updated to use a reaper thread for potentially unused connections. Also now supports using Mongoid's named sessions