Last active
June 19, 2017 16:44
-
-
Save marcocitus/a2fa5d42053416e4c7dfa37fe91ddefa to your computer and use it in GitHub Desktop.
Split shards in Citus (experimental)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- run_command_on_master_and_workers executes p_sql on the local node (the
-- coordinator) and then on all workers via Citus' run_command_on_workers.
--
-- run_command_on_workers does not raise when a worker fails; it returns one
-- row per node with success = false and the error text in result. Those rows
-- are checked here, and the first failure is raised as an exception instead
-- of being silently discarded.
-- NOTE(review): the worker commands presumably run on separate connections,
-- so an exception here may not undo work already done on other workers —
-- confirm before relying on atomicity.
CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    failed_node text;
    failure_message text;
BEGIN
    EXECUTE p_sql;

    SELECT format('%s:%s', nodename, nodeport), result
    INTO failed_node, failure_message
    FROM run_command_on_workers(p_sql)
    WHERE NOT success
    ORDER BY nodename, nodeport
    LIMIT 1;

    IF FOUND THEN
        RAISE EXCEPTION 'command failed on worker %: %', failed_node, failure_message;
    END IF;
END;
$function$;
-- Schemas that temporarily hold the lower and upper halves of each split
-- table (and their shards) on the coordinator and on every worker.
SELECT run_command_on_master_and_workers($cmd$
CREATE SCHEMA IF NOT EXISTS citus_split_lower
$cmd$);
SELECT run_command_on_master_and_workers($cmd$
CREATE SCHEMA IF NOT EXISTS citus_split_upper
$cmd$);
-- Composite type used to pass a table's shards (shard name plus hash range
-- bounds) from the coordinator to worker_split_shard on the workers.
-- CREATE TYPE has no IF NOT EXISTS clause, so duplicate_object is swallowed
-- to keep this setup re-runnable, like the CREATE SCHEMA statements above.
-- NOTE(review): an existing shard_range with a different definition is kept
-- as-is; drop it manually if its shape ever changes.
SELECT run_command_on_master_and_workers($cmd$
DO $do$
BEGIN
    CREATE TYPE shard_range AS (
        shard_name text,
        shardminvalue bigint,
        shardmaxvalue bigint
    );
EXCEPTION WHEN duplicate_object THEN
    NULL;
END
$do$
$cmd$);
-- Installs worker_split_shard on every worker node.
--
-- worker_split_shard copies the rows of one half of old_shard_name's hash
-- range into new_shard_name. The hash range of old_shard_name is looked up by
-- name in `shards`. With upper = false the half is
-- [min, min + (max - min) / 2]; with upper = true it is
-- [min + (max - min) / 2 + 1, max]. Rows are selected by
-- worker_hash(partition_column). It raises if the shard is not found in
-- `shards` or has no hash range, instead of silently copying nothing.
SELECT run_command_on_workers($cmd$
CREATE OR REPLACE FUNCTION worker_split_shard(
    old_shard_name regclass,
    new_shard_name regclass,
    partition_column text,
    shards shard_range[],
    upper bool)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    min_value bigint;
    max_value bigint;
BEGIN
    IF upper THEN
        SELECT shardminvalue + (shardmaxvalue - shardminvalue) / 2 + 1, shardmaxvalue
        INTO min_value, max_value
        FROM unnest(shards)
        WHERE shard_name = old_shard_name::text;
    ELSE
        SELECT shardminvalue, shardminvalue + (shardmaxvalue - shardminvalue) / 2
        INTO min_value, max_value
        FROM unnest(shards)
        WHERE shard_name = old_shard_name::text;
    END IF;

    -- Without a range the BETWEEN below would match nothing, and the caller
    -- would later drop the old shard: fail loudly instead of losing data.
    IF NOT FOUND OR min_value IS NULL OR max_value IS NULL THEN
        RAISE EXCEPTION 'no hash range found for shard %', old_shard_name;
    END IF;

    -- regclass values are formatted with %s because their text form is
    -- already quoted/qualified; the column name is a raw identifier, so %I.
    EXECUTE format($$INSERT INTO %s SELECT * FROM %s WHERE worker_hash(%I) BETWEEN %L AND %L$$,
                   new_shard_name, old_shard_name, partition_column, min_value, max_value);
END;
$function$;
$cmd$);
-- create on coordinator
--
-- split_shards splits every shard of table_name, and of all tables in its
-- colocation group, into two shards (see split_shards_of_table). Writes to
-- all affected tables are blocked first; LOCK TABLE holds the lock until the
-- end of the transaction.
--
-- Raises if table_name is not a hash-distributed table. A colocation id of 0
-- means "no colocation group", so in that case only table_name itself is
-- split; otherwise every table sharing the id would be split as well.
CREATE OR REPLACE FUNCTION split_shards(table_name regclass)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    colocation_id int;
    partition_method text;
    tables_to_split regclass[];
    current_table regclass;
BEGIN
    SELECT colocationid, partmethod
    INTO colocation_id, partition_method
    FROM pg_dist_partition
    WHERE logicalrelid = table_name;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'table % is not a distributed table', table_name;
    END IF;
    IF partition_method <> 'h' THEN
        RAISE EXCEPTION 'table % is not hash-distributed', table_name;
    END IF;

    -- Collect the list once, up front: split_shards_of_table creates new
    -- distributed tables in the same colocation group while it runs.
    IF colocation_id = 0 THEN
        tables_to_split := ARRAY[table_name];
    ELSE
        SELECT array_agg(logicalrelid ORDER BY logicalrelid::oid)
        INTO tables_to_split
        FROM pg_dist_partition
        WHERE colocationid = colocation_id;
    END IF;

    FOREACH current_table IN ARRAY tables_to_split LOOP
        PERFORM block_writes_to_table(current_table);
    END LOOP;

    FOREACH current_table IN ARRAY tables_to_split LOOP
        PERFORM split_shards_of_table(current_table);
    END LOOP;
END;
$function$;
-- block_writes_to_table locks table_name in SHARE ROW EXCLUSIVE mode for the
-- rest of the current transaction. That mode conflicts with the ROW EXCLUSIVE
-- lock taken by INSERT/UPDATE/DELETE, so writes on this node wait, while
-- plain SELECTs (ACCESS SHARE) are still allowed.
CREATE OR REPLACE FUNCTION block_writes_to_table(table_name regclass)
RETURNS void
AS $function$
DECLARE
    lock_statement text;
BEGIN
    -- concat() renders the regclass through its output function, so the
    -- name is already quoted/qualified as needed.
    lock_statement := concat('LOCK TABLE ', table_name, ' IN SHARE ROW EXCLUSIVE MODE');
    EXECUTE lock_statement;
END;
$function$
LANGUAGE plpgsql;
/*
 * split_shards_of_table splits every shard of a hash-distributed table into
 * two shards covering the lower and upper half of the original hash range.
 *
 * Steps:
 *  1. create two empty distributed tables, citus_split_lower.<relname> and
 *     citus_split_upper.<relname>, colocated with table_name;
 *  2. on each placement, copy the lower/upper half of the old shard into the
 *     colocated shard of the matching split table (worker_split_shard);
 *  3. drop the old shards and move the new shards into the table's schema;
 *  4. narrow the hash ranges of the new shards in pg_dist_shard, attach them
 *     to table_name, and drop the now shard-less split tables.
 *
 * Any failed placement command raises an exception.
 * NOTE(review): the commands sent to workers presumably run on their own
 * connections and are not rolled back with this transaction, so a failure
 * after master_drop_all_shards may leave the table without data. Confirm,
 * and take a backup before use.
 */
CREATE OR REPLACE FUNCTION split_shards_of_table(table_name regclass)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    schema_name text;
    relation_name text;
    lower_split text;
    upper_split text;
    partition_column text;
    shards shard_range[];
    command_succeeded bool;
    error_message text;
BEGIN
    PERFORM block_writes_to_table(table_name);

    /*
     * Move the new shards back into the original table's own schema rather
     * than a hard-coded 'public'. Name the split tables after the bare
     * relation name, so that a schema-qualified table_name does not produce
     * an invalid name such as citus_split_lower.myschema.mytable.
     */
    SELECT n.nspname, c.relname
    INTO schema_name, relation_name
    FROM pg_class AS c
    INNER JOIN pg_namespace AS n ON n.oid = c.relnamespace
    WHERE c.oid = table_name;

    lower_split := format('citus_split_lower.%I', relation_name);
    upper_split := format('citus_split_upper.%I', relation_name);

    /* create identical distributed tables for the 2 splits */
    EXECUTE format($$CREATE TABLE %s (LIKE %s INCLUDING ALL)$$, lower_split, table_name);
    EXECUTE format($$CREATE TABLE %s (LIKE %s INCLUDING ALL)$$, upper_split, table_name);

    SELECT column_to_column_name(logicalrelid, partkey)
    INTO partition_column
    FROM pg_dist_partition
    WHERE logicalrelid = table_name;

    RAISE NOTICE 'creating lower split for table %', table_name;
    PERFORM create_distributed_table(lower_split, partition_column, colocate_with := table_name::text);
    RAISE NOTICE 'creating upper split for table %', table_name;
    PERFORM create_distributed_table(upper_split, partition_column, colocate_with := table_name::text);

    /* current shard names and hash ranges, passed to worker_split_shard */
    SELECT array_agg((shard_name(logicalrelid, shardid),
                      shardminvalue::bigint,
                      shardmaxvalue::bigint)::shard_range)
    INTO shards
    FROM pg_dist_shard
    WHERE logicalrelid = table_name;

    /*
     * Fill the lower and upper splits. After format() the command still
     * contains two %L placeholders, presumably filled by
     * run_command_on_colocated_placements with the names of the colocated
     * shards of the two tables.
     */
    RAISE NOTICE 'filling lower split of table %', table_name;
    SELECT min(result), count(*) = 0
    INTO error_message, command_succeeded
    FROM run_command_on_colocated_placements(table_name, lower_split, format($$
        SELECT worker_split_shard(%%L, 'citus_split_lower.'||%%L, %L, %L::shard_range[], false)
    $$, partition_column, shards::text))
    WHERE success = 'f';
    IF NOT command_succeeded THEN
        RAISE 'splitting shards failed: %', error_message;
    END IF;

    RAISE NOTICE 'filling upper split of table %', table_name;
    SELECT min(result), count(*) = 0
    INTO error_message, command_succeeded
    FROM run_command_on_colocated_placements(table_name, upper_split, format($$
        SELECT worker_split_shard(%%L, 'citus_split_upper.'||%%L, %L, %L::shard_range[], true)
    $$, partition_column, shards::text))
    WHERE success = 'f';
    IF NOT command_succeeded THEN
        RAISE 'splitting shards failed: %', error_message;
    END IF;

    RAISE NOTICE 'dropping old shards of table %', table_name;
    PERFORM master_drop_all_shards(table_name, '', '');

    /* move the new shard tables from the split schemas into the table's schema */
    RAISE NOTICE 'replacing old shards with split in table %', table_name;
    SELECT min(result), count(*) = 0
    INTO error_message, command_succeeded
    FROM run_command_on_placements(lower_split, format($$ALTER TABLE citus_split_lower.%%I SET SCHEMA %I$$, schema_name))
    WHERE success = 'f';
    IF NOT command_succeeded THEN
        RAISE 'replacing shards failed: %', error_message;
    END IF;

    SELECT min(result), count(*) = 0
    INTO error_message, command_succeeded
    FROM run_command_on_placements(upper_split, format($$ALTER TABLE citus_split_upper.%%I SET SCHEMA %I$$, schema_name))
    WHERE success = 'f';
    IF NOT command_succeeded THEN
        RAISE 'replacing shards failed: %', error_message;
    END IF;

    /* the same midpoint arithmetic as worker_split_shard */
    UPDATE pg_dist_shard
    SET shardmaxvalue = shardminvalue::bigint + (shardmaxvalue::bigint - shardminvalue::bigint) / 2
    WHERE logicalrelid = lower_split::regclass
      AND shardmaxvalue::bigint > shardminvalue::bigint;

    UPDATE pg_dist_shard
    SET shardminvalue = shardminvalue::bigint + (shardmaxvalue::bigint - shardminvalue::bigint) / 2 + 1
    WHERE logicalrelid = upper_split::regclass
      AND shardmaxvalue::bigint > shardminvalue::bigint;

    /* attach the new shards to the original table */
    UPDATE pg_dist_shard
    SET logicalrelid = table_name
    WHERE logicalrelid = lower_split::regclass
       OR logicalrelid = upper_split::regclass;

    /* the split tables no longer own any shards */
    EXECUTE format($$DROP TABLE %s$$, upper_split);
    EXECUTE format($$DROP TABLE %s$$, lower_split);
END;
$function$;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Example psql session (statements followed by their output): a hash-distributed
-- table with 32 shards is split with split_shards, after which it has 64 shards
-- and the previously inserted row is still returned.
CREATE TABLE test (x int, y int); | |
SELECT create_distributed_table('test','x'); | |
INSERT INTO test VALUES (1,2); | |
-- shard count before the split
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'test'::regclass; | |
count | |
------- | |
32 | |
(1 row) | |
-- split every shard of test (and of its colocated tables) in two
SELECT split_shards('test'); | |
NOTICE: creating lower split for table test | |
NOTICE: creating upper split for table test | |
NOTICE: filling lower split of table test | |
NOTICE: filling upper split of table test | |
NOTICE: dropping old shards of table test | |
NOTICE: replacing old shards with split in table test | |
split_shards | |
-------------- | |
(1 row) | |
-- shard count after the split has doubled
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'test'::regclass; | |
count | |
------- | |
64 | |
(1 row) | |
-- the data is still readable through the new shards
SELECT * FROM test; | |
x | y | |
---+--- | |
1 | 2 | |
(1 row) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The function currently renames the indexes on the shards (a side effect of creating the split tables with `LIKE ... INCLUDING ALL`), which prevents a subsequent `DROP INDEX`, but this is easy to fix by renaming the index on the coordinator.