Skip to content

Instantly share code, notes, and snippets.

@Mic92
Created October 22, 2023 05:57
Show Gist options
  • Save Mic92/4b2288769c28bc84d534f5804ac326c3 to your computer and use it in GitHub Desktop.
Save Mic92/4b2288769c28bc84d534f5804ac326c3 to your computer and use it in GitHub Desktop.
Scrape all nixpkgs pull requests
#!/usr/bin/env python3
import http.client
import json
import os
import time
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
# GraphQL query that fetches one page (20 entries) of NixOS/nixpkgs pull
# requests together with the current API rate-limit status.
# NOTE: `$endCursor` is not a declared GraphQL variable here; main() replaces
# it textually with a quoted cursor (or `null` for the first page) before the
# query is sent.
QUERY = """
query {
repository(owner: "NixOS", name: "nixpkgs") {
pullRequests(first: 20, after: $endCursor) {
totalCount
pageInfo {
hasNextPage
endCursor
}
nodes {
title
author {
login
}
assignees(first: 100) {
edges {
node {
id
login
}
}
}
authorAssociation
comments(first: 100) {
edges {
node {
id
author {
login
}
body
}
}
}
commits(first: 100) {
edges {
node {
id
commit {
oid
messageHeadline
messageBody
committer {
user {
login
}
}
}
}
}
}
number
createdAt
editor {
login
}
files(first: 100) {
edges {
node {
additions
deletions
path
}
}
}
labels(first: 100) {
edges {
node {
id
description
}
}
}
latestReviews(first: 100) {
edges {
node {
id
publishedAt
updatedAt
body
author {
login
}
}
}
}
activeLockReason
additions
deletions
changedFiles
checksUrl
headRefName
headRefOid
baseRefName
baseRefOid
body
isDraft
locked
maintainerCanModify
merged
mergeable
mergedAt
mergedBy {
login
}
closed
closedAt
url
reviewRequests(first: 100) {
nodes {
requestedReviewer {
... on User {
name
login
}
}
}
}
}
}
}
rateLimit {
limit
cost
remaining
resetAt
}
}
"""
class HttpResponse:
    """Thin convenience wrapper around a raw ``http.client.HTTPResponse``."""

    def __init__(self, raw: http.client.HTTPResponse) -> None:
        # Keep the underlying response; every accessor below reads from it.
        self.raw = raw

    def json(self) -> Any:
        """Read the response body and return it decoded as JSON."""
        return json.loads(self.raw.read())

    def save(self, path: str) -> None:
        """Write the response body as bytes to the file at *path*."""
        with open(path, "wb") as sink:
            sink.write(self.raw.read())

    def headers(self) -> http.client.HTTPMessage:
        """Return the headers object of the wrapped response."""
        return self.raw.headers
class GithubClientError(Exception):
    """Error raised when a GitHub API request fails with an HTTP error.

    Attributes:
        code: HTTP status code of the failed response.
        reason: HTTP reason phrase of the failed response.
        url: URL that was requested.
        body: Response body text (may be empty).
    """

    code: int
    reason: str
    url: str
    body: str

    def __init__(self, code: int, reason: str, url: str, body: str) -> None:
        # Forward everything to Exception so `args`, repr() and the traceback
        # message look exactly as they did before this initializer existed.
        super().__init__(code, reason, url, body)
        # Without these assignments the annotated attributes above would not
        # exist on instances and reading e.g. `err.code` would fail.
        self.code = code
        self.reason = reason
        self.url = url
        self.body = body
class GithubClient:
    """Minimal client for api.github.com built on ``urllib``."""

    def __init__(self, api_token: str | None) -> None:
        # A falsy token means requests are sent without an Authorization header.
        self.api_token = api_token

    def _request(
        self, path: str, method: str, data: dict[str, Any] | None = None
    ) -> Any:
        """Send a single request and return the response wrapped in HttpResponse.

        Args:
            path: Path joined onto ``https://api.github.com/``.
            method: HTTP method name.
            data: Optional payload; sent JSON-encoded when truthy.

        Raises:
            GithubClientError: If the server answers with an HTTP error status.
        """
        target = urllib.parse.urljoin("https://api.github.com/", path)

        request_headers = {"Content-Type": "application/json"}
        if self.api_token:
            request_headers["Authorization"] = f"token {self.api_token}"

        payload = json.dumps(data).encode("ascii") if data else None
        request = urllib.request.Request(
            target, headers=request_headers, method=method, data=payload
        )

        try:
            raw = urllib.request.urlopen(request)
        except urllib.request.HTTPError as err:
            # Reading the error body is best effort only; a failure here must
            # not hide the original HTTP error.
            try:
                details = err.fp.read().decode("utf-8", "replace")
            except Exception:
                details = ""
            raise GithubClientError(err.code, err.reason, target, details) from err
        else:
            return HttpResponse(raw)

    def get(self, path: str) -> Any:
        """Issue a GET request for *path*."""
        return self._request(path, "GET")

    def post(self, path: str, data: dict[str, str]) -> Any:
        """Issue a POST request for *path* with *data* as the JSON payload."""
        return self._request(path, "POST", data)

    def put(self, path: str) -> Any:
        """Issue a PUT request for *path* without a payload."""
        return self._request(path, "PUT")

    def graphql(self, query: str) -> HttpResponse:
        """POST *query* to the ``/graphql`` endpoint and return the response."""
        return self.post("/graphql", data={"query": query})
def main() -> None:
    """Download NixOS/nixpkgs pull requests page by page into ``prs.jsonl``.

    The API token is read from the ``GITHUB_TOKEN`` environment variable.
    After every stored page the pagination cursor is written to the
    ``cursor-state`` file; if that file exists on startup, the run resumes
    from the cursor it contains.

    Raises:
        RuntimeError: If ``GITHUB_TOKEN`` is unset or empty, or if the GraphQL
            response contains an ``errors`` entry.
    """
    token = os.environ.get("GITHUB_TOKEN")
    if not token:
        raise RuntimeError("GITHUB_TOKEN not set")

    cursor_state = Path("cursor-state")
    cursor = None
    if cursor_state.exists():
        # Strip stray whitespace (e.g. a trailing newline added by an editor)
        # so it does not end up inside the query string.
        cursor = cursor_state.read_text().strip()
        print(f"Resuming from cursor: {cursor}")

    github = GithubClient(token)
    number = 0
    while True:
        # json.dumps produces a correctly quoted and escaped string literal.
        cursor_literal = json.dumps(cursor) if cursor else "null"
        resp = github.graphql(QUERY.replace("$endCursor", cursor_literal))

        headers = resp.headers()
        # Consume and close the response right away: the rate-limit sleep
        # below may last a long time and must not hold an unread connection.
        try:
            data: dict[str, Any] = resp.json()
        finally:
            resp.raw.close()

        if "errors" in data:
            raise RuntimeError(f"Expected data from graphql api, got: {data}")

        pull_requests = data["data"]["repository"]["pullRequests"]
        prs = pull_requests["nodes"]
        if not prs:
            break

        number += len(prs)
        with open("prs.jsonl", "a") as f:
            f.writelines(json.dumps(pr) + "\n" for pr in prs)
        if number % 100 == 0:
            print(f"Downloaded {number} PRs")

        page_info = pull_requests["pageInfo"]
        cursor = page_info["endCursor"]
        # Persist progress only after the page itself has been written.
        cursor_state.write_text(cursor)
        if not page_info["hasNextPage"]:
            break

        # Back off before the next request when the quota is nearly used up.
        remaining = headers.get("X-RateLimit-Remaining")
        reset = headers.get("X-RateLimit-Reset")
        if remaining is not None and reset is not None and int(remaining) < 100:
            # Clamp at zero: the reset time may already lie in the past, and
            # time.sleep() rejects negative values.
            delay = max(0.0, int(reset) - time.time())
            print(f"Sleeping for {delay}s")
            time.sleep(delay)
# Run the scraper only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment