Skip to content

Instantly share code, notes, and snippets.

@marcocitus marcocitus/split.sql
Last active Jun 19, 2017

Embed
What would you like to do?
Split shards in Citus (experimental)
/*
 * run_command_on_master_and_workers runs p_sql in the current session (the
 * coordinator) and then on every worker via Citus' run_command_on_workers.
 *
 * run_command_on_workers reports per-node failures in its result rows
 * (success = false, error text in result) rather than raising, so the rows
 * are checked here and the first failure is raised. Otherwise a command that
 * failed on a worker would go unnoticed.
 */
CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
RETURNS void LANGUAGE plpgsql AS $function$
DECLARE
    worker_result record;
BEGIN
    EXECUTE p_sql;
    FOR worker_result IN
        SELECT nodename, nodeport, result
        FROM run_command_on_workers(p_sql)
        WHERE NOT success
    LOOP
        RAISE EXCEPTION 'command failed on worker %:%: %',
            worker_result.nodename, worker_result.nodeport, worker_result.result;
    END LOOP;
END;
$function$;
-- Schemas that temporarily hold the lower and upper halves of each split
-- shard, plus the composite type describing one shard's hash range. All of
-- them are created on the coordinator and on every worker.
SELECT run_command_on_master_and_workers($cmd$
CREATE SCHEMA IF NOT EXISTS citus_split_lower
$cmd$);
SELECT run_command_on_master_and_workers($cmd$
CREATE SCHEMA IF NOT EXISTS citus_split_upper
$cmd$);
-- CREATE TYPE has no IF NOT EXISTS clause; ignore duplicate_object so that,
-- like the CREATE SCHEMA statements above, re-running the script succeeds.
-- NOTE: an existing shard_range with a different definition is NOT replaced.
SELECT run_command_on_master_and_workers($cmd$
DO $do$
BEGIN
    CREATE TYPE shard_range AS (
        shard_name text,
        shardminvalue bigint,
        shardmaxvalue bigint
    );
EXCEPTION WHEN duplicate_object THEN
    NULL;
END
$do$;
$cmd$);
-- worker_split_shard is installed on the workers only. It copies the rows of
-- old_shard_name whose worker_hash(partition_column) falls in the lower half
-- (upper = false) or the upper half (upper = true) of the old shard's hash
-- range into new_shard_name. The old shard's range is looked up in shards by
-- its name; an unknown shard raises instead of silently copying nothing.
SELECT run_command_on_workers($cmd$
CREATE OR REPLACE FUNCTION worker_split_shard(
    old_shard_name regclass,
    new_shard_name regclass,
    partition_column text,
    shards shard_range[],
    upper bool)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    min_value bigint;
    max_value bigint;
BEGIN
    -- The midpoint formula must match the pg_dist_shard range updates done
    -- by split_shards_of_table on the coordinator.
    IF upper THEN
        SELECT shardminvalue + (shardmaxvalue - shardminvalue) / 2 + 1, shardmaxvalue
        INTO min_value, max_value
        FROM unnest(shards) WHERE shard_name = old_shard_name::text;
    ELSE
        SELECT shardminvalue, shardminvalue + (shardmaxvalue - shardminvalue) / 2
        INTO min_value, max_value
        FROM unnest(shards) WHERE shard_name = old_shard_name::text;
    END IF;

    -- Without a matching entry the bounds would be NULL and the INSERT
    -- would silently copy no rows.
    IF NOT FOUND THEN
        RAISE EXCEPTION 'shard % not found in the supplied shard ranges', old_shard_name;
    END IF;

    -- Both shard names are regclass values whose text form is already quoted
    -- or schema-qualified, so they are inserted with %s. The partition column
    -- is a bare column name, so it is quoted with %I.
    EXECUTE format($$INSERT INTO %s SELECT * FROM %s WHERE worker_hash(%I) BETWEEN %L AND %L$$,
                   new_shard_name, old_shard_name, partition_column, min_value, max_value);
END;
$function$;
$cmd$);
-- create on coordinator
/*
 * split_shards splits every shard of table_name, and of every table colocated
 * with it, into two shards, so the colocation group stays consistent.
 * Writes to all affected tables are blocked first (block_writes_to_table);
 * then each table is split with split_shards_of_table.
 *
 * Raises if table_name is not a hash-distributed Citus table.
 */
CREATE OR REPLACE FUNCTION split_shards(table_name regclass)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    colocation_id int;
    partition_method "char";
    colocated_tables regclass[];
    colocated_table regclass;
BEGIN
    SELECT colocationid, partmethod INTO colocation_id, partition_method
    FROM pg_dist_partition WHERE logicalrelid = table_name;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'table % is not a distributed table', table_name;
    END IF;

    -- the split is based on worker_hash ranges, so only hash distribution works
    IF partition_method <> 'h' THEN
        RAISE EXCEPTION 'table % is not hash-distributed', table_name;
    END IF;

    IF colocation_id = 0 THEN
        -- colocationid 0 means "no colocation group" in Citus; matching on it
        -- would also sweep in every other non-colocated table
        colocated_tables := ARRAY[table_name];
    ELSE
        -- Snapshot the group before splitting: split_shards_of_table creates
        -- temporary tables that join this colocation group. The fixed order
        -- makes lock acquisition deterministic.
        SELECT array_agg(logicalrelid ORDER BY logicalrelid) INTO colocated_tables
        FROM pg_dist_partition WHERE colocationid = colocation_id;
    END IF;

    FOREACH colocated_table IN ARRAY colocated_tables LOOP
        PERFORM block_writes_to_table(colocated_table);
    END LOOP;

    FOREACH colocated_table IN ARRAY colocated_tables LOOP
        PERFORM split_shards_of_table(colocated_table);
    END LOOP;
END;
$function$;
/*
 * block_writes_to_table takes a SHARE ROW EXCLUSIVE lock on table_name in the
 * current session. That lock mode conflicts with the ROW EXCLUSIVE lock taken
 * by INSERT/UPDATE/DELETE but not with plain SELECT. Reads therefore continue,
 * while writes wait until the calling transaction ends.
 */
CREATE OR REPLACE FUNCTION block_writes_to_table(table_name regclass)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    lock_command text := 'LOCK TABLE ' || table_name::text || ' IN SHARE ROW EXCLUSIVE MODE';
BEGIN
    EXECUTE lock_command;
END;
$function$;
/*
 * split_shards_of_table replaces each shard of the hash-distributed table
 * table_name with two shards. One covers the lower half and one the upper half
 * of the original shard's hash range, so the table's shard count doubles.
 *
 * Outline:
 *   1. Create two empty distributed tables (the "splits") with the same
 *      definition in the citus_split_lower and citus_split_upper schemas,
 *      colocated with table_name.
 *   2. On every colocated placement, run worker_split_shard to copy each old
 *      shard's rows into the matching lower or upper split shard.
 *   3. Drop the old shards and move the split shards into table_name's schema
 *      on the workers. Then halve their ranges in pg_dist_shard and reassign
 *      them to table_name.
 *   4. Drop the split tables, which no longer own any shards.
 *
 * Raises if a shard cannot be halved or if any worker command fails.
 *
 * NOTE(review): LIKE ... INCLUDING ALL gives the split shards' indexes names
 * that differ from the coordinator's index names. A later DROP INDEX on the
 * coordinator then fails until the coordinator index is renamed to match.
 */
CREATE OR REPLACE FUNCTION split_shards_of_table(table_name regclass)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    schema_name text;
    relation_name text;
    lower_split text;
    upper_split text;
    partition_column text;
    shards shard_range[];
    command_succeeded bool;
    error_message text;
BEGIN
    PERFORM block_writes_to_table(table_name);

    -- The split shards are moved into the table's own schema at the end, so
    -- resolve that schema instead of assuming public. The bare relation name
    -- is needed as well, because table_name::text may be schema-qualified.
    SELECT n.nspname, c.relname INTO schema_name, relation_name
    FROM pg_class c
    INNER JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE c.oid = table_name;

    -- A shard covering a single hash value cannot be halved. Neither range
    -- UPDATE below would touch it, leaving two shards with the same range.
    IF EXISTS (
        SELECT 1
        FROM pg_dist_shard
        WHERE logicalrelid = table_name
          AND shardmaxvalue::bigint <= shardminvalue::bigint
    ) THEN
        RAISE EXCEPTION 'cannot split table %: a shard covers a single hash value', table_name;
    END IF;

    lower_split := format('citus_split_lower.%I', relation_name);
    upper_split := format('citus_split_upper.%I', relation_name);

    /* create identical distributed tables for the 2 splits */
    EXECUTE format($$CREATE TABLE %s (LIKE %s INCLUDING ALL)$$, lower_split, table_name);
    EXECUTE format($$CREATE TABLE %s (LIKE %s INCLUDING ALL)$$, upper_split, table_name);

    SELECT column_to_column_name(logicalrelid, partkey) INTO partition_column
    FROM pg_dist_partition WHERE logicalrelid = table_name;

    RAISE NOTICE 'creating lower split for table %', table_name;
    PERFORM create_distributed_table(lower_split, partition_column, colocate_with := table_name::text);
    RAISE NOTICE 'creating upper split for table %', table_name;
    PERFORM create_distributed_table(upper_split, partition_column, colocate_with := table_name::text);

    -- The name and hash range of every current shard, shipped to the workers
    -- so that worker_split_shard can compute the halves locally.
    SELECT array_agg((shard_name(logicalrelid, shardid), shardminvalue, shardmaxvalue)::shard_range) INTO shards
    FROM pg_dist_shard WHERE logicalrelid = table_name;

    -- Copy each old shard's rows into its colocated split shard. The two
    -- placeholders written as %%L survive the outer format() and receive the
    -- shard names of table_name and of the split. Failed placements are
    -- collected, and the first error message is raised.
    RAISE NOTICE 'filling lower split of table %', table_name;
    SELECT min(result), count(*) = 0 INTO error_message, command_succeeded
    FROM run_command_on_colocated_placements(table_name, lower_split, format($$
SELECT worker_split_shard(%%L, 'citus_split_lower.'||%%L, %L, %L::shard_range[], false)
$$, partition_column, shards::text))
    WHERE success = 'f';
    IF NOT command_succeeded THEN
        RAISE 'splitting shards failed: %', error_message;
    END IF;

    RAISE NOTICE 'filling upper split of table %', table_name;
    SELECT min(result), count(*) = 0 INTO error_message, command_succeeded
    FROM run_command_on_colocated_placements(table_name, upper_split, format($$
SELECT worker_split_shard(%%L, 'citus_split_upper.'||%%L, %L, %L::shard_range[], true)
$$, partition_column, shards::text))
    WHERE success = 'f';
    IF NOT command_succeeded THEN
        RAISE 'splitting shards failed: %', error_message;
    END IF;

    RAISE NOTICE 'dropping old shards of table %', table_name;
    PERFORM master_drop_all_shards(table_name,'','');

    -- Move the split shards on the workers into the table's schema.
    RAISE NOTICE 'replacing old shards with split in table %', table_name;
    SELECT min(result), count(*) = 0 INTO error_message, command_succeeded
    FROM run_command_on_placements(lower_split, format($$ALTER TABLE citus_split_lower.%%I SET SCHEMA %I$$, schema_name))
    WHERE success = 'f';
    IF NOT command_succeeded THEN
        RAISE 'replacing shards failed: %', error_message;
    END IF;
    SELECT min(result), count(*) = 0 INTO error_message, command_succeeded
    FROM run_command_on_placements(upper_split, format($$ALTER TABLE citus_split_upper.%%I SET SCHEMA %I$$, schema_name))
    WHERE success = 'f';
    IF NOT command_succeeded THEN
        RAISE 'replacing shards failed: %', error_message;
    END IF;

    -- Halve the ranges in the metadata using the same midpoint formula as
    -- worker_split_shard, then hand the split shards over to table_name.
    UPDATE pg_dist_shard
    SET shardmaxvalue = shardminvalue::bigint + (shardmaxvalue::bigint - shardminvalue::bigint) / 2
    WHERE logicalrelid = lower_split::regclass AND shardmaxvalue::bigint > shardminvalue::bigint;
    UPDATE pg_dist_shard
    SET shardminvalue = shardminvalue::bigint + (shardmaxvalue::bigint - shardminvalue::bigint) / 2 + 1
    WHERE logicalrelid = upper_split::regclass AND shardmaxvalue::bigint > shardminvalue::bigint;
    UPDATE pg_dist_shard
    SET logicalrelid = table_name
    WHERE logicalrelid = lower_split::regclass OR logicalrelid = upper_split::regclass;

    -- The split tables own no shards any more; drop the empty shells.
    EXECUTE format($$DROP TABLE %s$$, upper_split);
    EXECUTE format($$DROP TABLE %s$$, lower_split);
END;
$function$;
-- Demo: distribute a small table by x, insert one row, and count its shards
-- before splitting them.
CREATE TABLE test (x int, y int);
SELECT create_distributed_table('test','x');
INSERT INTO test (x, y) VALUES (1, 2);
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'test'::regclass;
count
-------
32
(1 row)
-- Split every shard of test, and of every table colocated with it, in two.
SELECT split_shards('test');
NOTICE: creating lower split for table test
NOTICE: creating upper split for table test
NOTICE: filling lower split of table test
NOTICE: filling upper split of table test
NOTICE: dropping old shards of table test
NOTICE: replacing old shards with split in table test
split_shards
--------------
(1 row)
-- Shard count after the split: double the count before it (64 vs. 32 in the output shown).
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'test'::regclass;
count
-------
64
(1 row)
-- The row inserted before the split is still readable afterwards.
SELECT x, y FROM test ORDER BY x;
x | y
---+---
1 | 2
(1 row)
@marcocitus

This comment has been minimized.

Copy link
Owner Author

marcocitus commented Jun 16, 2017

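The function creates the split tables with LIKE ... INCLUDING ALL, so the indexes on the new shards get different names from the coordinator's indexes. For example, x_idx becomes test_x_idx_106147 on the worker. As a result, a subsequent DROP INDEX on the coordinator fails. The workaround is easy: rename the index on the coordinator to match the new shard index name, as shown below.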

$ psql -h coordinator

postgres=# \d test
    Table "public.test"
Column |  Type   | Modifiers 
--------+---------+-----------
x      | integer | 
y      | integer | 
Indexes:
   "x_idx" btree (x)

postgres=# DROP INDEX x_idx;
ERROR:  index "x_idx_106147" does not exist
CONTEXT:  while executing command on worker-1:5432

$ psql -h worker-1

postgres=# \d test_106147
 Table "public.test_106147"
Column |  Type   | Modifiers 
--------+---------+-----------
x      | integer | 
y      | integer | 
Indexes:
   "test_x_idx_106147" btree (x)

$ psql -h coordinator

postgres=# ALTER INDEX x_idx RENAME TO test_x_idx;
ALTER INDEX
postgres=# DROP INDEX test_x_idx;
DROP INDEX
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.