Skip to content

Instantly share code, notes, and snippets.

@bmander
Created June 21, 2018 21:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bmander/ffbbc8e1b0417d6944c04c4cca139038 to your computer and use it in GitHub Desktop.
Save bmander/ffbbc8e1b0417d6944c04c4cca139038 to your computer and use it in GitHub Desktop.
minimal graph and shortest path tree implementation
import heapq
import itertools
from sys import maxsize
# Stand-in distance for a vertex that has no recorded distance yet; used as
# the default when looking up tentative distances in Dijkstra.step.
INFINITY = maxsize
class PriorityQueue(object):
    """Min-priority queue with updatable priorities, built on ``heapq``.

    Entries are never deleted from the middle of the heap. An outdated
    entry is instead marked with ``REMOVED`` and skipped once it reaches
    the top of the heap in ``extract_min``.
    """

    def __init__(self):
        self.pq = []  # list of entries arranged in a heap
        self.entry_finder = {}  # mapping of items to entries
        self.REMOVED = '<removed-item>'  # placeholder for a removed item
        self.counter = itertools.count()  # unique sequence count

    def add_with_priority(self, item, priority):
        """Add a new item or update the priority of an existing item."""
        if item in self.entry_finder:
            # Invalidate the stale heap entry before pushing the new one.
            self._remove(item)
        # The unique count sits between priority and item so that two
        # entries with equal priority are ordered by count and the items
        # themselves are never compared by the heap.
        count = next(self.counter)
        entry = [priority, count, item]
        self.entry_finder[item] = entry
        heapq.heappush(self.pq, entry)

    def _remove(self, item):
        """Mark an existing item as REMOVED. Raise KeyError if not found."""
        entry = self.entry_finder.pop(item)
        # The entry stays in the heap; only its item slot is overwritten.
        entry[-1] = self.REMOVED

    def decrease_priority(self, item, priority):
        """Set the priority of ``item``, adding it if it is not queued.

        The new priority replaces the old one unconditionally; it is not
        checked to be lower than the current priority.
        """
        # add_with_priority already drops any existing entry for the item,
        # so a missing item needs no special handling here.
        self.add_with_priority(item, priority)

    def extract_min(self):
        """Remove and return ``(item, priority)`` for the lowest priority.

        Returns ``(None, None)`` if the queue holds no live entries.
        """
        while self.pq:
            priority, count, item = heapq.heappop(self.pq)
            if item is not self.REMOVED:
                del self.entry_finder[item]
                return item, priority
        return None, None
class Graph(object):
    """Directed weighted graph kept as nested adjacency dictionaries.

    ``edges[fromid][toid]`` holds the weight of the edge fromid -> toid;
    ``vertices`` is the set of every id seen as an edge endpoint.
    """

    def __init__(self):
        self.vertices = set()
        self.edges = {}

    def addedge(self, fromid, toid, weight):
        """Record the edge fromid -> toid with the given weight.

        Both endpoints are registered as vertices. Adding the same edge
        again overwrites its previous weight.
        """
        self.vertices.update((fromid, toid))
        self.edges.setdefault(fromid, {})[toid] = weight
class Dijkstra(object):
    """Incremental single-source shortest path search over a graph.

    The search advances one vertex at a time through ``step`` (or to
    completion through ``run``). Results accumulate in ``dist`` (best
    known distance per reached vertex) and ``prev`` (predecessor of each
    reached vertex on its best known path).
    """

    def __init__(self, graph, source, queueclass=PriorityQueue):
        """Prepare a search over ``graph`` starting at ``source``.

        ``graph`` must expose an ``edges`` mapping of the form
        ``{fromid: {toid: weight}}``. ``queueclass`` is instantiated with
        no arguments and must provide ``add_with_priority``,
        ``decrease_priority`` and ``extract_min``.
        """
        self.dist = {}
        self.prev = {}
        self.Q = queueclass()
        self.G = graph
        self.dist[source] = 0
        self.Q.add_with_priority(source, 0)

    def step(self, verbose=False):
        """Extract the closest queued vertex and relax its outgoing edges.

        Returns ``(vertex, priority)`` for the extracted vertex, or a pair
        whose first element is ``None`` once the queue is exhausted. With
        ``verbose`` set, every improved edge is printed.
        """
        mm = self.Q.extract_min()
        if mm is None:
            # Tolerate queue classes that signal exhaustion with bare None.
            return None, None
        u, prio = mm
        if u is None:
            # PriorityQueue signals exhaustion with (None, None); stop here
            # instead of looking up edges for a None vertex.
            return u, prio
        for v, length_u_v in self.G.edges.get(u, {}).items():
            alt = self.dist.get(u, INFINITY) + length_u_v
            if alt < self.dist.get(v, INFINITY):
                self.dist[v] = alt
                self.prev[v] = u
                self.Q.decrease_priority(v, alt)
                if verbose:
                    # Parenthesised single argument prints identically
                    # under Python 2 and Python 3.
                    print("%s -> %s (%s)" % (u, v, alt))
        return u, prio

    def run(self, maxweight=None):
        """Step until the queue is empty or ``maxweight`` is reached.

        When ``maxweight`` is given, the search stops after extracting the
        first vertex whose priority is at least ``maxweight``. That vertex
        has already had its edges relaxed by ``step`` when the loop exits.
        """
        while True:
            u, prio = self.step()
            if u is None:
                break
            if maxweight is not None and prio >= maxweight:
                break

    def path(self, dest_vid):
        """Return the vertices from the source to ``dest_vid``, in order.

        Returns ``[]`` when ``dest_vid`` has no recorded predecessor. That
        covers vertices the search has not reached and also the source
        itself, which never gets a predecessor.
        """
        if dest_vid not in self.prev:
            return []
        ret = []
        curs = dest_vid
        # Walk predecessor links back to the source, whose lookup in prev
        # yields None and ends the loop.
        while curs is not None:
            ret.append(curs)
            curs = self.prev.get(curs)
        ret.reverse()
        return ret
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment