|
#!/usr/bin/env python |
|
|
|
""" |
|
Jist |
|
==== |
|
Create private gists with directories. |
|
MIT license. |
|
""" |
|
|
|
import json |
|
import logging |
|
import optparse |
|
import os |
|
import subprocess |
|
import sys |
|
import urllib2 |
|
|
|
|
|
logger = logging.getLogger('jist') |
|
run_logger = logging.getLogger('jist.run') |
|
|
|
def run(*args): |
|
run_logger.debug(' '.join(args)) |
|
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|
stdout, stderr = p.communicate() |
|
exit_code = p.wait() |
|
if exit_code != 0: |
|
logger.error(stderr.strip() or stdout.strip()) |
|
sys.exit(1) |
|
return stdout.strip() |
|
|
|
def command(highlight=False): |
|
def decorator(fn): |
|
fn._is_command = True |
|
fn._is_highlighted = highlight |
|
return fn |
|
return decorator |
|
|
|
class Gist(object): |
|
user_agent = 'jist/0.1' |
|
|
|
def __init__(self, username=None, key=None, quiet=False, separator='___'): |
|
self.username = username or self.read_config_value('user') |
|
self.key = key or self.read_config_value('token') |
|
self.quiet = quiet |
|
self.separator = separator |
|
|
|
def print_message(self, msg): |
|
if not self.quiet: |
|
print(msg) |
|
|
|
def read_config_value(self, key): |
|
return run('git', 'config', '--global', 'jist.%s' % key) |
|
|
|
def get_api_headers(self): |
|
return { |
|
'X-Github-Username': self.username, |
|
'Content-Type': 'application/json', |
|
'Authorization': 'token %s' % self.key} |
|
|
|
def guess_dest(self, gist_id): |
|
try: |
|
api_detail = self.get_gist_details(gist_id) |
|
except: |
|
logger.exception('Unable to fetch gist details from GitHub API.') |
|
return |
|
|
|
if api_detail['description']: |
|
first_word = api_detail['description'].split()[0].lower() |
|
logger.debug('Gist description "%s"', first_word) |
|
return first_word |
|
|
|
logger.debug('Gist does not contain description, using filename.') |
|
if api_detail['files']: |
|
return max([ |
|
filename.lower().split('.')[0] |
|
for filename in api_detail['files']]) |
|
|
|
def get_gist_details(self, gist_id): |
|
url = 'https://api.github.com/gists/%s' % gist_id |
|
request = urllib2.Request(url, headers={'User-Agent': self.user_agent}) |
|
try: |
|
fh = urllib2.urlopen(request) |
|
except urllib2.HTTPError as exc: |
|
logger.debug('Received %s requesting %s', exc.code, url) |
|
raise |
|
else: |
|
return json.loads(fh.read()) |
|
|
|
def _clone(self, gist_id, dest): |
|
remote = 'git@gist.github.com:%s.git' % gist_id |
|
run('git', 'clone', remote, dest) |
|
|
|
@command() |
|
def pull(self): |
|
""" |
|
Update local checkout with the latest changes from GitHub. Execute |
|
this command from within the repository's root directory. |
|
jist pull |
|
""" |
|
run('git', 'pull', 'origin', 'master') |
|
|
|
@command(highlight=True) |
|
def clone(self, gist_id, dest=None): |
|
""" |
|
Clone a private gist with the given ID. Any flattened directories |
|
will be expanded. |
|
jist clone [gist id] [optional: path/for/code] |
|
""" |
|
if dest is None: |
|
dest = self.guess_dest(gist_id) or 'gist-%s' % gist_id[:6] |
|
|
|
if os.path.exists(dest): |
|
logger.info('%s already exists, not cloning.' % dest) |
|
else: |
|
self._clone(gist_id, dest) |
|
|
|
os.chdir(dest) |
|
self.pull() |
|
self.expand() |
|
|
|
@command() |
|
def expand(self, path=None): |
|
""" |
|
Expand flattened files into directories. For example `foo___bar.js` |
|
would be expanded to `foo/bar.js`. If no path is specified, command |
|
will run in the current working directory. |
|
jist expand [optional: path/to/expand] |
|
""" |
|
if path is not None: |
|
os.chdir(path) |
|
|
|
cwd = os.getcwd() |
|
logger.debug('Expanding: %s', cwd) |
|
for src in os.listdir(cwd): |
|
if os.path.isdir(src): |
|
logger.debug('Skipping %s, directory', src) |
|
continue |
|
parts = src.split(self.separator) |
|
path, filename = '/'.join(parts[:-1]), parts[-1] |
|
if path: |
|
if not os.path.exists(path): |
|
logger.debug('Making new directory %s', path) |
|
os.makedirs(path) |
|
elif os.path.isfile(path): |
|
raise Exception( |
|
'Directory %s and file of same name found.', path) |
|
|
|
dest = os.path.join(path, filename) |
|
logger.info('Renaming %s -> %s' % (src, dest)) |
|
os.rename(src, dest) |
|
|
|
@command(highlight=True) |
|
def push(self, path=None, force_push=True): |
|
""" |
|
Commit and push changes to GitHub. Any directories will be flattened |
|
before commiting and pushing, then re-expanded afterwards. |
|
jist push [optional: path/to/code] |
|
""" |
|
if path is not None: |
|
os.chdir(path) |
|
|
|
self.flatten() |
|
try: |
|
self.commit() |
|
run('git', 'push', '-f', 'origin', 'master') |
|
except: |
|
self.expand() |
|
raise |
|
self.expand() |
|
remote = run('git', 'config', '--get', 'remote.origin.url') |
|
self.print_message('Pushed changes to %s' % remote) |
|
|
|
@command() |
|
def flatten(self, path=None): |
|
""" |
|
Flatten all directories, renaming files so the directories can be |
|
reconstructed using `expand`. |
|
jist flatten [optional: path/to/files] |
|
""" |
|
if path is not None: |
|
os.chdir(path) |
|
|
|
cwd = os.getcwd() |
|
logger.debug('Flattening files in %s' % cwd) |
|
|
|
for dirpath, dirnames, filenames in os.walk('.'): |
|
if dirpath.startswith('./.git'): |
|
continue |
|
|
|
for base_filename in filenames: |
|
file_path = os.path.join(dirpath, base_filename) |
|
filename = os.path.relpath(file_path, '.') |
|
flattened = filename.replace('/', self.separator) |
|
if filename != flattened: |
|
logger.info('Renaming %s -> %s' % (filename, flattened)) |
|
os.rename(filename, flattened) |
|
|
|
@command() |
|
def commit(self): |
|
""" |
|
Commit changes to current working directory. |
|
jist commit |
|
""" |
|
run('git', 'add', '.') |
|
run('git', 'commit', '-a', '-m', 'updates') |
|
|
|
@command(highlight=True) |
|
def init(self, path=None, description=None): |
|
""" |
|
Initialize a Git repo and create a new Gist. |
|
jist init [optional: path/to/files] [optional: gist description] |
|
""" |
|
if path is not None: |
|
os.chdir(path) |
|
|
|
# Initialize a git repo in the specified directory. |
|
cwd = os.getcwd() |
|
logger.info('Initializing git repository in %s' % cwd) |
|
run('git', 'init') |
|
|
|
# Create a new Gist using GitHub's API. |
|
gist_id = self.create(description=description) |
|
|
|
# Add the gist URL as a remote. |
|
remote_url = 'git@gist.github.com:%s.git' % gist_id |
|
run('git', 'remote', 'add', 'origin', remote_url) |
|
|
|
self.push() |
|
self.print_message('https://gist.github.com/%s/%s' % ( |
|
self.username, gist_id)) |
|
|
|
def create(self, description='just a jist'): |
|
data = { |
|
'description': description, |
|
'public': False, |
|
'files': {'jist': {'content': 'jist 0.1 placeholder file.'}}} |
|
request = urllib2.Request( |
|
'https://api.github.com/gists', |
|
data=json.dumps(data), |
|
headers=self.get_api_headers()) |
|
fh = urllib2.urlopen(request) |
|
response = json.loads(fh.read()) |
|
return response['id'] |
|
|
|
@command() |
|
def help(self): |
|
""" |
|
Print available commands. |
|
""" |
|
commands = [] |
|
for key, value in Gist.__dict__.items(): |
|
if getattr(value, '_is_command', False): |
|
commands.append( |
|
(0 if value._is_highlighted else 1, key, value.__doc__)) |
|
|
|
for color_code, command, docstring in sorted(commands): |
|
print('\x1b[%s;34m%s\x1b[0m' % (color_code ^ 1, command)) |
|
print(docstring) |
|
|
|
|
|
def get_option_parser(): |
|
parser = optparse.OptionParser( |
|
usage='Usage: %prog [options] command param1 [param2 [param3]]') |
|
parser.add_option('-v', '--verbose', action='store_true', dest='verbose') |
|
parser.add_option('-d', '--debug', action='store_true', dest='debug') |
|
parser.add_option('-q', '--quiet', action='store_true', dest='quiet') |
|
parser.add_option('-k', '--key', dest='key') |
|
parser.add_option('-u', '--username', dest='username') |
|
parser.add_option('-s', '--separator', dest='separator', default='___') |
|
return parser |
|
|
|
if __name__ == '__main__': |
|
parser = get_option_parser() |
|
options, args = parser.parse_args() |
|
|
|
if not options.quiet: |
|
handler = logging.StreamHandler() |
|
handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT)) |
|
logger.addHandler(handler) |
|
if options.verbose: |
|
logger.setLevel(logging.INFO) |
|
if options.debug: |
|
logger.setLevel(logging.DEBUG) |
|
|
|
if len(args) == 0: |
|
sys.stderr.write('Error, missing command argument.\n') |
|
parser.print_usage(sys.stderr) |
|
sys.exit(1) |
|
else: |
|
command = args[0] |
|
|
|
gist = Gist( |
|
username=options.username, |
|
key=options.key, |
|
quiet=options.quiet, |
|
separator=options.separator or '___') |
|
|
|
method = getattr(gist, command, None) |
|
if not callable(method): |
|
sys.stderr.write('Error, unrecognized command "%s"' % command) |
|
sys.stderr.flush() |
|
sys.exit(1) |
|
|
|
method(*args[1:]) |