Last active
September 16, 2016 09:06
-
-
Save onderkalaci/b6547eb54cc68f86f08fbf1a54ddec79 to your computer and use it in GitHub Desktop.
Citus upgrade for table colocation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Prerequisite:
* The following are already added by the 5.3 update. However, in order to test on non-5.3 clusters, do the following:
* Add a new sequence: | |
-- Sequence used to generate new colocation ids. It is created in the citus
-- schema first and then moved into pg_catalog.
CREATE SEQUENCE citus.pg_dist_collocation_id NO CYCLE MINVALUE 1;
ALTER SEQUENCE citus.pg_dist_collocation_id
    SET SCHEMA pg_catalog;
* Add a column to pg_dist_partition: | |
-- New column holding each distributed table's colocation id; the default 0 is
-- the invalid colocation id.
ALTER TABLE pg_dist_partition
    ADD COLUMN colocationid bigint DEFAULT 0;
Low-level API:
* colocated_placement_count(source_relation_name regclass, dest_relation_name regclass): given two table names, returns the co-located placement count | |
* shard_placement_count(relation_name regclass): given a relation returns the shard placement count | |
* table_colocation_id(tablename regclass): given a table, returns its colocationid | |
High-level API:
* tables_have_colocated_placements(source_relation_name regclass, dest_relation_name regclass): given two tables, returns true if two tables are co-located | |
* update_table_colocation_ids(dry_run BOOL DEFAULT true): Iterates over all distributed tables. For all hash-partitioned tables which do not have a valid colocation id, the function finds their co-located tables and gives them the same colocation id. For all non-hash-partitioned tables, the function updates colocationid to INVALID_COLOCATION_ID, which is 0. When dry_run is set to true, the function generates pseudo-ids and does not update the colocationids in the pg_dist_partition table.
Some examples:
-- Example fixture: hash-partitioned tables. The two numbers in each table name
-- are the last two arguments passed to master_create_worker_shards for it.

-- 4 / 1, in the default schema
CREATE TABLE hash_4_1_v1 (a int, b int);
SELECT master_create_distributed_table('hash_4_1_v1', 'a', 'hash');
SELECT master_create_worker_shards('hash_4_1_v1', 4, 1);

CREATE TABLE hash_4_1_v2 (a int, b int);
SELECT master_create_distributed_table('hash_4_1_v2', 'a', 'hash');
SELECT master_create_worker_shards('hash_4_1_v2', 4, 1);

-- 4 / 1, same table names inside a separate schema
CREATE SCHEMA test_schema;

CREATE TABLE test_schema.hash_4_1_v1 (a int, b int);
SELECT master_create_distributed_table('test_schema.hash_4_1_v1', 'a', 'hash');
SELECT master_create_worker_shards('test_schema.hash_4_1_v1', 4, 1);

CREATE TABLE test_schema.hash_4_1_v2 (a int, b int);
SELECT master_create_distributed_table('test_schema.hash_4_1_v2', 'a', 'hash');
SELECT master_create_worker_shards('test_schema.hash_4_1_v2', 4, 1);

-- 4 / 2
CREATE TABLE hash_4_2_v1 (a int, b int);
SELECT master_create_distributed_table('hash_4_2_v1', 'a', 'hash');
SELECT master_create_worker_shards('hash_4_2_v1', 4, 2);

CREATE TABLE hash_4_2_v2 (a int, b int);
SELECT master_create_distributed_table('hash_4_2_v2', 'a', 'hash');
SELECT master_create_worker_shards('hash_4_2_v2', 4, 2);

-- 8 / 1
CREATE TABLE hash_8_1_v1 (a int, b int);
SELECT master_create_distributed_table('hash_8_1_v1', 'a', 'hash');
SELECT master_create_worker_shards('hash_8_1_v1', 8, 1);

-- The table created here is the one that gets distributed (the calls used to
-- target hash_4_2_v2 a second time, leaving hash_8_1_v2 undistributed).
CREATE TABLE hash_8_1_v2 (a int, b int);
SELECT master_create_distributed_table('hash_8_1_v2', 'a', 'hash');
SELECT master_create_worker_shards('hash_8_1_v2', 8, 1);

-- 32 / 2
CREATE TABLE hash_32_2_v1 (a int, b int);
SELECT master_create_distributed_table('hash_32_2_v1', 'a', 'hash');
SELECT master_create_worker_shards('hash_32_2_v1', 32, 2);

-- Append-partitioned table for which no shard is ever created.
CREATE TABLE append_no_shards (a int, b int);
SELECT master_create_distributed_table('append_no_shards', 'a', 'append');
-- Example fixture: append-partitioned tables with four empty shards each.
-- The replication factor is controlled through citus.shard_replication_factor;
-- "citus.replication_factor" is not a Citus setting, so setting it has no
-- effect on how many placements master_create_empty_shard creates.

-- four shards, one placement per shard
CREATE TABLE append_4_shards_1_rep (a int, b int);
SELECT master_create_distributed_table('append_4_shards_1_rep', 'a', 'append');
SET citus.shard_replication_factor TO 1;
SELECT master_create_empty_shard('append_4_shards_1_rep');
SELECT master_create_empty_shard('append_4_shards_1_rep');
SELECT master_create_empty_shard('append_4_shards_1_rep');
SELECT master_create_empty_shard('append_4_shards_1_rep');

-- four shards, two placements per shard
CREATE TABLE append_4_shards_2_rep (a int, b int);
SELECT master_create_distributed_table('append_4_shards_2_rep', 'a', 'append');
SET citus.shard_replication_factor TO 2;
SELECT master_create_empty_shard('append_4_shards_2_rep');
SELECT master_create_empty_shard('append_4_shards_2_rep');
SELECT master_create_empty_shard('append_4_shards_2_rep');
SELECT master_create_empty_shard('append_4_shards_2_rep');
-- Pairwise co-location checks between the example tables.

-- same shard count / replication factor, with and without a schema
SELECT tables_have_colocated_placements('hash_4_1_v1'::regclass, 'hash_4_1_v2'::regclass);
SELECT tables_have_colocated_placements('hash_4_1_v1'::regclass, 'test_schema.hash_4_1_v1'::regclass);
SELECT tables_have_colocated_placements('hash_4_1_v1'::regclass, 'test_schema.hash_4_1_v2'::regclass);

-- differing shard count and/or replication factor
SELECT tables_have_colocated_placements('hash_4_1_v1'::regclass, 'hash_4_2_v2'::regclass);
SELECT tables_have_colocated_placements('hash_4_1_v1'::regclass, 'hash_8_1_v1'::regclass);
SELECT tables_have_colocated_placements('hash_4_2_v2'::regclass, 'hash_8_1_v1'::regclass);

-- hash-partitioned against append-partitioned
SELECT tables_have_colocated_placements('hash_4_1_v1'::regclass, 'append_4_shards_1_rep'::regclass);
SELECT tables_have_colocated_placements('hash_8_1_v1'::regclass, 'append_4_shards_2_rep'::regclass);

SELECT tables_have_colocated_placements('hash_8_1_v1'::regclass, 'hash_32_2_v1'::regclass);

-- Report the colocation groups first (dry run), then write them to pg_dist_partition.
SELECT update_table_colocation_ids(dry_run => true);
SELECT update_table_colocation_ids(dry_run => false);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Given two table names, returns the co-located placement count.
--
-- A placement of source_relation_name and a placement of dest_relation_name are
-- counted as one co-located pair when their shards have identical
-- shardminvalue/shardmaxvalue and both placements are on the same
-- nodename/nodeport.
--
-- Parameters:
--   source_relation_name - first distributed table
--   dest_relation_name   - second distributed table
-- Returns: the number of matching placement pairs (0 when there are none).
--
-- search_path lists pg_temp explicitly and last so that temporary tables
-- cannot shadow the pg_catalog relations this SECURITY DEFINER function reads.
CREATE OR REPLACE FUNCTION colocated_placement_count(source_relation_name regclass, dest_relation_name regclass)
RETURNS INT
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, pg_temp
AS $colocated_placement_count$
DECLARE
    colocated_pair_count INT := 0;
BEGIN
    -- count(*) lets the server count the pairs instead of producing every
    -- joined row only to read the row count afterwards
    SELECT
        count(*)
    INTO
        colocated_pair_count
    FROM
        pg_dist_shard AS s1
        -- associate shards and their corresponding placements
        INNER JOIN pg_dist_shard_placement AS p1
            ON p1.shardid = s1.shardid
        -- both shards have the same min/max values
        INNER JOIN pg_dist_shard AS s2
            ON s2.shardminvalue = s1.shardminvalue
            AND s2.shardmaxvalue = s1.shardmaxvalue
        -- both placements are on the same node
        INNER JOIN pg_dist_shard_placement AS p2
            ON p2.shardid = s2.shardid
            AND p2.nodename = p1.nodename
            AND p2.nodeport = p1.nodeport
    WHERE
        -- for the given tables
        s1.logicalrelid = source_relation_name
        AND s2.logicalrelid = dest_relation_name;

    RETURN colocated_pair_count;
END;
$colocated_placement_count$;
-- Given a relation, returns its shard placement count.
--
-- Parameters:
--   relation_name - distributed table whose placements are counted
-- Returns: the number of pg_dist_shard_placement rows that belong to shards of
--          relation_name (0 when the table has no shards or placements).
--
-- search_path lists pg_temp explicitly and last so that temporary tables
-- cannot shadow the pg_catalog relations this SECURITY DEFINER function reads.
CREATE OR REPLACE FUNCTION shard_placement_count(relation_name regclass)
RETURNS INT
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, pg_temp
AS $shard_placement_count$
DECLARE
    placement_count INT := 0;
BEGIN
    SELECT
        count(*)
    INTO
        placement_count
    FROM
        pg_dist_shard AS shard
        INNER JOIN pg_dist_shard_placement AS placement
            ON placement.shardid = shard.shardid
    WHERE
        shard.logicalrelid = relation_name::oid;

    RETURN placement_count;
END;
$shard_placement_count$;
-- Returns the hash-partitioned tables (partmethod = 'h') in pg_dist_partition
-- whose colocationid differs from the invalid colocation id (0).
--
-- Returns: the table names as text[]; NULL (not an empty array) when no table
--          qualifies, because array_agg over zero rows yields NULL.
--
-- search_path lists pg_temp explicitly and last so that temporary tables
-- cannot shadow the pg_catalog relations this SECURITY DEFINER function reads.
CREATE OR REPLACE FUNCTION hash_partitioned_colocated_tables()
RETURNS text[]
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, pg_temp
AS $hash_partitioned_colocated_tables$
DECLARE
    table_list text[];
    invalid_colocation_id CONSTANT INT := 0;
BEGIN
    SELECT
        array_agg(logicalrelid::regclass::text)
    INTO
        table_list
    FROM
        pg_dist_partition
    WHERE
        partmethod = 'h'
        AND colocationid <> invalid_colocation_id;

    RETURN table_list;
END;
$hash_partitioned_colocated_tables$;
-- Returns the distributed tables in pg_dist_partition that are not
-- hash-partitioned (partmethod <> 'h').
--
-- Returns: the table names as text[]; NULL (not an empty array) when no table
--          qualifies, because array_agg over zero rows yields NULL. Callers
--          that iterate over the result must handle NULL.
--
-- search_path lists pg_temp explicitly and last so that temporary tables
-- cannot shadow the pg_catalog relations this SECURITY DEFINER function reads.
CREATE OR REPLACE FUNCTION non_hash_partitioned_tables()
RETURNS text[]
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, pg_temp
AS $non_hash_partitioned_tables$
DECLARE
    table_list text[];
BEGIN
    SELECT
        array_agg(logicalrelid::regclass::text)
    INTO
        table_list
    FROM
        pg_dist_partition
    WHERE
        partmethod <> 'h';

    RETURN table_list;
END;
$non_hash_partitioned_tables$;
-- Given a table, returns its colocationid.
--
-- Parameters:
--   tablename - distributed table to look up in pg_dist_partition
-- Returns: the table's colocationid; NULL when pg_dist_partition has no row
--          for tablename.
--
-- NOTE(review): the return type is INT while the prerequisite DDL adds
-- colocationid as BIGINT; an id outside the INT range would raise here. The
-- return type is kept so that existing callers are unaffected.
--
-- search_path lists pg_temp explicitly and last so that temporary tables
-- cannot shadow the pg_catalog relations this SECURITY DEFINER function reads.
CREATE OR REPLACE FUNCTION table_colocation_id(tablename regclass)
RETURNS INT
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, pg_temp
AS $table_colocation_id$
DECLARE
    colocation_id_value INT;
BEGIN
    SELECT
        colocationid
    INTO
        colocation_id_value
    FROM
        pg_dist_partition
    WHERE
        logicalrelid = tablename;

    RETURN colocation_id_value;
END;
$table_colocation_id$;
-- Returns the hash-partitioned tables (partmethod = 'h') in pg_dist_partition
-- whose colocationid equals the invalid colocation id (0), i.e. the tables
-- that have not been assigned to a colocation group yet.
--
-- Returns: the table names as text[]; NULL (not an empty array) when no table
--          qualifies, because array_agg over zero rows yields NULL.
--          update_table_colocation_ids relies on that NULL to detect that
--          there is nothing to process.
--
-- search_path lists pg_temp explicitly and last so that temporary tables
-- cannot shadow the pg_catalog relations this SECURITY DEFINER function reads.
CREATE OR REPLACE FUNCTION hash_partitioned_non_colocated_tables()
RETURNS text[]
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, pg_temp
AS $hash_partitioned_non_colocated_tables$
DECLARE
    table_list text[];
    invalid_colocation_id CONSTANT INT := 0;
BEGIN
    SELECT
        array_agg(logicalrelid::regclass::text)
    INTO
        table_list
    FROM
        pg_dist_partition
    WHERE
        partmethod = 'h'
        AND colocationid = invalid_colocation_id;

    RETURN table_list;
END;
$hash_partitioned_non_colocated_tables$;
-- Given two tables, returns true if the two tables are co-located.
--
-- The tables are treated as co-located when they have the same number of
-- shard placements and every one of those placements has a co-located
-- counterpart, i.e.
--   shard_placement_count(source) = shard_placement_count(dest)
--                                 = colocated_placement_count(source, dest).
-- Consequently two tables that both have zero placements compare as
-- co-located (0 = 0 = 0).
--
-- Parameters:
--   source_relation_name - first distributed table
--   dest_relation_name   - second distributed table
-- Returns: true when the tables are co-located, false otherwise (never NULL).
--
-- search_path matches the other SECURITY DEFINER helpers in this file:
-- pg_catalog first, pg_temp explicitly last.
CREATE OR REPLACE FUNCTION tables_have_colocated_placements(source_relation_name regclass, dest_relation_name regclass)
RETURNS bool
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, pg_temp
AS $tables_have_colocated_placements$
DECLARE
    source_placement_count INT;
    dest_placement_count INT;
    colocated_pair_count INT;
BEGIN
    source_placement_count := public.shard_placement_count(source_relation_name);
    dest_placement_count := public.shard_placement_count(dest_relation_name);
    colocated_pair_count := public.colocated_placement_count(source_relation_name, dest_relation_name);

    -- COALESCE keeps the result false if any of the counts is NULL
    RETURN COALESCE(
        source_placement_count = dest_placement_count
        AND colocated_pair_count = source_placement_count,
        false
    );
END;
$tables_have_colocated_placements$;
-- Assigns colocation ids to hash-partitioned tables that do not have one yet.
--
-- For every hash-partitioned table whose colocationid is the invalid
-- colocation id (0), the function looks for an existing colocation group whose
-- first table is co-located with it (see tables_have_colocated_placements) and
-- adds the table to that group; when no group matches, a new group with a new
-- colocation id is created. Non-hash-partitioned tables are collected in the
-- group of the invalid colocation id (0). Every resulting group is reported
-- with a NOTICE.
--
-- Parameters:
--   dry_run - when true (the default; NULL is treated as true), new groups get
--             pseudo ids that continue after the highest colocation id already
--             in use, and pg_dist_partition is not modified. When false, new
--             ids come from the pg_dist_collocation_id sequence and the
--             colocationid column of pg_dist_partition is updated.
-- Returns: nothing. If no hash-partitioned table has the invalid colocation
--          id, a NOTICE is raised and the function returns without touching
--          anything.
--
-- search_path lists pg_temp explicitly and last so that temporary tables
-- cannot shadow the pg_catalog relations this SECURITY DEFINER function reads
-- and updates; the function's own temp table is still found through pg_temp.
CREATE OR REPLACE FUNCTION update_table_colocation_ids(dry_run BOOL DEFAULT true)
RETURNS VOID
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, pg_temp
AS $update_table_colocation_ids$
DECLARE
    invalid_colocation_id CONSTANT BIGINT := 0;
    -- a NULL argument must not consume sequence values, so treat it as a dry run
    is_dry_run bool := COALESCE(dry_run, true);
    hash_table_names text[];
    hash_table text;
    non_hash_table_names text[];
    matched_colocation_id BIGINT;
    next_pseudo_colocation_id BIGINT;
    new_table_colocation_id BIGINT;
    colocation_group record;
BEGIN
    -- get hash partitioned tables whose colocationid field
    -- is the invalid colocation id on pg_dist_partition
    hash_table_names := public.hash_partitioned_non_colocated_tables();

    -- all hash partitioned tables already have a colocation id,
    -- so there is nothing to process
    IF hash_table_names IS NULL THEN
        RAISE NOTICE 'No hash partitioned tables with InvalidColocationId.';
        RETURN;
    END IF;

    -- working table: one row per colocation group; dropped automatically at
    -- the end of the transaction so it does not linger in the session
    DROP TABLE IF EXISTS tmp_table_collocation;
    CREATE TEMP TABLE tmp_table_collocation (colocationid BIGINT, collocated_tables text[])
        ON COMMIT DROP;

    -- distributed tables that are not hash partitioned; the helper returns
    -- NULL when there are none, which would otherwise break the array handling
    non_hash_table_names := COALESCE(public.non_hash_partitioned_tables(), '{}'::text[]);

    -- populate the working table with the existing colocation groups
    INSERT INTO tmp_table_collocation (colocationid, collocated_tables)
    SELECT
        colocationid,
        array_agg(logicalrelid::regclass::text)
    FROM
        pg_dist_partition
    WHERE
        colocationid <> invalid_colocation_id
    GROUP BY
        colocationid;

    -- now, insert the non-hash tables as the group of the invalid colocation id
    INSERT INTO tmp_table_collocation (colocationid, collocated_tables)
    VALUES (invalid_colocation_id, non_hash_table_names);

    -- pseudo ids for a dry run start after the highest id already present, so
    -- they can never collide with an existing colocation group
    SELECT
        COALESCE(max(colocationid), invalid_colocation_id) + 1
    INTO
        next_pseudo_colocation_id
    FROM
        tmp_table_collocation;

    -- iterate over the hash partitioned tables which do not have a valid colocation id
    FOREACH hash_table IN ARRAY hash_table_names
    LOOP
        -- look for a group whose first table has the same colocation
        -- configuration; valid groups are preferred over the invalid one and
        -- ties are broken by the lowest id to keep the choice deterministic
        SELECT
            colocationid
        INTO
            matched_colocation_id
        FROM
            tmp_table_collocation
        WHERE
            public.tables_have_colocated_placements(hash_table, collocated_tables[1])
        ORDER BY
            colocationid = invalid_colocation_id,
            colocationid
        LIMIT 1;

        IF FOUND THEN
            -- a co-located group exists: add the table to it
            UPDATE
                tmp_table_collocation
            SET
                collocated_tables = collocated_tables || ARRAY[hash_table]
            WHERE
                colocationid = matched_colocation_id;
        ELSE
            -- no co-located group: open a new one under a new colocation id
            IF is_dry_run THEN
                new_table_colocation_id := next_pseudo_colocation_id;
                next_pseudo_colocation_id := next_pseudo_colocation_id + 1;
            ELSE
                new_table_colocation_id := nextval('pg_dist_collocation_id');
            END IF;

            INSERT INTO tmp_table_collocation (colocationid, collocated_tables)
            VALUES (new_table_colocation_id, ARRAY[hash_table]);
        END IF;
    END LOOP;

    -- report every group and, unless this is a dry run, persist it
    FOR colocation_group IN
        SELECT colocationid, collocated_tables
        FROM tmp_table_collocation
        ORDER BY colocationid
    LOOP
        RAISE NOTICE 'ColocationId: % - Colocated Tables: %', colocation_group.colocationid, colocation_group.collocated_tables;

        IF NOT is_dry_run THEN
            UPDATE
                pg_dist_partition
            SET
                colocationid = colocation_group.colocationid
            WHERE
                logicalrelid::regclass::text = ANY(colocation_group.collocated_tables);
        END IF;
    END LOOP;

    RETURN;
END;
$update_table_colocation_ids$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment