Skip to content

Instantly share code, notes, and snippets.

@gsdatta
Created August 26, 2015 23:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gsdatta/bac4a233e4cc4888df6f to your computer and use it in GitHub Desktop.
Save gsdatta/bac4a233e4cc4888df6f to your computer and use it in GitHub Desktop.
import heapq
import random
def return_item(entry, return_priority=False):
priority, item = entry
if return_priority:
return priority, item
else:
return item
class PriorityQueue(object):
"""
A generic priority queue implementation using heapq.
Provides push, pop, peek, remove, and a random access functionality.
TODO: Error checking for finding the item in the dictionary, popping from empty queue, etc.
"""
def __init__(self):
self.heap = []
self.entry_finder = {}
def push(self, item, priority=0):
"""
Pushes an item onto the priority queue with a default priority of 0. If
the item already exists, it simply updates the priority.
"""
if item in self.entry_finder:
self.remove(item)
entry = (priority, item)
self.entry_finder[item] = entry
heapq.heappush(self.heap, entry)
def remove(self, item):
""" Remove and return the corresponding item from the PQ. """
entry = self.entry_finder.pop(item)
self.heap.remove(entry)
return entry[1]
def pop(self, return_priority=False):
priority, item = heapq.heappop(self.heap)
self.entry_finder.pop(item)
return return_item((priority, item), return_priority)
def random(self, return_priority=False):
""" Returns a random item from the priority queue (does not pop it) """
index = random.randrange(len(self.heap))
priority, item = self.heap[index]
return return_item((priority, item), return_priority)
def empty(self):
return len(self.heap) == 0
def peek(self, return_priority=False):
priority, item = self.heap[0]
return return_item((priority, item), return_priority)
def __len__(self):
return len(self.heap)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment