Skip to content

Instantly share code, notes, and snippets.

@onderkalaci
Last active September 16, 2016 09:06
Show Gist options
  • Save onderkalaci/b6547eb54cc68f86f08fbf1a54ddec79 to your computer and use it in GitHub Desktop.
Save onderkalaci/b6547eb54cc68f86f08fbf1a54ddec79 to your computer and use it in GitHub Desktop.
Citus upgrade for table colocation
Prerequisite:
* The following are already added by the 5.3 update. However, in order to test non-5.3 clusters, do the following:
* Add a new sequence:
-- Sequence that hands out new colocation ids (read via nextval() in
-- update_table_colocation_ids). It is created in the citus schema and then
-- moved into pg_catalog.
CREATE SEQUENCE citus.pg_dist_collocation_id
MINVALUE 1
NO CYCLE;
ALTER SEQUENCE citus.pg_dist_collocation_id SET SCHEMA pg_catalog;
* Add a column to pg_dist_partition:
-- New rows default to 0, the value the functions in this file treat as the
-- invalid (not yet assigned) colocation id.
ALTER TABLE pg_dist_partition ADD COLUMN colocationId BIGINT DEFAULT 0;
Low-level API:
* colocated_placement_count(source_relation_name regclass, dest_relation_name regclass): given two table names, returns the co-located placement count
* shard_placement_count(relation_name regclass): given a relation returns the shard placement count
* table_colocation_id(tablename regclass): given a table, returns its colocationid
High-level API:
* tables_have_colocated_placements(source_relation_name regclass, dest_relation_name regclass): given two tables, returns true if two tables are co-located
* update_table_colocation_ids(dry_run BOOL DEFAULT true): Iterates over all distributed tables. For every hash-partitioned table that does not have a valid colocation id, the function finds its co-located tables and assigns them the same colocation id. For all non-hash-partitioned tables, the function sets colocationid to INVALID_COLOCATION_ID, which is 0. When dry_run is set to true, the function generates pseudo-ids and does not update the colocation ids in the pg_dist_partition table.
Some examples:
-- Hash-distributed example tables.
-- Naming: hash_<shard count>_<replication factor>_<variant>; the two numbers
-- are exactly the arguments passed to master_create_worker_shards().

-- 4 shards, replication factor 1 (public schema)
CREATE TABLE hash_4_1_v1
(
    a int,
    b int
);
SELECT master_create_distributed_table('hash_4_1_v1', 'a', 'hash');
SELECT master_create_worker_shards('hash_4_1_v1', 4, 1);

CREATE TABLE hash_4_1_v2
(
    a int,
    b int
);
SELECT master_create_distributed_table('hash_4_1_v2', 'a', 'hash');
SELECT master_create_worker_shards('hash_4_1_v2', 4, 1);

-- same configuration, but in a non-default schema
CREATE SCHEMA test_schema;

CREATE TABLE test_schema.hash_4_1_v1
(
    a int,
    b int
);
SELECT master_create_distributed_table('test_schema.hash_4_1_v1', 'a', 'hash');
SELECT master_create_worker_shards('test_schema.hash_4_1_v1', 4, 1);

CREATE TABLE test_schema.hash_4_1_v2
(
    a int,
    b int
);
SELECT master_create_distributed_table('test_schema.hash_4_1_v2', 'a', 'hash');
SELECT master_create_worker_shards('test_schema.hash_4_1_v2', 4, 1);

-- 4 shards, replication factor 2
CREATE TABLE hash_4_2_v1
(
    a int,
    b int
);
SELECT master_create_distributed_table('hash_4_2_v1', 'a', 'hash');
SELECT master_create_worker_shards('hash_4_2_v1', 4, 2);

CREATE TABLE hash_4_2_v2
(
    a int,
    b int
);
SELECT master_create_distributed_table('hash_4_2_v2', 'a', 'hash');
SELECT master_create_worker_shards('hash_4_2_v2', 4, 2);

-- 8 shards, replication factor 1
CREATE TABLE hash_8_1_v1
(
    a int,
    b int
);
SELECT master_create_distributed_table('hash_8_1_v1', 'a', 'hash');
SELECT master_create_worker_shards('hash_8_1_v1', 8, 1);

CREATE TABLE hash_8_1_v2
(
    a int,
    b int
);
-- distribute the table that was just created (the original script mistakenly
-- re-distributed hash_4_2_v2 here, leaving hash_8_1_v2 undistributed)
SELECT master_create_distributed_table('hash_8_1_v2', 'a', 'hash');
SELECT master_create_worker_shards('hash_8_1_v2', 8, 1);

-- 32 shards, replication factor 2
CREATE TABLE hash_32_2_v1
(
    a int,
    b int
);
SELECT master_create_distributed_table('hash_32_2_v1', 'a', 'hash');
SELECT master_create_worker_shards('hash_32_2_v1', 32, 2);
-- Append-distributed example tables.

-- append table without any shards
CREATE TABLE append_no_shards
(
    a int,
    b int
);
SELECT master_create_distributed_table('append_no_shards', 'a', 'append');

-- append table with 4 empty shards, replication factor 1
CREATE TABLE append_4_shards_1_rep
(
    a int,
    b int
);
SELECT master_create_distributed_table('append_4_shards_1_rep', 'a', 'append');
-- The Citus setting is citus.shard_replication_factor. The original
-- "citus.replication_factor" is not a Citus setting, so setting it only
-- creates a placeholder variable and the intended factor is never applied.
SET citus.shard_replication_factor TO 1;
SELECT master_create_empty_shard('append_4_shards_1_rep');
SELECT master_create_empty_shard('append_4_shards_1_rep');
SELECT master_create_empty_shard('append_4_shards_1_rep');
SELECT master_create_empty_shard('append_4_shards_1_rep');

-- append table with 4 empty shards, replication factor 2
CREATE TABLE append_4_shards_2_rep
(
    a int,
    b int
);
SELECT master_create_distributed_table('append_4_shards_2_rep', 'a', 'append');
SET citus.shard_replication_factor TO 2;
SELECT master_create_empty_shard('append_4_shards_2_rep');
SELECT master_create_empty_shard('append_4_shards_2_rep');
SELECT master_create_empty_shard('append_4_shards_2_rep');
SELECT master_create_empty_shard('append_4_shards_2_rep');
-- Pairwise co-location checks between the example tables. Each call returns a
-- boolean: true only when both tables have the same number of placements and
-- every placement has a matching one (same shard min/max, same node) in the
-- other table.
SELECT tables_have_colocated_placements('hash_4_1_v1', 'hash_4_1_v2');
SELECT tables_have_colocated_placements('hash_4_1_v1', 'test_schema.hash_4_1_v1');
SELECT tables_have_colocated_placements('hash_4_1_v1', 'test_schema.hash_4_1_v2');
SELECT tables_have_colocated_placements('hash_4_1_v1', 'hash_4_2_v2');
SELECT tables_have_colocated_placements('hash_4_1_v1', 'hash_8_1_v1');
SELECT tables_have_colocated_placements('hash_4_2_v2', 'hash_8_1_v1');
SELECT tables_have_colocated_placements('hash_4_1_v1', 'append_4_shards_1_rep');
SELECT tables_have_colocated_placements('hash_8_1_v1', 'append_4_shards_2_rep');
SELECT tables_have_colocated_placements('hash_8_1_v1', 'hash_32_2_v1');
-- First preview the colocation groups via NOTICE messages (dry run, no update
-- of pg_dist_partition), then run again to write the ids to pg_dist_partition.
SELECT update_table_colocation_ids(dry_run=>true);
SELECT update_table_colocation_ids(dry_run=>false);
-- Given two table names, returns the co-located placement count: the number of
-- (source placement, destination placement) pairs whose shards have identical
-- shardminvalue/shardmaxvalue and whose placements are on the same node
-- (nodename, nodeport).
--
-- source_relation_name: first distributed table
-- dest_relation_name:   second distributed table
-- Returns 0 when no such pair exists. Shards whose min/max values are NULL
-- never match, because the comparison uses plain equality.
CREATE OR REPLACE FUNCTION colocated_placement_count(source_relation_name regclass, dest_relation_name regclass)
RETURNS INT
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $colocated_placement_count$
DECLARE
    shard_placement_count_val INT := 0;
BEGIN
    -- count the matching pairs directly instead of materialising the rows
    -- with PERFORM and reading ROW_COUNT afterwards
    SELECT
        count(*)
    INTO
        shard_placement_count_val
    FROM
        pg_dist_shard AS s1
        -- associate shards and their corresponding placements
        INNER JOIN pg_dist_shard_placement AS p1
            ON p1.shardid = s1.shardid
        -- both shards have the same min/max values
        INNER JOIN pg_dist_shard AS s2
            ON s2.shardminvalue = s1.shardminvalue
            AND s2.shardmaxvalue = s1.shardmaxvalue
        -- both placements are on the same node
        INNER JOIN pg_dist_shard_placement AS p2
            ON p2.shardid = s2.shardid
            AND p2.nodename = p1.nodename
            AND p2.nodeport = p1.nodeport
    WHERE
        -- for the given tables
        s1.logicalrelid = source_relation_name AND
        s2.logicalrelid = dest_relation_name;

    RETURN shard_placement_count_val;
END;
$colocated_placement_count$;
-- returns the number of shard placements that belong to the given relation,
-- i.e. the number of pg_dist_shard_placement rows joined to its shards
CREATE OR REPLACE FUNCTION shard_placement_count(relation_name regclass)
RETURNS INT
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $shard_placement_count$
DECLARE
    placement_total INT := 0;
BEGIN
    SELECT count(*)
    INTO placement_total
    FROM pg_dist_shard AS s
    INNER JOIN pg_dist_shard_placement AS p
        ON p.shardid = s.shardid
    WHERE s.logicalrelid = relation_name::oid;

    RETURN placement_total;
END;
$shard_placement_count$;
-- returns the names of all hash-partitioned tables whose colocationid differs
-- from the invalid colocation id (0); the result is NULL when no table matches
CREATE OR REPLACE FUNCTION hash_partitioned_colocated_tables()
RETURNS text[]
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $hash_partitioned_colocated_tables$
DECLARE
    invalid_colocation_id INT := 0;
    colocated_table_names text[] = '{}';
BEGIN
    SELECT array_agg(logicalrelid::regclass)
    INTO colocated_table_names
    FROM pg_dist_partition
    WHERE colocationid <> invalid_colocation_id
      AND partmethod = 'h';

    RETURN colocated_table_names;
END;
$hash_partitioned_colocated_tables$;
-- returns the names of all distributed tables that are not hash-partitioned;
-- the result is NULL (not an empty array) when there is no such table
CREATE OR REPLACE FUNCTION non_hash_partitioned_tables()
RETURNS text[]
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $non_hash_partitioned_tables$
DECLARE
    non_hash_table_names text[] = '{}';
BEGIN
    SELECT array_agg(logicalrelid::regclass)
    INTO non_hash_table_names
    FROM pg_dist_partition
    WHERE partmethod <> 'h';

    RETURN non_hash_table_names;
END;
$non_hash_partitioned_tables$;
-- looks up the colocationid stored in pg_dist_partition for the given table;
-- the result is NULL when the table has no row in pg_dist_partition
CREATE OR REPLACE FUNCTION table_colocation_id(tablename regclass)
RETURNS INT
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $table_colocation_id$
DECLARE
    found_colocation_id INT := 0;
BEGIN
    SELECT colocationid
    INTO found_colocation_id
    FROM pg_dist_partition
    WHERE logicalrelid = tablename;

    RETURN found_colocation_id;
END;
$table_colocation_id$;
-- returns the names of all hash-partitioned tables whose colocationid is still
-- the invalid colocation id (0); the result is NULL when no table matches
CREATE OR REPLACE FUNCTION hash_partitioned_non_colocated_tables()
RETURNS text[]
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $hash_partitioned_non_colocated_tables$
DECLARE
    invalid_colocation_id INT := 0;
    pending_table_names text[] = '{}';
BEGIN
    SELECT array_agg(logicalrelid::regclass)
    INTO pending_table_names
    FROM pg_dist_partition
    WHERE colocationid = invalid_colocation_id
      AND partmethod = 'h';

    RETURN pending_table_names;
END;
$hash_partitioned_non_colocated_tables$;
-- decides whether two tables are co-located: true only when both tables have
-- the same number of shard placements and that number equals the count of
-- co-located placement pairs between them; false otherwise
CREATE OR REPLACE FUNCTION tables_have_colocated_placements(source_relation_name regclass, dest_relation_name regclass)
RETURNS bool
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $tables_have_colocated_placements$
DECLARE
    source_total INT := 0;
    dest_total INT := 0;
    shared_total INT := 0;
BEGIN
    source_total := public.shard_placement_count(source_relation_name);
    dest_total := public.shard_placement_count(dest_relation_name);
    shared_total := public.colocated_placement_count(source_relation_name, dest_relation_name);

    -- an unknown (NULL) comparison counts as "not co-located"
    RETURN COALESCE(source_total = dest_total AND shared_total = source_total, false);
END;
$tables_have_colocated_placements$;
-- Iterates over all distributed tables. For every hash-partitioned table that does not
-- have a valid colocation id, the function finds its co-located tables and assigns the
-- same colocation id. All non-hash-partitioned tables are placed in the group with
-- INVALID_COLOCATION_ID, which is 0.
--
-- dry_run: when true (the default, and also when NULL is passed) the function only
--          generates pseudo-ids and reports the groups via NOTICE; pg_dist_partition
--          is not updated and the pg_dist_collocation_id sequence is not advanced.
--          When false, new ids come from the sequence and pg_dist_partition is updated.
--
-- Side effects: (re)creates the temp table tmp_table_collocation, which is left in the
-- session after the call so the computed groups can be inspected.
CREATE OR REPLACE FUNCTION update_table_colocation_ids(dry_run BOOL DEFAULT true)
RETURNS VOID
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $update_table_colocation_ids$
DECLARE
    -- a NULL argument is treated as a dry run, the non-destructive choice
    is_dry_run CONSTANT bool := COALESCE(dry_run, true);
    invalidColocationId CONSTANT BIGINT := 0;
    hash_table_names text[];
    hash_table text;
    non_hash_table_names text[];
    current_table_colocation_id BIGINT;
    table_colocation_id_counter BIGINT := 1;
    new_table_colocation_id BIGINT;
    tmp_table_record record;
BEGIN
    -- get hash partitioned tables whose colocationid field
    -- is InvalidColocationId on pg_dist_partition
    hash_table_names := public.hash_partitioned_non_colocated_tables();

    -- all hash partitioned tables already have colocation id,
    -- so there is nothing to process
    IF hash_table_names IS NULL THEN
        RAISE NOTICE 'No hash partitioned tables with InvalidColocationId.';
        RETURN;
    END IF;

    -- create a temporary table for ease of implementation
    DROP TABLE IF EXISTS tmp_table_collocation;
    CREATE TEMP TABLE tmp_table_collocation (colocationid BIGINT, collocated_tables text[]);

    -- get distributed tables that are not hash partitioned; the helper returns
    -- NULL when there are none, so fall back to an empty array
    non_hash_table_names := COALESCE(public.non_hash_partitioned_tables(), '{}'::text[]);

    -- populate temp table with the existing colocation ids
    INSERT INTO tmp_table_collocation (colocationid, collocated_tables)
    SELECT
        colocationid,
        array_agg(logicalrelid::regclass::text)
    FROM
        pg_dist_partition
    WHERE
        colocationid <> invalidColocationId
    GROUP BY
        colocationid;

    -- now, insert non_hash tables with invalidColocationId
    INSERT INTO tmp_table_collocation (colocationid, collocated_tables)
    VALUES (invalidColocationId, non_hash_table_names);

    -- pseudo-ids for the dry run start after the largest id already in use,
    -- so a pseudo group can never be confused with an existing group
    SELECT
        COALESCE(max(colocationid), invalidColocationId) + 1
    INTO
        table_colocation_id_counter
    FROM
        tmp_table_collocation;

    -- iterate over the hash partitioned tables which do not have a valid colocationId
    FOREACH hash_table IN ARRAY hash_table_names
    LOOP
        -- check whether there already exists a group whose first table has the
        -- same colocation configuration; FOUND tells whether a group matched
        SELECT
            colocationid
        INTO
            current_table_colocation_id
        FROM
            tmp_table_collocation
        WHERE
            public.tables_have_colocated_placements(hash_table, collocated_tables[1])
        LIMIT 1;

        IF FOUND THEN
            -- a colocated group exists: add the table to that group
            UPDATE
                tmp_table_collocation
            SET
                collocated_tables = collocated_tables || ARRAY[hash_table::text]
            WHERE
                colocationid = current_table_colocation_id;
        ELSE
            -- if not found, generate a new colocationId to use
            IF is_dry_run THEN
                -- dry run: use a pseudo id, then increment it
                new_table_colocation_id := table_colocation_id_counter;
                table_colocation_id_counter := table_colocation_id_counter + 1;
            ELSE
                new_table_colocation_id := nextval('pg_dist_collocation_id');
            END IF;

            -- start a new group that contains only this table
            INSERT INTO tmp_table_collocation (colocationid, collocated_tables)
            VALUES (new_table_colocation_id, ARRAY[hash_table]::text[]);
        END IF;
    END LOOP;

    -- report every group and, unless this is a dry run, persist it
    FOR tmp_table_record IN
        SELECT colocationid, collocated_tables
        FROM tmp_table_collocation
        ORDER BY colocationid
    LOOP
        RAISE NOTICE 'ColocationId: % - Colocated Tables: %', tmp_table_record.colocationid, tmp_table_record.collocated_tables;

        -- OK, if it is not a dry run,
        -- lets update the pg_dist_partition tables
        IF NOT is_dry_run THEN
            UPDATE
                pg_dist_partition
            SET
                colocationid = tmp_table_record.colocationid
            WHERE
                logicalrelid::regclass::text = ANY(tmp_table_record.collocated_tables);
        END IF;
    END LOOP;

    RETURN;
END;
$update_table_colocation_ids$;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment