Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Scripts for loading GitHub events into Citus
-- Raw GitHub Archive events, one row per event.
CREATE TABLE github_events (
    event_id     bigint,
    event_type   text,
    event_public boolean,
    repo_id      bigint,
    payload      jsonb,
    repo         jsonb,
    actor        jsonb,
    org          jsonb,
    created_at   timestamp
);

-- B-tree index for filtering on the event type.
CREATE INDEX github_events_event_type_idx ON github_events (event_type);

-- GIN indexes using jsonb_path_ops, which serve containment (@>) lookups
-- into the actor and repo documents.
CREATE INDEX github_events_actor_idx ON github_events USING GIN (actor jsonb_path_ops);
CREATE INDEX github_events_repo_idx  ON github_events USING GIN (repo jsonb_path_ops);

-- Distribute the table with the 'append' method on created_at; shards are
-- created explicitly per day by get_date_shard rather than up front.
SELECT master_create_distributed_table('github_events', 'created_at', 'append');
-- get_date_shard returns the id of the github_events shard that holds the
-- given calendar day, creating an empty shard for that day if none exists.
--
-- A shard created here gets min/max values covering the whole day
-- (00:00:00 through 23:59:59), and a day is matched by casting a shard's
-- min value back to a date. If several shards match, the one with the
-- lowest shardid is returned.
CREATE OR REPLACE FUNCTION get_date_shard(events_date date)
RETURNS bigint AS
$BODY$
DECLARE
    date_shard_id bigint;
BEGIN
    -- Serialize callers asking for the same day, so two concurrent loads
    -- cannot both miss the lookup below and each create a shard for that
    -- day. The lock is transaction-scoped: it is released when this
    -- transaction commits, after which a waiting caller's lookup sees the
    -- newly created shard. Keyed on (table oid, days since epoch).
    PERFORM pg_advisory_xact_lock('github_events'::regclass::oid::int,
                                  events_date - DATE '1970-01-01');

    SELECT shardid INTO date_shard_id
    FROM pg_dist_shard
    WHERE logicalrelid = 'github_events'::regclass
      AND shardminvalue::date = events_date
    ORDER BY shardid   -- deterministic pick if duplicates already exist
    LIMIT 1;

    IF NOT FOUND THEN
        date_shard_id := master_create_empty_shard('github_events');

        -- Make the new shard cover exactly this day.
        UPDATE pg_dist_shard
        SET shardminvalue = events_date::timestamp,
            shardmaxvalue = events_date::timestamp + interval '1 day' - interval '1 second'
        WHERE shardid = date_shard_id;
    END IF;

    RETURN date_shard_id;
END;
$BODY$
LANGUAGE plpgsql;
#!/bin/sh -e
# Load GitHub Archive events for a range of hours of one day into the Citus
# github_events table.
#
# usage: <script> DATE START_HOUR [END_HOUR]
#   DATE        day to load, as YYYY-MM-DD
#   START_HOUR  first hour to load (0-23)
#   END_HOUR    last hour to load (0-23, >= START_HOUR); defaults to START_HOUR
#
# The coordinator is reached with psql's default connection settings; the
# staging worker (host and port) is taken from the shard's placement metadata.

usage() {
    echo "usage: $0 YYYY-MM-DD START_HOUR [END_HOUR]" >&2
    exit 1
}

[ $# -ge 2 ] || usage
date=$1
start_hour=$2
end_hour=${3:-$2}

# The arguments are pasted into SQL below, so only accept the expected shapes.
case $date in
    [0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]) ;;
    *) usage ;;
esac
for hour in "$start_hour" "$end_hour"; do
    case $hour in
        [0-9]|[01][0-9]|2[0-3]) ;;
        *) usage ;;
    esac
done
[ "$start_hour" -le "$end_hour" ] || usage

# Get (or create) the shard for the date
shard_id=$(psql -tA -c "SELECT get_date_shard('$date')")

# Stage on a healthy (shardstate = 1) placement of that shard. Fetch its port
# too, so the worker connection and the append call use the registered port.
placement=$(psql -tA -F" " -c "SELECT nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = $shard_id AND shardstate = 1 LIMIT 1")
if [ -z "$placement" ]; then
    echo "no healthy placement found for shard $shard_id" >&2
    exit 1
fi
worker_name=${placement% *}
worker_port=${placement##* }

# Load the raw data from githubarchive.org into a stage table on the worker
stage_table=$(psql -tA -h "$worker_name" -p "$worker_port" -c "SELECT load_github_events('$date', $start_hour, $end_hour)")

drop_stage_table() {
    psql -h "$worker_name" -p "$worker_port" -c "DROP TABLE $stage_table" >/dev/null
}

# Append the data to the appropriate shard. Drop the stage table even when
# the append fails, so failed runs do not leave stage tables on the worker.
if ! psql -c "SELECT master_append_table_to_shard($shard_id, '$stage_table', '$worker_name', $worker_port)" >/dev/null; then
    drop_stage_table
    exit 1
fi
drop_stage_table

echo loaded $date from $start_hour:00:00 to $end_hour:59:59
-- Source of unique suffixes for stage table names.
CREATE SEQUENCE IF NOT EXISTS stage_id;

-- load_github_events downloads the GitHub Archive files for hours
-- start_hour..end_hour (inclusive, 0-23) of events_date. It converts each
-- JSON event into a row shaped like github_events and stores the rows in a
-- new table named stage_<n>, with n taken from the stage_id sequence.
-- Returns the name of that table; the caller is responsible for dropping it.
-- Raises an exception for a NULL, out-of-range or descending hour range.
CREATE OR REPLACE FUNCTION load_github_events(events_date date, start_hour int, end_hour int) RETURNS text AS
$BODY$
DECLARE
    stage_table text := 'stage_' || nextval('stage_id');
    urls text;
BEGIN
    IF start_hour IS NULL OR end_hour IS NULL
       OR start_hour < 0 OR end_hour > 23 OR start_hour > end_hour THEN
        RAISE EXCEPTION 'invalid hour range: % to %', start_hour, end_hour;
    END IF;

    -- Raw JSON lines. Dropped automatically at the end of the transaction,
    -- so the function can be called again in the same session.
    CREATE TEMPORARY TABLE input (data jsonb) ON COMMIT DROP;

    /*
     * Build an explicit, space-separated list of hourly archive URLs.
     * COPY ... FROM PROGRAM runs its command through /bin/sh, which need not
     * support bash-style {a..b} brace expansion, so the hours are enumerated
     * here instead. to_char fixes the date format regardless of DateStyle.
     */
    SELECT string_agg(format('"http://data.githubarchive.org/%s-%s.json.gz"',
                             to_char(events_date, 'YYYY-MM-DD'), h),
                      ' ' ORDER BY h)
    INTO urls
    FROM generate_series(start_hour, end_hour) AS h;

    /*
     * Download, decompress, and filter JSON data:
     * - curl -L follows HTTP redirects from the archive host.
     * - Lines containing an escaped NUL (\u0000) are dropped because jsonb
     *   cannot store it; grep -F matches that text literally, not as a regex.
     * - QUOTE and DELIMITER are control characters that valid JSON text
     *   cannot contain unescaped, so each line lands whole in the data column.
     */
    EXECUTE format('COPY input FROM PROGRAM ''curl -sL %s | zcat | grep -vF "\\u0000"'' '||
                   'CSV QUOTE e''\x01'' DELIMITER e''\x02''', urls);

    /* Convert raw JSON to table format (column order matches github_events) */
    EXECUTE format('CREATE TABLE %I AS '||
                   'SELECT (data->>''id'')::bigint AS event_id, '||
                   '(data->>''type'')::text AS event_type, '||
                   '(data->>''public'')::boolean AS event_public, '||
                   '(data->''repo''->>''id'')::bigint AS repo_id, '||
                   'data->''payload'' AS payload, '||
                   'data->''repo'' AS repo, '||
                   'data->''actor'' AS actor, '||
                   'data->''org'' AS org, '||
                   '(data->>''created_at'')::timestamp AS created_at '||
                   'FROM input', stage_table);

    RETURN stage_table;
END;
$BODY$
LANGUAGE plpgsql;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment