"""
Make a simple repository statistic about user contributions.
## Resources
- https://developer.github.com/v3/pulls/
- http://zetcode.com/python/requests/
- https://developer.github.com/v3/#authentication
**Requirements:**
```
pandas
tabulate
tqdm
```
**Deprecated** in favour of https://borda.github.io/pyRepoStats
"""
import os
import re
import datetime
import json
import logging
import multiprocessing as mproc
import warnings
from argparse import ArgumentParser
from functools import partial
from typing import Tuple

import pandas as pd
import requests
import tqdm
from tabulate import tabulate

URL_GITHUB_API = 'https://api.github.com/repos'
MIN_CONTRIBUTIONS = 2
NB_JOBS = 10
API_LIMIT_REACHED = False


def get_arguments():
    parser = ArgumentParser('Collect simple repository stats')
    parser.add_argument('-o', '--gh_owner', type=str, required=False, default='PyTorchLightning',
                        help='GitHub repository owner (organisation/user)')
    parser.add_argument('-r', '--gh_repo', type=str, required=False, default='pytorch-lightning',
                        help='GitHub repository name under selected organisation/user')
    parser.add_argument('-t', '--gh_token', type=str, required=False, default=None,
                        help='Personal GH token needed for higher API request limit')
    args = vars(parser.parse_args())
    logging.info('Parsed arguments: %s', args)
    return args


def fetch_all_issues(gh_owner: str, gh_repo: str, auth_header: dict) -> list:
    """Fetch all issues/PRs from a given repo by paging through the issue listing."""
    items, min_idx, page = [], float('inf'), 1
    # fetch items page by page until the lowest issue number is reached
    pbar = tqdm.tqdm(desc='Requesting issue/PR infos')
    while min_idx > 1:
        req = requests.get(
            f"{URL_GITHUB_API}/{gh_owner}/{gh_repo}/issues"
            f"?state=all&page={page}&per_page=100",
            headers=auth_header)
        if req.status_code == 403:
            exit('Request failed, the API rate limit was probably exceeded...')
        items += json.loads(req.content)
        if page == 1:
            # issues are listed newest first, so the first number serves as the total count
            min_idx = items[0]['number']
            pbar.reset(total=min_idx)
        pbar.update(min_idx - items[-1]['number'])
        min_idx = items[-1]['number']
        page += 1
    pbar.close()
    return items
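
# For reference, each item in the returned list is the raw GitHub REST v3 issue/PR payload;
# the fields used further below are roughly (a non-exhaustive sketch, other keys are present too):
#   {'number': 123, 'html_url': 'https://github.com/<owner>/<repo>/pull/123',
#    'user': {'login': '<author>'}, 'comments': <count>, 'comments_url': '...', 'events_url': '...'}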


def _request_comments(idx_comments_url: Tuple[int, str], auth_header: dict) -> Tuple[int, list]:
    idx, comments_url = idx_comments_url
    if API_LIMIT_REACHED:
        return idx, None
    # e.g. https://api.github.com/repos/PyTorchLightning/pytorch-lightning/issues/37/comments
    req = requests.get(comments_url, headers=auth_header)
    if req.status_code == 403:
        return idx, None
    comments = json.loads(req.content)
    return idx, comments


def _fetch_details(issues: list, func, in_key: str, out_key: str, auth_header: dict) -> list:
    """Pull the requested details (e.g. comments or events) for each issue/PR."""
    global API_LIMIT_REACHED
    queue = [issue[in_key] for issue in issues]
    # filter out already fetched results so we do not need to ask again
    filter_queue = []
    for i, val in enumerate(queue):
        if issues[i].get(out_key) is None:
            filter_queue.append((i, val))
    # if there is nothing left in the queue, move on
    if not filter_queue:
        logging.info(f"The fetching queue for '{out_key}' is empty, let's continue to the next stage...")
        return issues
    # prepare the pool
    pool = mproc.Pool(NB_JOBS)
    _req = partial(func, auth_header=auth_header)
    # process the remaining part of the queue
    for i, res in tqdm.tqdm(pool.imap(_req, filter_queue), total=len(filter_queue),
                            desc=f"Fetching issues' {out_key}"):
        if res is None:
            warnings.warn('Request failed, the API rate limit was probably exceeded...')
            API_LIMIT_REACHED = True
            continue
        issues[i][out_key] = res
    pool.close()
    pool.join()
    return issues


def fetch_comments(issues: list, auth_header: dict) -> list:
    """Pull all existing comments for particular issues/PRs."""
    for issue in issues:
        # the raw API payload stores the number of comments in the same field,
        # so drop it unless it is already the fetched list of comments
        if 'comments' in issue and not isinstance(issue['comments'], list):
            del issue['comments']
    return _fetch_details(issues, _request_comments, 'comments_url', 'comments', auth_header)


def _request_status(idx_event_url: Tuple[int, str], auth_header: dict) -> Tuple[int, str]:
    idx, event_url = idx_event_url
    if API_LIMIT_REACHED:
        return idx, None
    # e.g. https://api.github.com/repos/PyTorchLightning/pytorch-lightning/issues/2154/events
    req = requests.get(event_url, headers=auth_header)
    if req.status_code == 403:
        return idx, None
    events = [c['event'] for c in json.loads(req.content)]
    # https://developer.github.com/v3/pulls/#check-if-a-pull-request-has-been-merged
    if 'merged' in events:
        state = 'merged'
    elif 'closed' in events:
        state = 'closed'
    else:
        state = 'open'
    return idx, state


def fetch_statuses(issues: list, auth_header: dict) -> list:
    """Pull the final status (open/closed/merged) for particular issues/PRs."""
    return _fetch_details(issues, _request_status, 'events_url', 'status', auth_header)


def compute_users_stat(issues: list) -> pd.DataFrame:
    """Aggregate issue/PR affiliations and summary counts."""
    short_issues = [dict(
        type='PR' if 'pull' in issue['html_url'] else 'issue',
        status=issue['status'],
        author=issue['user']['login'],
        commenters=[com['user']['login'] for com in issue['comments']
                    if '[bot]' not in com['user']['login']],
    ) for issue in issues]
    df_issues = pd.DataFrame(short_issues)
    users_stat = []
    for user in tqdm.tqdm(df_issues['author'].unique(), desc='Processing users'):
        user_stat = {'user': user}
        # parse particular user stats
        for tp, df in df_issues.groupby('type'):
            df_auth = df[df['author'] == user]
            user_stat[f'opened {tp}s'] = len(df_auth)
            user_stat[f'merged {tp}s'] = sum(df_auth['status'] == 'merged')
            df_na = df[df['author'] != user]
            user_stat[f'commented {tp}s'] = sum(df_na['commenters'].apply(lambda l: user in l))
        users_stat.append(user_stat)
    # transform to a pandas table
    df_users = pd.DataFrame(users_stat).set_index(['user'])
    return df_users
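
# The resulting frame is indexed by user login with columns produced per issue type, i.e.
# 'opened PRs', 'merged PRs', 'commented PRs', 'opened issues', 'merged issues', 'commented issues'
# (assuming both PRs and issues are present in the repository).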


def cyclic_load_fetch(base_file_name, issues, auth_header, funcs_kwargs):
    """For each stage, load the cache if it exists, then fetch whatever is still missing."""
    for i, (func, kwargs) in enumerate(funcs_kwargs):
        # try to load some cache if it exists
        cache_file_name = f'{base_file_name}_{i}.json'
        if os.path.isfile(cache_file_name):
            with open(cache_file_name, 'r') as fp:
                issues = json.load(fp)
        # process the remaining issues/pulls
        issues = func(issues, auth_header, **kwargs)
        # dump the results
        with open(cache_file_name, 'w') as fp:
            json.dump(issues, fp)
    return issues


def main(gh_owner: str, gh_repo: str, gh_token: str = None):
    """Main entry point."""
    auth_header = {'Authorization': f'token {gh_token}'} if gh_token else {}
    base_file_name = f'dump-issues_{gh_owner}-{gh_repo}_{datetime.date.today()}'
    cache_file_name = f'{base_file_name}.json'
    if os.path.isfile(cache_file_name):
        with open(cache_file_name, 'r') as fp:
            issues = json.load(fp)
    else:
        issues = fetch_all_issues(gh_owner, gh_repo, auth_header)
        issues = cyclic_load_fetch(base_file_name, issues, auth_header,
                                   ((fetch_statuses, {}),
                                    (fetch_comments, {})))
        if API_LIMIT_REACHED:
            exit('The fetching was probably not completed, please rerun...')
        with open(cache_file_name, 'w') as fp:
            json.dump(issues, fp)
    logging.info('User statistic...')
    df_users = compute_users_stat(issues)
    df_users['all opened'] = df_users['opened PRs'] + df_users['opened issues']
    df_users.sort_values(['all opened'], ascending=False, inplace=True)
    df_users = df_users[['merged PRs', 'commented PRs', 'opened issues', 'commented issues', 'all opened']]
    df_users.index = df_users.index.map(lambda u: f'[{u}](https://github.com/{u})')
    print(tabulate(df_users[df_users['all opened'] >= MIN_CONTRIBUTIONS], tablefmt="pipe", headers="keys"))


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    logging.info('running...')
    main(**get_arguments())
    logging.info('Done :]')
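
# A minimal sketch (assuming the cache naming pattern from `main` above) for re-running the
# aggregation offline from an existing dump without touching the GitHub API:
#
#   import json
#   with open('dump-issues_<owner>-<repo>_<date>.json') as fp:  # fill in the actual file name
#       issues = json.load(fp)
#   print(compute_users_stat(issues))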