Skip to content

Instantly share code, notes, and snippets.

@gavinwahl
Last active August 29, 2015 14:19
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gavinwahl/46ad9f417415ea3c3670 to your computer and use it in GitHub Desktop.
Save gavinwahl/46ad9f417415ea3c3670 to your computer and use it in GitHub Desktop.
Temporal foreign key
DROP TABLE IF EXISTS parent;
DROP TABLE IF EXISTS child;
CREATE TABLE parent(
identity_id integer,
valid_range int4range
);
CREATE TABLE child(
valid_range int4range,
parent integer
);
CREATE CONSTRAINT TRIGGER foo AFTER DELETE OR UPDATE ON parent INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE check_fk_parent('child', 'parent');
CREATE TRIGGER foo AFTER INSERT OR UPDATE ON child FOR EACH ROW EXECUTE PROCEDURE check_fk_child('parent', 'parent');
import datetime
from django.test import TransactionTestCase
from django.db import connection, transaction, InternalError
class TestTriggers(TransactionTestCase):
available_apps = ['temporal']
@classmethod
def setUpClass(cls):
with open('temporal/triggers.sql') as f:
trigger_code = f.read()
c = connection.cursor()
c.execute(trigger_code)
c.execute("""
DROP TABLE IF EXISTS parent;
DROP TABLE IF EXISTS child;
CREATE TABLE parent(
identity_id integer,
valid_range int4range
);
CREATE TABLE child(
valid_range int4range,
parent integer
);
CREATE CONSTRAINT TRIGGER foo AFTER DELETE OR UPDATE ON parent INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE check_fk_parent('child', 'parent');
CREATE TRIGGER foo AFTER INSERT OR UPDATE ON child FOR EACH ROW EXECUTE PROCEDURE check_fk_child('parent', 'parent');
""")
def setUp(self):
self.c = connection.cursor()
self.c.execute("""
TRUNCATE parent, child;
INSERT INTO parent (identity_id, valid_range) VALUES
(1, '[1, 3)'),
(1, '[3, 4)'),
(3, '[3, 4)'),
(4, '[1, 4)'),
(4, '[6, 8)'),
(5, '[1, 7)')
;
INSERT INTO child (valid_range, parent) VALUES
('[1, 2)', 1),
('[1, 3)', 4)
;
""")
def tearDown(self):
self.c.execute("DELETE FROM child;")
self.c.execute("DELETE FROM parent;")
self.c.close()
def assertDBFailure(self, q):
with self.assertRaises(InternalError):
with transaction.atomic():
self.c.execute(q)
def assertDBSuccess(self, q):
with transaction.atomic():
self.c.execute(q)
# the query must affect at least one row
self.assertTrue(self.c.rowcount)
def test_update_id_range_not_referenced(self):
self.assertDBSuccess("UPDATE parent SET identity_id = 2 WHERE identity_id = 1 AND valid_range = '[3,4)';")
def test_update_id_not_referenced_at_all(self):
self.assertDBSuccess("UPDATE parent SET identity_id = 1 WHERE identity_id = 3;")
def test_update_id_when_range_referenced(self):
self.assertDBFailure("UPDATE parent SET identity_id = 2 WHERE identity_id = 1 AND valid_range = '[1,3)';")
def test_update_range_out_of_range_when_referenced(self):
self.assertDBFailure("UPDATE parent SET valid_range = '[2, 3)' WHERE identity_id = 1 AND valid_range = '[1,3)';")
def test_update_child_range(self):
self.assertDBFailure("UPDATE child SET valid_range = '[0, 2)' WHERE parent = 4;")
self.assertDBFailure("UPDATE child SET valid_range = '[1, 5)' WHERE parent = 4;")
def test_update_range_in_range_when_referenced(self):
self.assertDBFailure("UPDATE parent SET valid_range = '[2, 3)' WHERE identity_id = 1 AND valid_range = '[1,3)';")
def test_update_range_not_referenced_parent(self):
self.assertDBSuccess("UPDATE parent SET valid_range = '[1, 2)' WHERE identity_id = 1 AND valid_range = '[1,3)';")
self.assertDBSuccess("SELECT * FROM parent WHERE identity_id = 1 AND valid_range = '[1,2)'")
def test_delete_success(self):
self.assertDBSuccess("UPDATE parent SET identity_id = 2 WHERE valid_range = '[3,4)';")
self.assertDBSuccess("DELETE FROM parent WHERE identity_id = 2")
self.assertDBSuccess("SELECT count(*) FROM parent WHERE identity_id = 2")
row = self.c.fetchone()
self.assertEqual(row[0], 0)
def test_null_fk(self):
self.assertDBSuccess("UPDATE child SET parent = NULL")
self.assertDBSuccess("DELETE FROM parent")
def test_insert_not_completely_range(self):
self.assertDBFailure("INSERT INTO child VALUES ('[1,7)', 1);")
def test_insert_no_match_at_all(self):
self.assertDBFailure("INSERT INTO child VALUES ('[1,7)', 11);")
def test_insert_gap_in_the_middle(self):
self.assertDBFailure("INSERT INTO child VALUES ('[2,7)', 4);")
def test_commit(self):
"""
Committing involves shrinking the valid range, then inserting a new row
that extends it, requiring a deferred constraint.
"""
self.assertDBSuccess("INSERT INTO child VALUES ('[2,6)', 5)")
with transaction.atomic():
self.assertDBSuccess("UPDATE parent SET valid_range = '[1,4)' WHERE identity_id = 5")
self.assertDBSuccess("INSERT INTO parent VALUES (5, '[4,7)')")
self.assertDBSuccess("INSERT INTO child VALUES ('[6,7)', 5)")
CREATE OR REPLACE FUNCTION recheck_children(parent_table TEXT, fk_val INT, valid_range anyelement, lock_mode TEXT) RETURNS boolean AS $f$
DECLARE
has_gaps INT;
query TEXT;
dependency RECORD;
BEGIN
-- First, lock all the parent rows we depend on (all of them that overlap),
-- and make sure there is at least one.
query := format('SELECT 1 FROM %I WHERE identity_id = $2 AND valid_range && $1 %s', parent_table, lock_mode);
EXECUTE query USING valid_range, fk_val INTO dependency;
RAISE DEBUG '%, %', query, dependency;
IF dependency IS NULL THEN
RETURN FALSE;
END IF;
-- Now, make sure there are no gaps in parent during our range of validity.
-- This query will return rows iff there are gaps.
query := format($$
SELECT 1
FROM %I current
LEFT JOIN %I next ON next.identity_id = current.identity_id AND next.valid_range -|- current.valid_range and current.valid_range << next.valid_range
LEFT JOIN %I prev ON prev.identity_id = current.identity_id AND prev.valid_range -|- current.valid_range and prev.valid_range << current.valid_range
WHERE ((NOT $1 &< current.valid_range AND next.valid_range IS NULL AND NOT current.valid_range << $1 )
OR (NOT $1 &> current.valid_range AND prev.valid_range IS NULL AND NOT current.valid_range >> $1))
AND current.identity_id = $2
LIMIT 1
$$, parent_table, parent_table, parent_table);
EXECUTE query USING valid_range, fk_val INTO has_gaps;
RAISE DEBUG '%, %', query, has_gaps;
IF has_gaps IS NOT NULL THEN
RAISE EXCEPTION 'There is a gap in (%.identity_id)=(%) during %', parent_table, fk_val, valid_range;
END IF;
RETURN TRUE;
END;
$f$ LANGUAGE 'plpgsql';
CREATE OR REPLACE FUNCTION check_fk_child() RETURNS trigger AS $f$
DECLARE
fk_local TEXT := TG_ARGV[0];
parent_table TEXT := TG_ARGV[1];
fk_val INT;
BEGIN
-- fk_val = getattr(NEW, fk_local)
EXECUTE format('SELECT $1.%I', fk_local) USING NEW INTO fk_val;
IF fk_val IS NULL THEN
RETURN NULL;
END IF;
IF NOT recheck_children(parent_table, fk_val, NEW.valid_range, 'FOR SHARE') THEN
RAISE EXCEPTION 'insert or update on table "%" violates foreign key trigger "%"', TG_RELNAME, TG_NAME USING
DETAIL=format('Key (%s)=(%s) (valid_range)&&(%s), is not present in table "%s".', fk_local, fk_val, NEW.valid_range, parent_table);
END IF;
return NULL;
END;
$f$ LANGUAGE 'plpgsql';
CREATE OR REPLACE FUNCTION check_fk_parent() RETURNS trigger AS $f$
DECLARE
child_table TEXT := TG_ARGV[0];
fk_name TEXT := TG_ARGV[1];
query TEXT;
child_row RECORD;
BEGIN
IF TG_OP = 'DELETE' OR NEW.identity_id != OLD.identity_id OR NEW.valid_range != OLD.valid_range THEN
query := format('SELECT valid_range FROM %I WHERE %I = $1 AND valid_range && $2 LIMIT 1', child_table, fk_name);
FOR child_row IN EXECUTE query USING OLD.identity_id, OLD.valid_range LOOP
IF NOT recheck_children(TG_RELNAME, OLD.identity_id, child_row.valid_range, '') THEN
RAISE EXCEPTION 'delete or update on table "%" violates foreign key trigger "%"', TG_RELNAME, TG_NAME USING
DETAIL=format('Key (identity_id)=(%s) (valid_range)&&(%s), is not present in table "%s".', OLD.identity_id, NEW.valid_range, TG_RELNAME);
END IF;
END LOOP;
END IF;
RETURN NULL;
END;
$f$ LANGUAGE 'plpgsql';
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment