@coin8086
Last active December 27, 2018 08:57
Bulk insert/upsert method for ActiveRecord on PostgreSQL (9.5 and later)

ActiveRecord Bulk Write Patch

Usage

Suppose there's a table

create table users (
  id integer primary key,
  name text,
  hireable bool,
  created_at timestamp
)

Bulk Insert

You can then insert rows in a batch:

now = Time.now
fields = %w(id name hireable created_at)
rows = [
  [1, "Bob's", true, now - 10],
  [2, nil, "false", (now - 1).utc.iso8601],
]
result = User.bulk_write(fields, rows)

The result is the number of affected (inserted) rows.

This is a plain batch insert, not an upsert, so it has no special PostgreSQL requirement.
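
To make the shape of the generated statement concrete, here is a minimal sketch in plain Ruby that builds the same kind of multi-row INSERT. The names `naive_quote` and `insert_sql` are hypothetical, and `naive_quote` is a deliberately simplified stand-in for `connection.quote`: it is for illustration only and should not be used to build real SQL.

```ruby
# Simplified stand-in for connection.quote (assumption: illustration only;
# real code should always rely on the database adapter's quoting).
def naive_quote(value)
  case value
  when nil     then 'NULL'
  when true    then 'TRUE'
  when false   then 'FALSE'
  when Numeric then value.to_s
  else "'#{value.to_s.gsub("'", "''")}'"
  end
end

# Builds a single INSERT statement that carries every row.
def insert_sql(table, fields, rows)
  tuples = rows.map { |row| "(#{row.map { |v| naive_quote(v) }.join(', ')})" }
  "INSERT INTO #{table} (#{fields.join(', ')}) VALUES #{tuples.join(', ')}"
end

puts insert_sql('users', %w(id name hireable), [[1, "Bob's", true], [2, nil, false]])
# prints: INSERT INTO users (id, name, hireable) VALUES (1, 'Bob''s', TRUE), (2, NULL, FALSE)
```

Sending all rows in one statement is what makes the batch cheaper than one INSERT per row: the database parses and executes a single command.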

The method assumes that the row values are already proper Ruby types and converts them to strings suitable for the database. Take care with times: if a time is passed as a string, it must be in UTC, not local time. The comment above the connection.quote call in the code explains why.
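
One way to stay clear of the local-time pitfall is to normalize time values to UTC strings before building the rows. The sketch below uses only Ruby's standard library; `to_utc_string` is a hypothetical helper name and is not part of the patch.

```ruby
require 'time'

# Hypothetical helper: converts a Time or a time string to a UTC ISO 8601
# string. A string without an explicit offset is parsed as local time.
def to_utc_string(value)
  time = value.is_a?(String) ? Time.parse(value) : value
  # getutc returns a new Time in UTC and leaves the original untouched.
  time.getutc.iso8601
end

puts to_utc_string('2018-12-27 16:57:00 +0800')  # prints 2018-12-27T08:57:00Z
puts to_utc_string(Time.at(0))                   # prints 1970-01-01T00:00:00Z
```

Passing Time objects directly, as the first row of the example does, avoids the issue as well.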

Bulk Upsert

To upsert, pass an upsert hash as the third parameter:

upsert = { :conflict => [:id], :where => "users.id > 2 AND EXCLUDED.name IS NOT NULL" }
result = User.bulk_write(fields, rows, upsert)

The only required key of upsert is conflict. See the comment in the code for the meaning of each key. The keys map onto PostgreSQL 9.5's ON CONFLICT syntax, which is documented here: https://www.postgresql.org/docs/9.5/static/sql-insert.html#SQL-ON-CONFLICT
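
To show how the upsert hash relates to the SQL, here is a small standalone sketch that builds only the ON CONFLICT part of the statement. `on_conflict_clause` is a hypothetical name used for illustration; it mirrors the logic in bulk_write but needs no database.

```ruby
# Hypothetical helper: builds the ON CONFLICT clause from the upsert hash.
def on_conflict_clause(fields, upsert)
  conflict = upsert[:conflict].map(&:to_s)
  # Default: update every inserted field that is not part of the conflict target.
  update = (upsert[:update] || (fields.map(&:to_s) - conflict)).map(&:to_s)
  assignments = update.map { |field| "#{field} = EXCLUDED.#{field}" }
  sql = "ON CONFLICT (#{conflict.join(', ')}) DO UPDATE SET #{assignments.join(', ')}"
  sql += " WHERE #{upsert[:where]}" if upsert[:where]
  sql
end

puts on_conflict_clause(
  %w(id name login),
  { :conflict => [:id], :where => 'users.id > 2 AND EXCLUDED.name IS NOT NULL' }
)
# prints: ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, login = EXCLUDED.login WHERE users.id > 2 AND EXCLUDED.name IS NOT NULL
```

With this WHERE clause, a conflicting row is only updated when its id is greater than 2 and the incoming name is not NULL. Other conflicting rows are left as they are.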

Compatible Rails Versions and Test

It has been tested against ActiveRecord 4.2 and should also work on other 4.x releases and on 5, but test it on your version before relying on it. The accompanying test file uses minitest. Most of the test code is self-explanatory except the init method, which initializes the test database from a spec. Its implementation is omitted but is easy to write with ActiveRecord::Base.create. You can use the file as a starting point for your own tests.

Moreover, PostgreSQL 9.5 or later is required for upsert, since it relies on PostgreSQL's native upsert support. For insert only, PostgreSQL is not required and MySQL also works, although the return value comes from PG::Result#cmd_tuples, so that line may need adapting for other adapters.
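
The init helper mentioned above is not included in the gist. The following is one hypothetical way it might look, not the author's version. It assumes a `MODELS` registry (an invented name) that maps table names to model classes, and it relies only on `delete_all` and `create!`, which ActiveRecord models provide. A stand-in `FakeModel` class is used so the sketch runs without a database.

```ruby
# Hypothetical sketch of the omitted helper; not the author's implementation.
module TestHelpers
  # Assumption: a registry of table name => model class, filled in by the tests.
  MODELS = {}

  # Empties every registered table, then creates the rows listed in spec.
  def init(spec = {})
    MODELS.each do |table, model|
      model.delete_all
      (spec[table] || []).each { |attrs| model.create!(attrs) }
    end
  end
end

# Stand-in for an ActiveRecord model so the sketch runs without a database.
class FakeModel
  @rows = []

  class << self
    attr_reader :rows

    def delete_all
      @rows.clear
    end

    def create!(attrs)
      @rows << attrs
    end
  end
end

TestHelpers::MODELS[:test_users] = FakeModel
include TestHelpers

init(:test_users => [{ :id => 1, :name => 'n1' }, { :id => 2, :name => 'n2' }])
puts FakeModel.rows.size  # prints 2
```

In the real test file, the registry entry would point at TestUser instead of FakeModel, and calling init with no arguments would simply empty the table.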

require 'active_record'

module ActiveRecord
  class Base
    # Insert or update a batch of rows.
    # When upsert is nil, insert only. Otherwise insert or update on conflict.
    #
    # Parameters:
    #   fields: an array of field names to insert values into
    #   rows: an array of arrays of values to insert. Each inner array of
    #     values must correspond to the fields parameter.
    #   upsert:
    #     A hash with the following keys:
    #       conflict: an array of field names on which a unique constraint may
    #         fail the insert
    #       update: an array of field names to update when a conflict happens.
    #         If omitted, it will be "fields - upsert[:conflict]".
    #       where: where clause to determine if a row will be updated when a
    #         conflict happens. "EXCLUDED" is used for referencing the row
    #         that failed in insert. If omitted, update all rows that have
    #         a conflict.
    #
    # Return:
    #   The number of affected rows
    def self.bulk_write(fields, rows, upsert = nil)
      return 0 if rows.empty?

      columns = fields.map {|name| column_for_attribute(name) }
      rows = rows.map do |row|
        values = row.map.with_index do |val, i|
          # NOTE: Here the quote method treats val as a Ruby value from
          # Value#type_cast_from_user, and thus won't convert a String time to
          # a Time object before passing the string time to the database.
          # That's OK for a time string in UTC, but NOT FOR LOCAL TIME, since
          # ActiveRecord saves time as type "TIMESTAMP WITHOUT TIME ZONE" in
          # the database!
          connection.quote(val, columns[i])
        end.join ', '
        "(#{ values })"
      end

      sql = "INSERT INTO #{ table_name } (#{ fields.join ', ' }) VALUES #{ rows.join ', ' }"
      if upsert
        if !upsert[:update]
          update = fields.map(&:to_s) - upsert[:conflict].map(&:to_s)
        else
          update = upsert[:update]
        end
        update = update.map {|field| "#{ field } = EXCLUDED.#{ field }" }
        sql += " ON CONFLICT (#{ upsert[:conflict].join ', ' }) DO UPDATE SET #{ update.join ', ' }"
        if upsert[:where]
          sql += " WHERE #{ upsert[:where] }"
        end
      end

      # res is a PG::Result object. See
      # https://deveiate.org/code/pg/PG/Result.html
      # for its details.
      res = connection.execute(sql)
      res.cmd_tuples
    end
  end
end

require 'time'
require 'minitest/autorun'
require 'active_record_patch'

class TestUser < ActiveRecord::Base
  connection.create_table(:test_users, :force => true) do |t|
    t.string :name
    t.string :login
    t.boolean :hireable
    t.datetime :created_at
  end
  self.record_timestamps = false
end

class ActiveRecordTest < Minitest::Test
  # TestHelpers provides the init method; its implementation is omitted.
  include TestHelpers

  def test_bulk_write
    now = Time.now

    # It should bulk insert rows of data and do proper type conversion for database.
    init
    fields = %w(id name hireable created_at)
    rows = [
      [1, "Bob's", true, now - 10],
      [2, nil, "false", (now - 1).utc.iso8601], # NOTE: The string time must be in UTC.
    ]
    result = TestUser.bulk_write(fields, rows)
    assert_equal 2, result

    user = TestUser.first
    assert_equal rows[0][0], user.id
    assert_equal rows[0][1], user.name
    assert_equal rows[0][2], user.hireable
    # Compare times as integer seconds to ignore minor loss of accuracy.
    assert_equal rows[0][3].to_i, user.created_at.to_i

    user = TestUser.last
    assert_equal 2, user.id
    assert_nil user.name
    assert_equal false, user.hireable
    assert_equal((now - 1).to_i, user.created_at.to_i)

    # It should insert or update, and respect where condition on update.
    init(
      :test_users => [
        { :id => 1, :name => 'n1', :login => 'l1' },
        { :id => 2, :name => 'n2', :login => 'l2' },
        { :id => 3, :name => 'n3', :login => 'l3' },
      ],
    )
    fields = %w(id name login)
    rows = [
      [2, "n22", "l22"],
      [3, "n33", nil],
      [4, nil, "l44"],
    ]
    result = TestUser.bulk_write(
      fields, rows, { :conflict => [:id], :where => "test_users.id > 2 AND EXCLUDED.name IS NOT NULL" }
    )
    assert_equal 2, result

    users = TestUser.all.order(:id => :asc)
    assert_equal 4, users.size
    # user 1 doesn't get updated
    assert_equal 1, users[0].id
    assert_equal 'n1', users[0].name
    assert_equal 'l1', users[0].login
    # user 2 doesn't get updated
    assert_equal 2, users[1].id
    assert_equal 'n2', users[1].name
    assert_equal 'l2', users[1].login
    # user 3 gets updated
    assert_equal 3, users[2].id
    assert_equal 'n33', users[2].name
    assert_nil users[2].login
    # user 4 gets inserted
    assert_equal 4, users[3].id
    assert_nil users[3].name
    assert_equal 'l44', users[3].login
  end
end