Skip to content

Instantly share code, notes, and snippets.

@Wlodarski
Created May 21, 2017 00:26
Show Gist options
  • Save Wlodarski/b78c6662474028958b77ff302940ba33 to your computer and use it in GitHub Desktop.
Save Wlodarski/b78c6662474028958b77ff302940ba33 to your computer and use it in GitHub Desktop.
Finding the Critical Path with Topological Sorting
#! python3
import networkx as nx
class CPM(nx.DiGraph):
def __init__(self):
super().__init__()
self._dirty = True
self._critical_path_length = -1
self._criticalPath = None
def add_node(self, *args, **kwargs):
self._dirty = True
super().add_node(*args, **kwargs)
def add_nodes_from(self, *args, **kwargs):
self._dirty = True
super().add_nodes_from(*args, **kwargs)
def add_edge(self, *args): # , **kwargs):
self._dirty = True
super().add_edge(*args) # , **kwargs)
def add_edges_from(self, *args, **kwargs):
self._dirty = True
super().add_edges_from(*args, **kwargs)
def remove_node(self, *args, **kwargs):
self._dirty = True
super().remove_node(*args, **kwargs)
def remove_nodes_from(self, *args, **kwargs):
self._dirty = True
super().remove_nodes_from(*args, **kwargs)
def remove_edge(self, *args): # , **kwargs):
self._dirty = True
super().remove_edge(*args) # , **kwargs)
def remove_edges_from(self, *args, **kwargs):
self._dirty = True
super().remove_edges_from(*args, **kwargs)
def _forward(self):
for n in nx.topological_sort(self):
es = max([self.node[j]['EF'] for j in self.predecessors(n)], default=0)
self.add_node(n, ES=es, EF=es + self.node[n]['duration'])
def _backward(self):
for n in nx.topological_sort(self, reverse=True):
lf = min([self.node[j]['LS'] for j in self.successors(n)], default=self._critical_path_length)
self.add_node(n, LS=lf - self.node[n]['duration'], LF=lf)
def _compute_critical_path(self):
graph = set()
for n in self:
if self.node[n]['EF'] == self.node[n]['LF']:
graph.add(n)
self._criticalPath = self.subgraph(graph)
@property
def critical_path_length(self):
if self._dirty:
self._update()
return self._critical_path_length
@property
def critical_path(self):
if self._dirty:
self._update()
return sorted(self._criticalPath, key=lambda x: self.node[x]['ES'])
def _update(self):
self._forward()
self._critical_path_length = max(nx.get_node_attributes(self, 'EF').values())
self._backward()
self._compute_critical_path()
self._dirty = False
if __name__ == "__main__":
G = CPM()
G.add_node('A', duration=5)
G.add_node('B', duration=2)
G.add_node('C', duration=4)
G.add_node('D', duration=4)
G.add_node('E', duration=3)
G.add_node('F', duration=7)
G.add_node('G', duration=3)
G.add_node('H', duration=2)
G.add_node('I', duration=4)
G.add_edges_from([
('A', 'C'),
('A', 'D'),
('B', 'E'),
('B', 'F'),
('D', 'G'), ('E', 'G'),
('F', 'H'),
('C', 'I'), ('G', 'I'), ('H', 'I')
])
print('Critical path:')
print(G.critical_path_length, G.critical_path)
G.add_node('D', duration=2) # editing existing node
print('\nCrushing D, from 4 to 2, gives:')
print(G.critical_path_length, G.critical_path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment