Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Simple partitioning for Citus
-- Functions to partition a table and drop old partitions
-- partition_id_seq supplies the numeric suffix that partition_rotate appends
-- to a rotated partition (and to the indexes it renames). There is a single
-- sequence, so the ids are shared by every table that is rotated.
CREATE SEQUENCE IF NOT EXISTS partition_id_seq;
-- partition_create(table_name)
--
-- Initializes partitioning for table_name:
--   1. replays every DDL event of table_name as shard 0 (per the usage notes
--      in this file, this is what creates "<table_name>_0"),
--   2. makes "<table_name>_0" inherit from table_name,
--   3. creates the trigger function "<table_name>_insert_handler", which
--      inserts the new row into "<table_name>_0" and returns NULL,
--   4. attaches it to table_name as the BEFORE INSERT row trigger
--      "<table_name>_trigger".
--
-- Returns the name of the active partition ("<table_name>_0").
CREATE OR REPLACE FUNCTION partition_create(
    table_name regclass)
RETURNS text
LANGUAGE plpgsql
AS $function$
DECLARE
    active_partition text := table_name || '_0';
    handler_fn       text := table_name || '_insert_handler';
    insert_trigger   text := table_name || '_trigger';

    -- The two templates below are kept flush-left on purpose: their text
    -- (including line breaks) is what gets executed and stored as the
    -- generated trigger function's source.
    handler_template CONSTANT text :=
$tpl$CREATE OR REPLACE FUNCTION %s()
RETURNS TRIGGER
LANGUAGE plpgsql
AS '
BEGIN
INSERT INTO %s VALUES (NEW.*);
RETURN NULL;
END;' $tpl$;

    trigger_template CONSTANT text :=
$tpl$CREATE TRIGGER %I BEFORE INSERT ON %s
FOR EACH ROW EXECUTE PROCEDURE %s(%L)$tpl$;
BEGIN
    -- Replay the parent's DDL with shard id 0.
    PERFORM worker_apply_shard_ddl_command(0, ddl)
    FROM master_get_table_ddl_events(CAST(table_name AS text)) AS ddl;

    -- The regclass cast doubles as a check that the partition now exists.
    EXECUTE format('ALTER TABLE %s INHERIT %s',
                   CAST(active_partition AS regclass),
                   table_name);

    -- Trigger function that redirects a row into the active partition.
    EXECUTE format(handler_template,
                   handler_fn,
                   CAST(active_partition AS regclass));

    -- The partition name is also handed to the trigger as an argument,
    -- although the generated function body does not read it.
    EXECUTE format(trigger_template,
                   insert_trigger,
                   table_name,
                   handler_fn,
                   CAST(active_partition AS regclass));

    RETURN active_partition;
END;
$function$;
-- partition_rotate(table_name)
--
-- Retires the active partition "<table_name>_0" by renaming it to
-- "<table_name>_<id>", where <id> is the next value of partition_id_seq,
-- then replays the DDL events of table_name as shard 0 and makes the
-- resulting "<table_name>_0" inherit from table_name.
--
-- Returns the new name of the retired partition.
CREATE OR REPLACE FUNCTION partition_rotate(
    table_name regclass)
RETURNS text
LANGUAGE plpgsql
AS $function$
DECLARE
    rotation_id   bigint := nextval('partition_id_seq');
    archived_name text := table_name || '_' || rotation_id;
    active_name   text := table_name || '_0';
    idx_name      text;
BEGIN
    -- Move the active partition out of the way.
    EXECUTE format('ALTER TABLE %s RENAME TO %I',
                   CAST(active_name AS regclass),
                   archived_name);

    -- Suffix every index found for table_name with the rotation id.
    -- NOTE(review): the filter matches indexes of the parent table_name, not
    -- of the partition that was just renamed; presumably this keeps the index
    -- names generated by the DDL replay below from clashing with the retired
    -- partition's indexes - confirm this is intended, since the parent's
    -- index names gain another suffix on every rotation.
    FOR idx_name IN
        SELECT i.indexname
        FROM pg_indexes AS i
        WHERE i.tablename = CAST(table_name AS name)
    LOOP
        EXECUTE format('ALTER INDEX %I RENAME TO %I',
                       idx_name,
                       idx_name || '_' || rotation_id);
    END LOOP;

    -- Replay the parent's DDL with shard id 0 to get a fresh active partition.
    PERFORM worker_apply_shard_ddl_command(0, ddl)
    FROM master_get_table_ddl_events(CAST(table_name AS text)) AS ddl;

    EXECUTE format('ALTER TABLE %s INHERIT %s',
                   CAST(active_name AS regclass),
                   table_name);

    RETURN archived_name;
END;
$function$;
-- partition_drop_old(table_name, partition_column, max_time)
--
-- Drops every child of table_name that partition_is_old() reports as old for
-- (partition_column, max_time), except children whose name ends in "_0"
-- (the active partition).
--
-- Parameters:
--   table_name        parent table whose children are examined
--   partition_column  column passed to partition_is_old() for each child
--   max_time          cutoff passed to partition_is_old() for each child
--
-- Returns one row per dropped partition, containing its name.
CREATE OR REPLACE FUNCTION partition_drop_old(
    table_name regclass,
    partition_column text,
    max_time timestamp)
RETURNS SETOF text
LANGUAGE plpgsql
AS $function$
DECLARE
    drop_table record;
BEGIN
    FOR drop_table IN
        SELECT i.inhrelid::regclass AS name, c.relkind AS kind
        FROM pg_inherits i
        JOIN pg_class c ON (i.inhrelid = c.oid)
        WHERE i.inhparent = table_name
          AND partition_is_old(i.inhrelid, partition_column, max_time)
          AND i.inhrelid::regclass::text NOT LIKE '%\_0'
    LOOP
        -- relkind is only available through the loop record (aliased "kind");
        -- foreign-table children need DROP FOREIGN TABLE.
        IF drop_table.kind = 'f' THEN
            EXECUTE format($$DROP FOREIGN TABLE %s$$, drop_table.name);
        ELSE
            EXECUTE format($$DROP TABLE %s$$, drop_table.name);
        END IF;

        -- The function returns SETOF text, so emit just the name rather than
        -- the whole (name, kind) record.
        RETURN NEXT drop_table.name::text;
    END LOOP;

    RETURN;
END;
$function$;
-- partition_is_old(table_name, partition_column, max_time)
--
-- Reads max(partition_column) from table_name and reports whether the table
-- counts as old: true when that maximum is NULL (e.g. the table has no rows)
-- or lies strictly before max_time.
CREATE OR REPLACE FUNCTION partition_is_old(
    table_name regclass,
    partition_column text,
    max_time timestamp)
RETURNS bool
LANGUAGE plpgsql
AS $function$
DECLARE
    newest timestamp;
BEGIN
    -- %I quotes the column as an identifier; the regclass value renders as a
    -- ready-to-use relation name.
    EXECUTE format('SELECT max(%I) FROM %s', partition_column, table_name)
        INTO newest;

    -- Nothing to compare against: treat the partition as old.
    IF newest IS NULL THEN
        RETURN true;
    END IF;

    RETURN newest < max_time;
END;
$function$;
-- Functions to compress old partitions
-- partition_compress_old creates its foreign tables on cstore_server, which
-- is backed by the cstore_fdw foreign data wrapper; both are set up here.
CREATE EXTENSION IF NOT EXISTS cstore_fdw;
CREATE SERVER IF NOT EXISTS cstore_server FOREIGN DATA WRAPPER cstore_fdw;
-- partition_compress_old(table_name, partition_column, max_time)
--
-- Replaces old partitions of table_name with cstore_fdw foreign tables
-- (compression 'pglz') holding the same rows.
--
-- A child of table_name is selected when it is a regular table
-- (relkind 'r'), its name does not end in "_0", and partition_is_old()
-- reports it as old for (partition_column, max_time).
--
-- Parameters:
--   table_name        parent table whose children are examined
--   partition_column  column used for the age test and for the range
--                     constraint added to each compressed table
--   max_time          cutoff passed to partition_is_old()
--
-- Returns one row per compressed partition, containing its name.
CREATE OR REPLACE FUNCTION partition_compress_old(
    table_name regclass,
    partition_column text,
    max_time timestamp)
RETURNS SETOF text
LANGUAGE plpgsql
AS $function$
DECLARE
    tables_to_compress regclass[];
    compress_table text;
    temp_table text;
    min_value timestamp;
    max_value timestamp;
BEGIN
    SELECT array_agg(i.inhrelid::regclass) INTO tables_to_compress
    FROM pg_inherits i
    JOIN pg_class c ON (i.inhrelid = c.oid)
    WHERE i.inhparent = table_name
      AND partition_is_old(i.inhrelid, partition_column, max_time)
      AND i.inhrelid::regclass::text NOT LIKE '%\_0'
      AND c.relkind = 'r';

    -- array_agg over zero rows yields NULL: nothing to compress.
    IF tables_to_compress IS NULL THEN
        RETURN;
    END IF;

    -- Pass 1: build a compressed "<partition>_tmp" copy of every selected
    -- partition; the originals stay in place until pass 2.
    FOREACH compress_table IN ARRAY tables_to_compress
    LOOP
        temp_table := compress_table || '_tmp';

        EXECUTE format($$CREATE FOREIGN TABLE %s ()
                         INHERITS (%s)
                         SERVER cstore_server
                         OPTIONS(compression 'pglz')$$,
                       temp_table,
                       table_name);

        EXECUTE format($$INSERT INTO %s SELECT * FROM %s$$,
                       temp_table::regclass,
                       compress_table);

        EXECUTE format($$SELECT min(%I), max(%I) FROM %s$$,
                       partition_column,
                       partition_column,
                       compress_table)
        INTO min_value, max_value;

        -- Record the observed value range as a CHECK constraint; skipped
        -- when the partition has no non-NULL values in partition_column.
        IF min_value IS NOT NULL AND max_value IS NOT NULL THEN
            -- %I (not %s) so the column is quoted exactly as in the
            -- min/max query above.
            EXECUTE format($$ALTER TABLE %s ADD CONSTRAINT time_range
                             CHECK (%I BETWEEN %L AND %L) $$,
                           temp_table,
                           partition_column,
                           min_value,
                           max_value);
        END IF;
    END LOOP;

    -- Pass 2: drop each original and give its name to the compressed copy.
    FOREACH compress_table IN ARRAY tables_to_compress
    LOOP
        temp_table := compress_table || '_tmp';

        EXECUTE format($$DROP TABLE %s$$,
                       compress_table);

        EXECUTE format($$ALTER TABLE %s RENAME TO %s$$,
                       temp_table::regclass,
                       compress_table);

        RETURN NEXT compress_table;
    END LOOP;

    RETURN;
END;
$function$;
-- Example usage. The "..." parts are placeholders to fill in; the script is
-- not runnable as written.
-- Create a table
CREATE TABLE measurement (...);
CREATE INDEX ON measurement (logdate);
-- Initialize partitioning (creates measurement_0 and the insert trigger on measurement)
SELECT partition_create('measurement');
-- Insert a bunch of data (goes into measurement_0)
INSERT INTO measurement ...
-- Rotate measurement (measurement_0 renamed to measurement_x, where x is the
-- next value of partition_id_seq; a new measurement_0 is created)
SELECT partition_rotate('measurement');
-- Insert a bunch of data (goes into measurement_0)
INSERT INTO measurement ...
-- Clean up partitions with only data from before 2015-04-01
SELECT partition_drop_old('measurement', 'logdate', date '2015-04-01');
-- Compress partitions with only data from before 2016-04-01
SELECT partition_compress_old('measurement', 'logdate', date '2016-04-01');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.