-- Events table: a few scalar attributes promoted to typed columns, with the
-- nested JSON documents of each event kept as jsonb.
CREATE TABLE github_events (
    event_id      BIGINT,
    event_type    TEXT,
    event_public  BOOLEAN,
    repo_id       BIGINT,
    payload       JSONB,
    repo          JSONB,
    actor         JSONB,
    org           JSONB,
    created_at    TIMESTAMP
);

-- Register the table as distributed, partitioned on created_at using the
-- 'append' method.
SELECT master_create_distributed_table('github_events', 'created_at', 'append');

-- Plain index for filtering on the event type.
CREATE INDEX ON github_events (event_type);

-- GIN indexes (jsonb_path_ops operator class) on the actor and repo documents.
CREATE INDEX ON github_events USING GIN (actor jsonb_path_ops);
CREATE INDEX ON github_events USING GIN (repo jsonb_path_ops);
-- get_date_shard(events_date) -> bigint
--
-- Returns the id of the github_events shard whose shardminvalue falls on
-- events_date. When no such shard exists, a new empty shard is created and
-- its range in pg_dist_shard is set to cover that calendar day.
--
-- Parameters:
--   events_date  calendar day the shard should hold; must not be NULL.
-- Returns:
--   shardid of the existing or newly created shard.
-- Raises:
--   an exception when events_date is NULL (a NULL date can never match the
--   lookup, so every such call would otherwise create another shard with
--   NULL bounds).
CREATE OR REPLACE FUNCTION get_date_shard(events_date date)
RETURNS bigint AS
$BODY$
DECLARE
    date_shard_id bigint;
BEGIN
    IF events_date IS NULL THEN
        RAISE EXCEPTION 'get_date_shard: events_date must not be NULL';
    END IF;

    -- Serialize callers asking for the same date: without this, two sessions
    -- can both miss the lookup below and each create a shard for the day.
    -- The lock is transaction-scoped, so it is released automatically on
    -- commit or rollback.
    PERFORM pg_advisory_xact_lock('github_events'::regclass::oid::integer,
                                  events_date - DATE '2000-01-01');

    -- shardminvalue is cast to date, so any shard whose minimum lies
    -- anywhere within the day matches. ORDER BY makes the pick deterministic
    -- should more than one shard already exist for the day.
    SELECT shardid
      INTO date_shard_id
      FROM pg_dist_shard
     WHERE logicalrelid = 'github_events'::regclass
       AND shardminvalue::date = events_date
     ORDER BY shardid
     LIMIT 1;

    IF NOT FOUND THEN
        SELECT master_create_empty_shard('github_events') INTO date_shard_id;

        -- Give the new shard the range 00:00:00 .. 23:59:59 of events_date.
        -- NOTE(review): a timestamp with fractional seconds inside the last
        -- second of the day lies outside this range.
        UPDATE pg_dist_shard
           SET shardminvalue = events_date::timestamp,
               shardmaxvalue = events_date::timestamp + interval '1 day' - interval '1 second'
         WHERE shardid = date_shard_id;
    END IF;

    RETURN date_shard_id;
END;
$BODY$
LANGUAGE plpgsql;
#!/bin/sh -e
# Load one or more hours of GitHub archive data for a single day into the
# github_events shard that belongs to that day.
#
# Usage: <script> DATE START_HOUR [END_HOUR]
#   DATE        day to load, in any spelling the date type accepts
#   START_HOUR  first hour to load (non-negative integer)
#   END_HOUR    last hour to load; defaults to START_HOUR
#
# Connection settings for the master come from the usual psql environment.
# The worker host and port are taken from pg_dist_shard_placement.

# Keep fail-fast behavior even when the file is run as "sh <script>", which
# ignores the -e on the shebang line.
set -e

if [ "$#" -lt 2 ]; then
    echo "usage: $0 DATE START_HOUR [END_HOUR]" >&2
    exit 1
fi

date=$1
start_hour=$2
end_hour=${3:-$2}

# The arguments are spliced into SQL text below, so refuse anything that
# could break out of the quoted date literal or is not a plain integer.
case $date in
    ''|*\'*|*\\*)
        echo "invalid date: '$date'" >&2
        exit 1
        ;;
esac
for hour in "$start_hour" "$end_hour"; do
    case $hour in
        ''|*[!0-9]*)
            echo "invalid hour: '$hour' (expected a non-negative integer)" >&2
            exit 1
            ;;
    esac
done

# Get a shard for the date
shard_id=$(psql -tA -c "SELECT get_date_shard('$date')")

# Stage from one of the shard placements; output is "<nodename> <nodeport>"
placement=$(psql -tA -F" " -c "SELECT nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = $shard_id LIMIT 1")
if [ -z "$placement" ]; then
    echo "no placement found for shard $shard_id" >&2
    exit 1
fi
worker_name=${placement%% *}
worker_port=${placement##* }

# Make sure the stage table does not outlive a failed run: with -e the script
# stops at the first failing command, which would otherwise skip the DROP.
stage_table=
drop_stage_table() {
    if [ -n "$stage_table" ]; then
        psql -h "$worker_name" -p "$worker_port" -c "DROP TABLE IF EXISTS $stage_table" >/dev/null 2>&1 || true
    fi
}
trap drop_stage_table EXIT
trap 'exit 1' HUP INT TERM

# Load the raw data from githubarchive.org
stage_table=$(psql -tA -h "$worker_name" -p "$worker_port" -c "SELECT load_github_events('$date', $start_hour, $end_hour)")

# Append the data to the appropriate shard
psql -c "SELECT master_append_table_to_shard($shard_id, '$stage_table', '$worker_name', $worker_port)" >/dev/null

# Drop the stage table; clearing the variable turns the exit trap into a no-op
psql -h "$worker_name" -p "$worker_port" -c "DROP TABLE $stage_table" >/dev/null
stage_table=

echo "loaded $date from $start_hour:00:00 to $end_hour:59:59"
-- Sequence that gives every stage table a distinct numeric suffix.
CREATE SEQUENCE IF NOT EXISTS stage_id;

-- load_github_events(events_date, start_hour, end_hour) -> text
--
-- Downloads the hourly archive files for events_date covering start_hour
-- through end_hour, loads the raw JSON lines into a temporary table, and
-- materializes them into a new table "stage_<n>" whose columns match
-- github_events. The caller is responsible for dropping the stage table.
--
-- Parameters:
--   events_date  day to download; must not be NULL.
--   start_hour   first hour to download, 0-23.
--   end_hour     last hour to download, 0-23.
-- Returns:
--   name of the stage table that was created.
-- Raises:
--   an exception when an argument is NULL or an hour is outside 0-23.
CREATE OR REPLACE FUNCTION load_github_events(events_date date, start_hour int, end_hour int) RETURNS text AS
$BODY$
DECLARE
    stage_table  text := 'stage_' || nextval('stage_id');
    archive_urls text;
BEGIN
    IF events_date IS NULL OR start_hour IS NULL OR end_hour IS NULL THEN
        RAISE EXCEPTION 'load_github_events: events_date, start_hour and end_hour must not be NULL';
    END IF;
    IF start_hour NOT BETWEEN 0 AND 23 OR end_hour NOT BETWEEN 0 AND 23 THEN
        RAISE EXCEPTION 'load_github_events: hours must be between 0 and 23 (got % and %)',
                        start_hour, end_hour;
    END IF;

    /* Spell out one URL per hour instead of relying on a {a..b} range:
       COPY ... PROGRAM hands the command to the system shell, and only some
       shells expand that syntax. The date is formatted explicitly so the URL
       does not depend on the session's DateStyle. */
    SELECT string_agg(
               format('http://data.githubarchive.org/%s-%s.json.gz',
                      to_char(events_date, 'YYYY-MM-DD'), hour_of_day),
               ' ' ORDER BY hour_of_day)
      INTO archive_urls
      FROM generate_series(LEAST(start_hour, end_hour),
                           GREATEST(start_hour, end_hour)) AS hour_of_day;

    CREATE TEMPORARY TABLE input (data jsonb);

    /* Download, decompress, and filter JSON data.
       The \x01 / \x02 control characters serve as CSV quote and delimiter,
       presumably because they never occur in the JSON text, so each line is
       loaded whole into the single jsonb column. Lines matched by the grep
       pattern (the \u0000 escape) are discarded before loading.
       NOTE(review): the pipeline's exit status is that of grep, so a failed
       download of one hour is not reported as an error -- confirm this is
       acceptable. */
    EXECUTE format('COPY input FROM PROGRAM ''curl -s %s | zcat | grep -v "\\u0000"'''||
                   ' CSV QUOTE e''\x01'' DELIMITER e''\x02''', archive_urls);

    /* Convert raw JSON to table format */
    EXECUTE format('CREATE TABLE %I AS '||
                   'SELECT (data->>''id'')::bigint AS event_id, '||
                   '(data->>''type'')::text AS event_type, '||
                   '(data->>''public'')::boolean AS event_public, '||
                   '(data->''repo''->>''id'')::bigint AS repo_id, '||
                   'data->''payload'' AS payload, '||
                   'data->''repo'' AS repo, '||
                   'data->''actor'' AS actor, '||
                   'data->''org'' AS org, '||
                   '(data->>''created_at'')::timestamp AS created_at '||
                   'FROM input', stage_table);

    /* Remove the scratch table so the function can be called again in the
       same session without hitting "relation already exists". */
    DROP TABLE pg_temp.input;

    RETURN stage_table;
END;
$BODY$
LANGUAGE plpgsql;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment