@zhangjiazhi
Created September 13, 2016 05:50
CitusDB research note
deleting data:
select master_modify_multiple_shards('delete from github_events where created_at >= ''2015-01-01 00:00:00''');
join:
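citus.large_table_shard_count (default value 4): a table with at least this many shards is treated as a large table. The planner evaluates several possible join orders and picks the one that requires the minimum amount of data to be transferred across the network.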
broadcast join:
replicate the small table
co-located joins:
distribute the large tables on the same columns you use to join them; the shard creation logic then ensures that shards with the same distribution key ranges are placed on the same workers
repartition joins:
inefficient, because they require shuffling data across workers.
== do not support window functions
== do not support distinct
CitusDB metadata
|
|
Distributed Query Planner
|
|
PostgreSQL Query Planner
|
|
Distributed Query Executors
|
| ---- distributed queries
Distributed join funcs ... Distributed join funcs ... Distributed join funcs ---- worker nodes
Distributed query planner:
For a SELECT query, the planner first creates a plan tree of the input query and transforms it into its commutative and associative form so that it can be parallelized. It also applies several
optimizations to ensure that queries are executed in a scalable manner and that network I/O is minimized.
The planner breaks the query into two parts: the master query, which runs on the master, and the worker query fragments, which run on individual shards on the workers. The planner
then assigns these query fragments to the workers such that all their resources are used efficiently. After this step, the distributed query plan is passed on to the distributed executor for execution.
The planning process for key-value lookups on the distribution column and for modification queries is slightly different, as they hit exactly one shard. Once the planner receives an incoming
query, it needs to decide the correct shard to which the query should be routed. To do this, it extracts the distribution column value from the incoming row and looks up the metadata
to determine the right shard for the query. The planner then rewrites the SQL of that command to reference the shard table instead of the original table. This rewritten plan is then
passed to the distributed executor.
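The single-shard routing described above can be sketched in Python. This is only an illustration, not Citus code: the hash function (crc32), the even split of a 32-bit hash space, the starting shard id, and the helper names make_shards, route and rewrite are all assumptions made for this sketch. The only detail taken from these notes is that a shard table is named like table_shardid.

```python
import zlib

HASH_SPACE = 2 ** 32  # assumption: a 32-bit hash space split evenly among shards

def make_shards(shard_count, first_shard_id=102000):
    """Build hypothetical shard metadata rows: (shard_id, min_hash, max_hash)."""
    step = HASH_SPACE // shard_count
    shards = []
    for i in range(shard_count):
        low = i * step
        # the last shard absorbs any remainder so the whole space is covered
        high = HASH_SPACE - 1 if i == shard_count - 1 else low + step - 1
        shards.append((first_shard_id + i, low, high))
    return shards

def route(table, shards, key):
    """Hash the distribution column value and find the shard whose range covers it."""
    h = zlib.crc32(str(key).encode("utf-8"))  # stand-in for the real hash function
    for shard_id, low, high in shards:
        if low <= h <= high:
            return shard_id, f"{table}_{shard_id}"
    raise LookupError(f"no shard covers hash {h}")

def rewrite(sql, table, shard_table):
    """Naive textual rewrite of the table name; a real planner rewrites the parse tree."""
    return sql.replace(table, shard_table)

shards = make_shards(4)
shard_id, shard_table = route("counters", shards, "num_purchases")
print(rewrite("INSERT INTO counters VALUES ('num_purchases', '2016-03-04', 12)",
              "counters", shard_table))
```

The same key always lands on the same shard, which is why a key-value lookup or a single-row modification touches exactly one shard.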
Distributed Query Executor :
Citus distributed executors run distributed query plans and handle failures that occur during query execution. The executor connects to the workers, sends the assigned tasks to them,
and oversees their execution. If the executor cannot assign a task to the designated worker, or if a task execution fails, the executor dynamically re-assigns the task to replicas on
other workers. While handling failures, the executor processes only the failed query sub-tree, not the entire query.
Citus has two executor types, real-time and task tracker. The former is useful for handling simple key-value lookups and INSERT, UPDATE, and DELETE queries, while the task tracker is better suited for larger SELECT queries.
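The failure handling described above (retry only the failed task on another replica, keep the results of the other tasks) can be sketched as follows. The function names, the worker names and the use of ConnectionError to signal a failed worker are assumptions made for this sketch, not part of Citus.

```python
def run_with_failover(task, placements, execute):
    """Try each placement (replica) of the task's shard until one succeeds."""
    errors = []
    for worker in placements:
        try:
            return worker, execute(worker, task)
        except ConnectionError as exc:
            errors.append((worker, str(exc)))
    raise RuntimeError(f"task {task!r} failed on all placements: {errors}")

def run_query(tasks, placements_by_task, execute):
    """Run every task; a failure re-runs only that task, not the whole query."""
    return {t: run_with_failover(t, placements_by_task[t], execute) for t in tasks}

def flaky_execute(worker, task):
    """Hypothetical executor callback: worker-1 is down, the others answer."""
    if worker == "worker-1":
        raise ConnectionError("worker-1 unreachable")
    return f"{task}@{worker}"

placements = {"t1": ["worker-1", "worker-2"], "t2": ["worker-2", "worker-3"]}
print(run_query(["t1", "t2"], placements, flaky_execute))
```

Task t1 is first tried on worker-1, fails, and is re-assigned to worker-2; task t2 is not affected.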
Real-time Executor:
The real-time executor is the default executor used by Citus. It is well suited for getting fast responses to queries involving filters, aggregations, and co-located joins. The real-time
executor opens one connection per shard to the workers and sends all fragment queries to them. It then fetches the results from each fragment query, merges them, and gives the
final results back to the user.
Since the real-time executor maintains an open connection for each shard to which it sends queries, it may reach file descriptor/connection limits while dealing with high shard counts.
In such cases, the real-time executor throttles on assigning more tasks to workers to avoid overwhelming them with too many tasks. One can typically increase the file descriptor limit
on modern operating systems to avoid throttling, and change the Citus configuration to use the real-time executor. But that may not be ideal for efficient resource management while
running complex queries. For queries that touch thousands of shards or require large table joins, you can use the task tracker executor.
Furthermore, when the real-time executor detects simple INSERT, UPDATE, or DELETE queries, it assigns the incoming query to the worker that has the target shard. The query is then handled by the worker PostgreSQL server and the results are returned to the user. If a modification fails on a shard replica, the executor marks the corresponding shard replica as invalid
in order to maintain data consistency.
Task Tracker Executor:
The task tracker executor is well suited for long-running, complex data warehousing queries. This executor opens only one connection per worker and assigns all fragment queries
to a task tracker daemon on the worker. The task tracker daemon then regularly schedules new tasks and sees through their completion. The executor on the master regularly checks
with these task trackers to see if their tasks have completed.
Each task tracker daemon on the worker also makes sure to execute at most citus.max_running_tasks_per_node tasks concurrently. This concurrency limit helps avoid disk I/O
contention when queries are not served from memory. The task tracker executor is designed to efficiently handle complex queries that require repartitioning and shuffling intermediate
data among workers.
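The effect of the concurrency cap can be sketched with a small tick-based simulation. This is a toy model, not the daemon's real scheduler: task durations are whole ticks (at least 1), and max_running stands in for citus.max_running_tasks_per_node.

```python
from collections import deque

def simulate_task_tracker(task_durations, max_running):
    """Return (total_ticks, peak_concurrency) for one worker's task queue."""
    pending = deque(task_durations)
    running = []  # remaining ticks of each running task
    ticks = 0
    peak = 0
    while pending or running:
        # schedule queued tasks while we are below the concurrency cap
        while pending and len(running) < max_running:
            running.append(pending.popleft())
        peak = max(peak, len(running))
        # advance one tick and drop the tasks that just finished
        ticks += 1
        running = [d - 1 for d in running if d > 1]
    return ticks, peak

print(simulate_task_tracker([2, 2, 2, 2], max_running=2))
print(simulate_task_tracker([2, 2, 2, 2], max_running=4))
```

A lower cap makes the batch take longer but bounds how many tasks compete for disk at once, which is the trade-off the setting controls.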
Example:
-- Set up a distributed table containing counters
CREATE TABLE counters (c_key text, c_date date, c_value int, primary key (c_key, c_date));
SELECT master_create_distributed_table('counters', 'c_key', 'hash');
SELECT master_create_worker_shards('counters', 128, 2);
-- Enable timing to see response times
\timing on
-- First INSERT requires connection set-up, second will be faster
INSERT INTO counters VALUES ('num_purchases', '2016-03-04', 12); -- Time: 10.314 ms
INSERT INTO counters VALUES ('num_purchases', '2016-03-05', 5); -- Time: 3.132 ms
SET citus.all_modifications_commutative TO on;
INSERT INTO counters VALUES ('num_purchases', '2016-03-04', 1)
ON CONFLICT (c_key, c_date) DO UPDATE SET c_value = counters.c_value + 1;
COPY counters FROM STDIN WITH (FORMAT CSV);
-- Set up the events table
CREATE TABLE events (time timestamp, data jsonb);
SELECT master_create_distributed_table('events', 'time', 'append');
-- Add data into a new staging table
\COPY events FROM 'path-to-csv-file' WITH CSV
-- Prepare a staging table
CREATE TABLE stage_1 (LIKE events);
\COPY stage_1 FROM 'path-to-csv-file' WITH CSV
-- In a separate transaction, append the staging table
SELECT master_append_table_to_shard(select_events_shard(), 'stage_1', 'master-node', 5432);
CREATE OR REPLACE FUNCTION select_events_shard() RETURNS bigint AS $$
DECLARE
shard_id bigint;
BEGIN
SELECT shardid INTO shard_id
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'events'::regclass AND shardlength < 1024*1024*1024;
IF shard_id IS NULL THEN
/* no shard smaller than 1GB, create a new one */
SELECT master_create_empty_shard('events') INTO shard_id;
END IF;
RETURN shard_id;
END;
$$ LANGUAGE plpgsql;
-- Create stage table name sequence
CREATE SEQUENCE stage_id_sequence;
-- Generate a stage table name
SELECT 'stage_'||nextval('stage_id_sequence');
stage_table=$(psql -tA -h worker-node-1 -c "SELECT 'stage_'||nextval('stage_id_sequence')")
psql -h worker-node-1 -c "CREATE TABLE $stage_table (time timestamp, data jsonb)"
psql -h worker-node-1 -c "\COPY $stage_table FROM 'data.csv' WITH CSV"
psql -h master-node -c "SELECT master_append_table_to_shard(choose_underutilized_shard(), '$stage_table', 'worker-node-1', 5432)"
psql -h worker-node-1 -c "DROP TABLE $stage_table"
/* Choose a shard to which to append */
CREATE OR REPLACE FUNCTION choose_underutilized_shard()
RETURNS bigint LANGUAGE plpgsql
AS $function$
DECLARE
shard_id bigint;
num_small_shards int;
BEGIN
SELECT shardid, count(*) OVER () INTO shard_id, num_small_shards
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'events'::regclass AND shardlength < 1024*1024*1024
GROUP BY shardid ORDER BY RANDOM() ASC;
IF num_small_shards IS NULL OR num_small_shards < 20 THEN
SELECT master_create_empty_shard('events') INTO shard_id;
END IF;
RETURN shard_id;
END;
$function$;
SET citus.explain_all_tasks = 1; This will cause EXPLAIN to show the query plan for all tasks, not just one.
citus.task_executor_type:
If your use case mainly requires simple key-value lookups or sub-second responses to aggregations and joins, choose the real-time executor.
For long-running queries that require repartitioning and shuffling of data across the workers, switch to the task tracker executor.
citus.remote_task_check_interval:
The master assigns tasks to workers and then regularly checks with them about each task’s progress; this value sets the interval between two checks.
citus.limit_clause_row_fetch_count
citus.count_distinct_error_rate
Task assignment policy:
greedy: The greedy policy aims to distribute tasks evenly across the workers.
round-robin: assigns tasks to workers in a round-robin fashion, alternating between different replicas.
first-replica: assigns tasks on the basis of the insertion order of placements (replicas) for the shards.
Intermediate data transfer format:
Intermediate data is transferred across workers or between workers and the master in text format by default. This is generally better, as text files typically have smaller
sizes than the binary representation, which leads to lower network and disk I/O while writing and transferring intermediate data.
For certain data types like hll or hstore arrays, the cost of serializing and deserializing data is pretty high. In such cases, using binary format for transferring intermediate data
can improve query performance due to reduced CPU usage. Two parameters can be used to tune this behavior:
citus.binary_master_copy_format
citus.binary_worker_copy_format
Citus doesn’t support INSERT INTO ... SELECT on distributed tables, so instead we’ll run a function on all the workers which runs INSERT INTO ... SELECT on every matching pair of shards.
SELECT
date_trunc('minute', ingest_time) as minute,
COUNT(1) AS request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
FROM http_request
WHERE site_id = 1 AND date_trunc('minute', ingest_time) > now() - interval '5 minutes'
GROUP BY minute;
CREATE TABLE http_request_1min (
site_id INT,
ingest_time TIMESTAMPTZ, -- which minute this row represents
error_count INT,
success_count INT,
request_count INT,
average_response_time_msec INT,
CHECK (request_count = error_count + success_count),
CHECK (ingest_time = date_trunc('minute', ingest_time))
);
SELECT master_create_distributed_table('http_request_1min', 'site_id', 'hash');
SELECT master_create_worker_shards('http_request_1min', 16, 2);
-- indexes aren't automatically created by Citus
-- this will create the index on all shards
CREATE INDEX http_request_1min_idx ON http_request_1min (site_id, ingest_time);
-- this function is created on the workers
CREATE FUNCTION rollup_1min(p_source_shard text, p_dest_shard text) RETURNS void
AS $$
BEGIN
-- the dest shard will have a name like: http_request_1min_204566, where 204566 is the
-- shard id. We lock using that id, to make sure multiple instances of this function
-- never simultaneously write to the same shard.
IF pg_try_advisory_xact_lock(29999, split_part(p_dest_shard, '_', 4)::int) = false THEN
-- N.B. make sure the int constant (29999) you use here is unique within your system
RETURN;
END IF;
EXECUTE format($insert$
INSERT INTO %2$I (
site_id, ingest_time, request_count,
success_count, error_count, average_response_time_msec
) SELECT
site_id,
date_trunc('minute', ingest_time) as minute,
COUNT(1) as request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
FROM %1$I
WHERE
date_trunc('minute', ingest_time)
> (SELECT COALESCE(max(ingest_time), timestamp '10-10-1901') FROM %2$I)
AND date_trunc('minute', ingest_time) < date_trunc('minute', now())
GROUP BY site_id, minute
ORDER BY minute ASC;
$insert$, p_source_shard, p_dest_shard);
END;
$$ LANGUAGE 'plpgsql';
-- this function is created on the master
CREATE FUNCTION colocated_shard_placements(left_table REGCLASS, right_table REGCLASS)
RETURNS TABLE (left_shard TEXT, right_shard TEXT, nodename TEXT, nodeport BIGINT) AS $$
SELECT
a.logicalrelid::regclass||'_'||a.shardid,
b.logicalrelid::regclass||'_'||b.shardid,
nodename, nodeport
FROM pg_dist_shard a
JOIN pg_dist_shard b USING (shardminvalue)
JOIN pg_dist_shard_placement p ON (a.shardid = p.shardid)
WHERE a.logicalrelid = left_table AND b.logicalrelid = right_table;
$$ LANGUAGE 'sql';
#!/usr/bin/env bash
QUERY=$(cat <<END
SELECT * FROM colocated_shard_placements(
'http_request'::regclass, 'http_request_1min'::regclass
);
END
)
COMMAND="psql -h \$2 -p \$3 -c \"SELECT rollup_1min('\$0', '\$1')\""
psql -tA -F" " -c "$QUERY" | xargs -P32 -n4 sh -c "$COMMAND"
Expiring old data:
-- another function for the master
CREATE FUNCTION expire_old_request_data() RETURNS void
AS $$
SET LOCAL citus.all_modifications_commutative TO TRUE;
SELECT master_modify_multiple_shards(
'DELETE FROM http_request WHERE ingest_time < now() - interval ''1 day'';');
SELECT master_modify_multiple_shards(
'DELETE FROM http_request_1min WHERE ingest_time < now() - interval ''1 month'';');
$$ LANGUAGE 'sql';
Approximate distinct count:
A datatype called hyperloglog, or HLL, can answer the query approximately;
https://github.com/aggregateknowledge/postgresql-hll
-- this part must be run on all nodes
CREATE EXTENSION hll;
Scaling out your cluster
To add a worker, first add the DNS name of that node to the pg_worker_list.conf
file in your data directory on the master node, then run select pg_reload_conf();
Citus does not allow subqueries in the where clause so we must choose a workaround.
Workaround 1. Generate explicit WHERE-IN expression
-- create temporary table with results
do language plpgsql $$
declare user_ids integer[];
begin
-- array_agg collects the matching ids into a single array for the integer[] variable
execute
'select array_agg(user_id)'
' from events'
' where event_type = ''B'''
into user_ids;
execute format(
'create temp table results_temp as '
'select user_id'
' from events'
' where user_id = any(array[%s])'
' and event_type = ''A''',
array_to_string(user_ids, ','));
end;
$$;
-- read results, remove temp table
select * from results_temp;
drop table results_temp;
Workaround 2. Build query in SQL client
INSERT INTO ... SELECT
Citus does not support directly inserting the results of a query into a distributed table. One workaround is to use two database connections to stream the query results to master and then distribute them to the shards.
psql -c "COPY (query) TO STDOUT" | psql -c "COPY table FROM STDIN"
SELECT DISTINCT
Citus does not yet support SELECT DISTINCT but you can use GROUP BY for a simple workaround:
-- rather than this
-- select distinct col from table;
-- use this
select col from table group by col;
JOIN a local and a distributed table
ERROR: cannot plan queries that include both regular and partitioned relations
-- Allow "here" to be distributed
-- (presuming a primary key called "here_id")
SELECT master_create_distributed_table('here', 'here_id', 'hash');
-- Now make a full copy into a shard on every worker
SELECT master_create_worker_shards(
'here', 1,
(SELECT count(1) from master_get_active_worker_nodes())::integer
);
Data Warehousing Queries
select repo_id, actor->'id', count(*)
over (partition by repo_id)
from github_events
where repo_id = 1 or repo_id = 2;
Statement 1:
create temp table results as (
select repo_id, actor->'id' as actor_id
from github_events
where repo_id = 1 or repo_id = 2
);
Statement 2:
select repo_id, actor_id, count(*)
over (partition by repo_id)
from results;
User Defined Functions Reference
master_create_distributed_table
This example informs the database that the github_events table should be distributed by hash on the repo_id column.
SELECT master_create_distributed_table('github_events', 'repo_id', 'hash');
This example usage would create a total of 16 shards for the github_events table where each shard owns a portion of a hash token space and gets replicated on 2 workers.
SELECT master_create_worker_shards('github_events', 16, 2);
This example creates an empty shard for the github_events table. The shard id of the created shard is 102089.
SELECT * from master_create_empty_shard('github_events');
master_create_empty_shard
---------------------------
102089
(1 row)
Example
This example appends the contents of the github_events_local table to the shard having shard id 102089. The table github_events_local is present on the database running on the node master-101 on port number 5432. The function returns the ratio of the current shard size to the maximum shard size, which is 0.1, indicating that 10% of the shard has been filled.
SELECT * from master_append_table_to_shard(102089,'github_events_local','master-101', 5432);
master_append_table_to_shard
------------------------------
0.100548
Example
The first example deletes all the shards for the github_events table since no delete criteria is specified. In the second example, only the shards matching the criteria (3 in this case) are deleted.
SELECT * from master_apply_delete_command('DELETE FROM github_events');
master_apply_delete_command
-----------------------------
5
(1 row)
SELECT * from master_apply_delete_command('DELETE FROM github_events WHERE review_date < ''2009-03-01''');
master_apply_delete_command
-----------------------------
3
master_modify_multiple_shards
The master_modify_multiple_shards() function is used to run a query against all shards which match the criteria specified by the query. As the function uses shard metadata to decide whether or not a shard needs to be updated, it requires the WHERE clause in the query to be on the distribution column. Depending on the value of citus.multi_shard_commit_protocol, the commit can be done in one- or two-phases.
Limitations:
• It cannot be called inside a transaction block
• It must be called with simple operator expressions only
Example
SELECT master_modify_multiple_shards(
'DELETE FROM customer_delete_protocol WHERE c_custkey > 500 AND c_custkey < 500');
Metadata / Configuration Information
Example
SELECT * from master_get_active_worker_nodes();
node_name | node_port
-----------+-----------
localhost | 9700
localhost | 9702
localhost | 9701
The example below fetches and displays the table metadata for the github_events table.
logical_relid: Oid of the distributed table. This value references the relfilenode column in the pg_class system catalog table.
part_storage_type: Type of storage used for the table. May be ‘t’ (standard table), ‘f’ (foreign table) or ‘c’ (columnar table).
part_method: Distribution method used for the table. May be ‘a’ (append), or ‘h’ (hash).
part_key: Distribution column for the table.
part_replica_count: Current shard replication count.
part_max_size: Current maximum shard size in bytes.
part_placement_policy: Shard placement policy used for placing the table’s shards. May be 1 (local-node-first) or 2 (round-robin).
SELECT * from master_get_table_metadata('github_events');
 logical_relid | part_storage_type | part_method | part_key | part_replica_count | part_max_size | part_placement_policy
---------------+-------------------+-------------+----------+--------------------+---------------+-----------------------
         24180 | t                 | h           | repo_id  |                  2 |    1073741824 |                     2
(1 row)
Cluster Management And Repair Functions
Example
The example below will repair an inactive shard placement of shard 12345 which is present on the database server running on ‘bad_host’ on port 5432. To repair it, it will use data from a healthy shard placement present on the server running on ‘good_host’ on port 5432.
SELECT master_copy_shard_placement(12345, 'good_host', 5432, 'bad_host', 5432);
Partition table
The pg_dist_partition table stores metadata about which tables in the database are distributed. For each distributed table, it also stores information about the distribution method and detailed information about the distribution column.
Shard table
The pg_dist_shard table stores metadata about individual shards of a table. This includes information about which distributed table the shard belongs to and statistics about the distribution column for that shard. For append distributed tables, these statistics correspond to min / max values of the distribution column. In case of hash distributed tables, they are hash token ranges assigned to that shard. These statistics are used for pruning away unrelated shards during SELECT queries.
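Shard pruning on min / max statistics can be sketched as a range-overlap test. This is a minimal sketch, assuming an append-distributed table on a date column; the tuple layout (shard_id, min_value, max_value) and the sample shard ids are hypothetical and only loosely modeled on pg_dist_shard.

```python
def prune_shards(shards, low, high):
    """Keep only the shards whose [min, max] range overlaps the filter range [low, high]."""
    return [sid for sid, smin, smax in shards if smax >= low and smin <= high]

# hypothetical metadata rows: (shard_id, min_value, max_value)
shards = [
    (102001, "2016-01-01", "2016-01-31"),
    (102002, "2016-02-01", "2016-02-29"),
    (102003, "2016-03-01", "2016-03-31"),
]

# WHERE time BETWEEN '2016-02-15' AND '2016-03-05' only needs two of the three shards
print(prune_shards(shards, "2016-02-15", "2016-03-05"))
```

ISO-formatted date strings compare correctly as plain strings, which keeps the sketch free of date parsing.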
Shard placement table
The pg_dist_shard_placement table tracks the location of shard replicas on worker nodes. Each replica of a shard assigned to a specific node is called a shard placement. This table stores information about the health and location of each shard placement.
Shard Placement States
Citus manages shard health on a per-placement basis and automatically marks a placement as unavailable if leaving the placement in service would put the cluster in an inconsistent state. The shardstate column in the pg_dist_shard_placement table is used to store the state of shard placements. A brief overview of different shard placement states and their representation is below.
pg_worker_list.conf
The Citus master needs to have information about the worker nodes in the cluster so that it can communicate with them. This information is stored in the pg_worker_list.conf file in the data directory on the master.
vi $PGDATA/pg_worker_list.conf
# HOSTNAME [PORT] [RACK]
worker-101
worker-102
citus.max_worker_nodes_tracked (integer)
Citus tracks worker nodes’ locations and their membership in a shared hash table on the master node. This configuration value limits the size of the hash table, and consequently the number of worker nodes that can be tracked. The default for this setting is 2048. This parameter can only be set at server start and is effective on the master node.
citus.multi_shard_commit_protocol (enum)
Sets the commit protocol to use when performing COPY on a hash distributed table. On each individual shard placement, the COPY is performed in a transaction block to ensure that no data is ingested if an error occurs during the COPY. However, there is a particular failure case in which the COPY succeeds on all placements, but a (hardware) failure occurs before all transactions commit. This parameter can be used to prevent data loss in that case by choosing between the following commit protocols:
• 1pc: The transactions in which COPY is performed on the shard placements are committed in a single round. Data may be lost if a commit fails after COPY succeeds on all placements (rare). This is the default protocol.
• 2pc: The transactions in which COPY is performed on the shard placements are first prepared using PostgreSQL’s two-phase commit and then committed. Failed commits can be manually recovered or aborted using COMMIT PREPARED or ROLLBACK PREPARED, respectively. When using 2pc, max_prepared_transactions should be increased on all the workers, typically to the same value as max_connections.
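The difference that 2pc makes can be sketched with a toy model: nothing is committed anywhere until every placement has prepared successfully. The Placement class and its method names are assumptions for this sketch; in PostgreSQL the corresponding commands are PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED.

```python
class Placement:
    """Toy shard placement holding one open transaction."""

    def __init__(self, name, fail_prepare=False):
        self.name = name
        self.fail_prepare = fail_prepare
        self.state = "open"

    def prepare(self):
        if self.fail_prepare:
            raise RuntimeError(f"{self.name}: prepare failed")
        self.state = "prepared"

    def commit(self):
        self.state = "committed"

    def rollback(self):
        self.state = "aborted"

def commit_2pc(placements):
    """Phase 1: prepare everywhere. Phase 2: commit only if every prepare succeeded."""
    try:
        for p in placements:
            p.prepare()
    except RuntimeError:
        # one placement could not prepare, so no placement is allowed to commit
        for p in placements:
            p.rollback()
        return False
    for p in placements:
        p.commit()
    return True

ok = [Placement("worker-101"), Placement("worker-102")]
bad = [Placement("worker-101"), Placement("worker-102", fail_prepare=True)]
print(commit_2pc(ok), [p.state for p in ok])
print(commit_2pc(bad), [p.state for p in bad])
```

The sketch does not model a crash between the two phases; that is the case where prepared transactions have to be recovered or aborted manually.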
citus.shard_max_size (integer)
Sets the maximum size to which a shard will grow before it gets split and defaults to 1GB. When the source file’s size (which is used for staging) for one shard exceeds this configuration value, the database ensures that a new shard gets created. This parameter can be set at run-time and is effective on the master.
citus.task_assignment_policy (enum)
Sets the policy to use when assigning tasks to workers. The master assigns tasks to workers based on shard locations. This configuration value specifies the policy to use when making these assignments. Currently, there are three possible task assignment policies which can be used.
• greedy: The greedy policy is the default and aims to evenly distribute tasks across workers.
• round-robin: The round-robin policy assigns tasks to workers in a round-robin fashion alternating between different replicas. This enables much better cluster utilization when the shard count for a table is low compared to the number of workers.
• first-replica: The first-replica policy assigns tasks on the basis of the insertion order of placements (replicas) for the shards. In other words, the fragment query for a shard is simply assigned to the worker which has the first replica of that shard. This method allows you to have strong guarantees about which shards will be used on which nodes (i.e. stronger memory residency guarantees).
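The three policies can be contrasted with a small sketch. These functions are one possible reading of the descriptions above, not the algorithms Citus actually implements; the input shape (shard id mapped to its placements in insertion order) and the worker names are assumptions.

```python
def first_replica(tasks):
    """Always pick the first placement (insertion order) of each shard."""
    return {shard: placements[0] for shard, placements in tasks.items()}

def round_robin(tasks):
    """Alternate between replicas as we walk through the shards."""
    return {shard: placements[i % len(placements)]
            for i, (shard, placements) in enumerate(tasks.items())}

def greedy(tasks):
    """Give each task to the candidate worker with the fewest tasks so far."""
    load = {}
    assignment = {}
    for shard, placements in tasks.items():
        worker = min(placements, key=lambda w: load.get(w, 0))
        assignment[shard] = worker
        load[worker] = load.get(worker, 0) + 1
    return assignment

# hypothetical shard -> placements map, two replicas per shard
tasks = {1: ["worker-a", "worker-b"], 2: ["worker-a", "worker-b"],
         3: ["worker-a", "worker-b"], 4: ["worker-a", "worker-b"]}
print(first_replica(tasks))
print(round_robin(tasks))
print(greedy(tasks))
```

With first_replica every task lands on worker-a, which gives predictable placement at the cost of balance; the other two spread the four tasks over both workers.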
citus.binary_worker_copy_format (boolean)
Use the binary copy format to transfer intermediate data between workers. During large table joins, Citus may have to dynamically repartition and shuffle data between different workers. By default, this data is transferred in text format. Enabling this parameter instructs the database to use PostgreSQL’s binary serialization format to transfer this data. This parameter is effective on the workers and needs to be changed in the postgresql.conf file. After editing the config file, users can send a SIGHUP signal or restart the server for this change to take effect.
citus.binary_master_copy_format (boolean)
Use the binary copy format to transfer data between master and the workers. When running distributed queries, the workers transfer their intermediate results to the master for final aggregation. By default, this data is transferred in text format. Enabling this parameter instructs the database to use PostgreSQL’s binary serialization format to transfer this data. This parameter can be set at runtime and is effective on the master.
citus.all_modifications_commutative
Citus enforces commutativity rules and acquires appropriate locks for modify operations in order to guarantee correctness of behavior. For example, it assumes that an INSERT statement commutes with another INSERT statement, but not with an UPDATE or DELETE statement. Similarly, it assumes that an UPDATE or DELETE statement does not commute with another UPDATE or DELETE statement. This means that UPDATEs and DELETEs require Citus to acquire stronger locks.
If you have UPDATE statements that are commutative with your INSERTs or other UPDATEs, then you can relax these commutativity assumptions by setting this parameter to true. When this parameter is set to true, all commands are considered commutative and claim a shared lock, which can improve overall throughput. This parameter can be set at runtime and is effective on the master.
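The commutativity rules above fit in a few lines. The function names are hypothetical, and the sketch only encodes what the text states: INSERT commutes with INSERT, nothing else commutes unless the parameter is on, and UPDATE / DELETE need the stronger lock.

```python
def commutes(a, b, all_modifications_commutative=False):
    """True if commands a and b may be applied in either order."""
    if all_modifications_commutative:
        return True
    return a == "INSERT" and b == "INSERT"

def needs_stronger_lock(command, all_modifications_commutative=False):
    """UPDATE and DELETE take the stronger lock unless the assumption is relaxed."""
    return (not all_modifications_commutative) and command in ("UPDATE", "DELETE")

for a, b in [("INSERT", "INSERT"), ("INSERT", "UPDATE"), ("UPDATE", "DELETE")]:
    print(a, b, commutes(a, b), commutes(a, b, all_modifications_commutative=True))
```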
citus.task_executor_type (enum)
Citus has two executor types for running distributed SELECT queries. The desired executor can be selected by setting this configuration parameter. The accepted values for this parameter are:
• real-time: The real-time executor is the default executor and is optimal when you require fast responses to queries that involve aggregations and colocated joins spanning across multiple shards.
• task-tracker: The task-tracker executor is well suited for long running, complex queries which require shuffling of data across worker nodes and efficient resource management.