Skip to content

Instantly share code, notes, and snippets.

@junara
Last active November 23, 2019 14:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save junara/f3c8548674bb2bf9bf56e97e21a4c63f to your computer and use it in GitHub Desktop.
Save junara/f3c8548674bb2bf9bf56e97e21a4c63f to your computer and use it in GitHub Desktop.
class BulkUpserter
  # Convenience wrapper around activerecord-import's +on_duplicate_key_update+
  # option for bulk upserts. Only the mysql2 adapter is accepted.
  #
  # Usage example:
  #   bulk_upserter = BulkUpserter.new(Blog.limit(100), { 'Blog' => [:name], 'Comment' => [:name] })
  #   bulk_upserter.run do |blog, instances|
  #     blog.assign_attributes(name: 'fuga')
  #     instances.add(blog)
  #     instances.add(blog.comments.build(name: 'piyo'))
  #   end
  #   bulk_upserter.valid?   # => false when any import reported a failure
  #   bulk_upserter.reports  # => BulkUpserter::Reports, the import results
  #                          #    grouped under the model name (as a Symbol)
  attr_reader :scope, :on_duplicate_key_update, :raise_error, :validate, :batch_size, :reports, :error, :exit_on_error

  # @param scope [#find_in_batches] records to walk through in batches
  # @param on_duplicate_key_update [Hash] columns to update per model name,
  #   e.g. { 'Blog' => [:name], 'Comment' => [:name] }
  # @param raise_error [Boolean] forwarded to +import+ as +raise_error:+;
  #   an ActiveRecord::RecordInvalid raised by the import is rescued and
  #   recorded as an error (default false)
  # @param validate [Boolean] forwarded to +import+ as +validate:+ (default true)
  # @param batch_size [Integer] batch size handed to +find_in_batches+
  # @param exit_on_error [Boolean] when true, stop with a RuntimeError as soon
  #   as an import fails instead of continuing with the remaining imports
  #   (default false)
  # @raise [RuntimeError] when the configured adapter is not mysql2
  def initialize(scope, on_duplicate_key_update, raise_error: false, validate: true, batch_size: 1000, exit_on_error: false)
    raise 'BulkUpserter is only for mysql' unless mysql2_adapter?

    @scope = scope
    @on_duplicate_key_update = on_duplicate_key_update
    @raise_error = raise_error
    @validate = validate
    @batch_size = batch_size
    @error = false
    @reports = Reports.new
    @exit_on_error = exit_on_error
  end

  # Walks the scope in batches inside one transaction. Each record is yielded
  # together with a fresh Instances collector; everything added to the
  # collector is grouped by model name and imported model by model.
  # ActiveRecord::Rollback is raised at the end of the transaction block when
  # any import recorded an error.
  #
  # @yieldparam primary_object [Object] a record of the current batch
  # @yieldparam instances [Instances] collector for the objects to import
  # @raise [RuntimeError] when +exit_on_error+ is set and an import fails
  def run
    ActiveRecord::Base.transaction do
      scope.find_in_batches(batch_size: batch_size) do |batch|
        collection = Collection.new(batch)
        # Gather the objects to import, keyed by model name.
        collection.load_with_sorting do |primary_object, instances|
          yield(primary_object, instances)
          instances.list
        end
        # Import model by model, e.g.
        #   collection.sorted_data
        #   # => { 'Blog' => [blog, blog], 'Comment' => [comment, comment] }
        collection.sorted_data.each do |model_name, objects|
          import_objects(
            objects.compact,
            on_duplicate_key_update: on_duplicate_key_update[model_name],
            raise_error: raise_error,
            validate: validate
          )
        end
      end
      raise ActiveRecord::Rollback if error
    end
  end

  # @return [Boolean] true while no import has recorded an error
  def valid?
    !error
  end

  private

  # Reads the adapter name from the connection configuration.
  # +connection_db_config+ is preferred when ActiveRecord::Base responds to
  # it; otherwise the older +connection_config+ hash is used.
  def mysql2_adapter?
    base = ActiveRecord::Base
    adapter =
      if base.respond_to?(:connection_db_config)
        base.connection_db_config.adapter
      else
        base.connection_config[:adapter]
      end
    adapter.to_s == 'mysql2'
  end

  # Imports one model's objects and stores the result in +reports+.
  # Sets the error flag when the result lists failed instances or when the
  # import raises ActiveRecord::RecordInvalid.
  #
  # @raise [StandardError] when +objects+ is empty
  # @raise [RuntimeError] when an error was recorded and +exit_on_error+ is set
  def import_objects(objects, on_duplicate_key_update: {}, raise_error: false, validate: true)
    raise StandardError, '単一のobjectのリストにしてください' if objects.empty?

    model = Object.const_get(objects.first.model_name.name)
    begin
      report = model.import(objects, validate: validate, raise_error: raise_error, on_duplicate_key_update: on_duplicate_key_update)
      @reports.add(model.name, report)
      @error = true if report.failed_instances.present?
    rescue ActiveRecord::RecordInvalid
      @error = true
    end
    # Checked after the rescue so that exit_on_error is honored for both
    # failure paths: failed instances in the report and a raised RecordInvalid.
    raise 'Error raised !' if error && exit_on_error
  end

  # Groups the objects collected for one batch by their model name.
  class Collection
    attr_reader :sorted_data, :scope

    # @param scope [#each] the records of one batch
    def initialize(scope)
      @sorted_data = {}
      @scope = scope
    end

    # Yields every record with a fresh Instances collector. The block must
    # return the list of objects to import; they are filed into
    # +sorted_data+ under their model name.
    def load_with_sorting
      scope.each do |primary_object|
        sorting(yield(primary_object, Instances.new))
      end
    end

    private

    # Appends each instance to the bucket named after its model.
    def sorting(instances)
      instances.each do |instance|
        (sorted_data[instance.model_name.name] ||= []) << instance
      end
    end
  end

  # Collector handed to the block given to BulkUpserter#run.
  class Instances
    attr_reader :list

    def initialize
      @list = []
    end

    # Appends the given objects and returns the whole list.
    def add(*objects)
      list.concat(objects)
    end
  end

  # Import results grouped by attribute (the model name as a Symbol).
  class Reports
    include Enumerable

    attr_reader :messages

    def initialize
      @messages = {}
    end

    # Stores +message+ under +attribute+ (converted with +to_sym+).
    def add(attribute, message)
      (messages[attribute.to_sym] ||= []) << message
    end

    # Yields every (attribute, message) pair. Returns an Enumerator when
    # called without a block.
    def each
      return enum_for(:each) unless block_given?

      messages.each do |attribute, entries|
        entries.each { |entry| yield attribute, entry }
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment