Last active
November 23, 2019 14:03
-
-
Save junara/f3c8548674bb2bf9bf56e97e21a4c63f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Convenience wrapper around activerecord-import's +on_duplicate_key_update+
# option (MySQL / mysql2 adapter only).
#
# The records of +scope+ are loaded in batches. Each record is yielded to the
# caller's block together with an Instances collector; everything the block
# adds to the collector is grouped by model name and imported once per model.
# All batches run inside a single transaction, which is rolled back when any
# import reports a failure.
#
# Example:
#   bulk_upserter = BulkUpserter.new(Blog.limit(100), { 'Blog' => [:name], 'Comment' => [:name] })
#   bulk_upserter.run do |blog, instances|
#     blog.assign_attributes(name: 'fuga')
#     instances.add(blog)
#     instances.add(blog.comments.build(name: 'piyo'))
#   end
#
#   bulk_upserter.reports
#   # => #<BulkUpserter::Reports @messages={
#   #      :Blog    => [#<struct ActiveRecord::Import::Result failed_instances=[...], num_inserts=0, ids=[], results=[]>],
#   #      :Comment => [#<struct ActiveRecord::Import::Result failed_instances=[], num_inserts=1, ids=[], results=[]>]}>
#
# Options:
#   on_duplicate_key_update - Hash of model name => columns to update,
#                             e.g. { 'Blog' => [:name], 'Comment' => [:name] }.
#   raise_error   - forwarded to +import+ (default false). An
#                   ActiveRecord::RecordInvalid raised by the import is rescued
#                   here and recorded in +error+.
#   validate      - forwarded to +import+; pass false to skip validations
#                   (default true).
#   batch_size    - batch size given to +find_in_batches+ (default 1000).
#   exit_on_error - when true, raise as soon as an import has reported failed
#                   instances instead of continuing (default false).
class BulkUpserter
  attr_reader :scope, :on_duplicate_key_update, :raise_error, :validate, :batch_size, :reports, :error, :exit_on_error

  def initialize(scope, on_duplicate_key_update,
                 raise_error: false, validate: true, batch_size: 1000, exit_on_error: false)
    raise 'BulkUpserter is only for mysql' unless mysql_adapter?

    @scope = scope
    @on_duplicate_key_update = on_duplicate_key_update
    @raise_error = raise_error
    @validate = validate
    @batch_size = batch_size
    @error = false
    @reports = Reports.new
    @exit_on_error = exit_on_error
  end

  # Runs the upsert. Yields every record of +scope+ together with an Instances
  # collector; the block's own return value is ignored, only what was added
  # to the collector is imported.
  def run
    ActiveRecord::Base.transaction do
      scope.find_in_batches(batch_size: batch_size) do |batch|
        collection = Collection.new(batch)
        # Group the collected objects by model name.
        collection.load_with_sorting do |primary_object, instances|
          yield(primary_object, instances)
          instances.list
        end
        # Import once per model, e.g.
        #   collection.sorted_data
        #   # => { 'Blog' => [blog, blog], 'Comment' => [comment, comment] }
        collection.sorted_data.each do |model_name, objects|
          import_objects(objects.compact,
                         on_duplicate_key_update: on_duplicate_key_update[model_name],
                         raise_error: raise_error,
                         validate: validate)
        end
      end
      raise ActiveRecord::Rollback if error
    end
  end

  # True while no import has reported a failure.
  def valid?
    !error
  end

  private

  # Reads the configured adapter name. Uses +connection_db_config+ when the
  # installed ActiveRecord provides it and falls back to the older
  # +connection_config+ otherwise.
  def mysql_adapter?
    base = ActiveRecord::Base
    config =
      if base.respond_to?(:connection_db_config)
        base.connection_db_config.configuration_hash
      else
        base.connection_config
      end
    config[:adapter] == 'mysql2'
  end

  # Imports +objects+ (all of one model) and stores the returned report.
  # Raises StandardError when the list is empty or mixes several models, and
  # RuntimeError when +exit_on_error+ is set and an error has been recorded.
  def import_objects(objects, on_duplicate_key_update: {}, raise_error: false, validate: true)
    model_names = objects.map { |object| object.model_name.name }.uniq
    raise StandardError, '単一のobjectのリストにしてください' unless model_names.size == 1

    model = Object.const_get(model_names.first)
    begin
      report = model.import(objects,
                            validate: validate,
                            raise_error: raise_error,
                            on_duplicate_key_update: on_duplicate_key_update)
      @reports.add(model.name, report)
      @error = true if report.failed_instances.any?
      raise 'Error raised !' if error && exit_on_error
    rescue ActiveRecord::RecordInvalid
      # Remember the failure so that #run rolls the transaction back.
      @error = true
    end
  end

  # Groups the objects produced for one batch by their model name.
  class Collection
    attr_reader :sorted_data, :scope

    def initialize(scope)
      @sorted_data = {}
      @scope = scope
    end

    # Yields each record of +scope+ with a fresh Instances collector. The
    # block must return the list of objects to group into +sorted_data+.
    def load_with_sorting
      scope.each do |primary_object|
        collected = yield(primary_object, Instances.new)
        sorting(collected)
      end
    end

    private

    def sorting(instances)
      instances.each do |instance|
        (sorted_data[instance.model_name.name] ||= []) << instance
      end
    end
  end

  # Collector handed to the caller's block; gathers the objects to import.
  class Instances
    attr_reader :list

    def initialize
      @list = []
    end

    # Appends the given objects and returns the accumulated list.
    def add(*objects)
      list.concat(objects)
    end
  end

  # Import reports keyed by model name (as a Symbol).
  class Reports
    include Enumerable

    attr_reader :messages

    def initialize
      @messages = {}
    end

    # Stores +message+ under +attribute+ and returns that attribute's list.
    def add(attribute, message)
      (messages[attribute.to_sym] ||= []) << message
    end

    # Yields +attribute, report+ for every stored report; returns an
    # Enumerator when called without a block.
    def each
      return enum_for(:each) unless block_given?

      messages.each do |attribute, entries|
        entries.each { |entry| yield attribute, entry }
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment