Skip to content

Instantly share code, notes, and snippets.

@lmmendes
Created May 24, 2015 10:03
Show Gist options
  • Save lmmendes/4d3e828984cd9df5d1d5 to your computer and use it in GitHub Desktop.
Save lmmendes/4d3e828984cd9df5d1d5 to your computer and use it in GitHub Desktop.
Monkey patch for Mongoid 4.x "Could not connect to a primary node for replica set"
#==================================================================================================
# Monkey Patch (lmmendes)
# MongoDB or in this case Mongoid and it's driver Moped have problems running some commands
# when the Primary node goes down and Moped tries to write to the database before
# refreshing the cluster info or at least trying the same command on each node before failing.
# From GitHub:
# Moped::Errors::ConnectionFailure: Could not connect to a primary node for replica set
# https://github.com/mongoid/moped/issues/348
#==================================================================================================
require 'mongoid'
if Mongoid::VERSION >= '5.0.0'
raise "Please remove this patch for Mongoid 4.x before upgrading."
end
# ==================================================================================================
# Patch from
# https://github.com/wandenberg/moped/commit/fe36cd65b105d83253e25d0e474711244ae31302
# ==================================================================================================
require 'moped'
require "moped/query"
# ==================================================================================================
# From
# https://raw.githubusercontent.com/wandenberg/moped/fe36cd65b105d83253e25d0e474711244ae31302/lib/moped/retryable.rb
# ==================================================================================================
module Moped
# Provides the shared behaviour for retry failed operations.
#
# @since 2.0.0
module Retryable
private
# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @api private
#
# @example Execute with retry.
# preference.with_retry(cluster) do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Cluster ] cluster The cluster.
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e
raise e if e.is_a?(Errors::PotentialReconfiguration) &&
! (e.message.include?("not master") || e.message.include?("Not primary"))
if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.refresh
with_retry(cluster, retries - 1, &block)
else
raise e
end
end
end
end
end
# ==================================================================================================
# From
# https://raw.githubusercontent.com/wandenberg/moped/fe36cd65b105d83253e25d0e474711244ae31302/lib/moped/collection.rb
# ==================================================================================================
module Moped
# The class for interacting with a MongoDB collection.
#
# @since 1.0.0
class Collection
include Readable
include Retryable
# @!attribute database
# @return [ Database ] The database for the collection.
# @!attribute name
# @return [ String ] The name of the collection.
attr_reader :database, :name
# Return whether or not this collection is a capped collection.
#
# @example Is the collection capped?
# collection.capped?
#
# @return [ true, false ] If the collection is capped.
#
# @since 1.4.0
def capped?
database.command(collstats: name)["capped"]
end
# Drop the collection.
#
# @example Drop the collection.
# collection.drop
#
# @return [ Hash ] The command information.
#
# @since 1.0.0
def drop
begin
session.with(read: :primary).command(drop: name)
rescue Moped::Errors::OperationFailure => e
raise e unless e.ns_not_found?
false
end
end
# Rename the collection
#
# @example Rename the collection to 'foo'
# collection.rename('foo')
#
# @return [ Hash ] The command information.
#
# @since 2.0.0
def rename(to_name)
begin
session.
with(database: "admin", read: :primary).
command(renameCollection: "#{database.name}.#{name}", to: "#{database.name}.#{to_name}")
rescue Moped::Errors::OperationFailure => e
raise e unless e.ns_not_exists?
false
end
end
# Build a query for this collection.
#
# @example Build a query based on the provided selector.
# collection.find(name: "Placebo")
#
# @param [ Hash ] selector The query selector.
#
# @return [ Query ] The generated query.
#
# @since 1.0.0
def find(selector = {})
Query.new(self, selector)
end
alias :where :find
# Access information about this collection's indexes.
#
# @example Get the index information.
# collection.indexes
#
# @return [ Indexes ] The index information.
#
# @since 1.0.0
def indexes
Indexes.new(database, name)
end
# Initialize the new collection.
#
# @example Initialize the collection.
# Collection.new(database, :artists)
#
# @param [ Database ] database The collection's database.
# @param [ String, Symbol] name The collection name.
#
# @since 1.0.0
def initialize(database, name)
@database = database
@name = name.to_s
end
# Insert one or more documents into the collection.
#
# @example Insert a single document.
# db[:people].insert(name: "John")
#
# @example Insert multiple documents in batch.
# db[:people].insert([{name: "John"}, {name: "Joe"}])
#
# @param [ Hash, Array<Hash> ] documents The document(s) to insert.
# @param [ Array ] flags The flags, valid values are :continue_on_error.
#
# @option options [Array] :continue_on_error Whether to continue on error.
#
# @return [ nil ] nil.
#
# @since 1.0.0
def insert(documents, flags = nil)
with_retry(cluster) do
docs = documents.is_a?(Array) ? documents : [ documents ]
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
end
end
end
# Call aggregate function over the collection.
#
# @example Execute an aggregation.
# session[:users].aggregate({
# "$group" => {
# "_id" => "$city",
# "totalpop" => { "$sum" => "$pop" }
# }
# })
#
# @param [ Hash, Array<Hash> ] documents representing the aggregate
# function to execute
#
# @return [ Hash ] containing the result of aggregation
#
# @since 1.3.0
def aggregate(*pipeline)
session.command(aggregate: name, pipeline: pipeline.flatten)["result"]
end
# Get the session for the collection.
#
# @example Get the session for the collection.
# collection.session
#
# @return [ Session ] The session for the collection.
#
# @since 2.0.0
def session
database.session
end
def write_concern
session.write_concern
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment