Skip to content

Instantly share code, notes, and snippets.

@CyberAstronaut101
Created June 21, 2023 20:07
Show Gist options
  • Save CyberAstronaut101/7ad6d99e9638fe632c43e212b4ee588d to your computer and use it in GitHub Desktop.
Save CyberAstronaut101/7ad6d99e9638fe632c43e212b4ee588d to your computer and use it in GitHub Desktop.
osage house
class Node:
    """A named vertex of the flow network.

    Attributes:
        name: identifier of the node; ``Osage.add_node`` uses it as the key
            under which the node is stored.
        capacity: upper bound that ``Osage.check_arrow_heads`` compares
            ``flow`` against.
        flow: flow value associated with the node (defaults to 0).
        is_arrow_head: marks the node as one that
            ``Osage.check_arrow_heads`` should inspect.
    """

    def __init__(self, name, capacity, flow=0, is_arrow_head=False):
        self.name = name
        self.capacity = capacity
        self.flow = flow
        self.is_arrow_head = is_arrow_head

    def __repr__(self):
        # Without this, printing a dict of nodes only shows memory addresses.
        return (
            f"{type(self).__name__}(name={self.name!r}, "
            f"capacity={self.capacity!r}, flow={self.flow!r}, "
            f"is_arrow_head={self.is_arrow_head!r})"
        )
class Edge:
    """A capacity-limited connection from ``start_node`` to ``end_node``.

    Attributes:
        start_node: node object the edge leaves from.
        end_node: node object the edge arrives at.
        capacity: amount of flow the edge can carry.
        is_bidirectional: True when the edge was declared as two-way; the
            network then also stores a mirrored edge for the other direction.
    """

    def __init__(self, start_node, end_node, capacity, is_bidirectional=False):
        self.start_node = start_node
        self.end_node = end_node
        self.capacity = capacity
        self.is_bidirectional = is_bidirectional

    def __repr__(self):
        # Show endpoint names when the endpoints expose one, so a printed
        # edge table is readable instead of a list of memory addresses.
        start = getattr(self.start_node, "name", self.start_node)
        end = getattr(self.end_node, "name", self.end_node)
        return (
            f"{type(self).__name__}({start!r} -> {end!r}, "
            f"capacity={self.capacity!r}, "
            f"is_bidirectional={self.is_bidirectional!r})"
        )
class Osage:
    """Flow network of named nodes joined by capacity-limited edges.

    ``nodes`` maps a node name to its node object; ``edges`` maps a
    ``(start_name, end_name)`` tuple to its edge object.
    """

    def __init__(self, nodes=None, edges=None):
        """Create a network, optionally pre-populated.

        Args:
            nodes: optional iterable of node objects, or a mapping whose
                values are node objects. Each is registered via ``add_node``.
            edges: optional iterable of edge objects, or a mapping whose
                values are edge objects. Each is stored under
                ``(start_node.name, end_node.name)``; a bidirectional edge
                also gets a mirrored edge unless one is already present.

        Raises:
            ValueError: if a supplied edge refers to a node that is not part
                of the network.
        """
        # Fresh containers per instance; never share a default argument.
        self.nodes = {}
        self.edges = {}
        for node in self._as_list(nodes):
            self.add_node(node)
        for edge in self._as_list(edges):
            start_name = edge.start_node.name
            end_name = edge.end_node.name
            if start_name not in self.nodes or end_name not in self.nodes:
                raise ValueError("Both nodes need to exist in graph!")
            self.edges[(start_name, end_name)] = edge
            if edge.is_bidirectional and (end_name, start_name) not in self.edges:
                self.edges[(end_name, start_name)] = Edge(
                    edge.end_node, edge.start_node, edge.capacity, True)

    @staticmethod
    def _as_list(collection):
        """Return the items of an iterable, or the values of a mapping."""
        if collection is None:
            return []
        if hasattr(collection, "values"):
            return list(collection.values())
        return list(collection)

    def print_state(self):
        """Print the node table and the edge table to stdout."""
        print('Nodes:')
        print(self.nodes)
        print('Edges:')
        print(self.edges)

    def add_node(self, node):
        """Register ``node`` under its ``name``, replacing any previous one."""
        self.nodes[node.name] = node

    def add_edge(self, start_node_name, end_node_name, capacity, is_bidirectional=False):
        """Connect two existing nodes.

        Args:
            start_node_name: name of the node the edge leaves from.
            end_node_name: name of the node the edge arrives at.
            capacity: capacity assigned to the edge.
            is_bidirectional: when True, a mirrored edge with the same
                capacity is stored for the opposite direction as well.

        Raises:
            ValueError: if either node name is unknown to the network.
        """
        if start_node_name not in self.nodes or end_node_name not in self.nodes:
            raise ValueError("Both nodes need to exist in graph!")
        start = self.nodes[start_node_name]
        end = self.nodes[end_node_name]
        self.edges[(start_node_name, end_node_name)] = Edge(
            start, end, capacity, is_bidirectional)
        if is_bidirectional:
            self.edges[(end_node_name, start_node_name)] = Edge(
                end, start, capacity, is_bidirectional)

    def bfs(self, source, sink, parent, residual=None):
        """Breadth-first search for a path with spare capacity.

        Args:
            source: name of the node to start from.
            sink: name of the node to reach.
            parent: dict that is filled in place; for every node reached,
                ``parent[node]`` is the node it was reached from.
            residual: optional mapping ``(start_name, end_name) -> remaining
                capacity``. When omitted, the current capacities of the
                network's edges are used.

        Returns:
            True if ``sink`` was reached over links with capacity > 0,
            otherwise False.
        """
        from collections import deque

        if residual is None:
            residual = {key: edge.capacity for key, edge in self.edges.items()}

        # Group link targets by their start node once, instead of rescanning
        # every link for every dequeued node. Insertion order is preserved.
        neighbours = {}
        for start, end in residual:
            neighbours.setdefault(start, []).append(end)

        seen = {source}
        queue = deque([source])
        while queue:
            current = queue.popleft()
            for end in neighbours.get(current, ()):
                if end in seen or residual[(current, end)] <= 0:
                    continue
                seen.add(end)
                parent[end] = current
                if end == sink:
                    return True
                queue.append(end)
        return False

    def ford_fulkerson(self, source, sink):
        """Compute the maximum flow from ``source`` to ``sink``.

        Augmenting paths are found with ``bfs``. The search runs on a private
        residual map, so the capacities stored on the network's edges are not
        modified and the method can be called repeatedly.

        Args:
            source: name of the node the flow originates from.
            sink: name of the node the flow is delivered to.

        Returns:
            The value of the maximum flow (0 when no path exists).
        """
        residual = {key: edge.capacity for key, edge in self.edges.items()}
        # Every link needs a reverse entry so that flow pushed along an early
        # path can later be cancelled; without it the result can fall short
        # of the true maximum on directed graphs.
        for start, end in list(residual):
            residual.setdefault((end, start), 0)

        max_flow = 0
        while True:
            parent = {}
            if not self.bfs(source, sink, parent, residual):
                break

            # Bottleneck: the smallest remaining capacity along the path.
            path_flow = float("Inf")
            v = sink
            while v != source:
                u = parent[v]
                path_flow = min(path_flow, residual[(u, v)])
                v = u

            # Push the bottleneck amount and open the reverse direction.
            v = sink
            while v != source:
                u = parent[v]
                residual[(u, v)] -= path_flow
                residual[(v, u)] += path_flow
                v = u

            max_flow += path_flow
        return max_flow

    def check_arrow_heads(self):
        """Return the names of constricted arrow-head nodes.

        A node is reported when it is flagged ``is_arrow_head`` and its
        ``flow`` is greater than its ``capacity``.
        """
        return [
            node.name
            for node in self.nodes.values()
            if node.is_arrow_head and node.flow > node.capacity
        ]
# # Define nodes
# nodes = [Node('A', 100, 40, False), Node(
# 'B', 60, 40, False), Node('C', 20, 40, False)]
# # Define edges
# edges = [
# Edge(nodes[0], nodes[1], 16),
# Edge(nodes[0], nodes[2], 13),
# Edge(nodes[1], nodes[2], 10)
# ]
# # Create network
# network = Osage(nodes, edges)
# network.print_state()
# # Define source and sink
# source = nodes[0]
# sink = nodes[2]
# # Compute max flow
# max_flow = network.ford_fulkerson(source, sink)
# print('Max flow:', max_flow)
# Build the demo network: four nodes, of which only D is an arrow head.
osage = Osage()
for _name, _capacity, _is_head in (
    ('A', 20, False),
    ('B', 15, False),
    ('C', 10, False),
    ('D', 5, True),
):
    osage.add_node(Node(_name, _capacity, is_arrow_head=_is_head))

# Wire the nodes together with one-way edges.
for _start, _end, _edge_capacity in (
    ('A', 'B', 10),
    ('B', 'C', 5),
    ('C', 'D', 10),
    ('A', 'D', 5),
):
    osage.add_edge(_start, _end, _edge_capacity)

# Maximum flow from A to D.
max_flow = osage.ford_fulkerson('A', 'D')
print(f"Maximum Flow: {max_flow}")

# Report arrow-head nodes whose flow exceeds their capacity, if any.
constricted_arrow_heads = osage.check_arrow_heads()
if not constricted_arrow_heads:
    print("No constricted Arrow Heads")
else:
    print("Constricted Arrow Heads:")
    for node in constricted_arrow_heads:
        print(node)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment