Last active
March 10, 2024 16:14
-
-
Save marcelkornblum/2e3ab375e823f520cde3b754ffe6f34c to your computer and use it in GitHub Desktop.
Simple Python script to rip everything from BitBucket across to Github with minimal interaction
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# heavily inspired by https://gist.github.com/rbellamy/3c5033ba605a090824e8
# gets everything from bitbucket and brings it across to GH, adding LFS where necessary for file size
# then archives everything brought over
#
# runs on Python 3; does clone --mirror and push --mirror, cleaning up after itself
#
# you need git-lfs installed on the local system
# also make sure you've got git credential caching set up https://help.github.com/articles/caching-your-github-password-in-git/
import json
import os
import shutil
import subprocess
import urllib.parse

import requests
# your particulars
# Each value is taken from the environment variable of the same (upper-case)
# name so credentials do not have to live in the script; the '' fallback can
# still be edited by hand.
bitbucket_user = os.environ.get('BITBUCKET_USER', '')
bitbucket_pass = os.environ.get('BITBUCKET_PASS', '')
bitbucket_org = os.environ.get('BITBUCKET_ORG', '')
github_user = os.environ.get('GITHUB_USER', '')
github_access_token = os.environ.get('GITHUB_ACCESS_TOKEN', '')
github_org = os.environ.get('GITHUB_ORG', '')
def get_bitbucket_repos_page(url):
    """Fetch one page of the Bitbucket API and return its decoded JSON body.

    Args:
        url: Full URL of the API page to request.

    Returns:
        The JSON-decoded response body.

    Raises:
        RuntimeError: If the response status is anything other than 200.
    """
    r = requests.get(url, auth=(bitbucket_user, bitbucket_pass))
    if r.status_code != 200:
        # Returning None here made the caller fail later with an opaque
        # "'NoneType' object is not subscriptable"; report the real cause.
        raise RuntimeError(
            f"Bitbucket request to {url} failed with HTTP {r.status_code}")
    return r.json()
def get_bitbucket_repos():
    """List every repository of the configured Bitbucket organisation.

    Requests the first listing page and keeps following the 'next' link of
    each response until a page has none.

    Returns:
        A list of (repository name, https clone URL) tuples. Repositories
        that expose no clone link named 'https' are reported and left out.
    """
    repos = []
    api_url = f"https://api.bitbucket.org/2.0/repositories/{bitbucket_org}"
    response = get_bitbucket_repos_page(api_url)
    values = list(response['values'])
    while 'next' in response:
        print(f"getting {response['next']}")
        response = get_bitbucket_repos_page(response['next'])
        values.extend(response['values'])
    for repo in values:
        # Resolve the URL per repository: a variable carried over from the
        # previous iteration would pair this repo with another repo's URL
        # (or raise NameError on the very first one).
        clone_url = next(
            (link['href'] for link in repo['links']['clone']
             if link['name'] == 'https'),
            None)
        if clone_url is None:
            print(f"skipping {repo['name']}: no https clone link")
            continue
        repos.append((repo['name'], clone_url))
    return repos
def create_github_name(bitbucket_name):
    """Derive the GitHub repository name from a Bitbucket repository name.

    The name is split on '_'; when the first segment is all digits it is
    moved to the end. The segments are then re-joined with '_', lower-cased
    and stripped of spaces.
    """
    head, *tail = bitbucket_name.split('_')
    segments = tail + [head] if head.isdigit() else [head] + tail
    return '_'.join(segments).lower().replace(" ", "")
def get_github_origin(repo_name):
    """Return the https remote URL of `repo_name` under the configured GitHub organisation."""
    origin = f"https://github.com/{github_org}/{repo_name}.git"
    return origin
def create_github_repo(repo_name):
    """Create a private repository called `repo_name` in the GitHub organisation.

    The request switches off issues, projects, the wiki, merge commits and
    rebase merges. The request URL is printed.

    Returns:
        True if GitHub answered with a 2xx status, False otherwise.
    """
    settings = {
        "name": repo_name,
        "private": True,
        'has_issues': False,
        'has_projects': False,
        'has_wiki': False,
        'allow_merge_commit': False,
        'allow_rebase_merge': False,
    }
    request_headers = {
        'User-Agent': 'me@marcelkornblum.com',
        'Content-Type': 'application/json',
    }
    response = requests.post(
        f"https://api.github.com/orgs/{github_org}/repos",
        data=json.dumps(settings),
        headers=request_headers,
        auth=(github_user, github_access_token),
    )
    print(response.url)
    return 200 <= response.status_code < 300
def archive_github_repo(repo_name):
    """Mark the GitHub repository `repo_name` of the organisation as archived.

    Sends a PATCH with "archived": True and prints the request URL.

    Returns:
        True if GitHub answered with a 2xx status, False otherwise.
    """
    changes = {
        "name": repo_name,
        "archived": True,
    }
    request_headers = {
        'User-Agent': 'me@marcelkornblum.com',
        'Content-Type': 'application/json',
    }
    response = requests.patch(
        f"https://api.github.com/repos/{github_org}/{repo_name}",
        data=json.dumps(changes),
        headers=request_headers,
        auth=(github_user, github_access_token),
    )
    print(response.url)
    return 200 <= response.status_code < 300
def clone(bitbucket_origin, path):
    """Run `git clone --mirror` of `bitbucket_origin` into `path` and wait for it.

    git's stdout is captured and discarded; its exit status is not checked.
    """
    command = ["git", "clone", "--mirror", bitbucket_origin, path]
    subprocess.run(command, stdout=subprocess.PIPE)
def lfs(path):
    """Move files above 100MB in the repository at `path` into Git LFS.

    Runs `git lfs migrate info --above=100MB` in `path`, takes the first
    whitespace-separated field of every non-empty output line as an include
    pattern, and runs `git lfs migrate import` once per pattern.

    Args:
        path: Directory of the local clone; used as the working directory
            of every git invocation.
    """
    info = subprocess.run(
        ["git", "lfs", "migrate", "info", "--above=100MB"],
        stdout=subprocess.PIPE,
        # Decode to str: formatting the raw bytes produced "b'*.ext'"
        # inside the --include argument.
        universal_newlines=True,
        cwd=path,
    )
    patterns = [
        fields[0]
        for fields in (line.split() for line in info.stdout.splitlines())
        if fields
    ]
    # Same order as before (last reported pattern first).
    for pattern in reversed(patterns):
        # No shell is involved, so quotes inside the argument would become
        # part of the pattern; pass it bare. run() also waits for each
        # import, so history rewrites cannot overlap each other or a later
        # push.
        subprocess.run(
            ["git", "lfs", "migrate", "import", f"--include={pattern}"],
            stdout=subprocess.PIPE,
            cwd=path,
        )
def push(github_origin, path):
    """Run `git push --mirror` to `github_origin` from the clone at `path` and wait for it.

    git's stdout is captured and discarded; its exit status is not checked.
    """
    command = ["git", "push", "--mirror", github_origin]
    subprocess.run(command, stdout=subprocess.PIPE, cwd=path)
def delete(path):
    """Remove `path` from the local filesystem, like `rm -rf` would.

    Directories are removed recursively, anything else is unlinked. A
    missing path or a failed removal does not raise.

    Args:
        path: File or directory to remove.
    """
    if os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path, ignore_errors=True)
        return
    try:
        os.remove(path)
    except OSError:
        # Best-effort, matching `rm -f`: nothing to remove (or not
        # removable) must not abort the migration.
        pass
def migrate(bb_repo_name, bb_repo_clone_url):
    """Copy one Bitbucket repository to GitHub and archive it there.

    Creates the GitHub repository, mirror-clones the Bitbucket one into
    /tmp, runs the LFS conversion, mirror-pushes to GitHub, archives the
    GitHub repository and removes the local clone. Progress is printed at
    each step. Nothing further is done if the GitHub repository cannot be
    created.

    Args:
        bb_repo_name: Repository name as reported by Bitbucket.
        bb_repo_clone_url: https clone URL of the Bitbucket repository.
    """
    # Assumes the clone URL has the form scheme://user@host/...; the password
    # is inserted right before the first '@'.
    head, at, tail = bb_repo_clone_url.partition('@')
    if at:
        # Percent-encode the password so characters such as '@', ':' or '/'
        # cannot break the URL.
        password = urllib.parse.quote(bitbucket_pass, safe='')
        repo_clone_url = f"{head}:{password}@{tail}"
    else:
        # No user part to attach the password to; use the URL unchanged
        # instead of failing with IndexError.
        repo_clone_url = bb_repo_clone_url

    gh_repo = create_github_name(bb_repo_name)
    print(f"{bb_repo_name} converted to {gh_repo}")

    if not create_github_repo(gh_repo):
        print("failed to create GH repo ")
        return
    print("new GH repo created")

    local_path = f"/tmp/{gh_repo}"
    # Clear any leftover from an earlier run before cloning.
    delete(local_path)
    clone(repo_clone_url, local_path)
    print(f"cloned to {local_path}")

    lfs(local_path)

    github_origin = get_github_origin(gh_repo)
    push(github_origin, local_path)
    print(f"pushed to {github_origin}")

    # Only claim success when GitHub actually accepted the request.
    if archive_github_repo(gh_repo):
        print("Archived GH repo")
    else:
        print("failed to archive GH repo")

    delete(local_path)
    print("deleted local folder")
# Fetch the full repository list once, then migrate the repositories one by one.
all_repos = get_bitbucket_repos()
for repo in all_repos:
    name, clone_url = repo
    migrate(name, clone_url)
@sudarshann That was a quirk of my specific use case. I was bringing a lot of old repos over and they weren't meant to be current, so I wanted to archive them as part of my automation.
Absolutely not necessary in general :D
@marcelkornblum Thanks. Your script was very helpful.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@marcelkornblum why do you need to archive all repos in GitHub after migration?
archive_github_repo(gh_repo)
print("Archived GH repo")