@marcocitus
Last active September 7, 2022 14:27
Basic geo-partitioning for Citus
-- set up a global cluster
select citus_set_coordinator_host('us-coordinator-hostname', 5432);
select citus_add_node('us-worker1-hostname', 5432);
select citus_add_node('us-worker2-hostname', 5432);
select citus_add_node('eu-coordinator-hostname', 5432);
select citus_add_node('eu-worker1-hostname', 5432);
select citus_add_node('eu-worker2-hostname', 5432);
-- make sure we can run distributed queries on EU coordinator
select start_metadata_sync_to_node('eu-coordinator-hostname', 5432);
-- lazily store location information in unused noderack field
-- note: LIKE uses % as its wildcard; * would only match a literal asterisk
update pg_dist_node set noderack = 'eu' where nodename like 'eu-%';
update pg_dist_node set noderack = 'us' where nodename like 'us-%';
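-- optional sanity check (not part of the original script): list each node with its
-- assigned location; every eu-* and us-* host should show the matching noderack
select nodename, nodeport, noderack from pg_dist_node order by nodename;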
-- clean up from the previous run of this script
drop schema if exists us cascade;
drop schema if exists eu cascade;
drop schema if exists global cascade;
-- create schemas for different locations
create schema us;
create schema eu;
create schema global;
-- create a rebalance strategy that maps schemas to noderacks
create or replace function match_schema_with_noderack(shard_id bigint, node_id int)
returns bool language plpgsql as $$
declare
table_id regclass;
schema_name text;
node_location text;
begin
-- find the schema name
select nspname into schema_name
from pg_dist_shard s
join pg_class c on (s.logicalrelid = c.oid)
join pg_namespace n on (c.relnamespace = n.oid)
where s.shardid = shard_id;
-- schemas that do not have a corresponding "noderack" are allowed anywhere
if not exists (select 1 from pg_dist_node where noderack = schema_name) then
return true;
end if;
-- noderack field is used to store node location
select noderack into node_location
from pg_dist_node
where nodeid = node_id;
-- allow schemas that match a noderack only on that noderack
return node_location = schema_name;
end; $$;
select citus_add_rebalance_strategy('geo', 'citus_shard_cost_1', 'citus_node_capacity_1', 'match_schema_with_noderack', 0);
-- create distributed tables in EU and US
create table us.test (region text, key text, value bigint);
select create_distributed_table('us.test','key', colocate_with := 'none');
create table eu.test (region text, key text, value bigint);
select create_distributed_table('eu.test','key', colocate_with := 'none');
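-- optional (not part of the original script): preview the shard moves the 'geo' strategy
-- would make before running the rebalancer; this assumes get_rebalance_table_shards_plan
-- accepts the same rebalance_strategy argument as rebalance_table_shards
select * from get_rebalance_table_shards_plan('us.test', rebalance_strategy := 'geo');
select * from get_rebalance_table_shards_plan('eu.test', rebalance_strategy := 'geo');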
-- move shards to their intended geo using the rebalancer
select rebalance_table_shards('us.test', rebalance_strategy := 'geo');
select rebalance_table_shards('eu.test', rebalance_strategy := 'geo');
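-- optional check (not part of the original script): confirm each shard sits on a node
-- whose noderack matches its schema; assumes the citus_shards view (Citus 10+) is available
select s.table_name, s.shardid, s.nodename, n.noderack
from citus_shards s
join pg_dist_node n on (s.nodename = n.nodename and s.nodeport = n.nodeport)
where s.table_name in ('us.test'::regclass, 'eu.test'::regclass)
order by s.table_name, s.shardid;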
-- Set up global table using postgres_fdw and partitioning. These steps should be repeated on every coordinator.
create extension postgres_fdw;
create server us_server foreign data wrapper postgres_fdw options (host 'us-coordinator-hostname', port '5432');
create user mapping for marco server us_server options (user 'marco');
create server eu_server foreign data wrapper postgres_fdw options (host 'eu-coordinator-hostname', port '5432');
create user mapping for marco server eu_server options (user 'marco');
create table global.test (region text, key text, value bigserial) partition by list (region);
create foreign table global.test_eu partition of global.test for values in ('France', 'Germany', 'Netherlands') server eu_server options (schema_name 'eu', table_name 'test');
create foreign table global.test_us partition of global.test default server us_server options (schema_name 'us', table_name 'test');
-- insert EU data (will be fast from EU, slow from US)
insert into global.test values ('Netherlands','hello');
-- query all data (will always be slow, due to the transatlantic round trip)
select * from global.test;
┌─────────────┬───────┬───────┐
│ region │ key │ value │
├─────────────┼───────┼───────┤
│ Netherlands │ hello │ 6 │
└─────────────┴───────┴───────┘
(1 row)
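-- sketch (not part of the original script): a filter on the partition column should let
-- the planner prune to a single foreign partition, so only that region's coordinator
-- is contacted; the plan can be inspected with explain
explain (verbose) select * from global.test where region = 'Netherlands';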
-- query only EU data (will be fast from EU, slow from US)
select * from eu.test;
┌─────────────┬───────┬───────┐
│ region │ key │ value │
├─────────────┼───────┼───────┤
│ Netherlands │ hello │ 6 │
└─────────────┴───────┴───────┘
(1 row)
-- query only US data (will be fast from US, slow from EU)
select * from us.test;
┌────────┬─────┬───────┐
│ region │ key │ value │
├────────┼─────┼───────┤
└────────┴─────┴───────┘
(0 rows)
@mkstayalive

Thanks for this @marcocitus. Can I have a partitioning strategy based on both region (us, eu, etc.) and a tenant_id? A tenant in my case will belong to only one region. But one region can have multiple tenants. All the data of a tenant should ideally be located in a single shard.

@marcocitus
Author

marcocitus commented Sep 7, 2022

> Thanks for this @marcocitus. Can I have a partitioning strategy based on both region (us, eu, etc.) and a tenant_id? A tenant in my case will belong to only one region. But one region can have multiple tenants. All the data of a tenant should ideally be located in a single shard.

I think it probably makes sense to create a separate schema per shard rather than a separate schema per region, and then create single-shard distributed tables in the region, though support for this is not very good yet. This is in an experimental PoC stage.
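
A minimal sketch of that idea, assuming a hypothetical tenant schema `tenant_42` and a Citus version whose `create_distributed_table` accepts `shard_count`. The schema, table, and column names are illustrative only and do not come from the gist:

```sql
-- hypothetical per-tenant schema; all names here are illustrative
create schema tenant_42;
create table tenant_42.orders (tenant_id bigint, order_id bigint, total numeric);

-- a single shard in its own colocation group keeps the tenant's rows together
select create_distributed_table('tenant_42.orders', 'tenant_id',
                                colocate_with := 'none', shard_count := 1);
```

With this layout, schema names no longer match `noderack` values directly, so a function like `match_schema_with_noderack` would presumably need a lookup from tenant schema to region.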

@mkstayalive

Thanks for your reply. I will then probably go ahead with multiple Citus clusters, one in each region, each cluster having tables sharded by tenant_id. The Citus cluster for a tenant will be chosen by the application layer.
