Last active
September 20, 2016 20:55
-
-
Save marcocitus/0af4455bd1bc1224ce0476560235b49e to your computer and use it in GitHub Desktop.
Functions and views to run a SQL command across all workers, shards, or placements on Citus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Builds the name of a shard relation: "<relation name>_<shard_id>".
--
-- Parameters:
--   table_name - the distributed table (regclass).
--   shard_id   - the shard identifier appended as a suffix.
-- Returns the shard name as text, or NULL when either argument is NULL or
-- table_name does not resolve to a pg_class row.
--
-- The suffix is appended to the bare relation name BEFORE quoting, so a table
-- such as "My Table" yields "My Table_7" rather than the invalid
-- "My Table"_7 that results from concatenating onto the regclass text form.
-- The schema prefix is added only when the table is not visible on the
-- current search_path, matching how regclass itself prints names.
-- NOTE(review): names whose suffixed form exceeds the identifier length limit
-- are not shortened here -- confirm how the target Citus version names them.
CREATE OR REPLACE FUNCTION citus_shard_name(table_name regclass, shard_id bigint)
RETURNS text
LANGUAGE sql
AS $function$
    SELECT CASE
               WHEN pg_table_is_visible(c.oid) THEN ''
               ELSE quote_ident(n.nspname) || '.'
           END
           || quote_ident(c.relname || '_' || shard_id)
    FROM pg_class AS c
    INNER JOIN pg_namespace AS n
        ON n.oid = c.relnamespace
    WHERE c.oid = table_name;
$function$;
-- Resolves a shard name from the shard id alone: finds the pg_dist_shard row
-- whose shardid equals shard_id and delegates to the two-argument
-- citus_shard_name() with that row's logicalrelid.
-- Returns NULL when no pg_dist_shard row matches.
CREATE OR REPLACE FUNCTION citus_shard_name(shard_id bigint)
RETURNS text
LANGUAGE sql
AS $function$
    SELECT citus_shard_name(s.logicalrelid, s.shardid)
    FROM pg_dist_shard AS s
    WHERE s.shardid = shard_id;
$function$;
-- One row per shard that has at least one placement with shardstate = 1.
-- The per-placement values (node name, node port, dblink connection name and
-- libpq connection string) are collected into parallel arrays.
--
-- Every array is aggregated in the same explicit (nodename, nodeport) order so
-- that element [1] refers to the same placement in all four arrays and stays
-- the same each time the view is evaluated; callers that connect with
-- connname[1] in one statement and fetch with connname[1] in another depend
-- on that.
--
-- The join is written as INNER JOIN because the shardstate filter in WHERE
-- already discards shards without a matching placement row.
CREATE OR REPLACE VIEW citus_shards AS
SELECT
    s.logicalrelid::regclass AS tablename,
    s.shardid AS shardid,
    citus_shard_name(s.logicalrelid, s.shardid) AS shardname,
    array_agg(p.nodename ORDER BY p.nodename, p.nodeport) AS nodenames,
    array_agg(p.nodeport ORDER BY p.nodename, p.nodeport) AS nodeports,
    -- dblink connection name: <node>/<port>/<shardid>
    array_agg(
        format('%s/%s/%s', p.nodename, p.nodeport, s.shardid)
        ORDER BY p.nodename, p.nodeport
    ) AS connname,
    -- connection string targets the same database name as the current session
    array_agg(
        format('host=%s port=%s dbname=%s', p.nodename, p.nodeport, current_database())
        ORDER BY p.nodename, p.nodeport
    ) AS connstring
FROM pg_dist_shard AS s
INNER JOIN pg_dist_shard_placement AS p USING (shardid)
WHERE p.shardstate = 1
GROUP BY s.logicalrelid, s.shardid
ORDER BY s.logicalrelid, s.shardid;
-- One row per shard placement with shardstate = 1, together with the shard
-- name, the dblink connection name (<node>/<port>/<shardid>) and a libpq
-- connection string pointing at the current database name on that node.
CREATE OR REPLACE VIEW citus_placements AS
SELECT
    s.logicalrelid::regclass AS tablename,
    s.shardid AS shardid,
    citus_shard_name(s.logicalrelid, s.shardid) AS shardname,
    p.nodename AS nodename,
    CAST(p.nodeport AS int) AS nodeport,
    format('%s/%s/%s', p.nodename, p.nodeport, s.shardid) AS connname,
    format('host=%s port=%s dbname=%s', p.nodename, p.nodeport, current_database()) AS connstring
FROM pg_dist_shard AS s
INNER JOIN pg_dist_shard_placement AS p USING (shardid)
WHERE p.shardstate = 1
ORDER BY s.logicalrelid, s.shardid, p.nodename, p.nodeport;
-- One row per node returned by master_get_active_worker_nodes(), with the
-- dblink connection name (<node>/<port>) and a libpq connection string
-- pointing at the current database name on that node.
CREATE OR REPLACE VIEW citus_workers AS
SELECT
    n.node_name AS nodename,
    CAST(n.node_port AS int) AS nodeport,
    format('%s/%s', n.node_name, n.node_port) AS connname,
    format('host=%s port=%s dbname=%s', n.node_name, n.node_port, current_database()) AS connstring
FROM master_get_active_worker_nodes() AS n
ORDER BY n.node_name, n.node_port;
-- Sends `command` to every row of citus_workers over dblink and returns what
-- each worker answered.
--
-- Parameters:
--   command  - SQL text sent unchanged to each worker; its result rows are
--              read as a single text column.
--   parallel - true (default): the command is sent to all workers before any
--              result is read. false: each worker is sent the command and
--              read in turn.
-- Returns one row per result row: (nodename, nodeport, true, <text value>),
-- or (nodename, nodeport, false, <error message>) when fetching the result
-- raised an error.
-- Connections are opened on demand and left open after the call.
CREATE OR REPLACE FUNCTION citus_run_on_all_workers(
    command text,
    parallel bool default true,
    OUT nodename text,
    OUT nodeport int,
    OUT success bool,
    OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
    node_rec citus_workers%rowtype;
BEGIN
    -- Anti-join: connect only to workers with no open dblink connection of
    -- the expected name.
    PERFORM dblink_connect(w.connname, w.connstring)
    FROM citus_workers AS w
    LEFT JOIN unnest(dblink_get_connections()) AS open_conn(conn_name)
        ON open_conn.conn_name = w.connname
    WHERE open_conn.conn_name IS NULL;

    -- Parallel mode: dispatch everywhere up front, collect afterwards.
    IF parallel THEN
        PERFORM dblink_send_query(w.connname, command)
        FROM citus_workers AS w;
    END IF;

    FOR node_rec IN
        SELECT w.* FROM citus_workers AS w
    LOOP
        -- Sequential mode: dispatch right before reading this worker.
        IF NOT parallel THEN
            PERFORM dblink_send_query(node_rec.connname, command);
        END IF;

        -- Keep fetching until dblink_get_result yields no more rows.
        LOOP
            BEGIN
                RETURN QUERY
                SELECT node_rec.nodename, node_rec.nodeport, true, r.val
                FROM dblink_get_result(node_rec.connname) AS r(val text);

                IF NOT FOUND THEN
                    EXIT;
                END IF;
            EXCEPTION WHEN others THEN
                -- Report the failure as a row and try the next result.
                RETURN QUERY
                SELECT node_rec.nodename, node_rec.nodeport, false, SQLERRM;
            END;
        END LOOP;

        -- One more non-failing fetch so the connection is left drained.
        PERFORM FROM dblink_get_result(node_rec.connname, false) AS r(val text);
    END LOOP;
END;
$function$;
-- Runs a command against every placement of a table listed in
-- citus_placements, over dblink.
--
-- Parameters:
--   table_name - the distributed table whose placements are targeted.
--   command    - a format() template; it is expanded with the shard name
--                supplied three times, so it may reference the shard name
--                through up to three %s-style placeholders. Result rows are
--                read as a single text column.
--   parallel   - false (default): each placement is sent the command and read
--                in turn. true: the command is sent to all placements before
--                any result is read.
-- Returns one row per result row:
--   (nodename, nodeport, shardid, true, <text value>), or
--   (nodename, nodeport, shardid, false, <error message>) when fetching the
--   result raised an error.
-- Connections are opened on demand and left open after the call.
CREATE OR REPLACE FUNCTION citus_run_on_all_placements(
    table_name regclass,
    command text,
    parallel bool default false,
    OUT nodename text,
    OUT nodeport int,
    OUT shardid bigint,
    OUT success bool,
    OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
    plc citus_placements%rowtype;
BEGIN
    -- Anti-join: connect only to placements of this table that have no open
    -- dblink connection of the expected name.
    PERFORM dblink_connect(p.connname, p.connstring)
    FROM citus_placements AS p
    LEFT JOIN unnest(dblink_get_connections()) AS open_conn(conn_name)
        ON open_conn.conn_name = p.connname
    WHERE p.tablename = table_name
      AND open_conn.conn_name IS NULL;

    -- Parallel mode: dispatch to every placement up front.
    IF parallel THEN
        PERFORM dblink_send_query(
                    p.connname,
                    format(command, p.shardname, p.shardname, p.shardname))
        FROM citus_placements AS p
        WHERE p.tablename = table_name;
    END IF;

    FOR plc IN
        SELECT p.* FROM citus_placements AS p WHERE p.tablename = table_name
    LOOP
        -- Sequential mode: dispatch right before reading this placement.
        IF NOT parallel THEN
            PERFORM dblink_send_query(
                        plc.connname,
                        format(command, plc.shardname, plc.shardname, plc.shardname));
        END IF;

        -- Keep fetching until dblink_get_result yields no more rows.
        LOOP
            BEGIN
                RETURN QUERY
                SELECT plc.nodename, plc.nodeport, plc.shardid, true, r.val
                FROM dblink_get_result(plc.connname) AS r(val text);

                IF NOT FOUND THEN
                    EXIT;
                END IF;
            EXCEPTION WHEN others THEN
                -- Report the failure as a row and try the next result.
                RETURN QUERY
                SELECT plc.nodename, plc.nodeport, plc.shardid, false, SQLERRM;
            END;
        END LOOP;

        -- One more non-failing fetch so the connection is left drained.
        PERFORM FROM dblink_get_result(plc.connname, false) AS r(val text);
    END LOOP;
END;
$function$;
-- Runs a command once per shard of a table, using the first placement listed
-- for each shard in citus_shards (connname[1] / connstring[1]).
--
-- Parameters:
--   table_name - the distributed table whose shards are targeted.
--   command    - a format() template; it is expanded with the shard name
--                supplied three times, so it may reference the shard name
--                through up to three %s-style placeholders. Result rows are
--                read as a single text column.
--   parallel   - true (default): the command is sent for all shards before
--                any result is read. false: each shard is sent the command
--                and read in turn.
-- Returns one row per result row: (shardid, true, <text value>), or
-- (shardid, false, <error message>) when fetching the result raised an error.
-- Connections are opened on demand and left open after the call.
--
-- Results are fetched with dblink's default fail_on_error behaviour so that a
-- remote error is raised locally and reported as a success = false row, the
-- same way citus_run_on_all_workers and citus_run_on_all_placements report
-- it. Fetching with fail_on_error = false would downgrade the remote error to
-- a notice and return no row at all for the failed shard.
CREATE OR REPLACE FUNCTION citus_run_on_shards(
    table_name regclass,
    command text,
    parallel bool default true,
    OUT shardid bigint,
    OUT success bool,
    OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
    shard citus_shards%rowtype;
BEGIN
    -- Anti-join: connect only for shards of this table whose first placement
    -- has no open dblink connection of the expected name.
    PERFORM dblink_connect(s.connname[1], s.connstring[1])
    FROM citus_shards AS s
    LEFT JOIN unnest(dblink_get_connections()) AS open_conn(conn_name)
        ON open_conn.conn_name = s.connname[1]
    WHERE s.tablename = table_name
      AND open_conn.conn_name IS NULL;

    -- Parallel mode: dispatch for every shard up front.
    IF parallel THEN
        PERFORM dblink_send_query(
                    s.connname[1],
                    format(command, s.shardname, s.shardname, s.shardname))
        FROM citus_shards AS s
        WHERE s.tablename = table_name;
    END IF;

    FOR shard IN
        SELECT s.* FROM citus_shards AS s WHERE s.tablename = table_name
    LOOP
        -- Sequential mode: dispatch right before reading this shard.
        IF NOT parallel THEN
            PERFORM dblink_send_query(
                        shard.connname[1],
                        format(command, shard.shardname, shard.shardname, shard.shardname));
        END IF;

        -- Keep fetching until dblink_get_result yields no more rows.
        LOOP
            BEGIN
                RETURN QUERY
                SELECT shard.shardid, true, r.res
                FROM dblink_get_result(shard.connname[1]) AS r(res text);
                EXIT WHEN NOT FOUND;
            EXCEPTION WHEN others THEN
                -- Report the failure as a row and try the next result.
                RETURN QUERY
                SELECT shard.shardid, false, SQLERRM;
            END;
        END LOOP;

        -- One more non-failing fetch so the connection is left drained.
        PERFORM FROM dblink_get_result(shard.connname[1], false) AS r(res text);
    END LOOP;
END;
$function$;
-- Closes every open dblink connection whose name matches a connection name
-- currently listed in citus_placements or citus_workers. Other dblink
-- connections in the session are left untouched.
CREATE OR REPLACE FUNCTION citus_close_dblink()
RETURNS void
LANGUAGE plpgsql
AS $function$
BEGIN
    -- Per-placement connections (<node>/<port>/<shardid>).
    PERFORM dblink_disconnect(p.connname)
    FROM citus_placements AS p
    INNER JOIN unnest(dblink_get_connections()) AS open_conn(conn_name)
        ON open_conn.conn_name = p.connname;

    -- Per-worker connections (<node>/<port>).
    PERFORM dblink_disconnect(w.connname)
    FROM citus_workers AS w
    INNER JOIN unnest(dblink_get_connections()) AS open_conn(conn_name)
        ON open_conn.conn_name = w.connname;
END;
$function$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment