Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Create co-located shards for range-distributed tables
-- Creates one new shard with the given min/max values for table_name and for
-- every other table that has the same colocationid in pg_dist_partition, and
-- places all of those shards on the same randomly chosen node groups.
--
-- Parameters:
--   table_name  distributed table whose co-location group receives new shards
--   min_value   stored as shardminvalue of every new shard
--   max_value   stored as shardmaxvalue of every new shard
--
-- Raises an exception when table_name has no row in pg_dist_partition, when no
-- active node exists, or when a worker reports a failed shard DDL command.
--
-- NOTE(review): a colocationid of 0 may mean "not part of any co-location
-- group" in Citus; in that case every table with colocationid 0 would receive
-- a shard here. Confirm the tables were given a real co-location id first.
CREATE OR REPLACE FUNCTION create_colocated_range_shards(table_name regclass, min_value text, max_value text)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    table_colocationid int;
    group_ids int[];
    new_shard_id bigint;
    current_table text;
    node record;
    worker_names text[];
    worker_ports int[];
    shard_commands text[];
    failure_details text;
BEGIN
    SELECT colocationid INTO table_colocationid
    FROM pg_dist_partition
    WHERE logicalrelid = table_name;

    IF table_colocationid IS NULL THEN
        RAISE EXCEPTION 'table % is not a distributed table', table_name;
    END IF;

    -- Pick the random groups in a subquery and aggregate afterwards. Applying
    -- ORDER BY / LIMIT directly to the array_agg query would act on its single
    -- result row and therefore select every active group.
    -- GROUP BY makes a group that is listed on several nodes count only once.
    SELECT array_agg(chosen.groupid) INTO group_ids
    FROM (
        SELECT groupid
        FROM pg_dist_node
        WHERE isactive
        GROUP BY groupid
        ORDER BY random()
        LIMIT current_setting('citus.shard_replication_factor')::bigint
    ) AS chosen;

    IF group_ids IS NULL THEN
        RAISE EXCEPTION 'no active nodes found in pg_dist_node';
    END IF;

    FOR current_table IN
        SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = table_colocationid
    LOOP
        SELECT master_get_new_shardid() INTO new_shard_id;

        INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue)
        VALUES (current_table, new_shard_id, 't', min_value, max_value);

        -- Only active nodes were considered when choosing the groups, so only
        -- active nodes receive a placement.
        FOR node IN
            SELECT nodename, nodeport
            FROM pg_dist_node
            WHERE groupid = ANY (group_ids)
              AND isactive
        LOOP
            -- One array entry per DDL command of the table; the node name and
            -- port are repeated so the three arrays line up element by element.
            SELECT
                array_agg(node.nodename::text),
                array_agg(node.nodeport::int),
                array_agg(format($$SELECT worker_apply_shard_ddl_command(%s,%L)$$, new_shard_id, command))
            INTO worker_names, worker_ports, shard_commands
            FROM master_get_table_ddl_events(current_table) AS command;

            -- Run the commands (parallel = false) and collect the result text
            -- of every command the worker reported as unsuccessful. The
            -- aggregate forces the whole result set to be consumed.
            SELECT string_agg(coalesce(worker.result, 'unknown error'), '; ') INTO failure_details
            FROM master_run_on_worker(worker_names, worker_ports, shard_commands, false) AS worker
            WHERE NOT worker.success;

            -- Do not record a placement for a shard that was not created.
            IF failure_details IS NOT NULL THEN
                RAISE EXCEPTION 'could not create shard % of table % on %:%: %',
                    new_shard_id, current_table, node.nodename, node.nodeport, failure_details;
            END IF;

            INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
            VALUES (new_shard_id, 1, 0, node.nodename, node.nodeport);
        END LOOP;
    END LOOP;
END;
$function$;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.