Last active
May 13, 2016 18:35
-
-
Save marcocitus/9bc622b31d3faaa2ad6c0b6eeb08e91d to your computer and use it in GitHub Desktop.
Simple partitioning for Citus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Partitioning helpers: create the active partition, rotate it out, and drop
-- partitions whose data has aged out.

-- Supplies the numeric suffix that partition_rotate appends to the name of
-- each partition it rotates out.
CREATE SEQUENCE IF NOT EXISTS partition_id_seq;
-- Set up partitioning for a table.
--
-- Replays the table's DDL (master_get_table_ddl_events) through
-- worker_apply_shard_ddl_command with shard id 0, which is expected to produce
-- the active partition "<table_name>_0". That partition is then attached to the
-- parent through inheritance, and a BEFORE INSERT row trigger is installed on
-- the parent that stores every incoming row in the active partition instead.
--
-- Parameters:
--   table_name  parent table to partition
-- Returns:
--   the name of the active partition ("<table_name>_0")
CREATE OR REPLACE FUNCTION partition_create(
    table_name regclass)
RETURNS text
LANGUAGE plpgsql
AS $function$
DECLARE
    base_name text := table_name::text;
    active_name text := base_name || '_0';
    handler_name text := base_name || '_insert_handler';
    insert_trigger text := base_name || '_trigger';
    active_partition regclass;
    ddl_command text;
BEGIN
    -- Apply each DDL statement of the parent under shard id 0.
    FOR ddl_command IN
        SELECT ddl.command
        FROM master_get_table_ddl_events(base_name) AS ddl (command)
    LOOP
        PERFORM worker_apply_shard_ddl_command(0, ddl_command);
    END LOOP;

    -- The partition only exists from this point on, so resolve it here.
    active_partition := active_name::regclass;

    EXECUTE format($$ALTER TABLE %s INHERIT %s$$,
                   active_partition,
                   table_name);

    -- Trigger function: copy the row into the active partition and return
    -- NULL so the row is not also stored in the parent.
    EXECUTE format($$CREATE OR REPLACE FUNCTION %s()
                     RETURNS TRIGGER
                     LANGUAGE plpgsql
                     AS '
                     BEGIN
                     INSERT INTO %s VALUES (NEW.*);
                     RETURN NULL;
                     END;' $$,
                   handler_name,
                   active_partition);

    EXECUTE format($$CREATE TRIGGER %I BEFORE INSERT ON %s
                     FOR EACH ROW EXECUTE PROCEDURE %s(%L)$$,
                   insert_trigger,
                   table_name,
                   handler_name,
                   active_partition);

    RETURN active_name;
END;
$function$;
-- Rotate the active partition of a table.
--
-- Renames the active partition "<table_name>_0" to "<table_name>_<id>", where
-- <id> is the next value of partition_id_seq, renames that partition's indexes
-- so their names no longer occupy the "_0" names, and then creates a fresh
-- "<table_name>_0" by replaying the parent's DDL with shard id 0 and attaching
-- it to the parent through inheritance.
--
-- Parameters:
--   table_name  parent table whose active partition is rotated
-- Returns:
--   the new name of the partition that was rotated out
CREATE OR REPLACE FUNCTION partition_rotate(
    table_name regclass)
RETURNS text
LANGUAGE plpgsql
AS $function$
DECLARE
    partition_id bigint := nextval('partition_id_seq');
    partition_name text := table_name || '_' || partition_id;
    current_name text := table_name || '_0';
    -- Resolved before the rename: the regclass keeps identifying the rotated
    -- partition by OID once the name "<table_name>_0" is gone.
    rotated_partition regclass := current_name::regclass;
    partition_index record;
BEGIN
    EXECUTE format($$ALTER TABLE %s RENAME TO %I$$,
                   rotated_partition,
                   partition_name);

    -- Rename the indexes of the rotated partition itself (looked up by OID, so
    -- schema and quoting do not matter). The parent's indexes are left alone;
    -- the new active partition created below needs the "_0" names to be free.
    FOR partition_index IN
        SELECT
            idx.indexrelid::regclass AS index_ref,
            cls.relname AS index_name
        FROM pg_index AS idx
        INNER JOIN pg_class AS cls
            ON cls.oid = idx.indexrelid
        WHERE idx.indrelid = rotated_partition
    LOOP
        EXECUTE format($$ALTER INDEX %s RENAME TO %I$$,
                       partition_index.index_ref,
                       regexp_replace(partition_index.index_name, '_0$', '')
                           || '_' || partition_id);
    END LOOP;

    -- Recreate the active partition from the parent's DDL under shard id 0.
    PERFORM worker_apply_shard_ddl_command(0, command)
    FROM master_get_table_ddl_events(table_name::text) command;

    EXECUTE format($$ALTER TABLE %s INHERIT %s$$,
                   current_name::regclass,
                   table_name);

    RETURN partition_name;
END;
$function$;
-- Drop every partition of a table that only holds old data.
--
-- A child of table_name is dropped when partition_is_old reports it as old for
-- the given column and cutoff. The active partition (relation name ending in
-- "_0") is never dropped. Foreign-table partitions are removed with
-- DROP FOREIGN TABLE, all others with DROP TABLE.
--
-- Parameters:
--   table_name        parent table whose partitions are examined
--   partition_column  timestamp column used to judge a partition's age
--   max_time          cutoff passed to partition_is_old
-- Returns:
--   one row per dropped partition, containing its name
CREATE OR REPLACE FUNCTION partition_drop_old(
    table_name regclass,
    partition_column text,
    max_time timestamp)
RETURNS SETOF text
LANGUAGE plpgsql
AS $function$
DECLARE
    drop_table record;
BEGIN
    FOR drop_table IN
        SELECT
            -- Taken as text up front: once the table is dropped, a regclass
            -- value would print as a bare OID instead of the name.
            i.inhrelid::regclass::text AS name,
            c.relkind AS kind
        FROM pg_inherits AS i
        INNER JOIN pg_class AS c
            ON i.inhrelid = c.oid
        WHERE i.inhparent = table_name
          -- Test the bare relation name so quoting or schema qualification of
          -- the printed name cannot hide the "_0" suffix.
          AND right(c.relname::text, 2) <> '_0'
          AND partition_is_old(i.inhrelid::regclass, partition_column, max_time)
    LOOP
        IF drop_table.kind = 'f' THEN
            EXECUTE format($$DROP FOREIGN TABLE %s$$, drop_table.name);
        ELSE
            EXECUTE format($$DROP TABLE %s$$, drop_table.name);
        END IF;

        RETURN NEXT drop_table.name;
    END LOOP;

    RETURN;
END;
$function$;
-- Report whether a partition only holds data from before a cutoff.
--
-- Parameters:
--   table_name        partition to inspect
--   partition_column  timestamp column whose maximum is compared
--   max_time          cutoff; the partition is old when its newest value is
--                     strictly earlier than this
-- Returns:
--   true when the column's maximum is NULL (for example an empty partition) or
--   earlier than max_time
CREATE OR REPLACE FUNCTION partition_is_old(
    table_name regclass,
    partition_column text,
    max_time timestamp)
RETURNS bool
LANGUAGE plpgsql
AS $function$
DECLARE
    newest_value timestamp;
BEGIN
    EXECUTE format($$SELECT max(%I) FROM %s$$, partition_column, table_name)
        INTO newest_value;

    -- No non-NULL value at all: there is nothing recent to keep.
    IF newest_value IS NULL THEN
        RETURN true;
    END IF;

    RETURN newest_value < max_time;
END;
$function$;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Compression of old partitions

-- partition_compress_old rewrites partitions as foreign tables on this server,
-- so the cstore_fdw extension and the server must exist first.
CREATE EXTENSION IF NOT EXISTS cstore_fdw;
CREATE SERVER IF NOT EXISTS cstore_server
    FOREIGN DATA WRAPPER cstore_fdw;
-- Replace old regular-table partitions with compressed cstore_fdw tables.
--
-- Selects the ordinary-table children of table_name that partition_is_old
-- reports as old, skipping the active partition (relation name ending in
-- "_0"). Works in two passes:
--   1. For each selected partition, create "<partition>_tmp" as a foreign
--      table on cstore_server (pglz compression) inheriting from table_name,
--      copy the rows into it, and add a CHECK constraint covering the
--      partition's min..max value of partition_column when both are non-NULL.
--   2. Drop each original partition and rename its "_tmp" copy to the
--      original name.
--
-- Parameters:
--   table_name        parent table whose partitions are examined
--   partition_column  timestamp column used for the age test and the CHECK range
--   max_time          cutoff passed to partition_is_old
-- Returns:
--   one row per compressed partition, containing its name
CREATE OR REPLACE FUNCTION partition_compress_old(
    table_name regclass,
    partition_column text,
    max_time timestamp)
RETURNS SETOF text
LANGUAGE plpgsql
AS $function$
DECLARE
    tables_to_compress regclass[];
    compress_table text;
    temp_table text;
    min_value timestamp;
    max_value timestamp;
BEGIN
    SELECT array_agg(i.inhrelid::regclass)
    INTO tables_to_compress
    FROM pg_inherits AS i
    INNER JOIN pg_class AS c
        ON i.inhrelid = c.oid
    WHERE i.inhparent = table_name
      AND c.relkind = 'r'
      -- Test the bare relation name so quoting or schema qualification of the
      -- printed name cannot hide the "_0" suffix.
      AND right(c.relname::text, 2) <> '_0'
      AND partition_is_old(i.inhrelid::regclass, partition_column, max_time);

    -- array_agg over zero rows yields NULL: nothing to compress.
    IF tables_to_compress IS NULL THEN
        RETURN;
    END IF;

    -- Pass 1: build the compressed copies next to the originals.
    FOREACH compress_table IN ARRAY tables_to_compress
    LOOP
        temp_table := compress_table || '_tmp';

        EXECUTE format($$CREATE FOREIGN TABLE %s ()
                         INHERITS (%s)
                         SERVER cstore_server
                         OPTIONS(compression 'pglz')$$,
                       temp_table,
                       table_name);

        EXECUTE format($$INSERT INTO %s SELECT * FROM %s$$,
                       temp_table::regclass,
                       compress_table);

        EXECUTE format($$SELECT min(%I), max(%I) FROM %s$$,
                       partition_column,
                       partition_column,
                       compress_table)
            INTO min_value, max_value;

        IF min_value IS NOT NULL AND max_value IS NOT NULL THEN
            -- %I (not %s) so the column is quoted exactly as in the min/max
            -- query above; the closed range is exact because both bounds are
            -- values actually present in the partition.
            EXECUTE format($$ALTER TABLE %s ADD CONSTRAINT time_range
                             CHECK (%I BETWEEN %L AND %L) $$,
                           temp_table,
                           partition_column,
                           min_value,
                           max_value);
        END IF;
    END LOOP;

    -- Pass 2: swap each compressed copy in for its original.
    FOREACH compress_table IN ARRAY tables_to_compress
    LOOP
        temp_table := compress_table || '_tmp';

        EXECUTE format($$DROP TABLE %s$$,
                       compress_table);

        EXECUTE format($$ALTER TABLE %s RENAME TO %s$$,
                       temp_table::regclass,
                       compress_table);

        RETURN NEXT compress_table;
    END LOOP;

    RETURN;
END;
$function$;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Usage walkthrough ("..." marks parts to fill in)

-- 1. Create the parent table and an index on its time column
CREATE TABLE measurement (...);
CREATE INDEX ON measurement (logdate);

-- 2. Initialize partitioning; this creates measurement_0
SELECT partition_create('measurement');

-- 3. Load data; rows end up in measurement_0
INSERT INTO measurement ...

-- 4. Rotate; measurement_0 is renamed to measurement_x and a new
--    measurement_0 takes its place
SELECT partition_rotate('measurement');

-- 5. Load more data; rows end up in the new measurement_0
INSERT INTO measurement ...

-- 6. Drop partitions that only hold data from before 2015-04-01
SELECT partition_drop_old(
    'measurement',
    'logdate',
    date '2015-04-01');

-- 7. Compress partitions that only hold data from before 2016-04-01
SELECT partition_compress_old(
    'measurement',
    'logdate',
    date '2016-04-01');
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment