Skip to content

Instantly share code, notes, and snippets.

@thedrow
Created July 5, 2020 13:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save thedrow/b9ce13b1c93816f3279adceb9dde564f to your computer and use it in GitHub Desktop.
Save thedrow/b9ce13b1c93816f3279adceb9dde564f to your computer and use it in GitHub Desktop.
import csv
import math
import gidgethub.httpx
import httpx
import trio
import asyncclick as click
STARGAZAERS = """query StargazersCount($owner: String!, $name: String!, $after: String = null)
{
repository(owner: $owner, name: $name) {
stargazers(first: 100, after: $after) {
edges {
node {
email
}
}
pageInfo {
endCursor
hasNextPage
}
}
}
}"""
async def extract_emails(emails, sender, has_next_page):
for email in emails:
await sender.send(email)
if not has_next_page:
await sender.send(None)
async def write_csv(receiver):
with open('/home/thedrow/Documents/Projects/github-marketing-tools/emails.csv', 'w+', newline='') as csvfile:
writer = csv.writer(csvfile)
async for email in receiver:
if not email:
return
writer.writerow([email])
csvfile.flush()
async def fetch_stargazers(nursery, sender, gh, owner, name, after=None):
response = await gh.graphql(STARGAZAERS, owner=owner, name=name, after=after)
emails = {node["node"]["email"].strip() for node in response["repository"]["stargazers"]["edges"]}
emails.remove("")
page_info = response["repository"]["stargazers"]["pageInfo"]
has_next_page = page_info["hasNextPage"]
end_cursor = page_info["endCursor"]
nursery.start_soon(extract_emails, emails, sender, has_next_page)
if has_next_page:
nursery.start_soon(fetch_stargazers, nursery, sender, gh, owner, name, end_cursor)
@click.command()
@click.option("--token", default=None, help="Github API token.")
@click.argument("repository")
async def stargazers(repository, token=None):
owner, name = repository.split('/')
sender, receiver = trio.open_memory_channel(math.inf)
async with httpx.AsyncClient() as client:
gh = gidgethub.httpx.GitHubAPI(client, "github-marketing-tools",
oauth_token=token)
async with sender, receiver:
async with trio.open_nursery() as nursery:
nursery.start_soon(fetch_stargazers, nursery, sender, gh, owner, name)
nursery.start_soon(write_csv, receiver)
if __name__ == '__main__':
stargazers()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment