Skip to content

Instantly share code, notes, and snippets.

@onderkalaci
Created September 9, 2016 13:49
Show Gist options
  • Save onderkalaci/813e504fad8fb9708de77306d08ef4e8 to your computer and use it in GitHub Desktop.
Save onderkalaci/813e504fad8fb9708de77306d08ef4e8 to your computer and use it in GitHub Desktop.
-- Counts the placement pairs (one placement of source_relation_name, one of
-- dest_relation_name) whose shards have identical shardminvalue/shardmaxvalue
-- and which live on the same nodename/nodeport.
--
-- Parameters:
--   source_relation_name  relation whose shards/placements form the left side
--   dest_relation_name    relation whose shards/placements form the right side
-- Returns: number of matching placement pairs (0 when there are none).
--
-- NOTE(review): shards whose min/max values are NULL never match, because the
-- comparison uses plain equality -- confirm this is intended.
CREATE OR REPLACE FUNCTION colocated_placement_count(source_relation_name regclass, dest_relation_name regclass)
RETURNS INT
LANGUAGE plpgsql
SECURITY DEFINER
-- pg_temp is listed explicitly (and last) so that a caller-created temporary
-- object cannot shadow the tables read below; when omitted, the temp schema
-- is searched first.
SET search_path = pg_catalog, pg_temp
AS $colocated_placement_count$
DECLARE
    shard_placement_count_val INT := 0;
BEGIN
    SELECT
        count(*)
    INTO
        shard_placement_count_val
    FROM
        pg_dist_shard AS s1
        -- associate shards and their corresponding placements
        INNER JOIN pg_dist_shard_placement AS p1
            ON p1.shardid = s1.shardid
        -- both shards have the same min/max values
        INNER JOIN pg_dist_shard AS s2
            ON s2.shardminvalue = s1.shardminvalue
            AND s2.shardmaxvalue = s1.shardmaxvalue
        -- both placements are on the same node
        INNER JOIN pg_dist_shard_placement AS p2
            ON p2.shardid = s2.shardid
            AND p2.nodename = p1.nodename
            AND p2.nodeport = p1.nodeport
    WHERE
        -- for the given tables
        s1.logicalrelid = source_relation_name
        AND s2.logicalrelid = dest_relation_name;

    RETURN shard_placement_count_val;
END;
$colocated_placement_count$;
-- Counts the rows of pg_dist_shard_placement that belong to shards of the
-- given relation (pg_dist_shard.logicalrelid = relation_name).
--
-- Parameters:
--   relation_name  relation whose shard placements are counted
-- Returns: the number of placements; 0 when the relation has no shards or
--          none of its shards has a placement.
CREATE OR REPLACE FUNCTION shard_placement_count(relation_name regclass)
RETURNS INT
LANGUAGE plpgsql
SECURITY DEFINER
-- pg_temp is listed explicitly (and last) so that a caller-created temporary
-- object cannot shadow the tables read below; when omitted, the temp schema
-- is searched first.
SET search_path = pg_catalog, pg_temp
AS $shard_placement_count$
DECLARE
    shard_placement_count_val INT := 0;
BEGIN
    SELECT
        count(*)
    INTO
        shard_placement_count_val
    FROM
        pg_dist_shard
        INNER JOIN pg_dist_shard_placement
            ON pg_dist_shard_placement.shardid = pg_dist_shard.shardid
    WHERE
        pg_dist_shard.logicalrelid = relation_name::oid;

    RETURN shard_placement_count_val;
END;
$shard_placement_count$;
-- Lists the hash-partitioned relations (partmethod = 'h') in pg_dist_partition
-- that already have a colocation id, i.e. colocationId <> 0.
--
-- Returns: relation names as text[]; NULL (not an empty array) when no row
--          qualifies, because array_agg over zero rows yields NULL.
CREATE OR REPLACE FUNCTION hash_partitioned_colocated_tables()
RETURNS text[]
LANGUAGE plpgsql
SECURITY DEFINER
-- pg_temp is listed explicitly (and last) so that a caller-created temporary
-- object cannot shadow the table read below; when omitted, the temp schema
-- is searched first.
SET search_path = pg_catalog, pg_temp
AS $hash_partitioned_colocated_tables$
DECLARE
    table_list text[] := '{}';
    -- 0 is the value this file treats as "no colocation id assigned"
    invalidColocationId INT := 0;
BEGIN
    SELECT
        array_agg(logicalrelid::regclass)
    INTO
        table_list
    FROM
        pg_dist_partition
    WHERE
        partmethod = 'h'
        AND colocationId <> invalidColocationId;

    RETURN table_list;
END;
$hash_partitioned_colocated_tables$;
-- Looks up the colocationId stored in pg_dist_partition for the given relation.
--
-- Parameters:
--   tablename  relation to look up (matched against logicalrelid)
-- Returns: the relation's colocationId; NULL when pg_dist_partition has no row
--          for the relation (SELECT ... INTO assigns NULL when no row matches).
CREATE OR REPLACE FUNCTION table_colocation_id(tablename regclass)
RETURNS INT
LANGUAGE plpgsql
SECURITY DEFINER
-- pg_temp is listed explicitly (and last) so that a caller-created temporary
-- object cannot shadow the table read below; when omitted, the temp schema
-- is searched first.
SET search_path = pg_catalog, pg_temp
AS $table_colocation_id$
DECLARE
    colocationIdValue INT := 0;
BEGIN
    SELECT
        colocationId
    INTO
        colocationIdValue
    FROM
        pg_dist_partition
    WHERE
        logicalrelid = tablename;

    RETURN colocationIdValue;
END;
$table_colocation_id$;
-- Lists the hash-partitioned relations (partmethod = 'h') in pg_dist_partition
-- that have no colocation id yet, i.e. colocationId = 0.
--
-- Returns: relation names as text[]; NULL (not an empty array) when no row
--          qualifies, because array_agg over zero rows yields NULL.
CREATE OR REPLACE FUNCTION hash_partitioned_non_colocated_tables()
RETURNS text[]
LANGUAGE plpgsql
SECURITY DEFINER
-- pg_temp is listed explicitly (and last) so that a caller-created temporary
-- object cannot shadow the table read below; when omitted, the temp schema
-- is searched first.
SET search_path = pg_catalog, pg_temp
AS $hash_partitioned_non_colocated_tables$
DECLARE
    table_list text[] := '{}';
    -- 0 is the value this file treats as "no colocation id assigned"
    invalidColocationId INT := 0;
BEGIN
    SELECT
        array_agg(logicalrelid::regclass)
    INTO
        table_list
    FROM
        pg_dist_partition
    WHERE
        partmethod = 'h'
        AND colocationId = invalidColocationId;

    RETURN table_list;
END;
$hash_partitioned_non_colocated_tables$;
-- Reports whether every placement of source_relation_name has a colocated
-- counterpart in dest_relation_name, by comparing
--   public.shard_placement_count(source)  with
--   public.colocated_placement_count(source, dest).
--
-- Parameters:
--   source_relation_name  relation whose placements must all be matched
--   dest_relation_name    relation to match against
-- Returns: true when the two counts are equal, false otherwise.
--
-- NOTE(review): two counts of 0 compare equal, so a source relation without
-- any placements is reported as colocated with anything -- confirm intended.
CREATE OR REPLACE FUNCTION tables_have_colocated_placements(source_relation_name regclass, dest_relation_name regclass)
RETURNS bool
LANGUAGE plpgsql
SECURITY DEFINER
-- pg_temp is listed explicitly (and last) so the temp schema is not searched
-- ahead of pg_catalog, which is what happens when it is left implicit.
SET search_path = pg_catalog, pg_temp
AS $tables_have_colocated_placements$
DECLARE
    source_shard_placement_count INT := 0;
    colocated_placement_count_val INT := 0;
BEGIN
    source_shard_placement_count :=
        public.shard_placement_count(source_relation_name);
    colocated_placement_count_val :=
        public.colocated_placement_count(source_relation_name, dest_relation_name);

    -- COALESCE keeps the result a strict true/false even if a count were NULL
    RETURN COALESCE(colocated_placement_count_val = source_shard_placement_count, false);
END;
$tables_have_colocated_placements$;
-- Assigns a colocationId to every hash-partitioned relation that does not
-- have one yet (as reported by public.hash_partitioned_non_colocated_tables()).
--
-- For each such relation:
--   * if an already-colocated hash-partitioned relation has colocated
--     placements (public.tables_have_colocated_placements), its colocationId
--     is reused;
--   * otherwise a new id is drawn from the sequence 'pg_dist_collocation_id'.
--
-- Returns: the names of the relations whose colocationId was set, in
--          processing order; NULL when there was nothing to process.
--
-- NOTE(review): the sequence name 'pg_dist_collocation_id' is taken verbatim
-- from the original code; confirm that a sequence with this exact spelling
-- exists in the target installation.
CREATE OR REPLACE FUNCTION update_hash_colocated_ids()
RETURNS text[]
LANGUAGE plpgsql
SECURITY DEFINER
-- pg_temp is listed explicitly (and last) so that a caller-created temporary
-- object cannot shadow pg_dist_partition, which this function updates; when
-- omitted, the temp schema is searched first.
SET search_path = pg_catalog, pg_temp
AS $update_hash_colocated_ids$
DECLARE
    tables_updated text[] := '{}';
    hash_table_oids text[] := '{}';
    hash_table text := '';
    nextColocationId BIGINT;
    colocated_hash_table_oids text[] := '{}';
    colocated_hash_table text := '';
    current_colocation_id INT := 0;
    colocated_table_found bool := false;
BEGIN
    hash_table_oids := public.hash_partitioned_non_colocated_tables();

    -- all hash partitioned tables already have colocation id
    IF hash_table_oids IS NULL THEN
        RETURN hash_table_oids;
    END IF;

    FOREACH hash_table IN ARRAY hash_table_oids
    LOOP
        colocated_table_found := false;

        -- re-read on every iteration: earlier iterations assign ids, so the
        -- set of colocated tables grows while this loop runs
        colocated_hash_table_oids := public.hash_partitioned_colocated_tables();

        IF colocated_hash_table_oids IS NOT NULL THEN
            FOREACH colocated_hash_table IN ARRAY colocated_hash_table_oids
            LOOP
                IF public.tables_have_colocated_placements(hash_table::regclass, colocated_hash_table::regclass) THEN
                    current_colocation_id := public.table_colocation_id(colocated_hash_table::regclass);

                    UPDATE pg_dist_partition
                    SET colocationId = current_colocation_id
                    WHERE logicalrelid = hash_table::regclass::oid;

                    colocated_table_found := true;
                    -- the first matching colocated table wins
                    EXIT;
                END IF;
            END LOOP;
        END IF;

        -- no colocated table matched: allocate a new id
        IF colocated_table_found IS NOT TRUE THEN
            nextColocationId := nextval('pg_dist_collocation_id');

            UPDATE pg_dist_partition
            SET colocationId = nextColocationId
            WHERE logicalrelid = hash_table::regclass::oid;
        END IF;

        -- either branch above has set an id for this table; record it
        tables_updated := array_append(tables_updated, hash_table);
    END LOOP;

    RETURN tables_updated;
END;
$update_hash_colocated_ids$;
-- Resets the colocationId of non-hash-partitioned relations to 0 and reports
-- which non-hash relations had colocationId = 0 when the function started.
--
-- Behaviour:
--   * collects every pg_dist_partition row with partmethod <> 'h' and
--     colocationId = 0;
--   * if at least one such row exists, sets colocationId = 0 on ALL rows with
--     partmethod <> 'h';
--   * returns a JSON object with keys "colocationid" (always 0) and "tables"
--     (the collected relation names, or an empty array).
--
-- NOTE(review): the UPDATE touches every non-hash row, not only the collected
-- ones, and is skipped entirely when no non-hash row currently has
-- colocationId = 0. This mirrors the original logic -- confirm it is intended.
CREATE OR REPLACE FUNCTION update_non_hash_colocated_ids()
RETURNS JSON
LANGUAGE plpgsql
SECURITY DEFINER
-- pg_temp is listed explicitly (and last) so that a caller-created temporary
-- object cannot shadow pg_dist_partition, which this function updates; when
-- omitted, the temp schema is searched first.
SET search_path = pg_catalog, pg_temp
AS $update_non_hash_colocated_ids$
DECLARE
    invalidColocationId BIGINT := 0;
    non_hash_table_oids text[] := '{}';
    resultJson json;
BEGIN
    SELECT
        array_agg(logicalrelid::regclass)
    INTO
        non_hash_table_oids
    FROM
        pg_dist_partition
    WHERE
        partmethod != 'h'
        AND colocationId = 0;

    IF non_hash_table_oids IS NULL THEN
        -- array_agg over zero rows is NULL; report an empty list instead
        non_hash_table_oids := '{}';
    ELSE
        -- One statement is enough: the original ran this same UPDATE once per
        -- collected table, but the statement does not depend on the table.
        UPDATE pg_dist_partition
        SET colocationId = invalidColocationId
        WHERE partmethod <> 'h';
    END IF;

    WITH data(colocationId, tables) AS (
        VALUES (invalidColocationId, non_hash_table_oids)
    )
    SELECT row_to_json(data) INTO resultJson FROM data;

    RETURN resultJson;
END;
$update_non_hash_colocated_ids$;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment