Skip to content

Instantly share code, notes, and snippets.

@marcocitus
Last active September 8, 2023 09:25
-- Schema isolation in Citus
-- Maps a schema to the single node it is isolated to.
-- One row per schema (schemaid is the primary key).
create table citus_schema_isolation (
    -- schema to isolate; regnamespace lets callers write the schema name, e.g. 't1'
    schemaid regnamespace,
    -- node the schema is pinned to; a NULL here would describe an isolation
    -- to no node at all, so it is rejected
    nodeid int not null,
    primary key (schemaid)
);
-- Decides whether shard p_shardid may be placed on node p_nodeid.
--
-- Rules:
--   * a shard whose schema has a row in citus_schema_isolation (pointing at a
--     nodeid that exists in pg_dist_node) is allowed only on that node;
--   * every other shard is allowed only on nodes that no
--     citus_schema_isolation row points to.
--
-- Parameters:
--   p_shardid  shard to place, matched against pg_dist_shard.shardid
--   p_nodeid   candidate node, compared with citus_schema_isolation.nodeid
-- Returns: true when the placement is allowed, false otherwise.
create or replace function shard_allowed_on_node_with_schema_isolation(p_shardid bigint, p_nodeid integer)
returns boolean
language plpgsql
as $function$
declare
    is_schema bool;
    isolated_to_nodeid int;
begin
    -- Resolve shard -> table -> colocation group -> schema -> isolation row.
    -- Both left joins keep the shard row when it has no schema or no isolation
    -- entry; the inner join to pg_dist_node discards isolation rows whose
    -- nodeid is not a known node, leaving isolated_to_nodeid NULL in that case.
    select schemaid is not null, nodeid
    into is_schema, isolated_to_nodeid
    from pg_dist_partition
    inner join pg_dist_shard using (logicalrelid)
    left join pg_dist_schema using (colocationid)
    left join (
        citus_schema_isolation
        inner join pg_dist_node using (nodeid)
    ) using (schemaid)
    where shardid = p_shardid;

    if is_schema and isolated_to_nodeid is not null then
        -- if the schema is isolated, only return true for the node it
        -- is isolated to
        return p_nodeid = isolated_to_nodeid;
    end if;

    -- when dealing with a regular distributed table shard, or non-isolated schema
    -- we can only allow it if the node does not have isolated schemas;
    -- EXISTS stops at the first matching row instead of counting all of them
    return not exists (
        select 1
        from citus_schema_isolation
        where nodeid = p_nodeid
    );
end;
$function$;
-- Register a rebalance strategy: same as by_disk_size, except that shard
-- placement is filtered by shard_allowed_on_node_with_schema_isolation.
select citus_add_rebalance_strategy(
    'by_disk_size_with_schema_isolation',
    shard_cost_function            => 'citus_shard_cost_by_disk_size',
    node_capacity_function         => 'citus_node_capacity_1',
    shard_allowed_on_node_function => 'shard_allowed_on_node_with_schema_isolation',
    default_threshold              => 0.1,
    minimum_threshold              => 0.01,
    improvement_threshold          => 0.5
);

-- Make the strategy registered above the default one.
select citus_set_default_rebalance_strategy('by_disk_size_with_schema_isolation');
-- create some schemas, each holding one 100-row test table
-- NOTE(review): for these schemas to appear in pg_dist_schema they presumably
-- have to be created with citus.enable_schema_based_sharding enabled (or be
-- converted with citus_schema_distribute) -- confirm for your cluster
create schema t1;
create table t1.test as
select
    s as x,
    s as y
from generate_series(1, 100) as s;

create schema t2;
create table t2.test as
select
    s as x,
    s as y
from generate_series(1, 100) as s;
-- isolate a schema to node 2
-- (the nodeid has to match a pg_dist_node.nodeid: the allowed-on-node function
-- joins citus_schema_isolation to pg_dist_node and ignores unmatched rows)
insert into citus_schema_isolation (schemaid, nodeid) values ('t1', 2);

-- rebalance: start it, then query its status
select citus_rebalance_start();
select citus_rebalance_status();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment