Created
March 11, 2020 04:59
-
-
Save jborean93/1a060f4c7c17bbecd94cc1597e580b54 to your computer and use it in GitHub Desktop.
Python script to help with migrating issues and PRs from ansible/ansible to a target collection.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# PYTHON_ARGCOMPLETE_OK | |
# Copyright: (c) 2020, Jordan Borean (@jborean93) <jborean93@gmail.com> | |
# MIT License (see LICENSE or https://opensource.org/licenses/MIT) | |
""" | |
Script that can be used to copy issues and PRs from the Ansible GitHub repo to it's target collection repo. Current | |
limitations are; | |
* Original issue/PR authors are lost in migration, they will appear as your user with a comment in the post stating the | |
original author | |
* PR review comments will be missing, only comments on the PR itself (not the code) will be migrated | |
* Has a very basic patch path rewriter, probably will fall short on some examples | |
To migrate an issue run | |
./ansible_migrate.py -i 1234 -c ansible.windows --token gh_pat | |
To migrate a PR run | |
./ansible_migrate.py -p 1234 -c ansible.windows -r git@github.com:jborean93/ansible.windows.git --token gh_pat | |
""" | |
from __future__ import (absolute_import, division, print_function) | |
__metaclass__ = type | |
import argparse | |
import contextlib | |
import json | |
import os | |
import re | |
import subprocess | |
import tempfile | |
from datetime import datetime | |
try: | |
import argcomplete | |
except ImportError: | |
argcomplete = None | |
try: | |
from urllib.request import Request, urlopen | |
except ImportError: # Python 2 | |
from urllib2 import Request, urlopen | |
GITHUB_API_ROOT = 'https://api.github.com/' | |
@contextlib.contextmanager | |
def open_url(url, method=None, data=None, headers=None): | |
req = Request(url, method=method, data=data, headers=headers) | |
resp = urlopen(req) | |
try: | |
yield resp | |
finally: | |
resp.close() | |
def _gh_call(args, url, method=None, data=None, json_response=True): | |
headers = {'Authorization': 'token %s' % args.gh_token} | |
with open_url(url, method=method, data=data, headers=headers) as resp: | |
resp_data = resp.read().decode('utf-8') | |
if json_response: | |
return json.loads(resp_data) | |
else: | |
return resp_data | |
def parse_args(): | |
parser = argparse.ArgumentParser(description='Migrate a PR or Issue from ansible/ansible to the specified ' | |
'collection repo.') | |
id_group = parser.add_mutually_exclusive_group(required=True) | |
id_group.add_argument('-i', '--issue', | |
dest='gh_issue', | |
help='The ansible/ansible Issue to migrate.') | |
id_group.add_argument('-p', '--pull-request', | |
dest='gh_pr', | |
help='The ansible/ansible PR to migrate.') | |
parser.add_argument('-c', '--target-collection', | |
dest='target_collection', | |
required=True, | |
help='The target collection name under ansible-collections to migrate the PR/Issue to.') | |
parser.add_argument('-r', '--target-repo', | |
dest='target_repo', | |
help='The target repository URI to push the new branch to.') | |
parser.add_argument('-b', '--base-branch', | |
dest='base_branch', | |
default='master', | |
help='When migrating pull requests, this is the target branch of the collection repo that the ' | |
'changes will be targeted to.') | |
# To generate a new token for this argument go to Github: | |
# User Settings -> Developer settings -> Personal access tokens -> Generate new token | |
# Call it whatever you want but it needs the following scopes | |
# public_repo | |
# read:org | |
parser.add_argument('-t', '--token', | |
dest='gh_token', | |
default=os.environ.get('GITHUB_TOKEN'), | |
help='The GitHub password or personal access token to authenticate with, defaults to ' | |
'GITHUB_PASS env var.') | |
if argcomplete: | |
argcomplete.autocomplete(parser) | |
args = parser.parse_args() | |
if args.gh_pr and not args.target_repo: | |
raise ValueError("--target-repo must be set when migrating a pull request.") | |
if not args.gh_token: | |
raise ValueError("--token must be set through the CLI or with GITHUB_TOKEN env var.") | |
return args | |
def gh_get_repos_pulls(args, owner, repo, pr_id): | |
return _gh_call(args, '%srepos/%s/%s/pulls/%s' % (GITHUB_API_ROOT, owner, repo, pr_id)) | |
def gh_get_repos_issues(args, owner, repo, issue_id): | |
return _gh_call(args, '%srepos/%s/%s/issues/%s' % (GITHUB_API_ROOT, owner, repo, issue_id)) | |
def gh_post_repos_pulls(args, owner, repo, title, head, base, body): | |
data = json.dumps({ | |
'title': title, | |
'head': head, | |
'base': base, | |
'body': body, | |
'maintainer_can_modify': True, | |
}).encode('utf-8') | |
return _gh_call(args, '%srepos/%s/%s/pulls' % (GITHUB_API_ROOT, owner, repo), method='POST', data=data) | |
def gh_post_repos_issues(args, owner, repo, title, body): | |
data = json.dumps({ | |
'title': title, | |
'body': body, | |
}).encode('utf-8') | |
return _gh_call(args, '%srepos/%s/%s/issues' % (GITHUB_API_ROOT, owner, repo), method='POST', data=data) | |
def gh_post_repos_issues_comments(args, issue_info, body): | |
data = json.dumps({'body': body}).encode('utf-8') | |
return _gh_call(args, issue_info['comments_url'], method='POST', data=data) | |
def get_github_comments(args, source_info): | |
comments = [] | |
if source_info['comments'] > 0: | |
comments = [c for c in _gh_call(args, source_info['comments_url']) | |
if c.get('user', {}).get('login') != 'ansibot'] | |
return comments | |
def get_ansible_issue(args, issue_id): | |
source_info = gh_get_repos_issues(args, 'ansible', 'ansible', issue_id) | |
return { | |
'post': source_info, | |
'comments': get_github_comments(args, source_info) | |
} | |
def get_ansible_pr(args, pr_id): | |
source_info = gh_get_repos_pulls(args, 'ansible', 'ansible', pr_id) | |
return { | |
'post': source_info, | |
'comments': get_github_comments(args, source_info) | |
} | |
def format_migrated_body(post): | |
source_date = datetime.strptime(post['created_at'], '%Y-%m-%dT%H:%M:%Sz') | |
new_post = '''_From @{0} on {1}_ | |
{2}'''.format(post['user']['login'], source_date.strftime('%b %d, %Y %H:%M'), post['body']) | |
if 'number' in post: | |
new_post += '\n\n_Copied from original issue: ansible/ansible#{0}_'.format(post['number']) | |
return new_post | |
def main(): | |
args = parse_args() | |
if args.gh_issue: | |
print("Getting existing GitHub issue %s info" % args.gh_issue) | |
source_info = get_ansible_issue(args, args.gh_issue) | |
new_issue_body = format_migrated_body(source_info['post']) | |
title = source_info['post']['title'] | |
print("Creating new GitHub issue at ansible-collections/%s - %s" % (args.target_collection, title)) | |
info = gh_post_repos_issues(args, 'ansible-collections', args.target_collection, title, new_issue_body) | |
print("Created new GitHub issue at '%s'" % info['html_url']) | |
else: | |
print("Getting existing GitHub PR %s info" % args.gh_pr) | |
source_info = get_ansible_pr(args, args.gh_pr) | |
new_branch = 'ansible_migration_%s' % source_info['post']['number'] | |
title = source_info['post']['title'] | |
user_match = re.match(r'.*github\.com[/|:](\w+)/%s(\.git)?$' % re.escape(args.target_collection), | |
args.target_repo) | |
if not user_match: | |
raise ValueError("Failed to get username from target repo URL") | |
head = '%s:%s' % (user_match.group(1), new_branch) | |
base = args.base_branch | |
# Get patch for original PR and do a poor man's path replacement. | |
print("Getting PR patch and rewrite paths") | |
pr_patch = _gh_call(args, source_info['post']['patch_url'], json_response=False) | |
pr_patch = ( | |
# Use regex to handle nested module directories to a flat dir. | |
re.sub(r'lib/ansible/modules/([\w]*/?)(\w+)\.(\w)', r'plugins/modules/\2.\3', pr_patch) | |
# This is a poor man's replacement, could be better but should be ok for general cases. | |
.replace('lib/ansible/plugins/', 'plugins/') | |
.replace('test/integration/', 'tests/integration/') | |
.replace('test/units/', 'tests/units/') | |
) | |
# Checkout the target branch and try and apply the patch and push to the repo. | |
with tempfile.TemporaryDirectory() as temp_dir: | |
repo_dir = os.path.join(temp_dir, args.target_collection) | |
print("Creating new clone of git repo '%s'" % args.target_repo) | |
subprocess.call(['git', 'clone', args.target_repo, repo_dir]) | |
print("Creating new git branch '%s'" % new_branch) | |
subprocess.call(['git', 'checkout', '-b', new_branch], cwd=repo_dir) | |
patch_path = os.path.join(temp_dir, 'my.patch') | |
with open(patch_path, mode='wb') as patch_fd: | |
patch_fd.write(pr_patch.encode('utf-8')) | |
print("Applying git patch") | |
subprocess.call(['git', 'am', patch_path], cwd=repo_dir) | |
print("Pushing branch to remote") | |
subprocess.call(['git', 'push', '-u', 'origin', new_branch, '--force'], cwd=repo_dir) | |
new_pr_body = format_migrated_body(source_info['post']) | |
print("Creating new GitHub PR at ansible-collections/%s - %s" % (args.target_collection, title)) | |
info = gh_post_repos_pulls(args, 'ansible-collections', args.target_collection, title, head, base, new_pr_body) | |
print("Created new GitHub PR at '%s'" % info['html_url']) | |
for idx, comment in enumerate(source_info['comments'], 1): | |
new_comment = format_migrated_body(comment) | |
print("Copying comment %d of %d" % (idx, len(source_info['comments']))) | |
gh_post_repos_issues_comments(args, info, new_comment) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment