Created
February 15, 2019 19:54
-
-
Save TApplencourt/015041433176802607092e44d86aab9f to your computer and use it in GitHub Desktop.
Remove cycle from DAG
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# _ _ _ _ _ | |
# | | | | | (_) | | |
# | | | | |_ _| |___ | |
# | | | | __| | / __| | |
# | |_| | |_| | \__ \ | |
# \___/ \__|_|_|___/ | |
# | |
class cached_property(object):
    """Descriptor that evaluates a method and stores the result on the instance.

    The computed value is written into the instance ``__dict__`` under the
    wrapped function's name. The class defines no ``__set__``, so that
    instance entry is found first on later attribute lookups and the wrapped
    function is not called again through normal attribute access.
    """

    def __init__(self, func):
        # Keep the wrapped callable and expose its docstring on the descriptor.
        self.func = func
        self.__doc__ = func.__doc__

    def __get__(self, obj, cls):
        if obj is not None:
            # Evaluate first, then store the result under the function's name.
            result = self.func(obj)
            obj.__dict__[self.func.__name__] = result
            return result
        # Looked up on the class itself: hand back the descriptor.
        return self
# ___ ___ _ | |
# | \/ | | | | |
# | . . | ___ __ _| |_ | |
# | |\/| |/ _ \/ _` | __| | |
# | | | | __/ (_| | |_ | |
# \_| |_/\___|\__,_|\__| | |
# | |
import networkx as nx | |
from networkx import DiGraph | |
from typing import Hashable, Set, NamedTuple | |
from itertools import permutations, chain | |
class Node(object):
    """Small wrapper around a node label that can derive a second label from it."""

    def __init__(self, node):
        # The wrapped label is kept as-is under ``n``.
        self.n = node

    @property
    def new(self):
        """Return the wrapped label formatted as a string with ``'`` appended."""
        label = self.n
        return f"{label}'"
class Edge(NamedTuple):
    """Directed edge stored as a ``(source, target)`` pair of node labels."""
    # The fields hold the graph's own node labels (any hashable value), not
    # ``Node`` wrapper instances: every place in this file that builds an
    # Edge passes labels taken directly from the graph or from the caller.
    source: Hashable
    target: Hashable
class Transformation(object):
    """Derive the graph ``H`` from a directed graph ``G`` and a touched edge ``(S, T)``.

    Every intermediate object is exposed as a lazily computed, cached
    attribute:

    * ``gTS`` -- union of all simple paths of ``G`` from ``T`` to ``S``;
    * ``R`` / ``Rr`` -- outside predecessors of ``gTS`` and their edges into
      it (edges arriving on ``T`` excluded);
    * ``L`` / ``lL`` -- outside successors of ``gTS`` and the edges leaving
      it towards them (edges leaving ``S`` excluded);
    * ``gTL`` -- ``gTS`` plus ``lL``, pruned of ``S`` and of every node that
      ends up without successor and is not in ``L``;
    * ``H`` -- the resulting graph, in which ``gTL`` is re-created on primed
      labels (see ``Node.new``) and connected to ``S``.

    NOTE(review): judging from the unit tests in this file, the intent seems
    to be that ``H`` carries the new ``S -> T`` dependency without closing a
    cycle -- confirm with the author.

    Parameters
    ----------
    G : anything accepted by ``networkx.DiGraph`` (e.g. an adjacency dict)
    edge_touched : iterable of two node labels, ``(source, target)``
    """

    def __init__(self, G, edge_touched):
        # Build our own DiGraph from the input, whatever its original form.
        self.G = nx.DiGraph(G)
        self.edge_touched = Edge._make(edge_touched)

    @cached_property
    def S(self) -> Hashable:
        """Source label of the touched edge."""
        return self.edge_touched.source

    @cached_property
    def T(self) -> Hashable:
        """Target label of the touched edge."""
        return self.edge_touched.target

    @cached_property
    def gTS(self) -> DiGraph:
        """Graph made of every simple path of ``G`` going from ``T`` to ``S``."""
        g = DiGraph()
        for path in nx.all_simple_paths(self.G, self.T, self.S):
            nx.add_path(g, path)
        return g

    @cached_property
    def R(self) -> Set[Hashable]:
        """Nodes outside ``gTS`` that have an edge of ``G`` into a ``gTS`` node."""
        inner = set(self.gTS)
        preds = set()
        for n in inner:
            preds.update(self.G.predecessors(n))
        return preds - inner

    @cached_property
    def Rr(self) -> Set[Edge]:
        """Edges of ``G`` from an ``R`` node to a ``gTS`` node other than ``T``."""
        # Labels are compared by equality: identity (``is``) only happens to
        # work for interned strings and fails for equal-but-distinct labels.
        return {
            Edge(s, t)
            for s, t in self.G.edges(self.R)
            if t in self.gTS and t != self.T
        }

    @cached_property
    def L(self) -> Set[Hashable]:
        """Nodes outside ``gTS`` that succeed a ``gTS`` node other than ``S``."""
        inner = set(self.gTS)
        succs = set()
        for n in inner - {self.S}:
            succs.update(self.G.successors(n))
        return succs - inner

    @cached_property
    def lL(self) -> Set[Edge]:
        """Edges of ``G`` from a ``gTS`` node other than ``S`` to an ``L`` node."""
        # ``in_edges`` yields (predecessor, node) pairs for the given nodes,
        # so no reversed copy of the whole graph is needed.
        return {
            Edge(s, t)
            for s, t in self.G.in_edges(self.L)
            if s in self.gTS and s != self.S
        }

    @cached_property
    def gTL(self) -> DiGraph:
        """``gTS`` extended with ``lL``, then pruned starting from ``S``.

        ``S`` is removed first; afterwards any predecessor of a removed node
        that is left without outgoing edge is removed too, unless it belongs
        to ``L``. The process repeats until nothing more qualifies.
        """
        g = self.gTS.copy()
        g.add_edges_from(self.lL)
        # Without any T -> S path ``g`` is empty and does not contain S;
        # asking for its predecessors would raise, and there is nothing to
        # prune anyway.
        to_remove = {self.S} if self.S in g else set()
        while to_remove:
            # Materialise the predecessors before the removal below, which
            # would otherwise invalidate the underlying iterators.
            candidates = list(chain.from_iterable(map(g.predecessors, to_remove)))
            g.remove_nodes_from(to_remove)
            childless = {n for n, d in g.out_degree(candidates) if d == 0}
            to_remove = childless - self.L
        return g

    @cached_property
    def H(self) -> DiGraph:
        """Return the transformed copy of ``G``.

        Steps, in order: drop the ``lL`` edges; add every ``gTL`` edge on
        primed labels (a target that is in ``L`` keeps its original label);
        if ``L`` is not empty, add ``S -> T'``; finally, for each ``Rr`` edge
        whose target is in ``gTL``, add an edge from its source to the primed
        target.
        """
        h = self.G.copy()
        for edge in self.lL:
            h.remove_edge(*edge)
        for s, t in self.gTL.edges():
            target = t if t in self.L else Node(t).new
            h.add_edge(Node(s).new, target)
        if self.L:
            h.add_edge(self.S, Node(self.T).new)
        for s, t in self.Rr:
            if t in self.gTL:
                h.add_edge(s, Node(t).new)
        return h
# _ _ _ _ _ _ | |
# | | | | (_) | | | | | | |
# | | | |_ __ _| |_ | |_ ___ ___| |_ | |
# | | | | '_ \| | __| | __/ _ \/ __| __| | |
# | |_| | | | | | |_ | || __/\__ \ |_ | |
# \___/|_| |_|_|\__| \__\___||___/\__| | |
# | |
import unittest | |
from unittest import TestCase | |
def print_graph(G, name, e=None):
    """Draw a copy of ``G`` into the file ``name`` using the ``dot`` layout.

    When ``e`` is given, the edge ``e.source -> e.target`` is added to the
    drawn copy and its ``color`` attribute is set to ``red``; ``G`` itself is
    left untouched because only the copy is modified.
    """
    drawn = G.copy()
    if e is not None:
        src, dst = e.source, e.target
        drawn.add_edge(src, dst)
        drawn[src][dst].update({'color': 'red'})
    # Imported here so the drawing backend is only needed when drawing.
    from networkx.drawing.nx_agraph import to_agraph
    agraph = to_agraph(drawn)
    agraph.layout('dot')
    agraph.draw(name)
class Ref(NamedTuple):
    """Reference case: an input graph and the graph expected after transformation."""
    # Both fields are stored as plain adjacency dicts ({node: successors});
    # they are only turned into DiGraph objects by the code that consumes
    # them, so ``dict`` is the accurate annotation.
    g: dict
    gold: dict
# Reference cases: ``g`` is the input adjacency dict, ``gold`` the adjacency
# dict of the graph expected from the transformation for the edge (S, T).
param_list = [
    # Minimal example
    Ref(g={'T':['S']},
        gold={'T':['S']}),
    # No leaf, no change (1)
    Ref(g={'T':'r', 'R':'r', 'r':'S'},
        gold={'T':'r', 'R':'r', 'r':'S'}),
    # No leaf, no change (2)
    Ref(g={'R':'T','T':'S'},
        gold={'R':'T','T':'S'}),
    # Leaf, no root
    Ref(g={'T':'l', 'l':'LS'},
        gold={'T':'l','l':'S','S':['T.1'],'T.1':['l.1'],'l.1':'L'}),
    # Leaf, no root, with a dependency
    Ref(g={'T':'lS', 'S':'L','l':'SL'},
        gold={'T':'Sl','l':'S','S':['T.1','L'],'T.1':['l.1'],'l.1':'L'}),
    # Leaf, no root. L depends on T
    Ref(g={'T':'LS'},
        gold={'T':'S','S':['T.1'],'T.1':'L'}),
    # Root and leaf on two paths
    Ref(g={'R':'r', 'T':'rl','r':'S','l':'SL'},
        gold={'R':'r', 'T':'rl','r':'S','l':'S','S':['T.1'], 'T.1':['l.1'],'l.1':'L'}),
    # Root and leaf, same path
    Ref(g={'R':'r', 'T':'r','r':'l','l':'SL'},
        gold={'T':'r', 'R':['r','r.1'], 'r':'l', 'l':'S', 'S':['T.1'], 'T.1':['r.1'], 'r.1':['l.1'], 'l.1':'L'}),
    # Root and leaf, same path. R and L attached to the same node
    Ref(g={'R':['rl'], 'T':['rl'],'rl':'SL'},
        gold={'R':['rl','rl.1'], 'T':['rl'], 'rl':'S', 'S':['T.1'],'T.1':['rl.1'],'rl.1':'L'}),
    # Root and leaf. L depends directly on T
    Ref(g={'R':'r','T':'rL','r':'S'},
        gold={'R':'r','T':'r', 'r':'S', 'S':['T.1'], 'T.1':'L'}),
    # Root and leaf, no more dependency on T.1
    Ref(g={'R':'T','T':'l','l':'SL'},
        gold={'R':'T','T':'l', 'l':'S', 'S':['T.1'], 'T.1':['l.1'], 'l.1':'L'}),
    # Root and leaf, no more dependency on T.1. l and T merged
    Ref(g={'R':'T', 'T':'SL'},
        gold={'R':'T','T':'S','S':['T.1'], 'T.1':['L']}),
]
class TestDemonstrateSubtest(TestCase):
    """Compare ``Transformation(...).H`` with the reference graphs of ``param_list``."""

    def compose(self, l_g):
        """Merge several adjacency dicts into a single DiGraph.

        For the i-th dict, every key that does not start with ``T`` or ``S``
        is renamed to ``g<i>.<key>``; keys starting with ``T`` or ``S`` keep
        their name, so the composed graphs are connected through them. Only
        dict keys are considered for renaming: a node that appears solely as
        a successor keeps its label.
        """
        relabeled = []
        for i, g in enumerate(l_g):
            mapping = {
                k: k if k.startswith(('T', 'S')) else f'g{i}.{k}'
                for k in g
            }
            relabeled.append(nx.relabel_nodes(DiGraph(g), mapping))
        return nx.compose_all(relabeled)

    def assertGraphTransIso(self, g, gold, e=('S','T')):
        """Assert that the transformation of ``g`` for edge ``e`` is isomorphic to ``gold``."""
        return self.assertTrue(nx.is_isomorphic(Transformation(g,e).H,
                                                DiGraph(gold)))

    # Test algorithm theory
    def test_simple(self):
        """Each reference case on its own."""
        for i,(g, gold) in enumerate(param_list):
            with self.subTest(i):
                self.assertGraphTransIso(g,gold)

    # Test algorithm implementation
    #@unittest.skip
    def test_composition(self):
        """Every ordered pair of reference cases, composed into one graph."""
        for i, pair in enumerate(permutations(param_list,2)):
            l_g, l_gold = zip(*pair)
            # Label the subtest so a failing permutation can be identified
            # in the report instead of showing up as an anonymous subtest.
            with self.subTest(permutation=i, graphs=l_g):
                g3 = self.compose(l_g)
                gold3 = self.compose(l_gold)
                self.assertGraphTransIso(g3,gold3)
# ___ ___ _ | |
# | \/ | (_) | |
# | . . | __ _ _ _ __ | |
# | |\/| |/ _` | | '_ \ | |
# | | | | (_| | | | | | | |
# \_| |_/\__,_|_|_| |_| | |
# | |
if __name__ == '__main__':
    # Small hand-made example kept for interactive inspection. It is built
    # before the test run because unittest.main() ends the process by raising
    # SystemExit by default, so statements placed after it never executed.
    g = {'T':['L','S'],'L':['X'],'S':['X']}
    t = Transformation(g,('S','T'))
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment