-
-
Save jhjguxin/5160792 to your computer and use it in GitHub Desktop.
Mongoid batch operations, including find and remove
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Yields every document matched by +criteria+, one at a time, loading them in
# skip/limit pages of +batch_size+ and pausing between pages.
#
# The number of pages is derived from a single +criteria.count+ taken before
# the first page is read. Paging is offset based, so results are only stable
# if the matched set does not change while iterating.
#
# criteria      - object responding to count, skip and limit (the result of
#                 skip/limit must respond to each).
# batch_size    - positive Integer, number of documents per page.
# sleep_seconds - pause between two consecutive pages; no pause is made after
#                 the last page. Zero or nil disables pausing.
#
# Returns nil.
# Raises ArgumentError when batch_size is not a positive Integer (such a value
# would otherwise keep the loop running forever).
def find_in_batches(criteria, batch_size = 1000, sleep_seconds = 1)
  unless batch_size.is_a?(Integer) && batch_size > 0
    raise ArgumentError, "batch_size must be a positive integer, got #{batch_size.inspect}"
  end

  remaining = criteria.count
  current_batch = 0
  while remaining > 0
    # each (not collect): the block is run for its side effects only.
    criteria.skip(current_batch * batch_size).limit(batch_size).each { |doc| yield doc }
    remaining -= batch_size
    current_batch += 1
    #logger.info "now picked #{current_batch * batch_size}, batch_size is #{batch_size} then sleep #{sleep_seconds}'s ..."
    # Throttle only when another page follows.
    sleep sleep_seconds if remaining > 0 && sleep_seconds.to_f > 0
  end
end
# Yields one criteria per page (the original +criteria+ narrowed with
# skip/limit), so the caller can operate on a whole page at once.
#
# The number of pages is derived from a single +criteria.count+ taken before
# the first page is yielded. Paging is offset based, so results are only
# stable if the matched set does not change while iterating.
#
# criteria      - object responding to count, skip and limit.
# batch_size    - positive Integer, number of documents per page.
# sleep_seconds - pause between two consecutive pages; no pause is made after
#                 the last page. Zero or nil disables pausing.
#
# Returns nil.
# Raises ArgumentError when batch_size is not a positive Integer (such a value
# would otherwise keep the loop running forever).
def handle_in_batches(criteria, batch_size = 1000, sleep_seconds = 1)
  unless batch_size.is_a?(Integer) && batch_size > 0
    raise ArgumentError, "batch_size must be a positive integer, got #{batch_size.inspect}"
  end

  remaining = criteria.count
  current_batch = 0
  while remaining > 0
    yield criteria.skip(current_batch * batch_size).limit(batch_size)
    remaining -= batch_size
    current_batch += 1
    #logger.info "now picked #{current_batch * batch_size}, batch_size is #{batch_size} then sleep #{sleep_seconds}'s ..."
    # Throttle only when another page follows.
    sleep sleep_seconds if remaining > 0 && sleep_seconds.to_f > 0
  end
end
# Yields every document matched by +criteria+, one at a time, walking the
# result set in ascending _id order in batches of +batch_size+.
#
# Each batch is fetched as "documents whose _id is greater than the last _id
# already yielded", so no skip offset grows with the position in the set and
# every batch is derived from the same unmodified base criteria.
#
# Example:
#   criteria = GxUser.where(user_id: {"$gt" => 400})
#   Utility.find_batches_with_id(criteria, 10) { |user| puts user.user_id }
#
# criteria      - a Mongoid::Criteria; any other object is ignored.
# batch_size    - positive Integer, number of documents per batch.
# sleep_seconds - pause between two consecutive batches. Zero or nil disables
#                 pausing.
#
# Returns nil.
# Raises ArgumentError when batch_size is not a positive Integer.
def find_batches_with_id(criteria, batch_size = 1000, sleep_seconds = 1)
  return unless defined?(Mongoid::Criteria) && criteria.is_a?(Mongoid::Criteria)
  unless batch_size.is_a?(Integer) && batch_size > 0
    raise ArgumentError, "batch_size must be a positive integer, got #{batch_size.inspect}"
  end

  base = criteria.asc(:_id)
  last_id = nil
  loop do
    # `and` adds the bound next to whatever the caller already put on _id
    # instead of replacing it.
    scope = last_id.nil? ? base : base.and(:_id => { "$gt" => last_id })
    # Materialize the batch so a slow block does not hold a cursor open.
    batch = scope.limit(batch_size).to_a
    break if batch.empty?

    batch.each { |doc| yield doc }
    # A short batch means the end of the set was reached.
    break if batch.size < batch_size

    last_id = batch.last.id
    sleep sleep_seconds if sleep_seconds.to_f > 0
  end
end
# Yields one criteria per batch, each restricted to a contiguous _id range of
# at most +batch_size+ documents, walking the set in ascending _id order.
#
# The upper bound of a batch is the _id of the document that sits
# +batch_size+ positions after the batch start; it is looked up before the
# batch is yielded, so the block may modify or remove the documents of the
# batch it receives. The last batch has no upper bound.
#
# Example:
#   criteria = GxUser.where(user_id: {"$gt" => 400})
#   Utility.batches_handle_with_id(criteria, 10) { |batch| batch.map(&:user_id) }
#
# criteria      - a Mongoid::Criteria; any other object is ignored.
# batch_size    - positive Integer, number of documents per batch.
# sleep_seconds - pause between two consecutive batches; no pause is made
#                 after the last batch. Zero or nil disables pausing.
#
# Returns nil.
# Raises ArgumentError when batch_size is not a positive Integer.
def batches_handle_with_id(criteria, batch_size = 1000, sleep_seconds = 1)
  return unless defined?(Mongoid::Criteria) && criteria.is_a?(Mongoid::Criteria)
  unless batch_size.is_a?(Integer) && batch_size > 0
    raise ArgumentError, "batch_size must be a positive integer, got #{batch_size.inspect}"
  end

  base = criteria.asc(:_id)
  lower_id = nil # inclusive start of the current batch; nil for the first one
  loop do
    # Every batch is derived from the untouched base scope, so bounds from
    # earlier batches never pile up in the selector.
    scope = lower_id.nil? ? base : base.and(:_id => { "$gte" => lower_id })
    boundary = scope.only(:id).skip(batch_size).first

    if boundary.nil?
      # Fewer than batch_size + 1 documents remain: this is the final batch.
      yield scope if scope.exists?
      break
    end

    yield scope.and(:_id => { "$lt" => boundary.id })
    lower_id = boundary.id
    sleep sleep_seconds if sleep_seconds.to_f > 0
  end
end
# Removes the documents matched by +criteria+ in contiguous _id ranges of at
# most +batch_size+ documents, calling +remove_method+ on each range.
#
# Ranges are walked in ascending _id order and the walk always moves on to
# the next range, so the method terminates even when +remove_method+ leaves
# some documents in place.
#
# criteria      - a Mongoid::Criteria; any other object is ignored.
# batch_size    - positive Integer, documents per range. nil means 1000.
# remove_method - name of the public criteria method that performs the
#                 removal (for example "destroy_all" or "delete_all").
# sleep_seconds - pause between two consecutive ranges; no pause is made
#                 after the last range. Zero or nil disables pausing.
#
# Returns nil.
# Raises ArgumentError when batch_size is not a positive Integer.
def remove_in_batches(criteria, batch_size = 1000, remove_method = "destroy_all", sleep_seconds = 0.5)
  return unless criteria.is_a?(Mongoid::Criteria)

  batch_size ||= 1000
  unless batch_size.is_a?(Integer) && batch_size > 0
    raise ArgumentError, "batch_size must be a positive integer, got #{batch_size.inspect}"
  end

  base = criteria.asc(:_id)
  lower_id = nil # inclusive start of the current range; nil for the first one
  loop do
    scope = lower_id.nil? ? base : base.and(:_id => { "$gte" => lower_id })
    # First document of the NEXT range; it survives this round's removal.
    boundary = scope.only(:id).skip(batch_size).first

    if boundary.nil?
      # Final range: everything that is left, if anything.
      scope.public_send(remove_method) if scope.exists?
      break
    end

    scope.and(:_id => { "$lt" => boundary.id }).public_send(remove_method)
    lower_id = boundary.id
    sleep sleep_seconds if sleep_seconds.to_f > 0
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment