Last active
June 10, 2024 15:14
-
-
Save codebycarlos/6086e807e4025b2828e52579964887ef to your computer and use it in GitHub Desktop.
stats
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This script fetches and processes GitHub repository statistics. | |
- Fetches contributor statistics and pull requests from a specified GitHub repo | |
- Aggregates data by quarter and author | |
- Saves the aggregated data to a CSV file | |
- Saves raw data to a JSON file for troubleshooting | |
Functions: | |
- get_repo_stats: Fetches contributor statistics from GitHub | |
- get_pull_requests: Fetches closed pull requests from GitHub | |
- save_to_csv: Aggregates and saves data to CSV and JSON files | |
""" | |
import requests | |
import csv | |
import os | |
import json | |
from datetime import datetime | |
# Fill these in before running the script.
GITHUB_TOKEN = ''  # GitHub personal access token with read access to the repo
OWNER = ''  # repository owner (user or organization name)
REPO = ''  # repository name
def get_repo_stats(owner, repo, token, raw_data_filename='stats_raw.json'):
    """Fetch per-contributor commit statistics for a GitHub repository.

    A raw-data file previously written by save_to_csv is reused as a
    cache: if raw_data_filename exists and contains a 'contributors'
    key, that data is returned without hitting the network.

    Args:
        owner: Repository owner (user or organization name).
        repo: Repository name.
        token: GitHub personal access token used for authorization.
        raw_data_filename: Path of the cached raw-data JSON file.

    Returns:
        The contributor stats list from the GitHub stats/contributors
        endpoint, or None when GitHub is still computing the statistics
        (HTTP 202) or the request fails.
    """
    if os.path.exists(raw_data_filename):
        # BUG FIX: a corrupt/unreadable cache file previously raised an
        # unhandled exception; now we fall through and refetch.
        try:
            with open(raw_data_filename, 'r') as raw_file:
                raw_data = json.load(raw_file)
        except (json.JSONDecodeError, OSError):
            print(f"Could not read cache {raw_data_filename}; refetching.")
        else:
            if 'contributors' in raw_data:
                # BUG FIX: message previously hard-coded 'stats_raw.json'
                # even when a different raw_data_filename was passed.
                print(f"Loaded data from {raw_data_filename}")
                return raw_data['contributors']
    url = f'https://api.github.com/repos/{owner}/{repo}/stats/contributors'
    headers = {'Authorization': f'token {token}'}
    print(f"Making request to {url}")
    response = requests.get(url, headers=headers)
    if response.status_code == 202:
        # GitHub computes contributor stats asynchronously; 202 means
        # the computation was just kicked off.
        print("GitHub is computing the statistics. Please try again later.")
        return None
    elif response.status_code != 200:
        print(f"Failed to fetch data: {response.status_code}")
        return None
    return response.json()
def get_pull_requests(owner, repo, token):
    """Return all merged pull requests for a repository.

    Pages through the GitHub pulls endpoint (100 closed PRs per page)
    and keeps only those with a non-null 'merged_at' timestamp.

    Args:
        owner: Repository owner (user or organization name).
        repo: Repository name.
        token: GitHub personal access token used for authorization.

    Returns:
        A list of merged-PR dicts, or None if any page request fails.
    """
    endpoint = f'https://api.github.com/repos/{owner}/{repo}/pulls'
    auth_headers = {'Authorization': f'token {token}'}
    base_params = {'state': 'closed', 'per_page': 100}
    collected = []
    page_number = 1
    while True:
        resp = requests.get(endpoint, headers=auth_headers,
                            params={**base_params, 'page': page_number})
        if resp.status_code != 200:
            print(f"Failed to fetch pull requests: {resp.status_code}")
            return None
        batch = resp.json()
        if not batch:  # an empty page means we ran off the end
            break
        collected.extend(batch)
        page_number += 1
    # A closed PR is not necessarily merged; filter on 'merged_at'.
    return [pr for pr in collected if pr.get('merged_at') is not None]
def save_to_csv(data, prs, filename='stats.csv', raw_data_filename='stats_raw.json'):
    """Aggregate contributor stats and merged PRs by (author, quarter, year)
    and write the result to a CSV file.

    Also dumps the raw inputs to raw_data_filename as JSON so they can
    be inspected and reused as a cache by get_repo_stats.

    Args:
        data: Contributor stats list from the GitHub stats/contributors
            endpoint (each entry has 'author', 'total' and 'weeks').
        prs: Merged pull-request dicts; None is tolerated and treated
            as an empty list.
        filename: Output CSV path.
        raw_data_filename: Path for the raw-data JSON dump.
    """
    if not data:
        print("No data to save.")
        return
    # BUG FIX: get_pull_requests returns None on failure; previously the
    # PR loop below crashed with a TypeError in that case.
    prs = prs or []

    # Save raw data to a JSON file for troubleshooting
    with open(raw_data_filename, 'w') as raw_file:
        json.dump({'contributors': data, 'pull_requests': prs}, raw_file, indent=4)
    print(f"Raw data saved to {raw_data_filename}")

    def _quarter_entry(aggregated, author, quarter, year):
        # Get or create the stats bucket for one (author, quarter, year).
        key = (author, quarter, year)
        if key not in aggregated:
            aggregated[key] = {
                'total_commits': 0,
                'additions': 0,
                'deletions': 0,
                'pr_count': 0,
                'quarter_start_date': datetime(year, 3 * (quarter - 1) + 1, 1),
            }
        return aggregated[key]

    aggregated_data = {}
    for contributor in data:
        author = contributor['author']['login']
        for week in contributor['weeks']:
            # 'w' is a Unix timestamp for the start of the week.
            date = datetime.fromtimestamp(week['w'])
            quarter = (date.month - 1) // 3 + 1
            entry = _quarter_entry(aggregated_data, author, quarter, date.year)
            entry['total_commits'] += week['c']
            entry['additions'] += week['a']
            entry['deletions'] += week['d']

    for pr in prs:
        merged_at = datetime.strptime(pr['merged_at'], '%Y-%m-%dT%H:%M:%SZ')
        pr_quarter = (merged_at.month - 1) // 3 + 1
        # BUG FIX: PRs were previously dropped when the author had no
        # commit activity in the same quarter; now a bucket is created.
        entry = _quarter_entry(aggregated_data, pr['user']['login'],
                               pr_quarter, merged_at.year)
        entry['pr_count'] += 1

    with open(filename, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Author', 'Total Commits', 'Additions', 'Deletions',
                         'Pull Requests (merged)', 'Quarter', 'Year', 'Quarter Start Date'])
        for (author, quarter, year), stats in aggregated_data.items():
            # Skip all-zero rows (weeks with no activity create empty buckets).
            if (stats['total_commits'] or stats['additions']
                    or stats['deletions'] or stats['pr_count']):
                writer.writerow([author, stats['total_commits'], stats['additions'],
                                 stats['deletions'], stats['pr_count'], quarter, year,
                                 stats['quarter_start_date'].strftime('%Y-%m-%d')])
    # BUG FIX: the completion message previously omitted the filename.
    print(f"Data saved to {filename}")
if __name__ == "__main__": | |
stats = get_repo_stats(OWNER, REPO, GITHUB_TOKEN) | |
prs = get_pull_requests(OWNER, REPO, GITHUB_TOKEN) | |
save_to_csv(stats, prs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment