Created
September 9, 2016 13:49
-
-
Save onderkalaci/813e504fad8fb9708de77306d08ef4e8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Counts the pairs of shard placements, one from each relation, that line up:
-- the two shards have equal shardminvalue and shardmaxvalue, and the two
-- placements have equal nodename and nodeport.
--
-- Parameters:
--   source_relation_name  relation whose shards/placements form one side of the pair
--   dest_relation_name    relation whose shards/placements form the other side
-- Returns:
--   the number of matching placement pairs; 0 when nothing matches.
--
-- Shards whose min/max values are NULL never match, because the comparison
-- uses plain equality.
CREATE OR REPLACE FUNCTION colocated_placement_count(source_relation_name regclass, dest_relation_name regclass)
RETURNS INT
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $colocated_placement_count$
DECLARE
    shard_placement_count_val INT := 0;
BEGIN
    -- Count directly instead of PERFORM + GET DIAGNOSTICS: the statement is
    -- explicitly terminated and no result rows need to be materialized.
    SELECT
        count(*)
    INTO
        shard_placement_count_val
    FROM
        pg_dist_shard AS s1
        -- associate shards and their corresponding placements
        INNER JOIN pg_dist_shard_placement AS p1
            ON p1.shardid = s1.shardid
        -- both shards have the same min/max values
        INNER JOIN pg_dist_shard AS s2
            ON s2.shardminvalue = s1.shardminvalue
            AND s2.shardmaxvalue = s1.shardmaxvalue
        -- both placements are on the same node
        INNER JOIN pg_dist_shard_placement AS p2
            ON p2.shardid = s2.shardid
            AND p2.nodename = p1.nodename
            AND p2.nodeport = p1.nodeport
    WHERE
        -- for the given tables
        s1.logicalrelid = source_relation_name
        AND s2.logicalrelid = dest_relation_name;

    RETURN shard_placement_count_val;
END;
$colocated_placement_count$;
-- Returns the number of pg_dist_shard_placement rows whose shard belongs to
-- the given relation (0 when the relation has no shards or no placements).
--
-- Parameters:
--   relation_name  relation whose shard placements are counted
CREATE OR REPLACE FUNCTION shard_placement_count(relation_name regclass)
RETURNS INT
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $shard_placement_count$
DECLARE
    placement_total INT := 0;
BEGIN
    SELECT count(*)
    INTO placement_total
    FROM pg_dist_shard AS shard
    INNER JOIN pg_dist_shard_placement AS placement
        ON placement.shardid = shard.shardid
    WHERE shard.logicalrelid = relation_name::oid;

    RETURN placement_total;
END;
$shard_placement_count$;
-- Lists the relations in pg_dist_partition that are hash partitioned
-- (partmethod = 'h') and already carry a colocation id other than 0.
--
-- Returns:
--   the relation names as text[]; NULL (not an empty array) when no row
--   qualifies, because array_agg over zero rows yields NULL.
CREATE OR REPLACE FUNCTION hash_partitioned_colocated_tables()
RETURNS text[]
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $hash_partitioned_colocated_tables$
DECLARE
    colocated_tables text[] := '{}';
    -- 0 is the value this file treats as "no colocation id assigned"
    invalid_colocation_id CONSTANT INT := 0;
BEGIN
    SELECT
        array_agg(logicalrelid::regclass)
    INTO
        colocated_tables
    FROM
        pg_dist_partition
    WHERE
        partmethod = 'h'
        AND colocationid <> invalid_colocation_id;

    RETURN colocated_tables;
END;
$hash_partitioned_colocated_tables$;
-- Looks up the colocation id recorded in pg_dist_partition for a relation.
--
-- Parameters:
--   tablename  relation to look up
-- Returns:
--   the relation's colocationid; NULL when pg_dist_partition has no row for
--   the relation (SELECT ... INTO assigns NULL when no row is found).
CREATE OR REPLACE FUNCTION table_colocation_id(tablename regclass)
RETURNS INT
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $table_colocation_id$
DECLARE
    colocation_id_value INT := 0;
BEGIN
    SELECT
        colocationid
    INTO
        colocation_id_value
    FROM
        pg_dist_partition
    WHERE
        logicalrelid = tablename;

    RETURN colocation_id_value;
END;
$table_colocation_id$;
-- Lists the relations in pg_dist_partition that are hash partitioned
-- (partmethod = 'h') and whose colocation id is still 0.
--
-- Returns:
--   the relation names as text[]; NULL (not an empty array) when no row
--   qualifies, because array_agg over zero rows yields NULL.
CREATE OR REPLACE FUNCTION hash_partitioned_non_colocated_tables()
RETURNS text[]
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $hash_partitioned_non_colocated_tables$
DECLARE
    pending_tables text[] := '{}';
    -- 0 is the value this file treats as "no colocation id assigned"
    unassigned_id CONSTANT INT := 0;
BEGIN
    SELECT array_agg(part.logicalrelid::regclass)
    INTO pending_tables
    FROM pg_dist_partition AS part
    WHERE part.colocationid = unassigned_id
      AND part.partmethod = 'h';

    RETURN pending_tables;
END;
$hash_partitioned_non_colocated_tables$;
-- Reports whether every shard placement of the source relation has a matching
-- placement of the destination relation, judged by comparing
-- public.colocated_placement_count(source, dest) with
-- public.shard_placement_count(source).
--
-- Parameters:
--   source_relation_name  relation whose placement count is the reference
--   dest_relation_name    relation compared against the source
-- Returns:
--   true when the two counts are equal, false otherwise.
CREATE OR REPLACE FUNCTION tables_have_colocated_placements(source_relation_name regclass, dest_relation_name regclass)
RETURNS bool
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $tables_have_colocated_placements$
BEGIN
    -- COALESCE keeps the "anything but a definite match is false" result.
    RETURN COALESCE(
        public.colocated_placement_count(source_relation_name, dest_relation_name)
            = public.shard_placement_count(source_relation_name),
        false
    );
END;
$tables_have_colocated_placements$;
-- Assigns a colocation id to every hash partitioned relation that still has
-- colocation id 0 (as reported by public.hash_partitioned_non_colocated_tables).
--
-- For each such relation:
--   * if an already-colocated hash partitioned relation has matching
--     placements (public.tables_have_colocated_placements), reuse that
--     relation's colocation id;
--   * otherwise draw a fresh id from the sequence.
--
-- Returns:
--   the names of the relations whose pg_dist_partition row was updated, in
--   processing order; NULL when there was nothing to update.
CREATE OR REPLACE FUNCTION update_hash_colocated_ids()
RETURNS text[]
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $update_hash_colocated_ids$
DECLARE
    tables_updated text[] := '{}';
    hash_table_oids text[] := '{}';
    hash_table text := '';
    next_colocation_id BIGINT;
    colocated_hash_table_oids text[] := '{}';
    colocated_hash_table text := '';
    current_colocation_id INT := 0;
    colocated_table_found bool := false;
BEGIN
    hash_table_oids := public.hash_partitioned_non_colocated_tables();

    -- all hash partitioned tables already have colocation id
    IF hash_table_oids IS NULL THEN
        RETURN hash_table_oids;
    END IF;

    FOREACH hash_table IN ARRAY hash_table_oids
    LOOP
        colocated_table_found := false;

        -- Re-read on every iteration: relations updated by earlier iterations
        -- are now candidates to colocate with.
        colocated_hash_table_oids := public.hash_partitioned_colocated_tables();

        IF colocated_hash_table_oids IS NOT NULL THEN
            FOREACH colocated_hash_table IN ARRAY colocated_hash_table_oids
            LOOP
                IF public.tables_have_colocated_placements(hash_table::regclass, colocated_hash_table::regclass) THEN
                    current_colocation_id := public.table_colocation_id(colocated_hash_table::regclass);

                    -- static statement: the values are bound as parameters,
                    -- no SQL text is assembled by concatenation
                    UPDATE pg_dist_partition
                    SET colocationid = current_colocation_id
                    WHERE logicalrelid = hash_table::regclass::oid;

                    colocated_table_found := true;
                    EXIT;
                END IF;
            END LOOP;
        END IF;

        -- no colocated partner found: get a new id
        IF NOT colocated_table_found THEN
            -- NOTE(review): sequence name kept exactly as in the original;
            -- confirm 'pg_dist_collocation_id' is the intended sequence.
            next_colocation_id := nextval('pg_dist_collocation_id');

            UPDATE pg_dist_partition
            SET colocationid = next_colocation_id
            WHERE logicalrelid = hash_table::regclass::oid;
        END IF;

        -- record the relation so the return value reflects the work done
        tables_updated := array_append(tables_updated, hash_table);
    END LOOP;

    RETURN tables_updated;
END;
$update_hash_colocated_ids$;
-- Resets the colocation id of non-hash partitioned relations to 0 and reports
-- which non-hash relations had colocation id 0 before the call.
--
-- Behavior:
--   * collects the pg_dist_partition relations with partmethod <> 'h' and
--     colocationid = 0;
--   * if at least one such relation exists, sets colocationid = 0 on every
--     row with partmethod <> 'h' (one statement; the result is the same as
--     repeating it once per collected relation);
--   * if none exists, nothing is updated.
--
-- Returns:
--   a JSON object with keys "colocationid" (always 0) and "tables" (the
--   collected relation names, an empty array when there were none).
CREATE OR REPLACE FUNCTION update_non_hash_colocated_ids()
RETURNS JSON
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $update_non_hash_colocated_ids$
DECLARE
    -- 0 is the value this file treats as "no colocation id assigned"
    invalid_colocation_id CONSTANT BIGINT := 0;
    non_hash_table_oids text[] := '{}';
    result_json json;
BEGIN
    SELECT
        array_agg(logicalrelid::regclass)
    INTO
        non_hash_table_oids
    FROM
        pg_dist_partition
    WHERE
        partmethod <> 'h'
        AND colocationid = invalid_colocation_id;

    IF non_hash_table_oids IS NULL THEN
        -- array_agg over zero rows yields NULL; report an empty list instead
        non_hash_table_oids := '{}';
    ELSE
        -- The statement does not depend on any individual relation, so it is
        -- issued once rather than once per collected relation.
        UPDATE pg_dist_partition
        SET colocationid = invalid_colocation_id
        WHERE partmethod <> 'h';
    END IF;

    -- the CTE column names become the JSON keys
    WITH data(colocationId, tables) AS (
        VALUES (invalid_colocation_id, non_hash_table_oids)
    )
    SELECT row_to_json(data) INTO result_json FROM data;

    RETURN result_json;
END;
$update_non_hash_colocated_ids$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment