Skip to content

Instantly share code, notes, and snippets.

@jix
Created January 30, 2019 16:47
Show Gist options
  • Save jix/6de102cc83b2a81b3e61f49f1283ce33 to your computer and use it in GitHub Desktop.
Save jix/6de102cc83b2a81b3e61f49f1283ce33 to your computer and use it in GitHub Desktop.
import networkx as nx
import itertools
def transitive_closure_compression(g):
    """Return a DAG with fewer edges that keeps the reachability of ``g``.

    For a given DAG, return a smaller (by edge count) DAG with additional
    nodes and a transitive closure that has the transitive closure of the
    original DAG as induced subgraph.

    It does this by generating a contraction hierarchy (CH) and building a
    new DAG from it. The main addition is that it also replaces bicliques
    with a star around a fresh hub node (see ``remove_2_2_bicliques``). With
    suitable edge direction they have the same reachability on the original
    nodes. This wouldn't work for distance queries, where CHs are usually
    used, but is perfectly fine for reachability.

    Args:
        g: A networkx directed acyclic graph. It is copied, never modified.
            Node labels must support ``max(g) + 1`` (e.g. integers), because
            helper nodes are numbered upwards from that value.

    Returns:
        A new ``nx.DiGraph``. Nodes that have no edges in ``g`` are left out;
        an input without any edges yields an empty graph.

    Raises:
        ValueError: If ``g`` is not a directed acyclic graph.
    """
    if not nx.is_directed_acyclic_graph(g):
        raise ValueError('requires a DAG as input')

    g = g.copy()

    # Number helper nodes above every label of the input, isolated nodes
    # included, so a helper can never be mistaken for an input node. The
    # default keeps an empty graph from raising inside max().
    new_node = itertools.count(max(g, default=-1) + 1)

    # Nodes without edges can be ignored. nx.isolates() is a lazy generator
    # over the graph, so it must be materialised before nodes are removed;
    # removing while iterating raises RuntimeError.
    g.remove_nodes_from(list(nx.isolates(g)))

    orig_nodes = set(g)
    g_ch = nx.DiGraph()

    while g:
        remove_triangles(g)
        remove_2_2_bicliques(g, new_node)

        # Rank nodes by contraction cost: up to pred * succ shortcut edges
        # are added while pred + succ edges disappear with the node.
        candidates = []
        for v in g:
            pred_count = len(g.pred[v])
            succ_count = len(g.succ[v])
            cost = pred_count * succ_count - pred_count - succ_count
            candidates.append((cost, v))
        candidates.sort()

        # Neighbours of a node contracted in this round are skipped until
        # the next round, where their cost is computed afresh.
        blocked = set()
        for _cost, v in candidates:
            if v in blocked:
                continue
            pred = set(g.pred[v])
            succ = set(g.succ[v])
            blocked.update(pred, succ)

            # Record v with all its current edges in the hierarchy graph.
            for x in succ:
                g_ch.add_edge(v, x)
            for x in pred:
                g_ch.add_edge(x, v)

            # Bypass v in the working graph, then drop it.
            for x in pred:
                for y in succ:
                    g.add_edge(x, y)
            g.remove_node(v)

    # Clean up the result until neither simplification changes anything.
    # Both passes run in every round; only original nodes are protected.
    simplified = True
    while simplified:
        simplified = False
        simplified |= remove_triangles(g_ch)
        simplified |= remove_deg_1(g_ch, orig_nodes)
    return g_ch
def remove_2_2_bicliques(g, new_node_iter):
    """Repeatedly replace a biclique between two nodes and their shared
    neighbours with a star around a fresh hub node. Modifies ``g`` in place.

    Each pass scans all pairs of non-adjacent nodes and picks the pair with
    the most common predecessors or common successors. A pair is rewritten
    only when it shares MORE than two such neighbours: with ``k`` shared
    neighbours ``2 * k`` edges are replaced by ``k + 2``, which is a saving
    only for ``k > 2``. The loop stops once no pair qualifies.

    Naive implementation, this can be done _much_ faster.

    Args:
        g: Directed graph offering the networkx ``DiGraph`` interface
            (``nodes``, ``pred``, ``succ``, ``has_edge``, ``add_edge``,
            ``remove_edge``).
        new_node_iter: Iterator yielding unused node labels; one label is
            consumed per replacement.
    """
    # A pair must share strictly more than this many neighbours.
    min_shared = 2

    while True:
        best_edge = None
        best_common = None
        best_size = min_shared
        for v, w in itertools.combinations(g.nodes, 2):
            if g.has_edge(v, w) or g.has_edge(w, v):
                # Cannot be a 2,2-biclique after remove_triangles
                continue
            common_preds = set(g.pred[v]) & set(g.pred[w])
            common_succs = set(g.succ[v]) & set(g.succ[w])
            # Strict comparisons: the first pair reaching a size wins ties.
            if len(common_preds) > best_size:
                best_common = common_preds
                best_size = len(common_preds)
                best_edge = (v, w, 'pred')
            if len(common_succs) > best_size:
                best_common = common_succs
                best_size = len(common_succs)
                best_edge = (v, w, 'succ')
        if best_edge is None:
            break

        v, w, direction = best_edge
        n = next(new_node_iter)
        if direction == 'pred':
            # x -> {v, w} for every shared x becomes x -> n -> {v, w}.
            for x in best_common:
                g.remove_edge(x, v)
                g.remove_edge(x, w)
                g.add_edge(x, n)
            g.add_edge(n, v)
            g.add_edge(n, w)
        else:
            # {v, w} -> x for every shared x becomes {v, w} -> n -> x.
            for x in best_common:
                g.remove_edge(v, x)
                g.remove_edge(w, x)
                g.add_edge(n, x)
            g.add_edge(v, n)
            g.add_edge(w, n)
def remove_triangles(g):
    """Remove triangle edges that are redundant. Naive implementation.

    An edge ``v -> w`` counts as redundant when some node ``u`` has both
    ``v -> u`` and ``u -> w``. All such edges are collected against the
    unmodified graph first and only then removed, in place.

    Args:
        g: Directed graph offering ``edges``, ``succ``, ``pred`` and
            ``remove_edge`` in the style of a networkx ``DiGraph``.

    Returns:
        True if at least one edge was removed, False otherwise.
    """
    # Membership tests against pred[w] stop at the first witness and avoid
    # building two temporary sets for every edge.
    redundant = [
        (v, w)
        for v, w in g.edges
        if any(u in g.pred[w] for u in g.succ[v])
    ]
    for v, w in redundant:
        g.remove_edge(v, w)
    return bool(redundant)
def remove_deg_1(g, keep):
    """Remove nodes with in- or out-degree 1 apart from the nodes in keep.

    A removed node is bypassed: its single predecessor is connected to each
    of its successors, or each of its predecessors is connected to its
    single successor. The in-degree case is checked first. ``g`` is modified
    in place.

    Args:
        g: Directed graph offering iteration over nodes, ``in_degree``,
            ``out_degree``, ``pred``, ``succ``, ``add_edge`` and
            ``remove_node`` in the style of a networkx ``DiGraph``.
        keep: Container of nodes that must never be removed.

    Returns:
        True if at least one node was removed, False otherwise.
    """
    changed = False
    # Iterate over a snapshot of the nodes; the graph shrinks as we go.
    for v in list(g):
        if v in keep:
            continue
        # Neighbour views are copied before add_edge touches the graph so
        # that no live view is iterated while the graph is being mutated.
        if g.in_degree(v) == 1:
            (only_pred,) = g.pred[v]
            for x in list(g.succ[v]):
                g.add_edge(only_pred, x)
        elif g.out_degree(v) == 1:
            (only_succ,) = g.succ[v]
            for x in list(g.pred[v]):
                g.add_edge(x, only_succ)
        else:
            continue
        g.remove_node(v)
        changed = True
    return changed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment