Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Basic geo-partitioning for Citus
-- set up a global cluster
select citus_set_coordinator_host('us-coordinator-hostname', 5432);
select citus_add_node('us-worker1-hostname', 5432);
select citus_add_node('us-worker2-hostname', 5432);
select citus_add_node('eu-coordinator-hostname', 5432);
select citus_add_node('eu-worker1-hostname', 5432);
select citus_add_node('eu-worker2-hostname', 5432);
-- make sure we can run distributed queries on EU coordinator
select start_metadata_sync_to_node('eu-coordinator-hostname', 5432);
-- lazily store location information in unused noderack field.
-- Nodes are tagged by hostname prefix. LIKE's multi-character wildcard is %;
-- a '*' is matched literally, so 'eu-*' would only match the exact string 'eu-*'.
update pg_dist_node set noderack = 'eu' where nodename like 'eu-%';
update pg_dist_node set noderack = 'us' where nodename like 'us-%';
-- start from a clean slate: remove the location schemas (and everything in
-- them) left behind by an earlier run of this script
drop schema if exists
    us,
    eu,
    global
cascade;

-- one schema per location, plus one for the cross-region view of the data
create schema us;
create schema eu;
create schema global;
-- create a rebalance strategy that maps schemas to noderacks
-- Decides whether a shard may be placed on a node, based on the name of the
-- schema that contains the shard's table and the noderack value of the node.
--
-- Parameters:
--   shard_id - value looked up in pg_dist_shard.shardid
--   node_id  - value looked up in pg_dist_node.nodeid
--
-- Returns:
--   true  when no node has a noderack equal to the shard's schema name
--         (this includes the case where the shard is not found), or when
--         the node's noderack equals the schema name;
--   false otherwise, including when node_id has no row in pg_dist_node.
create or replace function match_schema_with_noderack(shard_id bigint, node_id int)
returns bool language plpgsql as $$
declare
    shard_schema text;
    node_location text;
begin
    -- find the name of the schema that holds the shard's table
    select n.nspname into shard_schema
    from pg_dist_shard as s
    inner join pg_class as c on c.oid = s.logicalrelid
    inner join pg_namespace as n on n.oid = c.relnamespace
    where s.shardid = shard_id;

    -- schemas that do not have a corresponding "noderack" are allowed anywhere
    if not exists (
        select 1
        from pg_dist_node as dn
        where dn.noderack = shard_schema
    ) then
        return true;
    end if;

    -- noderack field is used to store node location
    select dn.noderack into node_location
    from pg_dist_node as dn
    where dn.nodeid = node_id;

    -- allow schemas that match a noderack only on that noderack; an unknown
    -- node leaves node_location null, which is reported as "not allowed"
    -- rather than as a null result
    return coalesce(node_location = shard_schema, false);
end; $$;
-- register the 'geo' strategy: built-in cost and capacity functions, with
-- match_schema_with_noderack deciding where a shard is allowed to live
select citus_add_rebalance_strategy(
    name := 'geo',
    shard_cost_function := 'citus_shard_cost_1',
    node_capacity_function := 'citus_node_capacity_1',
    shard_allowed_on_node_function := 'match_schema_with_noderack',
    default_threshold := 0
);

-- US table, distributed by key, in its own colocation group
create table us.test (
    region text,
    key text,
    value bigint
);
select create_distributed_table(
    'us.test',
    'key',
    colocate_with := 'none'
);

-- EU table, same shape, also in its own colocation group
create table eu.test (
    region text,
    key text,
    value bigint
);
select create_distributed_table(
    'eu.test',
    'key',
    colocate_with := 'none'
);

-- let the rebalancer move each table's shards using the 'geo' strategy
select rebalance_table_shards(
    'us.test',
    rebalance_strategy := 'geo'
);
select rebalance_table_shards(
    'eu.test',
    rebalance_strategy := 'geo'
);
-- Set up global table using postgres_fdw and partitioning. These steps should be repeated on every coordinator.
-- The extension, servers and user mappings are created with "if not exists"
-- so that these statements can be re-run: servers and user mappings do not
-- live inside a schema, so dropping the global schema does not remove them.
create extension if not exists postgres_fdw;

-- one foreign server (plus login mapping for role marco) per regional coordinator
create server if not exists us_server
    foreign data wrapper postgres_fdw
    options (host 'us-coordinator-hostname', port '5432');
create user mapping if not exists for marco
    server us_server
    options (user 'marco');

create server if not exists eu_server
    foreign data wrapper postgres_fdw
    options (host 'eu-coordinator-hostname', port '5432');
create user mapping if not exists for marco
    server eu_server
    options (user 'marco');

-- global.test is list-partitioned on region; each partition is a foreign
-- table pointing at the "test" table in the matching regional schema
create table global.test (region text, key text, value bigserial) partition by list (region);

-- rows for the listed EU countries go to eu.test through eu_server
create foreign table global.test_eu
    partition of global.test
    for values in ('France', 'Germany', 'Netherlands')
    server eu_server
    options (schema_name 'eu', table_name 'test');

-- every other region falls through to us.test through us_server
create foreign table global.test_us
    partition of global.test
    default
    server us_server
    options (schema_name 'us', table_name 'test');
-- insert EU data (will be fast from EU, slow from US)
-- the target columns are named so the statement does not depend on the
-- column order of global.test; value is left to its bigserial default
insert into global.test (region, key) values ('Netherlands', 'hello');
-- query all data (will always be slow, due to cross-atlantic round trip)
select * from global.test;
┌─────────────┬───────┬───────┐
│   region    │  key  │ value │
├─────────────┼───────┼───────┤
│ Netherlands │ hello │     6 │
└─────────────┴───────┴───────┘
(1 row)
-- read the EU table directly (fast when run from EU, slow when run from US)
select
    region,
    key,
    value
from eu.test;
┌─────────────┬───────┬───────┐
│   region    │  key  │ value │
├─────────────┼───────┼───────┤
│ Netherlands │ hello │     6 │
└─────────────┴───────┴───────┘
(1 row)
-- read the US table directly (fast when run from US, slow when run from EU)
select
    region,
    key,
    value
from us.test;
┌────────┬─────┬───────┐
│ region │ key │ value │
├────────┼─────┼───────┤
└────────┴─────┴───────┘
(0 rows)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment