Skip to content

Instantly share code, notes, and snippets.

@dutc
Last active August 6, 2018 05:35
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save dutc/a06e4918583586a3fa788ef3abab20a5 to your computer and use it in GitHub Desktop.
simple, slow, probably buggy `git init+add+commit` in <167 lines Python
#!/usr/bin/env python3
from hashlib import sha1
from pathlib import Path
from collections import namedtuple, defaultdict
from zlib import compress
from os import makedirs, stat
from os.path import isfile, exists
from struct import pack
from math import modf, ceil
from dateutil.parser import parse as dateutil_parse
from tzlocal import get_localzone
from datetime import datetime
from argparse import ArgumentParser
# Repository metadata directory, relative to the current working directory.
REPO_DIR = Path('.', '.git')
# Directory under which GitObject.write() stores compressed objects.
OBJECTS_DIR = REPO_DIR.joinpath('objects')
class GitObject:
    """Mixin that serializes, hashes and stores an object.

    Subclasses must provide a ``body`` attribute holding the payload as
    bytes; the lowercase subclass name is used as the object type in the
    header.
    """

    @property
    def header(self):
        """Return ``'<type> <body length>\\0'`` as a string."""
        kind = type(self).__name__.lower()
        return f'{kind} {len(self.body)}\0'

    def __bytes__(self):
        """Return the encoded header followed by the body."""
        return self.header.encode() + self.body

    @property
    def hexdigest(self):
        """Return the SHA-1 of the serialized object as a hex string."""
        return sha1(bytes(self)).hexdigest()

    @property
    def digest(self):
        """Return the SHA-1 of the serialized object as raw bytes."""
        return sha1(bytes(self)).digest()

    @property
    def path(self):
        """Return ``<first two hex chars>/<remaining hex chars>``."""
        name = self.hexdigest
        return Path(name[:2]) / name[2:]

    def write(self):
        """Write the zlib-compressed object below ``OBJECTS_DIR``."""
        target = OBJECTS_DIR / self.path
        makedirs(target.parent, exist_ok=True)
        with open(target, 'wb') as out:
            out.write(compress(bytes(self)))
class Oct(namedtuple('Oct', 'value')):
    """One permission digit built from a string of ``r``/``w``/``x`` flags."""

    def __new__(cls, value):
        # Bit weight contributed by each permission letter.
        weights = {'r': 4, 'w': 2, 'x': 1}
        total = 0
        for flag in value:
            total += weights[flag]
        return super().__new__(cls, total)

    def __str__(self):
        """Return the value in octal notation without the ``0o`` prefix."""
        return oct(self.value)[len('0o'):]
class Mode(namedtuple('Mode', 'user group other')):
    """Permission triple rendered as ``100<user><group><other>``."""

    def __new__(cls, user='', group='', other=''):
        # Each flag string ('r', 'rw', ...) becomes one Oct digit.
        digits = [Oct(flags) for flags in (user, group, other)]
        return super().__new__(cls, *digits)

    def __str__(self):
        return '100' + ''.join(str(digit) for digit in self)
class Node(namedtuple('Node', 'name digest mode')):
    """A single tree entry: file name, raw object digest and mode string."""

    @classmethod
    def from_disk(cls, path, digest):
        """Build a Node for ``path``, deriving the mode from the filesystem.

        ``path`` must offer a ``.name`` attribute (e.g. ``pathlib.Path``);
        ``digest`` is the raw digest of the blob or tree the entry points to.
        The mode is normalized to the values git stores in trees: ``40000``
        for directories, ``100755`` for owner-executable files and ``100644``
        for every other file.
        """
        # Imported locally: the module-level name `stat` is os.stat, and
        # importing *from* the stat module does not rebind it.
        from stat import S_ISDIR, S_IXUSR

        # st_mode is an integer attribute, not a method.
        st_mode = stat(path).st_mode
        if S_ISDIR(st_mode):
            mode = '40000'
        elif st_mode & S_IXUSR:
            mode = '100755'
        else:
            mode = '100644'
        return cls(path.name, digest, mode)

    def __bytes__(self):
        """Return ``b'<mode> <name>\\0'`` followed by the raw digest."""
        return f'{self.mode} {self.name}\0'.encode() + self.digest
class Author(namedtuple('Author', 'name email ts')):
    """Identity plus timestamp, rendered as ``name <email> epoch offset``."""

    def __str__(self):
        epoch = round(self.ts.timestamp())
        utc_offset = format(self.ts, '%z')
        return f'{self.name} <{self.email}> {epoch} {utc_offset}'
class Blob(namedtuple('Blob', 'contents'), GitObject):
    """Object whose body is the stored ``contents`` unchanged."""

    @property
    def body(self):
        return self.contents
class Tree(namedtuple('Tree', 'listing'), GitObject):
    """Object whose body is the concatenation of its entries.

    ``listing`` is a mutable list of entries; each entry must support
    ``bytes()`` and expose ``name`` and ``mode`` attributes (as ``Node``
    does).
    """

    @staticmethod
    def _sort_key(entry):
        """Return the byte string git orders tree entries by.

        Entries are ordered by name bytes, with directories compared as if
        their name ended in ``/``.
        """
        name = entry.name.encode()
        is_dir = str(entry.mode).lstrip('0').startswith('4')
        return (name + b'/') if is_dir else name

    @property
    def body(self):
        # Serialize in git's canonical order rather than insertion order, so
        # the digest does not depend on the order in which nodes were added.
        ordered = sorted(self.listing, key=self._sort_key)
        return b''.join(bytes(entry) for entry in ordered)

    def insert(self, node):
        """Append ``node`` to the listing (mutates the list in place)."""
        self.listing.append(node)
class Commit(namedtuple('Commit', 'tree author message parent'), GitObject):
    """Commit object: tree id, optional parent id, author and message.

    ``tree`` and ``parent`` are interpolated into the body as text, so they
    are expected to be hex digests; ``author`` is used for both the author
    and the committer line.
    """

    def __new__(cls, tree, author, message, parent=None):
        return super().__new__(cls, tree, author, message, parent)

    @property
    def body(self):
        lines = [f'tree {self.tree}']
        if self.parent is not None:
            lines.append(f'parent {self.parent}')
        lines.append(f'author {self.author}')
        lines.append(f'committer {self.author}')
        # An empty element yields exactly one blank line between the headers
        # and the message once joined with '\n'.
        lines.append('')
        message = self.message
        if not message.endswith('\n'):
            # Terminate the message with a newline.
            message += '\n'
        lines.append(message)
        return '\n'.join(lines).encode()
class Entry(namedtuple('Entry', 'path digest')):
    """One index entry for the file at ``path`` whose blob digest is ``digest``."""

    def __bytes__(self):
        """Serialize the entry from a fresh ``stat`` of ``path``.

        Layout (big-endian): ctime sec/nsec, mtime sec/nsec, dev, ino, mode,
        uid, gid, size (ten 32-bit fields), the raw digest, a 16-bit flags
        field, the path bytes, then 1-8 NUL bytes so that the total length
        is a multiple of eight.
        """
        st = stat(self.path)
        path = str(self.path).encode()
        u32 = 0xFFFFFFFF
        # Use the integer nanosecond timestamps: splitting the float seconds
        # loses precision, and the fractional part must be scaled by 1e9.
        ctime, ctime_ns = divmod(st.st_ctime_ns, 10 ** 9)
        mtime, mtime_ns = divmod(st.st_mtime_ns, 10 ** 9)
        fixed = b''.join([
            pack('>II', ctime & u32, ctime_ns),
            pack('>II', mtime & u32, mtime_ns),
            # Each stat field is truncated to 32 bits; inode and device
            # numbers in particular can be wider, which would make pack()
            # raise struct.error.
            pack('>III', st.st_dev & u32, st.st_ino & u32, st.st_mode & u32),
            pack('>III', st.st_uid & u32, st.st_gid & u32, st.st_size & u32),
            self.digest,
            # Flag bits are all zero; the low 12 bits hold the path length,
            # capped at 0xFFF.
            pack('>H', min(len(path), 0xFFF)),
        ])
        # Pad with at least one NUL up to the next multiple of eight bytes,
        # measured over the whole entry (not over the path alone).
        padding = 8 - (len(fixed) + len(path)) % 8
        return fixed + path + b'\0' * padding
class Index(namedtuple('Index', 'entries')):
    """Index file made of a header, the sorted entries and a checksum.

    Each element of ``entries`` must support ``bytes()`` and expose a
    ``path`` attribute used for ordering.
    """

    VERSION = 2

    def __bytes__(self):
        """Return the complete index file contents.

        The 12-byte header is ``b'DIRC'``, the version and the entry count.
        Entries follow in ascending order of their path bytes, and the file
        ends with the SHA-1 of everything before it.
        """
        # Order by the encoded path, not by Path comparison: the latter
        # compares component-wise and sorts e.g. 'dir/x' before 'dir.txt'.
        ordered = sorted(self.entries, key=lambda e: str(e.path).encode())
        content = b''.join([
            b'DIRC',
            pack('>I', self.VERSION),
            pack('>I', len(ordered)),
            *(bytes(entry) for entry in ordered),
        ])
        return content + sha1(content).digest()

    def write(self):
        """Write the serialized index to ``REPO_DIR / 'index'``."""
        with open(REPO_DIR / 'index', 'wb') as f:
            f.write(bytes(self))
# Command-line interface: identity, message, optional parent and timestamp,
# followed by one or more paths to add and commit.
parser = ArgumentParser(
    description='simple, slow, probably buggy `git init+add+commit` in <167 lines Python',
)
parser.add_argument(
    '-a', '--author',
    help='author & committer',
    default='me',
)
parser.add_argument(
    '-e', '--email',
    help='author & committer email',
    default='me',
)
parser.add_argument(
    '-m', '--message',
    help='commit message',
    default='first commit',
)
parser.add_argument(
    '-p', '--parent',
    help='parent commit hash',
    default=None,
)
parser.add_argument(
    '-t', '--timestamp',
    help='author & commit timestamp',
    type=dateutil_parse,
    # Evaluated once, when this statement runs.
    default=datetime.now(get_localzone()),
)
parser.add_argument(
    'paths',
    help='paths to add & commit',
    type=Path,
    metavar='PATH',
    nargs='+',
)
if __name__ == '__main__':
    # Script entry point: store blobs and trees for the given files, write a
    # commit and an index, and create HEAD / refs/heads/master if missing.
    args = parser.parse_args()
    root = Path('.')

    # Only regular files are added; anything else on the command line is
    # silently ignored.
    paths = {p for p in args.paths if isfile(p)}
    if not paths:
        parser.error('none of the given PATHs is a regular file')
    if any(p.is_absolute() or '..' in p.parts for p in paths):
        # The tree is built relative to the current directory, so paths
        # outside it cannot be represented.
        parser.error('PATHs must be relative and inside the current directory')

    blobs, nodes, entries = [], {}, []
    for p in paths:
        with open(p, 'rb') as f:
            b = Blob(f.read())
        nodes[p] = Node.from_disk(p, b.digest)
        blobs.append(b)
        entries.append(Entry(p, b.digest))

    # One tree per ancestor directory of every file, always including the
    # root, so intermediate directories and a root without direct files work.
    trees = {root: Tree([])}
    for p in paths:
        for directory in p.parents:
            if directory not in trees:
                trees[directory] = Tree([])
    for p in paths:
        trees[p.parent].insert(nodes[p])

    # Link sub-trees into their parents deepest-first: a tree's digest is
    # only final once all of its children have been inserted.
    for directory in sorted(trees, key=lambda d: len(d.parts), reverse=True):
        if directory == root:
            continue
        n = nodes[directory] = Node.from_disk(directory, trees[directory].digest)
        trees[directory.parent].insert(n)

    for b in blobs:
        b.write()
    for t in trees.values():
        t.write()

    author = Author(args.author, args.email, args.timestamp)
    commit = Commit(trees[root].hexdigest, author, args.message, args.parent)
    commit.write()

    Index(entries).write()

    head, ref = REPO_DIR / 'HEAD', REPO_DIR / 'refs/heads/master'
    if not exists(head):
        makedirs(head.parent, exist_ok=True)
        with open(head, 'w') as f:
            f.write('ref: refs/heads/master')
            f.write('\n')
    # NOTE(review): an existing branch ref is left untouched, so a commit
    # made with --parent is written but not pointed to by master -- confirm
    # this is intended.
    if not exists(ref):
        makedirs(ref.parent, exist_ok=True)
        with open(ref, 'w') as f:
            f.write(commit.hexdigest)
            f.write('\n')
# no code beyond this point
.PHONY: test clean

# Create sample files, commit them with the script (falling back to real git
# if the script fails), then inspect the resulting repository.
test:
	seq 1 10 > seq1
	seq 10 20 > seq2
	mkdir -p dir
	seq 20 30 > dir/seq3
	python3 git-init+add+commit.py seq1 seq2 dir/seq3 || (git init . ; git add seq1 seq2 dir ; git commit -am 'first commit')
	git status
	git log
	tree .git/objects

# Remove the sample files and the repository; a leading '-' lets make carry on
# when something is already gone.
clean:
	-rm seq1 seq2 dir/seq3
	-rmdir dir
	-rm -rf .git
@dutc
Copy link
Author

dutc commented Jul 20, 2018

$ make clean && make test

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment