Last active
February 13, 2016 20:37
-
-
Save marcocitus/e6b90e1dbcd248d63dbd to your computer and use it in GitHub Desktop.
Adding stage tables to CitusDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Master-side setup: the logs table, a lookup index on id, and its
-- registration as a distributed table.
CREATE TABLE IF NOT EXISTS logs (
    id      uuid,
    date    timestamptz,
    version jsonb,
    data    jsonb
);

CREATE INDEX logs_id_idx
    ON logs (id);

-- Distribute logs on the date column using the 'append' method.
SELECT master_create_distributed_table('logs', 'date', 'append');
-- master_get_shard(table_id, max_size)
--
-- Returns the id of a shard of table_id that new data can be appended to.
--
--   table_id  the distributed table whose shards are considered
--   max_size  shards whose placement shardlength is below this value count
--             as "small" (default 1073741824 = 1024^3; presumably bytes,
--             confirm against pg_dist_shard_placement.shardlength)
--
-- While fewer than 20 small shards exist, a new empty shard is created and
-- returned; otherwise one of the existing small shards is picked at random.
CREATE OR REPLACE FUNCTION master_get_shard(table_id regclass, max_size bigint DEFAULT 1073741824)
RETURNS bigint
LANGUAGE plpgsql
AS $function$
DECLARE
    target_shard      bigint;
    small_shard_count bigint;
BEGIN
    -- One row per small shard after GROUP BY, so count(*) OVER () is the
    -- number of small shards. The random ordering makes the first row (the
    -- one SELECT ... INTO keeps) a random small shard. Both targets stay
    -- NULL when no shard qualifies.
    SELECT shardid, count(*) OVER ()
    INTO target_shard, small_shard_count
    FROM pg_dist_shard
    INNER JOIN pg_dist_shard_placement USING (shardid)
    WHERE logicalrelid = table_id
      AND shardlength < max_size
    GROUP BY shardid
    ORDER BY random();

    -- Stage in up to 20 shards in parallel: below that threshold always
    -- open a fresh shard instead of reusing the one picked above.
    IF COALESCE(small_shard_count, 0) < 20 THEN
        target_shard := master_create_empty_shard(table_id::text);
    END IF;

    RETURN target_shard;
END;
$function$;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh -e
# Usage: rotate-worker.sh WORKER_HOST
#
# Stages the rows currently held in the worker's logs table into a shard of
# the distributed logs table on the master:
#   1. on the worker, move all rows from logs into logs_stage;
#   2. on the master, append logs_stage to a shard chosen by master_get_shard;
#   3. on the worker, drop logs_stage.
# Because of -e, the script stops at the first failing step (for example
# when prepare_logs_stage_table() raises because there is nothing to ingest).
#
# Any arguments after WORKER_HOST are ignored; the worker port is fixed at
# 5432 in the append call.
export PGDATABASE=postgres
MASTER=localhost
WORKER=$1

if [ -z "$WORKER" ]; then
  echo "Usage: $0 WORKER_HOST" >&2
  exit 2
fi

lockfile=/tmp/lock-$WORKER

# noclobber makes the redirection fail if the lock file already exists, so
# creating it doubles as an atomic "is another run active?" test.
if ( set -o noclobber; echo "locked" > "$lockfile" ) 2> /dev/null; then
  # Remove the lock on every exit path. The EXIT trap does not call exit
  # itself, so the script's real exit status is preserved (previously
  # 'exit $?' reported the status of rm and masked failures as success).
  trap 'rm -f "$lockfile"' EXIT
  trap 'exit 130' INT
  trap 'exit 143' TERM
else
  echo "Already staging from $WORKER, skipping" >&2
  exit 1
fi

psql -h "$WORKER" -c "SELECT prepare_logs_stage_table()"
# master_get_shard('logs') yields the target shard id; the remaining
# arguments name the source table and the worker host/port to pull it from.
psql -h "$MASTER" -c "SELECT master_append_table_to_shard(master_get_shard('logs'), 'logs_stage', '$WORKER', 5432)"
psql -h "$WORKER" -c "DROP TABLE logs_stage"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh
# Run periodically (e.g. every minute) on the master node.
#
# Reads the worker list and launches rotate-worker.sh for the workers in
# parallel: xargs hands each invocation two whitespace-separated tokens from
# the file (presumably host and port; confirm against the list format) and
# keeps up to 100 invocations running at once.
WORKER_LIST=/data/base/pg_worker_list.conf

# Fail fast instead of letting xargs start rotate-worker.sh with no
# arguments when the list is missing or empty.
if [ ! -s "$WORKER_LIST" ]; then
  echo "Worker list $WORKER_LIST is missing or empty" >&2
  exit 1
fi

xargs -n 2 -P 100 rotate-worker.sh < "$WORKER_LIST"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Local logs table; prepare_logs_stage_table() moves its rows into
-- logs_stage. Column layout matches the distributed logs table.
CREATE TABLE IF NOT EXISTS logs (
    id      uuid,
    date    timestamptz,
    version jsonb,
    data    jsonb
);
-- prepare_logs_stage_table()
--
-- Moves every row currently in logs into the unlogged table logs_stage,
-- creating logs_stage first if it does not exist yet.
--
-- Raises 'No rows to ingest' when logs_stage is empty after the move. The
-- check looks at logs_stage itself rather than at the number of rows just
-- moved, so rows left behind by an earlier run (logs_stage is reused when
-- it already exists) are still handed on instead of being stranded until
-- new rows arrive.
CREATE OR REPLACE FUNCTION prepare_logs_stage_table()
RETURNS void LANGUAGE plpgsql
AS $function$
BEGIN
    -- LIMIT 0 copies only the column layout of logs; SELECT * is deliberate
    -- so the stage table follows whatever columns logs has.
    CREATE UNLOGGED TABLE IF NOT EXISTS logs_stage AS SELECT * FROM logs LIMIT 0;

    -- Intentionally unfiltered DELETE: the whole table is drained, and the
    -- deleted rows are inserted into the stage table in the same statement.
    WITH stage_rows AS (DELETE FROM logs RETURNING *)
    INSERT INTO logs_stage SELECT * FROM stage_rows;

    IF NOT EXISTS (SELECT 1 FROM logs_stage) THEN
        RAISE EXCEPTION 'No rows to ingest';
    END IF;
END;
$function$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment