Skip to content

Instantly share code, notes, and snippets.

@pblocz
Created May 26, 2015 22:48
Show Gist options
  • Save pblocz/fc6e75f27157bad4d881 to your computer and use it in GitHub Desktop.
Save pblocz/fc6e75f27157bad4d881 to your computer and use it in GitHub Desktop.
Script to do some housekeeping of big or sensitive files and erase them from the repository history
#!/usr/bin/env python
# coding=utf-8
'''
git-big-files | (c) 2015 Pablo Cabeza
license: [modified BSD](http://opensource.org/licenses/BSD-3-Clause)
Script to do some housekeeping of big or sensitive files and erase
them from the repository history
'''
# Standard python libraries
import os
import sys
import glob
import shlex
import logging
import argparse
import subprocess
from collections import namedtuple
# Custom installed libraries
from tabulate import tabulate # pip tabulate
from hurry.filesize import size as hsize # pip hurry.filesize
# Module-level logger named after this module; main() installs the handler
# (stdout, DEBUG level) through logging.basicConfig.
logger = logging.getLogger(__name__)
class Git(object):
    """
    Represents the git repository rooted at the current working directory
    and wraps the git commands used to find and purge big files.
    """

    # One row of the report built by `get_files_by_size`: human readable
    # object size, human readable size inside the pack, object sha and path.
    Entry = namedtuple("Entry", ["size", "pack", "sha", "path"])

    # Object types that can appear in the per-object rows printed by
    # `git verify-pack -v`; used to tell those rows from summary lines.
    _OBJECT_TYPES = frozenset(["commit", "tree", "blob", "tag"])

    @classmethod
    def _build_call(cls, cmd):
        "Join the arguments of `cmd` into one shell-quoted command line"
        return ' '.join(shlex.quote(a) for a in cmd)

    @classmethod
    def _unquoted_call(cls, cmd, retcode=False, stdout=False, shell=True):
        '''
        Run `cmd` exactly as given (no quoting is added) and wait for it.

        Arguments
        ---------
        - `cmd`: command to execute passed to Popen
        - `stdout`: when true the child writes straight to this process'
          stdout and nothing is captured; otherwise the output is captured
        - `retcode`: also return the exit code of the command
        - `shell`: whether to execute through shell or not

        **Return**: the captured output as `str` (`None` when it was not
        captured), or the tuple `(returncode, output)` if `retcode` is set
        '''
        kwargs = {'shell': shell}
        if not stdout:
            kwargs['stdout'] = subprocess.PIPE
        proc = subprocess.Popen(cmd, **kwargs)
        out, _ = proc.communicate()
        if out is not None:
            # git does not guarantee utf-8 paths: replace undecodable bytes
            # instead of aborting the whole command with UnicodeDecodeError.
            out = out.decode('utf-8', errors='replace')
        return (proc.returncode, out) if retcode else out

    @classmethod
    def _call(cls, cmd, *args, **kwargs):
        "Same as `_unquoted_call` but `cmd` is an argument list that gets shell-quoted first"
        return cls._unquoted_call(cls._build_call(cmd), *args, **kwargs)

    @classmethod
    def _parse_verify_pack(cls, output):
        '''
        Keep only the per-object rows of a `git verify-pack -v` listing.

        Object rows look like `sha type size size-in-pack offset [...]`.
        The summary lines (`non delta: ...`, `chain length = ...`,
        `<pack>: ok`) are repeated once per pack file, so they are
        recognised by shape rather than by their position in the output.

        **Return**: list of rows, each one split into its fields
        '''
        rows = []
        for line in output.splitlines():
            fields = line.split()
            if len(fields) >= 5 and fields[1] in cls._OBJECT_TYPES:
                rows.append(fields)
        return rows

    @classmethod
    def _filter_branch_cmd(cls, files, force=False):
        '''
        Build the `git filter-branch` argument list that removes every path
        in `files` from the index of each commit in a single history pass.
        '''
        # The index filter is evaluated by a shell, so each path is quoted;
        # `--` keeps a path starting with a dash from being read as an option.
        remove = "git rm -rf --cached --ignore-unmatch -- " + cls._build_call(files)
        cmd = ["git", "filter-branch"]
        if force:
            cmd.append("--force")
        cmd += ["--prune-empty", "--index-filter", remove,
                "--tag-name-filter", "cat", "--", "--all"]
        return cmd

    def __init__(self):
        '''
        Bind to the repository at the current working directory.

        Raises `FileNotFoundError` when the directory is not inside a git
        work tree or when it is not the top level of that work tree.
        '''
        self.cwd = os.getcwd()
        rt, root = self._unquoted_call(['git', 'rev-parse', '--show-toplevel'],
                                       shell=False, retcode=True)
        if rt != 0:
            raise FileNotFoundError("not a git directory")
        self.root = root.strip()

        # Compare canonical paths so that symlinks, letter case (on case
        # insensitive systems) or separator style do not cause a mismatch.
        def canonical(path):
            return os.path.normcase(os.path.realpath(path))

        if canonical(self.root) != canonical(self.cwd):
            raise FileNotFoundError("not a git root, try from %s" % os.path.relpath(self.root))

    def get_objects(self):
        '''
        List git objects from the packs with their sizes.

        **Return**: rows of `[sha, type, size, size-in-pack, offset, ...]`
        as strings; an empty list when the repository has no pack index
        '''
        indexes = sorted(glob.glob(os.path.join(".git", "objects", "pack", "pack-*.idx")))
        if not indexes:
            return []
        out = self._unquoted_call(["git", "verify-pack", "-v"] + indexes, shell=False)
        return self._parse_verify_pack(out)

    def get_sha_dict(self):
        "Get a dict of the objects reachable from any ref as (sha: path); objects listed without a path are left out"
        out = self._call(["git", "rev-list", "--all", "--objects"])
        rows = (l.split(maxsplit=1) for l in out.splitlines())
        return {r[0]: r[1] for r in rows if len(r) == 2}

    def get_files_by_size(self, maxlimit=10):
        '''
        Report the biggest packed objects that have a path.

        - `maxlimit`: maximum number of entries returned, `None` for all

        **Return**: list of `Entry`, biggest first
        '''
        paths = self.get_sha_dict()
        # Drop path-less objects before truncating; otherwise they would
        # use up slots and fewer than `maxlimit` entries would be returned.
        named = [o for o in self.get_objects() if o[0] in paths]
        named.sort(key=lambda o: int(o[2]), reverse=True)
        return [
            self.Entry(
                size=hsize(int(o[2])),
                pack=hsize(int(o[3])),
                sha=o[0],
                path=paths[o[0]],
            )
            for o in named[:maxlimit]
        ]

    def filter_file(self, file, force=False):
        '''
        Rewrite all refs so that `file` is removed from every commit.

        - `file`: a path or a glob. The glob is expanded against the working
          directory; when nothing matches the value is handed to git as is
        - `force`: pass `--force` to `git filter-branch`

        Raises `subprocess.CalledProcessError` if `git filter-branch` fails.
        '''
        files = glob.glob(file) or [file]
        logger.info("removing from history: %s", ", ".join(files))
        # All the paths go in one filter-branch run: a second run would be
        # refused because of the refs/original backup left by the first one
        # (unless forced) and would walk the whole history again.
        cmd = self._filter_branch_cmd(files, force)
        rt, _ = self._call(cmd, stdout=True, retcode=True)
        if rt != 0:
            raise subprocess.CalledProcessError(rt, cmd)

    def clear_untracked_history(self):
        '''
        Get rid of what a history rewrite leaves behind: delete the refs
        under refs/original, expire all reflog entries and garbage collect.

        Raises `subprocess.CalledProcessError` as soon as one step fails.
        '''
        cmds = [
            "git for-each-ref --format='delete %(refname)' refs/original | git update-ref --stdin",
            ['git', 'reflog', 'expire', '--expire=now', '--all'],
            ["git", "gc", "--prune=now"],
        ]
        for cmd in cmds:
            # A string is a ready-made shell pipeline, a list still needs quoting
            run = self._unquoted_call if isinstance(cmd, str) else self._call
            rt, _ = run(cmd, retcode=True)
            if rt != 0:
                raise subprocess.CalledProcessError(rt, cmd)
def _parse_args(arguments):
parser = argparse.ArgumentParser(description='')
parser.add_argument('--version', '-v', type=bool,
help='show version of the program')
parser.add_argument('--max', '-m', type=int, nargs='?', default=10,
help="maximum number of files")
subparsers = parser.add_subparsers(dest='cmd')
subparsers.required = False
delete_parser = subparsers.add_parser('delete',
description='filter out a file from history')
delete_parser.add_argument('--force', '-f', action="store_true", default=False,
help='force backup overwrite')
delete_parser.add_argument('file', help="complete path of file to delete or glob")
clean_parser = subparsers.add_parser('clean',
description='cleans untracked files from history')
clear_parser = subparsers.add_parser('clear',
description='filter out a file from history and clean')
clear_parser.add_argument('--force', '-f', action="store_true", default=False,
help='force backup overwrite')
clear_parser.add_argument('file', help="complete path of file to clear or glob")
return parser.parse_args(arguments)
def main(arguments=None):
    '''
    Main function of the script, use as:

        git big-files [--max N]
        git big-files delete [--force] FILE
        git big-files clean
        git big-files clear [--force] FILE

    Without a subcommand a table of the biggest files is printed.

    `arguments`: list of arguments to execute main. if `None` then
        sys.argv will be used

    **Return**: the return code as an `int`: 0 on success, 1 when the
        directory is not a git root or a git command fails
    '''
    # Only fall back to sys.argv when nothing was passed: an explicit empty
    # list is a valid request to run with no arguments.
    if arguments is None:
        arguments = sys.argv[1:]
    args = _parse_args(arguments)

    # Configure logger
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(message)s")

    # Process input from arguments
    try:
        repo = Git()
        if args.cmd == "delete":
            repo.filter_file(args.file, force=args.force)
        elif args.cmd == 'clean':
            repo.clear_untracked_history()
        elif args.cmd == 'clear':
            repo.filter_file(args.file, force=args.force)
            repo.clear_untracked_history()
        else:
            ob = repo.get_files_by_size(maxlimit=args.max)
            print(tabulate(ob, headers=Git.Entry._fields))
    except (FileNotFoundError, subprocess.CalledProcessError) as err:
        # Expected failures become an exit status instead of a traceback
        print("error: %s" % err, file=sys.stderr)
        return 1
    return 0
# Run main() only when executed as a script and hand its return value to
# the shell as the exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment