Last active
May 15, 2017 07:45
-
-
Save marcocitus/e02f457afa37dec45e43546d34f094e2 to your computer and use it in GitHub Desktop.
Partitioning in Citus 6.1, 6.2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Example session: partitioning the distributed table "events" by day.
-- The result tables printed by psql are kept as comments so that this script
-- can be executed as-is. The +02 offsets in the expected output reflect the
-- time zone of the session that produced it.

CREATE TABLE events (
    id int NOT NULL,
    created_at timestamptz,
    payload jsonb
);
CREATE INDEX ON events (id);
CREATE INDEX ON events USING BRIN (created_at);
SELECT create_distributed_table('events', 'id');

-- Set up the trigger on workers (idempotent)
SELECT partition_distributed_table_by_date('events', 'created_at');

-- Create a distributed table for each day and attach all shard placements to the parent table (idempotent)
SELECT create_partitions_for_date_range('events', 'created_at', '2017-05-14', '2017-05-21');

-- Explicit column lists keep the inserts valid if columns are added to events later
INSERT INTO events (id, created_at, payload) VALUES (1, '2017-05-14 10:00:00', '{"hello":"world"}');
INSERT INTO events (id, created_at, payload) VALUES (1, '2017-05-20 10:00:00', '{"foo":"bar"}');

-- ORDER BY makes the row order of the expected output deterministic
SELECT id, created_at, payload FROM events ORDER BY created_at;
--  id |       created_at       |      payload
-- ----+------------------------+--------------------
--   1 | 2017-05-14 10:00:00+02 | {"hello": "world"}
--   1 | 2017-05-20 10:00:00+02 | {"foo": "bar"}
-- (2 rows)

-- Detach the shard placements of all partitions before 2017-05-15, exclusive (idempotent)
SELECT detach_partitions('events', '2017-05-15');

SELECT id, created_at, payload FROM events ORDER BY created_at;
--  id |       created_at       |    payload
-- ----+------------------------+----------------
--   1 | 2017-05-20 10:00:00+02 | {"foo": "bar"}
-- (1 row)

-- Reattach all shard placements of all partitions to their parents (idempotent)
SELECT reattach_partitions('events');

SELECT id, created_at, payload FROM events ORDER BY created_at;
--  id |       created_at       |      payload
-- ----+------------------------+--------------------
--   1 | 2017-05-14 10:00:00+02 | {"hello": "world"}
--   1 | 2017-05-20 10:00:00+02 | {"foo": "bar"}
-- (2 rows)

-- Drop all partitions we've created so far (idempotent)
SELECT drop_partitions('events', '2017-05-31');

SELECT id, created_at, payload FROM events ORDER BY created_at;
--  id | created_at | payload
-- ----+------------+---------
-- (0 rows)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Partitioning metadata

-- One row per (parent table, day): the name of the distributed child table
-- that was created for that day.
CREATE TABLE partitions (
    parent_table regclass NOT NULL,
    day          date     NOT NULL,
    child_table  text     NOT NULL,
    PRIMARY KEY (parent_table, day)
);

-- One row per (parent shard, day): the child table and the child shard that
-- belongs to that parent shard. The insert trigger function created by
-- partition_distributed_table_by_date reads this table by parent_shard and day.
CREATE TABLE shard_partitions (
    parent_shard text NOT NULL,
    day          date NOT NULL,
    child_table  text NOT NULL,
    child_shard  text NOT NULL,
    PRIMARY KEY (parent_shard, day)
);

-- Turn the shard mapping into a Citus reference table
SELECT create_reference_table('shard_partitions');
-- Partitioning functions | |
/*
 * partition_distributed_table_by_date sets up insert routing for a table.
 *
 * Step 1 runs a command on the workers that creates the trigger function
 * <table_name>_insert_router(). For the row being inserted, that function
 * looks up the child shard registered in the shard of shard_partitions for
 * (TG_TABLE_NAME, NEW.<column_name>::date), raises an exception if there is
 * none, and otherwise inserts the row into that child shard.
 *
 * Step 2 adds a BEFORE INSERT row trigger named insert_trigger, which calls
 * that function, to the placements of table_name.
 *
 * Parameters:
 *   table_name  - name of the table whose placements get the trigger
 *   column_name - name of the column whose value, cast to date, selects the
 *                 partition; pass it unquoted, it is quoted with %I here
 *                 (as create_partition_for_date does for the same column)
 */
CREATE OR REPLACE FUNCTION partition_distributed_table_by_date(table_name text, column_name text)
RETURNS void LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
BEGIN
    /* create trigger function on workers */
    PERFORM run_command_on_workers(format($cmd$
        CREATE OR REPLACE FUNCTION %s_insert_router()
        RETURNS TRIGGER LANGUAGE plpgsql
        AS $ifn$
        DECLARE
            target_table text;
        BEGIN
            SELECT child_shard INTO target_table FROM %s
            WHERE parent_shard = TG_TABLE_NAME AND day = NEW.%I::date;
            IF NOT FOUND THEN
                RAISE EXCEPTION 'no partition for date %%', NEW.%I::date;
            END IF;
            EXECUTE 'INSERT INTO '|| target_table ||' SELECT ($1).*' USING NEW;
            /* returning NULL from a BEFORE row trigger skips the insert into the table that fired it */
            RETURN NULL;
        END;
        $ifn$;$cmd$, table_name, shard_name(logicalrelid, shardid), column_name, column_name))
    FROM pg_dist_shard WHERE logicalrelid = 'shard_partitions'::regclass;

    /*
     * add trigger to shards; the doubled %% survives this format() call as a
     * literal %I placeholder inside the command handed to run_command_on_placements
     */
    PERFORM run_command_on_placements(table_name, format($cmd$
        CREATE TRIGGER insert_trigger BEFORE INSERT ON %%I
        FOR EACH ROW EXECUTE PROCEDURE %s_insert_router();
    $cmd$, table_name));
END;
$fn$;
/*
 * create_partitions_for_date_range calls create_partition_for_date once for
 * every day from start_date through end_date (both inclusive) for which
 * table_name has no row in partitions yet.
 *
 * Parameters:
 *   table_name  - parent table to create daily partitions for
 *   column_name - column passed through to create_partition_for_date
 *   start_date  - first day of the range
 *   end_date    - last day of the range
 */
CREATE OR REPLACE FUNCTION create_partitions_for_date_range(
    table_name regclass,
    column_name text,
    start_date date,
    end_date date)
RETURNS void LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
BEGIN
    /*
     * create partitions for days which don't yet have a partition;
     * the join must also match on parent_table, otherwise a day that is
     * already partitioned for some other table would be skipped here
     */
    PERFORM create_partition_for_date(table_name, column_name, d::date)
    FROM generate_series(start_date, end_date, interval '1 day') AS d
    LEFT JOIN partitions
        ON partitions.parent_table = table_name
       AND partitions.day = d::date
    WHERE partitions.day IS NULL;
END;
$fn$;
/*
 * create_partition_for_date creates the child table <table_name>_YYYYMMDD for
 * a single day, distributes it, attaches its placements to the parent and
 * records the result in shard_partitions and partitions.
 *
 * Parameters:
 *   table_name  - parent table
 *   column_name - column used in the CHECK constraint of the child table
 *   day         - the day covered by the new partition
 */
CREATE OR REPLACE FUNCTION create_partition_for_date(
    table_name regclass,
    column_name text,
    day date)
RETURNS void LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
DECLARE
    partition_name text;
    shard_pair record;
BEGIN
    partition_name := table_name || '_' || to_char(day, 'YYYYMMDD');

    /* child table: same definition as the parent, restricted to the given day */
    EXECUTE format(
        'CREATE TABLE %I (LIKE %I INCLUDING ALL)',
        partition_name,
        table_name);

    EXECUTE format(
        'ALTER TABLE %I ADD CONSTRAINT %I CHECK (%I >= date %L AND %I < date %L)',
        partition_name,
        partition_name || '_cons',
        column_name,
        day,
        column_name,
        day + interval '1 day');

    /* distribute the child on the same column the parent is distributed on */
    PERFORM create_distributed_table(
                partition_name,
                column_to_column_name(logicalrelid, partkey))
    FROM pg_dist_partition
    WHERE logicalrelid = table_name;

    /* make every child placement inherit from the co-located parent placement */
    PERFORM run_command_on_colocated_placements(
                partition_name,
                table_name,
                'ALTER TABLE %I INHERIT %I');

    /* record parent shard -> child shard mappings, which the insert trigger reads */
    FOR shard_pair IN
        SELECT shard_name(parent.logicalrelid, parent.shardid) AS parent_shard,
               shard_name(child.logicalrelid, child.shardid) AS child_shard
        FROM pg_dist_shard AS parent
        INNER JOIN pg_dist_shard AS child USING (shardminvalue)
        WHERE child.logicalrelid = partition_name::regclass
          AND parent.logicalrelid = table_name
    LOOP
        EXECUTE format(
            'INSERT INTO shard_partitions VALUES (%L,%L,%L,%L)',
            shard_pair.parent_shard,
            day,
            partition_name,
            shard_pair.child_shard);
    END LOOP;

    /* record the parent table -> child table mapping used by the administrative functions */
    EXECUTE format(
        'INSERT INTO partitions VALUES (%L,%L,%L)',
        table_name,
        day,
        partition_name);
END;
$fn$;
/*
 * detach_partitions runs ALTER TABLE ... NO INHERIT on the co-located
 * placements of every partition of table_name whose day is strictly before
 * before_date.
 *
 * Parameters:
 *   table_name  - parent table
 *   before_date - exclusive upper bound on the partition day
 */
CREATE OR REPLACE FUNCTION detach_partitions(
    table_name regclass,
    before_date date)
RETURNS void LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
BEGIN
    PERFORM run_command_on_colocated_placements(
                p.child_table,
                table_name,
                'ALTER TABLE %I NO INHERIT %I')
    FROM partitions AS p
    WHERE p.day < before_date
      AND p.parent_table = table_name;
END;
$fn$;
/*
 * reattach_partitions runs ALTER TABLE ... INHERIT on the co-located
 * placements of every partition recorded for table_name, regardless of day.
 *
 * Parameters:
 *   table_name - parent table
 */
CREATE OR REPLACE FUNCTION reattach_partitions(table_name regclass)
RETURNS void LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
BEGIN
    PERFORM run_command_on_colocated_placements(
                p.child_table,
                table_name,
                'ALTER TABLE %I INHERIT %I')
    FROM partitions AS p
    WHERE p.parent_table = table_name;
END;
$fn$;
/*
 * drop_partitions drops the child table of every partition of table_name
 * whose day is strictly before before_date.
 *
 * Parameters:
 *   table_name  - parent table
 *   before_date - exclusive upper bound on the partition day
 */
CREATE OR REPLACE FUNCTION drop_partitions(
    table_name regclass,
    before_date date)
RETURNS void LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
DECLARE
    doomed_table text;
BEGIN
    FOR doomed_table IN
        SELECT p.child_table
        FROM partitions AS p
        WHERE p.day < before_date
          AND p.parent_table = table_name
    LOOP
        /* IF EXISTS tolerates child tables that are already gone */
        EXECUTE format('DROP TABLE IF EXISTS %I', doomed_table);
    END LOOP;
END;
$fn$;
/*
 * delete_partition_metadata is an event trigger function that removes the
 * rows in partitions and shard_partitions belonging to partition tables that
 * are being dropped.
 *
 * Only dropped objects of type table in schema public are considered: the
 * partitioning functions run with search_path = public,pg_catalog and create
 * child tables with unqualified names. Matching on the bare object name alone
 * would also delete metadata when an unrelated object with the same name
 * (for example a table in another schema) is dropped.
 */
CREATE OR REPLACE FUNCTION delete_partition_metadata()
RETURNS event_trigger LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
DECLARE
    table_being_dropped text;
BEGIN
    /*
     * nothing to clean up when public.partitions is not there
     * (presumably this covers the drop of the metadata table itself)
     */
    IF NOT EXISTS (
        SELECT 1 FROM information_schema.tables
        WHERE table_schema = 'public' AND table_name = 'partitions') THEN
        RETURN;
    END IF;

    /* delete metadata from partitions and shard_partitions when a table is dropped */
    FOR table_being_dropped IN
        SELECT dropped.object_name
        FROM pg_event_trigger_dropped_objects() AS dropped
        INNER JOIN partitions
            ON partitions.child_table = dropped.object_name
        WHERE dropped.object_type = 'table'
          AND dropped.schema_name = 'public'
    LOOP
        DELETE FROM partitions WHERE child_table = table_being_dropped;
        DELETE FROM shard_partitions WHERE child_table = table_being_dropped;
    END LOOP;
END;
$fn$;
-- Call delete_partition_metadata() on the sql_drop event
CREATE EVENT TRIGGER delete_partition_metadata_trigger
    ON sql_drop
    EXECUTE PROCEDURE delete_partition_metadata();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment