Adding stage tables to CitusDB
-- Coordinator-side definition of the distributed logs table.
-- Shards of this table are filled by appending worker-side logs_stage tables.
CREATE TABLE IF NOT EXISTS logs (
    id      uuid,
    date    timestamptz,
    version jsonb,
    data    jsonb
);

-- Lookup index on id (btree is PostgreSQL's default index method).
CREATE INDEX logs_id_idx
    ON logs USING btree (id);

-- Register logs as an append-distributed table whose distribution column is "date".
SELECT master_create_distributed_table('logs', 'date', 'append');
/*
 * master_get_shard(table_id, max_size) -> id of the shard to append into.
 *
 * Finds the shards of table_id that have at least one placement whose
 * shardlength is below max_size. The default is 1073741824 = 2^30;
 * presumably this is bytes, in the units of pg_dist_shard_placement.shardlength
 * (TODO confirm). One of those shards is picked at random.
 * If fewer than 20 such shards exist, a new empty shard is created and its id
 * returned instead, so staging can target up to 20 shards in parallel.
 */
CREATE OR REPLACE FUNCTION master_get_shard(table_id regclass, max_size bigint DEFAULT 1073741824)
RETURNS bigint
LANGUAGE plpgsql
AS $function$
DECLARE
    candidate_shard bigint;
    candidate_count bigint;
BEGIN
    -- One random "small" shard, plus the number of small shards in total.
    -- When nothing matches, both variables remain NULL.
    SELECT small_shards.shardid, small_shards.total
      INTO candidate_shard, candidate_count
      FROM (
            SELECT shardid, count(*) OVER () AS total
              FROM pg_dist_shard
             INNER JOIN pg_dist_shard_placement USING (shardid)
             WHERE logicalrelid = table_id
               AND shardlength < max_size
             GROUP BY shardid
           ) AS small_shards
     ORDER BY random()
     LIMIT 1;

    -- Enough small shards already exist: reuse the randomly chosen one.
    IF candidate_count IS NOT NULL AND candidate_count >= 20 THEN
        RETURN candidate_shard;
    END IF;

    -- Fewer than 20 small shards (possibly none): open a fresh empty shard.
    RETURN master_create_empty_shard(table_id::text);
END;
$function$;
#!/bin/sh -e
#
# Ship the rows pending on one worker into the distributed "logs" table.
#
# Usage: <script> WORKER_HOST
#
# Steps:
#   1. On the worker, move pending rows from logs into logs_stage.
#      prepare_logs_stage_table() raises when there is nothing to ship.
#      Presumably psql then exits non-zero and -e stops the script.
#   2. On the master, append the worker's logs_stage into the shard
#      returned by master_get_shard('logs').
#   3. On the worker, drop logs_stage. This is only reached if step 2
#      succeeded, because of -e.
# A per-worker lock file prevents two concurrent runs for the same worker.

export PGDATABASE=postgres
MASTER=localhost
WORKER=$1

# Without a host, "psql -h -c ..." would take "-c" as the host name.
if [ -z "$WORKER" ]; then
    echo "usage: $0 WORKER_HOST" >&2
    exit 1
fi

# The host name is embedded in a single-quoted SQL literal below, so refuse
# values that would terminate that literal.
case $WORKER in
    *\'*)
        echo "Invalid worker host: $WORKER" >&2
        exit 1
        ;;
esac

lockfile=/tmp/lock-$WORKER
# With noclobber, the redirect fails if the file already exists.
# Creating the file therefore doubles as an atomic test-and-set.
if ( set -o noclobber; echo "locked" > "$lockfile" ) 2> /dev/null; then
    trap 'rm -f "$lockfile"; exit $?' INT TERM EXIT
else
    echo "Already staging from $WORKER, skipping" >&2
    exit 1
fi

psql -h "$WORKER" -c "SELECT prepare_logs_stage_table()"
psql -h "$MASTER" -c "SELECT master_append_table_to_shard(master_get_shard('logs'), 'logs_stage', '$WORKER', 5432)"
psql -h "$WORKER" -c "DROP TABLE logs_stage"
-- Worker-local logs table that receives incoming rows.
-- prepare_logs_stage_table() copies it column-for-column (SELECT *) into
-- logs_stage, which is then appended to the coordinator's logs shards.
-- Keep this column list in sync with the coordinator's logs definition.
CREATE TABLE IF NOT EXISTS logs (
    id      uuid,
    date    timestamptz,
    version jsonb,
    data    jsonb
);
-- prepare_logs_stage_table(): move every row currently in logs into the
-- logs_stage table so it can be appended to the distributed table.
-- Raises 'No rows to ingest' when logs_stage ends up empty.
--
-- logs_stage persists across calls (IF NOT EXISTS). If an earlier run staged
-- rows that were never shipped and dropped, new rows are added to them and
-- the whole set is shipped together.
CREATE OR REPLACE FUNCTION prepare_logs_stage_table()
RETURNS void LANGUAGE plpgsql
AS $function$
BEGIN
    -- Same column layout as logs, created empty.
    -- NOTE(review): UNLOGGED tables are not crash-safe in PostgreSQL. Rows
    -- moved here from the logged logs table could be lost if the worker
    -- crashes before they are shipped. Confirm this trade-off is intended.
    CREATE UNLOGGED TABLE IF NOT EXISTS logs_stage AS SELECT * FROM logs LIMIT 0;

    -- Deliberately unfiltered DELETE: every row visible in logs is moved
    -- into logs_stage within a single statement.
    WITH stage_rows AS (DELETE FROM logs RETURNING *)
    INSERT INTO logs_stage SELECT * FROM stage_rows;

    -- Judge emptiness on the stage table as a whole, not only on the rows
    -- moved by this call. Rows left over from a run whose append failed must
    -- still be shipped, even if no new rows have arrived since.
    IF NOT EXISTS (SELECT 1 FROM logs_stage) THEN
        RAISE EXCEPTION 'No rows to ingest';
    END IF;
END;
$function$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment