Skip to content

Instantly share code, notes, and snippets.

@felipecruz
Last active August 29, 2015 14:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save felipecruz/77d93460b0e410f41225 to your computer and use it in GitHub Desktop.
Save felipecruz/77d93460b0e410f41225 to your computer and use it in GitHub Desktop.
import heapq
import pprint
import sys
from collections import deque
inf = float('inf')
capacity = 'capacity'
flow = 'flow'
cost = 'cost'
lower = 'lower'
class FlexObject(object):
'''
An object with dynamic properties
'''
def __init__(self, _id, data):
self.id = _id
for k, v in data.items():
self.__dict__[k] = v
@property
def data(self):
return self.__dict__
class DirectedEdge(FlexObject):
'''
Directed edge between f and t
'''
def __init__(self, _id, data, f, t):
super(DirectedEdge, self).__init__(_id, data)
self.f = f
self.t = t
def __str__(self):
return "Edge id: {} -> from {} to {} - data: {}".format(
self.id, self.f, self.t, str(self.__dict__))
class Vertex(FlexObject):
'''
Vertex object
edges: [] - Adjacency list
visited: False
'''
def __init__(self, _id, data):
super(Vertex, self).__init__(_id, data)
self.edges = []
self.visited = False
def __str__(self):
return "{}".format(self.id)
def __repr__(self):
return self.__str__()
def add_from(self, v):
pass
def __eq__(self, other):
return self.id == other.id
class Graph(object):
def __init__(self):
self._vertexes = {}
self.edges = []
self.residual = False
@property
def vertexes(self):
return self._vertexes.values()
def get_vertex(self, _id):
vertex = self._vertexes.get(_id, None)
if not vertex:
return None
return vertex
def add_vertex(self, vertex_id, data):
v = Vertex(vertex_id, data)
self._vertexes[vertex_id] = v
def get_edge(self, _id):
for edge, s, t in self.edges:
if edge.id == _id:
return edge
return None
def add_edge(self, f_id, t_id, data=None, directed=True):
f = self.get_vertex(f_id)
if not f:
f = Vertex(f_id, {})
t = self.get_vertex(t_id)
if not t:
t = Vertex(t_id, {})
edge = None
e_id = (f.id, t.id)
if directed:
edge = DirectedEdge(e_id, data, f, t)
if not f.id in self._vertexes:
self._vertexes[f.id] = f
if not t.id in self._vertexes:
self._vertexes[t.id] = t
if not (edge, t) in f.edges:
f.edges.append((edge, t))
self.edges.append((edge, f, t))
else:
raise Exception("Use directed edges only")
return edge
def to_dot_lang(self):
def flow_cap(data, residual=False):
if not residual and lower in data:
return "{},{}".format(data[lower], data[capacity])
elif not residual:
return "{}".format(data[capacity])
return "{},{}".format(data[flow], data[capacity])
header = ['digraph Flow {\n node[shape=circle];']
for v in self.vertexes:
header.append('{} [label="{}"];'.format(v, v.id))
for edge, target in v.edges:
if edge.capacity != 0:
header.append('{} -> {} [label="{}"];'.format(
v, target, flow_cap(edge.data, residual=self.residual)))
header.append('}')
return '\n'.join(header)
def clear_visited(G):
'''Clear visited and pred(ecessor) attributes'''
for v in G.vertexes:
v.visited = False
v.pred = None
def BFS(G, s=None):
'''BFS'''
if not s:
s = G.vertexes[0]
s.visited = True
queue = deque([])
queue.append(s)
while queue:
v = queue.popleft()
print("Visited {}".format(v))
for e, v in v.edges:
if not v.visited:
v.visited = True
queue.append(v)
clear_visited(G)
def BFS_Constrained(G, s, t, constraint_func):
'''Constrained BFS
Find a path from s to t when constraints are respected.
Parameters:
G: graph
s: source node
t: terminal node
constraint_func: constraint function
Use regular functions or lambdas:
BFS_Constrained(G, s, t, lambda e, s, t: e.capacity - e.flow > 0)
or
def residual_capacity(e, s, t):
return e.capacity - e.flow > 0
BFS_Constrained(G, s, t, residual_capacity)
Return:
bool, left_edges_list, left_vertexes_list = BFS(...)
bool: There is a path from s to t
left_edges_list: Min cut *inner* edges
left_vertexes_list: min cut vertexes
'''
s.visited = True
queue = deque([])
queue.append(s)
path = []
vertexes = []
s.pred = (None, None)
original_t = t
while queue:
u = queue.popleft()
if u == t:
break
for e, v in u.edges:
if constraint_func(e, e.f, e.t):
if not v.visited:
v.pred = (e, u)
v.visited = True
queue.append(v)
while True:
vertexes.append(u)
if not u.pred[0]:
break
print(u.pred)
path.append(u.pred[0])
e, u = u.pred
clear_visited(G)
result = True, list(reversed(path)), list(reversed(vertexes))
if not vertexes[0] == original_t:
result = False, list(reversed(path)), list(reversed(vertexes))
return result
def DFS(G, s=None):
'''DSF'''
if not s:
s = G.vertexes[0]
s.visited = True
stack = []
stack.append(s)
while stack:
v = stack.pop()
print("Visited {}".format(v))
for e, v in v.edges:
if not v.visited:
v.visited = True
stack.append(v)
clear_visited(G)
def Dijkstra_Initialize(G, s):
''' Dijkstra Initialization
Set all, except s, distances to infinity and add to the heap.
Set distance to s s 0
Mark s as visited
Return: heap with (distance, node) elements
'''
predecessors = {}
distances = {s: 0}
heap = [(0, s)]
for v in [v for v in G.vertexes if not v == s]:
distances[v] = inf
heapq.heappush(heap, (inf, v))
s.visited = True
return heap, distances, predecessors
def Dijkstra(G, s=None):
''' DFS
Dijkstra shortest path from s algorithm.
Note: Using priotity queue.
'''
if not s:
s = G.vertexes[0]
heap, distances, predecessors = Dijkstra_Initialize(G, s)
while heap:
distance, u = heapq.heappop(heap)
distances[u] = distance
u.visited = True
for e, v in u.edges:
if not v.visited:
prev_dist = distances[v]
new_dist = distances[u] + e.capacity
if prev_dist > new_dist:
distances[v] = new_dist
predecessors[v] = u
i = heap.remove((prev_dist, v))
heapq.heappush(heap, (new_dist, v))
clear_visited(G)
return distances, predecessors
def BellmanFord_Initialize(G, s):
distances = {}
successors = {}
for v in G.vertexes:
distances[v] = inf
successors[v] = None
distances[s] = 0
return distances, successors
def relax(source_node, target_node, value, d, p):
if d[target_node] > d[source_node] + value:
d[target_node] = d[source_node] + value
p[target_node] = source_node
def BellmanFord(G, s=None):
if not s:
s = G.vertexes[0]
distances, successors = BellmanFord_Initialize(G, s)
for i in range(len(G.vertexes) - 1):
for e, f, t in G.edges:
relax(f, t, e.capacity, distances, successors)
for e, f, t in G.edges:
if distances[t] > distances[f] + e.capacity:
return True
return False
def residual_edge(edge):
''' Return residual edge and reverse residual edge attributes
'''
edge_capacity = edge.capacity
edge_flow = edge.flow
residual_capacity = edge_capacity - edge_flow
reverse_capacity = edge_flow
edge_data = edge.data.copy()
if residual_capacity > 0:
reverse_data = {capacity: reverse_capacity, flow: 0, cost: edge.cost}
return edge_data, reverse_data
return edge_data, {}
def residual_Graph(G):
''' Create a new Graph Gr as the residual graph from G
'''
Gr = Graph()
for edge, source, target in G.edges:
edge.r = False
edge_data, reverse_edge_data = residual_edge(edge)
new_edge = Gr.add_edge(source.id, target.id, data=edge_data)
new_edge.r = False
if reverse_edge_data:
rev_edge = Gr.add_edge(target.id, source.id, data=reverse_edge_data)
rev_edge.r = True
Gr.residual = True
return Gr
def write(G, Gr):
with open('G.dot', 'w') as out:
data = G.to_dot_lang()
out.write(data)
with open('rG.dot', 'w') as out:
rG_data = Gr.to_dot_lang()
out.write(rG_data)
def FordFulkerson(G, s, t):
for edge, _, _ in G.edges:
edge.flow = 0
max_flow = 0
Gr = residual_Graph(G)
s, t = Gr.get_vertex(s.id), Gr.get_vertex(t.id)
s_to_t, path ,vertexes = BFS_Constrained(Gr, s, t,
lambda e, s, t: e.capacity - e.flow > 0)
while s_to_t:
path_capacities = [(e.capacity - e.flow, e) for e in path]
bottleneck = min(path_capacities)[0]
max_flow += bottleneck
for e in path:
e_g = G.get_edge(e.id)
if not e.r and e.capacity >= e.flow + bottleneck:
e.flow = e.flow + bottleneck
e_g.flow += bottleneck
if e.r and e.capacity >= e.flow + bottleneck:
e.flow = e.flow - bottleneck
e_g.flow -= bottleneck
write(G, Gr)
#pprint.pprint(locals())
s_to_t, path, vertexes = BFS_Constrained(Gr, s, t,
lambda e, s, t: e.capacity - e.flow > 0)
print("Min cut: {}".format(vertexes))
return max_flow
def CycleCancealing(G, s, t):
pass
def main():
G = Graph()
G.add_edge(1, 2, data={capacity: 4 , cost: 1, flow: 0})
G.add_edge(1, 3, data={capacity: 5 , cost: 1, flow: 0})
G.add_edge(2, 4, data={capacity: 4 , cost: 1, flow: 0})
G.add_edge(3, 5, data={capacity: 4 , cost: 1, flow: 0})
G.add_edge(4, 6, data={capacity: 4 , cost: 1, flow: 0})
G.add_edge(5, 6, data={capacity: 1 , cost: 1, flow: 0})
G.add_edge(2, 3, data={capacity: 1 , cost: 1, flow: 0})
G.add_edge(5, 2, data={capacity: -8, cost: 1, flow: 0})
print("Edges")
for edge in G.edges:
print(edge)
print("Vertexes")
for vertex in G.vertexes:
print(vertex)
print("Edges by vertex")
for vertex in G.vertexes:
for edge, v in vertex.edges:
print("Vertex {} -> {}".format(vertex.id, edge.id))
#print("BFS")
#BFS(G)
print("BFS Constrained flow > 0")
s, t = G.get_vertex(1), G.get_vertex(6)
s_to_t, path, vertexes = BFS_Constrained(G, s, t, lambda e, s, t: e.capacity - e.flow > 0)
print("Path from {} to {}".format(s, t))
for p in path:
print(p)
X = Graph()
X.add_edge(1, 2, data={capacity: 20, cost: 1, flow: 0})
X.add_edge(1, 3, data={capacity: 10, cost: 1, flow: 0})
X.add_edge(2, 3, data={capacity: 30, cost: 1, flow: 0})
X.add_edge(2, 4, data={capacity: 10, cost: 1, flow: 0})
X.add_edge(3, 4, data={capacity: 20, cost: 1, flow: 0})
s, t = X.get_vertex(1), X.get_vertex(4)
max_flow = FordFulkerson(X, s, t)
print("FordFulkerson Max Flow: {}".format(max_flow))
X = Graph()
X.add_edge(1, 2, data={capacity: 5 , cost: 1, flow: 0})
X.add_edge(1, 3, data={capacity: 15, cost: 1, flow: 0})
X.add_edge(2, 4, data={capacity: 5 , cost: 1, flow: 0})
X.add_edge(2, 5, data={capacity: 5, cost: 1, flow: 0})
X.add_edge(3, 4, data={capacity: 5, cost: 1, flow: 0})
X.add_edge(3, 5, data={capacity: 5, cost: 1, flow: 0})
X.add_edge(4, 6, data={capacity: 15, cost: 1, flow: 0})
X.add_edge(5, 6, data={capacity: 5, cost: 1, flow: 0})
s, t = X.get_vertex(1), X.get_vertex(6)
max_flow = FordFulkerson(X, s, t)
for edge, s, t in X.edges:
print(edge)
print("FordFulkerson Max Flow: {}".format(max_flow))
#print("DSF")
#DFS(G)
#print("Dijkstra")
distances, predecessors = Dijkstra(G, G.get_vertex(1))
#for v, dis in distances.items():
# print("Distance to {}: {}".format(str(v), dis))
#print("Dijkstra")
distances, predecessors = Dijkstra(G, G.get_vertex(2))
#for v, dis in distances.items():
# print("Distance to {}: {}".format(str(v), dis))
X = Graph()
X.add_edge('x','a', data={capacity: 3.0, cost: 1, flow: 0})
X.add_edge('x','b', data={capacity: 1.0, cost: 1, flow: 0})
X.add_edge('a','c', data={capacity: 3.0, cost: 1, flow: 0})
X.add_edge('b','c', data={capacity: 5.0, cost: 1, flow: 0})
X.add_edge('b','d', data={capacity: 4.0, cost: 1, flow: 0})
X.add_edge('d','e', data={capacity: 2.0, cost: 1, flow: 0})
X.add_edge('c','y', data={capacity: 2.0, cost: 1, flow: 0})
X.add_edge('e','y', data={capacity: 3.0, cost: 1, flow: 0})
s, t = X.get_vertex('x'), X.get_vertex('y')
max_flow = FordFulkerson(X, s, t)
print("FordFulkerson Max Flow: {}".format(max_flow))
print("Negative Cycle in G? {}".format(BellmanFord(G, s=G.get_vertex(1))))
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment