fun with citus extension
postgres@pg:~/citusdb$ more setup.sh
#!/bin/bash
# spin up four local PostgreSQL 14 clusters (db1..db4) on ports 5432..5435,
# each with the citus library preloaded and logical WAL for CDC experiments
export PATH=/usr/lib/postgresql/14/bin:$PATH
port=5432
for i in db1 db2 db3 db4
do
  # stop and wipe any cluster left over from a previous run
  # (pg_ctl complains if the directory does not exist yet; harmless)
  pg_ctl -D $i stop
  rm -rf $i
  initdb -D $i
  echo "shared_preload_libraries = 'citus'" >> $i/postgresql.auto.conf
  echo "port=$port" >> $i/postgresql.auto.conf
  echo "wal_level=logical" >> $i/postgresql.auto.conf
  port=$(( port + 1 ))
done

# start all four clusters
startdb1="pg_ctl -D db1 -l db1.log start"
stopdb1="pg_ctl -D db1 -l db1.log stop"
startdb2="pg_ctl -D db2 -l db2.log start"
stopdb2="pg_ctl -D db2 -l db2.log stop"
startdb3="pg_ctl -D db3 -l db3.log start"
stopdb3="pg_ctl -D db3 -l db3.log stop"
startdb4="pg_ctl -D db4 -l db4.log start"
stopdb4="pg_ctl -D db4 -l db4.log stop"
eval $startdb1
eval $startdb2
eval $startdb3
eval $startdb4

# create the citusdb database and the citus extension on every node
for p in 5432 5433 5434 5435
do
  createdb -p $p citusdb
  psql -p $p -d citusdb -c "create extension citus;"
done
postgres@pg:~/citusdb$ bash setup.sh
waiting for server to shut down.... done
server stopped
The files belonging to this database system will be owned by user "postgres".
This user must also own the server process.
The database cluster will be initialized with locale "C.UTF-8".
The default database encoding has accordingly been set to "UTF8".
The default text search configuration will be set to "english".
Data page checksums are disabled.
creating directory db1 ... ok
creating subdirectories ... ok
selecting dynamic shared memory implementation ... posix
selecting default max_connections ... 100
selecting default shared_buffers ... 128MB
selecting default time zone ... Asia/Kolkata
creating configuration files ... ok
running bootstrap script ... ok
performing post-bootstrap initialization ... ok
syncing data to disk ... ok
initdb: warning: enabling "trust" authentication for local connections
You can change this by editing pg_hba.conf or using the option -A, or
--auth-local and --auth-host, the next time you run initdb.
Success. You can now start the database server using:
pg_ctl -D db1 -l logfile start
waiting for server to shut down.... done
server stopped
The files belonging to this database system will be owned by user "postgres".
This user must also own the server process.
The database cluster will be initialized with locale "C.UTF-8".
The default database encoding has accordingly been set to "UTF8".
The default text search configuration will be set to "english".
Data page checksums are disabled.
creating directory db2 ... ok
creating subdirectories ... ok
selecting dynamic shared memory implementation ... posix
selecting default max_connections ... 100
selecting default shared_buffers ... 128MB
selecting default time zone ... Asia/Kolkata
creating configuration files ... ok
running bootstrap script ... ok
performing post-bootstrap initialization ... ok
syncing data to disk ... ok
initdb: warning: enabling "trust" authentication for local connections
You can change this by editing pg_hba.conf or using the option -A, or
--auth-local and --auth-host, the next time you run initdb.
Success. You can now start the database server using:
pg_ctl -D db2 -l logfile start
waiting for server to shut down.... done
server stopped
The files belonging to this database system will be owned by user "postgres".
This user must also own the server process.
The database cluster will be initialized with locale "C.UTF-8".
The default database encoding has accordingly been set to "UTF8".
The default text search configuration will be set to "english".
Data page checksums are disabled.
creating directory db3 ... ok
creating subdirectories ... ok
selecting dynamic shared memory implementation ... posix
selecting default max_connections ... 100
selecting default shared_buffers ... 128MB
selecting default time zone ... Asia/Kolkata
creating configuration files ... ok
running bootstrap script ... ok
performing post-bootstrap initialization ... ok
syncing data to disk ... ok
initdb: warning: enabling "trust" authentication for local connections
You can change this by editing pg_hba.conf or using the option -A, or
--auth-local and --auth-host, the next time you run initdb.
Success. You can now start the database server using:
pg_ctl -D db3 -l logfile start
pg_ctl: PID file "db4/postmaster.pid" does not exist
Is server running?
The files belonging to this database system will be owned by user "postgres".
This user must also own the server process.
The database cluster will be initialized with locale "C.UTF-8".
The default database encoding has accordingly been set to "UTF8".
The default text search configuration will be set to "english".
Data page checksums are disabled.
creating directory db4 ... ok
creating subdirectories ... ok
selecting dynamic shared memory implementation ... posix
selecting default max_connections ... 100
selecting default shared_buffers ... 128MB
selecting default time zone ... Asia/Kolkata
creating configuration files ... ok
running bootstrap script ... ok
performing post-bootstrap initialization ... ok
syncing data to disk ... ok
initdb: warning: enabling "trust" authentication for local connections
You can change this by editing pg_hba.conf or using the option -A, or
--auth-local and --auth-host, the next time you run initdb.
Success. You can now start the database server using:
pg_ctl -D db4 -l logfile start
waiting for server to start.... done
server started
waiting for server to start.... done
server started
waiting for server to start.... done
server started
waiting for server to start.... done
server started
CREATE EXTENSION
CREATE EXTENSION
CREATE EXTENSION
CREATE EXTENSION
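A quick sanity check, not part of the original run, to confirm the extension really got installed on every node (assuming the same ports as setup.sh):
for p in 5432 5433 5434 5435
do
  # hypothetical follow-up check: prints the citus version loaded on each node
  psql -p $p -d citusdb -c "select citus_version();"
done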
-----------------------------------------------------------------------
postgres@pg:~/citusdb$ psql citusdb
psql (14.5 (Ubuntu 14.5-2.pgdg22.04+2))
Type "help" for help.
citusdb=# \s
citusdb=#
citusdb=# create table dist_t(dist_id bigint primary key);
create table ref_t(ref_id bigint primary key);
insert into dist_t select generate_series(1, 10000) x;
insert into ref_t select generate_series(1, 100) x;
SELECT * from master_add_node('localhost', 5433);
SELECT * from master_add_node('localhost', 5434);
SELECT * FROM master_get_active_worker_nodes();
SELECT create_distributed_table('dist_t', 'dist_id');
SELECT truncate_local_data_after_distributing_table($$public.dist_t$$);
select count(1) from dist_t;
SELECT create_reference_table('ref_t');
SELECT truncate_local_data_after_distributing_table($$public.ref_t$$);
alter table dist_t add column dist_col1 int;
alter table ref_t add column ref_col1 int;
update dist_t set dist_col1 = 0;
update ref_t set ref_col1 = 0;
create index on dist_t (dist_col1);
create index on ref_t (ref_col1);
CREATE TABLE
CREATE TABLE
INSERT 0 10000
INSERT 0 100
master_add_node
-----------------
1
(1 row)
master_add_node
-----------------
2
(1 row)
node_name | node_port
-----------+-----------
localhost | 5434
localhost | 5433
(2 rows)
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.dist_t$$)
create_distributed_table
--------------------------
(1 row)
truncate_local_data_after_distributing_table
----------------------------------------------
(1 row)
count
-------
10000
(1 row)
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.ref_t$$)
create_reference_table
------------------------
(1 row)
truncate_local_data_after_distributing_table
----------------------------------------------
(1 row)
ALTER TABLE
ALTER TABLE
UPDATE 10000
UPDATE 100
CREATE INDEX
CREATE INDEX
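To see where the shards actually landed at this point, a metadata query along these lines (not run in the original session) joins the standard Citus catalogs:
-- hypothetical inspection query: one row per primary shard placement
SELECT s.logicalrelid, s.shardid, n.nodename, n.nodeport
FROM pg_dist_shard s
JOIN pg_dist_placement p ON p.shardid = s.shardid
JOIN pg_dist_node n ON n.groupid = p.groupid
WHERE n.noderole = 'primary'
ORDER BY s.logicalrelid::text, s.shardid;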
citusdb=# SELECT * from master_add_node('localhost', 5435);
master_add_node
-----------------
3
(1 row)
citusdb=# SELECT rebalance_table_shards('dist_t');
NOTICE: replicating reference table 'ref_t' to localhost:5435 ...
NOTICE: Moving shard 102009 from localhost:5434 to localhost:5435 ...
NOTICE: Moving shard 102008 from localhost:5433 to localhost:5435 ...
NOTICE: Moving shard 102011 from localhost:5434 to localhost:5435 ...
NOTICE: Moving shard 102010 from localhost:5433 to localhost:5435 ...
NOTICE: Moving shard 102013 from localhost:5434 to localhost:5435 ...
NOTICE: Moving shard 102012 from localhost:5433 to localhost:5435 ...
NOTICE: Moving shard 102015 from localhost:5434 to localhost:5435 ...
NOTICE: Moving shard 102014 from localhost:5433 to localhost:5435 ...
NOTICE: Moving shard 102017 from localhost:5434 to localhost:5435 ...
NOTICE: Moving shard 102016 from localhost:5433 to localhost:5435 ...
rebalance_table_shards
------------------------
(1 row)
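Before moving anything, the rebalancer can also be asked for a dry run; a sketch, not part of the original session and assuming get_rebalance_table_shards_plan is available in this Citus version:
-- hypothetical dry run: lists the moves rebalance_table_shards() would perform
SELECT * FROM get_rebalance_table_shards_plan('dist_t');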
citusdb=# SELECT isolate_tenant_to_new_shard('dist_t', 5, 'dist_id');
isolate_tenant_to_new_shard
-----------------------------
102042
(1 row)
citusdb=# SELECT nodename, nodeport
FROM pg_dist_placement AS placement,
pg_dist_node AS node
WHERE placement.groupid = node.groupid
AND node.noderole = 'primary'
AND shardid = 102042;
nodename | nodeport
-----------+----------
localhost | 5435
(1 row)
citusdb=# SELECT master_move_shard_placement(102042,'source_host',5435, 'dest_host', 5433);
ERROR: Moving shards to a non-existing node is not supported
HINT: Add the target node via SELECT citus_add_node('dest_host', 5433);
citusdb=# SELECT master_move_shard_placement(102042,'localhost',5435, 'localhost', 5433);
master_move_shard_placement
-----------------------------
(1 row)
citusdb=# SELECT nodename, nodeport
FROM pg_dist_placement AS placement,
pg_dist_node AS node
WHERE placement.groupid = node.groupid
AND node.noderole = 'primary'
AND shardid = 102042;
nodename | nodeport
-----------+----------
localhost | 5433
(1 row)
citusdb=# SELECT * from citus_drain_node('localhost', 5435);
NOTICE: Moving shard 102008 from localhost:5435 to localhost:5433 ...
NOTICE: Moving shard 102009 from localhost:5435 to localhost:5434 ...
NOTICE: Moving shard 102010 from localhost:5435 to localhost:5433 ...
NOTICE: Moving shard 102011 from localhost:5435 to localhost:5434 ...
NOTICE: Moving shard 102012 from localhost:5435 to localhost:5433 ...
NOTICE: Moving shard 102013 from localhost:5435 to localhost:5434 ...
NOTICE: Moving shard 102015 from localhost:5435 to localhost:5433 ...
NOTICE: Moving shard 102016 from localhost:5435 to localhost:5434 ...
NOTICE: Moving shard 102017 from localhost:5435 to localhost:5433 ...
NOTICE: Moving shard 102041 from localhost:5435 to localhost:5434 ...
NOTICE: Moving shard 102043 from localhost:5435 to localhost:5433 ...
citus_drain_node
------------------
(1 row)
citusdb=# SELECT * from citus_remove_node('localhost', 5435);
citus_remove_node
-------------------
(1 row)
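A quick check (not in the original session) that only the two remaining workers are active after the drain and removal:
-- hypothetical verification: expect only localhost:5433 and localhost:5434
SELECT * FROM master_get_active_worker_nodes();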
citusdb=# SELECT * from master_add_node('localhost', 5435);
master_add_node
-----------------
4
(1 row)
citusdb=# SELECT rebalance_table_shards('dist_t');
NOTICE: replicating reference table 'ref_t' to localhost:5435 ...
NOTICE: Moving shard 102008 from localhost:5433 to localhost:5435 ...
NOTICE: Moving shard 102010 from localhost:5433 to localhost:5435 ...
NOTICE: Moving shard 102009 from localhost:5434 to localhost:5435 ...
NOTICE: Moving shard 102012 from localhost:5433 to localhost:5435 ...
NOTICE: Moving shard 102011 from localhost:5434 to localhost:5435 ...
NOTICE: Moving shard 102015 from localhost:5433 to localhost:5435 ...
NOTICE: Moving shard 102013 from localhost:5434 to localhost:5435 ...
NOTICE: Moving shard 102017 from localhost:5433 to localhost:5435 ...
NOTICE: Moving shard 102016 from localhost:5434 to localhost:5435 ...
NOTICE: Moving shard 102018 from localhost:5433 to localhost:5435 ...
NOTICE: Moving shard 102019 from localhost:5434 to localhost:5435 ...
rebalance_table_shards
------------------------
(1 row)
citusdb=# explain analyze select * from dist_t where dist_id = 5;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) (actual time=1.076..1.077 rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 12 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 12 bytes
Node: host=localhost port=5433 dbname=citusdb
-> Seq Scan on dist_t_102042 dist_t (cost=0.00..1.01 rows=1 width=12) (actual time=0.005..0.005 rows=1 loops=1)
Filter: (dist_id = 5)
Planning Time: 0.147 ms
Execution Time: 0.014 ms
Planning Time: 0.082 ms
Execution Time: 1.088 ms
(13 rows)
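Task Count: 1 here is shard pruning at work: the equality filter on the distribution column dist_id lets Citus route the query to a single shard. For contrast, a filter on a non-distribution column would fan out one task per shard, e.g. (not run in the original session):
-- hypothetical comparison: expect one task per shard instead of Task Count: 1
explain select count(*) from dist_t where dist_col1 = 0;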
citusdb=# SELECT run_command_on_workers($cmd$ vacuum analyze dist_t $cmd$);
run_command_on_workers
---------------------------
(localhost,5433,t,VACUUM)
(localhost,5434,t,VACUUM)
(localhost,5435,t,VACUUM)
(3 rows)
citusdb=# explain analyze select * from dist_t where dist_id = 5;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) (actual time=9.078..9.078 rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 12 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 12 bytes
Node: host=localhost port=5433 dbname=citusdb
-> Seq Scan on dist_t_102042 dist_t (cost=0.00..1.01 rows=1 width=12) (actual time=0.004..0.004 rows=1 loops=1)
Filter: (dist_id = 5)
Planning Time: 0.269 ms
Execution Time: 0.015 ms
Planning Time: 0.085 ms
Execution Time: 9.091 ms
(13 rows)
citusdb=# create index on dist_t(dist_id);
CREATE INDEX
citusdb=# SELECT run_command_on_workers($cmd$ vacuum analyze dist_t $cmd$);
run_command_on_workers
---------------------------
(localhost,5433,t,VACUUM)
(localhost,5434,t,VACUUM)
(localhost,5435,t,VACUUM)
(3 rows)
citusdb=# explain analyze select * from dist_t where dist_id = 5;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) (actual time=1.019..1.020 rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 12 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 12 bytes
Node: host=localhost port=5433 dbname=citusdb
-> Seq Scan on dist_t_102042 dist_t (cost=0.00..1.01 rows=1 width=12) (actual time=0.005..0.005 rows=1 loops=1)
Filter: (dist_id = 5)
Planning Time: 0.145 ms
Execution Time: 0.015 ms
Planning Time: 0.283 ms
Execution Time: 1.036 ms
(13 rows)
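The create index on the distributed table is propagated to its shards on every worker; one way to confirm that (not in the original session) is via run_command_on_workers:
-- hypothetical check: count indexes on the dist_t_* shard tables per worker
SELECT run_command_on_workers($cmd$
  select count(*) from pg_indexes where tablename like 'dist_t_%'
$cmd$);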
citusdb=#
\q
#fun with cdc
citusdb=# SELECT * from citus_drain_node('localhost', 5435);
NOTICE: Moving shard 102008 from localhost:5435 to localhost:5433 ...
NOTICE: Moving shard 102009 from localhost:5435 to localhost:5434 ...
NOTICE: Moving shard 102010 from localhost:5435 to localhost:5433 ...
NOTICE: Moving shard 102011 from localhost:5435 to localhost:5434 ...
NOTICE: Moving shard 102012 from localhost:5435 to localhost:5433 ...
NOTICE: Moving shard 102013 from localhost:5435 to localhost:5434 ...
NOTICE: Moving shard 102015 from localhost:5435 to localhost:5433 ...
NOTICE: Moving shard 102016 from localhost:5435 to localhost:5434 ...
NOTICE: Moving shard 102017 from localhost:5435 to localhost:5433 ...
NOTICE: Moving shard 102018 from localhost:5435 to localhost:5434 ...
NOTICE: Moving shard 102019 from localhost:5435 to localhost:5433 ...
citus_drain_node
------------------

(1 row)

these changes now get pushed via cdc, which is just noise :(
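The drain copies each shard's rows into its new placement on the target worker, so a logical decoding slot on that worker sees the copy as ordinary inserts into the dist_t_* shard tables, which is why the streams below are full of them. One way to see which slots exist on a worker (not part of the original session):
-- hypothetical check, run on a worker (e.g. -p 5433): list the logical slots
select slot_name, plugin, active from pg_replication_slots;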
postgres@pg:~$ pg_recvlogical -d citusdb -p 5433 --slot ldb3 --start --create-slot -f -
BEGIN 1038
table pg_catalog.pg_dist_node: UPDATE: nodeid[integer]:4 groupid[integer]:4 nodename[text]:'localhost' nodeport[integer]:5435 noderack[text]:'default' hasmetadata[boolean]:true isactive[boolean]:true noderole[noderole]:'primary' nodecluster[name]:'default' metadatasynced[boolean]:true shouldhaveshards[boolean]:false
COMMIT 1038
BEGIN 1039
COMMIT 1039
BEGIN 1040
COMMIT 1040
BEGIN 1041
COMMIT 1041
BEGIN 1042
COMMIT 1042
BEGIN 1043
COMMIT 1043
BEGIN 1044
COMMIT 1044
BEGIN 1045
table public.dist_t_102008: INSERT: dist_id[bigint]:8 dist_col1[integer]:0
table public.dist_t_102008: INSERT: dist_id[bigint]:20 dist_col1[integer]:0
table public.dist_t_102008: INSERT: dist_id[bigint]:60 dist_col1[integer]:0
table public.dist_t_102008: INSERT: dist_id[bigint]:132 dist_col1[integer]:0
table public.dist_t_102008: INSERT: dist_id[bigint]:138 dist_col1[integer]:0
table public.dist_t_102008: INSERT: dist_id[bigint]:139 dist_col1[integer]:0
....
postgres@pg:~$ pg_recvlogical -d citusdb -p 5434 --slot ldb2 --start --create-slot -f -
BEGIN 998
table pg_catalog.pg_dist_node: UPDATE: nodeid[integer]:4 groupid[integer]:4 nodename[text]:'localhost' nodeport[integer]:5435 noderack[text]:'default' hasmetadata[boolean]:true isactive[boolean]:true noderole[noderole]:'primary' nodecluster[name]:'default' metadatasynced[boolean]:true shouldhaveshards[boolean]:false
COMMIT 998
BEGIN 999
table pg_catalog.pg_dist_placement: UPDATE: placementid[bigint]:1 shardid[bigint]:102008 shardstate[integer]:1 shardlength[bigint]:0 groupid[integer]:1
COMMIT 999
BEGIN 1000
COMMIT 1000
BEGIN 1001
COMMIT 1001
BEGIN 1002
COMMIT 1002
BEGIN 1003
COMMIT 1003
BEGIN 1004
COMMIT 1004
BEGIN 1005
COMMIT 1005
BEGIN 1006
table public.dist_t_102009: INSERT: dist_id[bigint]:1 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:15 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:25 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:63 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:82 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:118 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:205 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:210 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:218 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:220 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:230 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:248 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:259 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:303 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:313 dist_col1[integer]:0
table public.dist_t_102009: INSERT: dist_id[bigint]:361 dist_col1[integer]:0
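Cleanup for the experiment (not in the original session): once pg_recvlogical is stopped, the slots it created can be dropped so they stop retaining WAL on the workers:
# hypothetical cleanup of the slots created above
psql -p 5433 -d citusdb -c "select pg_drop_replication_slot('ldb3');"
psql -p 5434 -d citusdb -c "select pg_drop_replication_slot('ldb2');"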