Created
May 12, 2023 19:10
-
-
Save hrz6976/3172fb1637da7033cb17d95b3e2dcb72 to your computer and use it in GitHub Desktop.
Sync gharchive dataset to clickhouse on a schedule
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# download.sh — download GHArchive hourly dumps between two timestamps.
set -e

# Resolve the script's own directory so .env is found regardless of CWD.
CWD=$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )
# shellcheck source=/dev/null
source "$CWD/.env"
FILELIST_PATH=$(realpath "$CWD/filelist.txt")

# GHArchive file names are hour-stamped in UTC; force UTC so date math matches.
export TZ=UTC
# Convert "%Y-%m-%d %H:%M:%S" to GHArchive's "%Y-%m-%d-%-H" file-name format
# (hour is not zero-padded, e.g. "2023-05-12-9").
# Arguments: $1 - any timestamp GNU date accepts
# Outputs:   formatted timestamp on stdout
date_to_gha(){
    # `date` already writes to stdout; no need to wrap it in echo $(…).
    date -d "$1" +"%Y-%m-%d-%-H"
}
# Convert GHArchive's "%Y-%m-%d-%-H" back to "%Y-%m-%d %H:%M:%S".
# The sed replaces the LAST hyphen with a space (greedy \(.*\)) so GNU date
# can parse "YYYY-MM-DD H".
# Arguments: $1 - GHArchive timestamp, e.g. "2023-05-12-19"
# Outputs:   "%Y-%m-%d %H:%M:%S" on stdout
date_from_gha(){
    date -d "$(sed 's/\(.*\)-/\1 /' <<< "$1")" +"%Y-%m-%d %H:%M:%S"
}
# Print usage information to stdout and terminate with status 1.
usage(){
    printf '%s\n' \
        "Downloads GHArchive Dumps from $BASE_URL" \
        "Usage: $0 [start_time] [end_time]" \
        "Example: $0 \"2011-02-11 00:00:00\" \"2022-02-11 01:00:00\""
    exit 1
}
# Require 1 or 2 positional arguments.
if [ $# -lt 1 ] || [ $# -gt 2 ]; then
    usage
fi

# First argument: START_TIME, in any format GNU date accepts.
START_TIME=${1}
# Second argument: END_TIME, defaulting to the current time.
END_TIME=${2:-$(date +"%Y-%m-%d %H:%M:%S")}
echo "Downloading archives from $START_TIME to $END_TIME"

# Create the download directory if it doesn't exist.
if [ ! -d "$DOWNLOAD_DIR" ]; then
    mkdir -p "$DOWNLOAD_DIR"
fi

# Back up a pre-existing filelist instead of appending to stale content.
if [ -f "${FILELIST_PATH}" ]; then
    mv "${FILELIST_PATH}" "${FILELIST_PATH}".bak
fi

# Walk the range hour by hour (3600 s) in epoch seconds.
START=$(date -d "$START_TIME" "+%s")
END=$(date -d "$END_TIME" "+%s")
for ((i=START; i<=END; i+=3600)); do
    HOUR=$(date -d "@${i}" "+%Y-%m-%d-%-H")
    # Skip files that finished downloading (no leftover aria2c control file).
    if [ -f "$DOWNLOAD_DIR/$HOUR.json.gz" ] && [ ! -f "$DOWNLOAD_DIR/$HOUR.json.gz.aria2" ]; then
        continue
    fi
    # One URL per line, newline-terminated. (The original prepended "\n",
    # which left a blank first line and no trailing newline in the list.)
    printf '%s\n' "${BASE_URL}${HOUR}.json.gz" >> "${FILELIST_PATH}"
done
# Report the queue size. `wc -l < FILE` prints only the count — the original
# bare `wc -l FILE` expanded to "count filename" (two words), which misaligned
# the printf arguments and re-applied the format string to the leftover word.
printf "Downloading %s files, first is %s\n" "$(wc -l < "${FILELIST_PATH}")" "$(head -n1 "${FILELIST_PATH}")"

# Downloads are best-effort: keep going past individual failures.
set +e
# Download the archives using aria2c.
aria2c -i "${FILELIST_PATH}" -d "$DOWNLOAD_DIR" --continue=true \
    --auto-file-renaming=false \
    --min-split-size=1M --split=10 \
    --summary-interval=0 \
    --timeout=600 \
    --connect-timeout=60 \
    --max-tries=5 \
    --retry-wait=10 \
    --check-certificate=false \
    --user-agent="Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0" \
    --header="Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8" \
    --header="Accept-Language: en-US,en;q=0.5"
# Restore exit-on-error for the rest of the script.
set -e
echo "Downloaded archives to $DOWNLOAD_DIR"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# .env — shared configuration sourced by download.sh, import.sh and sync.sh.

# Base URL for the GHArchive hourly dumps.
BASE_URL="https://data.gharchive.org/"
# Folder where downloaded archives are stored temporarily
# (overridable via the environment, e.g. from the cron entry).
DOWNLOAD_DIR=${DOWNLOAD_DIR:-"/path/to/gharchive/temp"}
# ClickHouse database and table that receive the imported events.
DB_NAME="gharchive"
TABLE_NAME="github_events"
# ClickHouse server connection settings.
# NOTE(review): replace the placeholder password before deploying.
CLICKHOUSE_HOST="localhost"
CLICKHOUSE_PORT="9000"
CLICKHOUSE_USER="default"
CLICKHOUSE_PASSWORD="CLICKHOUSE_PASSWORD"
# Number of parallel import workers (used as xargs -P).
NPROC=16
# Non-empty => delete local archives whose data is already in ClickHouse.
CLEANUP=1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### Add this file to /etc/cron.d to run the sync every day at 08:55 ###
# Fields: minute hour day-of-month month day-of-week user command
# DOWNLOAD_DIR is overridden here; all other settings come from .env.
55 8 * * * user test -x /archive/gharchive/sync.sh && DOWNLOAD_DIR="/archive/gharchive/temp" bash /archive/gharchive/sync.sh >> /archive/gharchive/sync.log 2>&1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# import.sh — import downloaded GHArchive dumps into ClickHouse.
set -e

# Resolve the script's own directory so .env is found regardless of CWD.
CWD=$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )
# shellcheck source=/dev/null
source "$CWD/.env"
IMPORTLIST_PATH=$(realpath "$CWD/importlist.txt")

# GHArchive timestamps are UTC; keep all date math consistent.
export TZ=UTC
# Convert "%Y-%m-%d %H:%M:%S" to GHArchive's "%Y-%m-%d-%-H" file-name format
# (hour is not zero-padded).
# Arguments: $1 - any timestamp GNU date accepts
date_to_gha(){
    # `date` already writes to stdout; no need to wrap it in echo $(…).
    date -d "$1" +"%Y-%m-%d-%-H"
}
# Convert GHArchive's "%Y-%m-%d-%-H" back to "%Y-%m-%d %H:%M:%S".
# The greedy \(.*\) makes sed replace only the LAST hyphen with a space.
# Arguments: $1 - GHArchive timestamp, e.g. "2023-05-12-19"
date_from_gha(){
    date -d "$(sed 's/\(.*\)-/\1 /' <<< "$1")" +"%Y-%m-%d %H:%M:%S"
}
# Create the download directory if it doesn't exist.
# Quoting $DOWNLOAD_DIR protects paths containing spaces.
if [ ! -d "$DOWNLOAD_DIR" ]; then
    mkdir -p "$DOWNLOAD_DIR"
fi
# Run a single SQL query through the ClickHouse CLI client.
# Globals:   CLICKHOUSE_HOST, CLICKHOUSE_PORT, CLICKHOUSE_USER,
#            CLICKHOUSE_PASSWORD (all read; set in .env)
# Arguments: $1 - SQL query string
# Outputs:   query result on stdout
run_clickhouse_query() {
    clickhouse-client -h "$CLICKHOUSE_HOST" --port "$CLICKHOUSE_PORT" --user "$CLICKHOUSE_USER" --password "$CLICKHOUSE_PASSWORD" -q "$1"
}
# Associative array used as a set: membership check of a timestamp already
# present in ClickHouse is O(1).
declare -A TIMESTAMP_CHECK=()

# Succeed (return 0) iff timestamp $1 is in TIMESTAMP_CHECK.
# Bug fix: the original `return ${TIMESTAMP_CHECK[$1]+1}` returned 1
# (failure) when the key EXISTED, and a bare `return` (the previous
# command's status) when it did not — i.e. inverted/undefined semantics.
is_timestamp_exists() {
    [[ -n "${TIMESTAMP_CHECK["${1}"]+x}" ]]
}
# Fetch every dump timestamp already present in ClickHouse.
ALL_TIMESTAMPS=$(run_clickhouse_query "SELECT DISTINCT(file_time) FROM $DB_NAME.$TABLE_NAME")
printf "%d dumps in Clickhouse\n" "$(wc -l <<< "$ALL_TIMESTAMPS")"

# Index the timestamps into TIMESTAMP_CHECK for O(1) lookups below.
while read -r TIMESTAMP; do
    TIMESTAMP_CHECK["${TIMESTAMP}"]=1
done <<< "$ALL_TIMESTAMPS"

# Back up a pre-existing import list instead of appending to stale content.
if [ -f "$IMPORTLIST_PATH" ]; then
    mv "$IMPORTLIST_PATH" "$IMPORTLIST_PATH.bak"
fi
# Scan the downloaded archives: queue files not yet in ClickHouse for
# import; optionally delete files whose data is already imported.
# Glob directly instead of parsing `ls` output (safe for odd file names).
for file in "$DOWNLOAD_DIR"/*.json.gz; do
    [ -e "$file" ] || continue  # no matches: the glob stays literal
    # Derive "%Y-%m-%d %H:%M:%S" from the "%Y-%m-%d-%-H" file name
    # (strip the extension, swap the last hyphen for a space, normalize).
    TIMESTAMP=$(basename "$file" | sed -e 's/\.json\.gz//' -e 's/\(.*\)-/\1 /' | date -f - +"%Y-%m-%d %H:%M:%S")
    # Queue the file unless its timestamp is already in ClickHouse.
    if [ ! "${TIMESTAMP_CHECK["${TIMESTAMP}"]+1}" ]; then
        echo "Adding" "$file" "@" "$TIMESTAMP"
        # printf '%s' treats the path as data, not as a format string.
        printf '%s\n' "$file" >> "$IMPORTLIST_PATH"
    else
        if [ -n "$CLEANUP" ]; then
            echo "Removing" "$file" "@" "$TIMESTAMP"
            # Already imported: drop the archive to reclaim space.
            rm -- "$file"
        fi
    fi
done
# Ensure the import list exists even when nothing new was queued, so the
# commands below don't abort under `set -e`. (The original ran
# `truncate -s -1` here to strip the final newline, which made `wc -l`
# undercount by one and errored out when the list was never created.)
touch "$IMPORTLIST_PATH"
echo "Importing $(wc -l < "$IMPORTLIST_PATH") files on $NPROC workers, first is $(head -n1 "$IMPORTLIST_PATH")"
# Record the row count so we can report how many records this run adds.
PREV_N_RECORDS=$(run_clickhouse_query "SELECT count() FROM $DB_NAME.$TABLE_NAME")
# Import all queued archives in parallel ($NPROC xargs workers). Per file:
#   gzip -cd streams the dump; jq -c flattens each JSON event into one
#   compact array whose element order matches the columns of
#   $DB_NAME.$TABLE_NAME; the first element is the file's own hour stamp
#   (scan of the "{}" file name); clickhouse-client inserts the rows as
#   JSONCompactEachRow. A failing file logs a message instead of aborting.
# NOTE(review): the worker command is one double-quoted `bash -c` string,
# so $CLICKHOUSE_* and $DB_NAME/$TABLE_NAME expand in THIS shell before
# the worker runs; the inner single quotes are literal characters, not
# quoting. xargs substitutes "{}" textually — paths containing quotes
# would break the command; paths here come from DOWNLOAD_DIR, so they are
# presumably safe — confirm if DOWNLOAD_DIR is ever user-controlled.
# The jq `//` alternatives accommodate the several historical GHArchive
# event schemas (e.g. .actor vs .actor_attributes, .repo vs .repository).
cat $IMPORTLIST_PATH | xargs -P${NPROC} -I{} bash -c "
gzip -cd {} | jq -c '
[
(\"{}\" | scan(\"[0-9]+-[0-9]+-[0-9]+-[0-9]+\")),
.type,
.actor.login? // .actor_attributes.login? // (.actor | strings) // null,
.repo.name? // (.repository.owner? + \"/\" + .repository.name?) // null,
.created_at,
.payload.updated_at? // .payload.comment?.updated_at? // .payload.issue?.updated_at? // .payload.pull_request?.updated_at? // null,
.payload.action,
.payload.comment.id,
.payload.review.body // .payload.comment.body // .payload.issue.body? // .payload.pull_request.body? // .payload.release.body? // null,
.payload.comment?.path? // null,
.payload.comment?.position? // null,
.payload.comment?.line? // null,
.payload.ref? // null,
.payload.ref_type? // null,
.payload.comment.user?.login? // .payload.issue.user?.login? // .payload.pull_request.user?.login? // null,
.payload.issue.number? // .payload.pull_request.number? // .payload.number? // null,
.payload.issue.title? // .payload.pull_request.title? // null,
[.payload.issue.labels?[]?.name // .payload.pull_request.labels?[]?.name],
.payload.issue.state? // .payload.pull_request.state? // null,
.payload.issue.locked? // .payload.pull_request.locked? // null,
.payload.issue.assignee?.login? // .payload.pull_request.assignee?.login? // null,
[.payload.issue.assignees?[]?.login? // .payload.pull_request.assignees?[]?.login?],
.payload.issue.comments? // .payload.pull_request.comments? // null,
.payload.review.author_association // .payload.issue.author_association? // .payload.pull_request.author_association? // null,
.payload.issue.closed_at? // .payload.pull_request.closed_at? // null,
.payload.pull_request.merged_at? // null,
.payload.pull_request.merge_commit_sha? // null,
[.payload.pull_request.requested_reviewers?[]?.login],
[.payload.pull_request.requested_teams?[]?.name],
.payload.pull_request.head?.ref? // null,
.payload.pull_request.head?.sha? // null,
.payload.pull_request.base?.ref? // null,
.payload.pull_request.base?.sha? // null,
.payload.pull_request.merged? // null,
.payload.pull_request.mergeable? // null,
.payload.pull_request.rebaseable? // null,
.payload.pull_request.mergeable_state? // null,
.payload.pull_request.merged_by?.login? // null,
.payload.pull_request.review_comments? // null,
.payload.pull_request.maintainer_can_modify? // null,
.payload.pull_request.commits? // null,
.payload.pull_request.additions? // null,
.payload.pull_request.deletions? // null,
.payload.pull_request.changed_files? // null,
.payload.comment.diff_hunk? // null,
.payload.comment.original_position? // null,
.payload.comment.commit_id? // null,
.payload.comment.original_commit_id? // null,
.payload.size? // null,
.payload.distinct_size? // null,
.payload.member.login? // .payload.member? // null,
.payload.release?.tag_name? // null,
.payload.release?.name? // null,
.payload.review?.state? // null
]' | clickhouse-client -h $CLICKHOUSE_HOST --port $CLICKHOUSE_PORT --user $CLICKHOUSE_USER --password $CLICKHOUSE_PASSWORD \
--input_format_null_as_default 1 --date_time_input_format best_effort \
--query 'INSERT INTO $DB_NAME.$TABLE_NAME FORMAT JSONCompactEachRow' \
|| echo 'File {} failed to import'
"
# Report how many records this run added (delta against PREV_N_RECORDS).
NEW_N_RECORDS=$(run_clickhouse_query "SELECT count() FROM $DB_NAME.$TABLE_NAME")
echo "Imported $((NEW_N_RECORDS - PREV_N_RECORDS)) records, total $NEW_N_RECORDS records"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
### Example cron.d entry to run the sync every day at midnight ###
# 0 0 * * * user test -x /home/user/gharchive/sync.sh && bash /home/user/gharchive/sync.sh 2>&1 | tee -a /home/user/gharchive/sync.log
#!/bin/bash
# sync.sh — daily driver: download new GHArchive dumps, then import them.
set -e

# Resolve the script's own directory so .env is found regardless of CWD.
CWD=$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )
# shellcheck source=/dev/null
source "$CWD/.env"

# Keep all date math in UTC to match GHArchive file names.
export TZ=UTC

# Run a single SQL query through the ClickHouse CLI client.
# Globals:   CLICKHOUSE_HOST/PORT/USER/PASSWORD (read; set in .env)
# Arguments: $1 - SQL query string
run_clickhouse_query() {
    clickhouse-client -h "$CLICKHOUSE_HOST" --port "$CLICKHOUSE_PORT" --user "$CLICKHOUSE_USER" --password "$CLICKHOUSE_PASSWORD" -q "$1"
}
# Resume point: one hour after the newest dump already in ClickHouse,
# falling back to GHArchive's first dump (2011-02-11) when no data exists.
# NOTE(review): ClickHouse's max() over an empty table typically yields the
# type's default ("1970-01-01 00:00:00"), not an empty string, so the -z
# fallback may only fire when the query produces no output at all — confirm
# against the table definition.
MAX_TIMESTAMP=$(run_clickhouse_query "SELECT max(file_time) FROM $DB_NAME.$TABLE_NAME")
if [ -z "$MAX_TIMESTAMP" ]; then
    MAX_TIMESTAMP="2011-02-11 00:00:00"
fi
MAX_TIMESTAMP=$(date -d "$MAX_TIMESTAMP 1 hour" +"%Y-%m-%d %H:%M:%S")
NOW_TIMESTAMP=$(date +"%Y-%m-%d %H:%M:%S")
printf "From %s to %s\n" "$MAX_TIMESTAMP" "$NOW_TIMESTAMP"

# Download the missing range, then import whatever landed.
bash "$CWD/download.sh" "$MAX_TIMESTAMP" "$NOW_TIMESTAMP"
bash "$CWD/import.sh"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment