@MarshalX
Created February 22, 2023 12:52
Fetch the public email addresses of all stargazers of a GitHub user's public repositories, given the username
# Written for Python 3.11
# One dependency: pip install aiohttp
# Required env vars: GITHUB_PAT, GITHUB_USERNAME
# Due to strict API rate limits, the script uses a simple cache file to save state
# You will probably need several runs of the script to fetch all emails
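# Example invocation (the file name is only an illustration, assuming a POSIX shell):
#   GITHUB_PAT=<your token> GITHUB_USERNAME=<username> python fetch_stargazer_emails.py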
import asyncio
import os
import re
from typing import Optional, Tuple

import aiohttp

GITHUB_PAT = os.environ['GITHUB_PAT']
USERNAME = os.environ['GITHUB_USERNAME']

GITHUB_API_LIMIT_PER_HOUR = 5_000
COUNT_OF_RUNNING_WORKFLOW_AT_SAME_TIME = 5  # just an arbitrary number
API_BASE_URL = 'https://api.github.com'
REQUEST_KWARGS = {
    'headers': {
        'Authorization': f'token {GITHUB_PAT}'
    }
}
GATHER_BATCH_SIZE = 10
CACHE_FILENAME = 'db.csv'
OUTPUT_FILENAME = 'emails.txt'


async def send_req_until_success(method: str, url: str, **kwargs) -> dict:
    # A fresh request is built on every attempt: an already awaited
    # coroutine/context manager cannot be awaited again on retry.
    delay_sec = 5
    count_of_retries = int(GITHUB_API_LIMIT_PER_HOUR / COUNT_OF_RUNNING_WORKFLOW_AT_SAME_TIME / delay_sec)
    for retry_number in range(count_of_retries + 1):
        if retry_number > 0:
            print(f'Retry number for {url} is {retry_number}')
        try:
            async with aiohttp.request(method, url, **kwargs) as res:
                if res.status == 200:
                    return await res.json()
        except Exception:
            pass
        await asyncio.sleep(delay_sec)
    raise RuntimeError('Surprise. Time is over')


async def get_last_page_number(url: str) -> int:
    # GitHub exposes pagination via the Link response header,
    # e.g. <https://api.github.com/...?page=34>; rel="last"
    last_page_number = 0
    async with aiohttp.request('GET', url, **REQUEST_KWARGS) as r:
        pagination_data = r.headers.get('Link', '')
        matches = re.findall(r'page=(\d+)>; rel="last"', pagination_data)
        if matches:
            last_page_number = int(matches[0])
    return last_page_number


async def get_stargazers(url: str, page: int) -> list[str]:
    params = {
        'per_page': '100',
        'page': str(page),
    }
    resp = await send_req_until_success('GET', url, params=params, **REQUEST_KWARGS)
    return [u['login'] for u in resp]


async def get_all_stargazers(owner: str, repo: str) -> list[str]:
    url = f'{API_BASE_URL}/repos/{owner}/{repo}/stargazers'
    res = []
    # the Link header is absent when everything fits on a single page
    last_page_number = max(await get_last_page_number(url), 1)
    for page in range(1, last_page_number + 1):
        res.extend(await get_stargazers(url, page))
    return res


async def get_user_email(username: str) -> Tuple[str, Optional[str]]:
    url = f'{API_BASE_URL}/users/{username}'
    resp = await send_req_until_success('GET', url, **REQUEST_KWARGS)
    return username, resp.get('email')


async def get_user_repos(username: str) -> list[str]:
    # this fetches a single page, so at most 100 repos are covered;
    # a paginated variant is sketched right below
    url = f'{API_BASE_URL}/users/{username}/repos'
    params = {
        'per_page': '100',
        'sort': 'updated',
    }
    resp = await send_req_until_success('GET', url, params=params, **REQUEST_KWARGS)
    return [r['name'] for r in resp]
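

# Not part of the original gist: a possible paginated variant of get_user_repos.
# It keeps requesting pages until GitHub returns an empty one, so users with
# more than 100 repositories are fully covered. The helper name is made up;
# main() below still calls the single-page get_user_repos.
async def get_user_repos_paginated(username: str) -> list[str]:
    url = f'{API_BASE_URL}/users/{username}/repos'
    names = []
    page = 1
    while True:
        params = {
            'per_page': '100',
            'sort': 'updated',
            'page': str(page),
        }
        resp = await send_req_until_success('GET', url, params=params, **REQUEST_KWARGS)
        if not resp:
            break
        names.extend(r['name'] for r in resp)
        page += 1
    return names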


def read_from_db() -> dict[str, str]:
    db = {}
    if not os.path.exists(CACHE_FILENAME):
        return db
    with open(CACHE_FILENAME, 'r', encoding='UTF-8') as f:
        lines = f.read().split('\n')
    for line in lines:
        if not line:
            continue
        username, email = line.split('\t')
        db[username] = email
    return db


def write_to_db(username: str, email: str):
    # could be made async; rows could also be buffered and written in a single open()
    with open(CACHE_FILENAME, 'a+', encoding='UTF-8') as f:
        f.write(f'{username}\t{email}\n')


def write_plain_emails(db: dict[str, str]):
    with open(OUTPUT_FILENAME, 'w+', encoding='UTF-8') as f:
        for _, email in db.items():
            # missing emails are stored in the cache as the literal string 'None'
            if email != 'None':
                f.write(f'{email}\n')


async def main():
    db = read_from_db()

    repository_names = await get_user_repos(USERNAME)
    tasks = [get_all_stargazers(USERNAME, repository_name) for repository_name in repository_names]
    task_results = await asyncio.gather(*tasks)

    stargazers = set()
    for task_result in task_results:
        for result in task_result:
            stargazers.add(result)
    print('Stargazers count:', len(stargazers))

    tasks = [get_user_email(username) for username in stargazers if username not in db]
    print('Not cached stargazers:', len(tasks))

    for i, limit in enumerate(range(GATHER_BATCH_SIZE, len(tasks) + GATHER_BATCH_SIZE, GATHER_BATCH_SIZE)):
        print(f'[{i}] Fetch emails from {limit - GATHER_BATCH_SIZE} to {limit}/{len(tasks)}')
        task_results = await asyncio.gather(*tasks[limit - GATHER_BATCH_SIZE: limit])
        for task_result in task_results:
            username, email = task_result
            write_to_db(username, email)

    write_plain_emails(read_from_db())


if __name__ == '__main__':
    asyncio.run(main())
@MarshalX (Author)

This script doesn't make much sense, because there is a GraphQL interface: https://docs.github.com/en/graphql/overview/explorer
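For reference, a rough sketch of that GraphQL approach (it assumes the same GITHUB_PAT and GITHUB_USERNAME env vars as the script above; cursor-based pagination of repositories and stargazers is left out, so it only covers the first 100 of each, and the exact fields are best double-checked in the explorer linked above):

import asyncio
import os

import aiohttp

QUERY = '''
query ($login: String!) {
  user(login: $login) {
    repositories(first: 100, privacy: PUBLIC) {
      nodes {
        stargazers(first: 100) {
          nodes { login email }
        }
      }
    }
  }
}
'''


async def fetch_stargazer_emails() -> set[str]:
    payload = {'query': QUERY, 'variables': {'login': os.environ['GITHUB_USERNAME']}}
    headers = {'Authorization': f"bearer {os.environ['GITHUB_PAT']}"}
    async with aiohttp.request('POST', 'https://api.github.com/graphql', json=payload, headers=headers) as r:
        data = await r.json()
    emails = set()
    for repo in data['data']['user']['repositories']['nodes']:
        for stargazer in repo['stargazers']['nodes']:
            # email is empty when the user has no public email set
            if stargazer['email']:
                emails.add(stargazer['email'])
    return emails


if __name__ == '__main__':
    print(asyncio.run(fetch_stargazer_emails()))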

