-- Functions to drain and remove a node in Citus Enterprise
-- drain_node: move the shard placements stored on one node to the other
-- primary nodes listed in pg_dist_node.
--
-- Parameters:
--   remove_host          host name of the node to drain
--   remove_port          port of the node to drain (default 5432)
--   shard_transfer_mode  passed through unchanged to master_move_shard_placement
--                        (default 'auto')
--
-- Each placement is assigned a target node by shardid modulo the number of
-- remaining primary nodes. The move commands are sent to this server itself
-- ('localhost' and the configured port) through master_run_on_worker.
--
-- Raises an exception when placements need to move but no other primary node
-- exists, or when any move command reports failure.
--
-- NOTE(review): one command is issued per shard. If master_move_shard_placement
-- also relocates co-located shards, later commands may refer to placements that
-- have already moved — confirm against the Citus documentation.
CREATE OR REPLACE FUNCTION public.drain_node(remove_host text, remove_port int DEFAULT 5432, shard_transfer_mode citus.shard_transfer_mode DEFAULT 'auto'::citus.shard_transfer_mode)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    coordinator_name text;
    coordinator_port int;
    candidate_count int;
    candidate_hosts text[];
    candidate_ports int[];
    shards_to_move bigint[];
    exec_result record;
BEGIN
    -- The commands are executed against this server: localhost plus its own port.
    SELECT
        'localhost',
        setting::int
    INTO coordinator_name, coordinator_port
    FROM pg_settings
    WHERE name = 'port';

    -- Every primary node except the one being drained is a possible target.
    -- Both arrays use the same ordering so that index i of one matches index i
    -- of the other, and so that the assignment is deterministic.
    SELECT
        array_agg(nodename ORDER BY nodename, nodeport),
        array_agg(nodeport ORDER BY nodename, nodeport),
        count(*)
    INTO candidate_hosts, candidate_ports, candidate_count
    FROM pg_dist_node
    WHERE noderole = 'primary'
        AND NOT (nodename = remove_host AND nodeport = remove_port);

    -- Placements on the node being drained. Tables with partmethod 'n' are
    -- skipped (presumably reference tables — TODO confirm).
    SELECT
        array_agg(shardid ORDER BY shardid)
    INTO shards_to_move
    FROM pg_dist_shard_placement
    INNER JOIN pg_dist_shard USING (shardid)
    INNER JOIN pg_dist_partition USING (logicalrelid)
    WHERE nodename = remove_host
        AND nodeport = remove_port
        AND partmethod != 'n';

    -- array_agg over zero rows yields NULL: there is nothing to move.
    IF shards_to_move IS NULL THEN
        RETURN;
    END IF;

    -- Without this guard the modulo below fails with "division by zero".
    IF candidate_count = 0 THEN
        RAISE EXCEPTION 'cannot drain node %:%: no other primary node is available to receive its shards',
            remove_host, remove_port;
    END IF;

    FOR exec_result IN
        WITH shard_moves AS (
            -- Arrays are 1-based, hence the + 1 after the modulo.
            SELECT
                shard_id,
                candidate_hosts[shard_id % candidate_count + 1] AS target_host,
                candidate_ports[shard_id % candidate_count + 1] AS target_port
            FROM unnest(shards_to_move) AS moved(shard_id)
        ),
        commands AS (
            -- One array element per shard; host and port are repeated so the
            -- three arrays have equal length.
            SELECT
                array_agg(coordinator_name) AS run_on_host,
                array_agg(coordinator_port) AS run_on_port,
                array_agg(format('SELECT master_move_shard_placement(%s, %L, %s, %L, %s, %L)',
                    shard_id, remove_host, remove_port, target_host, target_port, shard_transfer_mode)) AS command
            FROM shard_moves
        )
        SELECT
            success,
            result
        FROM commands
        CROSS JOIN master_run_on_worker(run_on_host, run_on_port, command, false)
    LOOP
        IF NOT exec_result.success THEN
            -- PL/pgSQL RAISE uses a bare % as its placeholder (not %s).
            RAISE EXCEPTION '%', exec_result.result;
        END IF;
    END LOOP;
END;
$function$;
-- drain_and_remove_node: drain a node with public.drain_node and then call
-- master_remove_node for it.
--
-- Parameters:
--   remove_host          host name of the node to drain and remove
--   remove_port          port of that node (default 5432)
--   shard_transfer_mode  passed through unchanged to public.drain_node
--                        (default 'auto')
--
-- There is no exception handler here, so an error raised while draining
-- propagates and master_remove_node is not reached.
CREATE OR REPLACE FUNCTION public.drain_and_remove_node(remove_host text, remove_port int DEFAULT 5432, shard_transfer_mode citus.shard_transfer_mode DEFAULT 'auto'::citus.shard_transfer_mode)
RETURNS void
LANGUAGE plpgsql
AS $function$
BEGIN
    -- Schema-qualified so the call does not depend on the caller's search_path.
    PERFORM public.drain_node(remove_host, remove_port, shard_transfer_mode);
    PERFORM master_remove_node(remove_host, remove_port);
END;
$function$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment