Skip to content

Instantly share code, notes, and snippets.

@niroyb
Last active March 23, 2023 10:07
Show Gist options
  • Save niroyb/5401781 to your computer and use it in GitHub Desktop.
Save niroyb/5401781 to your computer and use it in GitHub Desktop.
Searches a graph and yields all the minimum spanning trees in order of increasing cost. This could be used to solve minimum spanning trees with constraints by yielding trees until we reach the first one which satisfies a constraint. For example it could solve the degree constrained minimum spanning tree DCMST
#!/Usr/bin/env python
# -*- coding: utf-8 -*-
'''
Searches a graph and yields all the minimum spanning trees in order of increasing cost.
This could be used to solve minimum spanning trees with constraints by yielding trees until
we reach the first one which satisfies a constraint.
For example it could solve the degree constrained minimum spanning tree DCMST
'''
__author__ = 'Nicolas Roy @niroyb'
__date__ = '2013/04/17'
from UnionFind import UnionFind # Data structure for Kruskal Algorithm
import copy
import heapq
from collections import namedtuple
# Algorithm Article
# http://www.scielo.br/scielo.php?script=sci_arttext&pid=S0101-74382005000200004
# Original Author : Wladston Viana Ferreira Filho @wladston
# Original Code : https://gist.github.com/wladston/1981862
# Original post : https://networkx.lanl.gov/trac/ticket/471
# ChangeLog : Made the original code work, optimized and translated
'''
Data Types:
Partition : Two lists of edges : [edges included, edges excluded]
Edge : A tuple of 3 elements (cost, vertice1, vertice2)
Graph : A list of edges
Acronyms:
MST : minimum spanning tree
'''
Partition = namedtuple('Partition', ['included', 'excluded'])
def getGraphCost(graph):
'''Returns the total sum of weights of edges in a graph'''
return sum(e[0] for e in graph)
def kruskal(edges):
'''Returns the mst'''
edges.sort()
subtrees = UnionFind()
solution = []
for edge in edges:
_, u, v = edge
if subtrees[u] != subtrees[v]:
solution.append(edge)
subtrees.union(u, v)
return solution
class IncreasingMST():
def __init__(self, edges):
self.edges = edges
self.edges.sort() # Sort edges by weight
self.order = self.__getNbVertices(edges)
@classmethod
def __getNbVertices(self, edges):
vertices = set()
for c, u, v in edges:
vertices.add(u)
vertices.add(v)
return len(vertices)
@classmethod
def __partition(self , mstEdges , p):
'''
Given an MST and a __partition P, P partitions the vertices of the MST.
The union of the set of partitions generated here with the last MST
is equivalent to the last __partition as an input.
'''
p1 = copy.deepcopy(p)
p2 = copy.deepcopy(p)
partitions = []
# Find open edges : they are in MST but are not included nor excluded
open_edges = [edge for edge in mstEdges if edge not in p.included
and edge not in p.excluded]
for e in open_edges :
p1.excluded.append(e)
p2.included.append(e)
partitions.append(p1)
p1 = copy.deepcopy(p2)
return partitions
def __kruskalMST(self, P):
'''
Returns the MST of a graph contained in the __partition P.
Returns None if there is not any.
P [0] -> edges included, P [1] -> edges excluded
'''
# Initialize the subtrees for Kruskal Algorithm with included edges
subtrees = UnionFind()
for c, u , v in P.included :
subtrees.union(u , v)
# Find open Edges
edges = [e for e in self.edges if e not in P.included and e not in P.excluded]
# Create a solution tree with the list of included edges from the __partition
tree = list(P.included)
# Add edges connecting vertices not connected yet, until we get to
# A solution, or discover that you can not connect all vertices
# Apply Kruskal on open edges
for edge in edges :
c, u , v = edge
if subtrees [u] != subtrees [v]:
tree.append(edge)
subtrees.union(u, v)
if self.order == self.__getNbVertices(tree) and \
len(tree) == self.order - 1:
return tree
else:
return None
def mst_iter(self):
'''Minimum spanning tree iterator : yields the MST in order of their cost
Warning this quickly becomes computer intensive both on CPU and memory'''
mst = kruskal(self.edges)
mstCost = getGraphCost(mst)
List = [(mstCost, Partition([], []), mst)]
# While we have a __partition
while len(List):
# Get __partition with the smallest spanning tree
# and remove it from the list
cost, partition, tree = heapq.heappop(List)
# Yield MST result
yield tree
edges = tree
# Partition previous partition
newPartitions = self.__partition(edges, partition)
for p in newPartitions:
tree = self.__kruskalMST(p)
if tree:
# print 'isTree', tree
cost = getGraphCost(tree)
heapq.heappush(List, (cost, p, tree))
if __name__ == '__main__' :
# Given example
G = [(5, 'A', 'B'), (4, 'B', 'C'), (5, 'C', 'D'),
(3, 'D', 'E'), (7, 'C', 'E'), (6, 'B', 'E')]
iMST = IncreasingMST(G)
print 'Edges sorted by weight'
for i, edge in enumerate(iMST.edges, 1):
print i, edge
print '\nMinimum Spanning trees in order of increasing cost:'
for tree in iMST.mst_iter():
print tree
#print map(id, tree) # Edges share the same memory space
print 'Cost =', getGraphCost(tree)
print
#!/Usr/bin/env python
# -*- coding: utf-8 -*-
'''
Searches a graph and yields all the minimum spanning trees in order of increasing cost.
This could be used to solve minimum spanning trees with constraints by yielding trees until
we reach the first one which satisfies a constraint.
For example it could solve the degree constrained minimum spanning tree DCMST
'''
import networkx as nx
from networkx.utils import UnionFind #Data structure for Kruskal Algorithm
import copy
import heapq
from collections import namedtuple
__author__ = 'Nicolas Roy @niroyb'
__date__ = '2013/04/17'
# Algorithm Article
# http://www.scielo.br/scielo.php?script=sci_arttext&pid=S0101-74382005000200004
# Original Author : Wladston Viana Ferreira Filho @wladston
# Original Code : https://gist.github.com/wladston/1981862
# Original post : https://networkx.lanl.gov/trac/ticket/471
# ChangeLog : Made the original code work, optimized and translated
'''
Data Types:
Partition : A tuple of two lists of edges (edges included, edges excluded)
Edge : A tuple of two vertices and an attribute dict (v1, v2, d)
Acronyms:
MST : minimum spanning tree
'''
Partition = namedtuple('Partition', ['included', 'excluded'])
def edgeValue(edge):
'''Returns the weight of an edge'''
return edge[2].get('weight', 1)
def getGraphCost(graph):
'''Returns the total sum of weights of edges in a graph'''
return graph.size(weight='weight')
class IncreasingMST():
def __init__(self, nxGraph):
self.graph = nxGraph
self.edges = self.getEdges(nxGraph)
#Sort edges by weight
self.edges.sort(key = edgeValue)
@classmethod
def getEdges(self, nxGraph):
'''Returns the different edges in an undirected graph with v1 < v2'''
# The reason for doing this is that there is no order warranty with the
# networkx edges method on vertex order
edges = []
for v1 in nxGraph:
for v2, data in nxGraph[v1].items():
if v1 < v2:
edges.append((v1, v2, data))
return edges
@classmethod
def __partition(self , mstEdges , p):
'''
Given an MST and a partition P, P partitions the vertices of the MST.
The union of the set of partitions generated here with the last MST
is equivalent to the last partition as an input.
'''
p1 = copy.deepcopy(p)
p2 = copy.deepcopy(p)
partitions = []
#Find open edges : they are in MST but are not included nor excluded
open_edges = [edge for edge in mstEdges if edge not in p.included
and edge not in p.excluded]
for e in open_edges :
p1[1].append(e)
p2[0].append(e)
partitions.append(p1)
p1 = copy.deepcopy(p2)
return partitions
def __kruskalMST(self, P):
'''
Returns the MST of a graph contained in the partition P.
Returns None if there is not any.
P [0] -> edges included, P [1] -> edges excluded
'''
# Create a solution tree with the list of included edges from the partition
tree = nx.Graph(P.included)
#Initialize the subtrees for Kruskal Algorithm with included edges
subtrees = UnionFind()
for u , v , d in P.included :
subtrees.union(u , v)
# Find open Edges
edges = [e for e in self.edges if e not in P.included and e not in P.excluded]
# Add edges connecting vertices not connected yet, until we get to
# A solution, or discover that you can not connect all vertices
#Apply Kruskal on open edges
for u , v , d in edges :
if subtrees [u] != subtrees [v]:
tree.add_edge(u, v, d)
subtrees.union(u, v)
#Check if all the nodes have been reached and with n-1 edges
if tree.order() == self.graph.order() and \
tree.size() == tree.order() - 1:
return tree
else:
return None
def mst_iter(self):
'''Minimum spanning tree iterator : yields the MST in order of their cost
Warning this quickly becomes computer intensive both on CPU and memory'''
mst = nx.minimum_spanning_tree(self.graph)
mstCost = getGraphCost(mst)
List = [(mstCost, Partition([], []), mst)]
#While we have a partition
while len(List):
# Get partition with the smallest spanning tree
# and remove it from the list
cost, partition, tree = heapq.heappop(List)
# Yield MST result
yield tree
edges = self.getEdges(tree)
edges.sort()
#Partition previous partition
newPartitions = self.__partition(edges, partition)
#Find minimum spanning trees in new partitions
for p in newPartitions:
tree = self.__kruskalMST(p)
if tree:
cost = getGraphCost(tree)
heapq.heappush(List, (cost, p, tree))
if __name__ == '__main__' :
#Given example
G = nx.Graph()
G.add_edge('A', 'B', weight = 5)
G.add_edge('B', 'C', weight = 4)
G.add_edge('C', 'D', weight = 5)
G.add_edge('D', 'E', weight = 3)
G.add_edge('C', 'E', weight = 7)
G.add_edge('B', 'E', weight = 6)
iMST = IncreasingMST(G)
print 'Edges sorted by weight'
for i, edge in enumerate(iMST.edges, 1):
print i, edge
print '\nMinimum Spanning trees in order of increasing cost:'
for tree in iMST.mst_iter():
print solver.getEdges(G)
print 'Cost =', getGraphCost(tree)
print
"""UnionFind.py
Union-find data structure. Based on Josiah Carlson's code,
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/215912
with significant additional changes by D. Eppstein.
"""
class UnionFind:
"""Union-find data structure.
Each unionFind instance X maintains a family of disjoint sets of
hashable objects, supporting the following two methods:
- X[item] returns a name for the set containing the given item.
Each set is named by an arbitrarily-chosen one of its members; as
long as the set remains unchanged it will keep the same name. If
the item is not yet part of a set in X, a new singleton set is
created for it.
- X.union(item1, item2, ...) merges the sets containing each item
into a single larger set. If any item is not yet part of a set
in X, it is added to X as one of the members of the merged set.
"""
def __init__(self):
"""Create a new empty union-find structure."""
self.weights = {}
self.parents = {}
def __getitem__(self, object):
"""Find and return the name of the set containing the object."""
# check for previously unknown object
if object not in self.parents:
self.parents[object] = object
self.weights[object] = 1
return object
# find path of objects leading to the root
path = [object]
root = self.parents[object]
while root != path[-1]:
path.append(root)
root = self.parents[root]
# compress the path and return
for ancestor in path:
self.parents[ancestor] = root
return root
def __iter__(self):
"""Iterate through all items ever found or unioned by this structure."""
return iter(self.parents)
def union(self, *objects):
"""Find the sets containing the objects and merge them all."""
roots = [self[x] for x in objects]
heaviest = max([(self.weights[r],r) for r in roots])[1]
for r in roots:
if r != heaviest:
self.weights[heaviest] += self.weights[r]
self.parents[r] = heaviest
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment