Last active
April 4, 2017 16:14
-
-
Save nkmathew/43aec97134a204a5228d to your computer and use it in GitHub Desktop.
Script for cloning all projects and gists
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Date: 9th August 2015 | |
Author: nkmathew | |
A script for cloning all projects and gists from my Github account when I move to a | |
new environment | |
USAGE: | |
Run githubclone.py --help for the help message | |
CHANGELOG: | |
+ [Thursday] Feb 11, 2016 | |
- Use argparser | |
- Ability to clone only gists or repos | |
- Ability to clone with ssh using the '-ssh' flag | |
- Now uses the name of the largest file in the gist as the gist foldername instead | |
of the first encountered file(in alphabetical order) | |
- Renames gist foldernames in case of any existing folders with same name | |
+ [Tuesday] Apr 04, 2017 | |
- Add option for limiting the number of commits to clone | |
""" | |
import json | |
import os | |
import subprocess | |
import sys | |
import time | |
import urllib.request | |
import colorama as color | |
import argparse | |
import collections | |
from pprint import pprint | |
# Github API endpoint templates. The `{0}` placeholder is filled in with the
# account's username via str.format() before the URL is requested.
REPOS_API_URL = 'https://api.github.com/users/{0}/repos'
GISTS_API_URL = 'https://api.github.com/users/{0}/gists'
def brighten(msg):
    """
    Return msg wrapped in colorama's BRIGHT style code, followed by the
    NORMAL style code so that text printed afterwards is not affected
    """
    style = color.Style
    return ''.join((style.BRIGHT, msg, style.NORMAL))
def print_y(msg):
    """
    Print an informational message in bright yellow, switching the
    foreground back to white afterwards
    """
    highlighted = color.Style.BRIGHT + color.Fore.YELLOW + msg + color.Fore.WHITE
    print(brighten(highlighted))
def print_r(msg):
    """
    Print an error message in bright red, switching the foreground back to
    white afterwards
    """
    prefix = color.Style.BRIGHT + color.Fore.RED
    suffix = color.Fore.WHITE
    print(brighten(prefix + msg + suffix))
def download_json(url):
    """
    Request url and return the response body decoded as UTF-8 text.

    The body is returned as a string; parsing it is left to the caller.
    """
    with urllib.request.urlopen(url) as response:
        payload = response.read()
    return payload.decode('utf8')
def get_gists(username, use_ssh=False):
    """
    Returns clone information for the gists of a Github user:
        [{pull_url, repo_name}, {pull_url, repo_name}, ...]

    The API hands the listing out in pages, so pages are requested one after
    the other until a page comes back short. Without this only the first page
    of gists would ever be cloned.

    `repo_name` is the name of the largest file in the gist (the first one
    listed when several share the largest size). It is used as the folder
    name for the clone. 'no-name-gist' is used when the gist lists no files.

    With `use_ssh` the https pull url is rewritten to the ssh form
    `git@github.com:<gist id>.git`.
    """
    per_page = 100  # largest page size the Github API accepts
    base_url = GISTS_API_URL.format(username)
    gists = []
    page = 1
    while True:
        url = '{0}?per_page={1}&page={2}'.format(base_url, per_page, page)
        batch = json.loads(download_json(url))
        gists.extend(batch)
        # A page that is not full is the last one
        if len(batch) < per_page:
            break
        page += 1

    gist_list = []
    for gist in gists:
        pull_url = gist['git_pull_url']
        if use_ssh:
            # Keep only the trailing `<gist id>.git` part of the https url
            pull_url = 'git@github.com:' + pull_url.split('/')[-1]
        files = gist.get('files') or {}
        if files:
            gist_name = max(files, key=lambda fname: files[fname]['size'])
        else:
            # Shouldn't be possible, but handle it anyway
            gist_name = 'no-name-gist'
        gist_list.append({'pull_url': pull_url, 'repo_name': gist_name})
    return gist_list
def get_repos(username, use_ssh=False):
    """
    Returns clone information for the repos of a Github user:
        [{pull_url, repo_name}, {pull_url, repo_name}, ...]

    The API hands the listing out in pages, so pages are requested one after
    the other until a page comes back short. Without this only the first page
    of repos would ever be cloned.

    `pull_url` is the repo's `ssh_url` when `use_ssh` is set and its
    `clone_url` otherwise. `repo_name` is the repo's `name`.
    """
    per_page = 100  # largest page size the Github API accepts
    base_url = REPOS_API_URL.format(username)
    repos = []
    page = 1
    while True:
        url = '{0}?per_page={1}&page={2}'.format(base_url, per_page, page)
        batch = json.loads(download_json(url))
        repos.extend(batch)
        # A page that is not full is the last one
        if len(batch) < per_page:
            break
        page += 1

    url_key = 'ssh_url' if use_ssh else 'clone_url'
    return [{'pull_url': repo[url_key], 'repo_name': repo['name']}
            for repo in repos]
def get_commit_log(folder):
    """
    Returns the output of `git log -1` run inside folder, in order to detect
    a valid existing git folder.

    When git exits with an error, its error output is returned instead of
    raising. stderr is merged into stdout so that the message is captured
    either way.

    git is started with `cwd=folder` rather than changing the directory of
    this process, so the caller's working directory is never touched, not
    even when starting git fails.

    Raises OSError (e.g. FileNotFoundError) when folder or the git executable
    does not exist.
    """
    try:
        output = subprocess.check_output(
            ['git', 'log', '-1'], cwd=folder, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as ex:
        output = ex.output
    # Commit messages are not guaranteed to be valid UTF-8
    return output.decode('utf8', errors='replace')
# Checks if the folder is indeed a git repo by asking git for its latest
# commit
def proper_git_repo(folder):
    """
    Determine if folder is a valid git repo.

    Runs `git log -1` inside folder and returns True only when git exits
    successfully. The exit status is used instead of searching the output for
    the word 'fatal', because the output includes the commit message and a
    commit such as "Fix fatal crash" would otherwise be mistaken for an error.

    Returns False when folder is not a directory.
    Raises OSError when the git executable cannot be started.
    """
    if not os.path.isdir(folder):
        return False
    status = subprocess.call(
        ['git', 'log', '-1'], cwd=folder,
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return status == 0
def create_directory(dirname):
    """
    Create a folder with the given name if none already exists.

    Missing parent folders are created as well. An empty or None dirname is
    ignored, and an existing folder is left as it is.

    When dirname is an existing regular file, or the folder cannot be created,
    a message is printed and the script exits with status 1. Continuing would
    only fail later when the folder is used.
    """
    if not dirname:
        return
    if os.path.isfile(dirname):
        print('-- {0} is a file'.format(dirname))
        sys.exit(1)
    try:
        os.makedirs(dirname, exist_ok=True)
    except OSError as ex:
        print('Error occurred when creating folder name {0}: {1}'.format(
            dirname, ex))
        sys.exit(1)
def _origin_url(path):
    """
    Returns the url of the `origin` remote of the repo at path, or '' when it
    cannot be determined (not a folder, not a repo, no such remote)
    """
    try:
        output = subprocess.check_output(
            ['git', 'config', '--get', 'remote.origin.url'],
            cwd=path, stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError, OSError):
        return ''
    return output.decode('utf8', errors='replace').strip()


def _resolve_gist_folder(base_name, pull_url):
    """
    Returns the folder name a gist should be cloned into, or None when one of
    the candidate folders already holds a clone of that very gist.

    Candidates are tried in the order base_name, base_name~0, base_name~1, ...
    """
    candidate = base_name
    suffix = 0
    while os.path.exists(candidate):
        if _origin_url(candidate) == pull_url:
            return None
        candidate = '%s~%d' % (base_name, suffix)
        suffix += 1
    return candidate


def clone(folder, username, what, use_ssh, limit=-1):
    """
    Does the actual cloning from Github.

    folder   -- existing folder to clone into
    username -- Github account whose gists/repos are cloned
    what     -- 'gists' to clone gists, anything else clones repos
    use_ssh  -- clone through ssh urls instead of https
    limit    -- number of commits to fetch (git's --depth). -1, or any other
                non-positive value, fetches the full history

    An existing folder that git can no longer read is deleted and cloned
    again. Gist folders are named after a file in the gist and so may clash.
    A clashing name gets a `~N` suffix, unless the existing folder is already
    a clone of the same gist, in which case the gist is skipped. An existing
    repo folder is always skipped.

    Returns the number of gists/repos listed for the user, including the ones
    that were skipped or failed to clone.
    """
    if what == 'gists':
        print('\n~~[ Cloning gists to folder {0} ]~~'.format(folder))
        repo_list = get_gists(username, use_ssh)
    else:
        print('\n~~[ Cloning repos to folder {0} ]~~'.format(folder))
        repo_list = get_repos(username, use_ssh)
    cwd = os.getcwd()
    os.chdir(folder)
    try:
        for repo in repo_list:
            print('')
            repo_name = repo['repo_name']
            pull_url = repo['pull_url']
            repo_path = os.path.abspath(repo_name)
            # Check if the repo is corrupt and delete it if so. Only folders
            # are ever deleted, a plain file with the same name is left alone
            if os.path.isdir(repo_path) and not proper_git_repo(repo_path):
                print_r(
                    "Repo `{0}' is corrupt, deleting and recloning...".format(
                        repo_name))
                # NOTE(review): assumes a Unix style `rm` is available on PATH
                subprocess.call(['rm', '-rf', repo_path])
            if what == 'gists':
                # Find non clashing folder name for the soon-to-be-cloned gist
                repo_name = _resolve_gist_folder(repo_name, pull_url)
                if repo_name is None:
                    # Already cloned on an earlier run
                    continue
            if os.path.exists(repo_name):
                continue
            clone_args = ['git', 'clone', '--recursive']
            if limit is not None and limit > 0:
                clone_args.append('--depth={0}'.format(limit))
            # `--` stops a name starting with a dash being read as an option
            clone_args += ['--', pull_url, repo_name]
            if subprocess.call(clone_args) != 0:
                print_r("Failed to clone `{0}'".format(pull_url))
    finally:
        # Restore the caller's working directory even when something fails
        os.chdir(cwd)
    return len(repo_list)
def run(options=None):
    """
    Main runner code. Clones the repos and the gists into their respective
    folders, unless disabled through the options, and returns the tuple
    (number of gists, number of repos) as reported by clone()
    """
    settings = parse_options(options)
    account = settings.username
    depth = settings.commit_limit
    totals = {'repos': 0, 'gists': 0}
    # (kind, name of the "skip" option, name of the destination folder option)
    plan = (
        ('repos', 'no_repos', 'repo_folder'),
        ('gists', 'no_gists', 'gist_folder'),
    )
    for kind, skip_option, folder_option in plan:
        if getattr(settings, skip_option):
            continue
        destination = os.path.abspath(getattr(settings, folder_option))
        create_directory(destination)
        totals[kind] = clone(
            destination, account, kind, settings.use_ssh, depth)
    return totals['gists'], totals['repos']
def desc_elapsed_time(start_time):
    """
    Describes in English the time elapsed between start_time (a time.time()
    timestamp) and now. Leading units that are zero are left out.
    """
    total = int(time.time() - start_time)
    hours, remainder = divmod(total, 3600)
    mins, secs = divmod(remainder, 60)
    if hours:
        detail = '{0} Hours {1} Minute(s) {2} seconds'.format(hours, mins, secs)
    elif mins:
        detail = '{0} Minute(s) {1} seconds'.format(mins, secs)
    else:
        detail = '{0} seconds'.format(secs)
    return 'Elapsed time: ' + detail
def create_args_parser():
    """ Build and return the command line parser of the script """
    # One entry per argument: (option strings, add_argument() keywords)
    arg_specs = (
        (('username',),
         dict(help='Github username', type=str, nargs='?',
              default='nkmathew')),
        (('-gist', '-g'),
         dict(dest='gist_folder', help='Gist folder name', type=str,
              default='gists')),
        (('-repo', '-r'),
         dict(dest='repo_folder', help='Repo folder name', type=str,
              default='repos')),
        (('-no-gists', '-ng'),
         dict(help='Do not clone gists', action='store_true')),
        (('-no-repos', '-nr'),
         dict(help='Do not clone repos', action='store_true')),
        (('-ssh',),
         dict(help='Clone with ssh', dest='use_ssh', action='store_true')),
        (('-limit', '-l'),
         dict(help='Commit limit', dest='commit_limit', type=int,
              default=-1)),
    )
    cli = argparse.ArgumentParser(
        prog='githubclone',
        description="A script for cloning repos from someone's Github account")
    for option_strings, keywords in arg_specs:
        cli.add_argument(*option_strings, **keywords)
    return cli
def parse_options(arguments=None):
    """ Reads command-line arguments and returns them as a Namespace

    `arguments` may be:
      - None: the arguments of the running script (sys.argv[1:]) are parsed
      - a string: it is split on whitespace and then parsed, e.g.
        'nkmathew -r my-repos -no-gists'
      - a list of strings: parsed as is
      - an argparse.Namespace: returned unchanged
    """
    if isinstance(arguments, argparse.Namespace):
        return arguments
    if arguments is None:
        arguments = sys.argv[1:]
    elif isinstance(arguments, str):
        arguments = arguments.split()
    return create_args_parser().parse_args(arguments)
def main():
    """
    Entry point: parses the command line, clones everything and prints a
    summary together with the time taken
    """
    options = parse_options()
    color.init()
    began = time.time()
    gist_total, repo_total = run(options)
    summary = '\nCloned {0} gists and {1} repos...'.format(
        gist_total, repo_total)
    print_y(summary)
    print_y(desc_elapsed_time(began))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment