Skip to content

Instantly share code, notes, and snippets.

@zr40
Last active January 13, 2016 22:37
Show Gist options
  • Save zr40/e280a45efdb35e3a2c9b to your computer and use it in GitHub Desktop.
Save zr40/e280a45efdb35e3a2c9b to your computer and use it in GitHub Desktop.
Upsert+delete voodoo test (implemented in Rust)
extern crate postgres;
use postgres::{Connection,SslMode};
use std::sync::mpsc::{channel,Receiver,TryRecvError};
use std::thread;
/// Exercises the "upsert + delete remainder via MVCC voodoo" technique
/// described below, while concurrent worker threads take row locks on the
/// same table.
///
/// Requires a PostgreSQL server reachable at `postgres://zr40@%2Ftmp/test`.
/// The test drops and recreates the table `test` in that database, and drops
/// it again when it finishes successfully.
#[test]
fn test_upsert_delete_mvcc_voodoo() {
    // This tests an upsert+delete technique to replace a set of rows with
    // another set of rows. The intended use case is to replicate an external
    // data source, of which an older version was previously replicated. Rows
    // of this data source may have been added, kept, updated, or deleted.
    //
    // PostgreSQL 9.5 introduces INSERT ... ON CONFLICT DO UPDATE, which is
    // sufficient for the add, keep, and update cases; however, it does not
    // handle the delete case. Ideally, the method would be proper, concise,
    // efficient, and free of side effects; however, no method provides all
    // four properties.
    //
    // The methods are:
    //
    // * DELETE everything, then INSERT everything.
    //   Although proper and concise, this will produce dead rows for unchanged
    //   data, and will trigger cascading foreign keys.
    //
    // * Add a change number column to recognize which rows are to be deleted.
    //   Proper, concise and side-effect free, but will produce dead rows like
    //   the previous method. It will also increase the size of a row.
    //
    // * Keep track of all keys provided to INSERT ... ON CONFLICT DO UPDATE,
    //   then exclude the same keys in the DELETE.
    //   This is proper, efficient and side-effect free, but not concise: the
    //   exclusion clause must list all keys within the set of rows.
    //
    // * Put the data in a temporary table, and use that as a source for the
    //   INSERT ... ON CONFLICT DO UPDATE and the DELETE.
    //   This is proper, concise and side-effect free, but requires temporary
    //   duplication of the entire set.
    //
    // * Abuse MVCC and PostgreSQL's row locking implementation.
    //   This is concise, efficient and side-effect free, but is obviously
    //   terribly improper, and could delete rows matching the add, keep and
    //   update cases.
    //
    // The improper method works as follows:
    //
    // 1. Begin a new transaction.
    //
    // 2. Perform an upsert as follows:
    //
    //        INSERT INTO tbl (key, set_id, col1, col2, ...)
    //        VALUES ($1, $2, $3, $4, ...)
    //        ON CONFLICT (key) DO UPDATE
    //        SET key = tbl.key,
    //            col1 = $3, col2 = $4, ...
    //        WHERE tbl.col1 IS DISTINCT FROM $3
    //           OR tbl.col2 IS DISTINCT FROM $4
    //           OR ...
    //
    //    The WHERE clause skips the update when nothing changed. It uses OR,
    //    not AND, because any single changed column must trigger an update.
    //    IS DISTINCT FROM also treats NULLs correctly. Even when the WHERE
    //    clause is false, ON CONFLICT DO UPDATE still locks the conflicting
    //    row. By setting the key to itself, the row is locked FOR UPDATE
    //    instead of FOR NO KEY UPDATE, which would not block FOR KEY SHARE.
    //
    // 3. Delete the remainder:
    //
    //        DELETE FROM tbl
    //        WHERE set_id = $1
    //          AND age(xmin) <> 0
    //          AND age(xmax) <> 0
    //
    //    When a row has previously been inserted or updated by this
    //    transaction, xmin will contain its xid. When it has only been locked
    //    by this transaction, xmax will contain its xid. Therefore, if both
    //    xmin and xmax differ from the transaction's xid, the row must have
    //    been present at the start of the transaction and not locked.
    //
    //    For the four cases:
    //    * Add: row is newly inserted.
    //      age(xmin) = 0
    //    * Keep: row is locked.
    //      age(xmax) = 0 (assuming no MultiXact)
    //    * Update: old row version is not visible. New version is inserted.
    //      age(xmin) = 0
    //    * Delete: row is not inserted by this transaction and is not locked.
    //      age(xmin) <> 0 AND age(xmax) <> 0
    //
    // 4. Commit the transaction.
    //
    // This improper method assumes that xmax contains a regular xid, not a
    // MultiXact xid. This cannot be guaranteed in the general case; however,
    // the use of FOR UPDATE row locks removes one particular opportunity for a
    // MultiXact to appear. The effect of having a MultiXact xid in a row's
    // xmax is that the row will be deleted when it would otherwise be kept.
    let conn = "postgres://zr40@%2Ftmp/test";
    let db = Connection::connect(conn, &SslMode::None).unwrap();
    db.execute("drop table if exists test", &[]).unwrap();
    db.execute("create table test (id int4 primary key, set_id int4 not null)", &[]).unwrap();
    db.execute("create index on test (set_id)", &[]).unwrap();
    // Populate the test table with 10000 rows (ids 0..=9999), divided into 20
    // sets of 500 rows each: row `id` belongs to set `id % 20`.
    //
    // A single set-returning function in FROM is used instead of two in the
    // select list. Since PostgreSQL 10, multiple SRFs in a select list run in
    // lockstep and pad the shorter one with NULLs. That would violate the
    // NOT NULL constraint on set_id.
    db.execute("insert into test (id, set_id) select i, i % 20 from generate_series(0, 9999) as g(i)", &[]).unwrap();
    // Prepared statement: lock the 100 rows with ids $1, $1 + 20, ..., $1 + 1980
    // (all in set $1). Every id already exists, so each one conflicts.
    // `where false` prevents any update but still locks the conflicting row,
    // and assigning the key to itself makes that a FOR UPDATE lock. The
    // inserted set_id (4) is never stored.
    let upsert_100 = db.prepare("insert into test (id, set_id) select generate_series($1, 1980 + $1, 20), 4 on conflict (id) do update set id = test.id where false").unwrap();
    // Prepared statement: lock the 50 rows with ids 9000 + $1, ..., 9980 + $1
    // (step 20, all in set $1), in the same way as above.
    let upsert_50 = db.prepare("insert into test (id, set_id) select generate_series(9000 + $1, 9980 + $1, 20), 4 on conflict (id) do update set id = test.id where false").unwrap();
    // Prepared statement: insert 201 new rows (ids 20000..=20200) into set $1.
    // These ids never conflict with the populated rows.
    let insert_201 = db.prepare("insert into test (id, set_id) select generate_series(20000, 20200), $1 on conflict (id) do update set id = test.id where false").unwrap();
    // Prepared statement: delete using MVCC voodoo
    let delete_remainder = db.prepare("delete from test where set_id = $1 and age(xmin) <> 0 and age(xmax) <> 0").unwrap();
    // Start seven concurrent lockers. Workers 4, 5 and 6 all use offset 4,
    // so three of them lock the same rows at the same time.
    let threads: Vec<_> = (0..7).map(|x| {
        let x = match x {
            4 | 5 | 6 => 4,
            x => x,
        };
        let (tx, rx) = channel();
        let thread = thread::spawn(move || {
            concurrency(rx, x * 421);
        });
        (tx, thread)
    }).collect();
    for x in 0..2000 {
        let set_id = x % 20;
        let xact = db.transaction().unwrap();
        // The first upsert will lock 100 rows
        upsert_100.execute(&[&set_id]).unwrap();
        // The second upsert will lock another 50 rows
        upsert_50.execute(&[&set_id]).unwrap();
        // Add a bunch of non-conflicting rows.
        insert_201.execute(&[&set_id]).unwrap();
        // Out of the original 500 rows, 350 rows remain unlocked. This must be
        // the exact number of rows affected by the DELETE.
        let deleted = delete_remainder.execute(&[&set_id]).unwrap();
        assert_eq!(deleted, 350);
        // Finish without committing, which rolls the transaction back.
        xact.finish().unwrap();
    }
    // Stop the concurrent activity. A send only fails if that worker has
    // already exited (e.g. it panicked). Ignore that error so the join below
    // reports the worker's own failure instead of a send error.
    for thread in threads.iter() {
        let _ = thread.0.send(());
    }
    for thread in threads.into_iter() {
        thread.1.join().unwrap();
    }
    db.execute("drop table test", &[]).unwrap();
}
/// Continuously takes row locks on table `test` until told to stop.
///
/// Cycles through ids 0..=9999, starting at `start`, and runs one short
/// transaction per id. The lock taken depends on `id % 10`:
/// - 0, 1: `FOR KEY SHARE`
/// - 2, 3: `FOR SHARE`
/// - 4, 5: `FOR NO KEY UPDATE`
/// - 6, 7: `FOR UPDATE`
/// - 8: a non-key `UPDATE` (`set_id = set_id`)
/// - 9: a key `UPDATE` (`id = id`)
///
/// The loop stops as soon as a message arrives on `rx` or its sender is
/// dropped. `start` is expected to be non-negative. A negative value could
/// produce a negative `id % 10` and hit the `unreachable!` arm.
fn concurrency(rx: Receiver<()>, start: i32) {
    let conn = "postgres://zr40@%2Ftmp/test";
    let db = Connection::connect(conn, &SslMode::None).unwrap();
    // Non-update row lock types.
    let lock0 = db.prepare("select from test where id = $1 for key share").unwrap();
    let lock1 = db.prepare("select from test where id = $1 for share").unwrap();
    let lock2 = db.prepare("select from test where id = $1 for no key update").unwrap();
    let lock3 = db.prepare("select from test where id = $1 for update").unwrap();
    // Update row lock types.
    let lock4 = db.prepare("update test set set_id = set_id where id = $1").unwrap();
    let lock5 = db.prepare("update test set id = id where id = $1").unwrap();
    // Cover every id 0..=9999 on each pass. Check for the stop signal before
    // each transaction, not once per pass, so the worker winds down promptly
    // once the main test is done.
    for offset in (0..10000).cycle() {
        match rx.try_recv() {
            Err(TryRecvError::Empty) => {}
            // Either a stop message arrived or the sender was dropped.
            _ => break,
        }
        let id = (offset + start) % 10000;
        let xact = db.transaction().unwrap();
        let lock = match id % 10 {
            0 | 1 => &lock0,
            2 | 3 => &lock1,
            4 | 5 => &lock2,
            6 | 7 => &lock3,
            8 => &lock4,
            9 => &lock5,
            _ => unreachable!("id % 10 is always in 0..10 for non-negative ids"),
        };
        lock.execute(&[&id]).unwrap();
        // Test both commits and rollbacks of row locks.
        //
        // Committing an update (cases 8 and 9) would make the old row version
        // invisible to the main thread's DELETE, causing fewer rows to be
        // deleted. That is acceptable but not what is being tested, so
        // updates are always rolled back.
        if id % 2 == 0 && id % 10 < 8 {
            xact.commit().unwrap();
        } else {
            // Finish without committing, which rolls the transaction back.
            xact.finish().unwrap();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment