Skip to content

Instantly share code, notes, and snippets.

@marcocitus
Last active June 19, 2017 16:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marcocitus/a2fa5d42053416e4c7dfa37fe91ddefa to your computer and use it in GitHub Desktop.
Save marcocitus/a2fa5d42053416e4c7dfa37fe91ddefa to your computer and use it in GitHub Desktop.
Split shards in Citus (experimental)
-- Runs a SQL command on the coordinator (master) and then on every worker.
--
-- p_sql: a single SQL command. It is executed locally with EXECUTE and then
--        passed unchanged to run_command_on_workers.
--
-- An error on the coordinator propagates as usual. run_command_on_workers
-- reports failures through its result set instead of raising, so every
-- failed worker is surfaced as a WARNING; the function still returns
-- normally in that case.
CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
RETURNS void LANGUAGE plpgsql AS $function$
DECLARE
    failed_worker record;
BEGIN
    EXECUTE p_sql;

    FOR failed_worker IN
        SELECT w.nodename, w.nodeport, w.result
        FROM run_command_on_workers(p_sql) AS w
        WHERE NOT w.success
    LOOP
        RAISE WARNING 'command failed on worker %:%: %',
            failed_worker.nodename, failed_worker.nodeport, failed_worker.result;
    END LOOP;
END;
$function$;
-- Staging schemas for the tables that receive the lower and upper half of
-- each shard during a split.
SELECT run_command_on_master_and_workers($cmd$
CREATE SCHEMA IF NOT EXISTS citus_split_lower
$cmd$);
SELECT run_command_on_master_and_workers($cmd$
CREATE SCHEMA IF NOT EXISTS citus_split_upper
$cmd$);
-- Composite type used to ship shard hash ranges to the workers.
-- PostgreSQL has no CREATE TYPE IF NOT EXISTS, so the duplicate_object error
-- is swallowed to keep this setup re-runnable, like the schema creation.
SELECT run_command_on_master_and_workers($cmd$
DO $do$
BEGIN
    CREATE TYPE shard_range AS (
        shard_name text,
        shardminvalue bigint,
        shardmaxvalue bigint
    );
EXCEPTION
    WHEN duplicate_object THEN
        NULL; -- type already exists on this node
END
$do$;
$cmd$);
-- Installs worker_split_shard on every worker.
--
-- worker_split_shard copies one half of a shard's rows into a new shard table.
--   old_shard_name   - shard table to read from
--   new_shard_name   - shard table to insert into
--   partition_column - name of the distribution column; rows are selected by
--                      worker_hash(partition_column)
--   shards           - hash ranges of the shards, looked up by shard name
--   upper            - true copies the upper half of the shard's hash range,
--                      false copies the lower half
-- Raises an exception when old_shard_name has no usable range in shards.
SELECT run_command_on_workers($cmd$
CREATE OR REPLACE FUNCTION worker_split_shard(
    old_shard_name regclass,
    new_shard_name regclass,
    partition_column text,
    shards shard_range[],
    upper bool)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    range_start bigint;
    range_end bigint;
    midpoint bigint;
    min_value bigint;
    max_value bigint;
BEGIN
    SELECT s.shardminvalue, s.shardmaxvalue INTO range_start, range_end
    FROM unnest(shards) AS s
    WHERE s.shard_name = old_shard_name::text;

    -- Without a range the INSERT below would copy nothing and still report
    -- success, so fail loudly instead.
    IF NOT FOUND OR range_start IS NULL OR range_end IS NULL THEN
        RAISE EXCEPTION 'no shard range found for shard %', old_shard_name;
    END IF;

    -- The lower half is [range_start, midpoint], the upper half is
    -- [midpoint + 1, range_end].
    midpoint := range_start + (range_end - range_start) / 2;

    IF upper THEN
        min_value := midpoint + 1;
        max_value := range_end;
    ELSE
        min_value := range_start;
        max_value := midpoint;
    END IF;

    -- regclass values already render as properly quoted (and, if needed,
    -- schema-qualified) names, so they are inserted with %s; the column name
    -- is plain text and needs %I. The range bounds are bound as parameters.
    EXECUTE format(
        'INSERT INTO %s SELECT * FROM %s WHERE worker_hash(%I) BETWEEN $1 AND $2',
        new_shard_name, old_shard_name, partition_column)
    USING min_value, max_value;
END;
$function$;
$cmd$);
-- create on coordinator
--
-- Splits every shard of a distributed table, and of all tables co-located
-- with it, into two shards.
--
-- table_name: a distributed table (must have a row in pg_dist_partition).
--
-- Writes to all affected tables are blocked first, then each table is split
-- with split_shards_of_table. Raises an exception when table_name is not a
-- distributed table.
CREATE OR REPLACE FUNCTION split_shards(table_name regclass)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    colocation_id int;
BEGIN
    SELECT colocationid INTO colocation_id
    FROM pg_dist_partition
    WHERE logicalrelid = table_name;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'table % is not a distributed table', table_name;
    END IF;

    -- NOTE(review): Citus uses colocationid 0 for tables that belong to no
    -- co-location group (confirm for the target version). Matching on 0 would
    -- pull in unrelated tables, so in that case only table_name is processed.
    PERFORM block_writes_to_table(logicalrelid)
    FROM pg_dist_partition
    WHERE logicalrelid = table_name
       OR (colocation_id <> 0 AND colocationid = colocation_id);

    PERFORM split_shards_of_table(logicalrelid)
    FROM pg_dist_partition
    WHERE logicalrelid = table_name
       OR (colocation_id <> 0 AND colocationid = colocation_id);
END;
$function$;
-- Takes a SHARE ROW EXCLUSIVE lock on the given table.
--
-- table_name: table to lock; the regclass value is interpolated as-is into
--             the LOCK TABLE statement.
CREATE OR REPLACE FUNCTION block_writes_to_table(table_name regclass)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    lock_statement text;
BEGIN
    lock_statement := format('LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE', table_name);
    EXECUTE lock_statement;
END;
$function$;
-- Splits every shard of one distributed table into a lower and an upper half.
--
-- table_name: the distributed table whose shards are split.
--
-- Steps:
--   1. block writes to the table;
--   2. create two staging distributed tables (citus_split_lower.<table> and
--      citus_split_upper.<table>) co-located with the original;
--   3. on the workers, copy the lower / upper half of each shard's hash range
--      into the matching staging shard via worker_split_shard;
--   4. drop the old shards, move the staging shards into the table's schema,
--      and rewrite pg_dist_shard so the new shards belong to table_name;
--   5. drop the staging tables.
-- Raises an exception when any worker command reports a failure.
--
-- Known limitation: the staging tables are created with LIKE ... INCLUDING
-- ALL, so index names on the new shards may no longer match the index name
-- on the coordinator.
CREATE OR REPLACE FUNCTION split_shards_of_table(table_name regclass)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    schema_name text;
    relation_name text;
    lower_split text;
    upper_split text;
    partition_column text;
    shards shard_range[];
    command_succeeded bool;
    error_message text;
BEGIN
    PERFORM block_writes_to_table(table_name);

    -- Resolve the table's own schema and bare name from the catalog: the
    -- regclass text form may be schema-qualified, and the new shards must be
    -- moved back into the table's schema rather than a hard-coded one.
    SELECT n.nspname, c.relname INTO schema_name, relation_name
    FROM pg_class AS c
    INNER JOIN pg_namespace AS n ON n.oid = c.relnamespace
    WHERE c.oid = table_name;

    lower_split := format('citus_split_lower.%I', relation_name);
    upper_split := format('citus_split_upper.%I', relation_name);

    /* create identical distributed tables for the 2 splits */
    EXECUTE format($$CREATE TABLE %s (LIKE %s INCLUDING ALL)$$, lower_split, table_name);
    EXECUTE format($$CREATE TABLE %s (LIKE %s INCLUDING ALL)$$, upper_split, table_name);

    SELECT column_to_column_name(logicalrelid, partkey) INTO partition_column
    FROM pg_dist_partition
    WHERE logicalrelid = table_name;

    RAISE NOTICE 'creating lower split for table %', table_name;
    PERFORM create_distributed_table(lower_split, partition_column, colocate_with := table_name::text);

    RAISE NOTICE 'creating upper split for table %', table_name;
    PERFORM create_distributed_table(upper_split, partition_column, colocate_with := table_name::text);

    /* collect the current hash range of every shard of the original table */
    SELECT array_agg((shard_name(logicalrelid, shardid),shardminvalue,shardmaxvalue)::shard_range) INTO shards
    FROM pg_dist_shard s
    WHERE logicalrelid = table_name;

    /* copy the lower half of each shard into the lower split */
    RAISE NOTICE 'filling lower split of table %', table_name;
    -- %%L survives this format() call as %L and is filled in with the shard
    -- names by run_command_on_colocated_placements.
    SELECT min(result), count(*) = 0 INTO error_message, command_succeeded
    FROM run_command_on_colocated_placements(table_name, lower_split, format($$
    SELECT worker_split_shard(%%L, 'citus_split_lower.'||%%L, %L, %L::shard_range[], false)
    $$, partition_column, shards::text))
    WHERE success = 'f';

    IF NOT command_succeeded THEN
        RAISE 'splitting shards failed: %', error_message;
    END IF;

    /* copy the upper half of each shard into the upper split */
    RAISE NOTICE 'filling upper split of table %', table_name;
    SELECT min(result), count(*) = 0 INTO error_message, command_succeeded
    FROM run_command_on_colocated_placements(table_name, upper_split, format($$
    SELECT worker_split_shard(%%L, 'citus_split_upper.'||%%L, %L, %L::shard_range[], true)
    $$, partition_column, shards::text))
    WHERE success = 'f';

    IF NOT command_succeeded THEN
        RAISE 'splitting shards failed: %', error_message;
    END IF;

    RAISE NOTICE 'dropping old shards of table %', table_name;
    PERFORM master_drop_all_shards(table_name,'','');

    /* move the split shards out of the staging schemas into the table's schema */
    RAISE NOTICE 'replacing old shards with split in table %', table_name;
    SELECT min(result), count(*) = 0 INTO error_message, command_succeeded
    FROM run_command_on_placements(lower_split, format($$ALTER TABLE citus_split_lower.%%I SET SCHEMA %I$$, schema_name))
    WHERE success = 'f';

    IF NOT command_succeeded THEN
        RAISE 'replacing shards failed: %', error_message;
    END IF;

    SELECT min(result), count(*) = 0 INTO error_message, command_succeeded
    FROM run_command_on_placements(upper_split, format($$ALTER TABLE citus_split_upper.%%I SET SCHEMA %I$$, schema_name))
    WHERE success = 'f';

    IF NOT command_succeeded THEN
        RAISE 'replacing shards failed: %', error_message;
    END IF;

    /* update shard ranges of lower and upper split */
    -- lower split keeps the original minimum and ends at the midpoint
    UPDATE pg_dist_shard
    SET shardmaxvalue = shardminvalue::bigint + (shardmaxvalue::bigint - shardminvalue::bigint) / 2
    WHERE logicalrelid = lower_split::regclass AND shardmaxvalue::bigint > shardminvalue::bigint;

    -- upper split starts right after the midpoint and keeps the original maximum
    UPDATE pg_dist_shard
    SET shardminvalue = shardminvalue::bigint + (shardmaxvalue::bigint - shardminvalue::bigint) / 2 + 1
    WHERE logicalrelid = upper_split::regclass AND shardmaxvalue::bigint > shardminvalue::bigint;

    -- hand the split shards over to the original table
    UPDATE pg_dist_shard
    SET logicalrelid = table_name
    WHERE logicalrelid = lower_split::regclass OR logicalrelid = upper_split::regclass;

    -- the staging tables no longer own any shards and can be removed
    EXECUTE format($$DROP TABLE %s$$, upper_split);
    EXECUTE format($$DROP TABLE %s$$, lower_split);
END;
$function$;
-- Example session: create a small table, distribute it on column x,
-- and insert one row.
CREATE TABLE test (x int, y int);
SELECT create_distributed_table('test','x');
INSERT INTO test VALUES (1,2);
-- Number of shards before the split.
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'test'::regclass;
count
-------
32
(1 row)
SELECT split_shards('test');
NOTICE: creating lower split for table test
NOTICE: creating upper split for table test
NOTICE: filling lower split of table test
NOTICE: filling upper split of table test
NOTICE: dropping old shards of table test
NOTICE: replacing old shards with split in table test
split_shards
--------------
(1 row)
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'test'::regclass;
count
-------
64
(1 row)
SELECT * FROM test;
x | y
---+---
1 | 2
(1 row)
@marcocitus
Copy link
Author

marcocitus commented Jun 16, 2017

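Because the function creates the split tables with LIKE .. INCLUDING ALL, the indexes on the new shards get generated names (e.g. test_x_idx_106147) that no longer match the index name on the coordinator (x_idx). A subsequent DROP INDEX on the coordinator therefore fails, but this is easy to fix by renaming the index on the coordinator so that it matches the shard index names, as shown below.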

$ psql -h coordinator

postgres=# \d test
    Table "public.test"
Column |  Type   | Modifiers 
--------+---------+-----------
x      | integer | 
y      | integer | 
Indexes:
   "x_idx" btree (x)

postgres=# DROP INDEX x_idx;
ERROR:  index "x_idx_106147" does not exist
CONTEXT:  while executing command on worker-1:5432

$ psql -h worker-1

postgres=# \d test_106147
 Table "public.test_106147"
Column |  Type   | Modifiers 
--------+---------+-----------
x      | integer | 
y      | integer | 
Indexes:
   "test_x_idx_106147" btree (x)

$ psql -h coordinator

postgres=# ALTER INDEX x_idx RENAME TO test_x_idx;
ALTER INDEX
postgres=# DROP INDEX test_x_idx;
DROP INDEX

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment