Skip to content

Instantly share code, notes, and snippets.

@marcocitus
Last active February 13, 2016 20:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marcocitus/e6b90e1dbcd248d63dbd to your computer and use it in GitHub Desktop.
Save marcocitus/e6b90e1dbcd248d63dbd to your computer and use it in GitHub Desktop.
Adding stage tables to CitusDB
-- Master-side schema for the distributed "logs" table.
CREATE TABLE IF NOT EXISTS logs (
    id      uuid,
    date    timestamp with time zone,
    version jsonb,
    data    jsonb
);

-- B-tree index on id (btree is the default access method, spelled out here).
CREATE INDEX logs_id_idx ON logs USING btree (id);

-- Register logs with Citus as an append-distributed table on the "date"
-- column; shards are then created and filled explicitly (see master_get_shard).
SELECT master_create_distributed_table('logs', 'date', 'append');
-- master_get_shard(table_id, max_size): return the id of a shard of table_id
-- that staged data can be appended to.
--
-- A shard is a candidate ("small") when at least one of its placements has a
-- shardlength below max_size (presumably bytes, i.e. 1 GiB by default).
-- While fewer than 20 candidates exist (including none at all), a fresh empty
-- shard is created and returned, so up to 20 shards are staged into in
-- parallel; otherwise a random candidate is returned.
CREATE OR REPLACE FUNCTION master_get_shard(table_id regclass, max_size bigint DEFAULT 1073741824)
RETURNS bigint
LANGUAGE plpgsql
AS $function$
DECLARE
    picked_shard      bigint;
    small_shard_total bigint;
BEGIN
    -- One row per candidate shard. The window count runs over all of those
    -- rows, so it equals the number of small shards; ORDER BY random()
    -- decides which row ends up in picked_shard. Both stay NULL when there
    -- is no candidate.
    SELECT shardid,
           count(*) OVER ()
      INTO picked_shard, small_shard_total
      FROM pg_dist_shard
           INNER JOIN pg_dist_shard_placement USING (shardid)
     WHERE logicalrelid = table_id
       AND shardlength < max_size
     GROUP BY shardid
     ORDER BY random();

    -- Not enough small shards yet: open a new one instead of reusing.
    IF coalesce(small_shard_total, 0) < 20 THEN
        picked_shard := master_create_empty_shard(table_id::text);
    END IF;

    RETURN picked_shard;
END;
$function$;
#!/bin/sh -e
# Move the rows pending in the "logs" table of one worker into a shard of the
# distributed "logs" table on the master.
#
# Usage: rotate-worker.sh WORKER_HOST [WORKER_PORT]
#   WORKER_HOST  host of the worker to stage from (required)
#   WORKER_PORT  PostgreSQL port of that worker (default: 5432)
#
# Steps: (1) on the worker, move pending rows into logs_stage;
#        (2) on the master, append logs_stage to the shard chosen by
#            master_get_shard('logs');
#        (3) on the worker, drop logs_stage after the append succeeded.
# Any failing step aborts the script (sh -e) and leaves logs_stage in place,
# so its rows can be picked up by a later run.
export PGDATABASE=postgres
MASTER=localhost
WORKER=${1:?usage: rotate-worker.sh WORKER_HOST [WORKER_PORT]}
WORKER_PORT=${2:-5432}

# At most one staging run per worker. With noclobber the redirect fails if the
# lock file already exists, so creating it is an atomic test-and-set.
lockfile="/tmp/lock-$WORKER-$WORKER_PORT"
if ( set -o noclobber; echo "locked" > "$lockfile" ) 2> /dev/null; then
    # Release the lock however the script ends (success, error, or signal).
    trap 'rm -f "$lockfile"; exit $?' INT TERM EXIT
else
    echo "Already staging from $WORKER, skipping" >&2
    exit 1
fi

psql -h "$WORKER" -p "$WORKER_PORT" -c "SELECT prepare_logs_stage_table()"

# master_get_shard('logs') must be closed before the remaining arguments:
# they belong to master_append_table_to_shard(shard_id, source_table,
# source_node_name, source_node_port).
# NOTE(review): $WORKER and $WORKER_PORT are spliced into the SQL text
# unescaped; the host name must not contain a single quote.
psql -h "$MASTER" -c "SELECT master_append_table_to_shard(master_get_shard('logs'), 'logs_stage', '$WORKER', $WORKER_PORT)"

psql -h "$WORKER" -p "$WORKER_PORT" -c "DROP TABLE logs_stage"
#!/bin/sh
# Run periodically (e.g. every minute) on the master node.
#
# Starts rotate-worker.sh (which must be on PATH) once per entry of the worker
# list, running up to 100 of them in parallel. An optional first argument
# overrides the worker list path.
WORKER_LIST=${1:-/data/base/pg_worker_list.conf}

# Normalise each entry to exactly "host port" so that xargs -n 2 pairs the
# arguments correctly: text after '#' is dropped, lines left empty are skipped,
# and a missing port defaults to 5432.
awk '{ sub(/#.*/, "") } NF { print $1, (NF > 1 ? $2 : 5432) }' "$WORKER_LIST" |
    xargs -n 2 -P 100 rotate-worker.sh
-- Worker-side "logs" table; rows accumulate here until
-- prepare_logs_stage_table() moves them into logs_stage.
CREATE TABLE IF NOT EXISTS logs (
    id      uuid,
    date    timestamp with time zone,
    version jsonb,
    data    jsonb
);
-- prepare_logs_stage_table(): move every row currently in "logs" into the
-- "logs_stage" table (created with the same columns if it does not exist)
-- so the master can append logs_stage to a shard.
--
-- Raises 'No rows to ingest' when logs_stage ends up empty. The exception
-- rolls back the whole call, including the table creation.
CREATE OR REPLACE FUNCTION prepare_logs_stage_table()
RETURNS void LANGUAGE plpgsql
AS $function$
BEGIN
    -- logs_stage may still exist from an earlier run whose append failed
    -- before the table was dropped; its rows are kept and staged again.
    -- NOTE(review): UNLOGGED tables are truncated after a crash, so rows moved
    -- here but not yet appended on the master would be lost on a crash.
    -- NOTE(review): if an earlier append succeeded but the DROP failed, the
    -- leftover rows will be appended a second time.
    CREATE UNLOGGED TABLE IF NOT EXISTS logs_stage AS
        SELECT * FROM logs LIMIT 0;

    -- Intentionally unfiltered DELETE: every row is moved in one statement.
    WITH stage_rows AS (
        DELETE FROM logs
        RETURNING *
    )
    INSERT INTO logs_stage
    SELECT * FROM stage_rows;

    -- Check the stage table rather than the number of rows just moved, so
    -- leftover rows from an earlier failed run are still ingested even when
    -- no new rows arrived.
    IF NOT EXISTS (SELECT 1 FROM logs_stage) THEN
        RAISE EXCEPTION 'No rows to ingest';
    END IF;
END;
$function$;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment