# Source: GitHub Gist michaeldorner/7ac0e3627068b32d43f0e1afde48b329
# Author: @michaeldorner (gist last active July 11, 2023 13:04)
# pylint: disable=locally-disabled, multiple-statements, line-too-long, missing-module-docstring, no-member, missing-class-docstring, missing-function-docstring
import argparse
from pathlib import Path
import hashlib
import bz2
import logging
from datetime import datetime
import orjson
from tqdm.auto import tqdm
# Send log records of level INFO and above to a UTF-8 encoded log file whose
# name embeds the timestamp taken when this module is executed.
# NOTE(review): str(datetime.now()) contains a space and colons; colons are not
# allowed in file names on Windows -- confirm the target platforms.
logging.basicConfig(filename=f'hamster_{datetime.now()}.log', encoding='utf-8', level=logging.INFO)
def hide(a_string: str):
    """Return the hexadecimal MD5 digest of ``a_string``.

    The string is encoded as UTF-8 before hashing. Falsy input (``None`` or
    the empty string) is not hashed; ``None`` is returned instead.
    """
    if not a_string:
        return None
    encoded = a_string.encode('utf-8')
    return hashlib.md5(encoded).hexdigest()
def load_data(file_path: Path):
    """Read a bz2-compressed JSON file and return the decoded object.

    The whole file is read into memory, decompressed in one go and then
    parsed with ``orjson``.
    """
    with open(file_path, 'rb') as compressed_file:
        compressed = compressed_file.read()
    return orjson.loads(bz2.decompress(compressed))
def store(data, file_path: Path):
    """Serialize ``data`` to JSON, bz2-compress it and write it to ``file_path``.

    Missing parent directories of ``file_path`` are created first; an
    existing file at ``file_path`` is overwritten.
    """
    file_path.parent.mkdir(parents=True, exist_ok=True)
    compressed = bz2.compress(orjson.dumps(data))
    file_path.write_bytes(compressed)
def remove_fields(data, fields_to_remove):
    """Return a copy of ``data`` without the keys listed in ``fields_to_remove``.

    Dicts and lists are walked recursively and rebuilt, so the input is not
    modified. A removed key is dropped together with everything below it.
    Values that are neither dict nor list are returned unchanged.
    """
    if isinstance(data, list):
        return [remove_fields(element, fields_to_remove) for element in data]
    if not isinstance(data, dict):
        return data
    pruned = {}
    for key, value in data.items():
        if key in fields_to_remove:
            continue
        pruned[key] = remove_fields(value, fields_to_remove)
    return pruned
def anonymize_user_fields(data):
    """Recursively replace every user dict in ``data`` by its hashed login.

    Any dict that has a ``'login'`` key is treated as a user dict and is
    replaced as a whole by ``hide(login)``. Other dicts and lists are
    rebuilt with their values processed recursively; all remaining values
    are returned unchanged.
    """
    if isinstance(data, list):
        return [anonymize_user_fields(element) for element in data]
    if not isinstance(data, dict):
        return data
    if 'login' in data:  # is a user dict
        return hide(data['login'])
    anonymized = {}
    for key, value in data.items():
        anonymized[key] = anonymize_user_fields(value)
    return anonymized
if __name__ == '__main__':
    # Script entry point: collect every pull request found below
    # <data_dir>/repos/<org>/<repo>/pulls.json.bz2, attach its timeline,
    # anonymize org, repo and user information, and write all pulls into a
    # single bz2-compressed JSON file.
    parser = argparse.ArgumentParser(description='desc')
    parser.add_argument('data_dir', type=Path, help='The directory for all data')
    parser.add_argument('--out_file', type=Path, help='The output file; should end with `.json.bz2`', default=Path('./pulls.json.bz2'))
    args = parser.parse_args()

    # Fields copied from each pull; indexing raises KeyError if one is missing.
    pull_fields = ('id', 'number', 'state', 'created_at', 'closed_at', 'merged_at', 'user')
    # Keys stripped (at any nesting depth) from every timeline item.
    timeline_fields_to_remove = ('performed_via_github_app', 'label', 'reactions', 'commit_id', 'labels', 'repository', 'assignee', 'assignees', 'milestone', 'diff_hunk', 'path')

    pulls = []
    # The glob result is materialized so that tqdm knows the total number of files.
    for pull_file_path in tqdm(list(args.data_dir.glob('repos/*/*/pulls.json.bz2'))):
        # Org and repo are the two directories above the pulls file; they are
        # the same for every pull in it, so hash them once per file.
        hidden_org = hide(pull_file_path.parts[-3])
        hidden_repo = hide(pull_file_path.parts[-2])
        timelines_dir = pull_file_path.parent / 'timelines'

        for full_pull in load_data(pull_file_path):
            pull = {field: full_pull[field] for field in pull_fields}
            pull['org'] = hidden_org
            pull['repo'] = hidden_repo
            pull = anonymize_user_fields(pull)

            timeline_path = timelines_dir / f'{pull["number"]}.json.bz2'
            try:
                # Commit events are excluded since their user data is not
                # mapped to the GitHub data scheme.
                time_line_items = [item for item in load_data(timeline_path) if item['event'] != 'committed']
            except FileNotFoundError:
                # A missing timeline is logged and the pull is kept with an
                # empty timeline.
                logging.error('%s seems to be missing', timeline_path)
                time_line_items = []
            time_line_items = remove_fields(time_line_items, timeline_fields_to_remove)
            time_line_items = anonymize_user_fields(time_line_items)
            pull['timeline'] = time_line_items
            pulls.append(pull)

    print(f'Write {len(pulls)} pulls to {args.out_file}...')
    store(pulls, args.out_file)
# (GitHub page footer: sign in on GitHub to comment on this gist.)