Last active
March 13, 2023 13:03
-
-
Save cabecada/6ff09966474dfdd2cd93bb479a3d86ec to your computer and use it in GitHub Desktop.
fun with citus extension
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
postgres@pg:~/citusdb$ more setup.sh | |
#!/bin/bash | |
export PATH=/usr/lib/postgresql/14/bin:$PATH | |
port=5432 | |
for i in db1 db2 db3 db4 | |
do | |
pg_ctl -D $i stop | |
rm -rf $i | |
initdb -D $i | |
echo "shared_preload_libraries = 'citus'" >> $i/postgresql.auto.conf | |
echo "port=$port" >> $i/postgresql.auto.conf | |
echo "wal_level=logical" >> $i/postgresql.auto.conf | |
port=$(( port + 1 )) | |
done | |
startdb1="pg_ctl -D db1 -l db1.log start" | |
stopdb1="pg_ctl -D db1 -l db1.log stop" | |
startdb2="pg_ctl -D db2 -l db2.log start" | |
stopdb2="pg_ctl -D db2 -l db2.log stop" | |
startdb3="pg_ctl -D db3 -l db3.log start" | |
stopdb3="pg_ctl -D db3 -l db3.log stop" | |
startdb4="pg_ctl -D db4 -l db4.log start" | |
stopdb4="pg_ctl -D db4 -l db4.log stop" | |
eval $startdb1 | |
eval $startdb2 | |
eval $startdb3 | |
eval $startdb4 | |
for p in 5432 5433 5434 5435 | |
do | |
createdb -p $p citusdb | |
psql -p $p -d citusdb -c "create extension citus;" | |
done | |
postgres@pg:~/citusdb$ bash setup.sh | |
waiting for server to shut down.... done | |
server stopped | |
The files belonging to this database system will be owned by user "postgres". | |
This user must also own the server process. | |
The database cluster will be initialized with locale "C.UTF-8". | |
The default database encoding has accordingly been set to "UTF8". | |
The default text search configuration will be set to "english". | |
Data page checksums are disabled. | |
creating directory db1 ... ok | |
creating subdirectories ... ok | |
selecting dynamic shared memory implementation ... posix | |
selecting default max_connections ... 100 | |
selecting default shared_buffers ... 128MB | |
selecting default time zone ... Asia/Kolkata | |
creating configuration files ... ok | |
running bootstrap script ... ok | |
performing post-bootstrap initialization ... ok | |
syncing data to disk ... ok | |
initdb: warning: enabling "trust" authentication for local connections | |
You can change this by editing pg_hba.conf or using the option -A, or | |
--auth-local and --auth-host, the next time you run initdb. | |
Success. You can now start the database server using: | |
pg_ctl -D db1 -l logfile start | |
waiting for server to shut down.... done | |
server stopped | |
The files belonging to this database system will be owned by user "postgres". | |
This user must also own the server process. | |
The database cluster will be initialized with locale "C.UTF-8". | |
The default database encoding has accordingly been set to "UTF8". | |
The default text search configuration will be set to "english". | |
Data page checksums are disabled. | |
creating directory db2 ... ok | |
creating subdirectories ... ok | |
selecting dynamic shared memory implementation ... posix | |
selecting default max_connections ... 100 | |
selecting default shared_buffers ... 128MB | |
selecting default time zone ... Asia/Kolkata | |
creating configuration files ... ok | |
running bootstrap script ... ok | |
performing post-bootstrap initialization ... ok | |
syncing data to disk ... ok | |
initdb: warning: enabling "trust" authentication for local connections | |
You can change this by editing pg_hba.conf or using the option -A, or | |
--auth-local and --auth-host, the next time you run initdb. | |
Success. You can now start the database server using: | |
pg_ctl -D db2 -l logfile start | |
waiting for server to shut down.... done | |
server stopped | |
The files belonging to this database system will be owned by user "postgres". | |
This user must also own the server process. | |
The database cluster will be initialized with locale "C.UTF-8". | |
The default database encoding has accordingly been set to "UTF8". | |
The default text search configuration will be set to "english". | |
Data page checksums are disabled. | |
creating directory db3 ... ok | |
creating subdirectories ... ok | |
selecting dynamic shared memory implementation ... posix | |
selecting default max_connections ... 100 | |
selecting default shared_buffers ... 128MB | |
selecting default time zone ... Asia/Kolkata | |
creating configuration files ... ok | |
running bootstrap script ... ok | |
performing post-bootstrap initialization ... ok | |
syncing data to disk ... ok | |
initdb: warning: enabling "trust" authentication for local connections | |
You can change this by editing pg_hba.conf or using the option -A, or | |
--auth-local and --auth-host, the next time you run initdb. | |
Success. You can now start the database server using: | |
pg_ctl -D db3 -l logfile start | |
pg_ctl: PID file "db4/postmaster.pid" does not exist | |
Is server running? | |
The files belonging to this database system will be owned by user "postgres". | |
This user must also own the server process. | |
The database cluster will be initialized with locale "C.UTF-8". | |
The default database encoding has accordingly been set to "UTF8". | |
The default text search configuration will be set to "english". | |
Data page checksums are disabled. | |
creating directory db4 ... ok | |
creating subdirectories ... ok | |
selecting dynamic shared memory implementation ... posix | |
selecting default max_connections ... 100 | |
selecting default shared_buffers ... 128MB | |
selecting default time zone ... Asia/Kolkata | |
creating configuration files ... ok | |
running bootstrap script ... ok | |
performing post-bootstrap initialization ... ok | |
syncing data to disk ... ok | |
initdb: warning: enabling "trust" authentication for local connections | |
You can change this by editing pg_hba.conf or using the option -A, or | |
--auth-local and --auth-host, the next time you run initdb. | |
Success. You can now start the database server using: | |
pg_ctl -D db4 -l logfile start | |
waiting for server to start.... done | |
server started | |
waiting for server to start.... done | |
server started | |
waiting for server to start.... done | |
server started | |
waiting for server to start.... done | |
server started | |
CREATE EXTENSION | |
CREATE EXTENSION | |
CREATE EXTENSION | |
CREATE EXTENSION | |
----------------------------------------------------------------------- | |
postgres@pg:~/citusdb$ psql citusdb | |
psql (14.5 (Ubuntu 14.5-2.pgdg22.04+2)) | |
Type "help" for help. | |
citusdb=# \s | |
citusdb=# | |
citusdb=# create table dist_t(dist_id bigint primary key); | |
create table ref_t(ref_id bigint primary key); | |
insert into dist_t select generate_series(1, 10000) x; | |
insert into ref_t select generate_series(1, 100) x; | |
SELECT * from master_add_node('localhost', 5433); | |
SELECT * from master_add_node('localhost', 5434); | |
SELECT * FROM master_get_active_worker_nodes(); | |
SELECT create_distributed_table('dist_t', 'dist_id'); | |
SELECT truncate_local_data_after_distributing_table($$public.dist_t$$); | |
select count(1) from dist_t; | |
SELECT create_reference_table('ref_t'); | |
SELECT truncate_local_data_after_distributing_table($$public.ref_t$$); | |
alter table dist_t add column dist_col1 int; | |
alter table ref_t add column ref_col1 int; | |
update dist_t set dist_col1 = 0; | |
update ref_t set ref_col1 = 0; | |
create index on dist_t (dist_col1); | |
create index on ref_t (ref_col1); | |
CREATE TABLE | |
CREATE TABLE | |
INSERT 0 10000 | |
INSERT 0 100 | |
master_add_node | |
----------------- | |
1 | |
(1 row) | |
master_add_node | |
----------------- | |
2 | |
(1 row) | |
node_name | node_port | |
-----------+----------- | |
localhost | 5434 | |
localhost | 5433 | |
(2 rows) | |
NOTICE: Copying data from local table... | |
NOTICE: copying the data has completed | |
DETAIL: The local data in the table is no longer visible, but is still on disk. | |
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.dist_t$$) | |
create_distributed_table | |
-------------------------- | |
(1 row) | |
truncate_local_data_after_distributing_table | |
---------------------------------------------- | |
(1 row) | |
count | |
------- | |
10000 | |
(1 row) | |
NOTICE: Copying data from local table... | |
NOTICE: copying the data has completed | |
DETAIL: The local data in the table is no longer visible, but is still on disk. | |
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.ref_t$$) | |
create_reference_table | |
------------------------ | |
(1 row) | |
truncate_local_data_after_distributing_table | |
---------------------------------------------- | |
(1 row) | |
ALTER TABLE | |
ALTER TABLE | |
UPDATE 10000 | |
UPDATE 100 | |
CREATE INDEX | |
CREATE INDEX | |
citusdb=# SELECT * from master_add_node('localhost', 5435); | |
master_add_node | |
----------------- | |
3 | |
(1 row) | |
citusdb=# SELECT rebalance_table_shards('dist_t'); | |
NOTICE: replicating reference table 'ref_t' to localhost:5435 ... | |
NOTICE: Moving shard 102009 from localhost:5434 to localhost:5435 ... | |
NOTICE: Moving shard 102008 from localhost:5433 to localhost:5435 ... | |
NOTICE: Moving shard 102011 from localhost:5434 to localhost:5435 ... | |
NOTICE: Moving shard 102010 from localhost:5433 to localhost:5435 ... | |
NOTICE: Moving shard 102013 from localhost:5434 to localhost:5435 ... | |
NOTICE: Moving shard 102012 from localhost:5433 to localhost:5435 ... | |
NOTICE: Moving shard 102015 from localhost:5434 to localhost:5435 ... | |
NOTICE: Moving shard 102014 from localhost:5433 to localhost:5435 ... | |
NOTICE: Moving shard 102017 from localhost:5434 to localhost:5435 ... | |
NOTICE: Moving shard 102016 from localhost:5433 to localhost:5435 ... | |
rebalance_table_shards | |
------------------------ | |
(1 row) | |
citusdb=# SELECT isolate_tenant_to_new_shard('dist_t', 5, 'dist_id'); | |
isolate_tenant_to_new_shard | |
----------------------------- | |
102042 | |
(1 row) | |
citusdb=# SELECT nodename, nodeport | |
FROM pg_dist_placement AS placement, | |
pg_dist_node AS node | |
WHERE placement.groupid = node.groupid | |
AND node.noderole = 'primary' | |
AND shardid = 102042; | |
nodename | nodeport | |
-----------+---------- | |
localhost | 5435 | |
(1 row) | |
citusdb=# SELECT master_move_shard_placement(102042,'source_host',5435, 'dest_host', 5433); | |
ERROR: Moving shards to a non-existing node is not supported | |
HINT: Add the target node via SELECT citus_add_node('dest_host', 5433); | |
citusdb=# SELECT master_move_shard_placement(102042,'localhost',5435, 'localhost', 5433); | |
master_move_shard_placement | |
----------------------------- | |
(1 row) | |
citusdb=# SELECT nodename, nodeport | |
FROM pg_dist_placement AS placement, | |
pg_dist_node AS node | |
WHERE placement.groupid = node.groupid | |
AND node.noderole = 'primary' | |
AND shardid = 102042; | |
nodename | nodeport | |
-----------+---------- | |
localhost | 5433 | |
(1 row) | |
citusdb=# SELECT * from citus_drain_node('localhost', 5435); | |
NOTICE: Moving shard 102008 from localhost:5435 to localhost:5433 ... | |
NOTICE: Moving shard 102009 from localhost:5435 to localhost:5434 ... | |
NOTICE: Moving shard 102010 from localhost:5435 to localhost:5433 ... | |
NOTICE: Moving shard 102011 from localhost:5435 to localhost:5434 ... | |
NOTICE: Moving shard 102012 from localhost:5435 to localhost:5433 ... | |
NOTICE: Moving shard 102013 from localhost:5435 to localhost:5434 ... | |
NOTICE: Moving shard 102015 from localhost:5435 to localhost:5433 ... | |
NOTICE: Moving shard 102016 from localhost:5435 to localhost:5434 ... | |
NOTICE: Moving shard 102017 from localhost:5435 to localhost:5433 ... | |
NOTICE: Moving shard 102041 from localhost:5435 to localhost:5434 ... | |
NOTICE: Moving shard 102043 from localhost:5435 to localhost:5433 ... | |
citus_drain_node | |
------------------ | |
(1 row) | |
citusdb=# SELECT * from citus_remove_node('localhost', 5435); | |
citus_remove_node | |
------------------- | |
(1 row) | |
citusdb=# SELECT * from master_add_node('localhost', 5435); | |
master_add_node | |
----------------- | |
4 | |
(1 row) | |
citusdb=# SELECT rebalance_table_shards('dist_t'); | |
NOTICE: replicating reference table 'ref_t' to localhost:5435 ... | |
NOTICE: Moving shard 102008 from localhost:5433 to localhost:5435 ... | |
NOTICE: Moving shard 102010 from localhost:5433 to localhost:5435 ... | |
NOTICE: Moving shard 102009 from localhost:5434 to localhost:5435 ... | |
NOTICE: Moving shard 102012 from localhost:5433 to localhost:5435 ... | |
NOTICE: Moving shard 102011 from localhost:5434 to localhost:5435 ... | |
NOTICE: Moving shard 102015 from localhost:5433 to localhost:5435 ... | |
NOTICE: Moving shard 102013 from localhost:5434 to localhost:5435 ... | |
NOTICE: Moving shard 102017 from localhost:5433 to localhost:5435 ... | |
NOTICE: Moving shard 102016 from localhost:5434 to localhost:5435 ... | |
NOTICE: Moving shard 102018 from localhost:5433 to localhost:5435 ... | |
NOTICE: Moving shard 102019 from localhost:5434 to localhost:5435 ... | |
rebalance_table_shards | |
------------------------ | |
(1 row) | |
citusdb=# explain analyze select * from dist_t where dist_id = 5; | |
QUERY PLAN | |
--------------------------------------------------------------------------------------------------------------------------- | |
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) (actual time=1.076..1.077 rows=1 loops=1) | |
Task Count: 1 | |
Tuple data received from nodes: 12 bytes | |
Tasks Shown: All | |
-> Task | |
Tuple data received from node: 12 bytes | |
Node: host=localhost port=5433 dbname=citusdb | |
-> Seq Scan on dist_t_102042 dist_t (cost=0.00..1.01 rows=1 width=12) (actual time=0.005..0.005 rows=1 loops=1) | |
Filter: (dist_id = 5) | |
Planning Time: 0.147 ms | |
Execution Time: 0.014 ms | |
Planning Time: 0.082 ms | |
Execution Time: 1.088 ms | |
(13 rows) | |
citusdb=# SELECT run_command_on_workers($cmd$ vacuum analyze dist_t $cmd$); | |
run_command_on_workers | |
--------------------------- | |
(localhost,5433,t,VACUUM) | |
(localhost,5434,t,VACUUM) | |
(localhost,5435,t,VACUUM) | |
(3 rows) | |
citusdb=# explain analyze select * from dist_t where dist_id = 5; | |
QUERY PLAN | |
--------------------------------------------------------------------------------------------------------------------------- | |
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) (actual time=9.078..9.078 rows=1 loops=1) | |
Task Count: 1 | |
Tuple data received from nodes: 12 bytes | |
Tasks Shown: All | |
-> Task | |
Tuple data received from node: 12 bytes | |
Node: host=localhost port=5433 dbname=citusdb | |
-> Seq Scan on dist_t_102042 dist_t (cost=0.00..1.01 rows=1 width=12) (actual time=0.004..0.004 rows=1 loops=1) | |
Filter: (dist_id = 5) | |
Planning Time: 0.269 ms | |
Execution Time: 0.015 ms | |
Planning Time: 0.085 ms | |
Execution Time: 9.091 ms | |
(13 rows) | |
citusdb=# create index on dist_t(dist_id); | |
CREATE INDEX | |
citusdb=# SELECT run_command_on_workers($cmd$ vacuum analyze dist_t $cmd$); | |
run_command_on_workers | |
--------------------------- | |
(localhost,5433,t,VACUUM) | |
(localhost,5434,t,VACUUM) | |
(localhost,5435,t,VACUUM) | |
(3 rows) | |
citusdb=# explain analyze select * from dist_t where dist_id = 5; | |
QUERY PLAN | |
--------------------------------------------------------------------------------------------------------------------------- | |
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) (actual time=1.019..1.020 rows=1 loops=1) | |
Task Count: 1 | |
Tuple data received from nodes: 12 bytes | |
Tasks Shown: All | |
-> Task | |
Tuple data received from node: 12 bytes | |
Node: host=localhost port=5433 dbname=citusdb | |
-> Seq Scan on dist_t_102042 dist_t (cost=0.00..1.01 rows=1 width=12) (actual time=0.005..0.005 rows=1 loops=1) | |
Filter: (dist_id = 5) | |
Planning Time: 0.145 ms | |
Execution Time: 0.015 ms | |
Planning Time: 0.283 ms | |
Execution Time: 1.036 ms | |
(13 rows) | |
citusdb=# | |
\q | |
#fun with cdc | |
citusdb=# SELECT * from citus_drain_node('localhost', 5435); | |
NOTICE: Moving shard 102008 from localhost:5435 to localhost:5433 ... | |
NOTICE: Moving shard 102009 from localhost:5435 to localhost:5434 ... | |
NOTICE: Moving shard 102010 from localhost:5435 to localhost:5433 ... | |
NOTICE: Moving shard 102011 from localhost:5435 to localhost:5434 ... | |
NOTICE: Moving shard 102012 from localhost:5435 to localhost:5433 ... | |
NOTICE: Moving shard 102013 from localhost:5435 to localhost:5434 ... | |
NOTICE: Moving shard 102015 from localhost:5435 to localhost:5433 ... | |
NOTICE: Moving shard 102016 from localhost:5435 to localhost:5434 ... | |
NOTICE: Moving shard 102017 from localhost:5435 to localhost:5433 ... | |
NOTICE: Moving shard 102018 from localhost:5435 to localhost:5434 ... | |
NOTICE: Moving shard 102019 from localhost:5435 to localhost:5433 ... | |
citus_drain_node | |
------------------ | |
these changes now get pushed via cdc which is just noise :( | |
postgres@pg:~$ pg_recvlogical -d citusdb -p 5433 --slot ldb3 --start --create-slot -f - | |
BEGIN 1038 | |
table pg_catalog.pg_dist_node: UPDATE: nodeid[integer]:4 groupid[integer]:4 nodename[text]:'localhost' nodeport[integer]:5435 noderack[text]:'default' hasmetadata[boolean]:true isactive[boolean]:true noderole[noderole]:'primary' nodecluster[name]:'default' metadatasynced[boolean]:true shouldhaveshards[boolean]:false | |
COMMIT 1038 | |
BEGIN 1039 | |
COMMIT 1039 | |
BEGIN 1040 | |
COMMIT 1040 | |
BEGIN 1041 | |
COMMIT 1041 | |
BEGIN 1042 | |
COMMIT 1042 | |
BEGIN 1043 | |
COMMIT 1043 | |
BEGIN 1044 | |
COMMIT 1044 | |
BEGIN 1045 | |
table public.dist_t_102008: INSERT: dist_id[bigint]:8 dist_col1[integer]:0 | |
table public.dist_t_102008: INSERT: dist_id[bigint]:20 dist_col1[integer]:0 | |
table public.dist_t_102008: INSERT: dist_id[bigint]:60 dist_col1[integer]:0 | |
table public.dist_t_102008: INSERT: dist_id[bigint]:132 dist_col1[integer]:0 | |
table public.dist_t_102008: INSERT: dist_id[bigint]:138 dist_col1[integer]:0 | |
table public.dist_t_102008: INSERT: dist_id[bigint]:139 dist_col1[integer]:0 | |
.... | |
postgres@pg:~$ pg_recvlogical -d citusdb -p 5434 --slot ldb2 --start --create-slot -f - | |
BEGIN 998 | |
table pg_catalog.pg_dist_node: UPDATE: nodeid[integer]:4 groupid[integer]:4 nodename[text]:'localhost' nodeport[integer]:5435 noderack[text]:'default' hasmetadata[boolean]:true isactive[boolean]:true noderole[noderole]:'primary' nodecluster[name]:'default' metadatasynced[boolean]:true shouldhaveshards[boolean]:false | |
COMMIT 998 | |
BEGIN 999 | |
table pg_catalog.pg_dist_placement: UPDATE: placementid[bigint]:1 shardid[bigint]:102008 shardstate[integer]:1 shardlength[bigint]:0 groupid[integer]:1 | |
COMMIT 999 | |
BEGIN 1000 | |
COMMIT 1000 | |
BEGIN 1001 | |
COMMIT 1001 | |
BEGIN 1002 | |
COMMIT 1002 | |
BEGIN 1003 | |
COMMIT 1003 | |
BEGIN 1004 | |
COMMIT 1004 | |
BEGIN 1005 | |
COMMIT 1005 | |
BEGIN 1006 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:1 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:15 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:25 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:63 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:82 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:118 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:205 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:210 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:218 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:220 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:230 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:248 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:259 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:303 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:313 dist_col1[integer]:0 | |
table public.dist_t_102009: INSERT: dist_id[bigint]:361 dist_col1[integer]:0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment