""" | |
Make a simple repository statistic about user contributions. | |
## Resources | |
- https://developer.github.com/v3/pulls/ | |
- http://zetcode.com/python/requests/ | |
- https://developer.github.com/v3/#authentication | |
**Requirements:** | |
``` | |
pandas | |
tabulate | |
tqdm | |
``` | |
**Deprecated** in favour of https://borda.github.io/pyRepoStats | |
""" | |
import datetime
import json
import logging
import multiprocessing as mproc
import os
import warnings
from argparse import ArgumentParser
from functools import partial
from typing import Tuple

import pandas as pd
import requests
import tqdm
from tabulate import tabulate

URL_GITHUB_API = 'https://api.github.com/repos'
MIN_CONTRIBUTIONS = 2  # minimal 'all opened' count for a user to appear in the final table
NB_JOBS = 10  # number of parallel processes used for fetching details
# global flag flipped when the GitHub API starts returning 403;
# note that it does not propagate to already-spawned worker processes
API_LIMIT_REACHED = False


def get_arguments():
    parser = ArgumentParser('Collect simple repository stats')
    parser.add_argument('-o', '--gh_owner', type=str, required=False, default='PyTorchLightning',
                        help='GitHub repository owner (organisation/user)')
    parser.add_argument('-r', '--gh_repo', type=str, required=False, default='pytorch-lightning',
                        help='GitHub repository name under the selected organisation/user')
    parser.add_argument('-t', '--gh_token', type=str, required=False, default=None,
                        help='personal GH token needed for a higher API request limit')
    args = vars(parser.parse_args())
    logging.info('Parsed arguments: %s', args)
    return args
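
# With no CLI flags the defaults above yield (illustrative, derived from the
# defaults defined in `get_arguments`):
#   {'gh_owner': 'PyTorchLightning', 'gh_repo': 'pytorch-lightning', 'gh_token': None}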


def fetch_all_issues(gh_owner: str, gh_repo: str, auth_header: dict) -> list:
    """Fetch all issues from a given repo using paginated listing."""
    items, min_idx, page = [], float('inf'), 1
    # get items page by page; issue numbers are returned in descending order,
    # so we are done once the last fetched number reaches 1
    pbar = tqdm.tqdm(desc='Requesting issue/PR infos')
    while min_idx > 1:
        req = requests.get(
            f"{URL_GITHUB_API}/{gh_owner}/{gh_repo}/issues"
            f"?state=all&page={page}&per_page=100",
            headers=auth_header)
        if req.status_code == 403:
            exit('Request failed, you have probably exceeded the API rate limit...')
        page_items = json.loads(req.content)
        if not page_items:
            # nothing more to fetch (e.g. an empty repository)
            break
        items += page_items
        if page == 1:
            min_idx = items[0]['number']
            pbar.reset(total=min_idx)
        pbar.update(min_idx - items[-1]['number'])
        min_idx = items[-1]['number']
        page += 1
    pbar.close()
    return items
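
# A hypothetical call (unauthenticated, so the low anonymous rate limit applies):
#   issues = fetch_all_issues('PyTorchLightning', 'pytorch-lightning', auth_header={})
# Each returned item is the raw GitHub issue/PR payload; the fields used later in
# this script include 'number', 'html_url', 'user', 'comments_url' and 'events_url'.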


def _request_comments(idx_comments_url: Tuple[int, str], auth_header: dict) -> Tuple[int, list]:
    idx, comments_url = idx_comments_url
    if API_LIMIT_REACHED:
        return idx, None
    # e.g. https://api.github.com/repos/PyTorchLightning/pytorch-lightning/issues/37/comments
    req = requests.get(comments_url, headers=auth_header)
    if req.status_code == 403:
        return idx, None
    comments = json.loads(req.content)
    return idx, comments


def _fetch_details(issues: list, func, in_key: str, out_key: str, auth_header: dict) -> list:
    """Pull all existing details for particular issues."""
    global API_LIMIT_REACHED
    queue = [issue[in_key] for issue in issues]
    # filter out already-fetched results so we do not need to ask again
    filter_queue = []
    for i, val in enumerate(queue):
        if issues[i].get(out_key) is None:
            filter_queue.append((i, val))
    # if there is nothing left in the queue, go ahead
    if not filter_queue:
        logging.info(f"The fetching queue for '{out_key}' is empty, continue to the next stage...")
        return issues
    # prepare the pool
    pool = mproc.Pool(NB_JOBS)
    _req = partial(func, auth_header=auth_header)
    # process the remaining part of the queue
    for i, res in tqdm.tqdm(pool.imap(_req, filter_queue), total=len(filter_queue),
                            desc=f"Fetching issues' {out_key}"):
        if res is None:
            warnings.warn('Request failed, you have probably exceeded the API rate limit...')
            API_LIMIT_REACHED = True
            continue
        issues[i][out_key] = res
    pool.close()
    pool.join()
    return issues
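
# Note that `_fetch_details` is generic over the request function: fetching comments
# plugs in `_request_comments` with in_key='comments_url' / out_key='comments', and
# fetching statuses plugs in `_request_status` with in_key='events_url' / out_key='status'
# (see `fetch_comments` and `fetch_statuses` below).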


def fetch_comments(issues: list, auth_header: dict) -> list:
    """Pull all existing comments for particular issues."""
    for issue in issues:
        # the original issue payload stores the number of comments in the very same
        # field, so drop it if it is not a list yet
        if 'comments' in issue and not isinstance(issue['comments'], list):
            del issue['comments']
    return _fetch_details(issues, _request_comments, 'comments_url', 'comments', auth_header)


def _request_status(idx_event_url: Tuple[int, str], auth_header: dict) -> Tuple[int, str]:
    idx, event_url = idx_event_url
    if API_LIMIT_REACHED:
        return idx, None
    # e.g. https://api.github.com/repos/PyTorchLightning/pytorch-lightning/issues/2154/events
    req = requests.get(event_url, headers=auth_header)
    if req.status_code == 403:
        return idx, None
    events = [c['event'] for c in json.loads(req.content)]
    # https://developer.github.com/v3/pulls/#check-if-a-pull-request-has-been-merged
    if 'merged' in events:
        state = 'merged'
    elif 'closed' in events:
        state = 'closed'
    else:
        state = 'open'
    return idx, state
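
# GitHub reports a merged PR simply as 'closed' in the issue state, so the events
# timeline is inspected above to distinguish a true merge from a plain close.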


def fetch_statuses(issues: list, auth_header: dict) -> list:
    """Pull all existing statuses for particular issues."""
    return _fetch_details(issues, _request_status, 'events_url', 'status', auth_header)


def compute_users_stat(issues: list) -> pd.DataFrame:
    """Aggregate issue/PR affiliations and summary counts."""
    short_issues = [dict(
        type='PR' if 'pull' in issue['html_url'] else 'issue',
        status=issue['status'],
        author=issue['user']['login'],
        commenters=[com['user']['login'] for com in issue['comments']
                    if '[bot]' not in com['user']['login']],
    ) for issue in issues]
    df_issues = pd.DataFrame(short_issues)
    users_stat = []
    for user in tqdm.tqdm(df_issues['author'].unique(), desc='Processing users'):
        user_stat = {'user': user}
        # parse particular user stats
        for tp, df in df_issues.groupby('type'):
            df_auth = df[df['author'] == user]
            user_stat[f'opened {tp}s'] = len(df_auth)
            user_stat[f'merged {tp}s'] = sum(df_auth['status'] == 'merged')
            df_na = df[df['author'] != user]
            user_stat[f'commented {tp}s'] = sum(df_na['commenters'].apply(lambda users: user in users))
        users_stat.append(user_stat)
    # transform to a pandas table
    df_users = pd.DataFrame(users_stat).set_index(['user'])
    return df_users
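
# The resulting frame is indexed by user login, with one column triple per issue
# type present in the data: 'opened PRs', 'merged PRs', 'commented PRs' and
# 'opened issues', 'merged issues', 'commented issues'.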


def cyclic_load_fetch(base_file_name, issues, auth_header, funcs_kwargs):
    """For each stage, load the cache if one exists and then try to fetch the rest..."""
    for i, (func, kwargs) in enumerate(funcs_kwargs):
        # try to load a cache if it exists
        cache_file_name = f'{base_file_name}_{i}.json'
        if os.path.isfile(cache_file_name):
            with open(cache_file_name, 'r') as fp:
                issues = json.load(fp)
        # process the remaining issues/pulls
        issues = func(issues, auth_header, **kwargs)
        # dump the results
        with open(cache_file_name, 'w') as fp:
            json.dump(issues, fp)
    return issues
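
# In `main` below this runs two stages, so the intermediate caches end up as
# '<base>_0.json' (after statuses) and '<base>_1.json' (after comments),
# which lets an interrupted run resume where it stopped.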


def main(gh_owner: str, gh_repo: str, gh_token: str = None):
    """Main entry point."""
    auth_header = {'Authorization': f'token {gh_token}'} if gh_token else {}
    base_file_name = f'dump-issues_{gh_owner}-{gh_repo}_{datetime.date.today()}'
    cache_file_name = f'{base_file_name}.json'
    if os.path.isfile(cache_file_name):
        with open(cache_file_name, 'r') as fp:
            issues = json.load(fp)
    else:
        issues = fetch_all_issues(gh_owner, gh_repo, auth_header)
    issues = cyclic_load_fetch(base_file_name, issues, auth_header,
                               ((fetch_statuses, {}),
                                (fetch_comments, {})))
    if API_LIMIT_REACHED:
        exit('The fetching probably did not complete, please rerun...')
    with open(cache_file_name, 'w') as fp:
        json.dump(issues, fp)
    logging.info('User statistic...')
    df_users = compute_users_stat(issues)
    df_users['all opened'] = df_users['opened PRs'] + df_users['opened issues']
    df_users.sort_values(['all opened'], ascending=False, inplace=True)
    df_users = df_users[['merged PRs', 'commented PRs', 'opened issues', 'commented issues', 'all opened']]
    # link each user login to their GitHub profile in the rendered table
    df_users.index = df_users.index.map(lambda u: f'[{u}](https://github.com/{u})')
    print(tabulate(df_users[df_users['all opened'] >= MIN_CONTRIBUTIONS], tablefmt="pipe", headers="keys"))
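
# The printed table is GitHub-flavoured Markdown (pipe format) with linked user
# logins; its header looks roughly like (illustrative only):
#   | user | merged PRs | commented PRs | opened issues | commented issues | all opened |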


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    logging.info('running...')
    main(**get_arguments())
    logging.info('Done :]')