Skip to content

Instantly share code, notes, and snippets.

@bawNg
Last active December 25, 2015 20:59
Show Gist options
  • Save bawNg/7039438 to your computer and use it in GitHub Desktop.
Save bawNg/7039438 to your computer and use it in GitHub Desktop.
Patch for the mongo 1.9.2 driver that adds fiber-based connection pool support
# Timer-backed replacement for a Timeout-style helper. The patched Mongo
# classes further down this file alias it under the constant name Timeout.
class EventMachine::Synchrony::MongoTimeoutHandler
  # Runs the block while an EventMachine timer is armed for +op_timeout+ seconds.
  #
  # When the timer fires it calls +resume(nil)+ on the fiber that invoked this
  # method. No exception is raised here on expiry.
  # NOTE(review): this presumably relies on the block being parked in a
  # Fiber.yield at that moment, so the yield returns nil - confirm against
  # the em-synchrony socket implementation.
  #
  # op_timeout - number of seconds handed to EM::Timer.new.
  # ex_class   - accepted for signature compatibility with callers; it is not
  #              used by this method.
  #
  # Returns whatever the block returns. Exceptions raised by the block
  # propagate unchanged.
  def self.timeout(op_timeout, ex_class, &block)
    f = Fiber.current
    timer = EM::Timer.new(op_timeout) { f.resume(nil) }
    begin
      block.call
    ensure
      # Always disarm the timer, including when the block raises; otherwise
      # it would fire later and resume this fiber at an unrelated point.
      timer.cancel
    end
  end
end
old_verbose = $VERBOSE
begin
$VERBOSE = nil
module Mongo
# Opens MongoClient (inside module Mongo) and defines class-scoped
# ConditionVariable and Timeout constants that point at the
# EventMachine::Synchrony implementations.
# NOTE(review): presumably this makes unqualified references to these names
# inside the driver's MongoClient code resolve to the fiber-aware versions -
# confirm against the driver source.
class MongoClient
# Condition variable implementation from EventMachine::Synchrony::Thread.
ConditionVariable = ::EventMachine::Synchrony::Thread::ConditionVariable
# EM::Timer based timeout helper defined at the top of this file.
Timeout = ::EventMachine::Synchrony::MongoTimeoutHandler
end
class Pool
# Class-scoped aliases to the EventMachine::Synchrony socket, mutex and
# condition variable classes.
# NOTE(review): presumably these take precedence over the top-level
# TCPSocket, Mutex and ConditionVariable for unqualified lookups made inside
# Mongo::Pool - confirm against the driver source.
TCPSocket = ::EventMachine::Synchrony::TCPSocket
Mutex = ::EventMachine::Synchrony::Thread::Mutex
ConditionVariable = ::EventMachine::Synchrony::Thread::ConditionVariable
# Builds a new socket through the client's socket class, registers it with
# this pool and marks it as checked out by the current fiber.
#
# Raises ConnectionFailure when constructing the socket (or assigning its
# pool) raises a StandardError; the partially built socket and @node, when
# present, are closed first.
#
# Returns the new socket.
def checkout_new_socket
  conn = nil
  begin
    conn = @client.socket_class.new(
      @host, @port, @client.op_timeout, @client.connect_timeout, @client.socket_opts
    )
    conn.pool = self
  rescue => err
    conn.close if conn
    @node.close if @node
    raise ConnectionFailure, "Failed to connect to host #{@host} and port #{@port}: #{err}"
  end

  # Authentication happens outside the rescue above, so its errors are not
  # wrapped in ConnectionFailure.
  @client.apply_saved_authentication(:socket => conn)

  [@sockets, @checked_out].each { |list| list << conn }
  # Sockets are tracked per fiber, keyed by the fiber's object_id.
  @thread_ids_to_sockets[Fiber.current.object_id] = conn
  conn
end
# Checks out a socket that already belongs to the pool.
#
# socket - the socket to check out; when omitted, one is picked at random
#          from the sockets that are not currently checked out.
#
# A socket whose pid differs from the current process id is removed from
# the pool, closed if still open, and replaced via checkout_new_socket.
#
# Returns the checked-out socket.
def checkout_existing_socket(socket=nil)
  unless socket
    idle = @sockets - @checked_out
    socket = idle[rand(idle.length)]
  end

  if socket.pid == Process.pid
    @checked_out << socket
    # Sockets are tracked per fiber, keyed by the fiber's object_id.
    @thread_ids_to_sockets[Fiber.current.object_id] = socket
    return socket
  end

  # The socket was created under a different process id: drop it and open
  # a fresh one instead.
  @sockets.delete(socket)
  socket.close if socket && !socket.closed?
  checkout_new_socket
end
# Obtains a socket for the current fiber, connecting the client first if it
# is not connected.
#
# Each pass through the loop, under @connection_mutex:
#   * if this fiber already has a socket recorded and it is not checked
#     out, that socket is checked out again;
#   * otherwise, if the fiber has no recorded socket, a new socket is
#     opened while the pool is below @size, or else an idle one is reused;
#   * if no socket could be obtained, the fiber waits on @queue.
#
# Pending operations queued for the socket in @socket_ops are run before it
# is returned, and a socket found closed afterwards is replaced.
#
# Raises ConnectionTimeoutError when more than @timeout seconds have passed
# since the call started; this is checked at the top of each loop pass.
#
# Returns the checked-out socket.
def checkout
  @client.connect unless @client.connected?
  started_at = Time.now

  loop do
    if (Time.now - started_at) > @timeout
      raise ConnectionTimeoutError, "could not obtain connection within " +
        "#{@timeout} seconds. The max pool size is currently #{@size}; " +
        "consider increasing the pool size or timeout."
    end

    @connection_mutex.synchronize do
      check_prune

      # Sockets are tracked per fiber, keyed by the fiber's object_id.
      fiber_key = Fiber.current.object_id
      bound = @thread_ids_to_sockets[fiber_key]

      socket =
        if bound
          checkout_existing_socket(bound) unless @checked_out.include?(bound)
        elsif @sockets.size < @size
          checkout_new_socket
        elsif @checked_out.size < @sockets.size
          checkout_existing_socket
        end

      if socket
        # Run queued operations; those returning a truthy value are removed.
        @socket_ops[socket].reject! { |op| op.call }

        if socket.closed?
          @checked_out.delete(socket)
          @sockets.delete(socket)
          @thread_ids_to_sockets.delete(fiber_key)
          socket = checkout_new_socket
        end

        return socket
      end

      # Nothing available on this pass: wait on the queue, then retry.
      @queue.wait(@connection_mutex)
    end
  end
end
end
# Opens Node (inside module Mongo) and defines class-scoped Mutex and
# Timeout constants that point at the EventMachine::Synchrony
# implementations.
# NOTE(review): presumably so unqualified Mutex/Timeout references in the
# driver's Node code pick these up - confirm against the driver source.
class Node
Mutex = ::EventMachine::Synchrony::Thread::Mutex
Timeout = ::EventMachine::Synchrony::MongoTimeoutHandler
end
class TCPSocket
# Class-scoped alias for the EM::Timer based timeout helper defined at the
# top of this file.
Timeout = ::EventMachine::Synchrony::MongoTimeoutHandler
# Records the timeouts, owning process id and target address, then
# connects immediately.
#
# host            - host name or address, resolved through
#                   Socket.getaddrinfo restricted to AF_INET.
# port            - port number to connect to.
# op_timeout      - timeout used by #read; defaults to 30 when nil.
# connect_timeout - timeout used by #connect; defaults to 30 when nil.
# opts            - accepted but not used by this method.
def initialize(host, port, op_timeout=nil, connect_timeout=nil, opts={})
  default_timeout = 30
  @op_timeout = op_timeout || default_timeout
  @connect_timeout = connect_timeout || default_timeout
  @pid = Process.pid

  # Use the first AF_INET entry; element 3 of a getaddrinfo row is the
  # numeric address.
  first_entry = Socket.getaddrinfo(host, nil, Socket::AF_INET).first
  @address = first_entry[3]
  @port = port
  @socket = nil

  connect
end
# Opens an EM::Synchrony::TCPSocket to @address/@port and stores it in
# @socket, running under the timeout handler with @connect_timeout.
#
# Returns the value of the timeout handler call.
def connect
  handler = ::EventMachine::Synchrony::MongoTimeoutHandler
  handler.timeout(@connect_timeout, Mongo::ConnectionTimeoutError) do
    @socket = EM::Synchrony::TCPSocket.new(@address, @port)
  end
end
# Writes +data+ to the underlying socket.
#
# Raises SocketError when @socket has not been set.
# Returns whatever the socket's write returns.
def send(data)
  raise SocketError, 'Not connected yet' unless @socket

  @socket.write(data)
end
# Reads from the underlying socket under the timeout handler, using
# @op_timeout.
#
# maxlen - passed through to the socket's read.
# buffer - passed through to the socket's read.
#
# Raises SocketError when @socket has not been set.
# Returns the value of the timeout handler call.
def read(maxlen, buffer)
  raise SocketError, 'Not connected yet' unless @socket

  handler = ::EventMachine::Synchrony::MongoTimeoutHandler
  handler.timeout(@op_timeout, Mongo::OperationTimeout) { @socket.read(maxlen, buffer) }
end
end
# Opens SSLSocket (inside module Mongo) and defines a class-scoped Timeout
# constant pointing at the EM::Timer based handler from the top of this file.
class SSLSocket
Timeout = ::EventMachine::Synchrony::MongoTimeoutHandler
end
# Opens MongoReplicaSetClient (inside module Mongo) and defines a
# class-scoped Mutex constant pointing at EventMachine::Synchrony's mutex.
class MongoReplicaSetClient
Mutex = ::EventMachine::Synchrony::Thread::Mutex
end
# Opens MongoShardedClient (inside module Mongo) and defines a class-scoped
# Mutex constant pointing at EventMachine::Synchrony's mutex.
class MongoShardedClient
Mutex = ::EventMachine::Synchrony::Thread::Mutex
end
# Opens PoolManager (inside module Mongo) and defines a class-scoped Mutex
# constant pointing at EventMachine::Synchrony's mutex.
class PoolManager
Mutex = ::EventMachine::Synchrony::Thread::Mutex
end
end
ensure
$VERBOSE = old_verbose
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment