# Sharding with Foreign Data Wrappers and Inheritance [WIP]
Previous sources:
https://www.depesz.com/2015/04/02/waiting-for-9-5-allow-foreign-tables-to-participate-in-inheritance/
http://snowman.net/slides/pgfdw_sharding.pdf
Upcoming things in Postgres 10:
https://wiki.postgresql.org/wiki/Built-in_Sharding
<logical replication>
How others do it:
https://engineering.instagram.com/sharding-ids-at-instagram-1cf5a71e5a5c#.rcqj0jh5z
http://code.flickr.net/2010/02/08/ticket-servers-distributed-unique-primary-keys-on-the-cheap/
NOTE: Logical Replication is a Postgres 10 feature, as shown in the article above. The pglogical
implementation from 2ndQuadrant is different, although the concepts are roughly the same.
The idea here is to extend the concept a bit further and explore more diverse implementations, such as
multiple endpoints, to investigate how this works operationally, and to evaluate logical replication
for resharding and other tasks related to maintenance and operations.
Sharding by hashing or by a function, as shown by @depesz, is potentially a PITA when resharding. Instead,
I'm going to approach each of the techniques in the most scalable way possible.
There are a couple of things you can do to make hashing on your key work better.
You may be interested in the [Consistent Hashing](https://en.wikipedia.org/wiki/Consistent_hashing) concept,
which is a more efficient technique for key hashing. Solutions like [uhashring](https://github.com/ultrabug/uhashring), based on [ketama](https://github.com/RJ/ketama), are interesting for mapping hashes using a modular operation.
TODO: Example with https://github.com/stathat/consistent
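As a baseline for comparison, here is a minimal sketch of plain modulo hashing done directly in Postgres, using the built-in `hashtext()` function (the shard naming is hypothetical); consistent hashing exists precisely to avoid the mass remapping this naive scheme causes when the shard count changes:
```
-- Naive modulo hashing: map a key to one of 4 shards.
-- hashtext() is a Postgres built-in; the shard names are hypothetical.
SELECT 'shard' || (abs(hashtext('customer-42')) % 4) AS target_shard;
-- Caveat: changing the modulus (e.g. 4 -> 5 shards) remaps roughly
-- 80% of all keys, which is what consistent hashing avoids.
```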
## Types of keys for sharding:
### Deterministic key
Using a deterministic approach on the key is the easiest way, as your application logic determines
the shard key (let's say your shard key is the customer region or the customer id).
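For instance, a minimal sketch of deterministic routing on the proxy, assuming a parent table `main` with a `region` column and one child table per region (all names here are hypothetical; the full setup lives in the gist linked further down):
```
-- Hypothetical routing function: the application-provided shard key
-- (here, a region code) deterministically picks the child table.
CREATE OR REPLACE FUNCTION main_route() RETURNS trigger AS $$
BEGIN
    IF NEW.region = 'eu' THEN
        INSERT INTO main_shard_eu VALUES (NEW.*);
    ELSE
        INSERT INTO main_shard_us VALUES (NEW.*);
    END IF;
    RETURN NULL;  -- the row is stored in the child, not in main
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER main_route_trg BEFORE INSERT ON main
    FOR EACH ROW EXECUTE PROCEDURE main_route();
```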
### Dynamic Sharded Key
On the other hand, you can link your shard key to a set of ids, storing them as metadata in the proxy database.
The shard key determines on which node the data will reside, and this technique gives you the ability
to change the shard by updating the metadata. Most of the sharding tools keep the id keys organized in chunks
in order to simplify scaling out the shards. This allows you to move data across nodes by moving the ids that
belong to a shard key, which can be done by linking them 1:1 or by using an id range, as sketched below.
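A minimal sketch of such metadata on the proxy (the table and column names are hypothetical); resharding then becomes a data move plus an UPDATE on this table:
```
-- Hypothetical chunk map kept on the proxy: each chunk of ids
-- belongs to a shard key, which in turn points to a node.
CREATE TABLE shard_map (
    shard_key text   NOT NULL,
    id_from   bigint NOT NULL,
    id_to     bigint NOT NULL,
    node      text   NOT NULL,  -- e.g. the SERVER holding this chunk
    PRIMARY KEY (shard_key, id_from)
);

-- Locating the node for a given id:
SELECT node
FROM shard_map
WHERE shard_key = 'customers' AND 42 BETWEEN id_from AND id_to;

-- Resharding a chunk = move the rows, then repoint the metadata:
UPDATE shard_map SET node = 'shard02'
WHERE shard_key = 'customers' AND id_from = 0;
```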
## Sharded and non-sharded tables
It is a common mistake to start out thinking of an all-in sharded table setup, but in the end it turns out that
in most cases it is not necessary, and it is also impractical.
Sharding all the tables requires an extra effort that a custom sharding solution probably won't pay back.
Generally, in most of the classical setups, you will have no more than 5 tables representing
around 80% of the database size.
Smaller tables can be kept simple with BDR (Bi-Directional Replication) or with [logical replication](https://2ndquadrant.com/en/resources/pglogical/pglogical-docs/), as sketched below.
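For instance, a minimal Postgres 10 logical replication sketch for keeping a small, non-sharded table in sync on the shard nodes (the table, host, and user names are assumptions):
```
-- On the node that owns the small table: publish it.
CREATE PUBLICATION small_tables FOR TABLE countries;

-- On each shard node: subscribe, and the table stays in sync.
CREATE SUBSCRIPTION small_tables_sub
    CONNECTION 'host=proxyhost dbname=proxy user=repuser'
    PUBLICATION small_tables;
```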
## General
You can have more than one `proxy` database, as it does not persist the data. The `proxy` database
can be placed on a standalone instance or on each server.
For HA, SERVERs can point to replicas, but beware of this, as you will need to change the
SERVER and FOREIGN TABLE definitions on failover; otherwise writes won't work and reads will be stale.
The concept is very simple: the proxy database holds the server definitions and user mappings, along
with the entry point (the `main` table, which holds no data) and the inherited foreign tables (which do not store data either).
This leaves the proxy database with _metadata_ only. Most importantly, it also stores the partition
function and the trigger definition.
The real tables live on each `shardXX` database; their exact location is up to the engineer.
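A minimal sketch of the proxy-side objects, under assumed names and credentials (`shard01.local`, and the `main` columns from the local example further down); the full environment scripts are linked below:
```
-- On the proxy: one SERVER and USER MAPPING per shard node.
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE SERVER shard01 FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'shard01.local', dbname 'shard01');
CREATE USER MAPPING FOR postgres SERVER shard01
    OPTIONS (user 'postgres', password 'secret');

-- The entry point: a parent table that never stores rows itself.
CREATE TABLE main (shardKey char(2), key bigint, avalue text);

-- One foreign table per shard, inheriting from the entry point;
-- the rows physically live in main_shard01 on the shard01 node.
CREATE FOREIGN TABLE main_shard01 ()
    INHERITS (main) SERVER shard01
    OPTIONS (table_name 'main_shard01');
```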
This is not intended to be a production setup; it serves didactic purposes only. Still,
for specific scenarios it could be interesting to see how this works.
# Combining both Foreign and Local tables
All the code for setting up the environments is published here:
https://gist.github.com/3manuek/52064a435f9a6c9e53d283af733af752
# Some sparkles and magic here
OK, now let's get a bit trickier. Let's imagine that we feed our Postgres from a Kafka
broker. For simplicity, we are not going to manage transactions on Kafka and will keep
auto commit enabled.
`kafkacat` also supports custom formatting, so you are able to insert by column instead of as a
single jsonb. In this example I'm using a trickier approach for didactic purposes, but in
real life you may want to keep plain columns for the keys, split out from the payload messages.
Creating a topic, with 3 partitions:
```
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic PGSHARD
```
Automate content producing:
```
randtext() { cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1; }
while (true) ; do for i in $(seq 1 5) ; do echo "$(uuidgen);$(randtext)"; done | kafkacat -P -b localhost:9092 -qe -K ';' -t PGSHARD -X group.id=1 ; sleep 2; done &
```
Consuming manually to verify the topic's contents:
```
kafkacat -C -b localhost:9092 -qe -K ';' -t PGSHARD -o {beginning|stored} -X group.id=1
```
The first iteration will use the `beginning` offset, but subsequent executions must use `stored` in order
to insert progressively. Each Kafka partition is then loaded through `COPY ... FROM PROGRAM`, tagging every row with the partition it came from:
```
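-- Initial load: consume each Kafka partition from the beginning.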
COPY main(group_id,payload) FROM PROGRAM 'kafkacat -C -b localhost:9092 -qeJ -t PGSHARD -X group.id=1 -o beginning -p 0 | awk ''{print "p0\t"$0}'' ';
COPY main(group_id,payload) FROM PROGRAM 'kafkacat -C -b localhost:9092 -qeJ -t PGSHARD -X group.id=1 -o beginning -p 1 | awk ''{print "p1\t"$0}'' ';
COPY main(group_id,payload) FROM PROGRAM 'kafkacat -C -b localhost:9092 -qeJ -t PGSHARD -X group.id=1 -o beginning -p 2 | awk ''{print "p2\t"$0}'' ';
...
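-- Incremental loads: consume up to 100 new messages (-c100) per partition from the stored offset.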
COPY main(group_id,payload) FROM PROGRAM 'kafkacat -C -b localhost:9092 -c100 -qeJ -t PGSHARD -X group.id=1 -o stored -p 0 | awk ''{print "p0\t"$0}'' ';
COPY main(group_id,payload) FROM PROGRAM 'kafkacat -C -b localhost:9092 -c100 -qeJ -t PGSHARD -X group.id=1 -o stored -p 1 | awk ''{print "p1\t"$0}'' ';
COPY main(group_id,payload) FROM PROGRAM 'kafkacat -C -b localhost:9092 -c100 -qeJ -t PGSHARD -X group.id=1 -o stored -p 2 | awk ''{print "p2\t"$0}'' ';
```
Checking the contents across the cluster:
```
shard0=# select group_id, tableoid, count(*) from main group by group_id, tableoid;
 group_id | tableoid | count
----------+----------+-------
 p0       |    16716 |    48
 p2       |    16727 |    45
 p1       |    16723 |    42
(3 rows)
...
shard0=# select group_id, tableoid, count(*) from main group by group_id, tableoid;
 group_id | tableoid | count
----------+----------+-------
 p0       |    16716 |   266
 p2       |    16727 |   277
 p1       |    16723 |   300
(3 rows)
```
Constraints can help to avoid scanning the wrong partitions (see the sketch after the plan). Note that when looking for a single key here,
Postgres does not push the condition down to the foreign servers: the `Remote SQL` fetches every row and the filter is applied locally, as the `Rows Removed by Filter` lines show:
```
shard0=# explain (analyze,verbose) select payload from main where payload->>'key' = '8a3905bb-46c0-4ac3-8186-51f5c669d67e';
                                                                     QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------
 Append  (cost=0.00..328.15 rows=16 width=41) (actual time=0.108..40.939 rows=1 loops=1)
   ->  Seq Scan on public.main  (cost=0.00..0.00 rows=1 width=32) (actual time=0.013..0.013 rows=0 loops=1)
         Output: main.payload
         Filter: ((main.payload ->> 'key'::text) = '8a3905bb-46c0-4ac3-8186-51f5c669d67e'::text)
   ->  Index Scan using ix_main_shard_p0_key on public.main_shard_p0  (cost=0.27..8.29 rows=1 width=170) (actual time=0.092..0.097 rows=1 loops=1)
         Output: main_shard_p0.payload
         Index Cond: ((main_shard_p0.payload ->> 'key'::text) = '8a3905bb-46c0-4ac3-8186-51f5c669d67e'::text)
   ->  Foreign Scan on public.main_shard_p1  (cost=100.00..159.93 rows=7 width=32) (actual time=26.871..26.871 rows=0 loops=1)
         Output: main_shard_p1.payload
         Filter: ((main_shard_p1.payload ->> 'key'::text) = '8a3905bb-46c0-4ac3-8186-51f5c669d67e'::text)
         Rows Removed by Filter: 300
         Remote SQL: SELECT payload FROM public.main_shard_p1
   ->  Foreign Scan on public.main_shard_p2  (cost=100.00..159.93 rows=7 width=32) (actual time=13.950..13.950 rows=0 loops=1)
         Output: main_shard_p2.payload
         Filter: ((main_shard_p2.payload ->> 'key'::text) = '8a3905bb-46c0-4ac3-8186-51f5c669d67e'::text)
         Rows Removed by Filter: 277
         Remote SQL: SELECT payload FROM public.main_shard_p2
 Planning time: 1.462 ms
 Execution time: 43.507 ms
(19 rows)
```
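A hedged sketch of such CHECK constraints, matching the `group_id` routing used above (the constraint names are hypothetical); with `constraint_exclusion = partition` (the default), the planner can then skip children whose constraint contradicts the WHERE clause:
```
-- main_shard_p0 is a local child; main_shard_p1/p2 are foreign tables.
ALTER TABLE main_shard_p0 ADD CONSTRAINT chk_p0 CHECK (group_id = 'p0');
ALTER FOREIGN TABLE main_shard_p1 ADD CONSTRAINT chk_p1 CHECK (group_id = 'p1');
ALTER FOREIGN TABLE main_shard_p2 ADD CONSTRAINT chk_p2 CHECK (group_id = 'p2');

-- A filter on group_id now lets the planner exclude the
-- non-matching children from the Append node entirely.
SELECT count(*) FROM main WHERE group_id = 'p1';
```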
TODO: test parallel workers for partitioning
TODO: partition by hash and use parallel background workers
http://dba.stackexchange.com/questions/164605/how-do-i-eliminate-a-second-seq-scan-over-a-table-when-deriving-a-new-table
https://speakerd.s3.amazonaws.com/presentations/a9351f0155fe4697b57f4913021501dd/How_PostgreSQL_is_tested.pdf
# On the nodes you can also inherit from a local table for partitioning
This gives you the possibility to move partitions around.
```
-- Assumes the parent table "main" and its routing trigger already exist
-- (see the environment setup gist linked above).
CREATE TABLE main_shard01 (shardKey char(2), key bigint, avalue text) INHERITS (main);
CREATE INDEX ON main_shard01(key);
CREATE TABLE main_shard02 (shardKey char(2), key bigint, avalue text) INHERITS (main);
CREATE INDEX ON main_shard02(key);
```
# test
```
INSERT INTO main VALUES ('01',1,'trololol'),('01',2,random()::text),('02',2,random()::text);
```
# olala
```
proxy=# select tableoid, * from main;
 tableoid | shardkey | key |      avalue
----------+----------+-----+-------------------
    16422 | 01       |   1 | trololol
    16422 | 01       |   2 | 0.544912547804415
    16426 | 02       |   2 | 0.459446560591459
(3 rows)
```
TODO:

- Add indexes
- Do more consistent tests.
