Skip to content

Instantly share code, notes, and snippets.

@ehmo
Created December 27, 2017 21:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ehmo/fc4f76011ad1504ea5f95b698e1016d5 to your computer and use it in GitHub Desktop.
Save ehmo/fc4f76011ad1504ea5f95b698e1016d5 to your computer and use it in GitHub Desktop.
-- Runs a command once for every co-located placement triple of three
-- distributed tables.
--
-- Parameters:
--   table_name1..3  the three tables; every pair must pass
--                   citus_tables_colocated() or an exception is raised.
--   command         a format() template; it is expanded with the shard names
--                   of table_name1, table_name2 and table_name3 (in that
--                   order) as its three arguments.
--   parallel        passed through unchanged to master_run_on_worker().
--
-- Returns one row per executed command: the node it ran on, the three shard
-- ids that were substituted, and the success flag / result text reported by
-- master_run_on_worker().
--
-- NOTE(review): citus_tables_colocated() is not defined in this file; it is
-- assumed to return a boolean for a pair of regclass arguments.
CREATE OR REPLACE FUNCTION citus_run_on_3_colocated_placements(table_name1 regclass,
    table_name2 regclass,
    table_name3 regclass,
    command text,
    parallel bool default true,
    OUT nodename text,
    OUT nodeport int,
    OUT shardid1 bigint,
    OUT shardid2 bigint,
    OUT shardid3 bigint,
    OUT success bool,
    OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
    -- Parallel arrays: element i of each array describes the i-th command.
    workers text[];
    ports int[];
    shards1 bigint[];
    shards2 bigint[];
    shards3 bigint[];
    commands text[];
BEGIN
    -- All three pairs must be co-located before shards can be matched up.
    IF NOT (SELECT citus_tables_colocated(table_name1, table_name2)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name2;
    END IF;
    -- Fixed: this check used to compare table_name2 with itself, so the
    -- table_name2 / table_name3 pair was never actually verified.
    IF NOT (SELECT citus_tables_colocated(table_name2, table_name3)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name2, table_name3;
    END IF;
    IF NOT (SELECT citus_tables_colocated(table_name1, table_name3)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name3;
    END IF;

    -- Step 1: collect the placements (shardstate = 1) of the three tables.
    WITH active_shard_placements AS (
        SELECT
            ds.logicalrelid,
            ds.shardid AS shardid,
            shard_name(ds.logicalrelid, ds.shardid) AS shardname,
            ds.shardminvalue AS shardminvalue,
            ds.shardmaxvalue AS shardmaxvalue,
            dsp.nodename AS nodename,
            dsp.nodeport::int AS nodeport
        FROM pg_dist_shard ds
        INNER JOIN pg_dist_shard_placement dsp USING (shardid)
        WHERE dsp.shardstate = 1
          AND (ds.logicalrelid::regclass = table_name1 OR
               ds.logicalrelid::regclass = table_name2 OR
               ds.logicalrelid::regclass = table_name3)
        ORDER BY ds.logicalrelid, ds.shardid, dsp.nodename, dsp.nodeport),
    -- Step 2: line up shards that cover the same value range and live on the
    -- same worker (same node name AND same port).
    citus_colocated_placements AS (
        SELECT
            a.logicalrelid::regclass AS tablename1,
            a.shardid AS shardid1,
            shard_name(a.logicalrelid, a.shardid) AS shardname1,
            b.logicalrelid::regclass AS tablename2,
            b.shardid AS shardid2,
            shard_name(b.logicalrelid, b.shardid) AS shardname2,
            c.logicalrelid::regclass AS tablename3,
            c.shardid AS shardid3,
            shard_name(c.logicalrelid, c.shardid) AS shardname3,
            a.nodename AS nodename,
            a.nodeport::int AS nodeport
        FROM active_shard_placements a
        INNER JOIN active_shard_placements b
            ON  b.shardminvalue = a.shardminvalue
            AND b.shardmaxvalue = a.shardmaxvalue
            AND b.nodename = a.nodename
            AND b.nodeport = a.nodeport
        INNER JOIN active_shard_placements c
            ON  c.shardminvalue = a.shardminvalue
            AND c.shardmaxvalue = a.shardmaxvalue
            -- Fixed: the node name was previously not compared for the third
            -- table, so a placement on a different host that merely shared
            -- the port number could be matched.
            AND c.nodename = a.nodename
            AND c.nodeport = a.nodeport
        WHERE
            a.logicalrelid != b.logicalrelid AND
            a.logicalrelid != c.logicalrelid AND
            a.logicalrelid::regclass = table_name1 AND
            b.logicalrelid::regclass = table_name2 AND
            c.logicalrelid::regclass = table_name3
        -- Qualified so the names cannot be confused with the OUT parameters.
        ORDER BY a.logicalrelid, a.shardid, a.nodename, a.nodeport)
    -- Step 3: flatten into parallel arrays, expanding the command template
    -- with the three shard names for each matched triple.
    SELECT
        array_agg(cp.nodename),
        array_agg(cp.nodeport),
        array_agg(cp.shardid1),
        array_agg(cp.shardid2),
        array_agg(cp.shardid3),
        array_agg(format(command, cp.shardname1, cp.shardname2, cp.shardname3))
    INTO workers, ports, shards1, shards2, shards3, commands
    FROM citus_colocated_placements cp;

    -- Step 4: run the commands; the ordinality of each result row indexes
    -- back into the shard-id arrays built above.
    RETURN QUERY
    SELECT
        r.node_name,
        r.node_port,
        shards1[r.ordinality],
        shards2[r.ordinality],
        shards3[r.ordinality],
        r.success,
        r.result
    FROM master_run_on_worker(workers, ports, commands, parallel) WITH ORDINALITY r;
END;
$function$;
-- Runs a command once for every co-located placement triple of three
-- distributed tables.
--
-- Parameters:
--   table_name1..3  the three tables; every pair must pass
--                   distributed_tables_colocated() or an exception is raised.
--   command         a format() template; it is expanded with the shard names
--                   of table_name1, table_name2 and table_name3 (in that
--                   order) as its three arguments.
--   parallel        passed through unchanged to master_run_on_worker().
--
-- Returns one row per executed command: the node it ran on, the three shard
-- ids that were substituted, and the success flag / result text reported by
-- master_run_on_worker().
--
-- NOTE(review): distributed_tables_colocated() is not defined in this file;
-- it is assumed to return a boolean for a pair of regclass arguments.
CREATE OR REPLACE FUNCTION pg_catalog.run_command_on_3_colocated_placements(
    table_name1 regclass,
    table_name2 regclass,
    table_name3 regclass,
    command text,
    parallel bool default true,
    OUT nodename text,
    OUT nodeport int,
    OUT shardid1 bigint,
    OUT shardid2 bigint,
    OUT shardid3 bigint,
    OUT success bool,
    OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
    -- Parallel arrays: element i of each array describes the i-th command.
    workers text[];
    ports int[];
    shards1 bigint[];
    shards2 bigint[];
    shards3 bigint[];
    commands text[];
BEGIN
    -- All three pairs must be co-located before shards can be matched up.
    IF NOT (SELECT distributed_tables_colocated(table_name1, table_name2)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name2;
    END IF;
    IF NOT (SELECT distributed_tables_colocated(table_name2, table_name3)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name2, table_name3;
    END IF;
    IF NOT (SELECT distributed_tables_colocated(table_name1, table_name3)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name3;
    END IF;

    -- Step 1: collect the placements (shardstate = 1) of the three tables.
    WITH active_shard_placements AS (
        SELECT
            ds.logicalrelid,
            ds.shardid AS shardid,
            shard_name(ds.logicalrelid, ds.shardid) AS shardname,
            ds.shardminvalue AS shardminvalue,
            ds.shardmaxvalue AS shardmaxvalue,
            dsp.nodename AS nodename,
            dsp.nodeport::int AS nodeport
        FROM pg_dist_shard ds
        INNER JOIN pg_dist_shard_placement dsp USING (shardid)
        WHERE dsp.shardstate = 1
          AND (ds.logicalrelid::regclass = table_name1 OR
               ds.logicalrelid::regclass = table_name2 OR
               ds.logicalrelid::regclass = table_name3)
        ORDER BY ds.logicalrelid, ds.shardid, dsp.nodename, dsp.nodeport),
    -- Step 2: line up shards that cover the same value range and live on the
    -- same worker (same node name AND same port).
    citus_colocated_placements AS (
        SELECT
            a.logicalrelid::regclass AS tablename1,
            a.shardid AS shardid1,
            shard_name(a.logicalrelid, a.shardid) AS shardname1,
            b.logicalrelid::regclass AS tablename2,
            b.shardid AS shardid2,
            shard_name(b.logicalrelid, b.shardid) AS shardname2,
            c.logicalrelid::regclass AS tablename3,
            c.shardid AS shardid3,
            shard_name(c.logicalrelid, c.shardid) AS shardname3,
            a.nodename AS nodename,
            a.nodeport::int AS nodeport
        FROM active_shard_placements a
        INNER JOIN active_shard_placements b
            ON  b.shardminvalue = a.shardminvalue
            AND b.shardmaxvalue = a.shardmaxvalue
            AND b.nodename = a.nodename
            AND b.nodeport = a.nodeport
        INNER JOIN active_shard_placements c
            ON  c.shardminvalue = a.shardminvalue
            AND c.shardmaxvalue = a.shardmaxvalue
            -- Fixed: the node name was previously not compared for the third
            -- table, so a placement on a different host that merely shared
            -- the port number could be matched.
            AND c.nodename = a.nodename
            AND c.nodeport = a.nodeport
        WHERE
            a.logicalrelid != b.logicalrelid AND
            a.logicalrelid != c.logicalrelid AND
            a.logicalrelid::regclass = table_name1 AND
            b.logicalrelid::regclass = table_name2 AND
            c.logicalrelid::regclass = table_name3
        -- Qualified so the names cannot be confused with the OUT parameters.
        ORDER BY a.logicalrelid, a.shardid, a.nodename, a.nodeport)
    -- Step 3: flatten into parallel arrays, expanding the command template
    -- with the three shard names for each matched triple.
    SELECT
        array_agg(cp.nodename),
        array_agg(cp.nodeport),
        array_agg(cp.shardid1),
        array_agg(cp.shardid2),
        array_agg(cp.shardid3),
        array_agg(format(command, cp.shardname1, cp.shardname2, cp.shardname3))
    INTO workers, ports, shards1, shards2, shards3, commands
    FROM citus_colocated_placements cp;

    -- Step 4: run the commands; the ordinality of each result row indexes
    -- back into the shard-id arrays built above.
    RETURN QUERY
    SELECT
        r.node_name,
        r.node_port,
        shards1[r.ordinality],
        shards2[r.ordinality],
        shards3[r.ordinality],
        r.success,
        r.result
    FROM master_run_on_worker(workers, ports, commands, parallel) WITH ORDINALITY r;
END;
$function$;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment