Skip to content

Instantly share code, notes, and snippets.

@cpuguy83
Last active December 19, 2015 11:09
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cpuguy83/5945217 to your computer and use it in GitHub Desktop.
Save cpuguy83/5945217 to your computer and use it in GitHub Desktop.
Working out connection pooling for Mongoid. Right now in Mongoid every thread gets a new connection, and is not closing those connections when the thread is complete.
require 'thread'
require 'thread_safe'
require 'monitor'
module Mongoid
def with_session(&block)
Sessions.with_session(&block)
end
module Config
option :session_pool_size, :default => 5
option :session_checkout_timeout, :default => 5
option :session_reap_frequency, :default => 3
end
module Sessions
class << self
def synchronize(&block)
@lock ||= Mutex.new
@lock.synchronize(&block)
end
def session_pool(name=:default)
if !@session_pool || !@session_pool[name]
synchronize do
@session_pool ||= {}
@session_pool[name] ||= Pool.new(
:size => Config.session_pool_size,
:name => name,
:checkout_timeout => Config.session_checkout_timeout,
:reap_frequency => Config.session_reap_frequency)
end
end
@session_pool[name]
end
def disconnect
synchronize do
session_pool.each {|s| s.disconnect}
@session_pool = nil
end
end
def with_name(name)
session_pool(name).session_for_thread(Thread.current) ||
session_pool(name).checkout
end
def with_session(name=:default)
yield
ensure
reap_current_session(name)
end
private
def reap_current_session(name, thread = Thread.current)
session_pool(name).checkin_from_thread thread
true
end
end
class Pool
class Queue
def initialize(lock=Monitor.new)
@lock = lock
@cond = @lock.new_cond
@num_waiting = 0
@queue = []
end
def any_waiting?
synchronize do
@num_waiting > 0
end
end
def num_waiting
synchronize do
@num_waiting
end
end
def add(session)
synchronize do
@queue.push session
@cond.signal
end
end
def remove
synchronize do
@queue.shift
end
end
def poll(timeout = nil)
synchronize do
if timeout
no_wait_poll || wait_poll(timeout)
else
no_wait_poll
end
end
end
def count
@queue.count
end
private
def synchronize(&block)
@lock.synchronize(&block)
end
def any?
!@queue.empty?
end
def can_remove_no_wait?
@queue.size > @num_waiting
end
def no_wait_poll
remove if can_remove_no_wait?
end
def wait_poll(timeout)
@num_waiting += 1
t0 = Time.now
elapsed = 0
loop do
@cond.wait(timeout - elapsed)
return remove if any?
elapsed = Time.now - t0
if elapsed >= timeout
msg = 'Timed out waiting for database session'
raise ConnectionTimeoutError, msg
end
end
ensure
@num_waiting -= 1
end
end
include MonitorMixin
attr_reader :sessions, :size, :reaper, :reserved_sessions, :available
def initialize(opts={})
super()
opts[:name] ||= :default
@reaper = Reaper.new(opts[:reap_frequency] || 10, self)
@reaper.run
@checkout_timeout = opts[:checkout_timeout] || 5
@size = opts[:size] || 5
@name = opts[:name] || :default
@sessions = []
@reserved_sessions = ThreadSafe::Cache.new(:initial_capacity => @size)
@available = Queue.new self
end
def checkout
unless (session = session_for_thread(Thread.current))
synchronize do
session = get_session
reserve(session)
end
end
session
end
# Returns a session back to the available pool
def checkin(session)
synchronize do
@available.add session
release(session)
end
end
def checkin_from_thread(thread)
checkin @reserved_sessions[thread]
end
def count
@available.count
end
def reap
@reserved_sessions.keys.each do |thread|
session = @reserved_sessions[thread]
checkin(session) if thread.stop?
end
end
def session_for_thread(thread)
@reserved_sessions[thread]
end
private
def reserve(session)
@reserved_sessions[current_session_id] = session
end
def current_session_id
Thread.current
end
def release(session)
thread = if @reserved_sessions[current_session_id] == session
current_session_id
else
@reserved_sessions.keys.find do |k|
@reserved_sessions[k] == session
end
end
@reserved_sessions.delete thread if thread
end
def get_session
if session = @available.poll
session
elsif @sessions.size < @size
checkout_new_session
else
@available.poll(@checkout_timeout)
end
end
def checkout_new_session
session = new_session
@sessions << session
session
end
def new_session
Factory.create(@name)
end
class ConnectionTimeoutError < StandardError; end
def create_new_session
Factory.create(@name)
end
class Reaper
attr_reader :pool
attr_reader :frequency
def initialize(frequency, pool)
@frequency = frequency
@pool = pool
end
def run
return unless frequency
Thread.new(frequency, pool) do |t, p|
while true
sleep t
p.reap
end
end
end
end
end
end
end
@cpuguy83
Copy link
Author

cpuguy83 commented Jul 7, 2013

Still need to do some investigating on Mongoid internals and work out how to return a session back to the pool once a thread is completed

@cpuguy83
Copy link
Author

cpuguy83 commented Jul 8, 2013

It looks like this is working in my Rails app, at least in basic testing from the console. Totally taken over the session creation and everything from what's currently in Mongoid

@cpuguy83
Copy link
Author

cpuguy83 commented Jul 8, 2013

@cpuguy83
Copy link
Author

cpuguy83 commented Jul 9, 2013

Updated to use a reaper thread for potentially unused connections. Also now supports using Mongoid's named sessions

@cpuguy83
Copy link
Author

@zaidakram
Copy link

@cpuguy63 I used you patch. But no help still got this error. PrimitiveFailure (FFI::Pointer#primitive_write_int primitive failed). Any pointers? Wasn't able to test moped master as you suggested. Dont know if i can get mongoid to use moped edge.. Any thoughts on How can I use moped master with mongoid? Thanks

@cpuguy83
Copy link
Author

@zaidakram Had a couple of bugs in this. Updated it.
I have this running in production just fine.

@cpuguy83
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment