-- set up a global cluster
select citus_set_coordinator_host('us-coordinator-hostname', 5432);
select citus_add_node('us-worker1-hostname', 5432);
select citus_add_node('us-worker2-hostname', 5432);
select citus_add_node('eu-coordinator-hostname', 5432);
select citus_add_node('eu-worker1-hostname', 5432);
select citus_add_node('eu-worker2-hostname', 5432);
-- make sure we can run distributed queries on EU coordinator
select start_metadata_sync_to_node('eu-coordinator-hostname', 5432);
-- lazily store location information in unused noderack field
update pg_dist_node set noderack = 'eu' where nodename like 'eu-%';
update pg_dist_node set noderack = 'us' where nodename like 'us-%';
-- clean up from the previous run of this script
drop schema if exists us cascade;
drop schema if exists eu cascade;
drop schema if exists global cascade;
-- create schemas for different locations
create schema us;
create schema eu;
create schema global;
-- create a rebalance strategy that maps schemas to noderacks
create or replace function match_schema_with_noderack(shard_id bigint, node_id int)
returns bool language plpgsql as $$
declare
  table_id regclass;
  schema_name text;
  node_location text;
begin
  -- find the schema name
  select nspname into schema_name
  from pg_dist_shard s
  join pg_class c on (s.logicalrelid = c.oid)
  join pg_namespace n on (c.relnamespace = n.oid)
  where s.shardid = shard_id;
  -- schemas that do not have a corresponding "noderack" are allowed anywhere
  if not exists (select 1 from pg_dist_node where noderack = schema_name) then
    return true;
  end if;
  -- noderack field is used to store node location
  select noderack into node_location
  from pg_dist_node
  where nodeid = node_id;
  -- allow schemas that match a noderack only on that noderack
  return node_location = schema_name;
end; $$;
select citus_add_rebalance_strategy('geo', 'citus_shard_cost_1', 'citus_node_capacity_1', 'match_schema_with_noderack', 0);
-- create distributed tables in EU and US
create table us.test (region text, key text, value bigint);
select create_distributed_table('us.test', 'key', colocate_with := 'none');
create table eu.test (region text, key text, value bigint);
select create_distributed_table('eu.test', 'key', colocate_with := 'none');
-- move shards to their intended geo using the rebalancer
select rebalance_table_shards('us.test', rebalance_strategy := 'geo');
select rebalance_table_shards('eu.test', rebalance_strategy := 'geo');
-- set up a global table using postgres_fdw and partitioning; these steps should be repeated on every coordinator
create extension postgres_fdw;
create server us_server foreign data wrapper postgres_fdw options (host 'us-coordinator-hostname', port '5432');
create user mapping for marco server us_server options (user 'marco');
create server eu_server foreign data wrapper postgres_fdw options (host 'eu-coordinator-hostname', port '5432');
create user mapping for marco server eu_server options (user 'marco');
create table global.test (region text, key text, value bigserial) partition by list (region);
create foreign table global.test_eu partition of global.test for values in ('France', 'Germany', 'Netherlands') server eu_server options (schema_name 'eu', table_name 'test');
create foreign table global.test_us partition of global.test default server us_server options (schema_name 'us', table_name 'test');
-- insert EU data (will be fast from EU, slow from US)
insert into global.test values ('Netherlands', 'hello');
-- query all data (will always be slow, due to cross-atlantic round trip)
select * from global.test;
┌─────────────┬───────┬───────┐
│ region      │ key   │ value │
├─────────────┼───────┼───────┤
│ Netherlands │ hello │     6 │
└─────────────┴───────┴───────┘
(1 row)
-- query only EU data (will be fast from EU, slow from US)
select * from eu.test;
┌─────────────┬───────┬───────┐
│ region      │ key   │ value │
├─────────────┼───────┼───────┤
│ Netherlands │ hello │     6 │
└─────────────┴───────┴───────┘
(1 row)
-- query only US data (will be fast from US, slow from EU)
select * from us.test;
┌────────┬─────┬───────┐
│ region │ key │ value │
├────────┼─────┼───────┤
└────────┴─────┴───────┘
(0 rows)
Thanks for this @marcocitus. Can I have a partitioning strategy based on both region (us, eu, etc.) and a tenant_id? A tenant in my case will belong to only one region. But one region can have multiple tenants. All the data of a tenant should ideally be located in a single shard.
I think it probably makes sense to create a separate schema per shard rather than a separate schema per region, and then create single-shard distributed tables in the tenant's region (roughly as sketched below), though support for this is not very good yet. This is still at an experimental PoC stage.
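For reference, a minimal sketch of that per-tenant layout, assuming a Citus version that supports the shard_count parameter of create_distributed_table. The tenant_42 schema, its orders table, and the choice of eu-worker1-hostname as the target node are all made up for illustration; a real setup would look up the tenant's region instead of hard-coding it, or extend the geo rebalance strategy above to understand tenant schemas.
-- hypothetical: one schema per tenant, holding a single-shard distributed table
create schema tenant_42;
create table tenant_42.orders (tenant_id bigint, order_id bigint, payload jsonb);
select create_distributed_table('tenant_42.orders', 'tenant_id', shard_count := 1, colocate_with := 'none');
-- pin the tenant's single shard to an EU worker (skip if it already lives there)
select citus_move_shard_placement(shardid, nodename, nodeport, 'eu-worker1-hostname', 5432)
from pg_dist_shard
join pg_dist_placement using (shardid)
join pg_dist_node using (groupid)
where logicalrelid = 'tenant_42.orders'::regclass
  and nodename <> 'eu-worker1-hostname';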
Thanks for your reply. I will then probably go ahead with multiple citus clusters in each region, each cluster having tables sharded by tenant_id. The citus cluster for a tenant will be chosen by the application layer.