git filter-branch magic using Python
#! /usr/bin/env python | |
import glob | |
import os | |
import shutil | |
import re | |
from collections import namedtuple | |
import subprocess | |
from subprocess import PIPE | |
# One parsed object record: unpacked size and in-pack size (both in KiB),
# the object hash, and the path the object is known under.
Row = namedtuple('row', ['size_kb', 'pack_kb', 'sha', 'location'])


def split_row(x, obj_file_mapper):
    """Parse one per-object line of ``git verify-pack -v`` output.

    Args:
        x: A whitespace-separated line of the form
            ``<sha> <type> <size> <size-in-pack> <offset> [...]``.
        obj_file_mapper: Mapping from object hash to its path.

    Returns:
        A ``Row`` whose sizes have been converted from bytes to KiB.

    Raises:
        KeyError: If the object's hash is not in ``obj_file_mapper``.
        IndexError, ValueError: If ``x`` is not a per-object line.
    """
    # Plain whitespace splitting. The previous re.split(' *', x) matches the
    # empty string, and since Python 3.7 that splits between every single
    # character, so the size columns were never found.
    fields = x.split()
    sha = fields[0]
    # fields[1] is the object type and fields[4] the offset in the pack;
    # any further fields (only present for deltified objects) are ignored.
    size = int(fields[2]) / 1024
    pack = int(fields[3]) / 1024
    location = obj_file_mapper[sha]
    return Row(size, pack, sha, location)
def get_obj_location_mapping():
    """Build a mapping from object hash to the path it is listed under.

    Runs ``git rev-list --all --objects`` in the current directory and keeps
    only the output lines that carry something after the hash (lines with a
    bare hash are dropped).

    Returns:
        dict: ``{sha: path}``. When a hash is listed more than once, the
        last listing wins.
    """
    proc = subprocess.Popen(['git', 'rev-list', '--all', '--objects'],
                            stdout=PIPE)
    raw = proc.communicate()[0]
    # Decoding as ASCII raised UnicodeDecodeError on any non-ASCII path.
    # surrogateescape keeps undecodable bytes round-trippable when the path
    # is later handed back to git as a command-line argument.
    lines = raw.decode('utf-8', errors='surrogateescape').split('\n')

    obj_map = {}
    for line in lines:
        # Keep only "<sha> <path>" lines; a bare "<sha>" has no location.
        if ' ' not in line.strip():
            continue
        sha, path = line.split(' ', 1)
        obj_map[sha] = path
    return obj_map
def get_largest_n(n=25, sortby='size'):
    """Return the ``n`` largest packed objects that have a known path.

    Runs ``git verify-pack -v`` over every pack index found under
    ``.git/objects/pack`` and ranks the per-object lines in Python.

    Args:
        n: Maximum number of rows to return.
        sortby: ``'size'`` to rank by unpacked object size, ``'packed'`` to
            rank by size inside the pack file.

    Returns:
        list: ``Row`` tuples, largest first. Empty when there are no packs.

    Raises:
        KeyError: If ``sortby`` is neither ``'size'`` nor ``'packed'``.
    """
    # Zero-based column in the verify-pack line for each ranking mode.
    column = {'size': 2, 'packed': 3}[sortby]

    idx_files = sorted(glob.glob('.git/objects/pack/pack-*.idx'))
    if not idx_files:
        return []

    # Argument list instead of a shell pipeline: no shell globbing/quoting,
    # and filtering, sorting and truncation all happen below.
    proc = subprocess.Popen(['git', 'verify-pack', '-v'] + idx_files,
                            stdout=PIPE)
    listing = proc.communicate()[0].decode('utf-8', errors='replace')

    obj_file_mapper = get_obj_location_mapping()

    candidates = []
    for line in listing.splitlines():
        fields = line.split()
        # Skip everything that is not a per-object record: the summary lines
        # ("non delta: N objects", "chain length = N: M objects") and the
        # trailing "<pack>: ok" line. Filtering only on the word "chain" let
        # the others through, which crashed the row parser on small repos.
        if len(fields) < 5:
            continue
        if not (fields[2].isdigit() and fields[3].isdigit()):
            continue
        # Objects without a path (e.g. commits) cannot be removed by
        # location, and would raise KeyError in split_row.
        if fields[0] not in obj_file_mapper:
            continue
        candidates.append((int(fields[column]), line))

    candidates.sort(key=lambda pair: pair[0], reverse=True)
    return [split_row(line, obj_file_mapper) for _, line in candidates[:n]]
def filter_branch(loc):
    """Remove the path ``loc`` from the index of every commit and tag.

    Runs ``git filter-branch -f`` over all refs with an index filter that
    does ``git rm --cached --ignore-unmatch`` on ``loc``. Output of the run
    (stdout and stderr) is appended to ``output.tmp`` in the current
    directory.

    Args:
        loc: Repository path to remove from history.

    Returns:
        The finished ``subprocess.Popen`` object (already waited on).
    """
    # Function-scope import: shlex is not among the file's imports.
    import shlex

    # filter-branch evaluates the index filter with a shell, so the path has
    # to be quoted inside that string; "--" keeps a path starting with "-"
    # from being read as an option.
    index_filter = ('git rm --cached --ignore-unmatch -- '
                    + shlex.quote(loc))
    # An argument list (no shell=True) so the outer command needs no quoting
    # at all. Previously loc was pasted unquoted into a shell string, which
    # broke on spaces/quotes and allowed shell injection.
    command = ['git', 'filter-branch', '-f',
               '--index-filter', index_filter,
               '--tag-name-filter', 'cat',
               '--', '--all']
    with open("output.tmp", "a") as fout:
        # Write to a file rather than a pipe: an unread pipe would block the
        # child once its buffer fills up.
        proc = subprocess.Popen(command, stdout=fout, stderr=fout)
        proc.wait()
    return proc
def print_objects(objects):
    """Print a header line followed by one formatted line per object.

    Args:
        objects: Iterable of records exposing ``sha``, ``size_kb``,
            ``pack_kb`` and ``location`` attributes.
    """
    header = " " * 37 + "sha size_kb pack_kb location"
    print(header)

    line_template = "{sha} {size_kb:>8.0f} {pack_kb:>8.0f} {loc}"
    for entry in objects:
        cells = {
            "sha": entry.sha,
            # size is truncated to a whole number before formatting,
            # whereas the packed size is passed through as-is
            "size_kb": int(entry.size_kb),
            "pack_kb": entry.pack_kb,
            "loc": entry.location,
        }
        print(line_template.format(**cells))
def get_size():
    """Return the ``size-pack`` value reported by git, divided by 1024.

    Runs ``git count-objects -v`` in the current directory and reads the
    number on its ``size-pack:`` line. Callers print the result as MB.

    Returns:
        float: ``size-pack / 1024``.

    Raises:
        RuntimeError: If the output contains no ``size-pack`` line.
    """
    proc = subprocess.Popen(["git", "count-objects", "-v"], stdout=PIPE)
    output = proc.communicate()[0].decode('ascii')
    # Raw string: "\d" in a normal string literal is an invalid escape.
    match = re.search(r"(?<=size-pack: )\d+", output)
    if match is None:
        # Previously this surfaced as an AttributeError on None.
        raise RuntimeError(
            "could not find 'size-pack' in 'git count-objects -v' output")
    return int(match.group()) / 1024
def ignore_missing(func, path, exc_info):
    """``shutil.rmtree`` error handler that tolerates missing paths.

    Args:
        func: The function that raised (unused).
        path: The path that was being processed (unused).
        exc_info: ``(type, value, traceback)`` tuple describing the error.

    Raises:
        BaseException: The original exception, unless it is a
            ``FileNotFoundError`` (which is silently ignored).
    """
    if issubclass(exc_info[0], FileNotFoundError):
        return
    # Re-raise the exception instance itself. Raising the exc_info tuple is
    # a TypeError ("exceptions must derive from BaseException") and hid the
    # real error.
    raise exc_info[1]
def git_ferocious():
    """Strip remotes, backup refs and logs, then run an immediate ``git gc``.

    Steps, all relative to the current directory:

    1. ``git remote rm origin`` (failure is ignored).
    2. Inside ``.git``: delete ``refs/remotes/``, ``refs/original/``, every
       ``*_HEAD`` file and ``logs/``.
    3. Run ``git gc`` with the reflog, rerere and prune expiry settings
       overridden to ``0`` / ``now``.

    The working directory is restored before returning, also on error.

    Raises:
        OSError: If ``.git`` cannot be entered or a deletion fails for a
            reason other than the path being missing.
    """
    devnull = subprocess.DEVNULL

    # The exit status is deliberately not checked: there may be no "origin".
    # Output is discarded via DEVNULL rather than an unread pipe, which could
    # block the child.
    subprocess.Popen(["git", "remote", "rm", "origin"],
                     stdin=devnull, stdout=devnull).wait()

    # Remember where we started and enter .git *before* the try block, so a
    # failed chdir does not trigger the restore step. The old
    # "finally: os.chdir('..')" moved the process up a directory even when
    # .git had never been entered.
    start_dir = os.getcwd()
    os.chdir(".git")
    try:
        shutil.rmtree("refs/remotes/", onerror=ignore_missing)
        shutil.rmtree("refs/original/", onerror=ignore_missing)
        for stale_head in glob.glob("*_HEAD"):
            os.unlink(stale_head)
        shutil.rmtree("logs/", onerror=ignore_missing)

        gc_command = [
            "git",
            "-c", "gc.reflogExpire=0",
            "-c", "gc.reflogExpireUnreachable=0",
            "-c", "gc.rerereresolved=0",
            "-c", "gc.rerereunresolved=0",
            "-c", "gc.pruneExpire=now",
            "gc",
        ]
        subprocess.Popen(gc_command, stdin=devnull, stdout=devnull).wait()
    finally:
        os.chdir(start_dir)
def remove_backup():
    """Delete every ref under ``refs/original``.

    A small shell script lists the refs with ``git for-each-ref`` and feeds
    each name to ``git update-ref -d``.

    Returns:
        The finished ``subprocess.Popen`` object (already waited on).
    """
    script = """
git for-each-ref --format='%(refname)' refs/original | \
while read ref
do
git update-ref -d "$ref"
done
"""
    proc = subprocess.Popen(script, stdin=PIPE, stdout=PIPE, shell=True)
    proc.wait()
    return proc
if __name__ == "__main__":
    print("The size of the repository before repacking "
          "is {:5.2f} MB".format(get_size()))

    # Pack everything up before measuring individual objects.
    subprocess.Popen(['git', 'repack', '-ad']).wait()

    # Picking the biggest objects is a blunt strategy that only makes sense
    # when the repository is known to hold a lot of cruft. Swap in something
    # smarter here if needed: filter on types, use a diff-tree, or list the
    # paths by hand.
    largest = get_largest_n(25)
    print_objects(largest)

    # Ask about each candidate; an empty answer counts as "y".
    doomed = []
    for candidate in largest:
        answer = input('Remove {} (y/n) [y] ? '.format(candidate.location))
        if answer in ('', 'y'):
            doomed.append(candidate.location)

    # Pause so the user can still back up; the typed text is not used.
    input("About to run filter-branch. Last chance to backup.")

    print("The original size of the repository is {:5.2f} MB".format(
        get_size()))

    for path in doomed:
        print("Filtering {}".format(path))
        filter_branch(path)

    # Drop the refs/original backups.
    remove_backup()

    # Expire the reflog.
    subprocess.Popen(["git", "reflog", "expire", "--expire=0", "--all"],
                     stdout=PIPE, stdin=PIPE).wait()

    # Repack.
    subprocess.Popen(["git", "repack", "-ad"],
                     stdout=PIPE, stdin=PIPE).wait()

    # Aggressively garbage collect.
    git_ferocious()

    print("The new size of the repository is {:5.2f} MB".format(get_size()))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment