Skip to content

Instantly share code, notes, and snippets.

@shkumagai
Created December 12, 2019 02:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shkumagai/883d2f06ec1a910ff1a78fc99932a509 to your computer and use it in GitHub Desktop.
Save shkumagai/883d2f06ec1a910ff1a78fc99932a509 to your computer and use it in GitHub Desktop.
import re
import sys
from datetime import datetime
from subprocess import run, PIPE
from urllib.parse import urlparse
from jinja2 import Template
from github import Github
# Matches the ref name of a pull request head, e.g. "refs/pull/123/head", and
# captures the pull request number in the named group "pr_number".
re_refs = re.compile(r"^refs/pull/(?P<pr_number>\d+)/head$")
# Matches a remote URL that starts with an explicit scheme such as "https://".
re_remote = re.compile(r"^\w+://")
class DummyPullRequest:
    """Placeholder pull request that renders fixed, obviously fake values.

    It offers the same two rendering methods as ``PullRequest`` but needs no
    underlying pull request object.
    """

    def to_checklist_item(self):
        """Return the constant Markdown checklist line for the placeholder."""
        checklist_line = "- [ ] #??? THIS IS DUMMY PULL REQUEST"
        return checklist_line

    def to_html_link(self):
        """Return the constant placeholder URL."""
        placeholder_url = "https://github.com/DUMMY/DUMMY/pulls/?"
        return placeholder_url
class PullRequest:
    """Wrap a pull request object and render it for a release pull request.

    The wrapped object is expected to expose ``number``, ``title``,
    ``html_url`` and ``user`` (with a ``login`` attribute, or a falsy value
    when there is no user).
    """

    def __init__(self, pr):
        """Store the underlying pull request object.

        Args:
            pr: The pull request object to wrap.
        """
        self.pr = pr

    def to_checklist_item(self):
        """Return a Markdown checklist line for this pull request.

        The line has the form ``- [ ] #<number> <title> @<login>``. The
        mention is separated from the title by a single space and is omitted
        entirely when the pull request has no user.
        """
        item = f"- [ ] #{self.pr.number} {self.pr.title}"
        mention = self.mention()
        return f"{item} {mention}" if mention else item

    def to_html_link(self):
        """Return the HTML URL of the pull request."""
        return self.pr.html_url

    def mention(self):
        """Return ``@<login>`` for the pull request user, or ``""`` if none.

        An empty string is returned instead of the literal ``@None`` when
        the user (or its login) is missing.
        """
        user = self.pr.user
        login = user.login if user else None
        return f"@{login}" if login else ""
def host_and_repository_and_scheme():
    """Derive host, repository path and web scheme from ``remote.origin.url``.

    Returns:
        A ``(host, repository, scheme)`` tuple where

        * ``host`` is the remote's host name, or ``None`` when the remote is
          on github.com (or has no host at all);
        * ``repository`` is the URL path without its leading slash and
          without a trailing ``.git`` suffix (e.g. ``"owner/name"``);
        * ``scheme`` is ``"http"`` when the remote uses plain HTTP and
          ``"https"`` otherwise.

    Raises:
        RuntimeError: If the ``git config`` invocation fails.
    """
    remote = git("config", "remote.origin.url").strip()

    if not re_remote.search(remote):
        # scp-like syntax ("git@host:owner/repo.git") has no scheme; rewrite
        # it into an ssh:// URL so that urlparse can split it.
        remote = f"ssh://{remote.replace(':', '/')}"
    remote_url = urlparse(remote)

    repository = remote_url.path
    if repository.startswith("/"):
        repository = repository[1:]
    # Only strip a trailing ".git"; a plain str.replace would also mangle
    # repository names such as "user.github.io".
    if repository.endswith(".git"):
        repository = repository[: -len(".git")]

    # hostname (unlike netloc) excludes the "user@" prefix and any ":port".
    hostname = remote_url.hostname or ""
    is_github_com = hostname == "github.com" or hostname.endswith(".github.com")
    host = None if is_github_com or not hostname else hostname

    scheme = "http" if remote_url.scheme == "http" else "https"
    return (host, repository, scheme)
def git(*cmd):
    """Run ``git`` with the given arguments and return its standard output.

    The command line is echoed to stdout before it is executed.

    Args:
        *cmd: Arguments passed to the ``git`` executable, one per item.

    Returns:
        The captured standard output decoded as UTF-8 (including any
        trailing newline).

    Raises:
        RuntimeError: If the command exits with a non-zero return code.
    """
    command = ['git', *cmd]
    command_line = ' '.join(command)
    print(f"Executing `{command_line}`")
    response_of_command = run(command, stdout=PIPE)
    # A negative return code means the child was terminated by a signal
    # (POSIX), so any non-zero value is treated as a failure.
    if response_of_command.returncode != 0:
        raise RuntimeError(f"Executing `{command_line}` failed.")
    return response_of_command.stdout.decode("utf-8")
def git_config(key):
    """Look up a ``pr-release`` setting, trying several locations in order.

    The lookups are attempted in this order and the first one that succeeds
    wins:

    1. ``pr-release.<key>`` in the ``.git-pr-release`` file;
    2. ``pr-release.<host>.<key>`` in the regular git configuration (only
       when the remote is not hosted on github.com);
    3. ``pr-release.<key>`` in the regular git configuration.

    Args:
        key: Setting name below the ``pr-release`` section, e.g.
            ``"branch.production"``.

    Returns:
        The configured value without its trailing newline, or ``None`` when
        no location provides the setting.
    """
    host, _, _ = host_and_repository_and_scheme()
    plain_key = ".".join(["pr-release", key])

    lookups = [("config", "-f", ".git-pr-release", plain_key)]
    if host:
        lookups.append(("config", ".".join(["pr-release", host, key])))
    lookups.append(("config", plain_key))

    for lookup in lookups:
        try:
            return git(*lookup).rstrip("\n")
        except RuntimeError:
            # git() signals a failed command this way; a missing setting is
            # expected here, so move on to the next location.
            continue
    return None
# Jinja2 template for the release pull request. The first rendered line
# ("Release <timestamp>") is followed by one line per entry of
# `pull_requests`, produced by calling its `to_checklist_item()` method.
DEFAULT_PR_TEMPLATE = """\
Release {{ timestamp }}
{% for pr in pull_requests %}
{{ pr.to_checklist_item() }}
{% endfor %}
"""
def build_pr_title_and_body(merged_prs):
    """Render the release pull request text and split it into title and body.

    Args:
        merged_prs: Iterable of pull request objects; each one is wrapped in
            ``PullRequest`` before rendering.

    Returns:
        The rendered ``DEFAULT_PR_TEMPLATE`` split at its first newline: the
        first element is the title line, the second is the remaining text.
    """
    wrapped_prs = list(map(PullRequest, merged_prs))
    renderer = Template(DEFAULT_PR_TEMPLATE)
    rendered = renderer.render(
        timestamp=datetime.now(),
        pull_requests=wrapped_prs,
    )
    title_and_body = rendered.split("\n", 1)
    return title_and_body
def _extract_merged_feature_head_sha1(production_branch, staging_branch):
    """Yield the second-parent SHA-1 of each merge commit awaiting release.

    Runs ``git log --merges --pretty=format:%P`` over
    ``origin/<production_branch>..origin/<staging_branch>``; every output
    line lists the parent hashes of one merge commit, and the second parent
    is the head of the branch that was merged in.

    Args:
        production_branch: Name of the production (base) branch.
        staging_branch: Name of the staging (head) branch.

    Returns:
        A generator of SHA-1 strings. It is empty when the range contains no
        merge commits.

    Raises:
        RuntimeError: If the ``git log`` invocation fails.
    """
    response_of_git_log = git(
        "log", "--merges", "--pretty=format:%P",
        f"origin/{production_branch}..origin/{staging_branch}",
    )
    parents_per_commit = (line.split() for line in response_of_git_log.splitlines())
    # Empty output (no merge commits) or a line with fewer than two parents
    # has no second parent to report, so such lines are skipped.
    return (parents[1] for parents in parents_per_commit if len(parents) > 1)
def _extract_merged_pull_request_number(production_branch: str, staging_branch: str):
    """Collect the numbers of pull requests merged into staging but not production.

    Lists ``refs/pull/*/head`` on ``origin`` and keeps every pull request
    whose head commit is the second parent of a merge commit in
    ``origin/<production_branch>..origin/<staging_branch>``. A pull request
    whose head is already an ancestor of the production branch (its
    merge-base with production equals the head itself) is reported and
    skipped.

    Args:
        production_branch: Name of the production (base) branch.
        staging_branch: Name of the staging (head) branch.

    Returns:
        A list of pull request numbers (``int``), in ``git ls-remote`` order.

    Raises:
        RuntimeError: If any of the git invocations fails.
    """
    response_of_git_ls_remote = git("ls-remote", "origin", "refs/pull/*/head")
    ref_lines = [line for line in response_of_git_ls_remote.split("\n") if line.strip()]
    if not ref_lines:
        return []

    # Run `git log` once and reuse the result for every ref instead of
    # re-running it for each line of the ls-remote output.
    merged_head_sha1s = set(
        _extract_merged_feature_head_sha1(production_branch, staging_branch)
    )

    merged_pr_numbers = []
    for ref_line in ref_lines:
        sha1, ref = ref_line.split()
        if sha1 not in merged_head_sha1s:
            continue

        match = re_refs.search(ref)
        if not match:
            print(f"Bad pull request head ref format: {ref}")
            continue

        pr_number = int(match.group("pr_number"))
        merge_base = git("merge-base", sha1, f"origin/{production_branch}").strip()
        if merge_base == sha1:
            print(f"#{pr_number} ({sha1}) is already merged into {production_branch}")
        else:
            merged_pr_numbers.append(pr_number)
    return merged_pr_numbers
def _filter_merged_prs(merged_pr_numbers: list, repo):
    """Fetch the pull request object for every given pull request number.

    Each fetched pull request is announced on stdout as
    ``To be released: #<number> <title>``.

    Args:
        merged_pr_numbers: Pull request numbers to look up.
        repo: Object providing ``get_pull(number)``.

    Returns:
        The fetched pull request objects, in the order of the input numbers.
    """
    def fetch_and_announce(number):
        pull = repo.get_pull(number)
        print(f"To be released: #{number} {pull.title}")
        return pull

    return [fetch_and_announce(number) for number in merged_pr_numbers]
def main(args) -> None:
    """Create or update the release pull request from staging to production.

    Steps:

    1. Resolve the repository, branch names and API token.
    2. Determine which pull requests are merged into staging but not yet
       into production.
    3. Find an existing open pull request from staging to production, or
       create one, and set its title and body to the rendered checklist.

    Only ``args.production``, ``args.staging`` and ``args.dry_run`` are read.
    Branch names fall back to the ``pr-release`` git configuration and then
    to ``master`` / ``develop``.

    Args:
        args: Parsed command-line arguments.

    Exits with status 1 when there is nothing to release, 0 after a dry
    run, and 2 when the pull request could not be created.
    """
    _, repository, _ = host_and_repository_and_scheme()
    production_branch = (
        args.production or git_config("branch.production") or "master"
    )
    staging_branch = args.staging or git_config("branch.staging") or "develop"
    token = git_config("token")

    print(f"Repository: {repository}")
    print(f"Production branch: {production_branch}")
    print(f"Staging branch: {staging_branch}")

    client = Github(token)
    repo = client.get_repo(repository)

    # NOTE: fetching (`git remote update origin`) is currently disabled, so
    # the remote-tracking branches are used as they are.
    merged_pr_numbers = _extract_merged_pull_request_number(
        production_branch,
        staging_branch,
    )
    if not merged_pr_numbers:
        print("No pull requests to be released")
        sys.exit(1)

    merged_prs = _filter_merged_prs(merged_pr_numbers, repo)

    print("Searching for existing release pull request...")
    # Take the first matching pull request, or None when there is none yet;
    # unpacking a one-element list would raise if no release PR exists.
    found_release_pr = next(
        (
            pr for pr in repo.get_pulls()
            if pr.head.ref == staging_branch and pr.base.ref == production_branch
        ),
        None,
    )
    changed_files = found_release_pr.changed_files if found_release_pr else 0

    pr_title, pr_body = build_pr_title_and_body(merged_prs)

    if args.dry_run:
        print("Dry run. Not update PR")
        print("-" * 89)
        print(f"title: {pr_title}")
        print("body:")
        print(pr_body)
        print(f"change_files: {changed_files}")
        sys.exit(0)

    if not found_release_pr:
        created_pr = repo.create_pull(
            title=pr_title,
            body=pr_body,
            base=production_branch,
            head=staging_branch,
        )
        if not created_pr:
            print("Failed to create a new pull request")
            sys.exit(2)
        release_pr = created_pr
    else:
        release_pr = found_release_pr

    print("Pull request body:")
    print(pr_body)
    release_pr.edit(title=pr_title, body=pr_body)
    print(f"{'Updated' if found_release_pr else 'Created'} pull request: {release_pr.html_url}")
if __name__ == "__main__":
    from argparse import ArgumentParser

    # Command-line options as (flags, keyword arguments), in the order they
    # are registered with the parser.
    cli_options = [
        (("-s", "--staging"),
         {"help": "specify staging (head) branch name (default: develop)"}),
        (("-p", "--production"),
         {"help": "specify production (base) branch name (default: master)"}),
        (("-r", "--repository"),
         {"help": "specify target Github repository"}),
        (("-c", "--config"),
         {"help": "specify config file (default: ./.gh-release)"}),
        (("-t", "--title"),
         {"help": "specify pull request title (default: `Release: <TimeStamp>`)"}),
        (("--no-fetch",),
         {"action": "store_true",
          "help": "Do not fetch from remote repo before determining target PRs (CI friendly)"}),
        (("-n", "--dry-run"),
         {"action": "store_true",
          "help": "do not create/update a PR. Just prints to stdout"}),
    ]

    parser = ArgumentParser()
    for flags, options in cli_options:
        parser.add_argument(*flags, **options)

    args = parser.parse_args()
    main(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment