Skip to content

Instantly share code, notes, and snippets.

@TApplencourt
Created February 15, 2019 19:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TApplencourt/015041433176802607092e44d86aab9f to your computer and use it in GitHub Desktop.
Save TApplencourt/015041433176802607092e44d86aab9f to your computer and use it in GitHub Desktop.
Remove cycle from DAG
#!/usr/bin/env python3
# _ _ _ _ _
# | | | | | (_) |
# | | | | |_ _| |___
# | | | | __| | / __|
# | |_| | |_| | \__ \
# \___/ \__|_|_|___/
#
class cached_property(object):
    """Descriptor that evaluates a method once per instance and caches it.

    On first access through an instance the wrapped function is called and
    its result is stored in the instance ``__dict__`` under the function's
    own name.  Because this descriptor defines no ``__set__``, the stored
    value is what later attribute lookups on that instance return.
    """

    def __init__(self, func):
        # Mirror the wrapped function's docstring on the descriptor.
        self.__doc__ = func.__doc__
        self.func = func

    def __get__(self, obj, cls):
        # Looked up on the class itself: return the descriptor object.
        if obj is None:
            return self
        result = self.func(obj)
        obj.__dict__[self.func.__name__] = result
        return result
# ___ ___ _
# | \/ | | |
# | . . | ___ __ _| |_
# | |\/| |/ _ \/ _` | __|
# | | | | __/ (_| | |_
# \_| |_/\___|\__,_|\__|
#
import networkx as nx
from networkx import DiGraph
from typing import Hashable, Set, NamedTuple
from itertools import permutations, chain
class Node(object):
    """Small wrapper around a node label that derives its "primed" name."""

    def __init__(self, node):
        # The wrapped label is kept as-is under ``n``.
        self.n = node

    @property
    def new(self):
        """Return the wrapped label formatted as text with ``'`` appended."""
        return "{}'".format(self.n)
class Edge(NamedTuple):
    """Directed edge stored as a ``(source, target)`` pair of node labels.

    The fields hold the raw graph labels (any hashable value), not ``Node``
    wrappers: instances are built directly from label pairs such as
    ``('S', 'T')`` and from the ``(s, t)`` pairs yielded by graph edge views.
    Being a tuple, an ``Edge`` can be unpacked and passed wherever a plain
    2-tuple edge is accepted.
    """

    # Label of the node the edge leaves from.
    source: Hashable
    # Label of the node the edge points to.
    target: Hashable
class Transformation(object):
    """Rewrite of a directed graph around one "touched" edge ``(S, T)``.

    Every intermediate set/graph is exposed as a lazily computed, cached
    attribute; ``H`` is the final transformed graph.

    NOTE(review): presumably this is meant to break the cycle that an edge
    S -> T would close against the existing T -> ... -> S paths, by
    duplicating part of those paths -- confirm against the algorithm's
    description.

    Parameters
    ----------
    G : anything ``nx.DiGraph`` accepts (a graph, an adjacency dict, ...).
    edge_touched : 2-item iterable ``(S, T)`` of node labels.
    """

    def __init__(self, G, edge_touched):
        # Build our own DiGraph from the input data.
        self.G = nx.DiGraph(G)
        self.edge_touched = Edge._make(edge_touched)

    @cached_property
    def S(self) -> Hashable:
        """Label of the touched edge's source."""
        return self.edge_touched.source

    @cached_property
    def T(self) -> Hashable:
        """Label of the touched edge's target."""
        return self.edge_touched.target

    @cached_property
    def gTS(self) -> DiGraph:
        """Graph made of every simple path of ``G`` going from T to S."""
        paths = DiGraph()
        for path in nx.all_simple_paths(self.G, self.T, self.S):
            nx.add_path(paths, path)
        return paths

    @cached_property
    def R(self) -> Set[Hashable]:
        """Nodes outside ``gTS`` having at least one successor inside it."""
        inside = set(self.gTS)
        feeders = set()
        for n in inside:
            feeders.update(self.G.predecessors(n))
        return feeders - inside

    @cached_property
    def Rr(self) -> Set[Edge]:
        """Edges of ``G`` from a node of ``R`` into ``gTS``, except into T."""
        # Labels are compared by value (!=), not identity: two equal labels
        # (e.g. tuples or strings built at runtime) need not be one object.
        return {
            Edge(s, t)
            for s, t in self.G.edges(self.R)
            if t in self.gTS and t != self.T
        }

    @cached_property
    def L(self) -> Set[Hashable]:
        """Nodes outside ``gTS`` reached from a ``gTS`` node other than S."""
        inside = set(self.gTS)
        leaving = set()
        for n in inside - {self.S}:
            leaving.update(self.G.successors(n))
        return leaving - inside

    @cached_property
    def lL(self) -> Set[Edge]:
        """Edges of ``G`` from a ``gTS`` node other than S to a node of ``L``."""
        # in_edges(L) yields the (s, t) pairs with t in L directly, so no
        # reversed copy of the whole graph is needed.  Same value-based
        # comparison as in ``Rr``.
        return {
            Edge(s, t)
            for s, t in self.G.in_edges(self.L)
            if s in self.gTS and s != self.S
        }

    @cached_property
    def gTL(self) -> DiGraph:
        """``gTS`` extended with the ``lL`` edges, pruned back from S.

        S is removed first; afterwards any predecessor of a removed node
        that is left without outgoing edges is removed in turn, unless it
        belongs to ``L``.
        """
        g = self.gTS.copy()
        g.add_edges_from(self.lL)
        to_remove = {self.S}
        while to_remove:
            # Collect the predecessors *before* deleting: they are the only
            # nodes whose out-degree can drop to zero in this round.
            candidates = list(
                chain.from_iterable(map(g.predecessors, to_remove)))
            g.remove_nodes_from(to_remove)
            dead_ends = {n for n, d in g.out_degree(candidates) if d == 0}
            # Nodes of L are legitimate end points and are kept.
            to_remove = dead_ends - self.L
        return g

    @cached_property
    def H(self) -> DiGraph:
        """The transformed graph.

        Starting from a copy of ``G``:

        1. the ``lL`` edges are removed;
        2. every edge of ``gTL`` is re-created between primed copies of its
           end points (a target in ``L`` is kept unprimed);
        3. if ``L`` is non-empty, S is linked to the primed copy of T;
        4. every ``Rr`` edge whose target is in ``gTL`` is duplicated towards
           the primed copy of that target.
        """
        h = self.G.copy()
        for edge in self.lL:
            h.remove_edge(*edge)
        for s, t in self.gTL.edges():
            target = t if t in self.L else Node(t).new
            h.add_edge(Node(s).new, target)
        if self.L:
            h.add_edge(self.S, Node(self.T).new)
        for s, t in self.Rr:
            if t in self.gTL:
                h.add_edge(s, Node(t).new)
        return h
# _ _ _ _ _ _
# | | | | (_) | | | | |
# | | | |_ __ _| |_ | |_ ___ ___| |_
# | | | | '_ \| | __| | __/ _ \/ __| __|
# | |_| | | | | | |_ | || __/\__ \ |_
# \___/|_| |_|_|\__| \__\___||___/\__|
#
import unittest
from unittest import TestCase
def print_graph(G, name, e=None):
    """Draw ``G`` into the file ``name`` using the Graphviz ``dot`` layout.

    Works on a copy of ``G``.  When ``e`` is given (an object with
    ``source`` and ``target`` attributes), that edge is added to the copy
    and coloured red.
    """
    from networkx.drawing.nx_agraph import to_agraph

    drawn = G.copy()
    if e is not None:
        # add_edge creates the edge if needed and sets its colour attribute.
        drawn.add_edge(e.source, e.target, color='red')
    agraph = to_agraph(drawn)
    agraph.layout('dot')
    agraph.draw(name)
class Ref(NamedTuple):
    """One reference case: an input graph and its expected transformation.

    NOTE(review): the fields are annotated ``DiGraph`` but the cases in
    ``param_list`` supply adjacency dicts that are converted with
    ``DiGraph(...)`` by the tests.
    """

    # Graph handed to the transformation.
    g: DiGraph
    # Graph the transformation result is compared against.
    gold: DiGraph
# Reference cases: each Ref pairs an input adjacency dict with the expected
# ("gold") graph; the tests transform the input along the edge ('S', 'T').
# An adjacency value written as a plain string lists one single-character
# neighbour per character ('LS' means L and S); multi-character names such
# as 'T.1' or 'rl' are wrapped in a list.  Gold graphs call duplicated nodes
# 'X.1'; the tests compare graphs by isomorphism, so the labels themselves
# do not have to match what the transformation generates.
param_list = [
# Minimal example: a single edge T -> S, expected unchanged
Ref({'T':['S']},{'T':['S']}),
# No leaf, no change (1): R joins the T -> S path at r
Ref({'T':'r', 'R':'r', 'r':'S'},{'T':'r', 'R':'r', 'r':'S'}),
# No leaf, no change (2): R points at T itself
Ref({'R':'T','T':'S'},{'R':'T','T':'S'}),
# Leaf, no root: l has an extra successor L
Ref({'T':'l', 'l':'LS'},{'T':'l','l':'S','S':['T.1'],'T.1':['l.1'],'l.1':'L'}),
# Leaf, no root, with a dependency: S also points at L
Ref({'T':'lS', 'S':'L','l':'SL'},{'T':'Sl','l':'S','S':['T.1','L'],'T.1':['l.1'],'l.1':'L'}),
# Leaf, no root: L depends directly on T
Ref({'T':'LS'}, {'T':'S','S':['T.1'],'T.1':'L'}),
# Root and leaf on two different paths (R enters at r, L leaves from l)
Ref({'R':'r', 'T':'rl','r':'S','l':'SL'}, {'R':'r', 'T':'rl','r':'S','l':'S','S':['T.1'], 'T.1':['l.1'],'l.1':'L'}),
# Root and leaf on the same path (R enters at r, L leaves from l further down)
Ref({'R':'r', 'T':'r','r':'l','l':'SL'}, {'T':'r', 'R':['r','r.1'], 'r':'l', 'l':'S', 'S':['T.1'], 'T.1':['r.1'], 'r.1':['l.1'], 'l.1':'L'}),
# Root and leaf on the same path, entering and leaving at the single node rl
Ref({'R':['rl'], 'T':['rl'],'rl':'SL'}, {'R':['rl','rl.1'], 'T':['rl'], 'rl':'S', 'S':['T.1'],'T.1':['rl.1'],'rl.1':'L'}),
# Root and leaf: R enters at r, L hangs directly off T
Ref({'R':'r','T':'rL','r':'S'}, {'R':'r','T':'r', 'r':'S', 'S':['T.1'], 'T.1':'L'}),
# Root and leaf, R points at T: the copy T.1 gets no edge from R
Ref({'R':'T','T':'l','l':'SL'}, {'R':'T','T':'l', 'l':'S', 'S':['T.1'], 'T.1':['l.1'], 'l.1':'L'}),
# Same as above with L hanging directly off T (no intermediate l)
Ref({'R':'T', 'T':'SL'}, {'R':'T','T':'S','S':['T.1'], 'T.1':['L']})
]
class TestDemonstrateSubtest(TestCase):
    """Checks ``Transformation(...).H`` against the cases of ``param_list``."""

    def compose(self, l_g):
        """Relabel each adjacency dict of ``l_g`` and merge them in one graph.

        Keys starting with ``T`` or ``S`` keep their name, so the merged
        graphs are connected through them; every other key of graph number
        ``i`` is renamed ``g<i>.<key>``.  Only dict *keys* are renamed: a
        node that appears solely as a neighbour keeps its label and is
        therefore shared between the graphs as well.
        """
        relabelled = []
        for i, g in enumerate(l_g):
            mapping = {
                k: k if k.startswith(('T', 'S')) else f'g{i}.{k}'
                for k in g
            }
            relabelled.append(nx.relabel_nodes(DiGraph(g), mapping))
        return nx.compose_all(relabelled)

    def assertGraphTransIso(self, g, gold, e=('S','T')):
        """Assert that transforming ``g`` along ``e`` is isomorphic to ``gold``."""
        transformed = Transformation(g, e).H
        return self.assertTrue(nx.is_isomorphic(transformed, DiGraph(gold)))

    def test_simple(self):
        """Each reference case on its own (the algorithm's theory)."""
        for i, (g, gold) in enumerate(param_list):
            with self.subTest(i):
                self.assertGraphTransIso(g, gold)

    #@unittest.skip
    def test_composition(self):
        """Every ordered pair of reference cases merged into one graph."""
        for (i, first), (j, second) in permutations(enumerate(param_list), 2):
            # zip pairs up the two inputs and the two expected graphs.
            l_g, l_gold = zip(first, second)
            g3 = self.compose(l_g)
            gold3 = self.compose(l_gold)
            # Label the sub-test with the pair so a failure can be traced
            # back to the two reference cases involved.
            with self.subTest(first=i, second=j):
                self.assertGraphTransIso(g3, gold3)
# ___ ___ _
# | \/ | (_)
# | . . | __ _ _ _ __
# | |\/| |/ _` | | '_ \
# | | | | (_| | | | | |
# \_| |_/\__,_|_|_| |_|
#
if __name__ == '__main__':
    # Run the test-suite when the file is executed as a script.
    unittest.main()
    # NOTE(review): unittest.main() calls sys.exit() by default, so the two
    # statements below look unreachable when run as a script -- presumably
    # left-over scratch code; confirm.  (Indentation was lost in this copy of
    # the file; they are assumed to belong to the `if` body.)
    g = {'T':['L','S'],'L':['X'],'S':['X']}
    t = Transformation(g,('S','T'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment