Last active
August 29, 2015 14:19
-
-
Save gavinwahl/46ad9f417415ea3c3670 to your computer and use it in GitHub Desktop.
Temporal foreign key
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DROP TABLE IF EXISTS parent; | |
DROP TABLE IF EXISTS child; | |
CREATE TABLE parent( | |
identity_id integer, | |
valid_range int4range | |
); | |
CREATE TABLE child( | |
valid_range int4range, | |
parent integer | |
); | |
CREATE CONSTRAINT TRIGGER foo AFTER DELETE OR UPDATE ON parent INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE check_fk_parent('child', 'parent'); | |
CREATE TRIGGER foo AFTER INSERT OR UPDATE ON child FOR EACH ROW EXECUTE PROCEDURE check_fk_child('parent', 'parent'); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
from django.test import TransactionTestCase | |
from django.db import connection, transaction, InternalError | |
class TestTriggers(TransactionTestCase): | |
available_apps = ['temporal'] | |
@classmethod | |
def setUpClass(cls): | |
with open('temporal/triggers.sql') as f: | |
trigger_code = f.read() | |
c = connection.cursor() | |
c.execute(trigger_code) | |
c.execute(""" | |
DROP TABLE IF EXISTS parent; | |
DROP TABLE IF EXISTS child; | |
CREATE TABLE parent( | |
identity_id integer, | |
valid_range int4range | |
); | |
CREATE TABLE child( | |
valid_range int4range, | |
parent integer | |
); | |
CREATE CONSTRAINT TRIGGER foo AFTER DELETE OR UPDATE ON parent INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE check_fk_parent('child', 'parent'); | |
CREATE TRIGGER foo AFTER INSERT OR UPDATE ON child FOR EACH ROW EXECUTE PROCEDURE check_fk_child('parent', 'parent'); | |
""") | |
def setUp(self): | |
self.c = connection.cursor() | |
self.c.execute(""" | |
TRUNCATE parent, child; | |
INSERT INTO parent (identity_id, valid_range) VALUES | |
(1, '[1, 3)'), | |
(1, '[3, 4)'), | |
(3, '[3, 4)'), | |
(4, '[1, 4)'), | |
(4, '[6, 8)'), | |
(5, '[1, 7)') | |
; | |
INSERT INTO child (valid_range, parent) VALUES | |
('[1, 2)', 1), | |
('[1, 3)', 4) | |
; | |
""") | |
def tearDown(self): | |
self.c.execute("DELETE FROM child;") | |
self.c.execute("DELETE FROM parent;") | |
self.c.close() | |
def assertDBFailure(self, q): | |
with self.assertRaises(InternalError): | |
with transaction.atomic(): | |
self.c.execute(q) | |
def assertDBSuccess(self, q): | |
with transaction.atomic(): | |
self.c.execute(q) | |
# the query must affect at least one row | |
self.assertTrue(self.c.rowcount) | |
def test_update_id_range_not_referenced(self): | |
self.assertDBSuccess("UPDATE parent SET identity_id = 2 WHERE identity_id = 1 AND valid_range = '[3,4)';") | |
def test_update_id_not_referenced_at_all(self): | |
self.assertDBSuccess("UPDATE parent SET identity_id = 1 WHERE identity_id = 3;") | |
def test_update_id_when_range_referenced(self): | |
self.assertDBFailure("UPDATE parent SET identity_id = 2 WHERE identity_id = 1 AND valid_range = '[1,3)';") | |
def test_update_range_out_of_range_when_referenced(self): | |
self.assertDBFailure("UPDATE parent SET valid_range = '[2, 3)' WHERE identity_id = 1 AND valid_range = '[1,3)';") | |
def test_update_child_range(self): | |
self.assertDBFailure("UPDATE child SET valid_range = '[0, 2)' WHERE parent = 4;") | |
self.assertDBFailure("UPDATE child SET valid_range = '[1, 5)' WHERE parent = 4;") | |
def test_update_range_in_range_when_referenced(self): | |
self.assertDBFailure("UPDATE parent SET valid_range = '[2, 3)' WHERE identity_id = 1 AND valid_range = '[1,3)';") | |
def test_update_range_not_referenced_parent(self): | |
self.assertDBSuccess("UPDATE parent SET valid_range = '[1, 2)' WHERE identity_id = 1 AND valid_range = '[1,3)';") | |
self.assertDBSuccess("SELECT * FROM parent WHERE identity_id = 1 AND valid_range = '[1,2)'") | |
def test_delete_success(self): | |
self.assertDBSuccess("UPDATE parent SET identity_id = 2 WHERE valid_range = '[3,4)';") | |
self.assertDBSuccess("DELETE FROM parent WHERE identity_id = 2") | |
self.assertDBSuccess("SELECT count(*) FROM parent WHERE identity_id = 2") | |
row = self.c.fetchone() | |
self.assertEqual(row[0], 0) | |
def test_null_fk(self): | |
self.assertDBSuccess("UPDATE child SET parent = NULL") | |
self.assertDBSuccess("DELETE FROM parent") | |
def test_insert_not_completely_range(self): | |
self.assertDBFailure("INSERT INTO child VALUES ('[1,7)', 1);") | |
def test_insert_no_match_at_all(self): | |
self.assertDBFailure("INSERT INTO child VALUES ('[1,7)', 11);") | |
def test_insert_gap_in_the_middle(self): | |
self.assertDBFailure("INSERT INTO child VALUES ('[2,7)', 4);") | |
def test_commit(self): | |
""" | |
Committing involves shrinking the valid range, then inserting a new row | |
that extends it, requiring a deferred constraint. | |
""" | |
self.assertDBSuccess("INSERT INTO child VALUES ('[2,6)', 5)") | |
with transaction.atomic(): | |
self.assertDBSuccess("UPDATE parent SET valid_range = '[1,4)' WHERE identity_id = 5") | |
self.assertDBSuccess("INSERT INTO parent VALUES (5, '[4,7)')") | |
self.assertDBSuccess("INSERT INTO child VALUES ('[6,7)', 5)") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE OR REPLACE FUNCTION recheck_children(parent_table TEXT, fk_val INT, valid_range anyelement, lock_mode TEXT) RETURNS boolean AS $f$ | |
DECLARE | |
has_gaps INT; | |
query TEXT; | |
dependency RECORD; | |
BEGIN | |
-- First, lock all the parent rows we depend on (all of them that overlap), | |
-- and make sure there is at least one. | |
query := format('SELECT 1 FROM %I WHERE identity_id = $2 AND valid_range && $1 %s', parent_table, lock_mode); | |
EXECUTE query USING valid_range, fk_val INTO dependency; | |
RAISE DEBUG '%, %', query, dependency; | |
IF dependency IS NULL THEN | |
RETURN FALSE; | |
END IF; | |
-- Now, make sure there are no gaps in parent during our range of validity. | |
-- This query will return rows iff there are gaps. | |
query := format($$ | |
SELECT 1 | |
FROM %I current | |
LEFT JOIN %I next ON next.identity_id = current.identity_id AND next.valid_range -|- current.valid_range and current.valid_range << next.valid_range | |
LEFT JOIN %I prev ON prev.identity_id = current.identity_id AND prev.valid_range -|- current.valid_range and prev.valid_range << current.valid_range | |
WHERE ((NOT $1 &< current.valid_range AND next.valid_range IS NULL AND NOT current.valid_range << $1 ) | |
OR (NOT $1 &> current.valid_range AND prev.valid_range IS NULL AND NOT current.valid_range >> $1)) | |
AND current.identity_id = $2 | |
LIMIT 1 | |
$$, parent_table, parent_table, parent_table); | |
EXECUTE query USING valid_range, fk_val INTO has_gaps; | |
RAISE DEBUG '%, %', query, has_gaps; | |
IF has_gaps IS NOT NULL THEN | |
RAISE EXCEPTION 'There is a gap in (%.identity_id)=(%) during %', parent_table, fk_val, valid_range; | |
END IF; | |
RETURN TRUE; | |
END; | |
$f$ LANGUAGE 'plpgsql'; | |
CREATE OR REPLACE FUNCTION check_fk_child() RETURNS trigger AS $f$ | |
DECLARE | |
fk_local TEXT := TG_ARGV[0]; | |
parent_table TEXT := TG_ARGV[1]; | |
fk_val INT; | |
BEGIN | |
-- fk_val = getattr(NEW, fk_local) | |
EXECUTE format('SELECT $1.%I', fk_local) USING NEW INTO fk_val; | |
IF fk_val IS NULL THEN | |
RETURN NULL; | |
END IF; | |
IF NOT recheck_children(parent_table, fk_val, NEW.valid_range, 'FOR SHARE') THEN | |
RAISE EXCEPTION 'insert or update on table "%" violates foreign key trigger "%"', TG_RELNAME, TG_NAME USING | |
DETAIL=format('Key (%s)=(%s) (valid_range)&&(%s), is not present in table "%s".', fk_local, fk_val, NEW.valid_range, parent_table); | |
END IF; | |
return NULL; | |
END; | |
$f$ LANGUAGE 'plpgsql'; | |
CREATE OR REPLACE FUNCTION check_fk_parent() RETURNS trigger AS $f$ | |
DECLARE | |
child_table TEXT := TG_ARGV[0]; | |
fk_name TEXT := TG_ARGV[1]; | |
query TEXT; | |
child_row RECORD; | |
BEGIN | |
IF TG_OP = 'DELETE' OR NEW.identity_id != OLD.identity_id OR NEW.valid_range != OLD.valid_range THEN | |
query := format('SELECT valid_range FROM %I WHERE %I = $1 AND valid_range && $2 LIMIT 1', child_table, fk_name); | |
FOR child_row IN EXECUTE query USING OLD.identity_id, OLD.valid_range LOOP | |
IF NOT recheck_children(TG_RELNAME, OLD.identity_id, child_row.valid_range, '') THEN | |
RAISE EXCEPTION 'delete or update on table "%" violates foreign key trigger "%"', TG_RELNAME, TG_NAME USING | |
DETAIL=format('Key (identity_id)=(%s) (valid_range)&&(%s), is not present in table "%s".', OLD.identity_id, NEW.valid_range, TG_RELNAME); | |
END IF; | |
END LOOP; | |
END IF; | |
RETURN NULL; | |
END; | |
$f$ LANGUAGE 'plpgsql'; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment