Skip to content

Instantly share code, notes, and snippets.

@simonwagner
Last active April 19, 2023 16:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save simonwagner/f9b6f66d06eb8a355e2d to your computer and use it in GitHub Desktop.
Save simonwagner/f9b6f66d06eb8a355e2d to your computer and use it in GitHub Desktop.
Git graph renderer
import sys
import os
import fnmatch
from itertools import chain
from inspect import getframeinfo, stack
import math
from collections import OrderedDict
import pygit2 as git
import cairocffi as cairo
def hex2rgb(hexvalue):
    """Convert a string of hex digit pairs into a tuple of floats.

    The string is consumed two characters at a time; each chunk is parsed
    as a base-16 integer and divided by 255.0.

    :param hexvalue: hex digits without any prefix, e.g. ``"1f77b4"``.
    :returns: tuple holding one float per two-character chunk.
    :raises ValueError: if a chunk is not valid hexadecimal.
    """
    # ``range`` instead of ``xrange``: behaves the same here and also exists
    # on Python 3, where ``xrange`` is gone.
    chunks = (hexvalue[i:i + 2] for i in range(0, len(hexvalue), 2))
    return tuple(int(chunk, 16) / 255.0 for chunk in chunks)
# Palette used to colour graph lines: every hex string is converted once,
# at import time, into a tuple of float components via hex2rgb.
COLORS = list(map(hex2rgb, (
    "1f77b4",
    "aec7e8",
    "ff7f0e",
    "ffbb78",
    "2ca02c",
    "98df8a",
    "d62728",
    "ff9896",
    "9467bd",
    "c5b0d5",
    "8c564b",
    "c49c94",
    "e377c2",
    "f7b6d2",
    "7f7f7f",
    "c7c7c7",
    "bcbd22",
    "dbdb8d",
    "17becf",
    "9edae5",
)))
def linecolor(line):
    """Return the palette colour assigned to a graph line.

    The hex id of the line's start commit is read as a base-16 integer and
    reduced modulo the palette size, so a given line always maps to the
    same entry of COLORS.
    """
    palette_index = int(line.start_commit.id.hex, 16) % len(COLORS)
    return COLORS[palette_index]
def main():
    """Render the commit graph of a repository into ``graph.pdf``.

    The repository is discovered from the first command line argument or,
    when none is given, from the current working directory.
    """
    cur_path = sys.argv[1] if len(sys.argv) >= 2 else os.getcwd()
    repo = git.Repository(git.discover_repository(cur_path))
    rendering = render_graph(generate_graph(repo))
    write_rendering(rendering, "graph.pdf")
def write_rendering(surface, fname):
    """Write a recorded drawing to a PDF file sized to its inked area.

    :param surface: surface offering ``ink_extents()``; ``main`` passes the
        recording surface returned by ``render_graph``.
    :param fname: path of the PDF file to create.
    """
    # ink_extents() is indexed as (x, y, width, height) of the drawn area
    x, y, width, height = surface.ink_extents()
    #render into real surface with appropriate size
    out_surface = cairo.PDFSurface(fname, width, height)
    try:
        out_ctx = cairo.Context(out_surface)
        # shift the recording so its inked area starts at the page origin
        out_ctx.set_source_surface(surface, -x, -y)
        out_ctx.paint()
    finally:
        # finish explicitly so the file is completely written here instead
        # of whenever the surface object happens to be garbage collected
        out_surface.finish()
def render_graph(graph):
    """Draw the commit graph onto a cairo recording surface.

    :param graph: the ``(lines, order, commits)`` tuple as returned by
        ``generate_graph``: the graph lines, a mapping from commit id to
        its position, and the commits in drawing order.
    :returns: the ``cairo.RecordingSurface`` holding the drawing.

    Each commit occupies one row of ``CELL_SIZE`` units; every line that is
    active at that row (see ``lines_at_commit``) occupies one column.

    NOTE(review): the block nesting below follows the intent expressed by
    the inline comments -- confirm it against a known-good copy if in doubt.
    """
    CELL_SIZE = 12
    # NOTE(review): under Python 2 these ``/`` on ints are integer divisions
    COMMIT_CIRCLE_RADIUS = (CELL_SIZE - 6)/2
    lines, order, commits = graph
    #calculate the size
    # (max_order, width and height are only referenced by the disabled
    # SVG surface line below)
    max_order = max(order.values())
    width = len(lines)*CELL_SIZE
    height = (max_order + 1)*CELL_SIZE
    #surface = cairo.SVGSurface("graph.svg", width, height)
    surface = cairo.RecordingSurface(cairo.CONTENT_COLOR_ALPHA, None)
    ctx = cairo.Context(surface)
    previous_commit = None
    previous_commit_col = None
    for i, commit in enumerate(commits):
        commit_order = order[commit.id]
        lines_around = lines_at_commit(commit, lines, order)
        # find the column (index within lines_around) of the line holding
        # this commit
        # NOTE(review): if no line contains the commit,
        # line_with_commit_index keeps the value of the previous row (or is
        # unbound on the first row)
        for idx, line in enumerate(lines_around):
            if commit.id in set(commit.id for commit in line.commits):
                line_with_commit_index = idx
                break
        commit_line = lines_around[line_with_commit_index]
        commit_x = line_with_commit_index * CELL_SIZE
        commit_y = i * CELL_SIZE
        # centre of the commit's cell
        commit_cx = commit_x + CELL_SIZE/2
        commit_cy = commit_y + CELL_SIZE/2
        commit_lc = linecolor(commit_line)
        #draw all other lines
        # splits counts the split lines drawn so far in this row; it shifts
        # the lower end of the lines that follow by one cell to the left per
        # split
        splits = 0
        # number of "simple" lines that were drawn as a direct connection
        # and therefore do not take up a column of their own
        skipped_lines = 0
        for j, line in enumerate(lines_around):
            offset = splits * -CELL_SIZE
            if j == line_with_commit_index:
                # the commit's own line is drawn after this loop; remember
                # how many splits were drawn to its left
                splits_before_commit = splits
                continue
            x = (j - skipped_lines) * CELL_SIZE
            y = i * CELL_SIZE
            cx = x + CELL_SIZE/2
            cy = y + CELL_SIZE/2
            # a line without commits of its own that spans exactly one row
            simple_line = line.topo_length(order) == 1 and len(line) == 0
            lc = linecolor(line)
            #draw line
            if line.merge is not None and line.merge.id == commit.id and not simple_line:
                #draw merge line
                # from the bottom of this line's cell to the commit centre
                ctx.new_path()
                ctx.set_source_rgb(*lc)
                ctx.move_to(cx + offset, y + CELL_SIZE)
                ctx.line_to(commit_cx, commit_cy)
                ctx.stroke()
            if line.split is not None and line.split.id == commit.id and not simple_line:
                #draw split line
                # from the top of this line's cell to the commit centre
                ctx.new_path()
                ctx.set_source_rgb(*lc)
                ctx.move_to(cx, y)
                ctx.line_to(commit_cx, commit_cy)
                ctx.stroke()
                splits += 1
            if (line.merge is None or line.merge.id != commit.id) and \
                    (line.split is None or line.split.id != commit.id):
                # line merely passes by this commit: draw it through the
                # cell, shifted at the bottom by the splits drawn so far
                ctx.new_path()
                ctx.set_source_rgb(*lc)
                ctx.move_to(cx, y)
                ctx.line_to(cx + offset, y + CELL_SIZE)
                ctx.stroke()
            if simple_line and line.split.id == commit.id:
                #for all lines that have no commits and a
                #topological distance of 1 (which
                # means the previous commit is our child)
                #simply draw a line connecting both commits
                #and not a special line
                #assert j == len(lines_around) - 1 #we should always be the last line
                previous_commit_cx = previous_commit_col * CELL_SIZE + CELL_SIZE/2
                previous_commit_bottom = y
                ctx.new_path()
                # previous_commit_line is only assigned at the end of a row,
                # so this branch relies on a previous row having been drawn
                ctx.set_source_rgb(*linecolor(previous_commit_line))
                ctx.move_to(previous_commit_cx, previous_commit_bottom)
                ctx.line_to(commit_cx, commit_cy)
                ctx.stroke()
                skipped_lines += 1
        commit_offset = splits_before_commit * -CELL_SIZE
        # the commit is the last commit of its line
        if commit_line.end.id == commit.id:
            if len(commit_line) > 1:
                # connect upwards to the other commits of the line
                ctx.new_path()
                ctx.set_source_rgb(*commit_lc)
                ctx.move_to(commit_cx, commit_cy)
                ctx.line_to(commit_cx, commit_cy - CELL_SIZE/2)
                ctx.stroke()
            if commit_line.split is not None:
                # the line continues downwards towards its split commit
                ctx.new_path()
                ctx.set_source_rgb(*commit_lc)
                ctx.move_to(commit_cx, commit_cy)
                ctx.line_to(commit_cx, commit_cy + CELL_SIZE/2)
                ctx.stroke()
        # the commit is the first commit of its line
        if commit_line.start_commit.id == commit.id:
            if len(commit_line) > 1:
                # connect downwards to the other commits of the line
                ctx.new_path()
                ctx.set_source_rgb(*commit_lc)
                ctx.move_to(commit_cx, commit_cy)
                ctx.line_to(commit_cx, commit_cy + CELL_SIZE/2)
                ctx.stroke()
            if commit_line.merge is not None:
                # the line comes from above, from its merge commit
                ctx.new_path()
                ctx.set_source_rgb(*commit_lc)
                ctx.move_to(commit_cx, commit_cy)
                ctx.line_to(commit_cx, commit_cy - CELL_SIZE/2)
                ctx.stroke()
        # the commit lies in the middle of its line: draw through the cell,
        # shifting the lower end by the splits drawn to the left
        if not (commit_line.start_commit.id == commit.id or commit_line.end.id == commit.id):
            ctx.set_source_rgb(*commit_lc)
            ctx.move_to(commit_cx, commit_cy - CELL_SIZE/2)
            ctx.line_to(commit_cx, commit_cy)
            ctx.line_to(commit_cx + commit_offset, commit_cy + CELL_SIZE/2)
            ctx.stroke()
        #draw circle
        # black outline first, then a white fill of the same path
        ctx.new_path()
        ctx.arc(commit_cx, commit_cy, COMMIT_CIRCLE_RADIUS, 0, 2*math.pi)
        ctx.set_source_rgb(0.0, 0.0, 0.0)
        ctx.stroke_preserve()
        ctx.set_source_rgb(1.0, 1.0, 1.0)
        ctx.fill()
        #continue with next step
        previous_commit = commit
        previous_commit_col = line_with_commit_index
        previous_commit_line = commit_line
    return surface
def line_for_commit(commit, lines):
    """Return the first line in ``lines`` whose commits include ``commit``.

    Commits are matched by their ``id``. Fails an assertion when no line
    contains the commit.
    """
    containing = (
        candidate for candidate in lines
        if commit.id in [member.id for member in candidate.commits]
    )
    for match in containing:
        return match
    # every commit is expected to belong to some line
    assert False
def lines_at_commit(commit, lines, order):
    """Return the lines whose span covers the position of ``commit``.

    A line spans from its merge commit (or, without one, its start commit)
    to its split commit (or, without one, its end commit); positions are
    looked up by commit id in ``order``. The relative order of ``lines`` is
    preserved in the result.
    """
    position = order[commit.id]

    def covers(candidate):
        first = order[first_not_none(candidate.merge, candidate.start_commit).id]
        last = order[first_not_none(candidate.split, candidate.end).id]
        return first <= position <= last

    return [candidate for candidate in lines if covers(candidate)]
class GraphLine(object):
    """One finished line of the commit graph, as produced by
    ``GraphLineBuilder.build``.

    Attributes:
        commits: the commits that lie on this line.
        start: the start order value handed to the builder for this line.
        start_commit: the commit the line starts with.
        end: the commit the line ends with.
        merge: the commit at which this line was merged into another line,
            or None.
        split: the commit at which this line joins another line, or None.
    """

    def __init__(self, commits, start, start_commit, end, merge, split):
        self.start = start
        self.start_commit = start_commit
        self.end = end
        self.merge = merge
        self.split = split
        self.commits = commits

    def __len__(self):
        """Return the number of commits on the line."""
        return len(self.commits)

    def topo_length(self, order):
        """Return the distance in ``order`` covered by the line.

        The span runs from the merge commit (or the start commit when there
        is no merge) to the split commit (or the end commit when there is
        no split).

        :param order: mapping from commit id to position.
        """
        begin = first_not_none(self.merge, self.start_commit)
        end = first_not_none(self.split, self.end)
        return order[end.id] - order[begin.id]

    def __eq__(self, other):
        """Two lines are equal when start, end, merge and split are equal."""
        if not isinstance(other, GraphLine):
            return NotImplemented
        # compare the public attributes; the underscore-prefixed names only
        # exist on GraphLineBuilder, not on GraphLine
        return self.start == other.start and \
            self.end == other.end and \
            self.merge == other.merge and \
            self.split == other.split

    def __ne__(self, other):
        """Inverse of ``__eq__`` (Python 2 does not derive it)."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        """Hash over the same attributes that ``__eq__`` compares."""
        return (hash(self.start) ^ hash(self.end) ^ hash(self.merge) ^ hash(self.split))
class GraphLineBuilder(object):
    """Mutable accumulator for one line of the graph while the commits are
    being walked; ``build`` turns it into a ``GraphLine``.
    """

    def __init__(self):
        self._start = None
        self._end = None
        self._start_commit = None
        self._commits = []
        # the commit expected to come next on this line, if any
        self._next = None
        self._merge = None
        self._split = None

    def start(self, start, commit):
        """Record the start order value and the start commit of the line."""
        self._start = start
        self._start_commit = commit

    def get_start_order(self):
        """Return the order value given to ``start``."""
        return self._start

    def get_start(self):
        """Return the commit given to ``start``."""
        return self._start_commit

    def end(self, end):
        """Mark the line as finished at ``end``; no further commit is
        expected afterwards."""
        self._end = end
        self._next = None

    def next(self, next):
        """Set the commit expected to come next on this line."""
        self._next = next

    def merge(self, merge):
        """Set the merge commit of the line."""
        self._merge = merge

    def split(self, split):
        """Set the split commit of the line."""
        self._split = split

    def is_next(self, possible_next):
        """Tell whether ``possible_next`` is the commit this line expects.

        Returns False when no next commit is set (fresh builder, or after
        ``end``) instead of failing on the missing ``id`` attribute.
        """
        return self._next is not None and self._next.id == possible_next.id

    def get_next(self):
        """Return the commit expected next, or None."""
        return self._next

    def current(self):
        """Return the most recently added commit, or None if there is none."""
        if self._commits:
            return self._commits[-1]
        return None

    def is_last_commit(self, commit):
        """Tell whether ``commit`` equals the most recently added commit."""
        if not self._commits:
            return False
        return self._commits[-1] == commit

    def add_commit(self, order, commit):
        """Append ``commit`` to the line; ``order`` is accepted but not
        stored."""
        self._commits.append(commit)

    def is_finished(self):
        """Tell whether ``end`` has been called with a commit."""
        return self._end is not None

    def build(self):
        """Create the ``GraphLine`` described by this builder.

        :raises ValueError: if the start, end or start commit is missing.
        """
        requirements = ("_start", "_end", "_start_commit")
        missing_requirements = [requirement for requirement in requirements
                                if getattr(self, requirement) is None]
        if missing_requirements:
            raise ValueError("missing values for: %s in %r" % (",".join(missing_requirements), self))
        return GraphLine(
            commits=self._commits,
            start=self._start,
            start_commit=self._start_commit,
            end=self._end,
            merge=self._merge,
            split=self._split
        )

    def __repr__(self):
        # finished lines show start and end commit, open lines show the
        # start commit and the most recent commit
        if self.is_finished():
            return "<GLB %d: [%r - %r]>" % (id(self), hex_or_none(self._start_commit), hex_or_none(self._end))
        else:
            return "<GLB %d: [%r - %r >" % (id(self), hex_or_none(self._start_commit), hex_or_none(self.current()))
class GraphBuilder(object):
    """Bookkeeping container for lines, a commit-to-line lookup and a
    commit-to-order lookup."""

    def __init__(self):
        self._lines = []
        self._commit_lookup = {}
        self._commit_order = {}

    def add_line(self, line):
        """Append ``line`` to the collected lines."""
        self._lines.append(line)

    def add_commit_to_line(self, commit, line):
        """Remember that ``commit`` belongs to ``line``."""
        self._commit_lookup[commit] = line

    def add_commit(self, commit, order):
        """Remember the order value of ``commit``."""
        # ``self`` was missing from the signature, so the method could not
        # be called on an instance
        self._commit_order[commit] = order
def log(message):
    """Debug logging hook; ``message`` is currently discarded.

    Print ``message`` here to trace how the graph is built.
    """
    return None
class logged_list(list):
    """List that reports every ``append`` through ``log``, together with
    the line number of the calling code."""

    def __init__(self, name, *args, **kwargs):
        super(logged_list, self).__init__(*args, **kwargs)
        # label used in the log messages
        self._name = name

    def append(self, *args):
        # stack()[1] is the record of whoever called append(); its first
        # item is the frame object
        calling_frame = stack()[1][0]
        lineno = getframeinfo(calling_frame).lineno
        log("At l. %d: appending to %s: %r" % (lineno, self._name, args[0]))
        super(logged_list, self).append(*args)
def generate_graph(repo, ref_globs=("refs/heads/*","refs/remotes/*")):
    """Group the commits of ``repo`` into graph lines.

    The commits reachable from HEAD and from every reference matching one
    of ``ref_globs`` are walked (topological + time sorting). Each commit
    is put on the line that expects it as its next commit; a commit's first
    parent continues the line, every further parent opens a new line that
    records the commit as its merge point. A line ends when a commit has no
    parents, or when the commit it waits for is taken by another line; in
    the latter case the line records a split commit.

    The selected reference names and a dump of all finished lines are
    printed to standard output.

    :param repo: repository object providing ``listall_references``,
        ``lookup_reference``, ``walk``, ``head`` and item access by id.
    :param ref_globs: fnmatch patterns selecting the references to include.
    :returns: tuple ``(lines, order_lookup, commits)``: the built
        ``GraphLine`` objects sorted with ``line_order_cmp``, an
        ``OrderedDict`` mapping commit id to walk position, and the tuple
        of walked commits.
    """
    # local import: needed to pass the comparison function to sorted()
    # in a way that works on Python 2.7 and Python 3
    from functools import cmp_to_key

    all_refs = repo.listall_references()
    ref_names = list(chain.from_iterable(
        fnmatch.filter(all_refs, glob) for glob in ref_globs
    ))
    #ref_names = ['refs/heads/master', 'refs/heads/c', 'refs/heads/a', 'refs/heads/b']
    print(ref_names)
    refs = (repo.lookup_reference(ref_name) for ref_name in ref_names)
    resolved_refs = set(ref.peel(git.GIT_OBJ_COMMIT) for ref in refs)
    walker = repo.walk(repo.head.target, git.GIT_SORT_TOPOLOGICAL | git.GIT_SORT_TIME)
    for ref in resolved_refs:
        walker.push(ref.id)
    commits = tuple(walker)
    topo_order = tuple(commit.id for commit in commits)
    topo_order_lookup = OrderedDict((oid, index) for index, oid in enumerate(topo_order))
    # the first line starts at the first walked commit and expects it next
    first_commit = repo[topo_order[0]]
    line = GraphLineBuilder()
    line.next(first_commit)
    line.start(start=0, commit=first_commit)
    current_lines = [line]
    finished_lines = []
    for order, oid in enumerate(topo_order):
        commit = repo[oid]
        log("At %d, %s - %s" % (order, oid.hex, summary(commit)))
        log("\tCurrent line is %d" % id(line))
        log("\tQueue: %r" % current_lines)
        log("")
        possible_lines = [candidate for candidate in current_lines if candidate.is_next(commit)]
        if len(possible_lines) == 0:
            line = GraphLineBuilder()
            line.start(commit=commit, start=order)
            line.next(first_or_none(commit.parents))
            current_lines.append(line)
            log("\tNo line found which expects this commit, created new line %r" % line)
        elif len(possible_lines) == 1:
            line = possible_lines[0]
            log("\tCommit is on line: %r" % line)
        else:
            #use the line with the lowest topological order
            log("\tCommit is possibly on lines %r" % possible_lines)
            line = min(possible_lines, key=lambda candidate: candidate.get_start_order())
            log("\tSelected line: %r" % line)
        log("\tAdding commit %s to line %r" % (commit.id.hex, line))
        line.add_commit(order, commit)
        assert line is not None
        #now, what should be the next commit under the assumption that the line
        #continues?
        parents = commit.parents
        log("\tParents: %r" % [parent.id.hex for parent in parents])
        if len(parents) > 0:
            #the first parent should be our next commit on the line
            line.next(parents[0])
            log("\tNext expected commit on line %d: %s" % (id(line), line.get_next().id))
            #if we have more than one parent, we need to prepare the
            #new lines for them
            for parent_on_other_line in parents[1:]:
                new_line = GraphLineBuilder()
                new_line.start(
                    commit=parent_on_other_line,
                    start=topo_order_lookup[parent_on_other_line.id]
                )
                #we are the point where the new line was merged with the current line
                new_line.merge(commit)
                new_line.next(parent_on_other_line)
                #queue up the new line
                current_lines.append(new_line)
                log("\tNew line %d added: starting at %s" % (id(new_line), parent_on_other_line.id.hex))
                assert new_line.get_next() is not None
            assert line.get_next() is not None
        else:
            #we have reached the end of this path in the tree
            #we won't merge with anyone, so finish up this line
            #and go to the next
            line.end(commit)
            line.next(None)
            #we are a leaf, so we didn't split from any commit
            #e.g. we are the initial commit
            line.split(None)
            #finish this line
            finished_lines.append(line)
            current_lines.remove(line)
            log("\tEnd of line %r (no parents anymore)" % line)
        assert line.get_next() is not None or len(commit.parents) == 0, "Line has not next commit, but this commit has parents"
        #now check if this line joins with any other line
        #this is the case if the current commit on the line is
        #already the next commit of another line
        assert line.current() is not None
        joined_lines = []
        for other_line in current_lines:
            if other_line is line:
                continue #skip current line
            #All lines that have no commits anymore should already have been removed
            assert other_line.get_next() is not None
            if other_line.get_next().id == line.current().id:
                #so this line joins with other_line
                #therefore we end this line and set the
                #next commit as the split point where we join
                #the lines
                #who joins whom?
                joining_line = other_line
                continuing_line = line
                log("\tEnd of line %r (joining %d)" % (joining_line, id(continuing_line)))
                #assert joining_line.current() is not None
                #assert joining_line.get_next() is not None
                if joining_line.current() is not None:
                    #this is a line with at least 1 commit
                    joining_line.split(joining_line.get_next())
                    joining_line.end(joining_line.current())
                else:
                    #this line does not have any commits
                    joining_line.split(joining_line.get_start())
                    joining_line.end(joining_line.get_next())
                joined_lines.append(joining_line)
        for joined_line in joined_lines:
            #remove lines here, otherwise the iteration above breaks :(
            current_lines.remove(joined_line)
            finished_lines.append(joined_line)
        if line.is_finished():
            #if we are finished, create a new line in the next round
            line = None
        log("-----------------------------------------------------------------------------")
    assert len(current_lines) == 0
    finished_lines = [builder.build() for builder in finished_lines]
    finished_lines = sorted(finished_lines, key=cmp_to_key(line_order_cmp(topo_order_lookup)))
    # dump of all lines; every print call takes a single pre-formatted
    # string so the output is the same on Python 2 and Python 3
    for i, line in enumerate(finished_lines):
        print("%d:" % (i))
        print("\tmerge:  %s" % (summary_with_id_and_order(line.merge, topo_order_lookup),))
        print("\tstart:  %s" % (summary_with_id_and_order(line.start_commit, topo_order_lookup),))
        print("\tend:  %s" % (summary_with_id_and_order(line.end, topo_order_lookup),))
        print("\tsplit:  %s" % (summary_with_id_and_order(line.split, topo_order_lookup),))
        print("\tcommits:")
        for commit in line.commits:
            print("\t\t%d: %s" % (topo_order_lookup[commit.id], summary_with_id(commit)))
    return (finished_lines, topo_order_lookup, commits)
def summary(commit):
    """Return the first line of a commit's message.

    Returns None for a None commit, ``"<unknown encoding>"`` when reading
    the message raises LookupError, and an empty string when the message
    has no lines.
    """
    if commit is None:
        return None
    try:
        message = commit.message
    except LookupError:
        return "<unknown encoding>"
    for first_line in message.splitlines():
        return first_line
    return ""
def summary_with_id(commit):
    """Return ``"<hex id> - <summary>"`` for a commit, or None for None."""
    if commit is not None:
        return "%s - %s" % (commit.id.hex, summary(commit))
    return None
def summary_with_id_and_order(commit, order_lookup):
    """Return the commit's zero-padded order followed by its id and
    summary, or None for a None commit.

    :param order_lookup: mapping from commit id to order value.
    """
    if commit is not None:
        position = order_lookup[commit.id]
        return "%03d: %s" % (position, summary_with_id(commit))
    return None
def first_or_none(l):
    """Return the first item of a sequence, or None when it is empty."""
    if len(l) == 0:
        return None
    return l[0]
def hex_or_none(commit):
    """Return the hex id of a commit, or None when there is no commit."""
    return None if commit is None else commit.id.hex
def first_not_none(*args):
    """Return the first argument that is not None, or None if all are."""
    return next((candidate for candidate in args if candidate is not None), None)
def line_order_cmp(order_lookup):
    """Build a comparison function that orders graph lines.

    A line is positioned at the order of its merge commit or, when it has
    no merge commit, at the order of its start commit minus one.

    :param order_lookup: mapping from commit id to order value.
    :returns: function ``(a, b)`` returning -1, 0 or 1.
    """
    def _sort_position(line):
        if line.merge is not None:
            return order_lookup[line.merge.id]
        return order_lookup[line.start_commit.id] - 1

    def _line_order_cmp(a, b):
        a_order = _sort_position(a)
        b_order = _sort_position(b)
        # same result as the Python-2-only builtin cmp(a_order, b_order)
        return (a_order > b_order) - (a_order < b_order)
    return _line_order_cmp
# Run the renderer only when the file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment