Skip to content

Instantly share code, notes, and snippets.

@marcocitus
Last active May 15, 2017 07:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marcocitus/e02f457afa37dec45e43546d34f094e2 to your computer and use it in GitHub Desktop.
Save marcocitus/e02f457afa37dec45e43546d34f094e2 to your computer and use it in GitHub Desktop.
Partitioning in Citus 6.1, 6.2
-- Example session: partition a distributed table by day using the helper
-- functions defined further down in this file
-- (partition_distributed_table_by_date, create_partitions_for_date_range,
-- detach_partitions, reattach_partitions, drop_partitions).
-- The tabular text following each "SELECT * FROM events;" is the query
-- output as it was displayed, not SQL to be executed.
CREATE TABLE events (
id int not null,
created_at timestamptz,
payload jsonb
);
CREATE INDEX ON events (id);
CREATE INDEX ON events USING BRIN (created_at);
-- Distribute the table on id; the day partitions are created on created_at below
SELECT create_distributed_table('events', 'id');
-- Set up the trigger on workers (idempotent)
SELECT partition_distributed_table_by_date('events', 'created_at');
-- Create a distributed table for each day and attach all shard placements to the parent table (idempotent)
-- Both boundary dates are included in the range
SELECT create_partitions_for_date_range('events', 'created_at', '2017-05-14', '2017-05-21');
-- Two rows that fall into two different day partitions
INSERT INTO events VALUES (1, '2017-05-14 10:00:00', '{"hello":"world"}');
INSERT INTO events VALUES (1, '2017-05-20 10:00:00', '{"foo":"bar"}');
SELECT * FROM events;
id | created_at | payload
----+------------------------+--------------------
1 | 2017-05-14 10:00:00+02 | {"hello": "world"}
1 | 2017-05-20 10:00:00+02 | {"foo": "bar"}
(2 rows)
-- Detach the shard placements of all partitions before 2017-05-15, exclusive (idempotent)
-- The 2017-05-14 row is no longer visible through the parent afterwards
SELECT detach_partitions('events', '2017-05-15');
SELECT * FROM events;
id | created_at | payload
----+------------------------+----------------
1 | 2017-05-20 10:00:00+02 | {"foo": "bar"}
(1 row)
-- Reattach all shard placements of all partitions to their parents (idempotent)
SELECT reattach_partitions('events');
SELECT * FROM events;
id | created_at | payload
----+------------------------+--------------------
1 | 2017-05-14 10:00:00+02 | {"hello": "world"}
1 | 2017-05-20 10:00:00+02 | {"foo": "bar"}
(2 rows)
-- Drop all partitions we've created so far (idempotent)
-- The cut-off date is exclusive; 2017-05-31 lies after every partition created above
SELECT drop_partitions('events', '2017-05-31');
SELECT * FROM events;
id | created_at | payload
----+------------+---------
(0 rows)
-- Partitioning metadata

-- Table-level mapping: for each distributed parent table and day, the name
-- of the distributed child table that was created for that day.
CREATE TABLE partitions (
    parent_table regclass NOT NULL,
    day          date     NOT NULL,
    child_table  text     NOT NULL,
    PRIMARY KEY (parent_table, day)
);

-- Shard-level mapping: for each parent shard and day, the child table and
-- the child shard that rows for that day are routed to.
CREATE TABLE shard_partitions (
    parent_shard text NOT NULL,
    day          date NOT NULL,
    child_table  text NOT NULL,
    child_shard  text NOT NULL,
    PRIMARY KEY (parent_shard, day)
);

-- Registered as a reference table: the insert trigger function that is
-- created on the workers looks up its routing target in this table's shard.
SELECT create_reference_table('shard_partitions');
-- Partitioning functions
-- Installs insert routing for a distributed table that is partitioned by day.
--
-- table_name:  name of the distributed table; also used as the prefix of the
--              generated trigger function, <table_name>_insert_router()
-- column_name: column whose value, cast to date, selects the target partition
--
-- The function first creates the trigger function through
-- run_command_on_workers and then creates a BEFORE INSERT row trigger named
-- insert_trigger through run_command_on_placements.
CREATE OR REPLACE FUNCTION partition_distributed_table_by_date(table_name text, column_name text)
RETURNS void LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
DECLARE
BEGIN
/* create trigger function on workers
 *
 * The generated function looks up child_shard for the row
 * (parent_shard = TG_TABLE_NAME, day = NEW.<column_name>::date) in the shard
 * of shard_partitions, raises an exception when no such row exists, inserts
 * NEW into the child shard and returns NULL. In PostgreSQL, returning NULL
 * from a BEFORE ROW trigger skips the insert into the table the trigger
 * fired on, so the row is stored only in the child shard.
 *
 * The command is generated once per pg_dist_shard row of shard_partitions.
 * NOTE(review): presumably exactly one row, since shard_partitions is
 * registered as a reference table - confirm.
 *
 * A doubled percent sign in the template is the escape format() uses for a
 * literal percent sign.
 */
PERFORM run_command_on_workers(format($cmd$
CREATE OR REPLACE FUNCTION %s_insert_router()
RETURNS TRIGGER LANGUAGE plpgsql
AS $ifn$
DECLARE
target_table text;
BEGIN
SELECT child_shard INTO target_table FROM %s
WHERE parent_shard = TG_TABLE_NAME AND day = NEW.%s::date;
IF NOT FOUND THEN
RAISE EXCEPTION 'no partition for date %%', NEW.%s::date;
END IF;
EXECUTE 'INSERT INTO '|| target_table ||' SELECT ($1).*' USING NEW;
RETURN NULL;
END;
$ifn$;$cmd$, table_name, shard_name(logicalrelid, shardid), column_name, column_name))
FROM pg_dist_shard WHERE logicalrelid = 'shard_partitions'::regclass;
/* add trigger to shards
 *
 * The escaped identifier placeholder survives format() unchanged.
 * NOTE(review): presumably run_command_on_placements replaces it with the
 * name of each shard placement - confirm against the Citus documentation.
 */
PERFORM run_command_on_placements(table_name, format($cmd$
CREATE TRIGGER insert_trigger BEFORE INSERT ON %%I
FOR EACH ROW EXECUTE PROCEDURE %s_insert_router();
$cmd$, table_name));
END;
$fn$;
-- Creates a day partition of table_name for every day from start_date to
-- end_date (both inclusive) that does not yet have one recorded in the
-- partitions metadata table.
--
-- table_name:  distributed parent table
-- column_name: date/timestamp column the partitions are constrained on
-- start_date:  first day to create a partition for
-- end_date:    last day to create a partition for
CREATE OR REPLACE FUNCTION create_partitions_for_date_range(
    table_name regclass,
    column_name text,
    start_date date,
    end_date date)
RETURNS void LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
BEGIN
    /*
     * Create partitions for days which don't yet have a partition.
     *
     * The anti-join must be restricted to this parent table: matching on the
     * day alone would treat a partition of any other partitioned table as if
     * it belonged to table_name and silently skip that day.
     */
    PERFORM create_partition_for_date(table_name, column_name, d::date)
    FROM generate_series(start_date, end_date, interval '1 day') d
    LEFT JOIN partitions
        ON (partitions.parent_table = table_name AND partitions.day = d::date)
    WHERE partitions.day IS NULL;
END;
$fn$;
-- Creates the distributed child table that holds one day of table_name's
-- data, attaches its shard placements to the parent's placements and records
-- the new partition in shard_partitions and partitions.
--
-- table_name:  distributed parent table
-- column_name: column the day-range CHECK constraint is placed on
-- day:         the day covered by the new partition
CREATE OR REPLACE FUNCTION create_partition_for_date(
    table_name regclass,
    column_name text,
    day date)
RETURNS void LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
DECLARE
    partition_name text;
    shard_pair record;
BEGIN
    /* child table is named <parent>_YYYYMMDD */
    partition_name := table_name || '_' || to_char(day, 'YYYYMMDD');

    /* create the child table with the same definition as the parent */
    EXECUTE format(
        'CREATE TABLE %I (LIKE %I INCLUDING ALL)',
        partition_name,
        table_name);

    /* constrain the child to the half-open range [day, day + 1 day) */
    EXECUTE format(
        'ALTER TABLE %I ADD CONSTRAINT %I CHECK (%I >= date %L AND %I < date %L)',
        partition_name,
        partition_name || '_cons',
        column_name,
        day,
        column_name,
        day + interval '1 day');

    /* distribute the child on the same column as the parent */
    PERFORM create_distributed_table(
                partition_name,
                column_to_column_name(logicalrelid, partkey))
    FROM pg_dist_partition
    WHERE logicalrelid = table_name;

    /* attach each shard placement to the co-located shard placement of the parent */
    PERFORM run_command_on_colocated_placements(
                partition_name,
                table_name,
                'ALTER TABLE %I INHERIT %I');

    /* add shard mappings for insert trigger: pair shards by their minimum value */
    FOR shard_pair IN
        SELECT
            shard_name(parent.logicalrelid, parent.shardid) AS parent_shard,
            shard_name(child.logicalrelid, child.shardid) AS child_shard
        FROM pg_dist_shard parent
        JOIN pg_dist_shard child USING (shardminvalue)
        WHERE parent.logicalrelid = table_name
          AND child.logicalrelid = partition_name::regclass
    LOOP
        EXECUTE format(
            'INSERT INTO shard_partitions VALUES (%L,%L,%L,%L)',
            shard_pair.parent_shard,
            day,
            partition_name,
            shard_pair.child_shard);
    END LOOP;

    /* add distributed table mappings for administrative operations */
    EXECUTE format(
        'INSERT INTO partitions VALUES (%L,%L,%L)',
        table_name,
        day,
        partition_name);
END;
$fn$;
-- Detaches the partitions of table_name whose day lies strictly before
-- before_date by running ALTER TABLE ... NO INHERIT on the co-located shard
-- placements of each child table and the parent.
--
-- table_name:  distributed parent table
-- before_date: exclusive upper bound on the partition day
CREATE OR REPLACE FUNCTION detach_partitions(
    table_name regclass,
    before_date date)
RETURNS void LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
BEGIN
    /* one call per partition recorded for this parent before the cut-off */
    PERFORM run_command_on_colocated_placements(
                p.child_table,
                table_name,
                'ALTER TABLE %I NO INHERIT %I')
    FROM partitions AS p
    WHERE p.parent_table = table_name
      AND p.day < before_date;
END;
$fn$;
-- Reattaches every partition recorded for table_name by running
-- ALTER TABLE ... INHERIT on the co-located shard placements of each child
-- table and the parent. No date filter is applied.
--
-- table_name: distributed parent table
CREATE OR REPLACE FUNCTION reattach_partitions(table_name regclass)
RETURNS void LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
BEGIN
    /* one call per partition recorded for this parent */
    PERFORM run_command_on_colocated_placements(
                p.child_table,
                table_name,
                'ALTER TABLE %I INHERIT %I')
    FROM partitions AS p
    WHERE p.parent_table = table_name;
END;
$fn$;
-- Drops the child tables of all partitions of table_name whose day lies
-- strictly before before_date.
--
-- table_name:  distributed parent table
-- before_date: exclusive upper bound on the partition day
CREATE OR REPLACE FUNCTION drop_partitions(
    table_name regclass,
    before_date date)
RETURNS void LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
DECLARE
    partition_to_drop text;
BEGIN
    /* DROP TABLE takes no parameters, so each statement is built dynamically */
    FOR partition_to_drop IN
        SELECT p.child_table
        FROM partitions AS p
        WHERE p.parent_table = table_name
          AND p.day < before_date
    LOOP
        EXECUTE format('DROP TABLE IF EXISTS %I', partition_to_drop);
    END LOOP;
END;
$fn$;
-- Event trigger function: when a table that is recorded as a partition is
-- dropped, removes its rows from partitions and shard_partitions.
--
-- Returns immediately when public.partitions does not exist.
CREATE OR REPLACE FUNCTION delete_partition_metadata()
RETURNS event_trigger LANGUAGE plpgsql SET search_path = public,pg_catalog
AS $fn$
DECLARE
    table_being_dropped text;
BEGIN
    /* nothing to clean up when the metadata table itself does not exist */
    IF NOT EXISTS (
        SELECT 1 FROM information_schema.tables
        WHERE table_schema = 'public' AND table_name = 'partitions') THEN
        RETURN;
    END IF;

    /*
     * Delete metadata from partitions and shard_partitions when a table is
     * dropped.
     *
     * Only dropped tables in the public schema are considered. The dropped
     * objects list also contains row types, indexes, functions and objects
     * from other schemas; matching those on their bare name could remove the
     * metadata of a partition that still exists. Partitions are created with
     * search_path = public, hence the schema filter.
     */
    FOR table_being_dropped IN
        SELECT dropped.object_name
        FROM pg_event_trigger_dropped_objects() AS dropped
        INNER JOIN partitions
            ON partitions.child_table = dropped.object_name
        WHERE dropped.object_type = 'table'
          AND dropped.schema_name = 'public'
    LOOP
        DELETE FROM partitions WHERE child_table = table_being_dropped;
        DELETE FROM shard_partitions WHERE child_table = table_being_dropped;
    END LOOP;
END;
$fn$;
-- Run delete_partition_metadata() for every command that drops objects.
CREATE EVENT TRIGGER delete_partition_metadata_trigger
    ON sql_drop
    EXECUTE PROCEDURE delete_partition_metadata();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment