Skip to content

Instantly share code, notes, and snippets.

@ololobus
Last active October 11, 2021 11:39
Show Gist options
  • Save ololobus/8fba33241f68be2e3765d27bf04882a3 to your computer and use it in GitHub Desktop.
Save ololobus/8fba33241f68be2e3765d27bf04882a3 to your computer and use it in GitHub Desktop.
PostgreSQL Multi-Tenant Sharding Test

PostgreSQL Multi-Tenant Sharding Test

Setup

  1. Create two Postgres clusters, one on 5432 port (node1) and one on 5433 (node2). Compile and install postgres_fdw extension on both.

  2. Set enable_partitionwise_join and enable_partitionwise_aggregate to on.

  3. Set postgres_fdw.use_remote_estimate to true.

  4. On node2:

\i init2.sql
  1. On node1:
\i init1.sql
\i load.sql
  1. Run ANALYSE on both node1 and node2.

Or simply run setup.sh script if all Postgres binaries are present in the PATH.

Then check some queries from the queries.sql.

Results for PostgreSQL 14 (June 2020)

At the time of 30.06.2020 PostgreSQL cannot efectively push-down queries to the specific node, when postgres_fdw + native partitioning and a filter by sharding key are used, e.g.:

postgres_node1=# EXPLAIN (
     ANALYZE,
     VERBOSE
 )
 SELECT
     *
 FROM
     documents
     INNER JOIN users ON documents.user_id = users.id
 WHERE
     documents.company_id = 5
     AND users.company_id = 5;
                                                                           QUERY PLAN                                                                            
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
 Nested Loop  (cost=200.00..248.36 rows=1 width=100) (actual time=3.024..688.854 rows=20235 loops=1)
   Output: documents.company_id, documents.id, documents.user_id, documents.created_at, documents.text, users.company_id, users.id, users.created_at, users.name
   Join Filter: (documents.user_id = users.id)
   Rows Removed by Join Filter: 2003265
   ->  Foreign Scan on public.users_node2 users  (cost=100.00..124.33 rows=6 width=48) (actual time=1.280..1.697 rows=100 loops=1)
         Output: users.company_id, users.id, users.created_at, users.name
         Remote SQL: SELECT company_id, id, created_at, name FROM public.users_node2 WHERE ((company_id = 5))
   ->  Materialize  (cost=100.00..123.59 rows=5 width=52) (actual time=0.017..3.366 rows=20235 loops=100)
         Output: documents.company_id, documents.id, documents.user_id, documents.created_at, documents.text
         ->  Foreign Scan on public.documents_node2 documents  (cost=100.00..123.56 rows=5 width=52) (actual time=1.668..131.552 rows=20235 loops=1)
               Output: documents.company_id, documents.id, documents.user_id, documents.created_at, documents.text
               Remote SQL: SELECT company_id, id, user_id, created_at, text FROM public.documents_node2 WHERE ((company_id = 5))
 Planning Time: 0.378 ms
 Execution Time: 692.072 ms

It uses two foreign scans and performs a final join locally. However, once one specifies a required partition in the query it is immediately pushed-down to the required node:

postgres_node1=# EXPLAIN (
     ANALYZE,
     VERBOSE
 )
 SELECT
     *
 FROM
     documents_node2
     INNER JOIN users_node2 ON documents_node2.user_id = users_node2.id
 WHERE
     documents_node2.company_id = 5
     AND users_node2.company_id = 5;
                                                                                                                                  QUERY PLAN                                                                                                                                   
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Foreign Scan  (cost=100.00..147.92 rows=1 width=100) (actual time=2.973..204.586 rows=20235 loops=1)
   Output: documents_node2.company_id, documents_node2.id, documents_node2.user_id, documents_node2.created_at, documents_node2.text, users_node2.company_id, users_node2.id, users_node2.created_at, users_node2.name
   Relations: (public.documents_node2) INNER JOIN (public.users_node2)
   Remote SQL: SELECT r1.company_id, r1.id, r1.user_id, r1.created_at, r1.text, r2.company_id, r2.id, r2.created_at, r2.name FROM (public.documents_node2 r1 INNER JOIN public.users_node2 r2 ON (((r1.user_id = r2.id)) AND ((r2.company_id = 5)) AND ((r1.company_id = 5))))
 Planning Time: 0.217 ms
 Execution Time: 206.470 ms

You can see that execution time has dropped significantly — by more than 3 times.

DROP TABLE IF EXISTS companies CASCADE;
DROP TABLE IF EXISTS users CASCADE;
DROP TABLE IF EXISTS documents CASCADE;
DROP SERVER IF EXISTS node2 CASCADE;
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE SERVER node2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '5433');
CREATE USER MAPPING FOR current_user SERVER node2;
CREATE TABLE companies (
company_id int not null,
created_at timestamp without time zone default current_timestamp,
name text
) PARTITION BY HASH (company_id);
CREATE TABLE users (
company_id int not null,
id int not null,
created_at timestamp without time zone default current_timestamp,
name text
) PARTITION BY HASH (company_id);
CREATE TABLE documents (
company_id int not null,
id int not null,
user_id int not null,
created_at timestamp without time zone default current_timestamp,
text text
) PARTITION BY HASH (company_id);
CREATE TABLE companies_node1 PARTITION OF companies
FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE FOREIGN TABLE companies_node2 PARTITION OF companies
FOR VALUES WITH (MODULUS 2, REMAINDER 1)
SERVER node2;
CREATE TABLE users_node1 PARTITION OF users
FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE FOREIGN TABLE users_node2 PARTITION OF users
FOR VALUES WITH (MODULUS 2, REMAINDER 1)
SERVER node2;
CREATE TABLE documents_node1 PARTITION OF documents
FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE FOREIGN TABLE documents_node2 PARTITION OF documents
FOR VALUES WITH (MODULUS 2, REMAINDER 1)
SERVER node2;
ALTER TABLE companies_node1 ADD CONSTRAINT companies_pk PRIMARY KEY (company_id);
ALTER TABLE users_node1 ADD CONSTRAINT users_pk PRIMARY KEY (company_id, id);
ALTER TABLE documents_node1 ADD CONSTRAINT documents_pk PRIMARY KEY (company_id, id);
DROP TABLE IF EXISTS companies CASCADE;
DROP TABLE IF EXISTS users CASCADE;
DROP TABLE IF EXISTS documents CASCADE;
DROP SERVER IF EXISTS node1 CASCADE;
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE SERVER node1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '5432');
CREATE USER MAPPING FOR current_user SERVER node1;
CREATE TABLE companies (
company_id int not null,
created_at timestamp without time zone default current_timestamp,
name text
) PARTITION BY HASH (company_id);
CREATE TABLE users (
company_id int not null,
id int not null,
created_at timestamp without time zone default current_timestamp,
name text
) PARTITION BY HASH (company_id);
CREATE TABLE documents (
company_id int not null,
id int not null,
user_id int not null,
created_at timestamp without time zone default current_timestamp,
text text
) PARTITION BY HASH (company_id);
CREATE TABLE companies_node2 PARTITION OF companies
FOR VALUES WITH (MODULUS 2, REMAINDER 1);
CREATE FOREIGN TABLE companies_node1 PARTITION OF companies
FOR VALUES WITH (MODULUS 2, REMAINDER 0)
SERVER node1;
CREATE TABLE users_node2 PARTITION OF users
FOR VALUES WITH (MODULUS 2, REMAINDER 1);
CREATE FOREIGN TABLE users_node1 PARTITION OF users
FOR VALUES WITH (MODULUS 2, REMAINDER 0)
SERVER node1;
CREATE TABLE documents_node2 PARTITION OF documents
FOR VALUES WITH (MODULUS 2, REMAINDER 1);
CREATE FOREIGN TABLE documents_node1 PARTITION OF documents
FOR VALUES WITH (MODULUS 2, REMAINDER 0)
SERVER node1;
ALTER TABLE companies_node2 ADD CONSTRAINT companies_pk PRIMARY KEY (company_id);
ALTER TABLE users_node2 ADD CONSTRAINT users_pk PRIMARY KEY (company_id, id);
ALTER TABLE documents_node2 ADD CONSTRAINT documents_pk PRIMARY KEY (company_id, id);
DELETE FROM companies;
DELETE FROM users;
DELETE FROM documents;
INSERT INTO companies(company_id, name)
SELECT id, md5(id::text) FROM generate_series(1, 10) c(id);
DO
$do$
BEGIN
FOR cid IN 1..10 LOOP
INSERT INTO users(id, company_id, name)
SELECT id, cid, md5(id::text) FROM generate_series(1, 100) u(id);
END LOOP;
END
$do$;
INSERT INTO documents(id, company_id, user_id, text)
SELECT id, trunc(random() * 10 + 1)::int, trunc(random() * 100 + 1)::int, md5(id::text)
FROM generate_series(1, 200000) p(id);
-- Basic aggregate + filter by sharding key
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
count(*)
FROM
documents
WHERE
company_id = 5;
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
count(*)
FROM
documents_node2
WHERE
company_id = 5;
-- Basic aggregate + filter by sharding key + fake group by -> then it is pushdown sometimes
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
count(*)
FROM
documents
WHERE
company_id = 5
GROUP BY company_id;
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
sum(id)
FROM
documents
WHERE
company_id = 5
GROUP BY company_id;
-- Basic join by secondary key + filter by sharding key
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
*
FROM
documents
INNER JOIN users ON documents.user_id = users.id
WHERE
documents.company_id = 5
AND users.company_id = 5;
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
*
FROM
documents_node2
INNER JOIN users_node2 ON documents_node2.user_id = users_node2.id
WHERE
documents_node2.company_id = 5
AND users_node2.company_id = 5;
-- Join by secondary key + aggregate + filter by sharding key
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
count(*)
FROM
documents
INNER JOIN users ON documents.user_id = users.id
WHERE
documents.company_id = 5
AND users.company_id = 5;
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
count(*)
FROM
documents_node2
INNER JOIN users_node2 ON documents_node2.user_id = users_node2.id
WHERE
documents_node2.company_id = 5
AND users_node2.company_id = 5;
-- Join by secondary key + aggregate + group by secondary key
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
user_id,
count(*) AS documents_count
FROM
documents
INNER JOIN users ON documents.user_id = users.id
WHERE
documents.company_id = 5
AND users.company_id = 5
GROUP BY
user_id;
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
user_id,
count(*) AS documents_count
FROM
documents_node2
INNER JOIN users_node2 ON documents_node2.user_id = users_node2.id
WHERE
documents_node2.company_id = 5
AND users_node2.company_id = 5
GROUP BY
user_id;
#!/usr/bin/env sh
pg_ctl -D node1 stop > /dev/null
pg_ctl -D node2 stop > /dev/null
rm -rf node1 node2
rm node1.log node2.log
initdb -D node1
initdb -D node2
echo 'port = 5433' >> node2/postgresql.conf
echo 'enable_partitionwise_join = 1' >> node1/postgresql.conf
echo 'enable_partitionwise_aggregate = 1' >> node1/postgresql.conf
echo 'postgres_fdw.use_remote_estimate = 1' >> node1/postgresql.conf
echo 'enable_partitionwise_join = 1' >> node2/postgresql.conf
echo 'enable_partitionwise_aggregate = 1' >> node2/postgresql.conf
echo 'postgres_fdw.use_remote_estimate = 1' >> node2/postgresql.conf
pg_ctl -D node1 -l node1.log start
pg_ctl -D node2 -l node2.log start
createdb
createdb -p5433
psql -f init1.sql
psql -p5433 -f init2.sql
psql -f load.sql
psql -c 'ANALYSE'
psql -p5433 -c 'ANALYSE'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment