Skip to content

Instantly share code, notes, and snippets.

@marcocitus
Last active September 20, 2016 20:55
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save marcocitus/0af4455bd1bc1224ce0476560235b49e to your computer and use it in GitHub Desktop.
Save marcocitus/0af4455bd1bc1224ce0476560235b49e to your computer and use it in GitHub Desktop.
Function to run a SQL command across all workers, shards, or placements on Citus
CREATE OR REPLACE FUNCTION citus_shard_name(table_name regclass, shard_id bigint)
RETURNS text
LANGUAGE sql
AS $function$
SELECT table_name||'_'||shard_id;
$function$;
CREATE OR REPLACE FUNCTION citus_shard_name(shard_id bigint)
RETURNS text
LANGUAGE sql
AS $function$
SELECT citus_shard_name(logicalrelid, shardid)
FROM pg_dist_shard
WHERE shardid = shard_id;
$function$;
CREATE OR REPLACE VIEW citus_shards AS
SELECT logicalrelid::regclass AS tablename,
shardid AS shardid,
citus_shard_name(
logicalrelid,
shardid) AS shardname,
array_agg(nodename) AS nodenames,
array_agg(nodeport) AS nodeports,
array_agg(format('%s/%s/%s',
nodename,
nodeport,
shardid)) AS connname,
array_agg(format('host=%s port=%s dbname=%s',
nodename,
nodeport,
current_database())) AS connstring
FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING (shardid)
WHERE shardstate = 1
GROUP BY logicalrelid, shardid
ORDER BY logicalrelid, shardid;
CREATE OR REPLACE VIEW citus_placements AS
SELECT logicalrelid::regclass AS tablename,
shardid AS shardid,
citus_shard_name(
logicalrelid,
shardid) AS shardname,
nodename AS nodename,
nodeport::int AS nodeport,
format('%s/%s/%s',
nodename,
nodeport,
shardid) AS connname,
format('host=%s port=%s dbname=%s',
nodename,
nodeport,
current_database()) AS connstring
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
WHERE shardstate = 1
ORDER BY logicalrelid, shardid, nodename, nodeport;
CREATE OR REPLACE VIEW citus_workers AS
SELECT node_name AS nodename,
node_port::int AS nodeport,
format('%s/%s',
node_name,
node_port) AS connname,
format('host=%s port=%s dbname=%s',
node_name,
node_port,
current_database()) AS connstring
FROM master_get_active_worker_nodes()
ORDER BY node_name, node_port;
CREATE OR REPLACE FUNCTION citus_run_on_all_workers(
command text,
parallel bool default true,
OUT nodename text,
OUT nodeport int,
OUT success bool,
OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
worker citus_workers%rowtype;
BEGIN
PERFORM dblink_connect(connname, connstring)
FROM citus_workers w LEFT JOIN unnest(dblink_get_connections()) conn ON (w.connname = conn)
WHERE conn IS NULL;
IF parallel THEN
PERFORM dblink_send_query(connname, command)
FROM citus_workers;
END IF;
FOR worker IN
SELECT * FROM citus_workers
LOOP
IF NOT parallel THEN
PERFORM dblink_send_query(worker.connname, command);
END IF;
LOOP
BEGIN
RETURN QUERY
SELECT worker.nodename, worker.nodeport, true, res
FROM dblink_get_result(worker.connname) AS r(res text);
EXIT WHEN NOT FOUND;
EXCEPTION WHEN others THEN
RETURN QUERY SELECT worker.nodename, worker.nodeport, false, SQLERRM;
END;
END LOOP;
PERFORM FROM dblink_get_result(worker.connname, false) AS r(res text);
END LOOP;
END;
$function$;
CREATE OR REPLACE FUNCTION citus_run_on_all_placements(
table_name regclass,
command text,
parallel bool default false,
OUT nodename text,
OUT nodeport int,
OUT shardid bigint,
OUT success bool,
OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
placement citus_placements%rowtype;
BEGIN
PERFORM dblink_connect(connname, connstring)
FROM citus_placements p LEFT JOIN unnest(dblink_get_connections()) conn ON (p.connname = conn)
WHERE tablename = table_name AND conn IS NULL;
IF parallel THEN
PERFORM dblink_send_query(connname, format(command, shardname, shardname, shardname))
FROM citus_placements WHERE tablename = table_name;
END IF;
FOR placement IN
SELECT * FROM citus_placements WHERE tablename = table_name
LOOP
IF NOT parallel THEN
PERFORM dblink_send_query(placement.connname, format(command,
placement.shardname,
placement.shardname,
placement.shardname));
END IF;
LOOP
BEGIN
RETURN QUERY
SELECT placement.nodename, placement.nodeport, placement.shardid, true, res
FROM dblink_get_result(placement.connname) AS r(res text);
EXIT WHEN NOT FOUND;
EXCEPTION WHEN others THEN
RETURN QUERY
SELECT placement.nodename, placement.nodeport, placement.shardid, false, SQLERRM;
END;
END LOOP;
PERFORM FROM dblink_get_result(placement.connname, false) AS r(res text);
END LOOP;
END;
$function$;
CREATE OR REPLACE FUNCTION citus_run_on_shards(
table_name regclass,
command text,
parallel bool default true,
OUT shardid bigint,
OUT success bool,
OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
shard citus_shards%rowtype;
BEGIN
PERFORM dblink_connect(connname[1], connstring[1])
FROM citus_shards s LEFT JOIN unnest(dblink_get_connections()) conn ON (s.connname[1] = conn)
WHERE tablename = table_name AND conn IS NULL;
IF parallel THEN
PERFORM dblink_send_query(connname[1], format(command, shardname, shardname, shardname))
FROM citus_shards WHERE tablename = table_name;
END IF;
FOR shard IN
SELECT * FROM citus_shards WHERE tablename = table_name
LOOP
IF NOT parallel THEN
PERFORM dblink_send_query(shard.connname[1], format(command,
shard.shardname,
shard.shardname,
shard.shardname));
END IF;
LOOP
BEGIN
RETURN QUERY
SELECT shard.shardid, true, res
FROM dblink_get_result(shard.connname[1], false) AS r(res text);
EXIT WHEN NOT FOUND;
EXCEPTION WHEN others THEN
RETURN QUERY
SELECT shard.shardid, false, SQLERRM;
END;
END LOOP;
PERFORM FROM dblink_get_result(shard.connname[1], false) AS r(res text);
END LOOP;
END;
$function$;
CREATE OR REPLACE FUNCTION citus_close_dblink()
RETURNS void
LANGUAGE plpgsql
AS $function$
BEGIN
PERFORM dblink_disconnect(connname)
FROM citus_placements p JOIN unnest(dblink_get_connections()) conn ON (p.connname = conn);
PERFORM dblink_disconnect(connname)
FROM citus_workers w JOIN unnest(dblink_get_connections()) conn ON (w.connname = conn);
END;
$function$;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment