Skip to content

Instantly share code, notes, and snippets.

@xvzf
Created July 18, 2018 21:25
Show Gist options
  • Save xvzf/2e33c7f21213fcbf31778535bcb563e8 to your computer and use it in GitHub Desktop.
Save xvzf/2e33c7f21213fcbf31778535bcb563e8 to your computer and use it in GitHub Desktop.
Prim
from typing import Dict, Tuple
from pprint import pformat
# Example graphs.
# Each graph is a weighted adjacency map: {node: {neighbour: edge_weight}}.
# Every edge is listed in both directions (undirected graph).
# The name suggests this one is the example from the lecture slides.
graph_slides = {
    0: {1: 8, 2: 2},
    1: {0: 8, 2: 7, 3: 9, 4: 5},
    2: {0: 2, 1: 7, 4: 6},
    3: {1: 9, 4: 3, 5: 1},
    4: {2: 6, 1: 5, 3: 3, 5: 4},
    5: {3: 1, 4: 4},
}
# Second example graph, in the same weighted adjacency-map form
# {node: {neighbour: edge_weight}}; every edge is listed in both directions.
graph_slides_2 = {
    0: {1: 1, 2: 7, 4: 2},
    1: {0: 1, 3: 8, 4: 2, 5: 4},
    2: {0: 7, 4: 3},
    3: {1: 8, 5: 3},
    4: {0: 2, 1: 2, 2: 3, 5: 9},
    5: {1: 4, 3: 3, 4: 9},
}
# Small four-node test graph in the weighted adjacency-map form
# {node: {neighbour: edge_weight}}.
# The graph is undirected, so every edge must appear under both endpoints;
# node 3 therefore carries the back-edge to node 1 (weight 7) that matches
# the 1 -> 3 entry.
test_graph_1 = {
    0: {1: 4, 3: 8},
    1: {0: 4, 2: 5, 3: 7},
    2: {1: 5, 3: 6},
    3: {0: 8, 1: 7, 2: 6},
}
class PriorityQueue:
    """Minimal list-backed priority queue of ``(node, value)`` entries.

    The backing list is re-sorted by ascending value after every change, so
    ``pop`` always returns an entry with the smallest value. ``list.sort`` is
    stable, so entries with equal values keep their existing relative order.

    This is a deliberately simple implementation, not an efficient one.
    """

    def __init__(self):
        """Create an empty queue."""
        self._queue = []

    def enqueue(self, node: int, value: float):
        """Add an entry to the queue.

        No duplicate check is made: enqueueing a node twice stores two
        entries.

        :param node: Node
        :param value: Weight
        """
        self._queue.append((node, value))
        self._sort()

    def _sort(self):
        """Sort the entries by ascending value (stable)."""
        self._queue.sort(key=lambda entry: entry[1])

    def contains(self, node: int) -> bool:
        """Check whether a node is still in the queue.

        :param node: Node
        :returns: True if an entry for the node is queued
        """
        return any(queued == node for queued, _ in self._queue)

    def pop(self) -> Tuple[int, float]:
        """Remove and return the first entry, i.e. one with the smallest value.

        :returns: (node, value)
        :raises IndexError: if the queue is empty
        """
        return self._queue.pop(0)

    def decrease_key(self, node: int, value: float) -> bool:
        """Lower the value of a queued node if the new value is smaller.

        :param node: Node
        :param value: New value
        :returns: True if the value was changed; False if the node is not
            queued or its current value is not greater than ``value``
        """
        for index, (queued, current) in enumerate(self._queue):
            if queued != node:
                continue
            if current > value:
                self._queue[index] = (node, value)
                self._sort()
                return True
            # Only the first entry found for the node is considered.
            break
        return False

    @property
    def empty(self) -> bool:
        """Whether the queue holds no entries."""
        return not self._queue
class Prim:
    """Prim's minimum-spanning-tree algorithm as described in Informatik 2.

    The graph is a weighted adjacency map ``{node: {neighbour: weight}}``.
    After ``compute()`` has run:

    * ``t``   holds the chosen tree edges as ``(predecessor, node)`` tuples,
      in the order the nodes left the queue;
    * ``pre`` maps each node to its predecessor in the tree, or ``-1`` if it
      has none (the start node, or a node never reached via an edge);
    * ``dis`` maps each node to the weight of the edge that connects it to
      its predecessor (``0`` for the start node, ``inf`` if never lowered).
    """

    def __init__(self, s: int, graph: Dict[int, Dict[int, int]]):
        """Prepare the bookkeeping tables and the priority queue.

        :param s: Start node. If ``s`` is not a key of ``graph`` no key is
            lowered and the first node of ``graph`` is processed first.
        :param graph: Weighted adjacency map ``{node: {neighbour: weight}}``
        """
        self.graph = graph
        self.queue = PriorityQueue()

        # Initialize
        self.t = []
        self.dis = {node: float("inf") for node in graph}
        self.pre = {node: -1 for node in graph}

        # Initialize Priority Queue: every node starts with key "infinity",
        # only the start node gets key 0 so that it is popped first.
        for node in graph:
            self.queue.enqueue(node, float("inf"))
        if self.queue.decrease_key(s, 0):
            self.dis[s] = 0

    def compute(self):
        """Run Prim's algorithm; results are stored in ``t``/``pre``/``dis``."""
        while not self.queue.empty:
            # Get the node with the cheapest connection to the tree
            u, _ = self.queue.pop()

            # Add the connecting edge to t (nodes without predecessor have none)
            if self.pre[u] != -1:
                self.t.append((self.pre[u], u))

            # Iterate over all edges leaving u
            for v, w in self.graph[u].items():
                # decrease_key returns False both when v has already left
                # the queue and when w is no improvement, so a separate
                # membership test is unnecessary.
                if self.queue.decrease_key(v, w):
                    self.dis[v] = w
                    self.pre[v] = u
if __name__ == "__main__":
    # Demo run: compute the spanning tree of the first example graph,
    # starting at node 0, and print its edges.
    p = Prim(0, graph_slides)
    p.compute()
    print(f"T: {pformat(p.t)}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment